Skip to content
Merged
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
120 changes: 71 additions & 49 deletions pkg/dbal/DbalConsumerHelperTrait.php
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,10 @@

trait DbalConsumerHelperTrait
{
private $redeliverMessagesLastExecutedAt;

private $removeExpiredMessagesLastExecutedAt;

abstract protected function getContext(): DbalContext;

abstract protected function getConnection(): Connection;
Expand All @@ -20,66 +24,72 @@ protected function fetchMessage(array $queues, int $redeliveryDelay): ?array
$now = time();
$deliveryId = (string) Uuid::uuid1();

$this->getConnection()->beginTransaction();

try {
$query = $this->getConnection()->createQueryBuilder()
->select('*')
->from($this->getContext()->getTableName())
->andWhere('delivery_id IS NULL')
->andWhere('delayed_until IS NULL OR delayed_until <= :delayedUntil')
->andWhere('queue IN (:queues)')
->addOrderBy('priority', 'desc')
->addOrderBy('published_at', 'asc')
->setMaxResults(1);

// select for update
$message = $this->getConnection()->executeQuery(
$query->getSQL().' '.$this->getConnection()->getDatabasePlatform()->getWriteLockSQL(),
['delayedUntil' => $now, 'queues' => array_values($queues)],
['delayedUntil' => ParameterType::INTEGER, 'queues' => Connection::PARAM_STR_ARRAY]
)->fetch();

if (!$message) {
$this->getConnection()->commit();
$endAt = microtime(true) + 0.2; // add 200ms

$select = $this->getConnection()->createQueryBuilder()
->select('id')
->from($this->getContext()->getTableName())
->andWhere('delivery_id IS NULL')
->andWhere('delayed_until IS NULL OR delayed_until <= :delayedUntil')
->andWhere('queue IN (:queues)')
->addOrderBy('priority', 'desc')
->addOrderBy('published_at', 'asc')
->setParameter('delayedUntil', $now, ParameterType::INTEGER)
->setParameter('queues', array_values($queues), Connection::PARAM_STR_ARRAY)
->setMaxResults(1);

$update = $this->getConnection()->createQueryBuilder()
->update($this->getContext()->getTableName())
->set('delivery_id', ':deliveryId')
->set('redeliver_after', ':redeliverAfter')
->andWhere('id = :messageId')
->andWhere('delivery_id IS NULL')
->setParameter('deliveryId', $deliveryId, Type::STRING)
->setParameter('redeliverAfter', $now + $redeliveryDelay, Type::BIGINT)
;

while (microtime() < $endAt) {
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

you are comparing string with float here. maybe you meant to use microtime(true)?

$result = $select->execute()->fetch();
if (empty($result)) {
return null;
}

// mark message as delivered to consumer
$this->getConnection()->createQueryBuilder()
->andWhere('id = :id')
->update($this->getContext()->getTableName())
->set('delivery_id', ':deliveryId')
->set('redeliver_after', ':redeliverAfter')
->setParameter('id', $message['id'], Type::GUID)
->setParameter('deliveryId', $deliveryId, Type::STRING)
->setParameter('redeliverAfter', $now + $redeliveryDelay, Type::BIGINT)
->execute()
$update
->setParameter('messageId', $result['id'], Type::GUID)
;

$this->getConnection()->commit();

$deliveredMessage = $this->getConnection()->createQueryBuilder()
->select('*')
->from($this->getContext()->getTableName())
->andWhere('delivery_id = :deliveryId')
->setParameter('deliveryId', $deliveryId, Type::STRING)
->setMaxResults(1)
->execute()
->fetch()
;

return $deliveredMessage ?: null;
} catch (\Exception $e) {
$this->getConnection()->rollBack();

throw $e;
if ($update->execute()) {
$deliveredMessage = $this->getConnection()->createQueryBuilder()
->select('*')
->from($this->getContext()->getTableName())
->andWhere('delivery_id = :deliveryId')
->setParameter('deliveryId', $deliveryId, Type::STRING)
->setMaxResults(1)
->execute()
->fetch()
;

if (false == $deliveredMessage) {
throw new \LogicException('There must be a message at all times at this stage but there is no a message.');
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this is not certain. it could be deleted manually or some other concurrent process between the update and the select. unlikely but possible. why not just return null?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This should not happen. Few lines above the message was updated with deliveryId (which is unique and only known by this consumer) therefor if we cannot select it few lines later something went wrong.

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

When I truncate the table, this is gone. And the consumer would fail for no reason.

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why keep the exception? This is not race-condition safe.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

When I truncate the table, this is gone. And the consumer would fail for no reason.

It would fail, exit and restarted by supervisord or any other process manager.

If we remove the exception we might have a bug or something still would not know about it cuz it would silently continue fetching new messages without a warning.

Truncating a table while there are consumers working on it is not a valid way to do. I could imagine many cases which would go wrong in this case (for example a message has been processed and a consumer tries to acknowledge it but it has gone).

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Before it used a write lock and thus it protects against concurrent access causing race conditions. Now this is gone, so you need to make your consumers handle that gracefully. Throwing an exception and relying on the consumer to be restarted is just a workaround for a buggy consumer.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Okay

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

}

return $deliveredMessage;
}
}

return null;
}

protected function redeliverMessages(): void
{
if (null === $this->redeliverMessagesLastExecutedAt) {
$this->redeliverMessagesLastExecutedAt = microtime(true);
}

if ((microtime(true) - $this->redeliverMessagesLastExecutedAt) < 1) {
return;
}

$this->getConnection()->createQueryBuilder()
->update($this->getContext()->getTableName())
->set('delivery_id', ':deliveryId')
Expand All @@ -91,16 +101,28 @@ protected function redeliverMessages(): void
->setParameter('redelivered', true, Type::BOOLEAN)
->execute()
;

$this->redeliverMessagesLastExecutedAt = microtime(true);
}

protected function removeExpiredMessages(): void
{
if (null === $this->removeExpiredMessagesLastExecutedAt) {
$this->removeExpiredMessagesLastExecutedAt = microtime(true);
}

if ((microtime(true) - $this->removeExpiredMessagesLastExecutedAt) < 1) {
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

you could put that in an elseif so it's not executed needlessly on the first execution

return;
}

$this->getConnection()->createQueryBuilder()
->delete($this->getContext()->getTableName())
->andWhere('(time_to_live IS NOT NULL) AND (time_to_live < :now)')
->setParameter(':now', (int) time(), Type::BIGINT)
->setParameter('redelivered', false, Type::BOOLEAN)
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this param is unused

->execute()
;

$this->removeExpiredMessagesLastExecutedAt = microtime(true);
}
}