-
Notifications
You must be signed in to change notification settings - Fork 27
support CDC with tablets #136
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Conversation
the hostname cli argument can't be short because it conflicts with the help option.
1c7bf14
to
8e6550b
Compare
the tablets tests fail because they run on a scylla version that doesn't support CDC with tablets. |
@piodul please review |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Pull Request Overview
This PR adds initial support for CDC (Change Data Capture) with tablets in Scylla Rust CDC client. The implementation extends the current vnode-based CDC to support tablets-based keyspaces, with different generation handling, stream querying, and stream change mechanisms.
Key changes:
- Refactored
GenerationFetcher
into a trait with separate implementations for vnodes and tablets - Added tablet detection logic to determine the appropriate fetcher
- Updated all tests to run with both vnodes and tablets configurations
Reviewed Changes
Copilot reviewed 12 out of 12 changed files in this pull request and generated 2 comments.
Show a summary per file
File | Description |
---|---|
scylla-cdc/src/stream_generations.rs | Refactored GenerationFetcher into trait with VnodeGenerationFetcher and TabletsGenerationFetcher implementations |
scylla-cdc/src/log_reader.rs | Added tablet detection logic and factory function for choosing appropriate generation fetcher |
scylla-cdc/src/stream_reader.rs | Parameterized tests to run with both vnodes and tablets configurations |
scylla-cdc/src/e2e_tests.rs | Updated end-to-end tests to support tablets parameter |
scylla-cdc-test-utils/src/lib.rs | Added tablets support to test database creation utilities |
scylla-cdc-replicator/src/replication_tests.rs | Parameterized replication tests for both vnodes and tablets |
Cargo.toml files | Added rstest dependency for parameterized testing |
Tip: Customize your code reviews with copilot-instructions.md. Create the file or learn how to get started.
e86ad0b
to
be64333
Compare
added a commit to skip tablets test cases if we get an error that scylla doesn't support CDC with tablets |
3dc0222
to
a66f9d7
Compare
initial implementation to support CDC in tablets-enabled keyspaces. The design is described in https://docs.google.com/document/d/1qO5f2q5QoN5z1-rYOQFu6tqVLD3Ha6pphXKEqbtSNiU/edit?usp=sharing It is followed closely for the most part except "Deciding when to change streams" - instead, streams are changed synchronously with tablet split / merge. Instead of the stream switching algorithm with the double writes, we use a scheme similar to the previous method for vnodes - we add the new streams with timestamp that is sufficiently far into the future. In this PR we: * add new group0-based internal system tables for tablet stream metadata and loading it into in-memory CDC metadata * add virtual tables for CDC consumers * the write coordinator chooses a stream by looking up the appropriate stream in the CDC metadata * enable creating tables with CDC enabled in tablets-enabled keyspaces. tablets are allocated for the CDC table, and a stream is created per each tablet. * on tablet resize (split / merge), the topology coordinator creates a new stream set with a new stream for each new tablet. * the cdc tablets are co-located with the base tablets Fixes #22576 backport not needed - new feature update dtests: scylladb/scylla-dtest#5897 update java cdc library: scylladb/scylla-cdc-java#102 update rust cdc library: scylladb/scylla-cdc-rust#136 Closes #23795 * github.com:scylladb/scylladb: docs/dev: update CDC dev docs for tablets doc: update CDC docs for tablets test: cluster_events: enable add_cdc and drop_cdc test/cql: enable cql cdc tests to run with tablets test: test_cdc_with_alter: adjust for cdc with tablets test/cqlpy: adjust cdc tests for tablets test/cluster/test_cdc_with_tablets: introduce cdc with tablets tests cdc: enable cdc with tablets topology coordinator: change streams on tablet split/merge cdc: virtual tables for cdc with tablets cdc: generate_stream_diff helper function cdc: choose stream in tablets enabled keyspaces cdc: rename get_stream to get_vnode_stream cdc: load tablet streams metadata from tables cdc: helper functions for reading metadata from tables cdc: colocate cdc table with base cdc: remove streams when dropping CDC table cdc: create streams when allocating tablets migration_listener: add on_before_allocate_tablet_map notification cdc: notify when creating or dropping cdc table cdc: move cdc table creation to pre_create cdc: add internal tables for cdc with tablets cdc: add cdc_with_tablets feature flag cdc: add is_log_schema helper
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
So it looks like you adjusted the library so that, in case of any streams change, it closes all the streams and reopens the new ones after the adjustment. I think that's fine for now, but in the future when splitting/merging a subset of tablets at a time is allowed, we will have to improve this. After this PR, is merged, please add an issue about it.
For now, I left some minor comments, but I think that after fixing those it should be GTM.
scylla-cdc/src/stream_generations.rs
Outdated
/// Component responsible for managing stream generations. | ||
pub struct GenerationFetcher { | ||
#[async_trait] | ||
pub trait GenerationFetcher: Send + Sync + 'static { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I am not sure that the "generation" terminology applies well to CDC for tablets. A generation means a set of streams that span the same timestamp range and, combined, cover the whole token ring. While right now we only support splitting/merging all tablets at once - so we replace all streams at once - I think we should use different, more general terminology.
We don't have such a nice term like "generation" in case of tablets. I think we can call this a "streams change", "change in streams", etc.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Also, let's make this trait pub(crate)
. The original GenerationFetcher
used to be public, but it is not exposed via any public module path, so it's only sufficient to make it public within the crate.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I guess that the name can stay, as long as this is internal.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
currently I use the concept of generation in the same way for tablets as for vnodes, so I think it's appropriate for now until it's changed to handle more general stream changes
changed to pub(crate)
scylla-cdc/src/stream_generations.rs
Outdated
impl From<StreamState> for i8 { | ||
fn from(state: StreamState) -> Self { | ||
state as i8 | ||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm not sure this is necessary. The as
operator that you use in a couple of places does not use this trait, it relies on the fact that the repr
is i8
.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
removed
scylla-cdc/src/stream_generations.rs
Outdated
pub enum StreamState { | ||
Current = 0, | ||
Closed = 1, | ||
Opened = 2, | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Consider making this type private (drop the pub
). Although the module itself is not public, let's make it explicit here that this type is not public.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
made private
scylla-cdc/src/stream_generations.rs
Outdated
let mut rows = self | ||
.session | ||
.query_iter(query, (&self.keyspace_name, &self.table_name)) | ||
.await? | ||
.rows_stream::<(GenerationTimestamp,)>()?; | ||
|
||
while let Some(generation) = rows.next().await { | ||
generations.push(generation?.0) | ||
} | ||
|
||
Ok(generations) | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nit: you can probably use the TryStreamExt::try_collect
method to avoid the loop:
use futures::stream::TryStreamExt;
self
.session
.query_iter(query, (&self.keyspace_name, &self.table_name))
.await?
.rows_stream::<(GenerationTimestamp,)>()?
.map(|r| r.map(|(gt,)| gt.0))
.try_collect::<Vec<_>>()
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done
scylla-cdc/src/stream_generations.rs
Outdated
let mut rows = self | ||
.session | ||
.query_iter( | ||
query, | ||
( | ||
&self.keyspace_name, | ||
&self.table_name, | ||
generation, | ||
StreamState::Current as i8, | ||
), | ||
) | ||
.await? | ||
.rows_stream::<(StreamID,)>()?; | ||
|
||
let mut result = Vec::new(); | ||
while let Some(next_row) = rows.next().await { | ||
let (id,) = next_row?; | ||
result.push(vec![id]); | ||
} | ||
|
||
Ok(result) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nit: you could use try_collect
here as well
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done
// Insert generations | ||
for ts in &[GENERATION_NEW_MILLISECONDS, GENERATION_OLD_MILLISECONDS] { | ||
insert_generation_timestamp(&session, TEST_KEYSPACE, TEST_TABLE, *ts).await; | ||
} | ||
|
||
// Insert streams | ||
for sid in &[TEST_STREAM_1, TEST_STREAM_2] { | ||
let stream_id = hex::decode(sid.strip_prefix("0x").unwrap()).unwrap(); | ||
let query = new_distributed_system_query( | ||
"INSERT INTO cdc_streams (keyspace_name, table_name, timestamp, stream_state, stream_id) VALUES (?, ?, ?, ?, ?);".to_string(), | ||
&session, | ||
) | ||
.await | ||
.unwrap(); | ||
|
||
for st in &[StreamState::Current, StreamState::Opened] { | ||
session | ||
.query_unpaged( | ||
query.clone(), | ||
( | ||
TEST_KEYSPACE, | ||
TEST_TABLE, | ||
value::CqlTimestamp(GENERATION_NEW_MILLISECONDS), | ||
*st as i8, | ||
stream_id.clone(), | ||
), | ||
) | ||
.await | ||
.unwrap(); | ||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The old "generation" does not have any streams, it's nonsensical... However, as I'm reading the old tests right now, it looks like it used to be a problem with vnodes as well. Probably not worth fixing it in this PR.
If we extend the library to handle partial stream changes (i.e. a subset of streams is closed at a time), then it will become more important to improve the test cases here.
if tablets_enabled { | ||
let fetcher = tablets_tests::setup().await?; | ||
let session = fetcher.session.clone(); | ||
Ok((Arc::new(fetcher), session)) | ||
} else { | ||
let fetcher = vnode_tests::setup().await?; | ||
let session = fetcher.session.clone(); | ||
Ok((Arc::new(fetcher), session)) | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nit: consider using the fully-qualified syntax to clone the session pointers (Arc::clone(&fetcher.session)
). This makes it more explicit that we are sharing the session here instead of creating a separate clone of the session.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done
scylla-cdc/src/stream_generations.rs
Outdated
let stream1 = StreamID { | ||
id: hex::decode(TEST_STREAM_1.strip_prefix("0x").unwrap()).unwrap(), | ||
}; | ||
let stream2 = StreamID { | ||
id: hex::decode(TEST_STREAM_2.strip_prefix("0x").unwrap()).unwrap(), | ||
}; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nit: let's use StreamID::new(hex::decode(...))
here, it is a more proper way to initialize this struct (and I would like to change its internal structure in #135, so it will make it a bit easier).
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done
// the query may fail on old scylla versions that don't have this table. in this case return | ||
// false because it doesn't support tablets. | ||
let result = match session.query_unpaged(query, &[keyspace][..]).await { | ||
Ok(r) => r, | ||
Err(_) => return Ok(false), | ||
}; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This will be flaky, if there is a timeout during the query we will treat the table as one that uses tablets.
I think such check should be implemented in the driver. In order to reliably obtain this information, this requires issuing multiple queries to the same node, and using a driver to do that is tricky (technically doable, but it requires implementing custom error handling logic in case one of the queries fail.
For now, I'm not sure if it makes sense to make it more robust.
cc: @Lorak-mmk
scylla-cdc/src/log_reader.rs
Outdated
.to_string(); | ||
// the query may fail on old scylla versions that don't have this table. in this case return | ||
// false because it doesn't support tablets. | ||
let result = match session.query_unpaged(query, &[keyspace][..]).await { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nit: this &[keyspace][..]
looks a bit weird, how about using a one-element tuple: (keyspace,)
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done
the current GenerationFetcher is appropriate for vnode-based keyspaces. we will want to extend it also to tablets-based keyspaces. we define a generic GenerationFetcher trait that will have implementations for both vnodes and tablets.
a66f9d7
to
60afdc0
Compare
add an implementation of GenerationFetcher for tablets-based keyspaces. it has additional members keyspace_name and table_name for the table for which we want to fetch generations and streams, because in tablets-based keyspace each table has its own generations and streams, as opposed to vnodes where they are global. it fetches generation and streams from the system tables system.cdc_timestamps and system.cdc_streams.
extend the generation fetcher test suite to test both the vnode and tablets generation fetcher. define separate setup methods for vnodes and tablets since the generations and streams tables are different. all the tests are made to be parametrized for vnodes/tablets.
add a method that checks whether a keyspace uses tablets by querying the system tables. we will use it to choose the appropriate generation fetcher for a table.
use either a VnodeGenerationFetcher or TabletsGenerationFetcher in the log reader, based on whether the keyspace uses tablets.
add a parameter to prepare_db that controls whether the keyspace is created with tablets enabled or not. it is mostly used for tests and internal storage for the cdc library. we will use it to extend the tests to test both vnode and tablets based keyspaces.
make all the stream_reader tests parametrized to run with both tablets and vnodes keyspaces.
make the e2e tests parametrized to run with both tablets and vnodes keyspaces.
make all replication tests parametrized to run with both vnodes and tablets keyspaces.
for all tablets tests, if they run on a scylla node that doesn't support CDC with tablets, then skip the test.
60afdc0
to
3a18e49
Compare
@wprzytula please review and consider merging if everything is fine here |
Initial support of CDC with tablets.
CDC with tablets is implemented in scylladb/scylladb#23795
we extend the current implementation for vnodes to support also tablets-based keyspaces. The main difference is in how CDC generations work, querying to find CDC streams, and how CDC streams change. See the design and implementation for more details.
The main change is in the GenerationFetcher. we change it into a trait with two implementations - for vnodes and tablets based keyspaces. the current implementation becomes VnodeGenerationFetcher and we add TabletsGenerationFetcher which fetches the generations and streams from the new CDC tables for tablets.
we choose the appropriate fetcher based on whether the keyspace uses tablets.
we update all relevant tests to be parametrized and run with both vnodes and tablets.
Refs scylladb/scylladb#22577