Fix schema refresh for KafkaAvroSchemaDeserializer (apache#10118)
Co-authored-by: rmahindra123 <rmahindra@Rajeshs-MacBook-Pro.local>
rmahindra123 authored Nov 20, 2023
1 parent 979132b commit 84990ae
Showing 1 changed file with 22 additions and 7 deletions.
org/apache/hudi/utilities/sources/AvroKafkaSource.java
@@ -19,6 +19,7 @@
 package org.apache.hudi.utilities.sources;
 
 import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.common.util.Option;
 import org.apache.hudi.utilities.UtilHelpers;
 import org.apache.hudi.utilities.deser.KafkaAvroSchemaDeserializer;
 import org.apache.hudi.utilities.exception.HoodieReadFromSourceException;
@@ -78,18 +79,25 @@ public AvroKafkaSource(TypedProperties props, JavaSparkContext sparkContext, Spa
 
     try {
       props.put(NATIVE_KAFKA_VALUE_DESERIALIZER_PROP, Class.forName(deserializerClassName).getName());
-      if (deserializerClassName.equals(KafkaAvroSchemaDeserializer.class.getName())) {
-        if (schemaProvider == null) {
-          throw new HoodieReadFromSourceException("SchemaProvider has to be set to use KafkaAvroSchemaDeserializer");
-        }
-        props.put(KAFKA_VALUE_DESERIALIZER_SCHEMA.key(), schemaProvider.getSourceSchema().toString());
-      }
     } catch (ClassNotFoundException e) {
       String error = "Could not load custom avro kafka deserializer: " + deserializerClassName;
       LOG.error(error);
       throw new HoodieReadFromSourceException(error, e);
     }
-    this.offsetGen = new KafkaOffsetGen(props);
+
+    if (deserializerClassName.equals(KafkaAvroSchemaDeserializer.class.getName())) {
+      configureSchemaDeserializer();
+    }
+    offsetGen = new KafkaOffsetGen(props);
   }
 
+  @Override
+  protected InputBatch<JavaRDD<GenericRecord>> fetchNewData(Option<String> lastCheckpointStr, long sourceLimit) {
+    if (deserializerClassName.equals(KafkaAvroSchemaDeserializer.class.getName())) {
+      configureSchemaDeserializer();
+      offsetGen = new KafkaOffsetGen(props);
+    }
+    return super.fetchNewData(lastCheckpointStr, sourceLimit);
+  }
+
   @Override
@@ -121,4 +129,11 @@ protected JavaRDD<GenericRecord> maybeAppendKafkaOffsets(JavaRDD<ConsumerRecord<
       return kafkaRDD.map(consumerRecord -> (GenericRecord) consumerRecord.value());
     }
   }
+
+  private void configureSchemaDeserializer() {
+    if (schemaProvider == null) {
+      throw new HoodieReadFromSourceException("SchemaProvider has to be set to use KafkaAvroSchemaDeserializer");
+    }
+    props.put(KAFKA_VALUE_DESERIALIZER_SCHEMA.key(), schemaProvider.getSourceSchema().toString());
+  }
 }
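
For context (not part of the diff above): previously the source schema was copied into the deserializer props only once, in the AvroKafkaSource constructor, so a schema that later evolved in the SchemaProvider never reached KafkaAvroSchemaDeserializer on subsequent syncs. The fix moves that wiring into configureSchemaDeserializer() and re-runs it, together with rebuilding KafkaOffsetGen, at the start of every fetchNewData call, so each fetch reads the current schemaProvider.getSourceSchema(). The sketch below shows roughly how a job would opt into this code path; the property keys are assumptions based on the Hudi utilities Kafka source configs and may differ between releases.

// Minimal usage sketch, assuming streamer-style config keys; these keys are
// not taken from this commit and may vary across Hudi versions.
import org.apache.hudi.common.config.TypedProperties;

public class KafkaAvroSchemaDeserializerPropsSketch {

  public static TypedProperties buildProps() {
    TypedProperties props = new TypedProperties();

    // Plain Kafka consumer settings (placeholder values).
    props.put("bootstrap.servers", "localhost:9092");
    props.put("auto.offset.reset", "earliest");

    // Assumed Hudi key naming the Kafka source topic.
    props.put("hoodie.deltastreamer.source.kafka.topic", "events");

    // Assumed Hudi key selecting the value deserializer; AvroKafkaSource branches on
    // this class name and, with this fix, refreshes its schema before every fetch.
    props.put("hoodie.deltastreamer.source.kafka.value.deserializer.class",
        "org.apache.hudi.utilities.deser.KafkaAvroSchemaDeserializer");

    // A SchemaProvider must also be supplied to the job (for example via the
    // streamer's --schemaprovider-class option); without one the source throws
    // HoodieReadFromSourceException, as the diff shows.
    return props;
  }
}

With this wiring, every fetchNewData call rewrites KAFKA_VALUE_DESERIALIZER_SCHEMA in props from the SchemaProvider before the Kafka RDD is built, which is what lets a schema change (for example in a schema registry) take effect without restarting the job.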
