Skip to content

Commit

Permalink
Add outputWindowedValue capability to Java SDK
Browse files Browse the repository at this point in the history
  • Loading branch information
kennknowles committed Nov 13, 2023
1 parent 8fa5b19 commit 95e35f6
Show file tree
Hide file tree
Showing 10 changed files with 727 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@

import com.google.auto.service.AutoService;
import java.io.IOException;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ThreadLocalRandom;
Expand Down Expand Up @@ -669,6 +670,15 @@ public void output(RestrictionT part) {
public void outputWithTimestamp(RestrictionT part, Instant timestamp) {
throw new UnsupportedOperationException();
}

@Override
public void outputWindowedValue(
RestrictionT output,
Instant timestamp,
Collection<? extends BoundedWindow> windows,
PaneInfo paneInfo) {
throw new UnsupportedOperationException();
}
};
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkArgument;

import java.util.Collection;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import org.apache.beam.runners.core.construction.SplittableParDo.ProcessKeyedElements;
Expand Down Expand Up @@ -525,6 +526,15 @@ public void output(OutputT output) {
public void outputWithTimestamp(OutputT output, Instant timestamp) {
outerContext.outputWithTimestamp(output, timestamp);
}

@Override
public void outputWindowedValue(
OutputT output,
Instant timestamp,
Collection<? extends BoundedWindow> windows,
PaneInfo paneInfo) {
outerContext.outputWindowedValue(output, timestamp, windows, paneInfo);
}
};
}

Expand All @@ -543,6 +553,15 @@ public void output(T output) {
public void outputWithTimestamp(T output, Instant timestamp) {
outerContext.outputWithTimestamp(tag, output, timestamp);
}

@Override
public void outputWindowedValue(
T output,
Instant timestamp,
Collection<? extends BoundedWindow> windows,
PaneInfo paneInfo) {
outerContext.outputWindowedValue(tag, output, timestamp, windows, paneInfo);
}
};
}

Expand Down Expand Up @@ -583,6 +602,15 @@ public void outputWithTimestamp(OutputT output, Instant timestamp) {
outerContext.outputWithTimestamp(output, timestamp);
}

@Override
public void outputWindowedValue(
OutputT output,
Instant timestamp,
Collection<? extends BoundedWindow> windows,
PaneInfo paneInfo) {
outerContext.outputWindowedValue(output, timestamp, windows, paneInfo);
}

