Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -20,14 +20,15 @@ The connector has the following capabilities:
- Compatible with standard Kafka Connect transformations
- Metadata about CDC events - each generated Kafka message contains information about source, such as timestamp and table name
- Seamless handling of schema changes and topology changes (adding, removing nodes from Scylla cluster)
- Post-image-only mode: the connector can be configured (via `post.image.only`) to produce only `POST_IMAGE` CDC events, which are published as `CREATE` events.

The connector has the following limitations:
- Only Kafka 2.6.0+ is supported
- Only row-level operations are produced (`INSERT`, `UPDATE`, `DELETE`):
- Partition deletes - those changes are ignored
- Row range deletes - those changes are ignored
- No support for collection types (`LIST`, `SET`, `MAP`) and `UDT` - columns with those types are omitted from generated messages
- No support for preimage and postimage - changes only contain those columns that were modified, not the entire row before/after change. More information [here](#cell-representation)
- No support for preimage - changes contain only the columns that were modified, not the entire row before/after the change. More information [here](#cell-representation)

## Connector installation

Expand Down Expand Up @@ -596,6 +597,7 @@ In addition to the configuration parameters described in the ["Configuration"](#
| `scylla.confidence.window.size` | The size of the confidence window. It is necessary for the connector to avoid reading too fresh data from the CDC log due to the eventual consistency of Scylla. The problem could appear when a newer write reaches a replica before some older write. For a short period of time, when reading, it is possible for the replica to return only the newer write. The connector mitigates this problem by not reading a window of most recent changes (controlled by this parameter). Value expressed in milliseconds.|
| `scylla.consistency.level` | The consistency level of CDC table read queries. This consistency level is used only for read queries to the CDC log table. By default, `QUORUM` level is used. |
| `scylla.local.dc` | The name of Scylla local datacenter. This local datacenter name will be used to setup the connection to Scylla to prioritize sending requests to the nodes in the local datacenter. If not set, no particular datacenter will be prioritized. |
| `post.image.only` | Publish only the post-image events from the Scylla CDC log to Kafka. The events are published as `CREATE` events. The table's CDC settings must have `'postimage': 'true'`. By default, `false`. |

### Configuration for large Scylla clusters
#### Offset (progress) storage
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ protected Envelope.Operation getOperation() {
case ROW_UPDATE:
return Envelope.Operation.UPDATE;
case ROW_INSERT:
case POST_IMAGE:
return Envelope.Operation.CREATE;
case PARTITION_DELETE: // See comment in ScyllaChangesConsumer on the support of partition deletes.
case ROW_DELETE:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import com.scylladb.cdc.model.worker.ChangeSchema;
import com.scylladb.cdc.model.worker.RawChange;
import com.scylladb.cdc.model.worker.RawChange.OperationType;
import com.scylladb.cdc.model.worker.Task;
import com.scylladb.cdc.model.worker.TaskAndRawChangeConsumer;
import io.debezium.pipeline.EventDispatcher;
Expand All @@ -18,12 +19,14 @@ public class ScyllaChangesConsumer implements TaskAndRawChangeConsumer {
private final ScyllaOffsetContext offsetContext;
private final ScyllaSchema schema;
private final Clock clock;
private final boolean postImageOnly;

public ScyllaChangesConsumer(EventDispatcher<CollectionId> dispatcher, ScyllaOffsetContext offsetContext, ScyllaSchema schema, Clock clock) {
/**
 * Creates a consumer of Scylla CDC changes.
 *
 * @param dispatcher    dispatcher that accepted changes are handed to as data change events
 * @param offsetContext offset context kept by this consumer
 * @param schema        connector schema kept by this consumer
 * @param clock         clock kept by this consumer
 * @param postImageOnly whether the consumer runs in post-image-only mode
 */
public ScyllaChangesConsumer(
        EventDispatcher<CollectionId> dispatcher,
        ScyllaOffsetContext offsetContext,
        ScyllaSchema schema,
        Clock clock,
        boolean postImageOnly) {
    // Plain field capture; the assignments are independent of each other.
    this.postImageOnly = postImageOnly;
    this.clock = clock;
    this.schema = schema;
    this.offsetContext = offsetContext;
    this.dispatcher = dispatcher;
}

@Override
Expand All @@ -49,10 +52,13 @@ public CompletableFuture<Void> consume(Task task, RawChange change) {
if (hasClusteringColumn) {
return CompletableFuture.completedFuture(null);
}
} else if (operationType != RawChange.OperationType.ROW_INSERT
} else if (!this.postImageOnly
&& operationType != RawChange.OperationType.ROW_INSERT
&& operationType != RawChange.OperationType.ROW_UPDATE
&& operationType != RawChange.OperationType.ROW_DELETE) {
return CompletableFuture.completedFuture(null);
} else if (this.postImageOnly && operationType != OperationType.POST_IMAGE) {
return CompletableFuture.completedFuture(null);
}

dispatcher.dispatchDataChangeEvent(new CollectionId(task.id.getTable()),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,14 @@ public class ScyllaConnectorConfig extends CommonConnectorConfig {
.withDescription("The consistency level of CDC table read queries. This consistency level is used only for read queries " +
"to the CDC log table.");

/** Default value of {@code post.image.only}: post-image-only mode is off unless explicitly enabled. */
public static final boolean POST_IMAGE_ONLY_DEFAULT = false;

/**
 * Boolean option selecting post-image-only mode, in which the connector publishes
 * only post image events.
 *
 * <p>The default is registered on the field itself so that it is part of the
 * field's definition instead of existing only as a fallback argument in the getter.
 */
public static final Field POST_IMAGE_ONLY = Field.create("post.image.only")
        .withDisplayName("Post Image Only")
        .withType(ConfigDef.Type.BOOLEAN)
        .withWidth(ConfigDef.Width.SHORT)
        .withImportance(ConfigDef.Importance.MEDIUM)
        .withDefault(POST_IMAGE_ONLY_DEFAULT)
        .withDescription("Whether the connector should publish only post image events. The cdc settings must have `'postimage': 'true'`. To get full image with all fields, set `'preimage': 'full'`.");

public static final Field LOCAL_DC_NAME = Field.create("scylla.local.dc")
.withDisplayName("Local DC Name")
.withType(ConfigDef.Type.STRING)
Expand Down Expand Up @@ -124,7 +132,7 @@ public class ScyllaConnectorConfig extends CommonConnectorConfig {
.name("Scylla")
.type(CLUSTER_IP_ADDRESSES, USER, PASSWORD, LOGICAL_NAME, CONSISTENCY_LEVEL, LOCAL_DC_NAME)
.connector(QUERY_TIME_WINDOW_SIZE, CONFIDENCE_WINDOW_SIZE)
.events(TABLE_NAMES)
.events(TABLE_NAMES, POST_IMAGE_ONLY)
.excluding(Heartbeat.HEARTBEAT_INTERVAL).events(CUSTOM_HEARTBEAT_INTERVAL)
// Exclude some Debezium options, which are not applicable/not supported by
// the Scylla CDC Source Connector.
Expand Down Expand Up @@ -188,6 +196,10 @@ public CQLConfiguration.ConsistencyLevel getConsistencyLevel() {
}
}

/**
 * Reports whether post-image-only mode is enabled.
 *
 * @return the boolean read from the {@code post.image.only} option, with
 *         {@link #POST_IMAGE_ONLY_DEFAULT} passed as the fallback value
 */
public boolean isPostImageOnly() {
    final boolean postImageOnly =
            config.getBoolean(ScyllaConnectorConfig.POST_IMAGE_ONLY, POST_IMAGE_ONLY_DEFAULT);
    return postImageOnly;
}

/**
 * Reads the {@code scylla.local.dc} option.
 *
 * @return the string stored under the local datacenter name option
 */
public String getLocalDCName() {
    final String localDcName = config.getString(ScyllaConnectorConfig.LOCAL_DC_NAME);
    return localDcName;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,20 +2,14 @@

import com.scylladb.cdc.cql.driver3.Driver3Session;
import com.scylladb.cdc.cql.driver3.Driver3WorkerCQL;
import com.scylladb.cdc.model.ExponentialRetryBackoffWithJitter;
import com.scylladb.cdc.model.RetryBackoff;
import com.scylladb.cdc.model.worker.WorkerConfiguration;
import com.scylladb.cdc.model.worker.Worker;
import io.debezium.pipeline.EventDispatcher;
import io.debezium.pipeline.source.spi.StreamingChangeEventSource;
import io.debezium.util.Clock;
import org.apache.commons.lang3.tuple.Pair;

import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.sql.Driver;
import java.time.Duration;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.stream.Collectors;

Expand Down Expand Up @@ -43,7 +37,7 @@ public void execute(ChangeEventSourceContext context) throws InterruptedExceptio
Driver3Session session = new ScyllaSessionBuilder(configuration).build();
Driver3WorkerCQL cql = new Driver3WorkerCQL(session);
ScyllaWorkerTransport workerTransport = new ScyllaWorkerTransport(context, offsetContext, dispatcher, configuration.getHeartbeatIntervalMs());
ScyllaChangesConsumer changeConsumer = new ScyllaChangesConsumer(dispatcher, offsetContext, schema, clock);
ScyllaChangesConsumer changeConsumer = new ScyllaChangesConsumer(dispatcher, offsetContext, schema, clock, configuration.isPostImageOnly());
WorkerConfiguration workerConfiguration = WorkerConfiguration.builder()
.withTransport(workerTransport)
.withCQL(cql)
Expand Down