Skip to content

[5.8] Remove prefix setting for Amazon SQS, add url #27086

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 4 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion src/Illuminate/Queue/Connectors/SqsConnector.php
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,9 @@ public function connect(array $config)
}

return new SqsQueue(
new SqsClient($config), $config['queue'], $config['prefix'] ?? ''
new SqsClient($config),
$config['queue'],
$config['url'] ?? ''
);
}

Expand Down
44 changes: 15 additions & 29 deletions src/Illuminate/Queue/SqsQueue.php
Original file line number Diff line number Diff line change
Expand Up @@ -16,32 +16,32 @@ class SqsQueue extends Queue implements QueueContract
protected $sqs;

/**
* The name of the default queue.
* The name of the queue.
*
* @var string
*/
protected $default;
protected $name;

/**
* The queue URL prefix.
* The full queue URL.
*
* @var string
*/
protected $prefix;
protected $url;

/**
* Create a new Amazon SQS queue instance.
*
* @param \Aws\Sqs\SqsClient $sqs
* @param string $default
* @param string $prefix
* @param string $url
* @return void
*/
public function __construct(SqsClient $sqs, $default, $prefix = '')
public function __construct(SqsClient $sqs, $name, $url = '')
{
$this->sqs = $sqs;
$this->prefix = $prefix;
$this->default = $default;
$this->name = $name;
$this->url = $url;
}

/**
Expand All @@ -53,7 +53,7 @@ public function __construct(SqsClient $sqs, $default, $prefix = '')
public function size($queue = null)
{
$response = $this->sqs->getQueueAttributes([
'QueueUrl' => $this->getQueue($queue),
'QueueUrl' => $this->url,
'AttributeNames' => ['ApproximateNumberOfMessages'],
]);

Expand All @@ -72,7 +72,7 @@ public function size($queue = null)
*/
public function push($job, $data = '', $queue = null)
{
return $this->pushRaw($this->createPayload($job, $queue ?: $this->default, $data), $queue);
return $this->pushRaw($this->createPayload($job, $queue ?: $this->name, $data), $queue);
}

/**
Expand All @@ -86,7 +86,7 @@ public function push($job, $data = '', $queue = null)
public function pushRaw($payload, $queue = null, array $options = [])
{
return $this->sqs->sendMessage([
'QueueUrl' => $this->getQueue($queue), 'MessageBody' => $payload,
'QueueUrl' => $this->url, 'MessageBody' => $payload,
])->get('MessageId');
}

Expand All @@ -102,8 +102,8 @@ public function pushRaw($payload, $queue = null, array $options = [])
public function later($delay, $job, $data = '', $queue = null)
{
return $this->sqs->sendMessage([
'QueueUrl' => $this->getQueue($queue),
'MessageBody' => $this->createPayload($job, $queue ?: $this->default, $data),
'QueueUrl' => $this->url,
'MessageBody' => $this->createPayload($job, $queue ?: $this->name, $data),
'DelaySeconds' => $this->secondsUntil($delay),
])->get('MessageId');
}
Expand All @@ -117,32 +117,18 @@ public function later($delay, $job, $data = '', $queue = null)
public function pop($queue = null)
{
$response = $this->sqs->receiveMessage([
'QueueUrl' => $queue = $this->getQueue($queue),
'QueueUrl' => $this->url,
'AttributeNames' => ['ApproximateReceiveCount'],
]);

if (! is_null($response['Messages']) && count($response['Messages']) > 0) {
return new SqsJob(
$this->container, $this->sqs, $response['Messages'][0],
$this->connectionName, $queue
$this->connectionName, $queue ?: $this->name
);
}
}

/**
* Get the queue or return the default.
*
* @param string|null $queue
* @return string
*/
public function getQueue($queue)
{
$queue = $queue ?: $this->default;

return filter_var($queue, FILTER_VALIDATE_URL) === false
? rtrim($this->prefix, '/').'/'.$queue : $queue;
}

