Skip to content

Commit

Permalink
fix(schema): fix glue schema registry caching issue (#1750)
Browse files Browse the repository at this point in the history
---------

Co-authored-by: apatra <arindam.patra@booking.com>
  • Loading branch information
arindampatra33 and apatra authored Apr 16, 2024
1 parent 1fad75e commit 9086300
Showing 1 changed file with 7 additions and 9 deletions.
16 changes: 7 additions & 9 deletions src/main/java/org/akhq/models/Record.java
Original file line number Diff line number Diff line change
Expand Up @@ -109,10 +109,10 @@ public Record(RecordMetadata record, SchemaRegistryType schemaRegistryType, byte
this.bytesKey = bytesKey;
this.awsGlueKafkaDeserializer = awsGlueKafkaDeserializer;
this.keySchemaId = getAvroSchemaId(this.bytesKey);
this.keySubject = getAvroSchemaSubject(this.keySchemaId);
this.keySubject = getAvroSchemaSubject(this.keySchemaId, this.bytesKey);
this.bytesValue = bytesValue;
this.valueSchemaId = getAvroSchemaId(this.bytesValue);
this.valueSubject = getAvroSchemaSubject(this.valueSchemaId);
this.valueSubject = getAvroSchemaSubject(this.valueSchemaId, this.bytesValue);
this.headers = headers;
this.truncated = false;
}
Expand All @@ -134,10 +134,10 @@ public Record(SchemaRegistryClient client, ConsumerRecord<byte[], byte[]> record
this.bytesKey = record.key();
this.awsGlueKafkaDeserializer = awsGlueKafkaDeserializer;
this.keySchemaId = getAvroSchemaId(this.bytesKey);
this.keySubject = getAvroSchemaSubject(this.keySchemaId);
this.keySubject = getAvroSchemaSubject(this.keySchemaId, this.bytesKey);
this.bytesValue = bytesValue;
this.valueSchemaId = getAvroSchemaId(this.bytesValue);
this.valueSubject = getAvroSchemaSubject(this.valueSchemaId);
this.valueSubject = getAvroSchemaSubject(this.valueSchemaId, this.bytesValue);
for (Header header: record.headers()) {
String headerValue = String.valueOf(ContentUtils.convertToObject(header.value()));
this.headers.add(new KeyValue<>(header.key(), headerValue));
Expand Down Expand Up @@ -339,16 +339,14 @@ private String getAvroSchemaId(byte[] payload) {
return null;
}

private String getAvroSchemaSubject(String schemaId) {
private String getAvroSchemaSubject(String schemaId, byte[] payload) {
if (schemaId == null || client == null) {
return null;
}
try {
if(awsGlueKafkaDeserializer!= null) {
String[] schemaArnSplitted = ( (GlueSchemaRegistryKafkaDeserializer) awsGlueKafkaDeserializer)
.getGlueSchemaRegistryDeserializationFacade()
.getSchemaRegistryClient().getSchemaVersionResponse(schemaId).schemaArn().split("/");
return schemaArnSplitted[schemaArnSplitted.length-1];
return ( (GlueSchemaRegistryKafkaDeserializer) awsGlueKafkaDeserializer)
.getGlueSchemaRegistryDeserializationFacade().getSchema(payload).getSchemaName();
}

ParsedSchema schemaById = client.getSchemaById(Integer.parseInt(schemaId));
Expand Down

0 comments on commit 9086300

Please sign in to comment.