Use journal table v2 as default #3921
…ice invocation ingestion logic to write the journal v2
… and then tries to read from journal table v1
443e68d to 257c1d9
This is ready for an initial pass. The test is failing because I still haven't figured out how to do the exclusion properly 🫨, but the exclusion is legit.
…tever reason the negotiated protocol when invoking will be less than 4.
…he problematic situations. Fix some incorrectly handled corner cases, such as deletion of journal when the pinned deployment is not set yet.
…tion. See the changes for more details.
257c1d9 to 60b2632
This case can happen if we got an invocation scheduled (or inboxed) before the journal v2 default feature was enabled.
Fixed the problem with exclusions!
I did some manual testing locally, primarily around scheduled invocations, first disabling then enabling, and vice versa. So far everything seems to work as expected.
tillrohrmann
left a comment
Thanks a lot for preparing a future where we can drop the old state machine and the old invoker @slinkydeveloper. The changes look good to me. I left a few comments about adding a bit more context for identifying code paths that are legacy and will get removed. Also the usage of rstest seems not strictly needed and could allow us to reduce this dependency eventually if we don't add more tests using it.
As a side comment: The state machine has become quite hard to reason about with the new and old code paths. I am really looking forward to simplifying things when we get rid of the old code paths.
| expected: InvocationEpoch, | ||
| }, | ||
| #[error( | ||
| "error when reading the journal: expected to read {expected} entries, but read only {expected}. This indicates a bug or a storage corruption." |
| "error when reading the journal: expected to read {expected} entries, but read only {expected}. This indicates a bug or a storage corruption." | |
| "error when reading the journal: expected to read {expected} entries, but read only {actual}. This indicates a bug or a storage corruption." |
| // Let's verify if we sent all the entries we promised, otherwise the stream will hang in a bad way! | ||
| if sent_entries < expected_entries_count { | ||
| return TerminalLoopState::Failed(InvokerError::UnexpectedEntryCount { | ||
| actual: sent_entries, | ||
| expected: expected_entries_count, | ||
| }) | ||
| } |
Is this a situation that couldn't arise before and is now possible due to the changes of the PR or is this covering a case that could have happened before as well?
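For reference, the check under discussion can be sketched in isolation. The names below (`UnexpectedEntryCount` as a struct, `check_sent_entries`) are simplified stand-ins and not the actual invoker types, and the `Display` impl is hand-written here so the sketch has no external dependencies. The message prints `actual` in the second placeholder, as in the suggestion above.

```rust
use std::fmt;

/// Hypothetical stand-in for the invoker error variant quoted above.
#[derive(Debug, PartialEq)]
struct UnexpectedEntryCount {
    actual: u32,
    expected: u32,
}

impl fmt::Display for UnexpectedEntryCount {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        // Note: the second placeholder is `actual`, not `expected`.
        write!(
            f,
            "error when reading the journal: expected to read {} entries, but read only {}. \
             This indicates a bug or a storage corruption.",
            self.expected, self.actual
        )
    }
}

/// Fails if fewer entries were sent than promised (assumed helper, for illustration only).
fn check_sent_entries(
    sent_entries: u32,
    expected_entries_count: u32,
) -> Result<(), UnexpectedEntryCount> {
    if sent_entries < expected_entries_count {
        return Err(UnexpectedEntryCount {
            actual: sent_entries,
            expected: expected_entries_count,
        });
    }
    Ok(())
}
```

With this shape, a mismatch surfaces both numbers in the message, which makes a short read distinguishable from a wrong expectation.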
| Err(storage_error) => { | ||
| replier.send_result(Err(PartitionProcessorRpcError::Internal( | ||
| storage_error.to_string(), | ||
| ))); | ||
| return Ok(()); |
It is a bit surprising that we reply with an error but return Ok(()). It seems that the return value of RpcHandler::handle is ignored here:
restate/crates/worker/src/partition/mod.rs
Line 618 in cd0423b
| let _ = rpc::RpcHandler::handle( |
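The pattern being questioned can be illustrated with a small stand-alone sketch. Everything here is hypothetical and simplified: `RpcError`, `handle`, and `dispatch` are illustrative names, and a `std::sync::mpsc` channel stands in for the replier. The point is only that the caller learns about the failure through the reply, while the handler's own `Result` is discarded at the call site.

```rust
use std::sync::mpsc;

/// Hypothetical, simplified RPC error (the real type is PartitionProcessorRpcError).
#[derive(Debug, PartialEq)]
enum RpcError {
    Internal(String),
}

/// Hypothetical handler: a storage failure is reported to the requester through
/// the reply channel, while the handler itself still returns Ok(()).
fn handle(
    storage_result: Result<u64, String>,
    replier: mpsc::Sender<Result<u64, RpcError>>,
) -> Result<(), String> {
    match storage_result {
        Ok(value) => {
            let _ = replier.send(Ok(value));
        }
        Err(storage_error) => {
            let _ = replier.send(Err(RpcError::Internal(storage_error)));
            return Ok(());
        }
    }
    Ok(())
}

/// Hypothetical call site that discards the handler's return value.
fn dispatch(storage_result: Result<u64, String>) -> Result<u64, RpcError> {
    let (tx, rx) = mpsc::channel();
    let _ = handle(storage_result, tx);
    rx.recv().expect("handler always replies")
}
```

Under these assumptions, returning `Err` from the handler would change nothing for the requester, since only the reply is observed.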
| PatchDeploymentId::KeepPinned => { | ||
| // Just keep current deployment, all good | ||
| pinned_deployment.deployment_id | ||
| None |
Why are we no longer keeping the previous deployment id if we should keep it?
| test_env.shutdown().await; | ||
| } | ||
| | ||
| #[rstest] |
Instead of bringing in the rstest dependency here, we could have a run_test_notify_signal_received_before_pinned_deployment(features: EnumSet<Feature>) and then have two tests that call the method with the respective EnumSet values. This is probably not more verbose and wouldn't depend on rstest.
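A minimal sketch of that suggestion, assuming a stubbed test body: the `Feature` enum is trimmed down, a `HashSet` stands in for `EnumSet<Feature>` to avoid the external crate, and the helper only returns which journal table the scenario would target. The real helper would run the actual test steps.

```rust
use std::collections::HashSet;

/// Hypothetical, trimmed-down feature flag (the real code uses EnumSet<Feature>).
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
enum Feature {
    UseJournalTableV2AsDefault,
}

/// Shared test body, parameterized by the enabled features.
/// Stub: returns the journal table the scenario would write to.
fn run_test_notify_signal_received_before_pinned_deployment(
    features: HashSet<Feature>,
) -> &'static str {
    if features.contains(&Feature::UseJournalTableV2AsDefault) {
        "journal_v2"
    } else {
        "journal_v1"
    }
}

#[test]
fn notify_signal_received_before_pinned_deployment_journal_v1() {
    let table = run_test_notify_signal_received_before_pinned_deployment(HashSet::new());
    assert_eq!(table, "journal_v1");
}

#[test]
fn notify_signal_received_before_pinned_deployment_journal_v2() {
    let features = HashSet::from([Feature::UseJournalTableV2AsDefault]);
    let table = run_test_notify_signal_received_before_pinned_deployment(features);
    assert_eq!(table, "journal_v2");
}
```

Two plain `#[test]` functions cost a few extra lines compared to a parameterized case, but each variant gets its own name in the test report.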
| # Needed for backward compatibility tests 1.6 -> 1.5 | ||
| envVars: | | ||
| RESTATE_WORKER__INVOKER__experimental_features_allow_protocol_v6=true | ||
| RESTATE_EXPERIMENTAL_FEATURE__USE_JOURNAL_V2_BY_DEFAULT=true |
By enabling the journal v2 usage by default, aren't we giving up forward compatibility of v1.5? Why are we testing this case?
| ), | ||
| entries, | ||
| ) | ||
| } else { |
This branch will no longer be necessary once we have the "eager" migration from v1 to v2 on the version barrier, right?
| // 0. Prepare the journal table v2 | ||
| if self.features.contains(Feature::UseJournalTableV2AsDefault) | ||
| && let PreFlightInvocationArgument::Input(PreFlightInvocationInput { | ||
| argument, | ||
| headers, | ||
| span_context, | ||
| }) = pre_flight_invocation_metadata.input | ||
| { | ||
| // In this case, we do the following: | ||
| // * Write the input in the journal table v2 | ||
| // * Change pre_flight_invocation_metadata.input | ||
| | ||
| // Prepare the new entry | ||
| let new_entry: journal_v2::Entry = InputCommand { | ||
| headers, | ||
| payload: argument, | ||
| name: Default::default(), | ||
| } | ||
| .into(); | ||
| let new_raw_entry = new_entry.encode::<ServiceProtocolV4Codec>(); | ||
| | ||
| // Now write the entry in the new table | ||
| journal_table_v2::WriteJournalTable::put_journal_entry( | ||
| self.storage, | ||
| invocation_id, | ||
| 0, | ||
| &StoredRawEntry::new( | ||
| StoredRawEntryHeader::new(self.record_created_at), | ||
| new_raw_entry, | ||
| ), | ||
| &[], | ||
| )?; | ||
| | ||
| // Input is now a journal directly | ||
| pre_flight_invocation_metadata.input = | ||
| PreFlightInvocationArgument::Journal(PreFlightInvocationJournal { | ||
| journal_metadata: JournalMetadata { | ||
| length: 1, | ||
| commands: 1, | ||
| span_context, | ||
| }, | ||
| pinned_deployment: None, | ||
| }); | ||
| } |
It took me quite a while to understand why this snippet isn't a duplicate of the one in init_journal. The snippet in init_journal won't be executed because pre_flight_invocation_metadata.input is PreFlightInvocationArgument::Journal. This is quite subtle and hard to follow because one needs to trace back how the invocation_input that is passed to init_journal_and_invoke is constructed. Maybe add a comment to init_journal that briefly explains the different cases: which case belongs to which version, and which case will be removed in which version.
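The subtlety can be made concrete with a heavily simplified sketch. All types and functions below are hypothetical reductions of the ones in the snippet: journals are plain vectors, and `init_journal` is assumed to write a legacy journal v1 entry only when it still sees the `Input` case. The actual implementation may differ.

```rust
/// Hypothetical, trimmed-down version of PreFlightInvocationArgument.
#[derive(Debug, PartialEq)]
enum PreFlightInvocationArgument {
    /// Raw input that has not been written to any journal yet.
    Input { payload: Vec<u8> },
    /// Journal already initialized; only its length is tracked here.
    Journal { length: u32 },
}

/// Step 0 from the snippet above: with the feature enabled, write the input
/// to journal v2 and replace the argument with the Journal case.
fn prepare_journal_v2(
    use_journal_v2: bool,
    input: PreFlightInvocationArgument,
    journal_v2: &mut Vec<Vec<u8>>,
) -> PreFlightInvocationArgument {
    match input {
        PreFlightInvocationArgument::Input { payload } if use_journal_v2 => {
            journal_v2.push(payload);
            PreFlightInvocationArgument::Journal { length: 1 }
        }
        other => other,
    }
}

/// Stand-in for init_journal. Assumption: only the Input case (legacy path)
/// still writes an entry, here into journal v1.
fn init_journal(input: PreFlightInvocationArgument, journal_v1: &mut Vec<Vec<u8>>) -> u32 {
    match input {
        PreFlightInvocationArgument::Input { payload } => {
            journal_v1.push(payload);
            1
        }
        PreFlightInvocationArgument::Journal { length } => length,
    }
}
```

In this sketch, once `prepare_journal_v2` has run with the feature enabled, `init_journal` can only reach its `Journal` arm, so the input is never written twice. A comment stating this at `init_journal` would spare readers the trace through the call chain.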
Part of #3184