Skip to content

Commit 359dec8

Browse files
swegnerdhalperi
authored andcommitted
Register DisplayData from IO primatives
1 parent 9fa97fb commit 359dec8

20 files changed

+347
-23
lines changed

sdks/java/core/src/main/java/org/apache/beam/sdk/io/BoundedReadFromUnboundedSource.java

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
import org.apache.beam.sdk.transforms.PTransform;
2626
import org.apache.beam.sdk.transforms.RemoveDuplicates;
2727
import org.apache.beam.sdk.transforms.SerializableFunction;
28+
import org.apache.beam.sdk.transforms.display.DisplayData;
2829
import org.apache.beam.sdk.util.IntervalBoundedExponentialBackOff;
2930
import org.apache.beam.sdk.util.ValueWithRecordId;
3031
import org.apache.beam.sdk.values.PCollection;
@@ -108,6 +109,16 @@ public String getKindString() {
108109
return "Read(" + approximateSimpleName(source.getClass()) + ")";
109110
}
110111

112+
@Override
113+
public void populateDisplayData(DisplayData.Builder builder) {
114+
// We explicitly do not register base-class data, instead we use the delegate inner source.
115+
builder
116+
.add("source", source.getClass())
117+
.addIfNotDefault("maxRecords", maxNumRecords, Long.MAX_VALUE)
118+
.addIfNotNull("maxReadTime", maxReadTime)
119+
.include(source);
120+
}
121+
111122
private static class UnboundedToBoundedSourceAdapter<T>
112123
extends BoundedSource<ValueWithRecordId<T>> {
113124
private final UnboundedSource<T, ?> source;

sdks/java/core/src/main/java/org/apache/beam/sdk/io/CompressedSource.java

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
import org.apache.beam.sdk.annotations.Experimental;
2121
import org.apache.beam.sdk.coders.Coder;
2222
import org.apache.beam.sdk.options.PipelineOptions;
23+
import org.apache.beam.sdk.transforms.display.DisplayData;
2324

2425
import com.google.common.base.Preconditions;
2526
import com.google.common.io.ByteStreams;
@@ -320,6 +321,22 @@ public final boolean producesSortedKeys(PipelineOptions options) throws Exceptio
320321
return sourceDelegate.producesSortedKeys(options);
321322
}
322323

324+
@Override
325+
public void populateDisplayData(DisplayData.Builder builder) {
326+
// We explicitly do not register base-class data, instead we use the delegate inner source.
327+
builder
328+
.include(sourceDelegate)
329+
.add("source", sourceDelegate.getClass());
330+
331+
if (channelFactory instanceof Enum) {
332+
// GZIP and BZIP are implemented as enums; Enum classes are anonymous, so use the .name()
333+
// value instead
334+
builder.add("compressionMode", ((Enum) channelFactory).name());
335+
} else {
336+
builder.add("compressionMode", channelFactory.getClass());
337+
}
338+
}
339+
323340
/**
324341
* Returns the delegate source's default output coder.
325342
*/

sdks/java/core/src/main/java/org/apache/beam/sdk/io/DatastoreIO.java

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@
3636
import org.apache.beam.sdk.io.Sink.Writer;
3737
import org.apache.beam.sdk.options.GcpOptions;
3838
import org.apache.beam.sdk.options.PipelineOptions;
39+
import org.apache.beam.sdk.transforms.display.DisplayData;
3940
import org.apache.beam.sdk.util.AttemptBoundedExponentialBackOff;
4041
import org.apache.beam.sdk.util.RetryHttpRequestInitializer;
4142
import org.apache.beam.sdk.values.PCollection;
@@ -386,6 +387,19 @@ public long getEstimatedSizeBytes(PipelineOptions options) throws Exception {
386387
return getPropertyMap(entity).get("entity_bytes").getIntegerValue();
387388
}
388389

