Skip to content

Commit

Permalink
[native] Use row wise encoding for exchanges with more than 1000 streams
Browse files Browse the repository at this point in the history
  • Loading branch information
arhimondr committed Nov 12, 2024
1 parent 5610577 commit 93526d3
Show file tree
Hide file tree
Showing 58 changed files with 926 additions and 165 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -332,6 +332,7 @@ public final class SystemSessionProperties
private static final String NATIVE_EXECUTION_EXECUTABLE_PATH = "native_execution_executable_path";
private static final String NATIVE_EXECUTION_PROGRAM_ARGUMENTS = "native_execution_program_arguments";
public static final String NATIVE_EXECUTION_PROCESS_REUSE_ENABLED = "native_execution_process_reuse_enabled";
public static final String NATIVE_MIN_COLUMNAR_ENCODING_CHANNELS_TO_PREFER_ROW_WISE_ENCODING = "native_min_columnar_encoding_channels_to_prefer_row_wise_encoding";

private final List<PropertyMetadata<?>> sessionProperties;

Expand Down Expand Up @@ -1824,6 +1825,11 @@ public SystemSessionProperties(
booleanProperty(INLINE_PROJECTIONS_ON_VALUES,
"Whether to evaluate project node on values node",
featuresConfig.getInlineProjectionsOnValues(),
false),
integerProperty(
NATIVE_MIN_COLUMNAR_ENCODING_CHANNELS_TO_PREFER_ROW_WISE_ENCODING,
"Minimum number of columnar encoding channels to consider row wise encoding for partitioned exchange. Native execution only",
queryManagerConfig.getMinColumnarEncodingChannelsToPreferRowWiseEncoding(),
false));
}

Expand Down Expand Up @@ -3100,4 +3106,9 @@ public static boolean isInlineProjectionsOnValues(Session session)
{
return session.getSystemProperty(INLINE_PROJECTIONS_ON_VALUES, Boolean.class);
}

/**
 * Reads the {@code native_min_columnar_encoding_channels_to_prefer_row_wise_encoding}
 * system property from the given session.
 *
 * @param session session whose system properties are consulted
 * @return the property value as an {@code int}
 */
