Skip to content

Commit 4a474e1

Browse files
author
Jason Yellick
committed
[FAB-4866] Add orderer msg bytes trace
For hard to debug problems, actually having access to the bytes of a message can be the only definitive way to diagnose. This CR adds a debug configuration section, and allows for setting a directory to log all Broadcast messages, as well as a directory to log all Deliver messages. Although there is no support for dynamically changing debugging parameters at this time, the code deliberately retrieves the debug parameters from the debug struct at every instance to allow for dynamic control of the debugging. Change-Id: Ib046f263dc95b374b5883af66397a9d29049ffef Signed-off-by: Jason Yellick <jyellick@us.ibm.com>
1 parent a4b4107 commit 4a474e1

File tree

6 files changed

+228
-10
lines changed

6 files changed

+228
-10
lines changed

orderer/common/localconfig/config.go

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -65,6 +65,7 @@ type TopLevel struct {
6565
FileLedger FileLedger
6666
RAMLedger RAMLedger
6767
Kafka Kafka
68+
Debug Debug
6869
}
6970

7071
// General contains config which should be common among all orderer types.
@@ -162,6 +163,12 @@ type Consumer struct {
162163
RetryBackoff time.Duration
163164
}
164165

166+
// Debug contains configuration for the orderer's debug parameters
167+
type Debug struct {
168+
BroadcastTraceDir string
169+
DeliverTraceDir string
170+
}
171+
165172
var defaults = TopLevel{
166173
General: General{
167174
LedgerType: "file",
@@ -216,6 +223,10 @@ var defaults = TopLevel{
216223
Enabled: false,
217224
},
218225
},
226+
Debug: Debug{
227+
BroadcastTraceDir: "",
228+
DeliverTraceDir: "",
229+
},
219230
}
220231

221232
// Load parses the orderer.yaml file and environment, producing a struct suitable for config use

