Skip to content

Commit

Permalink
[modify]解决JobManager由于FlinkHudiTable未定义字段导致的异常
Browse files Browse the repository at this point in the history
  • Loading branch information
aib628 committed Nov 29, 2023
1 parent 39b32ea commit 35980b6
Showing 1 changed file with 20 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,21 +18,24 @@

package org.apache.hudi.table.format.mor;

import org.apache.hudi.common.model.HoodieRecord;

import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.RowType;

import java.io.Serializable;
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.RowType;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.exception.HoodieException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* Statistics for merge on read table source.
*/
public class MergeOnReadTableState implements Serializable {

private static final long serialVersionUID = 1L;
private static final Logger LOG = LoggerFactory.getLogger(MergeOnReadTableState.class);

private final RowType rowType;
private final RowType requiredRowType;
Expand All @@ -56,6 +59,8 @@ public MergeOnReadTableState(
this.inputSplits = inputSplits;
this.pkFields = pkFields;
this.operationPos = rowType.getFieldIndex(HoodieRecord.OPERATION_METADATA_FIELD);

checkRequiredFieldValidation();
}

public RowType getRowType() {
Expand Down Expand Up @@ -114,4 +119,14 @@ public LogicalType[] getPkTypes(int[] pkOffsets) {
return Arrays.stream(pkOffsets).mapToObj(offset -> requiredTypes[offset])
.toArray(LogicalType[]::new);
}

private void checkRequiredFieldValidation() {
final List<String> fieldNames = rowType.getFieldNames();
List<String> invalidFieldNames = requiredRowType.getFieldNames().stream().filter(requiredFieldName -> !fieldNames.contains(requiredFieldName)).collect(Collectors.toList());

if (!invalidFieldNames.isEmpty()) {
LOG.warn("Unknown required fields: {}, all known are: {}", String.join(",", invalidFieldNames), String.join(",", fieldNames));
throw new HoodieException("Unknown required fields: " + String.join(",", invalidFieldNames));
}
}
}

0 comments on commit 35980b6

Please sign in to comment.