Improving gz support for avro record readers (apache#9951)
Seunghyun Lee authored Dec 11, 2022
1 parent 0ca5fcb commit 2253bd7
Showing 17 changed files with 246 additions and 274 deletions.

AvroUtils.java
@@ -39,6 +39,7 @@
 import org.apache.pinot.spi.data.FieldSpec.DataType;
 import org.apache.pinot.spi.data.MetricFieldSpec;
 import org.apache.pinot.spi.data.Schema;
+import org.apache.pinot.spi.data.readers.RecordReaderUtils;


 /**
@@ -214,7 +215,7 @@ public static org.apache.avro.Schema getAvroSchemaFromPinotSchema(Schema pinotSchema)
    */
   public static DataFileStream<GenericRecord> getAvroReader(File avroFile)
       throws IOException {
-    if (avroFile.getName().endsWith(".gz")) {
+    if (RecordReaderUtils.isGZippedFile(avroFile)) {
       return new DataFileStream<>(new GZIPInputStream(new FileInputStream(avroFile)), new GenericDatumReader<>());
     } else {
       return new DataFileStream<>(new FileInputStream(avroFile), new GenericDatumReader<>());
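
The substantive change above: gzip detection now inspects the file's content via RecordReaderUtils.isGZippedFile rather than trusting a ".gz" filename suffix, so gzipped Avro files are recognized even when misnamed. A minimal sketch of such a magic-byte check (hypothetical helper; Pinot's actual RecordReaderUtils implementation may differ):

    import java.io.File;
    import java.io.FileInputStream;
    import java.io.IOException;
    import java.io.InputStream;
    import java.util.zip.GZIPInputStream;

    public final class GzipSniffer {
      private GzipSniffer() {
      }

      // A gzip stream always begins with the two magic bytes 0x1f 0x8b;
      // GZIPInputStream.GZIP_MAGIC stores them as the little-endian int 0x8b1f.
      public static boolean isGZippedFile(File file)
          throws IOException {
        try (InputStream is = new FileInputStream(file)) {
          int first = is.read();
          int second = is.read();
          return first == (GZIPInputStream.GZIP_MAGIC & 0xff)
              && second == ((GZIPInputStream.GZIP_MAGIC >> 8) & 0xff);
        }
      }
    }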

AvroRecordReaderTest.java
@@ -33,10 +33,9 @@


 public class AvroRecordReaderTest extends AbstractRecordReaderTest {
-  private final File _dataFile = new File(_tempDir, "data.avro");

   @Override
-  protected RecordReader createRecordReader()
+  protected RecordReader createRecordReader(File file)
       throws Exception {
     AvroRecordReader avroRecordReader = new AvroRecordReader();
-    avroRecordReader.init(_dataFile, _sourceFields, null);
+    avroRecordReader.init(file, _sourceFields, null);
@@ -59,4 +58,9 @@ protected void writeRecordsToFile(List<Map<String, Object>> recordsToWrite)
       }
     }
   }
+
+  @Override
+  protected String getDataFileName() {
+    return "data.avro";
+  }
 }
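
Every reader test in this commit follows the same refactor: the per-class _dataFile field goes away, createRecordReader() gains a File parameter, and a getDataFileName() override is added (the format-specific gzip test is likewise deleted from ORCRecordReaderTest further down). This implies the shared AbstractRecordReaderTest now owns the data file and can run each reader against both plain and gzipped copies. A rough sketch of that base-class flow, under the assumption that the harness gzips the file itself; only the two abstract methods and the _tempDir/_dataFile fields are confirmed by the diff:

    import java.io.File;
    import java.nio.file.Files;
    import java.util.zip.GZIPOutputStream;
    import org.apache.pinot.spi.data.readers.RecordReader;
    import org.testng.annotations.Test;

    // Hypothetical sketch of the shared harness implied by this diff.
    public abstract class AbstractRecordReaderTestSketch {
      protected File _tempDir = new File(System.getProperty("java.io.tmpdir"), "recordReaderTest");
      protected File _dataFile;

      // Confirmed by the diff: subclasses name their data file and can bind a reader to any File.
      protected abstract String getDataFileName();

      protected abstract RecordReader createRecordReader(File file)
          throws Exception;

      protected void setUp()
          throws Exception {
        // Assumed: the base class now owns _dataFile, which is why subclasses dropped their own field.
        _dataFile = new File(_tempDir, getDataFileName());
        // ... write records into _dataFile via the subclass's writeRecordsToFile ...
      }

      // Assumed: one central gzip test replaces per-format tests such as the
      // deleted testGzipORCRecordReader.
      @Test
      public void testGzippedRecordReader()
          throws Exception {
        File gzFile = new File(_tempDir, getDataFileName() + ".gz");
        try (GZIPOutputStream gos = new GZIPOutputStream(Files.newOutputStream(gzFile.toPath()))) {
          Files.copy(_dataFile.toPath(), gos);
        }
        RecordReader reader = createRecordReader(gzFile);
        // ... run the same value assertions as the uncompressed test, then rewind and re-check ...
        reader.close();
      }
    }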

CSVRecordReaderTest.java
@@ -38,22 +38,20 @@

 public class CSVRecordReaderTest extends AbstractRecordReaderTest {
   private static final char CSV_MULTI_VALUE_DELIMITER = '\t';
-  private final File _dataFile = new File(_tempDir, "data.csv");

   @Override
-  protected RecordReader createRecordReader()
+  protected RecordReader createRecordReader(File file)
       throws Exception {
     CSVRecordReaderConfig csvRecordReaderConfig = new CSVRecordReaderConfig();
     csvRecordReaderConfig.setMultiValueDelimiter(CSV_MULTI_VALUE_DELIMITER);
     CSVRecordReader csvRecordReader = new CSVRecordReader();
-    csvRecordReader.init(_dataFile, _sourceFields, csvRecordReaderConfig);
+    csvRecordReader.init(file, _sourceFields, csvRecordReaderConfig);
     return csvRecordReader;
   }

   @Override
   protected void writeRecordsToFile(List<Map<String, Object>> recordsToWrite)
       throws Exception {
-
     Schema pinotSchema = getPinotSchema();
     String[] columns = pinotSchema.getColumnNames().toArray(new String[0]);
     try (FileWriter fileWriter = new FileWriter(_dataFile);
@@ -73,6 +71,11 @@ protected void writeRecordsToFile(List<Map<String, Object>> recordsToWrite)
     }
   }

+  @Override
+  protected String getDataFileName() {
+    return "data.csv";
+  }
+
   @Override
   protected void checkValue(RecordReader recordReader, List<Map<String, Object>> expectedRecordsMap,
       List<Object[]> expectedPrimaryKeys)

(file name not shown in this view)
This file was deleted.


JSONRecordReaderTest.java
@@ -33,20 +33,19 @@


 public class JSONRecordReaderTest extends AbstractRecordReaderTest {
-  private final File _dateFile = new File(_tempDir, "data.json");

   @Override
-  protected RecordReader createRecordReader()
+  protected RecordReader createRecordReader(File file)
       throws Exception {
     JSONRecordReader recordReader = new JSONRecordReader();
-    recordReader.init(_dateFile, _sourceFields, null);
+    recordReader.init(file, _sourceFields, null);
     return recordReader;
   }

   @Override
   protected void writeRecordsToFile(List<Map<String, Object>> recordsToWrite)
       throws Exception {
-    try (FileWriter fileWriter = new FileWriter(_dateFile)) {
+    try (FileWriter fileWriter = new FileWriter(_dataFile)) {
       for (Map<String, Object> r : recordsToWrite) {
         ObjectNode jsonRecord = JsonUtils.newObjectNode();
         for (String key : r.keySet()) {
@@ -57,6 +56,11 @@ protected void writeRecordsToFile(List<Map<String, Object>> recordsToWrite)
     }
   }

+  @Override
+  protected String getDataFileName() {
+    return "data.json";
+  }
+
   @Override
   protected void checkValue(RecordReader recordReader, List<Map<String, Object>> expectedRecordsMap,
       List<Object[]> expectedPrimaryKeys)

ORCRecordReader.java
@@ -66,6 +66,8 @@
  * </ul>
  */
 public class ORCRecordReader implements RecordReader {
+  private static final String EXTENSION = "orc";
+
   private List<String> _orcFields;
   private List<TypeDescription> _orcFieldTypes;
   private boolean[] _includeOrcFields;
@@ -78,7 +80,7 @@ public class ORCRecordReader implements RecordReader {
   public void init(File dataFile, @Nullable Set<String> fieldsToRead, @Nullable RecordReaderConfig recordReaderConfig)
       throws IOException {
     Configuration configuration = new Configuration();
-    File orcFile = RecordReaderUtils.unpackIfRequired(dataFile, "orc");
+    File orcFile = RecordReaderUtils.unpackIfRequired(dataFile, EXTENSION);
     Reader orcReader = OrcFile.createReader(new Path(orcFile.getAbsolutePath()),
         OrcFile.readerOptions(configuration).filesystem(FileSystem.getLocal(configuration)));
     TypeDescription orcSchema = orcReader.getSchema();
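
The ORC and Parquet readers all funnel through RecordReaderUtils.unpackIfRequired(dataFile, EXTENSION) before opening the file; the diffs here merely hoist the extension string into a constant. A plausible sketch of what such a utility does, assuming it inflates gzipped inputs to a sibling file named with the given extension and passes everything else through untouched (the real method's temp-file handling may differ):

    import java.io.File;
    import java.io.IOException;
    import java.io.InputStream;
    import java.nio.file.Files;
    import java.nio.file.StandardCopyOption;
    import java.util.zip.GZIPInputStream;

    public final class UnpackSketch {
      private UnpackSketch() {
      }

      // Hypothetical sketch: if the file is gzipped (detected by magic bytes, not
      // by name), inflate it next to the original as "<name>.<extension>" and
      // return that file; otherwise return the input untouched.
      public static File unpackIfRequired(File dataFile, String extension)
          throws IOException {
        if (!isGZippedFile(dataFile)) {
          return dataFile;
        }
        File unpacked = new File(dataFile.getParentFile(), dataFile.getName() + "." + extension);
        try (InputStream is = new GZIPInputStream(Files.newInputStream(dataFile.toPath()))) {
          Files.copy(is, unpacked.toPath(), StandardCopyOption.REPLACE_EXISTING);
        }
        return unpacked;
      }

      private static boolean isGZippedFile(File file)
          throws IOException {
        try (InputStream is = Files.newInputStream(file.toPath())) {
          return is.read() == 0x1f && is.read() == 0x8b;
        }
      }
    }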

ORCRecordReaderTest.java
@@ -17,15 +17,9 @@
  * specific language governing permissions and limitations
  * under the License.
  */

 import java.io.File;
-import java.io.FileOutputStream;
-import java.io.IOException;
-import java.nio.file.Files;
-import java.nio.file.Paths;
 import java.util.List;
 import java.util.Map;
-import java.util.zip.GZIPOutputStream;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.ql.exec.vector.BytesColumnVector;
@@ -38,26 +32,17 @@
 import org.apache.orc.Writer;
 import org.apache.pinot.spi.data.readers.AbstractRecordReaderTest;
 import org.apache.pinot.spi.data.readers.RecordReader;
-import org.testng.annotations.Test;

 import static java.nio.charset.StandardCharsets.UTF_8;


 public class ORCRecordReaderTest extends AbstractRecordReaderTest {
-  private final File _dataFile = new File(_tempDir, "data.orc");
-
-  private void compressGzip(String sourcePath, String targetPath)
-      throws IOException {
-    try (GZIPOutputStream gos = new GZIPOutputStream(new FileOutputStream(Paths.get(targetPath).toFile()))) {
-      Files.copy(Paths.get(sourcePath), gos);
-    }
-  }
-
   @Override
-  protected RecordReader createRecordReader()
+  protected RecordReader createRecordReader(File file)
       throws Exception {
     ORCRecordReader orcRecordReader = new ORCRecordReader();
-    orcRecordReader.init(_dataFile, _sourceFields, null);
+    orcRecordReader.init(file, _sourceFields, null);
     return orcRecordReader;
   }
@@ -158,16 +143,8 @@ protected void writeRecordsToFile(List<Map<String, Object>> recordsToWrite)
     writer.close();
   }

-  @Test
-  public void testGzipORCRecordReader()
-      throws Exception {
-    String gzipFileName = "data.orc.gz";
-    compressGzip(_dataFile.getAbsolutePath(), String.format("%s/%s", _tempDir, gzipFileName));
-    final File gzDataFile = new File(_tempDir, gzipFileName);
-    ORCRecordReader orcRecordReader = new ORCRecordReader();
-    orcRecordReader.init(gzDataFile, _sourceFields, null);
-    checkValue(orcRecordReader, _records, _primaryKeys);
-    orcRecordReader.rewind();
-    checkValue(orcRecordReader, _records, _primaryKeys);
+  @Override
+  protected String getDataFileName() {
+    return "data.orc";
   }
 }

ParquetAvroRecordReader.java
@@ -41,6 +41,8 @@
  * https://javadoc.io/doc/org.apache.parquet/parquet-avro/latest/index.html</a>
  */
 public class ParquetAvroRecordReader implements RecordReader {
+  private static final String EXTENSION = "parquet";
+
   private Path _dataFilePath;
   private AvroRecordExtractor _recordExtractor;
   private ParquetReader<GenericRecord> _parquetReader;
@@ -49,7 +51,7 @@ public class ParquetAvroRecordReader implements RecordReader {
   @Override
   public void init(File dataFile, @Nullable Set<String> fieldsToRead, @Nullable RecordReaderConfig recordReaderConfig)
       throws IOException {
-    File parquetFile = RecordReaderUtils.unpackIfRequired(dataFile, "parquet");
+    File parquetFile = RecordReaderUtils.unpackIfRequired(dataFile, EXTENSION);
     _dataFilePath = new Path(parquetFile.getAbsolutePath());
     _parquetReader = ParquetUtils.getParquetAvroReader(_dataFilePath);
     _recordExtractor = new AvroRecordExtractor();

ParquetNativeRecordReader.java
@@ -44,6 +44,8 @@
  * Record reader for Native Parquet file.
  */
 public class ParquetNativeRecordReader implements RecordReader {
+  private static final String EXTENSION = "parquet";
+
   private Path _dataFilePath;
   private ParquetNativeRecordExtractor _recordExtractor;
   private MessageType _schema;
@@ -59,7 +61,7 @@ public class ParquetNativeRecordReader implements RecordReader {
   @Override
   public void init(File dataFile, @Nullable Set<String> fieldsToRead, @Nullable RecordReaderConfig recordReaderConfig)
       throws IOException {
-    File parquetFile = RecordReaderUtils.unpackIfRequired(dataFile, "parquet");
+    File parquetFile = RecordReaderUtils.unpackIfRequired(dataFile, EXTENSION);
     _dataFilePath = new Path(parquetFile.getAbsolutePath());
     _hadoopConf = ParquetUtils.getParquetHadoopConfiguration();
     _recordExtractor = new ParquetNativeRecordExtractor();

ParquetRecordReader.java
@@ -34,13 +34,15 @@
  * It has two implementations: {@link ParquetAvroRecordReader} (Default) and {@link ParquetNativeRecordReader}.
  */
 public class ParquetRecordReader implements RecordReader {
+  private static final String EXTENSION = "parquet";
+
   private RecordReader _internalParquetRecordReader;
   private boolean _useAvroParquetRecordReader = true;

   @Override
   public void init(File dataFile, @Nullable Set<String> fieldsToRead, @Nullable RecordReaderConfig recordReaderConfig)
       throws IOException {
-    File parquetFile = RecordReaderUtils.unpackIfRequired(dataFile, "parquet");
+    File parquetFile = RecordReaderUtils.unpackIfRequired(dataFile, EXTENSION);
     if (recordReaderConfig != null && ((ParquetRecordReaderConfig) recordReaderConfig).useParquetAvroRecordReader()) {
       _internalParquetRecordReader = new ParquetAvroRecordReader();
     } else if (recordReaderConfig != null
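
Because decompression happens inside init() via unpackIfRequired, callers hand the reader either a plain or a gzipped Parquet file and read it the same way. A hypothetical usage sketch (the file path is illustrative; a null config selects the default Avro-based inner reader, per the class Javadoc above):

    import java.io.File;
    import org.apache.pinot.plugin.inputformat.parquet.ParquetRecordReader;
    import org.apache.pinot.spi.data.readers.GenericRow;

    public class ParquetReadExample {
      public static void main(String[] args)
          throws Exception {
        ParquetRecordReader reader = new ParquetRecordReader();
        // A gzipped Parquet file works the same as a plain one; init() unpacks it first.
        reader.init(new File("/tmp/data.parquet.gz"), null, null);
        while (reader.hasNext()) {
          GenericRow row = reader.next();
          System.out.println(row);
        }
        reader.close();
      }
    }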

ParquetNativeRecordReaderTest.java
@@ -34,13 +34,12 @@


 public class ParquetNativeRecordReaderTest extends AbstractRecordReaderTest {
-  private final File _dataFile = new File(_tempDir, "data.parquet");

   @Override
-  protected RecordReader createRecordReader()
+  protected RecordReader createRecordReader(File file)
       throws Exception {
     ParquetNativeRecordReader recordReader = new ParquetNativeRecordReader();
-    recordReader.init(_dataFile, _sourceFields, null);
+    recordReader.init(file, _sourceFields, null);
     return recordReader;
   }

@@ -63,4 +62,9 @@ protected void writeRecordsToFile(List<Map<String, Object>> recordsToWrite)
       }
     }
   }
+
+  @Override
+  protected String getDataFileName() {
+    return "data.parquet";
+  }
 }
(Remaining changed files are truncated in this view.)