Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

KAFKA-17108: Earliest pending upload offset #16584

Open
wants to merge 1 commit into
base: trunk
Choose a base branch
from

Conversation

abhijeetk88
Copy link
Contributor

@abhijeetk88 abhijeetk88 commented Jul 12, 2024

This is the first part of the implementation of KIP-1023

The purpose of this pull request is for the broker to start returning the correct offset when it receives a -6 as a timestamp in a ListOffsets API request.

Added unit tests for the new timestamp.

Committer Checklist (excluded from commit message)

  • Verify design and implementation
  • Verify test coverage and CI build status
  • Verify documentation (including upgrade notes)

@abhijeetk88 abhijeetk88 marked this pull request as draft July 12, 2024 17:18
@abhijeetk88 abhijeetk88 force-pushed the KAFKA-17108 branch 6 times, most recently from b4f63a6 to d89d53b Compare August 5, 2024 06:26
@abhijeetk88 abhijeetk88 marked this pull request as ready for review August 5, 2024 06:30
@kamalcph kamalcph added the tiered-storage Related to the Tiered Storage feature label Aug 5, 2024
@kamalcph kamalcph self-requested a review August 5, 2024 07:55
Copy link
Contributor

@kamalcph kamalcph left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the patch! Left minor comments to address.

Should we need to add an OffsetSpec on the client side to make the earliest_pending_upload_offset queryable from command line tools similar to 6e32448?

Comment on lines 1352 to 1353
val epochResult: Optional[Integer] =
if (leaderEpochCache.isDefined) {
val epochOpt = leaderEpochCache.get.epochForOffset(earliestPendingUploadOffset)
if (epochOpt.isPresent) {
Optional.of(epochOpt.getAsInt)
} else {
Optional.empty()
}
} else {
Optional.empty()
}
Some(new TimestampAndOffset(RecordBatch.NO_TIMESTAMP, earliestPendingUploadOffset, epochResult))
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
val epochResult: Optional[Integer] =
if (leaderEpochCache.isDefined) {
val epochOpt = leaderEpochCache.get.epochForOffset(earliestPendingUploadOffset)
if (epochOpt.isPresent) {
Optional.of(epochOpt.getAsInt)
} else {
Optional.empty()
}
} else {
Optional.empty()
}
Some(new TimestampAndOffset(RecordBatch.NO_TIMESTAMP, earliestPendingUploadOffset, epochResult))
val epochResult = leaderEpochCache.flatMap(_.epochForOffset(earliestPendingUploadOffset).asScala)
.asJava.asInstanceOf[Optional[Integer]]
Some(new TimestampAndOffset(RecordBatch.NO_TIMESTAMP, earliestPendingUploadOffset, epochResult))

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done. Thanks for the suggestion.

Copy link
Collaborator

@OmniaGM OmniaGM Aug 13, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There are other places in the same method can benefit from this refactor as well like line 1319 and 1298

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Made the requested changes. Also changed for LATEST_TIMESTAMP


@Test
def testFetchEarliestPendingUploadTimestampWithRemoteStorage(): Unit = {
val remoteLogManager = mock(classOf[RemoteLogManager])
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we can refactor the test to extract the "prepare" phase which is duplicated across multiple tests.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done

@abhijeetk88 abhijeetk88 force-pushed the KAFKA-17108 branch 3 times, most recently from e40dd55 to 5684997 Compare August 8, 2024 15:17
@abhijeetk88
Copy link
Contributor Author

Should we need to add an OffsetSpec on the client side to make the earliest_pending_upload_offset queryable from command line tools similar to 6e32448?

@kamalcph I can make it as part of a separate change. Will that be ok?

@@ -1334,6 +1334,27 @@ class UnifiedLog(@volatile var logStartOffset: Long,
} else {
Some(new TimestampAndOffset(RecordBatch.NO_TIMESTAMP, -1L, Optional.of(-1)))
}
} else if (targetTimestamp == ListOffsetsRequest.EARLIEST_PENDING_UPLOAD_OFFSET_TIMESTAMP) {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we extract this into different method and call it from here especially that the logic here has too many if stataments ? I know the rest of fetchOffsetByTimestamp has been done in-place but the method has grown and it hard to follow.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done

Copy link
Contributor

@kamalcph kamalcph left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Overall LGTM. Left one minor comment to address. Thanks for covering the patch with detailed unit tests!

@@ -49,6 +49,8 @@ public class ListOffsetsRequest extends AbstractRequest {

public static final long LATEST_TIERED_TIMESTAMP = -5L;

public static final long EARLIEST_PENDING_UPLOAD_OFFSET_TIMESTAMP = -6L;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

consider renaming it to EARLIEST_PENDING_UPLOAD_TIMESTAMP for uniformity with other names.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done

@abhijeetk88
Copy link
Contributor Author

@kamalcph @OmniaGM addressed your comments. PTAL again.

Copy link
Contributor

@kamalcph kamalcph left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM, thanks for addressing the review comments!

Should we have to bump the ListOffsetRequest version for this change? PTAL.

cc @showuon @satishd

@showuon showuon self-assigned this Sep 6, 2024
@kamalcph
Copy link
Contributor

kamalcph commented Oct 7, 2024

@abhijeetk88

Please rebase this PR with trunk.

Copy link

github-actions bot commented Jan 6, 2025

This PR is being marked as stale since it has not had any activity in 90 days. If you
would like to keep this PR alive, please leave a comment asking for a review. If the PR has
merge conflicts, update it with the latest from the base branch.

If you are having difficulty finding a reviewer, please reach out on the [mailing list](https://kafka.apache.org/contact).

If this PR is no longer valid or desired, please feel free to close it. If no activity occurs in the next 30 days, it will be automatically closed.

@github-actions github-actions bot added the stale Stale PRs label Jan 6, 2025
@abhijeetk88
Copy link
Contributor Author

I would like to revive this PR. @kamalcph @showuon will you be able to help with reviews. I can rebase the PR with trunk.

@github-actions github-actions bot removed the stale Stale PRs label Jan 13, 2025
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
tiered-storage Related to the Tiered Storage feature
Projects
None yet
Development

Successfully merging this pull request may close these issues.

4 participants