Skip to content

Commit 11d0a95

Browse files
Replace Rewrite by Around Plugin on message reject function (#15)
* - Replace Rewrite by Around Plugin on message reject function - Fix Retries Count - Refactoring - Update Tests --------- Co-authored-by: Vladyslav Sikailo <vladyslav.sikailo@run-as-root.sh>
1 parent fedf594 commit 11d0a95

12 files changed

+464
-465
lines changed
Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,48 @@
1+
<?php
2+
3+
declare(strict_types=1);
4+
5+
namespace RunAsRoot\MessageQueueRetry\Plugin;
6+
7+
use JsonException;
8+
use Magento\Framework\MessageQueue\EnvelopeInterface;
9+
use Magento\Framework\MessageQueue\QueueInterface;
10+
use RunAsRoot\MessageQueueRetry\Exception\MessageCouldNotBeCreatedException;
11+
use RunAsRoot\MessageQueueRetry\Service\IsMessageShouldBeSavedForRetryService;
12+
use RunAsRoot\MessageQueueRetry\Service\SaveFailedMessageService;
13+
14+
class HandleQueueMessageRejectPlugin
15+
{
16+
public function __construct(
17+
private IsMessageShouldBeSavedForRetryService $isMessageShouldBeSavedForRetryService,
18+
private SaveFailedMessageService $saveFailedMessageService
19+
) {
20+
}
21+
22+
/**
23+
* @throws MessageCouldNotBeCreatedException
24+
* @throws JsonException
25+
*/
26+
public function aroundReject(
27+
QueueInterface $subject,
28+
callable $proceed,
29+
EnvelopeInterface $envelope,
30+
bool $requeue,
31+
string $error
32+
): void {
33+
if (!$error) {
34+
$proceed($envelope, $requeue, $error);
35+
return;
36+
}
37+
38+
$shouldBeSavedForRetry = $this->isMessageShouldBeSavedForRetryService->execute($envelope);
39+
40+
if (!$shouldBeSavedForRetry) {
41+
$proceed($envelope, $requeue, $error);
42+
return;
43+
}
44+
45+
$this->saveFailedMessageService->execute($envelope, $error);
46+
$subject->acknowledge($envelope);
47+
}
48+
}

src/Queue/Consumer.php

Lines changed: 0 additions & 181 deletions
This file was deleted.
Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,28 @@
1+
<?php
2+
3+
declare(strict_types=1);
4+
5+
namespace RunAsRoot\MessageQueueRetry\Service;
6+
7+
use Magento\Framework\MessageQueue\EnvelopeInterface;
8+
use PhpAmqpLib\Wire\AMQPTable;
9+
10+
class GetMessageRetriesCountService
11+
{
12+
public function execute(EnvelopeInterface $message): int
13+
{
14+
$messageProperties = $message->getProperties();
15+
$applicationHeaders = $messageProperties['application_headers'] ?? null;
16+
17+
// If there are no application headers, then it is the first time the message has been processed.
18+
if (!$applicationHeaders instanceof AMQPTable) {
19+
return 0;
20+
}
21+
22+
if (isset($applicationHeaders->getNativeData()['x-death'][0]['count'])) {
23+
return $applicationHeaders->getNativeData()['x-death'][0]['count'];
24+
}
25+
26+
return 0;
27+
}
28+
}

src/Service/HandleQueueFailureService.php

Lines changed: 0 additions & 104 deletions
This file was deleted.

0 commit comments

Comments
 (0)