Skip to content

Commit

Permalink
[IMPROVED] Bind Object Store bucket stream when getting object. (#1568)
Browse files Browse the repository at this point in the history
The change is needed to properly operate on mirrored object store buckets.
By binding stream nats.go does not need to search for stream by subjects (which does not work for mirrors).
  • Loading branch information
dmitryabramov-f3 authored Mar 3, 2024
1 parent 3fefbb2 commit 21d3482
Show file tree
Hide file tree
Showing 3 changed files with 63 additions and 2 deletions.
8 changes: 7 additions & 1 deletion jetstream/object.go
Original file line number Diff line number Diff line change
Expand Up @@ -918,7 +918,13 @@ func (obs *obs) Get(ctx context.Context, name string, opts ...GetObjectOpt) (Obj
}

chunkSubj := fmt.Sprintf(objChunksPreTmpl, obs.name, info.NUID)
_, err = obs.pushJS.Subscribe(chunkSubj, processChunk, nats.OrderedConsumer(), nats.Context(ctx))
streamName := fmt.Sprintf(objNameTmpl, obs.name)
subscribeOpts := []nats.SubOpt{
nats.OrderedConsumer(),
nats.Context(ctx),
nats.BindStream(streamName),
}
_, err = obs.pushJS.Subscribe(chunkSubj, processChunk, subscribeOpts...)
if err != nil {
return nil, err
}
Expand Down
7 changes: 6 additions & 1 deletion object.go
Original file line number Diff line number Diff line change
Expand Up @@ -694,7 +694,12 @@ func (obs *obs) Get(name string, opts ...GetObjectOpt) (ObjectResult, error) {
}

chunkSubj := fmt.Sprintf(objChunksPreTmpl, obs.name, info.NUID)
_, err = obs.js.Subscribe(chunkSubj, processChunk, OrderedConsumer())
streamName := fmt.Sprintf(objNameTmpl, obs.name)
subscribeOpts := []SubOpt{
OrderedConsumer(),
BindStream(streamName),
}
_, err = obs.js.Subscribe(chunkSubj, processChunk, subscribeOpts...)
if err != nil {
return nil, err
}
Expand Down
50 changes: 50 additions & 0 deletions test/object_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1114,3 +1114,53 @@ func TestObjectStoreCompression(t *testing.T) {
t.Fatalf("Expected stream to be compressed with S2")
}
}

func TestObjectStoreMirror(t *testing.T) {
s := RunBasicJetStreamServer()
defer shutdownJSServerAndRemoveStorage(t, s)

nc, js := jsClient(t, s)
defer nc.Close()

bucketName := "test-bucket"

obs, err := js.CreateObjectStore(&nats.ObjectStoreConfig{Bucket: bucketName, Description: "testing"})
expectOk(t, err)

mirrorBucketName := "mirror-test-bucket"

_, err = js.AddStream(&nats.StreamConfig{
Name: fmt.Sprintf("OBJ_%s", mirrorBucketName),
Mirror: &nats.StreamSource{
Name: fmt.Sprintf("OBJ_%s", bucketName),
SubjectTransforms: []nats.SubjectTransformConfig{
{
Source: fmt.Sprintf("$O.%s.>", bucketName),
Destination: fmt.Sprintf("$O.%s.>", mirrorBucketName),
},
},
},
AllowRollup: true, // meta messages are always rollups
})
if err != nil {
t.Fatalf("Error creating object store bucket mirror: %v", err)
}

_, err = obs.PutString("A", "abc")
expectOk(t, err)

mirrorObs, err := js.ObjectStore(mirrorBucketName)
expectOk(t, err)

// Make sure we sync.
checkFor(t, 2*time.Second, 15*time.Millisecond, func() error {
mirrorValue, err := mirrorObs.GetString("A")
if err != nil {
return err
}
if mirrorValue != "abc" {
t.Fatalf("Expected mirrored object store value to be the same as original")
}
return nil
})
}

0 comments on commit 21d3482

Please sign in to comment.