Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import java.io.IOException;
import java.util.Set;
import javax.annotation.Nullable;
import org.apache.hadoop.fs.Path;
import org.apache.pinot.spi.data.readers.GenericRow;
import org.apache.pinot.spi.data.readers.RecordReader;
import org.apache.pinot.spi.data.readers.RecordReaderConfig;
Expand All @@ -38,11 +39,20 @@ public class ParquetRecordReader implements RecordReader {
@Override
public void init(File dataFile, @Nullable Set<String> fieldsToRead, @Nullable RecordReaderConfig recordReaderConfig)
throws IOException {
if (recordReaderConfig == null || ((ParquetRecordReaderConfig) recordReaderConfig).useParquetAvroRecordReader()) {
if (recordReaderConfig != null && ((ParquetRecordReaderConfig) recordReaderConfig).useParquetAvroRecordReader()) {
_internalParquetRecordReader = new ParquetAvroRecordReader();
} else {
} else if (recordReaderConfig != null
&& ((ParquetRecordReaderConfig) recordReaderConfig).useParquetNativeRecordReader()) {
_useAvroParquetRecordReader = false;
_internalParquetRecordReader = new ParquetNativeRecordReader();
} else {
// No reader type specified. Determine using file metadata
if (ParquetUtils.hasAvroSchemaInFileMetadata(new Path(dataFile.getAbsolutePath()))) {
_internalParquetRecordReader = new ParquetAvroRecordReader();
} else {
_useAvroParquetRecordReader = false;
_internalParquetRecordReader = new ParquetNativeRecordReader();
}
}
_internalParquetRecordReader.init(dataFile, fieldsToRead, recordReaderConfig);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,21 +27,33 @@
*/
public class ParquetRecordReaderConfig implements RecordReaderConfig {
private static final String USE_PARQUET_AVRO_RECORDER_READER = "useParquetAvroRecordReader";
private boolean _useParquetAvroRecordReader = true;
private static final String USE_PARQUET_NATIVE_RECORDER_READER = "useParquetNativeRecordReader";

private boolean _useParquetAvroRecordReader;
private boolean _useParquetNativeRecordReader;
private Configuration _conf;

// No-arg constructor: both reader flags stay false, so a reader initialized
// with this config auto-detects Avro vs. native from the file metadata unless
// a setter is called first (as the tests do). _conf remains null here.
public ParquetRecordReaderConfig() {
}

// Builds the config from a Hadoop Configuration.
// Both flags default to false so that, when neither key is present, the
// record reader falls back to auto-detection from the Parquet file metadata.
// (The stale removed diff line that defaulted the Avro flag to true has been
// dropped — keeping it would double-assign the field with the old default.)
public ParquetRecordReaderConfig(Configuration conf) {
  _conf = conf;
  _useParquetAvroRecordReader = conf.getBoolean(USE_PARQUET_AVRO_RECORDER_READER, false);
  _useParquetNativeRecordReader = conf.getBoolean(USE_PARQUET_NATIVE_RECORDER_READER, false);
}

// Returns true when the Avro-backed Parquet reader was explicitly requested.
public boolean useParquetAvroRecordReader() {
return _useParquetAvroRecordReader;
}

// Returns true when the native Parquet reader was explicitly requested.
public boolean useParquetNativeRecordReader() {
return _useParquetNativeRecordReader;
}

// Explicitly selects (or deselects) the native Parquet reader; overrides
// metadata-based auto-detection when true.
public void setUseParquetNativeRecordReader(boolean useParquetNativeRecordReader) {
_useParquetNativeRecordReader = useParquetNativeRecordReader;
}

// Explicitly selects (or deselects) the Avro-backed Parquet reader; overrides
// metadata-based auto-detection when true.
public void setUseParquetAvroRecordReader(boolean useParquetAvroRecordReader) {
_useParquetAvroRecordReader = useParquetAvroRecordReader;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,8 @@

public class ParquetUtils {
private static final String DEFAULT_FS = "file:///";
private static final String AVRO_SCHEMA_METADATA_KEY = "parquet.avro.schema";
private static final String OLD_AVRO_SCHEMA_METADATA_KEY = "avro.schema";

// Utility class: private constructor prevents instantiation.
private ParquetUtils() {
}
Expand Down Expand Up @@ -69,19 +71,28 @@ public static Schema getParquetAvroSchema(Path path)
ParquetMetadata footer =
ParquetFileReader.readFooter(getParquetHadoopConfiguration(), path, ParquetMetadataConverter.NO_FILTER);
Map<String, String> metaData = footer.getFileMetaData().getKeyValueMetaData();
String schemaString = metaData.get("parquet.avro.schema");
if (schemaString == null) {
// Try the older property
schemaString = metaData.get("avro.schema");
}
if (schemaString != null) {

if (hasAvroSchemaInFileMetadata(path)) {
String schemaString = metaData.get(AVRO_SCHEMA_METADATA_KEY);
if (schemaString == null) {
// Try the older property
schemaString = metaData.get(OLD_AVRO_SCHEMA_METADATA_KEY);
}
return new Schema.Parser().parse(schemaString);
} else {
MessageType parquetSchema = footer.getFileMetaData().getSchema();
return new AvroSchemaConverter().convert(parquetSchema);
}
}

/**
 * Checks whether the Parquet file at {@code path} carries an embedded Avro
 * schema in its footer's key-value metadata, under either the current or the
 * legacy metadata key.
 *
 * @throws IOException if the Parquet footer cannot be read
 */
public static boolean hasAvroSchemaInFileMetadata(Path path) throws IOException {
  ParquetMetadata footer =
      ParquetFileReader.readFooter(getParquetHadoopConfiguration(), path, ParquetMetadataConverter.NO_FILTER);
  Map<String, String> fileMetadata = footer.getFileMetaData().getKeyValueMetaData();
  return fileMetadata.containsKey(AVRO_SCHEMA_METADATA_KEY)
      || fileMetadata.containsKey(OLD_AVRO_SCHEMA_METADATA_KEY);
}

public static Configuration getParquetHadoopConfiguration() {
// The file path used in ParquetRecordReader is a local file path without prefix 'file:///',
// so we have to make sure that the configuration item 'fs.defaultFS' is set to 'file:///'
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,23 @@ public void testParquetNativeRecordReader()
testReadParquetFile(nativeRecordReader, SAMPLE_RECORDS_SIZE);
}

@Test
public void testFileMetadataParsing()
    throws IOException {
  // With a null config, the reader must pick its implementation by sniffing
  // the Parquet footer for an embedded Avro schema.
  final ParquetRecordReader avroBackedReader = new ParquetRecordReader();
  File avroParquetFile = new File(getClass().getClassLoader().getResource("data-avro.parquet").getFile());
  avroBackedReader.init(avroParquetFile, null, null);
  // Should be avro since file metadata has avro schema
  Assert.assertTrue(avroBackedReader.useAvroParquetRecordReader());

  // BUG FIX: the original initialized and asserted on the FIRST reader here,
  // leaving the second instance unused — the native-reader path was never
  // actually verified. Use the second reader for the second file.
  final ParquetRecordReader nativeBackedReader = new ParquetRecordReader();
  File nativeParquetFile = new File(getClass().getClassLoader().getResource("users.parquet").getFile());
  nativeBackedReader.init(nativeParquetFile, null, null);
  // Should be native since file metadata does not have avro schema
  Assert.assertFalse(nativeBackedReader.useAvroParquetRecordReader());
}

@Test
public void testComparison()
throws IOException {
Expand All @@ -116,10 +133,12 @@ public void testComparison()
private void testComparison(File dataFile, int totalRecords)
throws IOException {
final ParquetRecordReader avroRecordReader = new ParquetRecordReader();
avroRecordReader.init(dataFile, null, null);
ParquetRecordReaderConfig avroRecordReaderConfig = new ParquetRecordReaderConfig();
avroRecordReaderConfig.setUseParquetAvroRecordReader(true);
avroRecordReader.init(dataFile, null, avroRecordReaderConfig);
final ParquetRecordReader nativeRecordReader = new ParquetRecordReader();
ParquetRecordReaderConfig parquetRecordReaderConfig = new ParquetRecordReaderConfig();
parquetRecordReaderConfig.setUseParquetAvroRecordReader(false);
parquetRecordReaderConfig.setUseParquetNativeRecordReader(true);
nativeRecordReader.init(dataFile, null, parquetRecordReaderConfig);
Assert.assertTrue(avroRecordReader.useAvroParquetRecordReader());
Assert.assertFalse(nativeRecordReader.useAvroParquetRecordReader());
Expand Down
Binary file not shown.