Skip to content

Commit

Permalink
chore: Bind Object Store bucket stream when getting object.
Browse files Browse the repository at this point in the history
The change is needed to properly operate on mirrored object store buckets.
By binding stream nats.go does not need to search for stream by subjects (which does not work for mirrors).
  • Loading branch information
dmitryabramov-f3 committed Feb 27, 2024
1 parent f7c1fe9 commit dce3c36
Show file tree
Hide file tree
Showing 3 changed files with 63 additions and 2 deletions.
8 changes: 7 additions & 1 deletion jetstream/object.go
Original file line number Diff line number Diff line change
Expand Up @@ -918,7 +918,13 @@ func (obs *obs) Get(ctx context.Context, name string, opts ...GetObjectOpt) (Obj
}

chunkSubj := fmt.Sprintf(objChunksPreTmpl, obs.name, info.NUID)
_, err = obs.pushJS.Subscribe(chunkSubj, processChunk, nats.OrderedConsumer(), nats.Context(ctx))
streamName := fmt.Sprintf(objNameTmpl, obs.name)
subscribeOpts := []nats.SubOpt{
nats.OrderedConsumer(),
nats.Context(ctx),
nats.BindStream(streamName),
}
_, err = obs.pushJS.Subscribe(chunkSubj, processChunk, subscribeOpts...)
if err != nil {
return nil, err
}
Expand Down
7 changes: 6 additions & 1 deletion object.go
Original file line number Diff line number Diff line change
Expand Up @@ -694,7 +694,12 @@ func (obs *obs) Get(name string, opts ...GetObjectOpt) (ObjectResult, error) {
}

chunkSubj := fmt.Sprintf(objChunksPreTmpl, obs.name, info.NUID)
_, err = obs.js.Subscribe(chunkSubj, processChunk, OrderedConsumer())
streamName := fmt.Sprintf(objNameTmpl, obs.name)
subscribeOpts := []SubOpt{
OrderedConsumer(),
BindStream(streamName),
}
_, err = obs.js.Subscribe(chunkSubj, processChunk, subscribeOpts...)
if err != nil {
return nil, err
}
Expand Down
50 changes: 50 additions & 0 deletions test/object_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1121,3 +1121,53 @@ func TestObjectStoreCompression(t *testing.T) {
t.Fatalf("Expected stream to be compressed with S2")
}
}

func TestObjectStoreMirror(t *testing.T) {
s := RunBasicJetStreamServer()
defer shutdownJSServerAndRemoveStorage(t, s)

nc, js := jsClient(t, s)
defer nc.Close()

bucketName := "test-bucket"

obs, err := js.CreateObjectStore(&nats.ObjectStoreConfig{Bucket: bucketName, Description: "testing"})
expectOk(t, err)

mirrorBucketName := "mirror-test-bucket"

_, err = js.AddStream(&nats.StreamConfig{
Name: fmt.Sprintf("OBJ_%s", mirrorBucketName),
Mirror: &nats.StreamSource{
Name: fmt.Sprintf("OBJ_%s", bucketName),
SubjectTransforms: []nats.SubjectTransformConfig{
{
Source: fmt.Sprintf("$O.%s.>", bucketName),
Destination: fmt.Sprintf("$O.%s.>", mirrorBucketName),
},
},
},
AllowRollup: true, // meta messages are always rollups
})
if err != nil {
t.Fatalf("Error creating object store bucket mirror: %v", err)
}

_, err = obs.PutString("A", "abc")
expectOk(t, err)

mirrorObs, err := js.ObjectStore(mirrorBucketName)
expectOk(t, err)

// Make sure we sync.
checkFor(t, 2*time.Second, 15*time.Millisecond, func() error {
mirrorValue, err := mirrorObs.GetString("A")
if err != nil {
return err
}
if mirrorValue != "abc" {
t.Fatalf("Expected mirrored object store value to be the same as original")
}
return nil
})
}

0 comments on commit dce3c36

Please sign in to comment.