Skip to content

Commit

Permalink
feat: parse jet stream message timestamp
Browse files Browse the repository at this point in the history
  • Loading branch information
oxidmod authored and nekufa committed May 26, 2023
1 parent 917653d commit 8633fe3
Show file tree
Hide file tree
Showing 3 changed files with 44 additions and 2 deletions.
40 changes: 39 additions & 1 deletion src/Message/Msg.php
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ class Msg extends Prototype
public Payload $payload;
public string $sid;
public string $subject;
public ?int $timestampNanos = null;

public static function create(string $data): self
{
Expand Down Expand Up @@ -52,6 +53,8 @@ public static function create(string $data): self
}
}

$values = self::tryParseMessageTime($values);

return new self($values);
}

Expand Down Expand Up @@ -87,12 +90,47 @@ public function parse($payload): self
}
$payload = substr($payload, $this->hlength);
}
$this->payload = new Payload($payload, $headers, $this->subject);
$this->payload = new Payload(
$payload,
$headers,
$this->subject,
$this->timestampNanos
);

return $this;
}

public function render(): string
{
return 'MSG ' . json_encode($this);
}

private static function tryParseMessageTime(array $values): array
{
if (!array_key_exists('replyTo', $values)
|| !str_starts_with($values['replyTo'], '$JS.ACK')
) {
# This is not a JetStream message
return $values;
}

# old format
# "$JS.ACK.<stream>.<consumer>.<redeliveryCount><streamSeq><deliverySequence>.<timestamp>.<pending>"
# new format
# $JS.ACK.<domain>.<accounthash>.<stream>.<consumer>.<redeliveryCount>.<streamSeq>.<deliverySequence>.<timestamp>.<pending>.<random>
$tokens = explode('.', $values['replyTo']);
if (count($tokens) === 9) {
# if it is an old format we will add two missing items to process tokens in the same way
array_splice($tokens, 2, 0, ['', '']);
}

if (count($tokens) < 11) {
# Looks like invalid format was given
return $values;
}

$values['timestampNanos'] = (int)$tokens[9];

return $values;
}
}
3 changes: 2 additions & 1 deletion src/Message/Payload.php
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,8 @@ public static function parse(mixed $data): self
public function __construct(
public string $body,
public array $headers = [],
public ?string $subject = null
public ?string $subject = null,
public ?int $timestampNanos = null
) {
$hdrs = $this->getValue('message.hdrs');
if ($hdrs) {
Expand Down
3 changes: 3 additions & 0 deletions tests/Functional/StreamTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@ class StreamTest extends FunctionalTestCase
{
private mixed $called;

private bool $empty;

public function testDeduplication()
{
$stream = $this->getClient()->getApi()->getStream('tester');
Expand Down Expand Up @@ -213,6 +215,7 @@ public function testConsumer()

$this->assertNotNull($this->called);
$this->assertSame($this->called->name, 'nekufa');
$this->assertNotNull($this->called->timestampNanos);

$this->called = null;
$consumer = $stream->getConsumer('bye');
Expand Down

0 comments on commit 8633fe3

Please sign in to comment.