Skip to content

Commit

Permalink
Revert changes for ignoring stats at RowGroupFilter level
Browse files Browse the repository at this point in the history
  • Loading branch information
asingh committed Jun 29, 2015
1 parent e861b18 commit 64c2617
Show file tree
Hide file tree
Showing 5 changed files with 26 additions and 115 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@
import org.apache.parquet.filter2.predicate.SchemaCompatibilityValidator;
import org.apache.parquet.filter2.statisticslevel.StatisticsFilter;
import org.apache.parquet.hadoop.metadata.BlockMetaData;
import org.apache.parquet.hadoop.metadata.ParquetMetadata;
import org.apache.parquet.schema.MessageType;

import static org.apache.parquet.Preconditions.checkNotNull;
Expand All @@ -39,47 +38,21 @@
* no filtering will be performed.
*/
public class RowGroupFilter implements Visitor<List<BlockMetaData>> {
private static final int STATISTICS_FIXED_VERSION = 161; // 1.6.1
private final List<BlockMetaData> blocks;
private final MessageType schema;
private final Boolean skipStatisticsChecks;

public static List<BlockMetaData> filterRowGroups(Filter filter, ParquetMetadata footer,
MessageType schema) {
public static List<BlockMetaData> filterRowGroups(Filter filter, List<BlockMetaData> blocks, MessageType schema) {
checkNotNull(filter, "filter");
return filter.accept(new RowGroupFilter(footer, schema));
return filter.accept(new RowGroupFilter(blocks, schema));
}

private RowGroupFilter(ParquetMetadata footer, MessageType schema) {
String createdBy = checkNotNull(footer.getFileMetaData().getCreatedBy(), "createdBy");
this.skipStatisticsChecks = shouldIgnoreStatistics(createdBy);
this.blocks = checkNotNull(footer.getBlocks(), "blocks");
private RowGroupFilter(List<BlockMetaData> blocks, MessageType schema) {
this.blocks = checkNotNull(blocks, "blocks");
this.schema = checkNotNull(schema, "schema");
}

private static boolean shouldIgnoreStatistics(String createdBy) {
final String[] versionTokens = createdBy.split(" ");

if (versionTokens.length < 3) {
return true;
}

final String app = versionTokens[0];
final String version = versionTokens[2].substring(0, 5).replaceAll("\\.", "");

if (app.equalsIgnoreCase("parquet-mr")) {
return Integer.parseInt(version) < STATISTICS_FIXED_VERSION;
}

return true;
}

@Override
public List<BlockMetaData> visit(FilterCompat.FilterPredicateCompat filterPredicateCompat) {
if (skipStatisticsChecks) {
return blocks;
}

FilterPredicate filterPredicate = filterPredicateCompat.getFilterPredicate();

// check that the schema of the filter matches the schema of the file
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -670,8 +670,7 @@ List<ParquetInputSplit> getSplits(Configuration configuration, List<Footer> foot
List<BlockMetaData> filteredBlocks;

totalRowGroups += blocks.size();
filteredBlocks = RowGroupFilter.filterRowGroups(filter, parquetMetaData, parquetMetaData.getFileMetaData
().getSchema());
filteredBlocks = RowGroupFilter.filterRowGroups(filter, blocks, parquetMetaData.getFileMetaData().getSchema());
rowGroupsDropped += blocks.size() - filteredBlocks.size();

if (filteredBlocks.isEmpty()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -140,10 +140,12 @@ private void initReader() throws IOException {
if (footersIterator.hasNext()) {
Footer footer = footersIterator.next();

List<BlockMetaData> blocks = footer.getParquetMetadata().getBlocks();

MessageType fileSchema = footer.getParquetMetadata().getFileMetaData().getSchema();

List<BlockMetaData> filteredBlocks = RowGroupFilter.filterRowGroups(
filter, footer.getParquetMetadata(), fileSchema);
filter, blocks, fileSchema);

reader = new InternalParquetRecordReader<T>(readSupport, filter);
reader.initialize(fileSchema,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,6 @@
import org.apache.parquet.filter.UnboundRecordFilter;
import org.apache.parquet.filter2.compat.FilterCompat;
import org.apache.parquet.filter2.compat.FilterCompat.Filter;
import org.apache.parquet.filter2.statisticslevel.StatisticsFilter;
import org.apache.parquet.hadoop.api.ReadSupport;
import org.apache.parquet.hadoop.metadata.BlockMetaData;
import org.apache.parquet.hadoop.metadata.ParquetMetadata;
Expand Down Expand Up @@ -158,7 +157,7 @@ private void initializeInternalReader(ParquetInputSplit split, Configuration con
footer = readFooter(configuration, path, range(split.getStart(), split.getEnd()));
MessageType fileSchema = footer.getFileMetaData().getSchema();
Filter filter = getFilter(configuration);
filteredBlocks = filterRowGroups(filter, footer, fileSchema);
filteredBlocks = filterRowGroups(filter, footer.getBlocks(), fileSchema);
} else {
// otherwise we find the row groups that were selected on the client
footer = readFooter(configuration, path, NO_FILTER);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,14 +20,8 @@

import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;

import org.apache.parquet.hadoop.metadata.FileMetaData;
import org.apache.parquet.hadoop.metadata.ParquetMetadata;
import org.apache.parquet.schema.PrimitiveType;
import org.apache.parquet.schema.Types;
import org.junit.Before;
import org.junit.Test;

import org.apache.parquet.column.statistics.IntStatistics;
Expand All @@ -43,121 +37,65 @@
import static org.apache.parquet.hadoop.TestInputFormat.makeBlockFromStats;

public class TestRowGroupFilter {
private List<BlockMetaData> blocks;
private BlockMetaData b1;
private BlockMetaData b2;
private BlockMetaData b3;
private BlockMetaData b4;
private BlockMetaData b5;
private BlockMetaData b6;
private MessageType schema;
private IntColumn foo;

@Before
public void setUp() throws Exception {
blocks = new ArrayList<BlockMetaData>();
@Test
public void testApplyRowGroupFilters() {

List<BlockMetaData> blocks = new ArrayList<BlockMetaData>();

IntStatistics stats1 = new IntStatistics();
stats1.setMinMax(10, 100);
stats1.setNumNulls(4);
b1 = makeBlockFromStats(stats1, 301);
BlockMetaData b1 = makeBlockFromStats(stats1, 301);
blocks.add(b1);

IntStatistics stats2 = new IntStatistics();
stats2.setMinMax(8, 102);
stats2.setNumNulls(0);
b2 = makeBlockFromStats(stats2, 302);
BlockMetaData b2 = makeBlockFromStats(stats2, 302);
blocks.add(b2);

IntStatistics stats3 = new IntStatistics();
stats3.setMinMax(100, 102);
stats3.setNumNulls(12);
b3 = makeBlockFromStats(stats3, 303);
BlockMetaData b3 = makeBlockFromStats(stats3, 303);
blocks.add(b3);


IntStatistics stats4 = new IntStatistics();
stats4.setMinMax(0, 0);
stats4.setNumNulls(304);
b4 = makeBlockFromStats(stats4, 304);
BlockMetaData b4 = makeBlockFromStats(stats4, 304);
blocks.add(b4);


IntStatistics stats5 = new IntStatistics();
stats5.setMinMax(50, 50);
stats5.setNumNulls(7);
b5 = makeBlockFromStats(stats5, 305);
BlockMetaData b5 = makeBlockFromStats(stats5, 305);
blocks.add(b5);

IntStatistics stats6 = new IntStatistics();
stats6.setMinMax(0, 0);
stats6.setNumNulls(12);
b6 = makeBlockFromStats(stats6, 306);
BlockMetaData b6 = makeBlockFromStats(stats6, 306);
blocks.add(b6);

schema = MessageTypeParser.parseMessageType("message Document { optional int32 foo; }");
foo = intColumn("foo");
}

@Test
public void testSkippedRowGroupFilters() {

ParquetMetadata parquetMetadata = new ParquetMetadata(
new FileMetaData(
new MessageType("test", Types.required(PrimitiveType.PrimitiveTypeName.INT32).named
("field")),
new HashMap<String, String>(),
"parquet-mr version 1.6.0rc3 (build d4d5a07ec9bd262ca1e93c309f1d7d4a74ebda4c) "),
blocks
);

List<BlockMetaData> filtered = RowGroupFilter.filterRowGroups(FilterCompat.get(eq(foo, 50)),
parquetMetadata, schema);
assertEquals(Arrays.asList(b1, b2, b3, b4, b5, b6), filtered);

filtered = RowGroupFilter.filterRowGroups(FilterCompat.get(notEq(foo, 50)), parquetMetadata, schema);
assertEquals(Arrays.asList(b1, b2, b3, b4, b5, b6), filtered);

filtered = RowGroupFilter.filterRowGroups(FilterCompat.get(eq(foo, null)), parquetMetadata, schema);
assertEquals(Arrays.asList(b1, b2, b3, b4, b5, b6), filtered);

filtered = RowGroupFilter.filterRowGroups(FilterCompat.get(notEq(foo, null)), parquetMetadata, schema);
assertEquals(Arrays.asList(b1, b2, b3, b4, b5, b6), filtered);

filtered = RowGroupFilter.filterRowGroups(FilterCompat.get(eq(foo, 0)), parquetMetadata, schema);
assertEquals(Arrays.asList(b1, b2, b3, b4, b5, b6), filtered);
}

@Test
public void testAppliedRowGroupFilters() {

ParquetMetadata parquetMetadata = new ParquetMetadata(
new FileMetaData(
new MessageType("test", Types.required(PrimitiveType.PrimitiveTypeName.INT32).named
("field")),
new HashMap<String, String>(),
"parquet-mr version 1.7.0rc3 (build d4d5a07ec9bd262ca1e93c309f1d7d4a74ebda4c) "),
blocks
);

checkFilteredBlocks(parquetMetadata);
}
MessageType schema = MessageTypeParser.parseMessageType("message Document { optional int32 foo; }");
IntColumn foo = intColumn("foo");

private void checkFilteredBlocks(ParquetMetadata parquetMetadata) {
List<BlockMetaData> filtered = RowGroupFilter.filterRowGroups(FilterCompat.get(eq(foo, 50)),
parquetMetadata, schema);
List<BlockMetaData> filtered = RowGroupFilter.filterRowGroups(FilterCompat.get(eq(foo, 50)), blocks, schema);
assertEquals(Arrays.asList(b1, b2, b5), filtered);

filtered = RowGroupFilter.filterRowGroups(FilterCompat.get(notEq(foo, 50)), parquetMetadata, schema);
filtered = RowGroupFilter.filterRowGroups(FilterCompat.get(notEq(foo, 50)), blocks, schema);
assertEquals(Arrays.asList(b1, b2, b3, b4, b5, b6), filtered);

filtered = RowGroupFilter.filterRowGroups(FilterCompat.get(eq(foo, null)), parquetMetadata, schema);
filtered = RowGroupFilter.filterRowGroups(FilterCompat.get(eq(foo, null)), blocks, schema);
assertEquals(Arrays.asList(b1, b3, b4, b5, b6), filtered);

filtered = RowGroupFilter.filterRowGroups(FilterCompat.get(notEq(foo, null)), parquetMetadata, schema);
filtered = RowGroupFilter.filterRowGroups(FilterCompat.get(notEq(foo, null)), blocks, schema);
assertEquals(Arrays.asList(b1, b2, b3, b5, b6), filtered);

filtered = RowGroupFilter.filterRowGroups(FilterCompat.get(eq(foo, 0)), parquetMetadata, schema);
filtered = RowGroupFilter.filterRowGroups(FilterCompat.get(eq(foo, 0)), blocks, schema);
assertEquals(Arrays.asList(b6), filtered);
}

Expand Down

0 comments on commit 64c2617

Please sign in to comment.