Skip to content

Commit

Permalink
chore: Bind Object Store bucket stream when getting object.
Browse files Browse the repository at this point in the history
The change is needed to properly operate on mirrored object store buckets.
By binding stream nats.go does not need to search for stream by subjects (which does not work for mirrors).
  • Loading branch information
dmitryabramov-f3 committed Feb 26, 2024
1 parent f7c1fe9 commit 0648187
Show file tree
Hide file tree
Showing 3 changed files with 65 additions and 4 deletions.
11 changes: 9 additions & 2 deletions jetstream/object.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,9 +29,10 @@ import (
"sync"
"time"

"github.com/nats-io/nuid"

"github.com/nats-io/nats.go"
"github.com/nats-io/nats.go/internal/parser"
"github.com/nats-io/nuid"
)

type (
Expand Down Expand Up @@ -918,7 +919,13 @@ func (obs *obs) Get(ctx context.Context, name string, opts ...GetObjectOpt) (Obj
}

chunkSubj := fmt.Sprintf(objChunksPreTmpl, obs.name, info.NUID)
_, err = obs.pushJS.Subscribe(chunkSubj, processChunk, nats.OrderedConsumer(), nats.Context(ctx))
streamName := fmt.Sprintf(objNameTmpl, obs.name)
subscribeOpts := []nats.SubOpt{
nats.OrderedConsumer(),
nats.Context(ctx),
nats.BindStream(streamName),
}
_, err = obs.pushJS.Subscribe(chunkSubj, processChunk, subscribeOpts...)
if err != nil {
return nil, err
}
Expand Down
10 changes: 8 additions & 2 deletions object.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,9 @@ import (
"sync"
"time"

"github.com/nats-io/nats.go/internal/parser"
"github.com/nats-io/nuid"

"github.com/nats-io/nats.go/internal/parser"
)

// ObjectStoreManager creates, loads and deletes Object Stores
Expand Down Expand Up @@ -694,7 +695,12 @@ func (obs *obs) Get(name string, opts ...GetObjectOpt) (ObjectResult, error) {
}

chunkSubj := fmt.Sprintf(objChunksPreTmpl, obs.name, info.NUID)
_, err = obs.js.Subscribe(chunkSubj, processChunk, OrderedConsumer())
streamName := fmt.Sprintf(objNameTmpl, obs.name)
subscribeOpts := []SubOpt{
OrderedConsumer(),
BindStream(streamName),
}
_, err = obs.js.Subscribe(chunkSubj, processChunk, subscribeOpts...)
if err != nil {
return nil, err
}
Expand Down
48 changes: 48 additions & 0 deletions test/object_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1121,3 +1121,51 @@ func TestObjectStoreCompression(t *testing.T) {
t.Fatalf("Expected stream to be compressed with S2")
}
}

func TestObjectStoreMirror(t *testing.T) {
s := RunBasicJetStreamServer()
defer shutdownJSServerAndRemoveStorage(t, s)

nc, js := jsClient(t, s)
defer nc.Close()

bucketName := "test-bucket"

obs, err := js.CreateObjectStore(&nats.ObjectStoreConfig{Bucket: bucketName, Description: "testing"})
expectOk(t, err)

mirrorBucketName := "mirror-test-bucket"

_, err = js.AddStream(&nats.StreamConfig{
Name: fmt.Sprintf("OBJ_%s", mirrorBucketName),
Mirror: &nats.StreamSource{
Name: fmt.Sprintf("OBJ_%s", bucketName),
//FilterSubject: fmt.Sprintf("$O.%s.>", bucketName),
SubjectTransforms: []nats.SubjectTransformConfig{
{
Source: fmt.Sprintf("$O.%s.>", bucketName),
Destination: fmt.Sprintf("$O.%s.>", mirrorBucketName),
},
},
},
AllowRollup: true, // meta messages are always rollups
})
if err != nil {
t.Fatalf("Error creating object store bucket mirror: %v", err)
}

_, err = obs.PutString("A", "abc")
expectOk(t, err)

mirrorObs, err := js.ObjectStore(mirrorBucketName)
expectOk(t, err)

time.Sleep(time.Second * 2)

mirrorValue, err := mirrorObs.GetString("A")
expectOk(t, err)

if mirrorValue != "abc" {
t.Fatalf("Expected mirrored object store value to be the same as original")
}
}

0 comments on commit 0648187

Please sign in to comment.