Skip to content

PHP 7.1+. Queue Interop typed interfaces. #512

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 22 commits into from
Aug 23, 2018
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
Migrate gearman transport
  • Loading branch information
makasim committed Aug 22, 2018
commit 5c0923d15f96db46d0d77bccaa6a57522b1381b1
17 changes: 4 additions & 13 deletions pkg/gearman/GearmanConnectionFactory.php
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
namespace Enqueue\Gearman;

use Interop\Queue\PsrConnectionFactory;
use Interop\Queue\PsrContext;

class GearmanConnectionFactory implements PsrConnectionFactory
{
Expand Down Expand Up @@ -40,21 +41,14 @@ public function __construct($config = 'gearman:')
}

/**
* {@inheritdoc}
*
* @return GearmanContext
*/
public function createContext()
public function createContext(): PsrContext
{
return new GearmanContext($this->config);
}

/**
* @param string $dsn
*
* @return array
*/
private function parseDsn($dsn)
private function parseDsn(string $dsn): array
{
$dsnConfig = parse_url($dsn);
if (false === $dsnConfig) {
Expand All @@ -81,10 +75,7 @@ private function parseDsn($dsn)
];
}

/**
* @return array
*/
private function defaultConfig()
private function defaultConfig(): array
{
return [
'host' => \GEARMAN_DEFAULT_TCP_HOST,
Expand Down
32 changes: 10 additions & 22 deletions pkg/gearman/GearmanConsumer.php
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

use Interop\Queue\PsrConsumer;
use Interop\Queue\PsrMessage;
use Interop\Queue\PsrQueue;

class GearmanConsumer implements PsrConsumer
{
Expand All @@ -22,12 +23,6 @@ class GearmanConsumer implements PsrConsumer
*/
private $context;

/**
* @param GearmanContext $context
* @param GearmanDestination $destination
*
* @internal param \GearmanWorker $worker
*/
public function __construct(GearmanContext $context, GearmanDestination $destination)
{
$this->context = $context;
Expand All @@ -37,21 +32,17 @@ public function __construct(GearmanContext $context, GearmanDestination $destina
}

/**
* {@inheritdoc}
*
* @return GearmanDestination
*/
public function getQueue()
public function getQueue(): PsrQueue
{
return $this->destination;
}

/**
* {@inheritdoc}
*
* @return GearmanMessage
*/
public function receive($timeout = 0)
public function receive(int $timeout = 0): ?PsrMessage
{
set_error_handler(function ($severity, $message, $file, $line) {
throw new \ErrorException($message, 0, $severity, $file, $line);
Expand All @@ -75,34 +66,31 @@ public function receive($timeout = 0)
}

/**
* {@inheritdoc}
* @return GearmanMessage
*/
public function receiveNoWait()
public function receiveNoWait(): ?PsrMessage
{
return $this->receive(100);
}

/**
* {@inheritdoc}
* @param GearmanMessage $message
*/
public function acknowledge(PsrMessage $message)
public function acknowledge(PsrMessage $message): void
{
}

/**
* {@inheritdoc}
* @param GearmanMessage $message
*/
public function reject(PsrMessage $message, $requeue = false)
public function reject(PsrMessage $message, bool $requeue = false): void
{
if ($requeue) {
$this->context->createProducer()->send($this->destination, $message);
}
}

/**
* @return \GearmanWorker
*/
public function getWorker()
public function getWorker(): \GearmanWorker
{
return $this->worker;
}
Expand Down
61 changes: 30 additions & 31 deletions pkg/gearman/GearmanContext.php
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,17 @@
namespace Enqueue\Gearman;

use Interop\Queue\InvalidDestinationException;
use Interop\Queue\PsrConsumer;
use Interop\Queue\PsrContext;
use Interop\Queue\PsrDestination;
use Interop\Queue\PsrMessage;
use Interop\Queue\PsrProducer;
use Interop\Queue\PsrQueue;
use Interop\Queue\PsrSubscriptionConsumer;
use Interop\Queue\PsrTopic;
use Interop\Queue\PurgeQueueNotSupportedException;
use Interop\Queue\SubscriptionConsumerNotSupportedException;
use Interop\Queue\TemporaryQueueNotSupportedException;

class GearmanContext implements PsrContext
{
Expand All @@ -23,68 +32,54 @@ class GearmanContext implements PsrContext
*/
private $config;

/**
* @param array $config
*/
public function __construct(array $config)
{
$this->config = $config;
}

/**
* {@inheritdoc}
*
* @return GearmanMessage
*/
public function createMessage($body = '', array $properties = [], array $headers = [])
public function createMessage(string $body = '', array $properties = [], array $headers = []): PsrMessage
{
return new GearmanMessage($body, $properties, $headers);
}

/**
* {@inheritdoc}
*
* @return GearmanDestination
*/
public function createTopic($topicName)
public function createTopic(string $topicName): PsrTopic
{
return new GearmanDestination($topicName);
}

/**
* {@inheritdoc}
* @return GearmanDestination
*/
public function createQueue($queueName)
public function createQueue(string $queueName): PsrQueue
{
return new GearmanDestination($queueName);
}

/**
* {@inheritdoc}
*/
public function createTemporaryQueue()
public function createTemporaryQueue(): PsrQueue
{
throw new \LogicException('Not implemented');
throw TemporaryQueueNotSupportedException::providerDoestNotSupportIt();
}

/**
* {@inheritdoc}
*
* @return GearmanProducer
*/
public function createProducer()
public function createProducer(): PsrProducer
{
return new GearmanProducer($this->getClient());
}

/**
* {@inheritdoc}
*
* @param GearmanDestination $destination
*
* @return GearmanConsumer
*/
public function createConsumer(PsrDestination $destination)
public function createConsumer(PsrDestination $destination): PsrConsumer
{
InvalidDestinationException::assertDestinationInstanceOf($destination, GearmanDestination::class);

Expand All @@ -93,7 +88,7 @@ public function createConsumer(PsrDestination $destination)
return $consumer;
}

public function close()
public function close(): void
{
$this->getClient()->clearCallbacks();

Expand All @@ -102,10 +97,17 @@ public function close()
}
}

/**
* @return \GearmanClient
*/
public function getClient()
public function createSubscriptionConsumer(): PsrSubscriptionConsumer
{
throw SubscriptionConsumerNotSupportedException::providerDoestNotSupportIt();
}

public function purgeQueue(PsrQueue $queue): void
{
throw PurgeQueueNotSupportedException::providerDoestNotSupportIt();
}

public function getClient(): \GearmanClient
{
if (false == $this->client) {
$this->client = new \GearmanClient();
Expand All @@ -115,10 +117,7 @@ public function getClient()
return $this->client;
}

/**
* @return \GearmanWorker
*/
public function createWorker()
public function createWorker(): \GearmanWorker
{
$worker = new \GearmanWorker();
$worker->addServer($this->config['host'], $this->config['port']);
Expand Down
24 changes: 6 additions & 18 deletions pkg/gearman/GearmanDestination.php
Original file line number Diff line number Diff line change
Expand Up @@ -12,35 +12,23 @@ class GearmanDestination implements PsrQueue, PsrTopic
*/
private $destinationName;

/**
* @param string $destinationName
*/
public function __construct($destinationName)
public function __construct(string $destinationName)
{
$this->destinationName = $destinationName;
}

/**
* @return string
*/
public function getName()
public function getName(): string
{
return $this->destinationName;
}

/**
* {@inheritdoc}
*/
public function getQueueName()
public function getQueueName(): string
{
return $this->getName();
return $this->destinationName;
}

/**
* {@inheritdoc}
*/
public function getTopicName()
public function getTopicName(): string
{
return $this->getName();
return $this->destinationName;
}
}
Loading