Skip to content

Commit 42143b9

Browse files
authored
Add Streaming Plan Impl (opensearch-project#1068)
Signed-off-by: Peng Huo <penghuo@gmail.com>
1 parent 0f5d628 commit 42143b9

File tree

36 files changed

+1349
-214
lines changed

36 files changed

+1349
-214
lines changed

core/build.gradle

Lines changed: 1 addition & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -26,17 +26,12 @@ plugins {
2626
id 'java-library'
2727
id "io.freefair.lombok"
2828
id 'jacoco'
29+
id 'java-test-fixtures'
2930
}
3031

3132
repositories {
3233
mavenCentral()
3334
}
34-
//
35-
//configurations.all {
36-
// resolutionStrategy.dependencySubstitution {
37-
// substitute module('com.google.guava:guava:26.0-jre') with module('com.google.guava:guava:29.0-jre')
38-
// }
39-
//}
4035

4136
dependencies {
4237
api group: 'com.google.guava', name: 'guava', version: '31.0.1-jre'

core/src/main/java/org/opensearch/sql/ast/statement/Query.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@
2626
@RequiredArgsConstructor
2727
public class Query extends Statement {
2828

29-
private final UnresolvedPlan plan;
29+
protected final UnresolvedPlan plan;
3030

3131
@Override
3232
public <R, C> R accept(AbstractNodeVisitor<R, C> visitor, C context) {

core/src/main/java/org/opensearch/sql/datasource/model/DataSourceType.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,5 +6,7 @@
66
package org.opensearch.sql.datasource.model;
77

88
public enum DataSourceType {
9-
PROMETHEUS,OPENSEARCH
9+
PROMETHEUS,
10+
OPENSEARCH,
11+
FILESYSTEM
1012
}

core/src/main/java/org/opensearch/sql/executor/DefaultQueryManager.java

Lines changed: 0 additions & 24 deletions
This file was deleted.
Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,30 @@
1+
/*
2+
* Copyright OpenSearch Contributors
3+
* SPDX-License-Identifier: Apache-2.0
4+
*/
5+
6+
package org.opensearch.sql.executor;
7+
8+
import java.util.Optional;
9+
import lombok.Getter;
10+
import org.opensearch.sql.storage.split.Split;
11+
12+
/**
13+
* Execution context hold planning related information.
14+
*/
15+
public class ExecutionContext {
16+
@Getter
17+
private final Optional<Split> split;
18+
19+
public ExecutionContext(Split split) {
20+
this.split = Optional.of(split);
21+
}
22+
23+
private ExecutionContext(Optional<Split> split) {
24+
this.split = split;
25+
}
26+
27+
public static ExecutionContext emptyExecutionContext() {
28+
return new ExecutionContext(Optional.empty());
29+
}
30+
}

core/src/main/java/org/opensearch/sql/executor/ExecutionEngine.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,12 +23,19 @@ public interface ExecutionEngine {
2323

2424
/**
2525
* Execute physical plan and call back response listener.
26+
* Todo. deprecated this interface after finalize {@link ExecutionContext}.
2627
*
2728
* @param plan executable physical plan
2829
* @param listener response listener
2930
*/
3031
void execute(PhysicalPlan plan, ResponseListener<QueryResponse> listener);
3132

33+
/**
34+
* Execute physical plan with {@link ExecutionContext} and call back response listener.
35+
*/
36+
void execute(PhysicalPlan plan, ExecutionContext context,
37+
ResponseListener<QueryResponse> listener);
38+
3239
/**
3340
* Explain physical plan and call back response listener. The reason why this has to
3441
* be part of execution engine interface is that the physical plan probably needs to

core/src/main/java/org/opensearch/sql/executor/QueryManager.java

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,4 +22,13 @@ public interface QueryManager {
2222
* @return {@link QueryId}.
2323
*/
2424
QueryId submit(AbstractPlan queryPlan);
25+
26+
/**
27+
* Cancel submitted {@link AbstractPlan} by {@link QueryId}.
28+
*
29+
* @return true indicate successful.
30+
*/
31+
default boolean cancel(QueryId queryId) {
32+
throw new UnsupportedOperationException();
33+
}
2534
}

core/src/main/java/org/opensearch/sql/executor/QueryService.java

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -59,7 +59,12 @@ public void executePlan(LogicalPlan plan,
5959
PlanContext planContext,
6060
ResponseListener<ExecutionEngine.QueryResponse> listener) {
6161
try {
62-
executionEngine.execute(plan(plan), listener);
62+
planContext
63+
.getSplit()
64+
.ifPresentOrElse(
65+
split -> executionEngine.execute(plan(plan), new ExecutionContext(split), listener),
66+
() -> executionEngine.execute(
67+
plan(plan), ExecutionContext.emptyExecutionContext(), listener));
6368
} catch (Exception e) {
6469
listener.onFailure(e);
6570
}

core/src/main/java/org/opensearch/sql/executor/execution/QueryPlan.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -24,14 +24,14 @@ public class QueryPlan extends AbstractPlan {
2424
/**
2525
* The query plan ast.
2626
*/
27-
private final UnresolvedPlan plan;
27+
protected final UnresolvedPlan plan;
2828

2929
/**
3030
* Query service.
3131
*/
32-
private final QueryService queryService;
32+
protected final QueryService queryService;
3333

34-
private final ResponseListener<ExecutionEngine.QueryResponse> listener;
34+
protected final ResponseListener<ExecutionEngine.QueryResponse> listener;
3535

3636
/** constructor. */
3737
public QueryPlan(
Lines changed: 137 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,137 @@
1+
/*
2+
* Copyright OpenSearch Contributors
3+
* SPDX-License-Identifier: Apache-2.0
4+
*/
5+
6+
package org.opensearch.sql.executor.execution;
7+
8+
import java.time.Duration;
9+
import java.time.Instant;
10+
import java.util.List;
11+
import java.util.concurrent.TimeUnit;
12+
import lombok.RequiredArgsConstructor;
13+
import org.apache.logging.log4j.LogManager;
14+
import org.apache.logging.log4j.Logger;
15+
import org.opensearch.sql.ast.tree.UnresolvedPlan;
16+
import org.opensearch.sql.common.response.ResponseListener;
17+
import org.opensearch.sql.executor.ExecutionEngine;
18+
import org.opensearch.sql.executor.QueryId;
19+
import org.opensearch.sql.executor.QueryService;
20+
import org.opensearch.sql.executor.streaming.DefaultMetadataLog;
21+
import org.opensearch.sql.executor.streaming.MicroBatchStreamingExecution;
22+
import org.opensearch.sql.executor.streaming.StreamingSource;
23+
import org.opensearch.sql.planner.logical.LogicalPlan;
24+
import org.opensearch.sql.planner.logical.LogicalPlanNodeVisitor;
25+
import org.opensearch.sql.planner.logical.LogicalRelation;
26+
27+
/**
28+
* Streaming Query Plan.
29+
*/
30+
public class StreamingQueryPlan extends QueryPlan {
31+
32+
private static final Logger log = LogManager.getLogger(StreamingQueryPlan.class);
33+
34+
private final ExecutionStrategy executionStrategy;
35+
36+
private MicroBatchStreamingExecution streamingExecution;
37+
38+
/**
39+
* constructor.
40+
*/
41+
public StreamingQueryPlan(QueryId queryId,
42+
UnresolvedPlan plan,
43+
QueryService queryService,
44+
ResponseListener<ExecutionEngine.QueryResponse> listener,
45+
ExecutionStrategy executionStrategy) {
46+
super(queryId, plan, queryService, listener);
47+
48+
this.executionStrategy = executionStrategy;
49+
}
50+
51+
@Override
52+
public void execute() {
53+
try {
54+
LogicalPlan logicalPlan = queryService.analyze(plan);
55+
StreamingSource streamingSource = buildStreamingSource(logicalPlan);
56+
streamingExecution =
57+
new MicroBatchStreamingExecution(
58+
streamingSource,
59+
logicalPlan,
60+
queryService,
61+
new DefaultMetadataLog<>(),
62+
new DefaultMetadataLog<>());
63+
executionStrategy.execute(streamingExecution::execute);
64+
} catch (UnsupportedOperationException | IllegalArgumentException e) {
65+
listener.onFailure(e);
66+
} catch (InterruptedException e) {
67+
log.error(e);
68+
// todo, update async task status.
69+
}
70+
}
71+
72+
interface ExecutionStrategy {
73+
/**
74+
* execute task.
75+
*/
76+
void execute(Runnable task) throws InterruptedException;
77+
}
78+
79+
/**
80+
* execute task with fixed interval.
81+
* if task run time < interval, trigger next task on next interval.
82+
* if task run time >= interval, trigger next task immediately.
83+
*/
84+
@RequiredArgsConstructor
85+
public static class IntervalTriggerExecution implements ExecutionStrategy {
86+
87+
private final long intervalInSeconds;
88+
89+
@Override
90+
public void execute(Runnable runnable) throws InterruptedException {
91+
while (!Thread.currentThread().isInterrupted()) {
92+
try {
93+
Instant start = Instant.now();
94+
runnable.run();
95+
Instant end = Instant.now();
96+
long took = Duration.between(start, end).toSeconds();
97+
TimeUnit.SECONDS.sleep(intervalInSeconds > took ? intervalInSeconds - took : 0);
98+
} catch (InterruptedException e) {
99+
Thread.currentThread().interrupt();
100+
}
101+
}
102+
}
103+
}
104+
105+
private StreamingSource buildStreamingSource(LogicalPlan logicalPlan) {
106+
return logicalPlan.accept(new StreamingSourceBuilder(), null);
107+
}
108+
109+
static class StreamingSourceBuilder extends LogicalPlanNodeVisitor<StreamingSource, Void> {
110+
@Override
111+
public StreamingSource visitNode(LogicalPlan plan, Void context) {
112+
List<LogicalPlan> children = plan.getChild();
113+
if (children.isEmpty()) {
114+
String errorMsg =
115+
String.format(
116+
"Could find relation plan, %s does not have child node.",
117+
plan.getClass().getSimpleName());
118+
log.error(errorMsg);
119+
throw new IllegalArgumentException(errorMsg);
120+
}
121+
return children.get(0).accept(this, context);
122+
}
123+
124+
@Override
125+
public StreamingSource visitRelation(LogicalRelation plan, Void context) {
126+
try {
127+
return plan.getTable().asStreamingSource();
128+
} catch (UnsupportedOperationException e) {
129+
String errorMsg =
130+
String.format(
131+
"table %s could not been used as streaming source.", plan.getRelationName());
132+
log.error(errorMsg);
133+
throw new UnsupportedOperationException(errorMsg);
134+
}
135+
}
136+
}
137+
}

0 commit comments

Comments
 (0)