Skip to content

Commit

Permalink
Optimize join with empty hashtable
Browse files Browse the repository at this point in the history
If there is no spill, and build hashtable is empty, and is inner join
or right join. Skip the probe side.
  • Loading branch information
feilong-liu authored and pranjalssh committed Mar 4, 2023
1 parent e16f275 commit 614dd67
Show file tree
Hide file tree
Showing 13 changed files with 280 additions and 34 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,8 @@ protected List<Driver> createDrivers(TaskContext taskContext)
hashChannel,
Optional.empty(),
OptionalInt.empty(),
unsupportedPartitioningSpillerFactory());
unsupportedPartitioningSpillerFactory(),
false);
joinDriversBuilder.add(joinOperator);
joinDriversBuilder.add(new NullOutputOperatorFactory(3, new PlanNodeId("test")));
DriverFactory joinDriverFactory = new DriverFactory(1, true, true, joinDriversBuilder.build(), OptionalInt.empty(), UNGROUPED_EXECUTION, Optional.empty());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,8 @@ protected List<Driver> createDrivers(TaskContext taskContext)
OptionalInt.empty(),
Optional.empty(),
OptionalInt.empty(),
unsupportedPartitioningSpillerFactory());
unsupportedPartitioningSpillerFactory(),
false);
joinDriversBuilder.add(joinOperator);
joinDriversBuilder.add(new NullOutputOperatorFactory(3, new PlanNodeId("test")));
DriverFactory joinDriverFactory = new DriverFactory(1, true, true, joinDriversBuilder.build(), OptionalInt.empty(), UNGROUPED_EXECUTION, Optional.empty());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,17 @@ protected List<Driver> createDrivers(TaskContext taskContext)

List<Type> lineItemTypes = getColumnTypes("lineitem", "orderkey", "quantity");
OperatorFactory lineItemTableScan = createTableScanOperator(0, new PlanNodeId("test"), "lineitem", "orderkey", "quantity");
OperatorFactory joinOperator = LOOKUP_JOIN_OPERATORS.innerJoin(1, new PlanNodeId("test"), lookupSourceFactoryManager, lineItemTypes, Ints.asList(0), OptionalInt.empty(), Optional.empty(), OptionalInt.empty(), unsupportedPartitioningSpillerFactory());
OperatorFactory joinOperator = LOOKUP_JOIN_OPERATORS.innerJoin(
1,
new PlanNodeId("test"),
lookupSourceFactoryManager,
lineItemTypes,
Ints.asList(0),
OptionalInt.empty(),
Optional.empty(),
OptionalInt.empty(),
unsupportedPartitioningSpillerFactory(),
false);
NullOutputOperatorFactory output = new NullOutputOperatorFactory(2, new PlanNodeId("test"));
this.probeDriverFactory = new DriverFactory(1, true, true, ImmutableList.of(lineItemTableScan, joinOperator, output), OptionalInt.empty(), UNGROUPED_EXECUTION, Optional.empty());

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -254,6 +254,7 @@ public final class SystemSessionProperties
public static final String REMOVE_REDUNDANT_DISTINCT_AGGREGATION_ENABLED = "remove_redundant_distinct_aggregation_enabled";
public static final String PREFILTER_FOR_GROUPBY_LIMIT = "prefilter_for_groupby_limit";
public static final String PREFILTER_FOR_GROUPBY_LIMIT_TIMEOUT_MS = "prefilter_for_groupby_limit_timeout_ms";
public static final String OPTIMIZE_JOIN_PROBE_FOR_EMPTY_BUILD_RUNTIME = "optimize_join_probe_for_empty_build_runtime";

// TODO: Native execution related session properties that are temporarily put here. They will be relocated in the future.
public static final String NATIVE_SIMPLIFIED_EXPRESSION_EVALUATION_ENABLED = "simplified_expression_evaluation_enabled";
Expand Down Expand Up @@ -1444,6 +1445,11 @@ public SystemSessionProperties(
PREFILTER_FOR_GROUPBY_LIMIT_TIMEOUT_MS,
"Timeout for finding the LIMIT number of keys for group by",
10000,
false),
booleanProperty(
OPTIMIZE_JOIN_PROBE_FOR_EMPTY_BUILD_RUNTIME,
"Optimize join probe at runtime if build side is empty",
featuresConfig.isOptimizeJoinProbeForEmptyBuildRuntimeEnabled(),
false));
}

