Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
71 changes: 71 additions & 0 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,24 @@ jobs:
restateCommit: ${{ github.event.pull_request.head.sha || github.sha }}
serviceImage: "ghcr.io/restatedev/test-services-java:main"

sdk-java-journal-table-v2:
name: Run SDK-Java integration tests with Journal Table v2
permissions:
contents: read
issues: read
checks: write
pull-requests: write
actions: read
secrets: inherit
needs: docker
uses: restatedev/sdk-java/.github/workflows/integration.yaml@main
with:
restateCommit: ${{ github.event.pull_request.head.sha || github.sha }}
serviceImage: "ghcr.io/restatedev/test-services-java:main"
testArtifactOutput: sdk-java-integration-test-journal-table-v2
envVars: |
RESTATE_INTERNAL_FORCE_MIN_RESTATE_VERSION=1.6.0

sdk-python:
name: Run SDK-Python integration tests
permissions:
Expand Down Expand Up @@ -226,6 +244,59 @@ jobs:
envVars: |
RESTATE_WORKER__INVOKER__experimental_features_allow_protocol_v6=true

e2e-enable-journal-table-v2:
name: Run E2E tests with Journal Table V2 feature
runs-on: warp-ubuntu-latest-x64-4x
permissions:
contents: read
issues: read
checks: write
pull-requests: write
actions: read
needs: docker
steps:
- name: Set up Docker containerd snapshotter
uses: crazy-max/ghaction-setup-docker@v3
with:
set-host: true
daemon-config: |
{
"features": {
"containerd-snapshotter": true
}
}

### Download the Restate container image, if needed
- name: Download restate snapshot from in-progress workflow
uses: actions/download-artifact@v4
with:
name: restate.tar
- name: Install restate snapshot
run: |
output=$(docker load --input restate.tar | head -n 1)
docker tag "${output#*: }" "localhost/restatedev/restate-commit-download:latest"
docker image ls -a

### Run e2e tests
- name: Run E2E tests
uses: restatedev/e2e@main
with:
testArtifactOutput: e2e-journal-table-v2-test-report
restateContainerImage: localhost/restatedev/restate-commit-download:latest
envVars: |
RESTATE_WORKER__INVOKER__experimental_features_allow_protocol_v6=true
RESTATE_INTERNAL_FORCE_MIN_RESTATE_VERSION=1.6.0
# Ignore forward compatibility tests as long as the previous version is not at least v1.6.0
# Why these tests are disabled?
# In restate 1.5 the invoker storage reader will not handle correctly the case where there's no pinned deployment yet
# and the journal table used is v2. This doesn't show in the logs, but will simply hang badly the invocation task loop!
# These tests trigger this condition!
exclusions: |
exclusions:
"versionCompat":
- "dev.restate.sdktesting.tests.ForwardCompatibilityTest\\$OldVersion#proxyCallShouldBeDone"
- "dev.restate.sdktesting.tests.ForwardCompatibilityTest\\$OldVersion#proxyOneWayCallShouldBeDone"

jepsen:
if: github.event.repository.fork == false && github.event.pull_request.head.repo.full_name == 'restatedev/restate' && github.ref == 'refs/heads/main'
runs-on: warp-ubuntu-latest-arm64-4x
Expand Down
4 changes: 4 additions & 0 deletions crates/invoker-api/src/invocation_reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,8 @@ pub struct JournalMetadata {
/// and the max time difference between two replicas applying the journal append command.
pub last_modification_date: MillisSinceEpoch,
pub random_seed: u64,
/// If true, the entries are stored in journal table v2
pub using_journal_table_v2: bool,
}

impl JournalMetadata {
Expand All @@ -42,6 +44,7 @@ impl JournalMetadata {
invocation_epoch: InvocationEpoch,
last_modification_date: MillisSinceEpoch,
random_seed: u64,
using_journal_table_v2: bool,
) -> Self {
Self {
pinned_deployment,
Expand All @@ -50,6 +53,7 @@ impl JournalMetadata {
last_modification_date,
invocation_epoch,
random_seed,
using_journal_table_v2,
}
}
}
Expand Down
1 change: 1 addition & 0 deletions crates/invoker-api/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@ pub mod test_util {
0,
MillisSinceEpoch::UNIX_EPOCH,
0,
true,
),
futures::stream::empty(),
)))
Expand Down
11 changes: 11 additions & 0 deletions crates/invoker-impl/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,11 @@ pub(crate) enum InvokerError {
actual: InvocationEpoch,
expected: InvocationEpoch,
},
#[error(
"error when reading the journal: expected to read {expected} entries, but read only {expected}. This indicates a bug or a storage corruption."
)]
#[code(unknown)]
UnexpectedEntryCount { actual: u32, expected: u32 },

