Skip to content

Commit

Permalink
API,Core: Add scan planning metrics for skipped data/delete files (#5788
Browse files Browse the repository at this point in the history
)
  • Loading branch information
nastra authored Sep 21, 2022
1 parent a2c5380 commit 15b3e2f
Show file tree
Hide file tree
Showing 10 changed files with 216 additions and 34 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,11 @@ public void increment(long amount) {}
public long value() {
throw new UnsupportedOperationException("NOOP counter has no value");
}

@Override
public String toString() {
return "NOOP counter";
}
};

private final LongAdder counter;
Expand Down
20 changes: 20 additions & 0 deletions api/src/main/java/org/apache/iceberg/metrics/ScanReport.java
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,12 @@ interface ScanMetricsResult {
@Nullable
CounterResult totalDeleteFileSizeInBytes();

@Nullable
CounterResult skippedDataFiles();

@Nullable
CounterResult skippedDeleteFiles();

static ScanMetricsResult fromScanMetrics(ScanMetrics scanMetrics) {
Preconditions.checkArgument(null != scanMetrics, "Invalid scan metrics: null");
return ImmutableScanMetricsResult.builder()
Expand All @@ -138,6 +144,8 @@ static ScanMetricsResult fromScanMetrics(ScanMetrics scanMetrics) {
.totalFileSizeInBytes(CounterResult.fromCounter(scanMetrics.totalFileSizeInBytes()))
.totalDeleteFileSizeInBytes(
CounterResult.fromCounter(scanMetrics.totalDeleteFileSizeInBytes()))
.skippedDataFiles(CounterResult.fromCounter(scanMetrics.skippedDataFiles()))
.skippedDeleteFiles(CounterResult.fromCounter(scanMetrics.skippedDeleteFiles()))
.build();
}
}
Expand All @@ -154,6 +162,8 @@ abstract class ScanMetrics {
public static final String TOTAL_FILE_SIZE_IN_BYTES = "total-file-size-in-bytes";
public static final String TOTAL_DELETE_FILE_SIZE_IN_BYTES = "total-delete-file-size-in-bytes";
public static final String SKIPPED_DATA_MANIFESTS = "skipped-data-manifests";
public static final String SKIPPED_DATA_FILES = "skipped-data-files";
public static final String SKIPPED_DELETE_FILES = "skipped-delete-files";

public static ScanMetrics noop() {
return ScanMetrics.of(MetricsContext.nullMetrics());
Expand Down Expand Up @@ -206,6 +216,16 @@ public Counter skippedDataManifests() {
return metricsContext().counter(SKIPPED_DATA_MANIFESTS, MetricsContext.Unit.COUNT);
}

@Value.Derived
public Counter skippedDataFiles() {
return metricsContext().counter(SKIPPED_DATA_FILES, MetricsContext.Unit.COUNT);
}

@Value.Derived
public Counter skippedDeleteFiles() {
return metricsContext().counter(SKIPPED_DELETE_FILES, MetricsContext.Unit.COUNT);
}

public static ScanMetrics of(MetricsContext metricsContext) {
return ImmutableScanMetrics.builder().metricsContext(metricsContext).build();
}
Expand Down
5 changes: 5 additions & 0 deletions api/src/main/java/org/apache/iceberg/metrics/Timer.java
Original file line number Diff line number Diff line change
Expand Up @@ -165,5 +165,10 @@ public <T> T timeCallable(Callable<T> callable) throws Exception {
public <T> T time(Supplier<T> supplier) {
return supplier.get();
}

@Override
public String toString() {
return "NOOP timer";
}
};
}
1 change: 1 addition & 0 deletions core/src/main/java/org/apache/iceberg/DeleteFileIndex.java
Original file line number Diff line number Diff line change
Expand Up @@ -547,6 +547,7 @@ private Iterable<CloseableIterable<ManifestEntry<DeleteFile>>> deleteManifestRea
.filterPartitions(partitionFilter)
.filterPartitions(partitionSet)
.caseSensitive(caseSensitive)
.scanMetrics(scanMetrics)
.liveEntries());
}
}
Expand Down
12 changes: 9 additions & 3 deletions core/src/main/java/org/apache/iceberg/ManifestGroup.java
Original file line number Diff line number Diff line change
Expand Up @@ -311,16 +311,22 @@ public CloseableIterator<T> iterator() {
if (ignoreExisting) {
entries =
CloseableIterable.filter(
entries, entry -> entry.status() != ManifestEntry.Status.EXISTING);
scanMetrics.skippedDataFiles(),
entries,
entry -> entry.status() != ManifestEntry.Status.EXISTING);
}

