Skip to content

Commit 4fab108

Browse files
authored
feat: support nats queue (#191)
1 parent 656d2d7 commit 4fab108

File tree

7 files changed

+222
-0
lines changed

7 files changed

+222
-0
lines changed

common/constants.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ const (
2121
MemoryTubeType = "memory"
2222
HttpTubeType = "http"
2323
EmptyTubeType = "empty"
24+
NatsTubeType = "nats"
2425

2526
WASMRuntime = "wasm"
2627
ExternalRuntime = "external"

fs/contube/nats.go

Lines changed: 130 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,130 @@
1+
/*
2+
* Copyright 2024 Function Stream Org.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package contube
18+
19+
import (
20+
"context"
21+
"time"
22+
23+
"github.com/functionstream/function-stream/common"
24+
"github.com/nats-io/nats.go"
25+
"github.com/pkg/errors"
26+
)
27+
28+
type NatsTubeFactoryConfig struct {
29+
NatsURL string `json:"nats_url"`
30+
}
31+
32+
type NatsEventQueueFactory struct {
33+
nc *nats.Conn
34+
}
35+
36+
type NatsSourceTubeConfig struct {
37+
Subject string `json:"subject" validate:"required"`
38+
}
39+
40+
func (n NatsEventQueueFactory) NewSourceTube(ctx context.Context, configMap ConfigMap) (<-chan Record, error) {
41+
config := &NatsSourceTubeConfig{}
42+
if err := configMap.ToConfigStruct(config); err != nil {
43+
return nil, err
44+
}
45+
c := make(chan Record)
46+
sub, err := n.nc.SubscribeSync(config.Subject)
47+
if err != nil {
48+
return nil, err
49+
}
50+
log := common.NewDefaultLogger()
51+
go func() {
52+
for {
53+
msg, err := sub.NextMsg(10 * time.Millisecond)
54+
if err != nil {
55+
if !errors.Is(err, nats.ErrTimeout) {
56+
log.Error(err, "Failed to get next message", "subject", config.Subject)
57+
}
58+
continue
59+
}
60+
select {
61+
case c <- NewRecordImpl(msg.Data, func() {
62+
_ = msg.Ack()
63+
}): // do nothing
64+
case <-ctx.Done():
65+
return
66+
}
67+
}
68+
}()
69+
return c, nil
70+
}
71+
72+
type NatsSinkTubeConfig struct {
73+
Subject string `json:"subject" validate:"required"`
74+
}
75+
76+
func (n NatsEventQueueFactory) NewSinkTube(ctx context.Context, configMap ConfigMap) (chan<- Record, error) {
77+
config := &NatsSinkTubeConfig{}
78+
if err := configMap.ToConfigStruct(config); err != nil {
79+
return nil, err
80+
}
81+
c := make(chan Record)
82+
log := common.NewDefaultLogger()
83+
go func() {
84+
for {
85+
select {
86+
case <-ctx.Done():
87+
return
88+
case event, ok := <-c:
89+
if !ok {
90+
return
91+
}
92+
err := n.nc.Publish(config.Subject, event.GetPayload())
93+
log.Info("Published message", "subject", config.Subject, "err", err)
94+
if err != nil {
95+
log.Error(err, "Failed to publish message", "subject", config.Subject)
96+
continue
97+
}
98+
event.Commit()
99+
}
100+
}
101+
}()
102+
return c, nil
103+
}
104+
105+
func NewNatsEventQueueFactory(ctx context.Context, configMap ConfigMap) (TubeFactory, error) {
106+
config := &NatsTubeFactoryConfig{}
107+
if err := configMap.ToConfigStruct(config); err != nil {
108+
return nil, err
109+
}
110+
if config.NatsURL == "" {
111+
config.NatsURL = "nats://localhost:4222"
112+
}
113+
nc, err := nats.Connect(config.NatsURL)
114+
if err != nil {
115+
return nil, err
116+
}
117+
log := common.NewDefaultLogger()
118+
go func() {
119+
<-ctx.Done()
120+
// Close the nats queue factory
121+
log.Info("Closing nats queue factory", "url", config.NatsURL)
122+
err := nc.Drain()
123+
if err != nil {
124+
log.Error(err, "Failed to drain nats connection", "url", config.NatsURL)
125+
}
126+
}()
127+
return &NatsEventQueueFactory{
128+
nc: nc,
129+
}, nil
130+
}

go.mod

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ require (
1212
github.com/go-logr/zapr v1.3.0
1313
github.com/go-openapi/spec v0.21.0
1414
github.com/go-playground/validator/v10 v10.11.1
15+
github.com/nats-io/nats.go v1.37.0
1516
github.com/pkg/errors v0.9.1
1617
github.com/spf13/cobra v1.8.0
1718
github.com/spf13/viper v1.18.2
@@ -74,6 +75,8 @@ require (
7475
github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect
7576
github.com/modern-go/reflect2 v1.0.2 // indirect
7677
github.com/mtibben/percent v0.2.1 // indirect
78+
github.com/nats-io/nkeys v0.4.7 // indirect
79+
github.com/nats-io/nuid v1.0.1 // indirect
7780
github.com/pelletier/go-toml/v2 v2.1.0 // indirect
7881
github.com/pierrec/lz4 v2.6.1+incompatible // indirect
7982
github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 // indirect

go.sum

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -152,6 +152,12 @@ github.com/modern-go/reflect2 v1.0.2 h1:xBagoLtFs94CBntxluKeaWgTMpvLxC4ur3nMaC9G
152152
github.com/modern-go/reflect2 v1.0.2/go.mod h1:yWuevngMOJpCy52FWWMvUC8ws7m/LJsjYzDa0/r8luk=
153153
github.com/mtibben/percent v0.2.1 h1:5gssi8Nqo8QU/r2pynCm+hBQHpkB/uNK7BJCFogWdzs=
154154
github.com/mtibben/percent v0.2.1/go.mod h1:KG9uO+SZkUp+VkRHsCdYQV3XSZrrSpR3O9ibNBTZrns=
155+
github.com/nats-io/nats.go v1.37.0 h1:07rauXbVnnJvv1gfIyghFEo6lUcYRY0WXc3x7x0vUxE=
156+
github.com/nats-io/nats.go v1.37.0/go.mod h1:Ubdu4Nh9exXdSz0RVWRFBbRfrbSxOYd26oF0wkWclB8=
157+
github.com/nats-io/nkeys v0.4.7 h1:RwNJbbIdYCoClSDNY7QVKZlyb/wfT6ugvFCiKy6vDvI=
158+
github.com/nats-io/nkeys v0.4.7/go.mod h1:kqXRgRDPlGy7nGaEDMuYzmiJCIAAWDK0IMBtDmGD0nc=
159+
github.com/nats-io/nuid v1.0.1 h1:5iA8DT8V7q8WK2EScv2padNa/rTESc1KdnPw4TC2paw=
160+
github.com/nats-io/nuid v1.0.1/go.mod h1:19wcPz3Ph3q0Jbyiqsd0kePYG7A95tJPxeL+1OSON2c=
155161
github.com/niemeyer/pretty v0.0.0-20200227124842-a10e7caefd8e/go.mod h1:zD1mROLANZcx1PVRCS0qkT7pwLkGfwJo4zjcN/Tysno=
156162
github.com/nxadm/tail v1.4.8 h1:nPr65rt6Y5JFSKQO7qToXr7pePgD6Gwiw05lkbyAQTE=
157163
github.com/nxadm/tail v1.4.8/go.mod h1:+ncqLTQzXmGhMZNUePPaPqPvBxHAIsmXswZKocGu+AU=

server/server.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -189,6 +189,10 @@ func GetBuiltinTubeFactoryBuilder() map[string]func(configMap config.ConfigMap)
189189
common.EmptyTubeType: func(_ config.ConfigMap) (contube.TubeFactory, error) {
190190
return contube.NewEmptyTubeFactory(), nil
191191
},
192+
//nolint:unparam
193+
common.NatsTubeType: func(configMap config.ConfigMap) (contube.TubeFactory, error) {
194+
return contube.NewNatsEventQueueFactory(context.Background(), contube.ConfigMap(configMap))
195+
},
192196
}
193197
}
194198

server/server_test.go

Lines changed: 70 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,9 @@ import (
2323
"net"
2424
"strconv"
2525
"testing"
26+
"time"
27+
28+
"github.com/nats-io/nats.go"
2629

2730
"github.com/functionstream/function-stream/common/config"
2831

@@ -47,6 +50,13 @@ func getListener(t *testing.T) net.Listener {
4750
func startStandaloneSvr(t *testing.T, ctx context.Context, opts ...ServerOption) (*Server, string) {
4851
ln := getListener(t)
4952
defaultOpts := []ServerOption{
53+
WithConfig(&Config{
54+
TubeConfig: map[string]config.ConfigMap{
55+
common.NatsTubeType: {
56+
"nats_url": "nats://localhost:4222",
57+
},
58+
},
59+
}),
5060
WithHttpListener(ln),
5161
WithTubeFactoryBuilders(GetBuiltinTubeFactoryBuilder()),
5262
WithRuntimeFactoryBuilders(GetBuiltinRuntimeFactoryBuilder()),
@@ -189,6 +199,66 @@ func TestHttpTube(t *testing.T) {
189199
}
190200
}
191201

202+
func TestNatsTube(t *testing.T) {
203+
ctx, cancel := context.WithCancel(context.Background())
204+
defer cancel()
205+
s, _ := startStandaloneSvr(t, ctx, nil, nil)
206+
207+
funcConf := &model.Function{
208+
Package: "../bin/example_basic.wasm",
209+
Sources: []model.TubeConfig{{
210+
Type: common.NatsTubeType,
211+
Config: map[string]interface{}{
212+
"subject": "input",
213+
},
214+
}},
215+
Sink: model.TubeConfig{
216+
Type: common.NatsTubeType,
217+
Config: map[string]interface{}{
218+
"subject": "output",
219+
},
220+
},
221+
Name: "test-func",
222+
Replicas: 1,
223+
}
224+
225+
err := s.Manager.StartFunction(funcConf)
226+
assert.Nil(t, err)
227+
228+
p := &tests.Person{
229+
Name: "rbt",
230+
Money: 0,
231+
}
232+
jsonBytes, err := json.Marshal(p)
233+
if err != nil {
234+
t.Fatal(err)
235+
}
236+
237+
nc, err := nats.Connect("nats://localhost:4222")
238+
assert.NoError(t, err)
239+
240+
sub, err := nc.SubscribeSync("output")
241+
assert.NoError(t, err)
242+
243+
assert.NoError(t, nc.Publish("input", jsonBytes))
244+
245+
event, err := sub.NextMsg(3 * time.Second)
246+
if err != nil {
247+
t.Error(err)
248+
return
249+
}
250+
var out tests.Person
251+
err = json.Unmarshal(event.Data, &out)
252+
if err != nil {
253+
t.Error(err)
254+
return
255+
}
256+
if out.Money != 1 {
257+
t.Errorf("expected 1, got %d", out.Money)
258+
return
259+
}
260+
}
261+
192262
type MockRuntimeFactory struct {
193263
}
194264

tests/docker-compose.yaml

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,3 +20,11 @@ services:
2020
ports:
2121
- "6650:6650"
2222
- "8080:8080"
23+
nats:
24+
image: nats:latest
25+
container_name: nats-server
26+
ports:
27+
- "4222:4222"
28+
- "8222:8222"
29+
environment:
30+
- NATS_ALLOW_NEW_USERS=true

0 commit comments

Comments
 (0)