390+
@Override
391+
public void populateDisplayData(DisplayData.Builder builder) {
392+
super.populateDisplayData(builder);
393+
builder
394+
.addIfNotDefault("host", host, DEFAULT_HOST)
395+
.addIfNotNull("dataset", datasetId)
396+
.addIfNotNull("namespace", namespace);
397+
398+
if (query != null) {
399+
builder.add("query", query.toString());
400+
}
401+
}
402+
389403
@Override
390404
public String toString() {
391405
return MoreObjects.toStringHelper(getClass())
@@ -587,6 +601,13 @@ public void validate(PipelineOptions options) {
587601
public DatastoreWriteOperation createWriteOperation(PipelineOptions options) {
588602
return new DatastoreWriteOperation(this);
589603
}
604+
605+
@Override
606+
public void populateDisplayData(DisplayData.Builder builder) {
607+
builder
608+
.addIfNotDefault("host", host, DEFAULT_HOST)
609+
.addIfNotNull("dataset", datasetId);
610+
}
590611
}
591612

592613
/**

sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSink.java

Lines changed: 25 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
import org.apache.beam.sdk.coders.SerializableCoder;
2222
import org.apache.beam.sdk.options.GcsOptions;
2323
import org.apache.beam.sdk.options.PipelineOptions;
24+
import org.apache.beam.sdk.transforms.display.DisplayData;
2425
import org.apache.beam.sdk.util.FileIOChannelFactory;
2526
import org.apache.beam.sdk.util.GcsIOChannelFactory;
2627
import org.apache.beam.sdk.util.IOChannelFactory;
@@ -135,6 +136,30 @@ public void validate(PipelineOptions options) {}
135136
@Override
136137
public abstract FileBasedWriteOperation<T> createWriteOperation(PipelineOptions options);
137138

139+
@Override
140+
public void populateDisplayData(DisplayData.Builder builder) {
141+
super.populateDisplayData(builder);
142+
143+
String fileNamePattern = String.format("%s%s%s",
144+
baseOutputFilename, fileNamingTemplate, getFileExtension(extension));
145+
builder.add("fileNamePattern", fileNamePattern);
146+
}
147+
148+
/**
149+
* Returns the file extension to be used. If the user did not request a file
150+
* extension then this method returns the empty string. Otherwise this method
151+
* adds a {@code "."} to the beginning of the users extension if one is not present.
152+
*/
153+
private static String getFileExtension(String usersExtension) {
154+
if (usersExtension == null || usersExtension.isEmpty()) {
155+
return "";
156+
}
157+
if (usersExtension.startsWith(".")) {
158+
return usersExtension;
159+
}
160+
return "." + usersExtension;
161+
}
162+
138163
/**
139164
* Abstract {@link Sink.WriteOperation} that manages the process of writing to a
140165
* {@link FileBasedSink}.
@@ -356,21 +381,6 @@ protected final List<String> generateDestinationFilenames(int numFiles) {
356381
return destFilenames;
357382
}
358383

359-
/**
360-
* Returns the file extension to be used. If the user did not request a file
361-
* extension then this method returns the empty string. Otherwise this method
362-
* adds a {@code "."} to the beginning of the users extension if one is not present.
363-
*/
364-
private String getFileExtension(String usersExtension) {
365-
if (usersExtension == null || usersExtension.isEmpty()) {
366-
return "";
367-
}
368-
if (usersExtension.startsWith(".")) {
369-
return usersExtension;
370-
}
371-
return "." + usersExtension;
372-
}
373-
374384
/**
375385
* Removes temporary output files. Uses the temporary filename to find files to remove.
376386
*

sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSource.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
import static com.google.common.base.Preconditions.checkState;
2222

2323
import org.apache.beam.sdk.options.PipelineOptions;
24+
import org.apache.beam.sdk.transforms.display.DisplayData;
2425
import org.apache.beam.sdk.util.IOChannelFactory;
2526
import org.apache.beam.sdk.util.IOChannelUtils;
2627

@@ -273,6 +274,12 @@ private static long getEstimatedSizeOfFilesBySampling(
273274
/ selectedFiles.size();
274275
}
275276

277+
@Override
278+
public void populateDisplayData(DisplayData.Builder builder) {
279+
super.populateDisplayData(builder);
280+
builder.add("filePattern", getFileOrPatternSpec());
281+
}
282+
276283
private ListenableFuture<List<? extends FileBasedSource<T>>> createFutureForFileSplit(
277284
final String file,
278285
final long desiredBundleSizeBytes,

sdks/java/core/src/main/java/org/apache/beam/sdk/io/OffsetBasedSource.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
import org.apache.beam.sdk.io.range.OffsetRangeTracker;
2121
import org.apache.beam.sdk.io.range.RangeTracker;
2222
import org.apache.beam.sdk.options.PipelineOptions;
23+
import org.apache.beam.sdk.transforms.display.DisplayData;
2324

2425
import com.google.common.base.Preconditions;
2526

@@ -202,6 +203,15 @@ public boolean allowsDynamicSplitting() {
202203
return true;
203204
}
204205

206+
@Override
207+
public void populateDisplayData(DisplayData.Builder builder) {
208+
super.populateDisplayData(builder);
209+
builder
210+
.add("minBundleSize", minBundleSize)
211+
.addIfNotDefault("startOffset", startOffset, 0)
212+
.addIfNotDefault("endOffset", endOffset, Long.MAX_VALUE);
213+
}
214+
205215
/**
206216
* A {@link Source.Reader} that implements code common to readers of all
207217
* {@link OffsetBasedSource}s.

sdks/java/core/src/main/java/org/apache/beam/sdk/io/Read.java

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
import org.apache.beam.sdk.coders.Coder;
2323
import org.apache.beam.sdk.runners.DirectPipelineRunner;
2424
import org.apache.beam.sdk.transforms.PTransform;
25+
import org.apache.beam.sdk.transforms.display.DisplayData;
2526
import org.apache.beam.sdk.util.SerializableUtils;
2627
import org.apache.beam.sdk.util.WindowedValue;
2728
import org.apache.beam.sdk.util.WindowingStrategy;
@@ -144,6 +145,13 @@ public String getKindString() {
144145
return "Read(" + approximateSimpleName(source.getClass()) + ")";
145146
}
146147

148+
@Override
149+
public void populateDisplayData(DisplayData.Builder builder) {
150+
builder
151+
.add("source", source.getClass())
152+
.include(source);
153+
}
154+
147155
static {
148156
registerDefaultTransformEvaluator();
149157
}
@@ -250,5 +258,12 @@ public final PCollection<T> apply(PInput input) {
250258
public String getKindString() {
251259
return "Read(" + approximateSimpleName(source.getClass()) + ")";
252260
}
261+
262+
@Override
263+
public void populateDisplayData(DisplayData.Builder builder) {
264+
builder
265+
.add("source", source.getClass())
266+
.include(source);
267+
}
253268
}
254269
}

sdks/java/core/src/main/java/org/apache/beam/sdk/io/Sink.java

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,8 @@
2020
import org.apache.beam.sdk.annotations.Experimental;
2121
import org.apache.beam.sdk.coders.Coder;
2222
import org.apache.beam.sdk.options.PipelineOptions;
23+
import org.apache.beam.sdk.transforms.display.DisplayData;
24+
import org.apache.beam.sdk.transforms.display.HasDisplayData;
2325
import org.apache.beam.sdk.values.PCollection;
2426

2527
import java.io.Serializable;
@@ -121,7 +123,7 @@
121123
* @param <T> the type that will be written to the Sink.
122124
*/
123125
@Experimental(Experimental.Kind.SOURCE_SINK)
124-
public abstract class Sink<T> implements Serializable {
126+
public abstract class Sink<T> implements Serializable, HasDisplayData {
125127
/**
126128
* Ensures that the sink is valid and can be written to before the write operation begins. One
127129
* should use {@link com.google.common.base.Preconditions} to implement this method.
@@ -133,6 +135,15 @@ public abstract class Sink<T> implements Serializable {
133135
*/
134136
public abstract WriteOperation<T, ?> createWriteOperation(PipelineOptions options);
135137

138+
/**
139+
* {@inheritDoc}
140+
*
141+
* <p>By default, does not register any display data. Implementors may override this method
142+
* to provide their own display metadata.
143+
*/
144+
@Override
145+
public void populateDisplayData(DisplayData.Builder builder) {}
146+
136147
/**
137148
* A {@link WriteOperation} defines the process of a parallel write of objects to a Sink.
138149
*

sdks/java/core/src/main/java/org/apache/beam/sdk/io/Source.java

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,8 @@
2020
import org.apache.beam.sdk.annotations.Experimental;
2121
import org.apache.beam.sdk.coders.Coder;
2222

23+
import org.apache.beam.sdk.transforms.display.DisplayData;
24+
import org.apache.beam.sdk.transforms.display.HasDisplayData;
2325
import org.joda.time.Instant;
2426

2527
import java.io.IOException;
@@ -52,7 +54,7 @@
5254
* @param <T> Type of elements read by the source.
5355
*/
5456
@Experimental(Experimental.Kind.SOURCE_SINK)
55-
public abstract class Source<T> implements Serializable {
57+
public abstract class Source<T> implements Serializable, HasDisplayData {
5658
/**
5759
* Checks that this source is valid, before it can be used in a pipeline.
5860
*
@@ -66,6 +68,15 @@ public abstract class Source<T> implements Serializable {
6668
*/
6769
public abstract Coder<T> getDefaultOutputCoder();
6870

71+
/**
72+
* {@inheritDoc}
73+
*
74+
* <p>By default, does not register any display data. Implementors may override this method
75+
* to provide their own display metadata.
76+
*/
77+
@Override
78+
public void populateDisplayData(DisplayData.Builder builder) {}
79+
6980
/**
7081
* The interface that readers of custom input sources must implement.
7182
*

sdks/java/core/src/main/java/org/apache/beam/sdk/io/Write.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@
2929
import org.apache.beam.sdk.transforms.PTransform;
3030
import org.apache.beam.sdk.transforms.ParDo;
3131
import org.apache.beam.sdk.transforms.View;
32+
import org.apache.beam.sdk.transforms.display.DisplayData;
3233
import org.apache.beam.sdk.transforms.windowing.GlobalWindows;
3334
import org.apache.beam.sdk.transforms.windowing.Window;
3435
import org.apache.beam.sdk.values.PCollection;
@@ -76,6 +77,13 @@ public PDone apply(PCollection<T> input) {
7677
return createWrite(input, sink.createWriteOperation(options));
7778
}
7879

80+
@Override
81+
public void populateDisplayData(DisplayData.Builder builder) {
82+
builder
83+
.add("sink", sink.getClass())
84+
.include(sink);
85+
}
86+
7987
/**
8088
* Returns the {@link Sink} associated with this PTransform.
8189
*/

0 commit comments

Comments
 (0)