fix negative lag metircs issue + improve API design for parition lag #17060
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
Fixes #XXXX.
org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervisor org.apache.druid.indexing.kafka.supervisor.KafkaSupervisor org.apache.druid.indexing.kinesis.supervisor.KinesisSupervisor.java org.apache.druid.indexing.rabbitstream.supervisor.RabbitStreamSupervisor
Description
Problem 1: negtive lag issue for 2 scenarios
S1: No issue for kafka, but there is occasional negative lag due to thread-safe issue. (we should fix this issue)
S2: If can't connect to kafka or kafka connection was broken, we can see negative lag. (negative is helpful to application, should not be skipped)
skip emitting if there was negative lag, this is bad idea.
For S1, looks no impact on data ingestion, only emitter metrics is missing at few time points.
For S2, some companies already build monitoring based on the negative partition lag, if negative lag was not reported, their monitor will not work.
Back to the negative lag issue, I think we should fix it instead of skipping.
For S1, why there was negative lag?
In class
SeekableStreamSupervisor
:updateCurrentAndLatestOffsets() : PT30S
--updateCurrentOffsets() -> get task reading offset : OFFSET1
--updatePartitionLagFromStream() -> get partition writing end offset : OFFSET2
in another thread:
/druid/indexer/v1/supervisor//status -> SeekableStreamSupervisor::getStatus() -> SeekableStreamSupervisor::generateReport() -> calculation lag = OFFSET2 - OFFSET1
When the negative lag issue happend?
So the idea is we can make the OFFSET1 & OFFSET2 have the same version.
=================================================================================
Problem 2: Bad design for getPartitionRecordLag() & getPartitionTimeLag()
We want to support 2 kinds of partition lag, but like KafkaSupervisor, only need record partition lag. Like KinesisSupervisor, only need time partition lag.
For expansibility, if we want to add another type of partition lag, do we plan to add another method like: getPartition*Lag() ?
And all the existing *Supervisor class need to implements the new method, sounds not make sense.
From the design pattern side, provide 1 method with returned type is better than provide 2 separated methods.
vs
Fixed the bug ...
Renamed the class ...
Added a forbidden-apis entry ...
Release note
Key changed/added classes in this PR
MyFoo
OurBar
TheirBaz
This PR has: