-
Notifications
You must be signed in to change notification settings - Fork 14.2k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
KAFKA-17108: Earliest pending upload offset #16584
base: trunk
Are you sure you want to change the base?
Conversation
b4f63a6
to
d89d53b
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for the patch! Left minor comments to address.
Should we need to add an OffsetSpec on the client side to make the earliest_pending_upload_offset
queryable from command line tools similar to 6e32448?
val epochResult: Optional[Integer] = | ||
if (leaderEpochCache.isDefined) { | ||
val epochOpt = leaderEpochCache.get.epochForOffset(earliestPendingUploadOffset) | ||
if (epochOpt.isPresent) { | ||
Optional.of(epochOpt.getAsInt) | ||
} else { | ||
Optional.empty() | ||
} | ||
} else { | ||
Optional.empty() | ||
} | ||
Some(new TimestampAndOffset(RecordBatch.NO_TIMESTAMP, earliestPendingUploadOffset, epochResult)) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
val epochResult: Optional[Integer] = | |
if (leaderEpochCache.isDefined) { | |
val epochOpt = leaderEpochCache.get.epochForOffset(earliestPendingUploadOffset) | |
if (epochOpt.isPresent) { | |
Optional.of(epochOpt.getAsInt) | |
} else { | |
Optional.empty() | |
} | |
} else { | |
Optional.empty() | |
} | |
Some(new TimestampAndOffset(RecordBatch.NO_TIMESTAMP, earliestPendingUploadOffset, epochResult)) | |
val epochResult = leaderEpochCache.flatMap(_.epochForOffset(earliestPendingUploadOffset).asScala) | |
.asJava.asInstanceOf[Optional[Integer]] | |
Some(new TimestampAndOffset(RecordBatch.NO_TIMESTAMP, earliestPendingUploadOffset, epochResult)) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done. Thanks for the suggestion.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
There are other places in the same method can benefit from this refactor as well like line 1319 and 1298
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Made the requested changes. Also changed for LATEST_TIMESTAMP
|
||
@Test | ||
def testFetchEarliestPendingUploadTimestampWithRemoteStorage(): Unit = { | ||
val remoteLogManager = mock(classOf[RemoteLogManager]) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
we can refactor the test to extract the "prepare" phase which is duplicated across multiple tests.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done
e40dd55
to
5684997
Compare
@@ -1334,6 +1334,27 @@ class UnifiedLog(@volatile var logStartOffset: Long, | |||
} else { | |||
Some(new TimestampAndOffset(RecordBatch.NO_TIMESTAMP, -1L, Optional.of(-1))) | |||
} | |||
} else if (targetTimestamp == ListOffsetsRequest.EARLIEST_PENDING_UPLOAD_OFFSET_TIMESTAMP) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can we extract this into different method and call it from here especially that the logic here has too many if stataments ? I know the rest of fetchOffsetByTimestamp has been done in-place but the method has grown and it hard to follow.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Overall LGTM. Left one minor comment to address. Thanks for covering the patch with detailed unit tests!
@@ -49,6 +49,8 @@ public class ListOffsetsRequest extends AbstractRequest { | |||
|
|||
public static final long LATEST_TIERED_TIMESTAMP = -5L; | |||
|
|||
public static final long EARLIEST_PENDING_UPLOAD_OFFSET_TIMESTAMP = -6L; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
consider renaming it to EARLIEST_PENDING_UPLOAD_TIMESTAMP
for uniformity with other names.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done
5684997
to
bff7bf8
Compare
bff7bf8
to
5e548a3
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Please rebase this PR with trunk. |
This PR is being marked as stale since it has not had any activity in 90 days. If you If you are having difficulty finding a reviewer, please reach out on the [mailing list](https://kafka.apache.org/contact). If this PR is no longer valid or desired, please feel free to close it. If no activity occurs in the next 30 days, it will be automatically closed. |
This is the first part of the implementation of KIP-1023
The purpose of this pull request is for the broker to start returning the correct offset when it receives a -6 as a timestamp in a ListOffsets API request.
Added unit tests for the new timestamp.
Committer Checklist (excluded from commit message)