Skip to content
This repository was archived by the owner on Jul 15, 2023. It is now read-only.

Commit b87b743

Browse files
committed
make new offset behavior standard
* remove config setting for NewCommitBehavior * use committed offset as consumed offset when constructing PartitionTopicInfo as having this being committed - 1 is causing errors on first fetch... apparently this should be the same as the first fetched offset or it will indicate potential data loss.
1 parent c3dc44a commit b87b743

File tree

3 files changed

+3
-14
lines changed

3 files changed

+3
-14
lines changed

src/KafkaNET.Library/Cfg/ConsumerConfiguration.cs

Lines changed: 0 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -76,8 +76,6 @@ public class ConsumerConfiguration
7676

7777
public const int DefaultConsumeGroupFindNewLeaderSleepIntervalMs = 2000;
7878

79-
public const bool DefaultNewCommitBehavior = false;
80-
8179
#region Constructor
8280
public ConsumerConfiguration()
8381
{
@@ -101,7 +99,6 @@ public ConsumerConfiguration()
10199
this.Verbose = false;
102100
this.ConsumeGroupRebalanceRetryIntervalMs = DefaultConsumeGroupRebalanceRetryIntervalMs;
103101
this.ConsumeGroupFindNewLeaderSleepIntervalMs = DefaultConsumeGroupFindNewLeaderSleepIntervalMs;
104-
this.NewCommitBehavior = DefaultNewCommitBehavior;
105102
}
106103

107104
private string GetHostName()
@@ -354,13 +351,6 @@ public int MaxFetchSize
354351
/// </summary>
355352
public int MaxFetchFactor { get; set; }
356353

357-
/// <summary>
358-
/// Whether to use the new offset commit behaviour of the next fetch offset
359-
/// or the traditional behaviour of the consumed offset.
360-
/// Default = false.
361-
/// </summary>
362-
public bool NewCommitBehavior { get; set; }
363-
364354
#endregion
365355

366356
private static void Validate(ConsumerConfigurationSection config)

src/KafkaNET.Library/Consumers/ZookeeperConsumerConnector.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -142,7 +142,7 @@ public void CommitOffsets()
142142
var topicDirs = new ZKGroupTopicDirs(this.config.GroupId, topic.Key);
143143
foreach (KeyValuePair<int, PartitionTopicInfo> partition in topic.Value)
144144
{
145-
var newOffset = partition.Value.ConsumeOffset + (config.NewCommitBehavior ? 1 : 0);
145+
var newOffset = partition.Value.ConsumeOffset + 1;
146146
try
147147
{
148148
if (partition.Value.ConsumeOffsetValid)

src/KafkaNET.Library/ZooKeeperIntegration/Listeners/ZKRebalancerListener.cs

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -520,20 +520,19 @@ private void AddPartitionTopicInfo(ZKGroupTopicDirs topicDirs, string partition,
520520
{
521521
offsetCommited = long.Parse(offsetString);
522522
long latestOffset = this.EarliestOrLatestOffset(topic, leader, partitionId, OffsetRequest.LatestTime);
523-
offset = config.NewCommitBehavior ? offsetCommited : offsetCommited + 1;
523+
offset = offsetCommited;
524524
offset = Math.Min(offset, latestOffset);
525525
Logger.InfoFormat("Final offset {0} for topic {1} partition {2} OffsetCommited {3} latestOffset {4}"
526526
, offset, topic, partition, offsetCommited, latestOffset);
527527
}
528528

529-
var offsetConsumed = config.NewCommitBehavior ? offsetCommited - 1 : offsetCommited;
530529
var queue = this.queues[new Tuple<string, string>(topic, consumerThreadId)];
531530
var partTopicInfo = new PartitionTopicInfo(
532531
topic,
533532
leader,
534533
partitionId,
535534
queue,
536-
offsetConsumed,
535+
offset,
537536
offset,
538537
offset,
539538
this.config.FetchSize,

0 commit comments

Comments
 (0)