Skip to content

Commit

Permalink
Transformations of Existential Subqueries using Early-out Joins
Browse files Browse the repository at this point in the history
Adds three optimizer rules:

TransformUncorrelatedInPredicateSubqueryToDistinctInnerJoin - converts an in-predicate subquery into an inner join with distinct aggregation (logically equivalent to a semijoin)

TransformDistinctInnerJoinToRightEarlyOutJoin - pushes aggregation into the left input of an inner join where applicable

TransformDistinctInnerJoinToLeftEarlyOutJoin - converts an inner join with distinct aggregation into a semijoin

Benchmarked on the TPC-DS (100 GB) data set: 9 of 100 queries were impacted, with a 10% wall-clock time improvement for the impacted queries.
  • Loading branch information
ClarenceThreepwood authored and rschlussel committed Nov 14, 2022
1 parent d21bbc2 commit 5b1f2dd
Show file tree
Hide file tree
Showing 25 changed files with 1,820 additions and 38 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.facebook.presto.benchmark;

import com.facebook.airlift.log.Logger;
import com.facebook.presto.testing.LocalQueryRunner;
import com.google.common.collect.ImmutableMap;
import org.intellij.lang.annotations.Language;

import java.util.Map;

import static com.facebook.presto.SystemSessionProperties.EXPLOIT_CONSTRAINTS;
import static com.facebook.presto.SystemSessionProperties.IN_PREDICATES_AS_INNER_JOINS_ENABLED;
import static com.facebook.presto.SystemSessionProperties.PUSH_AGGREGATION_BELOW_JOIN_BYTE_REDUCTION_THRESHOLD;

/**
 * Ad-hoc benchmark driver that runs the same SQL text against local query runners
 * created with different session properties, so the effect of the
 * {@code IN_PREDICATES_AS_INNER_JOINS_ENABLED} session property (and, for one case,
 * {@code PUSH_AGGREGATION_BELOW_JOIN_BYTE_REDUCTION_THRESHOLD}) can be compared.
 * Results are written to {@code System.out} through {@code SimpleLineBenchmarkResultWriter}.
 */
