Skip to content

Commit 8a2d0ae

Browse files
matthiashanelalexellis
authored andcommitted
Allow up to max_inflight many concurrent function invocations.
Fixes #94 Switch from auto ack to manual ack. This enables message handling in individual go routines. In main.go switch to atomic counter and include index in traces. Going forward these may be mixed with concurrent traces. Removes option to run without durable. Generates durable name base on subject. Removes issue where the durable would not see every messages. Signed-off-by: Matthias Hanel <mh@synadia.com>
1 parent 1f4e16e commit 8a2d0ae

File tree

4 files changed

+62
-86
lines changed

4 files changed

+62
-86
lines changed

main.go

Lines changed: 26 additions & 49 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@ import (
1414
"os/signal"
1515
"strings"
1616
"sync"
17+
"sync/atomic"
1718
"time"
1819

1920
stan "github.com/nats-io/stan.go"
@@ -34,12 +35,6 @@ func main() {
3435

3536
hostname, _ := os.Hostname()
3637

37-
var durable string
38-
if config.NatsDurableQueueSubscription {
39-
durable = "faas"
40-
}
41-
42-
var unsubscribe bool
4338
var credentials *auth.BasicAuthCredentials
4439
var err error
4540

@@ -53,9 +48,9 @@ func main() {
5348

5449
client := makeClient(config.TLSInsecure)
5550

56-
i := 0
51+
counter := uint64(0)
5752
messageHandler := func(msg *stan.Msg) {
58-
i++
53+
i := atomic.AddUint64(&counter, 1)
5954

6055
log.Printf("[#%d] Received on [%s]: '%s'\n", i, msg.Subject, msg)
6156

@@ -65,14 +60,14 @@ func main() {
6560
unmarshalErr := json.Unmarshal(msg.Data, &req)
6661

6762
if unmarshalErr != nil {
68-
log.Printf("Unmarshal error: %s with data %s", unmarshalErr, msg.Data)
63+
log.Printf("[#%d] Unmarshal error: %s with data %s", i, unmarshalErr, msg.Data)
6964
return
7065
}
7166

7267
xCallID := req.Header.Get("X-Call-Id")
7368

7469
functionURL := makeFunctionURL(&req, &config, req.Path, req.QueryString)
75-
fmt.Printf("Invoking: %s with %d bytes, via: %s\n", req.Function, len(req.Body), functionURL)
70+
fmt.Printf("[#%d] Invoking: %s with %d bytes, via: %s\n", i, req.Function, len(req.Body), functionURL)
7671

7772
if config.DebugPrintBody {
7873
fmt.Println(string(req.Body))
@@ -81,7 +76,7 @@ func main() {
8176
start := time.Now()
8277
request, err := http.NewRequest(http.MethodPost, functionURL, bytes.NewReader(req.Body))
8378
if err != nil {
84-
log.Printf("Unable to post message due to invalid URL, error: %s", err.Error())
79+
log.Printf("[#%d] Unable to post message due to invalid URL, error: %s", i, err.Error())
8580
return
8681
}
8782

@@ -102,12 +97,12 @@ func main() {
10297

10398
duration := time.Since(start)
10499

105-
log.Printf("Invoked: %s [%d] in %fs", req.Function, statusCode, duration.Seconds())
100+
log.Printf("[#%d] Invoked: %s [%d] in %fs", i, req.Function, statusCode, duration.Seconds())
106101

107102
if err != nil {
108103
status = http.StatusServiceUnavailable
109104

110-
log.Printf("Error invoking %s, error: %s", req.Function, err)
105+
log.Printf("[#%d] Error invoking %s, error: %s", i, req.Function, err)
111106

112107
timeTaken := time.Since(started).Seconds()
113108

@@ -122,18 +117,18 @@ func main() {
122117
timeTaken)
123118

124119
if resultErr != nil {
125-
log.Printf("Posted callback to: %s - status %d, error: %s\n", req.CallbackURL.String(), http.StatusServiceUnavailable, resultErr.Error())
120+
log.Printf("[#%d] Posted callback to: %s - status %d, error: %s\n", i, req.CallbackURL.String(), http.StatusServiceUnavailable, resultErr.Error())
126121
} else {
127-
log.Printf("Posted result to %s - status: %d", req.CallbackURL.String(), resultStatusCode)
122+
log.Printf("[#%d] Posted result to %s - status: %d", i, req.CallbackURL.String(), resultStatusCode)
128123
}
129124
}
130125

131126
if config.GatewayInvoke == false {
132127
statusCode, reportErr := postReport(&client, req.Function, status, timeTaken, config.GatewayAddressURL(), credentials)
133128
if reportErr != nil {
134-
log.Printf("Error posting report: %s\n", reportErr)
129+
log.Printf("[#%d] Error posting report: %s\n", i, reportErr)
135130
} else {
136-
log.Printf("Posting report to gateway for %s - status: %d\n", req.Function, statusCode)
131+
log.Printf("[#%d] Posting report to gateway for %s - status: %d\n", i, req.Function, statusCode)
137132
}
138133
return
139134
}
@@ -148,20 +143,20 @@ func main() {
148143
functionResult = resData
149144

150145
if err != nil {
151-
log.Printf("Error reading body for: %s, error: %s", req.Function, err)
146+
log.Printf("[#%d] Error reading body for: %s, error: %s",i, req.Function, err)
152147
}
153148

154149
if config.WriteDebug {
155150
fmt.Println(string(functionResult))
156151
} else {
157-
fmt.Printf("%s returned %d bytes\n", req.Function, len(functionResult))
152+
fmt.Printf("[#%d] %s returned %d bytes\n", i, req.Function, len(functionResult))
158153
}
159154
}
160155

161156
timeTaken := time.Since(started).Seconds()
162157

163158
if req.CallbackURL != nil {
164-
log.Printf("Callback to: %s\n", req.CallbackURL.String())
159+
log.Printf("[#%d] Callback to: %s\n", i, req.CallbackURL.String())
165160

166161
resultStatusCode, resultErr := postResult(&client,
167162
res,
@@ -173,18 +168,18 @@ func main() {
173168
timeTaken)
174169

175170
if resultErr != nil {
176-
log.Printf("Error posting to callback-url: %s\n", resultErr)
171+
log.Printf("[#%d] Error posting to callback-url: %s\n", i, resultErr)
177172
} else {
178-
log.Printf("Posted result for %s to callback-url: %s, status: %d", req.Function, req.CallbackURL.String(), resultStatusCode)
173+
log.Printf("[#%d] Posted result for %s to callback-url: %s, status: %d", i, req.Function, req.CallbackURL.String(), resultStatusCode)
179174
}
180175
}
181176

182177
if config.GatewayInvoke == false {
183178
statusCode, reportErr := postReport(&client, req.Function, res.StatusCode, timeTaken, config.GatewayAddressURL(), credentials)
184179
if reportErr != nil {
185-
log.Printf("Error posting report: %s\n", reportErr.Error())
180+
log.Printf("[#%d] Error posting report: %s\n", i, reportErr.Error())
186181
} else {
187-
log.Printf("Posting report for %s, status: %d\n", req.Function, statusCode)
182+
log.Printf("[#%d] Posting report for %s, status: %d\n", i, req.Function, statusCode)
188183
}
189184
}
190185

@@ -204,10 +199,8 @@ func main() {
204199

205200
subject: config.NatsChannel,
206201
qgroup: config.NatsQueueGroup,
207-
durable: durable,
208202
messageHandler: messageHandler,
209-
startOption: stan.StartWithLastReceived(),
210-
maxInFlight: stan.MaxInflight(config.MaxInflight),
203+
maxInFlight: config.MaxInflight,
211204
ackWait: config.AckWait,
212205
}
213206

@@ -218,29 +211,13 @@ func main() {
218211
// Wait for a SIGINT (perhaps triggered by user with CTRL-C)
219212
// Run cleanup when signal is received
220213
signalChan := make(chan os.Signal, 1)
221-
cleanupDone := make(chan bool)
222214
signal.Notify(signalChan, os.Interrupt)
223-
go func() {
224-
for range signalChan {
225-
fmt.Printf("\nReceived an interrupt, unsubscribing and closing connection...\n\n")
226-
// Do not unsubscribe a durable on exit, except if asked to.
227-
if durable == "" || unsubscribe {
228-
if err := natsQueue.unsubscribe(); err != nil {
229-
log.Panicf(
230-
"Cannot unsubscribe subject: %s from %s because of an error: %v",
231-
natsQueue.subject,
232-
natsQueue.natsURL,
233-
err,
234-
)
235-
}
236-
}
237-
if err := natsQueue.closeConnection(); err != nil {
238-
log.Panicf("Cannot close connection to %s because of an error: %v\n", natsQueue.natsURL, err)
239-
}
240-
cleanupDone <- true
241-
}
242-
}()
243-
<-cleanupDone
215+
<-signalChan
216+
fmt.Printf("\nReceived an interrupt, unsubscribing and closing connection...\n\n")
217+
if err := natsQueue.closeConnection(); err != nil {
218+
log.Panicf("Cannot close connection to %s because of an error: %v\n", natsQueue.natsURL, err)
219+
}
220+
close(signalChan)
244221
}
245222

246223
// makeClient constructs a HTTP client with keep-alive turned

readconfig.go

Lines changed: 0 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -45,14 +45,6 @@ func (ReadConfig) Read() (QueueWorkerConfig, error) {
4545
cfg.NatsClusterName = "faas-cluster"
4646
}
4747

48-
if val, exists := os.LookupEnv("faas_nats_durable_queue_subscription"); exists {
49-
if v, err := strconv.ParseBool(val); err == nil {
50-
cfg.NatsDurableQueueSubscription = v
51-
} else {
52-
cfg.NatsDurableQueueSubscription = false
53-
}
54-
}
55-
5648
if val, exists := os.LookupEnv("faas_nats_channel"); exists && val != "" {
5749
cfg.NatsChannel = val
5850
} else {
@@ -173,7 +165,6 @@ type QueueWorkerConfig struct {
173165
NatsAddress string
174166
NatsPort int
175167
NatsClusterName string
176-
NatsDurableQueueSubscription bool
177168
NatsChannel string
178169
NatsQueueGroup string
179170

readconfig_test.go

Lines changed: 0 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -101,7 +101,6 @@ func Test_ReadConfig(t *testing.T) {
101101
os.Setenv("faas_nats_address", "test_nats")
102102
os.Setenv("faas_nats_port", "1234")
103103
os.Setenv("faas_nats_cluster_name", "example-nats-cluster")
104-
os.Setenv("faas_nats_durable_queue_subscription", "true")
105104
os.Setenv("faas_nats_channel", "foo")
106105
os.Setenv("faas_nats_queue_group", "bar")
107106
os.Setenv("faas_gateway_address", "test_gatewayaddr")
@@ -132,11 +131,6 @@ func Test_ReadConfig(t *testing.T) {
132131
t.Fail()
133132
}
134133

135-
wantNatsDurableQueueSubscription := true
136-
if config.NatsDurableQueueSubscription != wantNatsDurableQueueSubscription {
137-
t.Logf("NatsDurableQueueSubscription want `%t`, got `%t`\n", wantNatsDurableQueueSubscription, config.NatsDurableQueueSubscription)
138-
}
139-
140134
want = "foo"
141135
if config.NatsChannel != want {
142136
t.Logf("NatsChannel want `%s`, got `%s`\n", want, config.NatsChannel)

types.go

Lines changed: 36 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ package main
33
import (
44
"fmt"
55
"log"
6+
"strings"
67
"sync"
78
"time"
89

@@ -30,12 +31,11 @@ type NATSQueue struct {
3031

3132
subject string
3233
qgroup string
33-
durable string
3434
ackWait time.Duration
3535
messageHandler func(*stan.Msg)
36-
startOption stan.SubscriptionOption
37-
maxInFlight stan.SubscriptionOption
36+
maxInFlight int
3837
subscription stan.Subscription
38+
msgChan chan *stan.Msg
3939
}
4040

4141
// connect creates a subscription to NATS Streaming
@@ -64,29 +64,52 @@ func (q *NATSQueue) connect() error {
6464
log.Printf("Subscribing to: %s at %s\n", q.subject, q.natsURL)
6565
log.Println("Wait for ", q.ackWait)
6666

67+
// Pre-fill chan with q.maxInFlight tokens
68+
msgChan := make(chan *stan.Msg)
69+
70+
handler := q.messageHandler
71+
opts := []stan.SubscriptionOption{
72+
stan.DurableName(strings.ReplaceAll(q.subject, ".", "_")),
73+
stan.AckWait(q.ackWait),
74+
stan.DeliverAllAvailable(),
75+
stan.MaxInflight(q.maxInFlight),
76+
}
77+
if q.maxInFlight > 1 {
78+
for i := 0; i < q.maxInFlight; i++ {
79+
go func() {
80+
for msg := range msgChan {
81+
q.messageHandler(msg)
82+
msg.Ack()
83+
}
84+
}()
85+
}
86+
87+
opts = append(opts, stan.SetManualAckMode())
88+
handler = func(msg *stan.Msg) {
89+
msgChan <- msg
90+
}
91+
}
6792
subscription, err := q.conn.QueueSubscribe(
6893
q.subject,
6994
q.qgroup,
70-
q.messageHandler,
71-
stan.DurableName(q.durable),
72-
stan.AckWait(q.ackWait),
73-
q.startOption,
74-
q.maxInFlight,
95+
handler,
96+
opts...
7597
)
7698

7799
if err != nil {
78100
return fmt.Errorf("couldn't subscribe to %s at %s. Error: %v", q.subject, q.natsURL, err)
79101
}
80102

81103
log.Printf(
82-
"Listening on [%s], clientID=[%s], qgroup=[%s] durable=[%s]\n",
104+
"Listening on [%s], clientID=[%s], qgroup=[%s] maxInFlight=[%d]\n",
83105
q.subject,
84106
q.clientID,
85107
q.qgroup,
86-
q.durable,
108+
q.maxInFlight,
87109
)
88110

89111
q.subscription = subscription
112+
q.msgChan = msgChan
90113

91114
return nil
92115
}
@@ -117,17 +140,6 @@ func (q *NATSQueue) reconnect() {
117140
log.Printf("Reconnecting limit (%d) reached\n", q.maxReconnect)
118141
}
119142

120-
func (q *NATSQueue) unsubscribe() error {
121-
q.connMutex.Lock()
122-
defer q.connMutex.Unlock()
123-
124-
if q.subscription != nil {
125-
return fmt.Errorf("q.subscription is nil")
126-
}
127-
128-
return q.subscription.Unsubscribe()
129-
}
130-
131143
func (q *NATSQueue) closeConnection() error {
132144
q.connMutex.Lock()
133145
defer q.connMutex.Unlock()
@@ -136,7 +148,9 @@ func (q *NATSQueue) closeConnection() error {
136148
return fmt.Errorf("q.conn is nil")
137149
}
138150

151+
err := q.conn.Close()
152+
close(q.msgChan)
139153
close(q.quitCh)
140154

141-
return q.conn.Close()
155+
return err
142156
}

0 commit comments

Comments
 (0)