Skip to content

Commit 4b419a6

Browse files
author
Luis Sanchez
committed
[FAB-7887] log hint of Kafka.Version mistach
Log a hint that the Kafka.Version property might not be appropriate for the Kafka broker that orderer is using. Change-Id: Ib741ccf8b07a1775ddcd921fd654fa8ee8419216 Signed-off-by: Luis Sanchez <sanchezl@us.ibm.com>
1 parent 53264cb commit 4b419a6

File tree

2 files changed

+78
-0
lines changed

2 files changed

+78
-0
lines changed

orderer/consensus/kafka/logger.go

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,22 @@ func init() {
4444
saramaLogger = saramaEventLogger
4545
}
4646

47+
// init starts a go routine that detects a possible configuration issue
48+
func init() {
49+
listener := saramaLogger.NewListener("insufficient data to decode packet")
50+
go func() {
51+
for {
52+
select {
53+
case <-listener:
54+
logger.Critical("Unable to decode a Kafka packet. Usually, this " +
55+
"indicates that the Kafka.Version specified in the orderer " +
56+
"configuration is incorrectly set to a version which is newer than " +
57+
"the actual Kafka broker version.")
58+
}
59+
}
60+
}()
61+
}
62+
4763
// eventLogger adapts a go-logging Logger to the sarama.Logger interface.
4864
// Additionally, listeners can be registered to be notified when a substring has
4965
// been logged.

orderer/consensus/kafka/logger_test.go

Lines changed: 62 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,9 @@ SPDX-License-Identifier: Apache-2.0
77
package kafka
88

99
import (
10+
"bytes"
1011
"fmt"
12+
"os"
1113
"testing"
1214
"time"
1315

@@ -135,3 +137,63 @@ func TestEventListener(t *testing.T) {
135137
}
136138
}
137139
}
140+
141+
func TestLogPossibleKafkaVersionMismatch(t *testing.T) {
142+
143+
logging.SetLevel(logging.DEBUG, saramaLogID)
144+
145+
topic := channelNameForTest(t)
146+
partition := int32(0)
147+
148+
var buffer bytes.Buffer
149+
logger.SetBackend(logging.AddModuleLevel(
150+
logging.MultiLogger(
151+
logging.NewBackendFormatter(
152+
logging.NewLogBackend(os.Stderr, "", 0),
153+
logging.MustStringFormatter("%{color}%{time:2006-01-02 15:04:05.000 MST} [%{module}] %{shortfunc} -> %{level:.4s} %{id:03x}%{color:reset} %{message}"),
154+
),
155+
logging.NewLogBackend(&buffer, "", 0),
156+
),
157+
))
158+
defer logging.Reset()
159+
160+
broker := sarama.NewMockBroker(t, 500)
161+
defer broker.Close()
162+
163+
config := sarama.NewConfig()
164+
config.ClientID = t.Name()
165+
config.Metadata.Retry.Max = 0
166+
config.Metadata.Retry.Backoff = 250 * time.Millisecond
167+
config.Net.ReadTimeout = 100 * time.Millisecond
168+
config.Version = sarama.V0_10_0_0
169+
170+
broker.SetHandlerByMap(map[string]sarama.MockResponse{
171+
"MetadataRequest": sarama.NewMockMetadataResponse(t).
172+
SetBroker(broker.Addr(), broker.BrokerID()).
173+
SetLeader(topic, partition, broker.BrokerID()),
174+
"OffsetRequest": sarama.NewMockOffsetResponse(t).
175+
SetOffset(topic, partition, sarama.OffsetNewest, 1000).
176+
SetOffset(topic, partition, sarama.OffsetOldest, 0),
177+
"FetchRequest": sarama.NewMockFetchResponse(t, 1).
178+
SetMessage(topic, partition, 0, sarama.StringEncoder("MSG 00")),
179+
})
180+
181+
consumer, err := sarama.NewConsumer([]string{broker.Addr()}, config)
182+
if err != nil {
183+
t.Fatal(err)
184+
}
185+
defer consumer.Close()
186+
187+
partitionConsumer, err := consumer.ConsumePartition(topic, partition, 1)
188+
if err != nil {
189+
t.Fatal(err)
190+
}
191+
defer partitionConsumer.Close()
192+
193+
select {
194+
case <-partitionConsumer.Messages():
195+
t.Fatalf("did not expect to receive message")
196+
case <-time.After(shortTimeout):
197+
assert.Regexp(t, "Kafka.Version specified in the orderer configuration is incorrectly set", buffer.String())
198+
}
199+
}

0 commit comments

Comments
 (0)