Skip to content

Commit e5ee450

Browse files
committed
[PP] Handle IngestRequest message
Summary: Handle the incoming `IngestRequest` messages sent by the `ingress-core`
1 parent bf1547e commit e5ee450

File tree

8 files changed

+242
-65
lines changed

8 files changed

+242
-65
lines changed

crates/bifrost/src/record.rs

Lines changed: 26 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ use core::str;
1212
use std::marker::PhantomData;
1313
use std::sync::Arc;
1414

15+
use bytes::Bytes;
1516
use restate_types::logs::{BodyWithKeys, HasRecordKeys, Keys, Lsn, Record};
1617
use restate_types::logs::{LogletOffset, SequenceNumber};
1718
use restate_types::storage::{PolyBytes, StorageDecode, StorageDecodeError, StorageEncode};
@@ -200,29 +201,19 @@ pub struct Gap<S> {
200201
pub to: S,
201202
}
202203

204+
#[derive(Clone)]
203205
pub struct InputRecord<T> {
204206
created_at: NanosSinceEpoch,
205207
keys: Keys,
206-
body: Arc<dyn StorageEncode>,
208+
body: PolyBytes,
207209
_phantom: PhantomData<T>,
208210
}
209211

210-
impl<T> Clone for InputRecord<T> {
211-
fn clone(&self) -> Self {
212-
Self {
213-
created_at: self.created_at,
214-
keys: self.keys.clone(),
215-
body: Arc::clone(&self.body),
216-
_phantom: self._phantom,
217-
}
218-
}
219-
}
220-
221212
// This is a zero-cost transformation. The type is erased at runtime, but the underlying
222213
// layout is identical.
223214
impl<T: StorageEncode> InputRecord<T> {
224215
pub fn into_record(self) -> Record {
225-
Record::from_parts(self.created_at, self.keys, PolyBytes::Typed(self.body))
216+
Record::from_parts(self.created_at, self.keys, self.body)
226217
}
227218
}
228219

@@ -231,7 +222,24 @@ impl<T: StorageEncode> InputRecord<T> {
231222
Self {
232223
created_at,
233224
keys,
234-
body,
225+
body: PolyBytes::Typed(body),
226+
_phantom: PhantomData,
227+
}
228+
}
229+
230+
/// Builds an [`InputRecord<T>`] directly from raw bytes without validating the payload.
231+
///
232+
/// # Safety
233+
/// Caller must guarantee the bytes are a correctly storage-encoded `T`.
234+
pub unsafe fn from_bytes_unchecked(
235+
created_at: NanosSinceEpoch,
236+
keys: Keys,
237+
body: Bytes,
238+
) -> Self {
239+
Self {
240+
created_at,
241+
keys,
242+
body: PolyBytes::Bytes(body),
235243
_phantom: PhantomData,
236244
}
237245
}
@@ -246,7 +254,7 @@ impl<T: StorageEncode + HasRecordKeys> From<Arc<T>> for InputRecord<T> {
246254
InputRecord {
247255
created_at: NanosSinceEpoch::now(),
248256
keys: val.record_keys(),
249-
body: val,
257+
body: PolyBytes::Typed(val),
250258
_phantom: PhantomData,
251259
}
252260
}
@@ -257,7 +265,7 @@ impl From<String> for InputRecord<String> {
257265
InputRecord {
258266
created_at: NanosSinceEpoch::now(),
259267
keys: Keys::None,
260-
body: Arc::new(val),
268+
body: PolyBytes::Typed(Arc::new(val)),
261269
_phantom: PhantomData,
262270
}
263271
}
@@ -268,7 +276,7 @@ impl From<&str> for InputRecord<String> {
268276
InputRecord {
269277
created_at: NanosSinceEpoch::now(),
270278
keys: Keys::None,
271-
body: Arc::new(String::from(val)),
279+
body: PolyBytes::Typed(Arc::new(String::from(val))),
272280
_phantom: PhantomData,
273281
}
274282
}
@@ -279,7 +287,7 @@ impl<T: StorageEncode> From<BodyWithKeys<T>> for InputRecord<T> {
279287
InputRecord {
280288
created_at: NanosSinceEpoch::now(),
281289
keys: val.record_keys(),
282-
body: Arc::new(val.into_inner()),
290+
body: PolyBytes::Typed(Arc::new(val.into_inner())),
283291
_phantom: PhantomData,
284292
}
285293
}

