Skip to content

Commit

Permalink
implement NATS JetStream connectors
Browse files Browse the repository at this point in the history
  • Loading branch information
reugn committed May 28, 2022
1 parent 1ca36d9 commit 3a93f27
Show file tree
Hide file tree
Showing 5 changed files with 205 additions and 28 deletions.
2 changes: 1 addition & 1 deletion nats/doc.go
Original file line number Diff line number Diff line change
@@ -1,2 +1,2 @@
// Package nats implements the NATS Streaming connector.
// Package nats provides NATS streaming connectors.
package nats
1 change: 1 addition & 0 deletions nats/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ go 1.15
require (
github.com/golang/protobuf v1.4.3 // indirect
github.com/nats-io/nats-streaming-server v0.19.0 // indirect
github.com/nats-io/nats.go v1.15.0
github.com/nats-io/stan.go v0.10.2
github.com/reugn/go-streams v0.7.0
golang.org/x/crypto v0.0.0-20210921155107-089bfa567519 // indirect
Expand Down
3 changes: 2 additions & 1 deletion nats/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -75,8 +75,9 @@ github.com/nats-io/nats-server/v2 v2.1.9/go.mod h1:9qVyoewoYXzG1ME9ox0HwkkzyYvnl
github.com/nats-io/nats-streaming-server v0.19.0 h1:NVYusu6kcMxRBj1wOWRdXBUHf1bzkJQbsHovsg+Fr1o=
github.com/nats-io/nats-streaming-server v0.19.0/go.mod h1:oqrRqpMg84aiPDyroTornjVWNYJKh+6ozh2Mgt8dslE=
github.com/nats-io/nats.go v1.10.0/go.mod h1:AjGArbfyR50+afOUotNX2Xs5SYHf+CoOa5HH1eEl2HE=
github.com/nats-io/nats.go v1.13.0 h1:LvYqRB5epIzZWQp6lmeltOOZNLqCvm4b+qfvzZO03HE=
github.com/nats-io/nats.go v1.13.0/go.mod h1:BPko4oXsySz4aSWeFgOHLZs3G4Jq4ZAyE6/zMCxRT6w=
github.com/nats-io/nats.go v1.15.0 h1:3IXNBolWrwIUf2soxh6Rla8gPzYWEZQBUBK6RV21s+o=
github.com/nats-io/nats.go v1.15.0/go.mod h1:BPko4oXsySz4aSWeFgOHLZs3G4Jq4ZAyE6/zMCxRT6w=
github.com/nats-io/nkeys v0.1.3/go.mod h1:xpnFELMwJABBLVhffcfd1MZx6VsNRFpEugbxziKVo7w=
github.com/nats-io/nkeys v0.1.4/go.mod h1:XdZpAbhgyyODYqjTawOnIOI7VlbKSarI9Gfy1tqEu/s=
github.com/nats-io/nkeys v0.3.0 h1:cgM5tL53EvYRU+2YLXIK0G2mJtK12Ft9oeooSZMA2G8=
Expand Down
173 changes: 173 additions & 0 deletions nats/nats_jetstream.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,173 @@
package nats

import (
"context"
"log"

"github.com/nats-io/nats.go"
"github.com/reugn/go-streams"
"github.com/reugn/go-streams/flow"
)

var (
PullMaxWaiting = 128
FetchBatchSize = 16
)

// JetStreamSource represents a NATS JetStream source connector.
// Uses a pull-based consumer.
type JetStreamSource struct {
conn *nats.Conn
jetStreamContext nats.JetStreamContext
subscription *nats.Subscription
out chan interface{}
ctx context.Context
}

// NewNatsSink returns a new JetStreamSource instance.
func NewJetStreamSource(ctx context.Context, subjectName, url string) (*JetStreamSource, error) {
nc, err := nats.Connect(url)
if err != nil {
return nil, err
}

// create JetStreamContext
js, err := nc.JetStream()
if err != nil {
return nil, err
}

// create pull based consumer
sub, err := js.PullSubscribe(subjectName, "JetStreamSource", nats.PullMaxWaiting(PullMaxWaiting))
if err != nil {
return nil, err
}

jetStreamSource := &JetStreamSource{
conn: nc,
jetStreamContext: js,
subscription: sub,
out: make(chan interface{}),
ctx: ctx,
}

go jetStreamSource.init()
return jetStreamSource, nil
}

func (js *JetStreamSource) init() {
loop:
for {
select {
case <-js.ctx.Done():
break loop
default:
}

messages, err := js.subscription.Fetch(FetchBatchSize, nats.Context(js.ctx))
if err != nil {
log.Printf("JetStreamSource fetch error: %s", err)
break loop
}
for _, msg := range messages {
if err := msg.Ack(); err != nil {
log.Printf("Failed to Ack JetStream message: %s", err)
}
js.out <- msg
}
}

log.Printf("Closing JetStream consumer")
if err := js.subscription.Drain(); err != nil {
log.Printf("Failed to Drain JetStream subscription: %s", err)
}
close(js.out)
}

// Via streams data through the given flow
func (js *JetStreamSource) Via(_flow streams.Flow) streams.Flow {
flow.DoStream(js, _flow)
return _flow
}

// Out returns an output channel for sending data
func (js *JetStreamSource) Out() <-chan interface{} {
return js.out
}

// JetStreamSink represents a NATS JetStream sink connector.
type JetStreamSink struct {
conn *nats.Conn
jetStreamContext nats.JetStreamContext
subjectName string
in chan interface{}
}

// NewNatsSink returns a new JetStreamSource instance.
func NewJetStreamSink(streamName, subjectName, url string) (*JetStreamSink, error) {
nc, err := nats.Connect(url)
if err != nil {
return nil, err
}

// create JetStreamContext
js, err := nc.JetStream()
if err != nil {
return nil, err
}

// check if the given stream already exists; if not, create it.
stream, _ := js.StreamInfo(streamName)
if stream == nil {
log.Printf("Creating JetStream %s with subject %s", streamName, subjectName)
_, err = js.AddStream(&nats.StreamConfig{
Name: streamName,
Subjects: []string{subjectName},
})
if err != nil {
return nil, err
}
} else {
log.Printf("JetStream %s exists", streamName)
}

jetStreamSink := &JetStreamSink{
conn: nc,
jetStreamContext: js,
subjectName: subjectName,
in: make(chan interface{}),
}

go jetStreamSink.init()
return jetStreamSink, nil
}

func (js *JetStreamSink) init() {
for msg := range js.in {
var err error
switch m := msg.(type) {
case *nats.Msg:
_, err = js.jetStreamContext.Publish(js.subjectName, m.Data)

case []byte:
_, err = js.jetStreamContext.Publish(js.subjectName, m)

default:
log.Printf("Unsupported message type %v", m)
}

if err != nil {
log.Printf("Error processing JetStream message: %s", err)
}
}

log.Printf("Closing JetStream producer")
if err := js.conn.Drain(); err != nil {
log.Printf("Failed to Drain JetStream connection: %s", err)
}
}

// In returns an input channel for receiving data
func (js *JetStreamSink) In() chan<- interface{} {
return js.in
}
54 changes: 28 additions & 26 deletions nats/nats_streaming.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,9 @@ import (
"github.com/reugn/go-streams/flow"
)

// NatsSource represents a NATS Streaming source connector.
type NatsSource struct {
// StreamingSource represents a NATS Streaming source connector.
// Deprecated: Use JetStreamSource instead.
type StreamingSource struct {
conn stan.Conn
subscriptions []stan.Subscription
subscriptionType stan.SubscriptionOption
Expand All @@ -26,12 +27,12 @@ type NatsSource struct {
wg *sync.WaitGroup
}

// NewNatsSource returns a new NatsSource instance.
func NewNatsSource(ctx context.Context, conn stan.Conn, subscriptionType stan.SubscriptionOption,
topics ...string) *NatsSource {
// NewStreamingSource returns a new StreamingSource instance.
func NewStreamingSource(ctx context.Context, conn stan.Conn, subscriptionType stan.SubscriptionOption,
topics ...string) *StreamingSource {
cctx, cancel := context.WithCancel(ctx)

natsSource := NatsSource{
streamingSource := &StreamingSource{
conn: conn,
subscriptions: []stan.Subscription{},
subscriptionType: subscriptionType,
Expand All @@ -42,11 +43,11 @@ func NewNatsSource(ctx context.Context, conn stan.Conn, subscriptionType stan.Su
wg: &sync.WaitGroup{},
}

go natsSource.init()
return &natsSource
go streamingSource.init()
return streamingSource
}

func (ns *NatsSource) init() {
func (ns *StreamingSource) init() {
sigchan := make(chan os.Signal, 1)
signal.Notify(sigchan, syscall.SIGINT, syscall.SIGTERM)

Expand All @@ -62,7 +63,7 @@ func (ns *NatsSource) init() {
log.Fatal("Failed to subscribe to NATS cluster")
}

log.Printf("NATS source subscribed to topic %s", t)
log.Printf("StreamingSource subscribed to topic %s", t)
ns.subscriptions = append(ns.subscriptions, sub)
}(topic)
}
Expand All @@ -72,7 +73,7 @@ func (ns *NatsSource) init() {

select {
case <-sigchan:
log.Println("NATS source received termination signal, cleaning up...")
log.Println("StreamingSource received termination signal, cleaning up...")
ns.cancelCtx()
case <-ns.ctx.Done():
}
Expand All @@ -81,60 +82,61 @@ func (ns *NatsSource) init() {

close(ns.out)
ns.conn.Close()
log.Println("NATS source cleanup complete")
log.Println("StreamingSource cleanup complete")
}

func (ns *NatsSource) awaitCleanup() {
func (ns *StreamingSource) awaitCleanup() {
ns.wg.Add(1)
defer ns.wg.Done()

select {
case <-ns.ctx.Done():
for _, s := range ns.subscriptions {
for _, sub := range ns.subscriptions {
ns.wg.Add(1)
go func(sub stan.Subscription) {
defer ns.wg.Done()

if err := sub.Unsubscribe(); err != nil {
log.Fatal("Failed to remove NATS subscription")
}
}(s)
}(sub)
}
default:
}
}

// Via streams data through a given flow
func (ns *NatsSource) Via(_flow streams.Flow) streams.Flow {
func (ns *StreamingSource) Via(_flow streams.Flow) streams.Flow {
flow.DoStream(ns, _flow)
return _flow
}

// Out returns the output channel for the data
func (ns *NatsSource) Out() <-chan interface{} {
func (ns *StreamingSource) Out() <-chan interface{} {
return ns.out
}

// NatsSink represents a NATS Streaming sink connector.
type NatsSink struct {
// StreamingSink represents a NATS Streaming sink connector.
// Deprecated: Use JetStreamSink instead.
type StreamingSink struct {
conn stan.Conn
topic string
in chan interface{}
}

// NewNatsSink returns a new NatsSink instance
func NewNatsSink(conn stan.Conn, topic string) *NatsSink {
natsSink := NatsSink{
// NewStreamingSink returns a new StreamingSink instance.
func NewStreamingSink(conn stan.Conn, topic string) *StreamingSink {
streamingSink := &StreamingSink{
conn: conn,
topic: topic,
in: make(chan interface{}),
}

go natsSink.init()
return &natsSink
go streamingSink.init()
return streamingSink
}

func (ns *NatsSink) init() {
func (ns *StreamingSink) init() {
for msg := range ns.in {
var err error
switch m := msg.(type) {
Expand All @@ -158,6 +160,6 @@ func (ns *NatsSink) init() {
}

// In returns an input channel for receiving data
func (ns *NatsSink) In() chan<- interface{} {
func (ns *StreamingSink) In() chan<- interface{} {
return ns.in
}

0 comments on commit 3a93f27

Please sign in to comment.