Skip to content

Commit 13a2921

Browse files
committed
Checkpoint 58 - Remove unnecessary changes
1 parent 5559469 commit 13a2921

File tree

4 files changed

+3
-14
lines changed

4 files changed

+3
-14
lines changed

flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-hudi/src/main/java/org/apache/flink/cdc/connectors/hudi/sink/coordinator/MultiTableStreamWriteOperatorCoordinator.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -565,7 +565,8 @@ private void handleEnhancedWriteMetadataEvent(EnhancedWriteMetadataEvent enhance
565565
TableId tableId = pathToTableId.get(tablePath);
566566

567567
if (tableId == null) {
568-
LOG.warn("No tableId found for path: {}. Cannot process event.", tablePath);
568+
LOG.error("No tableId found for path: {}. Cannot process event.", tablePath);
569+
context.failJob(new IllegalStateException("No tableId found for path: " + tablePath));
569570
return;
570571
}
571572

flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/MySqlToHudiE2eITCase.java

Lines changed: 0 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -812,11 +812,6 @@ public void waitUntilJobState(JobID jobId, Duration timeout, JobStatus expectedS
812812
JobStatusMessage message = optMessage.get();
813813
JobStatus jobStatus = message.getJobState();
814814
if (!expectedStatus.isTerminalState() && jobStatus.isTerminalState()) {
815-
try {
816-
Thread.sleep(50000);
817-
} catch (InterruptedException e) {
818-
throw new RuntimeException(e);
819-
}
820815
throw new ValidationException(
821816
String.format(
822817
"Job has been terminated! JobName: %s, JobID: %s, Status: %s",

flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/MySqlToIcebergE2eITCase.java

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -243,7 +243,6 @@ public void testSyncWholeDatabase() throws Exception {
243243
LOG.error("Update table for CDC failed.", e);
244244
throw e;
245245
}
246-
247246
List<String> recordsInSnapshotPhase =
248247
new ArrayList<>(
249248
Arrays.asList(
@@ -262,7 +261,6 @@ public void testSyncWholeDatabase() throws Exception {
262261
recordsInSnapshotPhase =
263262
recordsInSnapshotPhase.stream().sorted().collect(Collectors.toList());
264263
validateSinkResult(warehouse, database, "products", recordsInSnapshotPhase);
265-
Thread.sleep(3600000L);
266264
}
267265

268266
/**

flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/utils/PipelineTestEnvironment.java

Lines changed: 1 addition & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -135,7 +135,7 @@ private int getParallelism() {
135135
"blob.server.port: 6124",
136136
"taskmanager.numberOfTaskSlots: 10",
137137
"parallelism.default: 4",
138-
"execution.checkpointing.interval: 30s",
138+
"execution.checkpointing.interval: 300",
139139
"state.backend.type: hashmap",
140140
"env.java.opts.all: -Doracle.jdbc.timezoneAsRegion=false",
141141
"execution.checkpointing.savepoint-dir: file:///opt/flink",
@@ -389,11 +389,6 @@ public void waitUntilJobState(Duration timeout, JobStatus expectedStatus) {
389389
JobStatusMessage message = jobStatusMessages.iterator().next();
390390
JobStatus jobStatus = message.getJobState();
391391
if (!expectedStatus.isTerminalState() && jobStatus.isTerminalState()) {
392-
try {
393-
Thread.sleep(50000);
394-
} catch (InterruptedException e) {
395-
throw new RuntimeException(e);
396-
}
397392
throw new ValidationException(
398393
String.format(
399394
"Job has been terminated! JobName: %s, JobID: %s, Status: %s",

0 commit comments

Comments
 (0)