crates/core/src/worker_api/partition_processor_rpc_client.rs

Lines changed: 2 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -71,10 +71,6 @@ pub enum RpcErrorKind {
7171
Busy,
7272
#[error("internal error: {0}")]
7373
Internal(String),
74-
#[error("partition processor starting")]
75-
Starting,
76-
#[error("partition processor stopping")]
77-
Stopping,
7874
}
7975

8076
impl PartitionProcessorInvocationClientError {
@@ -106,10 +102,8 @@ impl RpcError {
106102
match self.source {
107103
RpcErrorKind::Connect(_)
108104
| RpcErrorKind::NotLeader
109-
| RpcErrorKind::Starting
110105
| RpcErrorKind::Busy
111-
| RpcErrorKind::SendFailed
112-
| RpcErrorKind::Stopping => {
106+
| RpcErrorKind::SendFailed => {
113107
// These are pre-flight error that we can distinguish,
114108
// and for which we know for certain that no message was proposed yet to the log.
115109
true
@@ -143,7 +137,7 @@ impl From<RpcReplyError> for RpcErrorKind {
143137
RpcReplyError::ServiceNotFound | RpcReplyError::SortCodeNotFound => Self::NotLeader,
144138
RpcReplyError::LoadShedding => Self::Busy,
145139
RpcReplyError::ServiceNotReady => Self::Busy,
146-
RpcReplyError::ServiceStopped => Self::Stopping,
140+
RpcReplyError::ServiceStopped => Self::LostLeadership,
147141
}
148142
}
149143
}
@@ -154,8 +148,6 @@ impl From<PartitionProcessorRpcError> for RpcErrorKind {
154148
PartitionProcessorRpcError::NotLeader(_) => RpcErrorKind::NotLeader,
155149
PartitionProcessorRpcError::LostLeadership(_) => RpcErrorKind::LostLeadership,
156150
PartitionProcessorRpcError::Internal(msg) => RpcErrorKind::Internal(msg),
157-
PartitionProcessorRpcError::Starting => RpcErrorKind::Starting,
158-
PartitionProcessorRpcError::Stopping => RpcErrorKind::Stopping,
159151
}
160152
}
161153
}

crates/types/src/net/partition_processor.rs

Lines changed: 0 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -142,20 +142,14 @@ pub enum PartitionProcessorRpcError {
142142
//Busy,
143143
#[error("internal error: {0}")]
144144
Internal(String),
145-
#[error("partition processor starting")]
146-
Starting,
147-
#[error("partition processor stopping")]
148-
Stopping,
149145
}
150146

151147
impl PartitionProcessorRpcError {
152148
pub fn likely_stale_route(&self) -> bool {
153149
match self {
154150
PartitionProcessorRpcError::NotLeader(_) => true,
155151
PartitionProcessorRpcError::LostLeadership(_) => true,
156-
PartitionProcessorRpcError::Stopping => true,
157152
PartitionProcessorRpcError::Internal(_) => false,
158-
PartitionProcessorRpcError::Starting => false,
159153
}
160154
}
161155
}

crates/worker/src/metric_definitions.rs

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,9 @@ pub const PARTITION_IS_EFFECTIVE_LEADER: &str = "restate.partition.is_effective_
3333
pub const PARTITION_RECORD_COMMITTED_TO_READ_LATENCY_SECONDS: &str =
3434
"restate.partition.record_committed_to_read_latency.seconds";
3535

36+
pub const PARTITION_INGESTION_REQUEST_LEN: &str = "restate.partition.ingest.request.len";
37+
pub const PARTITION_INGESTION_REQUEST_SIZE: &str = "restate.partition.ingest.request.size.bytes";
38+
3639
pub(crate) fn describe_metrics() {
3740
describe_gauge!(
3841
PARTITION_BLOCKED_FLARE,
@@ -97,4 +100,16 @@ pub(crate) fn describe_metrics() {
97100
Unit::Count,
98101
"Number of records between last applied lsn and the log tail"
99102
);
103+
104+
describe_histogram!(
105+
PARTITION_INGESTION_REQUEST_LEN,
106+
Unit::Count,
107+
"Number of records in a single ingestion request"
108+
);
109+
110+
describe_histogram!(
111+
PARTITION_INGESTION_REQUEST_SIZE,
112+
Unit::Bytes,
113+
"Total size of records in a single ingestion request"
114+
);
100115
}

crates/worker/src/partition/leadership/leader_state.rs

Lines changed: 67 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@ use restate_types::identifiers::{
3737
};
3838
use restate_types::invocation::client::{InvocationOutput, SubmittedInvocationNotification};
3939
use restate_types::logs::Keys;
40+
use restate_types::net::ingest::IngestRecord;
4041
use restate_types::net::partition_processor::{
4142
PartitionProcessorRpcError, PartitionProcessorRpcResponse,
4243
};
@@ -436,14 +437,35 @@ impl LeaderState {
436437
Ok(commit_token) => {
437438
self.awaiting_rpc_self_propose.push(SelfAppendFuture::new(
438439
commit_token,
439-
success_response,
440-
reciprocal,
440+
|result: Result<(), PartitionProcessorRpcError>| {
441+
reciprocal.send(result.map(|_| success_response));
442+
},
441443
));
442444
}
443445
Err(e) => reciprocal.send(Err(PartitionProcessorRpcError::Internal(e.to_string()))),
444446
}
445447
}
446448

449+
pub async fn propose_many_with_callback<F>(
450+
&mut self,
451+
records: impl ExactSizeIterator<Item = IngestRecord>,
452+
callback: F,
453+
) where
454+
F: FnOnce(Result<(), PartitionProcessorRpcError>) + Send + Sync + 'static,
455+
{
456+
match self
457+
.self_proposer
458+
.propose_many_with_notification(records)
459+
.await
460+
{
461+
Ok(commit_token) => {
462+
self.awaiting_rpc_self_propose
463+
.push(SelfAppendFuture::new(commit_token, callback));
464+
}
465+
Err(e) => callback(Err(PartitionProcessorRpcError::Internal(e.to_string()))),
466+
}
467+
}
468+
447469
pub fn handle_actions(
448470
&mut self,
449471
invoker_tx: &mut impl restate_invoker_api::InvokerHandle<InvokerStorageReader<PartitionStore>>,
@@ -691,42 +713,72 @@ impl LeaderState {
691713
}
692714
}
693715

716+
trait CallbackInner: Send + Sync + 'static {
717+
fn call(self: Box<Self>, result: Result<(), PartitionProcessorRpcError>);
718+
}
719+
720+
impl<F> CallbackInner for F
721+
where
722+
F: FnOnce(Result<(), PartitionProcessorRpcError>) + Send + Sync + 'static,
723+
{
724+
fn call(self: Box<Self>, result: Result<(), PartitionProcessorRpcError>) {
725+
self(result)
726+
}
727+
}
728+
729+
struct Callback {
730+
inner: Box<dyn CallbackInner>,
731+
}
732+
733+
impl Callback {
734+
fn call(self, result: Result<(), PartitionProcessorRpcError>) {
735+
self.inner.call(result);
736+
}
737+
}
738+
739+
impl<I> From<I> for Callback
740+
where
741+
I: CallbackInner,
742+
{
743+
fn from(value: I) -> Self {
744+
Self {
745+
inner: Box::new(value),
746+
}
747+
}
748+
}
749+
694750
struct SelfAppendFuture {
695751
commit_token: CommitToken,
696-
response: Option<(PartitionProcessorRpcResponse, RpcReciprocal)>,
752+
callback: Option<Callback>,
697753
}
698754

