forked from ConvertGroupsAS/magento2-patches
-
Notifications
You must be signed in to change notification settings - Fork 2
/
Copy pathPatch-Message-Queue-Consumer-Transaction.patch
66 lines (66 loc) · 2.25 KB
/
Patch-Message-Queue-Consumer-Transaction.patch
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
--- Consumer.php
+++ Consumer.php
@@ -136,20 +136,59 @@
$this->getMessageController()->lock($message, $this->configuration->getConsumerName());
if (in_array($topicName, $allowedTopics)) {
$this->dispatchMessage($message);
- $this->resource->getConnection()->commit();
+ $this->commit();
$queue->acknowledge($message);
} else {
$queue->reject($message); //push message back to the queue
}
} catch (MessageLockException $exception) {
- $this->resource->getConnection()->rollBack();
+ $this->rollBack();
$queue->acknowledge($message);
} catch (\Magento\Framework\MessageQueue\ConnectionLostException $e) {
- $this->resource->getConnection()->rollBack();
+ $this->rollBack();
} catch (\Exception $e) {
- $this->resource->getConnection()->rollBack();
+ $this->rollBack();
$queue->reject($message, false, $e->getMessage());
}
};
}
+
+ /**
+ * Commit transaction
+ *
+ * @return void
+ * @throws \Exception
+ */
+ private function commit()
+ {
+ $connection = $this->resource->getConnection();
+ $connection->commit();
+
+ /**
+ * Process after commit callbacks
+ */
+ if ($this->resource->getConnection()->getTransactionLevel() === 0) {
+ $callbacks = \Magento\Framework\Model\CallbackPool::get(spl_object_hash($connection));
+ try {
+ foreach ($callbacks as $callback) {
+ call_user_func($callback);
+ }
+ } catch (\Exception $e) {
+ throw $e;
+ } finally {
+ \Magento\Framework\Model\CallbackPool::clear(spl_object_hash($connection));
+ }
+ }
+ }
+
+ /**
+ * Roll back transaction
+ *
+ * @return void
+ */
+ private function rollBack()
+ {
+ $this->resource->getConnection()->rollBack();
+ \Magento\Framework\Model\CallbackPool::clear(spl_object_hash($this->resource->getConnection()));
+ }
}