@Override
public <T> void output(TupleTag<T> tag, T output) {
outerContext.output(tag, output);
Expand All @@ -593,6 +621,16 @@ public <T> void outputWithTimestamp(TupleTag<T> tag, T output, Instant timestamp
outerContext.outputWithTimestamp(tag, output, timestamp);
}

@Override
public <T> void outputWindowedValue(
TupleTag<T> tag,
T output,
Instant timestamp,
Collection<? extends BoundedWindow> windows,
PaneInfo paneInfo) {
outerContext.outputWindowedValue(tag, output, timestamp, windows, paneInfo);
}

@Override
public InputT element() {
return element;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkNotNull;
import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkState;

import java.util.Collection;
import java.util.Map;
import java.util.concurrent.CancellationException;
import java.util.concurrent.Future;
Expand All @@ -42,6 +43,7 @@
import org.apache.beam.sdk.transforms.splittabledofn.SplitResult;
import org.apache.beam.sdk.transforms.splittabledofn.TimestampObservingWatermarkEstimator;
import org.apache.beam.sdk.transforms.splittabledofn.WatermarkEstimator;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.PaneInfo;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.sdk.values.KV;
Expand Down Expand Up @@ -388,6 +390,15 @@ public void output(OutputT output) {

@Override
public void outputWithTimestamp(OutputT value, Instant timestamp) {
output.outputWindowedValue(value, timestamp, element.getWindows(), element.getPane());
}

@Override
public void outputWindowedValue(
OutputT value,
Instant timestamp,
Collection<? extends BoundedWindow> windows,
PaneInfo paneInfo) {
noteOutput();
if (watermarkEstimator instanceof TimestampObservingWatermarkEstimator) {
((TimestampObservingWatermarkEstimator) watermarkEstimator).observeTimestamp(timestamp);
Expand All @@ -402,6 +413,16 @@ public <T> void output(TupleTag<T> tag, T value) {

@Override
public <T> void outputWithTimestamp(TupleTag<T> tag, T value, Instant timestamp) {
outputWindowedValue(tag, value, timestamp, element.getWindows(), element.getPane());
}

@Override
public <T> void outputWindowedValue(
TupleTag<T> tag,
T value,
Instant timestamp,
Collection<? extends BoundedWindow> windows,
PaneInfo paneInfo) {
noteOutput();
if (watermarkEstimator instanceof TimestampObservingWatermarkEstimator) {
((TimestampObservingWatermarkEstimator) watermarkEstimator).observeTimestamp(timestamp);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkNotNull;

import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
Expand Down Expand Up @@ -62,7 +63,10 @@
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.FluentIterable;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Iterables;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Sets;
import org.checkerframework.checker.initialization.qual.Initialized;
import org.checkerframework.checker.nullness.qual.NonNull;
import org.checkerframework.checker.nullness.qual.Nullable;
import org.checkerframework.checker.nullness.qual.UnknownKeyFor;
import org.joda.time.Duration;
import org.joda.time.Instant;
import org.joda.time.format.PeriodFormat;
Expand Down Expand Up @@ -413,24 +417,40 @@ public void output(OutputT output) {

@Override
public void outputWithTimestamp(OutputT output, Instant timestamp) {
checkTimestamp(elem.getTimestamp(), timestamp);
outputWithTimestamp(mainOutputTag, output, timestamp);
}

@Override
public void outputWindowedValue(
OutputT output,
Instant timestamp,
Collection<? extends BoundedWindow> windows,
PaneInfo paneInfo) {
outputWindowedValue(mainOutputTag, output, timestamp, windows, paneInfo);
}

@Override
public <T> void output(TupleTag<T> tag, T output) {
checkNotNull(tag, "Tag passed to output cannot be null");
outputWindowedValue(tag, elem.withValue(output));
SimpleDoFnRunner.this.outputWindowedValue(tag, elem.withValue(output));
}

@Override
public <T> void outputWithTimestamp(TupleTag<T> tag, T output, Instant timestamp) {
checkNotNull(tag, "Tag passed to outputWithTimestamp cannot be null");
checkTimestamp(elem.getTimestamp(), timestamp);
outputWindowedValue(
SimpleDoFnRunner.this.outputWindowedValue(
tag, WindowedValue.of(output, timestamp, elem.getWindows(), elem.getPane()));
}

@Override
public <T> void outputWindowedValue(
TupleTag<T> tag,
T output,
Instant timestamp,
Collection<? extends BoundedWindow> windows,
PaneInfo paneInfo) {}

@Override
public Instant timestamp() {
return elem.getTimestamp();
Expand Down Expand Up @@ -838,16 +858,37 @@ public void outputWithTimestamp(OutputT output, Instant timestamp) {
outputWithTimestamp(mainOutputTag, output, timestamp);
}

@Override
public void outputWindowedValue(
OutputT output,
@UnknownKeyFor @NonNull @Initialized Instant timestamp,
@UnknownKeyFor @NonNull @Initialized
Collection<? extends @UnknownKeyFor @NonNull @Initialized BoundedWindow> windows,
@UnknownKeyFor @NonNull @Initialized PaneInfo paneInfo) {}

@Override
public <T> void output(TupleTag<T> tag, T output) {
checkTimestamp(timestamp(), timestamp);
outputWindowedValue(tag, WindowedValue.of(output, timestamp, window(), PaneInfo.NO_FIRING));
outputWithTimestamp(tag, output, timestamp);
}

@Override
public <T> void outputWithTimestamp(TupleTag<T> tag, T output, Instant timestamp) {
checkTimestamp(timestamp(), timestamp);
outputWindowedValue(tag, WindowedValue.of(output, timestamp, window(), PaneInfo.NO_FIRING));
outputWindowedValue(
tag, output, timestamp, Collections.singleton(window()), PaneInfo.NO_FIRING);
}

@Override
public <T> void outputWindowedValue(
TupleTag<T> tag,
T output,
Instant timestamp,
Collection<? extends BoundedWindow> windows,
PaneInfo paneInfo) {
checkTimestamp(timestamp(), timestamp);
SimpleDoFnRunner.this.outputWindowedValue(
tag, WindowedValue.of(output, timestamp, windows, paneInfo));
}

@Override
Expand Down Expand Up @@ -1045,16 +1086,38 @@ public void outputWithTimestamp(OutputT output, Instant timestamp) {
outputWithTimestamp(mainOutputTag, output, timestamp);
}

@Override
public void outputWindowedValue(
OutputT output,
Instant timestamp,
Collection<? extends BoundedWindow> windows,
PaneInfo paneInfo) {
outputWindowedValue(mainOutputTag, output, timestamp, windows, paneInfo);
}

@Override
public <T> void output(TupleTag<T> tag, T output) {
checkTimestamp(this.timestamp, timestamp);
outputWindowedValue(tag, WindowedValue.of(output, timestamp, window(), PaneInfo.NO_FIRING));
outputWithTimestamp(tag, output, timestamp);
}

@Override
public <T> void outputWithTimestamp(TupleTag<T> tag, T output, Instant timestamp) {
checkTimestamp(this.timestamp, timestamp);
outputWindowedValue(tag, WindowedValue.of(output, timestamp, window(), PaneInfo.NO_FIRING));
outputWindowedValue(
tag, output, timestamp, Collections.singleton(window()), PaneInfo.NO_FIRING);
}

@Override
public <T> void outputWindowedValue(
TupleTag<T> tag,
T output,
Instant timestamp,
Collection<? extends BoundedWindow> windows,
PaneInfo paneInfo) {
checkTimestamp(this.timestamp, timestamp);
SimpleDoFnRunner.this.outputWindowedValue(
tag, WindowedValue.of(output, timestamp, window(), PaneInfo.NO_FIRING));
}

@Override
Expand Down
Loading

0 comments on commit 95e35f6

Please sign in to comment.