#[error(transparent)]
#[code(restate_errors::RT0010)]
Expand Down Expand Up @@ -172,6 +177,12 @@ pub(crate) enum InvokerError {
#[error("service is temporary unavailable '{0}'")]
#[code(restate_errors::RT0010)]
ServiceUnavailable(http::StatusCode),

#[error(
"service {0} is exposed by the deprecated deployment {1}, please upgrade the SDK used by the service."
)]
#[code(restate_errors::RT0020)]
DeploymentDeprecated(String, DeploymentId),
}

impl InvokerError {
Expand Down
10 changes: 10 additions & 0 deletions crates/invoker-impl/src/invocation_task/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -395,6 +395,16 @@ where
EagerState::<Empty<_>>::default().map(itertools::Either::Right)
};

if chosen_service_protocol_version < ServiceProtocolVersion::V4
&& journal_metadata.using_journal_table_v2
{
// We don't support migrating from journal v2 to journal v1!
shortcircuit!(Err(InvokerError::DeploymentDeprecated(
self.invocation_target.service_name().to_string(),
deployment.id
)));
}

// No need to read from Rocksdb anymore
drop(txn);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -184,8 +184,13 @@ where

// Execute the replay
crate::shortcircuit!(
self.replay_loop(&mut http_stream_tx, &mut decoder_stream, journal_stream)
.await
self.replay_loop(
&mut http_stream_tx,
&mut decoder_stream,
journal_stream,
journal_metadata.length
)
.await
);