public class SqlEarlyOutJoinsBenchmarks
        extends AbstractSqlBenchmark
{
    private static final Logger LOGGER = Logger.get(SqlEarlyOutJoinsBenchmarks.class);

    // Baseline session: the IN-predicate-to-inner-join rewrite is off, constraint exploitation is on.
    private static final Map<String, String> DISABLE_OPTIMIZATION = ImmutableMap.of(
            IN_PREDICATES_AS_INNER_JOINS_ENABLED, Boolean.toString(false),
            EXPLOIT_CONSTRAINTS, Boolean.toString(true));

    // Optimized session: identical to the baseline except the rewrite is switched on.
    private static final Map<String, String> ENABLE_OPTIMIZATION = ImmutableMap.of(
            IN_PREDICATES_AS_INNER_JOINS_ENABLED, Boolean.toString(true),
            EXPLOIT_CONSTRAINTS, Boolean.toString(true));

    /**
     * Creates a benchmark named {@code early_out_joins} for the given SQL text.
     *
     * @param localQueryRunner runner (already configured with the desired session properties) used to execute the query
     * @param sql the SQL statement to benchmark
     */
    public SqlEarlyOutJoinsBenchmarks(LocalQueryRunner localQueryRunner, @Language("SQL") String sql)
    {
        // NOTE(review): the two 10s are presumably warm-up and measured iteration counts — confirm against AbstractSqlBenchmark.
        super(localQueryRunner, "early_out_joins", 10, 10, sql);
    }

    /**
     * Runs every benchmark scenario in sequence.
     *
     * @param args ignored
     */
    public static void main(String[] args)
    {
        benchmarkTransformDistinctInnerJoinToLeftEarlyOutJoin();
        benchmarkTransformDistinctInnerJoinToRightEarlyOutJoin();
        benchmarkRewriteOfInPredicateToDistinctInnerJoin();
    }

    /**
     * Benchmarks a distinct inner join between {@code lineitem} and {@code nation},
     * first with the optimization disabled and then with it enabled.
     */
    private static void benchmarkTransformDistinctInnerJoinToLeftEarlyOutJoin()
    {
        LOGGER.info("benchmarkTransformDistinctInnerJoinToLeftEarlyOutJoin");
        String sql = "select distinct orderkey from lineitem, nation where orderkey=nationkey";
        LOGGER.info("Without optimization");
        runWithSessionProperties(DISABLE_OPTIMIZATION, sql);
        LOGGER.info("With optimization");
        runWithSessionProperties(ENABLE_OPTIMIZATION, sql);
    }

    /**
     * Benchmarks a distinct inner join between {@code lineitem} and {@code orders},
     * first with the optimization disabled and then with it enabled.
     */
    private static void benchmarkTransformDistinctInnerJoinToRightEarlyOutJoin()
    {
        LOGGER.info("benchmarkTransformDistinctInnerJoinToRightEarlyOutJoin");
        String sql = "select distinct l.orderkey, l.comment from lineitem l, orders o where l.orderkey = o.orderkey";
        LOGGER.info("Without optimization");
        runWithSessionProperties(DISABLE_OPTIMIZATION, sql);
        LOGGER.info("With optimization");
        runWithSessionProperties(ENABLE_OPTIMIZATION, sql);
    }

    /**
     * Benchmarks an uncorrelated IN-predicate subquery in three configurations:
     * optimization disabled, optimization enabled, and optimization enabled with a
     * lowered byte reduction threshold for pushing aggregation below the join.
     */
    private static void benchmarkRewriteOfInPredicateToDistinctInnerJoin()
    {
        LOGGER.info("benchmarkInPredicateToDistinctInnerJoin");
        LOGGER.info("Case 1: Rewrite IN predicate to distinct + inner join");
        // NOTE(review): the statement is prefixed with "explain", so this appears to time EXPLAIN
        // rather than execution of the query itself — confirm that this is intended.
        String sql = " explain select * from region where regionkey in (select orderkey from lineitem)";
        LOGGER.info("Without optimization");
        runWithSessionProperties(DISABLE_OPTIMIZATION, sql);
        LOGGER.info("With optimization: case 1");
        runWithSessionProperties(ENABLE_OPTIMIZATION, sql);

        LOGGER.info("Case 2: Rewrite IN predicate to distinct + inner join and then push aggregation down into the probe of the join");
        // Use same query as previous and change the byte reduction threshold
        LOGGER.info("With optimization: case 2");
        Map<String, String> alteredByteReductionThreshold = ImmutableMap.<String, String>builder()
                .putAll(ENABLE_OPTIMIZATION)
                .put(PUSH_AGGREGATION_BELOW_JOIN_BYTE_REDUCTION_THRESHOLD, "0.001")
                .build();
        // The altered properties must be the ones handed to the runner; passing the plain
        // "enabled" map here would silently repeat case 1.
        runWithSessionProperties(alteredByteReductionThreshold, sql);
    }

    /**
     * Creates a local query runner with the given session properties, runs the benchmark
     * for {@code sql} on it and writes the result lines to {@code System.out}.
     */
    private static void runWithSessionProperties(Map<String, String> sessionProperties, @Language("SQL") String sql)
    {
        new SqlEarlyOutJoinsBenchmarks(BenchmarkQueryRunner.createLocalQueryRunner(sessionProperties), sql)
                .runBenchmark(new SimpleLineBenchmarkResultWriter(System.out));
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -241,6 +241,8 @@ public final class SystemSessionProperties
public static final String PUSH_REMOTE_EXCHANGE_THROUGH_GROUP_ID = "push_remote_exchange_through_group_id";
public static final String OPTIMIZE_MULTIPLE_APPROX_PERCENTILE_ON_SAME_FIELD = "optimize_multiple_approx_percentile_on_same_field";
public static final String RANDOMIZE_OUTER_JOIN_NULL_KEY = "randomize_outer_join_null_key";
public static final String IN_PREDICATES_AS_INNER_JOINS_ENABLED = "in_predicates_as_inner_joins_enabled";
public static final String PUSH_AGGREGATION_BELOW_JOIN_BYTE_REDUCTION_THRESHOLD = "push_aggregation_below_join_byte_reduction_threshold";
public static final String KEY_BASED_SAMPLING_ENABLED = "key_based_sampling_enabled";
public static final String KEY_BASED_SAMPLING_PERCENTAGE = "key_based_sampling_percentage";
public static final String KEY_BASED_SAMPLING_FUNCTION = "key_based_sampling_function";
Expand Down Expand Up @@ -1421,6 +1423,15 @@ public SystemSessionProperties(
REMOVE_REDUNDANT_DISTINCT_AGGREGATION_ENABLED,
"Enable removing distinct aggregation node if input is already distinct",
featuresConfig.isRemoveRedundantDistinctAggregationEnabled(),
false),
booleanProperty(IN_PREDICATES_AS_INNER_JOINS_ENABLED,
"Enable transformation of IN predicates to inner joins",
featuresConfig.isInPredicatesAsInnerJoinsEnabled(),
false),
doubleProperty(
PUSH_AGGREGATION_BELOW_JOIN_BYTE_REDUCTION_THRESHOLD,
"Byte reduction ratio threshold at which to disable pushdown of aggregation below inner join",
featuresConfig.getPushAggregationBelowJoinByteReductionThreshold(),
false));
}

