[Improve] Improve read with parquet type convert error (apache#6683)
Hisoka-X authored Apr 12, 2024
1 parent 4f4fd7b commit 6c65805
Showing 3 changed files with 132 additions and 27 deletions.
ParquetReadStrategy.java
@@ -18,6 +18,7 @@
package org.apache.seatunnel.connectors.seatunnel.file.source.reader;

import org.apache.seatunnel.api.source.Collector;
+import org.apache.seatunnel.api.table.catalog.TablePath;
import org.apache.seatunnel.api.table.type.ArrayType;
import org.apache.seatunnel.api.table.type.BasicType;
import org.apache.seatunnel.api.table.type.DecimalType;
@@ -28,6 +29,7 @@
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
import org.apache.seatunnel.api.table.type.SqlType;
+import org.apache.seatunnel.common.exception.CommonError;
import org.apache.seatunnel.common.exception.CommonErrorCodeDeprecated;
import org.apache.seatunnel.connectors.seatunnel.file.exception.FileConnectorErrorCode;
import org.apache.seatunnel.connectors.seatunnel.file.exception.FileConnectorException;
@@ -67,6 +69,7 @@
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
+import java.util.stream.IntStream;

@Slf4j
public class ParquetReadStrategy extends AbstractReadStrategy {
@@ -75,6 +78,7 @@ public class ParquetReadStrategy extends AbstractReadStrategy {
private static final long NANOS_PER_MILLISECOND = 1000000;
private static final long MILLIS_PER_DAY = TimeUnit.DAYS.toMillis(1L);
private static final long JULIAN_DAY_NUMBER_FOR_UNIX_EPOCH = 2440588;
+    private static final String PARQUET = "Parquet";

private int[] indexes;

@@ -234,6 +238,12 @@ private Object resolveObject(Object field, SeaTunnelDataType<?> fieldType) {

@Override
public SeaTunnelRowType getSeaTunnelRowTypeInfo(String path) throws FileConnectorException {
+        return getSeaTunnelRowTypeInfo(TablePath.DEFAULT, path);
+    }
+
+    @Override
+    public SeaTunnelRowType getSeaTunnelRowTypeInfo(TablePath tablePath, String path)
+            throws FileConnectorException {
ParquetMetadata metadata;
try (ParquetFileReader reader =
hadoopFileSystemProxy.doWithHadoopAuth(
@@ -259,19 +269,22 @@ public SeaTunnelRowType getSeaTunnelRowTypeInfo(String path) throws FileConnectorException {
String[] fields = new String[readColumns.size()];
SeaTunnelDataType<?>[] types = new SeaTunnelDataType[readColumns.size()];
indexes = new int[readColumns.size()];
-        for (int i = 0; i < readColumns.size(); i++) {
-            fields[i] = readColumns.get(i);
-            Type type = originalSchema.getType(fields[i]);
-            int fieldIndex = originalSchema.getFieldIndex(fields[i]);
-            indexes[i] = fieldIndex;
-            types[i] = parquetType2SeaTunnelType(type);
-        }
+        buildColumnsWithErrorCheck(
+                tablePath,
+                IntStream.range(0, readColumns.size()).iterator(),
+                i -> {
+                    fields[i] = readColumns.get(i);
+                    Type type = originalSchema.getType(fields[i]);
+                    int fieldIndex = originalSchema.getFieldIndex(fields[i]);
+                    indexes[i] = fieldIndex;
+                    types[i] = parquetType2SeaTunnelType(type, fields[i]);
+                });
seaTunnelRowType = new SeaTunnelRowType(fields, types);
seaTunnelRowTypeWithPartition = mergePartitionTypes(path, seaTunnelRowType);
return getActualSeaTunnelRowTypeInfo();
}

-    private SeaTunnelDataType<?> parquetType2SeaTunnelType(Type type) {
+    private SeaTunnelDataType<?> parquetType2SeaTunnelType(Type type, String name) {
if (type.isPrimitive()) {
switch (type.asPrimitiveType().getPrimitiveTypeName()) {
case INT32:
@@ -287,9 +300,8 @@ private SeaTunnelDataType<?> parquetType2SeaTunnelType(Type type) {
case DATE:
return LocalTimeType.LOCAL_DATE_TYPE;
default:
-                    String errorMsg = String.format("Not support this type [%s]", type);
-                    throw new FileConnectorException(
-                            CommonErrorCodeDeprecated.UNSUPPORTED_DATA_TYPE, errorMsg);
+                    throw CommonError.convertToSeaTunnelTypeError(
+                            PARQUET, type.toString(), name);
}
case INT64:
if (type.asPrimitiveType().getOriginalType() == OriginalType.TIMESTAMP_MILLIS) {
@@ -324,9 +336,7 @@ private SeaTunnelDataType<?> parquetType2SeaTunnelType(Type type) {
int scale = Integer.parseInt(splits[1]);
return new DecimalType(precision, scale);
default:
-                    String errorMsg = String.format("Not support this type [%s]", type);
-                    throw new FileConnectorException(
-                            CommonErrorCodeDeprecated.UNSUPPORTED_DATA_TYPE, errorMsg);
+                    throw CommonError.convertToSeaTunnelTypeError("Parquet", type.toString(), name);
}
} else {
LogicalTypeAnnotation logicalTypeAnnotation =
@@ -339,7 +349,7 @@ private SeaTunnelDataType<?> parquetType2SeaTunnelType(Type type) {
for (int i = 0; i < fields.size(); i++) {
Type fieldType = fields.get(i);
SeaTunnelDataType<?> seaTunnelDataType =
-                            parquetType2SeaTunnelType(fields.get(i));
+                            parquetType2SeaTunnelType(fields.get(i), name);
fieldNames[i] = fieldType.getName();
seaTunnelDataTypes[i] = seaTunnelDataType;
}
@@ -349,9 +359,9 @@ private SeaTunnelDataType<?> parquetType2SeaTunnelType(Type type) {
case MAP:
GroupType groupType = type.asGroupType().getType(0).asGroupType();
SeaTunnelDataType<?> keyType =
-                        parquetType2SeaTunnelType(groupType.getType(0));
+                        parquetType2SeaTunnelType(groupType.getType(0), name);
SeaTunnelDataType<?> valueType =
-                        parquetType2SeaTunnelType(groupType.getType(1));
+                        parquetType2SeaTunnelType(groupType.getType(1), name);
return new MapType<>(keyType, valueType);
case LIST:
Type elementType;
@@ -360,7 +370,8 @@ private SeaTunnelDataType<?> parquetType2SeaTunnelType(Type type) {
} catch (Exception e) {
elementType = type.asGroupType().getType(0);
}
-                SeaTunnelDataType<?> fieldType = parquetType2SeaTunnelType(elementType);
+                SeaTunnelDataType<?> fieldType =
+                        parquetType2SeaTunnelType(elementType, name);
switch (fieldType.getSqlType()) {
case STRING:
return ArrayType.STRING_ARRAY_TYPE;
@@ -379,17 +390,12 @@ private SeaTunnelDataType<?> parquetType2SeaTunnelType(Type type) {
case DOUBLE:
return ArrayType.DOUBLE_ARRAY_TYPE;
default:
-                        String errorMsg =
-                                String.format(
-                                        "SeaTunnel array type not supported this genericType [%s] yet",
-                                        fieldType);
-                        throw new FileConnectorException(
-                                CommonErrorCodeDeprecated.UNSUPPORTED_DATA_TYPE, errorMsg);
+                        throw CommonError.convertToSeaTunnelTypeError(
+                                PARQUET, type.toString(), name);
}
default:
-            throw new FileConnectorException(
-                    CommonErrorCodeDeprecated.UNSUPPORTED_DATA_TYPE,
-                    "SeaTunnel file connector not support this nest type");
+            throw CommonError.convertToSeaTunnelTypeError(
+                    PARQUET, type.toString(), name);
}
}
}
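Note on the call-site shape above: the rewritten loop hands buildColumnsWithErrorCheck a boxed iterator of column indexes via IntStream.range(0, readColumns.size()).iterator(), so the shared helper (defined in ReadStrategy, next file) can drive the per-column conversion generically. A minimal standalone illustration of that iterator shape — the column names here are made up for the example:

import java.util.Iterator;
import java.util.stream.IntStream;

public class IndexIteratorDemo {
    public static void main(String[] args) {
        String[] readColumns = {"id", "name", "score"}; // hypothetical columns
        // IntStream.iterator() returns a PrimitiveIterator.OfInt, which is an
        // Iterator<Integer> — exactly the keys argument buildColumnsWithErrorCheck expects.
        Iterator<Integer> keys = IntStream.range(0, readColumns.length).iterator();
        while (keys.hasNext()) {
            int i = keys.next();
            System.out.println("resolving type for column " + readColumns[i]);
        }
    }
}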
ReadStrategy.java
@@ -20,15 +20,23 @@
import org.apache.seatunnel.shade.com.typesafe.config.Config;

import org.apache.seatunnel.api.source.Collector;
+import org.apache.seatunnel.api.table.catalog.TablePath;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+import org.apache.seatunnel.common.exception.CommonError;
+import org.apache.seatunnel.common.exception.CommonErrorCode;
+import org.apache.seatunnel.common.exception.SeaTunnelRuntimeException;
import org.apache.seatunnel.connectors.seatunnel.file.config.HadoopConf;
import org.apache.seatunnel.connectors.seatunnel.file.exception.FileConnectorException;

import java.io.Closeable;
import java.io.IOException;
import java.io.Serializable;
+import java.util.Iterator;
+import java.util.LinkedHashMap;
import java.util.List;
+import java.util.Map;
+import java.util.function.Consumer;

public interface ReadStrategy extends Serializable, Closeable {
void init(HadoopConf conf);
@@ -38,6 +46,11 @@ void read(String path, String tableId, Collector<SeaTunnelRow> output)

SeaTunnelRowType getSeaTunnelRowTypeInfo(String path) throws FileConnectorException;

+    default SeaTunnelRowType getSeaTunnelRowTypeInfo(TablePath tablePath, String path)
+            throws FileConnectorException {
+        return getSeaTunnelRowTypeInfo(path);
+    }

default SeaTunnelRowType getSeaTunnelRowTypeInfoWithUserConfigRowType(
String path, SeaTunnelRowType rowType) throws FileConnectorException {
return getSeaTunnelRowTypeInfo(path);
@@ -53,4 +66,27 @@ default SeaTunnelRowType getSeaTunnelRowTypeInfoWithUserConfigRowType(

// todo: use CatalogTable
SeaTunnelRowType getActualSeaTunnelRowTypeInfo();

+    default <T> void buildColumnsWithErrorCheck(
+            TablePath tablePath, Iterator<T> keys, Consumer<T> getDataType) {
+        Map<String, String> unsupported = new LinkedHashMap<>();
+        while (keys.hasNext()) {
+            try {
+                getDataType.accept(keys.next());
+            } catch (SeaTunnelRuntimeException e) {
+                if (e.getSeaTunnelErrorCode()
+                        .equals(CommonErrorCode.CONVERT_TO_SEATUNNEL_TYPE_ERROR_SIMPLE)) {
+                    unsupported.put(e.getParams().get("field"), e.getParams().get("dataType"));
+                } else {
+                    throw e;
+                }
+            }
+        }
+        if (!unsupported.isEmpty()) {
+            throw CommonError.getCatalogTableWithUnsupportedType(
+                    this.getClass().getSimpleName().replace("ReadStrategy", ""),
+                    tablePath.getFullName(),
+                    unsupported);
+        }
+    }
}
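The new buildColumnsWithErrorCheck default method implements a collect-then-aggregate pattern: it keeps converting columns after the first unsupported type and reports every failure at once, instead of failing fast on the first one. Here is a self-contained sketch of that pattern with plain-Java stand-ins for the SeaTunnel exception types — UnsupportedTypeException and the sample columns are illustrative, not the real API:

import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;

public class ColumnErrorCheckSketch {
    // Stand-in for a SeaTunnelRuntimeException tagged with
    // CONVERT_TO_SEATUNNEL_TYPE_ERROR_SIMPLE, carrying the field name and source type.
    static class UnsupportedTypeException extends RuntimeException {
        final String field;
        final String dataType;
        UnsupportedTypeException(String field, String dataType) {
            super("field '" + field + "' has unsupported type '" + dataType + "'");
            this.field = field;
            this.dataType = dataType;
        }
    }

    // Mirrors buildColumnsWithErrorCheck: try every column, remember each
    // conversion failure, and throw a single aggregated error at the end.
    static <T> void buildColumnsWithErrorCheck(Iterator<T> keys, Consumer<T> getDataType) {
        Map<String, String> unsupported = new LinkedHashMap<>();
        while (keys.hasNext()) {
            try {
                getDataType.accept(keys.next());
            } catch (UnsupportedTypeException e) {
                unsupported.put(e.field, e.dataType);
            }
        }
        if (!unsupported.isEmpty()) {
            throw new RuntimeException("unsupported field types: " + unsupported);
        }
    }

    public static void main(String[] args) {
        Iterator<String> columns = List.of("id", "name", "payload").iterator();
        buildColumnsWithErrorCheck(
                columns,
                col -> {
                    if (!col.equals("name")) { // pretend only 'name' converts cleanly
                        throw new UnsupportedTypeException(col, "INT96");
                    }
                });
        // Throws: unsupported field types: {id=INT96, payload=INT96}
    }
}

The real helper additionally rethrows any SeaTunnelRuntimeException whose error code is not the simple type-conversion code, so unrelated failures still surface immediately.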
ParquetReadStrategyTest.java
@@ -24,6 +24,7 @@
import org.apache.seatunnel.api.table.type.ArrayType;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+import org.apache.seatunnel.common.exception.SeaTunnelRuntimeException;
import org.apache.seatunnel.connectors.seatunnel.file.config.HadoopConf;
import org.apache.seatunnel.connectors.seatunnel.file.source.reader.ParquetReadStrategy;

@@ -189,6 +190,27 @@ public void testParquetReadArray() throws Exception {
AutoGenerateParquetData.deleteFile();
}

+    @DisabledOnOs(OS.WINDOWS)
+    @Test
+    public void testParquetReadUnsupportedType() throws Exception {
+        AutoGenerateParquetDataWithUnsupportedType.generateTestData();
+        ParquetReadStrategy parquetReadStrategy = new ParquetReadStrategy();
+        LocalConf localConf = new LocalConf(FS_DEFAULT_NAME_DEFAULT);
+        parquetReadStrategy.init(localConf);
+        SeaTunnelRuntimeException exception =
+                Assertions.assertThrows(
+                        SeaTunnelRuntimeException.class,
+                        () ->
+                                parquetReadStrategy.getSeaTunnelRowTypeInfo(
+                                        AutoGenerateParquetDataWithUnsupportedType.DATA_FILE_PATH));
+        Assertions.assertEquals(
+                "ErrorCode:[COMMON-20], ErrorDescription:['Parquet' table 'default.default.default' unsupported get catalog table with field data types"
+                        + " '{\"id\":\"required group id (LIST) {\\n repeated group array (LIST) {\\n repeated binary array;\\n }\\n}\",\"id2\":\"required group id2 (LIST) {\\n repeated group array (LIST)"
+                        + " {\\n repeated binary array;\\n }\\n}\"}']",
+                exception.getMessage());
+        AutoGenerateParquetData.deleteFile();
+    }

public static class TestCollector implements Collector<SeaTunnelRow> {

private final List<SeaTunnelRow> rows = new ArrayList<>();
@@ -281,4 +303,45 @@ public static void deleteFile() {
}
}
}

+    public static class AutoGenerateParquetDataWithUnsupportedType {
+
+        public static final String DATA_FILE_PATH = "/tmp/data_unsupported.parquet";
+
+        public static void generateTestData() throws IOException {
+            deleteFile();
+            String schemaString =
+                    "{\"type\":\"record\",\"name\":\"User\",\"fields\":[{\"name\":\"id\",\"type\":{\"type\": \"array\", \"items\": {\"type\": \"array\", \"items\": \"bytes\"}}},{\"name\":\"id2\",\"type\":{\"type\": \"array\", \"items\": {\"type\": \"array\", \"items\": \"bytes\"}}},{\"name\":\"long\",\"type\":\"long\"}]}";
+            Schema schema = new Schema.Parser().parse(schemaString);
+
+            Configuration conf = new Configuration();
+
+            Path file = new Path(DATA_FILE_PATH);
+
+            ParquetWriter<GenericRecord> writer =
+                    AvroParquetWriter.<GenericRecord>builder(file)
+                            .withSchema(schema)
+                            .withConf(conf)
+                            .withCompressionCodec(CompressionCodecName.SNAPPY)
+                            .build();
+
+            GenericRecord record1 = new GenericData.Record(schema);
+            GenericArray<GenericData.Array<Utf8>> id =
+                    new GenericData.Array<>(2, schema.getField("id").schema());
+            id.add(new GenericData.Array<>(2, schema.getField("id").schema().getElementType()));
+            id.add(new GenericData.Array<>(2, schema.getField("id").schema().getElementType()));
+            record1.put("id", id);
+            record1.put("id2", id);
+            record1.put("long", Long.MAX_VALUE);
+            writer.write(record1);
+            writer.close();
+        }
+
+        public static void deleteFile() {
+            File parquetFile = new File(DATA_FILE_PATH);
+            if (parquetFile.exists()) {
+                parquetFile.delete();
+            }
+        }
+    }
}
