From dce3c368c223901e66420afa21850e87f5782594 Mon Sep 17 00:00:00 2001 From: Dmitry Abramov Date: Mon, 26 Feb 2024 17:44:23 +0200 Subject: [PATCH] chore: Bind Object Store bucket stream when getting object. The change is needed to properly operate on mirrored object store buckets. By binding stream nats.go does not need to search for stream by subjects (which does not work for mirrors). --- jetstream/object.go | 8 +++++++- object.go | 7 ++++++- test/object_test.go | 50 +++++++++++++++++++++++++++++++++++++++++++++ 3 files changed, 63 insertions(+), 2 deletions(-) diff --git a/jetstream/object.go b/jetstream/object.go index 271cc2235..8b2b7097b 100644 --- a/jetstream/object.go +++ b/jetstream/object.go @@ -918,7 +918,13 @@ func (obs *obs) Get(ctx context.Context, name string, opts ...GetObjectOpt) (Obj } chunkSubj := fmt.Sprintf(objChunksPreTmpl, obs.name, info.NUID) - _, err = obs.pushJS.Subscribe(chunkSubj, processChunk, nats.OrderedConsumer(), nats.Context(ctx)) + streamName := fmt.Sprintf(objNameTmpl, obs.name) + subscribeOpts := []nats.SubOpt{ + nats.OrderedConsumer(), + nats.Context(ctx), + nats.BindStream(streamName), + } + _, err = obs.pushJS.Subscribe(chunkSubj, processChunk, subscribeOpts...) if err != nil { return nil, err } diff --git a/object.go b/object.go index 2b818ac86..4a965adfc 100644 --- a/object.go +++ b/object.go @@ -694,7 +694,12 @@ func (obs *obs) Get(name string, opts ...GetObjectOpt) (ObjectResult, error) { } chunkSubj := fmt.Sprintf(objChunksPreTmpl, obs.name, info.NUID) - _, err = obs.js.Subscribe(chunkSubj, processChunk, OrderedConsumer()) + streamName := fmt.Sprintf(objNameTmpl, obs.name) + subscribeOpts := []SubOpt{ + OrderedConsumer(), + BindStream(streamName), + } + _, err = obs.js.Subscribe(chunkSubj, processChunk, subscribeOpts...) if err != nil { return nil, err } diff --git a/test/object_test.go b/test/object_test.go index 616cc1df9..272c464b2 100644 --- a/test/object_test.go +++ b/test/object_test.go @@ -1121,3 +1121,53 @@ func TestObjectStoreCompression(t *testing.T) { t.Fatalf("Expected stream to be compressed with S2") } } + +func TestObjectStoreMirror(t *testing.T) { + s := RunBasicJetStreamServer() + defer shutdownJSServerAndRemoveStorage(t, s) + + nc, js := jsClient(t, s) + defer nc.Close() + + bucketName := "test-bucket" + + obs, err := js.CreateObjectStore(&nats.ObjectStoreConfig{Bucket: bucketName, Description: "testing"}) + expectOk(t, err) + + mirrorBucketName := "mirror-test-bucket" + + _, err = js.AddStream(&nats.StreamConfig{ + Name: fmt.Sprintf("OBJ_%s", mirrorBucketName), + Mirror: &nats.StreamSource{ + Name: fmt.Sprintf("OBJ_%s", bucketName), + SubjectTransforms: []nats.SubjectTransformConfig{ + { + Source: fmt.Sprintf("$O.%s.>", bucketName), + Destination: fmt.Sprintf("$O.%s.>", mirrorBucketName), + }, + }, + }, + AllowRollup: true, // meta messages are always rollups + }) + if err != nil { + t.Fatalf("Error creating object store bucket mirror: %v", err) + } + + _, err = obs.PutString("A", "abc") + expectOk(t, err) + + mirrorObs, err := js.ObjectStore(mirrorBucketName) + expectOk(t, err) + + // Make sure we sync. + checkFor(t, 2*time.Second, 15*time.Millisecond, func() error { + mirrorValue, err := mirrorObs.GetString("A") + if err != nil { + return err + } + if mirrorValue != "abc" { + t.Fatalf("Expected mirrored object store value to be the same as original") + } + return nil + }) +}