Skip to content

Commit

Permalink
Propagate runtime stats to page source provider
Browse files Browse the repository at this point in the history
  • Loading branch information
NikhilCollooru committed Jun 11, 2024
1 parent a795025 commit 2579141
Show file tree
Hide file tree
Showing 29 changed files with 117 additions and 59 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -177,7 +177,7 @@ protected final OperatorFactory createTableScanOperator(int operatorId, PlanNode
public Operator createOperator(DriverContext driverContext)
{
OperatorContext operatorContext = driverContext.addOperatorContext(operatorId, planNodeId, "BenchmarkSource");
ConnectorPageSource pageSource = localQueryRunner.getPageSourceManager().createPageSource(session, split, tableHandle.withDynamicFilter(TupleDomain::all), columnHandles);
ConnectorPageSource pageSource = localQueryRunner.getPageSourceManager().createPageSource(session, split, tableHandle.withDynamicFilter(TupleDomain::all), columnHandles, operatorContext.getRuntimeStats());
return new PageSourceOperator(pageSource, operatorContext);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
package com.facebook.presto.plugin.bigquery;

import com.facebook.airlift.log.Logger;
import com.facebook.presto.common.RuntimeStats;
import com.facebook.presto.spi.ColumnHandle;
import com.facebook.presto.spi.ConnectorPageSource;
import com.facebook.presto.spi.ConnectorSession;
Expand Down Expand Up @@ -51,7 +52,8 @@ public ConnectorPageSource createPageSource(
ConnectorSplit split,
ConnectorTableLayoutHandle layout,
List<ColumnHandle> columns,
SplitContext splitContext)
SplitContext splitContext,
RuntimeStats runtimeStats)
{
BigQuerySplit bigQuerySplit = (BigQuerySplit) split;
if (bigQuerySplit.representsEmptyProjection()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,8 @@ public ConnectorPageSource createPageSource(
ConnectorSplit split,
ConnectorTableLayoutHandle layout,
List<ColumnHandle> columns,
SplitContext splitContext)
SplitContext splitContext,
RuntimeStats runtimeStats)
{
DeltaSplit deltaSplit = (DeltaSplit) split;
DeltaTableLayoutHandle deltaTableLayoutHandle = (DeltaTableLayoutHandle) layout;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
*/
package com.facebook.presto.elasticsearch;

import com.facebook.presto.common.RuntimeStats;
import com.facebook.presto.common.type.StandardTypes;
import com.facebook.presto.common.type.Type;
import com.facebook.presto.common.type.TypeManager;
Expand Down Expand Up @@ -55,7 +56,8 @@ public ConnectorPageSource createPageSource(
ConnectorSplit split,
ConnectorTableLayoutHandle layout,
List<ColumnHandle> columns,
SplitContext splitContext)
SplitContext splitContext,
RuntimeStats runtimeStats)
{
requireNonNull(split, "split is null");
requireNonNull(layout, "layout is null");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ public class HiveFileContext
private final long modificationTime;
private final boolean verboseRuntimeStatsEnabled;

private final RuntimeStats stats = new RuntimeStats();
private final RuntimeStats stats;

public HiveFileContext(
boolean cacheable,
Expand All @@ -59,6 +59,20 @@ public HiveFileContext(
OptionalLong length,
long modificationTime,
boolean verboseRuntimeStatsEnabled)
{
this(cacheable, cacheQuota, extraFileInfo, fileSize, startOffset, length, modificationTime, verboseRuntimeStatsEnabled, new RuntimeStats());
}

public HiveFileContext(
boolean cacheable,
CacheQuota cacheQuota,
Optional<ExtraHiveFileInfo<?>> extraFileInfo,
OptionalLong fileSize,
OptionalLong startOffset,
OptionalLong length,
long modificationTime,
boolean verboseRuntimeStatsEnabled,
RuntimeStats runtimeStats)
{
this.cacheable = cacheable;
this.cacheQuota = requireNonNull(cacheQuota, "cacheQuota is null");
Expand All @@ -68,6 +82,7 @@ public HiveFileContext(
this.length = requireNonNull(length, "length is null");
this.modificationTime = modificationTime;
this.verboseRuntimeStatsEnabled = verboseRuntimeStatsEnabled;
this.stats = requireNonNull(runtimeStats, "runtimeStats is null");
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
*/
package com.facebook.presto.hive;

import com.facebook.presto.common.RuntimeStats;
import com.facebook.presto.common.Subfield;
import com.facebook.presto.common.Subfield.NestedField;
import com.facebook.presto.common.Subfield.PathElement;
Expand Down Expand Up @@ -137,7 +138,8 @@ public ConnectorPageSource createPageSource(
ConnectorSplit split,
ConnectorTableLayoutHandle layout,
List<ColumnHandle> columns,
SplitContext splitContext)
SplitContext splitContext,
RuntimeStats runtimeStats)
{
HiveTableLayoutHandle hiveLayout = (HiveTableLayoutHandle) layout;

Expand Down Expand Up @@ -167,7 +169,8 @@ public ConnectorPageSource createPageSource(
OptionalLong.of(hiveSplit.getFileSplit().getStart()),
OptionalLong.of(hiveSplit.getFileSplit().getLength()),
hiveSplit.getFileSplit().getFileModifiedTime(),
HiveSessionProperties.isVerboseRuntimeStatsEnabled(session));
HiveSessionProperties.isVerboseRuntimeStatsEnabled(session),
runtimeStats);

if (columns.stream().anyMatch(columnHandle -> ((HiveColumnHandle) columnHandle).getColumnType().equals(AGGREGATED))) {
checkArgument(columns.stream().allMatch(columnHandle -> ((HiveColumnHandle) columnHandle).getColumnType().equals(AGGREGATED)), "Not all columns are of 'AGGREGATED' type");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2196,7 +2196,7 @@ private void doTestBucketedTableEvolutionWithDifferentReadCount(HiveStorageForma

ImmutableList.Builder<MaterializedRow> allRows = ImmutableList.builder();
for (ConnectorSplit split : splits) {
try (ConnectorPageSource pageSource = pageSourceProvider.createPageSource(transaction.getTransactionHandle(), session, split, tableHandle.getLayout().get(), columnHandles, NON_CACHEABLE)) {
try (ConnectorPageSource pageSource = pageSourceProvider.createPageSource(transaction.getTransactionHandle(), session, split, tableHandle.getLayout().get(), columnHandles, NON_CACHEABLE, new RuntimeStats())) {
MaterializedResult intermediateResult = materializeSourceDataStream(session, pageSource, getTypes(columnHandles));
allRows.addAll(intermediateResult.getMaterializedRows());
}
Expand Down Expand Up @@ -2441,7 +2441,7 @@ public void testGetRecords()

long rowNumber = 0;
long completedBytes = 0;
try (ConnectorPageSource pageSource = pageSourceProvider.createPageSource(transaction.getTransactionHandle(), session, hiveSplit, layoutHandle, columnHandles, NON_CACHEABLE)) {
try (ConnectorPageSource pageSource = pageSourceProvider.createPageSource(transaction.getTransactionHandle(), session, hiveSplit, layoutHandle, columnHandles, NON_CACHEABLE, new RuntimeStats())) {
MaterializedResult result = materializeSourceDataStream(session, pageSource, getTypes(columnHandles));

assertPageSourceType(pageSource, fileType);
Expand Down Expand Up @@ -2532,7 +2532,7 @@ public void testGetPartialRecords()
int dummyPartition = Integer.parseInt(partitionKeys.get(2).getValue().orElse(null));

long rowNumber = 0;
try (ConnectorPageSource pageSource = pageSourceProvider.createPageSource(transaction.getTransactionHandle(), session, hiveSplit, layoutHandle, columnHandles, NON_CACHEABLE)) {
try (ConnectorPageSource pageSource = pageSourceProvider.createPageSource(transaction.getTransactionHandle(), session, hiveSplit, layoutHandle, columnHandles, NON_CACHEABLE, new RuntimeStats())) {
assertPageSourceType(pageSource, fileType);
MaterializedResult result = materializeSourceDataStream(session, pageSource, getTypes(columnHandles));
for (MaterializedRow row : result) {
Expand Down Expand Up @@ -2571,7 +2571,7 @@ public void testGetRecordsUnpartitioned()
assertEquals(hiveSplit.getPartitionKeys(), ImmutableList.of());

long rowNumber = 0;
try (ConnectorPageSource pageSource = pageSourceProvider.createPageSource(transaction.getTransactionHandle(), session, split, layoutHandle, columnHandles, NON_CACHEABLE)) {
try (ConnectorPageSource pageSource = pageSourceProvider.createPageSource(transaction.getTransactionHandle(), session, split, layoutHandle, columnHandles, NON_CACHEABLE, new RuntimeStats())) {
assertPageSourceType(pageSource, TEXTFILE);
MaterializedResult result = materializeSourceDataStream(session, pageSource, getTypes(columnHandles));

Expand Down Expand Up @@ -2639,7 +2639,7 @@ public void testPartitionSchemaNonCanonical()
ConnectorSplit split = getOnlyElement(getAllSplits(splitSource));

ImmutableList<ColumnHandle> columnHandles = ImmutableList.of(column);
try (ConnectorPageSource ignored = pageSourceProvider.createPageSource(transaction.getTransactionHandle(), session, split, layoutHandle, columnHandles, NON_CACHEABLE)) {
try (ConnectorPageSource ignored = pageSourceProvider.createPageSource(transaction.getTransactionHandle(), session, split, layoutHandle, columnHandles, NON_CACHEABLE, new RuntimeStats())) {
fail("expected exception");
}
catch (PrestoException e) {
Expand Down Expand Up @@ -3149,7 +3149,7 @@ private void doTestBucketSortedTables(SchemaTableName table, boolean useTempPath

int actualRowCount = 0;
for (ConnectorSplit split : splits) {
try (ConnectorPageSource pageSource = pageSourceProvider.createPageSource(transaction.getTransactionHandle(), session, split, layoutHandle, columnHandles, NON_CACHEABLE)) {
try (ConnectorPageSource pageSource = pageSourceProvider.createPageSource(transaction.getTransactionHandle(), session, split, layoutHandle, columnHandles, NON_CACHEABLE, new RuntimeStats())) {
String lastValueAsc = null;
long lastValueDesc = -1;

Expand Down Expand Up @@ -4971,7 +4971,7 @@ protected void assertGetRecords(String tableName, HiveStorageFormat hiveStorageF

List<ColumnHandle> columnHandles = ImmutableList.copyOf(metadata.getColumnHandles(session, tableHandle).values());

ConnectorPageSource pageSource = pageSourceProvider.createPageSource(transaction.getTransactionHandle(), session, hiveSplit, layoutHandle, columnHandles, NON_CACHEABLE);
ConnectorPageSource pageSource = pageSourceProvider.createPageSource(transaction.getTransactionHandle(), session, hiveSplit, layoutHandle, columnHandles, NON_CACHEABLE, new RuntimeStats());
assertGetRecords(hiveStorageFormat, tableMetadata, hiveSplit, pageSource, columnHandles);
}
}
Expand Down Expand Up @@ -5261,7 +5261,7 @@ protected MaterializedResult readTable(

ImmutableList.Builder<MaterializedRow> allRows = ImmutableList.builder();
for (ConnectorSplit split : splits) {
try (ConnectorPageSource pageSource = pageSourceProvider.createPageSource(transaction.getTransactionHandle(), session, split, tableHandle.getLayout().get(), columnHandles, NON_CACHEABLE)) {
try (ConnectorPageSource pageSource = pageSourceProvider.createPageSource(transaction.getTransactionHandle(), session, split, tableHandle.getLayout().get(), columnHandles, NON_CACHEABLE, new RuntimeStats())) {
expectedStorageFormat.ifPresent(format -> assertPageSourceType(pageSource, format));
MaterializedResult result = materializeSourceDataStream(session, pageSource, getTypes(columnHandles));
allRows.addAll(result.getMaterializedRows());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
import com.facebook.airlift.stats.CounterStat;
import com.facebook.presto.GroupByHashPageIndexerFactory;
import com.facebook.presto.cache.CacheConfig;
import com.facebook.presto.common.RuntimeStats;
import com.facebook.presto.hive.AbstractTestHiveClient.HiveTransaction;
import com.facebook.presto.hive.AbstractTestHiveClient.Transaction;
import com.facebook.presto.hive.authentication.NoHdfsAuthentication;
Expand Down Expand Up @@ -480,7 +481,8 @@ private void createTable(MetastoreContext metastoreContext, SchemaTableName tabl
split,
tableHandle.getLayout().get(),
columnHandles,
NON_CACHEABLE)) {
NON_CACHEABLE,
new RuntimeStats())) {
MaterializedResult result = materializeSourceDataStream(session, pageSource, getTypes(columnHandles));
assertEqualsIgnoreOrder(result.getMaterializedRows(), data.getMaterializedRows());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
*/
package com.facebook.presto.hive;

import com.facebook.presto.common.RuntimeStats;
import com.facebook.presto.common.type.Type;
import com.facebook.presto.hive.AbstractTestHiveClient.HiveTransaction;
import com.facebook.presto.hive.AbstractTestHiveClient.Transaction;
Expand Down Expand Up @@ -95,7 +96,8 @@ public static MaterializedResult readTable(SchemaTableName tableName,
split,
tableHandle.getLayout().get(),
columnHandles,
NON_CACHEABLE)) {
NON_CACHEABLE,
new RuntimeStats())) {
MaterializedResult pageSourceResult = materializeSourceDataStream(session, pageSource, allTypes);
for (MaterializedRow row : pageSourceResult.getMaterializedRows()) {
Object[] dataValues = IntStream.range(0, row.getFieldCount())
Expand Down Expand Up @@ -153,7 +155,8 @@ public static MaterializedResult filterTable(SchemaTableName tableName,
split,
tableHandle.getLayout().get(),
projectedColumns,
NON_CACHEABLE)) {
NON_CACHEABLE,
new RuntimeStats())) {
MaterializedResult pageSourceResult = materializeSourceDataStream(session, pageSource, allTypes);
for (MaterializedRow row : pageSourceResult.getMaterializedRows()) {
Object[] dataValues = IntStream.range(0, row.getFieldCount())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@

import com.facebook.airlift.testing.TempFile;
import com.facebook.presto.cache.CacheConfig;
import com.facebook.presto.common.RuntimeStats;
import com.facebook.presto.common.predicate.Domain;
import com.facebook.presto.common.predicate.TupleDomain;
import com.facebook.presto.common.type.StandardTypes;
Expand Down Expand Up @@ -198,7 +199,7 @@ config, createTestHdfsEnvironment(config, metastoreClientConfig),
getDefaultHiveAggregatedPageSourceFactories(config, metastoreClientConfig),
FUNCTION_AND_TYPE_MANAGER,
ROW_EXPRESSION_SERVICE);
return provider.createPageSource(transaction, getSession(config), split, tableHandle.getLayout().get(), ImmutableList.copyOf(getColumnHandles()), splitContext);
return provider.createPageSource(transaction, getSession(config), split, tableHandle.getLayout().get(), ImmutableList.copyOf(getColumnHandles()), splitContext, new RuntimeStats());
}

private static TupleDomain<ColumnHandle> getToSkipTupleDomain()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
import com.facebook.presto.cache.CacheConfig;
import com.facebook.presto.common.Page;
import com.facebook.presto.common.PageBuilder;
import com.facebook.presto.common.RuntimeStats;
import com.facebook.presto.common.block.BlockBuilder;
import com.facebook.presto.common.predicate.TupleDomain;
import com.facebook.presto.common.type.Type;
Expand Down Expand Up @@ -304,7 +305,7 @@ private static ConnectorPageSource createPageSource(HiveTransactionHandle transa
getDefaultHiveAggregatedPageSourceFactories(config, metastoreClientConfig),
FUNCTION_AND_TYPE_MANAGER,
ROW_EXPRESSION_SERVICE);
return provider.createPageSource(transaction, getSession(config, new HiveCommonClientConfig()), split, tableHandle.getLayout().get(), ImmutableList.copyOf(getColumnHandles()), NON_CACHEABLE);
return provider.createPageSource(transaction, getSession(config, new HiveCommonClientConfig()), split, tableHandle.getLayout().get(), ImmutableList.copyOf(getColumnHandles()), NON_CACHEABLE, new RuntimeStats());
}

private static ConnectorPageSink createPageSink(HiveTransactionHandle transaction, HiveClientConfig config, MetastoreClientConfig metastoreClientConfig, ExtendedHiveMetastore metastore, Path outputPath, HiveWriterStats stats)
Expand Down
Loading

0 comments on commit 2579141

Please sign in to comment.