Skip to content

Commit 5b6ea28

Browse files
committed
[Ingress] Handle IngestRequest message
Summary: Handle the incoming `IngestRequest` messages sent by the `ingress-core`
1 parent d9aa893 commit 5b6ea28

File tree

4 files changed

+188
-29
lines changed

4 files changed

+188
-29
lines changed

crates/worker/src/partition/leadership/leader_state.rs

Lines changed: 67 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@ use restate_types::identifiers::{
3737
};
3838
use restate_types::invocation::client::{InvocationOutput, SubmittedInvocationNotification};
3939
use restate_types::logs::Keys;
40+
use restate_types::net::ingress::IngestRecord;
4041
use restate_types::net::partition_processor::{
4142
PartitionProcessorRpcError, PartitionProcessorRpcResponse,
4243
};
@@ -436,14 +437,35 @@ impl LeaderState {
436437
Ok(commit_token) => {
437438
self.awaiting_rpc_self_propose.push(SelfAppendFuture::new(
438439
commit_token,
439-
success_response,
440-
reciprocal,
440+
|result: Result<(), PartitionProcessorRpcError>| {
441+
reciprocal.send(result.map(|_| success_response));
442+
},
441443
));
442444
}
443445
Err(e) => reciprocal.send(Err(PartitionProcessorRpcError::Internal(e.to_string()))),
444446
}
445447
}
446448

449+
pub async fn propose_many_with_callback<F>(
450+
&mut self,
451+
records: impl ExactSizeIterator<Item = IngestRecord>,
452+
callback: F,
453+
) where
454+
F: FnOnce(Result<(), PartitionProcessorRpcError>) + Send + Sync + 'static,
455+
{
456+
match self
457+
.self_proposer
458+
.propose_many_with_notification(records)
459+
.await
460+
{
461+
Ok(commit_token) => {
462+
self.awaiting_rpc_self_propose
463+
.push(SelfAppendFuture::new(commit_token, callback));
464+
}
465+
Err(e) => callback(Err(PartitionProcessorRpcError::Internal(e.to_string()))),
466+
}
467+
}
468+
447469
pub fn handle_actions(
448470
&mut self,
449471
invoker_tx: &mut impl restate_invoker_api::InvokerHandle<InvokerStorageReader<PartitionStore>>,
@@ -691,42 +713,72 @@ impl LeaderState {
691713
}
692714
}
693715

716+
trait CallbackInner: Send + Sync + 'static {
717+
fn call(self: Box<Self>, result: Result<(), PartitionProcessorRpcError>);
718+
}
719+
720+
impl<F> CallbackInner for F
721+
where
722+
F: FnOnce(Result<(), PartitionProcessorRpcError>) + Send + Sync + 'static,
723+
{
724+
fn call(self: Box<Self>, result: Result<(), PartitionProcessorRpcError>) {
725+
self(result)
726+
}
727+
}
728+
729+
struct Callback {
730+
inner: Box<dyn CallbackInner>,
731+
}
732+
733+
impl Callback {
734+
fn call(self, result: Result<(), PartitionProcessorRpcError>) {
735+
self.inner.call(result);
736+
}
737+
}
738+
739+
impl<I> From<I> for Callback
740+
where
741+
I: CallbackInner,
742+
{
743+
fn from(value: I) -> Self {
744+
Self {
745+
inner: Box::new(value),
746+
}
747+
}
748+
}
749+
694750
struct SelfAppendFuture {
695751
commit_token: CommitToken,
696-
response: Option<(PartitionProcessorRpcResponse, RpcReciprocal)>,
752+
callback: Option<Callback>,
697753
}
698754