orderer/common/server/main.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -72,7 +72,7 @@ func Main() {
7272
func Start(cmd string, conf *config.TopLevel) {
7373
signer := localmsp.NewSigner()
7474
manager := initializeMultichannelRegistrar(conf, signer)
75-
server := NewServer(manager, signer)
75+
server := NewServer(manager, signer, &conf.Debug)
7676

7777
switch cmd {
7878
case start.FullCommand(): // "start" command

orderer/common/server/server.go

Lines changed: 81 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -7,14 +7,21 @@ SPDX-License-Identifier: Apache-2.0
77
package server
88

99
import (
10+
"fmt"
11+
"io/ioutil"
12+
"os"
13+
"runtime/debug"
14+
"time"
15+
1016
"github.com/hyperledger/fabric/common/crypto"
1117
"github.com/hyperledger/fabric/orderer/common/broadcast"
1218
"github.com/hyperledger/fabric/orderer/common/deliver"
19+
localconfig "github.com/hyperledger/fabric/orderer/common/localconfig"
1320
"github.com/hyperledger/fabric/orderer/common/multichannel"
1421
cb "github.com/hyperledger/fabric/protos/common"
1522
ab "github.com/hyperledger/fabric/protos/orderer"
1623

17-
"runtime/debug"
24+
"github.com/golang/protobuf/proto"
1825
)
1926

2027
type broadcastSupport struct {
@@ -34,19 +41,73 @@ func (bs deliverSupport) GetChain(chainID string) (deliver.Support, bool) {
3441
}
3542

3643
type server struct {
37-
bh broadcast.Handler
38-
dh deliver.Handler
44+
bh broadcast.Handler
45+
dh deliver.Handler
46+
debug *localconfig.Debug
3947
}
4048

4149
// NewServer creates an ab.AtomicBroadcastServer based on the broadcast target and ledger Reader
42-
func NewServer(r *multichannel.Registrar, signer crypto.LocalSigner) ab.AtomicBroadcastServer {
50+
func NewServer(r *multichannel.Registrar, signer crypto.LocalSigner, debug *localconfig.Debug) ab.AtomicBroadcastServer {
4351
s := &server{
44-
dh: deliver.NewHandlerImpl(deliverSupport{Registrar: r}),
45-
bh: broadcast.NewHandlerImpl(broadcastSupport{Registrar: r}),
52+
dh: deliver.NewHandlerImpl(deliverSupport{Registrar: r}),
53+
bh: broadcast.NewHandlerImpl(broadcastSupport{Registrar: r}),
54+
debug: debug,
4655
}
4756
return s
4857
}
4958

59+
type msgTracer struct {
60+
function string
61+
debug *localconfig.Debug
62+
}
63+
64+
func (mt *msgTracer) trace(traceDir string, msg *cb.Envelope, err error) {
65+
if err != nil {
66+
return
67+
}
68+
69+
now := time.Now().UnixNano()
70+
path := fmt.Sprintf("%s%c%d_%p.%s", traceDir, os.PathSeparator, now, msg, mt.function)
71+
logger.Debugf("Writing %s request trace to %s", mt.function, path)
72+
go func() {
73+
pb, err := proto.Marshal(msg)
74+
if err != nil {
75+
logger.Debugf("Error marshaling trace msg for %s: %s", path, err)
76+
return
77+
}
78+
err = ioutil.WriteFile(path, pb, 0660)
79+
if err != nil {
80+
logger.Debugf("Error writing trace msg for %s: %s", path, err)
81+
}
82+
}()
83+
}
84+
85+
type broadcastMsgTracer struct {
86+
ab.AtomicBroadcast_BroadcastServer
87+
msgTracer
88+
}
89+
90+
func (bmt *broadcastMsgTracer) Recv() (*cb.Envelope, error) {
91+
msg, err := bmt.AtomicBroadcast_BroadcastServer.Recv()
92+
if traceDir := bmt.debug.BroadcastTraceDir; traceDir != "" {
93+
bmt.trace(bmt.debug.BroadcastTraceDir, msg, err)
94+
}
95+
return msg, err
96+
}
97+
98+
type deliverMsgTracer struct {
99+
ab.AtomicBroadcast_DeliverServer
100+
msgTracer
101+
}
102+
103+
func (dmt *deliverMsgTracer) Recv() (*cb.Envelope, error) {
104+
msg, err := dmt.AtomicBroadcast_DeliverServer.Recv()
105+
if traceDir := dmt.debug.DeliverTraceDir; traceDir != "" {
106+
dmt.trace(traceDir, msg, err)
107+
}
108+
return msg, err
109+
}
110+
50111
// Broadcast receives a stream of messages from a client for ordering
51112
func (s *server) Broadcast(srv ab.AtomicBroadcast_BroadcastServer) error {
52113
logger.Debugf("Starting new Broadcast handler")
@@ -56,7 +117,13 @@ func (s *server) Broadcast(srv ab.AtomicBroadcast_BroadcastServer) error {
56117
}
57118
logger.Debugf("Closing Broadcast stream")
58119
}()
59-
return s.bh.Handle(srv)
120+
return s.bh.Handle(&broadcastMsgTracer{
121+
AtomicBroadcast_BroadcastServer: srv,
122+
msgTracer: msgTracer{
123+
debug: s.debug,
124+
function: "Broadcast",
125+
},
126+
})
60127
}
61128

62129
// Deliver sends a stream of blocks to a client after ordering
@@ -68,5 +135,11 @@ func (s *server) Deliver(srv ab.AtomicBroadcast_DeliverServer) error {
68135
}
69136
logger.Debugf("Closing Deliver stream")
70137
}()
71-
return s.dh.Handle(srv)
138+
return s.dh.Handle(&deliverMsgTracer{
139+
AtomicBroadcast_DeliverServer: srv,
140+
msgTracer: msgTracer{
141+
debug: s.debug,
142+
function: "Deliver",
143+
},
144+
})
72145
}

orderer/common/server/server_test.go

Lines changed: 116 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,9 +7,26 @@ SPDX-License-Identifier: Apache-2.0
77
package server
88

99
import (
10+
"io/ioutil"
11+
"os"
12+
"path/filepath"
1013
"testing"
14+
"time"
15+
16+
localconfig "github.com/hyperledger/fabric/orderer/common/localconfig"
17+
cb "github.com/hyperledger/fabric/protos/common"
18+
ab "github.com/hyperledger/fabric/protos/orderer"
19+
"github.com/hyperledger/fabric/protos/utils"
20+
21+
logging "github.com/op/go-logging"
22+
"github.com/stretchr/testify/assert"
23+
"google.golang.org/grpc"
1124
)
1225

26+
func init() {
27+
logging.SetLevel(logging.DEBUG, "")
28+
}
29+
1330
func TestBroadcastNoPanic(t *testing.T) {
1431
// Defer recovers from the panic
1532
_ = (&server{}).Broadcast(nil)
@@ -19,3 +36,102 @@ func TestDeliverNoPanic(t *testing.T) {
1936
// Defer recovers from the panic
2037
_ = (&server{}).Deliver(nil)
2138
}
39+
40+
type recvr interface {
41+
Recv() (*cb.Envelope, error)
42+
}
43+
44+
type mockSrv struct {
45+
grpc.ServerStream
46+
msg *cb.Envelope
47+
err error
48+
}
49+
50+
type mockBroadcastSrv mockSrv
51+
52+
func (mbs *mockBroadcastSrv) Recv() (*cb.Envelope, error) {
53+
return mbs.msg, mbs.err
54+
}
55+
56+
func (mb *mockBroadcastSrv) Send(br *ab.BroadcastResponse) error {
57+
panic("Unimplimented")
58+
}
59+
60+
type mockDeliverSrv mockSrv
61+
62+
func (mds *mockDeliverSrv) Recv() (*cb.Envelope, error) {
63+
return mds.msg, mds.err
64+
}
65+
66+
func (mds *mockDeliverSrv) Send(br *ab.DeliverResponse) error {
67+
panic("Unimplimented")
68+
}
69+
70+
func testMsgTrace(handler func(dir string, msg *cb.Envelope) recvr, t *testing.T) {
71+
dir, err := ioutil.TempDir("", "TestMsgTrace")
72+
if err != nil {
73+
t.Fatalf("Could not create temp dir")
74+
}
75+
defer os.RemoveAll(dir)
76+
77+
msg := &cb.Envelope{Payload: []byte("somedata")}
78+
79+
r := handler(dir, msg)
80+
81+
rMsg, err := r.Recv()
82+
assert.Equal(t, msg, rMsg)
83+
assert.Nil(t, err)
84+
85+
var fileData []byte
86+
for i := 0; i < 100; i++ {
87+
// Writing the trace file is deliberately non-blocking, wait up to a second, checking every 10 ms to see if the file now exists.
88+
time.Sleep(10 * time.Millisecond)
89+
filepath.Walk(dir, func(path string, info os.FileInfo, err error) error {
90+
assert.Nil(t, err)
91+
if path == dir {
92+
return nil
93+
}
94+
assert.Nil(t, fileData, "Should only be one file")
95+
fileData, err = ioutil.ReadFile(path)
96+
assert.Nil(t, err)
97+
return nil
98+
})
99+
if fileData != nil {
100+
break
101+
}
102+
}
103+
104+
assert.Equal(t, utils.MarshalOrPanic(msg), fileData)
105+
}
106+
107+
func TestBroadcastMsgTrace(t *testing.T) {
108+
testMsgTrace(func(dir string, msg *cb.Envelope) recvr {
109+
return &broadcastMsgTracer{
110+
AtomicBroadcast_BroadcastServer: &mockBroadcastSrv{
111+
msg: msg,
112+
},
113+
msgTracer: msgTracer{
114+
debug: &localconfig.Debug{
115+
BroadcastTraceDir: dir,
116+
},
117+
function: "Broadcast",
118+
},
119+
}
120+
}, t)
121+
}
122+
123+
func TestDeliverMsgTrace(t *testing.T) {
124+
testMsgTrace(func(dir string, msg *cb.Envelope) recvr {
125+
return &deliverMsgTracer{
126+
AtomicBroadcast_DeliverServer: &mockDeliverSrv{
127+
msg: msg,
128+
},
129+
msgTracer: msgTracer{
130+
debug: &localconfig.Debug{
131+
DeliverTraceDir: dir,
132+
},
133+
function: "Deliver",
134+
},
135+
}
136+
}, t)
137+
}

protos/peer/proposal.pb.go

Lines changed: 1 addition & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

sampleconfig/orderer.yaml

Lines changed: 18 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -221,6 +221,23 @@ Kafka:
221221
# following "File" key and specify the file name from which to load the
222222
# value of RootCAs.
223223
#File: path/to/RootCAs
224-
225224
# Kafka version of the Kafka cluster brokers (defaults to 0.9.0.1)
226225
Version:
226+
#
227+
228+
################################################################################
229+
#
230+
# Debug Configuration
231+
#
232+
# - This controls the debugging options for the orderer
233+
#
234+
################################################################################
235+
Debug:
236+
237+
# BroadcasTraceDir when set will cause each request to the Broadcast service
238+
# for this orderer to be written to a file in this directory
239+
BroadcastTraceDir:
240+
241+
# DeliverTraceDir when set will cause each request to the Deliver service
242+
# for this orderer to be written to a file in this directory
243+
DeliverTraceDir:

0 commit comments

Comments
 (0)