Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ public static void setMaxLinesOfStackTrace(int maxLinesOfStackTracePerFrame) {
public static final int SERVER_TABLE_MISSING_ERROR_CODE = 230;
public static final int SERVER_SEGMENT_MISSING_ERROR_CODE = 235;
public static final int QUERY_SCHEDULING_TIMEOUT_ERROR_CODE = 240;
public static final int SERVER_RESOURCE_LIMIT_EXCEEDED_ERROR_CODE = 245;
public static final int EXECUTION_TIMEOUT_ERROR_CODE = 250;
public static final int DATA_TABLE_SERIALIZATION_ERROR_CODE = 260;
public static final int BROKER_GATHER_ERROR_CODE = 300;
Expand Down Expand Up @@ -105,6 +106,8 @@ public static void setMaxLinesOfStackTrace(int maxLinesOfStackTracePerFrame) {
new ProcessingException(SERVER_SEGMENT_MISSING_ERROR_CODE);
public static final ProcessingException QUERY_SCHEDULING_TIMEOUT_ERROR =
new ProcessingException(QUERY_SCHEDULING_TIMEOUT_ERROR_CODE);
public static final ProcessingException SERVER_RESOURCE_LIMIT_EXCEEDED_ERROR =
new ProcessingException(SERVER_RESOURCE_LIMIT_EXCEEDED_ERROR_CODE);
public static final ProcessingException EXECUTION_TIMEOUT_ERROR =
new ProcessingException(EXECUTION_TIMEOUT_ERROR_CODE);
public static final ProcessingException DATA_TABLE_SERIALIZATION_ERROR =
Expand Down Expand Up @@ -147,6 +150,7 @@ public static void setMaxLinesOfStackTrace(int maxLinesOfStackTracePerFrame) {
SERVER_TABLE_MISSING_ERROR.setMessage("ServerTableMissing");
SERVER_SEGMENT_MISSING_ERROR.setMessage("ServerSegmentMissing");
QUERY_SCHEDULING_TIMEOUT_ERROR.setMessage("QuerySchedulingTimeoutError");
SERVER_RESOURCE_LIMIT_EXCEEDED_ERROR.setMessage("ServerResourceLimitExceededError");
EXECUTION_TIMEOUT_ERROR.setMessage("ExecutionTimeoutError");
DATA_TABLE_DESERIALIZATION_ERROR.setMessage("DataTableSerializationError");
BROKER_GATHER_ERROR.setMessage("BrokerGatherError");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,6 @@ public class QueryOptionsUtils {
// Private constructor: utility class exposing only static helpers; not meant to be instantiated.
private QueryOptionsUtils() {
}


private static final Map<String, String> CONFIG_RESOLVER;
private static final RuntimeException CLASS_LOAD_ERROR;

Expand Down Expand Up @@ -189,4 +188,15 @@ public static Integer getGroupTrimThreshold(Map<String, String> queryOptions) {
/**
 * Returns whether the DROP_RESULTS query option is enabled for this query.
 * An absent or non-"true" value yields {@code false}.
 */
public static boolean shouldDropResults(Map<String, String> queryOptions) {
// Boolean.parseBoolean(null) is false, so a missing option means "keep the results".
String dropResults = queryOptions.get(CommonConstants.Broker.Request.QueryOptionKey.DROP_RESULTS);
return Boolean.parseBoolean(dropResults);
}

/**
 * Returns the max-rows-in-join query option, or {@code null} when the option is not set.
 * A non-numeric value propagates a {@link NumberFormatException} to the caller.
 */
@Nullable
public static Integer getMaxRowsInJoin(Map<String, String> queryOptions) {
String rawValue = queryOptions.get(QueryOptionKey.MAX_ROWS_IN_JOIN);
if (rawValue == null) {
return null;
}
return Integer.parseInt(rawValue);
}

/**
 * Returns the join-overflow-mode query option, or {@code null} when the option is not set
 * (meaning the server-level default applies).
 */
@Nullable
public static String getJoinOverflowMode(Map<String, String> queryOptions) {
String joinOverflowMode = queryOptions.get(QueryOptionKey.JOIN_OVERFLOW_MODE);
return joinOverflowMode;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,16 @@ public static class AggregateOptions {

/**
 * Keys for hint options that can be attached to a join node to control join execution.
 */
public static class JoinHintOptions {
// Selects the join strategy. Supported values are not visible here — TODO confirm against the planner.
public static final String JOIN_STRATEGY = "join_strategy";
/**
 * Max number of rows allowed when building the right-table hash collection.
 */
public static final String MAX_ROWS_IN_JOIN = "max_rows_in_join";
/**
 * Behavior when a join overflow happens; supported values: THROW or BREAK.
 * THROW (default): stop building the right table and throw an exception; no JOIN with the left table is performed.
 * BREAK: stop building the right table but continue to perform the JOIN operation; results might be partial.
 */
public static final String JOIN_OVERFLOW_MODE = "join_overflow_mode";
}

public static class TableHintOptions {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -186,7 +186,7 @@ private static PlanNode convertLogicalJoin(LogicalJoin node, int currentStageId)
List<RexExpression> joinClause =
joinInfo.nonEquiConditions.stream().map(RexExpression::toRexExpression).collect(Collectors.toList());
return new JoinNode(currentStageId, toDataSchema(node.getRowType()), toDataSchema(node.getLeft().getRowType()),
toDataSchema(node.getRight().getRowType()), joinType, joinKeys, joinClause);
toDataSchema(node.getRight().getRowType()), joinType, joinKeys, joinClause, node.getHints());
}

private static DataSchema toDataSchema(RelDataType rowType) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,7 @@ public Plan.ObjectField toObjectField() {
public static class NodeHint {
@ProtoProperties
public Map<String, Map<String, String>> _hintOptions;

public NodeHint() {
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import java.util.Arrays;
import java.util.List;
import org.apache.calcite.rel.core.JoinRelType;
import org.apache.calcite.rel.hint.RelHint;
import org.apache.pinot.common.utils.DataSchema;
import org.apache.pinot.query.planner.logical.RexExpression;
import org.apache.pinot.query.planner.partitioning.FieldSelectionKeySelector;
Expand All @@ -36,6 +37,8 @@ public class JoinNode extends AbstractPlanNode {
@ProtoProperties
private List<RexExpression> _joinClause;
@ProtoProperties
private NodeHint _joinHints;
@ProtoProperties
private List<String> _leftColumnNames;
@ProtoProperties
private List<String> _rightColumnNames;
Expand All @@ -45,13 +48,14 @@ public JoinNode(int planFragmentId) {
}

/**
 * Builds a join node for the given plan fragment.
 *
 * @param planFragmentId id of the plan fragment this node belongs to
 * @param dataSchema output schema of the join
 * @param leftSchema schema of the left input; only its column names are captured here
 * @param rightSchema schema of the right input; only its column names are captured here
 * @param joinRelType Calcite join type (inner/left/right/full/semi/anti)
 * @param joinKeys equi-join key selectors
 * @param joinClause non-equi join conditions
 * @param joinHints planner hints attached to the join, wrapped into a {@link NodeHint}
 */
public JoinNode(int planFragmentId, DataSchema dataSchema, DataSchema leftSchema, DataSchema rightSchema,
JoinRelType joinRelType, JoinKeys joinKeys, List<RexExpression> joinClause, List<RelHint> joinHints) {
super(planFragmentId, dataSchema);
_joinRelType = joinRelType;
_joinKeys = joinKeys;
_joinClause = joinClause;
_joinHints = new NodeHint(joinHints);
// Keep only the column names of the two input schemas; full schemas are not stored on the node.
_leftColumnNames = Arrays.asList(leftSchema.getColumnNames());
_rightColumnNames = Arrays.asList(rightSchema.getColumnNames());
}

public JoinRelType getJoinRelType() {
Expand All @@ -66,6 +70,10 @@ public List<RexExpression> getJoinClauses() {
return _joinClause;
}

/**
 * Returns the planner hints attached to this join node.
 */
public NodeHint getJoinHints() {
return _joinHints;
}

/**
 * Returns the column names of the left input schema of the join.
 */
public List<String> getLeftColumnNames() {
return _leftColumnNames;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,11 +25,13 @@
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeoutException;
import javax.annotation.Nullable;
import org.apache.helix.HelixManager;
import org.apache.helix.store.zk.ZkHelixPropertyStore;
import org.apache.helix.zookeeper.datamodel.ZNRecord;
import org.apache.pinot.common.exception.QueryException;
import org.apache.pinot.common.metrics.ServerMetrics;
import org.apache.pinot.common.utils.config.QueryOptionsUtils;
import org.apache.pinot.core.data.manager.InstanceDataManager;
import org.apache.pinot.core.operator.blocks.InstanceResponseBlock;
import org.apache.pinot.core.query.executor.ServerQueryExecutorV1Impl;
Expand Down Expand Up @@ -78,6 +80,12 @@ public class QueryRunner {

private OpChainSchedulerService _scheduler;

// Join Overflow configs
@Nullable
private Integer _maxRowsInJoin;
@Nullable
private String _joinOverflowMode;

/**
* Initializes the query executor.
* <p>Should be called only once and before calling any other method.
Expand All @@ -89,6 +97,11 @@ public void init(PinotConfiguration config, InstanceDataManager instanceDataMana
CommonConstants.Helix.SERVER_INSTANCE_PREFIX_LENGTH) : instanceName;
_port = config.getProperty(QueryConfig.KEY_OF_QUERY_RUNNER_PORT, QueryConfig.DEFAULT_QUERY_RUNNER_PORT);
_helixManager = helixManager;
// Set Join Overflow configs
_joinOverflowMode = config.getProperty(QueryConfig.KEY_OF_JOIN_OVERFLOW_MODE);
_maxRowsInJoin = config.containsKey(QueryConfig.KEY_OF_MAX_ROWS_IN_JOIN) ? Integer.parseInt(
config.getProperty(QueryConfig.KEY_OF_MAX_ROWS_IN_JOIN)) : null;

try {
//TODO: make this configurable
_opChainExecutor = ExecutorServiceUtils.create(config, "pinot.query.runner.opchain",
Expand Down Expand Up @@ -133,6 +146,9 @@ public void processQuery(DistributedStagePlan distributedStagePlan, Map<String,
PipelineBreakerResult pipelineBreakerResult = PipelineBreakerExecutor.executePipelineBreakers(_scheduler,
_mailboxService, distributedStagePlan, deadlineMs, requestId, isTraceEnabled);

// Set Join Overflow configs to StageMetadata from request
setJoinOverflowConfigs(distributedStagePlan, requestMetadataMap);

// run OpChain
if (DistributedStagePlan.isLeafStage(distributedStagePlan)) {
try {
Expand All @@ -157,6 +173,27 @@ public void processQuery(DistributedStagePlan distributedStagePlan, Map<String,
}
}

/**
 * Propagates the join-overflow settings into the stage metadata custom properties so the join
 * operators of this stage can pick them up. A per-query option from the request metadata takes
 * precedence over the server-level default configured at init time; when neither is set, the
 * corresponding property is left untouched.
 */
private void setJoinOverflowConfigs(DistributedStagePlan distributedStagePlan,
Map<String, String> requestMetadataMap) {
// Overflow mode: query option wins over the server default.
String requestMode = QueryOptionsUtils.getJoinOverflowMode(requestMetadataMap);
String effectiveMode = (requestMode != null) ? requestMode : _joinOverflowMode;
if (effectiveMode != null) {
distributedStagePlan.getStageMetadata().getCustomProperties()
.put(CommonConstants.Broker.Request.QueryOptionKey.JOIN_OVERFLOW_MODE, effectiveMode);
}

// Max rows in join: query option wins over the server default.
Integer requestMaxRows = QueryOptionsUtils.getMaxRowsInJoin(requestMetadataMap);
Integer effectiveMaxRows = (requestMaxRows != null) ? requestMaxRows : _maxRowsInJoin;
if (effectiveMaxRows != null) {
distributedStagePlan.getStageMetadata().getCustomProperties()
.put(CommonConstants.Broker.Request.QueryOptionKey.MAX_ROWS_IN_JOIN, String.valueOf(effectiveMaxRows));
}
}

/**
 * Cancels the query with the given request id by cancelling its scheduled operator chains.
 */
public void cancel(long requestId) {
_scheduler.cancel(requestId);
}
Expand Down
Loading