// If we have the invoker_rx and the protocol type is bidi stream,
Expand Down Expand Up @@ -305,13 +310,15 @@ where
http_stream_tx: &mut InvokerRequestStreamSender,
http_stream_rx: &mut S,
journal_stream: JournalStream,
expected_entries_count: u32,
) -> TerminalLoopState<()>
where
JournalStream: Stream<Item = JournalEntry> + Unpin,
S: Stream<Item = Result<DecoderStreamItem, InvokerError>> + Unpin,
{
let mut journal_stream = journal_stream.fuse();
let mut got_headers = false;
let mut sent_entries = 0;

loop {
tokio::select! {
Expand All @@ -334,10 +341,11 @@ where
opt_je = journal_stream.next() => {
match opt_je {
Some(JournalEntry::JournalV2(entry)) => {
sent_entries += 1;
crate::shortcircuit!(self.write_entry(http_stream_tx, entry.inner).await);

}
Some(JournalEntry::JournalV1(old_entry)) => {
sent_entries += 1;
if let journal::Entry::Input(input_entry) = crate::shortcircuit!(old_entry.deserialize_entry::<ProtobufRawEntryCodec>()) {
crate::shortcircuit!(self.write_entry(
http_stream_tx,
Expand All @@ -352,6 +360,14 @@ where
}
},
None => {
// Let's verify if we sent all the entries we promised, otherwise the stream will hang in a bad way!
if sent_entries < expected_entries_count {
return TerminalLoopState::Failed(InvokerError::UnexpectedEntryCount {
actual: sent_entries,
expected: expected_entries_count,
})
}

// No need to wait for the headers to continue
trace!("Finished to replay the journal");
return TerminalLoopState::Continue(())
Expand Down
17 changes: 17 additions & 0 deletions crates/types/src/restate_version.rs
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,19 @@ impl SemanticRestateVersion {
self.0.pre == semver::Prerelease::new("dev").unwrap()
}

/// Returns the major.minor.patch part of the semantic version (ignoring the dev part).
pub fn strip_dev(&self) -> Cow<'_, SemanticRestateVersion> {
if self.is_dev() {
Cow::Owned(SemanticRestateVersion::new(
self.major(),
self.minor(),
self.patch(),
))
} else {
Cow::Borrowed(self)
}
}

pub fn parse(s: &str) -> Result<Self, RestateVersionError> {
Self::from_str(s)
}
Expand Down Expand Up @@ -200,6 +213,10 @@ impl TryFrom<&RestateVersion> for SemanticRestateVersion {
}
}

// Version constants
pub const RESTATE_VERSION_1_6_0: SemanticRestateVersion = SemanticRestateVersion::new(1, 6, 0);
pub const RESTATE_VERSION_1_7_0: SemanticRestateVersion = SemanticRestateVersion::new(1, 7, 0);

#[cfg(test)]
mod tests {
use super::*;
Expand Down
66 changes: 33 additions & 33 deletions crates/worker/src/partition/invoker_storage_reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ use restate_storage_api::state_table::ReadStateTable;
use restate_storage_api::{IsolationLevel, journal_table as journal_table_v1, journal_table_v2};
use restate_types::identifiers::InvocationId;
use restate_types::identifiers::ServiceId;
use restate_types::service_protocol::ServiceProtocolVersion;
use std::vec::IntoIter;

#[derive(Debug, thiserror::Error)]
Expand Down Expand Up @@ -81,40 +80,40 @@ where
.unwrap_or_else(|| invocation_id.to_random_seed());

if let InvocationStatus::Invoked(invoked_status) = invocation_status {
let (journal_metadata, journal_stream) = if invoked_status
.pinned_deployment
.as_ref()
.is_some_and(|p| p.service_protocol_version >= ServiceProtocolVersion::V4)
{
// If pinned service protocol version exists and >= V4, we need to read from Journal Table V2!
let entries = journal_table_v2::ReadJournalTable::get_journal(
&mut self.txn,
*invocation_id,
invoked_status.journal_metadata.length,
)?
.map(|entry| {
entry
.map_err(InvokerStorageReaderError::Storage)
.map(|(_, entry)| {
restate_invoker_api::invocation_reader::JournalEntry::JournalV2(entry)
})
})
// TODO: Update invoker to maintain transaction while reading the journal stream: See https://github.com/restatedev/restate/issues/275
// collecting the stream because we cannot keep the transaction open
.try_collect::<Vec<_>>()
.await?;

let journal_metadata = JournalMetadata::new(
invoked_status.journal_metadata.length,
invoked_status.journal_metadata.span_context,
invoked_status.pinned_deployment,
invoked_status.current_invocation_epoch,
invoked_status.timestamps.modification_time(),
random_seed,
);
// Try to read first from journal table v2
let entries = journal_table_v2::ReadJournalTable::get_journal(
&mut self.txn,
*invocation_id,
invoked_status.journal_metadata.length,
)?
.map(|entry| {
entry
.map_err(InvokerStorageReaderError::Storage)
.map(|(_, entry)| {
restate_invoker_api::invocation_reader::JournalEntry::JournalV2(entry)
})
})
// TODO: Update invoker to maintain transaction while reading the journal stream: See https://github.com/restatedev/restate/issues/275
// collecting the stream because we cannot keep the transaction open
.try_collect::<Vec<_>>()
.await?;

(journal_metadata, entries)
let (journal_metadata, journal_stream) = if !entries.is_empty() {
// We got the journal, good to go
(
JournalMetadata::new(
invoked_status.journal_metadata.length,
invoked_status.journal_metadata.span_context,
invoked_status.pinned_deployment,
invoked_status.current_invocation_epoch,
invoked_status.timestamps.modification_time(),
random_seed,
true,
),
entries,
)
} else {
// We didn't read a thing from journal table v2 -> we need to read journal v1
(
JournalMetadata::new(
// Use entries len here, because we might be filtering out events
Expand All @@ -124,6 +123,7 @@ where
invoked_status.current_invocation_epoch,
invoked_status.timestamps.modification_time(),
random_seed,
false,
),
journal_table_v1::ReadJournalTable::get_journal(
&mut self.txn,
Expand Down
11 changes: 6 additions & 5 deletions crates/worker/src/partition/leadership/leader_state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ use restate_types::net::partition_processor::{
PartitionProcessorRpcError, PartitionProcessorRpcResponse,
};
use restate_types::time::MillisSinceEpoch;
use restate_types::{SemanticRestateVersion, Version, Versioned};
use restate_types::{RESTATE_VERSION_1_7_0, SemanticRestateVersion, Version, Versioned};
use restate_wal_protocol::Command;
use restate_wal_protocol::timer::TimerKeyValue;

Expand Down Expand Up @@ -309,10 +309,11 @@ impl LeaderState {
.await?;
}
ActionEffect::UpsertSchema(schema) => {
const GATE_VERSION: SemanticRestateVersion =
SemanticRestateVersion::new(1, 7, 0);

if SemanticRestateVersion::current().is_equal_or_newer_than(&GATE_VERSION) {
// treat dev version as released version to enable testing
if SemanticRestateVersion::current()
.strip_dev()
.is_equal_or_newer_than(&RESTATE_VERSION_1_7_0)
{
self.self_proposer
.propose(
*self.partition_key_range.start(),
Expand Down
Loading
Loading