
Commit 0ea00ea

wangyang0918 authored and kl0u committed
[FLINK-13758][client] Support to register DFS files as distributed cache
All Flink Standalone, Yarn, Mesos, and Kubernetes session clusters use RestClusterClient#submitJob to submit a job to an existing session. Before this commit, the Flink client would hang when trying to register DFS artifacts as a distributed cache for a session cluster.
1 parent 90ad2e6 · commit 0ea00ea
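For context, the user-facing scenario this commit fixes is registering a file that already lives on a distributed file system as a distributed cache entry. The following is a minimal sketch, not code from the commit; the HDFS path and job shape are hypothetical, and running it requires a Flink streaming dependency plus a reachable HDFS:

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.DiscardingSink;

public class DfsCacheScenario {
    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // The artifact already lives on DFS (hypothetical path). Before this
        // commit, submitting such a job to a session cluster via
        // RestClusterClient would hang, because the client tried to upload a
        // file it could not read locally.
        env.registerCachedFile("hdfs:///tmp/test_data", "testData");

        env.fromElements(1).addSink(new DiscardingSink<>());
        env.execute("DFS distributed cache example");
    }
}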

File tree

3 files changed: +45 −6 lines

flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java

Lines changed: 11 additions & 2 deletions
@@ -343,8 +343,17 @@ public CompletableFuture<JobSubmissionResult> submitJob(@Nonnull JobGraph jobGraph)
         }

         for (Map.Entry<String, DistributedCache.DistributedCacheEntry> artifacts : jobGraph.getUserArtifacts().entrySet()) {
-            artifactFileNames.add(new JobSubmitRequestBody.DistributedCacheFile(artifacts.getKey(), new Path(artifacts.getValue().filePath).getName()));
-            filesToUpload.add(new FileUpload(Paths.get(artifacts.getValue().filePath), RestConstants.CONTENT_TYPE_BINARY));
+            final Path artifactFilePath = new Path(artifacts.getValue().filePath);
+            try {
+                // Only local artifacts need to be uploaded.
+                if (!artifactFilePath.getFileSystem().isDistributedFS()) {
+                    artifactFileNames.add(new JobSubmitRequestBody.DistributedCacheFile(artifacts.getKey(), artifactFilePath.getName()));
+                    filesToUpload.add(new FileUpload(Paths.get(artifacts.getValue().filePath), RestConstants.CONTENT_TYPE_BINARY));
+                }
+            } catch (IOException e) {
+                throw new CompletionException(
+                    new FlinkException("Failed to get the FileSystem of artifact " + artifactFilePath + ".", e));
+            }
         }

         final JobSubmitRequestBody requestBody = new JobSubmitRequestBody(
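The patch hinges on FileSystem#isDistributedFS(), which exists in flink-core: local paths must be uploaded with the job, while DFS paths can be fetched by every TaskManager directly. A minimal sketch of that distinction (the paths are hypothetical, and the hdfs scheme additionally needs a Hadoop filesystem implementation on the classpath):

import java.io.IOException;

import org.apache.flink.core.fs.Path;

public class FsSchemeCheck {
    public static void main(String[] args) throws IOException {
        // Local file: not on a distributed FS, so the client must upload it.
        System.out.println(new Path("file:///tmp/artifact.jar")
            .getFileSystem().isDistributedFS()); // false

        // DFS file: the cluster can read it directly, so the client skips the
        // upload and only ships the path in the job configuration.
        System.out.println(new Path("hdfs:///tmp/artifact.jar")
            .getFileSystem().isDistributedFS()); // true
    }
}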

flink-fs-tests/src/test/java/org/apache/flink/hdfstests/DistributedCacheDfsTest.java

Lines changed: 33 additions & 3 deletions
@@ -19,9 +19,12 @@
 package org.apache.flink.hdfstests;

 import org.apache.flink.api.common.functions.RichMapFunction;
+import org.apache.flink.client.program.rest.RestClusterClient;
 import org.apache.flink.core.fs.FileStatus;
 import org.apache.flink.core.fs.FileSystem;
 import org.apache.flink.core.fs.Path;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobmaster.JobResult;
 import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.functions.sink.DiscardingSink;
@@ -120,8 +123,36 @@ public static void teardown() {
     }

     @Test
-    public void testDistributeFileViaDFS() throws Exception {
+    public void testDistributedFileViaDFS() throws Exception {
+        createJobWithRegisteredCachedFiles().execute("Distributed Cache Via Blob Test Program");
+    }
+
+    /**
+     * All the Flink Standalone, Yarn, Mesos, and Kubernetes sessions use {@link RestClusterClient#submitJob(JobGraph)}
+     * to submit a job to an existing session. This test covers these cases.
+     */
+    @Test(timeout = 30000)
+    public void testSubmittingJobViaRestClusterClient() throws Exception {
+        RestClusterClient<String> restClusterClient = new RestClusterClient<>(
+            MINI_CLUSTER_RESOURCE.getClientConfiguration(),
+            "testSubmittingJobViaRestClusterClient");
+
+        final JobGraph jobGraph = createJobWithRegisteredCachedFiles()
+            .getStreamGraph()
+            .getJobGraph();
+
+        final JobResult jobResult = restClusterClient
+            .submitJob(jobGraph)
+            .thenCompose(restClusterClient::requestJobResult)
+            .get();
+
+        final String messageInCaseOfFailure = jobResult.getSerializedThrowable().isPresent() ?
+            jobResult.getSerializedThrowable().get().getFullStringifiedStackTrace()
+            : "Job failed.";
+        assertTrue(messageInCaseOfFailure, jobResult.isSuccess());
+    }

+    private StreamExecutionEnvironment createJobWithRegisteredCachedFiles() {
         final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
         env.setParallelism(1);

@@ -131,8 +162,7 @@ public void testDistributeFileViaDFS() throws Exception {
         env.fromElements(1)
             .map(new TestMapFunction())
             .addSink(new DiscardingSink<>());
-
-        env.execute("Distributed Cache Via Blob Test Program");
+        return env;
     }

     private static class TestMapFunction extends RichMapFunction<Integer, String> {

flink-runtime/src/main/java/org/apache/flink/runtime/client/ClientUtils.java

Lines changed: 1 addition & 1 deletion
@@ -80,6 +80,7 @@ public static void uploadJobGraphFiles(
                 throw new FlinkException("Could not upload job files.", ioe);
             }
         }
+        jobGraph.writeUserArtifactEntriesToConfiguration();
     }

     /**
@@ -137,6 +138,5 @@ private static void setUserArtifactBlobKeys(JobGraph jobGraph, Collection<Tuple2<String, PermanentBlobKey>> blobKeys)
         for (Tuple2<String, PermanentBlobKey> blobKey : blobKeys) {
             jobGraph.setUserArtifactBlobKey(blobKey.f0, blobKey.f1);
         }
-        jobGraph.writeUserArtifactEntriesToConfiguration();
     }
 }
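Why the writeUserArtifactEntriesToConfiguration() call moves: setUserArtifactBlobKeys only runs when at least one artifact was actually uploaded, so with DFS-only artifacts the cache entries never reached the job configuration. A simplified, self-contained sketch of that control flow (all types and helpers below are stand-ins, not the real ClientUtils):

import java.util.List;

class UploadFlowSketch {

    static void uploadJobGraphFiles(FakeJobGraph jobGraph, List<String> localArtifacts) {
        if (!localArtifacts.isEmpty()) {
            // Old home of the call: inside the blob-key bookkeeping, which is
            // skipped entirely when no local artifacts need uploading.
            setUserArtifactBlobKeys(jobGraph, localArtifacts);
        }
        // New home: always executed, so DFS-backed cache entries reach the
        // configuration even when nothing is uploaded.
        jobGraph.writeUserArtifactEntriesToConfiguration();
    }

    static void setUserArtifactBlobKeys(FakeJobGraph jobGraph, List<String> uploaded) {
        // ... record a blob key per uploaded artifact ...
    }

    static class FakeJobGraph {
        void writeUserArtifactEntriesToConfiguration() {
            // ... copy user-artifact entries into the job configuration ...
        }
    }
}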
