
Commit d8d2087

Commit message: linter
1 parent b679c2d commit d8d2087

3 files changed: +78 additions, -15 deletions

flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/source/reader/external/JdbcSourceFetchTaskContext.java

Lines changed: 1 addition & 0 deletions
@@ -77,6 +77,7 @@ public boolean isDataChangeRecord(SourceRecord record) {
     public boolean isRecordBetween(SourceRecord record, Object[] splitStart, Object[] splitEnd) {
         LOG.info("Check record {} between {} and {}", record, splitStart, splitEnd);
         RowType splitKeyType = getSplitType(getDatabaseSchema().tableFor(this.getTableId(record)));
+        LOG.info("Split key type: {}", splitKeyType);
         Object[] key = SourceRecordUtils.getSplitKey(splitKeyType, record, getSchemaNameAdjuster());
         return SourceRecordUtils.splitKeyRangeContains(key, splitStart, splitEnd);
     }
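
For reference, the new log line above prints the chunk-key row type that the range check operates on. A minimal sketch of what it surfaces, assuming a hypothetical single-column BIGINT chunk key named "seq" (the column name and sketch class are illustrative, not part of this commit):

import java.util.Collections;

import org.apache.flink.table.types.logical.BigIntType;
import org.apache.flink.table.types.logical.RowType;

public class SplitKeyTypeSketch {
    public static void main(String[] args) {
        // A single-field split key type, as produced for a one-column chunk key.
        RowType splitKeyType =
                new RowType(
                        Collections.singletonList(
                                new RowType.RowField("seq", new BigIntType())));
        // The "Split key type: {}" log above would render this summary,
        // e.g. ROW<`seq` BIGINT>.
        System.out.println(splitKeyType.asSummaryString());
    }
}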

flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/utils/SourceRecordUtils.java

Lines changed: 77 additions & 3 deletions
@@ -28,6 +28,8 @@
 import org.apache.kafka.connect.data.Schema;
 import org.apache.kafka.connect.data.Struct;
 import org.apache.kafka.connect.source.SourceRecord;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;

 import java.io.IOException;
 import java.math.BigDecimal;
@@ -43,6 +45,7 @@

 /** Utility class to deal record. */
 public class SourceRecordUtils {
+    private static final Logger LOG = LoggerFactory.getLogger(SourceRecordUtils.class);

     private SourceRecordUtils() {}

@@ -131,15 +134,86 @@ public static TableId getTableId(SourceRecord dataRecord) {

     public static Object[] getSplitKey(
             RowType splitBoundaryType, SourceRecord dataRecord, SchemaNameAdjuster nameAdjuster) {
-        // the split key field contains single field now
         String splitFieldName = nameAdjuster.adjust(splitBoundaryType.getFieldNames().get(0));
-        Struct key = (Struct) dataRecord.key();
-        return new Object[] {key.get(splitFieldName)};
+
+        // Try primary key struct first (for backward compatibility)
+        Struct keyStruct = (Struct) dataRecord.key();
+        if (keyStruct != null && keyStruct.schema().field(splitFieldName) != null) {
+            return new Object[] {keyStruct.get(splitFieldName)};
+        }
+
+        // For non-primary key chunk keys, use value-based approach
+        return getSplitKeyFromValue(dataRecord, splitFieldName);
+    }
+
+    /** Extract chunk key from value struct (AFTER/BEFORE) for non-primary key chunk keys. */
+    private static Object[] getSplitKeyFromValue(SourceRecord dataRecord, String splitFieldName) {
+        Struct value = (Struct) dataRecord.value();
+        if (value == null) {
+            return null; // No value struct available
+        }
+
+        String op = value.getString(Envelope.FieldName.OPERATION);
+        Struct targetStruct = null;
+
+        if (op == null) {
+            // READ operation (snapshot)
+            targetStruct = value.getStruct(Envelope.FieldName.AFTER);
+        } else {
+            switch (op) {
+                case "c": // CREATE
+                case "r": // READ
+                    targetStruct = value.getStruct(Envelope.FieldName.AFTER);
+                    break;
+                case "u": // UPDATE - prefer AFTER for current state
+                    targetStruct = value.getStruct(Envelope.FieldName.AFTER);
+                    if (targetStruct == null
+                            || targetStruct.schema().field(splitFieldName) == null) {
+                        // Fallback to BEFORE if AFTER doesn't have the field
+                        targetStruct = value.getStruct(Envelope.FieldName.BEFORE);
+                    }
+                    break;
+                case "d": // DELETE - use BEFORE, but fallback if missing
+                    targetStruct = value.getStruct(Envelope.FieldName.BEFORE);
+                    if (targetStruct == null
+                            || targetStruct.schema().field(splitFieldName) == null) {
+                        // For DELETE with missing chunk key, return null to indicate "emit without
+                        // filtering"
+                        return null;
+                    }
+                    break;
+                default:
+                    throw new IllegalArgumentException("Unknown operation: " + op);
+            }
+        }

+        if (targetStruct == null || targetStruct.schema().field(splitFieldName) == null) {
+            // Chunk key field not found in value struct
+            // This could happen with schema changes or configuration issues
+            LOG.debug(
+                    "Chunk key field '{}' not found in record, emitting without filtering. Table: {}, Operation: {}",
+                    splitFieldName,
+                    getTableId(dataRecord),
+                    dataRecord.value() != null
+                            ? ((Struct) dataRecord.value()).getString(Envelope.FieldName.OPERATION)
+                            : "unknown");
+            return null;
+        }
+
+        return new Object[] {targetStruct.get(splitFieldName)};
     }

     /** Returns the specific key contains in the split key range or not. */
     public static boolean splitKeyRangeContains(
             Object[] key, Object[] splitKeyStart, Object[] splitKeyEnd) {
+        // If key is null, chunk key field was not found (e.g., DELETE with non-primary key chunk
+        // key)
+        // Emit the record without filtering to prevent data loss
+        if (key == null) {
+            LOG.debug("Chunk key is null, emitting record without filtering");
+            return true;
+        }
+
         // for all range
         if (splitKeyStart == null && splitKeyEnd == null) {
             return true;
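
To make the new lookup order concrete: the patched getSplitKey consults the record's key struct first and, when the chunk key is not part of the key (a non-primary-key chunk key), resolves it from the Debezium envelope's AFTER/BEFORE structs. Below is a minimal, self-contained sketch of that order using the Kafka Connect API directly; the "seq" chunk column, the schemas, and the simplified op handling are illustrative assumptions, not the actual flink-cdc code path:

import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaBuilder;
import org.apache.kafka.connect.data.Struct;

public class ChunkKeyLookupSketch {
    public static void main(String[] args) {
        // Row image with a non-primary-key chunk key column "seq".
        Schema rowSchema =
                SchemaBuilder.struct()
                        .optional()
                        .field("id", Schema.INT64_SCHEMA)
                        .field("seq", Schema.INT64_SCHEMA)
                        .build();
        // Simplified Debezium-style envelope: op + before/after images.
        Schema envelopeSchema =
                SchemaBuilder.struct()
                        .field("op", Schema.STRING_SCHEMA)
                        .field("before", rowSchema)
                        .field("after", rowSchema)
                        .build();
        // The key struct carries only the primary key, so "seq" is absent from it.
        Schema keySchema = SchemaBuilder.struct().field("id", Schema.INT64_SCHEMA).build();
        Struct key = new Struct(keySchema).put("id", 1L);
        Struct after = new Struct(rowSchema).put("id", 1L).put("seq", 42L);
        Struct value = new Struct(envelopeSchema).put("op", "u").put("after", after);

        // Same order as the patch: key struct first, then the value-side image
        // (AFTER for creates/reads/updates, BEFORE for deletes).
        String op = value.getString("op");
        Struct image = "d".equals(op) ? value.getStruct("before") : value.getStruct("after");
        Object chunkKeyValue =
                (key.schema().field("seq") != null) ? key.get("seq") : image.get("seq");
        System.out.println("chunk key = " + chunkKeyValue); // prints: chunk key = 42
    }
}

Combined with the null-key branch added to splitKeyRangeContains, a record whose chunk key cannot be resolved at all (for example, a DELETE without a usable BEFORE image) yields key == null and is emitted without filtering, matching the "prevent data loss" comment in the diff.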

flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/apache/flink/cdc/connectors/postgres/source/PostgresSourceBuilder.java

Lines changed: 0 additions & 12 deletions
@@ -49,7 +49,6 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;

-import java.lang.reflect.Constructor;
 import java.time.Duration;
 import java.util.List;
 import java.util.Properties;
@@ -313,17 +312,6 @@ public PostgresSourceBuilder<T> lsnCommitCheckpointsDelay(int lsnCommitDelay) {
      */
     public PostgresIncrementalSource<T> build() {
         PostgresOffsetFactory offsetFactory = new PostgresOffsetFactory();
-        // get params of jdbcsourceconfig
-
-        Constructor<?>[] constructors = JdbcSourceConfig.class.getDeclaredConstructors();
-        for (Constructor<?> constructor : constructors) {
-            // Get params type of constructor
-            Class<?>[] parameterTypes = constructor.getParameterTypes();
-            LOG.info("Constructor: " + constructor + ", parameter types: ");
-            for (Class<?> paramType : parameterTypes) {
-                LOG.info(" \n " + paramType.getName());
-            }
-        }
         PostgresDialect dialect = new PostgresDialect(configFactory.create(0));

         PostgresIncrementalSource<T> source =
