Skip to content

Commit

Permalink
feature: make consumer name nullable in consumer configuration
Browse files Browse the repository at this point in the history
  • Loading branch information
oxidmod authored and nekufa committed Feb 6, 2023
1 parent 1508388 commit 917653d
Show file tree
Hide file tree
Showing 4 changed files with 17 additions and 7 deletions.
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -198,7 +198,7 @@ $goodbyer
use Basis\Nats\Consumer\Configuration as ConsumerConfiguration;
use Basis\Nats\Consumer\DeliverPolicy;

$configuration = (new ConsumerConfiguration($stream->getName(), ''))
$configuration = (new ConsumerConfiguration($stream->getName()))
->setDeliverPolicy(DeliverPolicy::NEW)
->setSubjectFilter('mailer.greet');

Expand Down
6 changes: 3 additions & 3 deletions src/Consumer/Configuration.php
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ class Configuration

public function __construct(
private readonly string $stream,
private string $name
private ?string $name = null
) {
}

Expand All @@ -35,7 +35,7 @@ public function getAckPolicy(): string
return $this->ackPolicy;
}

public function getName(): string
public function getName(): ?string
{
return $this->name;
}
Expand Down Expand Up @@ -185,7 +185,7 @@ public function toArray(): array
'deliver_policy' => $this->getDeliverPolicy(),
'deliver_subject' => $this->getDeliverSubject(),
'description' => $this->getDescription(),
'durable_name' => $this->isEphemeral() ? null : $this->getName(),
'durable_name' => $this->isEphemeral() ? null : $this->getName(),
'flow_control' => $this->getFlowControl(),
'headers_only' => $this->getHeadersOnly(),
'idle_heartbeat' => $this->getIdleHeartbeat(),
Expand Down
8 changes: 7 additions & 1 deletion src/Consumer/Consumer.php
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ public function __construct(

public function create($ifNotExists = true): self
{
if (!$this->exists()) {
if ($this->shouldCreateConsumer($ifNotExists)) {
$command = $this->configuration->isEphemeral() ?
'CONSUMER.CREATE.' . $this->getStream() :
'CONSUMER.DURABLE.CREATE.' . $this->getStream() . '.' . $this->getName();
Expand Down Expand Up @@ -191,4 +191,10 @@ public function setIterations(int $iterations): self

return $this;
}

private function shouldCreateConsumer(bool $ifNotExists): bool
{
return ($this->configuration->isEphemeral() && $this->configuration->getName() === null)
|| !$this->exists();
}
}
8 changes: 6 additions & 2 deletions tests/Functional/StreamTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -245,10 +245,14 @@ public function testEphemeralConsumer()

$this->called = null;

$configuration = new Configuration('my_stream', '');
$configuration = new Configuration('my_stream');
$configuration->setSubjectFilter('tester.greet');
$consumer1 = $stream->createEphemeralConsumer($configuration);
$this->assertNull($this->called);

// check that consumer can be received by name after creation
$this->assertSame($consumer1, $stream->getConsumer($consumer1->getName()));

$consumer1->setIterations(1);
$consumer1->handle($this->persistMessage(...));

Expand All @@ -261,7 +265,7 @@ public function testEphemeralConsumer()
$this->assertSame($this->called->name, 'oxidmod');

$this->called = null;
$configuration = new Configuration('my_stream', '');
$configuration = new Configuration('my_stream');
$configuration->setSubjectFilter('tester.bye')->setAckPolicy('explicit');
$consumer2 = $stream->createEphemeralConsumer($configuration);

Expand Down

0 comments on commit 917653d

Please sign in to comment.