-
Notifications
You must be signed in to change notification settings - Fork 734
Support read from timestamp for topics autopartitioning #11882
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Merged
Merged
Changes from all commits
Commits
Show all changes
23 commits
Select commit
Hold shift + click to select a range
56a16f2
WIP
nshestakov 2532149
Initializing
nshestakov f097c0f
tests
nshestakov 8609208
Merge branch 'main' into AP-fromTime-test
nshestakov 099da38
fix
nshestakov aeb937d
fix
nshestakov 7fc3950
fix
nshestakov 9dbf02e
fix
nshestakov 869cf03
fix
nshestakov 62dd563
Merge branch 'main' into AP-fromTime-test
nshestakov af07beb
WIP
nshestakov 710b29b
tests
nshestakov cd3b03c
more test
nshestakov e7a6eae
fix
nshestakov 3073817
fix
nshestakov 24db53e
fix
nshestakov e0f2755
fix
nshestakov 8175ab8
fix
nshestakov a13f4a3
fix
nshestakov 40a5e60
fix
nshestakov 16824da
Merge branch 'main' into AP-fromTime-test
nshestakov c646af3
fix comments
nshestakov dfa2d6a
fix comments #2
nshestakov File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -30,6 +30,24 @@ namespace NKikimr::NPQ { | |
|
|
||
| static const ui32 MAX_USER_ACTS = 1000; | ||
|
|
||
| TMaybe<TInstant> GetReadFrom(ui32 maxTimeLagMs, ui64 readTimestampMs, TInstant consumerReadFromTimestamp, const TActorContext& ctx) { | ||
| if (!(maxTimeLagMs > 0 || readTimestampMs > 0 || consumerReadFromTimestamp > TInstant::MilliSeconds(1))) { | ||
| return {}; | ||
| } | ||
|
|
||
| TInstant timestamp = maxTimeLagMs > 0 ? ctx.Now() - TDuration::MilliSeconds(maxTimeLagMs) : TInstant::Zero(); | ||
| timestamp = Max(timestamp, TInstant::MilliSeconds(readTimestampMs)); | ||
| timestamp = Max(timestamp, consumerReadFromTimestamp); | ||
| return timestamp; | ||
| } | ||
|
|
||
| ui64 TPartition::GetReadOffset(ui64 offset, TMaybe<TInstant> readTimestamp) const { | ||
| if (!readTimestamp) { | ||
| return offset; | ||
| } | ||
| return Max(GetOffsetEstimate(DataKeysBody, *readTimestamp, Min(Head.Offset, EndOffset - 1)), offset); | ||
| } | ||
|
|
||
| void TPartition::SendReadingFinished(const TString& consumer) { | ||
| Send(Tablet, new TEvPQ::TEvReadingPartitionStatusRequest(consumer, Partition.OriginalPartitionId, TabletGeneration, ++PQRBCookie)); | ||
| } | ||
|
|
@@ -133,7 +151,7 @@ void TPartition::ProcessHasDataRequests(const TActorContext& ctx) { | |
| }; | ||
|
|
||
| for (auto request = HasDataRequests.begin(); request != HasDataRequests.end();) { | ||
| if (request->Offset < EndOffset) { | ||
| if (request->Offset < EndOffset && (IsActive() || !request->ReadTimestamp || *request->ReadTimestamp < EndWriteTimestamp)) { | ||
| auto response = MakeHasDataInfoResponse(GetSizeLag(request->Offset), request->Cookie); | ||
| ctx.Send(request->Sender, response.Release()); | ||
| } else if (!IsActive()) { | ||
|
|
@@ -170,16 +188,18 @@ void TPartition::Handle(TEvPersQueue::TEvHasDataInfo::TPtr& ev, const TActorCont | |
|
|
||
| auto cookie = record.HasCookie() ? TMaybe<ui64>(record.GetCookie()) : TMaybe<ui64>(); | ||
|
|
||
| auto readTimestamp = GetReadFrom(record.GetMaxTimeLagMs(), record.GetReadTimestampMs(), TInstant::Zero() /* TODO */, ctx); | ||
|
Collaborator
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. TODO what? |
||
|
|
||
| TActorId sender = ActorIdFromProto(record.GetSender()); | ||
| if (InitDone && EndOffset > (ui64)record.GetOffset()) { //already has data, answer right now | ||
| if (InitDone && EndOffset > (ui64)record.GetOffset() && (!readTimestamp || EndWriteTimestamp >= *readTimestamp)) { //already has data, answer right now | ||
| auto response = MakeHasDataInfoResponse(GetSizeLag(record.GetOffset()), cookie); | ||
| ctx.Send(sender, response.Release()); | ||
| } else if (InitDone && !IsActive()) { | ||
| auto response = MakeHasDataInfoResponse(0, cookie, true); | ||
| ctx.Send(sender, response.Release()); | ||
| } else { | ||
| THasDataReq req{++HasDataReqNum, (ui64)record.GetOffset(), sender, cookie, | ||
| record.HasClientId() && InitDone ? record.GetClientId() : ""}; | ||
| record.HasClientId() && InitDone ? record.GetClientId() : "", readTimestamp}; | ||
| THasDataDeadline dl{TInstant::MilliSeconds(record.GetDeadline()), req}; | ||
| auto res = HasDataRequests.insert(req); | ||
| HasDataDeadlines.insert(dl); | ||
|
|
@@ -763,11 +783,10 @@ void TPartition::DoRead(TEvPQ::TEvRead::TPtr&& readEvent, TDuration waitQuotaTim | |
| } | ||
| userInfo->ReadsInQuotaQueue--; | ||
| ui64 offset = read->Offset; | ||
| if (read->PartNo == 0 && (read->MaxTimeLagMs > 0 || read->ReadTimestampMs > 0 || userInfo->ReadFromTimestamp > TInstant::MilliSeconds(1))) { | ||
| TInstant timestamp = read->MaxTimeLagMs > 0 ? ctx.Now() - TDuration::MilliSeconds(read->MaxTimeLagMs) : TInstant::Zero(); | ||
| timestamp = Max(timestamp, TInstant::MilliSeconds(read->ReadTimestampMs)); | ||
| timestamp = Max(timestamp, userInfo->ReadFromTimestamp); | ||
| offset = Max(GetOffsetEstimate(DataKeysBody, timestamp, Min(Head.Offset, EndOffset - 1)), offset); | ||
|
|
||
| auto readTimestamp = GetReadFrom(read->MaxTimeLagMs, read->ReadTimestampMs, userInfo->ReadFromTimestamp, ctx); | ||
| if (read->PartNo == 0 && readTimestamp) { | ||
| offset = GetReadOffset(offset, readTimestamp); | ||
| userInfo->ReadOffsetRewindSum += offset - read->Offset; | ||
| } | ||
|
|
||
|
|
||
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Oops, something went wrong.
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Add Y_ABORT_UNLESS that checks old StartOffset and this StartOffset. And the same for EndOffset. At this point offsets must be equal. You need to remove this ABORT only when empty partitions with StartOffset > 0 will be acceptable.