Drop getPreferredShuffleLayout methods from SPI
Use getNewTableLayout/getInsertLayout instead; they are expected to be mutually
exclusive with getPreferredShuffleLayout*, so the preferred shuffle layout can be
returned from those methods directly.
arhimondr committed Nov 22, 2024
1 parent 72e5c57 commit 300ca07
Showing 20 changed files with 54 additions and 219 deletions.
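The HiveMetadata.java diff below folds the old preferred-shuffle fallback directly into getInsertLayout and getNewTableLayout. As a rough, self-contained sketch of that consolidation (toy types and names, not the real Presto SPI), the decision flow for the insert case looks roughly like this:

    import java.util.List;
    import java.util.Optional;

    // Toy illustration of the consolidated layout lookup; the Table/TableLayout types and
    // the "hive-compatible-shuffle" handle are made up for this sketch.
    public class InsertLayoutSketch
    {
        record TableLayout(String partitioning, List<String> partitionedBy) {}

        record Table(Optional<String> bucketing, List<String> partitionColumns) {}

        static Optional<TableLayout> getInsertLayout(Table table, boolean shuffleOnPartitionColumnsEnabled)
        {
            if (!table.bucketing().isPresent()) {
                // Old behavior: return Optional.empty() and let getPreferredShuffleLayoutForInsert
                // supply a shuffle layout. New behavior: fall through to the shuffle layout here.
                if (!shuffleOnPartitionColumnsEnabled || table.partitionColumns().isEmpty()) {
                    return Optional.empty();
                }
                return Optional.of(new TableLayout("hive-compatible-shuffle", table.partitionColumns()));
            }
            // Bucketed tables keep returning the bucketing-based layout, exactly as before.
            return Optional.of(new TableLayout(table.bucketing().get(), table.partitionColumns()));
        }

        public static void main(String[] args)
        {
            Table partitionedOnly = new Table(Optional.empty(), List.of("ds", "country"));
            Table bucketed = new Table(Optional.of("bucket(user_id, 32)"), List.of("ds"));

            System.out.println(getInsertLayout(partitionedOnly, true));   // shuffle layout on partition columns
            System.out.println(getInsertLayout(partitionedOnly, false));  // empty, same as before the commit
            System.out.println(getInsertLayout(bucketed, true));          // bucketing layout wins
        }
    }

Either way a connector now returns at most one layout per write, which is what allows the engine-side getPreferredShuffleLayout* plumbing in the remaining files to be deleted.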
presto-hive/src/main/java/com/facebook/presto/hive/HiveMetadata.java: 108 changes (37 additions & 71 deletions)
@@ -2995,7 +2995,22 @@ public Optional<ConnectorNewTableLayout> getInsertLayout(ConnectorSession sessio

Optional<HiveBucketHandle> hiveBucketHandle = getHiveBucketHandle(session, table);
if (!hiveBucketHandle.isPresent()) {
return Optional.empty();
if (!isShufflePartitionedColumnsForTableWriteEnabled(session) || table.getPartitionColumns().isEmpty()) {
return Optional.empty();
}

// TODO: the shuffle partitioning could use a better hash function (instead of Hive bucket function)
HivePartitioningHandle partitioningHandle = createHiveCompatiblePartitioningHandle(
SHUFFLE_MAX_PARALLELISM_FOR_PARTITIONED_TABLE_WRITE,
table.getPartitionColumns().stream()
.map(Column::getType)
.collect(toList()),
OptionalInt.empty());
List<String> partitionedBy = table.getPartitionColumns().stream()
.map(Column::getName)
.collect(toList());

return Optional.of(new ConnectorNewTableLayout(partitioningHandle, partitionedBy));
}
HiveBucketProperty bucketProperty = table.getStorage().getBucketProperty()
.orElseThrow(() -> new NoSuchElementException("Bucket property should be set"));
@@ -3031,40 +3046,6 @@ public Optional<ConnectorNewTableLayout> getInsertLayout(ConnectorSession sessio
return Optional.of(new ConnectorNewTableLayout(partitioningHandle, partitionColumns));
}

@Override
public Optional<ConnectorNewTableLayout> getPreferredShuffleLayoutForInsert(ConnectorSession session, ConnectorTableHandle tableHandle)
{
HiveTableHandle hiveTableHandle = (HiveTableHandle) tableHandle;
SchemaTableName tableName = hiveTableHandle.getSchemaTableName();
MetastoreContext metastoreContext = getMetastoreContext(session);
Table table = metastore.getTable(metastoreContext, tableName.getSchemaName(), tableName.getTableName())
.orElseThrow(() -> new TableNotFoundException(tableName));

Optional<HiveBucketHandle> hiveBucketHandle = getHiveBucketHandle(session, table);
if (hiveBucketHandle.isPresent()) {
// For bucketed table, table partitioning (i.e. the bucketing scheme) should be respected,
// and there is no additional preferred shuffle partitioning
return Optional.empty();
}

if (!isShufflePartitionedColumnsForTableWriteEnabled(session) || table.getPartitionColumns().isEmpty()) {
return Optional.empty();
}

// TODO: the shuffle partitioning could use a better hash function (instead of Hive bucket function)
HivePartitioningHandle partitioningHandle = createHiveCompatiblePartitioningHandle(
SHUFFLE_MAX_PARALLELISM_FOR_PARTITIONED_TABLE_WRITE,
table.getPartitionColumns().stream()
.map(Column::getType)
.collect(toList()),
OptionalInt.empty());
List<String> partitionedBy = table.getPartitionColumns().stream()
.map(Column::getName)
.collect(toList());

return Optional.of(new ConnectorNewTableLayout(partitioningHandle, partitionedBy));
}

@Override
public Optional<ConnectorNewTableLayout> getNewTableLayout(ConnectorSession session, ConnectorTableMetadata tableMetadata)
{
@@ -3073,7 +3054,27 @@ public Optional<ConnectorNewTableLayout> getNewTableLayout(ConnectorSession sess
validateCsvColumns(tableMetadata);
Optional<HiveBucketProperty> bucketProperty = getBucketProperty(tableMetadata.getProperties());
if (!bucketProperty.isPresent()) {
return Optional.empty();
List<String> partitionedBy = getPartitionedBy(tableMetadata.getProperties());
if (!isShufflePartitionedColumnsForTableWriteEnabled(session) || partitionedBy.isEmpty()) {
return Optional.empty();
}

List<HiveColumnHandle> columnHandles = getColumnHandles(tableMetadata, ImmutableSet.copyOf(partitionedBy), typeTranslator);
Map<String, HiveColumnHandle> columnHandlesByName = Maps.uniqueIndex(columnHandles, HiveColumnHandle::getName);
List<Column> partitionColumns = partitionedBy.stream()
.map(columnHandlesByName::get)
.map(columnHandle -> columnHandleToColumn(session, columnHandle))
.collect(toList());

// TODO: the shuffle partitioning could use a better hash function (instead of Hive bucket function)
HivePartitioningHandle partitioningHandle = createHiveCompatiblePartitioningHandle(
SHUFFLE_MAX_PARALLELISM_FOR_PARTITIONED_TABLE_WRITE,
partitionColumns.stream()
.map(Column::getType)
.collect(toList()),
OptionalInt.empty());

return Optional.of(new ConnectorNewTableLayout(partitioningHandle, partitionedBy));
}
checkArgument(bucketProperty.get().getBucketFunctionType().equals(BucketFunctionType.HIVE_COMPATIBLE),
"bucketFunctionType is expected to be HIVE_COMPATIBLE, got: %s",
@@ -3096,41 +3097,6 @@ public Optional<ConnectorNewTableLayout> getNewTableLayout(ConnectorSession sess
bucketedBy));
}

@Override
public Optional<ConnectorNewTableLayout> getPreferredShuffleLayoutForNewTable(ConnectorSession session, ConnectorTableMetadata tableMetadata)
{
validatePartitionColumns(tableMetadata);
validateBucketColumns(tableMetadata);
Optional<HiveBucketProperty> bucketProperty = getBucketProperty(tableMetadata.getProperties());
if (bucketProperty.isPresent()) {
// For bucketed table, table partitioning (i.e. the bucketing scheme) should be respected,
// and there is no additional preferred shuffle partitioning
return Optional.empty();
}

List<String> partitionedBy = getPartitionedBy(tableMetadata.getProperties());
if (!isShufflePartitionedColumnsForTableWriteEnabled(session) || partitionedBy.isEmpty()) {
return Optional.empty();
}

List<HiveColumnHandle> columnHandles = getColumnHandles(tableMetadata, ImmutableSet.copyOf(partitionedBy), typeTranslator);
Map<String, HiveColumnHandle> columnHandlesByName = Maps.uniqueIndex(columnHandles, HiveColumnHandle::getName);
List<Column> partitionColumns = partitionedBy.stream()
.map(columnHandlesByName::get)
.map(columnHandle -> columnHandleToColumn(session, columnHandle))
.collect(toList());

// TODO: the shuffle partitioning could use a better hash function (instead of Hive bucket function)
HivePartitioningHandle partitioningHandle = createHiveCompatiblePartitioningHandle(
SHUFFLE_MAX_PARALLELISM_FOR_PARTITIONED_TABLE_WRITE,
partitionColumns.stream()
.map(Column::getType)
.collect(toList()),
OptionalInt.empty());

return Optional.of(new ConnectorNewTableLayout(partitioningHandle, partitionedBy));
}

@Override
public TableStatisticsMetadata getStatisticsCollectionMetadataForWrite(ConnectorSession session, ConnectorTableMetadata tableMetadata)
{
@@ -280,12 +280,6 @@ public Optional<NewTableLayout> getNewTableLayout(Session session, String catalo
return delegate.getNewTableLayout(session, catalogName, tableMetadata);
}

@Override
public Optional<NewTableLayout> getPreferredShuffleLayoutForNewTable(Session session, String catalogName, ConnectorTableMetadata tableMetadata)
{
return delegate.getPreferredShuffleLayoutForNewTable(session, catalogName, tableMetadata);
}

@Override
public OutputTableHandle beginCreateTable(Session session, String catalogName, ConnectorTableMetadata tableMetadata, Optional<NewTableLayout> layout)
{
@@ -308,12 +302,6 @@ public Optional<NewTableLayout> getInsertLayout(Session session, TableHandle tar
return delegate.getInsertLayout(session, target);
}

@Override
public Optional<NewTableLayout> getPreferredShuffleLayoutForInsert(Session session, TableHandle target)
{
return delegate.getPreferredShuffleLayoutForInsert(session, target);
}

@Override
public TableStatisticsMetadata getStatisticsCollectionMetadataForWrite(Session session, String catalogName, ConnectorTableMetadata tableMetadata)
{
@@ -246,9 +246,6 @@ public interface Metadata

Optional<NewTableLayout> getNewTableLayout(Session session, String catalogName, ConnectorTableMetadata tableMetadata);

@Experimental
Optional<NewTableLayout> getPreferredShuffleLayoutForNewTable(Session session, String catalogName, ConnectorTableMetadata tableMetadata);

/**
* Begin the atomic creation of a table with data.
*/
@@ -261,9 +258,6 @@ public interface Metadata

Optional<NewTableLayout> getInsertLayout(Session session, TableHandle target);

@Experimental
Optional<NewTableLayout> getPreferredShuffleLayoutForInsert(Session session, TableHandle target);

/**
* Describes statistics that must be collected during a write.
*/
@@ -733,17 +733,6 @@ public Optional<NewTableLayout> getInsertLayout(Session session, TableHandle tab
.map(layout -> new NewTableLayout(connectorId, catalogMetadata.getTransactionHandleFor(connectorId), layout));
}

@Override
public Optional<NewTableLayout> getPreferredShuffleLayoutForInsert(Session session, TableHandle table)
{
ConnectorId connectorId = table.getConnectorId();
CatalogMetadata catalogMetadata = getCatalogMetadataForWrite(session, connectorId);
ConnectorMetadata metadata = catalogMetadata.getMetadata();

return metadata.getPreferredShuffleLayoutForInsert(session.toConnectorSession(connectorId), table.getConnectorHandle())
.map(layout -> new NewTableLayout(connectorId, catalogMetadata.getTransactionHandleFor(connectorId), layout));
}

@Override
public TableStatisticsMetadata getStatisticsCollectionMetadataForWrite(Session session, String catalogName, ConnectorTableMetadata tableMetadata)
{
@@ -795,19 +784,6 @@ public Optional<NewTableLayout> getNewTableLayout(Session session, String catalo
.map(layout -> new NewTableLayout(connectorId, transactionHandle, layout));
}

@Override
public Optional<NewTableLayout> getPreferredShuffleLayoutForNewTable(Session session, String catalogName, ConnectorTableMetadata tableMetadata)
{
CatalogMetadata catalogMetadata = getCatalogMetadataForWrite(session, catalogName);
ConnectorId connectorId = catalogMetadata.getConnectorId();
ConnectorMetadata metadata = catalogMetadata.getMetadata();

ConnectorTransactionHandle transactionHandle = catalogMetadata.getTransactionHandleFor(connectorId);
ConnectorSession connectorSession = session.toConnectorSession(connectorId);
return metadata.getPreferredShuffleLayoutForNewTable(connectorSession, tableMetadata)
.map(layout -> new NewTableLayout(connectorId, transactionHandle, layout));
}

@Override
public void beginQuery(Session session, Set<ConnectorId> connectors)
{
@@ -212,7 +212,6 @@ public static TableFinishNode createTemporaryTableWriteWithoutExchanges(
Optional.empty(),
Optional.empty(),
Optional.empty(),
Optional.empty(),
Optional.of(Boolean.TRUE)),
Optional.of(insertReference),
outputVar,
@@ -349,7 +348,6 @@ public static TableFinishNode createTemporaryTableWriteWithExchanges(
outputColumnNames,
outputNotNullColumnVariables,
Optional.of(partitioningScheme),
Optional.empty(),
enableStatsCollectionForTemporaryTable ? Optional.of(localAggregations.getPartialAggregation()) : Optional.empty(),
Optional.empty(),
Optional.of(Boolean.TRUE))),
@@ -371,7 +369,6 @@ public static TableFinishNode createTemporaryTableWriteWithExchanges(
outputColumnNames,
outputNotNullColumnVariables,
Optional.of(partitioningScheme),
Optional.empty(),
enableStatsCollectionForTemporaryTable ? Optional.of(aggregations.getPartialAggregation()) : Optional.empty(),
Optional.empty(),
Optional.of(Boolean.TRUE));
@@ -255,9 +255,6 @@ public PlanNode visitTableWriter(TableWriterNode node, RewriteContext<FragmentPr
if (node.getTablePartitioningScheme().isPresent()) {
context.get().setDistribution(node.getTablePartitioningScheme().get().getPartitioning().getHandle(), metadata, session);
}
if (node.getPreferredShufflePartitioningScheme().isPresent()) {
context.get().setDistribution(node.getPreferredShufflePartitioningScheme().get().getPartitioning().getHandle(), metadata, session);
}
return context.defaultRewrite(node, context.get());
}

@@ -228,7 +228,6 @@ public Optional<PlanNode> visitTableWriter(TableWriterNode node, Context context
Optional.empty(),
Optional.empty(),
Optional.empty(),
Optional.empty(),
Optional.empty());
context.addPlan(node, new CanonicalPlan(result, strategy));
return Optional.of(result);
@@ -275,8 +275,6 @@ private RelationPlan createTableCreationPlan(Analysis analysis, Query query)
analysis.getParameters(),
analysis.getCreateTableComment());
Optional<NewTableLayout> newTableLayout = metadata.getNewTableLayout(session, destination.getCatalogName(), tableMetadata);
Optional<NewTableLayout> preferredShuffleLayout = metadata.getPreferredShuffleLayoutForNewTable(session, destination.getCatalogName(), tableMetadata);

List<String> columnNames = tableMetadata.getColumns().stream()
.filter(column -> !column.isHidden())
.map(ColumnMetadata::getName)
@@ -291,7 +289,6 @@ private RelationPlan createTableCreationPlan(Analysis analysis, Query query)
columnNames,
tableMetadata.getColumns(),
newTableLayout,
preferredShuffleLayout,
statisticsMetadata);
}

@@ -372,7 +369,6 @@ private RelationPlan buildInternalInsertPlan(
plan = new RelationPlan(projectNode, scope, projectNode.getOutputVariables());

Optional<NewTableLayout> newTableLayout = metadata.getInsertLayout(session, tableHandle);
Optional<NewTableLayout> preferredShuffleLayout = metadata.getPreferredShuffleLayoutForInsert(session, tableHandle);

String catalogName = tableHandle.getConnectorId().getCatalogName();
TableStatisticsMetadata statisticsMetadata = metadata.getStatisticsCollectionMetadataForWrite(session, catalogName, tableMetadata.getMetadata());
@@ -384,7 +380,6 @@ private RelationPlan buildInternalInsertPlan(
visibleTableColumnNames,
visibleTableColumns,
newTableLayout,
preferredShuffleLayout,
statisticsMetadata);
}

@@ -395,11 +390,8 @@ private RelationPlan createTableWriterPlan(
List<String> columnNames,
List<ColumnMetadata> columnMetadataList,
Optional<NewTableLayout> writeTableLayout,
Optional<NewTableLayout> preferredShuffleLayout,
TableStatisticsMetadata statisticsMetadata)
{
verify(!(writeTableLayout.isPresent() && preferredShuffleLayout.isPresent()), "writeTableLayout and preferredShuffleLayout cannot both exist");

PlanNode source = plan.getRoot();

if (!analysis.isCreateTableAsSelectWithData()) {
@@ -408,7 +400,6 @@ private RelationPlan createTableWriterPlan(

List<VariableReferenceExpression> variables = plan.getFieldMappings();
Optional<PartitioningScheme> tablePartitioningScheme = getPartitioningSchemeForTableWrite(writeTableLayout, columnNames, variables);
Optional<PartitioningScheme> preferredShufflePartitioningScheme = getPartitioningSchemeForTableWrite(preferredShuffleLayout, columnNames, variables);

verify(columnNames.size() == variables.size(), "columnNames.size() != variables.size(): %s and %s", columnNames, variables);
Map<String, VariableReferenceExpression> columnToVariableMap = zip(columnNames.stream(), plan.getFieldMappings().stream(), SimpleImmutableEntry::new)
@@ -440,7 +431,6 @@ private RelationPlan createTableWriterPlan(
columnNames,
notNullColumnVariables,
tablePartitioningScheme,
preferredShufflePartitioningScheme,
// partial aggregation is run within the TableWriteOperator to calculate the statistics for
// the data consumed by the TableWriteOperator
Optional.of(aggregations.getPartialAggregation()),
@@ -471,7 +461,6 @@ private RelationPlan createTableWriterPlan(
columnNames,
notNullColumnVariables,
tablePartitioningScheme,
preferredShufflePartitioningScheme,
Optional.empty(),
Optional.empty(),
Optional.of(Boolean.FALSE)),
@@ -52,7 +52,7 @@ public class PushTableWriteThroughUnion
// guaranteed regardless of this optimizer. The level of local parallelism will be
// determined by LocalExecutionPlanner separately, and shouldn't be a concern of
// this optimizer.
.matching(tableWriter -> !(tableWriter.getTablePartitioningScheme().isPresent() || tableWriter.getPreferredShufflePartitioningScheme().isPresent()))
.matching(tableWriter -> !tableWriter.getTablePartitioningScheme().isPresent())
.with(source().matching(union().capturedAs(CHILD)));

@Override
@@ -587,7 +587,6 @@ public Result apply(TableWriterNode node, Captures captures, Context context)
node.getColumnNames(),
node.getNotNullColumnVariables(),
node.getTablePartitioningScheme(),
node.getPreferredShufflePartitioningScheme(),
rewrittenStatisticsAggregation,
node.getTaskCountIfScaledWriter(),
node.getIsTemporaryTableWriter()));
@@ -78,7 +78,6 @@ public Result apply(TableWriterNode node, Captures captures, Context context)
node.getColumnNames(),
node.getNotNullColumnVariables(),
node.getTablePartitioningScheme(),
node.getPreferredShufflePartitioningScheme(),
node.getStatisticsAggregation(),
Optional.of(initialTaskNumber),
node.getIsTemporaryTableWriter()));