Skip to content

Commit 053003a

Browse files
committed
Update Dataflow archetypes to be compatible with Apache Beam 2.2.0 changes.
1 parent 049df57 commit 053003a

File tree

3 files changed

+122
-117
lines changed

3 files changed

+122
-117
lines changed

maven-archetypes/examples-java8/src/main/resources/archetype-resources/src/main/java/common/WriteOneFilePerWindow.java

Lines changed: 44 additions & 41 deletions
Original file line numberDiff line numberDiff line change
@@ -17,21 +17,24 @@
1717
*/
1818
package ${package}.common;
1919

20-
import static com.google.common.base.Verify.verifyNotNull;
20+
import static com.google.common.base.MoreObjects.firstNonNull;
2121

22-
import javax.annotation.Nullable;
23-
import org.apache.beam.sdk.io.FileBasedSink;
24-
import org.apache.beam.sdk.io.FileBasedSink.FilenamePolicy;
25-
import org.apache.beam.sdk.io.TextIO;
26-
import org.apache.beam.sdk.io.fs.ResolveOptions.StandardResolveOptions;
27-
import org.apache.beam.sdk.io.fs.ResourceId;
28-
import org.apache.beam.sdk.transforms.DoFn;
29-
import org.apache.beam.sdk.transforms.PTransform;
30-
import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
31-
import org.apache.beam.sdk.values.PCollection;
32-
import org.apache.beam.sdk.values.PDone;
33-
import org.joda.time.format.DateTimeFormatter;
34-
import org.joda.time.format.ISODateTimeFormat;
22+
import javax.annotation.Nullable;
23+
import org.apache.beam.sdk.io.FileBasedSink;
24+
import org.apache.beam.sdk.io.FileBasedSink.FilenamePolicy;
25+
import org.apache.beam.sdk.io.FileBasedSink.OutputFileHints;
26+
import org.apache.beam.sdk.io.TextIO;
27+
import org.apache.beam.sdk.io.fs.ResolveOptions.StandardResolveOptions;
28+
import org.apache.beam.sdk.io.fs.ResourceId;
29+
import org.apache.beam.sdk.transforms.DoFn;
30+
import org.apache.beam.sdk.transforms.PTransform;
31+
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
32+
import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
33+
import org.apache.beam.sdk.transforms.windowing.PaneInfo;
34+
import org.apache.beam.sdk.values.PCollection;
35+
import org.apache.beam.sdk.values.PDone;
36+
import org.joda.time.format.DateTimeFormatter;
37+
import org.joda.time.format.ISODateTimeFormat;
3538

