diff --git a/runners/samza/src/test/resources/ExpectedDag.json b/runners/samza/src/test/resources/ExpectedDag.json index 3165fc84958ca..ede1651d01ccd 100644 --- a/runners/samza/src/test/resources/ExpectedDag.json +++ b/runners/samza/src/test/resources/ExpectedDag.json @@ -98,26 +98,26 @@ "enclosingNode": "Create.TimestampedValues/Create.Values/Read(CreateSource)/ParDo(BoundedSourceAsSDFWrapper)/ParMultiDo(BoundedSourceAsSDFWrapper)/ProcessKeyedElements", "ChildNodes": [ { - "fullName": "Create.TimestampedValues/Create.Values/Read(CreateSource)/ParDo(BoundedSourceAsSDFWrapper)/ParMultiDo(BoundedSourceAsSDFWrapper)/ProcessKeyedElements/Reshuffle/Window.Into()", + "fullName": "Create.TimestampedValues/Create.Values/Read(CreateSource)/ParDo(BoundedSourceAsSDFWrapper)/ParMultiDo(BoundedSourceAsSDFWrapper)/ProcessKeyedElements/Reshuffle/SetIdentityWindow", "enclosingNode": "Create.TimestampedValues/Create.Values/Read(CreateSource)/ParDo(BoundedSourceAsSDFWrapper)/ParMultiDo(BoundedSourceAsSDFWrapper)/ProcessKeyedElements/Reshuffle", "ChildNodes": [ { - "fullName": "Create.TimestampedValues/Create.Values/Read(CreateSource)/ParDo(BoundedSourceAsSDFWrapper)/ParMultiDo(BoundedSourceAsSDFWrapper)/ProcessKeyedElements/Reshuffle/Window.Into()/Window.Assign", - "enclosingNode": "Create.TimestampedValues/Create.Values/Read(CreateSource)/ParDo(BoundedSourceAsSDFWrapper)/ParMultiDo(BoundedSourceAsSDFWrapper)/ProcessKeyedElements/Reshuffle/Window.Into()" + "fullName": "Create.TimestampedValues/Create.Values/Read(CreateSource)/ParDo(BoundedSourceAsSDFWrapper)/ParMultiDo(BoundedSourceAsSDFWrapper)/ProcessKeyedElements/Reshuffle/SetIdentityWindow/Window.Assign", + "enclosingNode": "Create.TimestampedValues/Create.Values/Read(CreateSource)/ParDo(BoundedSourceAsSDFWrapper)/ParMultiDo(BoundedSourceAsSDFWrapper)/ProcessKeyedElements/Reshuffle/SetIdentityWindow" } ] }, { - "fullName": "Create.TimestampedValues/Create.Values/Read(CreateSource)/ParDo(BoundedSourceAsSDFWrapper)/ParMultiDo(BoundedSourceAsSDFWrapper)/ProcessKeyedElements/Reshuffle/ReifyOriginalTimestamps", + "fullName": "Create.TimestampedValues/Create.Values/Read(CreateSource)/ParDo(BoundedSourceAsSDFWrapper)/ParMultiDo(BoundedSourceAsSDFWrapper)/ProcessKeyedElements/Reshuffle/ReifyOriginalMetadata", "enclosingNode": "Create.TimestampedValues/Create.Values/Read(CreateSource)/ParDo(BoundedSourceAsSDFWrapper)/ParMultiDo(BoundedSourceAsSDFWrapper)/ProcessKeyedElements/Reshuffle", "ChildNodes": [ { - "fullName": "Create.TimestampedValues/Create.Values/Read(CreateSource)/ParDo(BoundedSourceAsSDFWrapper)/ParMultiDo(BoundedSourceAsSDFWrapper)/ProcessKeyedElements/Reshuffle/ReifyOriginalTimestamps/ParDo(Anonymous)", - "enclosingNode": "Create.TimestampedValues/Create.Values/Read(CreateSource)/ParDo(BoundedSourceAsSDFWrapper)/ParMultiDo(BoundedSourceAsSDFWrapper)/ProcessKeyedElements/Reshuffle/ReifyOriginalTimestamps", + "fullName": "Create.TimestampedValues/Create.Values/Read(CreateSource)/ParDo(BoundedSourceAsSDFWrapper)/ParMultiDo(BoundedSourceAsSDFWrapper)/ProcessKeyedElements/Reshuffle/ReifyOriginalMetadata/ParDo(Anonymous)", + "enclosingNode": "Create.TimestampedValues/Create.Values/Read(CreateSource)/ParDo(BoundedSourceAsSDFWrapper)/ParMultiDo(BoundedSourceAsSDFWrapper)/ProcessKeyedElements/Reshuffle/ReifyOriginalMetadata", "ChildNodes": [ { - "fullName": "Create.TimestampedValues/Create.Values/Read(CreateSource)/ParDo(BoundedSourceAsSDFWrapper)/ParMultiDo(BoundedSourceAsSDFWrapper)/ProcessKeyedElements/Reshuffle/ReifyOriginalTimestamps/ParDo(Anonymous)/ParMultiDo(Anonymous)", - "enclosingNode": "Create.TimestampedValues/Create.Values/Read(CreateSource)/ParDo(BoundedSourceAsSDFWrapper)/ParMultiDo(BoundedSourceAsSDFWrapper)/ProcessKeyedElements/Reshuffle/ReifyOriginalTimestamps/ParDo(Anonymous)" + "fullName": "Create.TimestampedValues/Create.Values/Read(CreateSource)/ParDo(BoundedSourceAsSDFWrapper)/ParMultiDo(BoundedSourceAsSDFWrapper)/ProcessKeyedElements/Reshuffle/ReifyOriginalMetadata/ParDo(Anonymous)/ParMultiDo(Anonymous)", + "enclosingNode": "Create.TimestampedValues/Create.Values/Read(CreateSource)/ParDo(BoundedSourceAsSDFWrapper)/ParMultiDo(BoundedSourceAsSDFWrapper)/ProcessKeyedElements/Reshuffle/ReifyOriginalMetadata/ParDo(Anonymous)" } ] } @@ -138,38 +138,26 @@ ] }, { - "fullName": "Create.TimestampedValues/Create.Values/Read(CreateSource)/ParDo(BoundedSourceAsSDFWrapper)/ParMultiDo(BoundedSourceAsSDFWrapper)/ProcessKeyedElements/Reshuffle/RestoreOriginalTimestamps", + "fullName": "Create.TimestampedValues/Create.Values/Read(CreateSource)/ParDo(BoundedSourceAsSDFWrapper)/ParMultiDo(BoundedSourceAsSDFWrapper)/ProcessKeyedElements/Reshuffle/RestoreOriginalWindows", "enclosingNode": "Create.TimestampedValues/Create.Values/Read(CreateSource)/ParDo(BoundedSourceAsSDFWrapper)/ParMultiDo(BoundedSourceAsSDFWrapper)/ProcessKeyedElements/Reshuffle", "ChildNodes": [ { - "fullName": "Create.TimestampedValues/Create.Values/Read(CreateSource)/ParDo(BoundedSourceAsSDFWrapper)/ParMultiDo(BoundedSourceAsSDFWrapper)/ProcessKeyedElements/Reshuffle/RestoreOriginalTimestamps/ReifyTimestamps.RemoveWildcard", - "enclosingNode": "Create.TimestampedValues/Create.Values/Read(CreateSource)/ParDo(BoundedSourceAsSDFWrapper)/ParMultiDo(BoundedSourceAsSDFWrapper)/ProcessKeyedElements/Reshuffle/RestoreOriginalTimestamps", - "ChildNodes": [ - { - "fullName": "Create.TimestampedValues/Create.Values/Read(CreateSource)/ParDo(BoundedSourceAsSDFWrapper)/ParMultiDo(BoundedSourceAsSDFWrapper)/ProcessKeyedElements/Reshuffle/RestoreOriginalTimestamps/ReifyTimestamps.RemoveWildcard/ParDo(Anonymous)", - "enclosingNode": "Create.TimestampedValues/Create.Values/Read(CreateSource)/ParDo(BoundedSourceAsSDFWrapper)/ParMultiDo(BoundedSourceAsSDFWrapper)/ProcessKeyedElements/Reshuffle/RestoreOriginalTimestamps/ReifyTimestamps.RemoveWildcard", - "ChildNodes": [ - { - "fullName": "Create.TimestampedValues/Create.Values/Read(CreateSource)/ParDo(BoundedSourceAsSDFWrapper)/ParMultiDo(BoundedSourceAsSDFWrapper)/ProcessKeyedElements/Reshuffle/RestoreOriginalTimestamps/ReifyTimestamps.RemoveWildcard/ParDo(Anonymous)/ParMultiDo(Anonymous)", - "enclosingNode": "Create.TimestampedValues/Create.Values/Read(CreateSource)/ParDo(BoundedSourceAsSDFWrapper)/ParMultiDo(BoundedSourceAsSDFWrapper)/ProcessKeyedElements/Reshuffle/RestoreOriginalTimestamps/ReifyTimestamps.RemoveWildcard/ParDo(Anonymous)" - } - ] - } - ] - }, + "fullName": "Create.TimestampedValues/Create.Values/Read(CreateSource)/ParDo(BoundedSourceAsSDFWrapper)/ParMultiDo(BoundedSourceAsSDFWrapper)/ProcessKeyedElements/Reshuffle/RestoreOriginalWindows/Window.Assign", + "enclosingNode": "Create.TimestampedValues/Create.Values/Read(CreateSource)/ParDo(BoundedSourceAsSDFWrapper)/ParMultiDo(BoundedSourceAsSDFWrapper)/ProcessKeyedElements/Reshuffle/RestoreOriginalWindows" + } + ] + }, + { + "fullName": "Create.TimestampedValues/Create.Values/Read(CreateSource)/ParDo(BoundedSourceAsSDFWrapper)/ParMultiDo(BoundedSourceAsSDFWrapper)/ProcessKeyedElements/Reshuffle/RestoreOriginalTimestamps", + "enclosingNode": "Create.TimestampedValues/Create.Values/Read(CreateSource)/ParDo(BoundedSourceAsSDFWrapper)/ParMultiDo(BoundedSourceAsSDFWrapper)/ProcessKeyedElements/Reshuffle", + "ChildNodes": [ { - "fullName": "Create.TimestampedValues/Create.Values/Read(CreateSource)/ParDo(BoundedSourceAsSDFWrapper)/ParMultiDo(BoundedSourceAsSDFWrapper)/ProcessKeyedElements/Reshuffle/RestoreOriginalTimestamps/Reify.ExtractTimestampsFromValues", + "fullName": "Create.TimestampedValues/Create.Values/Read(CreateSource)/ParDo(BoundedSourceAsSDFWrapper)/ParMultiDo(BoundedSourceAsSDFWrapper)/ProcessKeyedElements/Reshuffle/RestoreOriginalTimestamps/ParDo(Anonymous)", "enclosingNode": "Create.TimestampedValues/Create.Values/Read(CreateSource)/ParDo(BoundedSourceAsSDFWrapper)/ParMultiDo(BoundedSourceAsSDFWrapper)/ProcessKeyedElements/Reshuffle/RestoreOriginalTimestamps", "ChildNodes": [ { - "fullName": "Create.TimestampedValues/Create.Values/Read(CreateSource)/ParDo(BoundedSourceAsSDFWrapper)/ParMultiDo(BoundedSourceAsSDFWrapper)/ProcessKeyedElements/Reshuffle/RestoreOriginalTimestamps/Reify.ExtractTimestampsFromValues/ParDo(Anonymous)", - "enclosingNode": "Create.TimestampedValues/Create.Values/Read(CreateSource)/ParDo(BoundedSourceAsSDFWrapper)/ParMultiDo(BoundedSourceAsSDFWrapper)/ProcessKeyedElements/Reshuffle/RestoreOriginalTimestamps/Reify.ExtractTimestampsFromValues", - "ChildNodes": [ - { - "fullName": "Create.TimestampedValues/Create.Values/Read(CreateSource)/ParDo(BoundedSourceAsSDFWrapper)/ParMultiDo(BoundedSourceAsSDFWrapper)/ProcessKeyedElements/Reshuffle/RestoreOriginalTimestamps/Reify.ExtractTimestampsFromValues/ParDo(Anonymous)/ParMultiDo(Anonymous)", - "enclosingNode": "Create.TimestampedValues/Create.Values/Read(CreateSource)/ParDo(BoundedSourceAsSDFWrapper)/ParMultiDo(BoundedSourceAsSDFWrapper)/ProcessKeyedElements/Reshuffle/RestoreOriginalTimestamps/Reify.ExtractTimestampsFromValues/ParDo(Anonymous)" - } - ] + "fullName": "Create.TimestampedValues/Create.Values/Read(CreateSource)/ParDo(BoundedSourceAsSDFWrapper)/ParMultiDo(BoundedSourceAsSDFWrapper)/ProcessKeyedElements/Reshuffle/RestoreOriginalTimestamps/ParDo(Anonymous)/ParMultiDo(Anonymous)", + "enclosingNode": "Create.TimestampedValues/Create.Values/Read(CreateSource)/ParDo(BoundedSourceAsSDFWrapper)/ParMultiDo(BoundedSourceAsSDFWrapper)/ProcessKeyedElements/Reshuffle/RestoreOriginalTimestamps/ParDo(Anonymous)" } ] } @@ -287,14 +275,14 @@ }, { "from": "Create.TimestampedValues/Create.Values/Read(CreateSource)/ParDo(BoundedSourceAsSDFWrapper)/ParMultiDo(BoundedSourceAsSDFWrapper)/Assign unique key/AddKeys/Map/ParMultiDo(Anonymous)", - "to": "Create.TimestampedValues/Create.Values/Read(CreateSource)/ParDo(BoundedSourceAsSDFWrapper)/ParMultiDo(BoundedSourceAsSDFWrapper)/ProcessKeyedElements/Reshuffle/Window.Into()/Window.Assign" + "to": "Create.TimestampedValues/Create.Values/Read(CreateSource)/ParDo(BoundedSourceAsSDFWrapper)/ParMultiDo(BoundedSourceAsSDFWrapper)/ProcessKeyedElements/Reshuffle/SetIdentityWindow/Window.Assign" }, { - "from": "Create.TimestampedValues/Create.Values/Read(CreateSource)/ParDo(BoundedSourceAsSDFWrapper)/ParMultiDo(BoundedSourceAsSDFWrapper)/ProcessKeyedElements/Reshuffle/Window.Into()/Window.Assign", - "to": "Create.TimestampedValues/Create.Values/Read(CreateSource)/ParDo(BoundedSourceAsSDFWrapper)/ParMultiDo(BoundedSourceAsSDFWrapper)/ProcessKeyedElements/Reshuffle/ReifyOriginalTimestamps/ParDo(Anonymous)/ParMultiDo(Anonymous)" + "from": "Create.TimestampedValues/Create.Values/Read(CreateSource)/ParDo(BoundedSourceAsSDFWrapper)/ParMultiDo(BoundedSourceAsSDFWrapper)/ProcessKeyedElements/Reshuffle/SetIdentityWindow/Window.Assign", + "to": "Create.TimestampedValues/Create.Values/Read(CreateSource)/ParDo(BoundedSourceAsSDFWrapper)/ParMultiDo(BoundedSourceAsSDFWrapper)/ProcessKeyedElements/Reshuffle/ReifyOriginalMetadata/ParDo(Anonymous)/ParMultiDo(Anonymous)" }, { - "from": "Create.TimestampedValues/Create.Values/Read(CreateSource)/ParDo(BoundedSourceAsSDFWrapper)/ParMultiDo(BoundedSourceAsSDFWrapper)/ProcessKeyedElements/Reshuffle/ReifyOriginalTimestamps/ParDo(Anonymous)/ParMultiDo(Anonymous)", + "from": "Create.TimestampedValues/Create.Values/Read(CreateSource)/ParDo(BoundedSourceAsSDFWrapper)/ParMultiDo(BoundedSourceAsSDFWrapper)/ProcessKeyedElements/Reshuffle/ReifyOriginalMetadata/ParDo(Anonymous)/ParMultiDo(Anonymous)", "to": "Create.TimestampedValues/Create.Values/Read(CreateSource)/ParDo(BoundedSourceAsSDFWrapper)/ParMultiDo(BoundedSourceAsSDFWrapper)/ProcessKeyedElements/Reshuffle/GroupByKey" }, { @@ -303,14 +291,14 @@ }, { "from": "Create.TimestampedValues/Create.Values/Read(CreateSource)/ParDo(BoundedSourceAsSDFWrapper)/ParMultiDo(BoundedSourceAsSDFWrapper)/ProcessKeyedElements/Reshuffle/ExpandIterable/ParMultiDo(Anonymous)", - "to": "Create.TimestampedValues/Create.Values/Read(CreateSource)/ParDo(BoundedSourceAsSDFWrapper)/ParMultiDo(BoundedSourceAsSDFWrapper)/ProcessKeyedElements/Reshuffle/RestoreOriginalTimestamps/ReifyTimestamps.RemoveWildcard/ParDo(Anonymous)/ParMultiDo(Anonymous)" + "to": "Create.TimestampedValues/Create.Values/Read(CreateSource)/ParDo(BoundedSourceAsSDFWrapper)/ParMultiDo(BoundedSourceAsSDFWrapper)/ProcessKeyedElements/Reshuffle/RestoreOriginalWindows/Window.Assign" }, { - "from": "Create.TimestampedValues/Create.Values/Read(CreateSource)/ParDo(BoundedSourceAsSDFWrapper)/ParMultiDo(BoundedSourceAsSDFWrapper)/ProcessKeyedElements/Reshuffle/RestoreOriginalTimestamps/ReifyTimestamps.RemoveWildcard/ParDo(Anonymous)/ParMultiDo(Anonymous)", - "to": "Create.TimestampedValues/Create.Values/Read(CreateSource)/ParDo(BoundedSourceAsSDFWrapper)/ParMultiDo(BoundedSourceAsSDFWrapper)/ProcessKeyedElements/Reshuffle/RestoreOriginalTimestamps/Reify.ExtractTimestampsFromValues/ParDo(Anonymous)/ParMultiDo(Anonymous)" + "from": "Create.TimestampedValues/Create.Values/Read(CreateSource)/ParDo(BoundedSourceAsSDFWrapper)/ParMultiDo(BoundedSourceAsSDFWrapper)/ProcessKeyedElements/Reshuffle/RestoreOriginalWindows/Window.Assign", + "to": "Create.TimestampedValues/Create.Values/Read(CreateSource)/ParDo(BoundedSourceAsSDFWrapper)/ParMultiDo(BoundedSourceAsSDFWrapper)/ProcessKeyedElements/Reshuffle/RestoreOriginalTimestamps/ParDo(Anonymous)/ParMultiDo(Anonymous)" }, { - "from": "Create.TimestampedValues/Create.Values/Read(CreateSource)/ParDo(BoundedSourceAsSDFWrapper)/ParMultiDo(BoundedSourceAsSDFWrapper)/ProcessKeyedElements/Reshuffle/RestoreOriginalTimestamps/Reify.ExtractTimestampsFromValues/ParDo(Anonymous)/ParMultiDo(Anonymous)", + "from": "Create.TimestampedValues/Create.Values/Read(CreateSource)/ParDo(BoundedSourceAsSDFWrapper)/ParMultiDo(BoundedSourceAsSDFWrapper)/ProcessKeyedElements/Reshuffle/RestoreOriginalTimestamps/ParDo(Anonymous)/ParMultiDo(Anonymous)", "to": "Create.TimestampedValues/Create.Values/Read(CreateSource)/ParDo(BoundedSourceAsSDFWrapper)/ParMultiDo(BoundedSourceAsSDFWrapper)/ProcessKeyedElements/Drop key/Values/Map/ParMultiDo(Anonymous)" }, { @@ -357,7 +345,7 @@ }, { "transformName": "Create.TimestampedValues/Create.Values/Read(CreateSource)/ParDo(BoundedSourceAsSDFWrapper)/ParMultiDo(BoundedSourceAsSDFWrapper)/ProcessKeyedElements/Drop key/Values/Map/ParMultiDo(Anonymous)", - "inputs": "Create.TimestampedValues/Create.Values/Read(CreateSource)/ParDo(BoundedSourceAsSDFWrapper)/ParMultiDo(BoundedSourceAsSDFWrapper)/ProcessKeyedElements/Reshuffle/RestoreOriginalTimestamps/Reify.ExtractTimestampsFromValues/ParDo(Anonymous)/ParMultiDo(Anonymous).output", + "inputs": "Create.TimestampedValues/Create.Values/Read(CreateSource)/ParDo(BoundedSourceAsSDFWrapper)/ParMultiDo(BoundedSourceAsSDFWrapper)/ProcessKeyedElements/Reshuffle/RestoreOriginalTimestamps/ParDo(Anonymous)/ParMultiDo(Anonymous).output", "outputs": "Create.TimestampedValues/Create.Values/Read(CreateSource)/ParDo(BoundedSourceAsSDFWrapper)/ParMultiDo(BoundedSourceAsSDFWrapper)/ProcessKeyedElements/Drop key/Values/Map/ParMultiDo(Anonymous).output" }, { @@ -378,7 +366,7 @@ { "transformName": "Create.TimestampedValues/Create.Values/Read(CreateSource)/ParDo(BoundedSourceAsSDFWrapper)/ParMultiDo(BoundedSourceAsSDFWrapper)/ProcessKeyedElements/Reshuffle", "inputs": "Create.TimestampedValues/Create.Values/Read(CreateSource)/ParDo(BoundedSourceAsSDFWrapper)/ParMultiDo(BoundedSourceAsSDFWrapper)/Assign unique key/AddKeys/Map/ParMultiDo(Anonymous).output", - "outputs": "Create.TimestampedValues/Create.Values/Read(CreateSource)/ParDo(BoundedSourceAsSDFWrapper)/ParMultiDo(BoundedSourceAsSDFWrapper)/ProcessKeyedElements/Reshuffle/RestoreOriginalTimestamps/Reify.ExtractTimestampsFromValues/ParDo(Anonymous)/ParMultiDo(Anonymous).output" + "outputs": "Create.TimestampedValues/Create.Values/Read(CreateSource)/ParDo(BoundedSourceAsSDFWrapper)/ParMultiDo(BoundedSourceAsSDFWrapper)/ProcessKeyedElements/Reshuffle/RestoreOriginalTimestamps/ParDo(Anonymous)/ParMultiDo(Anonymous).output" }, { "transformName": "Create.TimestampedValues/ParDo(ConvertTimestamps)/ParMultiDo(ConvertTimestamps)", diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Reshuffle.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Reshuffle.java index 42f0f6accc7f0..31fdd24033a4f 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Reshuffle.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Reshuffle.java @@ -17,16 +17,24 @@ */ package org.apache.beam.sdk.transforms; +import java.util.Collection; +import java.util.Collections; import java.util.concurrent.ThreadLocalRandom; import org.apache.beam.sdk.annotations.Internal; +import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; +import org.apache.beam.sdk.transforms.windowing.IncompatibleWindowException; +import org.apache.beam.sdk.transforms.windowing.NonMergingWindowFn; import org.apache.beam.sdk.transforms.windowing.ReshuffleTrigger; import org.apache.beam.sdk.transforms.windowing.TimestampCombiner; import org.apache.beam.sdk.transforms.windowing.Window; +import org.apache.beam.sdk.transforms.windowing.WindowFn; +import org.apache.beam.sdk.transforms.windowing.WindowMappingFn; import org.apache.beam.sdk.util.IdentityWindowFn; import org.apache.beam.sdk.values.KV; import org.apache.beam.sdk.values.PCollection; import org.apache.beam.sdk.values.TimestampedValue; +import org.apache.beam.sdk.values.ValueInSingleWindow; import org.apache.beam.sdk.values.WindowingStrategy; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.primitives.UnsignedInteger; import org.checkerframework.checker.nullness.qual.Nullable; @@ -81,28 +89,107 @@ public PCollection> expand(PCollection> input) { .withTimestampCombiner(TimestampCombiner.EARLIEST) .withAllowedLateness(Duration.millis(BoundedWindow.TIMESTAMP_MAX_VALUE.getMillis())); - return input - .apply(rewindow) - .apply("ReifyOriginalTimestamps", Reify.timestampsInValue()) - .apply(GroupByKey.create()) - // Set the windowing strategy directly, so that it doesn't get counted as the user having - // set allowed lateness. - .setWindowingStrategyInternal(originalStrategy) + PCollection>> reified = + input + .apply("SetIdentityWindow", rewindow) + .apply("ReifyOriginalMetadata", Reify.windowsInValue()); + + PCollection>>> grouped = + reified.apply(GroupByKey.create()); + return grouped .apply( "ExpandIterable", ParDo.of( - new DoFn>>, KV>>() { + new DoFn>>, KV>>() { @ProcessElement public void processElement( - @Element KV>> element, - OutputReceiver>> r) { + @Element KV>> element, + OutputReceiver>> r) { K key = element.getKey(); - for (TimestampedValue value : element.getValue()) { + for (ValueInSingleWindow value : element.getValue()) { r.output(KV.of(key, value)); } } })) - .apply("RestoreOriginalTimestamps", ReifyTimestamps.extractFromValues()); + .apply( + "RestoreOriginalWindows", + Window.into(new RestoreWindowsFn<>(originalStrategy.getWindowFn()))) + .apply("RestoreOriginalTimestamps", new RestoreTimestamps<>()) + // Set the windowing strategy directly, so that it doesn't get counted as the user having + // set allowed lateness. + .setWindowingStrategyInternal(originalStrategy); + } + + private static class RestoreWindowsFn + extends NonMergingWindowFn>, W> { + + private final WindowFn originalWindowFn; + + private RestoreWindowsFn(WindowFn originalWindowFn) { + this.originalWindowFn = originalWindowFn; + } + + @Override + public Collection assignWindows(AssignContext c) throws Exception { + return Collections.singleton((W) c.element().getValue().getWindow()); + } + + @Override + public boolean isCompatible(WindowFn other) { + throw new UnsupportedOperationException( + String.format( + "%s.isCompatible() should never be called." + + " It is a private implementation detail of sdk utilities." + + " This message indicates a bug in the Beam SDK.", + getClass().getCanonicalName())); + } + + @Override + public void verifyCompatibility(WindowFn other) throws IncompatibleWindowException { + throw new UnsupportedOperationException( + String.format( + "%s.verifyCompatibility() should never be called." + + " It is a private implementation detail of sdk utilities." + + " This message indicates a bug in the Beam SDK.", + getClass().getCanonicalName())); + } + + @Override + public Coder windowCoder() { + return originalWindowFn.windowCoder(); + } + + @Override + public WindowMappingFn getDefaultWindowMappingFn() { + throw new UnsupportedOperationException( + String.format( + "%s.getSideInputWindow() should never be called." + + " It is a private implementation detail of sdk utilities." + + " This message indicates a bug in the Beam SDK.", + getClass().getCanonicalName())); + } + } + + private static class RestoreTimestamps + extends PTransform>>, PCollection>> { + @Override + public PCollection> expand(PCollection>> input) { + return input.apply( + ParDo.of( + new DoFn>, KV>() { + @Override + public Duration getAllowedTimestampSkew() { + return Duration.millis(Long.MAX_VALUE); + } + + @ProcessElement + public void processElement( + @Element KV> kv, OutputReceiver> r) { + r.outputWithTimestamp( + KV.of(kv.getKey(), kv.getValue().getValue()), kv.getValue().getTimestamp()); + } + })); + } } /** Implementation of {@link #viaRandomKey()}. */