@mlitvk mlitvk commented Sep 15, 2025

Initial support of CDC with tablets.
CDC with tablets is implemented in scylladb/scylladb#23795.

We extend the current vnode implementation to also support tablets-based keyspaces. The main differences are in how CDC generations work, how CDC streams are queried, and how CDC streams change. See the design and implementation for more details.

The main change is in GenerationFetcher. We turn it into a trait with two implementations, one for vnode-based and one for tablets-based keyspaces. The current implementation becomes VnodeGenerationFetcher, and we add TabletsGenerationFetcher, which fetches generations and streams from the new CDC tables for tablets.

We choose the appropriate fetcher based on whether the keyspace uses tablets.
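
The trait split and the fetcher selection can be sketched roughly as follows. This is a std-only, synchronous illustration: the real trait is async, and the `source` method and the `make_fetcher` helper are assumptions made up for this sketch, not the crate's API.

```rust
use std::sync::Arc;

// Simplified, synchronous stand-in for the crate's async `GenerationFetcher` trait.
// The method and helper names below are illustrative assumptions.
trait GenerationFetcher: Send + Sync + 'static {
    /// Describes the metadata source this fetcher would read from.
    fn source(&self) -> String;
}

/// Vnode keyspaces: generations are global.
struct VnodeGenerationFetcher;

/// Tablets keyspaces: generations and streams are tracked per table.
struct TabletsGenerationFetcher {
    keyspace_name: String,
    table_name: String,
}

impl GenerationFetcher for VnodeGenerationFetcher {
    fn source(&self) -> String {
        "global vnode generations".to_string()
    }
}

impl GenerationFetcher for TabletsGenerationFetcher {
    fn source(&self) -> String {
        format!("per-table streams for {}.{}", self.keyspace_name, self.table_name)
    }
}

/// Picks the implementation once, based on whether the keyspace uses tablets.
fn make_fetcher(uses_tablets: bool, keyspace: &str, table: &str) -> Arc<dyn GenerationFetcher> {
    if uses_tablets {
        Arc::new(TabletsGenerationFetcher {
            keyspace_name: keyspace.to_string(),
            table_name: table.to_string(),
        })
    } else {
        Arc::new(VnodeGenerationFetcher)
    }
}
```

Returning a trait object keeps the rest of the reader independent of which kind of keyspace it is working with.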

We update all relevant tests to be parametrized and run with both vnodes and tablets.

Refs scylladb/scylladb#22577

The hostname CLI argument can't have a short form because it would conflict with the
help option.
@mlitvk force-pushed the cdc_with_tablets branch 4 times, most recently from 1c7bf14 to 8e6550b (September 15, 2025 11:21)

mlitvk commented Sep 15, 2025

The tablets tests fail because they run on a Scylla version that doesn't support CDC with tablets.
I tested it locally on a build that supports CDC with tablets, and all tests pass.


mlitvk commented Sep 15, 2025

@piodul please review

@Copilot Copilot AI left a comment

Pull Request Overview

This PR adds initial support for CDC (Change Data Capture) with tablets in the Scylla Rust CDC client. The implementation extends the current vnode-based CDC to support tablets-based keyspaces, with different generation handling, stream querying, and stream change mechanisms.

Key changes:

  • Refactored GenerationFetcher into a trait with separate implementations for vnodes and tablets
  • Added tablet detection logic to determine the appropriate fetcher
  • Updated all tests to run with both vnodes and tablets configurations

Reviewed Changes

Copilot reviewed 12 out of 12 changed files in this pull request and generated 2 comments.

| File | Description |
| --- | --- |
| scylla-cdc/src/stream_generations.rs | Refactored GenerationFetcher into trait with VnodeGenerationFetcher and TabletsGenerationFetcher implementations |
| scylla-cdc/src/log_reader.rs | Added tablet detection logic and factory function for choosing appropriate generation fetcher |
| scylla-cdc/src/stream_reader.rs | Parameterized tests to run with both vnodes and tablets configurations |
| scylla-cdc/src/e2e_tests.rs | Updated end-to-end tests to support tablets parameter |
| scylla-cdc-test-utils/src/lib.rs | Added tablets support to test database creation utilities |
| scylla-cdc-replicator/src/replication_tests.rs | Parameterized replication tests for both vnodes and tablets |
| Cargo.toml files | Added rstest dependency for parameterized testing |


@mlitvk force-pushed the cdc_with_tablets branch 4 times, most recently from e86ad0b to be64333 (September 16, 2025 09:15)

mlitvk commented Sep 17, 2025

Added a commit that skips the tablets test cases if we get an error saying that Scylla doesn't support CDC with tablets.

piodul added a commit to scylladb/scylladb that referenced this pull request Sep 18, 2025
Initial implementation to support CDC in tablets-enabled keyspaces.

The design is described in https://docs.google.com/document/d/1qO5f2q5QoN5z1-rYOQFu6tqVLD3Ha6pphXKEqbtSNiU/edit?usp=sharing
It is followed closely for the most part, except for "Deciding when to change streams": instead, streams are changed synchronously with tablet split / merge.
Instead of the stream switching algorithm with double writes, we use a scheme similar to the previous method for vnodes: we add the new streams with a timestamp that is sufficiently far into the future.

In this PR we:
* add new group0-based internal system tables for tablet stream metadata, and load that metadata into the in-memory CDC metadata
* add virtual tables for CDC consumers
* make the write coordinator choose a stream by looking up the appropriate stream in the CDC metadata
* enable creating tables with CDC enabled in tablets-enabled keyspaces. Tablets are allocated for the CDC table, and one stream is created per tablet.
* on tablet resize (split / merge), have the topology coordinator create a new stream set with a new stream for each new tablet
* co-locate the CDC tablets with the base tablets

Fixes #22576

backport not needed - new feature

update dtests: scylladb/scylla-dtest#5897
update java cdc library: scylladb/scylla-cdc-java#102
update rust cdc library: scylladb/scylla-cdc-rust#136

Closes #23795

* github.com:scylladb/scylladb:
  docs/dev: update CDC dev docs for tablets
  doc: update CDC docs for tablets
  test: cluster_events: enable add_cdc and drop_cdc
  test/cql: enable cql cdc tests to run with tablets
  test: test_cdc_with_alter: adjust for cdc with tablets
  test/cqlpy: adjust cdc tests for tablets
  test/cluster/test_cdc_with_tablets: introduce cdc with tablets tests
  cdc: enable cdc with tablets
  topology coordinator: change streams on tablet split/merge
  cdc: virtual tables for cdc with tablets
  cdc: generate_stream_diff helper function
  cdc: choose stream in tablets enabled keyspaces
  cdc: rename get_stream to get_vnode_stream
  cdc: load tablet streams metadata from tables
  cdc: helper functions for reading metadata from tables
  cdc: colocate cdc table with base
  cdc: remove streams when dropping CDC table
  cdc: create streams when allocating tablets
  migration_listener: add on_before_allocate_tablet_map notification
  cdc: notify when creating or dropping cdc table
  cdc: move cdc table creation to pre_create
  cdc: add internal tables for cdc with tablets
  cdc: add cdc_with_tablets feature flag
  cdc: add is_log_schema helper
Collaborator

@piodul piodul left a comment

So it looks like you adjusted the library so that, in case of any streams change, it closes all the streams and reopens the new ones after the adjustment. I think that's fine for now, but in the future, when splitting/merging a subset of tablets at a time is allowed, we will have to improve this. After this PR is merged, please add an issue about it.
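
To illustrate what handling a partial streams change could involve, here is a rough sketch that computes which streams would need closing and which would need opening. It is not code from this PR: the `stream_diff` name and the use of plain integers as stream IDs are assumptions for illustration only.

```rust
use std::collections::BTreeSet;

/// Streams to close and streams to open when moving from `current` to `next`.
/// Stream IDs are plain integers here purely for illustration.
fn stream_diff(current: &BTreeSet<u32>, next: &BTreeSet<u32>) -> (Vec<u32>, Vec<u32>) {
    // In `current` but not in `next`: these readers can be shut down.
    let closed = current.difference(next).copied().collect();
    // In `next` but not in `current`: these readers have to be started.
    let opened = next.difference(current).copied().collect();
    (closed, opened)
}
```

With the current "replace everything" behavior, the two sets never overlap, so every current stream is closed and every new stream is opened.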

For now, I left some minor comments, but I think that after fixing those it should be GTM.

 /// Component responsible for managing stream generations.
-pub struct GenerationFetcher {
+#[async_trait]
+pub trait GenerationFetcher: Send + Sync + 'static {
Collaborator

I am not sure that the "generation" terminology applies well to CDC for tablets. A generation means a set of streams that span the same timestamp range and, combined, cover the whole token ring. While right now we only support splitting/merging all tablets at once - so we replace all streams at once - I think we should use different, more general terminology.

We don't have a term as nice as "generation" for tablets. I think we can call this a "streams change", "change in streams", etc.
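
For reference, the vnode notion of a generation described above could be modeled roughly like this. The `Generation` struct and the `generation_at` helper are hypothetical simplifications, not the crate's types.

```rust
// Hypothetical, simplified model; the crate's real types differ.
#[derive(Debug, Clone, PartialEq)]
struct Generation {
    /// Timestamp (in milliseconds) from which this set of streams is in effect.
    start_ms: i64,
    /// Stream IDs that, combined, cover the whole token ring.
    streams: Vec<Vec<u8>>,
}

/// Returns the generation in effect at `ts_ms`: the one with the greatest
/// start timestamp that is not after `ts_ms`. The input may be unsorted.
fn generation_at(generations: &[Generation], ts_ms: i64) -> Option<&Generation> {
    generations
        .iter()
        .filter(|g| g.start_ms <= ts_ms)
        .max_by_key(|g| g.start_ms)
}
```

The model assumes all streams are replaced at once; a partial streams change would not fit into a single `start_ms` per stream set.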

Collaborator

Also, let's make this trait pub(crate). The original GenerationFetcher used to be public, but it is not exposed via any public module path, so it's sufficient to make it public only within the crate.

Collaborator

I guess that the name can stay, as long as this is internal.

Author

Currently I use the concept of a generation in the same way for tablets as for vnodes, so I think it's appropriate for now, until the library is changed to handle more general stream changes.
Changed to pub(crate).

Comment on lines 280 to 284
impl From<StreamState> for i8 {
    fn from(state: StreamState) -> Self {
        state as i8
    }
}
Collaborator

I'm not sure this is necessary. The `as` operator that you use in a couple of places does not use this trait; it relies on the fact that the repr is i8.
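
A minimal sketch of that point: a fieldless enum can be cast with `as` without any `From` impl. The `#[repr(i8)]` attribute is assumed from the comment above, and `state_code` is a hypothetical helper.

```rust
// Mirrors the `StreamState` enum from this PR; `#[repr(i8)]` is assumed.
#[repr(i8)]
#[derive(Debug, Clone, Copy, PartialEq)]
enum StreamState {
    Current = 0,
    Closed = 1,
    Opened = 2,
}

/// Converts with a plain `as` cast; no `From<StreamState> for i8` impl is involved.
fn state_code(state: StreamState) -> i8 {
    state as i8
}
```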

Author

removed

Comment on lines 274 to 278
pub enum StreamState {
    Current = 0,
    Closed = 1,
    Opened = 2,
}
Collaborator

Consider making this type private (drop the pub). Although the module itself is not public, let's make it explicit here that this type is not public.

Author

made private

Comment on lines 362 to 373
    let mut rows = self
        .session
        .query_iter(query, (&self.keyspace_name, &self.table_name))
        .await?
        .rows_stream::<(GenerationTimestamp,)>()?;

    while let Some(generation) = rows.next().await {
        generations.push(generation?.0)
    }

    Ok(generations)
}
Collaborator

Nit: you can probably use the TryStreamExt::try_collect method to avoid the loop:

use futures::stream::TryStreamExt;

self
    .session
    .query_iter(query, (&self.keyspace_name, &self.table_name))
    .await?
    .rows_stream::<(GenerationTimestamp,)>()?
    // Unwrap the one-element row tuple, as `generation?.0` did in the loop.
    .map_ok(|(gt,)| gt)
    .try_collect::<Vec<_>>()
    .await
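
For readers less familiar with `try_collect`, the synchronous analog in the standard library is collecting an iterator of `Result`s into a `Result<Vec<_>, _>`, which likewise stops at the first error. A small sketch, where `parse_all` is a made-up example function:

```rust
/// Parses every item, stopping at the first failure.
/// `collect` into `Result<Vec<_>, _>` short-circuits on the first `Err`,
/// which is the behavior `try_collect` provides for fallible streams.
fn parse_all(raw: &[&str]) -> Result<Vec<i64>, std::num::ParseIntError> {
    raw.iter().map(|s| s.parse::<i64>()).collect()
}
```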

Author

done

Comment on lines 428 to 448
let mut rows = self
    .session
    .query_iter(
        query,
        (
            &self.keyspace_name,
            &self.table_name,
            generation,
            StreamState::Current as i8,
        ),
    )
    .await?
    .rows_stream::<(StreamID,)>()?;

let mut result = Vec::new();
while let Some(next_row) = rows.next().await {
    let (id,) = next_row?;
    result.push(vec![id]);
}

Ok(result)
Collaborator

Nit: you could use try_collect here as well

Author

done

Comment on lines +668 to +703
// Insert generations
for ts in &[GENERATION_NEW_MILLISECONDS, GENERATION_OLD_MILLISECONDS] {
    insert_generation_timestamp(&session, TEST_KEYSPACE, TEST_TABLE, *ts).await;
}

// Insert streams
for sid in &[TEST_STREAM_1, TEST_STREAM_2] {
    let stream_id = hex::decode(sid.strip_prefix("0x").unwrap()).unwrap();
    let query = new_distributed_system_query(
        "INSERT INTO cdc_streams (keyspace_name, table_name, timestamp, stream_state, stream_id) VALUES (?, ?, ?, ?, ?);".to_string(),
        &session,
    )
    .await
    .unwrap();

    for st in &[StreamState::Current, StreamState::Opened] {
        session
            .query_unpaged(
                query.clone(),
                (
                    TEST_KEYSPACE,
                    TEST_TABLE,
                    value::CqlTimestamp(GENERATION_NEW_MILLISECONDS),
                    *st as i8,
                    stream_id.clone(),
                ),
            )
            .await
            .unwrap();
    }
}
Collaborator

The old "generation" does not have any streams, it's nonsensical... However, as I'm reading the old tests right now, it looks like it used to be a problem with vnodes as well. Probably not worth fixing it in this PR.

If we extend the library to handle partial stream changes (i.e. a subset of streams is closed at a time), then it will become more important to improve the test cases here.

Comment on lines 712 to 725
if tablets_enabled {
    let fetcher = tablets_tests::setup().await?;
    let session = fetcher.session.clone();
    Ok((Arc::new(fetcher), session))
} else {
    let fetcher = vnode_tests::setup().await?;
    let session = fetcher.session.clone();
    Ok((Arc::new(fetcher), session))
}
Collaborator

Nit: consider using the fully-qualified syntax to clone the session pointers (Arc::clone(&fetcher.session)). This makes it more explicit that we are sharing the session here instead of creating a separate clone of the session.
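
A small sketch of the suggested style. `Session` and `Fetcher` are hypothetical stand-ins for the real types, and `shared_session` is a made-up helper.

```rust
use std::sync::Arc;

// Hypothetical stand-ins for the session and fetcher types.
struct Session;

struct Fetcher {
    session: Arc<Session>,
}

/// Returns another handle to the same session; the `Session` itself is not copied.
fn shared_session(fetcher: &Fetcher) -> Arc<Session> {
    // `Arc::clone(&x)` makes it obvious that only the reference count is bumped.
    Arc::clone(&fetcher.session)
}
```

Both spellings compile to the same thing; the fully-qualified form only helps the reader.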

Author

done

Comment on lines 864 to 869
let stream1 = StreamID {
    id: hex::decode(TEST_STREAM_1.strip_prefix("0x").unwrap()).unwrap(),
};
let stream2 = StreamID {
    id: hex::decode(TEST_STREAM_2.strip_prefix("0x").unwrap()).unwrap(),
};
Collaborator

Nit: let's use StreamID::new(hex::decode(...)) here; it is the more proper way to initialize this struct (and I would like to change its internal structure in #135, so it will make that a bit easier).
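
The benefit of going through a constructor can be sketched as below. This `StreamId` is a hypothetical miniature, not the crate's `StreamID`, and `decode_hex` is a std-only stand-in for `hex::decode` that also strips the "0x" prefix.

```rust
// Hypothetical miniature of a stream ID type; the real `StreamID` lives in scylla-cdc.
#[derive(Debug, Clone, PartialEq)]
struct StreamId {
    id: Vec<u8>,
}

impl StreamId {
    /// Callers that use `new` keep compiling if the internal layout changes.
    fn new(id: Vec<u8>) -> Self {
        StreamId { id }
    }

    fn as_bytes(&self) -> &[u8] {
        &self.id
    }
}

/// Decodes a "0x"-prefixed hex string (std-only stand-in for `hex::decode`).
fn decode_hex(s: &str) -> Option<Vec<u8>> {
    let digits = s.strip_prefix("0x")?;
    // Require whole bytes, and ASCII so that byte-index slicing below is safe.
    if digits.len() % 2 != 0 || !digits.is_ascii() {
        return None;
    }
    (0..digits.len())
        .step_by(2)
        .map(|i| u8::from_str_radix(&digits[i..i + 2], 16).ok())
        .collect()
}
```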

Author

done

Comment on lines 67 to 72
// the query may fail on old scylla versions that don't have this table. in this case return
// false because it doesn't support tablets.
let result = match session.query_unpaged(query, &[keyspace][..]).await {
    Ok(r) => r,
    Err(_) => return Ok(false),
};
Collaborator

This will be flaky: if there is a timeout during the query, the error is swallowed and we will treat the keyspace as one that does not use tablets.

I think such a check should be implemented in the driver. Reliably obtaining this information requires issuing multiple queries to the same node, and using a driver to do that is tricky (technically doable, but it requires implementing custom error handling logic in case one of the queries fails).

For now, I'm not sure if it makes sense to make it more robust.
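
One possible direction, should this be made more robust later, is to map only the "table does not exist" class of errors to `false` and to propagate everything else. The sketch below uses a hypothetical `QueryError` enum rather than the driver's real error types, and it assumes that a non-empty result means the keyspace uses tablets.

```rust
// Hypothetical error type; a real driver exposes a richer error hierarchy.
#[derive(Debug, PartialEq)]
enum QueryError {
    /// The server rejected the query, e.g. the system table does not exist.
    Invalid(String),
    /// Transient failure: says nothing about whether the keyspace uses tablets.
    Timeout,
}

/// Interprets the outcome of the "does this keyspace use tablets?" query.
/// `Ok(n)` carries the number of rows returned (assumed: n > 0 means tablets).
fn uses_tablets(query_result: Result<usize, QueryError>) -> Result<bool, QueryError> {
    match query_result {
        Ok(rows) => Ok(rows > 0),
        // Old server without the table: no tablets support.
        Err(QueryError::Invalid(_)) => Ok(false),
        // Anything else is propagated instead of being read as "no tablets".
        Err(other) => Err(other),
    }
}
```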

cc: @Lorak-mmk

.to_string();
// the query may fail on old scylla versions that don't have this table. in this case return
// false because it doesn't support tablets.
let result = match session.query_unpaged(query, &[keyspace][..]).await {
Collaborator

Nit: this &[keyspace][..] looks a bit weird, how about using a one-element tuple: (keyspace,) ?

Author

done

the current GenerationFetcher is appropriate for vnode-based keyspaces.
we will want to extend it also to tablets-based keyspaces.

we define a generic GenerationFetcher trait that will have
implementations for both vnodes and tablets.
mlitvk added 7 commits October 9, 2025 13:41
add an implementation of GenerationFetcher for tablets-based keyspaces.

it has additional members keyspace_name and table_name for the table for
which we want to fetch generations and streams, because in tablets-based
keyspace each table has its own generations and streams, as opposed to
vnodes where they are global.

it fetches generation and streams from the system tables
system.cdc_timestamps and system.cdc_streams.

extend the generation fetcher test suite to test both the vnode and
tablets generation fetcher.

define separate setup methods for vnodes and tablets since the
generations and streams tables are different.

all the tests are made to be parametrized for vnodes/tablets.

add a method that checks whether a keyspace uses tablets by querying the
system tables.

we will use it to choose the appropriate generation fetcher for a table.

use either a VnodeGenerationFetcher or TabletsGenerationFetcher in the
log reader, based on whether the keyspace uses tablets.

add a parameter to prepare_db that controls whether the keyspace is
created with tablets enabled or not. it is mostly used for tests and
internal storage for the cdc library.

we will use it to extend the tests to test both vnode and tablets based
keyspaces.

make all the stream_reader tests parametrized to run with both tablets
and vnodes keyspaces.

make the e2e tests parametrized to run with both tablets and vnodes
keyspaces.
mlitvk added 2 commits October 9, 2025 13:41
make all replication tests parametrized to run with both vnodes and
tablets keyspaces.

for all tablets tests, if they run on a scylla node that doesn't support
CDC with tablets, then skip the test.

piodul commented Oct 9, 2025

@wprzytula please review and consider merging if everything is fine here
