
Conversation


@mlitvk mlitvk commented Jul 3, 2025

Initial support of CDC with tablets.
CDC with tablets is implemented in scylladb/scylladb#23795

We extend the current vnode implementation to also support tablets-based keyspaces. The main differences are in how CDC generations work, how we query for CDC streams, and how CDC streams change. See the design and implementation for more details.

  • Instead of a global CDC generation per cluster, we have a generation per table. The generation changes when the table's tablets are split or merged. For each table and generation there is an associated stream set.
  • To get information about CDC generations and streams we query the new virtual tables system.cdc_timestamps and system.cdc_streams. We query system.cdc_timestamps with the table name as the key to get the generation timestamps and find whether there is a new generation. We query system.cdc_streams to find the set of streams for a given table and generation timestamp (see the query sketch after this list).
  • We refactor Master to abstract over and support both the vnode model and the tablets model. In the vnode model there is a global CDC generation; in the tablets model there is an independent controller for each table that tracks the table's current generation and creates tasks.
  • In the vnode model there is a single worker thread per generation. When moving to the next generation, the worker thread is stopped and a new one is created. In the tablets model, there is a single "continuous" Worker thread that runs tasks for all tables. When some table moves to the next generation, it creates new tasks and adds them dynamically to the existing worker.
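
For orientation, a minimal sketch (not part of this PR) of how a consumer could query the two virtual tables with the Java driver 3 API used by the library's Driver3 classes. The column names (keyspace_name, table_name, timestamp, stream_id) and the WHERE clauses are assumptions based on the description above, not the tables' confirmed schema.

import com.datastax.driver.core.Cluster;
import com.datastax.driver.core.ResultSet;
import com.datastax.driver.core.Row;
import com.datastax.driver.core.Session;

import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;

// Sketch: list the CDC generation timestamps of one tablets-based table and
// fetch the stream ids of the most recent generation.
public class TabletCdcQuerySketch {
    public static void main(String[] args) {
        try (Cluster cluster = Cluster.builder().addContactPoint("127.0.0.1").build();
             Session session = cluster.connect()) {

            // Assumed columns: keyspace_name, table_name, timestamp (descending).
            ResultSet timestamps = session.execute(
                    "SELECT timestamp FROM system.cdc_timestamps"
                    + " WHERE keyspace_name = ? AND table_name = ?", "ks", "t");
            List<Date> generations = new ArrayList<>();
            for (Row row : timestamps) {
                generations.add(row.getTimestamp("timestamp"));
            }
            if (generations.isEmpty()) {
                return;
            }

            // Assumed columns: stream_id (blob) plus a stream_state column.
            Date newest = generations.get(0);
            ResultSet streams = session.execute(
                    "SELECT stream_id FROM system.cdc_streams"
                    + " WHERE keyspace_name = ? AND table_name = ? AND timestamp = ?",
                    "ks", "t", newest);
            for (Row row : streams) {
                ByteBuffer streamId = row.getBytes("stream_id");
                System.out.println(streamId); // feed into task configuration instead
            }
        }
    }
}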

Refs scylladb/scylladb#22577

@piodul

piodul commented Jul 3, 2025

@Bouncheck FYI

@mlitvk mlitvk force-pushed the cdc_with_tablets branch 2 times, most recently from e25637a to dc8b5b1 on July 9, 2025 14:10
@mlitvk mlitvk force-pushed the cdc_with_tablets branch 3 times, most recently from 5f1221e to 82f16ad on July 22, 2025 14:38
@mlitvk mlitvk marked this pull request as ready for review July 22, 2025 14:38
@Bouncheck Bouncheck self-requested a review July 28, 2025 17:06
@dkropachev dkropachev force-pushed the master branch 8 times, most recently from 5031da9 to b5730b8 on August 2, 2025 13:14
Collaborator

@Bouncheck Bouncheck left a comment

First of all thank you @mlitvk for contributing.

I've read through the changes here and the design document. I appreciate that the changes are split into commits and that tests are included.

I have a few questions and possibly one remark that needs addressing.

First question I have is regarding ChangeIds. They consist internally of StreamId and ChangeTime. From what I see both are used when Driver3Reader decides from which point to read by looking at lastChangeId. The compareTo implementation first compares the stream ids and only afterwards it compares the change time. Now this makes little sense to me, unless either:

  • stream ids are also monotonic (newer stream ids being lexicographically greater)
  • the stream ids are guaranteed to compare as equal when compareTo is called

The second bullet point does not seem to be true, since the lastChangeId is set only on reader creation, which happens when TaskAction reads a new window. Additionally, a single Task can be responsible for multiple StreamIds. The lastConsumedChangeId maintained within TaskState can come from any of those StreamIds, and this is what is used during reader initialization. In that case it may even come from the previous generation.
So the question basically is: are the streamIds in CDC also monotonic or chronological?
Because if not, then it looks like there might be a problem even before applying this PR. One of the effects could be skipping whole streams just because the streamId compares as less than the last seen one, but I don't think that has ever happened. At least it was never reported, which is strange unless I misunderstand something.
I don't think the stream ids are monotonic, so maybe I'm missing the reason why they always compare in a way that does not result in skipping rows.
If they are monotonic, though, I think I can see some use for them in this PR.

Another question is: Can CDC with tablets be used alongside CDC with vnodes on a single cluster?
This is mainly out of curiosity. This change permits the user to choose only one model.
If mixed usage is allowed in Scylla then it would be nice to have that in the library too, but this particular thing is not a blocker. It can be prioritized later on.

The one issue I have is with the rereading of a window when the reader manages to read past the endTimestamp. It goes back to the problem with lastConsumedChangeId. My understanding is that this is not always the change with the latest timestamp of all consumed changes - just the latest consumed by some TaskAction. The changes are consumed chronologically, but only within a single stream, while the Task reads multiple streams.
Additionally, when creating new tasks to reread the window, the lastConsumedChangeId is not passed on to the new tasks, so if I understand correctly it will start reading the whole window again.
If we want to avoid reading duplicates, I think we'd need to maintain the progress on a per-stream basis and carry those markers over when rereading that window.

Let me know if something is unclear or if I've made a mistake somewhere.

@mlitvk
Author

mlitvk commented Aug 14, 2025

Hi, thanks for the review

> First question I have is regarding ChangeIds. They consist internally of StreamId and ChangeTime. From what I see both are used when Driver3Reader decides from which point to read by looking at lastChangeId. The compareTo implementation first compares the stream ids and only afterwards it compares the change time. Now this makes little sense to me, unless either:
>
> * stream ids are also monotonic (newer stream ids being lexicographically greater)
>
> * the stream ids are guaranteed to compare as equal when compareTo is called
>
> The second bullet point does not seem to be true since the lastChangeId is set only on reader creation, which happens when TaskAction reads a new window. Additionally single Task can be responsible for multiple StreamIds. The lastConsumedChangeId maintained within TaskState can come from any of those StreamIds and this is what is used during reader initialization. In case of that initialization it may even be from previous generation. So the question basically is: are the streamIds in CDC also monotonic or chronological? Because if not then it looks like there might be a problem even before applying this PR. One of the effects could be skipping whole streams just because the streamId compares as lesser than last seen but I don't think that ever happened. At least it never was reported, which is weird unless I misunderstand something. I don't think the streamids are monotonic so maybe I'm missing the reason why they always compare in a way that does not result in skipping rows. If they are monotonic though I think I can see some use for them in this PR.

I think it works because the streams in a Task are sorted (it's a SortedSet), and the reader reads the streams one-by-one in this ordering, so the order of consuming is compatible with the ordering of ChangeId.
And I think the lastConsumedChangeId is only tied to a specific TaskId and generation. I don't think it can carry over between generations.
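
A minimal sketch of the ordering argument above, with simplified stand-ins for the library's ChangeId and StreamId types: because streams are iterated in sorted order and each stream is read chronologically, the emitted (stream id, change time) pairs are nondecreasing under a compareTo that compares the stream id first.

import java.util.Date;
import java.util.SortedSet;
import java.util.TreeSet;

// Simplified stand-in for the library's ChangeId: stream id compared first,
// change time second, as described in the review thread.
final class SketchChangeId implements Comparable<SketchChangeId> {
    final String streamId;
    final Date changeTime;

    SketchChangeId(String streamId, Date changeTime) {
        this.streamId = streamId;
        this.changeTime = changeTime;
    }

    @Override
    public int compareTo(SketchChangeId other) {
        int byStream = streamId.compareTo(other.streamId);
        return byStream != 0 ? byStream : changeTime.compareTo(other.changeTime);
    }
}

class ConsumptionOrderSketch {
    // Streams come from a SortedSet and each stream is read chronologically,
    // so every emitted ChangeId is >= the previous one and "resume after
    // lastChangeId" never skips an unread stream.
    static void consume(SortedSet<String> streams) {
        SketchChangeId last = null;
        for (String stream : streams) {                     // ascending stream ids
            for (Date time : readChronologically(stream)) { // ascending times
                SketchChangeId current = new SketchChangeId(stream, time);
                assert last == null || last.compareTo(current) <= 0;
                last = current;
            }
        }
    }

    static Iterable<Date> readChronologically(String stream) {
        return new TreeSet<>(); // placeholder for the per-stream CDC read
    }
}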

> Another question is: Can CDC with tablets be used alongside CDC with vnodes on a single cluster? This is mainly out of curiosity. This change permits user to choose only one model. If mixed usage is allowed on Scylla then it would be nice to have that in the library too, but this particular thing is not a blocker. It can be prioritized later on.

Yes, both CDC with tablets and CDC with vnodes can be used in a cluster, in different keyspaces.
For simplicity the PR currently doesn't support creating a single consumer with mixed models, but that can be added in the future.
It should also be easy to work around this by creating two consumers, one for each model.

> The one issue I have is with the rereading of a window when the reader manages to read past the endTimestamp. It goes back to the problem with lastConsumedChangeId. My understanding is that this is not always the change with the latest timestamp of all consumed changes - just the latest consumed by some TaskAction. The changes are consumed chronologically but within single stream, while the Task reads multiple streams.

Yes, this may be an issue, thanks. I will address it in the other comment.

> Additionally when creating new tasks to reread the window the lastConsumedChangeId is not passed on to the new tasks, so If I understand correctly it will start reading the whole window again. If we want to avoid reading duplicates I think we'd need to maintain the progress on a per stream basis and carry those markers on when rereading that window.

I think that the lastConsumedChangeId is preserved because it's stored in the TaskState, and when creating the new tasks with the same TaskId it looks up the stored TaskState and uses it (in createTasksWithState).

@Bouncheck
Collaborator

> I think it works because the streams in a Task are sorted (it's a SortedSet), and the reader reads the streams one-by-one by this ordering, so the order of consuming is compatible with the ordering of ChangeId.

Thanks, now I see it. I forgot about the sorted set. Now it makes sense.

> and I think the lastConsumedChangeId is only tied to a specific TaskId and generation. I don't think it can pass between generations.

Yes, I went too far there. I likely got confused thinking about a new query reading a new window through ReadNewWindowTaskAction, but that is still within the same generation, so I was wrong.

@Bouncheck Bouncheck self-requested a review August 18, 2025 14:54
Bouncheck previously approved these changes Aug 18, 2025
Collaborator

@Bouncheck Bouncheck left a comment

I've read the adjustments and it looks good to me now. Thanks again @mlitvk.
I think this can be safely merged @dkropachev .

To sum up my understanding is that:

  • The way the library works for the vnodes remains unchanged.
  • For now you can only use the tablets controller or the vnode controller. If you want to read changes from both types of tables then you have to start 2 separate consumers.
  • The tablets implementation guarantees "at least once" semantics, meaning each change will be read at least once. Some may be read twice due to the possibility of rereading after going past the endTimestamp. If we want to provide "exactly once" semantics this is something that can be added in the future.
    For reference, the old vnode implementation does not state any guarantees, but I think it can also fail on some exceptions and reread some changes.

@piodul

piodul commented Aug 18, 2025

> I think this can be safely merged @dkropachev.

Let's hold off on merging until the scylladb PR gets approved and merged first.

@mlitvk
Author

mlitvk commented Aug 18, 2025

Thanks @Bouncheck
Just to clarify about this point:

> The tablets implementation guarantees "at least once" semantics, meaning each change will be read at least once. Some may be read twice due to the possibility of rereading when going past the endTimestamp. If we want to provide "exactly once" semantics this is something that can be added in the future.

Suppose the generation endTimestamp is T1 and the reader reads past it until T2 (T2 > T1).
We re-read the streams until T2, and start reading the next generation from T2.
So I don't think anything should be read twice.

Add new methods to retrieve CDC stream information for tables in a
tablet-enabled keyspace. The methods are similar to those for vnode
based keyspaces but they have an additional table name parameter, since
with tablets the CDC timestamps and streams are per-table.

The new methods retrieve the information by reading the new CDC system
tables for tablets, cdc_timestamps and cdc_streams.

Add a new method to MasterCQL that, given a table, returns true if the
table uses tablets.

Refactor Master in order to be able to extend it later with the tablets
model for CDC streams.

Currently the reading of metadata about generations, streams, and
configuring workers, is integrated into the Master code. But the way it
works now is tightly coupled with the vnode-based model.

Introduce an abstract interface CDCMetadataModel that handles the work
that needs to be done by the Master in each step, and move the current
implementation into the class GenerationBasedCDCMetadataModel that
implements CDCMetadataModel with the vnode-based model that has a single
global generation for all tables. Later we will add another
implementation of CDCMetadataModel for tablets.
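
A hedged sketch of the kind of seam this refactoring introduces; the actual CDCMetadataModel method names and signatures in the PR may differ, and TaskId/StreamId are simplified to strings here.

import java.util.Map;
import java.util.SortedSet;

// Sketch of the abstraction: one implementation per CDC metadata model.
interface CdcMetadataModelSketch {
    // The tasks the Master should currently run, keyed by task id, each
    // mapped to the stream ids it covers (ids simplified to strings).
    Map<String, SortedSet<String>> discoverTasks() throws Exception;

    // Whether the generation backing the current tasks has ended and the
    // Master should move on to the next one.
    boolean currentGenerationDone() throws Exception;
}

// Vnode model: one global generation for the whole cluster.
// Tablets model: an independent generation per table, checked table by table.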

Just move the class to its own file - no other changes.

Introduce a new class GroupedTasks that represents a set of tasks that
are created together and share generation metadata.

It is used to represent a set of tasks that are created by the Master in
a single step and passed to the Worker for execution.

Previously it was represented simply as a Map from TaskId to a set of
StreamIds. Now we encapsulate this map in GroupedTasks and add the
additional generation metadata that is shared with all the tasks. This
is more convenient because we have the generation metadata passed
together with the task and we don't need to calculate it from the task
information, as was done before, and we can also add additional
information that can't be calculated.

In this commit we simply do this refactoring and maintain all the
behaviors.
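
A hedged sketch of the shape described above; the field and accessor names are assumptions rather than the PR's exact GroupedTasks definition, and TaskId/StreamId are simplified to strings.

import java.util.Date;
import java.util.Map;
import java.util.SortedSet;

final class GroupedTasksSketch {
    private final Map<String, SortedSet<String>> tasks; // task id -> stream ids
    private final Date generationStart;                 // shared generation metadata

    GroupedTasksSketch(Map<String, SortedSet<String>> tasks, Date generationStart) {
        this.tasks = tasks;
        this.generationStart = generationStart;
    }

    Map<String, SortedSet<String>> getTasks() {
        return tasks;
    }

    // The generation metadata travels with the tasks instead of being
    // recomputed from the task ids, and can carry data that cannot be derived.
    Date getGenerationStart() {
        return generationStart;
    }
}
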
Introduce a method that adds new tasks dynamically to an existing
Worker.

Previously, tasks are passed to the worker only on creation. When new
tasks are configured, the worker is stopped, and a new worker is started
with the new tasks. This makes sense for the vnodes model because tasks
are created only when the current generation is complete and we are
moving to a new generation.

However, with tablets we can have multiple tables, with the tasks of
each table changing independently. We don't want to stop all tasks and
recreate them whenever the tasks of some table are changed.

Therefore, for tablets we will have one continuous worker that we can
add tasks dynamically to. The new method receives a set of tasks for
one table and queues them for execution.
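
A hedged sketch of the "add tasks to a running worker" idea; the PR's actual Worker method names and types may differ.

import java.util.Map;
import java.util.SortedSet;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.Queue;

class ContinuousWorkerSketch {
    private final Queue<Map.Entry<String, SortedSet<String>>> pending =
            new ConcurrentLinkedQueue<>();

    // Called by the Master when one table's generation produces new tasks;
    // tasks of other tables keep running untouched.
    void addTasks(Map<String, SortedSet<String>> tasksForOneTable) {
        pending.addAll(tasksForOneTable.entrySet());
    }

    // The worker loop drains the queue and starts the queued tasks alongside
    // the ones that are already running.
    void runLoopIteration() {
        Map.Entry<String, SortedSet<String>> next;
        while ((next = pending.poll()) != null) {
            startTask(next.getKey(), next.getValue());
        }
    }

    private void startTask(String taskId, SortedSet<String> streams) {
        // placeholder: submit a TaskAction for (taskId, streams) to the executor
    }
}
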
@mlitvk
Author

mlitvk commented Sep 3, 2025

A few small changes in the commit "Add tablet-based CDC support to Master CQL interface" due to schema changes in the core PR:

  • Change getFetchSmallestTableGenerationAfter: the timestamp column is now descending
  • Rename stream_kind to stream_state

For the tablets model, each table has independent generations and
independent tasks.

For the vnode-based model, the Master transport stores a
currentGenerationId. For the tablets model, we need to store a current
generation per table.

We add a new configureWorkers method that allows configuring workers
for a specific table, and will be used in the tablets model. This method
updates the current generation id for that table, and updates the worker
thread with the tasks for the table by adding them dynamically - not
affecting the tasks of other tables.

In the previous implementation for vnodes, when starting a worker for a
new generation we start an executor with tasks for the generation, and
when moving to the next generation we abort all tasks by stopping the
entire executor.

Now we want an executor that runs continuously, to which we can add
new tasks dynamically and abort tasks dynamically, since we have a
single worker thread that handles possibly multiple generations of
different tables. In the previous commit, we added the option to add new
tasks to a running worker.

Now we want to be able to abort tasks dynamically if they belong to a
generation that is complete.

We use the taskStates map in LocalTransport to track which tasks should
be running. When we configure new tasks for a table using
configureWorkers, we remove the states of tasks that are not part of the
new configuration - meaning those tasks can be aborted.

We use it as follows. We maintain the invariant that all tasks that
should be running have a state set in taskStates, so we now need to make
sure to set the state when starting a new task. The LocalTransport
aborts tasks by removing their state in configureWorkers. A task checks
whether it should be aborted by checking if it still has a state in
the LocalTransport.

The way we do this is that the task uses dedicated methods to update its
state periodically - updateState and moveStateToNextWindow - which check
whether the task still has an existing state or whether it was removed.
If the state doesn't exist, we throw an exception that signals the task
to abort.
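
A hedged sketch of this abort mechanism; only the shape is reproduced, with simplified types standing in for the PR's LocalTransport, TaskState and abort exception.

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

class TaskStateRegistrySketch {
    // Task id -> last known state; presence means "this task should be running".
    private final Map<String, String> taskStates = new ConcurrentHashMap<>();

    // Starting a task registers its state, keeping the invariant that every
    // task that should run has an entry here.
    void registerTask(String taskId, String initialState) {
        taskStates.put(taskId, initialState);
    }

    // configureWorkers(): drop the states of tasks that are no longer part of
    // the table's new configuration, which marks them for abortion.
    void dropTask(String taskId) {
        taskStates.remove(taskId);
    }

    // Called from the task's periodic state updates; if the state was removed,
    // throw to signal the task to abort.
    void updateState(String taskId, String newState) {
        String previous = taskStates.replace(taskId, newState);
        if (previous == null) {
            throw new IllegalStateException("task " + taskId + " was aborted");
        }
    }
}
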
When starting the Master thread, check if it should use the tablets
model for querying streams - if so, run the Master steps using
TabletBasedCDCMetadataModel instead of the vnode-based generation model.

TabletBasedCDCMetadataModel is used to query the CDC metadata about
generations and streams for tablets-based keyspaces. In that model each
table is independent - it has its own generations and set of streams in
each generation.

The master work for tablets consists of initializing the current
generation for each table and configuring workers with tasks, and then
periodically testing for each table if the generation has ended and
updating the workers accordingly. The work for each specific table is
done in the class TableCDCController.
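
A hedged sketch of the per-table control flow described above; the method names are placeholders, not the PR's TableCDCController API.

import java.util.Date;

class TableControllerSketch {
    private final String table;
    private Date currentGeneration;

    TableControllerSketch(String table) {
        this.table = table;
    }

    // Run once per Master iteration for this table.
    void step() throws Exception {
        if (currentGeneration == null) {
            currentGeneration = fetchCurrentGenerationTimestamp(table);
            configureWorkersForGeneration(table, currentGeneration);
            return;
        }
        Date next = fetchNextGenerationTimestamp(table, currentGeneration);
        if (next != null && previousGenerationFullyConsumed(table, currentGeneration)) {
            currentGeneration = next;
            configureWorkersForGeneration(table, currentGeneration);
        }
    }

    // Placeholders for queries against the CDC system tables and the transport.
    private Date fetchCurrentGenerationTimestamp(String table) { return new Date(0); }
    private Date fetchNextGenerationTimestamp(String table, Date after) { return null; }
    private boolean previousGenerationFullyConsumed(String table, Date generation) { return false; }
    private void configureWorkersForGeneration(String table, Date generation) { }
}
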
Add basic tests for Master in tablets mode, verifying that it configures
tasks for the correct generation for each table and moves to the next
generation when it is completed. Also test that it starts from the
correct generation, considering TTL and when resuming from exceptions.

The tests are mostly similar to the existing tests but adapted for
tablets.

Add a basic integration test for the cql driver with a tablets-based
keyspace. Verify it reads the generation correctly and can read CDC log
rows.

Add a basic integration test for tablets-based keyspaces, verifying we
consume changes correctly from a table while the table streams are
changed.
@mlitvk
Author

mlitvk commented Sep 11, 2025

I changed the PR following the changes in the core PR.
We changed the stream switching algorithm to be similar to the previous scheme for vnode keyspaces, so many of the changes I made are not required anymore and were removed.
Now we just create tasks for each table's generation, and they run until we find a new generation and all tasks for the previous generation are fully consumed. This is very similar to the existing method. We don't need to worry anymore about duplicate entries and reading only between specific timestamps, so all the commits related to that were removed.

The main addition now is the "abort tasks dynamically" commit. The method for vnodes doesn't work for us because we have tasks for different tables with different generations, and we add and remove tasks dynamically.
Now we abort a task by removing its state in configureWorkers; eventually the task will try to update its state and get an exception telling it to abort.

fyi @Bouncheck

piodul added a commit to scylladb/scylladb that referenced this pull request Sep 18, 2025
Initial implementation to support CDC in tablets-enabled keyspaces.

The design is described in https://docs.google.com/document/d/1qO5f2q5QoN5z1-rYOQFu6tqVLD3Ha6pphXKEqbtSNiU/edit?usp=sharing
It is followed closely for the most part except "Deciding when to change streams" - instead, streams are changed synchronously with tablet split / merge.
Instead of the stream switching algorithm with the double writes, we use a scheme similar to the previous method for vnodes - we add the new streams with timestamp that is sufficiently far into the future.

In this PR we:
* add new group0-based internal system tables for tablet stream metadata and loading it into in-memory CDC metadata
* add virtual tables for CDC consumers
* the write coordinator chooses a stream by looking up the appropriate stream in the CDC metadata
* enable creating tables with CDC enabled in tablets-enabled keyspaces. Tablets are allocated for the CDC table, and a stream is created for each tablet.
* on tablet resize (split / merge), the topology coordinator creates a new stream set with a new stream for each new tablet.
* the cdc tablets are co-located with the base tablets

Fixes #22576

backport not needed - new feature

update dtests: scylladb/scylla-dtest#5897
update java cdc library: scylladb/scylla-cdc-java#102
update rust cdc library: scylladb/scylla-cdc-rust#136

Closes #23795

* github.com:scylladb/scylladb:
  docs/dev: update CDC dev docs for tablets
  doc: update CDC docs for tablets
  test: cluster_events: enable add_cdc and drop_cdc
  test/cql: enable cql cdc tests to run with tablets
  test: test_cdc_with_alter: adjust for cdc with tablets
  test/cqlpy: adjust cdc tests for tablets
  test/cluster/test_cdc_with_tablets: introduce cdc with tablets tests
  cdc: enable cdc with tablets
  topology coordinator: change streams on tablet split/merge
  cdc: virtual tables for cdc with tablets
  cdc: generate_stream_diff helper function
  cdc: choose stream in tablets enabled keyspaces
  cdc: rename get_stream to get_vnode_stream
  cdc: load tablet streams metadata from tables
  cdc: helper functions for reading metadata from tables
  cdc: colocate cdc table with base
  cdc: remove streams when dropping CDC table
  cdc: create streams when allocating tablets
  migration_listener: add on_before_allocate_tablet_map notification
  cdc: notify when creating or dropping cdc table
  cdc: move cdc table creation to pre_create
  cdc: add internal tables for cdc with tablets
  cdc: add cdc_with_tablets feature flag
  cdc: add is_log_schema helper
@Bouncheck Bouncheck self-requested a review September 22, 2025 11:22
@Bouncheck Bouncheck dismissed their stale review September 22, 2025 11:22

Need to make one final check after the changes before merging

@piodul

piodul commented Oct 6, 2025

@Bouncheck review ping

Collaborator

@Bouncheck Bouncheck left a comment

LGTM.
I did not spot any correctness issues.
The integration test that alters min_table_count is a nice addition to have.
The tests completed successfully for me when run against 2025.4.0-rc1.

generationCount = rs.one().getLong(0);
Thread.sleep(1000); // Check every second
}


Just to be extra safe, I think an assertion that checks that generationCount increased would be nice.
Technically we could simply exceed timeoutMs and not notice that the generation did not change.
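
A hedged sketch of the suggested check; the query text, variable names and test harness around the excerpt are assumptions.

import com.datastax.driver.core.ResultSet;
import com.datastax.driver.core.Session;

import static org.junit.Assert.assertTrue;

class GenerationCountAssertionSketch {
    static void waitForNewGeneration(Session session, String countQuery,
                                     long initialCount, long timeoutMs) throws InterruptedException {
        long generationCount = initialCount;
        long waitedMs = 0;
        while (generationCount <= initialCount && waitedMs < timeoutMs) {
            ResultSet rs = session.execute(countQuery);
            generationCount = rs.one().getLong(0);
            Thread.sleep(1000); // Check every second
            waitedMs += 1000;
        }
        // Fail loudly instead of silently exceeding timeoutMs with no new generation.
        assertTrue("generation count did not increase within " + timeoutMs + " ms",
                generationCount > initialCount);
    }
}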

Map<TaskId, SortedSet<StreamId>> tasks = workerTasks.getTasks();

// Remove all existing tasks from taskStates that belong to this table and no longer in the configuration
// TODO MICHAEL stop tasks

@Bouncheck Bouncheck Oct 9, 2025

@mlitvk Is this just a leftover comment, or does something still need to be added? It looks like you are doing the removal, which will stop the tasks.

TaskId id = taskStreams.getKey();
SortedSet<StreamId> streams = taskStreams.getValue();
TaskState state = states.getOrDefault(id, initialState);
workerConfiguration.transport.setState(id, state);

Optional suggestion: consider adding a one-line comment here, or to the comment above the method declaration, noting that it also sets the task state in the transport.
To me it feels really easy to miss this line. The rest of the method looks like we're just creating some objects that are yet to be passed and used somewhere, but this line already marks those tasks as "active" so that once they are turned into TaskActions they are not immediately aborted (if I understand correctly).
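
One possible wording for the suggested comment, shown against the excerpt above:

TaskId id = taskStreams.getKey();
SortedSet<StreamId> streams = taskStreams.getValue();
TaskState state = states.getOrDefault(id, initialState);
// Also registers the task's state in the transport, marking the task as
// "active" so the TaskAction created from it is not aborted right away.
workerConfiguration.transport.setState(id, state);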
