From c19a1024040b646324f79a710b7d8ceb624e67d2 Mon Sep 17 00:00:00 2001 From: Kenneth Knowles Date: Thu, 5 Oct 2023 14:40:59 -0400 Subject: [PATCH] Fix Reshuffle implementation in Java SDK --- .../apache/beam/sdk/transforms/Reshuffle.java | 101 ++++++++++++++++-- 1 file changed, 92 insertions(+), 9 deletions(-) diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Reshuffle.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Reshuffle.java index 42f0f6accc7f0..905c6e5279761 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Reshuffle.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Reshuffle.java @@ -17,16 +17,24 @@ */ package org.apache.beam.sdk.transforms; +import java.util.Collection; +import java.util.Collections; import java.util.concurrent.ThreadLocalRandom; import org.apache.beam.sdk.annotations.Internal; +import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; +import org.apache.beam.sdk.transforms.windowing.IncompatibleWindowException; +import org.apache.beam.sdk.transforms.windowing.NonMergingWindowFn; import org.apache.beam.sdk.transforms.windowing.ReshuffleTrigger; import org.apache.beam.sdk.transforms.windowing.TimestampCombiner; import org.apache.beam.sdk.transforms.windowing.Window; +import org.apache.beam.sdk.transforms.windowing.WindowFn; +import org.apache.beam.sdk.transforms.windowing.WindowMappingFn; import org.apache.beam.sdk.util.IdentityWindowFn; import org.apache.beam.sdk.values.KV; import org.apache.beam.sdk.values.PCollection; import org.apache.beam.sdk.values.TimestampedValue; +import org.apache.beam.sdk.values.ValueInSingleWindow; import org.apache.beam.sdk.values.WindowingStrategy; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.primitives.UnsignedInteger; import org.checkerframework.checker.nullness.qual.Nullable; @@ -81,28 +89,103 @@ public PCollection> expand(PCollection> input) { .withTimestampCombiner(TimestampCombiner.EARLIEST) .withAllowedLateness(Duration.millis(BoundedWindow.TIMESTAMP_MAX_VALUE.getMillis())); - return input - .apply(rewindow) - .apply("ReifyOriginalTimestamps", Reify.timestampsInValue()) - .apply(GroupByKey.create()) + PCollection>> reified = + input.apply(rewindow).apply("ReifyOriginalMetadata", Reify.windowsInValue()); + + PCollection>>> grouped = + reified.apply(GroupByKey.create()); + return grouped // Set the windowing strategy directly, so that it doesn't get counted as the user having // set allowed lateness. .setWindowingStrategyInternal(originalStrategy) .apply( "ExpandIterable", ParDo.of( - new DoFn>>, KV>>() { + new DoFn>>, KV>>() { @ProcessElement public void processElement( - @Element KV>> element, - OutputReceiver>> r) { + @Element KV>> element, + OutputReceiver>> r) { K key = element.getKey(); - for (TimestampedValue value : element.getValue()) { + for (ValueInSingleWindow value : element.getValue()) { r.output(KV.of(key, value)); } } })) - .apply("RestoreOriginalTimestamps", ReifyTimestamps.extractFromValues()); + .apply(Window.into(new RestoreWindowsFn<>(originalStrategy.getWindowFn()))) + .apply("RestoreOriginalTimestamps", new RestoreTimestamps<>()); + } + + private static class RestoreWindowsFn + extends NonMergingWindowFn>, W> { + + private final WindowFn originalWindowFn; + + private RestoreWindowsFn(WindowFn originalWindowFn) { + this.originalWindowFn = originalWindowFn; + } + + @Override + public Collection assignWindows(AssignContext c) throws Exception { + return Collections.singleton((W) c.element().getValue().getWindow()); + } + + @Override + public boolean isCompatible(WindowFn other) { + throw new UnsupportedOperationException( + String.format( + "%s.isCompatible() should never be called." + + " It is a private implementation detail of sdk utilities." + + " This message indicates a bug in the Beam SDK.", + getClass().getCanonicalName())); + } + + @Override + public void verifyCompatibility(WindowFn other) throws IncompatibleWindowException { + throw new UnsupportedOperationException( + String.format( + "%s.verifyCompatibility() should never be called." + + " It is a private implementation detail of sdk utilities." + + " This message indicates a bug in the Beam SDK.", + getClass().getCanonicalName())); + } + + @Override + public Coder windowCoder() { + return originalWindowFn.windowCoder(); + } + + @Override + public WindowMappingFn getDefaultWindowMappingFn() { + throw new UnsupportedOperationException( + String.format( + "%s.getSideInputWindow() should never be called." + + " It is a private implementation detail of sdk utilities." + + " This message indicates a bug in the Beam SDK.", + getClass().getCanonicalName())); + } + } + + private static class RestoreTimestamps + extends PTransform>>, PCollection>> { + @Override + public PCollection> expand(PCollection>> input) { + return input.apply( + ParDo.of( + new DoFn>, KV>() { + @Override + public Duration getAllowedTimestampSkew() { + return Duration.millis(Long.MAX_VALUE); + } + + @ProcessElement + public void processElement( + @Element KV> kv, OutputReceiver> r) { + r.outputWithTimestamp( + KV.of(kv.getKey(), kv.getValue().getValue()), kv.getValue().getTimestamp()); + } + })); + } } /** Implementation of {@link #viaRandomKey()}. */