Skip to content

Commit

Permalink
Calling internalFlushCurrentBatch will be forwaded to internalFlushCu…
Browse files Browse the repository at this point in the history
…rrentBatches if the associated batch builder contains multi batches(KeyBasedBatchBuilder).

Doing transparent forwarding could centralized the branching logic at only one place.
  • Loading branch information
Karma Shi committed Mar 17, 2022
1 parent bb2fa81 commit 2970451
Showing 1 changed file with 13 additions and 20 deletions.
33 changes: 13 additions & 20 deletions pulsar/producer_partition.go
Original file line number Diff line number Diff line change
Expand Up @@ -383,11 +383,7 @@ func (p *partitionProducer) runEventsLoop() {
return
}
case <-p.batchFlushTicker.C:
if p.batchBuilder.IsMultiBatches() {
p.internalFlushCurrentBatches()
} else {
p.internalFlushCurrentBatch()
}
p.internalFlushCurrentBatch()
}
}
}
Expand Down Expand Up @@ -478,11 +474,8 @@ func (p *partitionProducer) internalSend(request *sendRequest) {
msg.ReplicationClusters, deliverAt)
if !added {
// The current batch is full.. flush it and retry
if p.batchBuilder.IsMultiBatches() {
p.internalFlushCurrentBatches()
} else {
p.internalFlushCurrentBatch()
}

p.internalFlushCurrentBatch()

// after flushing try again to add the current payload
if ok := p.batchBuilder.Add(smm, p.sequenceIDGenerator, payload, request,
Expand All @@ -497,11 +490,9 @@ func (p *partitionProducer) internalSend(request *sendRequest) {
}

if !sendAsBatch || request.flushImmediately {
if p.batchBuilder.IsMultiBatches() {
p.internalFlushCurrentBatches()
} else {
p.internalFlushCurrentBatch()
}

p.internalFlushCurrentBatch()

}
}

Expand All @@ -515,6 +506,11 @@ type pendingItem struct {
}

func (p *partitionProducer) internalFlushCurrentBatch() {
if p.batchBuilder.IsMultiBatches() {
p.internalFlushCurrentBatches()
return
}

batchData, sequenceID, callbacks, err := p.batchBuilder.Flush()
if batchData == nil {
return
Expand Down Expand Up @@ -676,11 +672,8 @@ func (p *partitionProducer) internalFlushCurrentBatches() {
}

func (p *partitionProducer) internalFlush(fr *flushRequest) {
if p.batchBuilder.IsMultiBatches() {
p.internalFlushCurrentBatches()
} else {
p.internalFlushCurrentBatch()
}

p.internalFlushCurrentBatch()

pi, ok := p.pendingQueue.PeekLast().(*pendingItem)
if !ok {
Expand Down

0 comments on commit 2970451

Please sign in to comment.