699755
impl SelfAppendFuture {
700-
fn new(
701-
commit_token: CommitToken,
702-
success_response: PartitionProcessorRpcResponse,
703-
response_reciprocal: RpcReciprocal,
704-
) -> Self {
756+
fn new(commit_token: CommitToken, callback: impl Into<Callback>) -> Self {
705757
Self {
706758
commit_token,
707-
response: Some((success_response, response_reciprocal)),
759+
callback: Some(callback.into()),
708760
}
709761
}
710762

711763
fn fail_with_internal(&mut self) {
712-
if let Some((_, reciprocal)) = self.response.take() {
713-
reciprocal.send(Err(PartitionProcessorRpcError::Internal(
764+
if let Some(callback) = self.callback.take() {
765+
callback.call(Err(PartitionProcessorRpcError::Internal(
714766
"error when proposing to bifrost".to_string(),
715767
)));
716768
}
717769
}
718770

719771
fn fail_with_lost_leadership(&mut self, this_partition_id: PartitionId) {
720-
if let Some((_, reciprocal)) = self.response.take() {
721-
reciprocal.send(Err(PartitionProcessorRpcError::LostLeadership(
772+
if let Some(callback) = self.callback.take() {
773+
callback.call(Err(PartitionProcessorRpcError::LostLeadership(
722774
this_partition_id,
723775
)));
724776
}
725777
}
726778

727779
fn succeed_with_appended(&mut self) {
728-
if let Some((success_response, reciprocal)) = self.response.take() {
729-
reciprocal.send(Ok(success_response));
780+
if let Some(callback) = self.callback.take() {
781+
callback.call(Ok(()))
730782
}
731783
}
732784
}

crates/worker/src/partition/leadership/mod.rs

Lines changed: 25 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -49,14 +49,15 @@ use restate_types::errors::GenericError;
4949
use restate_types::identifiers::{InvocationId, PartitionKey, PartitionProcessorRpcRequestId};
5050
use restate_types::identifiers::{LeaderEpoch, PartitionLeaderEpoch};
5151
use restate_types::message::MessageIndex;
52+
use restate_types::net::ingress::IngestRecord;
5253
use restate_types::net::partition_processor::{
5354
PartitionProcessorRpcError, PartitionProcessorRpcResponse,
5455
};
5556
use restate_types::partitions::Partition;
5657
use restate_types::partitions::state::PartitionReplicaSetStates;
5758
use restate_types::retries::with_jitter;
5859
use restate_types::schema::Schema;
59-
use restate_types::storage::StorageEncodeError;
60+
use restate_types::storage::{StorageDecodeError, StorageEncodeError};
6061
use restate_vqueues::{SchedulerService, VQueuesMeta, VQueuesMetaMut};
6162
use restate_wal_protocol::Command;
6263
use restate_wal_protocol::control::{AnnounceLeader, PartitionDurability};
@@ -86,7 +87,9 @@ pub(crate) enum Error {
8687
#[error("failed writing to bifrost: {0}")]
8788
Bifrost(#[from] restate_bifrost::Error),
8889
#[error("failed serializing payload: {0}")]
89-
Codec(#[from] StorageEncodeError),
90+
Encode(#[from] StorageEncodeError),
91+
#[error("failed deserializing payload: {0}")]
92+
Decode(#[from] StorageDecodeError),
9093
#[error(transparent)]
9194
Shutdown(#[from] ShutdownError),
9295
#[error("error when self proposing")]
@@ -648,6 +651,26 @@ impl<I> LeadershipState<I> {
648651
}
649652
}
650653
}
654+
655+
/// propose to this partition
656+
pub async fn propose_many_with_callback<F>(
657+
&mut self,
658+
records: impl ExactSizeIterator<Item = IngestRecord>,
659+
callback: F,
660+
) where
661+
F: FnOnce(Result<(), PartitionProcessorRpcError>) + Send + Sync + 'static,
662+
{
663+
match &mut self.state {
664+
State::Follower | State::Candidate { .. } => callback(Err(
665+
PartitionProcessorRpcError::NotLeader(self.partition.partition_id),
666+
)),
667+
State::Leader(leader_state) => {
668+
leader_state
669+
.propose_many_with_callback(records, callback)
670+
.await;
671+
}
672+
}
673+
}
651674
}
652675
#[derive(Debug, derive_more::From)]
653676
struct TimerReader(PartitionStore);

crates/worker/src/partition/leadership/self_proposer.rs

Lines changed: 41 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,9 @@ use futures::never::Never;
1414

1515
use restate_bifrost::{Bifrost, CommitToken, ErrorRecoveryStrategy};
1616
use restate_storage_api::deduplication_table::{DedupInformation, EpochSequenceNumber};
17-
use restate_types::{identifiers::PartitionKey, logs::LogId};
17+
use restate_types::{
18+
identifiers::PartitionKey, logs::LogId, net::ingress::IngestRecord, storage::StorageCodec,
19+
};
1820
use restate_wal_protocol::{Command, Destination, Envelope, Header, Source};
1921

2022
use crate::partition::leadership::Error;
@@ -149,6 +151,44 @@ impl SelfProposer {
149151
Ok(commit_token)
150152
}
151153

154+
pub async fn propose_many_with_notification(
155+
&mut self,
156+
records: impl ExactSizeIterator<Item = IngestRecord>,
157+
) -> Result<CommitToken, Error> where {
158+
let sender = self.bifrost_appender.sender();
159+
160+
// This is ideally should be implemented
161+
// by using `sender.enqueue_many`
162+
// but since we have no guarantee over the
163+
// underlying channel size. a `reserve_many()` might
164+
// block forever waiting for n permits that will
165+
// never be available.
166+
//
167+
// sender
168+
// .enqueue_many(records)
169+
// .await
170+
// .map_err(|_| Error::SelfProposer)?;
171+
//
172+
// so instead we do this.
173+
174+
for mut record in records {
175+
// todo: unfortunately we need to decode tha pyaload first although
176+
// the appended will need to encode it eventually. Maybe if there
177+
// is a way to pass the raw encoded data directly to the appender
178+
let envelope = StorageCodec::decode(&mut record.record)?;
179+
180+
sender
181+
.enqueue(Arc::new(envelope))
182+
.await
183+
.map_err(|_| Error::SelfProposer)?;
184+
}
185+
186+
sender
187+
.notify_committed()
188+
.await
189+
.map_err(|_| Error::SelfProposer)
190+
}
191+
152192
fn create_header(&mut self, partition_key: PartitionKey) -> Header {
153193
let esn = self.epoch_sequence_number;
154194
self.epoch_sequence_number = self.epoch_sequence_number.next();

crates/worker/src/partition/mod.rs

Lines changed: 55 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,7 @@ use tracing::{Span, debug, error, info, instrument, trace, warn};
3131

3232
use restate_bifrost::loglet::FindTailOptions;
3333
use restate_bifrost::{Bifrost, LogEntry, MaybeRecord};
34-
use restate_core::network::{Oneshot, Reciprocal, ServiceMessage, Verdict};
34+
use restate_core::network::{Incoming, Oneshot, Reciprocal, Rpc, ServiceMessage, Verdict};
3535
use restate_core::{Metadata, ShutdownError, cancellation_watcher, my_node_id};
3636
use restate_invoker_api::capacity::InvokerCapacity;
3737
use restate_partition_store::{PartitionStore, PartitionStoreTransaction};
@@ -48,6 +48,7 @@ use restate_types::config::Configuration;
4848
use restate_types::identifiers::LeaderEpoch;
4949
use restate_types::logs::{KeyFilter, Lsn, Record, SequenceNumber};
5050
use restate_types::net::RpcRequest;
51+
use restate_types::net::ingress::{IngestResponse, ReceivedIngestRequest};
5152
use restate_types::net::partition_processor::{
5253
PartitionLeaderService, PartitionProcessorRpcError, PartitionProcessorRpcRequest,
5354
PartitionProcessorRpcResponse,
@@ -474,15 +475,7 @@ where
474475
self.status.effective_mode = self.leadership_state.effective_mode();
475476
}
476477
Some(msg) = self.network_leader_svc_rx.recv() => {
477-
match msg {
478-
ServiceMessage::Rpc(msg) if msg.msg_type() == PartitionProcessorRpcRequest::TYPE => {
479-
let msg = msg.into_typed::<PartitionProcessorRpcRequest>();
480-
// note: split() decodes the payload
481-
let (response_tx, body) = msg.split();
482-
self.on_rpc(response_tx, body, &mut partition_store, live_schemas.live_load()).await;
483-
}
484-
msg => { msg.fail(Verdict::MessageUnrecognized); }
485-
}
478+
self.on_rpc(msg, &mut partition_store, live_schemas.live_load()).await;
486479
}
487480
_ = status_update_timer.tick() => {
488481
if durable_lsn_watch.has_changed().map_err(|e| ProcessorError::Other(e.into()))? {
@@ -611,7 +604,7 @@ where
611604
Ok(())
612605
}
613606

614-
async fn on_rpc(
607+
async fn on_pp_rpc_request(
615608
&mut self,
616609
response_tx: Reciprocal<
617610
Oneshot<Result<PartitionProcessorRpcResponse, PartitionProcessorRpcError>>,
@@ -627,6 +620,57 @@ where
627620
)
628621
.await;
629622
}
623+
624+
async fn on_rpc(
625+
&mut self,
626+
msg: ServiceMessage<PartitionLeaderService>,
627+
partition_store: &mut PartitionStore,
628+
schemas: &Schema,
629+
) {
630+
match msg {
631+
ServiceMessage::Rpc(msg) if msg.msg_type() == PartitionProcessorRpcRequest::TYPE => {
632+
let msg = msg.into_typed::<PartitionProcessorRpcRequest>();
633+
// note: split() decodes the payload
634+
let (response_tx, body) = msg.split();
635+
self.on_pp_rpc_request(response_tx, body, partition_store, schemas)
636+
.await;
637+
}
638+
ServiceMessage::Rpc(msg) if msg.msg_type() == ReceivedIngestRequest::TYPE => {
639+
self.on_pp_ingest_request(msg.into_typed()).await;
640+
}
641+
msg => {
642+
msg.fail(Verdict::MessageUnrecognized);
643+
}
644+
}
645+
}
646+
647+
async fn on_pp_ingest_request(&mut self, msg: Incoming<Rpc<ReceivedIngestRequest>>) {
648+
let (reciprocal, request) = msg.split();
649+
self.leadership_state
650+
.propose_many_with_callback(
651+
request.records.into_iter(),
652+
|result: Result<(), PartitionProcessorRpcError>| match result {
653+
Ok(_) => reciprocal.send(IngestResponse::Ack),
654+
Err(err) => match err {
655+
PartitionProcessorRpcError::NotLeader(id)
656+
| PartitionProcessorRpcError::LostLeadership(id) => {
657+
reciprocal.send(IngestResponse::NotLeader { of: id })
658+
}
659+
PartitionProcessorRpcError::Starting => {
660+
reciprocal.send(IngestResponse::Starting)
661+
}
662+
PartitionProcessorRpcError::Stopping => {
663+
reciprocal.send(IngestResponse::Stopping)
664+
}
665+
PartitionProcessorRpcError::Internal(msg) => {
666+
reciprocal.send(IngestResponse::Internal { msg })
667+
}
668+
},
669+
},
670+
)
671+
.await;
672+
}
673+
630674
async fn maybe_advance<'a>(
631675
&mut self,
632676
maybe_record: LogEntry,

0 commit comments

Comments
 (0)