Skip to content

Commit

Permalink
Merge pull request #613 from php-enqueue/dbal-concurent-fetch
Browse files Browse the repository at this point in the history
[dbal] Use concurrent fetch message approach (no transaction, no pessimistic lock)
  • Loading branch information
makasim authored Nov 15, 2018
2 parents 2fd6cc2 + 2445ef2 commit 1f96246
Show file tree
Hide file tree
Showing 8 changed files with 179 additions and 139 deletions.
23 changes: 1 addition & 22 deletions pkg/dbal/DbalConsumer.php
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@
namespace Enqueue\Dbal;

use Doctrine\DBAL\Connection;
use Doctrine\DBAL\Types\Type;
use Interop\Queue\Consumer;
use Interop\Queue\Exception\InvalidMessageException;
use Interop\Queue\Impl\ConsumerPollingTrait;
Expand Down Expand Up @@ -81,14 +80,7 @@ public function receiveNoWait(): ?Message
$this->removeExpiredMessages();
$this->redeliverMessages();

// get top message from the queue
if ($message = $this->fetchMessage([$this->queue->getQueueName()], $redeliveryDelay)) {
if ($message['redelivered'] || empty($message['time_to_live']) || $message['time_to_live'] > time()) {
return $this->getContext()->convertMessage($message);
}
}

return null;
return $this->fetchMessage([$this->queue->getQueueName()], $redeliveryDelay);
}

