Skip to content

Commit f1b41ea

Browse files
authored
Fixes yiisoft#332: Add AWS SQS FIFO support
2 parents 74d72ca + 6f2e1e9 commit f1b41ea

File tree

8 files changed

+145
-5
lines changed

8 files changed

+145
-5
lines changed

.env.example

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,4 +5,7 @@ AWS_SQS_ENABLED=
55
AWS_SQS_URL=
66
AWS_KEY=
77
AWS_SECRET=
8-
AWS_REGION=
8+
AWS_REGION=
9+
AWS_SQS_FIFO_ENABLED=
10+
AWS_SQS_FIFO_URL=
11+
AWS_SQS_FIFO_MESSAGE_GROUP_ID=

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ Yii2 Queue Extension Change Log
44
2.2.1 under development
55
-----------------------
66

7+
- Enh #332: Add AWS SQS FIFO support (kringkaste, alexkart)
78
- Bug #220: Updated to the latest amqp-lib (alexkart)
89

910
2.2.0 Mar 20, 2019

docs/guide/driver-sqs.md

Lines changed: 25 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@ The driver uses AWS SQS to store queue data.
55

66
You have to add `aws/aws-sdk-php` extension to your application in order to use it.
77

8-
Configuration example:
8+
Configuration example for standard queues:
99

1010
```php
1111
return [
@@ -24,6 +24,30 @@ return [
2424
];
2525
```
2626

27+
Configuration example for FIFO queues:
28+
29+
```php
30+
return [
31+
'bootstrap' => [
32+
'queue', // The component registers own console commands
33+
],
34+
'components' => [
35+
'queue' => [
36+
'class' => \yii\queue\sqs\Queue::class,
37+
'url' => '<sqs url>',
38+
'key' => '<key>',
39+
'secret' => '<secret>',
40+
'region' => '<region>',
41+
'messageGroupId' => '<Group ID>',
42+
],
43+
],
44+
];
45+
```
46+
47+
The message group ID is required by SQS for FIFO queues. You can configure your own or use the "default" value.
48+
49+
The deduplication ID is generated automatically, so no matter if you have activated content-based deduplication in the SQS queue or not, this ID will be used.
50+
2751
Console
2852
-------
2953

src/drivers/sqs/Queue.php

