Skip to content

Commit

Permalink
Add extra config and session property to toggle join spilling
Browse files Browse the repository at this point in the history
  • Loading branch information
Saksham Sachdev authored and highker committed Sep 30, 2020
1 parent 9a11908 commit d8be70d
Show file tree
Hide file tree
Showing 9 changed files with 60 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,7 @@ public final class SystemSessionProperties
public static final String FAST_INEQUALITY_JOINS = "fast_inequality_joins";
public static final String QUERY_PRIORITY = "query_priority";
public static final String SPILL_ENABLED = "spill_enabled";
public static final String JOIN_SPILL_ENABLED = "join_spill_enabled";
public static final String AGGREGATION_OPERATOR_UNSPILL_MEMORY_LIMIT = "aggregation_operator_unspill_memory_limit";
public static final String OPTIMIZE_DISTINCT_AGGREGATIONS = "optimize_mixed_distinct_aggregations";
public static final String LEGACY_ROW_FIELD_ORDINAL_ACCESS = "legacy_row_field_ordinal_access";
Expand Down Expand Up @@ -568,6 +569,23 @@ public SystemSessionProperties(
return spillEnabled;
},
value -> value),
new PropertyMetadata<>(
JOIN_SPILL_ENABLED,
"Experimental: Enable join spilling",
BOOLEAN,
Boolean.class,
featuresConfig.isJoinSpillingEnabled(),
false,
value -> {
boolean joinSpillEnabled = (Boolean) value;
if (joinSpillEnabled && !featuresConfig.isSpillEnabled()) {
throw new PrestoException(
INVALID_SESSION_PROPERTY,
format("%s cannot be set to true; spilling is not configured", JOIN_SPILL_ENABLED));
}
return joinSpillEnabled;
},
value -> value),
new PropertyMetadata<>(
AGGREGATION_OPERATOR_UNSPILL_MEMORY_LIMIT,
"Experimental: How much memory can should be allocated per aggragation operator in unspilling process",
Expand Down Expand Up @@ -1183,6 +1201,11 @@ public static boolean isSpillEnabled(Session session)
return session.getSystemProperty(SPILL_ENABLED, Boolean.class);
}

public static boolean isJoinSpillingEnabled(Session session)
{
return session.getSystemProperty(JOIN_SPILL_ENABLED, Boolean.class);
}

