Skip to content

Commit b5cfc4b

Browse files
committed
adds debugging for CorruptMessage errors
1 parent b12c01a commit b5cfc4b

File tree

2 files changed

+48
-1
lines changed

2 files changed

+48
-1
lines changed

lib/kafka/broker.rb

Lines changed: 23 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -71,7 +71,29 @@ def list_offsets(**options)
7171
def produce(**options)
7272
request = Protocol::ProduceRequest.new(**options)
7373

74-
send_request(request)
74+
resp = send_request(request)
75+
76+
return if resp.nil?
77+
78+
errored_partitions = []
79+
resp.topics.each do |topic_info|
80+
errored = topic_info.partitions.find {|part_info| part_info.error_code == 2 }
81+
errored_partitions << errored unless errored.nil?
82+
end
83+
84+
errored_partitions.each do |partition_info|
85+
@logger.error "Corrupt message code received for #{request} to partition #{partition_info.partition}"
86+
87+
# {"topic" => {"partition" => [message, message] }}
88+
request.messages_for_topics.each do |topic, messages_for_partition|
89+
messages_for_partition.fetch(partition_info.partition, []).each do |message|
90+
@logger.error "Message key: #{message.key}"
91+
@logger.error "Message value: #{message.value}"
92+
end
93+
end
94+
end
95+
96+
resp
7597
end
7698

7799
def fetch_offsets(**options)

spec/broker_spec.rb

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -69,6 +69,31 @@ def close
6969
describe "#produce" do
7070
let(:message) { Kafka::Protocol::Message.new(key: "yo", value: "lo") }
7171

72+
# This spec's only function is to exercise the error handling code path
73+
it "gives me a console if a partition is in error" do
74+
response = Kafka::Protocol::ProduceResponse.new
75+
errored_partition_info = Kafka::Protocol::ProduceResponse::PartitionInfo.new(
76+
partition: 3, error_code: 2, offset: 0, timestamp: Time.now.to_s
77+
)
78+
topic_info = Kafka::Protocol::ProduceResponse::TopicInfo.new(
79+
topic: "articles", partitions: [errored_partition_info]
80+
)
81+
allow(response).to receive(:topics) { [topic_info] }
82+
connection.mock_response(response)
83+
84+
actual_response = broker.produce(
85+
required_acks: -1, # -1 means all replicas must ack
86+
timeout: 1,
87+
messages_for_topics: {
88+
"yolos" => {
89+
3 => [message],
90+
}
91+
}
92+
)
93+
94+
expect(actual_response).to eq response
95+
96+
end
7297
it "waits for a response if acknowledgements are required" do
7398
response = Kafka::Protocol::ProduceResponse.new
7499
connection.mock_response(response)

0 commit comments

Comments
 (0)