diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/SplittableParDo.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/SplittableParDo.java index 5ea2c4968dd98..9cf0606b68b9a 100644 --- a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/SplittableParDo.java +++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/SplittableParDo.java @@ -21,6 +21,7 @@ import com.google.auto.service.AutoService; import java.io.IOException; +import java.util.Collection; import java.util.List; import java.util.Map; import java.util.concurrent.ThreadLocalRandom; @@ -669,6 +670,15 @@ public void output(RestrictionT part) { public void outputWithTimestamp(RestrictionT part, Instant timestamp) { throw new UnsupportedOperationException(); } + + @Override + public void outputWindowedValue( + RestrictionT output, + Instant timestamp, + Collection windows, + PaneInfo paneInfo) { + throw new UnsupportedOperationException(); + } }; } diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/SplittableParDoNaiveBounded.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/SplittableParDoNaiveBounded.java index 061e1cb11b5c1..ad6b515397421 100644 --- a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/SplittableParDoNaiveBounded.java +++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/SplittableParDoNaiveBounded.java @@ -19,6 +19,7 @@ import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkArgument; +import java.util.Collection; import java.util.Map; import java.util.concurrent.TimeUnit; import org.apache.beam.runners.core.construction.SplittableParDo.ProcessKeyedElements; @@ -525,6 +526,15 @@ public void output(OutputT output) { public void outputWithTimestamp(OutputT output, Instant timestamp) { outerContext.outputWithTimestamp(output, timestamp); } + + @Override + public void outputWindowedValue( + OutputT output, + Instant timestamp, + Collection windows, + PaneInfo paneInfo) { + outerContext.outputWindowedValue(output, timestamp, windows, paneInfo); + } }; } @@ -543,6 +553,15 @@ public void output(T output) { public void outputWithTimestamp(T output, Instant timestamp) { outerContext.outputWithTimestamp(tag, output, timestamp); } + + @Override + public void outputWindowedValue( + T output, + Instant timestamp, + Collection windows, + PaneInfo paneInfo) { + outerContext.outputWindowedValue(tag, output, timestamp, windows, paneInfo); + } }; } @@ -583,6 +602,15 @@ public void outputWithTimestamp(OutputT output, Instant timestamp) { outerContext.outputWithTimestamp(output, timestamp); } + @Override + public void outputWindowedValue( + OutputT output, + Instant timestamp, + Collection windows, + PaneInfo paneInfo) { + outerContext.outputWindowedValue(output, timestamp, windows, paneInfo); + } + @Override public void output(TupleTag tag, T output) { outerContext.output(tag, output); @@ -593,6 +621,16 @@ public void outputWithTimestamp(TupleTag tag, T output, Instant timestamp outerContext.outputWithTimestamp(tag, output, timestamp); } + @Override + public void outputWindowedValue( + TupleTag tag, + T output, + Instant timestamp, + Collection windows, + PaneInfo paneInfo) { + outerContext.outputWindowedValue(tag, output, timestamp, windows, paneInfo); + } + @Override public InputT element() { return element; diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/OutputAndTimeBoundedSplittableProcessElementInvoker.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/OutputAndTimeBoundedSplittableProcessElementInvoker.java index 85a46eb7dc042..dcbac5a20f690 100644 --- a/runners/core-java/src/main/java/org/apache/beam/runners/core/OutputAndTimeBoundedSplittableProcessElementInvoker.java +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/OutputAndTimeBoundedSplittableProcessElementInvoker.java @@ -20,6 +20,7 @@ import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkNotNull; import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkState; +import java.util.Collection; import java.util.Map; import java.util.concurrent.CancellationException; import java.util.concurrent.Future; @@ -42,6 +43,7 @@ import org.apache.beam.sdk.transforms.splittabledofn.SplitResult; import org.apache.beam.sdk.transforms.splittabledofn.TimestampObservingWatermarkEstimator; import org.apache.beam.sdk.transforms.splittabledofn.WatermarkEstimator; +import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.apache.beam.sdk.transforms.windowing.PaneInfo; import org.apache.beam.sdk.util.WindowedValue; import org.apache.beam.sdk.values.KV; @@ -388,6 +390,15 @@ public void output(OutputT output) { @Override public void outputWithTimestamp(OutputT value, Instant timestamp) { + output.outputWindowedValue(value, timestamp, element.getWindows(), element.getPane()); + } + + @Override + public void outputWindowedValue( + OutputT value, + Instant timestamp, + Collection windows, + PaneInfo paneInfo) { noteOutput(); if (watermarkEstimator instanceof TimestampObservingWatermarkEstimator) { ((TimestampObservingWatermarkEstimator) watermarkEstimator).observeTimestamp(timestamp); @@ -402,6 +413,16 @@ public void output(TupleTag tag, T value) { @Override public void outputWithTimestamp(TupleTag tag, T value, Instant timestamp) { + outputWindowedValue(tag, value, timestamp, element.getWindows(), element.getPane()); + } + + @Override + public void outputWindowedValue( + TupleTag tag, + T value, + Instant timestamp, + Collection windows, + PaneInfo paneInfo) { noteOutput(); if (watermarkEstimator instanceof TimestampObservingWatermarkEstimator) { ((TimestampObservingWatermarkEstimator) watermarkEstimator).observeTimestamp(timestamp); diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java index 16986fdf8d52d..f211ea9cb28c1 100644 --- a/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java @@ -21,6 +21,7 @@ import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkNotNull; import java.util.Collection; +import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -62,7 +63,10 @@ import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.FluentIterable; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Iterables; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Sets; +import org.checkerframework.checker.initialization.qual.Initialized; +import org.checkerframework.checker.nullness.qual.NonNull; import org.checkerframework.checker.nullness.qual.Nullable; +import org.checkerframework.checker.nullness.qual.UnknownKeyFor; import org.joda.time.Duration; import org.joda.time.Instant; import org.joda.time.format.PeriodFormat; @@ -413,24 +417,40 @@ public void output(OutputT output) { @Override public void outputWithTimestamp(OutputT output, Instant timestamp) { - checkTimestamp(elem.getTimestamp(), timestamp); outputWithTimestamp(mainOutputTag, output, timestamp); } + @Override + public void outputWindowedValue( + OutputT output, + Instant timestamp, + Collection windows, + PaneInfo paneInfo) { + outputWindowedValue(mainOutputTag, output, timestamp, windows, paneInfo); + } + @Override public void output(TupleTag tag, T output) { checkNotNull(tag, "Tag passed to output cannot be null"); - outputWindowedValue(tag, elem.withValue(output)); + SimpleDoFnRunner.this.outputWindowedValue(tag, elem.withValue(output)); } @Override public void outputWithTimestamp(TupleTag tag, T output, Instant timestamp) { checkNotNull(tag, "Tag passed to outputWithTimestamp cannot be null"); checkTimestamp(elem.getTimestamp(), timestamp); - outputWindowedValue( + SimpleDoFnRunner.this.outputWindowedValue( tag, WindowedValue.of(output, timestamp, elem.getWindows(), elem.getPane())); } + @Override + public void outputWindowedValue( + TupleTag tag, + T output, + Instant timestamp, + Collection windows, + PaneInfo paneInfo) {} + @Override public Instant timestamp() { return elem.getTimestamp(); @@ -838,16 +858,37 @@ public void outputWithTimestamp(OutputT output, Instant timestamp) { outputWithTimestamp(mainOutputTag, output, timestamp); } + @Override + public void outputWindowedValue( + OutputT output, + @UnknownKeyFor @NonNull @Initialized Instant timestamp, + @UnknownKeyFor @NonNull @Initialized + Collection windows, + @UnknownKeyFor @NonNull @Initialized PaneInfo paneInfo) {} + @Override public void output(TupleTag tag, T output) { checkTimestamp(timestamp(), timestamp); - outputWindowedValue(tag, WindowedValue.of(output, timestamp, window(), PaneInfo.NO_FIRING)); + outputWithTimestamp(tag, output, timestamp); } @Override public void outputWithTimestamp(TupleTag tag, T output, Instant timestamp) { checkTimestamp(timestamp(), timestamp); - outputWindowedValue(tag, WindowedValue.of(output, timestamp, window(), PaneInfo.NO_FIRING)); + outputWindowedValue( + tag, output, timestamp, Collections.singleton(window()), PaneInfo.NO_FIRING); + } + + @Override + public void outputWindowedValue( + TupleTag tag, + T output, + Instant timestamp, + Collection windows, + PaneInfo paneInfo) { + checkTimestamp(timestamp(), timestamp); + SimpleDoFnRunner.this.outputWindowedValue( + tag, WindowedValue.of(output, timestamp, windows, paneInfo)); } @Override @@ -1045,16 +1086,38 @@ public void outputWithTimestamp(OutputT output, Instant timestamp) { outputWithTimestamp(mainOutputTag, output, timestamp); } + @Override + public void outputWindowedValue( + OutputT output, + Instant timestamp, + Collection windows, + PaneInfo paneInfo) { + outputWindowedValue(mainOutputTag, output, timestamp, windows, paneInfo); + } + @Override public void output(TupleTag tag, T output) { checkTimestamp(this.timestamp, timestamp); - outputWindowedValue(tag, WindowedValue.of(output, timestamp, window(), PaneInfo.NO_FIRING)); + outputWithTimestamp(tag, output, timestamp); } @Override public void outputWithTimestamp(TupleTag tag, T output, Instant timestamp) { checkTimestamp(this.timestamp, timestamp); - outputWindowedValue(tag, WindowedValue.of(output, timestamp, window(), PaneInfo.NO_FIRING)); + outputWindowedValue( + tag, output, timestamp, Collections.singleton(window()), PaneInfo.NO_FIRING); + } + + @Override + public void outputWindowedValue( + TupleTag tag, + T output, + Instant timestamp, + Collection windows, + PaneInfo paneInfo) { + checkTimestamp(this.timestamp, timestamp); + SimpleDoFnRunner.this.outputWindowedValue( + tag, WindowedValue.of(output, timestamp, window(), PaneInfo.NO_FIRING)); } @Override diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Create.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Create.java index 079379953cd95..d8dad5a7e7a0b 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Create.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Create.java @@ -55,7 +55,9 @@ import org.apache.beam.sdk.schemas.Schema; import org.apache.beam.sdk.schemas.SchemaCoder; import org.apache.beam.sdk.schemas.SchemaRegistry; +import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.apache.beam.sdk.util.CoderUtils; +import org.apache.beam.sdk.util.WindowedValue; import org.apache.beam.sdk.values.KV; import org.apache.beam.sdk.values.PBegin; import org.apache.beam.sdk.values.PCollection; @@ -241,6 +243,34 @@ public static TimestampedValues timestamped( return timestamped(ImmutableList.>builder().add(elem).add(elems).build()); } + /** + * Returns a new {@link Create.TimestampedValues} transform that produces a {@link PCollection} + * containing the elements of the provided {@code Iterable} with the specified timestamps. + * + *

The argument should not be modified after this is called. + * + *

By default, {@code Create.TimestampedValues} can automatically determine the {@code Coder} + * to use if all elements have the same non-parameterized run-time class, and a default coder is + * registered for that class. See {@link CoderRegistry} for details on how defaults are + * determined. Otherwise, use {@link Create.TimestampedValues#withCoder} to set the coder + * explicitly. + */ + public static WindowedValues windowedValues(Iterable> elems) { + return new WindowedValues<>(elems, Optional.absent(), Optional.absent(), Optional.absent()); + } + + /** + * Returns a new {@link Create.TimestampedValues} transform that produces a {@link PCollection} + * containing the specified elements with the specified timestamps. + * + *

The arguments should not be modified after this is called. + */ + @SafeVarargs + public static WindowedValues windowedValues( + WindowedValue elem, @SuppressWarnings("unchecked") WindowedValue... elems) { + return windowedValues(ImmutableList.>builder().add(elem).add(elems).build()); + } + /** * Returns a new root transform that produces a {@link PCollection} containing the specified * elements with the specified timestamps. @@ -727,6 +757,144 @@ public void processElement(@Element TimestampedValue element, OutputReceiver< } } + /** + * A {@code PTransform} that creates a {@code PCollection} whose elements have associated + * timestamps. + */ + public static class WindowedValues extends PTransform> { + + /** + * Returns a {@link Create.WindowedValues} PTransform like this one that uses the given {@code + * Coder} to decode each of the objects into a value of type {@code T}. + * + *

By default, {@code Create.TimestampedValues} can automatically determine the {@code Coder} + * to use if all elements have the same non-parameterized run-time class, and a default coder is + * registered for that class. See {@link CoderRegistry} for details on how defaults are + * determined. + * + *

Note that for {@link Create.TimestampedValues with no elements}, the {@link VoidCoder} is + * used. + */ + public WindowedValues withCoder(Coder coder) { + return new WindowedValues<>(windowedValues, Optional.of(coder), windowCoder, typeDescriptor); + } + + /** + * Returns a {@link Create.TimestampedValues} PTransform like this one that uses the given + * {@code Schema} to represent objects. + */ + public WindowedValues withSchema( + Schema schema, + TypeDescriptor typeDescriptor, + SerializableFunction toRowFunction, + SerializableFunction fromRowFunction) { + return withCoder(SchemaCoder.of(schema, typeDescriptor, toRowFunction, fromRowFunction)); + } + + /** + * Returns a {@link Create.WindowedValues} PTransform like this one that uses the given {@code + * TypeDescriptor} to determine the {@code Coder} to use to decode each of the objects into a + * value of type {@code T}. Note that a default coder must be registered for the class described + * in the {@code TypeDescriptor}. + * + *

By default, {@code Create.TimestampedValues} can automatically determine the {@code Coder} + * to use if all elements have the same non-parameterized run-time class, and a default coder is + * registered for that class. See {@link CoderRegistry} for details on how defaults are + * determined. + * + *

Note that for {@link Create.WindowedValues} with no elements, the {@link VoidCoder} is + * used. + */ + public WindowedValues withType(TypeDescriptor type) { + return new WindowedValues<>(windowedValues, elementCoder, windowCoder, Optional.of(type)); + } + + @Override + public PCollection expand(PBegin input) { + try { + Coder coder = null; + CoderRegistry coderRegistry = input.getPipeline().getCoderRegistry(); + SchemaRegistry schemaRegistry = input.getPipeline().getSchemaRegistry(); + if (elementCoder.isPresent()) { + coder = elementCoder.get(); + } else if (typeDescriptor.isPresent()) { + try { + coder = + SchemaCoder.of( + schemaRegistry.getSchema(typeDescriptor.get()), + typeDescriptor.get(), + schemaRegistry.getToRowFunction(typeDescriptor.get()), + schemaRegistry.getFromRowFunction(typeDescriptor.get())); + } catch (NoSuchSchemaException e) { + // No schema registered. + } + if (coder == null) { + coder = coderRegistry.getCoder(typeDescriptor.get()); + } + } else { + Iterable rawElements = Iterables.transform(windowedValues, WindowedValue::getValue); + coder = getDefaultCreateCoder(coderRegistry, schemaRegistry, rawElements); + } + + Coder windowCoder; + if (this.windowCoder.isPresent()) { + windowCoder = this.windowCoder.get(); + } else { + Iterable rawWindows = + Iterables.concat(Iterables.transform(windowedValues, WindowedValue::getWindows)); + windowCoder = getDefaultCreateCoder(coderRegistry, schemaRegistry, rawWindows); + } + + PCollection> intermediate = + Pipeline.applyTransform( + input, + Create.of(windowedValues) + .withCoder(WindowedValue.getFullCoder(coder, windowCoder))); + + PCollection output = intermediate.apply(ParDo.of(new ConvertWindowedValues<>())); + output.setCoder(coder); + return output; + } catch (CannotProvideCoderException e) { + throw new IllegalArgumentException( + "Unable to infer a coder and no Coder was specified. " + + "Please set a coder by invoking CreateTimestamped.withCoder() explicitly.", + e); + } + } + + ///////////////////////////////////////////////////////////////////////////// + + /** The timestamped elements of the resulting PCollection. */ + private final transient Iterable> windowedValues; + + /** The coder used to encode the values to and from a binary representation. */ + private final transient Optional> elementCoder; + + private final Optional> windowCoder; + + /** The value type. */ + private final transient Optional> typeDescriptor; + + private WindowedValues( + Iterable> windowedValues, + Optional> elementCoder, + Optional> windowCoder, + Optional> typeDescriptor) { + this.windowedValues = windowedValues; + this.elementCoder = elementCoder; + this.windowCoder = windowCoder; + this.typeDescriptor = typeDescriptor; + } + + private static class ConvertWindowedValues extends DoFn, T> { + @ProcessElement + public void processElement(@Element WindowedValue element, OutputReceiver r) { + r.outputWindowedValue( + element.getValue(), element.getTimestamp(), element.getWindows(), element.getPane()); + } + } + } + private static Coder getDefaultCreateCoder( CoderRegistry coderRegistry, SchemaRegistry schemaRegistry, Iterable elems) throws CannotProvideCoderException { diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java index 2993d8cca97a1..c22b726c99a2e 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java @@ -24,6 +24,7 @@ import java.lang.annotation.Retention; import java.lang.annotation.RetentionPolicy; import java.lang.annotation.Target; +import java.util.Collection; import org.apache.beam.sdk.Pipeline; import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.state.State; @@ -185,6 +186,31 @@ public abstract class WindowedContext { */ public abstract void outputWithTimestamp(OutputT output, Instant timestamp); + /** + * Adds the given element to the main output {@code PCollection}, with the given windowing + * metadata. + * + *

Once passed to {@code outputWindowedValue} the element should not be modified in any way. + * + *

If invoked from {@link ProcessElement}), the timestamp must not be older than the input + * element's timestamp minus {@link DoFn#getAllowedTimestampSkew}. The output element will be in + * the same windows as the input element. + * + *

If invoked from {@link StartBundle} or {@link FinishBundle}, this will attempt to use the + * {@link org.apache.beam.sdk.transforms.windowing.WindowFn} of the input {@code PCollection} to + * determine what windows the element should be in, throwing an exception if the {@code + * WindowFn} attempts to access any information about the input element except for the + * timestamp. + * + *

Note: A splittable {@link DoFn} is not allowed to output from {@link StartBundle} + * or {@link FinishBundle} methods. + */ + public abstract void outputWindowedValue( + OutputT output, + Instant timestamp, + Collection windows, + PaneInfo paneInfo); + /** * Adds the given element to the output {@code PCollection} with the given tag. * @@ -231,6 +257,32 @@ public abstract class WindowedContext { * @see ParDo.SingleOutput#withOutputTags */ public abstract void outputWithTimestamp(TupleTag tag, T output, Instant timestamp); + + /** + * Adds the given element to the main output {@code PCollection}, with the given windowing + * metadata. + * + *

Once passed to {@code outputWindowedValue} the element should not be modified in any way. + * + *

If invoked from {@link ProcessElement}), the timestamp must not be older than the input + * element's timestamp minus {@link DoFn#getAllowedTimestampSkew}. The output element will be in + * the same windows as the input element. + * + *

If invoked from {@link StartBundle} or {@link FinishBundle}, this will attempt to use the + * {@link org.apache.beam.sdk.transforms.windowing.WindowFn} of the input {@code PCollection} to + * determine what windows the element should be in, throwing an exception if the {@code + * WindowFn} attempts to access any information about the input element except for the + * timestamp. + * + *

Note: A splittable {@link DoFn} is not allowed to output from {@link StartBundle} + * or {@link FinishBundle} methods. + */ + public abstract void outputWindowedValue( + TupleTag tag, + T output, + Instant timestamp, + Collection windows, + PaneInfo paneInfo); } /** Information accessible when running a {@link DoFn.ProcessElement} method. */ @@ -342,6 +394,12 @@ public interface OutputReceiver { void output(T output); void outputWithTimestamp(T output, Instant timestamp); + + void outputWindowedValue( + T output, + Instant timestamp, + Collection windows, + PaneInfo paneInfo); } /** Receives tagged output for a multi-output function. */ diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnOutputReceivers.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnOutputReceivers.java index 4f3719f2cc0eb..738d21d46efc4 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnOutputReceivers.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnOutputReceivers.java @@ -20,12 +20,15 @@ import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkNotNull; import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkState; +import java.util.Collection; import java.util.Map; import org.apache.beam.sdk.annotations.Internal; import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.schemas.SchemaCoder; import org.apache.beam.sdk.transforms.DoFn.MultiOutputReceiver; import org.apache.beam.sdk.transforms.DoFn.OutputReceiver; +import org.apache.beam.sdk.transforms.windowing.BoundedWindow; +import org.apache.beam.sdk.transforms.windowing.PaneInfo; import org.apache.beam.sdk.values.Row; import org.apache.beam.sdk.values.TupleTag; import org.checkerframework.checker.nullness.qual.Nullable; @@ -58,6 +61,16 @@ public void output(Row output) { public void outputWithTimestamp(Row output, Instant timestamp) { outputReceiver.outputWithTimestamp(schemaCoder.getFromRowFunction().apply(output), timestamp); } + + @Override + public void outputWindowedValue( + Row output, + Instant timestamp, + Collection windows, + PaneInfo paneInfo) { + outputReceiver.outputWindowedValue( + schemaCoder.getFromRowFunction().apply(output), timestamp, windows, paneInfo); + } } private static class WindowedContextOutputReceiver implements OutputReceiver { @@ -87,6 +100,20 @@ public void outputWithTimestamp(T output, Instant timestamp) { ((DoFn.WindowedContext) context).outputWithTimestamp(output, timestamp); } } + + @Override + public void outputWindowedValue( + T output, + Instant timestamp, + Collection windows, + PaneInfo paneInfo) { + if (outputTag != null) { + context.outputWithTimestamp(outputTag, output, timestamp); + } else { + ((DoFn.WindowedContext) context) + .outputWindowedValue(output, timestamp, windows, paneInfo); + } + } } private static class WindowedContextMultiOutputReceiver implements MultiOutputReceiver { diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnTester.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnTester.java index f72de2af35efc..c0915c2dcd75e 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnTester.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnTester.java @@ -22,6 +22,7 @@ import java.util.ArrayList; import java.util.Arrays; +import java.util.Collection; import java.util.Collections; import java.util.HashMap; import java.util.List; @@ -581,6 +582,15 @@ public void outputWithTimestamp(OutputT output, Instant timestamp) { outputWithTimestamp(mainOutputTag, output, timestamp); } + @Override + public void outputWindowedValue( + OutputT output, + Instant timestamp, + Collection windows, + PaneInfo paneInfo) { + outputWindowedValue(mainOutputTag, output, timestamp, windows, paneInfo); + } + @Override public void output(TupleTag tag, T output) { outputWithTimestamp(tag, output, element.getTimestamp()); @@ -591,6 +601,18 @@ public void outputWithTimestamp(TupleTag tag, T output, Instant timestamp getMutableOutput(tag) .add(ValueInSingleWindow.of(output, timestamp, element.getWindow(), element.getPane())); } + + @Override + public void outputWindowedValue( + TupleTag tag, + T output, + Instant timestamp, + Collection windows, + PaneInfo paneInfo) { + for (BoundedWindow w : windows) { + getMutableOutput(tag).add(ValueInSingleWindow.of(output, timestamp, w, paneInfo)); + } + } } /** @deprecated Use {@link TestPipeline} with the {@code DirectRunner}. */ diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnInvokersTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnInvokersTest.java index a99d18a3d91f2..bcd1f9ccf7b58 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnInvokersTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnInvokersTest.java @@ -41,6 +41,7 @@ import java.io.OutputStream; import java.util.ArrayList; import java.util.Arrays; +import java.util.Collection; import java.util.List; import org.apache.beam.sdk.coders.AtomicCoder; import org.apache.beam.sdk.coders.CoderException; @@ -75,6 +76,7 @@ import org.apache.beam.sdk.transforms.splittabledofn.WatermarkEstimators; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.apache.beam.sdk.transforms.windowing.IntervalWindow; +import org.apache.beam.sdk.transforms.windowing.PaneInfo; import org.apache.beam.sdk.util.UserCodeException; import org.joda.time.Instant; import org.junit.Before; @@ -556,6 +558,15 @@ public void output(SomeRestriction output) { public void outputWithTimestamp(SomeRestriction output, Instant timestamp) { fail("Unexpected output with timestamp"); } + + @Override + public void outputWindowedValue( + SomeRestriction output, + Instant timestamp, + Collection windows, + PaneInfo paneInfo) { + fail("Unexpected outputWindowedValue"); + } }; } }); @@ -800,6 +811,17 @@ public void outputWithTimestamp(String output, Instant instant) { invoked = true; assertEquals("foo", output); } + + @Override + public void outputWindowedValue( + String output, + Instant timestamp, + Collection windows, + PaneInfo paneInfo) { + assertFalse(invoked); + invoked = true; + assertEquals("foo", output); + } }; } }); diff --git a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnApiDoFnRunner.java b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnApiDoFnRunner.java index ddf52125b2e48..6b8f9e08fd241 100644 --- a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnApiDoFnRunner.java +++ b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnApiDoFnRunner.java @@ -123,7 +123,10 @@ import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Maps; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Sets; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Table; +import org.checkerframework.checker.initialization.qual.Initialized; +import org.checkerframework.checker.nullness.qual.NonNull; import org.checkerframework.checker.nullness.qual.Nullable; +import org.checkerframework.checker.nullness.qual.UnknownKeyFor; import org.joda.time.DateTimeUtils; import org.joda.time.Duration; import org.joda.time.Instant; @@ -2216,6 +2219,16 @@ public void outputWithTimestamp(OutputT output, Instant timestamp) { WindowedValue.of(output, timestamp, currentWindow, currentElement.getPane())); } + @Override + public void outputWindowedValue( + OutputT output, + Instant timestamp, + Collection windows, + PaneInfo paneInfo) { + // TODO: Check that timestamp is valid once all runners can provide proper timestamps. + outputTo(mainOutputConsumer, WindowedValue.of(output, timestamp, windows, paneInfo)); + } + @Override public void outputWithTimestamp(TupleTag tag, T output, Instant timestamp) { // TODO: Check that timestamp is valid once all runners can provide proper timestamps. @@ -2228,6 +2241,22 @@ public void outputWithTimestamp(TupleTag tag, T output, Instant timestamp consumer, WindowedValue.of(output, timestamp, currentWindow, currentElement.getPane())); } + @Override + public void outputWindowedValue( + TupleTag tag, + T output, + Instant timestamp, + Collection windows, + PaneInfo paneInfo) { + // TODO: Check that timestamp is valid once all runners can provide proper timestamps. + FnDataReceiver> consumer = + (FnDataReceiver) localNameToConsumer.get(tag.getId()); + if (consumer == null) { + throw new IllegalArgumentException(String.format("Unknown output tag %s", tag)); + } + outputTo(consumer, WindowedValue.of(output, timestamp, windows, paneInfo)); + } + @Override public State state(String stateId, boolean alwaysFetched) { StateDeclaration stateDeclaration = doFnSignature.stateDeclarations().get(stateId); @@ -2372,6 +2401,46 @@ public Instant timestamp(DoFn doFn) { currentElement.getPane())); } + @Override + public void outputWindowedValue( + OutputT output, + Instant timestamp, + Collection windows, + PaneInfo paneInfo) { + checkTimestamp(timestamp); + double size = + doFnInvoker.invokeGetSize( + new DelegatingArgumentProvider( + this, this.errorContextPrefix + "/GetSize") { + @Override + public Object restriction() { + return output; + } + + @Override + public Instant timestamp(DoFn doFn) { + return timestamp; + } + + @Override + public RestrictionTracker restrictionTracker() { + return doFnInvoker.invokeNewTracker(this); + } + }); + + outputTo( + mainOutputConsumer, + (WindowedValue) + WindowedValue.of( + KV.of( + KV.of( + currentElement.getValue(), KV.of(output, currentWatermarkEstimatorState)), + size), + timestamp, + windows, + paneInfo)); + } + @Override public void outputWithTimestamp(TupleTag tag, T output, Instant timestamp) { // Note that the OutputReceiver/RowOutputReceiver specifically will use the non-tag versions @@ -2380,6 +2449,19 @@ public void outputWithTimestamp(TupleTag tag, T output, Instant timestamp String.format("Non-main output %s unsupported in %s", tag, errorContextPrefix)); } + @Override + public void outputWindowedValue( + TupleTag tag, + T output, + Instant timestamp, + Collection windows, + PaneInfo paneInfo) { + // Note that the OutputReceiver/RowOutputReceiver specifically will use the non-tag versions + // of these methods when producing output. + throw new UnsupportedOperationException( + String.format("Non-main output %s unsupported in %s", tag, errorContextPrefix)); + } + @Override public State state(String stateId, boolean alwaysFetched) { throw new UnsupportedOperationException( @@ -2487,6 +2569,46 @@ public Instant timestamp(DoFn doFn) { currentElement.getPane())); } + @Override + public void outputWindowedValue( + OutputT output, + Instant timestamp, + Collection windows, + PaneInfo paneInfo) { + checkTimestamp(timestamp); + double size = + doFnInvoker.invokeGetSize( + new DelegatingArgumentProvider( + this, errorContextPrefix + "/GetSize") { + @Override + public Object restriction() { + return output; + } + + @Override + public Instant timestamp(DoFn doFn) { + return timestamp; + } + + @Override + public RestrictionTracker restrictionTracker() { + return doFnInvoker.invokeNewTracker(this); + } + }); + + outputTo( + mainOutputConsumer, + (WindowedValue) + WindowedValue.of( + KV.of( + KV.of( + currentElement.getValue(), KV.of(output, currentWatermarkEstimatorState)), + size), + timestamp, + windows, + paneInfo)); + } + @Override public void outputWithTimestamp(TupleTag tag, T output, Instant timestamp) { // Note that the OutputReceiver/RowOutputReceiver specifically will use the non-tag versions @@ -2494,6 +2616,19 @@ public void outputWithTimestamp(TupleTag tag, T output, Instant timestamp throw new UnsupportedOperationException( String.format("Non-main output %s unsupported in %s", tag, errorContextPrefix)); } + + @Override + public void outputWindowedValue( + TupleTag tag, + T output, + Instant timestamp, + Collection windows, + PaneInfo paneInfo) { + // Note that the OutputReceiver/RowOutputReceiver specifically will use the non-tag versions + // of these methods when producing output. + throw new UnsupportedOperationException( + String.format("Non-main output %s unsupported in %s", tag, errorContextPrefix)); + } } /** Provides arguments for a {@link DoFnInvoker} for a non-window observing method. */ @@ -2530,6 +2665,16 @@ public void outputWithTimestamp(OutputT output, Instant timestamp) { output, timestamp, currentElement.getWindows(), currentElement.getPane())); } + @Override + public void outputWindowedValue( + OutputT output, + Instant timestamp, + Collection windows, + PaneInfo paneInfo) { + checkTimestamp(timestamp); + outputTo(mainOutputConsumer, WindowedValue.of(output, timestamp, windows, paneInfo)); + } + @Override public void outputWithTimestamp(TupleTag tag, T output, Instant timestamp) { checkTimestamp(timestamp); @@ -2543,6 +2688,22 @@ public void outputWithTimestamp(TupleTag tag, T output, Instant timestamp WindowedValue.of( output, timestamp, currentElement.getWindows(), currentElement.getPane())); } + + @Override + public void outputWindowedValue( + TupleTag tag, + T output, + Instant timestamp, + Collection windows, + PaneInfo paneInfo) { + checkTimestamp(timestamp); + FnDataReceiver> consumer = + (FnDataReceiver) localNameToConsumer.get(tag.getId()); + if (consumer == null) { + throw new IllegalArgumentException(String.format("Unknown output tag %s", tag)); + } + outputTo(consumer, WindowedValue.of(output, timestamp, windows, paneInfo)); + } } /** Provides base arguments for a {@link DoFnInvoker} for a non-window observing method. */ @@ -2672,6 +2833,16 @@ public void outputWithTimestamp(Row output, Instant timestamp) { ProcessBundleContextBase.this.outputWithTimestamp( fromRowFunction.apply(output), timestamp); } + + @Override + public void outputWindowedValue( + Row output, + Instant timestamp, + Collection windows, + PaneInfo paneInfo) { + ProcessBundleContextBase.this.outputWindowedValue( + fromRowFunction.apply(output), timestamp, windows, paneInfo); + } }; @Override @@ -2707,6 +2878,16 @@ public void output(T output) { public void outputWithTimestamp(T output, Instant timestamp) { ProcessBundleContextBase.this.outputWithTimestamp(tag, output, timestamp); } + + @Override + public void outputWindowedValue( + T output, + Instant timestamp, + Collection windows, + PaneInfo paneInfo) { + ProcessBundleContextBase.this.outputWindowedValue( + tag, output, timestamp, windows, paneInfo); + } }; } @@ -2742,6 +2923,16 @@ public void outputWithTimestamp(Row output, Instant timestamp) { ProcessBundleContextBase.this.outputWithTimestamp( tag, fromRowFunction.apply(output), timestamp); } + + @Override + public void outputWindowedValue( + Row output, + Instant timestamp, + Collection windows, + PaneInfo paneInfo) { + ProcessBundleContextBase.this.outputWindowedValue( + tag, fromRowFunction.apply(output), timestamp, windows, paneInfo); + } }; } @@ -2856,6 +3047,16 @@ public void outputWithTimestamp(OutputT output, Instant timestamp) { WindowedValue.of(output, timestamp, currentWindow, currentTimer.getPane())); } + @Override + public void outputWindowedValue( + OutputT output, + Instant timestamp, + Collection windows, + PaneInfo paneInfo) { + checkOnWindowExpirationTimestamp(timestamp); + outputTo(mainOutputConsumer, WindowedValue.of(output, timestamp, windows, paneInfo)); + } + @Override public void output(TupleTag tag, T output) { FnDataReceiver> consumer = @@ -2881,6 +3082,19 @@ public void outputWithTimestamp(TupleTag tag, T output, Instant timestamp consumer, WindowedValue.of(output, timestamp, currentWindow, currentTimer.getPane())); } + @Override + public void outputWindowedValue( + TupleTag tag, + T output, + Instant timestamp, + Collection windows, + PaneInfo paneInfo) { + checkOnWindowExpirationTimestamp(timestamp); + FnDataReceiver> consumer = + (FnDataReceiver) localNameToConsumer.get(tag.getId()); + outputTo(consumer, WindowedValue.of(output, timestamp, windows, paneInfo)); + } + @SuppressWarnings( "deprecation") // Allowed Skew is deprecated for users, but must be respected private void checkOnWindowExpirationTimestamp(Instant timestamp) { @@ -2952,6 +3166,16 @@ public void output(Row output) { public void outputWithTimestamp(Row output, Instant timestamp) { context.outputWithTimestamp(fromRowFunction.apply(output), timestamp); } + + @Override + public void outputWindowedValue( + Row output, + Instant timestamp, + Collection windows, + PaneInfo paneInfo) { + context.outputWindowedValue( + fromRowFunction.apply(output), timestamp, windows, paneInfo); + } }; @Override @@ -2984,6 +3208,15 @@ public void output(T output) { public void outputWithTimestamp(T output, Instant timestamp) { context.outputWithTimestamp(tag, output, timestamp); } + + @Override + public void outputWindowedValue( + T output, + Instant timestamp, + Collection windows, + PaneInfo paneInfo) { + context.outputWindowedValue(tag, output, timestamp, windows, paneInfo); + } }; } @@ -3015,6 +3248,16 @@ public void output(Row output) { public void outputWithTimestamp(Row output, Instant timestamp) { context.outputWithTimestamp(tag, fromRowFunction.apply(output), timestamp); } + + @Override + public void outputWindowedValue( + Row output, + Instant timestamp, + Collection windows, + PaneInfo paneInfo) { + context.outputWindowedValue( + tag, fromRowFunction.apply(output), timestamp, windows, paneInfo); + } }; } @@ -3100,6 +3343,16 @@ public void outputWithTimestamp(OutputT output, Instant timestamp) { WindowedValue.of(output, timestamp, currentWindow, currentTimer.getPane())); } + @Override + public void outputWindowedValue( + OutputT output, + Instant timestamp, + Collection windows, + PaneInfo paneInfo) { + checkTimerTimestamp(timestamp); + outputTo(mainOutputConsumer, WindowedValue.of(output, timestamp, windows, paneInfo)); + } + @Override public void output(TupleTag tag, T output) { checkTimerTimestamp(currentTimer.getHoldTimestamp()); @@ -3126,6 +3379,15 @@ public void outputWithTimestamp(TupleTag tag, T output, Instant timestamp consumer, WindowedValue.of(output, timestamp, currentWindow, currentTimer.getPane())); } + @Override + public void outputWindowedValue( + @UnknownKeyFor @NonNull @Initialized TupleTag tag, + T output, + @UnknownKeyFor @NonNull @Initialized Instant timestamp, + @UnknownKeyFor @NonNull @Initialized + Collection windows, + @UnknownKeyFor @NonNull @Initialized PaneInfo paneInfo) {} + @Override public TimeDomain timeDomain() { return currentTimeDomain; @@ -3212,6 +3474,16 @@ public void output(Row output) { public void outputWithTimestamp(Row output, Instant timestamp) { context.outputWithTimestamp(fromRowFunction.apply(output), timestamp); } + + @Override + public void outputWindowedValue( + Row output, + Instant timestamp, + Collection windows, + PaneInfo paneInfo) { + context.outputWindowedValue( + fromRowFunction.apply(output), timestamp, windows, paneInfo); + } }; @Override @@ -3244,6 +3516,15 @@ public void output(T output) { public void outputWithTimestamp(T output, Instant timestamp) { context.outputWithTimestamp(tag, output, timestamp); } + + @Override + public void outputWindowedValue( + T output, + Instant timestamp, + Collection windows, + PaneInfo paneInfo) { + context.outputWindowedValue(tag, output, timestamp, windows, paneInfo); + } }; } @@ -3275,6 +3556,16 @@ public void output(Row output) { public void outputWithTimestamp(Row output, Instant timestamp) { context.outputWithTimestamp(tag, fromRowFunction.apply(output), timestamp); } + + @Override + public void outputWindowedValue( + Row output, + Instant timestamp, + Collection windows, + PaneInfo paneInfo) { + context.outputWindowedValue( + tag, fromRowFunction.apply(output), timestamp, windows, paneInfo); + } }; }