Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add SQS driver #9

Merged
merged 1 commit into from
Feb 20, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,9 @@ services:
env:
global:
- TEST_COMMAND="composer test-ci"
- secure: "nk2Nm/xFmdp/sakc1q0/rEH/yUVvEzMSbh1trRMp6ldHCzscBTYFnVFrnwf8G6qpaYR604sQV3yujEE1r9B8DZ4mDsHD8WbOtUyaNLINeUnL2uEHXyukE76Bf3hjkSLvKdpriQDZA5xvbfXUBYoh81CdmT9OaQnUtblMe5BN93xB7VsNB68+eCiPK0IUFKTby59rm6ewVB4kanljtvwOunuEfZXvqdfg7HtArN1tWxd8Bn58KaLvYlndFXtLsTlhygnmJAbKd4Mk6H+OVd5Z3Qq3UXaKFBh7ovrob/aWHBLFLGhr6Ga1palh7d2sdO4wCn8dVfbTlvVRvcqxQmwBHUoGOr1/918caN3ZWjEF21kmX8BgK+lHOvexrAPo15jOL6ZXYhJY4zAAZmMXHUAYHxM0UUNPd3KywhdFan3PNsVCYgldcpsU6unnP9PKv/upm+holWnUvd7fgUrZtcnD2Q8lAihkxF7HJrFttqnm0qaipbOFp9flwxskkQbE0qAZZm16GuCw0Yfky3h9YyQ7nIOjWlF9t8/Z4d1VxrwTcNc8Zba4vM5OdvB9qVRrc0ZJVj2z/PeEPJOO0ytUBO/Y+hz10iReGLHtZZukR6RDaF0XlGVwynBPIS6Hu54t3S30jg/DYu8e0jwlb2rP/+LpFpv+ALUzl95a3SWhqgVsjCc="
- secure: "kQj2YVCU3Hc4R7CVGKfWl3jieP1vEjAkxbUyUCpNC37IYrXujajKdyXbcWBXUeDa++HfICdlBikPdfh/mPC+/WozaBNzoOIFTOEN5zmE9WPLtnB1d7QB5zTnQBX9rb59il+bYfgm8IL2iwK3mupryP8Kspt4nMjVjsEz4sxtqe/XR23RuEgf4UFPqZEZdKSjnwGiUuERvRderkcpROAu7/6X79OFnbjZLDgT/j+eCQypvYXievn700DnWCwRaCBcqB3DepkrppfiXmZLRAjKUgMRVCi0ILfpoUF7rK3aS8hLC0YlIYjmc62HT+KP16NP9htXxZ+DNCH5OEmZjR8pRli3is1RFqeviM+v/Mx4mEpYkTNcoJygE8blFareudQu6ofwrAT/W6q/7LFtjVA2F6wc8CLztZawaqTBxdIhRdZj5q8FTt7lfJaVPsk5/dimz75iinH1opOJPkpBU2rbg3erPWxBz4tvlq3cEfvS/P7y/OFQ1eCEDl2Wi8Y+kSYcEpDDplAg6WGS2KEMx40bVPSZgZ0H0omnchnwuZuJBoSgoSjcSRvUv21ljttCyCVQJfxHpxoLNk6N5BOsTIi5AXu0grIvGiyoNuO9zqLKzuxdYTMqPmLAqLJL0k/R0HzRyL8q2H3UecEaC0etFOvCV/DT5kQLZSjyVSMRfAJ1sL8="
- secure: "QM3I9ZP1U1eo32iIb/I8x4J4sYlx1W0U7dhvA+2vPoU67JWg/CkBTDBZmNTZl7qP+/TRXO+QlKoClkwBX/CsthEYuuqN35cxs3tVEEMBsnoK3oLxgWG6RmyrHmHd5Jv5JIJucuugcfq3bevVRlqRUJM+uPvO9PvbwPyXxYsMudroXZRhS9a7ZXtkdDh1A2rWJBgakuXHy+rlq8nUnEf3yiKFLxVe9SJdzVVXYeEVthiTe65R8R5AAXMR/UQtdO3CD1/g5Jk2kNXDJKDTUjuRqBo1s2ACX22Eg1eGK//KDR9agMAj8L4thTlJoZMz0q8vqP7FezUvPSN+5GAL8LUy5ODAqO37fqtSmoXWSbcHndNtCnHVTucOe1l0NLLHlTozW//iaK8JknaTwj4gUe9utKsVDaRwAIrkhX6TG0qZtSm375O4sGbgTMRHYNpsZQWkyojC7uAgpBk6RSgz9zvR0db+oDxZTbI/BGR7qkJB6eYiV0Gg8q4NM86D33HGO8XRB7sUjwmJouK+zZVOu4dimwQmuXbFWdXKlJDn3klxBZnPZDK4PIYCWJLZjYDwv6C987H8MMu9oU6ehzaq8RDYwyIoPJmMunc3xqXis8puR66aU2toGC0OeezK2DUI0PFyGktcdLKZQPpUX8tJ5DtOCvMq6pRtQ9Ga5+5298kxDeA="

