Skip to content

Commit

Permalink
[Improve] Add common error for transform (apache#5815)
Browse files Browse the repository at this point in the history
  • Loading branch information
Hisoka-X authored Nov 11, 2023
1 parent 64f19f2 commit f69f773
Show file tree
Hide file tree
Showing 12 changed files with 415 additions and 140 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,10 @@
import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
import org.apache.seatunnel.common.exception.CommonError;
import org.apache.seatunnel.transform.common.MultipleFieldOutputTransform;
import org.apache.seatunnel.transform.common.SeaTunnelRowAccessor;
import org.apache.seatunnel.transform.exception.TransformCommonError;

import java.lang.reflect.Array;
import java.util.ArrayList;
Expand Down Expand Up @@ -63,10 +65,11 @@ private void initOutputFields(
List<SeaTunnelDataType<?>> fieldsType = new ArrayList<>();
for (Map.Entry<String, String> field : fields.entrySet()) {
String srcField = field.getValue();
int srcFieldIndex = inputRowType.indexOf(srcField);
if (srcFieldIndex == -1) {
throw new IllegalArgumentException(
"Cannot find [" + srcField + "] field in input row type");
int srcFieldIndex;
try {
srcFieldIndex = inputRowType.indexOf(srcField);
} catch (IllegalArgumentException e) {
throw TransformCommonError.cannotFindInputFieldError(getPluginName(), srcField);
}
fieldNames.add(field.getKey());
fieldOriginalIndexes.add(srcFieldIndex);
Expand Down Expand Up @@ -113,12 +116,15 @@ protected Object[] getOutputFieldValues(SeaTunnelRowAccessor inputRow) {
Object[] fieldValues = new Object[fieldNames.size()];
for (int i = 0; i < fieldOriginalIndexes.size(); i++) {
fieldValues[i] =
clone(fieldTypes.get(i), inputRow.getField(fieldOriginalIndexes.get(i)));
clone(
fieldNames.get(i),
fieldTypes.get(i),
inputRow.getField(fieldOriginalIndexes.get(i)));
}
return fieldValues;
}

private Object clone(SeaTunnelDataType<?> dataType, Object value) {
private Object clone(String field, SeaTunnelDataType<?> dataType, Object value) {
if (value == null) {
return null;
}
Expand Down Expand Up @@ -147,7 +153,7 @@ private Object clone(SeaTunnelDataType<?> dataType, Object value) {
Object newArray =
Array.newInstance(arrayType.getElementType().getTypeClass(), array.length);
for (int i = 0; i < array.length; i++) {
Array.set(newArray, i, clone(arrayType.getElementType(), array[i]));
Array.set(newArray, i, clone(field, arrayType.getElementType(), array[i]));
}
return newArray;
case MAP:
Expand All @@ -156,8 +162,8 @@ private Object clone(SeaTunnelDataType<?> dataType, Object value) {
Map<Object, Object> newMap = new HashMap<>();
for (Object key : map.keySet()) {
newMap.put(
clone(mapType.getKeyType(), key),
clone(mapType.getValueType(), map.get(key)));
clone(field, mapType.getKeyType(), key),
clone(field, mapType.getValueType(), map.get(key)));
}
return newMap;
case ROW:
Expand All @@ -166,7 +172,11 @@ private Object clone(SeaTunnelDataType<?> dataType, Object value) {

Object[] newFields = new Object[rowType.getTotalFields()];
for (int i = 0; i < rowType.getTotalFields(); i++) {
newFields[i] = clone(rowType.getFieldType(i), row.getField(i));
newFields[i] =
clone(
rowType.getFieldName(i),
rowType.getFieldType(i),
row.getField(i));
}
SeaTunnelRow newRow = new SeaTunnelRow(newFields);
newRow.setRowKind(row.getRowKind());
Expand All @@ -175,8 +185,8 @@ private Object clone(SeaTunnelDataType<?> dataType, Object value) {
case NULL:
return null;
default:
throw new UnsupportedOperationException(
"Unsupported type: " + dataType.getSqlType());
throw CommonError.unsupportedDataType(
getPluginName(), dataType.getSqlType().toString(), field);
}
}
}

This file was deleted.

This file was deleted.

Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.seatunnel.transform.exception;

import org.apache.seatunnel.common.exception.CommonError;

import java.util.HashMap;
import java.util.List;
import java.util.Map;

import static org.apache.seatunnel.transform.exception.TransformCommonErrorCode.INPUT_FIELDS_NOT_FOUND;
import static org.apache.seatunnel.transform.exception.TransformCommonErrorCode.INPUT_FIELD_NOT_FOUND;

/** The common error of SeaTunnel transform. Please refer {@link CommonError} */
public class TransformCommonError {

public static TransformException cannotFindInputFieldError(String transform, String field) {
Map<String, String> params = new HashMap<>();
params.put("field", field);
params.put("transform", transform);
return new TransformException(INPUT_FIELD_NOT_FOUND, params);
}

public static TransformException cannotFindInputFieldsError(
String transform, List<String> fields) {
Map<String, String> params = new HashMap<>();
params.put("fields", String.join(",", fields));
params.put("transform", transform);
return new TransformException(INPUT_FIELDS_NOT_FOUND, params);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,14 +19,18 @@

import org.apache.seatunnel.common.exception.SeaTunnelErrorCode;

public enum FieldMapperTransformErrorCode implements SeaTunnelErrorCode {
enum TransformCommonErrorCode implements SeaTunnelErrorCode {
INPUT_FIELD_NOT_FOUND(
"FIELD_MAPPER_TRANSFORM-01", "field mapper input field not found in inputRowType");
"TRANSFORM_COMMON-01",
"The input field '<field>' of '<transform>' transform not found in upstream schema"),
INPUT_FIELDS_NOT_FOUND(
"TRANSFORM_COMMON-02",
"The input fields '<fields>' of '<transform>' transform not found in upstream schema");

private final String code;
private final String description;

FieldMapperTransformErrorCode(String code, String description) {
TransformCommonErrorCode(String code, String description) {
this.code = code;
this.description = description;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,17 +20,14 @@
import org.apache.seatunnel.common.exception.SeaTunnelErrorCode;
import org.apache.seatunnel.common.exception.SeaTunnelRuntimeException;

import java.util.Map;

public class TransformException extends SeaTunnelRuntimeException {
public TransformException(SeaTunnelErrorCode seaTunnelErrorCode, String errorMessage) {
super(seaTunnelErrorCode, errorMessage);
}

public TransformException(
SeaTunnelErrorCode seaTunnelErrorCode, String errorMessage, Throwable cause) {
super(seaTunnelErrorCode, errorMessage, cause);
}

public TransformException(SeaTunnelErrorCode seaTunnelErrorCode, Throwable cause) {
super(seaTunnelErrorCode, cause);
TransformException(SeaTunnelErrorCode seaTunnelErrorCode, Map<String, String> params) {
super(seaTunnelErrorCode, params);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,9 +27,7 @@
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
import org.apache.seatunnel.transform.common.AbstractCatalogSupportTransform;
import org.apache.seatunnel.transform.exception.FieldMapperTransformErrorCode;
import org.apache.seatunnel.transform.exception.FieldMapperTransformException;
import org.apache.seatunnel.transform.exception.TransformException;
import org.apache.seatunnel.transform.exception.TransformCommonError;

import org.apache.commons.collections4.CollectionUtils;

Expand All @@ -56,11 +54,18 @@ public FieldMapperTransform(
SeaTunnelRowType seaTunnelRowType = catalogTable.getTableSchema().toPhysicalRowDataType();
List<String> notFoundField =
fieldMapper.keySet().stream()
.filter(field -> seaTunnelRowType.indexOf(field) == -1)
.filter(
field -> {
try {
seaTunnelRowType.indexOf(field);
return false;
} catch (Exception e) {
return true;
}
})
.collect(Collectors.toList());
if (!CollectionUtils.isEmpty(notFoundField)) {
throw new TransformException(
FieldMapperTransformErrorCode.INPUT_FIELD_NOT_FOUND, notFoundField.toString());
throw TransformCommonError.cannotFindInputFieldsError(getPluginName(), notFoundField);
}
}

Expand Down Expand Up @@ -97,9 +102,7 @@ protected TableSchema transformTableSchema() {
(key, value) -> {
int fieldIndex = inputFieldNames.indexOf(key);
if (fieldIndex < 0) {
throw new FieldMapperTransformException(
FieldMapperTransformErrorCode.INPUT_FIELD_NOT_FOUND,
"Can not found field " + key + " from inputRowType");
throw TransformCommonError.cannotFindInputFieldError(getPluginName(), key);
}
Column oldColumn = inputColumns.get(fieldIndex);
PhysicalColumn outputColumn =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,7 @@
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
import org.apache.seatunnel.transform.common.AbstractCatalogSupportTransform;
import org.apache.seatunnel.transform.exception.FilterFieldTransformErrorCode;
import org.apache.seatunnel.transform.exception.TransformException;
import org.apache.seatunnel.transform.exception.TransformCommonError;

import org.apache.commons.collections4.CollectionUtils;

Expand All @@ -53,13 +52,20 @@ public FilterFieldTransform(
fields = config.get(FilterFieldTransformConfig.KEY_FIELDS).toArray(new String[0]);
List<String> canNotFoundFields =
Arrays.stream(fields)
.filter(field -> seaTunnelRowType.indexOf(field) == -1)
.filter(
field -> {
try {
seaTunnelRowType.indexOf(field);
return false;
} catch (Exception e) {
return true;
}
})
.collect(Collectors.toList());

if (!CollectionUtils.isEmpty(canNotFoundFields)) {
throw new TransformException(
FilterFieldTransformErrorCode.FILTER_FIELD_NOT_FOUND,
canNotFoundFields.toString());
throw TransformCommonError.cannotFindInputFieldsError(
getPluginName(), canNotFoundFields);
}
}

Expand Down Expand Up @@ -92,8 +98,7 @@ protected TableSchema transformTableSchema() {
String field = filterFields.get(i);
int inputFieldIndex = seaTunnelRowType.indexOf(field);
if (inputFieldIndex == -1) {
throw new IllegalArgumentException(
"Cannot find [" + field + "] field in input row type");
throw TransformCommonError.cannotFindInputFieldError(getPluginName(), field);
}
inputValueIndex[i] = inputFieldIndex;
outputColumns.add(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,10 +24,12 @@
import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
import org.apache.seatunnel.common.exception.CommonError;
import org.apache.seatunnel.common.utils.JsonUtils;
import org.apache.seatunnel.format.json.JsonToRowConverters;
import org.apache.seatunnel.transform.common.MultipleFieldOutputTransform;
import org.apache.seatunnel.transform.common.SeaTunnelRowAccessor;
import org.apache.seatunnel.transform.exception.TransformCommonError;
import org.apache.seatunnel.transform.exception.TransformException;

import com.jayway.jsonpath.JsonPath;
Expand All @@ -42,7 +44,6 @@
import java.util.concurrent.ConcurrentHashMap;

import static org.apache.seatunnel.transform.exception.JsonPathTransformErrorCode.JSON_PATH_COMPILE_ERROR;
import static org.apache.seatunnel.transform.exception.JsonPathTransformErrorCode.SRC_FIELD_NOT_FOUND;

@Slf4j
public class JsonPathTransform extends MultipleFieldOutputTransform {
Expand Down Expand Up @@ -108,10 +109,7 @@ private void initSrcFieldIndexArr() {
ColumnConfig columnConfig = columnConfigs.get(i);
String srcField = columnConfig.getSrcField();
if (!fieldNameSet.contains(srcField)) {
throw new TransformException(
SRC_FIELD_NOT_FOUND,
String.format(
"JsonPathTransform config not found src_field:[%s]", srcField));
throw TransformCommonError.cannotFindInputFieldError(getPluginName(), srcField);
}
this.srcFieldIndexArr[i] = seaTunnelRowType.indexOf(srcField);
}
Expand Down Expand Up @@ -161,9 +159,10 @@ private Object doTransform(
jsonString = JsonUtils.toJsonString(row.getFields());
break;
default:
throw new UnsupportedOperationException(
"JsonPathTransform unsupported sourceDataType: "
+ inputDataType.getSqlType());
throw CommonError.unsupportedDataType(
getPluginName(),
inputDataType.getSqlType().toString(),
columnConfig.getSrcField());
}
Object result = JSON_PATH_CACHE.get(columnConfig.getPath()).read(jsonString);
JsonNode jsonNode = JsonUtils.toJsonNode(result);
Expand Down
Loading

0 comments on commit f69f773

Please sign in to comment.