-
Notifications
You must be signed in to change notification settings - Fork 18
support CDC with tablets #102
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: master
Are you sure you want to change the base?
Conversation
@Bouncheck FYI |
e25637a
to
dc8b5b1
Compare
5f1221e
to
82f16ad
Compare
5031da9
to
b5730b8
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
First of all thank you @mlitvk for contributing.
I've read through the changes here and the design document. I appreciate that the changes are split into commits and that tests are included.
I have a few questions and possibly one remark that needs addressing.
First question I have is regarding ChangeIds. They consist internally of StreamId and ChangeTime. From what I see both are used when Driver3Reader
decides from which point to read by looking at lastChangeId
. The compareTo
implementation first compares the stream ids and only afterwards it compares the change time. Now this makes little sense to me, unless either:
- stream ids are also monotonic (newer stream ids being lexicographically greater)
- the stream ids are guaranteed to compare as equal when compareTo is called
The second bullet point does not seem to be true since the lastChangeId
is set only on reader creation, which happens when TaskAction reads a new window. Additionally single Task can be responsible for multiple StreamIds. The lastConsumedChangeId
maintained within TaskState can come from any of those StreamIds and this is what is used during reader initialization. In case of that initialization it may even be from previous generation.
So the question basically is: are the streamIds in CDC also monotonic or chronological?
Because if not then it looks like there might be a problem even before applying this PR. One of the effects could be skipping whole streams just because the streamId compares as lesser than last seen but I don't think that ever happened. At least it never was reported, which is weird unless I misunderstand something.
I don't think the streamids are monotonic so maybe I'm missing the reason why they always compare in a way that does not result in skipping rows.
If they are monotonic though I think I can see some use for them in this PR.
Another question is: Can CDC with tablets be used alongside CDC with vnodes on a single cluster?
This is mainly out of curiosity. This change permits user to choose only one model.
If mixed usage is allowed on Scylla then it would be nice to have that in the library too, but this particular thing is not a blocker. It can be prioritized later on.
The one issue I have is with the rereading of a window when the reader manages to read past the endTimestamp. It goes back to the problem with lastConsumedChangeId
. My understanding is that this is not always the change with the latest timestamp of all consumed changes - just the latest consumed by some TaskAction. The changes are consumed chronologically but within single stream, while the Task reads multiple streams.
Additionally when creating new tasks to reread the window the lastConsumedChangeId
is not passed on to the new tasks, so If I understand correctly it will start reading the whole window again.
If we want to avoid reading duplicates I think we'd need to maintain the progress on a per stream basis and carry those markers on when rereading that window.
Let me know if something is unclear or if I've made a mistake somewhere
scylla-cdc-lib/src/main/java/com/scylladb/cdc/lib/LocalTransport.java
Outdated
Show resolved
Hide resolved
scylla-cdc-base/src/main/java/com/scylladb/cdc/model/master/TableCDCController.java
Show resolved
Hide resolved
scylla-cdc-base/src/test/java/com/scylladb/cdc/model/master/MasterTest.java
Outdated
Show resolved
Hide resolved
scylla-cdc-lib/src/main/java/com/scylladb/cdc/lib/LocalTransport.java
Outdated
Show resolved
Hide resolved
Hi, thanks for the review
I think it works because the streams in a Task are sorted (it's a SortedSet), and the reader reads the streams one-by-one by this ordering, so the order of consuming is compatible with the ordering of ChangeId.
yes, both cdc with tablets and vnodes can be used in a cluster, in different keyspaces.
Yes, this may be an issue, thanks. I will address it in the other comment
I think that the lastConsumedChangeId is preserved because it's stored in the TaskState, and when creating the new tasks with the same TaskId it looks up the stored TaskState and uses it (in |
82f16ad
to
98dc870
Compare
Thanks, now I see it. I forgot about the sorted set. Now it makes sense.
Yes, I went too far there. I likely got confused thinking about new query reading new window through |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I've read the adjustments and it looks good to me now. Thanks again @mlitvk.
I think this can be safely merged @dkropachev .
To sum up my understanding is that:
- The way the library works for the vnodes remains unchanged.
- For now you can only use tablets controller or vnode controller. If you want to read changes from both types of tables then you have to start 2 separate consumers.
- The tablets implementation guarantees "at least once" semantics, meaning each change will be read at least once. Some may be read twice due to the possibility of rereading after going past the endTimestamp. If we want to provide "exactly once" semantics this is something that can be added in the future.
For reference the old vnode implementation does not say anything about guarantees, but I think it also can fail on some exceptions and reread some changes.
Let's wait with merging until the scylladb PR gets approved and merged first. |
Thanks @Bouncheck
Suppose the generation endTimestamp is T1 and the reader reads past it until T2. (T2 > T1) |
Add new methods to retrieve CDC stream information for tables in a tablet-enabled keyspace. The methods are simialr to those for vnode based keyspaces but they have an additional table name parameter, since with tablets the CDC timestamps and streams are per-table. The new methods retrieve the information by reading the new CDC system tables for tablets, cdc_timestamps and cdc_streams.
Add a new method to MasterCQL that given a table returns true if the table uses tablets.
refactor Master in order to be able to extend it with later with the tablets model for CDC streams. Currently the reading of metadata about generations, streams, and configuring workers, is integrated into the Master code. But the way it works now is tightly coupled with the vnode-based model. Introduce an abstract interface CDCMetadataModel that handles the work that needs to be done by the Master in each step, and move the current implementation into the class GenerationBasedCDCMetadataModel that implements CDCMetadataModel with the vnode-based model that has a single global generation for all tables. Later we will add another implementation of CDCMetadataModel for tablets.
just move the class to its own file - no other changes
Introduce a new class GroupedTasks that represents a set of tasks that are created together and share generation metadata. It is used to represent a set of tasks that are created by the Master in a single step and passed to the Worker for execution. Previously it was represented simplify as a Map from TaskId to a set of StreamIds. Now we encapsulate this map in GroupedTasks and add the additional generation metadata that is shared with all the tasks. This is more convenient because we have the generation metadata passed together with the task and we don't need to calculate it from the task information, like it has been done before, and we also can add additional information that can't be calculated. In this commit we simply do this refactoring and maintain all the behaviors.
Introduce a method that adds new tasks dynamically to an existing Worker. Previously, tasks are passed to the worker only on creation. When new tasks are configured, the worker is stopped, and a new worker is started with the new tasks. This makes sense for the vnodes model because tasks are created only when the current generation is complete and we are moving to a new generation. However, with tablets we can have multiple tables, with the tasks of each table changing independently. We don't want to stop all tasks and recreate them whenever the tasks of some table are changed. Therefore, for tablets we will have one continuous worker that we can add tasks dynamically to. The new methods receives a set of tasks for one table and it queues them for execution.
98dc870
to
d0f32d4
Compare
few small changes in the commit "Add tablet-based CDC support to Master CQL interface" due to schema changes in the core PR:
|
for the tablets model, each table has independent generations and independent tasks. For the vnode-based model, the Master transport stores a currentGenerationId. For the tablets model, we need to store a current generation per table. We add a new configureWorkers method that allows to configure workers for a specific table, and will be used in the tablets model. This method updates the current generation id for that table, and updates the worker thread with the tasks for the table by adding them dynamically - not affecting the tasks of other tables.
In the previous implementation for vnodes, when starting a worker for a new generation we start an executor with tasks for the generation, and when moving to the next generation we abort all tasks by stopping the entire executor. now we want to have an executor that runs continously that we can add new tasks to it dynamically and abort tasks dynamically, since we have a single worker thread that handles possibly multiple generations of different tables. in the previous commit, we added the option to add new tasks to a running worker. now, we want to be able to abort tasks dynamically if they belong to a generation that is complete. we use the taskStates map in LocalTransport to track which tasks should be running. when we configure new tasks for table using configureWorkers, we remove the states of tasks that are not part of the new configuration - meaning the tasks can be aborted. we will use it as follows. we maintain the invariant that all tasks that should be running have a state set in taskStates, so we need to make sure now to set the state when starting a new task. the LocalTransport will abort tasks by removing they state in configureWorkers. a task checks if it should be aborted by checking if it still has a state in the LocalTransport. the way we do this is the task uses dedicated methods to update its state periodically - updateState and moveStateToNextWindow, that check if the task has an existing state or was it removed. if we find the state doesn't exist we throw an exception that signals the task to abort.
When starting the Master thread, check if it should use the tablets model for querying streams - if so, run the Master steps using TabletBasedCDCMetadataModel instead of the vnode-based generation model. TabletBasedCDCMetadataModel is used to query the CDC metadata about generations and streams for tablets-based keyspaces. In that model each table is independent - it has its own generations and set of streams in each generation. The master work for tablets consists of initializing the current generation for each table and configuring workers with tasks, and then periodically testing for each table if the generation has ended and updating the workers accordingly. The work for each specific table is done in the class TableCDCController.
Add basic tests for Master with tablets mode, verifying it configures tasks for the correct generation for each table and moves to the next generation when its completed. Also test that it starts from the correct generation, considering TTL and when resuming from exceptions. The tests are mostly similar to the existing tests but adapted for tablets.
Add a basic integration test for the cql driver with a tablets-based keyspace. Verify it reads the generation correctly and can reads CDC log rows.
Add a basic integration test for tablets-based keyspaces, verifying we consume changes correctly from a table while the table streams are changed.
d0f32d4
to
552d224
Compare
I changed the PR following the changes in the core PR. the main addition now is the "abort tasks dynamically" commit. the method for vnodes doesn't work for us because we have tasks for different tables with different generations and we add and remove tasks dynamically. fyi @Bouncheck |
initial implementation to support CDC in tablets-enabled keyspaces. The design is described in https://docs.google.com/document/d/1qO5f2q5QoN5z1-rYOQFu6tqVLD3Ha6pphXKEqbtSNiU/edit?usp=sharing It is followed closely for the most part except "Deciding when to change streams" - instead, streams are changed synchronously with tablet split / merge. Instead of the stream switching algorithm with the double writes, we use a scheme similar to the previous method for vnodes - we add the new streams with timestamp that is sufficiently far into the future. In this PR we: * add new group0-based internal system tables for tablet stream metadata and loading it into in-memory CDC metadata * add virtual tables for CDC consumers * the write coordinator chooses a stream by looking up the appropriate stream in the CDC metadata * enable creating tables with CDC enabled in tablets-enabled keyspaces. tablets are allocated for the CDC table, and a stream is created per each tablet. * on tablet resize (split / merge), the topology coordinator creates a new stream set with a new stream for each new tablet. * the cdc tablets are co-located with the base tablets Fixes #22576 backport not needed - new feature update dtests: scylladb/scylla-dtest#5897 update java cdc library: scylladb/scylla-cdc-java#102 update rust cdc library: scylladb/scylla-cdc-rust#136 Closes #23795 * github.com:scylladb/scylladb: docs/dev: update CDC dev docs for tablets doc: update CDC docs for tablets test: cluster_events: enable add_cdc and drop_cdc test/cql: enable cql cdc tests to run with tablets test: test_cdc_with_alter: adjust for cdc with tablets test/cqlpy: adjust cdc tests for tablets test/cluster/test_cdc_with_tablets: introduce cdc with tablets tests cdc: enable cdc with tablets topology coordinator: change streams on tablet split/merge cdc: virtual tables for cdc with tablets cdc: generate_stream_diff helper function cdc: choose stream in tablets enabled keyspaces cdc: rename get_stream to get_vnode_stream cdc: load tablet streams metadata from tables cdc: helper functions for reading metadata from tables cdc: colocate cdc table with base cdc: remove streams when dropping CDC table cdc: create streams when allocating tablets migration_listener: add on_before_allocate_tablet_map notification cdc: notify when creating or dropping cdc table cdc: move cdc table creation to pre_create cdc: add internal tables for cdc with tablets cdc: add cdc_with_tablets feature flag cdc: add is_log_schema helper
Need to make one final check after the changes before merging
@Bouncheck review ping |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM.
I did not spot any correctness issues.
Integration test that alters min_table_count is a nice addition to have.
The tests completed successfully for me when run against 2025.4.0-rc1.
generationCount = rs.one().getLong(0); | ||
Thread.sleep(1000); // Check every second | ||
} | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Just to be extra safe I think assertion that checks the generationCount increased would be nice.
Technically we could simply exceed timeoutMs
and not notice that generation did not change.
Map<TaskId, SortedSet<StreamId>> tasks = workerTasks.getTasks(); | ||
|
||
// Remove all existing tasks from taskStates that belong to this table and no longer in the configuration | ||
// TODO MICHAEL stop tasks |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@mlitvk Is this just a remaining comment or something still should be added? It looks like you are doing removing which will stop the tasks.
TaskId id = taskStreams.getKey(); | ||
SortedSet<StreamId> streams = taskStreams.getValue(); | ||
TaskState state = states.getOrDefault(id, initialState); | ||
workerConfiguration.transport.setState(id, state); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Optional suggestion: Consider adding a one line comment here or to the comment above the method declaration that it also additionally sets the task state in transport.
To me it feels like it's really easy to miss this line. The rest of the method looks like we're just creating some objects that are yet to be passed and used somewhere, but this line already marks those tasks as "active" so that once they are turned into TaskActions they are not immediately aborted (if i understand correctly).
Initial support of CDC with tablets.
CDC with tablets is implemented in scylladb/scylladb#23795
we extend the current implementation for vnodes to support also tablets-based keyspaces. The main difference is in how CDC generations work, querying to find CDC streams, and how CDC streams change. See the design and implementation for more details.
system.cdc_timestamps
andsystem.cdc_streams
. We querysystem.cdc_timestamps
with the table name as the key to get the generation timestamps and find whether there is a new generation. We querysystem.cdc_streams
to find the set of streams for some table and generation timestamp.Refs scylladb/scylladb#22577