diff --git a/pkg/dbal/DbalConsumer.php b/pkg/dbal/DbalConsumer.php index ede0cfa40..e95a8f513 100644 --- a/pkg/dbal/DbalConsumer.php +++ b/pkg/dbal/DbalConsumer.php @@ -5,7 +5,6 @@ namespace Enqueue\Dbal; use Doctrine\DBAL\Connection; -use Doctrine\DBAL\Types\Type; use Interop\Queue\Consumer; use Interop\Queue\Exception\InvalidMessageException; use Interop\Queue\Impl\ConsumerPollingTrait; @@ -81,14 +80,7 @@ public function receiveNoWait(): ?Message $this->removeExpiredMessages(); $this->redeliverMessages(); - // get top message from the queue - if ($message = $this->fetchMessage([$this->queue->getQueueName()], $redeliveryDelay)) { - if ($message['redelivered'] || empty($message['time_to_live']) || $message['time_to_live'] > time()) { - return $this->getContext()->convertMessage($message); - } - } - - return null; + return $this->fetchMessage([$this->queue->getQueueName()], $redeliveryDelay); } /** @@ -129,17 +121,4 @@ protected function getConnection(): Connection { return $this->dbal; } - - private function deleteMessage(string $deliveryId): void - { - if (empty($deliveryId)) { - throw new \LogicException(sprintf('Expected record was removed but it is not. Delivery id: "%s"', $deliveryId)); - } - - $this->getConnection()->delete( - $this->getContext()->getTableName(), - ['delivery_id' => $deliveryId], - ['delivery_id' => Type::STRING] - ); - } } diff --git a/pkg/dbal/DbalConsumerHelperTrait.php b/pkg/dbal/DbalConsumerHelperTrait.php index 2dc29db26..e9c5252db 100644 --- a/pkg/dbal/DbalConsumerHelperTrait.php +++ b/pkg/dbal/DbalConsumerHelperTrait.php @@ -11,96 +11,137 @@ trait DbalConsumerHelperTrait { + private $redeliverMessagesLastExecutedAt; + + private $removeExpiredMessagesLastExecutedAt; + abstract protected function getContext(): DbalContext; abstract protected function getConnection(): Connection; - protected function fetchMessage(array $queues, int $redeliveryDelay): ?array + protected function fetchMessage(array $queues, int $redeliveryDelay): ?DbalMessage { + if (empty($queues)) { + throw new \LogicException('Queues must not be empty.'); + } + $now = time(); - $deliveryId = (string) Uuid::uuid1(); - - $this->getConnection()->beginTransaction(); - - try { - $query = $this->getConnection()->createQueryBuilder() - ->select('*') - ->from($this->getContext()->getTableName()) - ->andWhere('delivery_id IS NULL') - ->andWhere('delayed_until IS NULL OR delayed_until <= :delayedUntil') - ->andWhere('queue IN (:queues)') - ->addOrderBy('priority', 'desc') - ->addOrderBy('published_at', 'asc') - ->setMaxResults(1); - - // select for update - $message = $this->getConnection()->executeQuery( - $query->getSQL().' '.$this->getConnection()->getDatabasePlatform()->getWriteLockSQL(), - ['delayedUntil' => $now, 'queues' => array_values($queues)], - ['delayedUntil' => ParameterType::INTEGER, 'queues' => Connection::PARAM_STR_ARRAY] - )->fetch(); - - if (!$message) { - $this->getConnection()->commit(); + $deliveryId = Uuid::uuid4(); + + $endAt = microtime(true) + 0.2; // add 200ms + + $select = $this->getConnection()->createQueryBuilder() + ->select('id') + ->from($this->getContext()->getTableName()) + ->andWhere('queue IN (:queues)') + ->andWhere('delayed_until IS NULL OR delayed_until <= :delayedUntil') + ->andWhere('delivery_id IS NULL') + ->addOrderBy('priority', 'asc') + ->addOrderBy('published_at', 'asc') + ->setParameter('queues', $queues, Connection::PARAM_STR_ARRAY) + ->setParameter('delayedUntil', $now, ParameterType::INTEGER) + ->setMaxResults(1); + + $update = $this->getConnection()->createQueryBuilder() + ->update($this->getContext()->getTableName()) + ->set('delivery_id', ':deliveryId') + ->set('redeliver_after', ':redeliverAfter') + ->andWhere('id = :messageId') + ->andWhere('delivery_id IS NULL') + ->setParameter('deliveryId', $deliveryId->getBytes(), Type::GUID) + ->setParameter('redeliverAfter', $now + $redeliveryDelay, Type::BIGINT) + ; + while (microtime(true) < $endAt) { + $result = $select->execute()->fetch(); + if (empty($result)) { return null; } - // mark message as delivered to consumer - $this->getConnection()->createQueryBuilder() - ->andWhere('id = :id') - ->update($this->getContext()->getTableName()) - ->set('delivery_id', ':deliveryId') - ->set('redeliver_after', ':redeliverAfter') - ->setParameter('id', $message['id'], Type::GUID) - ->setParameter('deliveryId', $deliveryId, Type::STRING) - ->setParameter('redeliverAfter', $now + $redeliveryDelay, Type::BIGINT) - ->execute() + $update + ->setParameter('messageId', $result['id'], Type::GUID) ; - $this->getConnection()->commit(); - - $deliveredMessage = $this->getConnection()->createQueryBuilder() - ->select('*') - ->from($this->getContext()->getTableName()) - ->andWhere('delivery_id = :deliveryId') - ->setParameter('deliveryId', $deliveryId, Type::STRING) - ->setMaxResults(1) - ->execute() - ->fetch() - ; - - return $deliveredMessage ?: null; - } catch (\Exception $e) { - $this->getConnection()->rollBack(); - - throw $e; + if ($update->execute()) { + $deliveredMessage = $this->getConnection()->createQueryBuilder() + ->select('*') + ->from($this->getContext()->getTableName()) + ->andWhere('delivery_id = :deliveryId') + ->setParameter('deliveryId', $deliveryId->getBytes(), Type::GUID) + ->setMaxResults(1) + ->execute() + ->fetch() + ; + + if (false == $deliveredMessage) { + throw new \LogicException('There must be a message at all times at this stage but there is no a message.'); + } + + if ($deliveredMessage['redelivered'] || empty($deliveredMessage['time_to_live']) || $deliveredMessage['time_to_live'] > time()) { + return $this->getContext()->convertMessage($deliveredMessage); + } + } } + + return null; } protected function redeliverMessages(): void { - $this->getConnection()->createQueryBuilder() + if (null === $this->redeliverMessagesLastExecutedAt) { + $this->redeliverMessagesLastExecutedAt = microtime(true); + } elseif ((microtime(true) - $this->redeliverMessagesLastExecutedAt) < 1) { + return; + } + + $update = $this->getConnection()->createQueryBuilder() ->update($this->getContext()->getTableName()) ->set('delivery_id', ':deliveryId') ->set('redelivered', ':redelivered') - ->andWhere('delivery_id IS NOT NULL') ->andWhere('redeliver_after < :now') - ->setParameter(':now', (int) time(), Type::BIGINT) - ->setParameter('deliveryId', null, Type::STRING) + ->andWhere('delivery_id IS NOT NULL') + ->setParameter(':now', time(), Type::BIGINT) + ->setParameter('deliveryId', null, Type::GUID) ->setParameter('redelivered', true, Type::BOOLEAN) - ->execute() ; + + $update->execute(); + + $this->redeliverMessagesLastExecutedAt = microtime(true); } protected function removeExpiredMessages(): void { - $this->getConnection()->createQueryBuilder() + if (null === $this->removeExpiredMessagesLastExecutedAt) { + $this->removeExpiredMessagesLastExecutedAt = microtime(true); + } elseif ((microtime(true) - $this->removeExpiredMessagesLastExecutedAt) < 1) { + return; + } + + $delete = $this->getConnection()->createQueryBuilder() ->delete($this->getContext()->getTableName()) ->andWhere('(time_to_live IS NOT NULL) AND (time_to_live < :now)') - ->setParameter(':now', (int) time(), Type::BIGINT) - ->setParameter('redelivered', false, Type::BOOLEAN) - ->execute() + ->andWhere('delivery_id IS NULL') + ->andWhere('redelivered = false') + + ->setParameter(':now', time(), Type::BIGINT) ; + + $delete->execute(); + + $this->removeExpiredMessagesLastExecutedAt = microtime(true); + } + + private function deleteMessage(string $deliveryId): void + { + if (empty($deliveryId)) { + throw new \LogicException(sprintf('Expected record was removed but it is not. Delivery id: "%s"', $deliveryId)); + } + + $this->getConnection()->delete( + $this->getContext()->getTableName(), + ['delivery_id' => Uuid::fromString($deliveryId)->getBytes()], + ['delivery_id' => Type::GUID] + ); } } diff --git a/pkg/dbal/DbalContext.php b/pkg/dbal/DbalContext.php index 0cac7ac45..63ce0b1da 100644 --- a/pkg/dbal/DbalContext.php +++ b/pkg/dbal/DbalContext.php @@ -17,6 +17,7 @@ use Interop\Queue\Queue; use Interop\Queue\SubscriptionConsumer; use Interop\Queue\Topic; +use Ramsey\Uuid\Uuid; class DbalContext implements Context { @@ -141,32 +142,38 @@ public function createSubscriptionConsumer(): SubscriptionConsumer /** * @internal It must be used here and in the consumer only */ - public function convertMessage(array $dbalMessage): DbalMessage + public function convertMessage(array $arrayMessage): DbalMessage { - /** @var DbalMessage $dbalMessageObj */ - $dbalMessageObj = $this->createMessage( - $dbalMessage['body'], - $dbalMessage['properties'] ? JSON::decode($dbalMessage['properties']) : [], - $dbalMessage['headers'] ? JSON::decode($dbalMessage['headers']) : [] + /** @var DbalMessage $message */ + $message = $this->createMessage( + $arrayMessage['body'], + $arrayMessage['properties'] ? JSON::decode($arrayMessage['properties']) : [], + $arrayMessage['headers'] ? JSON::decode($arrayMessage['headers']) : [] ); - if (isset($dbalMessage['redelivered'])) { - $dbalMessageObj->setRedelivered((bool) $dbalMessage['redelivered']); + if (isset($arrayMessage['id'])) { + $message->setMessageId(Uuid::fromBytes($arrayMessage['id'])->toString()); } - if (isset($dbalMessage['priority'])) { - $dbalMessageObj->setPriority((int) $dbalMessage['priority']); + if (isset($arrayMessage['queue'])) { + $message->setQueue($arrayMessage['queue']); } - if (isset($dbalMessage['published_at'])) { - $dbalMessageObj->setPublishedAt((int) $dbalMessage['published_at']); + if (isset($arrayMessage['redelivered'])) { + $message->setRedelivered((bool) $arrayMessage['redelivered']); } - if (isset($dbalMessage['delivery_id'])) { - $dbalMessageObj->setDeliveryId((string) $dbalMessage['delivery_id']); + if (isset($arrayMessage['priority'])) { + $message->setPriority((int) (-1 * $arrayMessage['priority'])); } - if (isset($dbalMessage['redeliver_after'])) { - $dbalMessageObj->setRedeliverAfter((int) $dbalMessage['redeliver_after']); + if (isset($arrayMessage['published_at'])) { + $message->setPublishedAt((int) $arrayMessage['published_at']); + } + if (isset($arrayMessage['delivery_id'])) { + $message->setDeliveryId(Uuid::fromBytes($arrayMessage['delivery_id'])->toString()); + } + if (isset($arrayMessage['redeliver_after'])) { + $message->setRedeliverAfter((int) $arrayMessage['redeliver_after']); } - return $dbalMessageObj; + return $message; } /** @@ -218,8 +225,7 @@ public function createDataBaseTable(): void $table = new Table($this->getTableName()); - $table->addColumn('id', Type::BINARY, ['length' => 16, 'fixed' => true]); - $table->addColumn('human_id', Type::STRING, ['length' => 36]); + $table->addColumn('id', Type::GUID, ['length' => 16, 'fixed' => true]); $table->addColumn('published_at', Type::BIGINT); $table->addColumn('body', Type::TEXT, ['notnull' => false]); $table->addColumn('headers', Type::TEXT, ['notnull' => false]); @@ -229,17 +235,14 @@ public function createDataBaseTable(): void $table->addColumn('priority', Type::SMALLINT, ['notnull' => false]); $table->addColumn('delayed_until', Type::BIGINT, ['notnull' => false]); $table->addColumn('time_to_live', Type::BIGINT, ['notnull' => false]); - $table->addColumn('delivery_id', Type::STRING, ['notnull' => false]); + $table->addColumn('delivery_id', Type::GUID, ['length' => 16, 'fixed' => true, 'notnull' => false]); $table->addColumn('redeliver_after', Type::BIGINT, ['notnull' => false]); $table->setPrimaryKey(['id']); - $table->addIndex(['published_at']); - $table->addIndex(['queue']); - $table->addIndex(['priority']); - $table->addIndex(['delayed_until']); - $table->addIndex(['priority', 'published_at']); - $table->addIndex(['redeliver_after']); - $table->addUniqueIndex(['delivery_id']); + $table->addIndex(['priority', 'published_at', 'queue', 'delivery_id', 'delayed_until', 'id']); + + $table->addIndex(['redeliver_after', 'delivery_id']); + $table->addIndex(['time_to_live', 'delivery_id']); $sm->createTable($table); } diff --git a/pkg/dbal/DbalMessage.php b/pkg/dbal/DbalMessage.php index 8464d56e3..af62c1079 100644 --- a/pkg/dbal/DbalMessage.php +++ b/pkg/dbal/DbalMessage.php @@ -53,6 +53,11 @@ class DbalMessage implements Message */ private $deliveryId; + /** + * @var string|null + */ + private $queue; + /** * Milliseconds, for example 15186054527288. * @@ -249,4 +254,14 @@ public function setPublishedAt(int $publishedAt = null): void { $this->publishedAt = $publishedAt; } + + public function getQueue(): ?string + { + return $this->queue; + } + + public function setQueue(?string $queue): void + { + $this->queue = $queue; + } } diff --git a/pkg/dbal/DbalProducer.php b/pkg/dbal/DbalProducer.php index 38ad33414..014c7775c 100644 --- a/pkg/dbal/DbalProducer.php +++ b/pkg/dbal/DbalProducer.php @@ -71,7 +71,7 @@ public function send(Destination $destination, Message $message): void } $body = $message->getBody(); - $uuid = Uuid::uuid1(); + $uuid = Uuid::uuid4(); $publishedAt = null !== $message->getPublishedAt() ? $message->getPublishedAt() : @@ -80,12 +80,11 @@ public function send(Destination $destination, Message $message): void $dbalMessage = [ 'id' => $this->uuidCodec->encodeBinary($uuid), - 'human_id' => $uuid->toString(), 'published_at' => $publishedAt, 'body' => $body, 'headers' => JSON::encode($message->getHeaders()), 'properties' => JSON::encode($message->getProperties()), - 'priority' => $message->getPriority(), + 'priority' => -1 * $message->getPriority(), 'queue' => $destination->getQueueName(), 'redelivered' => false, 'delivery_id' => null, diff --git a/pkg/dbal/DbalSubscriptionConsumer.php b/pkg/dbal/DbalSubscriptionConsumer.php index 12c1cef37..60d30cc7e 100644 --- a/pkg/dbal/DbalSubscriptionConsumer.php +++ b/pkg/dbal/DbalSubscriptionConsumer.php @@ -75,6 +75,7 @@ public function consume(int $timeout = 0): void } $timeout /= 1000; + $now = time(); $redeliveryDelay = $this->getRedeliveryDelay() / 1000; // milliseconds to seconds $currentQueueNames = []; @@ -83,24 +84,21 @@ public function consume(int $timeout = 0): void $currentQueueNames = $queueNames; } - $now = time(); $this->removeExpiredMessages(); $this->redeliverMessages(); if ($message = $this->fetchMessage($currentQueueNames, $redeliveryDelay)) { - $dbalMessage = $this->getContext()->convertMessage($message); - /** * @var DbalConsumer * @var callable $callback */ - list($consumer, $callback) = $this->subscribers[$message['queue']]; + list($consumer, $callback) = $this->subscribers[$message->getQueue()]; - if (false === call_user_func($callback, $dbalMessage, $consumer)) { + if (false === call_user_func($callback, $message, $consumer)) { return; } - unset($currentQueueNames[$message['queue']]); + unset($currentQueueNames[$message->getQueue()]); } else { $currentQueueNames = []; diff --git a/pkg/dbal/Tests/DbalConsumerTest.php b/pkg/dbal/Tests/DbalConsumerTest.php index 4eeb70a75..1478fe89a 100644 --- a/pkg/dbal/Tests/DbalConsumerTest.php +++ b/pkg/dbal/Tests/DbalConsumerTest.php @@ -16,6 +16,7 @@ use Interop\Queue\Exception\InvalidMessageException; use Interop\Queue\Message; use PHPUnit\Framework\TestCase; +use Ramsey\Uuid\Uuid; class DbalConsumerTest extends TestCase { @@ -55,11 +56,13 @@ public function testAcknowledgeShouldThrowIfInstanceOfMessageIsInvalid() public function testShouldDeleteMessageOnAcknowledge() { + $deliveryId = Uuid::uuid4(); + $queue = new DbalDestination('queue'); $message = new DbalMessage(); $message->setBody('theBody'); - $message->setDeliveryId('foo-delivery-id'); + $message->setDeliveryId($deliveryId->toString()); $dbal = $this->createConectionMock(); $dbal @@ -67,8 +70,8 @@ public function testShouldDeleteMessageOnAcknowledge() ->method('delete') ->with( 'some-table-name', - ['delivery_id' => $message->getDeliveryId()], - ['delivery_id' => Type::STRING] + ['delivery_id' => $deliveryId->getBytes()], + ['delivery_id' => Type::GUID] ) ; @@ -124,11 +127,13 @@ public function testRejectShouldThrowIfInstanceOfMessageIsInvalid() public function testShouldDeleteMessageFromQueueOnReject() { + $deliveryId = Uuid::uuid4(); + $queue = new DbalDestination('queue'); $message = new DbalMessage(); $message->setBody('theBody'); - $message->setDeliveryId('foo-delivery-id'); + $message->setDeliveryId($deliveryId->toString()); $dbal = $this->createConectionMock(); $dbal @@ -136,8 +141,8 @@ public function testShouldDeleteMessageFromQueueOnReject() ->method('delete') ->with( 'some-table-name', - ['delivery_id' => $message->getDeliveryId()], - ['delivery_id' => Type::STRING] + ['delivery_id' => $deliveryId->getBytes()], + ['delivery_id' => Type::GUID] ) ; diff --git a/pkg/dbal/Tests/Functional/DbalConsumerTest.php b/pkg/dbal/Tests/Functional/DbalConsumerTest.php index ecee1bc71..01e18fe94 100644 --- a/pkg/dbal/Tests/Functional/DbalConsumerTest.php +++ b/pkg/dbal/Tests/Functional/DbalConsumerTest.php @@ -43,7 +43,7 @@ public function testShouldSetPublishedAtDateToReceivedMessage() $consumer = $context->createConsumer($queue); // guard - $this->assertNull($consumer->receiveNoWait()); + $this->assertSame(0, $this->getQuerySize()); $time = (int) (microtime(true) * 10000); @@ -51,11 +51,12 @@ public function testShouldSetPublishedAtDateToReceivedMessage() $producer = $context->createProducer(); + /** @var DbalMessage $message */ $message = $context->createMessage($expectedBody); $message->setPublishedAt($time); $producer->send($queue, $message); - $message = $consumer->receive(8000); // 8 sec + $message = $consumer->receive(100); // 100ms $this->assertInstanceOf(DbalMessage::class, $message); $consumer->acknowledge($message); @@ -71,7 +72,7 @@ public function testShouldOrderMessagesWithSamePriorityByPublishedAtDate() $consumer = $context->createConsumer($queue); // guard - $this->assertNull($consumer->receiveNoWait()); + $this->assertSame(0, $this->getQuerySize()); $time = (int) (microtime(true) * 10000); $olderTime = $time - 10000; @@ -97,7 +98,7 @@ public function testShouldOrderMessagesWithSamePriorityByPublishedAtDate() $consumer->acknowledge($message); $this->assertSame($expectedPriority5BodyOlderTime, $message->getBody()); - $message = $consumer->receive(8000); // 8 sec + $message = $consumer->receive(100); // 8 sec $this->assertInstanceOf(DbalMessage::class, $message); $consumer->acknowledge($message); @@ -109,17 +110,14 @@ public function testShouldDeleteExpiredMessage() $context = $this->context; $queue = $context->createQueue(__METHOD__); - $consumer = $context->createConsumer($queue); - // guard - $this->assertNull($consumer->receiveNoWait()); + $this->assertSame(0, $this->getQuerySize()); $producer = $context->createProducer(); $this->context->getDbalConnection()->insert( $this->context->getTableName(), [ 'id' => 'id', - 'human_id' => 'id', 'published_at' => '123', 'body' => 'expiredMessage', 'headers' => json_encode([]), @@ -133,20 +131,22 @@ public function testShouldDeleteExpiredMessage() $message->setRedelivered(false); $producer->send($queue, $message); - $this->assertSame('2', $this->getQuerySize()); + $this->assertSame(2, $this->getQuerySize()); - $message = $consumer->receive(8000); + // we need a new consumer to workaround redeliver + $consumer = $context->createConsumer($queue); + $message = $consumer->receive(100); - $this->assertSame('1', $this->getQuerySize()); + $this->assertSame(1, $this->getQuerySize()); $consumer->acknowledge($message); - $this->assertSame('0', $this->getQuerySize()); + $this->assertSame(0, $this->getQuerySize()); } - private function getQuerySize(): string + private function getQuerySize(): int { - return $this->context->getDbalConnection() + return (int) $this->context->getDbalConnection() ->executeQuery('SELECT count(*) FROM '.$this->context->getTableName()) ->fetchColumn(0) ;