Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add deduplication layer to deduplicate shuffle messages #308

Merged
merged 4 commits into from
Apr 21, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 7 additions & 1 deletion src/common/src/types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -262,7 +262,13 @@ pub type MessageIndex = u64;
#[derive(Debug, Clone, Copy)]
pub enum AckKind {
Acknowledge(MessageIndex),
Duplicate(MessageIndex),
Duplicate {
// Sequence number of the duplicate message.
seq_number: MessageIndex,
// Last sequence number the receiver currently knows for this producer.
// See `DeduplicatingStateMachine` for more details.
last_known_seq_number: MessageIndex,
tillrohrmann marked this conversation as resolved.
Show resolved Hide resolved
},
}

/// Milliseconds since the unix epoch
Expand Down
6 changes: 3 additions & 3 deletions src/ingress_grpc/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -103,12 +103,12 @@ impl AckTarget {
#[derive(Debug)]
pub enum IngressInput {
Response(IngressResponseMessage),
MessageAck(AckKind),
MessageAck(MessageIndex),
}

impl IngressInput {
pub fn message_ack(ack_kind: AckKind) -> Self {
IngressInput::MessageAck(ack_kind)
pub fn message_ack(seq_number: MessageIndex) -> Self {
IngressInput::MessageAck(seq_number)
}

pub fn response(response: IngressResponseMessage) -> Self {
Expand Down
4 changes: 2 additions & 2 deletions src/worker/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ use crate::range_partitioner::RangePartitioner;
use crate::service_invocation_factory::DefaultServiceInvocationFactory;
use futures::stream::FuturesUnordered;
use futures::StreamExt;
use partition::ack::AckableCommand;
use partition::ack::AckCommand;
use partition::shuffle;
use restate_common::types::{IngressId, PartitionKey, PeerId, PeerTarget};
use restate_consensus::Consensus;
Expand All @@ -31,7 +31,7 @@ mod range_partitioner;
mod service_invocation_factory;
mod util;

type PartitionProcessorCommand = AckableCommand;
type PartitionProcessorCommand = AckCommand;
type ConsensusCommand = restate_consensus::Command<PartitionProcessorCommand>;
type ConsensusMsg = PeerTarget<PartitionProcessorCommand>;
type PartitionProcessor = partition::PartitionProcessor<
Expand Down
55 changes: 31 additions & 24 deletions src/worker/src/network_integration.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ use restate_common::types::PartitionKey;
use restate_network::{PartitionTable, PartitionTableError};

pub(super) type Network = restate_network::Network<
partition::AckableCommand,
partition::AckCommand,
shuffle::ShuffleInput,
shuffle::ShuffleOutput,
shuffle_integration::ShuffleToConsensus,
Expand All @@ -18,7 +18,7 @@ pub(super) type Network = restate_network::Network<
ingress_integration::IngressToShuffle,
restate_ingress_grpc::IngressInput,
partition::AckResponse,
partition::ShuffleAckResponse,
partition::ShuffleDeduplicationResponse,
partition::IngressAckResponse,
FixedPartitionTable,
>;
Expand Down Expand Up @@ -91,15 +91,15 @@ mod ingress_integration {
}
}

impl From<IngressToConsensus> for partition::AckableCommand {
impl From<IngressToConsensus> for partition::AckCommand {
fn from(ingress_to_consensus: IngressToConsensus) -> Self {
let IngressToConsensus {
service_invocation,
ingress_id,
msg_index,
} = ingress_to_consensus;

partition::AckableCommand::require_ack(
partition::AckCommand::ack(
partition::Command::Invocation(service_invocation),
partition::AckTarget::ingress(ingress_id, msg_index),
)
Expand Down Expand Up @@ -130,14 +130,15 @@ mod shuffle_integration {
use crate::partition::shuffle;
use bytes::Bytes;
use restate_common::traits::KeyedMessage;
use restate_common::types::{MessageIndex, PeerId, ResponseResult};
use restate_common::types::{MessageIndex, PartitionId, PeerId, ResponseResult};
use restate_ingress_grpc::{IngressError, IngressResponseMessage};
use restate_network::{ConsensusOrIngressTarget, TargetConsensusOrIngress};

/// Message routed from a shuffle to consensus.
///
/// It is converted into a `partition::AckCommand` in de-duplication mode; the
/// deduplication source of that command is built from `shuffle_id`,
/// `partition_id` and `msg_index`.
#[derive(Debug)]
pub(crate) struct ShuffleToConsensus {
// Invocation or response payload that is wrapped into a `partition::Command`.
msg: shuffle::InvocationOrResponse,
// Id of the shuffle the message came from; the deduplication response is
// addressed back to this peer.
shuffle_id: PeerId,
// Passed on as the producing partition id of the deduplication source.
partition_id: PartitionId,
// Passed on as the sequence number of the deduplication source.
msg_index: MessageIndex,
}

Expand All @@ -154,29 +155,29 @@ mod shuffle_integration {
}
}

impl From<ShuffleToConsensus> for partition::AckableCommand {
impl From<ShuffleToConsensus> for partition::AckCommand {
fn from(value: ShuffleToConsensus) -> Self {
let ShuffleToConsensus {
msg,
shuffle_id,
partition_id,
msg_index,
} = value;

let ack_target = partition::AckTarget::shuffle(shuffle_id, msg_index);
let deduplication_source =
partition::DeduplicationSource::shuffle(shuffle_id, partition_id, msg_index);

match msg {
shuffle::InvocationOrResponse::Invocation(invocation) => {
partition::AckableCommand::require_ack(
partition::AckCommand::dedup(
partition::Command::Invocation(invocation),
ack_target,
)
}
shuffle::InvocationOrResponse::Response(response) => {
partition::AckableCommand::require_ack(
partition::Command::Response(response),
ack_target,
deduplication_source,
)
}
shuffle::InvocationOrResponse::Response(response) => partition::AckCommand::dedup(
partition::Command::Response(response),
deduplication_source,
),
}
}
}
Expand All @@ -190,12 +191,13 @@ mod shuffle_integration {

impl TargetConsensusOrIngress<ShuffleToConsensus, ShuffleToIngress> for shuffle::ShuffleOutput {
fn target(self) -> ConsensusOrIngressTarget<ShuffleToConsensus, ShuffleToIngress> {
let (shuffle_id, msg_index, target) = self.into_inner();
let (shuffle_id, partition_id, msg_index, target) = self.into_inner();

match target {
shuffle::ShuffleMessageDestination::PartitionProcessor(outbox_message) => {
ConsensusOrIngressTarget::Consensus(ShuffleToConsensus {
msg: outbox_message,
partition_id,
shuffle_id,
msg_index,
})
Expand Down Expand Up @@ -244,35 +246,40 @@ mod partition_integration {
use restate_common::types::PeerId;
use restate_network::{ShuffleOrIngressTarget, TargetShuffle, TargetShuffleOrIngress};

impl TargetShuffleOrIngress<partition::ShuffleAckResponse, partition::IngressAckResponse>
for partition::AckResponse
impl
TargetShuffleOrIngress<
partition::ShuffleDeduplicationResponse,
partition::IngressAckResponse,
> for partition::AckResponse
{
fn target(
self,
) -> ShuffleOrIngressTarget<partition::ShuffleAckResponse, partition::IngressAckResponse>
{
) -> ShuffleOrIngressTarget<
partition::ShuffleDeduplicationResponse,
partition::IngressAckResponse,
> {
match self {
partition::AckResponse::Shuffle(ack) => ShuffleOrIngressTarget::Shuffle(ack),
partition::AckResponse::Ingress(ack) => ShuffleOrIngressTarget::Ingress(ack),
}
}
}

impl From<partition::ShuffleAckResponse> for shuffle::ShuffleInput {
fn from(value: partition::ShuffleAckResponse) -> Self {
impl From<partition::ShuffleDeduplicationResponse> for shuffle::ShuffleInput {
fn from(value: partition::ShuffleDeduplicationResponse) -> Self {
shuffle::ShuffleInput(value.kind)
}
}

impl TargetShuffle for partition::ShuffleAckResponse {
impl TargetShuffle for partition::ShuffleDeduplicationResponse {
fn shuffle_target(&self) -> PeerId {
self.shuffle_target
}
}

impl From<partition::IngressAckResponse> for restate_ingress_grpc::IngressInput {
fn from(value: partition::IngressAckResponse) -> Self {
restate_ingress_grpc::IngressInput::message_ack(value.kind)
restate_ingress_grpc::IngressInput::message_ack(value.seq_number)
}
}
}
123 changes: 89 additions & 34 deletions src/worker/src/partition/ack.rs
Original file line number Diff line number Diff line change
@@ -1,94 +1,149 @@
use crate::partition;
use restate_common::types::{AckKind, IngressId, MessageIndex, PeerId};
use restate_common::types::{AckKind, IngressId, MessageIndex, PartitionId, PeerId};

/// Envelope for [`partition::Command`] that might require an explicit acknowledgement.
#[derive(Debug)]
pub(crate) struct AckableCommand {
pub(crate) struct AckCommand {
cmd: partition::Command,
ack_target: Option<AckTarget>,
ack_mode: AckMode,
}

impl AckableCommand {
pub(crate) fn require_ack(cmd: partition::Command, ack_target: AckTarget) -> Self {
/// How the receiver of an `AckCommand` has to respond to the wrapped command.
#[derive(Debug)]
pub(crate) enum AckMode {
// The command requires an acknowledgement, sent to the given target.
Ack(AckTarget),
// The command should be de-duplicated by the receiver with respect to the
// given source.
Dedup(DeduplicationSource),
// The command should not be acknowledged.
None,
}

impl AckCommand {
/// Create a command that requires an acknowledgement upon reception.
pub(crate) fn ack(cmd: partition::Command, ack_target: AckTarget) -> Self {
Self {
cmd,
ack_mode: AckMode::Ack(ack_target),
}
}

/// Create a command that should be de-duplicated with respect to the `producer_id` and the
tillrohrmann marked this conversation as resolved.
Show resolved Hide resolved
/// `seq_number` by the receiver.
pub(crate) fn dedup(
cmd: partition::Command,
deduplication_source: DeduplicationSource,
) -> Self {
Self {
cmd,
ack_target: Some(ack_target),
ack_mode: AckMode::Dedup(deduplication_source),
}
}

/// Create a command that should not be acknowledged.
pub(crate) fn no_ack(cmd: partition::Command) -> Self {
Self {
cmd,
ack_target: None,
ack_mode: AckMode::None,
}
}

pub(super) fn into_inner(self) -> (partition::Command, Option<AckTarget>) {
(self.cmd, self.ack_target)
pub(super) fn into_inner(self) -> (partition::Command, AckMode) {
(self.cmd, self.ack_mode)
}
}

#[derive(Debug)]
pub(crate) enum AckTarget {
pub(crate) enum DeduplicationSource {
Shuffle {
shuffle_target: PeerId,
msg_index: MessageIndex,
producing_partition_id: PartitionId,
shuffle_id: PeerId,
seq_number: MessageIndex,
},
}

impl DeduplicationSource {
    /// Builds the deduplication source describing a message that was sent by a shuffle.
    pub(crate) fn shuffle(
        shuffle_id: PeerId,
        producing_partition_id: PartitionId,
        seq_number: MessageIndex,
    ) -> Self {
        Self::Shuffle {
            producing_partition_id,
            shuffle_id,
            seq_number,
        }
    }

    /// Consumes the source and produces the response that acknowledges its sequence number
    /// to the shuffle it came from.
    pub(crate) fn acknowledge(self) -> AckResponse {
        match self {
            Self::Shuffle {
                shuffle_id: shuffle_target,
                seq_number,
                ..
            } => {
                let kind = AckKind::Acknowledge(seq_number);
                AckResponse::Shuffle(ShuffleDeduplicationResponse {
                    shuffle_target,
                    kind,
                })
            }
        }
    }

    /// Consumes the source and produces the response that reports its sequence number as a
    /// duplicate, together with the `last_known_seq_number` supplied by the caller.
    pub(crate) fn duplicate(self, last_known_seq_number: MessageIndex) -> AckResponse {
        match self {
            Self::Shuffle {
                shuffle_id: shuffle_target,
                seq_number,
                ..
            } => {
                let kind = AckKind::Duplicate {
                    seq_number,
                    last_known_seq_number,
                };
                AckResponse::Shuffle(ShuffleDeduplicationResponse {
                    shuffle_target,
                    kind,
                })
            }
        }
    }
}

#[derive(Debug)]
pub(crate) enum AckTarget {
Ingress {
ingress_id: IngressId,
msg_index: MessageIndex,
seq_number: MessageIndex,
},
}

impl AckTarget {
pub(crate) fn shuffle(shuffle_target: PeerId, msg_index: MessageIndex) -> Self {
AckTarget::Shuffle {
shuffle_target,
msg_index,
}
}

pub(crate) fn ingress(ingress_id: IngressId, msg_index: MessageIndex) -> Self {
pub(crate) fn ingress(ingress_id: IngressId, seq_number: MessageIndex) -> Self {
AckTarget::Ingress {
ingress_id,
msg_index,
seq_number,
}
}

pub(super) fn acknowledge(self) -> AckResponse {
match self {
AckTarget::Shuffle {
shuffle_target,
msg_index,
} => AckResponse::Shuffle(ShuffleAckResponse {
shuffle_target,
kind: AckKind::Acknowledge(msg_index),
}),
AckTarget::Ingress {
ingress_id,
msg_index,
seq_number,
} => AckResponse::Ingress(IngressAckResponse {
_ingress_id: ingress_id,
kind: AckKind::Acknowledge(msg_index),
seq_number,
}),
}
}
}

#[derive(Debug)]
pub(crate) enum AckResponse {
Shuffle(ShuffleAckResponse),
Shuffle(ShuffleDeduplicationResponse),
Ingress(IngressAckResponse),
}

#[derive(Debug)]
pub(crate) struct ShuffleAckResponse {
pub(crate) struct ShuffleDeduplicationResponse {
pub(crate) shuffle_target: PeerId,
pub(crate) kind: AckKind,
}

#[derive(Debug)]
pub(crate) struct IngressAckResponse {
pub(crate) _ingress_id: IngressId,
pub(crate) kind: AckKind,
pub(crate) seq_number: MessageIndex,
}
Loading