public static int getMinColumnarEncodingChannelsToPreferRowWiseEncoding(Session session)
{
    Integer minChannels = session.getSystemProperty(NATIVE_MIN_COLUMNAR_ENCODING_CHANNELS_TO_PREFER_ROW_WISE_ENCODING, Integer.class);
    return minChannels;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,8 @@ public class QueryManagerConfig
private int rateLimiterCacheLimit = 1000;
private int rateLimiterCacheWindowMinutes = 5;

private int minColumnarEncodingChannelsToPreferRowWiseEncoding = 1000;

@Min(1)
public int getScheduleSplitBatchSize()
{
Expand Down Expand Up @@ -738,6 +740,19 @@ public QueryManagerConfig setEnableWorkerIsolation(boolean enableWorkerIsolation
return this;
}

/**
 * Returns the currently configured value of
 * {@code minColumnarEncodingChannelsToPreferRowWiseEncoding}.
 *
 * @return the stored threshold
 */
public int getMinColumnarEncodingChannelsToPreferRowWiseEncoding()
{
    return this.minColumnarEncodingChannelsToPreferRowWiseEncoding;
}

/**
 * Stores the threshold bound to the
 * {@code min-columnar-encoding-channels-to-prefer-row-wise-encoding} configuration property.
 *
 * @param minColumnarEncodingChannelsToPreferRowWiseEncoding the new threshold value
 * @return this config instance, so that setter calls can be chained
 */
@Config("min-columnar-encoding-channels-to-prefer-row-wise-encoding")
@ConfigDescription("Minimum number of columnar encoding channels to consider row wise encoding for partitioned exchange. Native execution only")
public QueryManagerConfig setMinColumnarEncodingChannelsToPreferRowWiseEncoding(int minColumnarEncodingChannelsToPreferRowWiseEncoding)
{
this.minColumnarEncodingChannelsToPreferRowWiseEncoding = minColumnarEncodingChannelsToPreferRowWiseEncoding;
return this;
}

public enum ExchangeMaterializationStrategy
{
NONE,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@
import static com.facebook.presto.SystemSessionProperties.isTableWriterMergeOperatorEnabled;
import static com.facebook.presto.common.type.BigintType.BIGINT;
import static com.facebook.presto.common.type.VarbinaryType.VARBINARY;
import static com.facebook.presto.spi.plan.ExchangeEncoding.COLUMNAR;
import static com.facebook.presto.sql.planner.plan.ExchangeNode.Scope.LOCAL;
import static com.facebook.presto.sql.planner.plan.ExchangeNode.Scope.REMOTE_STREAMING;
import static com.facebook.presto.sql.planner.plan.ExchangeNode.Type.REPARTITION;
Expand Down Expand Up @@ -284,6 +285,7 @@ public static TableFinishNode createTemporaryTableWriteWithExchanges(
outputs,
Optional.empty(),
false,
COLUMNAR,
Optional.empty());

ExchangeNode writerRemoteSource = new ExchangeNode(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -288,6 +288,8 @@ public class FeaturesConfig
private boolean eagerPlanValidationEnabled;
private int eagerPlanValidationThreadPoolSize = 20;

private boolean prestoSparkExecutionEnvironment;

public enum PartitioningPrecisionStrategy
{
// Let Presto decide when to repartition
Expand Down Expand Up @@ -2846,4 +2848,16 @@ public int getEagerPlanValidationThreadPoolSize()
{
return this.eagerPlanValidationThreadPoolSize;
}

/**
 * Returns the current value of the {@code prestoSparkExecutionEnvironment} flag.
 *
 * @return the stored flag
 */
public boolean isPrestoSparkExecutionEnvironment()
{
    return this.prestoSparkExecutionEnvironment;
}

/**
 * Stores the flag bound to the {@code presto-spark-execution-environment}
 * configuration property.
 *
 * @param prestoSparkExecutionEnvironment the new flag value
 * @return this config instance, so that setter calls can be chained
 */
@Config("presto-spark-execution-environment")
public FeaturesConfig setPrestoSparkExecutionEnvironment(boolean prestoSparkExecutionEnvironment)
{
this.prestoSparkExecutionEnvironment = prestoSparkExecutionEnvironment;
return this;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -305,7 +305,16 @@ private PlanNode createRemoteStreamingExchange(ExchangeNode exchange, RewriteCon
.map(PlanFragment::getId)
.collect(toImmutableList());

return new RemoteSourceNode(exchange.getSourceLocation(), exchange.getId(), exchange.getStatsEquivalentPlanNode(), childrenIds, exchange.getOutputVariables(), exchange.isEnsureSourceOrdering(), exchange.getOrderingScheme(), exchange.getType());
return new RemoteSourceNode(
exchange.getSourceLocation(),
exchange.getId(),
exchange.getStatsEquivalentPlanNode(),
childrenIds,
exchange.getOutputVariables(),
exchange.isEnsureSourceOrdering(),
exchange.getOrderingScheme(),
exchange.getType(),
exchange.getPartitioningScheme().getEncoding());
}

protected void setDistributionForExchange(ExchangeNode.Type exchangeType, PartitioningScheme partitioningScheme, RewriteContext<FragmentProperties> context)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -324,11 +324,6 @@
import static com.facebook.presto.sql.analyzer.ExpressionTreeUtils.createSymbolReference;
import static com.facebook.presto.sql.gen.LambdaBytecodeGenerator.compileLambdaProvider;
import static com.facebook.presto.sql.planner.RowExpressionInterpreter.rowExpressionInterpreter;
import static com.facebook.presto.sql.planner.SystemPartitioningHandle.COORDINATOR_DISTRIBUTION;
import static com.facebook.presto.sql.planner.SystemPartitioningHandle.FIXED_ARBITRARY_DISTRIBUTION;
import static com.facebook.presto.sql.planner.SystemPartitioningHandle.FIXED_BROADCAST_DISTRIBUTION;
import static com.facebook.presto.sql.planner.SystemPartitioningHandle.SCALED_WRITER_DISTRIBUTION;
import static com.facebook.presto.sql.planner.SystemPartitioningHandle.SINGLE_DISTRIBUTION;
import static com.facebook.presto.sql.planner.plan.AssignmentUtils.identityAssignments;
import static com.facebook.presto.sql.relational.Expressions.constant;
import static com.facebook.presto.sql.tree.SortItem.Ordering.ASCENDING;
Expand Down Expand Up @@ -539,11 +534,7 @@ public LocalExecutionPlan plan(

private OutputFactory createOutputFactory(TaskContext taskContext, PartitioningScheme partitioningScheme, OutputBuffer outputBuffer)
{
if (partitioningScheme.getPartitioning().getHandle().equals(FIXED_BROADCAST_DISTRIBUTION) ||
partitioningScheme.getPartitioning().getHandle().equals(FIXED_ARBITRARY_DISTRIBUTION) ||
partitioningScheme.getPartitioning().getHandle().equals(SCALED_WRITER_DISTRIBUTION) ||
partitioningScheme.getPartitioning().getHandle().equals(SINGLE_DISTRIBUTION) ||
partitioningScheme.getPartitioning().getHandle().equals(COORDINATOR_DISTRIBUTION)) {
if (partitioningScheme.isSingleOrBroadcastOrArbitrary()) {
return new TaskOutputFactory(outputBuffer);
}

Expand All @@ -557,11 +548,7 @@ private OutputFactory createOutputFactory(TaskContext taskContext, PartitioningS

private Optional<OutputPartitioning> createOutputPartitioning(TaskContext taskContext, PartitioningScheme partitioningScheme)
{
if (partitioningScheme.getPartitioning().getHandle().equals(FIXED_BROADCAST_DISTRIBUTION) ||
partitioningScheme.getPartitioning().getHandle().equals(FIXED_ARBITRARY_DISTRIBUTION) ||
partitioningScheme.getPartitioning().getHandle().equals(SCALED_WRITER_DISTRIBUTION) ||
partitioningScheme.getPartitioning().getHandle().equals(SINGLE_DISTRIBUTION) ||
partitioningScheme.getPartitioning().getHandle().equals(COORDINATOR_DISTRIBUTION)) {
if (partitioningScheme.isSingleOrBroadcastOrArbitrary()) {
return Optional.empty();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -229,6 +229,7 @@ private static SubPlan reassignPartitioningHandleIfNecessaryHelper(Metadata meta
outputPartitioningScheme.getOutputLayout(),
outputPartitioningScheme.getHashColumn(),
outputPartitioningScheme.isReplicateNullsAndAny(),
outputPartitioningScheme.getEncoding(),
outputPartitioningScheme.getBucketToPartition()),
fragment.getStageExecutionDescriptor(),
fragment.isOutputTableWriterFragment(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
import com.facebook.presto.sql.planner.iterative.rule.CrossJoinWithOrFilterToInnerJoin;
import com.facebook.presto.sql.planner.iterative.rule.DesugarLambdaExpression;
import com.facebook.presto.sql.planner.iterative.rule.DetermineJoinDistributionType;
import com.facebook.presto.sql.planner.iterative.rule.DetermineRemotePartitionedExchangeEncoding;
import com.facebook.presto.sql.planner.iterative.rule.DetermineSemiJoinDistributionType;
import com.facebook.presto.sql.planner.iterative.rule.EliminateCrossJoins;
import com.facebook.presto.sql.planner.iterative.rule.EvaluateZeroLimit;
Expand Down Expand Up @@ -929,7 +930,14 @@ public PlanOptimizers(

// Precomputed hashes - this assumes that partitioning will not change
builder.add(new HashGenerationOptimizer(metadata.getFunctionAndTypeManager()));

builder.add(new IterativeOptimizer(
metadata,
ruleStats,
statsCalculator,
costCalculator,
ImmutableSet.of(new DetermineRemotePartitionedExchangeEncoding(
featuresConfig.isNativeExecutionEnabled(),
featuresConfig.isPrestoSparkExecutionEnvironment()))));
builder.add(new MetadataDeleteOptimizer(metadata));

// TODO: consider adding a formal final plan sanitization optimizer that prepares the plan for transmission/execution/logging
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ public SystemPartitionFunction getFunction()
@Override
public boolean isSingleNode()
{
return partitioning == SystemPartitioning.COORDINATOR_ONLY || partitioning == SystemPartitioning.SINGLE;
return function == SystemPartitionFunction.SINGLE;
}

@Override
Expand All @@ -110,6 +110,18 @@ public boolean isCoordinatorOnly()
return partitioning == SystemPartitioning.COORDINATOR_ONLY;
}

/**
 * @return {@code true} when this handle's partition function is
 * {@link SystemPartitionFunction#BROADCAST}
 */
@Override
public boolean isBroadcast()
{
    return SystemPartitionFunction.BROADCAST == function;
}

/**
 * @return {@code true} when this handle's partition function is
 * {@link SystemPartitionFunction#ROUND_ROBIN}
 */
@Override
public boolean isArbitrary()
{
    return SystemPartitionFunction.ROUND_ROBIN == function;
}

@Override
public boolean equals(Object o)
{
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,137 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.facebook.presto.sql.planner.iterative.rule;

import com.facebook.presto.Session;
import com.facebook.presto.common.type.FixedWidthType;
import com.facebook.presto.common.type.Type;
import com.facebook.presto.matching.Captures;
import com.facebook.presto.matching.Pattern;
import com.facebook.presto.spi.relation.VariableReferenceExpression;
import com.facebook.presto.sql.planner.iterative.Rule;
import com.facebook.presto.sql.planner.plan.ExchangeNode;
import com.google.common.annotations.VisibleForTesting;

import static com.facebook.presto.SystemSessionProperties.getMinColumnarEncodingChannelsToPreferRowWiseEncoding;
import static com.facebook.presto.spi.plan.ExchangeEncoding.ROW_WISE;
import static com.facebook.presto.sql.planner.plan.ExchangeNode.Scope.REMOTE_STREAMING;
import static com.facebook.presto.sql.planner.plan.ExchangeNode.Type.REPARTITION;
import static com.facebook.presto.sql.planner.plan.Patterns.Exchange.scope;
import static com.facebook.presto.sql.planner.plan.Patterns.Exchange.type;
import static com.facebook.presto.sql.planner.plan.Patterns.exchange;

/**
 * Optimizer rule that decides whether a remote streaming repartitioning exchange
 * should be switched to row-wise encoding.
 *
 * <p>The rule is enabled only when native execution or the Presto on Spark execution
 * environment is configured. It never rewrites an exchange whose partitioning scheme
 * is single, broadcast or arbitrary, nor one that already uses row-wise encoding.
 */
public class DetermineRemotePartitionedExchangeEncoding
        implements Rule<ExchangeNode>
{
    // Only REMOTE_STREAMING exchanges of type REPARTITION are candidates.
    private static final Pattern<ExchangeNode> PATTERN = exchange()
            .with(scope().equalTo(REMOTE_STREAMING))
            .with(type().equalTo(REPARTITION));

    private final boolean nativeExecution;
    private final boolean prestoSparkExecutionEnvironment;

    /**
     * @param nativeExecution whether native execution is enabled
     * @param prestoSparkExecutionEnvironment whether the Presto on Spark execution environment is enabled
     */
    public DetermineRemotePartitionedExchangeEncoding(boolean nativeExecution, boolean prestoSparkExecutionEnvironment)
    {
        this.nativeExecution = nativeExecution;
        this.prestoSparkExecutionEnvironment = prestoSparkExecutionEnvironment;
    }

    @Override
    public Pattern<ExchangeNode> getPattern()
    {
        return PATTERN;
    }

    /**
     * The rule applies only to the two environments that can use row-wise encoding;
     * the session itself is not consulted here.
     */
    @Override
    public boolean isEnabled(Session session)
    {
        return prestoSparkExecutionEnvironment || nativeExecution;
    }

    /**
     * Dispatches to the environment-specific decision. Presto on Spark takes
     * precedence when both flags are set.
     */
    @Override
    public Result apply(ExchangeNode node, Captures captures, Context context)
    {
        if (prestoSparkExecutionEnvironment) {
            // In Presto on Spark, row-wise encoding is always used for non-special shuffles
            // (i.e., excluding broadcast, single, and arbitrary shuffles).
            // Presto on Spark does not check the ExchangeEncoding specified in the plan,
            // so the encoding is set here only to make the plan reflect what actually happens.
            return determineForPrestoOnSpark(node);
        }
        // Presto Java runtime does not support row-wise encoding, hence the empty result
        // when native execution is off.
        return nativeExecution
                ? determineForNativeExecution(context.getSession(), node)
                : Result.empty();
    }

    /**
     * Presto on Spark: every exchange that is eligible for rewriting becomes row-wise.
     */
    private Result determineForPrestoOnSpark(ExchangeNode node)
    {
        if (mustKeepCurrentEncoding(node)) {
            return Result.empty();
        }
        return Result.ofPlanNode(node.withRowWiseEncoding());
    }

    /**
     * Native execution: an eligible exchange becomes row-wise only when the estimated
     * number of columnar channels reaches the session-configured minimum.
     */
    private Result determineForNativeExecution(Session session, ExchangeNode node)
    {
        if (mustKeepCurrentEncoding(node)) {
            return Result.empty();
        }
        int threshold = getMinColumnarEncodingChannelsToPreferRowWiseEncoding(session);
        long estimatedChannels = estimateNumberOfOutputColumnarChannels(node);
        if (estimatedChannels < threshold) {
            return Result.empty();
        }
        return Result.ofPlanNode(node.withRowWiseEncoding());
    }

    /**
     * Returns {@code true} when the exchange must not be rewritten: either it is one of
     * the special cases that stay columnar (single, broadcast, arbitrary), or it is
     * already row-wise because the rule visited it before.
     */
    private static boolean mustKeepCurrentEncoding(ExchangeNode node)
    {
        return node.getPartitioningScheme().isSingleOrBroadcastOrArbitrary()
                || node.getPartitioningScheme().getEncoding() == ROW_WISE;
    }

    /**
     * Sums the per-type channel estimates over all output variables of the exchange.
     */
    @VisibleForTesting
    static long estimateNumberOfOutputColumnarChannels(ExchangeNode node)
    {
        long total = 0;
        for (VariableReferenceExpression variable : node.getOutputVariables()) {
            total += estimateNumberOfColumnarChannels(variable.getType());
        }
        return total;
    }

    /**
     * Estimates how many columnar channels a value of the given type occupies:
     * 2 for fixed-width types (nulls, values), 2 plus the estimates of all type
     * parameters for parameterized types (nulls, offsets, nested), and 3 otherwise
     * (nulls, offsets, values).
     */
    @VisibleForTesting
    static long estimateNumberOfColumnarChannels(Type type)
    {
        if (type instanceof FixedWidthType) {
            // nulls and values
            return 2;
        }
        if (type.getTypeParameters().isEmpty()) {
            // nulls, offsets, values
            return 3;
        }
        // complex type: nulls and offsets, plus whatever each nested type needs
        long channels = 2;
        for (Type nested : type.getTypeParameters()) {
            channels += estimateNumberOfColumnarChannels(nested);
        }
        return channels;
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -233,6 +233,7 @@ private PlanNode pushPartial(AggregationNode aggregation, ExchangeNode exchange,
aggregationOutputs,
exchange.getPartitioningScheme().getHashColumn(),
exchange.getPartitioningScheme().isReplicateNullsAndAny(),
exchange.getPartitioningScheme().getEncoding(),
exchange.getPartitioningScheme().getBucketToPartition());

return new ExchangeNode(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -146,6 +146,7 @@ public Result apply(ProjectNode project, Captures captures, Context context)
outputBuilder.build(),
exchange.getPartitioningScheme().getHashColumn(),
exchange.getPartitioningScheme().isReplicateNullsAndAny(),
exchange.getPartitioningScheme().getEncoding(),
exchange.getPartitioningScheme().getBucketToPartition());

PlanNode result = new ExchangeNode(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,7 @@ public Result apply(ExchangeNode node, Captures captures, Context context)
removeVariable(partitioningScheme.getOutputLayout(), assignUniqueId.getIdVariable()),
partitioningScheme.getHashColumn(),
partitioningScheme.isReplicateNullsAndAny(),
partitioningScheme.getEncoding(),
partitioningScheme.getBucketToPartition()),
ImmutableList.of(assignUniqueId.getSource()),
ImmutableList.of(removeVariable(getOnlyElement(node.getInputs()), assignUniqueId.getIdVariable())),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -152,6 +152,7 @@ public Result apply(ExchangeNode node, Captures captures, Context context)
outputLayout,
partitioningScheme.getHashColumn(),
partitioningScheme.isReplicateNullsAndAny(),
partitioningScheme.getEncoding(),
partitioningScheme.getBucketToPartition()),
ImmutableList.of(groupIdNode.getSource()),
ImmutableList.of(inputs),
Expand Down
Loading

0 comments on commit 93526d3

Please sign in to comment.