Skip to content

Commit

Permalink
[fix] [cli] the variable producerName of BatchMsgContainer is null (#…
Browse files Browse the repository at this point in the history
…20819)

Motivation: If the producer name is generated by the Broker, the producer will update the variable `producerName` after connecting, but not update the same variable of the batch message container.

Modifications: fix bug
(cherry picked from commit aba50f2)
  • Loading branch information
poorbarcode committed Jul 18, 2023
1 parent 721275a commit 7ae3e8f
Show file tree
Hide file tree
Showing 4 changed files with 17 additions and 10 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1598,11 +1598,12 @@ private void printSendCommandDebug(CommandSend send, ByteBuf headersAndPayload)
headersAndPayload.resetReaderIndex();
if (log.isDebugEnabled()) {
log.debug("[{}] Received send message request. producer: {}:{} {}:{} size: {},"
+ " partition key is: {}, ordering key is {}",
+ " partition key is: {}, ordering key is {}, uncompressedSize is {}",
remoteAddress, send.getProducerId(), send.getSequenceId(), msgMetadata.getProducerName(),
msgMetadata.getSequenceId(), headersAndPayload.readableBytes(),
msgMetadata.hasPartitionKey() ? msgMetadata.getPartitionKey() : null,
msgMetadata.hasOrderingKey() ? msgMetadata.getOrderingKey() : null);
msgMetadata.hasOrderingKey() ? msgMetadata.getOrderingKey() : null,
msgMetadata.getUncompressedSize());
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,6 @@ public abstract class AbstractBatchMessageContainer implements BatchMessageConta
protected CompressionType compressionType;
protected CompressionCodec compressor;
protected String topicName;
protected String producerName;
protected ProducerImpl producer;

protected int maxNumMessagesInBatch;
Expand Down Expand Up @@ -98,7 +97,6 @@ public ProducerImpl.OpSendMsg createOpSendMsg() throws IOException {
public void setProducer(ProducerImpl<?> producer) {
this.producer = producer;
this.topicName = producer.getTopic();
this.producerName = producer.getProducerName();
this.compressionType = CompressionCodecProvider
.convertToWireProtocol(producer.getConfiguration().getCompressionType());
this.compressor = CompressionCodecProvider.getCompressionCodec(compressionType);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -87,8 +87,8 @@ public BatchMessageContainerImpl(ProducerImpl<?> producer) {
public boolean add(MessageImpl<?> msg, SendCallback callback) {

if (log.isDebugEnabled()) {
log.debug("[{}] [{}] add message to batch, num messages in batch so far {}", topicName, producerName,
numMessagesInBatch);
log.debug("[{}] [{}] add message to batch, num messages in batch so far {}", topicName,
producer.getProducerName(), numMessagesInBatch);
}

if (++numMessagesInBatch == 1) {
Expand Down Expand Up @@ -205,8 +205,8 @@ public void discard(Exception ex) {
batchedMessageMetadataAndPayload = null;
}
} catch (Throwable t) {
log.warn("[{}] [{}] Got exception while completing the callback for msg {}:", topicName, producerName,
lowestSequenceId, t);
log.warn("[{}] [{}] Got exception while completing the callback for msg {}:", topicName,
producer.getProducerName(), lowestSequenceId, t);
}
clear();
}
Expand Down Expand Up @@ -271,6 +271,14 @@ public OpSendMsg createOpSendMsg() throws IOException {
ByteBufPair cmd = producer.sendMessage(producer.producerId, messageMetadata.getSequenceId(),
messageMetadata.getHighestSequenceId(), numMessagesInBatch, messageMetadata, encryptedPayload);

if (log.isDebugEnabled()) {
log.debug("[{}] [{}] Build batch msg seq:{}, highest-seq:{}, numMessagesInBatch: {}, uncompressedSize: {},"
+ " payloadSize: {}", topicName, producer.getProducerName(),
messageMetadata.getSequenceId(), messageMetadata.getNumMessagesInBatch(),
messageMetadata.getHighestSequenceId(),
messageMetadata.getUncompressedSize(), encryptedPayload.readableBytes());
}

OpSendMsg op = OpSendMsg.create(messages, cmd, messageMetadata.getSequenceId(),
messageMetadata.getHighestSequenceId(), firstCallback);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,8 +43,8 @@ class BatchMessageKeyBasedContainer extends AbstractBatchMessageContainer {
@Override
public boolean add(MessageImpl<?> msg, SendCallback callback) {
if (log.isDebugEnabled()) {
log.debug("[{}] [{}] add message to batch, num messages in batch so far is {}", topicName, producerName,
numMessagesInBatch);
log.debug("[{}] [{}] add message to batch, num messages in batch so far is {}", topicName,
producer.getProducerName(), numMessagesInBatch);
}
String key = getKey(msg);
final BatchMessageContainerImpl batchMessageContainer = batches.computeIfAbsent(key,
Expand Down

0 comments on commit 7ae3e8f

Please sign in to comment.