Refactor Connector semantics #66
A "raw" Debezium message contains two fields within its payload: after and before. In the RDBMS world, a DELETE message typically carries a `null` after and a pre-image of the affected row in the before field. These semantics, however, make no sense in ScyllaDB. A pre-image for a delete is only available for a ROW_DELETE operation; all other types of deletes (including range deletes) cannot, today, be represented by preImages.
Arguably, representing delete operations in the after field doesn't make sense either, since a delete operation by definition should always result in a null after payload. In other words, we live in the worst of both worlds: we are a NoSQL database using a format originally designed for relational systems.
The motivation for this commit is quite simple: we need a standardized way to handle delete events, including eventually supporting complex delete events. The Cassandra connector follows the same approach, and adds additional metadata to delete operations, such that a specialized consumer can interpret the delete in question.
After this change, the following structure is defined:
- The before field is used exclusively by preImages.
- The after field is used exclusively by deltas or postImages.
- Supported deletes should always emit an after message.
The ScyllaExtractNewRecordState payload can't distinguish between NULL updates and DELETEs, which in turn makes it impossible for a consumer or subscriber to reconcile deltas and converge to a stable state. This commit addresses the problem in two ways.
First, it ensures that every consumed message gets its own schema. This is done via the new getSchemaKey method, which computes a unique hashCode according to the values sent by the producer.
Second, it drops all null Struct fields via the isDroppableField method, so that consumers only observe null values for columns that were actually affected by an update or delete operation.
Since deletes are now part of the after field, we also handle the case where delete.handling.mode is rewrite and set the respective boolean flag to true, so that consumers relying on it can reconcile the changes. When we decide to support more complex delete types, care should be taken not to emit wrong tombstones and to handle the rewrite mode correctly. This is beyond the scope of this commit.
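The two steps described in this commit message can be sketched in plain Java. This is a minimal illustration only, not the connector's code: the class name, the use of `Optional` to model a cell, and the choice to hash the sorted set of field names are all assumptions. Only the method names getSchemaKey and isDroppableField come from the commit message, and their real signatures are not shown in this PR.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Optional;
import java.util.TreeSet;

// Hypothetical model: a null map value means "column not touched by this operation",
// while Optional.empty() means "column explicitly set to NULL".
final class SchemaKeySketch {

    // Assumed stand-in for isDroppableField: drop cells the producer did not send.
    static boolean isDroppableField(Optional<Object> cell) {
        return cell == null;
    }

    // Keeps only the affected columns; an explicit NULL survives as a null value.
    static Map<String, Object> dropUnaffected(Map<String, Optional<Object>> after) {
        Map<String, Object> kept = new LinkedHashMap<>();
        for (Map.Entry<String, Optional<Object>> e : after.entrySet()) {
            if (!isDroppableField(e.getValue())) {
                kept.put(e.getKey(), e.getValue().orElse(null));
            }
        }
        return kept;
    }

    // Assumed stand-in for getSchemaKey: one key per distinct set of present fields.
    static int getSchemaKey(Map<String, Object> fields) {
        return new TreeSet<>(fields.keySet()).hashCode();
    }
}
```

With this model, an update that sets a column to NULL keeps that column in the output with a null value, while an untouched column disappears from the output entirely, which is what lets a consumer tell the two cases apart.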
This commit promotes preImages to production-ready. After this change, preImage operations emit events exclusively via the before field, and we eliminate the concerns related to delete operations.
Based on scylladb#24 from @chiragb1994. Introduce support for postImages. Since postImages and preImages have the same read-before-write cost, users interested in both events should now benefit from them. The only missing piece is to actually wire the postImage under its actual message Envelope. This shouldn't be particularly hard, but is outside the scope of this commit and series.
Added some comments in the code regarding cache and unused variable.
After the change, the connector with the SMT (default configs) publishes a row delete as a simple message containing only the primary keys. This is indistinguishable from an insert with primary keys only.
Since the default value for delete.handling.mode is drop, I think delete records should remain being dropped in that configuration, or we should exclude this option from configuration if we don't want to follow it.
After the change, the connector with the SMT and delete.handling.mode=rewrite adds the following to the message when running a row delete:
"__deleted": {
  "string": "false"
}
It should be true instead.
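The behaviour expected from the two modes discussed here can be sketched as follows. This is a simplified illustration over plain maps, not the SMT's implementation: the class, enum, and method names are hypothetical, and the record is assumed to have already been identified as a delete.

```java
import java.util.LinkedHashMap;
import java.util.Map;

final class DeleteHandlingSketch {

    // Mirrors the documented values of delete.handling.mode (drop, rewrite, none).
    enum Mode { DROP, REWRITE, NONE }

    // Returns the value to emit, or null when the record should be dropped from the stream.
    static Map<String, Object> handle(Map<String, Object> value, boolean isDelete, Mode mode) {
        switch (mode) {
            case DROP: {
                // Default mode: delete records never reach the consumer.
                return isDelete ? null : value;
            }
            case REWRITE: {
                // Every record gets a __deleted marker; it must be "true" for deletes.
                Map<String, Object> out = new LinkedHashMap<>(value);
                out.put("__deleted", Boolean.toString(isDelete));
                return out;
            }
            default: {
                // NONE: pass the record through unchanged.
                return value;
            }
        }
    }
}
```

Under this sketch, a row delete in rewrite mode carries `"__deleted": "true"`, and a row delete in the default drop mode produces no output record at all, so it can no longer be confused with a keys-only insert.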
I wanted to compare it with the current master version but I was unable to. With an Avro-based setup it does not run correctly in this mode. There seems to be a schema conflict issue (field names don't match when running a delete: "before" != "after").
I haven't tested pre and post images yet.
Extra thoughts:
To be honest, I think having deletes use the before field is more semantically accurate, so I'd lean towards that, even though non-key fields may not be null before the change.
It is hard for me to say whether this makes things harder in the future without some idea of how we want to handle partition and range deletes.
// when an after field is present. However, in ScyllaDB, we represent
// the affected keys in the after field.
boolean isDelete = false;
boolean hasRewrite = false;
I don't see hasRewrite used.
public class ScyllaExtractNewRecordState<R extends ConnectRecord<R>> extends ExtractNewRecordState<R> {
-    private Cache<Schema, Schema> schemaUpdateCache = new SynchronizedCache<>(new LRUCache<>(16));
+    private Cache<Integer, Schema> schemaUpdateCache = new SynchronizedCache<>(new LRUCache<>(16));
I believe the cache does not help much when so many different possible keys are introduced. Previously it was (generally) 1 schema for 1 table. Now, with N non-key columns, there are 2^N possible custom schemas. Of course not all of them will be generated, but even a handful is a significant increase from 1. It could easily exceed 16.
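The sizing concern can be illustrated with a small simulation. This is a sketch under stated assumptions, not a measurement of the connector: the LRU here is a `LinkedHashMap` in access order rather than Debezium's LRUCache, each subset of non-key columns is treated as one schema shape, and the access pattern is a worst-case cycle over every shape.

```java
import java.util.LinkedHashMap;
import java.util.Map;

final class SchemaCacheSketch {

    // Minimal LRU: access-ordered LinkedHashMap that evicts once capacity is exceeded.
    static final class Lru<K, V> extends LinkedHashMap<K, V> {
        private final int capacity;

        Lru(int capacity) {
            super(16, 0.75f, true);
            this.capacity = capacity;
        }

        @Override
        protected boolean removeEldestEntry(Map.Entry<K, V> eldest) {
            return size() > capacity;
        }
    }

    // Replays `rounds` passes over all 2^N column subsets and returns the number of cache hits.
    static long countHits(int nonKeyColumns, int capacity, int rounds) {
        Lru<Integer, String> cache = new Lru<>(capacity);
        int shapes = 1 << nonKeyColumns;
        long hits = 0;
        for (int r = 0; r < rounds; r++) {
            for (int mask = 0; mask < shapes; mask++) {
                if (cache.get(mask) != null) {
                    hits++;
                } else {
                    cache.put(mask, "schema-" + mask); // stand-in for a rebuilt schema
                }
            }
        }
        return hits;
    }
}
```

In this simulation, 4 non-key columns (16 shapes) fit a 16-entry cache and every lookup after the first pass hits, while 5 non-key columns (32 shapes) cycled in order evict each entry before it is reused, so the cache never hits. Real workloads are unlikely to cycle through every shape, so this is an upper bound on the damage rather than a prediction.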
Records should still be dropped from the stream indeed if the user so decides. Should be a simple
Yes, I noticed the same Avro thing as well. I am not certain whether it is due to our super ancient Debezium, or whether it is simply a matter of defining all schemas upfront on Avro. Either way... Nice catch, seems like I pushed the wrong changes.
Well, the good news is that I won't need to push any fixes and I can move on with my life. However, as I have already shown in multiple venues, it is not an SMT I can recommend. I don't think there is anything accurate, no matter how hard we want to reason about it. This proposal is no different. See the corresponding commit, which explains why neither convention is enough. The only suitable alternative is perhaps adding another payload field, specific to handling deletes, whereas this proposal relies on the same semantics as the Cassandra connector. In ScyllaDB, any operation is always an upsert, therefore representing changes in the
e924b3e to d6de129
This series implements proper connector semantics going forward.
After this series:
- The ScyllaExtractNewState SMT can be used both with preImages/postImages and Deltas enabled (it is now dynamic).
- Consumers of ScyllaExtractNewState must implement an UPSERT mode when consuming from ScyllaDB deltas only (Apache Pinot and Elasticsearch are some examples of datastores supporting these modes), whereas postImage operations should rely on FULL (or equivalent) mode.
- Breaking change: existing consumers may require changes.
Tested with Apache Pinot in FULL mode (postImages and preImages), and Elasticsearch in UPSERT mode.
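The difference between the two consumer modes can be sketched over plain maps. This is an illustration of the general idea only: the class and method names are hypothetical, and it does not reproduce how Apache Pinot or Elasticsearch implement their own upsert or full-replacement modes.

```java
import java.util.Map;

final class SinkModeSketch {

    // UPSERT: a delta carries only the affected columns, so merge it into the stored row.
    // A column present with a null value is an explicit NULL and overwrites the old value.
    static void upsert(Map<String, Object> storedRow, Map<String, Object> delta) {
        storedRow.putAll(delta);
    }

    // FULL: a postImage carries the whole row, so replace the stored row outright.
    static void full(Map<String, Object> storedRow, Map<String, Object> postImage) {
        storedRow.clear();
        storedRow.putAll(postImage);
    }
}
```

Applying a delta in FULL mode would wipe every column the delta does not mention, which is why a deltas-only stream needs an UPSERT-capable sink.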