Skip to content

Commit

Permalink
Address review comments
Browse files Browse the repository at this point in the history
  • Loading branch information
lokeshj1703 committed Nov 21, 2023
1 parent 2e26a7d commit 9fdc1d6
Show file tree
Hide file tree
Showing 3 changed files with 3 additions and 70 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -244,51 +244,6 @@ object AvroConversionUtils {
(nameParts.last, nameParts.init.mkString("."))
}

/**
 * Structurally compares two Avro schemas.
 *
 * Two schemas are treated as equivalent when:
 *  - their declared types match, both before and after `handleUnion` is applied
 *    (NOTE(review): `handleUnion` appears to unwrap a nullable union to its non-null
 *    branch; only part of it is visible from here, so confirm),
 *  - they carry the same logical type name, or neither carries a logical type,
 *  - for records: they have the same number of fields and the fields are pairwise
 *    equivalent, by position, according to `getFieldsAreEquivalent`,
 *  - for arrays / maps: their element / value schemas are themselves equivalent.
 *
 * Record names and namespaces are not compared.
 *
 * @param thisSchema first schema to compare
 * @param thatSchema second schema to compare
 * @return true if the two schemas are structurally equivalent, false otherwise
 */
def areSchemasEquivalent(thisSchema: Schema, thatSchema: Schema): Boolean = {
  if (thisSchema.getType != thatSchema.getType) {
    false
  } else {
    val thisSchemaResolved = handleUnion(thisSchema)
    val thatSchemaResolved = handleUnion(thatSchema)
    // Option(...) turns a missing (null) logical type into None, so the comparison is
    // symmetric: a logical type present on only one side makes the schemas differ
    // regardless of which argument carries it.
    val thisLogicalTypeName = Option(thisSchemaResolved.getLogicalType).map(_.getName)
    val thatLogicalTypeName = Option(thatSchemaResolved.getLogicalType).map(_.getName)
    if (thisSchemaResolved.getType != thatSchemaResolved.getType || thisLogicalTypeName != thatLogicalTypeName) {
      false
    } else {
      thisSchemaResolved.getType match {
        case Type.RECORD =>
          thisSchemaResolved.getFields.length == thatSchemaResolved.getFields.length &&
            thisSchemaResolved.getFields.zip(thatSchemaResolved.getFields).forall { case (thisField, thatField) =>
              getFieldsAreEquivalent(thisField, thatField)
            }
        // Descend into collections so that nested arrays/maps (and collections passed in
        // directly) are compared by their contents rather than accepted unconditionally.
        case Type.ARRAY =>
          areSchemasEquivalent(thisSchemaResolved.getElementType, thatSchemaResolved.getElementType)
        case Type.MAP =>
          areSchemasEquivalent(thisSchemaResolved.getValueType, thatSchemaResolved.getValueType)
        case _ =>
          true
      }
    }
  }
}

/**
 * Compares two record fields for structural equivalence.
 *
 * The fields must have the same name and, after `handleUnion` is applied to their
 * schemas, the same Avro type. Record, array and map fields are additionally compared
 * through `areSchemasEquivalent`: records using the fields' original (unresolved)
 * schemas, arrays using the resolved element types, maps using the resolved value types.
 * Every other type is accepted once type and name match.
 *
 * @param thisField first field to compare
 * @param thatField second field to compare
 * @return true if the fields are equivalent, false otherwise
 */
private def getFieldsAreEquivalent(thisField: Schema.Field, thatField: Schema.Field): Boolean = {
  val leftSchema = handleUnion(thisField.schema)
  val rightSchema = handleUnion(thatField.schema)
  // Type is checked before name, mirroring the order of the guards it replaces.
  val sameTypeAndName = leftSchema.getType == rightSchema.getType && thisField.name == thatField.name
  if (!sameTypeAndName) {
    false
  } else {
    leftSchema.getType match {
      case Type.RECORD => areSchemasEquivalent(thisField.schema, thatField.schema)
      case Type.ARRAY => areSchemasEquivalent(leftSchema.getElementType, rightSchema.getElementType)
      case Type.MAP => areSchemasEquivalent(leftSchema.getValueType, rightSchema.getValueType)
      case _ => true
    }
  }
}

