-
Notifications
You must be signed in to change notification settings - Fork 441
[dbal] Use concurrent fetch message approach (no transaction, no pessimistic lock) #613
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 1 commit
76f6583
1b0c325
4747256
287b0b6
5316a97
09e148c
2445ef2
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -11,6 +11,10 @@ | |
|
||
trait DbalConsumerHelperTrait | ||
{ | ||
private $redeliverMessagesLastExecutedAt; | ||
|
||
private $removeExpiredMessagesLastExecutedAt; | ||
|
||
abstract protected function getContext(): DbalContext; | ||
|
||
abstract protected function getConnection(): Connection; | ||
|
@@ -20,66 +24,72 @@ protected function fetchMessage(array $queues, int $redeliveryDelay): ?array | |
$now = time(); | ||
$deliveryId = (string) Uuid::uuid1(); | ||
|
||
$this->getConnection()->beginTransaction(); | ||
|
||
try { | ||
$query = $this->getConnection()->createQueryBuilder() | ||
->select('*') | ||
->from($this->getContext()->getTableName()) | ||
->andWhere('delivery_id IS NULL') | ||
->andWhere('delayed_until IS NULL OR delayed_until <= :delayedUntil') | ||
->andWhere('queue IN (:queues)') | ||
->addOrderBy('priority', 'desc') | ||
->addOrderBy('published_at', 'asc') | ||
->setMaxResults(1); | ||
|
||
// select for update | ||
$message = $this->getConnection()->executeQuery( | ||
$query->getSQL().' '.$this->getConnection()->getDatabasePlatform()->getWriteLockSQL(), | ||
['delayedUntil' => $now, 'queues' => array_values($queues)], | ||
['delayedUntil' => ParameterType::INTEGER, 'queues' => Connection::PARAM_STR_ARRAY] | ||
)->fetch(); | ||
|
||
if (!$message) { | ||
$this->getConnection()->commit(); | ||
$endAt = microtime(true) + 0.2; // add 200ms | ||
|
||
$select = $this->getConnection()->createQueryBuilder() | ||
->select('id') | ||
->from($this->getContext()->getTableName()) | ||
->andWhere('delivery_id IS NULL') | ||
->andWhere('delayed_until IS NULL OR delayed_until <= :delayedUntil') | ||
->andWhere('queue IN (:queues)') | ||
->addOrderBy('priority', 'desc') | ||
->addOrderBy('published_at', 'asc') | ||
->setParameter('delayedUntil', $now, ParameterType::INTEGER) | ||
->setParameter('queues', array_values($queues), Connection::PARAM_STR_ARRAY) | ||
->setMaxResults(1); | ||
|
||
$update = $this->getConnection()->createQueryBuilder() | ||
->update($this->getContext()->getTableName()) | ||
->set('delivery_id', ':deliveryId') | ||
->set('redeliver_after', ':redeliverAfter') | ||
->andWhere('id = :messageId') | ||
->andWhere('delivery_id IS NULL') | ||
->setParameter('deliveryId', $deliveryId, Type::STRING) | ||
->setParameter('redeliverAfter', $now + $redeliveryDelay, Type::BIGINT) | ||
; | ||
|
||
while (microtime() < $endAt) { | ||
$result = $select->execute()->fetch(); | ||
if (empty($result)) { | ||
return null; | ||
} | ||
|
||
// mark message as delivered to consumer | ||
$this->getConnection()->createQueryBuilder() | ||
->andWhere('id = :id') | ||
->update($this->getContext()->getTableName()) | ||
->set('delivery_id', ':deliveryId') | ||
->set('redeliver_after', ':redeliverAfter') | ||
->setParameter('id', $message['id'], Type::GUID) | ||
->setParameter('deliveryId', $deliveryId, Type::STRING) | ||
->setParameter('redeliverAfter', $now + $redeliveryDelay, Type::BIGINT) | ||
->execute() | ||
$update | ||
->setParameter('messageId', $result['id'], Type::GUID) | ||
; | ||
|
||
$this->getConnection()->commit(); | ||
|
||
$deliveredMessage = $this->getConnection()->createQueryBuilder() | ||
->select('*') | ||
->from($this->getContext()->getTableName()) | ||
->andWhere('delivery_id = :deliveryId') | ||
->setParameter('deliveryId', $deliveryId, Type::STRING) | ||
->setMaxResults(1) | ||
->execute() | ||
->fetch() | ||
; | ||
|
||
return $deliveredMessage ?: null; | ||
} catch (\Exception $e) { | ||
$this->getConnection()->rollBack(); | ||
|
||
throw $e; | ||
if ($update->execute()) { | ||
$deliveredMessage = $this->getConnection()->createQueryBuilder() | ||
->select('*') | ||
->from($this->getContext()->getTableName()) | ||
->andWhere('delivery_id = :deliveryId') | ||
->setParameter('deliveryId', $deliveryId, Type::STRING) | ||
->setMaxResults(1) | ||
->execute() | ||
->fetch() | ||
; | ||
|
||
if (false == $deliveredMessage) { | ||
throw new \LogicException('There must be a message at all times at this stage but there is no a message.'); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. this is not certain. it could be deleted manually or some other concurrent process between the update and the select. unlikely but possible. why not just return null? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This should not happen. Few lines above the message was updated with deliveryId (which is unique and only known by this consumer) therefor if we cannot select it few lines later something went wrong. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. When I truncate the table, this is gone. And the consumer would fail for no reason. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Why keep the exception? This is not race-condition safe. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
It would fail, exit and restarted by supervisord or any other process manager. If we remove the exception we might have a bug or something still would not know about it cuz it would silently continue fetching new messages without a warning. Truncating a table while there are consumers working on it is not a valid way to do. I could imagine many cases which would go wrong in this case (for example a message has been processed and a consumer tries to acknowledge it but it has gone). There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Before it used a write lock and thus it protects against concurrent access causing race conditions. Now this is gone, so you need to make your consumers handle that gracefully. Throwing an exception and relying on the consumer to be restarted is just a workaround for a buggy consumer. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Okay There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. |
||
} | ||
|
||
return $deliveredMessage; | ||
} | ||
} | ||
|
||
return null; | ||
} | ||
|
||
protected function redeliverMessages(): void | ||
{ | ||
if (null === $this->redeliverMessagesLastExecutedAt) { | ||
$this->redeliverMessagesLastExecutedAt = microtime(true); | ||
} | ||
|
||
if ((microtime(true) - $this->redeliverMessagesLastExecutedAt) < 1) { | ||
return; | ||
} | ||
|
||
$this->getConnection()->createQueryBuilder() | ||
->update($this->getContext()->getTableName()) | ||
->set('delivery_id', ':deliveryId') | ||
|
@@ -91,16 +101,28 @@ protected function redeliverMessages(): void | |
->setParameter('redelivered', true, Type::BOOLEAN) | ||
->execute() | ||
; | ||
|
||
$this->redeliverMessagesLastExecutedAt = microtime(true); | ||
} | ||
|
||
protected function removeExpiredMessages(): void | ||
{ | ||
if (null === $this->removeExpiredMessagesLastExecutedAt) { | ||
$this->removeExpiredMessagesLastExecutedAt = microtime(true); | ||
} | ||
|
||
if ((microtime(true) - $this->removeExpiredMessagesLastExecutedAt) < 1) { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. you could put that in an |
||
return; | ||
} | ||
|
||
$this->getConnection()->createQueryBuilder() | ||
->delete($this->getContext()->getTableName()) | ||
->andWhere('(time_to_live IS NOT NULL) AND (time_to_live < :now)') | ||
->setParameter(':now', (int) time(), Type::BIGINT) | ||
->setParameter('redelivered', false, Type::BOOLEAN) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. this param is unused |
||
->execute() | ||
; | ||
|
||
$this->removeExpiredMessagesLastExecutedAt = microtime(true); | ||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
you are comparing string with float here. maybe you meant to use microtime(true)?