Skip to content

Commit

Permalink
#23 Add function for delete exchange/queue
Browse files Browse the repository at this point in the history
  • Loading branch information
ZhukV committed Sep 6, 2023
1 parent 618be68 commit e500e08
Show file tree
Hide file tree
Showing 11 changed files with 141 additions and 0 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ next release

* Add `EventableConsumerInterface` for control specific errors (consumer timeout, stop after N executes, etc...).
* Add possible to shuffle spool connections.
* Add function for delete exchange and queue.

v2.0.1
------
Expand Down
8 changes: 8 additions & 0 deletions src/Adapter/Amqp/Exchange/AmqpExchange.php
Original file line number Diff line number Diff line change
Expand Up @@ -94,4 +94,12 @@ public function publish(Message $message, string $routingKey = ''): void

$this->exchange->publish($message->payload->data, $routingKey, AMQP_NOPARAM, $options);
}

/**
* {@inheritdoc}
*/
public function delete(): void
{
$this->exchange->delete();
}
}
8 changes: 8 additions & 0 deletions src/Adapter/Amqp/Queue/AmqpQueue.php
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,14 @@ public function purge(): void
$this->queue->purge();
}

/**
* {@inheritdoc}
*/
public function delete(): void
{
$this->queue->delete();
}

/**
* {@inheritdoc}
*/
Expand Down
8 changes: 8 additions & 0 deletions src/Adapter/AmqpLib/Exchange/AmqpExchange.php
Original file line number Diff line number Diff line change
Expand Up @@ -98,4 +98,12 @@ public function publish(Message $message, string $routingKey = ''): void
$amqplibMessage = new AMQPMessage($message->payload->data, $options);
$this->channel->getChannel()->basic_publish($amqplibMessage, $this->getName(), $routingKey);
}

/**
* {@inheritdoc}
*/
public function delete(): void
{
$this->channel->getChannel()->exchange_delete($this->getName());
}
}
8 changes: 8 additions & 0 deletions src/Adapter/AmqpLib/Queue/AmqpQueue.php
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,14 @@ public function purge(): void
$this->channel->getChannel()->queue_purge($this->getName());
}

/**
* {@inheritdoc}
*/
public function delete(): void
{
$this->channel->getChannel()->queue_delete($this->getName());
}

/**
* {@inheritdoc}
*
Expand Down
5 changes: 5 additions & 0 deletions src/Exchange/ExchangeInterface.php
Original file line number Diff line number Diff line change
Expand Up @@ -42,4 +42,9 @@ public function getName(): string;
* @param string $routingKey
*/
public function publish(Message $message, string $routingKey = ''): void;

/**
* Delete exchange
*/
public function delete(): void;
}
5 changes: 5 additions & 0 deletions src/Queue/QueueInterface.php
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,11 @@ public function get(): ?ReceivedMessage;
*/
public function purge(): void;

/**
* Delete a queue
*/
public function delete(): void;

/**
* Get a count messages
*
Expand Down
21 changes: 21 additions & 0 deletions tests/Functional/Adapter/ExchangeFactoryTestCase.php
Original file line number Diff line number Diff line change
Expand Up @@ -155,6 +155,27 @@ public function shouldSuccessCreateWithBindings(): void
], $bindings);
}

#[Test]
public function shouldSuccessDelete(): void
{
$this->management->createExchange('direct', 'some');

$definition = new ExchangeDefinition('some', AMQP_EX_TYPE_DIRECT);

$factory = $this->createExchangeFactory($definition);
$exchange = $factory->create();

$this->management->exchangeByName('some');
$this->addToAssertionCount(1); // because we success get exchange by name

$exchange->delete();

$this->expectException(\RuntimeException::class);
$this->expectExceptionMessage('The exchange with name "some" was not found.');

$this->management->exchangeByName('some');
}

#[Test]
public function shouldThrowExceptionOnCreatePassiveIfExchangeNotFound(): void
{
Expand Down
59 changes: 59 additions & 0 deletions tests/Functional/Adapter/QueueFactoryTestCase.php
Original file line number Diff line number Diff line change
Expand Up @@ -272,6 +272,65 @@ public function shouldSuccessConsumeMessage(): void
self::assertTrue($consumed, 'The queue not receive message.');
}

#[Test]
public function shouldSuccessPurgeQueue(): void
{
$this->management->createExchange(AMQP_EX_TYPE_DIRECT, 'test.direct');
$this->management->createQueue('some');
$this->management->queueBind('some', 'test.direct', 'test');

$this->management->publishMessage('test.direct', 'test', 'some foo bar');
$this->management->publishMessage('test.direct', 'test', 'some foo bar');

$definition = new QueueDefinition(
'some',
new BindingDefinitions(
new BindingDefinition('test.direct', 'test')
)
);

$factory = $this->createQueueFactory($definition);
$queue = $factory->create();

$messages = $this->management->queueGetMessages('some', 2);

self::assertCount(2, $messages);

$queue->purge();

$messages = $this->management->queueGetMessages('some', 1);

self::assertCount(0, $messages);
}

#[Test]
public function shouldSuccessDeleteQueue(): void
{
$this->management->createExchange(AMQP_EX_TYPE_DIRECT, 'test.direct');
$this->management->createQueue('some');
$this->management->queueBind('some', 'test.direct', 'test');

$definition = new QueueDefinition(
'some',
new BindingDefinitions(
new BindingDefinition('test.direct', 'test')
)
);

$factory = $this->createQueueFactory($definition);
$queue = $factory->create();

$this->management->queueByName('some');
$this->addToAssertionCount(1); // Because success get queue by name

$queue->delete();

$this->expectException(\RuntimeException::class);
$this->expectExceptionMessage('The queue with name "some" was not found.');

$this->management->queueByName('some');
}

#[Test]
public function shouldSuccessGetCountMessages(): void
{
Expand Down
9 changes: 9 additions & 0 deletions tests/Unit/Adapter/Amqp/Exchange/AmqpExchangeTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -129,4 +129,13 @@ public function shouldSuccessPublishMessageWithHeaders(): void

$this->exchange->publish($message, 'foo-bar');
}

#[Test]
public function shouldSuccessDelete(): void
{
$this->originalExchange->expects(self::once())
->method('delete');

$this->exchange->delete();
}
}
9 changes: 9 additions & 0 deletions tests/Unit/Adapter/Amqp/Queue/AmqpQueueTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,15 @@ public function shouldSuccessPurge(): void
$this->queue->purge();
}

#[Test]
public function shouldSuccessDelete(): void
{
$this->originQueue->expects(self::once())
->method('delete');

$this->queue->delete();
}

#[Test]
public function shouldSuccessGetChannel(): void
{
Expand Down

0 comments on commit e500e08

Please sign in to comment.