Skip to content

Commit da7a03e

Browse files
committed
Add recover to msg handlers.
Add more tests to stanmsg.
1 parent 38932f2 commit da7a03e

File tree

4 files changed

+266
-15
lines changed

4 files changed

+266
-15
lines changed

natsrpc/natsrpc_test.go

Lines changed: 32 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ import (
44
"context"
55
"encoding/json"
66
"errors"
7+
"fmt"
78
"log"
89
"math"
910
"os"
@@ -65,9 +66,9 @@ func newRawMessage(s string) *json.RawMessage {
6566
return &r
6667
}
6768

68-
func TestNatsRPC(t *testing.T) {
69+
func TestCall(t *testing.T) {
6970
log.Printf("\n")
70-
log.Printf(">>> TestNatsRPC.\n")
71+
log.Printf(">>> TestCall.\n")
7172
var err error
7273
assert := assert.New(t)
7374

@@ -329,6 +330,18 @@ func TestNatsRPC(t *testing.T) {
329330
}
330331
)
331332

333+
var (
334+
panicSpec = MustRPCSpec(
335+
svcName,
336+
"panic",
337+
func() interface{} { return &emptypb.Empty{} },
338+
func() interface{} { return &emptypb.Empty{} },
339+
)
340+
panicHandler = func(ctx context.Context, in interface{}) (interface{}, error) {
341+
panic(fmt.Errorf("Something goes wrong"))
342+
}
343+
)
344+
332345
// Regist handlers.
333346
if err := server.RegistHandler(sqrtSpec, sqrtHandler); err != nil {
334347
log.Panic(err)
@@ -348,6 +361,9 @@ func TestNatsRPC(t *testing.T) {
348361
if err := server.RegistHandler(blockSpec, blockHandler); err != nil {
349362
log.Panic(err)
350363
}
364+
if err := server.RegistHandler(panicSpec, panicHandler); err != nil {
365+
log.Panic(err)
366+
}
351367

352368
{
353369
// Regist and deregist
@@ -617,7 +633,7 @@ func TestNatsRPC(t *testing.T) {
617633
ExpectOutput: nil,
618634
ExpectError: true,
619635
},
620-
// block handlers.
636+
// block to test handler busy.
621637
{
622638
Client: jsonClient,
623639
ClientName: "json",
@@ -639,6 +655,17 @@ func TestNatsRPC(t *testing.T) {
639655
ExpectOutput: nil,
640656
ExpectError: true,
641657
},
658+
// handler panic.
659+
{
660+
Client: pbClient,
661+
ClientName: "pb",
662+
Spec: panicSpec,
663+
GenInput: func() (context.Context, interface{}) {
664+
return context.Background(), &emptypb.Empty{}
665+
},
666+
ExpectOutput: nil,
667+
ExpectError: true,
668+
},
642669
}
643670
for i, testCase := range testCases {
644671
testCase := testCase
@@ -672,9 +699,9 @@ func TestNatsRPC(t *testing.T) {
672699

673700
}
674701

675-
func TestPbJsonAndFanout(t *testing.T) {
702+
func TestFanout(t *testing.T) {
676703
log.Printf("\n")
677-
log.Printf(">>> TestPbJsonAndFanout.\n")
704+
log.Printf(">>> TestFanout.\n")
678705
var err error
679706
assert := assert.New(t)
680707

natsrpc/server.go

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -219,6 +219,15 @@ func (sc *ServerConn) msgHandler(svcName string, mm *methodMap) nats.MsgHandler
219219
}
220220

221221
if err := sc.runner.Submit(func() {
222+
defer func() {
223+
if e := recover(); e != nil {
224+
err, ok := e.(error)
225+
if !ok {
226+
err = fmt.Errorf("%+v", e)
227+
}
228+
logger().Error(err, "handler panic")
229+
}
230+
}()
222231

223232
info := mm.Lookup(methodName)
224233
if info == nil {

stanmsg/durconn.go

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -361,11 +361,20 @@ func (dc *DurConn) subscribe(sub *subscription, sc stan.Conn) error {
361361

362362
func (dc *DurConn) msgHandler(sub *subscription) stan.MsgHandler {
363363

364-
logger := dc.logger.WithValues("subject", sub.spec.SubjectName, "queue", sub.queue)
364+
logger := dc.logger.WithValues("subject", sub.spec.SubjectName(), "queue", sub.queue)
365365

366366
return func(stanMsg *stan.Msg) {
367367

368368
if err := dc.runner.Submit(func() {
369+
defer func() {
370+
if e := recover(); e != nil {
371+
err, ok := e.(error)
372+
if !ok {
373+
err = fmt.Errorf("%+v", e)
374+
}
375+
logger.Error(err, "handler panic")
376+
}
377+
}()
369378

370379
m := &nppbmsg.MessageWithMD{}
371380
if err := proto.Unmarshal(stanMsg.Data, m); err != nil {

0 commit comments

Comments
 (0)