[Iceberg] Collect superset of supported statistics

The sets of ColumnStatisticMetadata defined by the Hive
and non-Hive connectors are not equivalent. However, it is possible
to collect the superset of the relevant metadata and use it for ANALYZE.
The returned statistics then only need to be filtered down to the
column statistics relevant to each destination.

This may result in duplicate calculations for some statistics. For
example, for distinct values, Iceberg Puffin files can store the
result of sketch_theta, but the code path for storing the statistic
in the HMS requires a direct value from approx_distinct. Thus, ANALYZE
may compute the same statistic twice.
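
To make the filtering concrete, here is a minimal, self-contained Java sketch of the idea. It is illustrative only: it uses plain String keys and Object values in place of the real ColumnStatisticMetadata keys and Block values, and the class name and statistic labels are made up for the example. The actual implementation is the createPartitionStatistics overload and the filtering in finishStatisticsCollection shown in the diff below.

import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;

public class StatisticsFilterSketch
{
    // Keep only the computed statistics that a particular sink (e.g. the HMS)
    // supports; the rest is left for the other code path.
    static Map<String, Object> filterToSupported(Map<String, Object> computed, Set<String> supported)
    {
        return computed.entrySet().stream()
                .filter(entry -> supported.contains(entry.getKey()))
                .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
    }

    public static void main(String[] args)
    {
        // Hypothetical statistic keys; the real keys are ColumnStatisticMetadata instances.
        Map<String, Object> computed = Map.of(
                "c1:approx_distinct", 42L,               // NDV value destined for the Hive metastore
                "c1:sketch_theta", new byte[] {1, 2, 3}, // serialized sketch destined for an Iceberg Puffin file
                "c1:total_size_in_bytes", 1024L);
        Set<String> hiveSupported = Set.of("c1:approx_distinct", "c1:total_size_in_bytes");

        // Prints only the Hive-supported subset; the theta sketch is handled by
        // the Iceberg statistics path instead.
        System.out.println(filterToSupported(computed, hiveSupported).keySet());
    }
}

In the commit itself, this filter is applied twice: once to restrict the computed statistics to the Hive-supported subset before updating the HMS, and once to restrict them to the Iceberg-supported subset before delegating to the Iceberg statistics writer.
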
ZacBlanco authored and tdcmeehan committed Dec 16, 2024
1 parent 6bce6c2 commit 22a70a7
Showing 8 changed files with 162 additions and 16 deletions.
----------------------------------------------------------------------
@@ -28,11 +28,13 @@
import java.util.Map;
import java.util.Optional;
import java.util.OptionalLong;
import java.util.Set;

import static com.facebook.presto.common.type.BigintType.BIGINT;
import static com.facebook.presto.hive.metastore.Statistics.fromComputedStatistics;
import static com.facebook.presto.spi.statistics.TableStatisticType.ROW_COUNT;
import static com.google.common.base.Verify.verify;
import static com.google.common.collect.ImmutableMap.toImmutableMap;

public final class HiveStatisticsUtil
{
@@ -61,9 +63,14 @@ public static PartitionStatistics createPartitionStatistics(
ConnectorSession session,
Map<String, Type> columnTypes,
ComputedStatistics computedStatistics,
Set<ColumnStatisticMetadata> supportedColumnStatistics,
DateTimeZone timeZone)
{
Map<ColumnStatisticMetadata, Block> computedColumnStatistics = computedStatistics.getColumnStatistics();
Map<ColumnStatisticMetadata, Block> computedColumnStatistics = computedStatistics.getColumnStatistics()
.entrySet()
.stream()
.filter((entry) -> supportedColumnStatistics.contains(entry.getKey()))
.collect(toImmutableMap(Map.Entry::getKey, Map.Entry::getValue));

Block rowCountBlock = Optional.ofNullable(computedStatistics.getTableStatistics().get(ROW_COUNT))
.orElseThrow(() -> new VerifyException("rowCount not present"));
@@ -73,6 +80,15 @@ public static PartitionStatistics createPartitionStatistics(
return createPartitionStatistics(session, rowCountOnlyBasicStatistics, columnTypes, computedColumnStatistics, timeZone);
}

public static PartitionStatistics createPartitionStatistics(
ConnectorSession session,
Map<String, Type> columnTypes,
ComputedStatistics computedStatistics,
DateTimeZone timeZone)
{
return createPartitionStatistics(session, columnTypes, computedStatistics, computedStatistics.getColumnStatistics().keySet(), timeZone);
}

public static Map<ColumnStatisticMetadata, Block> getColumnStatistics(Map<List<String>, ComputedStatistics> statistics, List<String> partitionValues)
{
return Optional.ofNullable(statistics.get(partitionValues))
@@ -81,10 +97,11 @@ public static Map<ColumnStatisticMetadata, Block> getColumnStatistics(Map<List<S
}

// TODO: Collect file count, on-disk size and in-memory size during ANALYZE

/**
* This method updates old {@link PartitionStatistics} with new statistics, only if the new
* partition stats are not empty. This method always overwrites each of the
* {@link HiveColumnStatistics} contained in the new partition statistics.
* This method updates old {@link PartitionStatistics} with new statistics, only if the new
* partition stats are not empty. This method always overwrites each of the
* {@link HiveColumnStatistics} contained in the new partition statistics.
*
* @param oldPartitionStats old version of partition statistics
* @param newPartitionStats new version of partition statistics
----------------------------------------------------------------------
@@ -478,10 +478,23 @@ public IcebergTableHandle getTableHandleForStatisticsCollection(ConnectorSession
@Override
public TableStatisticsMetadata getStatisticsCollectionMetadata(ConnectorSession session, ConnectorTableMetadata tableMetadata)
{
org.apache.iceberg.Table table = getIcebergTable(session, tableMetadata.getTable());
org.apache.iceberg.Table icebergTable = getIcebergTable(session, tableMetadata.getTable());
Set<ColumnStatisticMetadata> hiveColumnStatistics = getHiveSupportedColumnStatistics(session, icebergTable, tableMetadata);
Set<ColumnStatisticMetadata> supportedStatistics = ImmutableSet.<ColumnStatisticMetadata>builder()
.addAll(hiveColumnStatistics)
// iceberg table-supported statistics
.addAll(super.getStatisticsCollectionMetadata(session, tableMetadata).getColumnStatistics())
.build();
Set<TableStatisticType> tableStatistics = ImmutableSet.of(ROW_COUNT);
return new TableStatisticsMetadata(supportedStatistics, tableStatistics, emptyList());
}

private Set<ColumnStatisticMetadata> getHiveSupportedColumnStatistics(ConnectorSession session, org.apache.iceberg.Table table, ConnectorTableMetadata tableMetadata)
{
MetricsConfig metricsConfig = MetricsConfig.forTable(table);
Set<ColumnStatisticMetadata> columnStatistics = tableMetadata.getColumns().stream()
.filter(column -> !column.isHidden() && metricsConfig.columnMode(column.getName()) != None.get())
return tableMetadata.getColumns().stream()
.filter(column -> !column.isHidden())
.filter(column -> metricsConfig.columnMode(column.getName()) != None.get())
.flatMap(meta -> {
try {
return metastore.getSupportedColumnStatistics(getMetastoreContext(session), meta.getType())
@@ -494,9 +507,6 @@ public TableStatisticsMetadata getStatisticsCollectionMetadata(ConnectorSession
}
})
.collect(toImmutableSet());

Set<TableStatisticType> tableStatistics = ImmutableSet.of(ROW_COUNT);
return new TableStatisticsMetadata(columnStatistics, tableStatistics, emptyList());
}

@Override
@@ -525,11 +535,31 @@ public void finishStatisticsCollection(ConnectorSession session, ConnectorTableH
Map<List<String>, ComputedStatistics> computedStatisticsMap = createComputedStatisticsToPartitionMap(computedStatistics, partitionColumnNames, columnTypes);

// commit analyze to unpartitioned table
PartitionStatistics tableStatistics = createPartitionStatistics(session, columnTypes, computedStatisticsMap.get(ImmutableList.<String>of()), timeZone);
ConnectorTableMetadata metadata = getTableMetadata(session, tableHandle);
org.apache.iceberg.Table icebergTable = getIcebergTable(session, icebergTableHandle.getSchemaTableName());
Set<ColumnStatisticMetadata> hiveSupportedStatistics = getHiveSupportedColumnStatistics(session, icebergTable, metadata);
PartitionStatistics tableStatistics = createPartitionStatistics(
session,
columnTypes,
computedStatisticsMap.get(ImmutableList.<String>of()),
hiveSupportedStatistics,
timeZone);
metastore.updateTableStatistics(metastoreContext,
table.getDatabaseName(),
table.getTableName(),
oldStats -> updatePartitionStatistics(oldStats, tableStatistics));

Set<ColumnStatisticMetadata> icebergSupportedStatistics = super.getStatisticsCollectionMetadata(session, metadata).getColumnStatistics();
Collection<ComputedStatistics> icebergComputedStatistics = computedStatistics.stream().map(stat -> {
ComputedStatistics.Builder builder = ComputedStatistics.builder(stat.getGroupingColumns(), stat.getGroupingValues());
stat.getTableStatistics()
.forEach(builder::addTableStatistic);
stat.getColumnStatistics().entrySet().stream()
.filter(entry -> icebergSupportedStatistics.contains(entry.getKey()))
.forEach(entry -> builder.addColumnStatistic(entry.getKey(), entry.getValue()));
return builder.build();
}).collect(toImmutableList());
super.finishStatisticsCollection(session, tableHandle, icebergComputedStatistics);
}

@Override
----------------------------------------------------------------------
@@ -103,6 +103,7 @@ public static TableStatistics mergeHiveStatistics(TableStatistics icebergStatist
.setRange(icebergColumnStats.getRange())
.setNullsFraction(icebergColumnStats.getNullsFraction())
.setDistinctValuesCount(icebergColumnStats.getDistinctValuesCount())
.setHistogram(icebergColumnStats.getHistogram())
.setRange(icebergColumnStats.getRange());
if (hiveColumnStats != null) {
// NDVs
----------------------------------------------------------------------
@@ -215,7 +215,7 @@ public static DistributedQueryRunner createIcebergQueryRunner(
queryRunner.createCatalog("jmx", "jmx");
}

if (catalogType == HIVE.name()) {
if (catalogType.equals(HIVE.name())) {
ExtendedHiveMetastore metastore = getFileHiveMetastore(icebergDataDirectory);
if (!metastore.getDatabase(METASTORE_CONTEXT, "tpch").isPresent()) {
queryRunner.execute("CREATE SCHEMA tpch");
----------------------------------------------------------------------
@@ -20,11 +20,27 @@
import com.facebook.presto.common.predicate.TupleDomain;
import com.facebook.presto.common.predicate.ValueSet;
import com.facebook.presto.common.transaction.TransactionId;
import com.facebook.presto.hive.HdfsConfiguration;
import com.facebook.presto.hive.HdfsConfigurationInitializer;
import com.facebook.presto.hive.HdfsEnvironment;
import com.facebook.presto.hive.HiveClientConfig;
import com.facebook.presto.hive.HiveHdfsConfiguration;
import com.facebook.presto.hive.MetastoreClientConfig;
import com.facebook.presto.hive.authentication.NoHdfsAuthentication;
import com.facebook.presto.hive.metastore.ExtendedHiveMetastore;
import com.facebook.presto.hive.metastore.file.FileHiveMetastore;
import com.facebook.presto.iceberg.CatalogType;
import com.facebook.presto.iceberg.IcebergColumnHandle;
import com.facebook.presto.iceberg.IcebergHiveTableOperationsConfig;
import com.facebook.presto.iceberg.IcebergMetadataColumn;
import com.facebook.presto.iceberg.IcebergUtil;
import com.facebook.presto.metadata.CatalogManager;
import com.facebook.presto.metadata.Metadata;
import com.facebook.presto.spi.ColumnHandle;
import com.facebook.presto.spi.ConnectorId;
import com.facebook.presto.spi.Constraint;
import com.facebook.presto.spi.PrestoException;
import com.facebook.presto.spi.SchemaTableName;
import com.facebook.presto.spi.TableHandle;
import com.facebook.presto.spi.analyzer.MetadataResolver;
import com.facebook.presto.spi.plan.TableScanNode;
@@ -39,26 +55,37 @@
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import org.apache.iceberg.StatisticsFile;
import org.apache.iceberg.Table;
import org.apache.iceberg.UpdateStatistics;
import org.intellij.lang.annotations.Language;
import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;

import java.io.File;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.function.Function;
import java.util.stream.Collectors;

import static com.facebook.presto.common.type.DoubleType.DOUBLE;
import static com.facebook.presto.hive.BaseHiveColumnHandle.ColumnType.PARTITION_KEY;
import static com.facebook.presto.hive.BaseHiveColumnHandle.ColumnType.REGULAR;
import static com.facebook.presto.hive.metastore.InMemoryCachingHiveMetastore.memoizeMetastore;
import static com.facebook.presto.iceberg.CatalogType.HIVE;
import static com.facebook.presto.iceberg.IcebergQueryRunner.ICEBERG_CATALOG;
import static com.facebook.presto.iceberg.IcebergQueryRunner.TEST_DATA_DIRECTORY;
import static com.facebook.presto.iceberg.IcebergQueryRunner.createIcebergQueryRunner;
import static com.facebook.presto.iceberg.IcebergSessionProperties.HIVE_METASTORE_STATISTICS_MERGE_STRATEGY;
import static com.facebook.presto.iceberg.IcebergSessionProperties.PUSHDOWN_FILTER_ENABLED;
import static com.facebook.presto.spi.StandardErrorCode.NOT_SUPPORTED;
import static com.facebook.presto.spi.statistics.ColumnStatisticType.NUMBER_OF_DISTINCT_VALUES;
import static com.facebook.presto.spi.statistics.ColumnStatisticType.TOTAL_SIZE_IN_BYTES;
import static com.facebook.presto.testing.assertions.Assert.assertEquals;
@@ -159,6 +186,7 @@ public void testStatsWithPartitionedTableAnalyzed()
assertQuerySucceeds("CREATE TABLE statsWithPartitionAnalyze WITH (partitioning = ARRAY['orderdate']) as SELECT * FROM statsNoPartitionAnalyze");
assertQuerySucceeds("ANALYZE statsNoPartitionAnalyze");
assertQuerySucceeds("ANALYZE statsWithPartitionAnalyze");
deleteTableStatistics("statsWithPartitionAnalyze");
Metadata meta = getQueryRunner().getMetadata();
TransactionId txid = getQueryRunner().getTransactionManager().beginTransaction(false);
Session session = getSession().beginTransactionId(txid, getQueryRunner().getTransactionManager(), new AllowAllAccessControl());
@@ -295,12 +323,17 @@ public void testHiveStatisticsMergeFlags()
{
assertQuerySucceeds("CREATE TABLE mergeFlagsStats (i int, v varchar)");
assertQuerySucceeds("INSERT INTO mergeFlagsStats VALUES (0, '1'), (1, '22'), (2, '333'), (NULL, 'aaaaa'), (4, NULL)");
assertQuerySucceeds("ANALYZE mergeFlagsStats"); // stats stored in
assertQuerySucceeds("ANALYZE mergeFlagsStats");

// invalidate puffin files so only hive stats can be returned
deleteTableStatistics("mergeFlagsStats");

// Test stats without merging doesn't return NDVs or data size
Session session = Session.builder(getSession())
.setCatalogSessionProperty("iceberg", HIVE_METASTORE_STATISTICS_MERGE_STRATEGY, "")
.build();
TableStatistics stats = getTableStatistics(session, "mergeFlagsStats");

Map<String, ColumnStatistics> columnStatistics = getColumnNameMap(stats);
assertEquals(columnStatistics.get("i").getDistinctValuesCount(), Estimate.unknown());
assertEquals(columnStatistics.get("i").getDataSize(), Estimate.unknown());
@@ -468,6 +501,64 @@ static void assertStatValue(StatsSchema column, MaterializedResult result, Set<S
});
}

private void deleteTableStatistics(String tableName)
{
Table icebergTable = loadTable(tableName);
UpdateStatistics statsUpdate = icebergTable.updateStatistics();
icebergTable.statisticsFiles().stream().map(StatisticsFile::snapshotId).forEach(statsUpdate::removeStatistics);
statsUpdate.commit();
}

private Table loadTable(String tableName)
{
CatalogManager catalogManager = getDistributedQueryRunner().getCoordinator().getCatalogManager();
ConnectorId connectorId = catalogManager.getCatalog(ICEBERG_CATALOG).get().getConnectorId();

return IcebergUtil.getHiveIcebergTable(getFileHiveMetastore(),
getHdfsEnvironment(),
new IcebergHiveTableOperationsConfig(),
getQueryRunner().getDefaultSession().toConnectorSession(connectorId),
SchemaTableName.valueOf("tpch." + tableName));
}

protected ExtendedHiveMetastore getFileHiveMetastore()
{
FileHiveMetastore fileHiveMetastore = new FileHiveMetastore(getHdfsEnvironment(),
Optional.of(getCatalogDirectory(HIVE))
.filter(File::exists)
.map(File::getPath)
.orElseThrow(() -> new RuntimeException("Catalog directory does not exist: " + getCatalogDirectory(HIVE))),
"test");
return memoizeMetastore(fileHiveMetastore, false, 1000, 0);
}

protected static HdfsEnvironment getHdfsEnvironment()
{
HiveClientConfig hiveClientConfig = new HiveClientConfig();
MetastoreClientConfig metastoreClientConfig = new MetastoreClientConfig();
HdfsConfiguration hdfsConfiguration = new HiveHdfsConfiguration(new HdfsConfigurationInitializer(hiveClientConfig, metastoreClientConfig),
ImmutableSet.of(),
hiveClientConfig);
return new HdfsEnvironment(hdfsConfiguration, metastoreClientConfig, new NoHdfsAuthentication());
}

protected File getCatalogDirectory(CatalogType catalogType)
{
Path dataDirectory = getDistributedQueryRunner().getCoordinator().getDataDirectory();
switch (catalogType) {
case HIVE:
return dataDirectory
.resolve(TEST_DATA_DIRECTORY)
.resolve(HIVE.name())
.toFile();
case HADOOP:
case NESSIE:
return dataDirectory.toFile();
}

throw new PrestoException(NOT_SUPPORTED, "Unsupported Presto Iceberg catalog type " + catalogType);
}

private static Map<String, ColumnStatistics> getColumnNameMap(TableStatistics statistics)
{
return statistics.getColumnStatistics().entrySet().stream().collect(Collectors.toMap(e ->
----------------------------------------------------------------------
@@ -62,6 +62,7 @@

import static com.facebook.presto.SystemSessionProperties.isForceSingleNodeOutput;
import static com.facebook.presto.SystemSessionProperties.isSingleNodeExecutionEnabled;
import static com.facebook.presto.SystemSessionProperties.shouldOptimizerUseHistograms;
import static com.facebook.presto.spi.StandardErrorCode.NOT_SUPPORTED;
import static com.facebook.presto.sql.TemporaryTableUtil.assignPartitioningVariables;
import static com.facebook.presto.sql.TemporaryTableUtil.assignTemporaryTableColumnNames;
@@ -121,7 +122,7 @@ public BasePlanFragmenter(
this.idAllocator = requireNonNull(idAllocator, "idAllocator is null");
this.variableAllocator = requireNonNull(variableAllocator, "variableAllocator is null");
this.outputTableWriterNodeIds = ImmutableSet.copyOf(requireNonNull(outputTableWriterNodeIds, "outputTableWriterNodeIds is null"));
this.statisticsAggregationPlanner = new StatisticsAggregationPlanner(variableAllocator, metadata.getFunctionAndTypeManager(), session);
this.statisticsAggregationPlanner = new StatisticsAggregationPlanner(variableAllocator, metadata.getFunctionAndTypeManager(), session, shouldOptimizerUseHistograms(session));
}

public SubPlan buildRootFragment(PlanNode root, FragmentProperties properties)
----------------------------------------------------------------------
@@ -86,6 +86,7 @@
import java.util.Optional;
import java.util.Set;

import static com.facebook.presto.SystemSessionProperties.shouldOptimizerUseHistograms;
import static com.facebook.presto.common.type.BigintType.BIGINT;
import static com.facebook.presto.common.type.VarbinaryType.VARBINARY;
import static com.facebook.presto.metadata.MetadataUtil.getConnectorIdOrThrow;
@@ -134,7 +135,7 @@ public LogicalPlanner(
this.idAllocator = requireNonNull(idAllocator, "idAllocator is null");
this.metadata = requireNonNull(metadata, "metadata is null");
this.variableAllocator = requireNonNull(variableAllocator, "variableAllocator is null");
this.statisticsAggregationPlanner = new StatisticsAggregationPlanner(this.variableAllocator, metadata.getFunctionAndTypeManager(), session);
this.statisticsAggregationPlanner = new StatisticsAggregationPlanner(this.variableAllocator, metadata.getFunctionAndTypeManager(), session, shouldOptimizerUseHistograms(session));
this.sqlParser = requireNonNull(sqlParser, "sqlParser is null");
}
