Skip to content

Commit 0f2b55d

Browse files
committed
Introduce version barrier for enabling journal v2 by default
The version barrier requiring min Restate version v1.6.0 will enable the journal v2 by default. It will be written when becoming leader and running at least v1.7.0. Writing the journal v2 by default changes the logic of the state machine. That's why we are guarding it with a version barrier so that during a rolling upgrade the state machine state won't diverge. A concrete problem is that Restart as new with a non-zero prefix would fail on a node that didn't store the journal entries in the journal v2 table.
1 parent e6d50e5 commit 0f2b55d

File tree

15 files changed

+212
-109
lines changed

15 files changed

+212
-109
lines changed

.github/workflows/ci.yml

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -141,7 +141,7 @@ jobs:
141141
serviceImage: "ghcr.io/restatedev/test-services-java:main"
142142
testArtifactOutput: sdk-java-integration-test-journal-table-v2
143143
envVars: |
144-
RESTATE_EXPERIMENTAL_FEATURE__USE_JOURNAL_V2_BY_DEFAULT=true
144+
RESTATE_INTERNAL_FORCE_MIN_RESTATE_VERSION=1.6.0
145145
146146
sdk-python:
147147
name: Run SDK-Python integration tests
@@ -283,19 +283,19 @@ jobs:
283283
with:
284284
testArtifactOutput: e2e-journal-table-v2-test-report
285285
restateContainerImage: localhost/restatedev/restate-commit-download:latest
286-
# Needed for backward compatibility tests 1.6 -> 1.5
287286
envVars: |
288287
RESTATE_WORKER__INVOKER__experimental_features_allow_protocol_v6=true
289-
RESTATE_EXPERIMENTAL_FEATURE__USE_JOURNAL_V2_BY_DEFAULT=true
288+
RESTATE_INTERNAL_FORCE_MIN_RESTATE_VERSION=1.6.0
289+
# Ignore forward compatibility tests as long as the previous version is not at least v1.6.0
290290
# Why these tests are disabled?
291291
# In restate 1.5 the invoker storage reader will not handle correctly the case where there's no pinned deployment yet
292292
# and the journal table used is v2. This doesn't show in the logs, but will simply hang badly the invocation task loop!
293293
# These tests trigger this condition!
294294
exclusions: |
295295
exclusions:
296296
"versionCompat":
297-
- "dev.restate.sdktesting.tests.BackCompatibilityTest\\$OldVersion#proxyCallShouldBeDone"
298-
- "dev.restate.sdktesting.tests.BackCompatibilityTest\\$OldVersion#proxyOneWayCallShouldBeDone"
297+
- "dev.restate.sdktesting.tests.ForwardCompatibilityTest\\$OldVersion#proxyCallShouldBeDone"
298+
- "dev.restate.sdktesting.tests.ForwardCompatibilityTest\\$OldVersion#proxyOneWayCallShouldBeDone"
299299
300300
jepsen:
301301
if: github.event.repository.fork == false && github.event.pull_request.head.repo.full_name == 'restatedev/restate' && github.ref == 'refs/heads/main'

crates/types/src/restate_version.rs

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -108,6 +108,19 @@ impl SemanticRestateVersion {
108108
self.0.pre == semver::Prerelease::new("dev").unwrap()
109109
}
110110

111+
/// Returns the major.minor.patch part of the semantic version (ignoring the dev part).
112+
pub fn strip_dev(&self) -> Cow<'_, SemanticRestateVersion> {
113+
if self.is_dev() {
114+
Cow::Owned(SemanticRestateVersion::new(
115+
self.major(),
116+
self.minor(),
117+
self.patch(),
118+
))
119+
} else {
120+
Cow::Borrowed(self)
121+
}
122+
}
123+
111124
pub fn parse(s: &str) -> Result<Self, RestateVersionError> {
112125
Self::from_str(s)
113126
}
@@ -200,6 +213,10 @@ impl TryFrom<&RestateVersion> for SemanticRestateVersion {
200213
}
201214
}
202215