Expand Down Expand Up @@ -2431,4 +2437,9 @@ public static int getPrefilterForGroupbyLimitTimeoutMS(Session session)
{
return session.getSystemProperty(PREFILTER_FOR_GROUPBY_LIMIT_TIMEOUT_MS, Integer.class);
}

public static boolean isOptimizeJoinProbeForEmptyBuildRuntimeEnabled(Session session)
{
return session.getSystemProperty(OPTIMIZE_JOIN_PROBE_FOR_EMPTY_BUILD_RUNTIME, Boolean.class);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,7 @@ public class LookupJoinOperator
private Optional<Partition<Supplier<LookupSource>>> currentPartition = Optional.empty();
private Optional<ListenableFuture<Supplier<LookupSource>>> unspilledLookupSource = Optional.empty();
private Iterator<Page> unspilledInputPages = emptyIterator();
private final boolean optimizeProbeForEmptyBuild;

public LookupJoinOperator(
OperatorContext operatorContext,
Expand All @@ -106,7 +107,8 @@ public LookupJoinOperator(
Runnable afterClose,
OptionalInt lookupJoinsCount,
HashGenerator hashGenerator,
PartitioningSpillerFactory partitioningSpillerFactory)
PartitioningSpillerFactory partitioningSpillerFactory,
boolean optimizeProbeForEmptyBuild)
{
this.operatorContext = requireNonNull(operatorContext, "operatorContext is null");
this.probeTypes = ImmutableList.copyOf(requireNonNull(probeTypes, "probeTypes is null"));
Expand All @@ -127,6 +129,7 @@ public LookupJoinOperator(
operatorContext.setInfoSupplier(this.statisticsCounter);

this.pageBuilder = new LookupJoinPageBuilder(buildOutputTypes);
this.optimizeProbeForEmptyBuild = optimizeProbeForEmptyBuild;
}

@Override
Expand Down Expand Up @@ -184,6 +187,18 @@ public ListenableFuture<?> isBlocked()
@Override
public boolean needsInput()
{
// We can skip probe for empty build input only when probeOnOuterSide is false
if (optimizeProbeForEmptyBuild && !probeOnOuterSide) {
if (tryFetchLookupSourceProvider()) {
lookupSourceProvider.withLease(lookupSourceLease -> {
// Do not have spill, build side is empty and probe side does not output for non match, skip and finish the operator
if (!lookupSourceLease.hasSpilled() && lookupSourceLease.getLookupSource().isEmpty()) {
finish();
}
return null;
});
}
}
return !finishing
&& lookupSourceProviderFuture.isDone()
&& spillInProgress.isDone()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ public class LookupJoinOperatorFactory
private final OptionalInt totalOperatorsCount;
private final HashGenerator probeHashGenerator;
private final PartitioningSpillerFactory partitioningSpillerFactory;
private final boolean optimizeProbeForEmptyBuild;

private boolean closed;

Expand All @@ -62,7 +63,8 @@ public LookupJoinOperatorFactory(
OptionalInt totalOperatorsCount,
List<Integer> probeJoinChannels,
OptionalInt probeHashChannel,
PartitioningSpillerFactory partitioningSpillerFactory)
PartitioningSpillerFactory partitioningSpillerFactory,
boolean optimizeProbeForEmptyBuild)
{
this.operatorId = operatorId;
this.planNodeId = requireNonNull(planNodeId, "planNodeId is null");
Expand Down Expand Up @@ -102,6 +104,7 @@ public LookupJoinOperatorFactory(
}

this.partitioningSpillerFactory = requireNonNull(partitioningSpillerFactory, "partitioningSpillerFactory is null");
this.optimizeProbeForEmptyBuild = optimizeProbeForEmptyBuild;
}

private LookupJoinOperatorFactory(LookupJoinOperatorFactory other)
Expand All @@ -120,6 +123,7 @@ private LookupJoinOperatorFactory(LookupJoinOperatorFactory other)
totalOperatorsCount = other.totalOperatorsCount;
probeHashGenerator = other.probeHashGenerator;
partitioningSpillerFactory = other.partitioningSpillerFactory;
optimizeProbeForEmptyBuild = other.optimizeProbeForEmptyBuild;

closed = false;
joinBridgeManager.incrementProbeFactoryCount();
Expand Down Expand Up @@ -151,7 +155,8 @@ public Operator createOperator(DriverContext driverContext)
() -> joinBridgeManager.probeOperatorClosed(driverContext.getLifespan()),
totalOperatorsCount,
probeHashGenerator,
partitioningSpillerFactory);
partitioningSpillerFactory,
optimizeProbeForEmptyBuild);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,24 +58,108 @@ public LookupJoinOperators()
{
}

