Skip to content

Commit

Permalink
SAMZA-1108: Implementation of Windows and various kinds of Triggers
Browse files Browse the repository at this point in the history
* Implemented various triggers and the orchestration logic of the window operator.
* Implemented wire-up of window and the flow of messages through various trigger implementations.
* Implementations for count, time, timeSinceFirst, timeSinceLast, Any, Repeating triggers.

Author: vjagadish1989 <jvenkatr@linkedin.com>

Reviewers: Yi Pan (Data Infrastructure) <nickpan47@gmail.com>, Prateek Maheshwari <pmaheshw@linkedin.com>, Chris Pettitt <cpettitt@linkedin.com>

Closes apache#66 from vjagadish1989/window-impl
  • Loading branch information
jagadish-northguard committed Mar 19, 2017
1 parent 05915bf commit d399d6f
Show file tree
Hide file tree
Showing 46 changed files with 1,885 additions and 218 deletions.
7 changes: 5 additions & 2 deletions checkstyle/checkstyle.xml
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,9 @@
-->
<module name="Checker">
<property name="localeLanguage" value="en"/>

<!-- allow suppression for specific files -->
<module name="SuppressionCommentFilter"/>

<module name="FileTabCharacter"/>

<!-- header: use one star only -->
Expand All @@ -32,6 +34,7 @@

