diff --git a/checkstyle/checkstyle.xml b/checkstyle/checkstyle.xml index 0999fd765bd7f..775d6748eeecf 100644 --- a/checkstyle/checkstyle.xml +++ b/checkstyle/checkstyle.xml @@ -20,7 +20,9 @@ --> - + + + @@ -32,6 +34,7 @@ + @@ -62,8 +65,8 @@ - + diff --git a/samza-api/src/main/java/org/apache/samza/operators/functions/FilterFunction.java b/samza-api/src/main/java/org/apache/samza/operators/functions/FilterFunction.java index 58479d6eb6de0..143bae0861d32 100644 --- a/samza-api/src/main/java/org/apache/samza/operators/functions/FilterFunction.java +++ b/samza-api/src/main/java/org/apache/samza/operators/functions/FilterFunction.java @@ -31,7 +31,7 @@ public interface FilterFunction extends InitableFunction { /** * Returns a boolean indicating whether this message should be retained or filtered out. - * @param message the input message to be checked + * @param message the input message to be checked. This object should not be mutated. * @return true if {@code message} should be retained */ boolean apply(M message); diff --git a/samza-api/src/main/java/org/apache/samza/operators/functions/FoldLeftFunction.java b/samza-api/src/main/java/org/apache/samza/operators/functions/FoldLeftFunction.java new file mode 100644 index 0000000000000..58e88fd3809d7 --- /dev/null +++ b/samza-api/src/main/java/org/apache/samza/operators/functions/FoldLeftFunction.java @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.samza.operators.functions; + +/** + * A fold function that incrementally combines and aggregates values for a window. + */ +public interface FoldLeftFunction extends InitableFunction { + + /** + * Incrementally combine and aggregate values for the window. Guaranteed to be invoked for every + * message added to the window. + * + * @param message the incoming message that is added to the window. This object should not be mutated. + * @param oldValue the previous value + * @return the new value + */ + WV apply(M message, WV oldValue); +} diff --git a/samza-api/src/main/java/org/apache/samza/operators/functions/MapFunction.java b/samza-api/src/main/java/org/apache/samza/operators/functions/MapFunction.java index 05a554f9645fe..b09fb99c618c6 100644 --- a/samza-api/src/main/java/org/apache/samza/operators/functions/MapFunction.java +++ b/samza-api/src/main/java/org/apache/samza/operators/functions/MapFunction.java @@ -31,8 +31,9 @@ public interface MapFunction extends InitableFunction { /** - * Transforms the provided message into another message - * @param message the input message to be transformed + * Transforms the provided message into another message. + * + * @param message the input message to be transformed. This object should not be mutated. * @return the transformed message */ OM apply(M message); diff --git a/samza-api/src/main/java/org/apache/samza/operators/triggers/AnyTrigger.java b/samza-api/src/main/java/org/apache/samza/operators/triggers/AnyTrigger.java index 6e134df2c7ddc..f52b57b2f401c 100644 --- a/samza-api/src/main/java/org/apache/samza/operators/triggers/AnyTrigger.java +++ b/samza-api/src/main/java/org/apache/samza/operators/triggers/AnyTrigger.java @@ -23,16 +23,15 @@ /** * A {@link Trigger} fires as soon as any of its individual triggers has fired. */ -public class AnyTrigger implements Trigger { +public class AnyTrigger implements Trigger { - private final List triggers; + private final List> triggers; - AnyTrigger(List triggers) { + AnyTrigger(List> triggers) { this.triggers = triggers; } - public List getTriggers() { + public List> getTriggers() { return triggers; } } - diff --git a/samza-api/src/main/java/org/apache/samza/operators/triggers/CountTrigger.java b/samza-api/src/main/java/org/apache/samza/operators/triggers/CountTrigger.java index 1cf930c6dd871..dbae3a9fe0672 100644 --- a/samza-api/src/main/java/org/apache/samza/operators/triggers/CountTrigger.java +++ b/samza-api/src/main/java/org/apache/samza/operators/triggers/CountTrigger.java @@ -22,7 +22,7 @@ * A {@link Trigger} that fires when the number of messages in the {@link org.apache.samza.operators.windows.WindowPane} * reaches the specified count. */ -public class CountTrigger implements Trigger { +public class CountTrigger implements Trigger { private final long count; diff --git a/samza-api/src/main/java/org/apache/samza/operators/triggers/FiringType.java b/samza-api/src/main/java/org/apache/samza/operators/triggers/FiringType.java new file mode 100644 index 0000000000000..49d971df11c5e --- /dev/null +++ b/samza-api/src/main/java/org/apache/samza/operators/triggers/FiringType.java @@ -0,0 +1,29 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.samza.operators.triggers; + +/** + * The type of the {@link org.apache.samza.operators.triggers.Trigger} firing. + * Firings can be either early or late or default. Late triggers are not supported currently. + */ +public enum FiringType { + EARLY, + DEFAULT, + LATE +} diff --git a/samza-api/src/main/java/org/apache/samza/operators/triggers/RepeatingTrigger.java b/samza-api/src/main/java/org/apache/samza/operators/triggers/RepeatingTrigger.java index 7f78eb8b9851f..166d0d97b92c9 100644 --- a/samza-api/src/main/java/org/apache/samza/operators/triggers/RepeatingTrigger.java +++ b/samza-api/src/main/java/org/apache/samza/operators/triggers/RepeatingTrigger.java @@ -28,5 +28,9 @@ class RepeatingTrigger implements Trigger { RepeatingTrigger(Trigger trigger) { this.trigger = trigger; } + + public Trigger getTrigger() { + return trigger; + } } diff --git a/samza-api/src/main/java/org/apache/samza/operators/triggers/TimeSinceFirstMessageTrigger.java b/samza-api/src/main/java/org/apache/samza/operators/triggers/TimeSinceFirstMessageTrigger.java index 4de60a2a23c6f..94b7769a54dc7 100644 --- a/samza-api/src/main/java/org/apache/samza/operators/triggers/TimeSinceFirstMessageTrigger.java +++ b/samza-api/src/main/java/org/apache/samza/operators/triggers/TimeSinceFirstMessageTrigger.java @@ -26,7 +26,7 @@ * A {@link Trigger} that fires after the specified duration has passed since the first {@link MessageEnvelope} in * the window pane. */ -public class TimeSinceFirstMessageTrigger implements Trigger { +public class TimeSinceFirstMessageTrigger implements Trigger { private final Duration duration; private final DurationCharacteristic characteristic = DurationCharacteristic.PROCESSING_TIME; diff --git a/samza-api/src/main/java/org/apache/samza/operators/triggers/TimeSinceLastMessageTrigger.java b/samza-api/src/main/java/org/apache/samza/operators/triggers/TimeSinceLastMessageTrigger.java index 6b09625ad6d19..2231fd47802eb 100644 --- a/samza-api/src/main/java/org/apache/samza/operators/triggers/TimeSinceLastMessageTrigger.java +++ b/samza-api/src/main/java/org/apache/samza/operators/triggers/TimeSinceLastMessageTrigger.java @@ -22,8 +22,9 @@ /* * A {@link Trigger} that fires when there are no new {@link MessageEnvelope}s in the window pane for the specified duration. + * @param the type of the incoming {@link MessageEnvelope} */ -public class TimeSinceLastMessageTrigger implements Trigger { +public class TimeSinceLastMessageTrigger implements Trigger { private final Duration duration; private final DurationCharacteristic characteristic = DurationCharacteristic.PROCESSING_TIME; diff --git a/samza-api/src/main/java/org/apache/samza/operators/triggers/TimeTrigger.java b/samza-api/src/main/java/org/apache/samza/operators/triggers/TimeTrigger.java index c5875aa8e1e65..d854d74dffa99 100644 --- a/samza-api/src/main/java/org/apache/samza/operators/triggers/TimeTrigger.java +++ b/samza-api/src/main/java/org/apache/samza/operators/triggers/TimeTrigger.java @@ -23,7 +23,7 @@ /* * A {@link Trigger} that fires after the specified duration in processing time. */ -public class TimeTrigger implements Trigger { +public class TimeTrigger implements Trigger { private final Duration duration; private final DurationCharacteristic characteristic = DurationCharacteristic.PROCESSING_TIME; diff --git a/samza-api/src/main/java/org/apache/samza/operators/windows/WindowKey.java b/samza-api/src/main/java/org/apache/samza/operators/windows/WindowKey.java index 14bd5ab877049..bf5272475e773 100644 --- a/samza-api/src/main/java/org/apache/samza/operators/windows/WindowKey.java +++ b/samza-api/src/main/java/org/apache/samza/operators/windows/WindowKey.java @@ -26,14 +26,19 @@ * */ public class WindowKey { - + /** + * A (key,paneId) tuple uniquely identifies an emission from a window. For instance, in case of keyed-tumbling time windows, + * the key is provided by the keyExtractor function, and the paneId is the start of the time window boundary. In case + * of session windows, the key is provided by the keyExtractor function, and the paneId is the time at which the earliest + * message in the window arrived. + */ private final K key; private final String paneId; - public WindowKey(K key, String windowId) { + public WindowKey(K key, String paneId) { this.key = key; - this.paneId = windowId; + this.paneId = paneId; } public K getKey() { @@ -52,4 +57,29 @@ public String toString() { } return String.format("%s%s", wndKey, paneId); } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + + WindowKey windowKey = (WindowKey) o; + + if (!key.equals(windowKey.key)) return false; + + if (paneId == null) { + return windowKey.paneId == null; + } + + return paneId.equals(windowKey.paneId); + + } + + @Override + public int hashCode() { + int result = key.hashCode(); + result = 31 * result + (paneId != null ? paneId.hashCode() : 0); + return result; + } + } diff --git a/samza-api/src/main/java/org/apache/samza/operators/windows/WindowPane.java b/samza-api/src/main/java/org/apache/samza/operators/windows/WindowPane.java index 3b66bd122a635..3b19f8ad2c9b4 100644 --- a/samza-api/src/main/java/org/apache/samza/operators/windows/WindowPane.java +++ b/samza-api/src/main/java/org/apache/samza/operators/windows/WindowPane.java @@ -18,6 +18,8 @@ */ package org.apache.samza.operators.windows; +import org.apache.samza.operators.triggers.FiringType; + /** * Specifies the result emitted from a {@link Window}. * @@ -32,10 +34,16 @@ public final class WindowPane { private final AccumulationMode mode; - WindowPane(WindowKey key, V value, AccumulationMode mode) { + /** + * The type of the trigger that emitted this result. Results can be emitted from early, late or default triggers. + */ + private final FiringType type; + + public WindowPane(WindowKey key, V value, AccumulationMode mode, FiringType type) { this.key = key; this.value = value; this.mode = mode; + this.type = type; } public V getMessage() { @@ -46,8 +54,8 @@ public WindowKey getKey() { return this.key; } - static public WindowPane of(WindowKey key, M result) { - return new WindowPane<>(key, result, AccumulationMode.DISCARDING); + public FiringType getFiringType() { + return type; } } diff --git a/samza-api/src/main/java/org/apache/samza/operators/windows/Windows.java b/samza-api/src/main/java/org/apache/samza/operators/windows/Windows.java index 73fb5c8001839..9192fc1890b18 100644 --- a/samza-api/src/main/java/org/apache/samza/operators/windows/Windows.java +++ b/samza-api/src/main/java/org/apache/samza/operators/windows/Windows.java @@ -20,15 +20,18 @@ package org.apache.samza.operators.windows; import org.apache.samza.annotation.InterfaceStability; +import org.apache.samza.operators.functions.FoldLeftFunction; import org.apache.samza.operators.triggers.TimeTrigger; import org.apache.samza.operators.triggers.Trigger; import org.apache.samza.operators.triggers.Triggers; import org.apache.samza.operators.windows.internal.WindowInternal; +import org.apache.samza.operators.windows.internal.WindowType; import java.time.Duration; +import java.util.ArrayList; import java.util.Collection; -import java.util.function.BiFunction; import java.util.function.Function; +import java.util.function.Supplier; /** * APIs for creating different types of {@link Window}s. @@ -84,6 +87,8 @@ * and triggers are fired and window panes are emitted per-key. It is possible to construct "keyed" variants of all the above window * types. * + *

Time granularity for windows: Currently, time durations are always measured in milliseconds. Time units of + * finer granularity are not supported. */ @InterfaceStability.Unstable public final class Windows { @@ -107,17 +112,18 @@ private Windows() { } * * @param keyFn the function to extract the window key from a message * @param interval the duration in processing time + * @param initialValue the initial value to be used for aggregations * @param foldFn the function to aggregate messages in the {@link WindowPane} * @param the type of the input message * @param the type of the {@link WindowPane} output value * @param the type of the key in the {@link Window} * @return the created {@link Window} function. */ - public static Window - keyedTumblingWindow(Function keyFn, Duration interval, BiFunction foldFn) { + public static Window keyedTumblingWindow(Function keyFn, Duration interval, + Supplier initialValue, FoldLeftFunction foldFn) { Trigger defaultTrigger = new TimeTrigger<>(interval); - return new WindowInternal(defaultTrigger, foldFn, keyFn, null); + return new WindowInternal(defaultTrigger, initialValue, foldFn, keyFn, null, WindowType.TUMBLING); } @@ -142,11 +148,10 @@ private Windows() { } * @return the created {@link Window} function */ public static Window> keyedTumblingWindow(Function keyFn, Duration interval) { - BiFunction, Collection> aggregator = (m, c) -> { - c.add(m); - return c; - }; - return keyedTumblingWindow(keyFn, interval, aggregator); + FoldLeftFunction> aggregator = createAggregator(); + + Supplier> initialValue = () -> new ArrayList<>(); + return keyedTumblingWindow(keyFn, interval, initialValue, aggregator); } /** @@ -164,15 +169,16 @@ public static Window> keyedTumblingWindow(Function * * @param duration the duration in processing time + * @param initialValue the initial value to be used for aggregations * @param foldFn to aggregate messages in the {@link WindowPane} * @param the type of the input message * @param the type of the {@link WindowPane} output value * @return the created {@link Window} function */ - public static Window - tumblingWindow(Duration duration, BiFunction foldFn) { + public static Window tumblingWindow(Duration duration, Supplier initialValue, + FoldLeftFunction foldFn) { Trigger defaultTrigger = Triggers.repeat(new TimeTrigger<>(duration)); - return new WindowInternal<>(defaultTrigger, foldFn, null, null); + return new WindowInternal<>(defaultTrigger, initialValue, foldFn, null, null, WindowType.TUMBLING); } /** @@ -195,11 +201,10 @@ public static Window> keyedTumblingWindow(Function Window> tumblingWindow(Duration duration) { - BiFunction, Collection> aggregator = (m, c) -> { - c.add(m); - return c; - }; - return tumblingWindow(duration, aggregator); + FoldLeftFunction> aggregator = createAggregator(); + + Supplier> initialValue = () -> new ArrayList<>(); + return tumblingWindow(duration, initialValue, aggregator); } /** @@ -223,15 +228,17 @@ public static Window> tumblingWindow(Duration duratio * * @param keyFn the function to extract the window key from a message * @param sessionGap the timeout gap for defining the session + * @param initialValue the initial value to be used for aggregations * @param foldFn the function to aggregate messages in the {@link WindowPane} * @param the type of the input message * @param the type of the key in the {@link Window} * @param the type of the output value in the {@link WindowPane} * @return the created {@link Window} function */ - public static Window keyedSessionWindow(Function keyFn, Duration sessionGap, BiFunction foldFn) { + public static Window keyedSessionWindow(Function keyFn, Duration sessionGap, + Supplier initialValue, FoldLeftFunction foldFn) { Trigger defaultTrigger = Triggers.timeSinceLastMessage(sessionGap); - return new WindowInternal<>(defaultTrigger, foldFn, keyFn, null); + return new WindowInternal<>(defaultTrigger, initialValue, foldFn, keyFn, null, WindowType.SESSION); } /** @@ -260,114 +267,18 @@ public static Window keyedSessionWindow(Function keyF */ public static Window> keyedSessionWindow(Function keyFn, Duration sessionGap) { - BiFunction, Collection> aggregator = (m, c) -> { - c.add(m); - return c; - }; - return keyedSessionWindow(keyFn, sessionGap, aggregator); - } - + FoldLeftFunction> aggregator = createAggregator(); - /** - * Creates a {@link Window} that groups incoming messages into a single global window. This window does not have a - * default trigger. The triggering behavior must be specified by setting an early trigger. - * - *

The below example computes the maximum value over a count based window. The window emits {@link WindowPane}s when - * there are either 50 messages in the window pane or when 10 seconds have passed since the first message in the pane. - * - *

 {@code
-   *    MessageStream stream = ...;
-   *    BiFunction maxAggregator = (m, c)-> Math.max(m, c);
-   *    MessageStream> windowedStream = stream.window(Windows.globalWindow(maxAggregator)
-   *      .setEarlyTriggers(Triggers.repeat(Triggers.any(Triggers.count(50), Triggers.timeSinceFirstMessage(Duration.ofSeconds(10))))))
-   * }
-   * 
- * - * @param foldFn the function to aggregate messages in the {@link WindowPane} - * @param the type of message - * @param type of the output value in the {@link WindowPane} - * @return the created {@link Window} function. - */ - public static Window globalWindow(BiFunction foldFn) { - return new WindowInternal<>(null, foldFn, null, null); + Supplier> initialValue = () -> new ArrayList<>(); + return keyedSessionWindow(keyFn, sessionGap, initialValue, aggregator); } - /** - * Creates a {@link Window} that groups incoming messages into a single global window. This window does not have a - * default trigger. The triggering behavior must be specified by setting an early trigger. - * - * The below example groups the stream into count based windows that trigger every 50 messages or every 10 minutes. - *
 {@code
-   *    MessageStream stream = ...;
-   *    MessageStream> windowedStream = stream.window(Windows.globalWindow()
-   *      .setEarlyTrigger(Triggers.repeat(Triggers.any(Triggers.count(50), Triggers.timeSinceFirstMessage(Duration.ofSeconds(10))))))
-   * }
-   * 
- * - * @param the type of message - * @return the created {@link Window} function. - */ - public static Window> globalWindow() { - BiFunction, Collection> aggregator = (m, c) -> { - c.add(m); - return c; - }; - return globalWindow(aggregator); - } - /** - * Returns a global {@link Window} that groups incoming messages using the provided keyFn. - * The window does not have a default trigger. The triggering behavior must be specified by setting an early - * trigger. - * - *

The below example groups the stream into count based windows. The window triggers every 50 messages or every - * 10 minutes. - * - *

 {@code
-   *    MessageStream stream = ...;
-   *    BiFunction maxAggregator = (m, c)-> Math.max(parseLongField(m), c);
-   *    Function keyFn = ...;
-   *    MessageStream> windowedStream = stream.window(Windows.keyedGlobalWindow(keyFn, maxAggregator)
-   *      .setEarlyTrigger(Triggers.repeat(Triggers.any(Triggers.count(50), Triggers.timeSinceFirstMessage(Duration.minutes(10))))))
-   * }
-   * 
- * - * @param keyFn the function to extract the window key from a message - * @param foldFn the function to aggregate messages in the {@link WindowPane} - * @param the type of message - * @param type of the key in the {@link Window} - * @param the type of the output value in the {@link WindowPane} - * @return the created {@link Window} function - */ - public static Window keyedGlobalWindow(Function keyFn, BiFunction foldFn) { - return new WindowInternal(null, foldFn, keyFn, null); - } - - /** - * Returns a global {@link Window} that groups incoming messages using the provided keyFn. - * The window does not have a default trigger. The triggering behavior must be specified by setting an early trigger. - * - *

The below example groups the stream per-key into count based windows. The window triggers every 50 messages or - * every 10 minutes. - * - *

 {@code
-   *    MessageStream stream = ...;
-   *    Function keyFn = ...;
-   *    MessageStream> windowedStream = stream.window(Windows.keyedGlobalWindow(keyFn)
-   *      .setEarlyTrigger(Triggers.repeat(Triggers.any(Triggers.count(50), Triggers.timeSinceFirstMessage(Duration.minutes(10))))))
-   * }
-   * 
- * - * @param keyFn the function to extract the window key from a message - * @param the type of message - * @param the type of the key in the {@link Window} - * @return the created {@link Window} function - */ - public static Window> keyedGlobalWindow(Function keyFn) { - BiFunction, Collection> aggregator = (m, c) -> { + private static FoldLeftFunction> createAggregator() { + return (m, c) -> { c.add(m); return c; }; - return keyedGlobalWindow(keyFn, aggregator); } + } diff --git a/samza-api/src/main/java/org/apache/samza/operators/windows/internal/WindowInternal.java b/samza-api/src/main/java/org/apache/samza/operators/windows/internal/WindowInternal.java index 9479eea51678c..f6ac301a6b60d 100644 --- a/samza-api/src/main/java/org/apache/samza/operators/windows/internal/WindowInternal.java +++ b/samza-api/src/main/java/org/apache/samza/operators/windows/internal/WindowInternal.java @@ -18,12 +18,13 @@ */ package org.apache.samza.operators.windows.internal; import org.apache.samza.annotation.InterfaceStability; +import org.apache.samza.operators.functions.FoldLeftFunction; import org.apache.samza.operators.triggers.Trigger; import org.apache.samza.operators.windows.AccumulationMode; import org.apache.samza.operators.windows.Window; -import java.util.function.BiFunction; import java.util.function.Function; +import java.util.function.Supplier; /** * Internal representation of a {@link Window}. This specifies default, early and late triggers for the {@link Window} @@ -32,81 +33,105 @@ * Note: This class is meant to be used internally by Samza, and is not to be instantiated by programmers. * * @param the type of input message - * @param the type of key for the window + * @param the type of key for the window * @param the type of aggregated value in the window output */ @InterfaceStability.Unstable -public final class WindowInternal implements Window { +public final class WindowInternal implements Window { - private final Trigger defaultTrigger; + private final Trigger defaultTrigger; + + /** + * The supplier of initial value to be used for windowed aggregations + */ + private final Supplier initializer; /* * The function that is applied each time a {@link MessageEnvelope} is added to this window. */ - private final BiFunction foldFunction; + private final FoldLeftFunction foldLeftFunction; /* * The function that extracts the key from a {@link MessageEnvelope} */ - private final Function keyExtractor; + private final Function keyExtractor; /* * The function that extracts the event time from a {@link MessageEnvelope} */ private final Function eventTimeExtractor; - private Trigger earlyTrigger; + /** + * The type of this window. Tumbling and Session windows are supported for now. + */ + private final WindowType windowType; + + private Trigger earlyTrigger; - private Trigger lateTrigger; + private Trigger lateTrigger; private AccumulationMode mode; - public WindowInternal(Trigger defaultTrigger, BiFunction foldFunction, Function keyExtractor, Function eventTimeExtractor) { - this.foldFunction = foldFunction; + public WindowInternal(Trigger defaultTrigger, Supplier initialValue, FoldLeftFunction foldLeftFunction, Function keyExtractor, Function eventTimeExtractor, WindowType windowType) { + this.defaultTrigger = defaultTrigger; + this.initializer = initialValue; + this.foldLeftFunction = foldLeftFunction; this.eventTimeExtractor = eventTimeExtractor; this.keyExtractor = keyExtractor; - this.defaultTrigger = defaultTrigger; + this.windowType = windowType; } @Override - public Window setEarlyTrigger(Trigger trigger) { + public Window setEarlyTrigger(Trigger trigger) { this.earlyTrigger = trigger; return this; } @Override - public Window setLateTrigger(Trigger trigger) { + public Window setLateTrigger(Trigger trigger) { this.lateTrigger = trigger; return this; } @Override - public Window setAccumulationMode(AccumulationMode mode) { + public Window setAccumulationMode(AccumulationMode mode) { this.mode = mode; return this; } - public Trigger getDefaultTrigger() { + public Trigger getDefaultTrigger() { return defaultTrigger; } - public Trigger getEarlyTrigger() { + public Trigger getEarlyTrigger() { return earlyTrigger; } - public Trigger getLateTrigger() { + public Trigger getLateTrigger() { return lateTrigger; } - public BiFunction getFoldFunction() { - return foldFunction; + public Supplier getInitializer() { + return initializer; } - public Function getKeyExtractor() { + public FoldLeftFunction getFoldLeftFunction() { + return foldLeftFunction; + } + + public Function getKeyExtractor() { return keyExtractor; } public Function getEventTimeExtractor() { return eventTimeExtractor; } + + public WindowType getWindowType() { + return windowType; + } + + public AccumulationMode getAccumulationMode() { + return mode; + } } diff --git a/samza-api/src/main/java/org/apache/samza/operators/windows/internal/WindowType.java b/samza-api/src/main/java/org/apache/samza/operators/windows/internal/WindowType.java new file mode 100644 index 0000000000000..409d56a51d08d --- /dev/null +++ b/samza-api/src/main/java/org/apache/samza/operators/windows/internal/WindowType.java @@ -0,0 +1,24 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.samza.operators.windows.internal; + +public enum WindowType { + TUMBLING, SESSION + //,SLIDING +} diff --git a/samza-api/src/test/java/org/apache/samza/operators/windows/TestWindowPane.java b/samza-api/src/test/java/org/apache/samza/operators/windows/TestWindowPane.java index 54d0b2fcbbd12..4184c9d254ff4 100644 --- a/samza-api/src/test/java/org/apache/samza/operators/windows/TestWindowPane.java +++ b/samza-api/src/test/java/org/apache/samza/operators/windows/TestWindowPane.java @@ -18,6 +18,7 @@ */ package org.apache.samza.operators.windows; +import org.apache.samza.operators.triggers.FiringType; import org.junit.Test; import static org.junit.Assert.assertEquals; @@ -26,7 +27,7 @@ public class TestWindowPane { @Test public void testConstructor() { - WindowPane wndOutput = WindowPane.of(new WindowKey<>("testMsg", null), 10); + WindowPane wndOutput = new WindowPane(new WindowKey("testMsg", null), 10, AccumulationMode.DISCARDING, FiringType.EARLY); assertEquals(wndOutput.getKey().getKey(), "testMsg"); assertEquals(wndOutput.getMessage(), Integer.valueOf(10)); } diff --git a/samza-core/src/main/java/org/apache/samza/operators/StreamGraphImpl.java b/samza-core/src/main/java/org/apache/samza/operators/StreamGraphImpl.java index f801097d73452..1b36f76f641c8 100644 --- a/samza-core/src/main/java/org/apache/samza/operators/StreamGraphImpl.java +++ b/samza-core/src/main/java/org/apache/samza/operators/StreamGraphImpl.java @@ -22,6 +22,7 @@ import java.util.HashMap; import java.util.Map; import java.util.function.Function; + import org.apache.samza.config.Config; import org.apache.samza.config.JobConfig; import org.apache.samza.operators.data.MessageEnvelope; diff --git a/samza-core/src/main/java/org/apache/samza/operators/WindowState.java b/samza-core/src/main/java/org/apache/samza/operators/WindowState.java new file mode 100644 index 0000000000000..4e80862adb4b1 --- /dev/null +++ b/samza-core/src/main/java/org/apache/samza/operators/WindowState.java @@ -0,0 +1,44 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.samza.operators; + +/** + * Wraps the value stored for a particular {@link org.apache.samza.operators.windows.WindowKey} with additional metadata. + */ +public class WindowState { + + final WV wv; + /** + * Time of the first message in the window + */ + final long earliestRecvTime; + + public WindowState(WV wv, long earliestRecvTime) { + this.wv = wv; + this.earliestRecvTime = earliestRecvTime; + } + + public WV getWindowValue() { + return wv; + } + + public long getEarliestTimestamp() { + return earliestRecvTime; + } +} diff --git a/samza-core/src/main/java/org/apache/samza/operators/impl/OperatorGraph.java b/samza-core/src/main/java/org/apache/samza/operators/impl/OperatorGraph.java index 3efd5f5eb56c8..ca8e34b4206d5 100644 --- a/samza-core/src/main/java/org/apache/samza/operators/impl/OperatorGraph.java +++ b/samza-core/src/main/java/org/apache/samza/operators/impl/OperatorGraph.java @@ -27,6 +27,8 @@ import org.apache.samza.operators.spec.WindowOperatorSpec; import org.apache.samza.system.SystemStream; import org.apache.samza.task.TaskContext; +import org.apache.samza.util.Clock; +import org.apache.samza.util.SystemClock; import java.util.Collection; import java.util.Collections; @@ -52,6 +54,16 @@ public class OperatorGraph { */ private final Map operatorGraph = new HashMap<>(); + private final Clock clock; + + public OperatorGraph(Clock clock) { + this.clock = clock; + } + + public OperatorGraph() { + this(SystemClock.instance()); + } + /** * Initialize the whole DAG of {@link OperatorImpl}s, based on the input {@link MessageStreamImpl} from the {@link org.apache.samza.operators.StreamGraph}. * This method will traverse each input {@link org.apache.samza.operators.MessageStream} in the {@code inputStreams} and @@ -156,14 +168,14 @@ private RootOperatorImpl createOperatorImpls(MessageStreamImpl source, * @param context the {@link TaskContext} required to instantiate operators * @return the {@link OperatorImpl} implementation instance */ - private static OperatorImpl createOperatorImpl(MessageStreamImpl source, OperatorSpec operatorSpec, Config config, TaskContext context) { + private OperatorImpl createOperatorImpl(MessageStreamImpl source, OperatorSpec operatorSpec, Config config, TaskContext context) { if (operatorSpec instanceof StreamOperatorSpec) { StreamOperatorSpec streamOpSpec = (StreamOperatorSpec) operatorSpec; return new StreamOperatorImpl<>(streamOpSpec, source, config, context); } else if (operatorSpec instanceof SinkOperatorSpec) { return new SinkOperatorImpl<>((SinkOperatorSpec) operatorSpec, config, context); } else if (operatorSpec instanceof WindowOperatorSpec) { - return new WindowOperatorImpl<>((WindowOperatorSpec) operatorSpec, source, config, context); + return new WindowOperatorImpl((WindowOperatorSpec) operatorSpec, clock); } else if (operatorSpec instanceof PartialJoinOperatorSpec) { return new PartialJoinOperatorImpl<>((PartialJoinOperatorSpec) operatorSpec, source, config, context); } diff --git a/samza-core/src/main/java/org/apache/samza/operators/impl/OperatorImpl.java b/samza-core/src/main/java/org/apache/samza/operators/impl/OperatorImpl.java index 99833078b9632..b9a606b9fe8da 100644 --- a/samza-core/src/main/java/org/apache/samza/operators/impl/OperatorImpl.java +++ b/samza-core/src/main/java/org/apache/samza/operators/impl/OperatorImpl.java @@ -52,41 +52,37 @@ void registerNextOperator(OperatorImpl nextOperator) { public abstract void onNext(M message, MessageCollector collector, TaskCoordinator coordinator); /** - * Perform the actions required on a timer tick and call the downstream operators. - * - * Overriding implementations must call {@link #propagateTimer} to propagate the timer tick to registered - * downstream operators correctly. + * Invoked at every tick. This method delegates to {@link #onTimer(MessageCollector, TaskCoordinator)} * * @param collector the {@link MessageCollector} in the context * @param coordinator the {@link TaskCoordinator} in the context */ - public void onTimer(MessageCollector collector, TaskCoordinator coordinator) { - propagateTimer(collector, coordinator); + public final void onTick(MessageCollector collector, TaskCoordinator coordinator) { + onTimer(collector, coordinator); + nextOperators.forEach(sub -> sub.onTick(collector, coordinator)); } /** - * Helper method to propagate the output of this operator to all registered downstream operators. - * - * This method must be called from {@link #onNext} to propagate the operator output correctly. + * Invoked at every tick. Implementations must call {@link #propagateResult} to propagate any generated output + * to registered downstream operators. * - * @param outputMessage output message * @param collector the {@link MessageCollector} in the context * @param coordinator the {@link TaskCoordinator} in the context */ - void propagateResult(RM outputMessage, MessageCollector collector, TaskCoordinator coordinator) { - nextOperators.forEach(sub -> sub.onNext(outputMessage, collector, coordinator)); + public void onTimer(MessageCollector collector, TaskCoordinator coordinator) { } /** - * Helper method to propagate the timer tick to all registered downstream operators. + * Helper method to propagate the output of this operator to all registered downstream operators. * - * This method must be called from {@link #onTimer} to propagate the timer tick correctly. + * This method must be called from {@link #onNext} and {@link #onTimer} + * to propagate the operator output correctly. * + * @param outputMessage output message * @param collector the {@link MessageCollector} in the context * @param coordinator the {@link TaskCoordinator} in the context */ - void propagateTimer(MessageCollector collector, TaskCoordinator coordinator) { - nextOperators.forEach(sub -> sub.onTimer(collector, coordinator)); + void propagateResult(RM outputMessage, MessageCollector collector, TaskCoordinator coordinator) { + nextOperators.forEach(sub -> sub.onNext(outputMessage, collector, coordinator)); } - } diff --git a/samza-core/src/main/java/org/apache/samza/operators/impl/PartialJoinOperatorImpl.java b/samza-core/src/main/java/org/apache/samza/operators/impl/PartialJoinOperatorImpl.java index f704f3fc6f015..b2948a34468cb 100644 --- a/samza-core/src/main/java/org/apache/samza/operators/impl/PartialJoinOperatorImpl.java +++ b/samza-core/src/main/java/org/apache/samza/operators/impl/PartialJoinOperatorImpl.java @@ -93,7 +93,6 @@ public void onTimer(MessageCollector collector, TaskCoordinator coordinator) { thisState.deleteAll(keysToRemove); LOGGER.info("Operator ID {} onTimer self time: {} ms", opId, System.currentTimeMillis() - now); - this.propagateTimer(collector, coordinator); } } diff --git a/samza-core/src/main/java/org/apache/samza/operators/impl/TriggerKey.java b/samza-core/src/main/java/org/apache/samza/operators/impl/TriggerKey.java new file mode 100644 index 0000000000000..49fefc0bfe0e9 --- /dev/null +++ b/samza-core/src/main/java/org/apache/samza/operators/impl/TriggerKey.java @@ -0,0 +1,73 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.samza.operators.impl; + +import org.apache.samza.operators.triggers.FiringType; +import org.apache.samza.operators.windows.WindowKey; + +/** + * Uniquely identifies a trigger firing + */ +public class TriggerKey { + private final FiringType type; + private final WindowKey key; + + public TriggerKey(FiringType type, WindowKey key) { + if (type == null) { + throw new IllegalArgumentException("Firing type cannot be null"); + } + + if (key == null) { + throw new IllegalArgumentException("WindowKey cannot be null"); + } + + this.type = type; + this.key = key; + } + + /** + * Equality is determined by both the type, and the window key. + */ + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + TriggerKey that = (TriggerKey) o; + return type == that.type && key.equals(that.key); + } + + /** + * Hashcode is computed by from the type, and the window key. + */ + @Override + public int hashCode() { + int result = type.hashCode(); + result = 31 * result + key.hashCode(); + return result; + } + + public WindowKey getKey() { + return key; + } + + public FiringType getType() { + return type; + } +} \ No newline at end of file diff --git a/samza-core/src/main/java/org/apache/samza/operators/impl/TriggerScheduler.java b/samza-core/src/main/java/org/apache/samza/operators/impl/TriggerScheduler.java new file mode 100644 index 0000000000000..952d9f1405557 --- /dev/null +++ b/samza-core/src/main/java/org/apache/samza/operators/impl/TriggerScheduler.java @@ -0,0 +1,120 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.samza.operators.impl; + +import org.apache.samza.operators.triggers.Cancellable; +import org.apache.samza.util.Clock; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.ArrayList; +import java.util.List; +import java.util.PriorityQueue; + +/** + * Allows to schedule and cancel callbacks for triggers. + */ +public class TriggerScheduler { + + private static final Logger LOG = LoggerFactory.getLogger(TriggerScheduler.class); + + private final PriorityQueue> pendingCallbacks; + private final Clock clock; + + public TriggerScheduler(Clock clock) { + this.pendingCallbacks = new PriorityQueue<>(); + this.clock = clock; + } + + /** + * Schedule the provided runnable for execution at the specified duration. + * @param runnable the provided runnable to schedule. + * @param scheduledTimeMs time at which the runnable must be scheduled for execution + * @param triggerKey a key that uniquely identifies the corresponding trigger firing. + * @return a {@link Cancellable} that can be used to cancel the execution of this runnable. + */ + public Cancellable scheduleCallback(Runnable runnable, long scheduledTimeMs, TriggerKey triggerKey) { + TriggerCallbackState timerState = new TriggerCallbackState(triggerKey, runnable, scheduledTimeMs); + pendingCallbacks.add(timerState); + LOG.trace("Scheduled a new callback: {} at {} for triggerKey {}", new Object[] {runnable, scheduledTimeMs, triggerKey}); + return timerState; + } + + /** + * Run all pending callbacks that are ready to be scheduled. A callback is defined as "ready" if it's scheduledTime + * is less than or equal to {@link Clock#currentTimeMillis()} + * + * @return the list of {@link TriggerKey}s corresponding to the callbacks that were run. + */ + public List> runPendingCallbacks() { + TriggerCallbackState state; + List> keys = new ArrayList<>(); + long now = clock.currentTimeMillis(); + + while ((state = pendingCallbacks.peek()) != null && state.getScheduledTimeMs() <= now) { + pendingCallbacks.remove(); + state.getCallback().run(); + TriggerKey key = state.getTriggerKey(); + keys.add(key); + } + return keys; + } + + /** + * State corresponding to pending timer callbacks scheduled by various triggers. + */ + private class TriggerCallbackState implements Comparable>, Cancellable { + + private final TriggerKey triggerKey; + private final Runnable callback; + + // the time at which the callback should trigger + private final long scheduledTimeMs; + + private TriggerCallbackState(TriggerKey triggerKey, Runnable callback, long scheduledTimeMs) { + this.triggerKey = triggerKey; + this.callback = callback; + this.scheduledTimeMs = scheduledTimeMs; + } + + private Runnable getCallback() { + return callback; + } + + private long getScheduledTimeMs() { + return scheduledTimeMs; + } + + private TriggerKey getTriggerKey() { + return triggerKey; + } + + @Override + public int compareTo(TriggerCallbackState other) { + return Long.compare(this.scheduledTimeMs, other.scheduledTimeMs); + } + + @Override + public boolean cancel() { + LOG.trace("Cancelled a callback: {} at {} for triggerKey {}", new Object[] {callback, scheduledTimeMs, triggerKey}); + return pendingCallbacks.remove(this); + } + } +} diff --git a/samza-core/src/main/java/org/apache/samza/operators/impl/WindowOperatorImpl.java b/samza-core/src/main/java/org/apache/samza/operators/impl/WindowOperatorImpl.java index af005534b590c..cd3b1bcd95ffa 100644 --- a/samza-core/src/main/java/org/apache/samza/operators/impl/WindowOperatorImpl.java +++ b/samza-core/src/main/java/org/apache/samza/operators/impl/WindowOperatorImpl.java @@ -1,3 +1,5 @@ +// CHECKSTYLE:OFF +// Turn off checkstyle for this class because of a checkstyle bug in handling nested typed collections /* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file @@ -18,26 +20,300 @@ */ package org.apache.samza.operators.impl; -import org.apache.samza.config.Config; -import org.apache.samza.operators.MessageStreamImpl; import org.apache.samza.operators.spec.WindowOperatorSpec; +import org.apache.samza.operators.WindowState; +import org.apache.samza.operators.triggers.RepeatingTriggerImpl; +import org.apache.samza.operators.triggers.TimeTrigger; +import org.apache.samza.operators.triggers.Trigger; +import org.apache.samza.operators.triggers.TriggerImpl; +import org.apache.samza.operators.triggers.TriggerImpls; +import org.apache.samza.operators.triggers.FiringType; +import org.apache.samza.operators.util.InternalInMemoryStore; +import org.apache.samza.operators.windows.AccumulationMode; +import org.apache.samza.operators.windows.WindowKey; import org.apache.samza.operators.windows.WindowPane; import org.apache.samza.operators.windows.internal.WindowInternal; +import org.apache.samza.operators.windows.internal.WindowType; +import org.apache.samza.storage.kv.KeyValueStore; import org.apache.samza.task.MessageCollector; -import org.apache.samza.task.TaskContext; import org.apache.samza.task.TaskCoordinator; +import org.apache.samza.util.Clock; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.function.Function; + +/** + * Implementation of a window operator that groups messages into finite windows for processing. + * + * This class implements the processing logic for various types of windows and triggers. It tracks and manages state for + * all open windows, the active triggers that correspond to each of the windows and the pending callbacks. It provides + * an implementation of {@link TriggerScheduler} that {@link TriggerImpl}s can use to schedule and cancel callbacks. It + * also orchestrates the flow of messages through the various {@link TriggerImpl}s. + * + *

An instance of a {@link TriggerImplHandler} is created corresponding to each {@link Trigger} configured for a + * particular window. For every message added to the window, this class looks up the corresponding {@link TriggerImplHandler} + * for the trigger and invokes {@link TriggerImplHandler#onMessage(TriggerKey, Object, MessageCollector, TaskCoordinator)}. + * The {@link TriggerImplHandler} maintains the {@link TriggerImpl} instance along with whether it has been canceled yet + * or not. Then, the {@link TriggerImplHandler} invokes onMessage on underlying its {@link TriggerImpl} instance. A + * {@link TriggerImpl} instance is scoped to a window and its firing determines when results for its window are emitted. The + * {@link WindowOperatorImpl} checks if the trigger fired, and propagates the result of the firing to its downstream + * operators. + * + * @param the type of the incoming message + * @param the type of the key in this {@link org.apache.samza.operators.MessageStream} + * @param the type of the value in the emitted window pane + * + */ public class WindowOperatorImpl extends OperatorImpl> { + private static final Logger LOG = LoggerFactory.getLogger(WindowOperatorImpl.class); + private final WindowInternal window; + private final KeyValueStore, WindowState> store = new InternalInMemoryStore<>(); + TriggerScheduler triggerScheduler ; + + // The trigger state corresponding to each {@link TriggerKey}. + private final Map, TriggerImplHandler> triggers = new HashMap<>(); + private final Clock clock; - public WindowOperatorImpl(WindowOperatorSpec spec, MessageStreamImpl source, Config config, TaskContext context) { - // source, config, and context are used to initialize the window kv-store - window = spec.getWindow(); + public WindowOperatorImpl(WindowOperatorSpec spec, Clock clock) { + this.clock = clock; + this.window = spec.getWindow(); + this.triggerScheduler= new TriggerScheduler(clock); } @Override public void onNext(M message, MessageCollector collector, TaskCoordinator coordinator) { + LOG.trace("Processing message envelope: {}", message); + WindowKey storeKey = getStoreKey(message); + WindowState existingState = store.get(storeKey); + WindowState newState = applyFoldFunction(existingState, message); + + LOG.trace("New window value: {}, earliest timestamp: {}", newState.getWindowValue(), newState.getEarliestTimestamp()); + store.put(storeKey, newState); + + if (window.getEarlyTrigger() != null) { + TriggerKey triggerKey = new TriggerKey<>(FiringType.EARLY, storeKey); + + getOrCreateTriggerImplWrapper(triggerKey, window.getEarlyTrigger()) + .onMessage(triggerKey, message, collector, coordinator); + } + + if (window.getDefaultTrigger() != null) { + TriggerKey triggerKey = new TriggerKey<>(FiringType.DEFAULT, storeKey); + getOrCreateTriggerImplWrapper(triggerKey, window.getDefaultTrigger()) + .onMessage(triggerKey, message, collector, coordinator); + } } -} + + @Override + public void onTimer(MessageCollector collector, TaskCoordinator coordinator) { + List> keys = triggerScheduler.runPendingCallbacks(); + + for (TriggerKey key : keys) { + TriggerImplHandler triggerImplHandler = triggers.get(key); + if (triggerImplHandler != null) { + triggerImplHandler.onTimer(key, collector, coordinator); + } + } + + } + + /** + * Get the key to be used for lookups in the store for this message. + */ + private WindowKey getStoreKey(M message) { + Function keyExtractor = window.getKeyExtractor(); + WK key = null; + + if (keyExtractor != null) { + key = keyExtractor.apply(message); + } + + String paneId = null; + + if (window.getWindowType() == WindowType.TUMBLING) { + long triggerDurationMs = ((TimeTrigger) window.getDefaultTrigger()).getDuration().toMillis(); + final long now = clock.currentTimeMillis(); + Long windowBoundary = now - now % triggerDurationMs; + paneId = windowBoundary.toString(); + } + + return new WindowKey<>(key, paneId); + } + + private WindowState applyFoldFunction(WindowState existingState, M message) { + WV wv; + long earliestTimestamp; + + if (existingState == null) { + LOG.trace("No existing state found for key"); + wv = window.getInitializer().get(); + earliestTimestamp = clock.currentTimeMillis(); + } else { + wv = existingState.getWindowValue(); + earliestTimestamp = existingState.getEarliestTimestamp(); + } + + WV newVal = window.getFoldLeftFunction().apply(message, wv); + WindowState newState = new WindowState(newVal, earliestTimestamp); + + return newState; + } + + private TriggerImplHandler getOrCreateTriggerImplWrapper(TriggerKey triggerKey, Trigger trigger) { + TriggerImplHandler wrapper = triggers.get(triggerKey); + if (wrapper != null) { + LOG.trace("Returning existing trigger wrapper for {}", triggerKey); + return wrapper; + } + + LOG.trace("Creating a new trigger wrapper for {}", triggerKey); + + TriggerImpl triggerImpl = TriggerImpls.createTriggerImpl(trigger, clock, triggerKey); + wrapper = new TriggerImplHandler(triggerKey, triggerImpl); + triggers.put(triggerKey, wrapper); + + return wrapper; + } + + /** + * Handles trigger firings, and propagates results to downstream operators. + */ + private void onTriggerFired(TriggerKey triggerKey, MessageCollector collector, TaskCoordinator coordinator) { + LOG.trace("Trigger key {} fired." , triggerKey); + + TriggerImplHandler wrapper = triggers.get(triggerKey); + WindowKey windowKey = triggerKey.getKey(); + WindowState state = store.get(windowKey); + + if (state == null) { + LOG.trace("No state found for triggerKey: {}", triggerKey); + return; + } + + WindowPane paneOutput = computePaneOutput(triggerKey, state); + super.propagateResult(paneOutput, collector, coordinator); + + // Handle accumulation modes. + if (window.getAccumulationMode() == AccumulationMode.DISCARDING) { + LOG.trace("Clearing state for trigger key: {}", triggerKey); + store.put(windowKey, null); + } + + // Cancel all early triggers too when the default trigger fires. Also, clean all state for the key. + // note: We don't handle late arrivals yet, So, all arrivals are either early or on-time. + if (triggerKey.getType() == FiringType.DEFAULT) { + + LOG.trace("Default trigger fired. Canceling triggers for {}", triggerKey); + + cancelTrigger(triggerKey, true); + cancelTrigger(new TriggerKey(FiringType.EARLY, triggerKey.getKey()), true); + WindowKey key = triggerKey.getKey(); + store.delete(key); + } + + // Cancel non-repeating early triggers. All early triggers should be removed from the "triggers" map only after the + // firing of their corresponding default trigger. Removing them pre-maturely (immediately after cancellation) will + // will create a new {@link TriggerImplWrapper} instance at a future invocation of getOrCreateTriggerWrapper(). + // This would cause an already canceled trigger to fire again for the window. + + if (triggerKey.getType() == FiringType.EARLY && !wrapper.isRepeating()) { + cancelTrigger(triggerKey, false); + } + } + + /** + * Computes the pane output corresponding to a {@link TriggerKey} that fired. + */ + private WindowPane computePaneOutput(TriggerKey triggerKey, WindowState state) { + WindowKey windowKey = triggerKey.getKey(); + WV windowVal = state.getWindowValue(); + + // For session windows, we will create a new window key by using the time of the first message in the window as + //the paneId. + if (window.getWindowType() == WindowType.SESSION) { + windowKey = new WindowKey<>(windowKey.getKey(), Long.toString(state.getEarliestTimestamp())); + } + + // Make a defensive copy so that we are immune to further mutations on the collection + if (windowVal instanceof Collection) { + windowVal = (WV) new ArrayList<>((Collection) windowVal); + } + + WindowPane paneOutput = new WindowPane<>(windowKey, windowVal, window.getAccumulationMode(), triggerKey.getType()); + LOG.trace("Emitting pane output for trigger key {}", triggerKey); + return paneOutput; + } + + /** + * Cancels the firing of the {@link TriggerImpl} identified by this {@link TriggerKey} and optionally removes it. + */ + private void cancelTrigger(TriggerKey triggerKey, boolean shouldRemove) { + TriggerImplHandler triggerImplHandler = triggers.get(triggerKey); + if (triggerImplHandler != null) { + triggerImplHandler.cancel(); + } + if (shouldRemove && triggerKey != null) { + triggers.remove(triggerKey); + } + } + + /** + * State corresponding to a created {@link TriggerImpl} instance. + */ + private class TriggerImplHandler { + // The context, and the {@link TriggerImpl} instance corresponding to this triggerKey + private final TriggerImpl impl; + // Guard to ensure that we don't invoke onMessage or onTimer on already cancelled triggers + private boolean isCancelled = false; + + public TriggerImplHandler(TriggerKey key, TriggerImpl impl) { + this.impl = impl; + } + + public void onMessage(TriggerKey triggerKey, M message, MessageCollector collector, TaskCoordinator coordinator) { + if (!isCancelled) { + LOG.trace("Forwarding callbacks for {}", message); + impl.onMessage(message, triggerScheduler); + + if (impl.shouldFire()) { + // repeating trigger can trigger multiple times, So, clear the state to allow future triggerings. + if (impl instanceof RepeatingTriggerImpl) { + ((RepeatingTriggerImpl) impl).clear(); + } + onTriggerFired(triggerKey, collector, coordinator); + } + } + } + + public void onTimer(TriggerKey key, MessageCollector collector, TaskCoordinator coordinator) { + if (impl.shouldFire() && !isCancelled) { + LOG.trace("Triggering timer triggers"); + + // repeating trigger can trigger multiple times, So, clear the trigger to allow future triggerings. + if (impl instanceof RepeatingTriggerImpl) { + ((RepeatingTriggerImpl) impl).clear(); + } + onTriggerFired(key, collector, coordinator); + } + } + + public void cancel() { + impl.cancel(); + isCancelled = true; + } + + public boolean isRepeating() { + return this.impl instanceof RepeatingTriggerImpl; + } + } + +} \ No newline at end of file diff --git a/samza-core/src/main/java/org/apache/samza/operators/spec/WindowOperatorSpec.java b/samza-core/src/main/java/org/apache/samza/operators/spec/WindowOperatorSpec.java index 46417edb98ad5..6d948d764d709 100644 --- a/samza-core/src/main/java/org/apache/samza/operators/spec/WindowOperatorSpec.java +++ b/samza-core/src/main/java/org/apache/samza/operators/spec/WindowOperatorSpec.java @@ -19,9 +19,11 @@ package org.apache.samza.operators.spec; +import org.apache.samza.config.Config; import org.apache.samza.operators.MessageStreamImpl; import org.apache.samza.operators.windows.WindowPane; import org.apache.samza.operators.windows.internal.WindowInternal; +import org.apache.samza.task.TaskContext; /** @@ -53,12 +55,19 @@ public class WindowOperatorSpec implements OperatorSpec> getNextStream() { return this.outputStream; } - public WindowInternal getWindow() { + public WindowInternal getWindow() { return window; } diff --git a/samza-core/src/main/java/org/apache/samza/operators/triggers/AnyTriggerImpl.java b/samza-core/src/main/java/org/apache/samza/operators/triggers/AnyTriggerImpl.java new file mode 100644 index 0000000000000..a0aa384eeb668 --- /dev/null +++ b/samza-core/src/main/java/org/apache/samza/operators/triggers/AnyTriggerImpl.java @@ -0,0 +1,80 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.samza.operators.triggers; + +import org.apache.samza.operators.impl.TriggerKey; +import org.apache.samza.operators.impl.TriggerScheduler; +import org.apache.samza.util.Clock; + +import java.util.ArrayList; +import java.util.Iterator; +import java.util.List; + +/** + * Implementation of an {@link AnyTrigger} + */ +public class AnyTriggerImpl implements TriggerImpl { + + private final List> triggers; + + private final List> triggerImpls = new ArrayList<>(); + private final Clock clock; + private boolean shouldFire = false; + + public AnyTriggerImpl(AnyTrigger anyTrigger, Clock clock, TriggerKey triggerKey) { + this.triggers = anyTrigger.getTriggers(); + this.clock = clock; + for (Trigger trigger : triggers) { + triggerImpls.add(TriggerImpls.createTriggerImpl(trigger, clock, triggerKey)); + } + } + + @Override + public void onMessage(M message, TriggerScheduler context) { + for (TriggerImpl impl : triggerImpls) { + impl.onMessage(message, context); + if (impl.shouldFire()) { + shouldFire = true; + break; + } + } + if (shouldFire) { + cancel(); + } + } + + public void cancel() { + for (Iterator> it = triggerImpls.iterator(); it.hasNext(); ) { + TriggerImpl impl = it.next(); + impl.cancel(); + it.remove(); + } + } + + @Override + public boolean shouldFire() { + for (TriggerImpl impl : triggerImpls) { + if (impl.shouldFire()) { + shouldFire = true; + break; + } + } + return shouldFire; + } +} diff --git a/samza-core/src/main/java/org/apache/samza/operators/triggers/Cancellable.java b/samza-core/src/main/java/org/apache/samza/operators/triggers/Cancellable.java new file mode 100644 index 0000000000000..ca0ba6792aca2 --- /dev/null +++ b/samza-core/src/main/java/org/apache/samza/operators/triggers/Cancellable.java @@ -0,0 +1,34 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.samza.operators.triggers; + +/** + * Represents a task or an operation whose execution can be cancelled. + */ +public interface Cancellable { + + /** + * Cancel the execution of this operation (if it is not scheduled for execution yet). If the operation is in progress, + * it is not interrupted / cancelled. + * + * @return the result of the cancelation + */ + public boolean cancel(); +} diff --git a/samza-core/src/main/java/org/apache/samza/operators/triggers/CountTriggerImpl.java b/samza-core/src/main/java/org/apache/samza/operators/triggers/CountTriggerImpl.java new file mode 100644 index 0000000000000..da1efda9e32be --- /dev/null +++ b/samza-core/src/main/java/org/apache/samza/operators/triggers/CountTriggerImpl.java @@ -0,0 +1,61 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.samza.operators.triggers; + +import org.apache.samza.operators.impl.TriggerKey; +import org.apache.samza.operators.impl.TriggerScheduler; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Implementation class for a {@link CountTrigger} + */ +public class CountTriggerImpl implements TriggerImpl { + private static final Logger LOG = LoggerFactory.getLogger(CountTriggerImpl.class); + + private final long triggerCount; + private final TriggerKey triggerKey; + private long currentCount; + private boolean shouldFire = false; + + public CountTriggerImpl(CountTrigger triggerCount, TriggerKey triggerKey) { + this.triggerCount = triggerCount.getCount(); + this.currentCount = 0; + this.triggerKey = triggerKey; + } + + public void onMessage(M message, TriggerScheduler context) { + currentCount++; + if (currentCount == triggerCount) { + LOG.trace("count trigger fired for {}", message); + shouldFire = true; + } + } + + @Override + public void cancel() { + //no-op + } + + @Override + public boolean shouldFire() { + return shouldFire; + } +} diff --git a/samza-core/src/main/java/org/apache/samza/operators/triggers/RepeatingTriggerImpl.java b/samza-core/src/main/java/org/apache/samza/operators/triggers/RepeatingTriggerImpl.java new file mode 100644 index 0000000000000..39b9b4097114a --- /dev/null +++ b/samza-core/src/main/java/org/apache/samza/operators/triggers/RepeatingTriggerImpl.java @@ -0,0 +1,67 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.samza.operators.triggers; + +import org.apache.samza.operators.impl.TriggerKey; +import org.apache.samza.operators.impl.TriggerScheduler; +import org.apache.samza.util.Clock; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Implementation class for a {@link RepeatingTrigger} + */ +public class RepeatingTriggerImpl implements TriggerImpl { + private static final Logger LOG = LoggerFactory.getLogger(RepeatingTriggerImpl.class); + + private final Trigger repeatingTrigger; + private final Clock clock; + private final TriggerKey triggerKey; + + private TriggerImpl currentTriggerImpl; + + public RepeatingTriggerImpl(RepeatingTrigger repeatingTrigger, Clock clock, TriggerKey key) { + this.repeatingTrigger = repeatingTrigger.getTrigger(); + this.clock = clock; + this.triggerKey = key; + this.currentTriggerImpl = TriggerImpls.createTriggerImpl(this.repeatingTrigger, clock, triggerKey); + } + + @Override + public void onMessage(M message, TriggerScheduler context) { + currentTriggerImpl.onMessage(message, context); + } + + @Override + public void cancel() { + currentTriggerImpl.cancel(); + } + + public void clear() { + LOG.trace("Clearing state for repeating trigger"); + currentTriggerImpl.cancel(); + currentTriggerImpl = TriggerImpls.createTriggerImpl(repeatingTrigger, clock, triggerKey); + } + + @Override + public boolean shouldFire() { + return currentTriggerImpl.shouldFire(); + } +} diff --git a/samza-core/src/main/java/org/apache/samza/operators/triggers/TimeSinceFirstMessageTriggerImpl.java b/samza-core/src/main/java/org/apache/samza/operators/triggers/TimeSinceFirstMessageTriggerImpl.java new file mode 100644 index 0000000000000..32bf988e076c3 --- /dev/null +++ b/samza-core/src/main/java/org/apache/samza/operators/triggers/TimeSinceFirstMessageTriggerImpl.java @@ -0,0 +1,71 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.samza.operators.triggers; + +import org.apache.samza.operators.impl.TriggerKey; +import org.apache.samza.operators.impl.TriggerScheduler; +import org.apache.samza.util.Clock; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Implementation class for a {@link TimeSinceFirstMessageTrigger} + * @param the type of the incoming message + */ +public class TimeSinceFirstMessageTriggerImpl implements TriggerImpl { + + private static final Logger LOG = LoggerFactory.getLogger(TimeSinceFirstMessageTriggerImpl.class); + + private final TimeSinceFirstMessageTrigger trigger; + private final Clock clock; + private final TriggerKey triggerKey; + private Cancellable cancellable; + private boolean shouldFire = false; + + public TimeSinceFirstMessageTriggerImpl(TimeSinceFirstMessageTrigger trigger, Clock clock, TriggerKey key) { + this.trigger = trigger; + this.clock = clock; + this.triggerKey = key; + } + + public void onMessage(M message, TriggerScheduler context) { + if (cancellable == null && !shouldFire) { + final long now = clock.currentTimeMillis(); + long triggerDurationMs = trigger.getDuration().toMillis(); + Long callbackTime = now + triggerDurationMs; + cancellable = context.scheduleCallback(() -> { + LOG.trace("Time since first message trigger fired"); + shouldFire = true; + }, callbackTime, triggerKey); + } + } + + @Override + public void cancel() { + if (cancellable != null) { + cancellable.cancel(); + } + } + + @Override + public boolean shouldFire() { + return shouldFire; + } +} diff --git a/samza-core/src/main/java/org/apache/samza/operators/triggers/TimeSinceLastMessageTriggerImpl.java b/samza-core/src/main/java/org/apache/samza/operators/triggers/TimeSinceLastMessageTriggerImpl.java new file mode 100644 index 0000000000000..8544efd0f9024 --- /dev/null +++ b/samza-core/src/main/java/org/apache/samza/operators/triggers/TimeSinceLastMessageTriggerImpl.java @@ -0,0 +1,79 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.samza.operators.triggers; + +import org.apache.samza.operators.impl.TriggerKey; +import org.apache.samza.operators.impl.TriggerScheduler; +import org.apache.samza.util.Clock; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Implementation class for a {@link TimeSinceLastMessageTrigger} + * @param the type of the incoming message + */ +public class TimeSinceLastMessageTriggerImpl implements TriggerImpl { + + private static final Logger LOG = LoggerFactory.getLogger(TimeSinceLastMessageTriggerImpl.class); + private final TimeSinceLastMessageTrigger trigger; + private final long durationMs; + private final Clock clock; + private final TriggerKey triggerKey; + private long callbackTime = Integer.MIN_VALUE; + private Cancellable cancellable = null; + private boolean shouldFire = false; + + public TimeSinceLastMessageTriggerImpl(TimeSinceLastMessageTrigger trigger, Clock clock, TriggerKey key) { + this.trigger = trigger; + this.durationMs = trigger.getDuration().toMillis(); + this.clock = clock; + this.triggerKey = key; + } + + @Override + public void onMessage(M message, TriggerScheduler context) { + if (!shouldFire) { + long currTime = clock.currentTimeMillis(); + + if (currTime < callbackTime && cancellable != null) { + cancellable.cancel(); + } + + callbackTime = currTime + durationMs; + Runnable runnable = () -> { + LOG.trace("Time since last message trigger fired"); + shouldFire = true; + }; + + cancellable = context.scheduleCallback(runnable, callbackTime, triggerKey); + } + } + + @Override + public void cancel() { + if (cancellable != null) { + cancellable.cancel(); + } + } + + @Override + public boolean shouldFire() { + return shouldFire; + } +} diff --git a/samza-core/src/main/java/org/apache/samza/operators/triggers/TimeTriggerImpl.java b/samza-core/src/main/java/org/apache/samza/operators/triggers/TimeTriggerImpl.java new file mode 100644 index 0000000000000..2454ce9f7d5d2 --- /dev/null +++ b/samza-core/src/main/java/org/apache/samza/operators/triggers/TimeTriggerImpl.java @@ -0,0 +1,71 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.samza.operators.triggers; + +import org.apache.samza.operators.impl.TriggerKey; +import org.apache.samza.operators.impl.TriggerScheduler; +import org.apache.samza.util.Clock; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Implementation class for a {@link TimeTrigger} + */ +public class TimeTriggerImpl implements TriggerImpl { + + private static final Logger LOG = LoggerFactory.getLogger(TimeTriggerImpl.class); + + private final TimeTrigger trigger; + private final TriggerKey triggerKey; + private Cancellable cancellable; + private final Clock clock; + private boolean shouldFire = false; + + public TimeTriggerImpl(TimeTrigger trigger, Clock clock, TriggerKey key) { + this.trigger = trigger; + this.clock = clock; + this.triggerKey = key; + } + + public void onMessage(M message, TriggerScheduler context) { + final long now = clock.currentTimeMillis(); + long triggerDurationMs = trigger.getDuration().toMillis(); + Long callbackTime = (now - now % triggerDurationMs) + triggerDurationMs; + + if (cancellable == null) { + cancellable = context.scheduleCallback(() -> { + LOG.trace("Time trigger fired"); + shouldFire = true; + }, callbackTime, triggerKey); + } + } + + @Override + public void cancel() { + if (cancellable != null) { + cancellable.cancel(); + } + } + + @Override + public boolean shouldFire() { + return shouldFire; + } +} diff --git a/samza-core/src/main/java/org/apache/samza/operators/triggers/TriggerImpl.java b/samza-core/src/main/java/org/apache/samza/operators/triggers/TriggerImpl.java new file mode 100644 index 0000000000000..705cab74a63ac --- /dev/null +++ b/samza-core/src/main/java/org/apache/samza/operators/triggers/TriggerImpl.java @@ -0,0 +1,66 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.samza.operators.triggers; + + +import org.apache.samza.operators.impl.TriggerScheduler; + +/** + * Implementation class for a {@link Trigger}. A {@link TriggerImpl} is used with a + * which is invoked when the trigger fires. + * + *

When messages arrive in the {@code WindowOperatorImpl}, they are assigned to one or more windows. An + * instance of a {@link TriggerImpl} is created corresponding to each {@link Trigger} configured for a window. For every + * message added to the window, the {@code WindowOperatorImpl} invokes the {@link #onMessage} on its corresponding + * {@link TriggerImpl}s. A {@link TriggerImpl} instance is scoped to a window and its firing determines when results for + * its window are emitted. + * + * {@link TriggerImpl}s can use the {@link TriggerScheduler} to schedule and cancel callbacks (for example, implementations + * of time-based triggers). + * + *

State management: The state maintained by {@link TriggerImpl}s is not durable across re-starts and is transient. + * New instances of {@link TriggerImpl} are created on a re-start. + * + */ +public interface TriggerImpl { + + /** + * Invoked when a message is added to the window corresponding to this {@link TriggerImpl}. + * @param message the incoming message + * @param context the {@link TriggerScheduler} to schedule and cancel callbacks + */ + public void onMessage(M message, TriggerScheduler context); + + /** + * Returns {@code true} if the current state of the trigger indicates that its condition + * is satisfied and it is ready to fire. + * @return if this trigger should fire. + */ + public boolean shouldFire(); + + /** + * Invoked when the execution of this {@link TriggerImpl} is canceled by an up-stream {@link TriggerImpl}. + * + * No calls to {@link #onMessage(Object, TriggerScheduler)} or {@link #shouldFire()} will be invoked + * after this invocation. + */ + public void cancel(); + +} diff --git a/samza-core/src/main/java/org/apache/samza/operators/triggers/TriggerImpls.java b/samza-core/src/main/java/org/apache/samza/operators/triggers/TriggerImpls.java new file mode 100644 index 0000000000000..f64a1db159990 --- /dev/null +++ b/samza-core/src/main/java/org/apache/samza/operators/triggers/TriggerImpls.java @@ -0,0 +1,53 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.samza.operators.triggers; + +import org.apache.samza.SamzaException; +import org.apache.samza.operators.impl.TriggerKey; +import org.apache.samza.util.Clock; + +/** + * Factory methods for instantiating {@link TriggerImpl}s from individual {@link Trigger}s. + */ +public class TriggerImpls { + + public static TriggerImpl createTriggerImpl(Trigger trigger, Clock clock, TriggerKey triggerKey) { + + if (trigger == null) { + throw new IllegalArgumentException("Trigger must not be null"); + } + + if (trigger instanceof CountTrigger) { + return new CountTriggerImpl<>((CountTrigger) trigger, triggerKey); + } else if (trigger instanceof RepeatingTrigger) { + return new RepeatingTriggerImpl<>((RepeatingTrigger) trigger, clock, triggerKey); + } else if (trigger instanceof AnyTrigger) { + return new AnyTriggerImpl<>((AnyTrigger) trigger, clock, triggerKey); + } else if (trigger instanceof TimeSinceLastMessageTrigger) { + return new TimeSinceLastMessageTriggerImpl<>((TimeSinceLastMessageTrigger) trigger, clock, triggerKey); + } else if (trigger instanceof TimeTrigger) { + return new TimeTriggerImpl((TimeTrigger) trigger, clock, triggerKey); + } else if (trigger instanceof TimeSinceFirstMessageTrigger) { + return new TimeSinceFirstMessageTriggerImpl<>((TimeSinceFirstMessageTrigger) trigger, clock, triggerKey); + } + + throw new SamzaException("No implementation class defined for the trigger " + trigger.getClass().getCanonicalName()); + } +} diff --git a/samza-core/src/main/java/org/apache/samza/operators/util/InternalInMemoryStore.java b/samza-core/src/main/java/org/apache/samza/operators/util/InternalInMemoryStore.java index e5dab80ea8f1e..b8672c628dc33 100644 --- a/samza-core/src/main/java/org/apache/samza/operators/util/InternalInMemoryStore.java +++ b/samza-core/src/main/java/org/apache/samza/operators/util/InternalInMemoryStore.java @@ -16,6 +16,7 @@ * specific language governing permissions and limitations * under the License. */ + package org.apache.samza.operators.util; import org.apache.samza.storage.kv.Entry; @@ -30,24 +31,28 @@ /** * Implements a {@link KeyValueStore} using an in-memory Java Map. - * @param the type of the key in the store - * @param the type of the value in the store + * @param the type of key + * @param the type of value + * + * TODO: This class is a stop-gap until we implement persistent store creation from TaskContext. * - * TODO HIGH prateekm: Remove when we switch to an persistent implementation for KeyValueStore API. */ public class InternalInMemoryStore implements KeyValueStore { - final Map map = new LinkedHashMap<>(); + private final Map map = new LinkedHashMap<>(); @Override public V get(K key) { + if (key == null) { + throw new NullPointerException("Null key provided"); + } return map.get(key); } @Override public Map getAll(List keys) { Map values = new HashMap<>(); - for (K key: keys) { + for (K key : keys) { values.put(key, map.get(key)); } return values; @@ -55,18 +60,24 @@ public Map getAll(List keys) { @Override public void put(K key, V value) { + if (key == null) { + throw new NullPointerException("Null key provided"); + } map.put(key, value); } @Override public void putAll(List> entries) { - for (Entry entry: entries) { + for (Entry entry : entries) { put(entry.getKey(), entry.getValue()); } } @Override public void delete(K key) { + if (key == null) { + throw new NullPointerException("Null key provided"); + } map.remove(key); } @@ -119,4 +130,4 @@ public void close() { public void flush() { //not applicable } -} \ No newline at end of file +} diff --git a/samza-core/src/main/java/org/apache/samza/task/StreamOperatorTask.java b/samza-core/src/main/java/org/apache/samza/task/StreamOperatorTask.java index 9ec8e5a355c99..d4224c397b351 100644 --- a/samza-core/src/main/java/org/apache/samza/task/StreamOperatorTask.java +++ b/samza-core/src/main/java/org/apache/samza/task/StreamOperatorTask.java @@ -20,6 +20,8 @@ import java.util.HashMap; import java.util.Map; +import java.util.Set; + import org.apache.samza.config.Config; import org.apache.samza.operators.ContextManager; import org.apache.samza.operators.MessageStreamImpl; @@ -30,6 +32,9 @@ import org.apache.samza.runtime.ApplicationRunner; import org.apache.samza.system.IncomingMessageEnvelope; import org.apache.samza.system.SystemStream; +import org.apache.samza.system.SystemStreamPartition; +import org.apache.samza.util.Clock; +import org.apache.samza.util.SystemClock; /** @@ -65,16 +70,27 @@ public final class StreamOperatorTask implements StreamTask, InitableTask, Windo /** * A mapping from each {@link SystemStream} to the root node of its operator chain DAG. */ - private final OperatorGraph operatorGraph = new OperatorGraph(); + private final OperatorGraph operatorGraph; private final StreamApplication graphBuilder; private final ApplicationRunner runner; + private final Clock clock; + private ContextManager contextManager; + private Set systemStreamPartitions; + public StreamOperatorTask(StreamApplication graphBuilder, ApplicationRunner runner) { + this(graphBuilder, SystemClock.instance(), runner); + } + + // purely for testing. + public StreamOperatorTask(StreamApplication graphBuilder, Clock clock, ApplicationRunner runner) { this.graphBuilder = graphBuilder; + this.operatorGraph = new OperatorGraph(clock); + this.clock = clock; this.runner = runner; } @@ -85,9 +101,10 @@ public final void init(Config config, TaskContext context) throws Exception { this.graphBuilder.init(streamGraph, config); // get the context manager of the {@link StreamGraph} and initialize the task-specific context this.contextManager = streamGraph.getContextManager(); + this.systemStreamPartitions = context.getSystemStreamPartitions(); Map inputBySystemStream = new HashMap<>(); - context.getSystemStreamPartitions().forEach(ssp -> { + systemStreamPartitions.forEach(ssp -> { if (!inputBySystemStream.containsKey(ssp.getSystemStream())) { // create mapping from the physical input {@link SystemStream} to the logic {@link MessageStream} inputBySystemStream.putIfAbsent(ssp.getSystemStream(), streamGraph.getInputStream(ssp.getSystemStream())); @@ -103,8 +120,11 @@ public final void process(IncomingMessageEnvelope ime, MessageCollector collecto } @Override - public final void window(MessageCollector collector, TaskCoordinator coordinator) throws Exception { - this.operatorGraph.getAll().forEach(r -> r.onTimer(collector, coordinator)); + public final void window(MessageCollector collector, TaskCoordinator coordinator) { + systemStreamPartitions.forEach(ssp -> { + this.operatorGraph.get(ssp.getSystemStream()) + .onTick(collector, coordinator); + }); } @Override diff --git a/samza-core/src/test/java/org/apache/samza/example/PageViewCounterExample.java b/samza-core/src/test/java/org/apache/samza/example/PageViewCounterExample.java index 543716aa4139b..6edf048e2ab8e 100644 --- a/samza-core/src/test/java/org/apache/samza/example/PageViewCounterExample.java +++ b/samza-core/src/test/java/org/apache/samza/example/PageViewCounterExample.java @@ -33,6 +33,7 @@ import org.apache.samza.util.CommandLine; import java.time.Duration; +import java.util.function.Supplier; /** @@ -44,9 +45,10 @@ public class PageViewCounterExample implements StreamApplication { MessageStream pageViewEvents = graph.createInStream(input1, new StringSerde("UTF-8"), new JsonSerde<>()); OutputStream pageViewPerMemberCounters = graph.createOutStream(output, new StringSerde("UTF-8"), new JsonSerde<>()); + Supplier initialValue = () -> 0; pageViewEvents. - window(Windows.keyedTumblingWindow(m -> m.getMessage().memberId, Duration.ofSeconds(10), (m, c) -> c + 1). + window(Windows.keyedTumblingWindow(m -> m.getMessage().memberId, Duration.ofSeconds(10), initialValue, (m, c) -> c + 1). setEarlyTrigger(Triggers.repeat(Triggers.count(5))). setAccumulationMode(AccumulationMode.DISCARDING)). map(MyStreamOutput::new). diff --git a/samza-core/src/test/java/org/apache/samza/example/RepartitionExample.java b/samza-core/src/test/java/org/apache/samza/example/RepartitionExample.java index 729b26f978ad6..e222fe474d43b 100644 --- a/samza-core/src/test/java/org/apache/samza/example/RepartitionExample.java +++ b/samza-core/src/test/java/org/apache/samza/example/RepartitionExample.java @@ -31,6 +31,7 @@ import org.apache.samza.util.CommandLine; import java.time.Duration; +import java.util.function.Supplier; /** @@ -67,11 +68,12 @@ public class RepartitionExample implements StreamApplication { MessageStream pageViewEvents = graph.createInStream(input1, new StringSerde("UTF-8"), new JsonSerde<>()); OutputStream pageViewPerMemberCounters = graph.createOutStream(output, new StringSerde("UTF-8"), new JsonSerde<>()); + Supplier initialValue = () -> 0; pageViewEvents. partitionBy(m -> m.getMessage().memberId). window(Windows.keyedTumblingWindow( - msg -> msg.getMessage().memberId, Duration.ofMinutes(5), (m, c) -> c + 1)). + msg -> msg.getMessage().memberId, Duration.ofMinutes(5), initialValue, (m, c) -> c + 1)). map(MyStreamOutput::new). sendTo(pageViewPerMemberCounters); diff --git a/samza-core/src/test/java/org/apache/samza/example/TestBroadcastExample.java b/samza-core/src/test/java/org/apache/samza/example/TestBroadcastExample.java index d9882702f9be4..1c30a218fd850 100644 --- a/samza-core/src/test/java/org/apache/samza/example/TestBroadcastExample.java +++ b/samza-core/src/test/java/org/apache/samza/example/TestBroadcastExample.java @@ -21,13 +21,15 @@ import java.time.Duration; import java.util.Set; -import java.util.function.BiFunction; +import java.util.function.Supplier; + import org.apache.samza.config.Config; import org.apache.samza.operators.MessageStream; import org.apache.samza.operators.StreamGraph; import org.apache.samza.operators.data.InputMessageEnvelope; import org.apache.samza.operators.data.JsonIncomingSystemMessageEnvelope; import org.apache.samza.operators.data.Offset; +import org.apache.samza.operators.functions.FoldLeftFunction; import org.apache.samza.operators.triggers.Triggers; import org.apache.samza.operators.windows.Windows; import org.apache.samza.system.StreamSpec; @@ -65,18 +67,20 @@ class JsonMessageEnvelope extends JsonIncomingSystemMessageEnvelope @Override public void init(StreamGraph graph, Config config) { - BiFunction sumAggregator = (m, c) -> c + 1; + FoldLeftFunction sumAggregator = (m, c) -> c + 1; + Supplier initialValue = () -> 0; + inputs.keySet().forEach(entry -> { MessageStream inputStream = graph.createInStream( new StreamSpec(entry.getSystem() + "-" + entry.getStream(), entry.getStream(), entry.getSystem()), null, null).map(this::getInputMessage); - inputStream.filter(this::myFilter1).window(Windows.tumblingWindow(Duration.ofMillis(100), sumAggregator) + inputStream.filter(this::myFilter1).window(Windows.tumblingWindow(Duration.ofMillis(100), initialValue, sumAggregator) .setLateTrigger(Triggers.any(Triggers.count(30000), Triggers.timeSinceFirstMessage(Duration.ofMillis(10))))); - inputStream.filter(this::myFilter2).window(Windows.tumblingWindow(Duration.ofMillis(100), sumAggregator) + inputStream.filter(this::myFilter2).window(Windows.tumblingWindow(Duration.ofMillis(100), initialValue, sumAggregator) .setLateTrigger(Triggers.any(Triggers.count(30000), Triggers.timeSinceFirstMessage(Duration.ofMillis(10))))); - inputStream.filter(this::myFilter3).window(Windows.tumblingWindow(Duration.ofMillis(100), sumAggregator) + inputStream.filter(this::myFilter3).window(Windows.tumblingWindow(Duration.ofMillis(100), initialValue, sumAggregator) .setLateTrigger(Triggers.any(Triggers.count(30000), Triggers.timeSinceFirstMessage(Duration.ofMillis(10))))); }); diff --git a/samza-core/src/test/java/org/apache/samza/example/TestWindowExample.java b/samza-core/src/test/java/org/apache/samza/example/TestWindowExample.java index 6896da5778efd..c88df7c763bc5 100644 --- a/samza-core/src/test/java/org/apache/samza/example/TestWindowExample.java +++ b/samza-core/src/test/java/org/apache/samza/example/TestWindowExample.java @@ -25,13 +25,14 @@ import org.apache.samza.operators.data.JsonIncomingSystemMessageEnvelope; import org.apache.samza.operators.data.MessageEnvelope; import org.apache.samza.operators.data.Offset; +import org.apache.samza.operators.functions.FoldLeftFunction; import org.apache.samza.operators.windows.Windows; import org.apache.samza.system.StreamSpec; import org.apache.samza.system.SystemStreamPartition; import java.time.Duration; -import java.util.function.BiFunction; import java.util.Set; +import java.util.function.Supplier; /** @@ -57,11 +58,12 @@ class JsonMessageEnvelope extends JsonIncomingSystemMessageEnvelope @Override public void init(StreamGraph graph, Config config) { - BiFunction maxAggregator = (m, c) -> c + 1; + FoldLeftFunction maxAggregator = (m, c) -> c + 1; + Supplier initialValue = () -> 0; inputs.keySet().forEach(source -> graph.createInStream( new StreamSpec(source.getSystem() + "-" + source.getStream(), source.getStream(), source.getSystem()), null, null). map(m1 -> new JsonMessageEnvelope(this.myMessageKeyFunction(m1), (MessageType) m1.getMessage(), m1.getOffset(), - m1.getSystemStreamPartition())).window(Windows.tumblingWindow(Duration.ofMillis(200), maxAggregator))); + m1.getSystemStreamPartition())).window(Windows.tumblingWindow(Duration.ofMillis(200), initialValue, maxAggregator))); } diff --git a/samza-core/src/test/java/org/apache/samza/operators/impl/TestOperatorImpl.java b/samza-core/src/test/java/org/apache/samza/operators/impl/TestOperatorImpl.java index 361972e0e1165..5722dbd9f5180 100644 --- a/samza-core/src/test/java/org/apache/samza/operators/impl/TestOperatorImpl.java +++ b/samza-core/src/test/java/org/apache/samza/operators/impl/TestOperatorImpl.java @@ -50,7 +50,12 @@ public void onNext(TestMessageEnvelope message, MessageCollector collector, Task TestOperatorImpl.this.curCollector = collector; TestOperatorImpl.this.curCoordinator = coordinator; } - }; + @Override + public void onTimer(MessageCollector collector, TaskCoordinator coordinator) { + + } + + }; // verify registerNextOperator() added the mockSub and propagateResult() invoked the mockSub.onNext() OperatorImpl mockSub = mock(OperatorImpl.class); opImpl.registerNextOperator(mockSub); diff --git a/samza-core/src/test/java/org/apache/samza/operators/impl/TestOperatorImpls.java b/samza-core/src/test/java/org/apache/samza/operators/impl/TestOperatorImpls.java index 088cb00d3213d..31f6f4a04b254 100644 --- a/samza-core/src/test/java/org/apache/samza/operators/impl/TestOperatorImpls.java +++ b/samza-core/src/test/java/org/apache/samza/operators/impl/TestOperatorImpls.java @@ -38,6 +38,7 @@ import org.apache.samza.operators.spec.StreamOperatorSpec; import org.apache.samza.operators.windows.Windows; import org.apache.samza.operators.windows.internal.WindowInternal; +import org.apache.samza.operators.windows.internal.WindowType; import org.apache.samza.task.TaskContext; import org.junit.Before; import org.junit.Test; @@ -77,7 +78,7 @@ public void prep() throws NoSuchFieldException, NoSuchMethodException { public void testCreateOperator() throws NoSuchFieldException, IllegalAccessException, InvocationTargetException { // get window operator WindowOperatorSpec mockWnd = mock(WindowOperatorSpec.class); - WindowInternal windowInternal = new WindowInternal<>(null, null, null, null); + WindowInternal windowInternal = new WindowInternal<>(null, null, null, null, null, WindowType.TUMBLING); when(mockWnd.getWindow()).thenReturn(windowInternal); MessageStreamImpl mockStream = mock(MessageStreamImpl.class); Config mockConfig = mock(Config.class); diff --git a/samza-core/src/test/java/org/apache/samza/operators/spec/TestOperatorSpecs.java b/samza-core/src/test/java/org/apache/samza/operators/spec/TestOperatorSpecs.java index ae3d151692c2f..ec1d74cb30f33 100644 --- a/samza-core/src/test/java/org/apache/samza/operators/spec/TestOperatorSpecs.java +++ b/samza-core/src/test/java/org/apache/samza/operators/spec/TestOperatorSpecs.java @@ -25,18 +25,20 @@ import org.apache.samza.operators.TestOutputMessageEnvelope; import org.apache.samza.operators.data.MessageEnvelope; import org.apache.samza.operators.functions.FlatMapFunction; +import org.apache.samza.operators.functions.FoldLeftFunction; import org.apache.samza.operators.functions.PartialJoinFunction; import org.apache.samza.operators.functions.SinkFunction; import org.apache.samza.operators.windows.internal.WindowInternal; import org.apache.samza.operators.windows.WindowPane; +import org.apache.samza.operators.windows.internal.WindowType; import org.apache.samza.task.MessageCollector; import org.apache.samza.task.TaskCoordinator; import org.junit.Test; import java.util.ArrayList; import java.util.Collection; -import java.util.function.BiFunction; import java.util.function.Function; +import java.util.function.Supplier; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; @@ -69,17 +71,17 @@ public void testGetSinkOperator() { @Test public void testGetWindowOperator() throws Exception { Function keyExtractor = m -> "globalkey"; - BiFunction aggregator = (m, c) -> c + 1; - + FoldLeftFunction aggregator = (m, c) -> c + 1; + Supplier initialValue = () -> 0; //instantiate a window using reflection - WindowInternal window = new WindowInternal(null, aggregator, keyExtractor, null); + WindowInternal window = new WindowInternal(null, initialValue, aggregator, keyExtractor, null, WindowType.TUMBLING); StreamGraphImpl mockGraph = mock(StreamGraphImpl.class); MessageStreamImpl> mockWndOut = mock(MessageStreamImpl.class); WindowOperatorSpec spec = OperatorSpecs.createWindowOperatorSpec(window, mockGraph, mockWndOut); assertEquals(spec.getWindow(), window); assertEquals(spec.getWindow().getKeyExtractor(), keyExtractor); - assertEquals(spec.getWindow().getFoldFunction(), aggregator); + assertEquals(spec.getWindow().getFoldLeftFunction(), aggregator); } @Test diff --git a/samza-core/src/test/java/org/apache/samza/operators/triggers/TestClock.java b/samza-core/src/test/java/org/apache/samza/operators/triggers/TestClock.java new file mode 100644 index 0000000000000..674a8f15dbe62 --- /dev/null +++ b/samza-core/src/test/java/org/apache/samza/operators/triggers/TestClock.java @@ -0,0 +1,45 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.samza.operators.triggers; + +import org.apache.samza.util.Clock; + +import java.time.Duration; + +/** + * An implementation of {@link Clock} that allows to advance the time by an arbitrary duration. + * Used for testing. + */ +public class TestClock implements Clock { + + long currentTime = 1; + + public void advanceTime(Duration duration) { + currentTime += duration.toMillis(); + } + + public void advanceTime(long millis) { + currentTime += millis; + } + + @Override + public long currentTimeMillis() { + return currentTime; + } +} diff --git a/samza-core/src/test/java/org/apache/samza/operators/triggers/TestWindowOperator.java b/samza-core/src/test/java/org/apache/samza/operators/triggers/TestWindowOperator.java new file mode 100644 index 0000000000000..0d720dd76883d --- /dev/null +++ b/samza-core/src/test/java/org/apache/samza/operators/triggers/TestWindowOperator.java @@ -0,0 +1,389 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.samza.operators.triggers; + + +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableSet; +import junit.framework.Assert; +import org.apache.samza.Partition; +import org.apache.samza.application.StreamApplication; +import org.apache.samza.config.Config; +import org.apache.samza.operators.MessageStream; +import org.apache.samza.operators.StreamGraph; +import org.apache.samza.operators.data.MessageEnvelope; +import org.apache.samza.operators.windows.AccumulationMode; +import org.apache.samza.operators.windows.WindowPane; +import org.apache.samza.operators.windows.Windows; +import org.apache.samza.runtime.ApplicationRunner; +import org.apache.samza.system.IncomingMessageEnvelope; +import org.apache.samza.system.StreamSpec; +import org.apache.samza.system.SystemStreamPartition; +import org.apache.samza.task.MessageCollector; +import org.apache.samza.task.StreamOperatorTask; +import org.apache.samza.task.TaskContext; +import org.apache.samza.task.TaskCoordinator; +import org.junit.Before; +import org.junit.Test; + +import java.time.Duration; +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; +import java.util.function.Function; + +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +public class TestWindowOperator { + private final MessageCollector messageCollector = mock(MessageCollector.class); + private final TaskCoordinator taskCoordinator = mock(TaskCoordinator.class); + private final List>>> windowPanes = new ArrayList<>(); + private final List integers = ImmutableList.of(1, 2, 1, 2, 1, 2, 1, 2, 3); + private Config config; + private TaskContext taskContext; + private ApplicationRunner runner; + + @Before + public void setup() throws Exception { + windowPanes.clear(); + + config = mock(Config.class); + taskContext = mock(TaskContext.class); + runner = mock(ApplicationRunner.class); + when(taskContext.getSystemStreamPartitions()).thenReturn(ImmutableSet + .of(new SystemStreamPartition("kafka", "integers", new Partition(0)))); + + } + + @Test + public void testTumblingWindowsDiscardingMode() throws Exception { + + StreamApplication sgb = new KeyedTumblingWindowStreamApplication(AccumulationMode.DISCARDING, Duration.ofSeconds(1), Triggers.repeat(Triggers.count(2))); + TestClock testClock = new TestClock(); + StreamOperatorTask task = new StreamOperatorTask(sgb, testClock, runner); + task.init(config, taskContext); + + integers.forEach(n -> task.process(new IntegerMessageEnvelope(n, n), messageCollector, taskCoordinator)); + testClock.advanceTime(Duration.ofSeconds(1)); + + task.window(messageCollector, taskCoordinator); + Assert.assertEquals(windowPanes.size(), 5); + Assert.assertEquals(windowPanes.get(0).getKey().getKey(), new Integer(1)); + Assert.assertEquals(((Collection>) windowPanes.get(0).getMessage()).size(), 2); + + Assert.assertEquals(windowPanes.get(1).getKey().getKey(), new Integer(2)); + Assert.assertEquals(((Collection>) windowPanes.get(1).getMessage()).size(), 2); + + Assert.assertEquals(windowPanes.get(2).getKey().getKey(), new Integer(1)); + Assert.assertEquals(((Collection>) windowPanes.get(2).getMessage()).size(), 2); + + Assert.assertEquals(windowPanes.get(3).getKey().getKey(), new Integer(2)); + Assert.assertEquals(((Collection>) windowPanes.get(3).getMessage()).size(), 2); + + Assert.assertEquals(windowPanes.get(4).getKey().getKey(), new Integer(3)); + Assert.assertEquals(((Collection>) windowPanes.get(4).getMessage()).size(), 1); + } + + @Test + public void testTumblingWindowsAccumulatingMode() throws Exception { + StreamApplication sgb = new KeyedTumblingWindowStreamApplication(AccumulationMode.ACCUMULATING, Duration.ofSeconds(1), Triggers.repeat(Triggers.count(2))); + TestClock testClock = new TestClock(); + StreamOperatorTask task = new StreamOperatorTask(sgb, testClock, runner); + task.init(config, taskContext); + + integers.forEach(n -> task.process(new IntegerMessageEnvelope(n, n), messageCollector, taskCoordinator)); + testClock.advanceTime(Duration.ofSeconds(1)); + task.window(messageCollector, taskCoordinator); + + Assert.assertEquals(windowPanes.size(), 7); + Assert.assertEquals(windowPanes.get(0).getKey().getKey(), new Integer(1)); + Assert.assertEquals(((Collection>) windowPanes.get(0).getMessage()).size(), 2); + + Assert.assertEquals(windowPanes.get(1).getKey().getKey(), new Integer(2)); + Assert.assertEquals(((Collection>) windowPanes.get(1).getMessage()).size(), 2); + + Assert.assertEquals(windowPanes.get(2).getKey().getKey(), new Integer(1)); + Assert.assertEquals(((Collection>) windowPanes.get(2).getMessage()).size(), 4); + + Assert.assertEquals(windowPanes.get(3).getKey().getKey(), new Integer(2)); + Assert.assertEquals(((Collection>) windowPanes.get(3).getMessage()).size(), 4); + } + + @Test + public void testSessionWindowsDiscardingMode() throws Exception { + StreamApplication sgb = new KeyedSessionWindowStreamApplication(AccumulationMode.DISCARDING, Duration.ofMillis(500)); + TestClock testClock = new TestClock(); + StreamOperatorTask task = new StreamOperatorTask(sgb, testClock, runner); + task.init(config, taskContext); + + task.process(new IntegerMessageEnvelope(1, 1), messageCollector, taskCoordinator); + task.process(new IntegerMessageEnvelope(1, 1), messageCollector, taskCoordinator); + testClock.advanceTime(Duration.ofSeconds(1)); + task.window(messageCollector, taskCoordinator); + + Assert.assertEquals(windowPanes.size(), 1); + Assert.assertEquals(windowPanes.get(0).getKey().getPaneId(), "1"); + Assert.assertEquals(windowPanes.get(0).getKey().getKey(), new Integer(1)); + + task.process(new IntegerMessageEnvelope(2, 2), messageCollector, taskCoordinator); + task.process(new IntegerMessageEnvelope(2, 2), messageCollector, taskCoordinator); + task.process(new IntegerMessageEnvelope(3, 3), messageCollector, taskCoordinator); + task.process(new IntegerMessageEnvelope(3, 3), messageCollector, taskCoordinator); + + testClock.advanceTime(Duration.ofSeconds(1)); + task.window(messageCollector, taskCoordinator); + + Assert.assertEquals(windowPanes.size(), 3); + Assert.assertEquals(windowPanes.get(0).getKey().getPaneId(), "1"); + Assert.assertEquals(windowPanes.get(1).getKey().getPaneId(), "1001"); + Assert.assertEquals(windowPanes.get(2).getKey().getPaneId(), "1001"); + Assert.assertEquals(((Collection>) windowPanes.get(0).getMessage()).size(), 2); + Assert.assertEquals(((Collection>) windowPanes.get(1).getMessage()).size(), 2); + Assert.assertEquals(((Collection>) windowPanes.get(2).getMessage()).size(), 2); + + task.process(new IntegerMessageEnvelope(2, 2), messageCollector, taskCoordinator); + task.process(new IntegerMessageEnvelope(2, 2), messageCollector, taskCoordinator); + + testClock.advanceTime(Duration.ofSeconds(1)); + task.window(messageCollector, taskCoordinator); + Assert.assertEquals(windowPanes.size(), 4); + Assert.assertEquals(windowPanes.get(3).getKey().getKey(), new Integer(2)); + Assert.assertEquals(windowPanes.get(3).getKey().getPaneId(), "2001"); + Assert.assertEquals(((Collection>) windowPanes.get(3).getMessage()).size(), 2); + + } + + @Test + public void testSessionWindowsAccumulatingMode() throws Exception { + StreamApplication sgb = new KeyedSessionWindowStreamApplication(AccumulationMode.DISCARDING, Duration.ofMillis(500)); + TestClock testClock = new TestClock(); + StreamOperatorTask task = new StreamOperatorTask(sgb, testClock, runner); + task.init(config, taskContext); + + task.process(new IntegerMessageEnvelope(1, 1), messageCollector, taskCoordinator); + task.process(new IntegerMessageEnvelope(1, 1), messageCollector, taskCoordinator); + testClock.advanceTime(Duration.ofSeconds(1)); + + task.process(new IntegerMessageEnvelope(2, 2), messageCollector, taskCoordinator); + task.process(new IntegerMessageEnvelope(2, 2), messageCollector, taskCoordinator); + + task.process(new IntegerMessageEnvelope(2, 2), messageCollector, taskCoordinator); + task.process(new IntegerMessageEnvelope(2, 2), messageCollector, taskCoordinator); + + testClock.advanceTime(Duration.ofSeconds(1)); + task.window(messageCollector, taskCoordinator); + Assert.assertEquals(windowPanes.size(), 2); + Assert.assertEquals(((Collection>) windowPanes.get(0).getMessage()).size(), 2); + Assert.assertEquals(windowPanes.get(0).getKey().getKey(), new Integer(1)); + Assert.assertEquals(windowPanes.get(1).getKey().getKey(), new Integer(2)); + Assert.assertEquals(((Collection>) windowPanes.get(0).getMessage()).size(), 2); + Assert.assertEquals(((Collection>) windowPanes.get(1).getMessage()).size(), 4); + } + + @Test + public void testCancelationOfOnceTrigger() throws Exception { + StreamApplication sgb = new KeyedTumblingWindowStreamApplication(AccumulationMode.ACCUMULATING, Duration.ofSeconds(1), Triggers.count(2)); + TestClock testClock = new TestClock(); + StreamOperatorTask task = new StreamOperatorTask(sgb, testClock, runner); + task.init(config, taskContext); + + task.process(new IntegerMessageEnvelope(1, 1), messageCollector, taskCoordinator); + task.process(new IntegerMessageEnvelope(1, 2), messageCollector, taskCoordinator); + Assert.assertEquals(windowPanes.size(), 1); + Assert.assertEquals(windowPanes.get(0).getKey().getPaneId(), "0"); + Assert.assertEquals(windowPanes.get(0).getKey().getKey(), new Integer(1)); + Assert.assertEquals(windowPanes.get(0).getFiringType(), FiringType.EARLY); + + task.process(new IntegerMessageEnvelope(1, 3), messageCollector, taskCoordinator); + task.process(new IntegerMessageEnvelope(1, 4), messageCollector, taskCoordinator); + task.process(new IntegerMessageEnvelope(1, 5), messageCollector, taskCoordinator); + + Assert.assertEquals(windowPanes.size(), 1); + + testClock.advanceTime(Duration.ofSeconds(1)); + task.window(messageCollector, taskCoordinator); + + Assert.assertEquals(windowPanes.size(), 2); + Assert.assertEquals(windowPanes.get(0).getKey().getPaneId(), "0"); + Assert.assertEquals(windowPanes.get(1).getKey().getPaneId(), "0"); + Assert.assertEquals(windowPanes.get(1).getFiringType(), FiringType.DEFAULT); + + task.process(new IntegerMessageEnvelope(3, 6), messageCollector, taskCoordinator); + testClock.advanceTime(Duration.ofSeconds(1)); + task.window(messageCollector, taskCoordinator); + + Assert.assertEquals(windowPanes.size(), 3); + Assert.assertEquals(windowPanes.get(2).getKey().getKey(), new Integer(3)); + Assert.assertEquals(windowPanes.get(2).getKey().getPaneId(), "1000"); + Assert.assertEquals(windowPanes.get(2).getFiringType(), FiringType.DEFAULT); + Assert.assertEquals(((Collection>) windowPanes.get(2).getMessage()).size(), 1); + + } + + @Test + public void testCancelationOfAnyTrigger() throws Exception { + StreamApplication sgb = new KeyedTumblingWindowStreamApplication(AccumulationMode.ACCUMULATING, Duration.ofSeconds(1), + Triggers.any(Triggers.count(2), Triggers.timeSinceFirstMessage(Duration.ofMillis(500)))); + TestClock testClock = new TestClock(); + StreamOperatorTask task = new StreamOperatorTask(sgb, testClock, runner); + task.init(config, taskContext); + + task.process(new IntegerMessageEnvelope(1, 1), messageCollector, taskCoordinator); + task.process(new IntegerMessageEnvelope(1, 2), messageCollector, taskCoordinator); + //assert that the count trigger fired + Assert.assertEquals(windowPanes.size(), 1); + + //advance the timer to enable the triggering of the inner timeSinceFirstMessage trigger + testClock.advanceTime(Duration.ofMillis(500)); + + //assert that the triggering of the count trigger cancelled the inner timeSinceFirstMessage trigger + Assert.assertEquals(windowPanes.size(), 1); + + task.process(new IntegerMessageEnvelope(1, 3), messageCollector, taskCoordinator); + task.process(new IntegerMessageEnvelope(1, 4), messageCollector, taskCoordinator); + task.process(new IntegerMessageEnvelope(1, 5), messageCollector, taskCoordinator); + + //advance timer by 500 more millis to enable the default trigger + testClock.advanceTime(Duration.ofMillis(500)); + task.window(messageCollector, taskCoordinator); + + //assert that the default trigger fired + Assert.assertEquals(windowPanes.size(), 2); + Assert.assertEquals(windowPanes.get(1).getFiringType(), FiringType.DEFAULT); + Assert.assertEquals(windowPanes.get(1).getKey().getKey(), new Integer(1)); + Assert.assertEquals(windowPanes.get(1).getKey().getPaneId(), "0"); + Assert.assertEquals(((Collection>) windowPanes.get(1).getMessage()).size(), 5); + + task.process(new IntegerMessageEnvelope(1, 5), messageCollector, taskCoordinator); + + //advance timer by 500 millis to enable the inner timeSinceFirstMessage trigger + testClock.advanceTime(Duration.ofMillis(500)); + task.window(messageCollector, taskCoordinator); + + Assert.assertEquals(windowPanes.size(), 3); + Assert.assertEquals(windowPanes.get(2).getFiringType(), FiringType.EARLY); + Assert.assertEquals(windowPanes.get(2).getKey().getKey(), new Integer(1)); + Assert.assertEquals(windowPanes.get(2).getKey().getPaneId(), "1000"); + + //advance timer by > 500 millis to enable the default trigger + testClock.advanceTime(Duration.ofMillis(900)); + task.window(messageCollector, taskCoordinator); + Assert.assertEquals(windowPanes.size(), 4); + Assert.assertEquals(windowPanes.get(3).getFiringType(), FiringType.DEFAULT); + Assert.assertEquals(windowPanes.get(3).getKey().getKey(), new Integer(1)); + Assert.assertEquals(windowPanes.get(3).getKey().getPaneId(), "1000"); + } + + @Test + public void testCancelationOfRepeatingNestedTriggers() throws Exception { + + StreamApplication sgb = new KeyedTumblingWindowStreamApplication(AccumulationMode.ACCUMULATING, Duration.ofSeconds(1), + Triggers.repeat(Triggers.any(Triggers.count(2), Triggers.timeSinceFirstMessage(Duration.ofMillis(500))))); + TestClock testClock = new TestClock(); + StreamOperatorTask task = new StreamOperatorTask(sgb, testClock, runner); + task.init(config, taskContext); + + task.process(new IntegerMessageEnvelope(1, 1), messageCollector, taskCoordinator); + + task.process(new IntegerMessageEnvelope(1, 2), messageCollector, taskCoordinator); + //assert that the count trigger fired + Assert.assertEquals(windowPanes.size(), 1); + + //advance the timer to enable the potential triggering of the inner timeSinceFirstMessage trigger + task.process(new IntegerMessageEnvelope(1, 3), messageCollector, taskCoordinator); + testClock.advanceTime(Duration.ofMillis(500)); + //assert that the triggering of the count trigger cancelled the inner timeSinceFirstMessage trigger + task.window(messageCollector, taskCoordinator); + Assert.assertEquals(windowPanes.size(), 2); + + task.process(new IntegerMessageEnvelope(1, 3), messageCollector, taskCoordinator); + task.process(new IntegerMessageEnvelope(1, 4), messageCollector, taskCoordinator); + Assert.assertEquals(windowPanes.size(), 3); + + task.process(new IntegerMessageEnvelope(1, 5), messageCollector, taskCoordinator); + //advance timer by 500 more millis to enable the default trigger + testClock.advanceTime(Duration.ofMillis(500)); + task.window(messageCollector, taskCoordinator); + //assert that the default trigger fired + Assert.assertEquals(windowPanes.size(), 4); + } + + private class KeyedTumblingWindowStreamApplication implements StreamApplication { + + private final StreamSpec streamSpec = new StreamSpec("integer-stream", "integers", "kafka"); + private final AccumulationMode mode; + private final Duration duration; + private final Trigger> earlyTrigger; + + KeyedTumblingWindowStreamApplication(AccumulationMode mode, Duration timeDuration, Trigger> earlyTrigger) { + this.mode = mode; + this.duration = timeDuration; + this.earlyTrigger = earlyTrigger; + } + + @Override + public void init(StreamGraph graph, Config config) { + MessageStream> inStream = graph.createInStream(streamSpec, null, null); + Function, Integer> keyFn = m -> m.getKey(); + inStream + .map(m -> m) + .window(Windows.keyedTumblingWindow(keyFn, duration).setEarlyTrigger(earlyTrigger) + .setAccumulationMode(mode)) + .map(m -> { + windowPanes.add(m); + return m; + }); + } + } + + private class KeyedSessionWindowStreamApplication implements StreamApplication { + + private final StreamSpec streamSpec = new StreamSpec("integer-stream", "integers", "kafka"); + private final AccumulationMode mode; + private final Duration duration; + + KeyedSessionWindowStreamApplication(AccumulationMode mode, Duration duration) { + this.mode = mode; + this.duration = duration; + } + + @Override + public void init(StreamGraph graph, Config config) { + MessageStream> inStream = graph.createInStream(streamSpec, null, null); + Function, Integer> keyFn = m -> m.getKey(); + + inStream + .map(m -> m) + .window(Windows.keyedSessionWindow(keyFn, duration) + .setAccumulationMode(mode)) + .map(m -> { + windowPanes.add(m); + return m; + }); + } + } + + private class IntegerMessageEnvelope extends IncomingMessageEnvelope { + IntegerMessageEnvelope(int key, int msg) { + super(new SystemStreamPartition("kafka", "integers", new Partition(0)), "1", key, msg); + } + } +}