public OperatorFactory innerJoin(int operatorId, PlanNodeId planNodeId, JoinBridgeManager<? extends LookupSourceFactory> lookupSourceFactory, List<Type> probeTypes, List<Integer> probeJoinChannel, OptionalInt probeHashChannel, Optional<List<Integer>> probeOutputChannels, OptionalInt totalOperatorsCount, PartitioningSpillerFactory partitioningSpillerFactory)
public OperatorFactory innerJoin(
int operatorId,
PlanNodeId planNodeId,
JoinBridgeManager<? extends LookupSourceFactory> lookupSourceFactory,
List<Type> probeTypes,
List<Integer> probeJoinChannel,
OptionalInt probeHashChannel,
Optional<List<Integer>> probeOutputChannels,
OptionalInt totalOperatorsCount,
PartitioningSpillerFactory partitioningSpillerFactory,
boolean optimizeProbeForEmptyBuild)
{
return createJoinOperatorFactory(operatorId, planNodeId, lookupSourceFactory, probeTypes, probeJoinChannel, probeHashChannel, probeOutputChannels.orElse(rangeList(probeTypes.size())), JoinType.INNER, totalOperatorsCount, partitioningSpillerFactory);
return createJoinOperatorFactory(
operatorId,
planNodeId,
lookupSourceFactory,
probeTypes,
probeJoinChannel,
probeHashChannel,
probeOutputChannels.orElse(rangeList(probeTypes.size())),
JoinType.INNER,
totalOperatorsCount,
partitioningSpillerFactory,
optimizeProbeForEmptyBuild);
}

public OperatorFactory probeOuterJoin(int operatorId, PlanNodeId planNodeId, JoinBridgeManager<? extends LookupSourceFactory> lookupSourceFactory, List<Type> probeTypes, List<Integer> probeJoinChannel, OptionalInt probeHashChannel, Optional<List<Integer>> probeOutputChannels, OptionalInt totalOperatorsCount, PartitioningSpillerFactory partitioningSpillerFactory)
public OperatorFactory probeOuterJoin(
int operatorId,
PlanNodeId planNodeId,
JoinBridgeManager<? extends LookupSourceFactory> lookupSourceFactory,
List<Type> probeTypes,
List<Integer> probeJoinChannel,
OptionalInt probeHashChannel,
Optional<List<Integer>> probeOutputChannels,
OptionalInt totalOperatorsCount,
PartitioningSpillerFactory partitioningSpillerFactory,
boolean optimizeProbeForEmptyBuild)
{
return createJoinOperatorFactory(operatorId, planNodeId, lookupSourceFactory, probeTypes, probeJoinChannel, probeHashChannel, probeOutputChannels.orElse(rangeList(probeTypes.size())), JoinType.PROBE_OUTER, totalOperatorsCount, partitioningSpillerFactory);
return createJoinOperatorFactory(
operatorId,
planNodeId,
lookupSourceFactory,
probeTypes,
probeJoinChannel,
probeHashChannel,
probeOutputChannels.orElse(rangeList(probeTypes.size())),
JoinType.PROBE_OUTER,
totalOperatorsCount,
partitioningSpillerFactory,
optimizeProbeForEmptyBuild);
}

public OperatorFactory lookupOuterJoin(int operatorId, PlanNodeId planNodeId, JoinBridgeManager<? extends LookupSourceFactory> lookupSourceFactory, List<Type> probeTypes, List<Integer> probeJoinChannel, OptionalInt probeHashChannel, Optional<List<Integer>> probeOutputChannels, OptionalInt totalOperatorsCount, PartitioningSpillerFactory partitioningSpillerFactory)
public OperatorFactory lookupOuterJoin(
int operatorId,
PlanNodeId planNodeId,
JoinBridgeManager<? extends LookupSourceFactory> lookupSourceFactory,
List<Type> probeTypes,
List<Integer> probeJoinChannel,
OptionalInt probeHashChannel,
Optional<List<Integer>> probeOutputChannels,
OptionalInt totalOperatorsCount,
PartitioningSpillerFactory partitioningSpillerFactory,
boolean optimizeProbeForEmptyBuild)
{
return createJoinOperatorFactory(operatorId, planNodeId, lookupSourceFactory, probeTypes, probeJoinChannel, probeHashChannel, probeOutputChannels.orElse(rangeList(probeTypes.size())), JoinType.LOOKUP_OUTER, totalOperatorsCount, partitioningSpillerFactory);
return createJoinOperatorFactory(
operatorId,
planNodeId,
lookupSourceFactory,
probeTypes,
probeJoinChannel,
probeHashChannel,
probeOutputChannels.orElse(rangeList(probeTypes.size())),
JoinType.LOOKUP_OUTER,
totalOperatorsCount,
partitioningSpillerFactory,
optimizeProbeForEmptyBuild);
}

