Skip to content

Commit

Permalink
[FLINK-29305][tests] UpsertTestSinkWriter creates parent directories
Browse files Browse the repository at this point in the history
  • Loading branch information
zentol committed Sep 15, 2022
1 parent d3513d9 commit 33afc3c
Show file tree
Hide file tree
Showing 2 changed files with 8 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import java.io.FileNotFoundException;
import java.io.FileOutputStream;
import java.io.IOException;
import java.nio.file.Files;
import java.util.HashMap;
import java.util.Map;

Expand All @@ -53,6 +54,11 @@ class UpsertTestSinkWriter<IN> implements SinkWriter<IN> {
this.keySerializationSchema = checkNotNull(keySerializationSchema);
this.valueSerializationSchema = checkNotNull(valueSerializationSchema);
checkNotNull(outputFile);
try {
Files.createDirectories(outputFile.toPath().getParent());
} catch (IOException e) {
throw new FlinkRuntimeException("Could not parent directories for path: " + outputFile);
}
try {
this.bufferedOutputStream =
new BufferedOutputStream(new FileOutputStream(outputFile, true));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@

import java.io.File;
import java.io.IOException;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
Expand All @@ -60,7 +61,7 @@ class UpsertTestSinkWriterITCase {

@BeforeEach
void setup() {
outputFile = new File(tempDir, "records.out");
outputFile = tempDir.toPath().resolve(Paths.get("dir", "records.out")).toFile();
writer = createSinkWriter(outputFile);
expectedRecords = writeTestData(writer);
}
Expand Down

0 comments on commit 33afc3c

Please sign in to comment.