Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import org.apache.pinot.common.request.ExpressionType;
import org.apache.pinot.common.request.Function;
import org.apache.pinot.common.request.Literal;
import org.apache.pinot.common.request.context.predicate.ConstantPredicate;
import org.apache.pinot.common.request.context.predicate.EqPredicate;
import org.apache.pinot.common.request.context.predicate.InPredicate;
import org.apache.pinot.common.request.context.predicate.IsNotNullPredicate;
Expand Down Expand Up @@ -216,6 +217,9 @@ public static FilterContext getFilter(Function thriftFunction) {
case IS_NOT_NULL:
return new FilterContext(FilterContext.Type.PREDICATE, null,
new IsNotNullPredicate(getExpression(operands.get(0))));
case CONSTANT:
return new FilterContext(FilterContext.Type.PREDICATE, null,
new ConstantPredicate(getExpression(operands.get(0)), getStringValue(operands.get(1))));
default:
throw new IllegalStateException();
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pinot.common.request.context.predicate;

import org.apache.pinot.common.request.context.ExpressionContext;
import org.apache.pinot.spi.utils.BooleanUtils;


public class ConstantPredicate extends BasePredicate {
private final boolean _constValue;

public ConstantPredicate(ExpressionContext identifier, String constant) {
super(identifier);
_constValue = BooleanUtils.toBoolean(constant);
}

@Override
public Type getType() {
return Type.CONSTANT;
}

public boolean getConstValue() {
return _constValue;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,8 @@ enum Type {
TEXT_MATCH,
JSON_MATCH,
IS_NULL,
IS_NOT_NULL(true);
IS_NOT_NULL(true),
CONSTANT;

private final boolean _exclusive;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,8 @@ public enum FilterKind {
TEXT_MATCH,
JSON_MATCH,
IS_NULL,
IS_NOT_NULL;
IS_NOT_NULL,
CONSTANT;

/**
* Helper method that returns true if the enum maps to a Range.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,8 @@ public class FunctionDefinitionRegistryTest {
"arraylength", "arrayaverage", "arraymin", "arraymax", "arraysum", "clpdecode", "groovy", "inidset",
"jsonextractscalar", "jsonextractindex", "jsonextractkey", "lookup", "mapvalue", "timeconvert", "valuein",
// functions not needed for register b/c they are in std sql table or they will not be composed directly.
"in", "not_in", "and", "or", "range", "extract", "is_true", "is_not_true", "is_false", "is_not_false"
"in", "not_in", "and", "or", "range", "extract", "is_true", "is_not_true", "is_false", "is_not_false",
"constant"
);

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import org.apache.pinot.common.request.context.ExpressionContext;
import org.apache.pinot.common.request.context.FilterContext;
import org.apache.pinot.common.request.context.FunctionContext;
import org.apache.pinot.common.request.context.predicate.ConstantPredicate;
import org.apache.pinot.common.request.context.predicate.JsonMatchPredicate;
import org.apache.pinot.common.request.context.predicate.Predicate;
import org.apache.pinot.common.request.context.predicate.RegexpLikePredicate;
Expand Down Expand Up @@ -244,6 +245,12 @@ private BaseFilterOperator constructPhysicalOperator(FilterContext filter, int n
// TODO: ExpressionFilterOperator does not support predicate types without PredicateEvaluator (TEXT_MATCH)
return new ExpressionFilterOperator(_indexSegment, _queryContext, predicate, numDocs);
}
} else if (predicate.getType() == Predicate.Type.CONSTANT) {
if (((ConstantPredicate) predicate).getConstValue()) {
return new MatchAllFilterOperator(numDocs);
} else {
return EmptyFilterOperator.getInstance();
}
} else {
String column = lhs.getIdentifier();
DataSource dataSource = _indexSegment.getDataSource(column);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,8 @@ Expression optimizeChild(Expression filterExpression, @Nullable Schema schema) {
switch (kind) {
case IS_NULL:
case IS_NOT_NULL:
// No need to try to optimize IS_NULL and IS_NOT_NULL operations on numerical columns.
case CONSTANT:
// No need to try to optimize CONSTANT, IS_NULL, IS_NOT_NULL operations on numerical columns.
break;
default:
List<Expression> operands = function.getOperands();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@
package org.apache.pinot.query.runtime;

import com.google.common.annotations.VisibleForTesting;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
Expand All @@ -32,24 +31,19 @@
import org.apache.pinot.core.data.manager.InstanceDataManager;
import org.apache.pinot.core.query.executor.QueryExecutor;
import org.apache.pinot.core.query.executor.ServerQueryExecutorV1Impl;
import org.apache.pinot.core.query.request.ServerQueryRequest;
import org.apache.pinot.query.mailbox.MailboxIdUtils;
import org.apache.pinot.query.mailbox.MailboxService;
import org.apache.pinot.query.planner.plannode.MailboxSendNode;
import org.apache.pinot.query.routing.MailboxMetadata;
import org.apache.pinot.query.runtime.blocks.TransferableBlock;
import org.apache.pinot.query.runtime.executor.ExecutorServiceUtils;
import org.apache.pinot.query.runtime.executor.OpChainSchedulerService;
import org.apache.pinot.query.runtime.operator.LeafStageTransferableBlockOperator;
import org.apache.pinot.query.runtime.operator.MailboxSendOperator;
import org.apache.pinot.query.runtime.operator.MultiStageOperator;
import org.apache.pinot.query.runtime.operator.OpChain;
import org.apache.pinot.query.runtime.plan.DistributedStagePlan;
import org.apache.pinot.query.runtime.plan.OpChainExecutionContext;
import org.apache.pinot.query.runtime.plan.PhysicalPlanVisitor;
import org.apache.pinot.query.runtime.plan.pipeline.PipelineBreakerExecutor;
import org.apache.pinot.query.runtime.plan.pipeline.PipelineBreakerResult;
import org.apache.pinot.query.runtime.plan.server.ServerPlanRequestContext;
import org.apache.pinot.query.runtime.plan.server.ServerPlanRequestUtils;
import org.apache.pinot.spi.env.PinotConfiguration;
import org.apache.pinot.spi.utils.CommonConstants;
Expand Down Expand Up @@ -193,7 +187,8 @@ public void processQuery(DistributedStagePlan distributedStagePlan, Map<String,
pipelineBreakerResult);
OpChain opChain;
if (DistributedStagePlan.isLeafStage(distributedStagePlan)) {
opChain = compileLeafStage(executionContext, distributedStagePlan);
opChain = ServerPlanRequestUtils.compileLeafStage(executionContext, distributedStagePlan, _helixManager,
_serverMetrics, _leafQueryExecutor, _executorService);
} else {
opChain = PhysicalPlanVisitor.walkPlanNode(distributedStagePlan.getStageRoot(), executionContext);
}
Expand Down Expand Up @@ -246,26 +241,4 @@ private Map<String, String> consolidateMetadata(Map<String, String> customProper
public void cancel(long requestId) {
_opChainScheduler.cancel(requestId);
}

private OpChain compileLeafStage(OpChainExecutionContext executionContext,
DistributedStagePlan distributedStagePlan) {
List<ServerPlanRequestContext> serverPlanRequestContexts =
ServerPlanRequestUtils.constructServerQueryRequests(executionContext, distributedStagePlan,
_helixManager.getHelixPropertyStore());
List<ServerQueryRequest> serverQueryRequests = new ArrayList<>(serverPlanRequestContexts.size());
long queryArrivalTimeMs = System.currentTimeMillis();
for (ServerPlanRequestContext requestContext : serverPlanRequestContexts) {
serverQueryRequests.add(
new ServerQueryRequest(requestContext.getInstanceRequest(), _serverMetrics, queryArrivalTimeMs, true));
}
MailboxSendNode sendNode = (MailboxSendNode) distributedStagePlan.getStageRoot();
MultiStageOperator leafStageOperator =
new LeafStageTransferableBlockOperator(executionContext, serverQueryRequests, sendNode.getDataSchema(),
_leafQueryExecutor, _executorService);
MailboxSendOperator mailboxSendOperator =
new MailboxSendOperator(executionContext, leafStageOperator, sendNode.getDistributionType(),
sendNode.getDistributionKeys(), sendNode.getCollationKeys(), sendNode.getCollationDirections(),
sendNode.isSortOnSender(), sendNode.getReceiverStageId());
return new OpChain(executionContext, mailboxSendOperator);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,20 @@ public class OpChainExecutionContext {
private final PipelineBreakerResult _pipelineBreakerResult;
private final boolean _traceEnabled;

protected OpChainExecutionContext(OpChainExecutionContext that) {
_mailboxService = that._mailboxService;
_requestId = that._requestId;
_stageId = that._stageId;
_server = that._server;
_deadlineMs = that._deadlineMs;
_opChainMetadata = that._opChainMetadata;
_stageMetadata = that._stageMetadata;
_id = that._id;
_stats = that._stats;
_pipelineBreakerResult = that._pipelineBreakerResult;
_traceEnabled = that._traceEnabled;
}

public OpChainExecutionContext(MailboxService mailboxService, long requestId, int stageId,
VirtualServerAddress server, long deadlineMs, Map<String, String> opChainMetadata, StageMetadata stageMetadata,
PipelineBreakerResult pipelineBreakerResult) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pinot.query.runtime.plan.server;

import java.util.List;
import java.util.concurrent.ExecutorService;
import org.apache.pinot.common.request.PinotQuery;
import org.apache.pinot.core.query.executor.QueryExecutor;
import org.apache.pinot.core.query.request.ServerQueryRequest;
import org.apache.pinot.query.planner.plannode.PlanNode;
import org.apache.pinot.query.runtime.plan.DistributedStagePlan;
import org.apache.pinot.query.runtime.plan.OpChainExecutionContext;


/**
* Extension of the {@link OpChainExecutionContext} for {@link DistributedStagePlan} runs on leaf-stage.
*
* The difference is that: on a leaf-stage server node, {@link PlanNode} are split into {@link PinotQuery} part and
* {@link org.apache.pinot.query.runtime.operator.OpChain} part and are connected together in this context.
*/
public class ServerOpChainExecutionContext extends OpChainExecutionContext {
private final DistributedStagePlan _stagePlan;
private final QueryExecutor _queryExecutor;
private final ExecutorService _executorService;

private final PinotQuery _pinotQuery;
private PlanNode _leafStageBoundaryNode;
private List<ServerQueryRequest> _serverQueryRequests;

public ServerOpChainExecutionContext(OpChainExecutionContext executionContext, DistributedStagePlan stagePlan,
QueryExecutor leafQueryExecutor, ExecutorService executorService) {
super(executionContext);
_stagePlan = stagePlan;
_queryExecutor = leafQueryExecutor;
_executorService = executorService;
_pinotQuery = new PinotQuery();
}

public DistributedStagePlan getStagePlan() {
return _stagePlan;
}

public QueryExecutor getQueryExecutor() {
return _queryExecutor;
}

public ExecutorService getExecutorService() {
return _executorService;
}

public PinotQuery getPinotQuery() {
return _pinotQuery;
}

public PlanNode getLeafStageBoundaryNode() {
return _leafStageBoundaryNode;
}

public void setLeafStageBoundaryNode(PlanNode leafStageBoundaryNode) {
_leafStageBoundaryNode = leafStageBoundaryNode;
}

public List<ServerQueryRequest> getServerQueryRequests() {
return _serverQueryRequests;
}

public void setServerQueryRequests(List<ServerQueryRequest> serverQueryRequests) {
_serverQueryRequests = serverQueryRequests;
}
}
Loading