699755
impl SelfAppendFuture {
700-
fn new(
701-
commit_token: CommitToken,
702-
success_response: PartitionProcessorRpcResponse,
703-
response_reciprocal: RpcReciprocal,
704-
) -> Self {
756+
fn new(commit_token: CommitToken, callback: impl Into<Callback>) -> Self {
705757
Self {
706758
commit_token,
707-
response: Some((success_response, response_reciprocal)),
759+
callback: Some(callback.into()),
708760
}
709761
}
710762

711763
fn fail_with_internal(&mut self) {
712-
if let Some((_, reciprocal)) = self.response.take() {
713-
reciprocal.send(Err(PartitionProcessorRpcError::Internal(
764+
if let Some(callback) = self.callback.take() {
765+
callback.call(Err(PartitionProcessorRpcError::Internal(
714766
"error when proposing to bifrost".to_string(),
715767
)));
716768
}
717769
}
718770

719771
fn fail_with_lost_leadership(&mut self, this_partition_id: PartitionId) {
720-
if let Some((_, reciprocal)) = self.response.take() {
721-
reciprocal.send(Err(PartitionProcessorRpcError::LostLeadership(
772+
if let Some(callback) = self.callback.take() {
773+
callback.call(Err(PartitionProcessorRpcError::LostLeadership(
722774
this_partition_id,
723775
)));
724776
}
725777
}
726778

727779
fn succeed_with_appended(&mut self) {
728-
if let Some((success_response, reciprocal)) = self.response.take() {
729-
reciprocal.send(Ok(success_response));
780+
if let Some(callback) = self.callback.take() {
781+
callback.call(Ok(()))
730782
}
731783
}
732784
}

crates/worker/src/partition/leadership/mod.rs

Lines changed: 25 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -49,14 +49,15 @@ use restate_types::errors::GenericError;
4949
use restate_types::identifiers::{InvocationId, PartitionKey, PartitionProcessorRpcRequestId};
5050
use restate_types::identifiers::{LeaderEpoch, PartitionLeaderEpoch};
5151
use restate_types::message::MessageIndex;
52+
use restate_types::net::ingest::IngestRecord;
5253
use restate_types::net::partition_processor::{
5354
PartitionProcessorRpcError, PartitionProcessorRpcResponse,
5455
};
5556
use restate_types::partitions::Partition;
5657
use restate_types::partitions::state::PartitionReplicaSetStates;
5758
use restate_types::retries::with_jitter;
5859
use restate_types::schema::Schema;
59-
use restate_types::storage::StorageEncodeError;
60+
use restate_types::storage::{StorageDecodeError, StorageEncodeError};
6061
use restate_vqueues::{SchedulerService, VQueuesMeta, VQueuesMetaMut};
6162
use restate_wal_protocol::Command;
6263
use restate_wal_protocol::control::{AnnounceLeader, PartitionDurability};
@@ -86,7 +87,9 @@ pub(crate) enum Error {
8687
#[error("failed writing to bifrost: {0}")]
8788
Bifrost(#[from] restate_bifrost::Error),
8889
#[error("failed serializing payload: {0}")]
89-
Codec(#[from] StorageEncodeError),
90+
Encode(#[from] StorageEncodeError),
91+
#[error("failed deserializing payload: {0}")]
92+
Decode(#[from] StorageDecodeError),
9093
#[error(transparent)]
9194
Shutdown(#[from] ShutdownError),
9295
#[error("error when self proposing")]
@@ -648,6 +651,26 @@ impl<I> LeadershipState<I> {
648651
}
649652
}
650653
}
654+
655+
/// propose to this partition
656+
pub async fn propose_many_with_callback<F>(
657+
&mut self,
658+
records: impl ExactSizeIterator<Item = IngestRecord>,
659+
callback: F,
660+
) where
661+
F: FnOnce(Result<(), PartitionProcessorRpcError>) + Send + Sync + 'static,
662+
{
663+
match &mut self.state {
664+
State::Follower | State::Candidate { .. } => callback(Err(
665+
PartitionProcessorRpcError::NotLeader(self.partition.partition_id),
666+
)),
667+
State::Leader(leader_state) => {
668+
leader_state
669+
.propose_many_with_callback(records, callback)
670+
.await;
671+
}
672+
}
673+
}
651674
}
652675
#[derive(Debug, derive_more::From)]
653676
struct TimerReader(PartitionStore);

0 commit comments

Comments
 (0)