diff --git a/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/AvroConversionUtils.scala b/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/AvroConversionUtils.scala index 61241b81a06c3..d84679eaf923a 100644 --- a/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/AvroConversionUtils.scala +++ b/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/AvroConversionUtils.scala @@ -244,51 +244,6 @@ object AvroConversionUtils { (nameParts.last, nameParts.init.mkString(".")) } - def areSchemasEquivalent(thisSchema: Schema, thatSchema: Schema): Boolean = { - if (thisSchema.getType != thatSchema.getType) { - return false - } - val thisSchemaResolved = handleUnion(thisSchema) - val thatSchemaResolved = handleUnion(thatSchema) - if (thisSchemaResolved.getType != thatSchemaResolved.getType) { - return false - } - if (thisSchemaResolved.getLogicalType != null) { - if (thatSchemaResolved.getLogicalType == null || thisSchemaResolved.getLogicalType.getName != thatSchemaResolved.getLogicalType.getName) { - return false - } - } - if (thisSchemaResolved.getType == Type.RECORD) { - if (thisSchemaResolved.getFields.length != thatSchemaResolved.getFields.length) { - return false - } - thisSchemaResolved.getFields.zip(thatSchemaResolved.getFields).forall { case (thisField, thatField) => - getFieldsAreEquivalent(thisField, thatField) - } - } else { - true - } - } - - private def getFieldsAreEquivalent(thisField: Schema.Field, thatField: Schema.Field): Boolean = { - val thisFieldSchema = handleUnion(thisField.schema) - val thatFieldSchema = handleUnion(thatField.schema) - if (thisFieldSchema.getType != thatFieldSchema.getType) { - return false - } - if (thisField.name != thatField.name) { - return false - } - if (thisFieldSchema.getType == Type.RECORD) { - return areSchemasEquivalent(thisField.schema, thatField.schema) - } else if (thisFieldSchema.getType == Type.ARRAY) { - return areSchemasEquivalent(thisFieldSchema.getElementType, thatFieldSchema.getElementType) - } else if (thisFieldSchema.getType == Type.MAP) { - return areSchemasEquivalent(thisFieldSchema.getValueType, thatFieldSchema.getValueType) - } - true - } - private def handleUnion(schema: Schema): Schema = { if (schema.getType == Type.UNION) { val index = if (schema.getTypes.get(0).getType == Schema.Type.NULL) 1 else 0 diff --git a/hudi-common/src/main/java/org/apache/hudi/common/fs/FSUtils.java b/hudi-common/src/main/java/org/apache/hudi/common/fs/FSUtils.java index f5e294c382fb1..770c811a2e8e4 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/fs/FSUtils.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/fs/FSUtils.java @@ -87,6 +87,8 @@ public class FSUtils { private static final int MAX_ATTEMPTS_RECOVER_LEASE = 10; private static final String HOODIE_ENV_PROPS_PREFIX = "HOODIE_ENV_"; + private static final String LOG_FILE_EXTENSION = ".log"; + private static final PathFilter ALLOW_ALL_FILTER = file -> true; public static Configuration prepareHadoopConf(Configuration conf) { @@ -474,7 +476,7 @@ public static boolean isLogFile(Path logPath) { } public static boolean isLogFile(String fileName) { - if (fileName.contains(".log")) { + if (fileName.contains(LOG_FILE_EXTENSION)) { Matcher matcher = LOG_FILE_PATTERN.matcher(fileName); return matcher.find(); } diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestAvroConversionUtils.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestAvroConversionUtils.scala index 01efa5cbb38a4..5cd6ac3954eed 100644 --- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestAvroConversionUtils.scala +++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/TestAvroConversionUtils.scala @@ -408,30 +408,6 @@ class TestAvroConversionUtils extends FunSuite with Matchers { internalRowCompare(row1, row2, sparkSchema) } - test("test fields are equivalent") { - val baseAvroSchema = new Schema.Parser().parse("{\"type\":\"record\",\"name\":\"h0_record\",\"namespace\":\"hoodie.h0\",\"fields\"" - + ":[{\"name\":\"col9\",\"type\":[\"null\",\"bytes\"],\"default\":null}]}") - val schemaWithDifferentName = new Schema.Parser().parse("{\"type\":\"record\",\"name\":\"new_record\",\"namespace\":\"hoodie.h0\",\"fields\"" - + ":[{\"name\":\"col9\",\"type\":[\"null\",\"bytes\"],\"default\":null}]}") - val schemaWithDifferentFieldName = new Schema.Parser().parse("{\"type\":\"record\",\"name\":\"new_record\",\"namespace\":\"hoodie.h0\",\"fields\"" - + ":[{\"name\":\"col10\",\"type\":[\"null\",\"bytes\"],\"default\":null}]}") - val schemaWithDifferentFieldType = new Schema.Parser().parse("{\"type\":\"record\",\"name\":\"new_record\",\"namespace\":\"hoodie.h0\",\"fields\"" - + ":[{\"name\":\"col9\",\"type\":[\"null\",\"string\"],\"default\":null}]}") - assert(AvroConversionUtils.areSchemasEquivalent(baseAvroSchema, schemaWithDifferentName)) - assert(!AvroConversionUtils.areSchemasEquivalent(baseAvroSchema, schemaWithDifferentFieldName)) - assert(!AvroConversionUtils.areSchemasEquivalent(baseAvroSchema, schemaWithDifferentFieldType)) - - val complexAvroSchema = new Schema.Parser().parse(complexSchemaStr) - val complexSchemaWithDifferentNamespace = new Schema.Parser().parse(complexSchemaStr.replace("SchemaNS.SchemaName", "newNamspace")) - val complexSchemaWithDifferentMapFieldName = new Schema.Parser().parse(complexSchemaStr.replace("mapVal", "newVal")) - val complexSchemaWithDifferentListFieldName = new Schema.Parser().parse(complexSchemaStr.replace("arrayVal", "newArrayVal")) - val complexSchemaWithDifferentStructFieldName = new Schema.Parser().parse(complexSchemaStr.replace("innerKey", "newInnerKey")) - assert(AvroConversionUtils.areSchemasEquivalent(complexAvroSchema, complexSchemaWithDifferentNamespace)) - assert(!AvroConversionUtils.areSchemasEquivalent(complexAvroSchema, complexSchemaWithDifferentMapFieldName)) - assert(!AvroConversionUtils.areSchemasEquivalent(complexAvroSchema, complexSchemaWithDifferentListFieldName)) - assert(!AvroConversionUtils.areSchemasEquivalent(complexAvroSchema, complexSchemaWithDifferentStructFieldName)) - } - private def internalRowCompare(expected: Any, actual: Any, schema: DataType): Unit = { schema match { case StructType(fields) =>