/**
Expand Down Expand Up @@ -129,17 +121,4 @@ protected function getConnection(): Connection
{
return $this->dbal;
}

private function deleteMessage(string $deliveryId): void
{
if (empty($deliveryId)) {
throw new \LogicException(sprintf('Expected record was removed but it is not. Delivery id: "%s"', $deliveryId));
}

$this->getConnection()->delete(
$this->getContext()->getTableName(),
['delivery_id' => $deliveryId],
['delivery_id' => Type::STRING]
);
}
}
163 changes: 102 additions & 61 deletions pkg/dbal/DbalConsumerHelperTrait.php
Original file line number Diff line number Diff line change
Expand Up @@ -11,96 +11,137 @@

trait DbalConsumerHelperTrait
{
private $redeliverMessagesLastExecutedAt;

private $removeExpiredMessagesLastExecutedAt;

abstract protected function getContext(): DbalContext;

abstract protected function getConnection(): Connection;

protected function fetchMessage(array $queues, int $redeliveryDelay): ?array
protected function fetchMessage(array $queues, int $redeliveryDelay): ?DbalMessage
{
if (empty($queues)) {
throw new \LogicException('Queues must not be empty.');
}

$now = time();
$deliveryId = (string) Uuid::uuid1();

$this->getConnection()->beginTransaction();

try {
$query = $this->getConnection()->createQueryBuilder()
->select('*')
->from($this->getContext()->getTableName())
->andWhere('delivery_id IS NULL')
->andWhere('delayed_until IS NULL OR delayed_until <= :delayedUntil')
->andWhere('queue IN (:queues)')
->addOrderBy('priority', 'desc')
->addOrderBy('published_at', 'asc')
->setMaxResults(1);

// select for update
$message = $this->getConnection()->executeQuery(
$query->getSQL().' '.$this->getConnection()->getDatabasePlatform()->getWriteLockSQL(),
['delayedUntil' => $now, 'queues' => array_values($queues)],
['delayedUntil' => ParameterType::INTEGER, 'queues' => Connection::PARAM_STR_ARRAY]
)->fetch();

if (!$message) {
$this->getConnection()->commit();
$deliveryId = Uuid::uuid4();

$endAt = microtime(true) + 0.2; // add 200ms

$select = $this->getConnection()->createQueryBuilder()
->select('id')
->from($this->getContext()->getTableName())
->andWhere('queue IN (:queues)')
->andWhere('delayed_until IS NULL OR delayed_until <= :delayedUntil')
->andWhere('delivery_id IS NULL')
->addOrderBy('priority', 'asc')
->addOrderBy('published_at', 'asc')
->setParameter('queues', $queues, Connection::PARAM_STR_ARRAY)
->setParameter('delayedUntil', $now, ParameterType::INTEGER)
->setMaxResults(1);

$update = $this->getConnection()->createQueryBuilder()
->update($this->getContext()->getTableName())
->set('delivery_id', ':deliveryId')
->set('redeliver_after', ':redeliverAfter')
->andWhere('id = :messageId')
->andWhere('delivery_id IS NULL')
->setParameter('deliveryId', $deliveryId->getBytes(), Type::GUID)
->setParameter('redeliverAfter', $now + $redeliveryDelay, Type::BIGINT)
;

while (microtime(true) < $endAt) {
$result = $select->execute()->fetch();
if (empty($result)) {
return null;
}

// mark message as delivered to consumer
$this->getConnection()->createQueryBuilder()
->andWhere('id = :id')
->update($this->getContext()->getTableName())
->set('delivery_id', ':deliveryId')
->set('redeliver_after', ':redeliverAfter')
->setParameter('id', $message['id'], Type::GUID)
->setParameter('deliveryId', $deliveryId, Type::STRING)
->setParameter('redeliverAfter', $now + $redeliveryDelay, Type::BIGINT)
->execute()
$update
->setParameter('messageId', $result['id'], Type::GUID)
;

$this->getConnection()->commit();

$deliveredMessage = $this->getConnection()->createQueryBuilder()
->select('*')
->from($this->getContext()->getTableName())
->andWhere('delivery_id = :deliveryId')
->setParameter('deliveryId', $deliveryId, Type::STRING)
->setMaxResults(1)
->execute()
->fetch()
;

return $deliveredMessage ?: null;
} catch (\Exception $e) {
$this->getConnection()->rollBack();

throw $e;
if ($update->execute()) {
$deliveredMessage = $this->getConnection()->createQueryBuilder()
->select('*')
->from($this->getContext()->getTableName())
->andWhere('delivery_id = :deliveryId')
->setParameter('deliveryId', $deliveryId->getBytes(), Type::GUID)
->setMaxResults(1)
->execute()
->fetch()
;

if (false == $deliveredMessage) {
throw new \LogicException('There must be a message at all times at this stage but there is no a message.');
}

if ($deliveredMessage['redelivered'] || empty($deliveredMessage['time_to_live']) || $deliveredMessage['time_to_live'] > time()) {
return $this->getContext()->convertMessage($deliveredMessage);
}
}
}

return null;
}

protected function redeliverMessages(): void
{
$this->getConnection()->createQueryBuilder()
if (null === $this->redeliverMessagesLastExecutedAt) {
$this->redeliverMessagesLastExecutedAt = microtime(true);
} elseif ((microtime(true) - $this->redeliverMessagesLastExecutedAt) < 1) {
return;
}

$update = $this->getConnection()->createQueryBuilder()
->update($this->getContext()->getTableName())
->set('delivery_id', ':deliveryId')
->set('redelivered', ':redelivered')
->andWhere('delivery_id IS NOT NULL')
->andWhere('redeliver_after < :now')
->setParameter(':now', (int) time(), Type::BIGINT)
->setParameter('deliveryId', null, Type::STRING)
->andWhere('delivery_id IS NOT NULL')
->setParameter(':now', time(), Type::BIGINT)
->setParameter('deliveryId', null, Type::GUID)
->setParameter('redelivered', true, Type::BOOLEAN)
->execute()
;

$update->execute();

$this->redeliverMessagesLastExecutedAt = microtime(true);
}

protected function removeExpiredMessages(): void
{
$this->getConnection()->createQueryBuilder()
if (null === $this->removeExpiredMessagesLastExecutedAt) {
$this->removeExpiredMessagesLastExecutedAt = microtime(true);
} elseif ((microtime(true) - $this->removeExpiredMessagesLastExecutedAt) < 1) {
return;
}

$delete = $this->getConnection()->createQueryBuilder()
->delete($this->getContext()->getTableName())
->andWhere('(time_to_live IS NOT NULL) AND (time_to_live < :now)')
->setParameter(':now', (int) time(), Type::BIGINT)
->setParameter('redelivered', false, Type::BOOLEAN)
->execute()
->andWhere('delivery_id IS NULL')
->andWhere('redelivered = false')

->setParameter(':now', time(), Type::BIGINT)
;

$delete->execute();

$this->removeExpiredMessagesLastExecutedAt = microtime(true);
}

private function deleteMessage(string $deliveryId): void
{
if (empty($deliveryId)) {
throw new \LogicException(sprintf('Expected record was removed but it is not. Delivery id: "%s"', $deliveryId));
}

$this->getConnection()->delete(
$this->getContext()->getTableName(),
['delivery_id' => Uuid::fromString($deliveryId)->getBytes()],
['delivery_id' => Type::GUID]
);
}
}
57 changes: 30 additions & 27 deletions pkg/dbal/DbalContext.php
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
use Interop\Queue\Queue;
use Interop\Queue\SubscriptionConsumer;
use Interop\Queue\Topic;
use Ramsey\Uuid\Uuid;

class DbalContext implements Context
{
Expand Down Expand Up @@ -141,32 +142,38 @@ public function createSubscriptionConsumer(): SubscriptionConsumer
/**
* @internal It must be used here and in the consumer only
*/
public function convertMessage(array $dbalMessage): DbalMessage
public function convertMessage(array $arrayMessage): DbalMessage
{
/** @var DbalMessage $dbalMessageObj */
$dbalMessageObj = $this->createMessage(
$dbalMessage['body'],
$dbalMessage['properties'] ? JSON::decode($dbalMessage['properties']) : [],
$dbalMessage['headers'] ? JSON::decode($dbalMessage['headers']) : []
/** @var DbalMessage $message */
$message = $this->createMessage(
$arrayMessage['body'],
$arrayMessage['properties'] ? JSON::decode($arrayMessage['properties']) : [],
$arrayMessage['headers'] ? JSON::decode($arrayMessage['headers']) : []
);

if (isset($dbalMessage['redelivered'])) {
$dbalMessageObj->setRedelivered((bool) $dbalMessage['redelivered']);
if (isset($arrayMessage['id'])) {
$message->setMessageId(Uuid::fromBytes($arrayMessage['id'])->toString());
}
if (isset($dbalMessage['priority'])) {
$dbalMessageObj->setPriority((int) $dbalMessage['priority']);
if (isset($arrayMessage['queue'])) {
$message->setQueue($arrayMessage['queue']);
}
if (isset($dbalMessage['published_at'])) {
$dbalMessageObj->setPublishedAt((int) $dbalMessage['published_at']);
if (isset($arrayMessage['redelivered'])) {
$message->setRedelivered((bool) $arrayMessage['redelivered']);
}
if (isset($dbalMessage['delivery_id'])) {
$dbalMessageObj->setDeliveryId((string) $dbalMessage['delivery_id']);
if (isset($arrayMessage['priority'])) {
$message->setPriority((int) (-1 * $arrayMessage['priority']));
}
if (isset($dbalMessage['redeliver_after'])) {
$dbalMessageObj->setRedeliverAfter((int) $dbalMessage['redeliver_after']);
if (isset($arrayMessage['published_at'])) {
$message->setPublishedAt((int) $arrayMessage['published_at']);
}
if (isset($arrayMessage['delivery_id'])) {
$message->setDeliveryId(Uuid::fromBytes($arrayMessage['delivery_id'])->toString());
}
if (isset($arrayMessage['redeliver_after'])) {
$message->setRedeliverAfter((int) $arrayMessage['redeliver_after']);
}

return $dbalMessageObj;
return $message;
}

/**
Expand Down Expand Up @@ -218,8 +225,7 @@ public function createDataBaseTable(): void

$table = new Table($this->getTableName());

$table->addColumn('id', Type::BINARY, ['length' => 16, 'fixed' => true]);
$table->addColumn('human_id', Type::STRING, ['length' => 36]);
$table->addColumn('id', Type::GUID, ['length' => 16, 'fixed' => true]);
$table->addColumn('published_at', Type::BIGINT);
$table->addColumn('body', Type::TEXT, ['notnull' => false]);
$table->addColumn('headers', Type::TEXT, ['notnull' => false]);
Expand All @@ -229,17 +235,14 @@ public function createDataBaseTable(): void
$table->addColumn('priority', Type::SMALLINT, ['notnull' => false]);
$table->addColumn('delayed_until', Type::BIGINT, ['notnull' => false]);
$table->addColumn('time_to_live', Type::BIGINT, ['notnull' => false]);
$table->addColumn('delivery_id', Type::STRING, ['notnull' => false]);
$table->addColumn('delivery_id', Type::GUID, ['length' => 16, 'fixed' => true, 'notnull' => false]);
$table->addColumn('redeliver_after', Type::BIGINT, ['notnull' => false]);

$table->setPrimaryKey(['id']);
$table->addIndex(['published_at']);
$table->addIndex(['queue']);
$table->addIndex(['priority']);
$table->addIndex(['delayed_until']);
$table->addIndex(['priority', 'published_at']);
$table->addIndex(['redeliver_after']);
$table->addUniqueIndex(['delivery_id']);
$table->addIndex(['priority', 'published_at', 'queue', 'delivery_id', 'delayed_until', 'id']);

$table->addIndex(['redeliver_after', 'delivery_id']);
$table->addIndex(['time_to_live', 'delivery_id']);

$sm->createTable($table);
}
Expand Down
15 changes: 15 additions & 0 deletions pkg/dbal/DbalMessage.php
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,11 @@ class DbalMessage implements Message
*/
private $deliveryId;

/**
* @var string|null
*/
private $queue;

/**
* Milliseconds, for example 15186054527288.
*
Expand Down Expand Up @@ -249,4 +254,14 @@ public function setPublishedAt(int $publishedAt = null): void
{
$this->publishedAt = $publishedAt;
}

public function getQueue(): ?string
{
return $this->queue;
}

public function setQueue(?string $queue): void
{
$this->queue = $queue;
}
}
5 changes: 2 additions & 3 deletions pkg/dbal/DbalProducer.php
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ public function send(Destination $destination, Message $message): void
}

$body = $message->getBody();
$uuid = Uuid::uuid1();
$uuid = Uuid::uuid4();

$publishedAt = null !== $message->getPublishedAt() ?
$message->getPublishedAt() :
Expand All @@ -80,12 +80,11 @@ public function send(Destination $destination, Message $message): void

$dbalMessage = [
'id' => $this->uuidCodec->encodeBinary($uuid),
'human_id' => $uuid->toString(),
'published_at' => $publishedAt,
'body' => $body,
'headers' => JSON::encode($message->getHeaders()),
'properties' => JSON::encode($message->getProperties()),
'priority' => $message->getPriority(),
'priority' => -1 * $message->getPriority(),
'queue' => $destination->getQueueName(),
'redelivered' => false,
'delivery_id' => null,
Expand Down
Loading

0 comments on commit 1f96246

Please sign in to comment.