Skip to content

PHP 7.1+. Queue Interop typed interfaces. #512

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 22 commits into from
Aug 23, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 9 additions & 8 deletions bin/pre-commit
Original file line number Diff line number Diff line change
Expand Up @@ -75,18 +75,18 @@ function runPhpLint()
{
global $phpBin, $projectRootDir;

$filesWithErrors = array();
$output = [];
foreach (getFilesToFix() as $file) {
$output = '';
$commandOutput = null;
$returnCode = null;
exec(sprintf('%s -l %s 2>/dev/null', $phpBin, $projectRootDir.'/'.$file), $output, $returnCode);
exec(sprintf('%s -l %s', $phpBin, $projectRootDir.'/'.$file), $commandOutput, $returnCode);

if ($returnCode) {
$filesWithErrors[] = $file;
$output[] = $commandOutput;
}
}

return $filesWithErrors;
return $output;
}

function runPhpCsFixer()
Expand Down Expand Up @@ -145,9 +145,10 @@ echo sprintf('Found %s staged files', count(getFilesToFix())).PHP_EOL;
$phpSyntaxErrors = runPhpLint();
if ($phpSyntaxErrors) {
echo "Php syntax errors were found in next files:" . PHP_EOL;

foreach ($phpSyntaxErrors as $error) {
echo $error . PHP_EOL;
foreach ($phpSyntaxErrors as $phpSyntaxErrors) {
echo array_walk_recursive($phpSyntaxErrors, function($item) {
Copy link

@ronfroy ronfroy Aug 23, 2018

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

implode(PHP_EOL, $phpSyntaxErrors) . PHP_EOL . PHP_EOL ?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

it is array of arrays,

echo $item.PHP_EOL;
}) . PHP_EOL;
}

exit(1);
Expand Down
6 changes: 3 additions & 3 deletions composer.json
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,8 @@
"ext-mongodb": "^1.3",
"ext-rdkafka": "^3.0.3",

"queue-interop/amqp-interop": "^0.7.4",
"queue-interop/queue-interop": "^0.6.2",
"queue-interop/amqp-interop": "0.8.x-dev",
"queue-interop/queue-interop": "0.7.x-dev",
"bunny/bunny": "^0.2.4|^0.3|^0.4",
"php-amqplib/php-amqplib": "^2.7",
"doctrine/dbal": "~2.5",
Expand All @@ -33,7 +33,7 @@
"require-dev": {
"phpunit/phpunit": "^5.5",
"phpstan/phpstan": "^0.10",
"queue-interop/queue-spec": "^0.5.9@dev",
"queue-interop/queue-spec": "0.6.x-dev",
"symfony/browser-kit": "4.0.*",
"symfony/config": "4.0.*",
"symfony/process": "4.0.*",
Expand Down
25 changes: 4 additions & 21 deletions pkg/amqp-bunny/AmqpConnectionFactory.php
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
use Enqueue\AmqpTools\DelayStrategyAware;
use Enqueue\AmqpTools\DelayStrategyAwareTrait;
use Interop\Amqp\AmqpConnectionFactory as InteropAmqpConnectionFactory;
use Interop\Queue\PsrContext;

class AmqpConnectionFactory implements InteropAmqpConnectionFactory, DelayStrategyAware
{
Expand All @@ -24,9 +25,6 @@ class AmqpConnectionFactory implements InteropAmqpConnectionFactory, DelayStrate
/**
* @see \Enqueue\AmqpTools\ConnectionConfig for possible config formats and values
*
* In addition this factory accepts next options:
* receive_method - Could be either basic_get or basic_consume
*
* @param array|string|null $config
*/
public function __construct($config = 'amqp:')
Expand All @@ -37,21 +35,12 @@ public function __construct($config = 'amqp:')
->addDefaultOption('tcp_nodelay', null)
->parse()
;

$supportedMethods = ['basic_get', 'basic_consume'];
if (false == in_array($this->config->getOption('receive_method'), $supportedMethods, true)) {
throw new \LogicException(sprintf(
'Invalid "receive_method" option value "%s". It could be only "%s"',
$this->config->getOption('receive_method'),
implode('", "', $supportedMethods)
));
}
}

/**
* @return AmqpContext
*/
public function createContext()
public function createContext(): PsrContext
{
if ($this->config->isLazy()) {
$context = new AmqpContext(function () {
Expand All @@ -72,18 +61,12 @@ public function createContext()
return $context;
}

/**
* @return ConnectionConfig
*/
public function getConfig()
public function getConfig(): ConnectionConfig
{
return $this->config;
}

/**
* @return BunnyClient
*/
private function establishConnection()
private function establishConnection(): BunnyClient
{
if ($this->config->isSslOn()) {
throw new \LogicException('The bunny library does not support SSL connections');
Expand Down
147 changes: 26 additions & 121 deletions pkg/amqp-bunny/AmqpConsumer.php
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
use Interop\Amqp\AmqpQueue as InteropAmqpQueue;
use Interop\Queue\InvalidMessageException;
use Interop\Queue\PsrMessage;
use Interop\Queue\PsrQueue;

class AmqpConsumer implements InteropAmqpConsumer
{
Expand All @@ -27,16 +28,6 @@ class AmqpConsumer implements InteropAmqpConsumer
*/
private $queue;

/**
* @var Buffer
*/
private $buffer;

/**
* @var string
*/
private $receiveMethod;

/**
* @var int
*/
Expand All @@ -47,108 +38,86 @@ class AmqpConsumer implements InteropAmqpConsumer
*/
private $consumerTag;

/**
* @param AmqpContext $context
* @param InteropAmqpQueue $queue
* @param Buffer $buffer
* @param string $receiveMethod
*/
public function __construct(AmqpContext $context, InteropAmqpQueue $queue, Buffer $buffer, $receiveMethod)
public function __construct(AmqpContext $context, InteropAmqpQueue $queue)
{
$this->context = $context;
$this->channel = $context->getBunnyChannel();
$this->queue = $queue;
$this->buffer = $buffer;
$this->receiveMethod = $receiveMethod;
$this->flags = self::FLAG_NOPARAM;
}

/**
* {@inheritdoc}
*/
public function setConsumerTag($consumerTag)
public function setConsumerTag(string $consumerTag = null): void
{
$this->consumerTag = $consumerTag;
}

/**
* {@inheritdoc}
*/
public function getConsumerTag()
public function getConsumerTag(): ?string
{
return $this->consumerTag;
}

/**
* {@inheritdoc}
*/
public function clearFlags()
public function clearFlags(): void
{
$this->flags = self::FLAG_NOPARAM;
}

/**
* {@inheritdoc}
*/
public function addFlag($flag)
public function addFlag(int $flag): void
{
$this->flags |= $flag;
}

/**
* {@inheritdoc}
*/
public function getFlags()
public function getFlags(): int
{
return $this->flags;
}

/**
* {@inheritdoc}
*/
public function setFlags($flags)
public function setFlags(int $flags): void
{
$this->flags = $flags;
}

/**
* {@inheritdoc}
* @return InteropAmqpQueue
*/
public function getQueue()
public function getQueue(): PsrQueue
{
return $this->queue;
}

/**
* {@inheritdoc}
* @return InteropAmqpMessage
*/
public function receive($timeout = 0)
public function receive(int $timeout = 0): ?PsrMessage
{
if ('basic_get' == $this->receiveMethod) {
return $this->receiveBasicGet($timeout);
}
$end = microtime(true) + ($timeout / 1000);

while (0 === $timeout || microtime(true) < $end) {
if ($message = $this->receiveNoWait()) {
return $message;
}

if ('basic_consume' == $this->receiveMethod) {
return $this->receiveBasicConsume($timeout);
usleep(100000); //100ms
}

throw new \LogicException('The "receiveMethod" is not supported');
return null;
}

/**
* {@inheritdoc}
* @return InteropAmqpMessage
*/
public function receiveNoWait()
public function receiveNoWait(): ?PsrMessage
{
if ($message = $this->channel->get($this->queue->getQueueName(), (bool) ($this->getFlags() & InteropAmqpConsumer::FLAG_NOACK))) {
return $this->context->convertMessage($message);
}

return null;
}

/**
* @param InteropAmqpMessage $message
*/
public function acknowledge(PsrMessage $message)
public function acknowledge(PsrMessage $message): void
{
InvalidMessageException::assertMessageInstanceOf($message, InteropAmqpMessage::class);

Expand All @@ -158,76 +127,12 @@ public function acknowledge(PsrMessage $message)

/**
* @param InteropAmqpMessage $message
* @param bool $requeue
*/
public function reject(PsrMessage $message, $requeue = false)
public function reject(PsrMessage $message, bool $requeue = false): void
{
InvalidMessageException::assertMessageInstanceOf($message, InteropAmqpMessage::class);

$bunnyMessage = new Message('', $message->getDeliveryTag(), '', '', '', [], '');
$this->channel->reject($bunnyMessage, $requeue);
}

/**
* @param int $timeout
*
* @return InteropAmqpMessage|null
*/
private function receiveBasicGet($timeout)
{
$end = microtime(true) + ($timeout / 1000);

while (0 === $timeout || microtime(true) < $end) {
if ($message = $this->receiveNoWait()) {
return $message;
}

usleep(100000); //100ms
}
}

/**
* @param int $timeout
*
* @return InteropAmqpMessage|null
*/
private function receiveBasicConsume($timeout)
{
if (false == $this->consumerTag) {
$this->context->subscribe($this, function (InteropAmqpMessage $message) {
$this->buffer->push($message->getConsumerTag(), $message);

return false;
});
}

if ($message = $this->buffer->pop($this->consumerTag)) {
return $message;
}

while (true) {
$start = microtime(true);

$this->context->consume($timeout);

if ($message = $this->buffer->pop($this->consumerTag)) {
return $message;
}

// is here when consumed message is not for this consumer

// as timeout is infinite have to continue consumption, but it can overflow message buffer
if ($timeout <= 0) {
continue;
}

// compute remaining timeout and continue until time is up
$stop = microtime(true);
$timeout -= ($stop - $start) * 1000;

if ($timeout <= 0) {
break;
}
}
}
}
Loading