Skip to content

Commit

Permalink
PartitionId as NewType
Browse files Browse the repository at this point in the history
  • Loading branch information
AhmedSoliman committed Apr 30, 2024
1 parent cbf3e90 commit f2b330d
Show file tree
Hide file tree
Showing 15 changed files with 147 additions and 63 deletions.
6 changes: 3 additions & 3 deletions crates/invoker-impl/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1034,7 +1034,7 @@ mod tests {
use restate_invoker_api::{entry_enricher, ServiceHandle};
use restate_schema_api::deployment::mocks::MockDeploymentMetadataRegistry;
use restate_test_util::{check, let_assert};
use restate_types::identifiers::LeaderEpoch;
use restate_types::identifiers::{LeaderEpoch, PartitionId};
use restate_types::journal::enriched::EnrichedEntryHeader;
use restate_types::journal::raw::RawEntry;
use restate_types::retries::RetryPolicy;
Expand All @@ -1044,7 +1044,7 @@ mod tests {

// -- Mocks

const MOCK_PARTITION: PartitionLeaderEpoch = (0, LeaderEpoch::INITIAL);
const MOCK_PARTITION: PartitionLeaderEpoch = (PartitionId::MIN, LeaderEpoch::INITIAL);

impl<ITR, SR> ServiceInner<ITR, SR>
where
Expand Down Expand Up @@ -1177,7 +1177,7 @@ mod tests {
)
.unwrap();

let partition_leader_epoch = (0, LeaderEpoch::INITIAL);
let partition_leader_epoch = (PartitionId::from(0), LeaderEpoch::INITIAL);
let invocation_target = InvocationTarget::mock_service();
let invocation_id = InvocationId::generate(&invocation_target);

Expand Down
14 changes: 9 additions & 5 deletions crates/partition-store/benches/basic_benchmark.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ use restate_storage_api::deduplication_table::{
use restate_storage_api::Transaction;
use restate_types::arc_util::Constant;
use restate_types::config::{CommonOptions, WorkerOptions};
use restate_types::identifiers::PartitionKey;
use restate_types::identifiers::{PartitionId, PartitionKey};
use tokio::runtime::Builder;

async fn writing_to_rocksdb(mut rocksdb: PartitionStore) {
Expand All @@ -30,8 +30,12 @@ async fn writing_to_rocksdb(mut rocksdb: PartitionStore) {
for i in 0..100000 {
let mut txn = rocksdb.transaction();
for j in 0..10 {
txn.put_dedup_seq_number(i, ProducerId::Partition(j), DedupSequenceNumber::Sn(0))
.await;
txn.put_dedup_seq_number(
PartitionId::from(i),
ProducerId::Partition(PartitionId::from(j)),
DedupSequenceNumber::Sn(0),
)
.await;
}
txn.commit().await.unwrap();
}
Expand All @@ -56,13 +60,13 @@ fn basic_writing_reading_benchmark(c: &mut Criterion) {
let manager = PartitionStoreManager::create(
Constant::new(worker_options.storage.clone()),
Constant::new(worker_options.storage.rocksdb.clone()),
&[(0, RangeInclusive::new(0, PartitionKey::MAX))],
&[(PartitionId::MIN, RangeInclusive::new(0, PartitionKey::MAX))],
)
.await
.expect("DB creation succeeds");
manager
.open_partition_store(
0,
PartitionId::MIN,
RangeInclusive::new(0, PartitionKey::MAX),
OpenMode::CreateIfMissing,
&worker_options.storage.rocksdb,
Expand Down
17 changes: 16 additions & 1 deletion crates/partition-store/src/keys.rs
Original file line number Diff line number Diff line change
Expand Up @@ -306,7 +306,7 @@ use crate::TableKind;
pub(crate) use define_table_key;
use restate_storage_api::deduplication_table::ProducerId;
use restate_storage_api::StorageError;
use restate_types::identifiers::InvocationUuid;
use restate_types::identifiers::{InvocationUuid, PartitionId};

pub(crate) trait KeyCodec: Sized {
fn encode<B: BufMut>(&self, target: &mut B);
Expand Down Expand Up @@ -347,6 +347,21 @@ impl KeyCodec for ByteString {
}
}

impl KeyCodec for PartitionId {
fn encode<B: BufMut>(&self, target: &mut B) {
// store u64 in big-endian order to support byte-wise increment operation. See `crate::scan::try_increment`.
target.put_u64(**self);
}

fn decode<B: Buf>(source: &mut B) -> crate::Result<Self> {
Ok(PartitionId::from(source.get_u64()))
}

fn serialized_length(&self) -> usize {
std::mem::size_of::<Self>()
}
}

impl KeyCodec for u64 {
fn encode<B: BufMut>(&self, target: &mut B) {
// store u64 in big-endian order to support byte-wise increment operation. See `crate::scan::try_increment`.
Expand Down
12 changes: 6 additions & 6 deletions crates/partition-store/src/timer_table/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -187,7 +187,7 @@ mod tests {
timestamp: 87654321,
};

let key_bytes = write_timer_key(1337, &key).serialize();
let key_bytes = write_timer_key(PartitionId::from(1337), &key).serialize();
let got = timer_key_from_key_slice(&key_bytes).expect("should not fail");

assert_eq!(got, key);
Expand Down Expand Up @@ -249,13 +249,13 @@ mod tests {

#[track_caller]
fn assert_in_range(key_a: TimerKey, key_b: TimerKey) {
let key_a_bytes = write_timer_key(1, &key_a).serialize();
let key_b_bytes = write_timer_key(1, &key_b).serialize();
let key_a_bytes = write_timer_key(PartitionId::from(1), &key_a).serialize();
let key_b_bytes = write_timer_key(PartitionId::from(1), &key_b).serialize();

assert!(less_than(&key_a_bytes, &key_b_bytes));

let (low, high) = match exclusive_start_key_range(1, Some(&key_a)) {
TableScan::KeyRangeInclusiveInSinglePartition(1, low, high) => (low, high),
let (low, high) = match exclusive_start_key_range(PartitionId::from(1), Some(&key_a)) {
TableScan::KeyRangeInclusiveInSinglePartition(p, low, high) if *p == 1 => (low, high),
_ => panic!(""),
};
let low = low.serialize();
Expand All @@ -279,7 +279,7 @@ mod tests {
let mut timer_keys: Vec<_> = (0..100).map(|idx| (idx, random_timer_key())).collect();
let mut binary_timer_keys: Vec<_> = timer_keys
.iter()
.map(|(idx, key)| (*idx, write_timer_key(1, key).serialize()))
.map(|(idx, key)| (*idx, write_timer_key(PartitionId::from(1), key).serialize()))
.collect();

timer_keys.sort_by(|(_, key), (_, other_key)| key.cmp(other_key));
Expand Down
16 changes: 9 additions & 7 deletions crates/partition-store/tests/integration_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,22 +8,24 @@
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

use std::collections::HashMap;
use std::fmt::Debug;
use std::ops::RangeInclusive;
use std::pin::pin;

use bytes::Bytes;
use futures::Stream;
use tokio_stream::StreamExt;

use restate_core::TaskCenterBuilder;
use restate_partition_store::{OpenMode, PartitionStore, PartitionStoreManager};
use restate_rocksdb::RocksDbManager;
use restate_storage_api::StorageError;
use restate_types::arc_util::Constant;
use restate_types::config::{CommonOptions, WorkerOptions};
use restate_types::identifiers::{InvocationId, PartitionKey, ServiceId};
use restate_types::identifiers::{InvocationId, PartitionId, PartitionKey, ServiceId};
use restate_types::invocation::{InvocationTarget, ServiceInvocation, Source, SpanRelation};
use restate_types::state_mut::ExternalStateMutation;
use std::collections::HashMap;
use std::fmt::Debug;
use std::ops::RangeInclusive;
use std::pin::pin;
use tokio_stream::StreamExt;

mod idempotency_table_test;
mod inbox_table_test;
Expand Down Expand Up @@ -56,7 +58,7 @@ async fn storage_test_environment() -> PartitionStore {
// A single partition store that spans all keys.
manager
.open_partition_store(
0,
PartitionId::MIN,
RangeInclusive::new(0, PartitionKey::MAX - 1),
OpenMode::CreateIfMissing,
&worker_options.storage.rocksdb,
Expand Down
29 changes: 20 additions & 9 deletions crates/partition-store/tests/outbox_table_test/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,35 +12,46 @@ use crate::mock_random_service_invocation;
use restate_partition_store::PartitionStore;
use restate_storage_api::outbox_table::{OutboxMessage, OutboxTable};
use restate_storage_api::Transaction;
use restate_types::identifiers::PartitionId;

fn mock_outbox_message() -> OutboxMessage {
OutboxMessage::ServiceInvocation(mock_random_service_invocation())
}

pub(crate) async fn populate_data<T: OutboxTable>(txn: &mut T) {
txn.add_message(1337, 0, mock_outbox_message()).await;
txn.add_message(1337, 1, mock_outbox_message()).await;
txn.add_message(1337, 2, mock_outbox_message()).await;
txn.add_message(1337, 3, mock_outbox_message()).await;
let partition1337 = PartitionId::from(1337);
let partition1336 = PartitionId::from(1336);
txn.add_message(partition1337, 0, mock_outbox_message())
.await;
txn.add_message(partition1337, 1, mock_outbox_message())
.await;
txn.add_message(partition1337, 2, mock_outbox_message())
.await;
txn.add_message(partition1337, 3, mock_outbox_message())
.await;

// add a successor and a predecessor partitions
txn.add_message(1336, 0, mock_outbox_message()).await;
txn.add_message(1338, 0, mock_outbox_message()).await;
txn.add_message(partition1336, 0, mock_outbox_message())
.await;
txn.add_message(PartitionId::from(1338), 0, mock_outbox_message())
.await;
}

pub(crate) async fn consume_message_and_truncate<T: OutboxTable>(txn: &mut T) {
let partition1337 = PartitionId::from(1337);
let mut sequence = 0;
while let Ok(Some((seq, _))) = txn.get_next_outbox_message(1337, sequence).await {
while let Ok(Some((seq, _))) = txn.get_next_outbox_message(partition1337, sequence).await {
sequence = seq + 1;
}
assert_eq!(sequence, 4);

txn.truncate_outbox(1337, 0..sequence).await;
txn.truncate_outbox(partition1337, 0..sequence).await;
}

pub(crate) async fn verify_outbox_is_empty_after_truncation<T: OutboxTable>(txn: &mut T) {
let partition1337 = PartitionId::from(1337);
let result = txn
.get_next_outbox_message(1337, 0)
.get_next_outbox_message(partition1337, 0)
.await
.expect("should not fail");

Expand Down
23 changes: 13 additions & 10 deletions crates/partition-store/tests/timer_table_test/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,16 +13,18 @@ use futures_util::StreamExt;
use restate_partition_store::PartitionStore;
use restate_storage_api::timer_table::{Timer, TimerKey, TimerTable};
use restate_storage_api::Transaction;
use restate_types::identifiers::{InvocationUuid, ServiceId};
use restate_types::identifiers::{InvocationUuid, PartitionId, ServiceId};
use restate_types::invocation::ServiceInvocation;
use std::pin::pin;

const FIXTURE_INVOCATION: InvocationUuid =
InvocationUuid::from_parts(1706027034946, 12345678900001);

const PARTITION1337: PartitionId = PartitionId::new_unchecked(1337);

async fn populate_data<T: TimerTable>(txn: &mut T) {
txn.add_timer(
1337,
PARTITION1337,
&TimerKey {
invocation_uuid: FIXTURE_INVOCATION,
journal_index: 0,
Expand All @@ -33,7 +35,7 @@ async fn populate_data<T: TimerTable>(txn: &mut T) {
.await;

txn.add_timer(
1337,
PARTITION1337,
&TimerKey {
invocation_uuid: FIXTURE_INVOCATION,
journal_index: 1,
Expand All @@ -47,7 +49,7 @@ async fn populate_data<T: TimerTable>(txn: &mut T) {
..mock_service_invocation(ServiceId::new("svc-2", "key-2"))
};
txn.add_timer(
1337,
PARTITION1337,
&TimerKey {
invocation_uuid: FIXTURE_INVOCATION,
journal_index: 2,
Expand All @@ -61,7 +63,7 @@ async fn populate_data<T: TimerTable>(txn: &mut T) {
// add a successor and a predecessor partitions
//
txn.add_timer(
1336,
PARTITION1337,
&TimerKey {
invocation_uuid: FIXTURE_INVOCATION,
journal_index: 0,
Expand All @@ -72,7 +74,7 @@ async fn populate_data<T: TimerTable>(txn: &mut T) {
.await;

txn.add_timer(
1338,
PartitionId::from(1338),
&TimerKey {
invocation_uuid: FIXTURE_INVOCATION,
journal_index: 0,
Expand All @@ -84,7 +86,7 @@ async fn populate_data<T: TimerTable>(txn: &mut T) {
}

async fn demo_how_to_find_first_timers_in_a_partition<T: TimerTable>(txn: &mut T) {
let mut stream = pin!(txn.next_timers_greater_than(1337, None, usize::MAX));
let mut stream = pin!(txn.next_timers_greater_than(PARTITION1337, None, usize::MAX));

let mut count = 0;
while stream.next().await.is_some() {
Expand All @@ -100,7 +102,7 @@ async fn find_timers_greater_than<T: TimerTable>(txn: &mut T) {
journal_index: 0,
timestamp: 0,
};
let mut stream = pin!(txn.next_timers_greater_than(1337, Some(timer_key), usize::MAX));
let mut stream = pin!(txn.next_timers_greater_than(PARTITION1337, Some(timer_key), usize::MAX));

if let Some(Ok((key, _))) = stream.next().await {
// make sure that we skip the first timer that has a journal_index of 0
Expand All @@ -119,7 +121,7 @@ async fn find_timers_greater_than<T: TimerTable>(txn: &mut T) {

async fn delete_the_first_timer<T: TimerTable>(txn: &mut T) {
txn.delete_timer(
1337,
PARTITION1337,
&TimerKey {
invocation_uuid: FIXTURE_INVOCATION,
journal_index: 0,
Expand All @@ -135,7 +137,8 @@ async fn verify_next_timer_after_deletion<T: TimerTable>(txn: &mut T) {
journal_index: 0,
timestamp: 0,
};
let mut stream = pin!(txn.next_timers_greater_than(1337, Some(timer_key), usize::MAX,));
let mut stream =
pin!(txn.next_timers_greater_than(PARTITION1337, Some(timer_key), usize::MAX,));

if let Some(Ok((key, _))) = stream.next().await {
// make sure that we skip the first timer
Expand Down
6 changes: 3 additions & 3 deletions crates/storage-query-datafusion/src/mocks.rs
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ struct MockPartitionSelector;
#[async_trait]
impl SelectPartitions for MockPartitionSelector {
async fn get_live_partitions(&self) -> Result<Vec<PartitionId>, GenericError> {
Ok(vec![0])
Ok(vec![PartitionId::MIN])
}
}

Expand All @@ -110,13 +110,13 @@ impl MockQueryEngine {
let manager = PartitionStoreManager::create(
Constant::new(worker_options.storage.clone()),
Constant::new(worker_options.storage.rocksdb.clone()),
&[(0, RangeInclusive::new(0, PartitionKey::MAX))],
&[(PartitionId::MIN, RangeInclusive::new(0, PartitionKey::MAX))],
)
.await
.expect("DB creation succeeds");
let partition_store = manager
.open_partition_store(
0,
PartitionId::MIN,
PartitionKey::MIN..=PartitionKey::MAX,
OpenMode::OpenExisting,
&worker_options.storage.rocksdb,
Expand Down
6 changes: 4 additions & 2 deletions crates/storage-query-datafusion/src/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,9 @@ use restate_storage_api::invocation_status_table::{
};
use restate_storage_api::Transaction;
use restate_types::errors::InvocationError;
use restate_types::identifiers::{DeploymentId, InvocationId, PartitionLeaderEpoch};
use restate_types::identifiers::LeaderEpoch;
use restate_types::identifiers::PartitionId;
use restate_types::identifiers::{DeploymentId, InvocationId};
use restate_types::invocation::InvocationTarget;
use restate_types::journal::EntryType;
use std::time::{Duration, SystemTime};
Expand All @@ -46,7 +48,7 @@ async fn query_sys_invocation() {
MockQueryEngine::create_with(
MockStatusHandle::default().with(InvocationStatusReport::new(
invocation_id,
PartitionLeaderEpoch::default(),
(PartitionId::MIN, LeaderEpoch::INITIAL),
InvocationStatusReportInner {
in_flight: false,
start_count: 1,
Expand Down
10 changes: 5 additions & 5 deletions crates/types/src/epoch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -72,24 +72,24 @@ flexbuffers_storage_encode_decode!(EpochMetadata);
#[cfg(test)]
mod tests {
use crate::epoch::EpochMetadata;
use crate::identifiers::LeaderEpoch;
use crate::identifiers::{LeaderEpoch, PartitionId};
use crate::GenerationalNodeId;

#[test]
fn basic_operations() {
let node_id = GenerationalNodeId::new(1, 1);
let other_node_id = GenerationalNodeId::new(2, 1);

let epoch = EpochMetadata::new(node_id, 0);
let epoch = EpochMetadata::new(node_id, PartitionId::from(0));

assert_eq!(epoch.epoch(), LeaderEpoch::INITIAL);
assert_eq!(epoch.partition_id(), 0);
assert_eq!(epoch.partition_id(), PartitionId::from(0));
assert_eq!(epoch.node_id(), node_id);

let next_epoch = epoch.claim_leadership(other_node_id, 1);
let next_epoch = epoch.claim_leadership(other_node_id, PartitionId::from(1));

assert_eq!(next_epoch.epoch(), LeaderEpoch::from(2));
assert_eq!(next_epoch.partition_id(), 1);
assert_eq!(next_epoch.partition_id(), PartitionId::from(1));
assert_eq!(next_epoch.node_id(), other_node_id);
}
}
Loading

0 comments on commit f2b330d

Please sign in to comment.