Lines changed: 15 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,12 @@ class Queue extends CliQueue
4646
* @var string
4747
*/
4848
public $version = 'latest';
49+
/**
50+
* Message Group ID for FIFO queues.
51+
* @var string
52+
* @since 2.2.1
53+
*/
54+
public $messageGroupId = 'default';
4955
/**
5056
* @var string command class name
5157
* @inheritdoc
@@ -172,7 +178,7 @@ protected function pushMessage($message, $ttr, $delay, $priority)
172178
throw new NotSupportedException('Priority is not supported in this driver');
173179
}
174180

175-
$response = $this->getClient()->sendMessage([
181+
$request = [
176182
'QueueUrl' => $this->url,
177183
'MessageBody' => $message,
178184
'DelaySeconds' => $delay,
@@ -182,7 +188,14 @@ protected function pushMessage($message, $ttr, $delay, $priority)
182188
'StringValue' => $ttr,
183189
],
184190
],
185-
]);
191+
];
192+
193+
if (substr($this->url, -5) === '.fifo') {
194+
$request['MessageGroupId'] = $this->messageGroupId;
195+
$request['MessageDeduplicationId'] = hash('sha256', $message);
196+
}
197+
198+
$response = $this->getClient()->sendMessage($request);
186199
return $response['MessageId'];
187200
}
188201

tests/app/config/main.php

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -122,4 +122,16 @@
122122
];
123123
}
124124

125+
if (getenv('AWS_SQS_FIFO_ENABLED')) {
126+
$config['bootstrap'][] = 'sqsFifoQueue';
127+
$config['components']['sqsFifoQueue'] = [
128+
'class' => \yii\queue\sqs\Queue::class,
129+
'url' => getenv('AWS_SQS_FIFO_URL'),
130+
'key' => getenv('AWS_KEY'),
131+
'secret' => getenv('AWS_SECRET'),
132+
'region' => getenv('AWS_REGION'),
133+
'messageGroupId' => getenv('AWS_SQS_FIFO_MESSAGE_GROUP_ID'),
134+
];
135+
}
136+
125137
return $config;

tests/docker-compose.yml

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,9 @@ services:
3434
AWS_SECRET: ${AWS_SECRET}
3535
AWS_REGION: ${AWS_REGION}
3636
AWS_SQS_URL: ${AWS_SQS_URL}
37+
AWS_SQS_FIFO_ENABLED: ${AWS_SQS_FIFO_ENABLED}
38+
AWS_SQS_FIFO_URL: ${AWS_SQS_FIFO_URL}
39+
AWS_SQS_FIFO_MESSAGE_GROUP_ID: ${AWS_SQS_FIFO_MESSAGE_GROUP_ID}
3740
depends_on: &php_depends_on
3841
- mysql
3942
- postgres

tests/drivers/CliTestCase.php

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -62,7 +62,7 @@ private function prepareCmd($cmd)
6262

6363
return strtr($cmd, [
6464
'php' => PHP_BINARY,
65-
'yii' => 'tests/yii',
65+
'yii' => __DIR__ . '/../yii',
6666
'queue' => $method->invoke($this->getQueue()),
6767
]);
6868
}

tests/drivers/sqs/FifoQueueTest.php

Lines changed: 84 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,84 @@
1+
<?php
2+
/**
3+
* @link http://www.yiiframework.com/
4+
* @copyright Copyright (c) 2008 Yii Software LLC
5+
* @license http://www.yiiframework.com/license/
6+
*/
7+
8+
namespace tests\drivers\sqs;
9+
10+
use tests\app\RetryJob;
11+
use tests\drivers\CliTestCase;
12+
use Yii;
13+
use yii\queue\sqs\Queue;
14+
15+
/**
16+
* SQS FIFO Queue Test.
17+
*/
18+
class FifoQueueTest extends CliTestCase
19+
{
20+
public function testRun()
21+
{
22+
$job = $this->createSimpleJob();
23+
$this->getQueue()->push($job);
24+
$this->runProcess('php yii queue/run');
25+
26+
$this->assertSimpleJobDone($job);
27+
}
28+
29+
public function testListen()
30+
{
31+
$this->startProcess('php yii queue/listen 1');
32+
$job = $this->createSimpleJob();
33+
$this->getQueue()->push($job);
34+
35+
$this->assertSimpleJobDone($job);
36+
}
37+
38+
public function testFifoQueueDoesNotSupportPerMessageDelays()
39+
{
40+
$this->startProcess('php yii queue/listen 1');
41+
$job = $this->createSimpleJob();
42+
43+
$this->setExpectedException('\Aws\Sqs\Exception\SqsException');
44+
$this->getQueue()->delay(2)->push($job);
45+
}
46+
47+
public function testRetry()
48+
{
49+
$this->startProcess('php yii queue/listen 1');
50+
$job = new RetryJob(['uid' => uniqid()]);
51+
$this->getQueue()->push($job);
52+
sleep(6);
53+
54+
$this->assertFileExists($job->getFileName());
55+
$this->assertEquals('aa', file_get_contents($job->getFileName()));
56+
}
57+
58+
public function testClear()
59+
{
60+
if (!getenv('AWS_SQS_FIFO_CLEAR_TEST_ENABLED')) {
61+
$this->markTestSkipped(__METHOD__ . ' is disabled');
62+
}
63+
64+
$this->getQueue()->push($this->createSimpleJob());
65+
$this->runProcess('php yii queue/clear --interactive=0');
66+
}
67+
68+
/**
69+
* @return Queue
70+
*/
71+
protected function getQueue()
72+
{
73+
return Yii::$app->sqsFifoQueue;
74+
}
75+
76+
protected function setUp()
77+
{
78+
if (!getenv('AWS_SQS_FIFO_ENABLED')) {
79+
$this->markTestSkipped('AWS SQS FIFO tests are disabled');
80+
}
81+
82+
parent::setUp();
83+
}
84+
}

0 commit comments

Comments
 (0)