if (evaluator != null) {
entries =
CloseableIterable.filter(
entries, entry -> evaluator.eval((GenericDataFile) entry.file()));
scanMetrics.skippedDataFiles(),
entries,
entry -> evaluator.eval((GenericDataFile) entry.file()));
}

entries = CloseableIterable.filter(entries, manifestEntryPredicate);
entries =
CloseableIterable.filter(
scanMetrics.skippedDataFiles(), entries, manifestEntryPredicate);

iterable = entryFn.apply(manifest, entries);

Expand Down
9 changes: 8 additions & 1 deletion core/src/main/java/org/apache/iceberg/ManifestReader.java
Original file line number Diff line number Diff line change
Expand Up @@ -206,6 +206,9 @@ CloseableIterable<ManifestEntry<F>> entries() {
requireStatsProjection ? withStatsColumns(columns) : columns;

return CloseableIterable.filter(
content == FileType.DATA_FILES
? scanMetrics.skippedDataFiles()
: scanMetrics.skippedDeleteFiles(),
open(projection(fileSchema, fileProjection, projectColumns, caseSensitive)),
entry ->
entry != null
Expand Down Expand Up @@ -255,7 +258,11 @@ private CloseableIterable<ManifestEntry<F>> open(Schema projection) {

CloseableIterable<ManifestEntry<F>> liveEntries() {
return CloseableIterable.filter(
entries(), entry -> entry != null && entry.status() != ManifestEntry.Status.DELETED);
content == FileType.DATA_FILES
? scanMetrics.skippedDataFiles()
: scanMetrics.skippedDeleteFiles(),
entries(),
entry -> entry != null && entry.status() != ManifestEntry.Status.DELETED);
}

/** @return an Iterator of DataFile. Makes defensive copies of files before returning */
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,16 @@ static void toJson(ScanMetricsResult metrics, JsonGenerator gen) throws IOExcept
CounterResultParser.toJson(metrics.totalDeleteFileSizeInBytes(), gen);
}

if (null != metrics.skippedDataFiles()) {
gen.writeFieldName(ScanMetrics.SKIPPED_DATA_FILES);
CounterResultParser.toJson(metrics.skippedDataFiles(), gen);
}

if (null != metrics.skippedDeleteFiles()) {
gen.writeFieldName(ScanMetrics.SKIPPED_DELETE_FILES);
CounterResultParser.toJson(metrics.skippedDeleteFiles(), gen);
}

gen.writeEndObject();
}