public OperatorFactory fullOuterJoin(int operatorId, PlanNodeId planNodeId, JoinBridgeManager<? extends LookupSourceFactory> lookupSourceFactory, List<Type> probeTypes, List<Integer> probeJoinChannel, OptionalInt probeHashChannel, Optional<List<Integer>> probeOutputChannels, OptionalInt totalOperatorsCount, PartitioningSpillerFactory partitioningSpillerFactory)
public OperatorFactory fullOuterJoin(
int operatorId,
PlanNodeId planNodeId,
JoinBridgeManager<? extends LookupSourceFactory> lookupSourceFactory,
List<Type> probeTypes,
List<Integer> probeJoinChannel,
OptionalInt probeHashChannel,
Optional<List<Integer>> probeOutputChannels,
OptionalInt totalOperatorsCount,
PartitioningSpillerFactory partitioningSpillerFactory,
boolean optimizeProbeForEmptyBuild)
{
return createJoinOperatorFactory(operatorId, planNodeId, lookupSourceFactory, probeTypes, probeJoinChannel, probeHashChannel, probeOutputChannels.orElse(rangeList(probeTypes.size())), JoinType.FULL_OUTER, totalOperatorsCount, partitioningSpillerFactory);
return createJoinOperatorFactory(
operatorId,
planNodeId,
lookupSourceFactory,
probeTypes,
probeJoinChannel,
probeHashChannel,
probeOutputChannels.orElse(rangeList(probeTypes.size())),
JoinType.FULL_OUTER,
totalOperatorsCount,
partitioningSpillerFactory,
optimizeProbeForEmptyBuild);
}

private static List<Integer> rangeList(int endExclusive)
Expand All @@ -95,7 +179,8 @@ private OperatorFactory createJoinOperatorFactory(
List<Integer> probeOutputChannels,
JoinType joinType,
OptionalInt totalOperatorsCount,
PartitioningSpillerFactory partitioningSpillerFactory)
PartitioningSpillerFactory partitioningSpillerFactory,
boolean optimizeProbeForEmptyBuild)
{
List<Type> probeOutputChannelTypes = probeOutputChannels.stream()
.map(probeTypes::get)
Expand All @@ -113,6 +198,7 @@ private OperatorFactory createJoinOperatorFactory(
totalOperatorsCount,
probeJoinChannel,
probeHashChannel,
partitioningSpillerFactory);
partitioningSpillerFactory,
optimizeProbeForEmptyBuild);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -245,6 +245,7 @@ public class FeaturesConfig
private boolean inPredicatesAsInnerJoinsEnabled;
private double pushAggregationBelowJoinByteReductionThreshold = 1;
private boolean prefilterForGroupbyLimit;
private boolean isOptimizeJoinProbeWithEmptyBuildRuntime;

public enum PartitioningPrecisionStrategy
{
Expand Down Expand Up @@ -2342,4 +2343,17 @@ public FeaturesConfig setPrefilterForGroupbyLimit(boolean prefilterForGroupbyLim
this.prefilterForGroupbyLimit = prefilterForGroupbyLimit;
return this;
}

public boolean isOptimizeJoinProbeForEmptyBuildRuntimeEnabled()
{
return isOptimizeJoinProbeWithEmptyBuildRuntime;
}

@Config("optimizer.optimize-probe-for-empty-build-runtime")
@ConfigDescription("Optimize join probe at runtime if build side is empty")
public FeaturesConfig setOptimizeJoinProbeForEmptyBuildRuntimeEnabled(boolean isOptimizeJoinProbeWithEmptyBuildRuntime)
{
this.isOptimizeJoinProbeWithEmptyBuildRuntime = isOptimizeJoinProbeWithEmptyBuildRuntime;
return this;
}
}
Loading

0 comments on commit 614dd67

Please sign in to comment.