Skip to content

PHP 7.1+. Queue Interop typed interfaces. #512

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 22 commits into from
Aug 23, 2018
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
[sqs] Migrate SQS transport.
  • Loading branch information
makasim committed Aug 21, 2018
commit 6a521b8f84c13d753bf74842b6fe80778a672e90
29 changes: 5 additions & 24 deletions pkg/sqs/SqsConnectionFactory.php
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

use Aws\Sqs\SqsClient;
use Interop\Queue\PsrConnectionFactory;
use Interop\Queue\PsrContext;

class SqsConnectionFactory implements PsrConnectionFactory
{
Expand Down Expand Up @@ -62,11 +63,9 @@ public function __construct($config = 'sqs:')
}

/**
* {@inheritdoc}
*
* @return SqsContext
*/
public function createContext()
public function createContext(): PsrContext
{
if ($this->config['lazy']) {
return new SqsContext(function () {
Expand All @@ -77,17 +76,7 @@ public function createContext()
return new SqsContext($this->establishConnection());
}

/**
* {@inheritdoc}
*/
public function close()
{
}

/**
* @return SqsClient
*/
private function establishConnection()
private function establishConnection(): SqsClient
{
if ($this->client) {
return $this->client;
Expand Down Expand Up @@ -119,12 +108,7 @@ private function establishConnection()
return $this->client;
}

/**
* @param string $dsn
*
* @return array
*/
private function parseDsn($dsn)
private function parseDsn(string $dsn): array
{
if (false === strpos($dsn, 'sqs:')) {
throw new \LogicException(sprintf('The given DSN "%s" is not supported. Must start with "sqs:".', $dsn));
Expand All @@ -148,10 +132,7 @@ private function parseDsn($dsn)
return $config;
}

/**
* @return array
*/
private function defaultConfig()
private function defaultConfig(): array
{
return [
'key' => null,
Expand Down
63 changes: 18 additions & 45 deletions pkg/sqs/SqsConsumer.php
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
use Interop\Queue\InvalidMessageException;
use Interop\Queue\PsrConsumer;
use Interop\Queue\PsrMessage;
use Interop\Queue\PsrQueue;

class SqsConsumer implements PsrConsumer
{
Expand Down Expand Up @@ -33,10 +34,6 @@ class SqsConsumer implements PsrConsumer
*/
private $messages;

/**
* @param SqsContext $context
* @param SqsDestination $queue
*/
public function __construct(SqsContext $context, SqsDestination $queue)
{
$this->context = $context;
Expand All @@ -45,58 +42,46 @@ public function __construct(SqsContext $context, SqsDestination $queue)
$this->maxNumberOfMessages = 1;
}

/**
* @return int|null
*/
public function getVisibilityTimeout()
public function getVisibilityTimeout(): ?int
{
return $this->visibilityTimeout;
}

/**
* The duration (in seconds) that the received messages are hidden from subsequent retrieve
* requests after being retrieved by a ReceiveMessage request.
*
* @param int|null $visibilityTimeout
*/
public function setVisibilityTimeout($visibilityTimeout)
public function setVisibilityTimeout(int $visibilityTimeout = null): void
{
$this->visibilityTimeout = null === $visibilityTimeout ? null : (int) $visibilityTimeout;
$this->visibilityTimeout = $visibilityTimeout;
}

/**
* @return int
*/
public function getMaxNumberOfMessages()
public function getMaxNumberOfMessages(): int
{
return $this->maxNumberOfMessages;
}

/**
* The maximum number of messages to return. Amazon SQS never returns more messages than this value
* (however, fewer messages might be returned). Valid values are 1 to 10. Default is 1.
*
* @param int $maxNumberOfMessages
*/
public function setMaxNumberOfMessages($maxNumberOfMessages)
public function setMaxNumberOfMessages(int $maxNumberOfMessages): void
{
$this->maxNumberOfMessages = (int) $maxNumberOfMessages;
$this->maxNumberOfMessages = $maxNumberOfMessages;
}

/**
* {@inheritdoc}
*
* @return SqsDestination
*/
public function getQueue()
public function getQueue(): PsrQueue
{
return $this->queue;
}

/**
* {@inheritdoc}
* @return SqsMessage
*/
public function receive($timeout = 0)
public function receive(int $timeout = 0): ?PsrMessage
{
$maxLongPollingTime = 20; // 20 is max allowed long polling value

Expand All @@ -118,19 +103,17 @@ public function receive($timeout = 0)
}

/**
* {@inheritdoc}
* @return SqsMessage
*/
public function receiveNoWait()
public function receiveNoWait(): ?PsrMessage
{
return $this->receiveMessage(0);
}

/**
* {@inheritdoc}
*
* @param SqsMessage $message
*/
public function acknowledge(PsrMessage $message)
public function acknowledge(PsrMessage $message): void
{
InvalidMessageException::assertMessageInstanceOf($message, SqsMessage::class);

Expand All @@ -141,11 +124,9 @@ public function acknowledge(PsrMessage $message)
}

/**
* {@inheritdoc}
*
* @param SqsMessage $message
*/
public function reject(PsrMessage $message, $requeue = false)
public function reject(PsrMessage $message, bool $requeue = false): void
{
InvalidMessageException::assertMessageInstanceOf($message, SqsMessage::class);

Expand All @@ -159,12 +140,7 @@ public function reject(PsrMessage $message, $requeue = false)
}
}

/**
* @param int $timeoutSeconds
*
* @return SqsMessage|null
*/
protected function receiveMessage($timeoutSeconds)
protected function receiveMessage(int $timeoutSeconds): ?SqsMessage
{
if ($message = array_pop($this->messages)) {
return $this->convertMessage($message);
Expand All @@ -191,14 +167,11 @@ protected function receiveMessage($timeoutSeconds)
if ($message = array_pop($this->messages)) {
return $this->convertMessage($message);
}

return null;
}

/**
* @param array $sqsMessage
*
* @return SqsMessage
*/
protected function convertMessage(array $sqsMessage)
protected function convertMessage(array $sqsMessage): SqsMessage
{
$message = $this->context->createMessage();

Expand Down
85 changes: 35 additions & 50 deletions pkg/sqs/SqsContext.php
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,16 @@

use Aws\Sqs\SqsClient;
use Interop\Queue\InvalidDestinationException;
use Interop\Queue\PsrConsumer;
use Interop\Queue\PsrContext;
use Interop\Queue\PsrDestination;
use Interop\Queue\PsrMessage;
use Interop\Queue\PsrProducer;
use Interop\Queue\PsrQueue;
use Interop\Queue\PsrSubscriptionConsumer;
use Interop\Queue\PsrTopic;
use Interop\Queue\SubscriptionConsumerNotSupportedException;
use Interop\Queue\TemporaryQueueNotSupportedException;

class SqsContext implements PsrContext
{
Expand Down Expand Up @@ -45,78 +53,76 @@ public function __construct($client)
}

/**
* {@inheritdoc}
*
* @return SqsMessage
*/
public function createMessage($body = '', array $properties = [], array $headers = [])
public function createMessage(string $body = '', array $properties = [], array $headers = []): PsrMessage
{
return new SqsMessage($body, $properties, $headers);
}

/**
* {@inheritdoc}
*
* @return SqsDestination
*/
public function createTopic($topicName)
public function createTopic(string $topicName): PsrTopic
{
return new SqsDestination($topicName);
}

/**
* {@inheritdoc}
*
* @return SqsDestination
*/
public function createQueue($queueName)
public function createQueue(string $queueName): PsrQueue
{
return new SqsDestination($queueName);
}

/**
* {@inheritdoc}
*/
public function createTemporaryQueue()
public function createTemporaryQueue(): PsrQueue
{
throw new \BadMethodCallException('SQS transport does not support temporary queues');
throw TemporaryQueueNotSupportedException::providerDoestNotSupportIt();
}

/**
* {@inheritdoc}
*
* @return SqsProducer
*/
public function createProducer()
public function createProducer(): PsrProducer
{
return new SqsProducer($this);
}

/**
* {@inheritdoc}
*
* @param SqsDestination $destination
*
* @return SqsConsumer
*/
public function createConsumer(PsrDestination $destination)
public function createConsumer(PsrDestination $destination): PsrConsumer
{
InvalidDestinationException::assertDestinationInstanceOf($destination, SqsDestination::class);

return new SqsConsumer($this, $destination);
}

/**
* {@inheritdoc}
*/
public function close()
public function close(): void
{
}

/**
* @return SqsClient
* @param SqsDestination $queue
*/
public function getClient()
public function purgeQueue(PsrQueue $queue): void
{
InvalidDestinationException::assertDestinationInstanceOf($queue, SqsDestination::class);

$this->getClient()->purgeQueue([
'QueueUrl' => $this->getQueueUrl($queue),
]);
}

public function createSubscriptionConsumer(): PsrSubscriptionConsumer
{
throw SubscriptionConsumerNotSupportedException::providerDoestNotSupportIt();
}

public function getClient(): SqsClient
{
if (false == $this->client) {
$client = call_user_func($this->clientFactory);
Expand All @@ -134,12 +140,7 @@ public function getClient()
return $this->client;
}

/**
* @param SqsDestination $destination
*
* @return string
*/
public function getQueueUrl(SqsDestination $destination)
public function getQueueUrl(SqsDestination $destination): string
{
if (isset($this->queueUrls[$destination->getQueueName()])) {
return $this->queueUrls[$destination->getQueueName()];
Expand All @@ -156,10 +157,7 @@ public function getQueueUrl(SqsDestination $destination)
return $this->queueUrls[$destination->getQueueName()] = $result->get('QueueUrl');
}

/**
* @param SqsDestination $dest
*/
public function declareQueue(SqsDestination $dest)
public function declareQueue(SqsDestination $dest): void
{
$result = $this->getClient()->createQueue([
'Attributes' => $dest->getAttributes(),
Expand All @@ -173,25 +171,12 @@ public function declareQueue(SqsDestination $dest)
$this->queueUrls[$dest->getQueueName()] = $result->get('QueueUrl');
}

/**
* @param SqsDestination $dest
*/
public function deleteQueue(SqsDestination $dest)
public function deleteQueue(SqsDestination $dest): void
{
$this->getClient()->deleteQueue([
'QueueUrl' => $this->getQueueUrl($dest),
]);

unset($this->queueUrls[$dest->getQueueName()]);
}

/**
* @param SqsDestination $destination
*/
public function purgeQueue(SqsDestination $destination)
{
$this->getClient()->purgeQueue([
'QueueUrl' => $this->getQueueUrl($destination),
]);
}
}
Loading