From d015ce7a3b86a529f4db79ed8ac8dbe28c62d6b8 Mon Sep 17 00:00:00 2001 From: Peter Huang Date: Sun, 21 Oct 2018 21:00:08 -0700 Subject: [PATCH] [FLINK-7243][connector] Add ParquetInputFormats for Row, Map, and POJO. This closes #6483. --- .gitignore | 1 + .../formats/parquet/ParquetInputFormat.java | 290 ++++++++++ .../parquet/ParquetMapInputFormat.java | 127 +++++ .../parquet/ParquetPojoInputFormat.java | 131 +++++ .../parquet/ParquetRowInputFormat.java | 50 ++ .../parquet/utils/ParentDataHolder.java | 27 + .../parquet/utils/ParquetRecordReader.java | 299 ++++++++++ .../parquet/utils/ParquetSchemaConverter.java | 435 +++++++++++++++ .../parquet/utils/ParquetTimestampUtils.java | 61 +++ .../formats/parquet/utils/RowConverter.java | 398 ++++++++++++++ .../parquet/utils/RowMaterializer.java | 51 ++ .../formats/parquet/utils/RowReadSupport.java | 54 ++ .../parquet/ParquetMapInputFormatTest.java | 107 ++++ .../parquet/ParquetPojoInputFormatTest.java | 93 ++++ .../parquet/ParquetRowInputFormatTest.java | 421 ++++++++++++++ .../formats/parquet/generated/Address.java | 517 ------------------ .../parquet/pojo/PojoSimpleRecord.java | 52 ++ .../utils/ParquetRecordReaderTest.java | 321 +++++++++++ .../utils/ParquetSchemaConverterTest.java | 136 +++++ .../flink/formats/parquet/utils/TestUtil.java | 173 ++++++ .../src/test/resources/avro/nested.avsc | 35 ++ .../src/test/resources/avro/simple.avsc | 12 + 22 files changed, 3274 insertions(+), 517 deletions(-) create mode 100644 flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/ParquetInputFormat.java create mode 100644 flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/ParquetMapInputFormat.java create mode 100644 flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/ParquetPojoInputFormat.java create mode 100644 flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/ParquetRowInputFormat.java create mode 100644 flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/utils/ParentDataHolder.java create mode 100644 flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/utils/ParquetRecordReader.java create mode 100644 flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/utils/ParquetSchemaConverter.java create mode 100644 flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/utils/ParquetTimestampUtils.java create mode 100644 flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/utils/RowConverter.java create mode 100644 flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/utils/RowMaterializer.java create mode 100644 flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/utils/RowReadSupport.java create mode 100644 flink-formats/flink-parquet/src/test/java/org/apache/flink/formats/parquet/ParquetMapInputFormatTest.java create mode 100644 flink-formats/flink-parquet/src/test/java/org/apache/flink/formats/parquet/ParquetPojoInputFormatTest.java create mode 100644 flink-formats/flink-parquet/src/test/java/org/apache/flink/formats/parquet/ParquetRowInputFormatTest.java delete mode 100644 flink-formats/flink-parquet/src/test/java/org/apache/flink/formats/parquet/generated/Address.java create mode 100644 flink-formats/flink-parquet/src/test/java/org/apache/flink/formats/parquet/pojo/PojoSimpleRecord.java create mode 100644 flink-formats/flink-parquet/src/test/java/org/apache/flink/formats/parquet/utils/ParquetRecordReaderTest.java 
create mode 100644 flink-formats/flink-parquet/src/test/java/org/apache/flink/formats/parquet/utils/ParquetSchemaConverterTest.java create mode 100644 flink-formats/flink-parquet/src/test/java/org/apache/flink/formats/parquet/utils/TestUtil.java create mode 100644 flink-formats/flink-parquet/src/test/resources/avro/nested.avsc create mode 100644 flink-formats/flink-parquet/src/test/resources/avro/simple.avsc diff --git a/.gitignore b/.gitignore index 20749c2424251..fdf7bedfb263c 100644 --- a/.gitignore +++ b/.gitignore @@ -19,6 +19,7 @@ tmp build-target flink-end-to-end-tests/flink-datastream-allround-test/src/main/java/org/apache/flink/streaming/tests/avro/ flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/generated/ +flink-formats/flink-parquet/src/test/java/org/apache/flink/formats/parquet/generated/ flink-runtime-web/web-dashboard/assets/fonts/ flink-runtime-web/web-dashboard/node_modules/ flink-runtime-web/web-dashboard/bower_components/ diff --git a/flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/ParquetInputFormat.java b/flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/ParquetInputFormat.java new file mode 100644 index 0000000000000..e7484cbddee1f --- /dev/null +++ b/flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/ParquetInputFormat.java @@ -0,0 +1,290 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.formats.parquet; + +import org.apache.flink.api.common.io.CheckpointableInputFormat; +import org.apache.flink.api.common.io.FileInputFormat; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.api.java.typeutils.RowTypeInfo; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.core.fs.FileInputSplit; +import org.apache.flink.core.fs.Path; +import org.apache.flink.formats.parquet.utils.ParquetRecordReader; +import org.apache.flink.formats.parquet.utils.ParquetSchemaConverter; +import org.apache.flink.formats.parquet.utils.RowReadSupport; +import org.apache.flink.metrics.Counter; +import org.apache.flink.types.Row; +import org.apache.flink.util.Preconditions; + +import org.apache.parquet.ParquetReadOptions; +import org.apache.parquet.filter2.compat.FilterCompat; +import org.apache.parquet.hadoop.ParquetFileReader; +import org.apache.parquet.hadoop.util.HadoopInputFile; +import org.apache.parquet.io.InputFile; +import org.apache.parquet.schema.MessageType; +import org.apache.parquet.schema.Type; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; + +import static org.apache.flink.util.Preconditions.checkNotNull; + +/** + * The base InputFormat class to read from Parquet files. + * For specific return types the {@link #convert(Row)} method need to be implemented. + * + *

Using {@link ParquetRecordReader} to read files instead of {@link org.apache.flink.core.fs.FSDataInputStream}, + * we override {@link #open(FileInputSplit)} and {@link #close()} to change the behaviors. + * + * @param The type of record to read. + */ +public abstract class ParquetInputFormat + extends FileInputFormat + implements CheckpointableInputFormat> { + + private static final long serialVersionUID = 1L; + + private static final Logger LOG = LoggerFactory.getLogger(ParquetInputFormat.class); + + /** + * The flag to specify whether to skip file splits with wrong schema. + */ + private boolean skipWrongSchemaFileSplit = false; + + /** + * The flag to specify whether to skip corrupted record. + */ + private boolean skipCorruptedRecord = false; + + /** + * The flag to track that the current split should be skipped. + */ + private boolean skipThisSplit = false; + + private TypeInformation[] fieldTypes; + + private String[] fieldNames; + + private transient Counter recordConsumed; + + private transient MessageType expectedFileSchema; + + private transient ParquetRecordReader parquetRecordReader; + + /** + * Read parquet files with given parquet file schema. + * + * @param path The path of the file to read. + * @param messageType schema of parquet file + */ + + protected ParquetInputFormat(Path path, MessageType messageType) { + super(path); + this.expectedFileSchema = checkNotNull(messageType, "messageType"); + RowTypeInfo rowTypeInfo = (RowTypeInfo) ParquetSchemaConverter.fromParquetType(expectedFileSchema); + this.fieldTypes = rowTypeInfo.getFieldTypes(); + this.fieldNames = rowTypeInfo.getFieldNames(); + // read whole parquet file as one file split + this.unsplittable = true; + } + + @Override + public void configure(Configuration parameters) { + super.configure(parameters); + + if (!this.skipWrongSchemaFileSplit) { + this.skipWrongSchemaFileSplit = parameters.getBoolean(PARQUET_SKIP_WRONG_SCHEMA_SPLITS, false); + } + + if (this.skipCorruptedRecord) { + this.skipCorruptedRecord = parameters.getBoolean(PARQUET_SKIP_CORRUPTED_RECORD, false); + } + } + + /** + * Configures the fields to be read and returned by the ParquetInputFormat. Selected fields must be present + * in the configured schema. + * + * @param fieldNames Names of all selected fields. 
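For illustration, a projection could look roughly like the following fragment (not part of this patch; the schema string and file path are made up, and MessageTypeParser is Parquet's standard helper for building a MessageType from a schema string):

    MessageType parquetSchema = MessageTypeParser.parseMessageType(
        "message Address { required binary street (UTF8); required binary city (UTF8); required int32 zip; }");
    ParquetRowInputFormat format =
        new ParquetRowInputFormat(new Path("file:///tmp/addresses.parquet"), parquetSchema);
    // Read only 'street' and 'zip'; both names must be present in the schema above.
    format.selectFields(new String[] {"street", "zip"});
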
+ */ + public void selectFields(String[] fieldNames) { + checkNotNull(fieldNames, "fieldNames"); + this.fieldNames = fieldNames; + RowTypeInfo rowTypeInfo = (RowTypeInfo) ParquetSchemaConverter.fromParquetType(expectedFileSchema); + TypeInformation[] selectFieldTypes = new TypeInformation[fieldNames.length]; + for (int i = 0; i < fieldNames.length; i++) { + try { + selectFieldTypes[i] = rowTypeInfo.getTypeAt(fieldNames[i]); + } catch (IndexOutOfBoundsException e) { + throw new IllegalArgumentException(String.format("Fail to access Field %s , " + + "which is not contained in the file schema", fieldNames[i]), e); + } + } + this.fieldTypes = selectFieldTypes; + } + + @Override + public Tuple2 getCurrentState() { + return parquetRecordReader.getCurrentReadPosition(); + } + + @Override + public void open(FileInputSplit split) throws IOException { + // reset the flag when open a new split + this.skipThisSplit = false; + org.apache.hadoop.conf.Configuration configuration = new org.apache.hadoop.conf.Configuration(); + InputFile inputFile = + HadoopInputFile.fromPath(new org.apache.hadoop.fs.Path(split.getPath().toUri()), configuration); + ParquetReadOptions options = ParquetReadOptions.builder().build(); + ParquetFileReader fileReader = new ParquetFileReader(inputFile, options); + MessageType fileSchema = fileReader.getFileMetaData().getSchema(); + MessageType readSchema = getReadSchema(fileSchema, split.getPath()); + if (skipThisSplit) { + LOG.warn(String.format( + "Escaped the file split [%s] due to mismatch of file schema to expected result schema", + split.getPath().toString())); + } else { + this.parquetRecordReader = new ParquetRecordReader<>(new RowReadSupport(), readSchema, FilterCompat.NOOP); + this.parquetRecordReader.initialize(fileReader, configuration); + this.parquetRecordReader.setSkipCorruptedRecord(this.skipCorruptedRecord); + + if (this.recordConsumed == null) { + this.recordConsumed = getRuntimeContext().getMetricGroup().counter("parquet-records-consumed"); + } + + LOG.debug(String.format("Open ParquetInputFormat with FileInputSplit [%s]", split.getPath().toString())); + } + } + + @Override + public void reopen(FileInputSplit split, Tuple2 state) throws IOException { + Preconditions.checkNotNull(split, "reopen() cannot be called on a null split."); + Preconditions.checkNotNull(state, "reopen() cannot be called with a null initial state."); + this.open(split); + // seek to the read position in the split that we were at when the checkpoint was taken. + parquetRecordReader.seek(state.f0, state.f1); + } + + /** + * Get field names of read result. + * + * @return field names array + */ + protected String[] getFieldNames() { + return fieldNames; + } + + /** + * Get field types of read result. + * + * @return field types array + */ + protected TypeInformation[] getFieldTypes() { + return fieldTypes; + } + + @Override + public void close() throws IOException { + if (parquetRecordReader != null) { + parquetRecordReader.close(); + } + } + + @Override + public boolean reachedEnd() throws IOException { + if (skipThisSplit) { + return true; + } + + return parquetRecordReader.reachEnd(); + } + + @Override + public E nextRecord(E e) throws IOException { + if (reachedEnd()) { + return null; + } + + recordConsumed.inc(); + return convert(parquetRecordReader.nextRecord()); + } + + /** + * This ParquetInputFormat read parquet record as Row by default. Sub classes of it can extend this method + * to further convert row to other types, such as POJO, Map or Tuple. 
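As an illustration of this contract, a subclass that maps each two-field Row to a Tuple2 might look roughly like the following (a hypothetical class, not included in this patch):

    /** Hypothetical subclass mapping each two-field Row to a Tuple2. */
    public class ParquetTuple2InputFormat extends ParquetInputFormat<Tuple2<String, Integer>> {

        public ParquetTuple2InputFormat(Path path, MessageType messageType) {
            super(path, messageType);
        }

        @Override
        protected Tuple2<String, Integer> convert(Row row) {
            // Field positions follow the order of the read schema, see getFieldNames().
            return Tuple2.of((String) row.getField(0), (Integer) row.getField(1));
        }
    }
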
+ * + * @param row row read from parquet file + * @return E target result type + */ + protected abstract E convert(Row row); + + /** + * Generates and returns the read schema based on the projected fields for a given file. + * + * @param fileSchema The schema of the given file. + * @param filePath The path of the given file. + * @return The read schema based on the given file's schema and the projected fields. + */ + private MessageType getReadSchema(MessageType fileSchema, Path filePath) { + RowTypeInfo fileTypeInfo = (RowTypeInfo) ParquetSchemaConverter.fromParquetType(fileSchema); + List types = new ArrayList<>(); + for (int i = 0; i < fieldNames.length; ++i) { + String readFieldName = fieldNames[i]; + TypeInformation readFieldType = fieldTypes[i]; + if (fileTypeInfo.getFieldIndex(readFieldName) < 0) { + if (!skipWrongSchemaFileSplit) { + throw new IllegalArgumentException("Field " + readFieldName + " cannot be found in schema of " + + " Parquet file: " + filePath + "."); + } else { + this.skipThisSplit = true; + return fileSchema; + } + } + + if (!readFieldType.equals(fileTypeInfo.getTypeAt(readFieldName))) { + if (!skipWrongSchemaFileSplit) { + throw new IllegalArgumentException("Expecting type " + readFieldType + " for field " + readFieldName + + " but found type " + fileTypeInfo.getTypeAt(readFieldName) + " in Parquet file: " + + filePath + "."); + } else { + this.skipThisSplit = true; + return fileSchema; + } + } + types.add(fileSchema.getType(readFieldName)); + } + + return new MessageType(fileSchema.getName(), types); + } + + /** + * The config parameter which defines whether to skip file split with wrong schema. + */ + public static final String PARQUET_SKIP_WRONG_SCHEMA_SPLITS = "skip.splits.wrong.schema"; + + /** + * The config parameter which defines whether to skip corrupted record. + */ + public static final String PARQUET_SKIP_CORRUPTED_RECORD = "skip.corrupted.record"; + +} diff --git a/flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/ParquetMapInputFormat.java b/flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/ParquetMapInputFormat.java new file mode 100644 index 0000000000000..111fcb981e42c --- /dev/null +++ b/flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/ParquetMapInputFormat.java @@ -0,0 +1,127 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.formats.parquet; + +import org.apache.flink.api.common.typeinfo.BasicArrayTypeInfo; +import org.apache.flink.api.common.typeinfo.BasicTypeInfo; +import org.apache.flink.api.common.typeinfo.PrimitiveArrayTypeInfo; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.java.typeutils.MapTypeInfo; +import org.apache.flink.api.java.typeutils.ObjectArrayTypeInfo; +import org.apache.flink.api.java.typeutils.RowTypeInfo; +import org.apache.flink.core.fs.Path; +import org.apache.flink.types.Row; + +import org.apache.parquet.schema.MessageType; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +/** + * An implementation of {@link ParquetInputFormat} to read {@link Map} records from Parquet files. + */ +public class ParquetMapInputFormat extends ParquetInputFormat { + + public ParquetMapInputFormat(Path path, MessageType messageType) { + super(path, messageType); + } + + @Override + protected Map convert(Row row) { + Map map = new HashMap<>(); + convert(map, row, getFieldTypes(), getFieldNames()); + return map; + } + + @SuppressWarnings("unchecked") + private void convert(Map map, Row row, TypeInformation[] fieldTypes, String[] fieldNames) { + for (int i = 0; i < fieldNames.length; i++) { + if (row.getField(i) != null) { + if (fieldTypes[i] instanceof BasicTypeInfo + || fieldTypes[i] instanceof PrimitiveArrayTypeInfo + || fieldTypes[i] instanceof BasicArrayTypeInfo) { + map.put(fieldNames[i], row.getField(i)); + } else if (fieldTypes[i] instanceof RowTypeInfo) { + Map nestedRow = new HashMap<>(); + RowTypeInfo nestedRowTypeInfo = (RowTypeInfo) fieldTypes[i]; + convert(nestedRow, (Row) row.getField(i), + nestedRowTypeInfo.getFieldTypes(), nestedRowTypeInfo.getFieldNames()); + map.put(fieldNames[i], nestedRow); + } else if (fieldTypes[i] instanceof MapTypeInfo) { + Map nestedMap = new HashMap<>(); + MapTypeInfo mapTypeInfo = (MapTypeInfo) fieldTypes[i]; + convert(nestedMap, (Map) row.getField(i), mapTypeInfo); + map.put(fieldNames[i], nestedMap); + } else if (fieldTypes[i] instanceof ObjectArrayTypeInfo) { + List nestedObjectList = new ArrayList<>(); + ObjectArrayTypeInfo objectArrayTypeInfo = (ObjectArrayTypeInfo) fieldTypes[i]; + convert(nestedObjectList, (Row[]) row.getField(i), objectArrayTypeInfo); + map.put(fieldNames[i], nestedObjectList); + } + } + } + } + + @SuppressWarnings("unchecked") + private void convert(Map target, Map source, MapTypeInfo mapTypeInfo) { + TypeInformation valueTypeInfp = mapTypeInfo.getValueTypeInfo(); + + for (String key : source.keySet()) { + if (valueTypeInfp instanceof RowTypeInfo) { + Map nestedRow = new HashMap<>(); + convert(nestedRow, (Row) source.get(key), + ((RowTypeInfo) valueTypeInfp).getFieldTypes(), ((RowTypeInfo) valueTypeInfp).getFieldNames()); + target.put(key, nestedRow); + } else if (valueTypeInfp instanceof MapTypeInfo) { + Map nestedMap = new HashMap<>(); + convert(nestedMap, (Map) source.get(key), (MapTypeInfo) valueTypeInfp); + target.put(key, nestedMap); + } else if (valueTypeInfp instanceof ObjectArrayTypeInfo) { + List nestedObjectList = new ArrayList<>(); + convert(nestedObjectList, (Object[]) source.get(key), (ObjectArrayTypeInfo) valueTypeInfp); + target.put(key, nestedObjectList); + } + } + } + + @SuppressWarnings("unchecked") + private void convert(List target, Object[] source, ObjectArrayTypeInfo objectArrayTypeInfo) { + TypeInformation itemType = objectArrayTypeInfo.getComponentInfo(); + for (Object field : 
source) { + if (itemType instanceof RowTypeInfo) { + Map nestedRow = new HashMap<>(); + convert(nestedRow, (Row) field, + ((RowTypeInfo) itemType).getFieldTypes(), ((RowTypeInfo) itemType).getFieldNames()); + target.add(nestedRow); + } else if (itemType instanceof MapTypeInfo) { + Map nestedMap = new HashMap<>(); + MapTypeInfo mapTypeInfo = (MapTypeInfo) itemType; + convert(nestedMap, (Map) field, mapTypeInfo); + target.add(nestedMap); + } else if (itemType instanceof ObjectArrayTypeInfo) { + List nestedObjectList = new ArrayList<>(); + convert(nestedObjectList, (Row[]) field, (ObjectArrayTypeInfo) itemType); + target.add(nestedObjectList); + } + } + + } +} diff --git a/flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/ParquetPojoInputFormat.java b/flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/ParquetPojoInputFormat.java new file mode 100644 index 0000000000000..cfe3d934b672f --- /dev/null +++ b/flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/ParquetPojoInputFormat.java @@ -0,0 +1,131 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.formats.parquet; + +import org.apache.flink.api.common.ExecutionConfig; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.api.java.typeutils.PojoTypeInfo; +import org.apache.flink.core.fs.FileInputSplit; +import org.apache.flink.core.fs.Path; +import org.apache.flink.types.Row; +import org.apache.flink.util.Preconditions; + +import org.apache.parquet.schema.MessageType; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.lang.reflect.Field; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +/** + * An implementation of {@link ParquetInputFormat} to read POJO records from Parquet files. 
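A rough usage sketch (not part of this patch): SimpleRecord stands for a hypothetical Flink POJO with a public String field foo, a public long field bar and a public no-argument constructor; the schema string and file path are made up as well.

    ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
    MessageType schema = MessageTypeParser.parseMessageType(
        "message SimpleRecord { required binary foo (UTF8); required int64 bar; }");
    PojoTypeInfo<SimpleRecord> pojoType =
        (PojoTypeInfo<SimpleRecord>) TypeExtractor.createTypeInfo(SimpleRecord.class);
    ParquetPojoInputFormat<SimpleRecord> format =
        new ParquetPojoInputFormat<>(new Path("file:///tmp/records.parquet"), schema, pojoType);
    // The format does not implement ResultTypeQueryable, so the produced type is passed explicitly.
    DataSet<SimpleRecord> records = env.createInput(format, pojoType);
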
+ */ +public class ParquetPojoInputFormat extends ParquetInputFormat { + + private static final Logger LOG = LoggerFactory.getLogger(ParquetPojoInputFormat.class); + private final Class pojoTypeClass; + private final TypeSerializer typeSerializer; + private transient Field[] pojoFields; + + public ParquetPojoInputFormat(Path filePath, MessageType messageType, PojoTypeInfo pojoTypeInfo) { + super(filePath, messageType); + this.pojoTypeClass = pojoTypeInfo.getTypeClass(); + this.typeSerializer = pojoTypeInfo.createSerializer(new ExecutionConfig()); + final Map fieldMap = new HashMap<>(); + findAllFields(pojoTypeClass, fieldMap); + selectFields(fieldMap.keySet().toArray(new String[0])); + } + + @Override + public void open(FileInputSplit split) throws IOException { + super.open(split); + pojoFields = new Field[getFieldNames().length]; + LOG.error("Fields number is %d", getFieldNames().length); + final Map fieldMap = new HashMap<>(); + findAllFields(pojoTypeClass, fieldMap); + + for (int i = 0; i < getFieldNames().length; ++i) { + String fieldName = getFieldNames()[i]; + pojoFields[i] = fieldMap.get(fieldName); + + if (pojoFields[i] != null) { + pojoFields[i].setAccessible(true); + } else { + throw new RuntimeException( + String.format("There is no field called %s in %s", fieldName, pojoTypeClass.getName())); + } + } + } + + private void findAllFields(Class clazz, Map fieldMap) { + + for (Field field : clazz.getDeclaredFields()) { + fieldMap.put(field.getName(), field); + } + + if (clazz.getSuperclass() != null) { + findAllFields(clazz.getSuperclass(), fieldMap); + } + } + + @Override + protected E convert(Row row) { + E result = typeSerializer.createInstance(); + for (int i = 0; i < row.getArity(); ++i) { + try { + if (pojoFields[i].getType().isAssignableFrom(List.class)) { + pojoFields[i].set(result, Collections.singletonList(row.getField(i))); + } else { + pojoFields[i].set(result, row.getField(i)); + } + } catch (IllegalAccessException e) { + throw new RuntimeException( + String.format("Parsed value could not be set in POJO field %s", getFieldNames()[i])); + } + } + + return result; + } + + /** + * Extracts the {@link TypeInformation}s from {@link PojoTypeInfo} according to the given field name. 
+ */ + private static TypeInformation[] extractTypeInfos(PojoTypeInfo pojoTypeInfo, String[] fieldNames) { + Preconditions.checkNotNull(pojoTypeInfo); + Preconditions.checkNotNull(fieldNames); + Preconditions.checkArgument(pojoTypeInfo.getArity() >= fieldNames.length); + TypeInformation[] fieldTypes = new TypeInformation[fieldNames.length]; + for (int i = 0; i < fieldNames.length; ++i) { + String fieldName = fieldNames[i]; + Preconditions.checkNotNull(fieldName, "The field can't be null"); + int fieldPos = pojoTypeInfo.getFieldIndex(fieldName); + Preconditions.checkArgument(fieldPos >= 0, + String.format("Field %s is not a member of POJO type %s", + fieldName, pojoTypeInfo.getTypeClass().getName())); + fieldTypes[i] = pojoTypeInfo.getTypeAt(fieldPos); + } + + return fieldTypes; + } +} diff --git a/flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/ParquetRowInputFormat.java b/flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/ParquetRowInputFormat.java new file mode 100644 index 0000000000000..f010a50cc8ec2 --- /dev/null +++ b/flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/ParquetRowInputFormat.java @@ -0,0 +1,50 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.formats.parquet; + +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.java.typeutils.ResultTypeQueryable; +import org.apache.flink.api.java.typeutils.RowTypeInfo; +import org.apache.flink.core.fs.Path; +import org.apache.flink.types.Row; + +import org.apache.parquet.schema.MessageType; + +/** + * An implementation of {@link ParquetInputFormat} to read {@link Row} records from Parquet files. + */ +public class ParquetRowInputFormat extends ParquetInputFormat implements ResultTypeQueryable { + private static final long serialVersionUID = 11L; + private RowTypeInfo returnType; + + public ParquetRowInputFormat(Path path, MessageType messageType) { + super(path, messageType); + this.returnType = new RowTypeInfo(getFieldTypes(), getFieldNames()); + } + + @Override + public TypeInformation getProducedType() { + return returnType; + } + + @Override + protected Row convert(Row row) { + return row; + } +} diff --git a/flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/utils/ParentDataHolder.java b/flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/utils/ParentDataHolder.java new file mode 100644 index 0000000000000..428fd3913c998 --- /dev/null +++ b/flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/utils/ParentDataHolder.java @@ -0,0 +1,27 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.formats.parquet.utils; + +/** + * Interface for {@link RowConverter} for extracting nested value from parquet record. + */ +public interface ParentDataHolder { + + void add(int fieldIndex, Object object); +} diff --git a/flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/utils/ParquetRecordReader.java b/flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/utils/ParquetRecordReader.java new file mode 100644 index 0000000000000..e5d67cef5d7a9 --- /dev/null +++ b/flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/utils/ParquetRecordReader.java @@ -0,0 +1,299 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.formats.parquet.utils; + +import org.apache.flink.api.java.tuple.Tuple2; + +import org.apache.hadoop.conf.Configuration; +import org.apache.parquet.column.page.PageReadStore; +import org.apache.parquet.filter2.compat.FilterCompat; +import org.apache.parquet.filter2.compat.FilterCompat.Filter; +import org.apache.parquet.hadoop.ParquetFileReader; +import org.apache.parquet.hadoop.api.InitContext; +import org.apache.parquet.hadoop.api.ReadSupport; +import org.apache.parquet.hadoop.metadata.BlockMetaData; +import org.apache.parquet.hadoop.metadata.FileMetaData; +import org.apache.parquet.io.ColumnIOFactory; +import org.apache.parquet.io.MessageColumnIO; +import org.apache.parquet.io.RecordReader; +import org.apache.parquet.io.api.RecordMaterializer; +import org.apache.parquet.io.api.RecordMaterializer.RecordMaterializationException; +import org.apache.parquet.schema.MessageType; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.annotation.CheckReturnValue; +import javax.annotation.meta.When; + +import java.io.IOException; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Set; + +import static org.apache.flink.util.Preconditions.checkNotNull; + +/** + * Customized {@link org.apache.parquet.hadoop.ParquetRecordReader} that support start read from particular position. + */ +public class ParquetRecordReader { + private static final Logger LOG = LoggerFactory.getLogger(ParquetRecordReader.class); + + private ColumnIOFactory columnIOFactory; + private Filter filter; + private MessageType readSchema; + private MessageType fileSchema; + private ReadSupport readSupport; + + private RecordMaterializer recordMaterializer; + private ParquetFileReader reader; + private RecordReader recordReader; + private boolean skipCorruptedRecord = true; + + private T readRecord; + private boolean readRecordReturned = true; + + // number of records in file + private long numTotalRecords; + // number of records that were read from file + private long numReadRecords = 0; + + // id of current block + private int currentBlock = -1; + private long numRecordsUpToPreviousBlock = 0; + private long numRecordsUpToCurrentBlock = 0; + + public ParquetRecordReader(ReadSupport readSupport, MessageType readSchema, Filter filter) { + this.filter = checkNotNull(filter, "readSupport"); + this.readSupport = checkNotNull(readSupport, "readSchema"); + this.readSchema = checkNotNull(readSchema, "filter"); + } + + public ParquetRecordReader(ReadSupport readSupport, MessageType readSchema) { + this(readSupport, readSchema, FilterCompat.NOOP); + } + + public void setSkipCorruptedRecord(boolean skipCorruptedRecord) { + this.skipCorruptedRecord = skipCorruptedRecord; + } + + public void initialize(ParquetFileReader reader, Configuration configuration) { + this.reader = reader; + FileMetaData parquetFileMetadata = reader.getFooter().getFileMetaData(); + // real schema of parquet file + this.fileSchema = parquetFileMetadata.getSchema(); + Map fileMetadata = parquetFileMetadata.getKeyValueMetaData(); + ReadSupport.ReadContext readContext = readSupport.init(new InitContext( + configuration, toSetMultiMap(fileMetadata), readSchema)); + + this.columnIOFactory = new ColumnIOFactory(parquetFileMetadata.getCreatedBy()); + this.recordMaterializer = readSupport.prepareForRead( + configuration, fileMetadata, readSchema, readContext); + this.numTotalRecords = reader.getRecordCount(); + } + + private RecordReader 
createRecordReader(PageReadStore pages) throws IOException { + if (pages == null) { + throw new IOException( + "Expecting more rows but reached last block. Read " + numReadRecords + " out of " + numTotalRecords); + } + MessageColumnIO columnIO = columnIOFactory.getColumnIO(readSchema, fileSchema, true); + return columnIO.getRecordReader(pages, recordMaterializer, filter); + } + + /** + * Moves the reading position to the given block and seeks to and reads the given record. + * + * @param block The block to seek to. + * @param recordInBlock The number of the record in the block to return next. + */ + public void seek(long block, long recordInBlock) throws IOException { + + List blockMetaData = reader.getRowGroups(); + + if (block == -1L && recordInBlock == -1L) { + // the split was fully consumed + currentBlock = blockMetaData.size() - 1; + numReadRecords = numTotalRecords; + numRecordsUpToCurrentBlock = numTotalRecords; + return; + } + + // init all counters for the start of the first block + currentBlock = 0; + numRecordsUpToPreviousBlock = 0; + numRecordsUpToCurrentBlock = blockMetaData.get(0).getRowCount(); + numReadRecords = 0; + + // seek to the given block + while (currentBlock < block) { + currentBlock++; + reader.skipNextRowGroup(); + numRecordsUpToPreviousBlock = numRecordsUpToCurrentBlock; + numRecordsUpToCurrentBlock += blockMetaData.get(currentBlock).getRowCount(); + numReadRecords = numRecordsUpToPreviousBlock; + } + + // seek to and read the given record + PageReadStore pages = reader.readNextRowGroup(); + recordReader = createRecordReader(pages); + for (int i = 0; i <= recordInBlock; i++) { + readNextRecord(); + } + } + + /** + * Returns the current read position in the split, i.e., the current block and + * the number of records that were returned from that block. + * + * @return The current read position in the split. + */ + public Tuple2 getCurrentReadPosition() { + + // compute number of returned records + long numRecordsReturned = numReadRecords; + if (!readRecordReturned && numReadRecords > 0) { + numRecordsReturned -= 1; + } + + if (numRecordsReturned == numTotalRecords) { + // all records of split returned. + return Tuple2.of(-1L, -1L); + } + + if (numRecordsReturned == numRecordsUpToCurrentBlock) { + // all records of block returned. Next record is in next block + return Tuple2.of(currentBlock + 1L, 0L); + } + + // compute number of returned records of this block + long numRecordsOfBlockReturned = numRecordsReturned - numRecordsUpToPreviousBlock; + return Tuple2.of((long) currentBlock, numRecordsOfBlockReturned); + } + + /** + * Checks if the record reader returned all records. + * This method must be called before a record can be returned. + * + * @return False if there are more records to be read. True if all records have been returned. + */ + public boolean reachEnd() throws IOException { + // check if we have a read row that was not returned yet + if (readRecord != null && !readRecordReturned) { + return false; + } + // check if there are more rows to be read + if (numReadRecords >= numTotalRecords) { + return true; + } + // try to read next row + return !readNextRecord(); + } + + /** + * Reads the next record. + * + * @return True if a record could be read, false otherwise. 
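For orientation, the intended calling sequence (essentially what ParquetInputFormat does) can be sketched as follows; fileReader, hadoopConf and readSchema are assumed to have been created as in ParquetInputFormat.open():

    ParquetRecordReader<Row> reader = new ParquetRecordReader<>(new RowReadSupport(), readSchema);
    reader.initialize(fileReader, hadoopConf);

    while (!reader.reachEnd()) {      // reachEnd() must be called before each nextRecord()
        Row row = reader.nextRecord();
        // ... emit the row ...
    }

    // For checkpointing: (block index, records returned from that block), or (-1, -1) when the
    // split is fully consumed.
    Tuple2<Long, Long> position = reader.getCurrentReadPosition();
    // On restore, after re-initializing a reader on the same split:
    reader.seek(position.f0, position.f1);
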
+ */ + private boolean readNextRecord() throws IOException { + boolean recordFound = false; + while (!recordFound) { + // no more records left + if (numReadRecords >= numTotalRecords) { + return false; + } + + try { + if (numReadRecords == numRecordsUpToCurrentBlock) { + // advance to next block + PageReadStore pages = reader.readNextRowGroup(); + recordReader = createRecordReader(pages); + numRecordsUpToPreviousBlock = numRecordsUpToCurrentBlock; + numRecordsUpToCurrentBlock += pages.getRowCount(); + currentBlock++; + } + + numReadRecords++; + try { + readRecord = recordReader.read(); + readRecordReturned = false; + } catch (RecordMaterializationException e) { + String errorMessage = String.format( + "skipping a corrupt record in block number [%d] record number [%s] of file %s", + currentBlock, numReadRecords - numRecordsUpToPreviousBlock, reader.getFile()); + + if (!skipCorruptedRecord) { + LOG.error(errorMessage); + throw e; + } else { + LOG.warn(errorMessage); + } + continue; + } + + if (readRecord == null) { + readRecordReturned = true; + numReadRecords = numRecordsUpToCurrentBlock; + LOG.debug("filtered record reader reached end of block"); + continue; + } + + recordFound = true; + LOG.debug("read value: {}", readRecord); + } catch (RecordMaterializationException e) { + LOG.error(String.format("Can not read value at %d in block %d in file %s", + numReadRecords - numRecordsUpToPreviousBlock, currentBlock, reader.getFile()), e); + if (!skipCorruptedRecord) { + throw e; + } + return false; + } + } + + return true; + } + + /** + * Returns the next record. + * Note that the reachedEnd() method must be called before. + * + * @return The next record. + */ + @CheckReturnValue(when = When.NEVER) + public T nextRecord() { + readRecordReturned = true; + return readRecord; + } + + public void close() throws IOException { + if (reader != null) { + reader.close(); + } + } + + private static Map> toSetMultiMap(Map map) { + Map> setMultiMap = new HashMap<>(); + for (Map.Entry entry : map.entrySet()) { + Set set = Collections.singleton(entry.getValue()); + setMultiMap.put(entry.getKey(), Collections.unmodifiableSet(set)); + } + return Collections.unmodifiableMap(setMultiMap); + } +} diff --git a/flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/utils/ParquetSchemaConverter.java b/flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/utils/ParquetSchemaConverter.java new file mode 100644 index 0000000000000..35e197761423c --- /dev/null +++ b/flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/utils/ParquetSchemaConverter.java @@ -0,0 +1,435 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.formats.parquet.utils; + +import org.apache.flink.api.common.typeinfo.BasicArrayTypeInfo; +import org.apache.flink.api.common.typeinfo.BasicTypeInfo; +import org.apache.flink.api.common.typeinfo.SqlTimeTypeInfo; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.java.typeutils.MapTypeInfo; +import org.apache.flink.api.java.typeutils.ObjectArrayTypeInfo; +import org.apache.flink.api.java.typeutils.RowTypeInfo; + +import org.apache.parquet.schema.GroupType; +import org.apache.parquet.schema.MessageType; +import org.apache.parquet.schema.OriginalType; +import org.apache.parquet.schema.PrimitiveType; +import org.apache.parquet.schema.Type; +import org.apache.parquet.schema.Types; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.lang.reflect.Array; +import java.util.ArrayList; +import java.util.List; + +/** + * Schema converter converts Parquet schema to and from Flink internal types. + */ +public class ParquetSchemaConverter { + private static final Logger LOGGER = LoggerFactory.getLogger(RowConverter.class); + public static final String MAP_VALUE = "value"; + public static final String LIST_ARRAY_TYPE = "array"; + public static final String LIST_ELEMENT = "element"; + public static final String LIST_GROUP_NAME = "list"; + public static final String MESSAGE_ROOT = "root"; + + /** + * Converts Parquet schema to Flink Internal Type. + * + * @param type Parquet schema + * @return Flink type information + */ + public static TypeInformation fromParquetType(MessageType type) { + return convertFields(type.getFields()); + } + + /** + * Converts Flink Internal Type to Parquet schema. + * + * @param typeInformation Flink type information + * @param legacyMode is standard LIST and MAP schema or back-compatible schema + * @return Parquet schema + */ + public static MessageType toParquetType(TypeInformation typeInformation, boolean legacyMode) { + return (MessageType) convertField(null, typeInformation, Type.Repetition.OPTIONAL, legacyMode); + } + + private static TypeInformation convertFields(List parquetFields) { + List> types = new ArrayList<>(); + List names = new ArrayList<>(); + for (Type field : parquetFields) { + TypeInformation subType = convertParquetTypeToTypeInfo(field); + if (subType != null) { + types.add(subType); + names.add(field.getName()); + } else { + LOGGER.error("Parquet field {} in schema type {} can not be converted to Flink Internal Type", + field.getName(), field.getOriginalType().name()); + } + } + + return new RowTypeInfo(types.toArray(new TypeInformation[0]), + names.toArray(new String[0])); + } + + private static TypeInformation convertParquetTypeToTypeInfo(final Type fieldType) { + TypeInformation typeInfo; + if (fieldType.isPrimitive()) { + OriginalType originalType = fieldType.getOriginalType(); + PrimitiveType primitiveType = fieldType.asPrimitiveType(); + switch (primitiveType.getPrimitiveTypeName()) { + case BINARY: + if (originalType != null) { + switch (originalType) { + case DECIMAL: + typeInfo = BasicTypeInfo.BIG_DEC_TYPE_INFO; + break; + case UTF8: + case ENUM: + case JSON: + case BSON: + typeInfo = BasicTypeInfo.STRING_TYPE_INFO; + break; + default: + throw new UnsupportedOperationException("Unsupported original type : " + originalType.name() + + " for primitive type BINARY"); + } + } else { + typeInfo = BasicTypeInfo.STRING_TYPE_INFO; + } + break; + case BOOLEAN: + typeInfo = BasicTypeInfo.BOOLEAN_TYPE_INFO; + break; + case INT32: + if (originalType != null) { + 
switch (originalType) { + case TIME_MICROS: + case TIME_MILLIS: + typeInfo = SqlTimeTypeInfo.TIME; + break; + case TIMESTAMP_MICROS: + case TIMESTAMP_MILLIS: + typeInfo = SqlTimeTypeInfo.TIMESTAMP; + break; + case DATE: + typeInfo = SqlTimeTypeInfo.DATE; + break; + case UINT_8: + case UINT_16: + case UINT_32: + typeInfo = BasicTypeInfo.INT_TYPE_INFO; + break; + case INT_8: + typeInfo = org.apache.flink.api.common.typeinfo.Types.BYTE; + break; + case INT_16: + typeInfo = org.apache.flink.api.common.typeinfo.Types.SHORT; + break; + case INT_32: + typeInfo = BasicTypeInfo.INT_TYPE_INFO; + break; + default: + throw new UnsupportedOperationException("Unsupported original type : " + + originalType.name() + " for primitive type INT32"); + } + } else { + typeInfo = BasicTypeInfo.INT_TYPE_INFO; + } + break; + case INT64: + if (originalType != null) { + switch (originalType) { + case TIME_MICROS: + typeInfo = SqlTimeTypeInfo.TIME; + break; + case TIMESTAMP_MICROS: + case TIMESTAMP_MILLIS: + typeInfo = SqlTimeTypeInfo.TIMESTAMP; + break; + case INT_64: + case DECIMAL: + typeInfo = BasicTypeInfo.LONG_TYPE_INFO; + break; + default: + throw new UnsupportedOperationException("Unsupported original type : " + + originalType.name() + " for primitive type INT64"); + } + } else { + typeInfo = BasicTypeInfo.LONG_TYPE_INFO; + } + break; + case INT96: + // It stores a timestamp type data, we read it as millisecond + typeInfo = SqlTimeTypeInfo.TIMESTAMP; + break; + case FLOAT: + typeInfo = BasicTypeInfo.FLOAT_TYPE_INFO; + break; + case DOUBLE: + typeInfo = BasicTypeInfo.DOUBLE_TYPE_INFO; + break; + case FIXED_LEN_BYTE_ARRAY: + if (originalType != null) { + switch (originalType) { + case DECIMAL: + typeInfo = BasicTypeInfo.BIG_DEC_TYPE_INFO; + break; + default: + throw new UnsupportedOperationException("Unsupported original type : " + originalType.name() + + " for primitive type FIXED_LEN_BYTE_ARRAY"); + } + } else { + typeInfo = BasicTypeInfo.BIG_DEC_TYPE_INFO; + } + break; + default: + throw new UnsupportedOperationException("Unsupported schema: " + fieldType); + } + } else { + GroupType parquetGroupType = fieldType.asGroupType(); + OriginalType originalType = parquetGroupType.getOriginalType(); + if (originalType != null) { + switch (originalType) { + case LIST: + if (parquetGroupType.getFieldCount() != 1) { + throw new UnsupportedOperationException("Invalid list type " + parquetGroupType); + } + Type repeatedType = parquetGroupType.getType(0); + if (!repeatedType.isRepetition(Type.Repetition.REPEATED)) { + throw new UnsupportedOperationException("Invalid list type " + parquetGroupType); + } + + if (repeatedType.isPrimitive()) { + typeInfo = convertParquetPrimitiveListToFlinkArray(repeatedType); + } else { + // Backward-compatibility element group name can be any string (element/array/other) + GroupType elementType = repeatedType.asGroupType(); + // If the repeated field is a group with multiple fields, then its type is the element + // type and elements are required. + if (elementType.getFieldCount() > 1) { + + for (Type type : elementType.getFields()) { + if (!type.isRepetition(Type.Repetition.REQUIRED)) { + throw new UnsupportedOperationException( + String.format("List field [%s] in List [%s] has to be required. 
", + type.toString(), fieldType.getName())); + } + } + typeInfo = ObjectArrayTypeInfo.getInfoFor( + convertParquetTypeToTypeInfo(elementType)); + } else { + Type internalType = elementType.getType(0); + if (internalType.isPrimitive()) { + typeInfo = convertParquetPrimitiveListToFlinkArray(internalType); + } else { + // No need to do special process for group named array and tuple + GroupType tupleGroup = internalType.asGroupType(); + if (tupleGroup.getFieldCount() == 1 && tupleGroup.getFields().get(0) + .isRepetition(Type.Repetition.REQUIRED)) { + typeInfo = ObjectArrayTypeInfo.getInfoFor( + convertParquetTypeToTypeInfo(internalType)); + } else { + throw new UnsupportedOperationException( + String.format("Unrecgonized List schema [%s] according to Parquet" + + " standard", parquetGroupType.toString())); + } + } + } + } + break; + + case MAP_KEY_VALUE: + case MAP: + // The outer-most level must be a group annotated with MAP + // that contains a single field named key_value + if (parquetGroupType.getFieldCount() != 1 || parquetGroupType.getType(0).isPrimitive()) { + throw new UnsupportedOperationException("Invalid map type " + parquetGroupType); + } + + // The middle level must be a repeated group with a key field for map keys + // and, optionally, a value field for map values. But we can't enforce two strict condition here + // the schema generated by Parquet lib doesn't contain LogicalType + // ! mapKeyValType.getOriginalType().equals(OriginalType.MAP_KEY_VALUE) + GroupType mapKeyValType = parquetGroupType.getType(0).asGroupType(); + if (!mapKeyValType.isRepetition(Type.Repetition.REPEATED) + || mapKeyValType.getFieldCount() != 2) { + throw new UnsupportedOperationException( + "The middle level of Map should be single field named key_value. Invalid map type " + + parquetGroupType); + } + + Type keyType = mapKeyValType.getType(0); + + // The key field encodes the map's key type. This field must have repetition required and + // must always be present. + if (!keyType.isPrimitive() || !keyType.isRepetition(Type.Repetition.REQUIRED) + || !keyType.asPrimitiveType().getPrimitiveTypeName().equals( + PrimitiveType.PrimitiveTypeName.BINARY) + || !keyType.getOriginalType().equals(OriginalType.UTF8)) { + throw new IllegalArgumentException("Map key type must be required binary (UTF8): " + + keyType); + } + + Type valueType = mapKeyValType.getType(1); + return new MapTypeInfo<>(BasicTypeInfo.STRING_TYPE_INFO, + convertParquetTypeToTypeInfo(valueType)); + default: + throw new UnsupportedOperationException("Unsupported schema: " + fieldType); + } + } else { + // if no original type than it is a record + return convertFields(parquetGroupType.getFields()); + } + } + + return typeInfo; + } + + private static TypeInformation convertParquetPrimitiveListToFlinkArray(Type type) { + // Backward-compatibility element group doesn't exist also allowed + TypeInformation flinkType = convertParquetTypeToTypeInfo(type); + if (flinkType.isBasicType()) { + return BasicArrayTypeInfo.getInfoFor(Array.newInstance(flinkType.getTypeClass(), 0).getClass()); + } else { + // flinkType here can be either SqlTimeTypeInfo or BasicTypeInfo.BIG_DEC_TYPE_INFO, + // So it should be converted to ObjectArrayTypeInfo + return ObjectArrayTypeInfo.getInfoFor(flinkType); + } + } + + private static Type convertField(String fieldName, TypeInformation typeInfo, + Type.Repetition inheritRepetition, boolean legacyMode) { + Type fieldType = null; + + Type.Repetition repetition = inheritRepetition == null ? 
Type.Repetition.OPTIONAL : inheritRepetition; + if (typeInfo instanceof BasicTypeInfo) { + BasicTypeInfo basicTypeInfo = (BasicTypeInfo) typeInfo; + if (basicTypeInfo.equals(BasicTypeInfo.BIG_DEC_TYPE_INFO) + || basicTypeInfo.equals(BasicTypeInfo.BIG_INT_TYPE_INFO)) { + fieldType = Types.primitive(PrimitiveType.PrimitiveTypeName.BINARY, repetition) + .as(OriginalType.DECIMAL).named(fieldName); + } else if (basicTypeInfo.equals(BasicTypeInfo.INT_TYPE_INFO)) { + fieldType = Types.primitive(PrimitiveType.PrimitiveTypeName.INT32, repetition) + .as(OriginalType.INT_32).named(fieldName); + } else if (basicTypeInfo.equals(BasicTypeInfo.DOUBLE_TYPE_INFO)) { + fieldType = Types.primitive(PrimitiveType.PrimitiveTypeName.DOUBLE, repetition).named(fieldName); + } else if (basicTypeInfo.equals(BasicTypeInfo.FLOAT_TYPE_INFO)) { + fieldType = Types.primitive(PrimitiveType.PrimitiveTypeName.FLOAT, repetition).named(fieldName); + } else if (basicTypeInfo.equals(BasicTypeInfo.LONG_TYPE_INFO)) { + fieldType = Types.primitive(PrimitiveType.PrimitiveTypeName.INT64, repetition) + .as(OriginalType.INT_64).named(fieldName); + } else if (basicTypeInfo.equals(BasicTypeInfo.SHORT_TYPE_INFO)) { + fieldType = Types.primitive(PrimitiveType.PrimitiveTypeName.INT32, repetition) + .as(OriginalType.INT_16).named(fieldName); + } else if (basicTypeInfo.equals(BasicTypeInfo.BYTE_TYPE_INFO)) { + fieldType = Types.primitive(PrimitiveType.PrimitiveTypeName.INT32, repetition) + .as(OriginalType.INT_8).named(fieldName); + } else if (basicTypeInfo.equals(BasicTypeInfo.CHAR_TYPE_INFO)) { + fieldType = Types.primitive(PrimitiveType.PrimitiveTypeName.BINARY, repetition) + .as(OriginalType.UTF8).named(fieldName); + } else if (basicTypeInfo.equals(BasicTypeInfo.BOOLEAN_TYPE_INFO)) { + fieldType = Types.primitive(PrimitiveType.PrimitiveTypeName.BOOLEAN, repetition).named(fieldName); + } else if (basicTypeInfo.equals(BasicTypeInfo.DATE_TYPE_INFO) + || basicTypeInfo.equals(BasicTypeInfo.STRING_TYPE_INFO)) { + fieldType = Types.primitive(PrimitiveType.PrimitiveTypeName.BINARY, repetition) + .as(OriginalType.UTF8) + .named(fieldName); + } + } else if (typeInfo instanceof MapTypeInfo) { + MapTypeInfo mapTypeInfo = (MapTypeInfo) typeInfo; + + if (mapTypeInfo.getKeyTypeInfo().equals(BasicTypeInfo.STRING_TYPE_INFO)) { + fieldType = Types.map(repetition) + .value(convertField(MAP_VALUE, mapTypeInfo.getValueTypeInfo(), Type.Repetition.OPTIONAL, legacyMode)) + .named(fieldName); + } else { + throw new UnsupportedOperationException(String.format("Can not convert Flink MapTypeInfo %s to Parquet" + + " Map type as key has to be String", typeInfo.toString())); + } + } else if (typeInfo instanceof ObjectArrayTypeInfo) { + ObjectArrayTypeInfo objectArrayTypeInfo = (ObjectArrayTypeInfo) typeInfo; + + // Get all required sub fields + GroupType componentGroup = (GroupType) convertField(LIST_ELEMENT, objectArrayTypeInfo.getComponentInfo(), + Type.Repetition.REQUIRED, legacyMode); + + GroupType elementGroup = Types.repeatedGroup().named(LIST_ELEMENT); + elementGroup = elementGroup.withNewFields(componentGroup.getFields()); + fieldType = Types.buildGroup(repetition) + .addField(elementGroup) + .as(OriginalType.LIST) + .named(fieldName); + } else if (typeInfo instanceof BasicArrayTypeInfo) { + BasicArrayTypeInfo basicArrayType = (BasicArrayTypeInfo) typeInfo; + + if (legacyMode) { + + // Add extra layer of Group according to Parquet's standard + Type listGroup = Types.repeatedGroup().addField( + convertField(LIST_ELEMENT, basicArrayType.getComponentInfo(), + 
Type.Repetition.REQUIRED, legacyMode)).named(LIST_GROUP_NAME); + + fieldType = Types.buildGroup(repetition) + .addField(listGroup) + .as(OriginalType.LIST).named(fieldName); + } else { + PrimitiveType primitiveTyp = + convertField(fieldName, basicArrayType.getComponentInfo(), + Type.Repetition.REQUIRED, legacyMode).asPrimitiveType(); + fieldType = Types.buildGroup(repetition) + .repeated(primitiveTyp.getPrimitiveTypeName()) + .as(primitiveTyp.getOriginalType()) + .named(LIST_ARRAY_TYPE) + .as(OriginalType.LIST).named(fieldName); + } + } else if (typeInfo instanceof SqlTimeTypeInfo) { + if (typeInfo.equals(SqlTimeTypeInfo.DATE)) { + fieldType = Types.primitive(PrimitiveType.PrimitiveTypeName.INT32, repetition) + .as(OriginalType.DATE).named(fieldName); + } else if (typeInfo.equals(SqlTimeTypeInfo.TIME)) { + fieldType = Types.primitive(PrimitiveType.PrimitiveTypeName.INT32, repetition) + .as(OriginalType.TIME_MILLIS).named(fieldName); + } else if (typeInfo.equals(SqlTimeTypeInfo.TIMESTAMP)) { + fieldType = Types.primitive(PrimitiveType.PrimitiveTypeName.INT64, repetition) + .as(OriginalType.TIMESTAMP_MILLIS).named(fieldName); + } else { + throw new UnsupportedOperationException("Unsupported SqlTimeTypeInfo " + typeInfo.toString()); + } + + } else { + RowTypeInfo rowTypeInfo = (RowTypeInfo) typeInfo; + List types = new ArrayList<>(); + String[] fieldNames = rowTypeInfo.getFieldNames(); + TypeInformation[] fieldTypes = rowTypeInfo.getFieldTypes(); + for (int i = 0; i < rowTypeInfo.getArity(); i++) { + types.add(convertField(fieldNames[i], fieldTypes[i], repetition, legacyMode)); + } + + if (fieldName == null) { + fieldType = new MessageType(MESSAGE_ROOT, types); + } else { + fieldType = new GroupType(repetition, fieldName, types); + } + } + + return fieldType; + } +} diff --git a/flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/utils/ParquetTimestampUtils.java b/flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/utils/ParquetTimestampUtils.java new file mode 100644 index 0000000000000..c58e28b2bdf1e --- /dev/null +++ b/flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/utils/ParquetTimestampUtils.java @@ -0,0 +1,61 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.formats.parquet.utils; + +import org.apache.parquet.io.api.Binary; + +import java.nio.ByteBuffer; +import java.util.concurrent.TimeUnit; + +/** + * Utility class for decoding INT96 encoded parquet timestamp to timestamp millis in GMT. + * This class is equivalent of @see org.apache.hadoop.hive.ql.io.parquet.timestamp.NanoTime, + * which produces less intermediate objects during decoding. 
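As a sanity check of the arithmetic below: Julian day 2,440,588 corresponds to 1970-01-01, so an INT96 value holding that day with zero nanos-of-day should decode to epoch millisecond 0. A hypothetical fragment (Binary is org.apache.parquet.io.api.Binary):

    // Julian day 2,440,588 (1970-01-01) with zero nanos-of-day, stored little-endian as in INT96.
    byte[] int96 = new byte[12];       // bytes 0..7: time-of-day nanos = 0
    int96[8] = (byte) 0x8C;            // bytes 8..11: Julian day 0x00253D8C, little endian
    int96[9] = (byte) 0x3D;
    int96[10] = (byte) 0x25;
    int96[11] = (byte) 0x00;
    long millis = ParquetTimestampUtils.getTimestampMillis(Binary.fromConstantByteArray(int96));
    // millis == 0L, i.e. 1970-01-01T00:00:00 GMT
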
+ */
+public final class ParquetTimestampUtils {
+	private static final int JULIAN_EPOCH_OFFSET_DAYS = 2_440_588;
+	private static final long MILLIS_IN_DAY = TimeUnit.DAYS.toMillis(1);
+	private static final long NANOS_PER_MILLISECOND = TimeUnit.MILLISECONDS.toNanos(1);
+
+	private ParquetTimestampUtils() {}
+
+	/**
+	 * Returns GMT timestamp from binary encoded parquet timestamp (12 bytes - julian date + time of day nanos).
+	 *
+	 * @param timestampBinary INT96 parquet timestamp
+	 * @return timestamp in millis, GMT timezone
+	 */
+	public static long getTimestampMillis(Binary timestampBinary) {
+		if (timestampBinary.length() != 12) {
+			throw new IllegalArgumentException("Parquet timestamp must be 12 bytes, actual " + timestampBinary.length());
+		}
+		byte[] bytes = timestampBinary.getBytes();
+
+		// little endian encoding - need to invert byte order
+		long timeOfDayNanos = ByteBuffer.wrap(new byte[] {bytes[7], bytes[6], bytes[5], bytes[4],
+			bytes[3], bytes[2], bytes[1], bytes[0]}).getLong();
+		int julianDay = ByteBuffer.wrap(new byte[] {bytes[11], bytes[10], bytes[9], bytes[8]}).getInt();
+
+		return julianDayToMillis(julianDay) + (timeOfDayNanos / NANOS_PER_MILLISECOND);
+	}
+
+	private static long julianDayToMillis(int julianDay) {
+		return (julianDay - JULIAN_EPOCH_OFFSET_DAYS) * MILLIS_IN_DAY;
+	}
+}
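getTimestampMillis(...) reads the 12-byte INT96 value as an 8-byte little-endian time-of-day in nanoseconds followed by a 4-byte little-endian Julian day, then rebases the Julian day against 2,440,588, the Julian day number of 1970-01-01. A small illustrative check of that arithmetic follows; the chosen bytes and the use of Binary.fromConstantByteArray are assumptions for the sketch, not part of the patch.

    // Julian day 2_440_589 (one day after the epoch origin), time-of-day nanos = 0.
    // Layout: bytes[0..7] = nanos (little endian), bytes[8..11] = Julian day (little endian).
    byte[] int96 = new byte[12];
    int96[8] = (byte) 0x8D;   // 2_440_589 = 0x00253D8D
    int96[9] = (byte) 0x3D;
    int96[10] = (byte) 0x25;
    int96[11] = (byte) 0x00;

    long millis = ParquetTimestampUtils.getTimestampMillis(Binary.fromConstantByteArray(int96));
    // (2_440_589 - 2_440_588) * 86_400_000 + 0 = 86_400_000, i.e. 1970-01-02T00:00:00 GMT
    assert millis == 86_400_000L;
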
diff --git a/flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/utils/RowConverter.java b/flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/utils/RowConverter.java
new file mode 100644
index 0000000000000..3452278a4ffc2
--- /dev/null
+++ b/flink-formats/flink-parquet/src/main/java/org/apache/flink/formats/parquet/utils/RowConverter.java
@@ -0,0 +1,398 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.formats.parquet.utils;
+
+import org.apache.flink.api.common.typeinfo.BasicArrayTypeInfo;
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeinfo.SqlTimeTypeInfo;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeutils.CompositeType;
+import org.apache.flink.api.java.typeutils.MapTypeInfo;
+import org.apache.flink.api.java.typeutils.ObjectArrayTypeInfo;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.types.Row;
+
+import org.apache.parquet.io.api.Binary;
+import org.apache.parquet.io.api.Converter;
+import org.apache.parquet.io.api.GroupConverter;
+import org.apache.parquet.io.api.PrimitiveConverter;
+import org.apache.parquet.schema.GroupType;
+import org.apache.parquet.schema.MessageType;
+import org.apache.parquet.schema.OriginalType;
+import org.apache.parquet.schema.PrimitiveType;
+import org.apache.parquet.schema.Type;
+
+import java.lang.reflect.Array;
+import java.math.BigDecimal;
+import java.sql.Date;
+import java.sql.Time;
+import java.sql.Timestamp;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Extends from {@link GroupConverter} to convert a nested Parquet record into a {@link Row}.
+ */
+public class RowConverter extends GroupConverter implements ParentDataHolder {
+	private final Converter[] converters;
+	private final ParentDataHolder parentDataHolder;
+	private final TypeInformation<?> typeInfo;
+	private Row currentRow;
+	private int posInParentRow;
+
+	public RowConverter(MessageType messageType, TypeInformation<?> typeInfo) {
+		this(messageType, typeInfo, null, 0);
+	}
+
+	public RowConverter(GroupType schema, TypeInformation<?> typeInfo, ParentDataHolder parent, int pos) {
+		this.typeInfo = typeInfo;
+		this.parentDataHolder = parent;
+		this.posInParentRow = pos;
+		this.converters = new Converter[schema.getFieldCount()];
+
+		int i = 0;
+		if (typeInfo.getArity() >= 1 && (typeInfo instanceof CompositeType)) {
+			for (Type field : schema.getFields()) {
+				converters[i] = createConverter(field, i, ((CompositeType) typeInfo).getTypeAt(i), this);
+				i++;
+			}
+		}
+	}
+
+	private static Converter createConverter(
+		Type field,
+		int fieldPos,
+		TypeInformation<?> typeInformation,
+		ParentDataHolder parentDataHolder) {
+		if (field.isPrimitive()) {
+			return new RowConverter.RowPrimitiveConverter(field, parentDataHolder, fieldPos);
+		} else if (typeInformation instanceof MapTypeInfo) {
+			return new RowConverter.MapConverter((GroupType) field, (MapTypeInfo) typeInformation,
+				parentDataHolder, fieldPos);
+		} else if (typeInformation instanceof BasicArrayTypeInfo) {
+			Type elementType = field.asGroupType().getFields().get(0);
+			Class<?> typeClass = ((BasicArrayTypeInfo) typeInformation).getComponentInfo().getTypeClass();
+			if (typeClass.equals(Character.class)) {
+				return new RowConverter.ArrayConverter(elementType,
+					Character.class, BasicTypeInfo.CHAR_TYPE_INFO, parentDataHolder, fieldPos);
+			} else if (typeClass.equals(Boolean.class)) {
+				return new RowConverter.ArrayConverter(elementType,
+					Boolean.class, BasicTypeInfo.BOOLEAN_TYPE_INFO, parentDataHolder, fieldPos);
+			} else if (typeClass.equals(Short.class)) {
+				return new RowConverter.ArrayConverter(elementType,
+					Short.class, BasicTypeInfo.SHORT_TYPE_INFO, parentDataHolder, fieldPos);
+			} else if (typeClass.equals(Integer.class)) {
+				return new RowConverter.ArrayConverter(elementType,
+					Integer.class, BasicTypeInfo.INT_TYPE_INFO, parentDataHolder, fieldPos);
+			} else if (typeClass.equals(Long.class)) {
+				return new RowConverter.ArrayConverter(elementType,
+					Long.class, BasicTypeInfo.LONG_TYPE_INFO, parentDataHolder, fieldPos);
+			} else if (typeClass.equals(Double.class)) {
+				return new RowConverter.ArrayConverter(elementType,
+					Double.class, BasicTypeInfo.DOUBLE_TYPE_INFO, parentDataHolder, fieldPos);
+			} else if (typeClass.equals(String.class)) {
+				return new RowConverter.ArrayConverter(elementType,
+					String.class, BasicTypeInfo.STRING_TYPE_INFO, parentDataHolder, fieldPos);
+			} else if (typeClass.equals(Date.class)) {
+				return new RowConverter.ArrayConverter(elementType,
+					Date.class, SqlTimeTypeInfo.DATE, parentDataHolder, fieldPos);
+			} else if (typeClass.equals(Time.class)) {
+				return new RowConverter.ArrayConverter