Skip to content

Commit cf08b66

Browse files
committed
fix ci.
1 parent f1933b8 commit cf08b66

File tree

3 files changed

+19
-17
lines changed

3 files changed

+19
-17
lines changed

flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/v2/bucket/BucketAssignOperator.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -189,7 +189,9 @@ public void processElement(StreamRecord<Event> streamRecord) throws Exception {
189189
SchemaChangeEvent schemaChangeEvent = (SchemaChangeEvent) event;
190190
Schema schema =
191191
SchemaUtils.applySchemaChangeEvent(
192-
schemaMaps.get(schemaChangeEvent.tableId()).getSchema(),
192+
Optional.ofNullable(schemaMaps.get(schemaChangeEvent.tableId()))
193+
.map(TableSchemaInfo::getSchema)
194+
.orElse(null),
193195
schemaChangeEvent);
194196
schemaMaps.put(schemaChangeEvent.tableId(), new TableSchemaInfo(schema, zoneId));
195197
// Broadcast SchemachangeEvent.

flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/MySqlToPaimonE2eITCase.java

Lines changed: 14 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -95,6 +95,9 @@ public void before() throws Exception {
9595
MountableFile.forHostPath(
9696
TestUtils.getResource(getPaimonSQLConnectorResourceName())),
9797
sharedVolume.toString() + "/" + getPaimonSQLConnectorResourceName());
98+
jobManager.copyFileToContainer(
99+
MountableFile.forHostPath(TestUtils.getResource("flink-shade-hadoop.jar")),
100+
sharedVolume.toString() + "/flink-shade-hadoop.jar");
98101
}
99102

100103
@After
@@ -141,11 +144,11 @@ public void testSyncWholeDatabase() throws Exception {
141144
database,
142145
"products",
143146
Arrays.asList(
144-
"101, One, Alice, 3.202, red, {\"key1\":\"value1\"}, null",
145-
"102, Two, Bob, 1.703, white, {\"key2\":\"value2\"}, null",
146-
"103, Three, Cecily, 4.105, red, {\"key3\":\"value3\"}, null",
147-
"104, Four, Derrida, 1.857, white, {\"key4\":\"value4\"}, null",
148-
"105, Five, Evelyn, 5.211, red, {\"K\":\"V\",\"k\":\"v\"}, null",
147+
"101, One, Alice, 3.202, red, {\"key1\": \"value1\"}, null",
148+
"102, Two, Bob, 1.703, white, {\"key2\": \"value2\"}, null",
149+
"103, Three, Cecily, 4.105, red, {\"key3\": \"value3\"}, null",
150+
"104, Four, Derrida, 1.857, white, {\"key4\": \"value4\"}, null",
151+
"105, Five, Evelyn, 5.211, red, {\"K\": \"V\", \"k\": \"v\"}, null",
149152
"106, Six, Ferris, 9.813, null, null, null",
150153
"107, Seven, Grace, 2.117, null, null, null",
151154
"108, Eight, Hesse, 6.819, null, null, null",
@@ -194,10 +197,10 @@ public void testSyncWholeDatabase() throws Exception {
194197
List<String> recordsInSnapshotPhase =
195198
new ArrayList<>(
196199
Arrays.asList(
197-
"102, Two, Bob, 1.703, white, {\"key2\":\"value2\"}, null, null, null, null, null, null, null, null, null, null",
198-
"103, Three, Cecily, 4.105, red, {\"key3\":\"value3\"}, null, null, null, null, null, null, null, null, null, null",
199-
"104, Four, Derrida, 1.857, white, {\"key4\":\"value4\"}, null, null, null, null, null, null, null, null, null, null",
200-
"105, Five, Evelyn, 5.211, red, {\"K\":\"V\",\"k\":\"v\"}, null, null, null, null, null, null, null, null, null, null",
200+
"102, Two, Bob, 1.703, white, {\"key2\": \"value2\"}, null, null, null, null, null, null, null, null, null, null",
201+
"103, Three, Cecily, 4.105, red, {\"key3\": \"value3\"}, null, null, null, null, null, null, null, null, null, null",
202+
"104, Four, Derrida, 1.857, white, {\"key4\": \"value4\"}, null, null, null, null, null, null, null, null, null, null",
203+
"105, Five, Evelyn, 5.211, red, {\"K\": \"V\", \"k\": \"v\"}, null, null, null, null, null, null, null, null, null, null",
201204
"106, Six, Fay, 9.813, null, null, null, null, null, null, null, null, null, null, null, null",
202205
"107, Seven, Grace, 5.125, null, null, null, null, null, null, null, null, null, null, null, null",
203206
"108, Eight, Hesse, 6.819, null, null, null, null, null, null, null, null, null, null, null, null",
@@ -282,6 +285,8 @@ private List<String> fetchPaimonTableRows(String warehouse, String database, Str
282285
"/opt/flink/bin/sql-client.sh",
283286
"--jar",
284287
sharedVolume.toString() + "/" + getPaimonSQLConnectorResourceName(),
288+
"--jar",
289+
sharedVolume.toString() + "/flink-shade-hadoop.jar",
285290
"-f",
286291
containerSqlPath);
287292
if (result.getExitCode() != 0) {

flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/utils/PipelineTestEnvironment.java

Lines changed: 2 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -60,7 +60,6 @@
6060
import java.nio.file.Files;
6161
import java.nio.file.Path;
6262
import java.time.Duration;
63-
import java.util.ArrayList;
6463
import java.util.Arrays;
6564
import java.util.Collection;
6665
import java.util.Collections;
@@ -142,14 +141,9 @@ public static List<String> getFlinkVersion() {
142141
public void before() throws Exception {
143142
LOG.info("Starting containers...");
144143
jobManagerConsumer = new ToStringConsumer();
145-
List<String> cmds = new ArrayList<>();
146-
for (String prop : EXTERNAL_PROPS) {
147-
cmds.add(String.format("echo '%s' >> /opt/flink/conf/flink-conf.yaml.tmp", prop));
148-
}
149-
cmds.add("mv /opt/flink/conf/flink-conf.yaml.tmp /opt/flink/conf/flink-conf.yaml");
150-
String preCmd = String.join(" && ", cmds);
151144
jobManager =
152145
new GenericContainer<>(getFlinkDockerImageTag())
146+
.withCommand("jobmanager")
153147
.withNetwork(NETWORK)
154148
.withNetworkAliases(INTER_CONTAINER_JM_ALIAS)
155149
.withExposedPorts(JOB_MANAGER_REST_PORT)
@@ -163,6 +157,7 @@ public void before() throws Exception {
163157
taskManagerConsumer = new ToStringConsumer();
164158
taskManager =
165159
new GenericContainer<>(getFlinkDockerImageTag())
160+
.withCommand("taskmanager")
166161
.withNetwork(NETWORK)
167162
.withNetworkAliases(INTER_CONTAINER_TM_ALIAS)
168163
.withEnv("FLINK_PROPERTIES", FLINK_PROPERTIES)

0 commit comments

Comments
 (0)