Skip to content

Commit 5b2bcca

Browse files
author
Jamie Chapman-Brown
committed
fix: correct lag offset
1 parent d1386df commit 5b2bcca

File tree

2 files changed

+2
-4
lines changed

2 files changed

+2
-4
lines changed

extensions-contrib/rabbit-stream-indexing-service/src/main/java/org/apache/druid/indexing/rabbitstream/RabbitStreamIndexTaskIOConfig.java

-2
Original file line numberDiff line numberDiff line change
@@ -45,8 +45,6 @@ public RabbitStreamIndexTaskIOConfig(
4545
// backward
4646
// compabitility
4747
@JsonProperty("baseSequenceName") String baseSequenceName,
48-
// startSequenceNumbers and endSequenceNumbers must be set for new
49-
// versions
5048
@JsonProperty("startSequenceNumbers") SeekableStreamStartSequenceNumbers<String, Long> startSequenceNumbers,
5149
@JsonProperty("endSequenceNumbers") SeekableStreamEndSequenceNumbers<String, Long> endSequenceNumbers,
5250
@JsonProperty("consumerProperties") Map<String, Object> consumerProperties,

extensions-contrib/rabbit-stream-indexing-service/src/main/java/org/apache/druid/indexing/rabbitstream/supervisor/RabbitStreamSupervisor.java

+2-2
Original file line numberDiff line numberDiff line change
@@ -279,7 +279,7 @@ private Map<String, Long> getRecordLagPerPartitionInLatestSequences(Map<String,
279279
Collectors.toMap(
280280
Entry::getKey,
281281
e -> e.getValue() != null
282-
? e.getValue() - Optional.ofNullable(currentOffsets.get(e.getKey())).orElse(0L)
282+
? e.getValue() + 1 - Optional.ofNullable(currentOffsets.get(e.getKey())).orElse(0L)
283283
: 0));
284284
}
285285

@@ -303,7 +303,7 @@ protected Map<String, Long> getRecordLagPerPartition(Map<String, Long> currentOf
303303
Collectors.toMap(
304304
Entry::getKey,
305305
e -> e.getValue() != null
306-
? latestSequenceFromStream.get(e.getKey()) - e.getValue()
306+
? latestSequenceFromStream.get(e.getKey()) + 1 - e.getValue()
307307
: 0));
308308
}
309309

0 commit comments

Comments
 (0)