private def handleUnion(schema: Schema): Schema = {
if (schema.getType == Type.UNION) {
val index = if (schema.getTypes.get(0).getType == Schema.Type.NULL) 1 else 0
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,8 @@ public class FSUtils {
private static final int MAX_ATTEMPTS_RECOVER_LEASE = 10;
private static final String HOODIE_ENV_PROPS_PREFIX = "HOODIE_ENV_";

private static final String LOG_FILE_EXTENSION = ".log";

private static final PathFilter ALLOW_ALL_FILTER = file -> true;

public static Configuration prepareHadoopConf(Configuration conf) {
Expand Down Expand Up @@ -474,7 +476,7 @@ public static boolean isLogFile(Path logPath) {
}

public static boolean isLogFile(String fileName) {
if (fileName.contains(".log")) {
if (fileName.contains(LOG_FILE_EXTENSION)) {
Matcher matcher = LOG_FILE_PATTERN.matcher(fileName);
return matcher.find();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -408,30 +408,6 @@ class TestAvroConversionUtils extends FunSuite with Matchers {
internalRowCompare(row1, row2, sparkSchema)
}

test("test fields are equivalent") {
  // Each schema is parsed with its own freshly created parser, exactly as before.
  def parse(json: String): Schema = new Schema.Parser().parse(json)

  // Single-field records: the base schema plus variants that each change one aspect.
  val base = parse("{\"type\":\"record\",\"name\":\"h0_record\",\"namespace\":\"hoodie.h0\",\"fields\""
    + ":[{\"name\":\"col9\",\"type\":[\"null\",\"bytes\"],\"default\":null}]}")
  val renamedRecord = parse("{\"type\":\"record\",\"name\":\"new_record\",\"namespace\":\"hoodie.h0\",\"fields\""
    + ":[{\"name\":\"col9\",\"type\":[\"null\",\"bytes\"],\"default\":null}]}")
  val renamedField = parse("{\"type\":\"record\",\"name\":\"new_record\",\"namespace\":\"hoodie.h0\",\"fields\""
    + ":[{\"name\":\"col10\",\"type\":[\"null\",\"bytes\"],\"default\":null}]}")
  val retypedField = parse("{\"type\":\"record\",\"name\":\"new_record\",\"namespace\":\"hoodie.h0\",\"fields\""
    + ":[{\"name\":\"col9\",\"type\":[\"null\",\"string\"],\"default\":null}]}")

  // A different record name is tolerated; a different field name or field type is not.
  assert(AvroConversionUtils.areSchemasEquivalent(base, renamedRecord))
  assert(!AvroConversionUtils.areSchemasEquivalent(base, renamedField))
  assert(!AvroConversionUtils.areSchemasEquivalent(base, retypedField))

  // Variants of the shared complex schema, produced by textual substitution.
  val complex = parse(complexSchemaStr)
  val complexOtherNamespace = parse(complexSchemaStr.replace("SchemaNS.SchemaName", "newNamspace"))
  val complexOtherMapField = parse(complexSchemaStr.replace("mapVal", "newVal"))
  val complexOtherListField = parse(complexSchemaStr.replace("arrayVal", "newArrayVal"))
  val complexOtherStructField = parse(complexSchemaStr.replace("innerKey", "newInnerKey"))

  assert(AvroConversionUtils.areSchemasEquivalent(complex, complexOtherNamespace))
  assert(!AvroConversionUtils.areSchemasEquivalent(complex, complexOtherMapField))
  assert(!AvroConversionUtils.areSchemasEquivalent(complex, complexOtherListField))
  assert(!AvroConversionUtils.areSchemasEquivalent(complex, complexOtherStructField))
}

private def internalRowCompare(expected: Any, actual: Any, schema: DataType): Unit = {
schema match {
case StructType(fields) =>
Expand Down

0 comments on commit 9fdc1d6

Please sign in to comment.