Expand Down Expand Up @@ -115,6 +125,8 @@ static ScanMetricsResult fromJson(JsonNode json) {
CounterResultParser.fromJson(ScanMetrics.TOTAL_FILE_SIZE_IN_BYTES, json))
.totalDeleteFileSizeInBytes(
CounterResultParser.fromJson(ScanMetrics.TOTAL_DELETE_FILE_SIZE_IN_BYTES, json))
.skippedDataFiles(CounterResultParser.fromJson(ScanMetrics.SKIPPED_DATA_FILES, json))
.skippedDeleteFiles(CounterResultParser.fromJson(ScanMetrics.SKIPPED_DELETE_FILES, json))
.build();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import org.apache.iceberg.io.CloseableIterable;
import org.apache.iceberg.metrics.LoggingScanReporter;
import org.apache.iceberg.metrics.ScanReport;
import org.apache.iceberg.metrics.ScanReport.ScanMetricsResult;
import org.apache.iceberg.metrics.ScanReporter;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.junit.Test;
Expand Down Expand Up @@ -61,16 +62,16 @@ public void scanningWithMultipleDataManifests() throws IOException {

assertThat(scanReport.tableName()).isEqualTo(tableName);
assertThat(scanReport.snapshotId()).isEqualTo(2L);
assertThat(scanReport.scanMetrics().totalPlanningDuration().totalDuration())
.isGreaterThan(Duration.ZERO);
assertThat(scanReport.scanMetrics().resultDataFiles().value()).isEqualTo(3);
assertThat(scanReport.scanMetrics().resultDeleteFiles().value()).isEqualTo(0);
assertThat(scanReport.scanMetrics().scannedDataManifests().value()).isEqualTo(2);
assertThat(scanReport.scanMetrics().skippedDataManifests().value()).isEqualTo(0);
assertThat(scanReport.scanMetrics().totalDataManifests().value()).isEqualTo(2);
assertThat(scanReport.scanMetrics().totalDeleteManifests().value()).isEqualTo(0);
assertThat(scanReport.scanMetrics().totalFileSizeInBytes().value()).isEqualTo(30L);
assertThat(scanReport.scanMetrics().totalDeleteFileSizeInBytes().value()).isEqualTo(0L);
ScanMetricsResult result = scanReport.scanMetrics();
assertThat(result.totalPlanningDuration().totalDuration()).isGreaterThan(Duration.ZERO);
assertThat(result.resultDataFiles().value()).isEqualTo(3);
assertThat(result.resultDeleteFiles().value()).isEqualTo(0);
assertThat(result.scannedDataManifests().value()).isEqualTo(2);
assertThat(result.skippedDataManifests().value()).isEqualTo(0);
assertThat(result.totalDataManifests().value()).isEqualTo(2);
assertThat(result.totalDeleteManifests().value()).isEqualTo(0);
assertThat(result.totalFileSizeInBytes().value()).isEqualTo(30L);
assertThat(result.totalDeleteFileSizeInBytes().value()).isEqualTo(0L);

// we should hit only a single data manifest and only a single data file
try (CloseableIterable<FileScanTask> fileScanTasks =
Expand All @@ -79,19 +80,19 @@ public void scanningWithMultipleDataManifests() throws IOException {
}

scanReport = reporter.lastReport();
result = scanReport.scanMetrics();
assertThat(scanReport).isNotNull();
assertThat(scanReport.tableName()).isEqualTo(tableName);
assertThat(scanReport.snapshotId()).isEqualTo(2L);
assertThat(scanReport.scanMetrics().totalPlanningDuration().totalDuration())
.isGreaterThan(Duration.ZERO);
assertThat(scanReport.scanMetrics().resultDataFiles().value()).isEqualTo(1);
assertThat(scanReport.scanMetrics().resultDeleteFiles().value()).isEqualTo(0);
assertThat(scanReport.scanMetrics().scannedDataManifests().value()).isEqualTo(1);
assertThat(scanReport.scanMetrics().skippedDataManifests().value()).isEqualTo(1);
assertThat(scanReport.scanMetrics().totalDataManifests().value()).isEqualTo(2);
assertThat(scanReport.scanMetrics().totalDeleteManifests().value()).isEqualTo(0);
assertThat(scanReport.scanMetrics().totalFileSizeInBytes().value()).isEqualTo(10L);
assertThat(scanReport.scanMetrics().totalDeleteFileSizeInBytes().value()).isEqualTo(0L);
assertThat(result.totalPlanningDuration().totalDuration()).isGreaterThan(Duration.ZERO);
assertThat(result.resultDataFiles().value()).isEqualTo(1);
assertThat(result.resultDeleteFiles().value()).isEqualTo(0);
assertThat(result.scannedDataManifests().value()).isEqualTo(1);
assertThat(result.skippedDataManifests().value()).isEqualTo(1);
assertThat(result.totalDataManifests().value()).isEqualTo(2);
assertThat(result.totalDeleteManifests().value()).isEqualTo(0);
assertThat(result.totalFileSizeInBytes().value()).isEqualTo(10L);
assertThat(result.totalDeleteFileSizeInBytes().value()).isEqualTo(0L);
}

@Test
Expand All @@ -118,16 +119,82 @@ public void scanningWithDeletes() throws IOException {
assertThat(scanReport).isNotNull();
assertThat(scanReport.tableName()).isEqualTo("scan-planning-with-deletes");
assertThat(scanReport.snapshotId()).isEqualTo(2L);
assertThat(scanReport.scanMetrics().totalPlanningDuration().totalDuration())
.isGreaterThan(Duration.ZERO);
assertThat(scanReport.scanMetrics().resultDataFiles().value()).isEqualTo(3);
assertThat(scanReport.scanMetrics().resultDeleteFiles().value()).isEqualTo(2);
assertThat(scanReport.scanMetrics().scannedDataManifests().value()).isEqualTo(1);
assertThat(scanReport.scanMetrics().skippedDataManifests().value()).isEqualTo(0);
assertThat(scanReport.scanMetrics().totalDataManifests().value()).isEqualTo(1);
assertThat(scanReport.scanMetrics().totalDeleteManifests().value()).isEqualTo(1);
assertThat(scanReport.scanMetrics().totalFileSizeInBytes().value()).isEqualTo(30L);
assertThat(scanReport.scanMetrics().totalDeleteFileSizeInBytes().value()).isEqualTo(20L);
ScanMetricsResult result = scanReport.scanMetrics();
assertThat(result.totalPlanningDuration().totalDuration()).isGreaterThan(Duration.ZERO);
assertThat(result.resultDataFiles().value()).isEqualTo(3);
assertThat(result.resultDeleteFiles().value()).isEqualTo(2);
assertThat(result.scannedDataManifests().value()).isEqualTo(1);
assertThat(result.skippedDataManifests().value()).isEqualTo(0);
assertThat(result.totalDataManifests().value()).isEqualTo(1);
assertThat(result.totalDeleteManifests().value()).isEqualTo(1);
assertThat(result.totalFileSizeInBytes().value()).isEqualTo(30L);
assertThat(result.totalDeleteFileSizeInBytes().value()).isEqualTo(20L);
}

@Test
public void scanningWithSkippedDataFiles() throws IOException {
String tableName = "scan-planning-with-skipped-data-files";
Table table =
TestTables.create(
tableDir, tableName, SCHEMA, SPEC, SortOrder.unsorted(), formatVersion, reporter);
table.newAppend().appendFile(FILE_A).appendFile(FILE_D).commit();
table.newAppend().appendFile(FILE_B).appendFile(FILE_C).commit();
TableScan tableScan = table.newScan();

try (CloseableIterable<FileScanTask> fileScanTasks =
tableScan.filter(Expressions.equal("data", "1")).planFiles()) {
fileScanTasks.forEach(task -> {});
}

ScanReport scanReport = reporter.lastReport();
assertThat(scanReport).isNotNull();
assertThat(scanReport.tableName()).isEqualTo(tableName);
assertThat(scanReport.snapshotId()).isEqualTo(2L);
ScanMetricsResult result = scanReport.scanMetrics();
assertThat(result.skippedDataFiles().value()).isEqualTo(1);
assertThat(result.skippedDeleteFiles().value()).isEqualTo(0);
assertThat(result.totalPlanningDuration().totalDuration()).isGreaterThan(Duration.ZERO);
assertThat(result.resultDataFiles().value()).isEqualTo(1);
assertThat(result.resultDeleteFiles().value()).isEqualTo(0);
assertThat(result.scannedDataManifests().value()).isEqualTo(1);
assertThat(result.skippedDataManifests().value()).isEqualTo(1);
assertThat(result.totalDataManifests().value()).isEqualTo(2);
assertThat(result.totalDeleteManifests().value()).isEqualTo(0);
assertThat(result.totalFileSizeInBytes().value()).isEqualTo(10L);
assertThat(result.totalDeleteFileSizeInBytes().value()).isEqualTo(0L);
}

@Test
public void scanningWithSkippedDeleteFiles() throws IOException {
String tableName = "scan-planning-with-skipped-delete-files";
Table table =
TestTables.create(
tableDir, tableName, SCHEMA, SPEC, SortOrder.unsorted(), formatVersion, reporter);
table.newAppend().appendFile(FILE_A).appendFile(FILE_D).commit();
table.newRowDelta().addDeletes(FILE_A_DELETES).addDeletes(FILE_D2_DELETES).commit();
TableScan tableScan = table.newScan();

try (CloseableIterable<FileScanTask> fileScanTasks =
tableScan.filter(Expressions.equal("data", "1")).planFiles()) {
fileScanTasks.forEach(task -> {});
}

ScanReport scanReport = reporter.lastReport();
assertThat(scanReport).isNotNull();
assertThat(scanReport.tableName()).isEqualTo(tableName);
assertThat(scanReport.snapshotId()).isEqualTo(2L);
ScanMetricsResult result = scanReport.scanMetrics();
assertThat(result.totalPlanningDuration().totalDuration()).isGreaterThan(Duration.ZERO);
assertThat(result.resultDataFiles().value()).isEqualTo(1);
assertThat(result.resultDeleteFiles().value()).isEqualTo(1);
assertThat(result.skippedDataFiles().value()).isEqualTo(1);
assertThat(result.skippedDeleteFiles().value()).isEqualTo(1);
assertThat(result.scannedDataManifests().value()).isEqualTo(1);
assertThat(result.skippedDataManifests().value()).isEqualTo(0);
assertThat(result.totalDataManifests().value()).isEqualTo(1);
assertThat(result.totalDeleteManifests().value()).isEqualTo(1);
assertThat(result.totalFileSizeInBytes().value()).isEqualTo(10L);
assertThat(result.totalDeleteFileSizeInBytes().value()).isEqualTo(10L);
}

private static class TestScanReporter implements ScanReporter {
Expand Down
Loading

0 comments on commit 15b3e2f

Please sign in to comment.