Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support Direct Gets by subject #1030

Merged
merged 6 commits into from
Aug 3, 2022
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion go_test.mod
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ go 1.17

require (
github.com/golang/protobuf v1.4.2
github.com/nats-io/nats-server/v2 v2.8.5-0.20220729163007-8aee7d5e51d4
github.com/nats-io/nats-server/v2 v2.8.5-0.20220803150712-d7847c97c116
github.com/nats-io/nkeys v0.3.0
github.com/nats-io/nuid v1.0.1
google.golang.org/protobuf v1.23.0
Expand All @@ -14,6 +14,7 @@ require (
github.com/klauspost/compress v1.15.9 // indirect
github.com/minio/highwayhash v1.0.2 // indirect
github.com/nats-io/jwt/v2 v2.3.0 // indirect
go.uber.org/automaxprocs v1.5.1 // indirect
codegangsta marked this conversation as resolved.
Show resolved Hide resolved
golang.org/x/crypto v0.0.0-20220525230936-793ad666bf5e // indirect
golang.org/x/sys v0.0.0-20220520151302-bc2c85ada10a // indirect
golang.org/x/time v0.0.0-20220411224347-583f2d630306 // indirect
Expand Down
5 changes: 5 additions & 0 deletions go_test.sum
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,12 @@ github.com/minio/highwayhash v1.0.2 h1:Aak5U0nElisjDCfPSG79Tgzkn2gl66NxOMspRrKnA
github.com/minio/highwayhash v1.0.2/go.mod h1:BQskDq+xkJ12lmlUUi7U0M5Swg3EWR+dLTk+kldvVxY=
github.com/nats-io/jwt/v2 v2.3.0 h1:z2mA1a7tIf5ShggOFlR1oBPgd6hGqcDYsISxZByUzdI=
github.com/nats-io/jwt/v2 v2.3.0/go.mod h1:0tqz9Hlu6bCBFLWAASKhE5vUA4c24L9KPUUgvwumE/k=
github.com/nats-io/nats-server/v2 v2.8.4 h1:0jQzze1T9mECg8YZEl8+WYUXb9JKluJfCBriPUtluB4=
github.com/nats-io/nats-server/v2 v2.8.4/go.mod h1:8zZa+Al3WsESfmgSs98Fi06dRWLH5Bnq90m5bKD/eT4=
github.com/nats-io/nats-server/v2 v2.8.5-0.20220729163007-8aee7d5e51d4 h1:hAhXb/iuQNPZe9y0wFift4hkiquMlNF5WuMhpoMlUUM=
github.com/nats-io/nats-server/v2 v2.8.5-0.20220729163007-8aee7d5e51d4/go.mod h1:3Yg3ApyQxPlAs1KKHKV5pobV5VtZk+TtOiUJx/iqkkg=
github.com/nats-io/nats-server/v2 v2.8.5-0.20220803150712-d7847c97c116 h1:NoZ5jkLgMNijnDh96QENq4M06AF34GXlvaYtHGXm/Jk=
github.com/nats-io/nats-server/v2 v2.8.5-0.20220803150712-d7847c97c116/go.mod h1:3Yg3ApyQxPlAs1KKHKV5pobV5VtZk+TtOiUJx/iqkkg=
github.com/nats-io/nats.go v1.16.0/go.mod h1:BPko4oXsySz4aSWeFgOHLZs3G4Jq4ZAyE6/zMCxRT6w=
github.com/nats-io/nkeys v0.3.0 h1:cgM5tL53EvYRU+2YLXIK0G2mJtK12Ft9oeooSZMA2G8=
github.com/nats-io/nkeys v0.3.0/go.mod h1:gvUNGjVcM2IPr5rCsRsC6Wb3Hr2CQAm08dsxtV6A5y4=
Expand All @@ -32,6 +36,7 @@ github.com/prashantv/gostub v1.1.0/go.mod h1:A5zLQHz7ieHGG7is6LLXLz7I8+3LZzsrV0P
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
go.uber.org/automaxprocs v1.5.1 h1:e1YG66Lrk73dn4qhg8WFSvhF0JuFQF0ERIp4rpuV8Qk=
go.uber.org/automaxprocs v1.5.1/go.mod h1:BF4eumQw0P9GtnuxxovUd06vwm1o18oMzFtK66vU6XU=
golang.org/x/crypto v0.0.0-20210314154223-e6e6c4f2bb5b/go.mod h1:T9bdIzuCu7OtxOm1hfPfRQxPLYneinmdGuTeoZ9dtd4=
golang.org/x/crypto v0.0.0-20220525230936-793ad666bf5e h1:T8NU3HyQ8ClP4SEE+KbFlg6n0NhuTsN4MyznaarGsZM=
Expand Down
3 changes: 3 additions & 0 deletions js.go
Original file line number Diff line number Diff line change
Expand Up @@ -169,6 +169,9 @@ const (
// apiMsgGetT is the endpoint to perform a direct get of a message.
apiDirectMsgGetT = "DIRECT.GET.%s"

// apiDirectMsgGetLastBySubjectT is the endpoint to perform a direct get of a message by subject.
apiDirectMsgGetLastBySubjectT = "DIRECT.GET.%s.%s"

// apiMsgDeleteT is the endpoint to remove a message.
apiMsgDeleteT = "STREAM.MSG.DELETE.%s"

Expand Down
14 changes: 14 additions & 0 deletions jsm.go
Original file line number Diff line number Diff line change
Expand Up @@ -887,6 +887,20 @@ func (js *js) getMsg(name string, mreq *apiMsgGetRequest, opts ...JSOpt) (*RawSt
}

var apiSubj string

doDirectGetLastBySubject := o.directGet && mreq.LastFor != ""

if doDirectGetLastBySubject {
apiSubj = apiDirectMsgGetLastBySubjectT
dsSubj := js.apiSubj(fmt.Sprintf(apiSubj, name, mreq.LastFor))
r, err := js.apiRequestWithContext(o.ctx, dsSubj, nil)
if err != nil {
return nil, err
}

return convertDirectGetMsgResponseToMsg(name, r)
}

if o.directGet {
apiSubj = apiDirectMsgGetT
mreq.NextFor = o.directNextFor
Expand Down
13 changes: 13 additions & 0 deletions test/js_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7409,6 +7409,19 @@ func TestJetStreamDirectGetMsg(t *testing.T) {
if _, err := js.GetMsg("DGM", 0, nats.DirectGet()); err == nil || !strings.Contains(err.Error(), "Empty Request") {
t.Fatalf("Unexpected error: %v", err)
}

// Test direct get by subject by trying to get 'bar' directly
r, err = js.GetLastMsg("DGM", "bar", nats.DirectGet())
if err != nil {
t.Fatalf("Error getting message: %v", err)
}
if r.Subject != "bar" {
t.Fatalf("expected subject to be 'bar', got: %v", r.Subject)
}
if string(r.Data) != "d" {
t.Fatalf("Error getting message: %v", err)
codegangsta marked this conversation as resolved.
Show resolved Hide resolved
t.Fatalf("expected data to be 'd', got: %v", string(r.Data))
}
}

func TestJetStreamConsumerReplicasOption(t *testing.T) {
Expand Down