@@ -7,7 +7,9 @@ SPDX-License-Identifier: Apache-2.0
7
7
package kafka
8
8
9
9
import (
10
+ "bytes"
10
11
"fmt"
12
+ "os"
11
13
"testing"
12
14
"time"
13
15
@@ -135,3 +137,63 @@ func TestEventListener(t *testing.T) {
135
137
}
136
138
}
137
139
}
140
+
141
+ func TestLogPossibleKafkaVersionMismatch (t * testing.T ) {
142
+
143
+ logging .SetLevel (logging .DEBUG , saramaLogID )
144
+
145
+ topic := channelNameForTest (t )
146
+ partition := int32 (0 )
147
+
148
+ var buffer bytes.Buffer
149
+ logger .SetBackend (logging .AddModuleLevel (
150
+ logging .MultiLogger (
151
+ logging .NewBackendFormatter (
152
+ logging .NewLogBackend (os .Stderr , "" , 0 ),
153
+ logging .MustStringFormatter ("%{color}%{time:2006-01-02 15:04:05.000 MST} [%{module}] %{shortfunc} -> %{level:.4s} %{id:03x}%{color:reset} %{message}" ),
154
+ ),
155
+ logging .NewLogBackend (& buffer , "" , 0 ),
156
+ ),
157
+ ))
158
+ defer logging .Reset ()
159
+
160
+ broker := sarama .NewMockBroker (t , 500 )
161
+ defer broker .Close ()
162
+
163
+ config := sarama .NewConfig ()
164
+ config .ClientID = t .Name ()
165
+ config .Metadata .Retry .Max = 0
166
+ config .Metadata .Retry .Backoff = 250 * time .Millisecond
167
+ config .Net .ReadTimeout = 100 * time .Millisecond
168
+ config .Version = sarama .V0_10_0_0
169
+
170
+ broker .SetHandlerByMap (map [string ]sarama.MockResponse {
171
+ "MetadataRequest" : sarama .NewMockMetadataResponse (t ).
172
+ SetBroker (broker .Addr (), broker .BrokerID ()).
173
+ SetLeader (topic , partition , broker .BrokerID ()),
174
+ "OffsetRequest" : sarama .NewMockOffsetResponse (t ).
175
+ SetOffset (topic , partition , sarama .OffsetNewest , 1000 ).
176
+ SetOffset (topic , partition , sarama .OffsetOldest , 0 ),
177
+ "FetchRequest" : sarama .NewMockFetchResponse (t , 1 ).
178
+ SetMessage (topic , partition , 0 , sarama .StringEncoder ("MSG 00" )),
179
+ })
180
+
181
+ consumer , err := sarama .NewConsumer ([]string {broker .Addr ()}, config )
182
+ if err != nil {
183
+ t .Fatal (err )
184
+ }
185
+ defer consumer .Close ()
186
+
187
+ partitionConsumer , err := consumer .ConsumePartition (topic , partition , 1 )
188
+ if err != nil {
189
+ t .Fatal (err )
190
+ }
191
+ defer partitionConsumer .Close ()
192
+
193
+ select {
194
+ case <- partitionConsumer .Messages ():
195
+ t .Fatalf ("did not expect to receive message" )
196
+ case <- time .After (shortTimeout ):
197
+ assert .Regexp (t , "Kafka.Version specified in the orderer configuration is incorrectly set" , buffer .String ())
198
+ }
199
+ }
0 commit comments