3639
/**
3740
* A {@link DoFn} that writes elements to files with names deterministically derived from the lower
@@ -53,22 +56,12 @@ public WriteOneFilePerWindow(String filenamePrefix, Integer numShards) {
5356

5457
@Override
5558
public PDone expand(PCollection<String> input) {
56-
// filenamePrefix may contain a directory and a filename component. Pull out only the filename
57-
// component from that path for the PerWindowFiles.
58-
String prefix = "";
5959
ResourceId resource = FileBasedSink.convertToFileResourceIfPossible(filenamePrefix);
60-
if (!resource.isDirectory()) {
61-
prefix = verifyNotNull(
62-
resource.getFilename(),
63-
"A non-directory resource should have a non-null filename: %s",
64-
resource);
65-
}
66-
67-
68-
TextIO.Write write = TextIO.write()
69-
.to(resource.getCurrentDirectory())
70-
.withFilenamePolicy(new PerWindowFiles(prefix))
71-
.withWindowedWrites();
60+
TextIO.Write write =
61+
TextIO.write()
62+
.to(new PerWindowFiles(resource))
63+
.withTempDirectory(resource.getCurrentDirectory())
64+
.withWindowedWrites();
7265
if (numShards != null) {
7366
write = write.withNumShards(numShards);
7467
}
@@ -83,31 +76,41 @@ public PDone expand(PCollection<String> input) {
8376
*/
8477
public static class PerWindowFiles extends FilenamePolicy {
8578

86-
private final String prefix;
79+
private final ResourceId baseFilename;
8780

88-
public PerWindowFiles(String prefix) {
89-
this.prefix = prefix;
81+
public PerWindowFiles(ResourceId baseFilename) {
82+
this.baseFilename = baseFilename;
9083
}
9184

9285
public String filenamePrefixForWindow(IntervalWindow window) {
86+
String prefix =
87+
baseFilename.isDirectory() ? "" : firstNonNull(baseFilename.getFilename(), "");
9388
return String.format("%s-%s-%s",
9489
prefix, FORMATTER.print(window.start()), FORMATTER.print(window.end()));
9590
}
9691

9792
@Override
98-
public ResourceId windowedFilename(
99-
ResourceId outputDirectory, WindowedContext context, String extension) {
100-
IntervalWindow window = (IntervalWindow) context.getWindow();
101-
String filename = String.format(
102-
"%s-%s-of-%s%s",
103-
filenamePrefixForWindow(window), context.getShardNumber(), context.getNumShards(),
104-
extension);
105-
return outputDirectory.resolve(filename, StandardResolveOptions.RESOLVE_FILE);
93+
public ResourceId windowedFilename(int shardNumber,
94+
int numShards,
95+
BoundedWindow window,
96+
PaneInfo paneInfo,
97+
OutputFileHints outputFileHints) {
98+
IntervalWindow intervalWindow = (IntervalWindow) window;
99+
String filename =
100+
String.format(
101+
"%s-%s-of-%s%s",
102+
filenamePrefixForWindow(intervalWindow),
103+
shardNumber,
104+
numShards,
105+
outputFileHints.getSuggestedFilenameSuffix());
106+
return baseFilename
107+
.getCurrentDirectory()
108+
.resolve(filename, StandardResolveOptions.RESOLVE_FILE);
106109
}
107110

108111
@Override
109112
public ResourceId unwindowedFilename(
110-
ResourceId outputDirectory, Context context, String extension) {
113+
int shardNumber, int numShards, OutputFileHints outputFileHints) {
111114
throw new UnsupportedOperationException("Unsupported.");
112115
}
113116
}

maven-archetypes/examples-java8/src/main/resources/archetype-resources/src/main/java/complete/game/utils/WriteToText.java

Lines changed: 47 additions & 48 deletions
Original file line numberDiff line numberDiff line change
@@ -17,30 +17,31 @@
1717
*/
1818
package ${package}.complete.game.utils;
1919

20-
import static com.google.common.base.Preconditions.checkArgument;
21-
import static com.google.common.base.Verify.verifyNotNull;
22-
23-
import java.io.Serializable;
24-
import java.util.ArrayList;
25-
import java.util.List;
26-
import java.util.Map;
27-
import java.util.TimeZone;
28-
import java.util.stream.Collectors;
29-
import org.apache.beam.sdk.io.FileBasedSink;
30-
import org.apache.beam.sdk.io.FileBasedSink.FilenamePolicy;
31-
import org.apache.beam.sdk.io.TextIO;
32-
import org.apache.beam.sdk.io.fs.ResolveOptions.StandardResolveOptions;
33-
import org.apache.beam.sdk.io.fs.ResourceId;
34-
import org.apache.beam.sdk.transforms.DoFn;
35-
import org.apache.beam.sdk.transforms.PTransform;
36-
import org.apache.beam.sdk.transforms.ParDo;
37-
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
38-
import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
39-
import org.apache.beam.sdk.values.PCollection;
40-
import org.apache.beam.sdk.values.PDone;
41-
import org.joda.time.DateTimeZone;
42-
import org.joda.time.format.DateTimeFormat;
43-
import org.joda.time.format.DateTimeFormatter;
20+
import static com.google.common.base.Preconditions.checkArgument;
21+
22+
import java.io.Serializable;
23+
import java.util.ArrayList;
24+
import java.util.List;
25+
import java.util.Map;
26+
import java.util.TimeZone;
27+
import java.util.stream.Collectors;
28+
import org.apache.beam.sdk.io.FileBasedSink;
29+
import org.apache.beam.sdk.io.FileBasedSink.FilenamePolicy;
30+
import org.apache.beam.sdk.io.FileBasedSink.OutputFileHints;
31+
import org.apache.beam.sdk.io.TextIO;
32+
import org.apache.beam.sdk.io.fs.ResolveOptions.StandardResolveOptions;
33+
import org.apache.beam.sdk.io.fs.ResourceId;
34+
import org.apache.beam.sdk.transforms.DoFn;
35+
import org.apache.beam.sdk.transforms.PTransform;
36+
import org.apache.beam.sdk.transforms.ParDo;
37+
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
38+
import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
39+
import org.apache.beam.sdk.transforms.windowing.PaneInfo;
40+
import org.apache.beam.sdk.values.PCollection;
41+
import org.apache.beam.sdk.values.PDone;
42+
import org.joda.time.DateTimeZone;
43+
import org.joda.time.format.DateTimeFormat;
44+
import org.joda.time.format.DateTimeFormatter;
4445

4546
/**
4647
* Generate, format, and write rows. Use provided information about the field names and types, as
@@ -111,21 +112,12 @@ public PDone expand(PCollection<String> input) {
111112
checkArgument(
112113
input.getWindowingStrategy().getWindowFn().windowCoder() == IntervalWindow.getCoder());
113114

114-
// filenamePrefix may contain a directory and a filename component. Pull out only the filename
115-
// component from that path for the PerWindowFiles.
116-
String prefix = "";
117115
ResourceId resource = FileBasedSink.convertToFileResourceIfPossible(filenamePrefix);
118-
if (!resource.isDirectory()) {
119-
prefix = verifyNotNull(
120-
resource.getFilename(),
121-
"A non-directory resource should have a non-null filename: %s",
122-
resource);
123-
}
124116

125117
return input.apply(
126118
TextIO.write()
127-
.to(resource.getCurrentDirectory())
128-
.withFilenamePolicy(new PerWindowFiles(prefix))
119+
.to(new PerWindowFiles(resource))
120+
.withTempDirectory(resource.getCurrentDirectory())
129121
.withWindowedWrites()
130122
.withNumShards(3));
131123
}
@@ -139,31 +131,38 @@ public PDone expand(PCollection<String> input) {
139131
*/
140132
protected static class PerWindowFiles extends FilenamePolicy {
141133

142-
private final String prefix;
134+
private final ResourceId prefix;
143135

144-
public PerWindowFiles(String prefix) {
136+
public PerWindowFiles(ResourceId prefix) {
145137
this.prefix = prefix;
146138
}
147139

148140
public String filenamePrefixForWindow(IntervalWindow window) {
149-
return String.format("%s-%s-%s",
150-
prefix, formatter.print(window.start()), formatter.print(window.end()));
141+
String filePrefix = prefix.isDirectory() ? "" : prefix.getFilename();
142+
return String.format(
143+
"%s-%s-%s", filePrefix, formatter.print(window.start()), formatter.print(window.end()));
151144
}
152145

153146
@Override
154-
public ResourceId windowedFilename(
155-
ResourceId outputDirectory, WindowedContext context, String extension) {
156-
IntervalWindow window = (IntervalWindow) context.getWindow();
157-
String filename = String.format(
158-
"%s-%s-of-%s%s",
159-
filenamePrefixForWindow(window), context.getShardNumber(), context.getNumShards(),
160-
extension);
161-
return outputDirectory.resolve(filename, StandardResolveOptions.RESOLVE_FILE);
147+
public ResourceId windowedFilename(int shardNumber,
148+
int numShards,
149+
BoundedWindow window,
150+
PaneInfo paneInfo,
151+
OutputFileHints outputFileHints) {
152+
IntervalWindow intervalWindow = (IntervalWindow) window;
153+
String filename =
154+
String.format(
155+
"%s-%s-of-%s%s",
156+
filenamePrefixForWindow(intervalWindow),
157+
shardNumber,
158+
numShards,
159+
outputFileHints.getSuggestedFilenameSuffix());
160+
return prefix.getCurrentDirectory().resolve(filename, StandardResolveOptions.RESOLVE_FILE);
162161
}
163162

164163
@Override
165164
public ResourceId unwindowedFilename(
166-
ResourceId outputDirectory, Context context, String extension) {
165+
int shardNumber, int numShards, OutputFileHints outputFileHints) {
167166
throw new UnsupportedOperationException("Unsupported.");
168167
}
169168
}

maven-archetypes/examples/src/main/resources/archetype-resources/src/main/java/common/WriteOneFilePerWindow.java

Lines changed: 31 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -17,17 +17,20 @@
1717
*/
1818
package ${package}.common;
1919

20-
import static com.google.common.base.Verify.verifyNotNull;
20+
import static com.google.common.base.MoreObjects.firstNonNull;
2121

2222
import javax.annotation.Nullable;
2323
import org.apache.beam.sdk.io.FileBasedSink;
2424
import org.apache.beam.sdk.io.FileBasedSink.FilenamePolicy;
25+
import org.apache.beam.sdk.io.FileBasedSink.OutputFileHints;
2526
import org.apache.beam.sdk.io.TextIO;
2627
import org.apache.beam.sdk.io.fs.ResolveOptions.StandardResolveOptions;
2728
import org.apache.beam.sdk.io.fs.ResourceId;
2829
import org.apache.beam.sdk.transforms.DoFn;
2930
import org.apache.beam.sdk.transforms.PTransform;
31+
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
3032
import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
33+
import org.apache.beam.sdk.transforms.windowing.PaneInfo;
3134
import org.apache.beam.sdk.values.PCollection;
3235
import org.apache.beam.sdk.values.PDone;
3336
import org.joda.time.format.DateTimeFormatter;
@@ -53,22 +56,12 @@ public WriteOneFilePerWindow(String filenamePrefix, Integer numShards) {
5356

5457
@Override
5558
public PDone expand(PCollection<String> input) {
56-
// filenamePrefix may contain a directory and a filename component. Pull out only the filename
57-
// component from that path for the PerWindowFiles.
58-
String prefix = "";
5959
ResourceId resource = FileBasedSink.convertToFileResourceIfPossible(filenamePrefix);
60-
if (!resource.isDirectory()) {
61-
prefix = verifyNotNull(
62-
resource.getFilename(),
63-
"A non-directory resource should have a non-null filename: %s",
64-
resource);
65-
}
66-
67-
68-
TextIO.Write write = TextIO.write()
69-
.to(resource.getCurrentDirectory())
70-
.withFilenamePolicy(new PerWindowFiles(prefix))
71-
.withWindowedWrites();
60+
TextIO.Write write =
61+
TextIO.write()
62+
.to(new PerWindowFiles(resource))
63+
.withTempDirectory(resource.getCurrentDirectory())
64+
.withWindowedWrites();
7265
if (numShards != null) {
7366
write = write.withNumShards(numShards);
7467
}
@@ -83,31 +76,41 @@ public PDone expand(PCollection<String> input) {
8376
*/
8477
public static class PerWindowFiles extends FilenamePolicy {
8578

86-
private final String prefix;
79+
private final ResourceId baseFilename;
8780

88-
public PerWindowFiles(String prefix) {
89-
this.prefix = prefix;
81+
public PerWindowFiles(ResourceId baseFilename) {
82+
this.baseFilename = baseFilename;
9083
}
9184

9285
public String filenamePrefixForWindow(IntervalWindow window) {
86+
String prefix =
87+
baseFilename.isDirectory() ? "" : firstNonNull(baseFilename.getFilename(), "");
9388
return String.format("%s-%s-%s",
9489
prefix, FORMATTER.print(window.start()), FORMATTER.print(window.end()));
9590
}
9691

9792
@Override
98-
public ResourceId windowedFilename(
99-
ResourceId outputDirectory, WindowedContext context, String extension) {
100-
IntervalWindow window = (IntervalWindow) context.getWindow();
101-
String filename = String.format(
102-
"%s-%s-of-%s%s",
103-
filenamePrefixForWindow(window), context.getShardNumber(), context.getNumShards(),
104-
extension);
105-
return outputDirectory.resolve(filename, StandardResolveOptions.RESOLVE_FILE);
93+
public ResourceId windowedFilename(int shardNumber,
94+
int numShards,
95+
BoundedWindow window,
96+
PaneInfo paneInfo,
97+
OutputFileHints outputFileHints) {
98+
IntervalWindow intervalWindow = (IntervalWindow) window;
99+
String filename =
100+
String.format(
101+
"%s-%s-of-%s%s",
102+
filenamePrefixForWindow(intervalWindow),
103+
shardNumber,
104+
numShards,
105+
outputFileHints.getSuggestedFilenameSuffix());
106+
return baseFilename
107+
.getCurrentDirectory()
108+
.resolve(filename, StandardResolveOptions.RESOLVE_FILE);
106109
}
107110

108111
@Override
109112
public ResourceId unwindowedFilename(
110-
ResourceId outputDirectory, Context context, String extension) {
113+
int shardNumber, int numShards, OutputFileHints outputFileHints) {
111114
throw new UnsupportedOperationException("Unsupported.");
112115
}
113116
}

0 commit comments

Comments
 (0)