/**
* Get the underlying SQS instance.
*
Expand Down
139 changes: 76 additions & 63 deletions tests/Queue/QueueSqsQueueTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,28 @@
use PHPUnit\Framework\TestCase;
use Illuminate\Queue\Jobs\SqsJob;
use Illuminate\Container\Container;
use Illuminate\Contracts\Queue\Queue as QueueContract;

class QueueSqsQueueTest extends TestCase
{
/** @var QueueContract */
private $sqsQueue;

const QUEUE_NAME = 'emails';
const URL = 'https://sqs.someregion.amazonaws.com/1234567891011/emails-queue';

private $mockedSqsClient;
private $mockedJob;
private $mockedData;
private $mockedPayload;
private $mockedDelay;
private $mockedMessageId;
private $mockedReceiptHandle;
private $mockedSendMessageResponseModel;
private $mockedReceiveMessageResponseModel;
private $mockedReceiveEmptyMessageResponseModel;
private $mockedQueueAttributesResponseModel;

public function tearDown()
{
m::close();
Expand All @@ -21,38 +40,40 @@ public function tearDown()
public function setUp()
{
// Use Mockery to mock the SqsClient
$this->sqs = m::mock(SqsClient::class);

$this->account = '1234567891011';
$this->queueName = 'emails';
$this->baseUrl = 'https://sqs.someregion.amazonaws.com';
$this->mockedSqsClient = m::mock(SqsClient::class);

// This is how the modified getQueue builds the queueUrl
$this->prefix = $this->baseUrl.'/'.$this->account.'/';
$this->queueUrl = $this->prefix.$this->queueName;
$this->sqsQueue = new SqsQueue($this->mockedSqsClient, self::QUEUE_NAME, self::URL);
$this->sqsQueue->setContainer(m::mock(Container::class));

$this->mockedJob = 'foo';
$this->mockedData = ['data'];
$this->mockedPayload = json_encode(['job' => $this->mockedJob, 'data' => $this->mockedData]);
$this->mockedPayload = json_encode([
'displayName' => $this->mockedJob,
'job' => $this->mockedJob,
'maxTries' => null,
'timeout' => null,
'data' => $this->mockedData,
]);

$this->mockedDelay = 10;
$this->mockedMessageId = 'e3cd03ee-59a3-4ad8-b0aa-ee2e3808ac81';
$this->mockedReceiptHandle = '0NNAq8PwvXuWv5gMtS9DJ8qEdyiUwbAjpp45w2m6M4SJ1Y+PxCh7R930NRB8ylSacEmoSnW18bgd4nK\/O6ctE+VFVul4eD23mA07vVoSnPI4F\/voI1eNCp6Iax0ktGmhlNVzBwaZHEr91BRtqTRM3QKd2ASF8u+IQaSwyl\/DGK+P1+dqUOodvOVtExJwdyDLy1glZVgm85Yw9Jf5yZEEErqRwzYz\/qSigdvW4sm2l7e4phRol\/+IjMtovOyH\/ukueYdlVbQ4OshQLENhUKe7RNN5i6bE\/e5x9bnPhfj2gbM';

$this->mockedSendMessageResponseModel = new Result([
'Body' => $this->mockedPayload,
'MD5OfBody' => md5($this->mockedPayload),
'Body' => $this->mockedPayload,
'MD5OfBody' => md5($this->mockedPayload),
'ReceiptHandle' => $this->mockedReceiptHandle,
'MessageId' => $this->mockedMessageId,
'Attributes' => ['ApproximateReceiveCount' => 1],
'MessageId' => $this->mockedMessageId,
'Attributes' => ['ApproximateReceiveCount' => 1],
]);

$this->mockedReceiveMessageResponseModel = new Result([
'Messages' => [
0 => [
'Body' => $this->mockedPayload,
'MD5OfBody' => md5($this->mockedPayload),
'Body' => $this->mockedPayload,
'MD5OfBody' => md5($this->mockedPayload),
'ReceiptHandle' => $this->mockedReceiptHandle,
'MessageId' => $this->mockedMessageId,
'MessageId' => $this->mockedMessageId,
],
],
]);
Expand All @@ -70,79 +91,71 @@ public function setUp()

public function testPopProperlyPopsJobOffOfSqs()
{
$queue = $this->getMockBuilder(SqsQueue::class)->setMethods(['getQueue'])->setConstructorArgs([$this->sqs, $this->queueName, $this->account])->getMock();
$queue->setContainer(m::mock(Container::class));
$queue->expects($this->once())->method('getQueue')->with($this->queueName)->will($this->returnValue($this->queueUrl));
$this->sqs->shouldReceive('receiveMessage')->once()->with(['QueueUrl' => $this->queueUrl, 'AttributeNames' => ['ApproximateReceiveCount']])->andReturn($this->mockedReceiveMessageResponseModel);
$result = $queue->pop($this->queueName);
$this->mockedSqsClient->shouldReceive('receiveMessage')->once()->with([
'QueueUrl' => self::URL,
'AttributeNames' => ['ApproximateReceiveCount'],
])->andReturn($this->mockedReceiveMessageResponseModel);

$result = $this->sqsQueue->pop(self::QUEUE_NAME);
$this->assertInstanceOf(SqsJob::class, $result);
}

public function testPopProperlyHandlesEmptyMessage()
{
$queue = $this->getMockBuilder(SqsQueue::class)->setMethods(['getQueue'])->setConstructorArgs([$this->sqs, $this->queueName, $this->account])->getMock();
$queue->setContainer(m::mock(Container::class));
$queue->expects($this->once())->method('getQueue')->with($this->queueName)->will($this->returnValue($this->queueUrl));
$this->sqs->shouldReceive('receiveMessage')->once()->with(['QueueUrl' => $this->queueUrl, 'AttributeNames' => ['ApproximateReceiveCount']])->andReturn($this->mockedReceiveEmptyMessageResponseModel);
$result = $queue->pop($this->queueName);
$this->mockedSqsClient->shouldReceive('receiveMessage')->once()->with([
'QueueUrl' => self::URL,
'AttributeNames' => ['ApproximateReceiveCount'],
])->andReturn($this->mockedReceiveEmptyMessageResponseModel);

$result = $this->sqsQueue->pop(self::QUEUE_NAME);
$this->assertNull($result);
}

public function testDelayedPushWithDateTimeProperlyPushesJobOntoSqs()
{
$now = Carbon::now();
$queue = $this->getMockBuilder(SqsQueue::class)->setMethods(['createPayload', 'secondsUntil', 'getQueue'])->setConstructorArgs([$this->sqs, $this->queueName, $this->account])->getMock();
$queue->expects($this->once())->method('createPayload')->with($this->mockedJob, $this->queueName, $this->mockedData)->will($this->returnValue($this->mockedPayload));
$queue->expects($this->once())->method('secondsUntil')->with($now)->will($this->returnValue(5));
$queue->expects($this->once())->method('getQueue')->with($this->queueName)->will($this->returnValue($this->queueUrl));
$this->sqs->shouldReceive('sendMessage')->once()->with(['QueueUrl' => $this->queueUrl, 'MessageBody' => $this->mockedPayload, 'DelaySeconds' => 5])->andReturn($this->mockedSendMessageResponseModel);
$id = $queue->later($now->addSeconds(5), $this->mockedJob, $this->mockedData, $this->queueName);

$this->mockedSqsClient->shouldReceive('sendMessage')->once()->with([
'QueueUrl' => self::URL,
'MessageBody' => $this->mockedPayload,
'DelaySeconds' => 5,
])->andReturn($this->mockedSendMessageResponseModel);

$id = $this->sqsQueue->later($now->addSeconds(5), $this->mockedJob, $this->mockedData, self::QUEUE_NAME);
$this->assertEquals($this->mockedMessageId, $id);
}

public function testDelayedPushProperlyPushesJobOntoSqs()
{
$queue = $this->getMockBuilder(SqsQueue::class)->setMethods(['createPayload', 'secondsUntil', 'getQueue'])->setConstructorArgs([$this->sqs, $this->queueName, $this->account])->getMock();
$queue->expects($this->once())->method('createPayload')->with($this->mockedJob, $this->queueName, $this->mockedData)->will($this->returnValue($this->mockedPayload));
$queue->expects($this->once())->method('secondsUntil')->with($this->mockedDelay)->will($this->returnValue($this->mockedDelay));
$queue->expects($this->once())->method('getQueue')->with($this->queueName)->will($this->returnValue($this->queueUrl));
$this->sqs->shouldReceive('sendMessage')->once()->with(['QueueUrl' => $this->queueUrl, 'MessageBody' => $this->mockedPayload, 'DelaySeconds' => $this->mockedDelay])->andReturn($this->mockedSendMessageResponseModel);
$id = $queue->later($this->mockedDelay, $this->mockedJob, $this->mockedData, $this->queueName);
$this->mockedSqsClient->shouldReceive('sendMessage')->once()->with([
'QueueUrl' => self::URL,
'MessageBody' => $this->mockedPayload,
'DelaySeconds' => $this->mockedDelay,
])->andReturn($this->mockedSendMessageResponseModel);

$id = $this->sqsQueue->later($this->mockedDelay, $this->mockedJob, $this->mockedData, self::QUEUE_NAME);
$this->assertEquals($this->mockedMessageId, $id);
}

public function testPushProperlyPushesJobOntoSqs()
{
$queue = $this->getMockBuilder(SqsQueue::class)->setMethods(['createPayload', 'getQueue'])->setConstructorArgs([$this->sqs, $this->queueName, $this->account])->getMock();
$queue->expects($this->once())->method('createPayload')->with($this->mockedJob, $this->queueName, $this->mockedData)->will($this->returnValue($this->mockedPayload));
$queue->expects($this->once())->method('getQueue')->with($this->queueName)->will($this->returnValue($this->queueUrl));
$this->sqs->shouldReceive('sendMessage')->once()->with(['QueueUrl' => $this->queueUrl, 'MessageBody' => $this->mockedPayload])->andReturn($this->mockedSendMessageResponseModel);
$id = $queue->push($this->mockedJob, $this->mockedData, $this->queueName);
$this->mockedSqsClient->shouldReceive('sendMessage')->once()->with([
'QueueUrl' => self::URL,
'MessageBody' => $this->mockedPayload,
])->andReturn($this->mockedSendMessageResponseModel);

$id = $this->sqsQueue->push($this->mockedJob, $this->mockedData, self::QUEUE_NAME);
$this->assertEquals($this->mockedMessageId, $id);
}

public function testSizeProperlyReadsSqsQueueSize()
{
$queue = $this->getMockBuilder(SqsQueue::class)->setMethods(['getQueue'])->setConstructorArgs([$this->sqs, $this->queueName, $this->account])->getMock();
$queue->expects($this->once())->method('getQueue')->with($this->queueName)->will($this->returnValue($this->queueUrl));
$this->sqs->shouldReceive('getQueueAttributes')->once()->with(['QueueUrl' => $this->queueUrl, 'AttributeNames' => ['ApproximateNumberOfMessages']])->andReturn($this->mockedQueueAttributesResponseModel);
$size = $queue->size($this->queueName);
$this->assertEquals($size, 1);
}

public function testGetQueueProperlyResolvesUrlWithPrefix()
{
$queue = new SqsQueue($this->sqs, $this->queueName, $this->prefix);
$this->assertEquals($this->queueUrl, $queue->getQueue(null));
$queueUrl = $this->baseUrl.'/'.$this->account.'/test';
$this->assertEquals($queueUrl, $queue->getQueue('test'));
}
$this->mockedSqsClient->shouldReceive('getQueueAttributes')->once()->with([
'QueueUrl' => self::URL,
'AttributeNames' => ['ApproximateNumberOfMessages'],
])->andReturn($this->mockedQueueAttributesResponseModel);

public function testGetQueueProperlyResolvesUrlWithoutPrefix()
{
$queue = new SqsQueue($this->sqs, $this->queueUrl);
$this->assertEquals($this->queueUrl, $queue->getQueue(null));
$queueUrl = $this->baseUrl.'/'.$this->account.'/test';
$this->assertEquals($queueUrl, $queue->getQueue($queueUrl));
$size = $this->sqsQueue->size(self::QUEUE_NAME);
$this->assertEquals($size, 1);
}
}