public static DataSize getAggregationOperatorUnspillMemoryLimit(Session session)
{
DataSize memoryLimitForMerge = session.getSystemProperty(AGGREGATION_OPERATOR_UNSPILL_MEMORY_LIMIT, DataSize.class);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ public class FeaturesConfig
{
@VisibleForTesting
static final String SPILL_ENABLED = "experimental.spill-enabled";
static final String JOIN_SPILL_ENABLED = "experimental.join-spill-enabled";
@VisibleForTesting
static final String SPILLER_SPILL_PATH = "experimental.spiller-spill-path";

Expand Down Expand Up @@ -115,6 +116,7 @@ public class FeaturesConfig
private ArrayAggGroupImplementation arrayAggGroupImplementation = ArrayAggGroupImplementation.NEW;
private MultimapAggGroupImplementation multimapAggGroupImplementation = MultimapAggGroupImplementation.NEW;
private boolean spillEnabled;
private boolean joinSpillingEnabled;
private DataSize aggregationOperatorUnspillMemoryLimit = new DataSize(4, DataSize.Unit.MEGABYTE);
private List<Path> spillerSpillPaths = ImmutableList.of();
private int spillerThreads = 4;
Expand Down Expand Up @@ -807,6 +809,24 @@ public FeaturesConfig setSpillEnabled(boolean spillEnabled)
return this;
}

public boolean isJoinSpillingEnabled()
{
return joinSpillingEnabled;
}

@Config(JOIN_SPILL_ENABLED)
public FeaturesConfig setJoinSpillingEnabled(boolean joinSpillingEnabled)
{
this.joinSpillingEnabled = joinSpillingEnabled;
return this;
}

@AssertTrue(message = "If " + JOIN_SPILL_ENABLED + " is set to true, spilling must be enabled " + SPILL_ENABLED)
public boolean isSpillEnabledIfJoinSpillingIsEnabled()
{
return !isJoinSpillingEnabled() || isSpillEnabled();
}

public boolean isIterativeOptimizerEnabled()
{
return iterativeOptimizerEnabled;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -247,6 +247,7 @@
import static com.facebook.presto.SystemSessionProperties.getTaskWriterCount;
import static com.facebook.presto.SystemSessionProperties.isEnableDynamicFiltering;
import static com.facebook.presto.SystemSessionProperties.isExchangeCompressionEnabled;
import static com.facebook.presto.SystemSessionProperties.isJoinSpillingEnabled;
import static com.facebook.presto.SystemSessionProperties.isOptimizeCommonSubExpressions;
import static com.facebook.presto.SystemSessionProperties.isOptimizedRepartitioningEnabled;
import static com.facebook.presto.SystemSessionProperties.isSpillEnabled;
Expand Down Expand Up @@ -2154,7 +2155,7 @@ private JoinBridgeManager<PartitionedLookupSourceFactory> createLookupSourceFact
OptionalInt buildHashChannel = buildHashVariable.map(variableChannelGetter(buildSource))
.map(OptionalInt::of).orElse(OptionalInt.empty());

boolean spillEnabled = isSpillEnabled(context.getSession());
boolean spillEnabled = isSpillEnabled(context.getSession()) && isJoinSpillingEnabled(context.getSession());
boolean buildOuter = node.getType() == RIGHT || node.getType() == FULL;
int partitionCount = buildContext.getDriverInstanceCount().orElse(1);

Expand Down Expand Up @@ -2338,7 +2339,7 @@ private OperatorFactory createLookupJoin(
private OptionalInt getJoinOperatorsCountForSpill(LocalExecutionPlanContext context, Session session)
{
OptionalInt driverInstanceCount = context.getDriverInstanceCount();
if (isSpillEnabled(session)) {
if (isSpillEnabled(session) && isJoinSpillingEnabled(session)) {
checkState(driverInstanceCount.isPresent(), "A fixed distribution is required for JOIN when spilling is enabled");
}
return driverInstanceCount;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import java.util.Optional;

import static com.facebook.presto.SystemSessionProperties.getTaskConcurrency;
import static com.facebook.presto.SystemSessionProperties.isJoinSpillingEnabled;
import static com.facebook.presto.SystemSessionProperties.isSpillEnabled;
import static com.facebook.presto.sql.planner.optimizations.PlanNodeSearcher.searchFrom;
import static com.facebook.presto.sql.planner.optimizations.StreamPreferredProperties.defaultParallelism;
Expand Down Expand Up @@ -189,7 +190,7 @@ private boolean isSwappedJoinValid(JoinNode join)
private boolean checkProbeSidePropertySatisfied(PlanNode node, Context context)
{
StreamPreferredProperties requiredProbeProperty;
if (isSpillEnabled(context.getSession())) {
if (isSpillEnabled(context.getSession()) && isJoinSpillingEnabled(context.getSession())) {
requiredProbeProperty = fixedParallelism();
}
else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@
import static com.facebook.presto.SystemSessionProperties.getTaskPartitionedWriterCount;
import static com.facebook.presto.SystemSessionProperties.getTaskWriterCount;
import static com.facebook.presto.SystemSessionProperties.isDistributedSortEnabled;
import static com.facebook.presto.SystemSessionProperties.isJoinSpillingEnabled;
import static com.facebook.presto.SystemSessionProperties.isSpillEnabled;
import static com.facebook.presto.SystemSessionProperties.isTableWriterMergeOperatorEnabled;
import static com.facebook.presto.common.type.BigintType.BIGINT;
Expand Down Expand Up @@ -663,7 +664,7 @@ public PlanWithProperties visitUnion(UnionNode node, StreamPreferredProperties p
public PlanWithProperties visitJoin(JoinNode node, StreamPreferredProperties parentPreferences)
{
PlanWithProperties probe;
if (isSpillEnabled(session)) {
if (isSpillEnabled(session) && isJoinSpillingEnabled(session)) {
probe = planAndEnforce(
node.getLeft(),
fixedParallelism(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@
package com.facebook.presto.sql.planner.optimizations;

import com.facebook.presto.Session;
import com.facebook.presto.SystemSessionProperties;
import com.facebook.presto.common.predicate.TupleDomain;
import com.facebook.presto.metadata.Metadata;
import com.facebook.presto.metadata.TableLayout;
Expand Down Expand Up @@ -84,7 +83,9 @@
import java.util.Set;
import java.util.stream.Collectors;

import static com.facebook.presto.SystemSessionProperties.isJoinSpillingEnabled;
import static com.facebook.presto.SystemSessionProperties.isOptimizeFullOuterJoinWithCoalesce;
import static com.facebook.presto.SystemSessionProperties.isSpillEnabled;
import static com.facebook.presto.SystemSessionProperties.planWithTableNodePartitioning;
import static com.facebook.presto.common.predicate.TupleDomain.toLinkedMap;
import static com.facebook.presto.spi.relation.DomainTranslator.BASIC_COLUMN_EXTRACTOR;
Expand Down Expand Up @@ -784,7 +785,7 @@ private static Optional<List<VariableReferenceExpression>> translateToNonConstan

static boolean spillPossible(Session session, JoinNode.Type joinType)
{
if (!SystemSessionProperties.isSpillEnabled(session)) {
if (!isSpillEnabled(session) || !isJoinSpillingEnabled(session)) {
return false;
}
switch (joinType) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import java.util.List;

import static com.facebook.presto.SystemSessionProperties.getTaskConcurrency;
import static com.facebook.presto.SystemSessionProperties.isJoinSpillingEnabled;
import static com.facebook.presto.SystemSessionProperties.isSpillEnabled;
import static com.facebook.presto.sql.planner.optimizations.PlanNodeSearcher.searchFrom;
import static com.facebook.presto.sql.planner.optimizations.StreamPreferredProperties.defaultParallelism;
Expand Down Expand Up @@ -93,7 +94,7 @@ public Void visitJoin(JoinNode node, Void context)
checkArgument(requiredBuildProperty.isSatisfiedBy(buildProperties), "Build side needs an additional local exchange for join: %s", node.getId());

StreamPreferredProperties requiredProbeProperty;
if (isSpillEnabled(session)) {
if (isSpillEnabled(session) && isJoinSpillingEnabled(session)) {
requiredProbeProperty = fixedParallelism();
}
else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,7 @@ public void testDefaults()
.setRe2JDfaStatesLimit(Integer.MAX_VALUE)
.setRe2JDfaRetries(5)
.setSpillEnabled(false)
.setJoinSpillingEnabled(false)
.setAggregationOperatorUnspillMemoryLimit(DataSize.valueOf("4MB"))
.setSpillerSpillPaths("")
.setSpillerThreads(4)
Expand Down Expand Up @@ -208,6 +209,7 @@ public void testExplicitPropertyMappings()
.put("re2j.dfa-states-limit", "42")
.put("re2j.dfa-retries", "42")
.put("experimental.spill-enabled", "true")
.put("experimental.join-spill-enabled", "true")
.put("experimental.aggregation-operator-unspill-memory-limit", "100MB")
.put("experimental.spiller-spill-path", "/tmp/custom/spill/path1,/tmp/custom/spill/path2")
.put("experimental.spiller-threads", "42")
Expand Down Expand Up @@ -302,6 +304,7 @@ public void testExplicitPropertyMappings()
.setRe2JDfaStatesLimit(42)
.setRe2JDfaRetries(42)
.setSpillEnabled(true)
.setJoinSpillingEnabled(true)
.setAggregationOperatorUnspillMemoryLimit(DataSize.valueOf("100MB"))
.setSpillerSpillPaths("/tmp/custom/spill/path1,/tmp/custom/spill/path2")
.setSpillerThreads(42)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,11 +40,13 @@ public static DistributedQueryRunner createQueryRunner()
.setSchema(TINY_SCHEMA_NAME)
.setSystemProperty(SystemSessionProperties.TASK_CONCURRENCY, "2")
.setSystemProperty(SystemSessionProperties.SPILL_ENABLED, "true")
.setSystemProperty(SystemSessionProperties.JOIN_SPILL_ENABLED, "true")
.setSystemProperty(SystemSessionProperties.AGGREGATION_OPERATOR_UNSPILL_MEMORY_LIMIT, "128kB")
.setSystemProperty(SystemSessionProperties.USE_MARK_DISTINCT, "false")
.build();

ImmutableMap<String, String> extraProperties = ImmutableMap.<String, String>builder()
.put("experimental.spill-enabled", "true")
.put("experimental.spiller-spill-path", Paths.get(System.getProperty("java.io.tmpdir"), "presto", "spills").toString())
.put("experimental.spiller-max-used-space-threshold", "1.0")
.put("experimental.memory-revoking-threshold", "0.0") // revoke always
Expand Down

0 comments on commit d8be70d

Please sign in to comment.