<!-- code cleanup -->
<module name="UnusedImports"/>
<module name="FileContentsHolder"/>
<module name="RedundantImport"/>
<module name="IllegalImport" />
<module name="EqualsHashCode"/>
Expand Down Expand Up @@ -62,8 +65,8 @@
<!-- whitespace -->
<module name="GenericWhitespace"/>
<module name="NoWhitespaceBefore"/>
<module name="WhitespaceAfter" />
<module name="NoWhitespaceAfter"/>
<module name="WhitespaceAfter" />
<module name="WhitespaceAround">
<property name="allowEmptyConstructors" value="true"/>
<property name="allowEmptyMethods" value="true"/>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ public interface FilterFunction<M> extends InitableFunction {

/**
* Returns a boolean indicating whether this message should be retained or filtered out.
* @param message the input message to be checked
* @param message the input message to be checked. This object should not be mutated.
* @return true if {@code message} should be retained
*/
boolean apply(M message);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.samza.operators.functions;

/**
* A fold function that incrementally combines and aggregates values for a window.
*/
public interface FoldLeftFunction<M, WV> extends InitableFunction {

/**
* Incrementally combine and aggregate values for the window. Guaranteed to be invoked for every
* message added to the window.
*
* @param message the incoming message that is added to the window. This object should not be mutated.
* @param oldValue the previous value
* @return the new value
*/
WV apply(M message, WV oldValue);
}
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,9 @@
public interface MapFunction<M, OM> extends InitableFunction {

/**
* Transforms the provided message into another message
* @param message the input message to be transformed
* Transforms the provided message into another message.
*
* @param message the input message to be transformed. This object should not be mutated.
* @return the transformed message
*/
OM apply(M message);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,16 +23,15 @@
/**
* A {@link Trigger} fires as soon as any of its individual triggers has fired.
*/
public class AnyTrigger<M> implements Trigger {
public class AnyTrigger<M> implements Trigger<M> {

private final List<Trigger> triggers;
private final List<Trigger<M>> triggers;

AnyTrigger(List<Trigger> triggers) {
AnyTrigger(List<Trigger<M>> triggers) {
this.triggers = triggers;
}

public List<Trigger> getTriggers() {
public List<Trigger<M>> getTriggers() {
return triggers;
}
}

Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
* A {@link Trigger} that fires when the number of messages in the {@link org.apache.samza.operators.windows.WindowPane}
* reaches the specified count.
*/
public class CountTrigger<M> implements Trigger {
public class CountTrigger<M> implements Trigger<M> {

private final long count;

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.samza.operators.triggers;

/**
* The type of the {@link org.apache.samza.operators.triggers.Trigger} firing.
* Firings can be either early or late or default. Late triggers are not supported currently.
*/
public enum FiringType {
EARLY,
DEFAULT,
LATE
}
Original file line number Diff line number Diff line change
Expand Up @@ -28,5 +28,9 @@ class RepeatingTrigger<M> implements Trigger<M> {
RepeatingTrigger(Trigger<M> trigger) {
this.trigger = trigger;
}

public Trigger<M> getTrigger() {
return trigger;
}
}

Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@
* A {@link Trigger} that fires after the specified duration has passed since the first {@link MessageEnvelope} in
* the window pane.
*/
public class TimeSinceFirstMessageTrigger<M> implements Trigger {
public class TimeSinceFirstMessageTrigger<M> implements Trigger<M> {

private final Duration duration;
private final DurationCharacteristic characteristic = DurationCharacteristic.PROCESSING_TIME;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,9 @@

/*
* A {@link Trigger} that fires when there are no new {@link MessageEnvelope}s in the window pane for the specified duration.
* @param <M> the type of the incoming {@link MessageEnvelope}
*/
public class TimeSinceLastMessageTrigger<M> implements Trigger {
public class TimeSinceLastMessageTrigger<M> implements Trigger<M> {

private final Duration duration;
private final DurationCharacteristic characteristic = DurationCharacteristic.PROCESSING_TIME;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@
/*
* A {@link Trigger} that fires after the specified duration in processing time.
*/
public class TimeTrigger<M> implements Trigger {
public class TimeTrigger<M> implements Trigger<M> {

private final Duration duration;
private final DurationCharacteristic characteristic = DurationCharacteristic.PROCESSING_TIME;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,14 +26,19 @@
*
*/
public class WindowKey<K> {

/**
* A (key,paneId) tuple uniquely identifies an emission from a window. For instance, in case of keyed-tumbling time windows,
* the key is provided by the keyExtractor function, and the paneId is the start of the time window boundary. In case
* of session windows, the key is provided by the keyExtractor function, and the paneId is the time at which the earliest
* message in the window arrived.
*/
private final K key;

private final String paneId;

public WindowKey(K key, String windowId) {
public WindowKey(K key, String paneId) {
this.key = key;
this.paneId = windowId;
this.paneId = paneId;
}

public K getKey() {
Expand All @@ -52,4 +57,29 @@ public String toString() {
}
return String.format("%s%s", wndKey, paneId);
}

@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;

WindowKey<?> windowKey = (WindowKey<?>) o;

if (!key.equals(windowKey.key)) return false;

if (paneId == null) {
return windowKey.paneId == null;
}

return paneId.equals(windowKey.paneId);

}

@Override
public int hashCode() {
int result = key.hashCode();
result = 31 * result + (paneId != null ? paneId.hashCode() : 0);
return result;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@
*/
package org.apache.samza.operators.windows;

import org.apache.samza.operators.triggers.FiringType;

/**
* Specifies the result emitted from a {@link Window}.
*
Expand All @@ -32,10 +34,16 @@ public final class WindowPane<K, V> {

private final AccumulationMode mode;

WindowPane(WindowKey<K> key, V value, AccumulationMode mode) {
/**
* The type of the trigger that emitted this result. Results can be emitted from early, late or default triggers.
*/
private final FiringType type;

public WindowPane(WindowKey<K> key, V value, AccumulationMode mode, FiringType type) {
this.key = key;
this.value = value;
this.mode = mode;
this.type = type;
}

public V getMessage() {
Expand All @@ -46,8 +54,8 @@ public WindowKey<K> getKey() {
return this.key;
}

static public <K, M> WindowPane<K, M> of(WindowKey<K> key, M result) {
return new WindowPane<>(key, result, AccumulationMode.DISCARDING);
public FiringType getFiringType() {
return type;
}
}

Expand Down
Loading

0 comments on commit d399d6f

Please sign in to comment.