
Conversation

fee-mendes (Member)

This series implements proper connector semantics going forward.

After this series:

  • DELETE operations exclusively use the after field.
  • preImages exclusively use the before field.
  • The ScyllaExtractNewRecordState SMT can be used both with preImages/postImages and with deltas enabled (it is now dynamic).
  • Consumers using ScyllaExtractNewRecordState must implement an UPSERT mode when consuming from ScyllaDB deltas only (Apache Pinot and Elasticsearch are examples of datastores supporting such a mode), whereas postImage operations should rely on a FULL (or equivalent) mode; see the consumer-side sketch below this summary.
  • preImages are promoted to production ready.

This is a breaking change; existing consumers may require changes.

Tested with Apache Pinot in FULL mode (postImages and preImages) and with Elasticsearch in UPSERT mode.
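
For illustration only, here is a rough sketch of what UPSERT-style consumption of the flattened delta records could look like on the consumer side. The in-memory row store, the class name, and the use of a "__deleted" marker are assumptions made for the sketch, not part of this series:

import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical consumer-side reconciliation of flattened delta records.
// Each record carries only the columns affected by a change (null fields are
// dropped by the SMT), so the consumer merges them into the stored row.
public class DeltaUpsertConsumer {
    // Keyed by primary key; the in-memory map stands in for Pinot/Elasticsearch.
    private final Map<String, Map<String, Object>> rows = new ConcurrentHashMap<>();

    public void apply(String primaryKey, Map<String, Object> delta) {
        // With delete.handling.mode=rewrite, deletes carry "__deleted" = "true".
        if ("true".equals(delta.get("__deleted"))) {
            rows.remove(primaryKey);
            return;
        }
        // UPSERT: overwrite only the columns present in the delta and keep
        // previously observed values for everything else.
        rows.merge(primaryKey, new HashMap<>(delta), (existing, incoming) -> {
            existing.putAll(incoming);
            return existing;
        });
    }
}

In a FULL (or equivalent) mode the consumer would instead replace the whole document with the postImage, so no merge step is required.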

A "raw" Debezium message contains two Fields within its payload: after
and before. In the RDBMs world, a DELETE message often represents a
`null` after and a pre-image of the affected row within the before Field.

These semantics, however, make no sense in ScyllaDB. A pre-image is only
possible for a delete during a ROW_DELETE operation, and all
other types of deletes (including range deletes) - today - cannot be
represented by preImages.

Arguably, representing delete operations in the after field won't make
sense either, since a delete operation by definition should always
result in a null after payload.

In other words, we live in the worst of both worlds: we are a NoSQL
database using a format originally designed for relational systems.

The motivation for this commit is quite simple: We need to come up with
a standardized way to handle delete events, including eventually
supporting complex delete events.

The Cassandra connector follows the same approach, and adds additional
metadata to delete operations, such that a specialized consumer can
interpret the delete in question.

After this change, the following structure is defined:

 - The before field is used exclusively by preImages.
 - The after field is used exclusively by deltas or postImages.
 - Supported deletes should always emit an after message.

The ScyllaExtractNewRecordState payload can't distinguish between NULL
updates and DELETEs, which in turn makes it impossible for a consumer or
subscriber to reconcile deltas and converge to a stable state.

This commit addresses the problem in two ways:

First, it ensures that every consumed message gets its own schema. This
is done via the new getSchemaKey method, which computes a unique
hashCode according to the values sent by the producer.
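
As a rough illustration only (not the actual implementation), a key derived from the set of populated fields could look like the sketch below; the helper class name is made up for the example:

import java.util.Objects;

import org.apache.kafka.connect.data.Field;
import org.apache.kafka.connect.data.Struct;

final class SchemaKeyExample {
    // Hypothetical illustration: two records of the same table map to the same
    // key only when the same set of columns is populated, so each delta "shape"
    // gets its own cached schema entry.
    static Integer getSchemaKey(Struct value) {
        int key = value.schema().name() == null ? 0 : value.schema().name().hashCode();
        for (Field field : value.schema().fields()) {
            if (value.get(field) != null) {
                key = Objects.hash(key, field.name());
            }
        }
        return key;
    }
}

Two deltas touching the same set of columns then share a cached schema, while deltas with a different shape get a new one.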

Second, it drops all null Struct fields via the isDroppableField method,
so that any null value a consumer observes represents a column that was
actually affected by an update or delete operation.
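
Again purely as a sketch of the idea (the real isDroppableField criteria may differ), dropping null fields amounts to rebuilding the value with a trimmed schema:

import org.apache.kafka.connect.data.Field;
import org.apache.kafka.connect.data.SchemaBuilder;
import org.apache.kafka.connect.data.Struct;

final class NullFieldDropExample {
    // A field is droppable when the producer sent no value for it.
    static boolean isDroppableField(Struct value, Field field) {
        return value.get(field) == null;
    }

    // Rebuild the value so only the columns touched by the change survive.
    static Struct dropNullFields(Struct value) {
        SchemaBuilder builder = SchemaBuilder.struct().name(value.schema().name());
        for (Field field : value.schema().fields()) {
            if (!isDroppableField(value, field)) {
                builder.field(field.name(), field.schema());
            }
        }
        Struct trimmed = new Struct(builder.build());
        for (Field field : trimmed.schema().fields()) {
            trimmed.put(field.name(), value.get(field.name()));
        }
        return trimmed;
    }
}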

Since deletes are now part of the after field, we also handle the case
where delete.handling.mode is set to rewrite and set the respective
flag to true, so that consumers relying on it can reconcile the
changes.

When we decide to support more complex delete types, care should be taken
not to emit incorrect tombstones and to handle the rewrite mode correctly.
This is beyond the scope of this commit.

This commit promotes preImages to production ready. After this change,
preImage operations exclusively emit events via the before field, and we
eliminate the concerns related to delete operations.

Based on scylladb#24 from @chiragb1994.

Introduce support for postImages. Since postImages and preImages have
the same read-before-write cost, users interested in both events should
now benefit from them.

The only missing piece is to actually wire the postImage under its
proper message Envelope. This shouldn't be particularly hard, but it is
outside the scope of this commit and series.
@Bouncheck (Collaborator) left a comment

Added some comments in the code regarding cache and unused variable.

After the change, the connector with the SMT (default configs) will publish a row delete as a simple message containing only the primary keys. This is indistinguishable from an insert with primary keys only.
Since the default value for delete.handling.mode is drop, I think delete records should continue to be dropped in that configuration, or we should exclude this option from the configuration if we don't want to follow it.

After the change, the connector with the SMT and delete.handling.mode=rewrite adds the following to the message when running a row delete:

"__deleted": {
    "string": "false"
  }

It should be true instead.
I wanted to compare it with the current master version but was unable to. With an Avro-based setup it does not run correctly in this mode. There seems to be a schema conflict issue (field names don't match when running a delete: "before" != "after").

I haven't tested pre and post images yet.

Extra thoughts:
To be honest, I think having deletes use the before field is more semantically accurate, so I'd lean towards that, even though non-key fields may not be null before the change.
It's hard for me to say whether this makes things harder in the future without some idea of how we want to handle partition and range deletes.

// when an after field is present. However, in ScyllaDB, we represent
// the affected keys in the after field.
boolean isDelete = false;
boolean hasRewrite = false;
@Bouncheck (Collaborator) commented on Mar 6, 2025

I don't see hasRewrite used


public class ScyllaExtractNewRecordState<R extends ConnectRecord<R>> extends ExtractNewRecordState<R> {
-    private Cache<Schema, Schema> schemaUpdateCache = new SynchronizedCache<>(new LRUCache<>(16));
+    private Cache<Integer, Schema> schemaUpdateCache = new SynchronizedCache<>(new LRUCache<>(16));
@Bouncheck (Collaborator) commented on Mar 6, 2025

I believe the cache does not help much when so many different possible keys are introduced. Previously it was (generally) 1 schema for 1 table. Now, depending on the N non-key columns, there are 2^N possible custom schemas. Of course, not all of them will be generated, but even a handful is a significant increase from 1, and it could easily exceed 16.

@fee-mendes (Member, Author)

Added some comments in the code regarding cache and unused variable.

After the change, the connector with the SMT (default configs) will publish a row delete as a simple message containing only the primary keys. This is indistinguishable from an insert with primary keys only. Since the default value for delete.handling.mode is drop, I think delete records should continue to be dropped in that configuration, or we should exclude this option from the configuration if we don't want to follow it.

Records should indeed still be dropped from the stream if the user so decides. It should be a simple switch condition.

After the change, the connector with the SMT and delete.handling.mode=rewrite adds the following to the message when running a row delete:

"__deleted": {
    "string": "false"
  }

It should be true instead. I wanted to compare it with the current master version but was unable to. With an Avro-based setup it does not run correctly in this mode. There seems to be a schema conflict issue (field names don't match when running a delete: "before" != "after").

Yes, I noticed the same Avro issue as well. I'm not certain whether it is due to our super ancient Debezium, or if it is simply a matter of defining all schemas upfront in Avro.

Either way... nice catch; it seems I pushed the wrong changes. super.configure(configs); retrieves the config map, from which we should read the delete.handling.mode setting.
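
For instance, a hedged sketch of that fix could look like the snippet below (the class name and fields are illustrative, not the actual patch):

import java.util.Map;

import io.debezium.transforms.ExtractNewRecordState;
import org.apache.kafka.connect.connector.ConnectRecord;

// Hypothetical sketch only: remember the configured delete handling mode when
// the parent SMT is configured, so later record processing can honor it.
public class ScyllaExtractNewRecordStateSketch<R extends ConnectRecord<R>>
        extends ExtractNewRecordState<R> {

    private boolean rewriteMode;
    private boolean dropMode;

    @Override
    public void configure(Map<String, ?> configs) {
        super.configure(configs);
        Object mode = configs.get("delete.handling.mode");
        rewriteMode = mode != null && "rewrite".equals(mode.toString());
        dropMode = mode == null || "drop".equals(mode.toString());
    }
}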

I haven't tested pre and post images yet.

Extra thoughts: To be honest, I think having deletes use the before field is more semantically accurate, so I'd lean towards that, even though non-key fields may not be null before the change. It's hard for me to say whether this makes things harder in the future without some idea of how we want to handle partition and range deletes.

Well, the good news is that I won't need to push any fixes and can move on with my life. However, as I have already pointed out in multiple venues, it is not an SMT I can recommend.

I don't think either convention is truly accurate, no matter how hard we try to reason about it, and this proposal is no different. See the corresponding commit message, which explains why neither convention is enough. The only suitable alternative is perhaps adding another payload field, specific to handling deletes - whereas this proposal relies on the same semantics as the Cassandra connector. In ScyllaDB, any operation is always an upsert, so representing changes in the before field is semantically wrong in our world.
