diff --git a/jetstream/object.go b/jetstream/object.go index 271cc2235..8b2b7097b 100644 --- a/jetstream/object.go +++ b/jetstream/object.go @@ -918,7 +918,13 @@ func (obs *obs) Get(ctx context.Context, name string, opts ...GetObjectOpt) (Obj } chunkSubj := fmt.Sprintf(objChunksPreTmpl, obs.name, info.NUID) - _, err = obs.pushJS.Subscribe(chunkSubj, processChunk, nats.OrderedConsumer(), nats.Context(ctx)) + streamName := fmt.Sprintf(objNameTmpl, obs.name) + subscribeOpts := []nats.SubOpt{ + nats.OrderedConsumer(), + nats.Context(ctx), + nats.BindStream(streamName), + } + _, err = obs.pushJS.Subscribe(chunkSubj, processChunk, subscribeOpts...) if err != nil { return nil, err } diff --git a/object.go b/object.go index 2b818ac86..4a965adfc 100644 --- a/object.go +++ b/object.go @@ -694,7 +694,12 @@ func (obs *obs) Get(name string, opts ...GetObjectOpt) (ObjectResult, error) { } chunkSubj := fmt.Sprintf(objChunksPreTmpl, obs.name, info.NUID) - _, err = obs.js.Subscribe(chunkSubj, processChunk, OrderedConsumer()) + streamName := fmt.Sprintf(objNameTmpl, obs.name) + subscribeOpts := []SubOpt{ + OrderedConsumer(), + BindStream(streamName), + } + _, err = obs.js.Subscribe(chunkSubj, processChunk, subscribeOpts...) if err != nil { return nil, err } diff --git a/test/object_test.go b/test/object_test.go index 616cc1df9..ed1455469 100644 --- a/test/object_test.go +++ b/test/object_test.go @@ -1121,3 +1121,54 @@ func TestObjectStoreCompression(t *testing.T) { t.Fatalf("Expected stream to be compressed with S2") } } + +func TestObjectStoreMirror(t *testing.T) { + s := RunBasicJetStreamServer() + defer shutdownJSServerAndRemoveStorage(t, s) + + nc, js := jsClient(t, s) + defer nc.Close() + + bucketName := "test-bucket" + + obs, err := js.CreateObjectStore(&nats.ObjectStoreConfig{Bucket: bucketName, Description: "testing"}) + expectOk(t, err) + + mirrorBucketName := "mirror-test-bucket" + + _, err = js.AddStream(&nats.StreamConfig{ + Name: fmt.Sprintf("OBJ_%s", mirrorBucketName), + Mirror: &nats.StreamSource{ + Name: fmt.Sprintf("OBJ_%s", bucketName), + //FilterSubject: fmt.Sprintf("$O.%s.>", bucketName), + SubjectTransforms: []nats.SubjectTransformConfig{ + { + Source: fmt.Sprintf("$O.%s.>", bucketName), + Destination: fmt.Sprintf("$O.%s.>", mirrorBucketName), + }, + }, + }, + AllowRollup: true, // meta messages are always rollups + }) + if err != nil { + t.Fatalf("Error creating object store bucket mirror: %v", err) + } + + _, err = obs.PutString("A", "abc") + expectOk(t, err) + + mirrorObs, err := js.ObjectStore(mirrorBucketName) + expectOk(t, err) + + // Make sure we sync. + checkFor(t, 2*time.Second, 15*time.Millisecond, func() error { + mirrorValue, err := mirrorObs.GetString("A") + if err != nil { + return err + } + if mirrorValue != "abc" { + t.Fatalf("Expected mirrored object store value to be the same as original") + } + return nil + }) +}