Expand Down Expand Up @@ -2381,4 +2392,14 @@ public static boolean isRemoveRedundantDistinctAggregationEnabled(Session sessio
{
return session.getSystemProperty(REMOVE_REDUNDANT_DISTINCT_AGGREGATION_ENABLED, Boolean.class);
}

/**
 * Reads the {@code in_predicates_as_inner_joins_enabled} system session property.
 *
 * @param session session whose system properties are consulted
 * @return the property value as a boolean
 */
public static boolean isInPredicatesAsInnerJoinsEnabled(Session session)
{
    Boolean enabled = session.getSystemProperty(IN_PREDICATES_AS_INNER_JOINS_ENABLED, Boolean.class);
    return enabled;
}

/**
 * Reads the {@code push_aggregation_below_join_byte_reduction_threshold} system session property.
 *
 * @param session session whose system properties are consulted
 * @return the property value as a double
 */
public static double getPushAggregationBelowJoinByteReductionThreshold(Session session)
{
    Double threshold = session.getSystemProperty(PUSH_AGGREGATION_BELOW_JOIN_BYTE_REDUCTION_THRESHOLD, Double.class);
    return threshold;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -241,6 +241,8 @@ public class FeaturesConfig
private boolean randomizeOuterJoinNullKey;
private boolean isOptimizeConditionalAggregationEnabled;
private boolean isRemoveRedundantDistinctAggregationEnabled = true;
private boolean inPredicatesAsInnerJoinsEnabled;
private double pushAggregationBelowJoinByteReductionThreshold = 1;

public enum PartitioningPrecisionStrategy
{
Expand Down Expand Up @@ -2287,4 +2289,30 @@ public FeaturesConfig setRemoveRedundantDistinctAggregationEnabled(boolean isRem
this.isRemoveRedundantDistinctAggregationEnabled = isRemoveRedundantDistinctAggregationEnabled;
return this;
}

/**
 * @return the currently configured value of the {@code inPredicatesAsInnerJoinsEnabled} flag
 */
public boolean isInPredicatesAsInnerJoinsEnabled()
{
    final boolean enabled = this.inPredicatesAsInnerJoinsEnabled;
    return enabled;
}

/**
 * Stores the flag bound to the {@code optimizer.in-predicates-as-inner-joins-enabled} config key.
 *
 * @param inPredicatesAsInnerJoinsEnabled new value of the flag
 * @return this config instance, so setter calls can be chained
 */
@Config("optimizer.in-predicates-as-inner-joins-enabled")
@ConfigDescription("Enable rewrite of In predicates to INNER joins")
public FeaturesConfig setInPredicatesAsInnerJoinsEnabled(boolean inPredicatesAsInnerJoinsEnabled)
{
    FeaturesConfig self = this;
    self.inPredicatesAsInnerJoinsEnabled = inPredicatesAsInnerJoinsEnabled;
    return self;
}

/**
 * @return the currently configured value of {@code pushAggregationBelowJoinByteReductionThreshold}
 */
public double getPushAggregationBelowJoinByteReductionThreshold()
{
    final double threshold = this.pushAggregationBelowJoinByteReductionThreshold;
    return threshold;
}

/**
 * Stores the value bound to the {@code optimizer.push-aggregation-below-join-byte-reduction-threshold} config key.
 *
 * @param pushAggregationBelowJoinByteReductionThreshold new threshold value (stored as given, no range check)
 * @return this config instance, so setter calls can be chained
 */
@Config("optimizer.push-aggregation-below-join-byte-reduction-threshold")
@ConfigDescription("Byte reduction ratio threshold at which to disable pushdown of aggregation below inner join")
public FeaturesConfig setPushAggregationBelowJoinByteReductionThreshold(double pushAggregationBelowJoinByteReductionThreshold)
{
    FeaturesConfig self = this;
    self.pushAggregationBelowJoinByteReductionThreshold = pushAggregationBelowJoinByteReductionThreshold;
    return self;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,10 @@
import com.facebook.presto.sql.planner.iterative.rule.TransformCorrelatedScalarAggregationToJoin;
import com.facebook.presto.sql.planner.iterative.rule.TransformCorrelatedScalarSubquery;
import com.facebook.presto.sql.planner.iterative.rule.TransformCorrelatedSingleRowSubqueryToProject;
import com.facebook.presto.sql.planner.iterative.rule.TransformDistinctInnerJoinToLeftEarlyOutJoin;
import com.facebook.presto.sql.planner.iterative.rule.TransformDistinctInnerJoinToRightEarlyOutJoin;
import com.facebook.presto.sql.planner.iterative.rule.TransformExistsApplyToLateralNode;
import com.facebook.presto.sql.planner.iterative.rule.TransformUncorrelatedInPredicateSubqueryToDistinctInnerJoin;
import com.facebook.presto.sql.planner.iterative.rule.TransformUncorrelatedInPredicateSubqueryToSemiJoin;
import com.facebook.presto.sql.planner.iterative.rule.TransformUncorrelatedLateralToJoin;
import com.facebook.presto.sql.planner.iterative.rule.TranslateExpressions;
Expand Down Expand Up @@ -395,6 +398,7 @@ public PlanOptimizers(
ImmutableSet.of(
new RemoveUnreferencedScalarLateralNodes(),
new TransformUncorrelatedLateralToJoin(),
new TransformUncorrelatedInPredicateSubqueryToDistinctInnerJoin(),
new TransformUncorrelatedInPredicateSubqueryToSemiJoin(),
new TransformCorrelatedScalarAggregationToJoin(metadata.getFunctionAndTypeManager()),
new TransformCorrelatedLateralJoinToJoin())),
Expand Down Expand Up @@ -599,6 +603,24 @@ public PlanOptimizers(
// We run it again to mark this for intermediate join nodes.
builder.add(new StatsRecordingPlanOptimizer(optimizerStats, new HistoricalStatisticsEquivalentPlanMarkingOptimizer(statsCalculator)));

// Run this set of join transformations after ReorderJoins, but before DetermineJoinDistributionType
builder.add(new IterativeOptimizer(
ruleStats,
statsCalculator,
estimatedExchangesCostCalculator,
Optional.of(new LogicalPropertiesProviderImpl(new FunctionResolution(metadata.getFunctionAndTypeManager()))),
ImmutableSet.of(
new TransformDistinctInnerJoinToLeftEarlyOutJoin(),
new TransformDistinctInnerJoinToRightEarlyOutJoin(),
new RemoveRedundantDistinct(),
new RemoveRedundantTopN(),
new RemoveRedundantSort(),
new RemoveRedundantLimit(),
new RemoveRedundantDistinctLimit(),
new RemoveRedundantAggregateDistinct(),
new RemoveRedundantIdentityProjections(),
new PushAggregationThroughOuterJoin(metadata.getFunctionAndTypeManager()))));

builder.add(new IterativeOptimizer(
ruleStats,
statsCalculator,
Expand Down
Loading

0 comments on commit 5b1f2dd

Please sign in to comment.