From 8633fe3f8ca9b46e2595a9862a7a9a3dc8c49934 Mon Sep 17 00:00:00 2001 From: oxidmod Date: Thu, 25 May 2023 19:07:49 +0200 Subject: [PATCH] feat: parse jet stream message timestamp --- src/Message/Msg.php | 40 ++++++++++++++++++++++++++++++++- src/Message/Payload.php | 3 ++- tests/Functional/StreamTest.php | 3 +++ 3 files changed, 44 insertions(+), 2 deletions(-) diff --git a/src/Message/Msg.php b/src/Message/Msg.php index 42f1379..2184bb3 100644 --- a/src/Message/Msg.php +++ b/src/Message/Msg.php @@ -14,6 +14,7 @@ class Msg extends Prototype public Payload $payload; public string $sid; public string $subject; + public ?int $timestampNanos = null; public static function create(string $data): self { @@ -52,6 +53,8 @@ public static function create(string $data): self } } + $values = self::tryParseMessageTime($values); + return new self($values); } @@ -87,7 +90,13 @@ public function parse($payload): self } $payload = substr($payload, $this->hlength); } - $this->payload = new Payload($payload, $headers, $this->subject); + $this->payload = new Payload( + $payload, + $headers, + $this->subject, + $this->timestampNanos + ); + return $this; } @@ -95,4 +104,33 @@ public function render(): string { return 'MSG ' . json_encode($this); } + + private static function tryParseMessageTime(array $values): array + { + if (!array_key_exists('replyTo', $values) + || !str_starts_with($values['replyTo'], '$JS.ACK') + ) { + # This is not a JetStream message + return $values; + } + + # old format + # "$JS.ACK....." + # new format + # $JS.ACK.......... + $tokens = explode('.', $values['replyTo']); + if (count($tokens) === 9) { + # if it is an old format we will add two missing items to process tokens in the same way + array_splice($tokens, 2, 0, ['', '']); + } + + if (count($tokens) < 11) { + # Looks like invalid format was given + return $values; + } + + $values['timestampNanos'] = (int)$tokens[9]; + + return $values; + } } diff --git a/src/Message/Payload.php b/src/Message/Payload.php index 48ac0ef..a2d15d3 100644 --- a/src/Message/Payload.php +++ b/src/Message/Payload.php @@ -24,7 +24,8 @@ public static function parse(mixed $data): self public function __construct( public string $body, public array $headers = [], - public ?string $subject = null + public ?string $subject = null, + public ?int $timestampNanos = null ) { $hdrs = $this->getValue('message.hdrs'); if ($hdrs) { diff --git a/tests/Functional/StreamTest.php b/tests/Functional/StreamTest.php index 0233401..f4a1057 100644 --- a/tests/Functional/StreamTest.php +++ b/tests/Functional/StreamTest.php @@ -14,6 +14,8 @@ class StreamTest extends FunctionalTestCase { private mixed $called; + private bool $empty; + public function testDeduplication() { $stream = $this->getClient()->getApi()->getStream('tester'); @@ -213,6 +215,7 @@ public function testConsumer() $this->assertNotNull($this->called); $this->assertSame($this->called->name, 'nekufa'); + $this->assertNotNull($this->called->timestampNanos); $this->called = null; $consumer = $stream->getConsumer('bye');