Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[dbal] Use concurrent fetch message approach (no transaction, no pessimistic lock) #613

Merged
merged 7 commits into from
Nov 15, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 1 addition & 22 deletions pkg/dbal/DbalConsumer.php
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@
namespace Enqueue\Dbal;

use Doctrine\DBAL\Connection;
use Doctrine\DBAL\Types\Type;
use Interop\Queue\Consumer;
use Interop\Queue\Exception\InvalidMessageException;
use Interop\Queue\Impl\ConsumerPollingTrait;
Expand Down Expand Up @@ -81,14 +80,7 @@ public function receiveNoWait(): ?Message
$this->removeExpiredMessages();
$this->redeliverMessages();

// get top message from the queue
if ($message = $this->fetchMessage([$this->queue->getQueueName()], $redeliveryDelay)) {
if ($message['redelivered'] || empty($message['time_to_live']) || $message['time_to_live'] > time()) {
return $this->getContext()->convertMessage($message);
}
}

return null;
return $this->fetchMessage([$this->queue->getQueueName()], $redeliveryDelay);
}

/**
Expand Down Expand Up @@ -129,17 +121,4 @@ protected function getConnection(): Connection
{
return $this->dbal;
}

private function deleteMessage(string $deliveryId): void
{
if (empty($deliveryId)) {
throw new \LogicException(sprintf('Expected record was removed but it is not. Delivery id: "%s"', $deliveryId));
}

$this->getConnection()->delete(
$this->getContext()->getTableName(),
['delivery_id' => $deliveryId],
['delivery_id' => Type::STRING]
);
}
}
163 changes: 102 additions & 61 deletions pkg/dbal/DbalConsumerHelperTrait.php
Original file line number Diff line number Diff line change
Expand Up @@ -11,96 +11,137 @@

trait DbalConsumerHelperTrait
{
private $redeliverMessagesLastExecutedAt;

private $removeExpiredMessagesLastExecutedAt;

abstract protected function getContext(): DbalContext;

abstract protected function getConnection(): Connection;

protected function fetchMessage(array $queues, int $redeliveryDelay): ?array
protected function fetchMessage(array $queues, int $redeliveryDelay): ?DbalMessage
{
if (empty($queues)) {
throw new \LogicException('Queues must not be empty.');
}

$now = time();
$deliveryId = (string) Uuid::uuid1();

$this->getConnection()->beginTransaction();

try {
$query = $this->getConnection()->createQueryBuilder()
->select('*')
->from($this->getContext()->getTableName())
->andWhere('delivery_id IS NULL')
->andWhere('delayed_until IS NULL OR delayed_until <= :delayedUntil')
->andWhere('queue IN (:queues)')
->addOrderBy('priority', 'desc')
->addOrderBy('published_at', 'asc')
->setMaxResults(1);

// select for update
$message = $this->getConnection()->executeQuery(
$query->getSQL().' '.$this->getConnection()->getDatabasePlatform()->getWriteLockSQL(),
['delayedUntil' => $now, 'queues' => array_values($queues)],
['delayedUntil' => ParameterType::INTEGER, 'queues' => Connection::PARAM_STR_ARRAY]
)->fetch();

if (!$message) {
$this->getConnection()->commit();
$deliveryId = Uuid::uuid4();

$endAt = microtime(true) + 0.2; // add 200ms

$select = $this->getConnection()->createQueryBuilder()
->select('id')
->from($this->getContext()->getTableName())
->andWhere('queue IN (:queues)')
->andWhere('delayed_until IS NULL OR delayed_until <= :delayedUntil')
->andWhere('delivery_id IS NULL')
->addOrderBy('priority', 'asc')
->addOrderBy('published_at', 'asc')
->setParameter('queues', $queues, Connection::PARAM_STR_ARRAY)
->setParameter('delayedUntil', $now, ParameterType::INTEGER)
->setMaxResults(1);

$update = $this->getConnection()->createQueryBuilder()
->update($this->getContext()->getTableName())
->set('delivery_id', ':deliveryId')
->set('redeliver_after', ':redeliverAfter')
->andWhere('id = :messageId')
->andWhere('delivery_id IS NULL')
->setParameter('deliveryId', $deliveryId->getBytes(), Type::GUID)
->setParameter('redeliverAfter', $now + $redeliveryDelay, Type::BIGINT)
;

while (microtime(true) < $endAt) {
$result = $select->execute()->fetch();
if (empty($result)) {
return null;
}

// mark message as delivered to consumer
$this->getConnection()->createQueryBuilder()
->andWhere('id = :id')
->update($this->getContext()->getTableName())
->set('delivery_id', ':deliveryId')
->set('redeliver_after', ':redeliverAfter')
->setParameter('id', $message['id'], Type::GUID)
->setParameter('deliveryId', $deliveryId, Type::STRING)
->setParameter('redeliverAfter', $now + $redeliveryDelay, Type::BIGINT)
->execute()
$update
->setParameter('messageId', $result['id'], Type::GUID)
;

$this->getConnection()->commit();

$deliveredMessage = $this->getConnection()->createQueryBuilder()
->select('*')
->from($this->getContext()->getTableName())
->andWhere('delivery_id = :deliveryId')
->setParameter('deliveryId', $deliveryId, Type::STRING)
->setMaxResults(1)
->execute()
->fetch()
;

return $deliveredMessage ?: null;
} catch (\Exception $e) {
$this->getConnection()->rollBack();

throw $e;
if ($update->execute()) {
$deliveredMessage = $this->getConnection()->createQueryBuilder()
->select('*')
->from($this->getContext()->getTableName())
->andWhere('delivery_id = :deliveryId')
->setParameter('deliveryId', $deliveryId->getBytes(), Type::GUID)
->setMaxResults(1)
->execute()
->fetch()
;

if (false == $deliveredMessage) {
throw new \LogicException('There must be a message at all times at this stage but there is no a message.');
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this is not certain. it could be deleted manually or some other concurrent process between the update and the select. unlikely but possible. why not just return null?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This should not happen. Few lines above the message was updated with deliveryId (which is unique and only known by this consumer) therefor if we cannot select it few lines later something went wrong.

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

When I truncate the table, this is gone. And the consumer would fail for no reason.

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why keep the exception? This is not race-condition safe.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

When I truncate the table, this is gone. And the consumer would fail for no reason.

It would fail, exit and restarted by supervisord or any other process manager.

If we remove the exception we might have a bug or something still would not know about it cuz it would silently continue fetching new messages without a warning.

Truncating a table while there are consumers working on it is not a valid way to do. I could imagine many cases which would go wrong in this case (for example a message has been processed and a consumer tries to acknowledge it but it has gone).

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Before it used a write lock and thus it protects against concurrent access causing race conditions. Now this is gone, so you need to make your consumers handle that gracefully. Throwing an exception and relying on the consumer to be restarted is just a workaround for a buggy consumer.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Okay

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

}

if ($deliveredMessage['redelivered'] || empty($deliveredMessage['time_to_live']) || $deliveredMessage['time_to_live'] > time()) {
return $this->getContext()->convertMessage($deliveredMessage);
}
}
}

return null;
}

protected function redeliverMessages(): void
{
$this->getConnection()->createQueryBuilder()
if (null === $this->redeliverMessagesLastExecutedAt) {
$this->redeliverMessagesLastExecutedAt = microtime(true);
} elseif ((microtime(true) - $this->redeliverMessagesLastExecutedAt) < 1) {
return;
}

$update = $this->getConnection()->createQueryBuilder()
->update($this->getContext()->getTableName())
->set('delivery_id', ':deliveryId')
->set('redelivered', ':redelivered')
->andWhere('delivery_id IS NOT NULL')
->andWhere('redeliver_after < :now')
->setParameter(':now', (int) time(), Type::BIGINT)
->setParameter('deliveryId', null, Type::STRING)
->andWhere('delivery_id IS NOT NULL')
->setParameter(':now', time(), Type::BIGINT)
->setParameter('deliveryId', null, Type::GUID)
->setParameter('redelivered', true, Type::BOOLEAN)
->execute()
;

$update->execute();

$this->redeliverMessagesLastExecutedAt = microtime(true);
}

protected function removeExpiredMessages(): void
{
$this->getConnection()->createQueryBuilder()
if (null === $this->removeExpiredMessagesLastExecutedAt) {
$this->removeExpiredMessagesLastExecutedAt = microtime(true);
} elseif ((microtime(true) - $this->removeExpiredMessagesLastExecutedAt) < 1) {
return;
}

$delete = $this->getConnection()->createQueryBuilder()
->delete($this->getContext()->getTableName())
->andWhere('(time_to_live IS NOT NULL) AND (time_to_live < :now)')
->setParameter(':now', (int) time(), Type::BIGINT)
->setParameter('redelivered', false, Type::BOOLEAN)
->execute()
->andWhere('delivery_id IS NULL')
->andWhere('redelivered = false')

->setParameter(':now', time(), Type::BIGINT)
;

$delete->execute();

$this->removeExpiredMessagesLastExecutedAt = microtime(true);
}

private function deleteMessage(string $deliveryId): void
{
if (empty($deliveryId)) {
throw new \LogicException(sprintf('Expected record was removed but it is not. Delivery id: "%s"', $deliveryId));
}

$this->getConnection()->delete(
$this->getContext()->getTableName(),
['delivery_id' => Uuid::fromString($deliveryId)->getBytes()],
['delivery_id' => Type::GUID]
);
}
}
57 changes: 30 additions & 27 deletions pkg/dbal/DbalContext.php
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
use Interop\Queue\Queue;
use Interop\Queue\SubscriptionConsumer;
use Interop\Queue\Topic;
use Ramsey\Uuid\Uuid;

class DbalContext implements Context
{
Expand Down Expand Up @@ -141,32 +142,38 @@ public function createSubscriptionConsumer(): SubscriptionConsumer
/**
* @internal It must be used here and in the consumer only
*/
public function convertMessage(array $dbalMessage): DbalMessage
public function convertMessage(array $arrayMessage): DbalMessage
{
/** @var DbalMessage $dbalMessageObj */
$dbalMessageObj = $this->createMessage(
$dbalMessage['body'],
$dbalMessage['properties'] ? JSON::decode($dbalMessage['properties']) : [],
$dbalMessage['headers'] ? JSON::decode($dbalMessage['headers']) : []
/** @var DbalMessage $message */
$message = $this->createMessage(
$arrayMessage['body'],
$arrayMessage['properties'] ? JSON::decode($arrayMessage['properties']) : [],
$arrayMessage['headers'] ? JSON::decode($arrayMessage['headers']) : []
);

if (isset($dbalMessage['redelivered'])) {
$dbalMessageObj->setRedelivered((bool) $dbalMessage['redelivered']);
if (isset($arrayMessage['id'])) {
$message->setMessageId(Uuid::fromBytes($arrayMessage['id'])->toString());
}
if (isset($dbalMessage['priority'])) {
$dbalMessageObj->setPriority((int) $dbalMessage['priority']);
if (isset($arrayMessage['queue'])) {
$message->setQueue($arrayMessage['queue']);
}
if (isset($dbalMessage['published_at'])) {
$dbalMessageObj->setPublishedAt((int) $dbalMessage['published_at']);
if (isset($arrayMessage['redelivered'])) {
$message->setRedelivered((bool) $arrayMessage['redelivered']);
}
if (isset($dbalMessage['delivery_id'])) {
$dbalMessageObj->setDeliveryId((string) $dbalMessage['delivery_id']);
if (isset($arrayMessage['priority'])) {
$message->setPriority((int) (-1 * $arrayMessage['priority']));
}
if (isset($dbalMessage['redeliver_after'])) {
$dbalMessageObj->setRedeliverAfter((int) $dbalMessage['redeliver_after']);
if (isset($arrayMessage['published_at'])) {
$message->setPublishedAt((int) $arrayMessage['published_at']);
}
if (isset($arrayMessage['delivery_id'])) {
$message->setDeliveryId(Uuid::fromBytes($arrayMessage['delivery_id'])->toString());
}
if (isset($arrayMessage['redeliver_after'])) {
$message->setRedeliverAfter((int) $arrayMessage['redeliver_after']);
}

return $dbalMessageObj;
return $message;
}

/**
Expand Down Expand Up @@ -218,8 +225,7 @@ public function createDataBaseTable(): void

$table = new Table($this->getTableName());

$table->addColumn('id', Type::BINARY, ['length' => 16, 'fixed' => true]);
$table->addColumn('human_id', Type::STRING, ['length' => 36]);
$table->addColumn('id', Type::GUID, ['length' => 16, 'fixed' => true]);
$table->addColumn('published_at', Type::BIGINT);
$table->addColumn('body', Type::TEXT, ['notnull' => false]);
$table->addColumn('headers', Type::TEXT, ['notnull' => false]);
Expand All @@ -229,17 +235,14 @@ public function createDataBaseTable(): void
$table->addColumn('priority', Type::SMALLINT, ['notnull' => false]);
$table->addColumn('delayed_until', Type::BIGINT, ['notnull' => false]);
$table->addColumn('time_to_live', Type::BIGINT, ['notnull' => false]);
$table->addColumn('delivery_id', Type::STRING, ['notnull' => false]);
$table->addColumn('delivery_id', Type::GUID, ['length' => 16, 'fixed' => true, 'notnull' => false]);
$table->addColumn('redeliver_after', Type::BIGINT, ['notnull' => false]);

$table->setPrimaryKey(['id']);
$table->addIndex(['published_at']);
$table->addIndex(['queue']);
$table->addIndex(['priority']);
$table->addIndex(['delayed_until']);
$table->addIndex(['priority', 'published_at']);
$table->addIndex(['redeliver_after']);
$table->addUniqueIndex(['delivery_id']);
$table->addIndex(['priority', 'published_at', 'queue', 'delivery_id', 'delayed_until', 'id']);

$table->addIndex(['redeliver_after', 'delivery_id']);
$table->addIndex(['time_to_live', 'delivery_id']);

$sm->createTable($table);
}
Expand Down
15 changes: 15 additions & 0 deletions pkg/dbal/DbalMessage.php
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,11 @@ class DbalMessage implements Message
*/
private $deliveryId;

/**
* @var string|null
*/
private $queue;

/**
* Milliseconds, for example 15186054527288.
*
Expand Down Expand Up @@ -249,4 +254,14 @@ public function setPublishedAt(int $publishedAt = null): void
{
$this->publishedAt = $publishedAt;
}

public function getQueue(): ?string
{
return $this->queue;
}

public function setQueue(?string $queue): void
{
$this->queue = $queue;
}
}
5 changes: 2 additions & 3 deletions pkg/dbal/DbalProducer.php
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ public function send(Destination $destination, Message $message): void
}

$body = $message->getBody();
$uuid = Uuid::uuid1();
$uuid = Uuid::uuid4();

$publishedAt = null !== $message->getPublishedAt() ?
$message->getPublishedAt() :
Expand All @@ -80,12 +80,11 @@ public function send(Destination $destination, Message $message): void

$dbalMessage = [
'id' => $this->uuidCodec->encodeBinary($uuid),
'human_id' => $uuid->toString(),
'published_at' => $publishedAt,
'body' => $body,
'headers' => JSON::encode($message->getHeaders()),
'properties' => JSON::encode($message->getProperties()),
'priority' => $message->getPriority(),
'priority' => -1 * $message->getPriority(),
'queue' => $destination->getQueueName(),
'redelivered' => false,
'delivery_id' => null,
Expand Down
Loading