Skip to content
This repository has been archived by the owner on Sep 13, 2024. It is now read-only.

Commit

Permalink
DBZ-6637 Support Geo types without srid
Browse files Browse the repository at this point in the history
  • Loading branch information
mfvitale committed Jul 12, 2023
1 parent 38c3296 commit 37fcf91
Show file tree
Hide file tree
Showing 32 changed files with 114 additions and 64 deletions.
24 changes: 12 additions & 12 deletions src/main/java/io/debezium/connector/jdbc/JdbcChangeEventSink.java
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@

import java.sql.SQLException;
import java.util.List;
import java.util.Objects;
import java.util.Set;

import org.apache.kafka.connect.data.Struct;
Expand Down Expand Up @@ -310,18 +311,17 @@ private void writeDelete(String sql, SinkRecordDescriptor record) throws SQLExce
}

private int bindKeyValuesToQuery(SinkRecordDescriptor record, NativeQuery<?> query, int index) {
switch (config.getPrimaryKeyMode()) {
case KAFKA:
query.setParameter(index++, record.getTopicName());
query.setParameter(index++, record.getPartition());
query.setParameter(index++, record.getOffset());
break;
default:
final Struct keySource = record.getKeyStruct(config.getPrimaryKeyMode());
if (keySource != null) {
index = bindFieldValuesToQuery(record, query, index, keySource, record.getKeyFieldNames());
}
break;

if (Objects.requireNonNull(config.getPrimaryKeyMode()) == JdbcSinkConnectorConfig.PrimaryKeyMode.KAFKA) {
query.setParameter(index++, record.getTopicName());
query.setParameter(index++, record.getPartition());
query.setParameter(index++, record.getOffset());
}
else {
final Struct keySource = record.getKeyStruct(config.getPrimaryKeyMode());
if (keySource != null) {
index = bindFieldValuesToQuery(record, query, index, keySource, record.getKeyFieldNames());
}
}
return index;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -208,9 +208,9 @@ public String getTypeName() {
return typeName;
}

public String getQueryBinding(ColumnDescriptor column) {
public String getQueryBinding(ColumnDescriptor column, Object value) {
if (queryBinding == null) {
queryBinding = type.getQueryBinding(column, schema);
queryBinding = type.getQueryBinding(column, schema, value);
}
return queryBinding;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import java.util.Set;

import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.errors.ConnectException;
import org.hibernate.SessionFactory;
import org.hibernate.StatelessSession;
Expand Down Expand Up @@ -621,7 +622,41 @@ protected void addColumnDefaultValue(FieldDescriptor field, StringBuilder column

protected String columnQueryBindingFromField(String fieldName, TableDescriptor table, SinkRecordDescriptor record) {
final String columnName = columnNameFromField(fieldName, record);
return record.getFields().get(fieldName).getQueryBinding(table.getColumnByName(columnName));

if (record.getNonKeyFieldNames().contains(columnName)) {
Struct source = record.getAfterStruct();
return record.getFields().get(fieldName).getQueryBinding(table.getColumnByName(columnName), source.get(fieldName));
}

Object value = getValueFromKeyField(fieldName, record, columnName);
return record.getFields().get(fieldName).getQueryBinding(table.getColumnByName(columnName), value);
}

private Object getValueFromKeyField(String fieldName, SinkRecordDescriptor record, String columnName) {

Object value;
if (connectorConfig.getPrimaryKeyMode() == JdbcSinkConnectorConfig.PrimaryKeyMode.KAFKA) {
value = getValueForColumn(columnName, record);
}
else {
final Struct source = record.getKeyStruct(connectorConfig.getPrimaryKeyMode());
value = source.get(fieldName);
}
return value;
}

private Object getValueForColumn(String columnName, SinkRecordDescriptor record) {

switch (columnName) {
case "__connect_topic":
return record.getTopicName();
case "__connect_partition":
return record.getPartition();
case "__connect_offset":
return record.getOffset();
default:
return null;
}
}

protected String columnNameFromField(String fieldName, SinkRecordDescriptor record) {
Expand Down Expand Up @@ -673,7 +708,7 @@ protected String getQualifiedTableName(TableId tableId) {
private String columnNameEqualsBinding(String fieldName, TableDescriptor table, SinkRecordDescriptor record) {
final ColumnDescriptor column = table.getColumnByName(columnNameFromField(fieldName, record));
final FieldDescriptor field = record.getFields().get(fieldName);
return toIdentifier(columnNamingStrategy.resolveColumnName(fieldName)) + "=" + field.getQueryBinding(column);
return toIdentifier(columnNamingStrategy.resolveColumnName(fieldName)) + "=" + field.getQueryBinding(column, record.getAfterStruct());
}

private static boolean isColumnNullable(String columnName, Collection<String> primaryKeyColumnNames, int nullability) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,8 @@ public class GeometryType extends AbstractGeoType {
public static final Type INSTANCE = new GeometryType();

@Override
public String getQueryBinding(ColumnDescriptor column, Schema schema) {
return "ST_GeomFromWKB(?, ?)";
public String getQueryBinding(ColumnDescriptor column, Schema schema, Object value) {
return value == null ? "?" : "ST_GeomFromWKB(?, ?)";
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ public String[] getRegistrationKeys() {
}

@Override
public String getQueryBinding(ColumnDescriptor column, Schema schema) {
public String getQueryBinding(ColumnDescriptor column, Schema schema, Object value) {
return "cast(? as json)";
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,8 @@ class MapToJsonType extends AbstractConnectMapType {
public static final MapToJsonType INSTANCE = new MapToJsonType();

@Override
public String getQueryBinding(ColumnDescriptor column, Schema schema) {
return JsonType.INSTANCE.getQueryBinding(column, schema);
public String getQueryBinding(ColumnDescriptor column, Schema schema, Object value) {
return JsonType.INSTANCE.getQueryBinding(column, schema, value);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ public String[] getRegistrationKeys() {
}

@Override
public String getQueryBinding(ColumnDescriptor column, Schema schema) {
public String getQueryBinding(ColumnDescriptor column, Schema schema, Object value) {
if (isBitOne(schema)) {
final Optional<String> columnType = getSourceColumnType(schema);
if (columnType.isPresent() && "BIT".equals(columnType.get())) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ public String[] getRegistrationKeys() {
}

@Override
public String getQueryBinding(ColumnDescriptor column, Schema schema) {
public String getQueryBinding(ColumnDescriptor column, Schema schema, Object value) {
return "cast(? as citext)";
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ public String[] getRegistrationKeys() {
}

@Override
public String getQueryBinding(ColumnDescriptor column, Schema schema) {
public String getQueryBinding(ColumnDescriptor column, Schema schema, Object value) {
return "cast(? as cidr)";
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,8 @@ public void configure(JdbcSinkConnectorConfig config, DatabaseDialect dialect) {
}

@Override
public String getQueryBinding(ColumnDescriptor column, Schema schema) {
return String.format(GEO_FROM_WKB_FUNCTION, postgisSchema);
public String getQueryBinding(ColumnDescriptor column, Schema schema, Object value) {
return value == null ? "?" : String.format(GEO_FROM_WKB_FUNCTION, postgisSchema);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ public String[] getRegistrationKeys() {
}

@Override
public String getQueryBinding(ColumnDescriptor column, Schema schema) {
public String getQueryBinding(ColumnDescriptor column, Schema schema, Object value) {
return "cast(? as inet)";
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ public String[] getRegistrationKeys() {
}

@Override
public String getQueryBinding(ColumnDescriptor column, Schema schema) {
public String getQueryBinding(ColumnDescriptor column, Schema schema, Object value) {
return "cast(? as interval)";
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ public String[] getRegistrationKeys() {
}

@Override
public String getQueryBinding(ColumnDescriptor column, Schema schema) {
public String getQueryBinding(ColumnDescriptor column, Schema schema, Object value) {
if (isHstore(schema)) {
return "cast(? as hstore)";
// return super.getQueryBinding(schema);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ public String[] getRegistrationKeys() {
}

@Override
public String getQueryBinding(ColumnDescriptor column, Schema schema) {
public String getQueryBinding(ColumnDescriptor column, Schema schema, Object value) {
return "cast(? as ltree)";
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ public String[] getRegistrationKeys() {
}

@Override
public String getQueryBinding(ColumnDescriptor column, Schema schema) {
public String getQueryBinding(ColumnDescriptor column, Schema schema, Object value) {
return "cast(? as macaddr)";
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ class MapToHstoreType extends AbstractConnectMapType {
public static final MapToHstoreType INSTANCE = new MapToHstoreType();

@Override
public String getQueryBinding(ColumnDescriptor column, Schema schema) {
public String getQueryBinding(ColumnDescriptor column, Schema schema, Object value) {
return "cast(? as hstore)";
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ public String[] getRegistrationKeys() {
}

@Override
public String getQueryBinding(ColumnDescriptor column, Schema schema) {
public String getQueryBinding(ColumnDescriptor column, Schema schema, Object value) {
return "cast(? as money)";
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,8 @@ class PointType extends GeometryType {
private static final String GEO_FROM_WKB_FUNCTION_AS_POINT = "cast(" + GEO_FROM_WKB_FUNCTION + " as point)";

@Override
public String getQueryBinding(ColumnDescriptor column, Schema schema) {
return String.format(GEO_FROM_WKB_FUNCTION_AS_POINT, postgisSchema);
public String getQueryBinding(ColumnDescriptor column, Schema schema, Object value) {
return value == null ? "?" : String.format(GEO_FROM_WKB_FUNCTION_AS_POINT, postgisSchema);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ public String[] getRegistrationKeys() {
}

@Override
public String getQueryBinding(ColumnDescriptor column, Schema schema) {
public String getQueryBinding(ColumnDescriptor column, Schema schema, Object value) {
return "cast(? as " + getSourceColumnType(schema).orElseThrow() + ")";
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ public String[] getRegistrationKeys() {
}

@Override
public String getQueryBinding(ColumnDescriptor column, Schema schema) {
public String getQueryBinding(ColumnDescriptor column, Schema schema, Object value) {
return "cast(? as uuid)";
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ public String[] getRegistrationKeys() {
}

@Override
public String getQueryBinding(ColumnDescriptor column, Schema schema) {
public String getQueryBinding(ColumnDescriptor column, Schema schema, Object value) {
return "cast(? as xml)";
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ public String[] getRegistrationKeys() {
}

@Override
public String getQueryBinding(ColumnDescriptor column, Schema schema) {
public String getQueryBinding(ColumnDescriptor column, Schema schema, Object value) {
if (Bits.LOGICAL_NAME.equals(schema.name())) {
final int bitSize = Integer.parseInt(schema.parameters().get(Bits.LENGTH_FIELD));
return String.format("cast(? as %s)", bitSize > 1 ? String.format("varbinary(%d)", bitSize) : "bit");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ public String[] getRegistrationKeys() {
}

@Override
public String getQueryBinding(ColumnDescriptor column, Schema schema) {
public String getQueryBinding(ColumnDescriptor column, Schema schema, Object value) {
return "cast(? as xml)";
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
*/
package io.debezium.connector.jdbc.type;

import java.util.Base64;
import java.util.Optional;

import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.Struct;
Expand All @@ -24,10 +24,13 @@ public int bind(Query<?> query, int index, Schema schema, Object value) {
}

if (value instanceof Struct) {
final int srid = ((Struct) value).getInt32(SRID);
// Default srid is 0 for both
// MySQL https://dev.mysql.com/doc/refman/8.0/en/spatial-reference-systems.html#:~:text=The%20SRS%20denoted%20in%20MySQL,for%20spatial%20data%20in%20MySQL.
// PostgreSQL https://postgis.net/docs/using_postgis_dbmanagement.html#spatial_ref_sys_table
final Integer srid = Optional.ofNullable(((Struct) value).getInt32(SRID)).orElse(0);
final byte[] wkb = ((Struct) value).getBytes(WKB);

query.setParameter(index, Base64.getDecoder().decode(wkb));
query.setParameter(index, wkb);
query.setParameter(index + 1, srid);
return 2;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ public void configure(JdbcSinkConnectorConfig config, DatabaseDialect dialect) {
}

@Override
public String getQueryBinding(ColumnDescriptor column, Schema schema) {
public String getQueryBinding(ColumnDescriptor column, Schema schema, Object value) {
return getDialect().getQueryBindingWithValueCast(column, schema, this);
}

Expand Down
3 changes: 2 additions & 1 deletion src/main/java/io/debezium/connector/jdbc/type/Type.java
Original file line number Diff line number Diff line change
Expand Up @@ -55,9 +55,10 @@ public interface Type {
*
* @param column column descriptor in the table relational model, never {@code null}
* @param schema field schema, never {@code null}
* @param value value to be bound, may be {@code null}
* @return query parameter argument binding SQL fragment
*/
String getQueryBinding(ColumnDescriptor column, Schema schema);
String getQueryBinding(ColumnDescriptor column, Schema schema, Object value);

/**
* Resolve the default value clause value.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,8 @@ public class ConnectMapToConnectStringType extends AbstractConnectMapType {
public static final ConnectMapToConnectStringType INSTANCE = new ConnectMapToConnectStringType();

@Override
public String getQueryBinding(ColumnDescriptor column, Schema schema) {
return ConnectStringType.INSTANCE.getQueryBinding(column, schema);
public String getQueryBinding(ColumnDescriptor column, Schema schema, Object value) {
return ConnectStringType.INSTANCE.getQueryBinding(column, schema, value);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ public String[] getRegistrationKeys() {
}

@Override
public String getQueryBinding(ColumnDescriptor column, Schema schema) {
public String getQueryBinding(ColumnDescriptor column, Schema schema, Object value) {
return getDialect().getTimeQueryBinding();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@
public abstract class AbstractDebeziumTimeType extends AbstractTimeType {

@Override
public String getQueryBinding(ColumnDescriptor column, Schema schema) {
public String getQueryBinding(ColumnDescriptor column, Schema schema, Object value) {
return getDialect().getTimeQueryBinding();
}

Expand Down
Loading

0 comments on commit 37fcf91

Please sign in to comment.