[timeseries] Part-2: Enable Streaming Response for Time Series
ankitsultana committed Dec 3, 2024
1 parent d45f55f commit 00a5aff
Showing 7 changed files with 42 additions and 37 deletions.
pinot-common/src/main/proto/worker.proto (5 changes: 3 additions & 2 deletions)
@@ -25,7 +25,7 @@ service PinotQueryWorker {
// Dispatch a QueryRequest to a PinotQueryWorker
rpc Submit(QueryRequest) returns (QueryResponse);

- rpc SubmitTimeSeries(TimeSeriesQueryRequest) returns (TimeSeriesResponse);
+ rpc SubmitTimeSeries(TimeSeriesQueryRequest) returns (stream TimeSeriesResponse);

rpc Cancel(CancelRequest) returns (CancelResponse);

@@ -54,7 +54,8 @@ message QueryResponse {
}

message TimeSeriesQueryRequest {
- bytes dispatchPlan = 1;
+ // List of plan sub-trees which will be executed in order on the server
+ repeated string dispatchPlan = 1;
map<string, string> metadata = 2;
}

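Changing the return type to stream TimeSeriesResponse turns this RPC from unary into gRPC server-side streaming, so the worker can push partial results as they become available and close the stream when done. A minimal sketch of a handler against the regenerated stub, assuming the standard protoc-gen-grpc-java naming (PinotQueryWorkerGrpc.PinotQueryWorkerImplBase), a java package inferred from the Worker.* usages in this diff, and a hypothetical computeBlocks() helper:

    import io.grpc.stub.StreamObserver;
    import org.apache.pinot.common.proto.PinotQueryWorkerGrpc;
    import org.apache.pinot.common.proto.Worker;

    public class StreamingTimeSeriesHandlerSketch extends PinotQueryWorkerGrpc.PinotQueryWorkerImplBase {
      @Override
      public void submitTimeSeries(Worker.TimeSeriesQueryRequest request,
          StreamObserver<Worker.TimeSeriesResponse> responseObserver) {
        try {
          // A server-streaming RPC may call onNext() any number of times...
          for (Worker.TimeSeriesResponse block : computeBlocks(request)) {
            responseObserver.onNext(block);
          }
          // ...and must end with exactly one onCompleted() or onError().
          responseObserver.onCompleted();
        } catch (Exception e) {
          responseObserver.onError(e);
        }
      }

      // Hypothetical stand-in for the actual plan execution.
      private Iterable<Worker.TimeSeriesResponse> computeBlocks(Worker.TimeSeriesQueryRequest request) {
        return java.util.Collections.emptyList();
      }
    }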
@@ -67,7 +67,7 @@
import org.apache.pinot.query.runtime.plan.pipeline.PipelineBreakerExecutor;
import org.apache.pinot.query.runtime.plan.pipeline.PipelineBreakerResult;
import org.apache.pinot.query.runtime.plan.server.ServerPlanRequestUtils;
- import org.apache.pinot.query.runtime.timeseries.PhysicalTimeSeriesPlanVisitor;
+ import org.apache.pinot.query.runtime.timeseries.PhysicalTimeSeriesServerPlanVisitor;
import org.apache.pinot.query.runtime.timeseries.TimeSeriesExecutionContext;
import org.apache.pinot.spi.accounting.ThreadExecutionContext;
import org.apache.pinot.spi.env.PinotConfiguration;
@@ -115,6 +115,8 @@ public class QueryRunner {
private Integer _maxRowsInJoin;
@Nullable
private JoinOverFlowMode _joinOverflowMode;
+ @Nullable
+ private PhysicalTimeSeriesServerPlanVisitor _timeSeriesPhysicalPlanVisitor;

/**
* Initializes the query executor.
@@ -158,7 +160,7 @@ public void init(PinotConfiguration config, InstanceDataManager instanceDataManager
throw new RuntimeException(e);
}
if (StringUtils.isNotBlank(config.getProperty(PinotTimeSeriesConfiguration.getEnabledLanguagesConfigKey()))) {
- PhysicalTimeSeriesPlanVisitor.INSTANCE.init(_leafQueryExecutor, _executorService, serverMetrics);
+ _timeSeriesPhysicalPlanVisitor = new PhysicalTimeSeriesServerPlanVisitor(_leafQueryExecutor, _executorService, serverMetrics);
TimeSeriesBuilderFactoryProvider.init(config);
}

@@ -261,15 +263,15 @@ public void processTimeSeriesQuery(String serializedPlan, Map<String, String> metadata
responseObserver.onCompleted();
};
try {
- final long timeoutMs = extractTimeoutMs(metadata);
- Preconditions.checkState(timeoutMs > 0,
-     "Query timed out before getting processed in server. Remaining time: %s", timeoutMs);
+ final long deadlineMs = extractDeadlineMs(metadata);
+ Preconditions.checkState(deadlineMs > 0,
+     "Query timed out before getting processed in server. Remaining time: %s", deadlineMs);
// Deserialize plan, and compile to create a tree of operators.
BaseTimeSeriesPlanNode rootNode = TimeSeriesPlanSerde.deserialize(serializedPlan);
TimeSeriesExecutionContext context = new TimeSeriesExecutionContext(
metadata.get(WorkerRequestMetadataKeys.LANGUAGE), extractTimeBuckets(metadata),
-     extractPlanToSegmentMap(metadata), timeoutMs, metadata);
- BaseTimeSeriesOperator operator = PhysicalTimeSeriesPlanVisitor.INSTANCE.compile(rootNode, context);
+     extractPlanToSegmentMap(metadata), deadlineMs, metadata);
+ BaseTimeSeriesOperator operator = _timeSeriesPhysicalPlanVisitor.compile(rootNode, context);
// Run the operator using the same executor service as OpChainSchedulerService
_executorService.submit(() -> {
try {
@@ -403,9 +405,8 @@ private PlanNode substituteNode(PlanNode node, Map<PlanNode, ? extends PlanNode>

// Time series related utility methods below

- private long extractTimeoutMs(Map<String, String> metadataMap) {
-   long deadlineMs = Long.parseLong(metadataMap.get(WorkerRequestMetadataKeys.DEADLINE_MS));
-   return deadlineMs - System.currentTimeMillis();
+ private long extractDeadlineMs(Map<String, String> metadataMap) {
+   return Long.parseLong(metadataMap.get(WorkerRequestMetadataKeys.DEADLINE_MS));
}

private TimeBuckets extractTimeBuckets(Map<String, String> metadataMap) {
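The rename from extractTimeoutMs to extractDeadlineMs reflects the broader shift in this commit: the broker stamps one absolute wall-clock deadline into the request metadata, and each consumer derives its remaining budget at the moment it needs it, instead of forwarding a relative timeout that goes stale. A self-contained sketch of that hand-off; the DEADLINE_MS literal below is an illustrative stand-in for WorkerRequestMetadataKeys.DEADLINE_MS:

    import java.util.HashMap;
    import java.util.Map;

    public class DeadlineHandOffSketch {
      // Illustrative stand-in for WorkerRequestMetadataKeys.DEADLINE_MS.
      private static final String DEADLINE_MS = "deadlineMs";

      public static void main(String[] args) {
        Map<String, String> metadata = new HashMap<>();

        // Broker side: convert the caller's relative timeout into an absolute deadline once.
        long timeoutMs = 10_000L;
        metadata.put(DEADLINE_MS, Long.toString(System.currentTimeMillis() + timeoutMs));

        // Server side, any amount of time later: derive the remaining budget on demand.
        long deadlineMs = Long.parseLong(metadata.get(DEADLINE_MS));
        long remainingMs = deadlineMs - System.currentTimeMillis(); // shrinks as the query ages
        System.out.println("remaining budget: " + remainingMs + " ms");
      }
    }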
@@ -42,17 +42,13 @@
import org.apache.pinot.tsdb.spi.plan.LeafTimeSeriesPlanNode;


- public class PhysicalTimeSeriesPlanVisitor {
-   public static final PhysicalTimeSeriesPlanVisitor INSTANCE = new PhysicalTimeSeriesPlanVisitor();
-
+ public class PhysicalTimeSeriesServerPlanVisitor {
private QueryExecutor _queryExecutor;
private ExecutorService _executorService;
private ServerMetrics _serverMetrics;

- private PhysicalTimeSeriesPlanVisitor() {
- }
-
- public void init(QueryExecutor queryExecutor, ExecutorService executorService, ServerMetrics serverMetrics) {
+ // Warning: Don't use singleton access pattern, since Quickstarts run in a single JVM and spawn multiple broker/server
+ public PhysicalTimeSeriesServerPlanVisitor(QueryExecutor queryExecutor, ExecutorService executorService, ServerMetrics serverMetrics) {
_queryExecutor = queryExecutor;
_executorService = executorService;
_serverMetrics = serverMetrics;
@@ -103,7 +99,7 @@ QueryContext compileQueryContext(LeafTimeSeriesPlanNode leafNode, TimeSeriesExecutionContext
.setFilter(filterContext)
.setGroupByExpressions(groupByExpressions)
.setSelectExpressions(Collections.emptyList())
-     .setQueryOptions(ImmutableMap.of(QueryOptionKey.TIMEOUT_MS, Long.toString(context.getTimeoutMs())))
+     .setQueryOptions(ImmutableMap.of(QueryOptionKey.TIMEOUT_MS, Long.toString(context.getRemainingTimeMs())))
.setAliasList(Collections.emptyList())
.setTimeSeriesContext(timeSeriesContext)
.setLimit(Integer.MAX_VALUE)
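The move from the INSTANCE singleton to a public constructor matters most for Quickstarts, which, as the new code comment warns, run several brokers and servers inside one JVM: with a singleton, the last init() call would silently rebind every server to a single QueryExecutor. A short sketch of the per-instance wiring this enables, with the executor and metrics variables assumed to come from each server's own init path:

    // Each server instance now owns an independent visitor; under the old singleton,
    // initializing visitorB would have clobbered visitorA's state.
    PhysicalTimeSeriesServerPlanVisitor visitorA =
        new PhysicalTimeSeriesServerPlanVisitor(queryExecutorA, executorServiceA, serverMetricsA);
    PhysicalTimeSeriesServerPlanVisitor visitorB =
        new PhysicalTimeSeriesServerPlanVisitor(queryExecutorB, executorServiceB, serverMetricsB);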
@@ -27,15 +27,15 @@ public class TimeSeriesExecutionContext {
private final String _language;
private final TimeBuckets _initialTimeBuckets;
private final Map<String, List<String>> _planIdToSegmentsMap;
- private final long _timeoutMs;
+ private final long _deadlineMs;
private final Map<String, String> _metadataMap;

public TimeSeriesExecutionContext(String language, TimeBuckets initialTimeBuckets,
-     Map<String, List<String>> planIdToSegmentsMap, long timeoutMs, Map<String, String> metadataMap) {
+     Map<String, List<String>> planIdToSegmentsMap, long deadlineMs, Map<String, String> metadataMap) {
_language = language;
_initialTimeBuckets = initialTimeBuckets;
_planIdToSegmentsMap = planIdToSegmentsMap;
- _timeoutMs = timeoutMs;
+ _deadlineMs = deadlineMs;
_metadataMap = metadataMap;
}

@@ -51,8 +51,8 @@ public Map<String, List<String>> getPlanIdToSegmentsMap() {
return _planIdToSegmentsMap;
}

- public long getTimeoutMs() {
-   return _timeoutMs;
+ public long getRemainingTimeMs() {
+   return _deadlineMs - System.currentTimeMillis();
}

public Map<String, String> getMetadataMap() {
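With the context holding an absolute deadline, getRemainingTimeMs() naturally returns a smaller value the later it is called, so downstream stages see a budget that accounts for time already spent. An illustrative fragment, reusing the TimeBuckets arguments from the test at the end of this diff:

    TimeSeriesExecutionContext context = new TimeSeriesExecutionContext(
        "m3ql", TimeBuckets.ofSeconds(1000L, Duration.ofSeconds(10), 100),
        Collections.emptyMap(), System.currentTimeMillis() + 10_000L, Collections.emptyMap());
    long budgetAtPlanning = context.getRemainingTimeMs();   // close to 10_000
    // ... plan compilation, queueing, segment execution ...
    long budgetAtExecution = context.getRemainingTimeMs();  // smaller, and <= 0 once the deadline passes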
@@ -24,7 +24,6 @@
import com.google.protobuf.ByteString;
import com.google.protobuf.InvalidProtocolBufferException;
import io.grpc.Deadline;
- import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
@@ -291,7 +290,7 @@ void submit(long requestId, TimeSeriesDispatchablePlan plan, long timeoutMs, Map
long deadlineMs = System.currentTimeMillis() + timeoutMs;
String serializedPlan = plan.getSerializedPlan();
Worker.TimeSeriesQueryRequest request = Worker.TimeSeriesQueryRequest.newBuilder()
-     .setDispatchPlan(ByteString.copyFrom(serializedPlan, StandardCharsets.UTF_8))
+     .addDispatchPlan(serializedPlan)
.putAllMetadata(initializeTimeSeriesMetadataMap(plan, deadlineMs, requestContext))
.putMetadata(CommonConstants.Query.Request.MetadataKeys.REQUEST_ID, Long.toString(requestId))
.build();
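On the dispatch side the response now arrives as a stream. A hedged sketch of collecting the streamed blocks with the async stub; the Deadline class is already imported in this file, while the stub, channel, request, and future wiring below are illustrative rather than the exact dispatcher code:

    PinotQueryWorkerGrpc.PinotQueryWorkerStub stub = PinotQueryWorkerGrpc.newStub(channel)
        .withDeadline(Deadline.after(deadlineMs - System.currentTimeMillis(), TimeUnit.MILLISECONDS));
    List<Worker.TimeSeriesResponse> blocks = new ArrayList<>();
    CompletableFuture<List<Worker.TimeSeriesResponse>> result = new CompletableFuture<>();
    stub.submitTimeSeries(request, new StreamObserver<Worker.TimeSeriesResponse>() {
      @Override
      public void onNext(Worker.TimeSeriesResponse block) {
        blocks.add(block); // one or more partial results per query
      }
      @Override
      public void onError(Throwable t) {
        result.completeExceptionally(t);
      }
      @Override
      public void onCompleted() {
        result.complete(blocks);
      }
    });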
@@ -222,8 +222,8 @@ public void explain(Worker.QueryRequest request, StreamObserver<Worker.ExplainResponse>
@Override
public void submitTimeSeries(Worker.TimeSeriesQueryRequest request,
StreamObserver<Worker.TimeSeriesResponse> responseObserver) {
- ByteString bytes = request.getDispatchPlan();
- _queryRunner.processTimeSeriesQuery(bytes.toStringUtf8(), request.getMetadataMap(), responseObserver);
+ String dispatchPlan = request.getDispatchPlan(0);
+ _queryRunner.processTimeSeriesQuery(dispatchPlan, request.getMetadataMap(), responseObserver);
}

@Override
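Because dispatchPlan is now a repeated string field, the generated protobuf-java accessors change from a single getDispatchPlan() to indexed and count-based variants, which is why the handler above reads getDispatchPlan(0) for the first (currently only) plan sub-tree. For reference, the standard generated API, with hypothetical serializedPlanA/serializedPlanB payloads:

    Worker.TimeSeriesQueryRequest request = Worker.TimeSeriesQueryRequest.newBuilder()
        .addDispatchPlan(serializedPlanA)             // appends one serialized plan sub-tree
        .addDispatchPlan(serializedPlanB)
        .build();
    int planCount = request.getDispatchPlanCount();   // 2
    String firstPlan = request.getDispatchPlan(0);    // serializedPlanA
    List<String> allPlans = request.getDispatchPlanList();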
@@ -20,20 +20,26 @@

import java.time.Duration;
import java.util.Collections;
+ import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
+ import org.apache.commons.lang3.StringUtils;
+ import org.apache.pinot.common.metrics.ServerMetrics;
+ import org.apache.pinot.core.query.executor.QueryExecutor;
import org.apache.pinot.core.query.request.context.QueryContext;
import org.apache.pinot.spi.utils.CommonConstants.Broker.Request.QueryOptionKey;
import org.apache.pinot.tsdb.spi.AggInfo;
import org.apache.pinot.tsdb.spi.TimeBuckets;
import org.apache.pinot.tsdb.spi.plan.LeafTimeSeriesPlanNode;
import org.testng.annotations.Test;

+ import static org.mockito.Mockito.mock;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertNotNull;
+ import static org.testng.Assert.assertTrue;


- public class PhysicalTimeSeriesPlanVisitorTest {
-   private static final int DUMMY_TIMEOUT_MS = 10_000;
+ public class PhysicalTimeSeriesServerPlanVisitorTest {
+   private static final int DUMMY_DEADLINE_MS = 10_000;

@Test
public void testCompileQueryContext() {
@@ -42,33 +42,35 @@ public void testCompileQueryContext() {
final String timeColumn = "orderTime";
final AggInfo aggInfo = new AggInfo("SUM", null);
final String filterExpr = "cityName = 'Chicago'";
+ PhysicalTimeSeriesServerPlanVisitor serverPlanVisitor = new PhysicalTimeSeriesServerPlanVisitor(
+     mock(QueryExecutor.class), mock(ExecutorService.class), mock(ServerMetrics.class));
// Case-1: Without offset, simple column based group-by expression, simple column based value, and non-empty filter.
{
TimeSeriesExecutionContext context =
new TimeSeriesExecutionContext("m3ql", TimeBuckets.ofSeconds(1000L, Duration.ofSeconds(10), 100),
-     Collections.emptyMap(), DUMMY_TIMEOUT_MS, Collections.emptyMap());
+     Collections.emptyMap(), DUMMY_DEADLINE_MS, Collections.emptyMap());
LeafTimeSeriesPlanNode leafNode =
new LeafTimeSeriesPlanNode(planId, Collections.emptyList(), tableName, timeColumn, TimeUnit.SECONDS, 0L,
filterExpr, "orderCount", aggInfo, Collections.singletonList("cityName"));
- QueryContext queryContext = PhysicalTimeSeriesPlanVisitor.INSTANCE.compileQueryContext(leafNode, context);
+ QueryContext queryContext = serverPlanVisitor.compileQueryContext(leafNode, context);
assertNotNull(queryContext.getTimeSeriesContext());
assertEquals(queryContext.getTimeSeriesContext().getLanguage(), "m3ql");
assertEquals(queryContext.getTimeSeriesContext().getOffsetSeconds(), 0L);
assertEquals(queryContext.getTimeSeriesContext().getTimeColumn(), timeColumn);
assertEquals(queryContext.getTimeSeriesContext().getValueExpression().getIdentifier(), "orderCount");
assertEquals(queryContext.getFilter().toString(),
"(cityName = 'Chicago' AND orderTime > '990' AND orderTime <= '1990')");
- assertEquals(Long.parseLong(queryContext.getQueryOptions().get(QueryOptionKey.TIMEOUT_MS)), DUMMY_TIMEOUT_MS);
+ assertTrue(StringUtils.isNumeric(queryContext.getQueryOptions().get(QueryOptionKey.TIMEOUT_MS)));
}
// Case-2: With offset, complex group-by expression, complex value, and non-empty filter
{
TimeSeriesExecutionContext context =
new TimeSeriesExecutionContext("m3ql", TimeBuckets.ofSeconds(1000L, Duration.ofSeconds(10), 100),
-     Collections.emptyMap(), DUMMY_TIMEOUT_MS, Collections.emptyMap());
+     Collections.emptyMap(), DUMMY_DEADLINE_MS, Collections.emptyMap());
LeafTimeSeriesPlanNode leafNode =
new LeafTimeSeriesPlanNode(planId, Collections.emptyList(), tableName, timeColumn, TimeUnit.SECONDS, 10L,
filterExpr, "orderCount*2", aggInfo, Collections.singletonList("concat(cityName, stateName, '-')"));
- QueryContext queryContext = PhysicalTimeSeriesPlanVisitor.INSTANCE.compileQueryContext(leafNode, context);
+ QueryContext queryContext = serverPlanVisitor.compileQueryContext(leafNode, context);
assertNotNull(queryContext);
assertNotNull(queryContext.getGroupByExpressions());
assertEquals("concat(cityName,stateName,'-')", queryContext.getGroupByExpressions().get(0).toString());
Expand All @@ -80,7 +88,7 @@ public void testCompileQueryContext() {
assertNotNull(queryContext.getFilter());
assertEquals(queryContext.getFilter().toString(),
"(cityName = 'Chicago' AND orderTime > '980' AND orderTime <= '1980')");
- assertEquals(Long.parseLong(queryContext.getQueryOptions().get(QueryOptionKey.TIMEOUT_MS)), DUMMY_TIMEOUT_MS);
+ assertTrue(StringUtils.isNumeric(queryContext.getQueryOptions().get(QueryOptionKey.TIMEOUT_MS)));
}
}
}
