Skip to content

Commit

Permalink
Support for bulk consumer (#45)
Browse files Browse the repository at this point in the history
* Support for bulk consumer

* Updated README.md

* Added test for base consumer

* Fixed styles in README.md

Co-authored-by: Petr Kučera <kontakt@kucerapetr.cz>
  • Loading branch information
Radovan Kepák and kucix authored May 20, 2021
1 parent ae30026 commit ab8a81a
Show file tree
Hide file tree
Showing 11 changed files with 787 additions and 44 deletions.
44 changes: 44 additions & 0 deletions .docs/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -237,6 +237,50 @@ final class TestConsumer implements IConsumer
}
```

### Consuming messages in bulk

Sometimes, you want to consume more messages at once, for this purpose, there is BulkConsumer.

TestBulkConsumer.php

```php
<?php

declare(strict_types=1);

use Bunny\Message;
use Contributte\RabbitMQ\Consumer\IConsumer;

final class TestConsumer
{

/**
* @param Message[] $messages
* @return array(delivery_tag => MESSAGE_STATUS)
*/
public function consume(array $messages): array
{
$return = [];
$data = [];
foreach($messages as $message) {
$data[$message->deliveryTag] = json_decode($message->content);
}

/**
* @todo bulk message action
*/

foreach(array_keys($data) as $tag) {
$return[$tag] = IConsumer::MESSAGE_ACK; // Or ::MESSAGE_NACK || ::MESSAGE_REJECT
}

return $return;
}

}
```


### Running a consumer trough CLI

There are two consumer commands prepared. `rabbitmq:consumer` wiil consume messages for specified amount of time (in
Expand Down
2 changes: 1 addition & 1 deletion src/Console/Command/AbstractConsumerCommand.php
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ protected function validateConsumerName(string $consumerName): void
throw new \InvalidArgumentException(
sprintf(
"Consumer [$consumerName] does not exist. \n\n Available consumers: %s",
implode('', array_map(function($s): string {
implode('', array_map(static function($s): string {
return "\n\t- [{$s}]";
}, $this->consumersDataBag->getDataKeys()))
)
Expand Down
152 changes: 152 additions & 0 deletions src/Consumer/BulkConsumer.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,152 @@
<?php

declare(strict_types=1);

namespace Contributte\RabbitMQ\Consumer;

use Bunny\AbstractClient;
use Bunny\Channel;
use Bunny\Client;
use Bunny\Message;
use Contributte\RabbitMQ\Consumer\Exception\UnexpectedConsumerResultTypeException;
use Contributte\RabbitMQ\Queue\IQueue;

class BulkConsumer extends Consumer
{
/**
* @var BulkMessage[]
*/
protected array $buffer = [];
protected int $bulkSize;
protected int $bulkTime;
protected ?int $stopTime = null;

public function __construct(
string $name,
IQueue $queue,
callable $callback,
?int $prefetchSize,
?int $prefetchCount,
int $bulkSize,
int $bulkTime
) {
parent::__construct($name, $queue, $callback, $prefetchSize, $prefetchCount);

if ($bulkSize > 0 && $bulkTime > 0) {
$this->bulkSize = $bulkSize;
$this->bulkTime = $bulkTime;
} else {
throw new \InvalidArgumentException("Configuration values bulkSize and bulkTime must have value greater than zero");
}
}

public function consume(?int $maxSeconds = null, ?int $maxMessages = null): void
{
$this->maxMessages = $maxMessages;
if ($maxSeconds > 0) {
$this->stopTime = time() + $maxSeconds;
}

$channel = $this->queue->getConnection()->getChannel();

if ($this->prefetchSize !== null || $this->prefetchCount !== null) {
$channel->qos($this->prefetchSize ?? 0, $this->prefetchCount ?? 0);
}

$this->setupConsume($channel);
$this->startConsumeLoop($channel);

//process rest of items
$this->processBuffer($channel->getClient());
}

private function setupConsume(Channel $channel): void
{
$channel->consume(
function (Message $message, Channel $channel, Client $client): void {
$this->messages++;
$bulkCount = $this->addToBuffer(new BulkMessage($message, $channel));
if ($bulkCount >= $this->bulkSize || $this->isMaxMessages() || $this->isStopTime()) {
$client->stop();
}
},
$this->queue->getName()
);
}

private function startConsumeLoop(Channel $channel): void
{
do {
$channel->getClient()->run($this->getTtl());
$this->processBuffer($channel->getClient());
} while (!$this->isStopTime() && !$this->isMaxMessages());
}

private function addToBuffer(BulkMessage $message): int
{
$this->buffer[] = $message;

return count($this->buffer);
}

private function processBuffer(AbstractClient $client): void
{
if (count($this->buffer) === 0) {
return;
}

$messages = [];
foreach ($this->buffer as $bulkMessage) {
$message = $bulkMessage->getMessage();
$messages[$message->deliveryTag] = $message;
}

try {
$result = call_user_func($this->callback, $messages);
} catch (\Throwable $e) {
$result = array_map(static fn () => IConsumer::MESSAGE_NACK, $messages);
}

if (!is_array($result)) {
$result = array_map(static fn () => IConsumer::MESSAGE_NACK, $messages);
$this->sendResultsBack($client, $result);

throw new UnexpectedConsumerResultTypeException(
'Unexpected result from consumer. Expected array(delivery_tag => MESSAGE_STATUS [constant from IConsumer]) but get ' . gettype($result)
);
}
$result = array_map('intval', $result);

$this->sendResultsBack($client, $result);

$this->buffer = [];
}

private function sendResultsBack(AbstractClient $client, array $result): void
{
if ($client instanceof Client) {
foreach ($this->buffer as $bulkMessage) {
$this->sendResponse(
$bulkMessage->getMessage(),
$bulkMessage->getChannel(),
$result[$bulkMessage->getMessage()->deliveryTag] ?? IConsumer::MESSAGE_NACK,
$client
);
}
}
}

private function isStopTime(): bool
{
return $this->stopTime !== null && $this->stopTime < time();
}

private function getTtl(): int
{
if ($this->stopTime > 0) {
return min($this->bulkTime, $this->stopTime - time());
}

return $this->bulkTime;
}
}
30 changes: 30 additions & 0 deletions src/Consumer/BulkMessage.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
<?php

declare(strict_types=1);

namespace Contributte\RabbitMQ\Consumer;

use Bunny\Channel;
use Bunny\Message;

class BulkMessage
{
private Message $message;
private Channel $channel;

public function __construct(
Message $message,
Channel $channel
) {
$this->message = $message;
$this->channel = $channel;
}

public function getMessage(): Message {
return $this->message;
}

public function getChannel(): Channel {
return $this->channel;
}
}
100 changes: 57 additions & 43 deletions src/Consumer/Consumer.php
Original file line number Diff line number Diff line change
Expand Up @@ -9,19 +9,20 @@
use Bunny\Message;
use Contributte\RabbitMQ\Queue\IQueue;

final class Consumer
class Consumer
{

private string $name;
private IQueue $queue;
protected string $name;
protected IQueue $queue;

/**
* @var callable
*/
private $callback;
private int $messages = 0;
private ?int $prefetchSize = null;
private ?int $prefetchCount = null;
protected $callback;
protected int $messages = 0;
protected ?int $prefetchSize = null;
protected ?int $prefetchCount = null;
protected ?int $maxMessages = null;


public function __construct(
Expand Down Expand Up @@ -52,62 +53,75 @@ public function getCallback(): callable

public function consume(?int $maxSeconds = null, ?int $maxMessages = null): void
{
$this->maxMessages = $maxMessages;
$channel = $this->queue->getConnection()->getChannel();

if ($this->prefetchSize !== null || $this->prefetchCount !== null) {
$channel->qos($this->prefetchSize ?? 0, $this->prefetchCount ?? 0);
}

$channel->consume(
function (Message $message, Channel $channel, Client $client) use ($maxMessages): void {
function (Message $message, Channel $channel, Client $client): void {
$this->messages++;
$result = call_user_func($this->callback, $message);

switch ($result) {
case IConsumer::MESSAGE_ACK:
// Acknowledge message
$channel->ack($message);
$this->sendResponse($message, $channel, $result, $client);

break;
if ($this->isMaxMessages()) {
$client->stop();
}
},
$this->queue->getName()
);

case IConsumer::MESSAGE_NACK:
// Message will be requeued
$channel->nack($message);
$channel->getClient()->run($maxSeconds);
}

break;
protected function sendResponse(Message $message, Channel $channel, int $result, Client $client): void
{
switch ($result) {
case IConsumer::MESSAGE_ACK:
// Acknowledge message
$channel->ack($message);

case IConsumer::MESSAGE_REJECT:
// Message will be discarded
$channel->reject($message, false);
break;

break;
case IConsumer::MESSAGE_NACK:
// Message will be requeued
$channel->nack($message);

case IConsumer::MESSAGE_REJECT_AND_TERMINATE:
// Message will be discarded
$channel->reject($message, false);
$client->stop();
break;

break;
case IConsumer::MESSAGE_REJECT:
// Message will be discarded
$channel->reject($message, false);

case IConsumer::MESSAGE_ACK_AND_TERMINATE:
// Acknowledge message and terminate
$channel->ack($message);
$client->stop();
break;

break;
case IConsumer::MESSAGE_REJECT_AND_TERMINATE:
// Message will be discarded
$channel->reject($message, false);
$client->stop();

default:
throw new \InvalidArgumentException(
"Unknown return value of consumer [{$this->name}] user callback"
);
}
break;

if ($maxMessages !== null && ++$this->messages >= $maxMessages) {
$client->stop();
}
},
$this->queue->getName()
);
case IConsumer::MESSAGE_ACK_AND_TERMINATE:
// Acknowledge message and terminate
$channel->ack($message);
$client->stop();

$channel->getClient()->run($maxSeconds);
break;

default:
throw new \InvalidArgumentException(
"Unknown return value of consumer [{$this->name}] user callback"
);
}
}

protected function isMaxMessages(): bool
{
return $this->maxMessages !== null && $this->messages >= $this->maxMessages;
}

}
Loading

0 comments on commit ab8a81a

Please sign in to comment.