Skip to content

Commit

Permalink
feat(schema): enable schema update from protobuf and json schemas (#760)
Browse files Browse the repository at this point in the history
  • Loading branch information
bakjos authored and tchiotludo committed Oct 24, 2021
1 parent 5a6b4e4 commit 01870d3
Show file tree
Hide file tree
Showing 4 changed files with 21 additions and 8 deletions.
2 changes: 1 addition & 1 deletion client/src/components/Form/Form.jsx
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,7 @@ class Form extends Root {
)}
<div className="col-sm-10" style={{ height: '100%' }}>
<AceEditor
mode="json"
mode={ formData.schemaType === "PROTOBUF"? "protobuf" : "json" }
id={name}
theme="merbivore_soft"
value={formData[name]}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,8 @@ class SchemaUpdate extends Form {
formData: {
subject: '',
compatibility: '',
schema: ''
schema: '',
schemaType: ''
},
errors: {},
roles: JSON.parse(sessionStorage.getItem('roles'))
Expand All @@ -57,7 +58,11 @@ class SchemaUpdate extends Form {
.label('Compatibility Level'),
schema: Joi.string()
.required()
.label('Latest Schema')
.label('Latest Schema'),
schemaType: Joi.string()
.label('Schema Type'),
references: Joi.any()
.label('Refecences')
};

componentDidMount() {
Expand All @@ -81,7 +86,9 @@ class SchemaUpdate extends Form {

formData.subject = latestSchemaVersion.subject;
formData.compatibility = latestSchemaVersion.compatibilityLevel;
formData.schema = JSON.stringify(JSON.parse(latestSchemaVersion.schema), null, 2);
formData.schemaType = latestSchemaVersion.schemaType;
formData.schema = "PROTOBUF" === formData.schemaType?latestSchemaVersion.schema :JSON.stringify(JSON.parse(latestSchemaVersion.schema), null, 2);
formData.references = latestSchemaVersion.references;

this.setState(formData);
};
Expand All @@ -92,7 +99,9 @@ class SchemaUpdate extends Form {
clusterId,
subject: formData.subject,
compatibilityLevel: formData.compatibility,
schema: formData.schema
schema: formData.schema,
schemaType: formData.schemaType,
references: formData.references
};

this.postApi(uriUpdateSchema(clusterId, formData.subject), body)
Expand Down
2 changes: 1 addition & 1 deletion src/main/java/org/akhq/controllers/SchemaController.java
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ public Schema updateSchema(String cluster, String subject, @Body Schema schema)
}

private Schema registerSchema(String cluster, @Body Schema schema) throws IOException, RestClientException {
Schema register = this.schemaRepository.register(cluster, schema.getSubject(), schema.getSchema(), schema.getReferences());
Schema register = this.schemaRepository.register(cluster, schema.getSubject(), schema.getSchemaType(), schema.getSchema(), schema.getReferences());

if (schema.getCompatibilityLevel() != null) {
this.schemaRepository.updateConfig(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -182,9 +182,13 @@ public List<String> testCompatibility(String clusterId, String subject, org.apac
}

public Schema register(String clusterId, String subject, String schema, List<SchemaReference> references) throws IOException, RestClientException {
return register(clusterId, subject, null, schema, references);
}

public Schema register(String clusterId, String subject, String type, String schema, List<SchemaReference> references) throws IOException, RestClientException {
int id = this.kafkaModule
.getRegistryRestClient(clusterId)
.registerSchema(schema, "AVRO", references, subject);
.registerSchema(schema, type != null? type: "AVRO", references, subject);

Schema latestVersion = getLatestVersion(clusterId, subject);

Expand Down Expand Up @@ -293,7 +297,7 @@ public Deserializer getKafkaProtoDeserializer(String clusterId) {
Deserializer deserializer;
SchemaRegistryType schemaRegistryType = getSchemaRegistryType(clusterId);
if (schemaRegistryType == SchemaRegistryType.TIBCO) {
throw new IllegalArgumentException("Configured schema registry type was 'tibco', but TIBCO JSON client is not supported");
throw new IllegalArgumentException("Configured schema registry type was 'tibco', but TIBCO PROTOBUF client is not supported");
} else {
deserializer = new KafkaProtobufDeserializer(this.kafkaModule.getRegistryClient(clusterId));
}
Expand Down

0 comments on commit 01870d3

Please sign in to comment.