branches:
except:
Expand Down
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ $ composer require bernard/drivers

## Drivers

- [Amazon SQS](src/Sqs)
- [AMQP](src/Amqp)
- [Iron MQ](src/IronMQ)
- [Pheanstalk](src/Pheanstalk)
Expand Down
4 changes: 3 additions & 1 deletion composer.json
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,12 @@
"bernard/ironmq-driver": "self.version",
"bernard/pheanstalk-driver": "self.version",
"bernard/predis-driver": "self.version",
"bernard/queue-interop-driver": "self.version",
"bernard/redis-driver": "self.version",
"bernard/queue-interop-driver": "self.version"
"bernard/sqs-driver": "self.version"
},
"require-dev": {
"aws/aws-sdk-php": "^3.20",
"ext-redis": "*",
"iron-io/iron_mq": "^4.0",
"pda/pheanstalk": "^3.0",
Expand Down
275 changes: 275 additions & 0 deletions src/Sqs/Driver.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,275 @@
<?php

namespace Bernard\Driver\Sqs;

use Aws\Sqs\Exception\SqsException;
use Aws\Sqs\SqsClient;
use Bernard\Driver\AbstractPrefetchDriver;

/**
* Implements a Driver for use with AWS SQS client API: https://aws.amazon.com/sqs/
*/
final class Driver extends AbstractPrefetchDriver
{
const AWS_SQS_FIFO_SUFFIX = '.fifo';
const AWS_SQS_EXCEPTION_BAD_REQUEST = 400;
const AWS_SQS_EXCEPTION_NOT_FOUND = 404;

private $sqs;
private $queueUrls;

/**
* @param SqsClient $sqs
* @param array $queueUrls
* @param int|null $prefetch
*/
public function __construct(SqsClient $sqs, array $queueUrls = [], $prefetch = null)
{
parent::__construct($prefetch);

$this->sqs = $sqs;
$this->queueUrls = $queueUrls;
}

/**
* {@inheritdoc}
*/
public function listQueues()
{
$result = $this->sqs->listQueues();

// TODO: drop this as it can easily get inconsistent?
if (!$queueUrls = $result->get('QueueUrls')) {
return array_keys($this->queueUrls);
}

foreach ($queueUrls as $queueUrl) {
if (in_array($queueUrl, $this->queueUrls)) {
continue;
}

$queueName = current(array_reverse(explode('/', $queueUrl)));
$this->queueUrls[$queueName] = $queueUrl;
}

return array_keys($this->queueUrls);
}

/**
* {@inheritdoc}
*
* @see http://docs.aws.amazon.com/aws-sdk-php/v3/api/api-sqs-2012-11-05.html#createqueue
*
* @throws SqsException
*/
public function createQueue($queueName)
{
if ($this->queueExists($queueName)) {
return;
}

$parameters = [
'QueueName' => $queueName,
];

if ($this->isFifoQueue($queueName)) {
$parameters['Attributes'] = [
'FifoQueue' => 'true',
];
}

$result = $this->sqs->createQueue($parameters);

$this->queueUrls[$queueName] = $result['QueueUrl'];
}

/**
* @param string $queueName
*
* @return bool
*
* @throws SqsException
*/
private function queueExists($queueName)
{
try {
$this->resolveUrl($queueName);

return true;
} catch (\InvalidArgumentException $exception) {
return false;
} catch (SqsException $exception) {
if ($previousException = $exception->getPrevious()) {
switch ($previousException->getCode()) {
case self::AWS_SQS_EXCEPTION_BAD_REQUEST:
case self::AWS_SQS_EXCEPTION_NOT_FOUND:
return false;
}
}

throw $exception;
}
}

/**
* @param string $queueName
*
* @return bool
*/
private function isFifoQueue($queueName)
{
return $this->endsWith($queueName, self::AWS_SQS_FIFO_SUFFIX);
}

/**
* @param string $haystack
* @param string $needle
*
* @return bool
*/
private function endsWith($haystack, $needle)
{
$length = strlen($needle);
if ($length === 0) {
return true;
}

return substr($haystack, -$length) === $needle;
}

/**
* {@inheritdoc}
*/
public function countMessages($queueName)
{
$queueUrl = $this->resolveUrl($queueName);

$result = $this->sqs->getQueueAttributes([
'QueueUrl' => $queueUrl,
'AttributeNames' => ['ApproximateNumberOfMessages'],
]);

if (isset($result['Attributes']['ApproximateNumberOfMessages'])) {
return (int) $result['Attributes']['ApproximateNumberOfMessages'];
}

return 0;
}

/**
* {@inheritdoc}
*/
public function pushMessage($queueName, $message)
{
$queueUrl = $this->resolveUrl($queueName);

$parameters = [
'QueueUrl' => $queueUrl,
'MessageBody' => $message,
];

if ($this->isFifoQueue($queueName)) {
$parameters['MessageGroupId'] = __METHOD__;
$parameters['MessageDeduplicationId'] = md5($message);
}

$this->sqs->sendMessage($parameters);
}

/**
* {@inheritdoc}
*/
public function popMessage($queueName, $duration = 5)
{
if ($message = $this->cache->pop($queueName)) {
return $message;
}

$queueUrl = $this->resolveUrl($queueName);

$result = $this->sqs->receiveMessage([
'QueueUrl' => $queueUrl,
'MaxNumberOfMessages' => $this->prefetch,
'WaitTimeSeconds' => $duration,
]);

if (!$result || !$messages = $result->get('Messages')) {
return [null, null];
}

foreach ($messages as $message) {
$this->cache->push($queueName, [$message['Body'], $message['ReceiptHandle']]);
}

return $this->cache->pop($queueName);
}

/**
* {@inheritdoc}
*/
public function acknowledgeMessage($queueName, $receipt)
{
$queueUrl = $this->resolveUrl($queueName);

$this->sqs->deleteMessage([
'QueueUrl' => $queueUrl,
'ReceiptHandle' => $receipt,
]);
}

/**
* {@inheritdoc}
*/
public function peekQueue($queueName, $index = 0, $limit = 20)
{
return [];
}

/**
* {@inheritdoc}
*
* @see http://docs.aws.amazon.com/aws-sdk-php/v3/api/api-sqs-2012-11-05.html#deletequeue
*/
public function removeQueue($queueName)
{
$queueUrl = $this->resolveUrl($queueName);

$this->sqs->deleteQueue([
'QueueUrl' => $queueUrl,
]);
}

/**
* {@inheritdoc}
*/
public function info()
{
return [
'prefetch' => $this->prefetch,
];
}

/**
* AWS works with queue URLs rather than queue names. Returns either queue URL (if queue exists) for given name or null if not.
*
* @param string $queueName
*
* @return mixed
*
* @throws SqsException
*/
private function resolveUrl($queueName)
{
if (isset($this->queueUrls[$queueName])) {
return $this->queueUrls[$queueName];
}

$result = $this->sqs->getQueueUrl(['QueueName' => $queueName]);

if ($result && $queueUrl = $result->get('QueueUrl')) {
return $this->queueUrls[$queueName] = $queueUrl;
}

throw new \InvalidArgumentException('Queue "'.$queueName.'" cannot be resolved to an url.');
}
}
19 changes: 19 additions & 0 deletions src/Sqs/LICENSE
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
Copyright (c) 2017 Henrik Bjornskov

Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is furnished
to do so, subject to the following conditions:

The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.

THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
THE SOFTWARE.
48 changes: 48 additions & 0 deletions src/Sqs/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
# Amazon SQS driver

[![Latest Version](https://img.shields.io/github/release/bernardphp/sqs-driver.svg?style=flat-square)](https://github.com/bernardphp/sqs-driver/releases)

**[Amazon SQS](https://aws.amazon.com/sqs/) driver for Bernard.**


## Install

Via Composer

```bash
$ composer require bernard/sqs-driver
```


## Usage

```php
<?php

use Aws\Sqs\SqsClient;
use Bernard\Driver\Sqs\Driver;

$client = new SqsClient([
'credentials' => [
'key' => 'your_access_key',
'secret' => 'your_secret_key',
],
'region' => 'us-east-1',
'version' => '2012-11-05'
]);

$driver = new Driver($client);

// or with prefetching
$driver = new Driver($client, [], 5);

// or with aliased queue urls
$driver = new Driver($client, [
'queue-name' => 'queue-url',
]);
```


## License

The MIT License (MIT). Please see [License File](LICENSE) for more information.
Loading