diff --git a/presto-docs/src/main/sphinx/presto_cpp/properties.rst b/presto-docs/src/main/sphinx/presto_cpp/properties.rst index 07f23a64f5a7f..495b77c9decc2 100644 --- a/presto-docs/src/main/sphinx/presto_cpp/properties.rst +++ b/presto-docs/src/main/sphinx/presto_cpp/properties.rst @@ -30,7 +30,6 @@ Presto C++ workers. optimizer.optimize-hash-generation=false regex-library=RE2J use-alternative-function-signatures=true - experimental.table-writer-merge-operator-enabled=false These Presto coordinator configuration properties are described here, in alphabetical order. @@ -45,15 +44,6 @@ alphabetical order. Set this property to ``0`` to disable canceling. -``experimental.table-writer-merge-operator-enabled`` -^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ - -* **Type:** ``boolean`` -* **Default value:** ``true`` - - Merge TableWriter output before sending to TableFinishOperator. This property must be set to - ``false``. - ``native-execution-enabled`` ^^^^^^^^^^^^^^^^^^^^^^^^^^^^ diff --git a/presto-main/src/main/java/com/facebook/presto/SystemSessionProperties.java b/presto-main/src/main/java/com/facebook/presto/SystemSessionProperties.java index 4b81f7e3a0e04..f66aeaf89a2ee 100644 --- a/presto-main/src/main/java/com/facebook/presto/SystemSessionProperties.java +++ b/presto-main/src/main/java/com/facebook/presto/SystemSessionProperties.java @@ -199,7 +199,6 @@ public final class SystemSessionProperties public static final String MAX_CONCURRENT_MATERIALIZATIONS = "max_concurrent_materializations"; public static final String PUSHDOWN_SUBFIELDS_ENABLED = "pushdown_subfields_enabled"; public static final String PUSHDOWN_SUBFIELDS_FROM_LAMBDA_ENABLED = "pushdown_subfields_from_lambda_enabled"; - public static final String TABLE_WRITER_MERGE_OPERATOR_ENABLED = "table_writer_merge_operator_enabled"; public static final String INDEX_LOADER_TIMEOUT = "index_loader_timeout"; public static final String OPTIMIZED_REPARTITIONING_ENABLED = "optimized_repartitioning"; public static final String AGGREGATION_PARTITIONING_MERGING_STRATEGY = "aggregation_partitioning_merging_strategy"; @@ -1064,11 +1063,6 @@ public SystemSessionProperties( "Experimental: enable dereference pushdown", featuresConfig.isPushdownDereferenceEnabled(), false), - booleanProperty( - TABLE_WRITER_MERGE_OPERATOR_ENABLED, - "Experimental: enable table writer merge operator", - featuresConfig.isTableWriterMergeOperatorEnabled(), - false), new PropertyMetadata<>( INDEX_LOADER_TIMEOUT, "Timeout for loading indexes for index joins", @@ -2515,11 +2509,6 @@ public static boolean isPushdownDereferenceEnabled(Session session) return session.getSystemProperty(PUSHDOWN_DEREFERENCE_ENABLED, Boolean.class); } - public static boolean isTableWriterMergeOperatorEnabled(Session session) - { - return session.getSystemProperty(TABLE_WRITER_MERGE_OPERATOR_ENABLED, Boolean.class); - } - public static Duration getIndexLoaderTimeout(Session session) { return session.getSystemProperty(INDEX_LOADER_TIMEOUT, Duration.class); diff --git a/presto-main/src/main/java/com/facebook/presto/sql/TemporaryTableUtil.java b/presto-main/src/main/java/com/facebook/presto/sql/TemporaryTableUtil.java index df31b5d4c8544..f40bc95a8d0a2 100644 --- a/presto-main/src/main/java/com/facebook/presto/sql/TemporaryTableUtil.java +++ b/presto-main/src/main/java/com/facebook/presto/sql/TemporaryTableUtil.java @@ -66,7 +66,6 @@ import java.util.stream.Collectors; import static com.facebook.presto.SystemSessionProperties.getTaskPartitionedWriterCount; -import static com.facebook.presto.SystemSessionProperties.isTableWriterMergeOperatorEnabled; import static com.facebook.presto.common.type.BigintType.BIGINT; import static com.facebook.presto.common.type.VarbinaryType.VARBINARY; import static com.facebook.presto.spi.plan.ExchangeEncoding.COLUMNAR; @@ -327,52 +326,32 @@ public static TableFinishNode createTemporaryTableWriteWithExchanges( // Disabled by default. Enable when the column statistics are essential for future runtime adaptive plan optimizations boolean enableStatsCollectionForTemporaryTable = SystemSessionProperties.isEnableStatsCollectionForTemporaryTable(session); - - if (isTableWriterMergeOperatorEnabled(session)) { - StatisticAggregations.Parts localAggregations = splitIntoPartialAndIntermediate(aggregations.getPartialAggregation(), variableAllocator, metadata.getFunctionAndTypeManager()); - tableWriterMerge = new TableWriterMergeNode( - sourceLocation, - idAllocator.getNextId(), - gatheringExchange( - idAllocator.getNextId(), - LOCAL, - new TableWriterNode( - sourceLocation, - idAllocator.getNextId(), - writerSource, - Optional.of(insertReference), - variableAllocator.newVariable("partialrows", BIGINT), - variableAllocator.newVariable("partialfragments", VARBINARY), - variableAllocator.newVariable("partialtablecommitcontext", VARBINARY), - outputs, - outputColumnNames, - outputNotNullColumnVariables, - Optional.of(partitioningScheme), - enableStatsCollectionForTemporaryTable ? Optional.of(localAggregations.getPartialAggregation()) : Optional.empty(), - Optional.empty(), - Optional.of(Boolean.TRUE))), - variableAllocator.newVariable("intermediaterows", BIGINT), - variableAllocator.newVariable("intermediatefragments", VARBINARY), - variableAllocator.newVariable("intermediatetablecommitcontext", VARBINARY), - enableStatsCollectionForTemporaryTable ? Optional.of(localAggregations.getIntermediateAggregation()) : Optional.empty()); - } - else { - tableWriterMerge = new TableWriterNode( - sourceLocation, - idAllocator.getNextId(), - writerSource, - Optional.of(insertReference), - variableAllocator.newVariable("partialrows", BIGINT), - variableAllocator.newVariable("partialfragments", VARBINARY), - variableAllocator.newVariable("partialtablecommitcontext", VARBINARY), - outputs, - outputColumnNames, - outputNotNullColumnVariables, - Optional.of(partitioningScheme), - enableStatsCollectionForTemporaryTable ? Optional.of(aggregations.getPartialAggregation()) : Optional.empty(), - Optional.empty(), - Optional.of(Boolean.TRUE)); - } + StatisticAggregations.Parts localAggregations = splitIntoPartialAndIntermediate(aggregations.getPartialAggregation(), variableAllocator, metadata.getFunctionAndTypeManager()); + tableWriterMerge = new TableWriterMergeNode( + sourceLocation, + idAllocator.getNextId(), + gatheringExchange( + idAllocator.getNextId(), + LOCAL, + new TableWriterNode( + sourceLocation, + idAllocator.getNextId(), + writerSource, + Optional.of(insertReference), + variableAllocator.newVariable("partialrows", BIGINT), + variableAllocator.newVariable("partialfragments", VARBINARY), + variableAllocator.newVariable("partialtablecommitcontext", VARBINARY), + outputs, + outputColumnNames, + outputNotNullColumnVariables, + Optional.of(partitioningScheme), + enableStatsCollectionForTemporaryTable ? Optional.of(localAggregations.getPartialAggregation()) : Optional.empty(), + Optional.empty(), + Optional.of(Boolean.TRUE))), + variableAllocator.newVariable("intermediaterows", BIGINT), + variableAllocator.newVariable("intermediatefragments", VARBINARY), + variableAllocator.newVariable("intermediatetablecommitcontext", VARBINARY), + enableStatsCollectionForTemporaryTable ? Optional.of(localAggregations.getIntermediateAggregation()) : Optional.empty()); return new TableFinishNode( sourceLocation, diff --git a/presto-main/src/main/java/com/facebook/presto/sql/analyzer/FeaturesConfig.java b/presto-main/src/main/java/com/facebook/presto/sql/analyzer/FeaturesConfig.java index 0ba01d7e9649b..5c2040e6ade8a 100644 --- a/presto-main/src/main/java/com/facebook/presto/sql/analyzer/FeaturesConfig.java +++ b/presto-main/src/main/java/com/facebook/presto/sql/analyzer/FeaturesConfig.java @@ -58,7 +58,8 @@ "deprecated.legacy-join-using", "use-legacy-scheduler", "max-stage-retries", - "deprecated.group-by-uses-equal"}) + "deprecated.group-by-uses-equal", + "experimental.table-writer-merge-operator-enabled"}) public class FeaturesConfig { @VisibleForTesting @@ -177,8 +178,6 @@ public class FeaturesConfig private boolean pushdownSubfieldsEnabled; private boolean pushdownSubfieldsFromLambdaEnabled; - private boolean tableWriterMergeOperatorEnabled = true; - private Duration indexLoaderTimeout = new Duration(20, SECONDS); private boolean listBuiltInFunctionsOnly = true; @@ -1693,18 +1692,6 @@ public boolean isPushdownDereferenceEnabled() return pushdownDereferenceEnabled; } - public boolean isTableWriterMergeOperatorEnabled() - { - return tableWriterMergeOperatorEnabled; - } - - @Config("experimental.table-writer-merge-operator-enabled") - public FeaturesConfig setTableWriterMergeOperatorEnabled(boolean tableWriterMergeOperatorEnabled) - { - this.tableWriterMergeOperatorEnabled = tableWriterMergeOperatorEnabled; - return this; - } - @Config("index-loader-timeout") @ConfigDescription("Time limit for loading indexes for index joins") public FeaturesConfig setIndexLoaderTimeout(Duration indexLoaderTimeout) diff --git a/presto-main/src/main/java/com/facebook/presto/sql/planner/PlanFragmenterUtils.java b/presto-main/src/main/java/com/facebook/presto/sql/planner/PlanFragmenterUtils.java index 2304e0eb1cbe8..d93be0f969d4d 100644 --- a/presto-main/src/main/java/com/facebook/presto/sql/planner/PlanFragmenterUtils.java +++ b/presto-main/src/main/java/com/facebook/presto/sql/planner/PlanFragmenterUtils.java @@ -47,7 +47,6 @@ import static com.facebook.presto.SystemSessionProperties.getQueryMaxStageCount; import static com.facebook.presto.SystemSessionProperties.isForceSingleNodeOutput; import static com.facebook.presto.SystemSessionProperties.isRecoverableGroupedExecutionEnabled; -import static com.facebook.presto.SystemSessionProperties.isTableWriterMergeOperatorEnabled; import static com.facebook.presto.spi.StandardErrorCode.QUERY_HAS_TOO_MANY_STAGES; import static com.facebook.presto.spi.StandardWarningCode.TOO_MANY_STAGES; import static com.facebook.presto.sql.planner.SystemPartitioningHandle.SOURCE_DISTRIBUTION; @@ -168,7 +167,6 @@ private static SubPlan analyzeGroupedExecution(Session session, SubPlan subPlan, * - One table writer per task */ boolean recoverable = isRecoverableGroupedExecutionEnabled(session) && - isTableWriterMergeOperatorEnabled(session) && parentContainsTableFinish && (fragment.getRoot() instanceof TableWriterMergeNode || fragment.getRoot() instanceof TableWriterNode) && properties.isRecoveryEligible(); diff --git a/presto-main/src/main/java/com/facebook/presto/sql/planner/optimizations/AddLocalExchanges.java b/presto-main/src/main/java/com/facebook/presto/sql/planner/optimizations/AddLocalExchanges.java index 0fde898e14801..d9dc2ff02e5a0 100644 --- a/presto-main/src/main/java/com/facebook/presto/sql/planner/optimizations/AddLocalExchanges.java +++ b/presto-main/src/main/java/com/facebook/presto/sql/planner/optimizations/AddLocalExchanges.java @@ -74,7 +74,6 @@ import static com.facebook.presto.SystemSessionProperties.isQuickDistinctLimitEnabled; import static com.facebook.presto.SystemSessionProperties.isSegmentedAggregationEnabled; import static com.facebook.presto.SystemSessionProperties.isSpillEnabled; -import static com.facebook.presto.SystemSessionProperties.isTableWriterMergeOperatorEnabled; import static com.facebook.presto.common.type.BigintType.BIGINT; import static com.facebook.presto.common.type.VarbinaryType.VARBINARY; import static com.facebook.presto.operator.aggregation.AggregationUtils.hasSingleNodeExecutionPreference; @@ -549,10 +548,6 @@ public PlanWithProperties visitTableWriter(TableWriterNode originalTableWriterNo return planAndEnforceChildren(originalTableWriterNode, singleStream(), defaultParallelism(session)); } - if (!isTableWriterMergeOperatorEnabled(session)) { - return planAndEnforceChildren(originalTableWriterNode, fixedParallelism(), fixedParallelism()); - } - Optional statisticAggregations = originalTableWriterNode .getStatisticsAggregation() .map(aggregations -> splitIntoPartialAndIntermediate( diff --git a/presto-main/src/test/java/com/facebook/presto/sql/analyzer/TestFeaturesConfig.java b/presto-main/src/test/java/com/facebook/presto/sql/analyzer/TestFeaturesConfig.java index f169e275e7a33..4ec67782a1ab7 100644 --- a/presto-main/src/test/java/com/facebook/presto/sql/analyzer/TestFeaturesConfig.java +++ b/presto-main/src/test/java/com/facebook/presto/sql/analyzer/TestFeaturesConfig.java @@ -153,7 +153,6 @@ public void testDefaults() .setPushdownSubfieldsEnabled(false) .setPushdownSubfieldsFromLambdaEnabled(false) .setPushdownDereferenceEnabled(false) - .setTableWriterMergeOperatorEnabled(true) .setIndexLoaderTimeout(new Duration(20, SECONDS)) .setOptimizedRepartitioningEnabled(false) .setListBuiltInFunctionsOnly(true) @@ -349,7 +348,6 @@ public void testExplicitPropertyMappings() .put("experimental.pushdown-subfields-enabled", "true") .put("pushdown-subfields-from-lambda-enabled", "true") .put("experimental.pushdown-dereference-enabled", "true") - .put("experimental.table-writer-merge-operator-enabled", "false") .put("index-loader-timeout", "10s") .put("experimental.optimized-repartitioning", "true") .put("list-built-in-functions-only", "false") @@ -542,7 +540,6 @@ public void testExplicitPropertyMappings() .setPushdownSubfieldsEnabled(true) .setPushdownSubfieldsFromLambdaEnabled(true) .setPushdownDereferenceEnabled(true) - .setTableWriterMergeOperatorEnabled(false) .setIndexLoaderTimeout(new Duration(10, SECONDS)) .setOptimizedRepartitioningEnabled(true) .setListBuiltInFunctionsOnly(false) diff --git a/presto-native-execution/src/test/java/com/facebook/presto/nativeworker/NativeQueryRunnerUtils.java b/presto-native-execution/src/test/java/com/facebook/presto/nativeworker/NativeQueryRunnerUtils.java index de7d0f33b1012..3afa67b275c22 100644 --- a/presto-native-execution/src/test/java/com/facebook/presto/nativeworker/NativeQueryRunnerUtils.java +++ b/presto-native-execution/src/test/java/com/facebook/presto/nativeworker/NativeQueryRunnerUtils.java @@ -52,7 +52,6 @@ public static Map getNativeWorkerSystemProperties() // To achieve that, we set inline-sql-functions to false. .put("inline-sql-functions", "false") .put("use-alternative-function-signatures", "true") - .put("experimental.table-writer-merge-operator-enabled", "false") .build(); } diff --git a/presto-native-execution/src/test/java/com/facebook/presto/nativeworker/TestPrestoNativeWriter.java b/presto-native-execution/src/test/java/com/facebook/presto/nativeworker/TestPrestoNativeWriter.java index 4c4291b580739..0e6ac20b8eb51 100644 --- a/presto-native-execution/src/test/java/com/facebook/presto/nativeworker/TestPrestoNativeWriter.java +++ b/presto-native-execution/src/test/java/com/facebook/presto/nativeworker/TestPrestoNativeWriter.java @@ -92,7 +92,6 @@ public void testTableFormats() { Session session = Session.builder(getSession()) .setSystemProperty("scale_writers", "true") - .setSystemProperty("table_writer_merge_operator_enabled", "true") .setSystemProperty("task_writer_count", "1") .setSystemProperty("task_partitioned_writer_count", "2") .setCatalogSessionProperty("hive", "collect_column_statistics_on_write", "true") @@ -441,7 +440,6 @@ private Session buildSessionForTableWrite() { return Session.builder(getSession()) .setSystemProperty("scale_writers", "true") - .setSystemProperty("table_writer_merge_operator_enabled", "true") .setSystemProperty("task_writer_count", "1") .setSystemProperty("task_partitioned_writer_count", "2") .setCatalogSessionProperty("hive", "collect_column_statistics_on_write", "true") diff --git a/presto-native-execution/src/test/java/com/facebook/presto/spark/TestPrestoSparkNativeSimpleQueries.java b/presto-native-execution/src/test/java/com/facebook/presto/spark/TestPrestoSparkNativeSimpleQueries.java index 40207a517d020..1056502f3fca8 100644 --- a/presto-native-execution/src/test/java/com/facebook/presto/spark/TestPrestoSparkNativeSimpleQueries.java +++ b/presto-native-execution/src/test/java/com/facebook/presto/spark/TestPrestoSparkNativeSimpleQueries.java @@ -129,7 +129,6 @@ public void testJsonFileBasedFunction() public void testAggregationCompanionFunction() { Session session = Session.builder(getSession()) - .setSystemProperty("table_writer_merge_operator_enabled", "false") .setCatalogSessionProperty("hive", "collect_column_statistics_on_write", "false") .setCatalogSessionProperty("hive", "orc_compression_codec", "ZSTD") .build();