-
Notifications
You must be signed in to change notification settings - Fork 112
[PP] Handle IngestRequest message
#3974
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Conversation
c093ea2 to
2a7342c
Compare
8b8db7f to
953b39a
Compare
966834c to
8ae259e
Compare
fc8efad to
5b6ea28
Compare
tillrohrmann
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks a lot for creating this PR @muhamadazmy. The changes make a lot of sense to me. I left a few minor comments.
My main question was about the impact on the partition processor event loop if we are ingesting a lot of entries (potentially also large entries). Did you observe any negative effects?
It would be great to address the problem of deserializing the Envelope records in a follow-up PR since this is really unnecessary work that the system now needs to do.
| // sender | ||
| // .enqueue_many(records) | ||
| // .await | ||
| // .map_err(|_| Error::SelfProposer)?; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
minor optimization: We might be able to use sender.try_enqueue_many and only fall back to enqueue individual records if the try one fails.
In the future we might have a LogSender::enqueue_many_optimized which checks for the available capacity and obtains as many permits as possible in a batch. But this is probably premature optimization.
| // is a way to pass the raw encoded data directly to the appender | ||
| let envelope = StorageCodec::decode(&mut record.record)?; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is indeed a pity. Let's do this as a follow-up. I think there isn't a lot missing because Record already supports carrying Bytes and we only need a way to create a Record from (Keys, Bytes) that we have available here. This is literally wasted work that we are doing here.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Indeed
| .propose_many_with_callback(records, callback) | ||
| .await; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can this await become a problem for the partition processor event loop if there are too many records to ingest? Should we maybe think about a throttling mechanism to not starve the other select branches?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Good point! I will look into it
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is now mainly controlled by the ingestion client "batch_size" (in bytes) This put an upper limit on the size of a single ingest request. Default is 50kB.
9dd6b0e to
2554660
Compare
IngestRequest messageIngestRequest message
e5ee450 to
ac216d6
Compare
0f3f506 to
a16b71e
Compare
- `ingestion-client` implements the runtime layer that receives WAL envelopes, fans it out to the correct partition, and tracks completion. It exposes: - `IngestionClient`, enforces inflight budgets, and resolves partition IDs before sending work downstream. - The session subsystem that batches `IngestRecords`, retries connections, and reports commit status to callers. - `ingestion-client` only ingests records and notify the caller once the record is "committed" to bifrost by the PP. This makes it useful to implement kafka ingress and other external ingestion
Summary: Handle the incoming `IngestRequest` messages sent by the `ingestion-client`
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for updating this PR @muhamadazmy. I think we shouldn't map ServiceStopped to LostLeadership as this has a slightly different semantical meaning than before. Instead mapping it to NotLeader should be fine, I believe. Apart from this, +1 for merging :-)
It's really nice that we no longer need to do the deserialization serialization steps when handling the Envelope 👏
| RpcReplyError::LoadShedding => Self::Busy, | ||
| RpcReplyError::ServiceNotReady => Self::Busy, | ||
| RpcReplyError::ServiceStopped => Self::Stopping, | ||
| RpcReplyError::ServiceStopped => Self::LostLeadership, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think ServiceStopped should rather be NotLeader. The difference is whether the partition processor has potentially processed this message or not. With LostLeadership it is not possible to safely retry an rpc because it might have processed it (e.g. written to Bifrost).
| PartitionProcessorRpcError::NotLeader(id) | ||
| | PartitionProcessorRpcError::LostLeadership(id) => { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Just want to highlight that NotLeader means the message has not been processed by the pp and LostLeadership means we don't know. If this should make any difference for the IngestionClient, then we probably need two different return values. If not, then ignore my comment.
[PP] Handle
IngestRequestmessageSummary:
Handle the incoming
IngestRequestmessages sent by theingestion-clientStack created with Sapling. Best reviewed with ReviewStack.
Shuffler#4024IngestionClientfor invocation and state mgmt #3980ingestion-client#3975IngestRequestmessage #3974CommitTokenback from notify_committed() #3968ingestion-clientcrate #3976