Skip to content

Commit e66a354

Browse files
authored
Merge pull request #1 from catawiki/sav_include_clean_offsets
include clean offsets on stable 0.6
2 parents ae24a0e + 7c3bfcb commit e66a354

File tree

2 files changed

+11
-1
lines changed

2 files changed

+11
-1
lines changed

lib/kafka/consumer.rb

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -439,13 +439,15 @@ def join_group
439439
if old_generation_id && @group.generation_id != old_generation_id + 1
440440
# We've been out of the group for at least an entire generation, no
441441
# sense in trying to hold on to offset data
442+
clear_current_offsets
442443
@offset_manager.clear_offsets
443444
else
444445
# After rejoining the group we may have been assigned a new set of
445446
# partitions. Keeping the old offset commits around forever would risk
446447
# having the consumer go back and reprocess messages if it's assigned
447448
# a partition it used to be assigned to way back. For that reason, we
448449
# only keep commits for the partitions that we're still assigned.
450+
clear_current_offsets(excluding: @group.assigned_partitions)
449451
@offset_manager.clear_offsets_excluding(@group.assigned_partitions)
450452
end
451453

@@ -532,5 +534,13 @@ def fetch_batches
532534
def pause_for(topic, partition)
533535
@pauses[topic][partition]
534536
end
537+
538+
def clear_current_offsets(excluding: {})
539+
@current_offsets.each do |topic, partitions|
540+
partitions.keep_if do |partition, _|
541+
excluding.fetch(topic, []).include?(partition)
542+
end
543+
end
544+
end
535545
end
536546
end

lib/kafka/version.rb

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
# frozen_string_literal: true
22

33
module Kafka
4-
VERSION = "0.6.8"
4+
VERSION = "0.6.10-beta1"
55
end

0 commit comments

Comments
 (0)