Skip to content

Commit

Permalink
Merge pull request #3 from mplachter/add-ability-to-pass-context-down…
Browse files Browse the repository at this point in the history
…-so-implementations-can-use-context

add context so implementations can pass down context
  • Loading branch information
sl1pm4t authored May 1, 2023
2 parents fc89862 + 54e19de commit 06976a7
Show file tree
Hide file tree
Showing 6 changed files with 33 additions and 20 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -14,3 +14,4 @@
# Dependency directories (remove the comment below to include it)
# vendor/
.idea
.vscode/
11 changes: 6 additions & 5 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ type ExampleMsg struct {
}

// Mandatory - Implement the `gongs.MsgEvent` interface
func (e *ExampleMsg) GetId() string {
func (e *ExampleMsg) GetId(ctx context.Context) string {
return e.eventData.Id
}

Expand All @@ -29,13 +29,13 @@ func (e *ExampleMsg) DecodeEventData(b []byte) error {
return nil
}

func (e *ExampleMsg) EncodeEventData() []byte {
func (e *ExampleMsg) EncodeEventData(ctx context.Context) []byte {
b, _ := json.Marshal(e.eventData)
return b
}
```

Create Generic Stream for the above type:
Create a Generic Stream for the above type:

```go
// create Jetstream for Stream
Expand All @@ -55,8 +55,9 @@ Create Generic Stream for the above type:
Publish event

```go
ctx := context.Background()
// Publish an event
q.Publish(&ExampleMsg{
q.Publish(ctx, &ExampleMsg{
eventData: &ExampleMsgEventData{
Id: "abc123",
Type: "start",
Expand All @@ -65,7 +66,7 @@ Publish event
})
```

Read last event off queue
Read the last event off queue.

```go
// Read event from NATS
Expand Down
11 changes: 7 additions & 4 deletions example_test.go
Original file line number Diff line number Diff line change
@@ -1,12 +1,14 @@
package gongs_test

import (
"context"
"encoding/json"
"fmt"
"os"

"github.com/nats-io/nats.go"
"github.com/sl1pm4t/gongs"
"github.com/sl1pm4t/gongs/test"
"os"
)

func init() {
Expand All @@ -23,7 +25,7 @@ type ExampleMsg struct {
eventData *ExampleMsgEventData
}

func (e *ExampleMsg) GetId() string {
func (e *ExampleMsg) GetId(ctx context.Context) string {
return e.eventData.Id
}

Expand All @@ -34,7 +36,7 @@ func (e *ExampleMsg) DecodeEventData(b []byte) error {
return nil
}

func (e *ExampleMsg) EncodeEventData() []byte {
func (e *ExampleMsg) EncodeEventData(ctx context.Context) []byte {
b, _ := json.Marshal(e.eventData)
return b
}
Expand All @@ -52,12 +54,13 @@ func Example() {
}
js, _ := nc.JetStream()
js.AddStream(cfg)
ctx := context.Background()

// create Generic Stream
q := gongs.NewGenericStream[ExampleMsg](js, "example.events", cfg.Name)

// Publish an event
q.Publish(&ExampleMsg{
q.Publish(ctx, &ExampleMsg{
eventData: &ExampleMsgEventData{
Id: "abc123",
Type: "start",
Expand Down
8 changes: 5 additions & 3 deletions generic_queue.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package gongs

import (
"context"

"github.com/nats-io/nats.go"
)

Expand All @@ -25,10 +27,10 @@ func NewGenericStream[T any, I MsgEvent[T]](

// Publish will publish a message to nats using a message id returned by MsgEvent.GetId
// The message id is used for deduplication https://docs.nats.io/using-nats/developer/develop_jetstream/model_deep_dive#message-deduplication
func (s *GenericStream[T, I]) Publish(evt I) (*nats.PubAck, error) {
b := evt.EncodeEventData()
func (s *GenericStream[T, I]) Publish(ctx context.Context, evt I) (*nats.PubAck, error) {
b := evt.EncodeEventData(ctx)

wId := nats.MsgId(evt.GetId())
wId := nats.MsgId(evt.GetId(ctx))
return s.js.Publish(s.subject, b, wId)
}

Expand Down
14 changes: 9 additions & 5 deletions generic_queue_test.go
Original file line number Diff line number Diff line change
@@ -1,13 +1,15 @@
package gongs_test

import (
"context"
"encoding/json"
"fmt"
"sync"
"testing"

"github.com/nats-io/nats.go"
"github.com/sl1pm4t/gongs"
"github.com/sl1pm4t/gongs/test"
"sync"
"testing"
)

type TestStreamMsg struct {
Expand All @@ -19,7 +21,7 @@ type TestStreamEventData struct {
Foo string
}

func (e *TestStreamMsg) GetId() string {
func (e *TestStreamMsg) GetId(ctx context.Context) string {
return fmt.Sprintf("%d", e.eventData.Id)
}

Expand All @@ -33,7 +35,7 @@ func (e *TestStreamMsg) DecodeEventData(b []byte) error {
return nil
}

func (e *TestStreamMsg) EncodeEventData() []byte {
func (e *TestStreamMsg) EncodeEventData(ctx context.Context) []byte {
b, _ := json.Marshal(e.eventData)
return b
}
Expand All @@ -52,6 +54,8 @@ func Test_GenericStream_QueueSubscribe(t *testing.T) {
Storage: nats.MemoryStorage,
}

ctx := context.Background()

_, err := js.AddStream(cfg)
if err != nil {
t.Fatalf("could not create stream: %v", err)
Expand Down Expand Up @@ -83,7 +87,7 @@ func Test_GenericStream_QueueSubscribe(t *testing.T) {

for i := 1; i < 4; i++ {
wg.Add(1)
_, err := workStream.Publish(&TestStreamMsg{eventData: &TestStreamEventData{
_, err := workStream.Publish(ctx, &TestStreamMsg{eventData: &TestStreamEventData{
Id: i,
Foo: fmt.Sprintf("foo-%d", i),
}})
Expand Down
8 changes: 5 additions & 3 deletions msg.go
Original file line number Diff line number Diff line change
@@ -1,10 +1,12 @@
package gongs

import "context"

type MsgHandlerFunc[T any] func(*T) error

type MsgEvent[T any] interface {
GetId() string
DecodeEventData([]byte) error
EncodeEventData() []byte
GetId(ctx context.Context) string
DecodeEventData(b []byte) error
EncodeEventData(ctx context.Context) []byte
*T
}

0 comments on commit 06976a7

Please sign in to comment.