Skip to content

Commit 99563a4

Browse files
Add some unit test coverage of journal table v2 feature, especially the problematic situations.
Fix some incorrectly handled corner cases, such as deletion of journal when the pinned deployment is not set yet.
1 parent c0c0f95 commit 99563a4

File tree

9 files changed

+308
-90
lines changed

9 files changed

+308
-90
lines changed

crates/worker/src/partition/state_machine/entries/mod.rs

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -405,9 +405,11 @@ impl<CMD> ApplyJournalCommandEffect<'_, CMD> {
405405

406406
#[cfg(test)]
407407
mod tests {
408+
use crate::partition::state_machine::ExperimentalFeature;
408409
use crate::partition::state_machine::tests::fixtures::invoker_entry_effect;
409410
use crate::partition::state_machine::tests::{TestEnv, fixtures, matchers};
410411
use bytes::Bytes;
412+
use enumset::EnumSet;
411413
use googletest::prelude::*;
412414
use restate_storage_api::invocation_status_table::ReadInvocationStatusTable;
413415
use restate_types::identifiers::{InvocationId, ServiceId};
@@ -416,10 +418,15 @@ mod tests {
416418
};
417419
use restate_types::journal_v2::{CallCommand, CallRequest};
418420
use restate_wal_protocol::Command;
421+
use rstest::rstest;
419422

423+
#[rstest]
420424
#[restate_core::test]
421-
async fn update_journal_and_commands_length() {
422-
let mut test_env = TestEnv::create().await;
425+
async fn update_journal_and_commands_length(
426+
#[values(ExperimentalFeature::UseJournalTableV2AsDefault.into(), EnumSet::empty())]
427+
features: EnumSet<ExperimentalFeature>,
428+
) {
429+
let mut test_env = TestEnv::create_with_experimental_features(features).await;
423430
let invocation_id = fixtures::mock_start_invocation(&mut test_env).await;
424431
fixtures::mock_pinned_deployment_v5(&mut test_env, invocation_id).await;
425432

crates/worker/src/partition/state_machine/entries/notification.rs

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -201,9 +201,11 @@ where
201201
mod tests {
202202
use super::*;
203203

204+
use crate::partition::state_machine::ExperimentalFeature;
204205
use crate::partition::state_machine::tests::{TestEnv, fixtures, matchers};
205206
use bytes::Bytes;
206207
use bytestring::ByteString;
208+
use enumset::EnumSet;
207209
use googletest::prelude::*;
208210
use restate_service_protocol_v4::entry_codec::ServiceProtocolV4Codec;
209211
use restate_storage_api::invocation_status_table::{
@@ -262,9 +264,13 @@ mod tests {
262264
test_env.shutdown().await;
263265
}
264266

267+
#[rstest]
265268
#[restate_core::test]
266-
async fn notify_signal_received_before_pinned_deployment() {
267-
let mut test_env = TestEnv::create().await;
269+
async fn notify_signal_received_before_pinned_deployment(
270+
#[values(ExperimentalFeature::UseJournalTableV2AsDefault.into(), EnumSet::empty())]
271+
features: EnumSet<ExperimentalFeature>,
272+
) {
273+
let mut test_env = TestEnv::create_with_experimental_features(features).await;
268274
let invocation_id = fixtures::mock_start_invocation(&mut test_env).await;
269275

270276
// Send signal notification before pinned deployment

crates/worker/src/partition/state_machine/lifecycle/cancel.rs

Lines changed: 98 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -117,16 +117,17 @@ where
117117
mod tests {
118118
use super::*;
119119

120-
use crate::partition::state_machine::Action;
121120
use crate::partition::state_machine::tests::{TestEnv, fixtures, matchers};
121+
use crate::partition::state_machine::{Action, ExperimentalFeature};
122122
use crate::partition::types::InvokerEffectKind;
123123
use assert2::assert;
124-
use googletest::pat;
125-
use googletest::prelude::{assert_that, contains, eq, ge, not, some};
124+
use enumset::EnumSet;
125+
use googletest::prelude::*;
126126
use restate_invoker_api::Effect;
127127
use restate_storage_api::invocation_status_table::{
128128
InvocationStatus, ReadInvocationStatusTable,
129129
};
130+
use restate_storage_api::journal_table_v2;
130131
use restate_storage_api::outbox_table::ReadOutboxTable;
131132
use restate_types::deployment::PinnedDeployment;
132133
use restate_types::errors::CANCELED_INVOCATION_ERROR;
@@ -136,10 +137,11 @@ mod tests {
136137
InvocationTarget, InvocationTermination, JournalCompletionTarget, NotifySignalRequest,
137138
ResponseResult, ServiceInvocation, ServiceInvocationResponseSink,
138139
};
139-
use restate_types::journal_v2::CANCEL_SIGNAL;
140+
use restate_types::journal_v2::{CANCEL_SIGNAL, CommandType, Entry, EntryMetadata, EntryType};
140141
use restate_types::service_protocol::ServiceProtocolVersion;
141142
use restate_types::time::MillisSinceEpoch;
142143
use restate_wal_protocol::Command;
144+
use rstest::rstest;
143145

144146
#[restate_core::test]
145147
async fn cancel_invoked_invocation() {
@@ -191,7 +193,8 @@ mod tests {
191193
}
192194

193195
#[restate_core::test]
194-
async fn cancel_invoked_invocation_without_pinned_deployment() {
196+
async fn cancel_invoked_invocation_without_pinned_deployment_without_journal_table_v2_default()
197+
{
195198
let mut test_env = TestEnv::create().await;
196199
let invocation_id = fixtures::mock_start_invocation(&mut test_env).await;
197200

@@ -233,8 +236,46 @@ mod tests {
233236
}
234237

235238
#[restate_core::test]
236-
async fn cancel_scheduled_invocation_through_notify_signal() -> anyhow::Result<()> {
237-
let mut test_env = TestEnv::create().await;
239+
async fn cancel_invoked_invocation_without_pinned_deployment_with_journal_table_v2_default() {
240+
let mut test_env = TestEnv::create_with_experimental_features(
241+
ExperimentalFeature::UseJournalTableV2AsDefault.into(),
242+
)
243+
.await;
244+
let invocation_id = fixtures::mock_start_invocation(&mut test_env).await;
245+
246+
// Send signal notification before pinning the deployment
247+
let actions = test_env
248+
.apply(Command::NotifySignal(NotifySignalRequest {
249+
invocation_id,
250+
signal: CANCEL_SIGNAL.try_into().unwrap(),
251+
}))
252+
.await;
253+
assert_that!(
254+
actions,
255+
contains(matchers::actions::forward_notification(
256+
invocation_id,
257+
CANCEL_SIGNAL.clone()
258+
))
259+
);
260+
261+
assert_that!(
262+
test_env.read_journal_to_vec(invocation_id, 2).await,
263+
elements_are![
264+
property!(Entry.ty(), eq(EntryType::Command(CommandType::Input))),
265+
matchers::entry_eq(CANCEL_SIGNAL),
266+
]
267+
);
268+
269+
test_env.shutdown().await;
270+
}
271+
272+
#[rstest]
273+
#[restate_core::test]
274+
async fn cancel_scheduled_invocation_through_notify_signal(
275+
#[values(ExperimentalFeature::UseJournalTableV2AsDefault.into(), EnumSet::empty())]
276+
features: EnumSet<ExperimentalFeature>,
277+
) -> anyhow::Result<()> {
278+
let mut test_env = TestEnv::create_with_experimental_features(features).await;
238279

239280
let invocation_id = InvocationId::mock_random();
240281
let rpc_id = PartitionProcessorRpcRequestId::new();
@@ -277,13 +318,39 @@ mod tests {
277318
.await?;
278319
assert!(let InvocationStatus::Free = current_invocation_status);
279320

321+
// Both journal table v1 and v2 are empty
322+
assert!(
323+
journal_table::ReadJournalTable::get_journal_entry(
324+
&mut test_env.storage,
325+
&invocation_id,
326+
0
327+
)
328+
.await
329+
.unwrap()
330+
.is_none()
331+
);
332+
assert!(
333+
journal_table_v2::ReadJournalTable::get_journal_entry(
334+
&mut test_env.storage,
335+
invocation_id,
336+
0
337+
)
338+
.await
339+
.unwrap()
340+
.is_none()
341+
);
342+
280343
test_env.shutdown().await;
281344
Ok(())
282345
}
283346

347+
#[rstest]
284348
#[restate_core::test]
285-
async fn cancel_inboxed_invocation_through_notify_signal() -> anyhow::Result<()> {
286-
let mut test_env = TestEnv::create().await;
349+
async fn cancel_inboxed_invocation_through_notify_signal(
350+
#[values(ExperimentalFeature::UseJournalTableV2AsDefault.into(), EnumSet::empty())]
351+
features: EnumSet<ExperimentalFeature>,
352+
) -> anyhow::Result<()> {
353+
let mut test_env = TestEnv::create_with_experimental_features(features).await;
287354

288355
let invocation_target = InvocationTarget::mock_virtual_object();
289356
let invocation_id = InvocationId::mock_generate(&invocation_target);
@@ -335,6 +402,28 @@ mod tests {
335402
// assert that invocation status was removed
336403
assert!(let InvocationStatus::Free = current_invocation_status);
337404

405+
// Both journal table v1 and v2 are empty
406+
assert!(
407+
journal_table::ReadJournalTable::get_journal_entry(
408+
&mut test_env.storage,
409+
&inboxed_id,
410+
0
411+
)
412+
.await
413+
.unwrap()
414+
.is_none()
415+
);
416+
assert!(
417+
journal_table_v2::ReadJournalTable::get_journal_entry(
418+
&mut test_env.storage,
419+
inboxed_id,
420+
0
421+
)
422+
.await
423+
.unwrap()
424+
.is_none()
425+
);
426+
338427
assert_that!(
339428
actions,
340429
contains(

crates/worker/src/partition/state_machine/lifecycle/purge.rs

Lines changed: 4 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,6 @@ use restate_types::invocation::client::PurgeInvocationResponse;
2424
use restate_types::invocation::{
2525
InvocationMutationResponseSink, InvocationTargetType, WorkflowHandlerType,
2626
};
27-
use restate_types::service_protocol::ServiceProtocolVersion;
2827
use tracing::trace;
2928

3029
pub struct OnPurgeCommand {
@@ -57,10 +56,9 @@ where
5756
pinned_deployment,
5857
..
5958
}) => {
60-
let should_remove_journal_table_v2 =
61-
pinned_deployment.as_ref().is_some_and(|pinned_deployment| {
62-
pinned_deployment.service_protocol_version >= ServiceProtocolVersion::V4
63-
});
59+
let pinned_service_protocol_version = pinned_deployment
60+
.as_ref()
61+
.map(|pd| pd.service_protocol_version);
6462

6563
ctx.do_free_invocation(invocation_id)?;
6664

@@ -92,7 +90,7 @@ where
9290
ctx.do_drop_journal(
9391
invocation_id,
9492
journal_metadata.length,
95-
should_remove_journal_table_v2,
93+
pinned_service_protocol_version,
9694
)
9795
.await?;
9896
}

crates/worker/src/partition/state_machine/lifecycle/purge_journal.rs

Lines changed: 4 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,6 @@ use restate_storage_api::journal_table_v2::WriteJournalTable;
1818
use restate_types::identifiers::InvocationId;
1919
use restate_types::invocation::InvocationMutationResponseSink;
2020
use restate_types::invocation::client::PurgeInvocationResponse;
21-
use restate_types::service_protocol::ServiceProtocolVersion;
2221
use tracing::trace;
2322

2423
pub struct OnPurgeJournalCommand {
@@ -42,19 +41,17 @@ where
4241
} = self;
4342
match ctx.get_invocation_status(&invocation_id).await? {
4443
InvocationStatus::Completed(mut completed) => {
45-
let should_remove_journal_table_v2 = completed
44+
let pinned_service_protocol_version = completed
4645
.pinned_deployment
4746
.as_ref()
48-
.is_some_and(|pinned_deployment| {
49-
pinned_deployment.service_protocol_version >= ServiceProtocolVersion::V4
50-
});
47+
.map(|pd| pd.service_protocol_version);
5148

5249
// If journal is not empty, clean it up
5350
if completed.journal_metadata.length != 0 {
5451
ctx.do_drop_journal(
5552
invocation_id,
5653
completed.journal_metadata.length,
57-
should_remove_journal_table_v2,
54+
pinned_service_protocol_version,
5855
)
5956
.await?;
6057
}
@@ -113,6 +110,7 @@ mod tests {
113110
InvocationTarget, PurgeInvocationRequest, ServiceInvocation, ServiceInvocationResponseSink,
114111
};
115112
use restate_types::journal_v2::{CommandType, OutputCommand, OutputResult};
113+
use restate_types::service_protocol::ServiceProtocolVersion;
116114
use restate_wal_protocol::Command;
117115
use std::time::Duration;
118116

crates/worker/src/partition/state_machine/lifecycle/restart_as_new.rs

Lines changed: 79 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -280,6 +280,7 @@ where
280280
mod tests {
281281
use super::*;
282282

283+
use crate::partition::state_machine::ExperimentalFeature;
283284
use crate::partition::state_machine::tests::{TestEnv, fixtures, matchers};
284285
use googletest::prelude::*;
285286
use restate_storage_api::invocation_status_table::{
@@ -291,8 +292,8 @@ mod tests {
291292
};
292293
use restate_types::invocation::client::RestartAsNewInvocationResponse;
293294
use restate_types::invocation::{
294-
IngressInvocationResponseSink, InvocationTarget, NotifySignalRequest,
295-
RestartAsNewInvocationRequest, ServiceInvocation,
295+
IngressInvocationResponseSink, InvocationTarget, InvocationTermination,
296+
NotifySignalRequest, RestartAsNewInvocationRequest, ServiceInvocation, TerminationFlavor,
296297
};
297298
use restate_types::journal_v2::{
298299
CommandType, CompletionType, NotificationType, OutputCommand, OutputResult, Signal,
@@ -377,6 +378,82 @@ mod tests {
377378
test_env.shutdown().await;
378379
}
379380

381+
#[restate_core::test]
382+
async fn restart_killed_invocation() {
383+
// This works only when using journal table v2 as default!
384+
// The corner case with journal table v1 is handled by the rpc handler instead.
385+
let mut test_env = TestEnv::create_with_experimental_features(
386+
ExperimentalFeature::UseJournalTableV2AsDefault.into(),
387+
)
388+
.await;
389+
390+
// Start invocation, then kill it
391+
let invocation_target = InvocationTarget::mock_virtual_object();
392+
let original_invocation_id = InvocationId::generate(&invocation_target, None);
393+
let _ = test_env
394+
.apply_multiple([
395+
Command::Invoke(Box::new(ServiceInvocation {
396+
invocation_id: original_invocation_id,
397+
invocation_target: invocation_target.clone(),
398+
completion_retention_duration: Duration::from_secs(120),
399+
journal_retention_duration: Duration::from_secs(120),
400+
..ServiceInvocation::mock()
401+
})),
402+
Command::TerminateInvocation(InvocationTermination {
403+
invocation_id: original_invocation_id,
404+
flavor: TerminationFlavor::Kill,
405+
response_sink: None,
406+
}),
407+
])
408+
.await;
409+
410+
// Restart as new with copy_prefix_up_to_index_included = 0
411+
let new_id = InvocationId::mock_generate(&invocation_target);
412+
let request_id = PartitionProcessorRpcRequestId::new();
413+
let actions = test_env
414+
.apply(Command::RestartAsNewInvocation(
415+
RestartAsNewInvocationRequest {
416+
invocation_id: original_invocation_id,
417+
new_invocation_id: new_id,
418+
copy_prefix_up_to_index_included: 0,
419+
patch_deployment_id: None,
420+
response_sink: Some(InvocationMutationResponseSink::Ingress(
421+
IngressInvocationResponseSink { request_id },
422+
)),
423+
},
424+
))
425+
.await;
426+
427+
// We should invoke the new invocation and send OK back
428+
assert_that!(
429+
actions,
430+
all!(
431+
contains(matchers::actions::invoke_for_id(new_id)),
432+
contains(pat!(Action::ForwardRestartAsNewInvocationResponse {
433+
request_id: eq(request_id),
434+
response: eq(RestartAsNewInvocationResponse::Ok {
435+
new_invocation_id: new_id
436+
})
437+
}))
438+
)
439+
);
440+
441+
assert_that!(
442+
test_env
443+
.storage
444+
.get_invocation_status(&new_id)
445+
.await
446+
.unwrap(),
447+
all!(
448+
matchers::storage::is_variant(InvocationStatusDiscriminants::Invoked),
449+
matchers::storage::has_journal_length(1),
450+
matchers::storage::has_commands(1)
451+
)
452+
);
453+
454+
test_env.shutdown().await;
455+
}
456+
380457
#[restate_core::test]
381458
async fn restart_copy_input_only() {
382459
let mut test_env = TestEnv::create().await;

0 commit comments

Comments
 (0)