diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/AvroKafkaSource.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/AvroKafkaSource.java index e9353bb26660..2bf92280faf5 100644 --- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/AvroKafkaSource.java +++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/AvroKafkaSource.java @@ -19,6 +19,7 @@ package org.apache.hudi.utilities.sources; import org.apache.hudi.common.config.TypedProperties; +import org.apache.hudi.common.util.Option; import org.apache.hudi.utilities.UtilHelpers; import org.apache.hudi.utilities.deser.KafkaAvroSchemaDeserializer; import org.apache.hudi.utilities.exception.HoodieReadFromSourceException; @@ -78,18 +79,25 @@ public AvroKafkaSource(TypedProperties props, JavaSparkContext sparkContext, Spa try { props.put(NATIVE_KAFKA_VALUE_DESERIALIZER_PROP, Class.forName(deserializerClassName).getName()); - if (deserializerClassName.equals(KafkaAvroSchemaDeserializer.class.getName())) { - if (schemaProvider == null) { - throw new HoodieReadFromSourceException("SchemaProvider has to be set to use KafkaAvroSchemaDeserializer"); - } - props.put(KAFKA_VALUE_DESERIALIZER_SCHEMA.key(), schemaProvider.getSourceSchema().toString()); - } } catch (ClassNotFoundException e) { String error = "Could not load custom avro kafka deserializer: " + deserializerClassName; LOG.error(error); throw new HoodieReadFromSourceException(error, e); } - this.offsetGen = new KafkaOffsetGen(props); + + if (deserializerClassName.equals(KafkaAvroSchemaDeserializer.class.getName())) { + configureSchemaDeserializer(); + } + offsetGen = new KafkaOffsetGen(props); + } + + @Override + protected InputBatch> fetchNewData(Option lastCheckpointStr, long sourceLimit) { + if (deserializerClassName.equals(KafkaAvroSchemaDeserializer.class.getName())) { + configureSchemaDeserializer(); + offsetGen = new KafkaOffsetGen(props); + } + return super.fetchNewData(lastCheckpointStr, sourceLimit); } @Override @@ -121,4 +129,11 @@ protected JavaRDD maybeAppendKafkaOffsets(JavaRDD (GenericRecord) consumerRecord.value()); } } + + private void configureSchemaDeserializer() { + if (schemaProvider == null) { + throw new HoodieReadFromSourceException("SchemaProvider has to be set to use KafkaAvroSchemaDeserializer"); + } + props.put(KAFKA_VALUE_DESERIALIZER_SCHEMA.key(), schemaProvider.getSourceSchema().toString()); + } }