216+
// Version constants
217+
pub const RESTATE_VERSION_1_6_0: SemanticRestateVersion = SemanticRestateVersion::new(1, 6, 0);
218+
pub const RESTATE_VERSION_1_7_0: SemanticRestateVersion = SemanticRestateVersion::new(1, 7, 0);
219+
203220
#[cfg(test)]
204221
mod tests {
205222
use super::*;

crates/worker/src/partition/leadership/leader_state.rs

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,7 @@ use restate_types::net::partition_processor::{
3939
PartitionProcessorRpcError, PartitionProcessorRpcResponse,
4040
};
4141
use restate_types::time::MillisSinceEpoch;
42-
use restate_types::{SemanticRestateVersion, Version, Versioned};
42+
use restate_types::{RESTATE_VERSION_1_7_0, SemanticRestateVersion, Version, Versioned};
4343
use restate_wal_protocol::Command;
4444
use restate_wal_protocol::timer::TimerKeyValue;
4545

@@ -309,10 +309,11 @@ impl LeaderState {
309309
.await?;
310310
}
311311
ActionEffect::UpsertSchema(schema) => {
312-
const GATE_VERSION: SemanticRestateVersion =
313-
SemanticRestateVersion::new(1, 7, 0);
314-
315-
if SemanticRestateVersion::current().is_equal_or_newer_than(&GATE_VERSION) {
312+
// treat dev version as released version to enable testing
313+
if SemanticRestateVersion::current()
314+
.strip_dev()
315+
.is_equal_or_newer_than(&RESTATE_VERSION_1_7_0)
316+
{
316317
self.self_proposer
317318
.propose(
318319
*self.partition_key_range.start(),

crates/worker/src/partition/leadership/mod.rs

Lines changed: 52 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -39,12 +39,12 @@ use restate_storage_api::invocation_status_table::{
3939
use restate_storage_api::outbox_table::{OutboxMessage, ReadOutboxTable};
4040
use restate_storage_api::timer_table::{ReadTimerTable, TimerKey};
4141
use restate_timer::TokioClock;
42-
use restate_types::GenerationalNodeId;
4342
use restate_types::cluster::cluster_state::RunMode;
4443
use restate_types::config::Configuration;
4544
use restate_types::errors::GenericError;
4645
use restate_types::identifiers::{InvocationId, PartitionKey, PartitionProcessorRpcRequestId};
4746
use restate_types::identifiers::{LeaderEpoch, PartitionLeaderEpoch};
47+
use restate_types::logs::Keys;
4848
use restate_types::message::MessageIndex;
4949
use restate_types::net::partition_processor::{
5050
PartitionProcessorRpcError, PartitionProcessorRpcResponse,
@@ -54,8 +54,11 @@ use restate_types::partitions::state::PartitionReplicaSetStates;
5454
use restate_types::retries::with_jitter;
5555
use restate_types::schema::Schema;
5656
use restate_types::storage::StorageEncodeError;
57+
use restate_types::{
58+
GenerationalNodeId, RESTATE_VERSION_1_6_0, RESTATE_VERSION_1_7_0, SemanticRestateVersion,
59+
};
5760
use restate_wal_protocol::Command;
58-
use restate_wal_protocol::control::{AnnounceLeader, PartitionDurability};
61+
use restate_wal_protocol::control::{AnnounceLeader, PartitionDurability, VersionBarrier};
5962
use restate_wal_protocol::timer::TimerKeyValue;
6063

6164
use crate::partition::cleaner::Cleaner;
@@ -399,6 +402,53 @@ where
399402
let mut self_proposer = self_proposer.take().expect("must be present");
400403
self_proposer.mark_as_leader().await;
401404

405+
let mut min_restate_version = partition_store.get_min_restate_version().await?;
406+
407+
// Force the provided min Restate version, this is used for internal testing only
408+
if let Some(forced_min_restate_version) =
409+
std::env::var("RESTATE_INTERNAL_FORCE_MIN_RESTATE_VERSION")
410+
.ok()
411+
.and_then(|min_restate_version| {
412+
SemanticRestateVersion::parse(&min_restate_version).ok()
413+
})
414+
{
415+
self_proposer
416+
.propose(
417+
*self.partition.key_range.start(),
418+
Command::VersionBarrier(VersionBarrier {
419+
version: forced_min_restate_version.clone(),
420+
partition_key_range: Keys::RangeInclusive(
421+
self.partition.key_range.clone(),
422+
),
423+
human_reason: Some("Force min Restate version".to_owned()),
424+
}),
425+
)
426+
.await?;
427+
428+
min_restate_version = min_restate_version.max(forced_min_restate_version);
429+
}
430+
431+
// In v1.7.0 we enable by default writing to the journal v2 which requires min Restate v1.6.0
432+
// We ignore dev to enable testing
433+
if SemanticRestateVersion::current()
434+
.strip_dev()
435+
.is_equal_or_newer_than(&RESTATE_VERSION_1_7_0)
436+
&& min_restate_version.cmp_precedence(&RESTATE_VERSION_1_6_0) == Ordering::Less
437+
{
438+
self_proposer
439+
.propose(
440+
*self.partition.key_range.start(),
441+
Command::VersionBarrier(VersionBarrier {
442+
version: RESTATE_VERSION_1_6_0,
443+
partition_key_range: Keys::RangeInclusive(
444+
self.partition.key_range.clone(),
445+
),
446+
human_reason: Some("Enable journal v2 by default".to_owned()),
447+
}),
448+
)
449+
.await?;
450+
}
451+
402452
let last_reported_durable_lsn = partition_store
403453
.get_partition_durability()
404454
.await?

crates/worker/src/partition/mod.rs

Lines changed: 6 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -16,14 +16,12 @@ pub mod shuffle;
1616
mod state_machine;
1717
pub mod types;
1818

19-
use std::env;
2019
use std::fmt::Debug;
2120
use std::sync::Arc;
2221
use std::time::Duration;
2322

2423
use anyhow::Context;
2524
use assert2::let_assert;
26-
use enumset::EnumSet;
2725
use futures::{FutureExt, Stream, StreamExt};
2826
use metrics::{SharedString, gauge, histogram};
2927
use tokio::sync::{mpsc, watch};
@@ -67,7 +65,7 @@ use crate::metric_definitions::{
6765
};
6866
use crate::partition::invoker_storage_reader::InvokerStorageReader;
6967
use crate::partition::leadership::LeadershipState;
70-
use crate::partition::state_machine::{ActionCollector, Feature, StateMachine};
68+
use crate::partition::state_machine::{ActionCollector, StateMachine};
7169

7270
/// Target leader state of the partition processor.
7371
#[derive(Clone, Copy, Debug, Default, PartialEq)]
@@ -184,7 +182,11 @@ where
184182
let min_restate_version = partition_store.get_min_restate_version().await?;
185183
let schema = partition_store.get_schema().await?;
186184

187-
if !SemanticRestateVersion::current().is_equal_or_newer_than(&min_restate_version) {
185+
// We ignore dev to enable testing in dev version
186+
if !SemanticRestateVersion::current()
187+
.strip_dev()
188+
.is_equal_or_newer_than(&min_restate_version)
189+
{
188190
gauge!(PARTITION_BLOCKED_FLARE, PARTITION_LABEL =>
189191
partition_store.partition_id().to_string())
190192
.set(1);
@@ -194,19 +196,12 @@ where
194196
});
195197
}
196198

197-
let mut features = EnumSet::new();
198-
// TODO(till) enable this using partition processor version barrier
199-
if env::var("RESTATE_EXPERIMENTAL_FEATURE__USE_JOURNAL_V2_BY_DEFAULT").is_ok() {
200-
features.insert(Feature::UseJournalTableV2AsDefault);
201-
}
202-
203199
let state_machine = StateMachine::new(
204200
inbox_seq_number,
205201
outbox_seq_number,
206202
outbox_head_seq_number,
207203
partition_store.partition_key_range().clone(),
208204
min_restate_version,
209-
features,
210205
schema,
211206
);
212207

crates/worker/src/partition/state_machine/entries/mod.rs

Lines changed: 12 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -405,29 +405,31 @@ impl<CMD> ApplyJournalCommandEffect<'_, CMD> {
405405

406406
#[cfg(test)]
407407
mod tests {
408-
use crate::partition::state_machine::Feature;
409408
use crate::partition::state_machine::tests::fixtures::invoker_entry_effect;
410409
use crate::partition::state_machine::tests::{TestEnv, fixtures, matchers};
411410
use bytes::Bytes;
412-
use enumset::EnumSet;
413411
use googletest::prelude::*;
414412
use restate_storage_api::invocation_status_table::ReadInvocationStatusTable;
415413
use restate_types::identifiers::{InvocationId, ServiceId};
416414
use restate_types::invocation::{
417415
Header, InvocationResponse, InvocationTarget, JournalCompletionTarget, ResponseResult,
418416
};
419417
use restate_types::journal_v2::{CallCommand, CallRequest};
418+
use restate_types::{RESTATE_VERSION_1_6_0, SemanticRestateVersion};
420419
use restate_wal_protocol::Command;
421-
use rstest::rstest;
422420

423-
#[rstest]
424421
#[restate_core::test]
425-
async fn update_journal_and_commands_length(
426-
#[values(Feature::UseJournalTableV2AsDefault.into(), EnumSet::empty())] features: EnumSet<
427-
Feature,
428-
>,
429-
) {
430-
let mut test_env = TestEnv::create_with_features(features).await;
422+
async fn update_journal_and_commands_length() {
423+
run_update_journal_and_commands_length(SemanticRestateVersion::unknown()).await;
424+
}
425+
426+
#[restate_core::test]
427+
async fn update_journal_and_commands_length_journal_v2_enabled() {
428+
run_update_journal_and_commands_length(RESTATE_VERSION_1_6_0).await;
429+
}
430+
431+
async fn run_update_journal_and_commands_length(min_restate_version: SemanticRestateVersion) {
432+
let mut test_env = TestEnv::create_with_min_restate_version(min_restate_version).await;
431433
let invocation_id = fixtures::mock_start_invocation(&mut test_env).await;
432434
fixtures::mock_pinned_deployment_v5(&mut test_env, invocation_id).await;
433435

crates/worker/src/partition/state_machine/entries/notification.rs

Lines changed: 14 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -201,11 +201,9 @@ where
201201
mod tests {
202202
use super::*;
203203

204-
use crate::partition::state_machine::Feature;
205204
use crate::partition::state_machine::tests::{TestEnv, fixtures, matchers};
206205
use bytes::Bytes;
207206
use bytestring::ByteString;
208-
use enumset::EnumSet;
209207
use googletest::prelude::*;
210208
use restate_service_protocol_v4::entry_codec::ServiceProtocolV4Codec;
211209
use restate_storage_api::invocation_status_table::{
@@ -221,6 +219,7 @@ mod tests {
221219
SleepCommand, SleepCompletion,
222220
};
223221
use restate_types::time::MillisSinceEpoch;
222+
use restate_types::{RESTATE_VERSION_1_6_0, SemanticRestateVersion};
224223
use restate_wal_protocol::Command;
225224
use restate_wal_protocol::timer::TimerKeyValue;
226225
use rstest::rstest;
@@ -264,14 +263,21 @@ mod tests {
264263
test_env.shutdown().await;
265264
}
266265

267-
#[rstest]
268266
#[restate_core::test]
269-
async fn notify_signal_received_before_pinned_deployment(
270-
#[values(Feature::UseJournalTableV2AsDefault.into(), EnumSet::empty())] features: EnumSet<
271-
Feature,
272-
>,
267+
async fn notify_signal_received_before_pinned_deployment() {
268+
run_notify_signal_received_before_pinned_deployment(SemanticRestateVersion::unknown())
269+
.await;
270+
}
271+
272+
#[restate_core::test]
273+
async fn notify_signal_received_before_pinned_deployment_journal_v2_enabled() {
274+
run_notify_signal_received_before_pinned_deployment(RESTATE_VERSION_1_6_0).await;
275+
}
276+
277+
async fn run_notify_signal_received_before_pinned_deployment(
278+
min_restate_version: SemanticRestateVersion,
273279
) {
274-
let mut test_env = TestEnv::create_with_features(features).await;
280+
let mut test_env = TestEnv::create_with_min_restate_version(min_restate_version).await;
275281
let invocation_id = fixtures::mock_start_invocation(&mut test_env).await;
276282

277283
// Send signal notification before pinned deployment

0 commit comments

Comments
 (0)