Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
import lombok.experimental.UtilityClass;
import org.opensearch.sql.data.type.ExprCoreType;
import org.opensearch.sql.exception.ExpressionEvaluationException;
import org.opensearch.sql.planner.streaming.windowing.Window;

/**
* The definition of {@link ExprValue} factory.
Expand Down Expand Up @@ -61,6 +62,14 @@ public static ExprValue intervalValue(TemporalAmount value) {
return new ExprIntervalValue(value);
}

public static ExprValue datetimeValue(String value) {
return new ExprDatetimeValue(value);
}

public static ExprValue dateValue(String value) {
return new ExprDateValue(value);
}

/**
* {@link ExprTupleValue} constructor.
*/
Expand All @@ -80,6 +89,14 @@ public static ExprValue collectionValue(List<Object> list) {
return new ExprCollectionValue(valueList);
}

public static Window window(Object lowerBound, Object upperBound) {
return new Window(fromObjectValue(lowerBound), fromObjectValue(upperBound));
}

public static Window window(Object lowerBound, Object upperBound, ExprCoreType type) {
return new Window(fromObjectValue(lowerBound, type), fromObjectValue(upperBound, type));
}

public static ExprValue missingValue() {
return ExprMissingValue.of();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,9 +62,10 @@ public enum ExprCoreType implements ExprType {
INTERVAL(UNDEFINED),

/**
* Struct.
* Composite data type.
*/
STRUCT(UNDEFINED),
WINDOW(UNDEFINED),

/**
* Array.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
package org.opensearch.sql.planner.streaming;

import lombok.Data;
import org.opensearch.sql.data.model.ExprValue;

/**
* Stream context required by stream processing components and can be
Expand All @@ -14,6 +15,6 @@
@Data
public class StreamContext {

/** Current watermark timestamp. */
private long watermark;
/** Current watermark value. */
private ExprValue watermark;
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,23 +5,51 @@

package org.opensearch.sql.planner.streaming.watermark;

import static org.opensearch.sql.expression.DSL.greater;
import static org.opensearch.sql.expression.DSL.literal;
import static org.opensearch.sql.expression.DSL.subdate;
import static org.opensearch.sql.expression.DSL.subtract;

import lombok.RequiredArgsConstructor;
import org.opensearch.sql.data.model.ExprValue;
import org.opensearch.sql.expression.Expression;
import org.opensearch.sql.expression.FunctionExpression;

/**
* Watermark generator that generates watermark with bounded out-of-order delay.
*/
@RequiredArgsConstructor
public class BoundedOutOfOrderWatermarkGenerator implements WatermarkGenerator {

/** The maximum out-of-order allowed in millisecond. */
private final long maxOutOfOrderAllowed;
/** The maximum out-of-order value allowed. */
private final Expression maxOutOfOrderAllowed;

/** The maximum timestamp seen so far in millisecond. */
private long maxTimestamp;
/** The maximum timestamp value seen so far. */
private ExprValue maxTimestamp;

@Override
public long generate(long timestamp) {
maxTimestamp = Math.max(maxTimestamp, timestamp);
return (maxTimestamp - maxOutOfOrderAllowed - 1);
public ExprValue generate(ExprValue value) {
if (isGreaterThanMaxTimestamp(value)) {
maxTimestamp = value;
}
return generateWatermark();
}

private boolean isGreaterThanMaxTimestamp(ExprValue value) {
if (maxTimestamp == null) {
return true;
}
return greater(literal(value), literal(maxTimestamp))
.valueOf().booleanValue();
}

private ExprValue generateWatermark() {
FunctionExpression function;
if (maxTimestamp.isNumber()) {
function = subtract(literal(maxTimestamp), maxOutOfOrderAllowed);
} else {
function = subdate(literal(maxTimestamp), maxOutOfOrderAllowed);
}
return function.valueOf();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,18 +5,20 @@

package org.opensearch.sql.planner.streaming.watermark;

import org.opensearch.sql.data.model.ExprValue;

/**
* A watermark generator generates watermark timestamp based on some strategy which is defined
* A watermark generator generates watermark based on some strategy which is defined
* in implementation class.
*/
public interface WatermarkGenerator {

/**
* Generate watermark timestamp on the given event timestamp.
* Generate watermark on the given value.
*
* @param timestamp event timestamp in millisecond
* @return watermark timestamp in millisecond
* @param value value
* @return watermark
*/
long generate(long timestamp);
ExprValue generate(ExprValue value);

}
Original file line number Diff line number Diff line change
Expand Up @@ -5,24 +5,86 @@

package org.opensearch.sql.planner.streaming.windowing;

import lombok.Data;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableMap;
import lombok.Getter;
import lombok.ToString;
import org.opensearch.sql.data.model.AbstractExprValue;
import org.opensearch.sql.data.model.ExprNullValue;
import org.opensearch.sql.data.model.ExprValue;
import org.opensearch.sql.data.model.ExprValueUtils;
import org.opensearch.sql.data.type.ExprCoreType;
import org.opensearch.sql.data.type.ExprType;

/**
* A time window is a window of time interval with inclusive start time and exclusive end time.
* A window represents a range of values by consisting of the lower bound and upper bound for it.
* To make use of this concept in stream processing, it is called window instead of range. However,
* as the definition specifies, it is not restricted to time window by design.
*/
@Data
public class Window {
@Getter
@ToString
public class Window extends AbstractExprValue {

/** Start timestamp (inclusive) of the time window. */
private final long startTime;
/** Constants for caller to use. */
public static final ExprValue UNBOUND = ExprNullValue.of();
public static final String START_NAME = "start";
public static final String END_NAME = "end";

/** End timestamp (exclusive) of the time window. */
private final long endTime;
/** Lower bound (inclusive by default) of the time window. */
private final ExprValue lowerBound;

/** Upper bound (exclusive by default) of the time window. */
private final ExprValue upperBound;

/**
* Return the maximum timestamp (inclusive) of the window.
* Construct a window by lower and upper bound. For now inclusivity is default value
* and disallow to customize to simply arithmetic around window.
*
* @param lowerBound inclusive lower bound
* @param upperBound exclusive upper bound
*/
public long maxTimestamp() {
return endTime - 1;
public Window(ExprValue lowerBound, ExprValue upperBound) {
Preconditions.checkArgument(isBothBoundValid(lowerBound, upperBound),
"Lower bound [%s] and upper bound [%s] must be of the same type",
lowerBound.type(), upperBound.type());

this.lowerBound = lowerBound;
this.upperBound = upperBound;
}

@Override
public Object value() {
return ExprValueUtils.tupleValue(ImmutableMap.of(
START_NAME, lowerBound,
END_NAME, upperBound));
}

@Override
public ExprType type() {
return ExprCoreType.WINDOW;
}

@Override
public int compare(ExprValue o) {
// Define that a window's order is only determined by its upper bound value
Window other = (Window) o;
if (upperBound == UNBOUND && other.upperBound == UNBOUND) {
return 0;
} else if (upperBound == UNBOUND) {
return 1;
} else if (other.upperBound == UNBOUND) {
return -1;
}
return upperBound.compareTo(other.upperBound);
}

@Override
public boolean equal(ExprValue other) {
return value().equals(other.value());
}

private boolean isBothBoundValid(ExprValue lowerBound, ExprValue upperBound) {
return lowerBound.type() == upperBound.type()
|| lowerBound == UNBOUND || upperBound == UNBOUND;
}
}

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -5,34 +5,55 @@

package org.opensearch.sql.planner.streaming.windowing.assigner;

import com.google.common.base.Preconditions;
import static org.opensearch.sql.expression.DSL.add;
import static org.opensearch.sql.expression.DSL.adddate;
import static org.opensearch.sql.expression.DSL.interval;
import static org.opensearch.sql.expression.DSL.literal;

import java.util.Collections;
import java.util.List;
import org.opensearch.sql.data.model.ExprValue;
import org.opensearch.sql.expression.Expression;
import org.opensearch.sql.expression.FunctionExpression;
import org.opensearch.sql.expression.env.Environment;
import org.opensearch.sql.expression.span.SpanExpression;
import org.opensearch.sql.planner.streaming.windowing.Window;
import org.opensearch.sql.utils.DateTimeUtils;

/**
* A tumbling window assigner assigns a single window per event timestamp without overlap.
* A tumbling window assigner assigns a single window per input value without overlap.
*/
public class TumblingWindowAssigner implements WindowAssigner {

/** Window size in millisecond. */
private final long windowSize;
/** Window size that maybe numeric or time interval. */
private final SpanExpression windowingExpr;

/**
* Create tumbling window assigner with the given window size.
*
* @param windowSize window size in millisecond
* @param windowingExpr window size in millisecond
*/
public TumblingWindowAssigner(long windowSize) {
Preconditions.checkArgument(windowSize > 0,
"Window size [%s] must be positive number", windowSize);
this.windowSize = windowSize;
public TumblingWindowAssigner(SpanExpression windowingExpr) {
this.windowingExpr = windowingExpr;
}

@Override
public List<Window> assign(long timestamp) {
long startTime = DateTimeUtils.getWindowStartTime(timestamp, windowSize);
return Collections.singletonList(new Window(startTime, startTime + windowSize));
public List<Window> assign(ExprValue value) {
Environment<Expression, ExprValue> valueEnv = value.bindingTuples();
ExprValue lowerBound = windowingExpr.valueOf(valueEnv);
ExprValue upperBound = getUpperBound(lowerBound);
return Collections.singletonList(
new Window(lowerBound, upperBound));
}

private ExprValue getUpperBound(ExprValue lowerBound) {
ExprValue upperBound;
if (lowerBound.isNumber()) {
ExprValue windowSize = windowingExpr.getValue().valueOf();
upperBound = add(literal(lowerBound), literal(windowSize)).valueOf();
} else {
FunctionExpression windowSize = interval(windowingExpr.getValue(), literal("minute"));
upperBound = adddate(literal(lowerBound), windowSize).valueOf();
}
return upperBound;
}
}
Loading