Allow for batch updates of Glue column statistics
alexjo2144 authored and losipiuk committed Aug 16, 2021
1 parent 38937db commit c10cd01
Showing 16 changed files with 187 additions and 87 deletions.
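
The diff threads a batch signature through the whole metastore stack: instead of issuing one metastore call per partition, callers now pass a map from partition name to an update function, and each implementation applies the whole batch in a single invocation. A minimal sketch of the new call shape (illustrative only; identity, table, and the precomputed statistics values are assumptions, not part of this commit):

    Map<String, Function<PartitionStatistics, PartitionStatistics>> updates = ImmutableMap.of(
            "ds=2021-08-01", current -> statsForDay1,  // statsForDay1/2 are hypothetical precomputed PartitionStatistics
            "ds=2021-08-02", current -> statsForDay2);
    metastore.updatePartitionStatistics(identity, table, updates);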
@@ -104,6 +104,12 @@ public void updatePartitionStatistics(HiveIdentity identity, String databaseName
         delegate.updatePartitionStatistics(identity, table, partitionName, update);
     }
 
+    public void updatePartitionStatistics(HiveIdentity identity, String databaseName, String tableName, Map<String, Function<PartitionStatistics, PartitionStatistics>> updates)
+    {
+        Table table = getExistingTable(identity, databaseName, tableName);
+        delegate.updatePartitionStatistics(identity, table, updates);
+    }
+
     public List<String> getAllTables(String databaseName)
     {
         return delegate.getAllTables(databaseName);

@@ -13,6 +13,7 @@
  */
 package io.trino.plugin.hive.metastore;
 
+import com.google.common.collect.ImmutableMap;
 import io.trino.plugin.hive.HivePartition;
 import io.trino.plugin.hive.HiveType;
 import io.trino.plugin.hive.PartitionStatistics;
@@ -49,7 +50,12 @@ public interface HiveMetastore

     void updateTableStatistics(HiveIdentity identity, String databaseName, String tableName, AcidTransaction transaction, Function<PartitionStatistics, PartitionStatistics> update);
 
-    void updatePartitionStatistics(HiveIdentity identity, Table table, String partitionName, Function<PartitionStatistics, PartitionStatistics> update);
+    default void updatePartitionStatistics(HiveIdentity identity, Table table, String partitionName, Function<PartitionStatistics, PartitionStatistics> update)
+    {
+        updatePartitionStatistics(identity, table, ImmutableMap.of(partitionName, update));
+    }
+
+    void updatePartitionStatistics(HiveIdentity identity, Table table, Map<String, Function<PartitionStatistics, PartitionStatistics>> updates);
 
     List<String> getAllTables(String databaseName);
 
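Keeping the single-partition variant as a default method preserves source compatibility for existing callers: it simply wraps its arguments into a one-entry map. The read-modify-write contract is unchanged; each map value is a Function that receives the partition's current PartitionStatistics and returns the replacement. The two calls below are equivalent under the new interface (a sketch; identity, table, and update are assumed):

    metastore.updatePartitionStatistics(identity, table, "ds=2021-08-01", update);                  // old signature, now a default method
    metastore.updatePartitionStatistics(identity, table, ImmutableMap.of("ds=2021-08-01", update)); // what it delegates to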
@@ -261,6 +261,13 @@ public void updatePartitionStatistics(HiveIdentity identity, Table table, String
         delegate.updatePartitionStatistics(identity, table, partitionName, update);
     }
 
+    @Override
+    public void updatePartitionStatistics(HiveIdentity identity, Table table, Map<String, Function<PartitionStatistics, PartitionStatistics>> updates)
+    {
+        verifyRecordingMode();
+        delegate.updatePartitionStatistics(identity, table, updates);
+    }
+
     @Override
     public List<String> getAllTables(String databaseName)
     {

@@ -83,6 +83,7 @@
 import static com.google.common.base.Preconditions.checkState;
 import static com.google.common.base.Verify.verify;
 import static com.google.common.collect.ImmutableList.toImmutableList;
+import static com.google.common.collect.ImmutableMap.toImmutableMap;
 import static com.google.common.collect.ImmutableSet.toImmutableSet;
 import static io.airlift.concurrent.MoreFutures.getFutureValue;
 import static io.trino.plugin.hive.HiveErrorCode.HIVE_CORRUPTED_COLUMN_STATISTICS;
@@ -385,14 +386,16 @@ public synchronized void setTableStatistics(HiveIdentity identity, Table table,
     // TODO: Allow updating statistics for 2 tables in the same transaction
     public synchronized void setPartitionStatistics(HiveIdentity identity, Table table, Map<List<String>, PartitionStatistics> partitionStatisticsMap)
     {
+        Map<String, Function<PartitionStatistics, PartitionStatistics>> updates = partitionStatisticsMap.entrySet().stream().collect(
+                toImmutableMap(
+                        entry -> getPartitionName(table, entry.getKey()),
+                        entry -> oldPartitionStats -> updatePartitionStatistics(oldPartitionStats, entry.getValue())));
         setExclusive((delegate, hdfsEnvironment) ->
-                partitionStatisticsMap.forEach((partitionValues, newPartitionStats) ->
-                        delegate.updatePartitionStatistics(
-                                identity,
-                                table.getDatabaseName(),
-                                table.getTableName(),
-                                getPartitionName(table, partitionValues),
-                                oldPartitionStats -> updatePartitionStatistics(oldPartitionStats, newPartitionStats))));
+                delegate.updatePartitionStatistics(
+                        identity,
+                        table.getDatabaseName(),
+                        table.getTableName(),
+                        updates));
     }
 
     // For HiveBasicStatistics, we only overwrite the original statistics if the new one is not empty.
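Building the updates map up front means the exclusive section issues a single delegate call for the whole batch instead of a forEach of per-partition calls, which is what allows the Glue implementation further down to fan the writes out in parallel.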
@@ -197,8 +197,7 @@ public void updateTableStatistics(
     public void updatePartitionStatistics(
             HiveIdentity identity,
             Table table,
-            String partitionName,
-            Function<PartitionStatistics, PartitionStatistics> update)
+            Map<String, Function<PartitionStatistics, PartitionStatistics>> updates)
     {
         throw new TrinoException(NOT_SUPPORTED, "updatePartitionStatistics");
     }

@@ -411,6 +411,23 @@ public void updatePartitionStatistics(HiveIdentity identity, Table table, String
         }
     }
 
+    @Override
+    public void updatePartitionStatistics(HiveIdentity identity, Table table, Map<String, Function<PartitionStatistics, PartitionStatistics>> updates)
+    {
+        try {
+            delegate.updatePartitionStatistics(updateIdentity(identity), table, updates);
+        }
+        finally {
+            HiveIdentity hiveIdentity = updateIdentity(identity);
+            updates.forEach((partitionName, update) -> {
+                HivePartitionName hivePartitionName = hivePartitionName(hiveTableName(table.getDatabaseName(), table.getTableName()), partitionName);
+                partitionStatisticsCache.invalidate(new WithIdentity<>(hiveIdentity, hivePartitionName));
+                // basic stats are stored as partition properties
+                partitionCache.invalidate(new WithIdentity<>(hiveIdentity, hivePartitionName));
+            });
+        }
+    }
+
     @Override
     public List<String> getAllTables(String databaseName)
     {
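The invalidation sits in a finally block, so the cached entries are dropped even if the underlying update fails partway; both partitionStatisticsCache and partitionCache are invalidated because, as the inline comment notes, basic statistics are stored as partition properties and the cached partition objects would otherwise go stale.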
@@ -429,21 +429,23 @@ public synchronized void updateTableStatistics(HiveIdentity identity, String dat
     }
 
     @Override
-    public synchronized void updatePartitionStatistics(HiveIdentity identity, Table table, String partitionName, Function<PartitionStatistics, PartitionStatistics> update)
+    public synchronized void updatePartitionStatistics(HiveIdentity identity, Table table, Map<String, Function<PartitionStatistics, PartitionStatistics>> updates)
     {
-        PartitionStatistics originalStatistics = getPartitionStatistics(table, extractPartitionValues(partitionName));
-        PartitionStatistics updatedStatistics = update.apply(originalStatistics);
+        updates.forEach((partitionName, update) -> {
+            PartitionStatistics originalStatistics = getPartitionStatistics(table, extractPartitionValues(partitionName));
+            PartitionStatistics updatedStatistics = update.apply(originalStatistics);
 
-        List<String> partitionValues = extractPartitionValues(partitionName);
-        Path partitionDirectory = getPartitionMetadataDirectory(table, partitionValues);
-        PartitionMetadata partitionMetadata = readSchemaFile("partition", partitionDirectory, partitionCodec)
-                .orElseThrow(() -> new PartitionNotFoundException(new SchemaTableName(table.getDatabaseName(), table.getTableName()), partitionValues));
+            List<String> partitionValues = extractPartitionValues(partitionName);
+            Path partitionDirectory = getPartitionMetadataDirectory(table, partitionValues);
+            PartitionMetadata partitionMetadata = readSchemaFile("partition", partitionDirectory, partitionCodec)
+                    .orElseThrow(() -> new PartitionNotFoundException(new SchemaTableName(table.getDatabaseName(), table.getTableName()), partitionValues));
 
-        PartitionMetadata updatedMetadata = partitionMetadata
-                .withParameters(updateStatisticsParameters(partitionMetadata.getParameters(), updatedStatistics.getBasicStatistics()))
-                .withColumnStatistics(updatedStatistics.getColumnStatistics());
+            PartitionMetadata updatedMetadata = partitionMetadata
+                    .withParameters(updateStatisticsParameters(partitionMetadata.getParameters(), updatedStatistics.getBasicStatistics()))
+                    .withColumnStatistics(updatedStatistics.getColumnStatistics());
 
-        writeSchemaFile("partition", partitionDirectory, partitionCodec, updatedMetadata, true);
+            writeSchemaFile("partition", partitionDirectory, partitionCodec, updatedMetadata, true);
+        });
     }
 
     @Override

@@ -43,6 +43,7 @@
 import io.trino.spi.statistics.ColumnStatisticType;
 import io.trino.spi.type.Type;
 
+import java.util.ArrayList;
 import java.util.Collection;
 import java.util.HashMap;
 import java.util.List;
@@ -261,46 +262,51 @@ public void updateTableColumnStatistics(Table table, Map<String, HiveColumnStati
     }
 
     @Override
-    public void updatePartitionStatistics(Partition partition, Map<String, HiveColumnStatistics> updatedColumnStatistics)
+    public void updatePartitionStatistics(Set<PartitionStatisticsUpdate> partitionStatisticsUpdates)
     {
-        try {
+        Map<Partition, Map<String, HiveColumnStatistics>> currentStatistics = getPartitionColumnStatistics(
+                partitionStatisticsUpdates.stream()
+                        .map(PartitionStatisticsUpdate::getPartition).collect(toImmutableList()));
+
+        List<CompletableFuture<Void>> updateFutures = new ArrayList<>();
+        for (PartitionStatisticsUpdate update : partitionStatisticsUpdates) {
+            Partition partition = update.getPartition();
+            Map<String, HiveColumnStatistics> updatedColumnStatistics = update.getColumnStatistics();
+
             HiveBasicStatistics partitionStats = getHiveBasicStatistics(partition.getParameters());
             List<ColumnStatistics> columnStats = toGlueColumnStatistics(partition, updatedColumnStatistics, partitionStats.getRowCount()).stream()
                     .filter(this::isGlueWritable)
                     .collect(toUnmodifiableList());
 
             List<List<ColumnStatistics>> columnChunks = Lists.partition(columnStats, GLUE_COLUMN_WRITE_STAT_PAGE_SIZE);
 
-            List<CompletableFuture<Void>> writePartitionStatsFutures = columnChunks.stream()
-                    .map(columnChunk ->
-                            runAsync(() -> glueClient.updateColumnStatisticsForPartition(new UpdateColumnStatisticsForPartitionRequest()
+            columnChunks.forEach(columnChunk ->
+                    updateFutures.add(runAsync(() -> glueClient.updateColumnStatisticsForPartition(
+                            new UpdateColumnStatisticsForPartitionRequest()
                                     .withCatalogId(catalogId)
                                     .withDatabaseName(partition.getDatabaseName())
                                     .withTableName(partition.getTableName())
                                     .withPartitionValues(partition.getValues())
-                                    .withColumnStatisticsList(columnChunk)), writeExecutor))
-                    .collect(toUnmodifiableList());
+                                    .withColumnStatisticsList(columnChunk)),
+                            writeExecutor)));
 
-            Map<String, HiveColumnStatistics> currentColumnStatistics = this.getPartitionColumnStatisticsIfPresent(partition).orElse(ImmutableMap.of());
-            Set<String> removedStatistics = difference(currentColumnStatistics.keySet(), updatedColumnStatistics.keySet());
-            List<CompletableFuture<Void>> deleteStatsFutures = removedStatistics.stream()
-                    .map(column -> runAsync(() ->
-                            glueClient.deleteColumnStatisticsForPartition(new DeleteColumnStatisticsForPartitionRequest()
+            Set<String> removedStatistics = difference(currentStatistics.get(partition).keySet(), updatedColumnStatistics.keySet());
+            removedStatistics.forEach(column ->
+                    updateFutures.add(runAsync(() -> glueClient.deleteColumnStatisticsForPartition(
+                            new DeleteColumnStatisticsForPartitionRequest()
                                     .withCatalogId(catalogId)
                                     .withDatabaseName(partition.getDatabaseName())
                                     .withTableName(partition.getTableName())
                                     .withPartitionValues(partition.getValues())
-                                    .withColumnName(column)), writeExecutor))
-                    .collect(toUnmodifiableList());
-
-            ImmutableList<CompletableFuture<Void>> updateOperationsFutures = ImmutableList.<CompletableFuture<Void>>builder()
-                    .addAll(writePartitionStatsFutures)
-                    .addAll(deleteStatsFutures)
-                    .build();
-
-            getFutureValue(allOf(updateOperationsFutures.toArray(CompletableFuture[]::new)));
+                                    .withColumnName(column)),
+                            writeExecutor)));
+        }
+        try {
+            getFutureValue(allOf(updateFutures.toArray(CompletableFuture[]::new)));
         }
         catch (RuntimeException ex) {
             if (ex.getCause() != null && ex.getCause() instanceof EntityNotFoundException) {
                 throw new TrinoException(HIVE_PARTITION_NOT_FOUND, ex.getCause());
             }
             throw new TrinoException(HIVE_METASTORE_ERROR, ex);
         }
     }
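Glue limits how many column statistics a single UpdateColumnStatisticsForPartition request may carry, which is presumably why the statistics are first split into pages with Guava's Lists.partition and written as one async request per chunk. A self-contained sketch of that chunking pattern (the page size here is invented; the value of GLUE_COLUMN_WRITE_STAT_PAGE_SIZE is not shown in this diff):

    import com.google.common.collect.Lists;

    import java.util.List;

    public class ChunkingSketch
    {
        public static void main(String[] args)
        {
            int pageSize = 3; // hypothetical stand-in for GLUE_COLUMN_WRITE_STAT_PAGE_SIZE
            List<String> columns = List.of("a", "b", "c", "d", "e", "f", "g");
            // Lists.partition returns consecutive sublists, each at most pageSize long
            for (List<String> chunk : Lists.partition(columns, pageSize)) {
                System.out.println(chunk); // prints [a, b, c], [d, e, f], [g]
            }
        }
    }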
@@ -60,9 +60,9 @@ public void updateTableColumnStatistics(Table table, Map<String, HiveColumnStati
     }
 
     @Override
-    public void updatePartitionStatistics(Partition partition, Map<String, HiveColumnStatistics> columnStatistics)
+    public void updatePartitionStatistics(Set<PartitionStatisticsUpdate> partitionStatisticsUpdates)
     {
-        if (!columnStatistics.isEmpty()) {
+        if (partitionStatisticsUpdates.stream().anyMatch(update -> !update.getColumnStatistics().isEmpty())) {
             throw new TrinoException(NOT_SUPPORTED, "Glue metastore column level statistics are disabled");
         }
     }

@@ -13,6 +13,7 @@
  */
 package io.trino.plugin.hive.metastore.glue;
 
+import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.ImmutableSet;
 import io.trino.plugin.hive.metastore.HiveColumnStatistics;
 import io.trino.plugin.hive.metastore.Partition;
@@ -24,6 +25,8 @@
 import java.util.Map;
 import java.util.Set;
 
+import static java.util.Objects.requireNonNull;
+
 public interface GlueColumnStatisticsProvider
 {
     Set<ColumnStatisticType> getSupportedColumnStatistics(Type type);
@@ -39,5 +42,32 @@ default Map<String, HiveColumnStatistics> getPartitionColumnStatistics(Partition

     void updateTableColumnStatistics(Table table, Map<String, HiveColumnStatistics> columnStatistics);
 
-    void updatePartitionStatistics(Partition partition, Map<String, HiveColumnStatistics> columnStatistics);
+    default void updatePartitionStatistics(Partition partition, Map<String, HiveColumnStatistics> columnStatistics)
+    {
+        updatePartitionStatistics(ImmutableSet.of(new PartitionStatisticsUpdate(partition, columnStatistics)));
+    }
+
+    void updatePartitionStatistics(Set<PartitionStatisticsUpdate> partitionStatisticsUpdates);
+
+    class PartitionStatisticsUpdate
+    {
+        private final Partition partition;
+        private final Map<String, HiveColumnStatistics> columnStatistics;
+
+        public PartitionStatisticsUpdate(Partition partition, Map<String, HiveColumnStatistics> columnStatistics)
+        {
+            this.partition = requireNonNull(partition, "partition is null");
+            this.columnStatistics = ImmutableMap.copyOf(requireNonNull(columnStatistics, "columnStatistics is null"));
+        }
+
+        public Partition getPartition()
+        {
+            return partition;
+        }
+
+        public Map<String, HiveColumnStatistics> getColumnStatistics()
+        {
+            return columnStatistics;
+        }
+    }
 }
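
A caller of the provider can then submit several partitions at once (sketch only; partition1/partition2 and their per-column statistics maps are assumed):

    provider.updatePartitionStatistics(ImmutableSet.of(
            new PartitionStatisticsUpdate(partition1, columnStatsForPartition1),
            new PartitionStatisticsUpdate(partition2, columnStatsForPartition2)));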