The asyncapi-generator is an open-source tool for generating code and schemas from AsyncAPI YAML specifications.
The project is currently in BETA.
- Kotlin - Data classes with Jakarta Validation annotations
- Java - POJOs with Jakarta Validation annotations
- Spring Kafka - Producer and Consumer templates for both Kotlin and Java
- Avro - Schema generation from AsyncAPI schemas
The current documentation is still a draft; it can be found in the docs/ folder at the repository root.
Currently, the asyncapi-generator BETA version is available as a Maven plugin through Maven Central.
Example usage in your pom.xml:
<plugin>
    <groupId>dev.banking.asyncapi.generator</groupId>
    <artifactId>asyncapi-generator-maven-plugin</artifactId>
    <version>0.0.1</version> <!-- current BETA version -->
    <executions>
        <execution>
            <id>generate-example</id>
            <phase>generate-sources</phase>
            <goals>
                <goal>generate</goal>
            </goals>
            <configuration>
                <generatorName>kotlin</generatorName> <!-- options: kotlin, java - default kotlin -->
                <inputFile>path/to/my/asyncapi_specification.yaml</inputFile>
                <modelPackage>my.package.path.model</modelPackage> <!-- package name for data class/POJO -->
                <clientPackage>my.package.path.client</clientPackage> <!-- package name for Kafka client - default modelPackage -->
                <schemaPackage>my.package.path.schema</schemaPackage> <!-- package name for Avro schema - default modelPackage -->
                <configuration>
                    <generateModels>true</generateModels> <!-- can skip models - default is true -->
                    <generateSpringKafkaClient>true</generateSpringKafkaClient> <!-- default is false -->
                    <generateAvroSchema>true</generateAvroSchema> <!-- default is false -->
                </configuration>
            </configuration>
        </execution>
    </executions>
</plugin>

For the Spring Kafka clients, we generate the Producer and Consumer depending on the operation that the event message has in the specification. If no operation is specified, we generate both.
A Spring Kafka producer is named after the channel ('ChannelName'), with 'Producer' as a suffix.
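
The selection and naming rules above can be sketched in plain Kotlin. This is only an illustration, not the generator's actual code: `ClientKind`, `clientsFor` and `producerClassName` are hypothetical names, and mapping the AsyncAPI operation actions `send` and `receive` to producer and consumer is an assumption.

```kotlin
// Hypothetical sketch; none of these names come from the generator itself.
enum class ClientKind { PRODUCER, CONSUMER }

// Assumption: 'send' maps to a producer and 'receive' to a consumer.
// With no operation defined for the channel, both clients are generated.
fun clientsFor(action: String?): Set<ClientKind> = when (action) {
    "send" -> setOf(ClientKind.PRODUCER)
    "receive" -> setOf(ClientKind.CONSUMER)
    null -> setOf(ClientKind.PRODUCER, ClientKind.CONSUMER)
    else -> throw IllegalArgumentException("Unknown operation action: $action")
}

// The producer class name is the capitalized channel name plus 'Producer'.
fun producerClassName(channelName: String): String =
    channelName.replaceFirstChar { it.uppercaseChar() } + "Producer"

fun main() {
    println(producerClassName("customerUpdated")) // CustomerUpdatedProducer
    println(clientsFor(null))                     // [PRODUCER, CONSUMER]
}
```
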
Example:
channels:
  customerUpdated:
    address: customer.updated.v1
    messages:
      customerRead:
        $ref: '#/components/messages/customerReadPayload'
    description: My description

This YAML will generate a CustomerUpdatedProducer class that accepts the message customerReadPayload as its payload. It can look something like this:
/**
 * Producer for topic 'customer.updated.v1'
 */
@Component
class CustomerUpdatedProducer(
    private val kafkaTemplate: KafkaTemplate<String, Any>
) {
    private val log = LoggerFactory.getLogger(CustomerUpdatedProducer::class.java)

    fun sendCustomerReadPayload(key: String, message: CustomerReadPayload) {
        log.info("Sending CustomerReadPayload to topic=customer.updated.v1, key={}", key)
        kafkaTemplate.send(ProducerRecord("customer.updated.v1", key, message))
    }
}

This class can then be injected into an application service class and used.
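
A minimal sketch of that usage, assuming a hypothetical `CustomerService` and a payload with `id` and `name` fields. To keep the snippet self-contained, `CustomerReadPayload` and the producer are replaced by small stand-ins; in a real application the generated classes would be used and the service would carry Spring's `@Service` annotation.

```kotlin
// Stand-in for the generated payload; the fields are hypothetical.
data class CustomerReadPayload(val id: String, val name: String)

// Stand-in for the generated producer. The real class wraps a KafkaTemplate;
// this one records what would be sent so the sketch runs without Kafka.
class CustomerUpdatedProducer {
    val sent = mutableListOf<Pair<String, CustomerReadPayload>>()

    fun sendCustomerReadPayload(key: String, message: CustomerReadPayload) {
        sent += key to message
    }
}

// Hypothetical application service. With Spring it would be annotated with
// @Service and receive the producer through constructor injection.
class CustomerService(private val producer: CustomerUpdatedProducer) {
    fun rename(id: String, newName: String) {
        // The customer id is used as the record key.
        producer.sendCustomerReadPayload(id, CustomerReadPayload(id, newName))
    }
}

fun main() {
    val producer = CustomerUpdatedProducer()
    CustomerService(producer).rename("42", "Ada")
    println(producer.sent.single().first) // 42
}
```
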
A Spring Kafka consumer is a bit different. Here, we generate a KafkaListener that the application does not have to 'run' anywhere; it simply picks up event messages from the specified topic. The consumer generates two entities:
- An interface with the suffix 'Handler'
- A class that injects said 'Handler' interface and has a 'listen' method
Example:
channels:
  customerContactInfoUpdate:
    address: customer.contact-info-update.v1
    # This channel handles two different types of message
    messages:
      updateEmail:
        $ref: '#/components/messages/updateEmailPayload'
      updatePhoneNumber:
        $ref: '#/components/messages/updatePhoneNumberPayload'
    description: My description

This YAML will generate an interface CustomerContactInfoUpdateHandler and a class CustomerContactInfoUpdateListener, which will look something like this:
/**
 * Handler for messages on topic 'customer.contact-info-update.v1'
 */
interface CustomerContactInfoUpdateHandler {
    fun onCustomerEmailPayload(message: KafkaMessage<CustomerEmailPayload>)
    fun onCustomerPhoneNumberPayload(message: KafkaMessage<CustomerPhoneNumberPayload>)
}

and
/**
 * Spring Kafka Listener for topic 'customer.contact-info-update.v1'
 */
@Component
@ConditionalOnBean(CustomerContactInfoUpdateHandler::class)
class CustomerContactInfoUpdateListener(
    private val handler: CustomerContactInfoUpdateHandler
) {
    private val log = LoggerFactory.getLogger(CustomerContactInfoUpdateListener::class.java)

    @KafkaListener(topics = ["customer.contact-info-update.v1"], groupId = "\${spring.kafka.consumer.group-id}")
    fun listen(record: ConsumerRecord<String, Any>) {
        val payload = record.value()
        // Convert headers to Map<String, Any>
        val headers = record.headers().associate { it.key() to String(it.value()) }
        when (payload) {
            is CustomerEmailPayload -> {
                log.debug("Dispatching CustomerEmailPayload from topic customer.contact-info-update.v1")
                // Wrap in KafkaMessage
                val message = KafkaMessage(
                    payload = payload,
                    key = record.key(),
                    headers = headers,
                    topic = record.topic(),
                    partition = record.partition(),
                    offset = record.offset(),
                    timestamp = record.timestamp()
                )
                handler.onCustomerEmailPayload(message)
            }
            is CustomerPhoneNumberPayload -> {
                log.debug("Dispatching CustomerPhoneNumberPayload from topic customer.contact-info-update.v1")
                // Wrap in KafkaMessage
                val message = KafkaMessage(
                    payload = payload,
                    key = record.key(),
                    headers = headers,
                    topic = record.topic(),
                    partition = record.partition(),
                    offset = record.offset(),
                    timestamp = record.timestamp()
                )
                handler.onCustomerPhoneNumberPayload(message)
            }
            else -> {
                log.warn("Received unknown message type on topic customer.contact-info-update.v1: {}", payload::class.java.name)
            }
        }
    }
}

The way to use this is through a service in the application that implements the 'Handler' interface and does something with the received message.
Example:
@Service
class CustomerUpdatedService : CustomerUpdatedHandler {
    private val log = log()

    override fun onCustomerReadPayload(message: KafkaMessage<CustomerReadPayload>) {
        log.info("Worked, received customer update message: {}", message)
    }
}

One of the reasons for this implementation is that we needed a way to 'activate' the listener. There can be use cases where you inherit some generated code from an AsyncAPI specification but are only interested in parts of it. If we generated the KafkaListener directly, it could create false positives and misleading metrics, because the application would be receiving the messages without doing anything with them. By having this 'implement interface' step, we avoid this.
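
Because the handler is a plain interface, an implementation can also be exercised without a running broker. The sketch below assumes stand-in definitions for `KafkaMessage`, `CustomerReadPayload` and `CustomerUpdatedHandler` so that it runs on its own: the `KafkaMessage` field names follow the generated listener shown earlier, while the field types, the payload's `id` field and the `RecordingHandler` class are assumptions.

```kotlin
// Stand-ins for generated types so the sketch runs on its own.
data class CustomerReadPayload(val id: String)

data class KafkaMessage<T>(
    val payload: T,
    val key: String?,
    val headers: Map<String, String>,
    val topic: String,
    val partition: Int,
    val offset: Long,
    val timestamp: Long
)

interface CustomerUpdatedHandler {
    fun onCustomerReadPayload(message: KafkaMessage<CustomerReadPayload>)
}

// Hypothetical implementation that only remembers the ids it has seen.
class RecordingHandler : CustomerUpdatedHandler {
    val seenIds = mutableListOf<String>()

    override fun onCustomerReadPayload(message: KafkaMessage<CustomerReadPayload>) {
        seenIds += message.payload.id
    }
}

fun main() {
    val handler = RecordingHandler()
    // Build the message by hand instead of consuming it from a topic.
    handler.onCustomerReadPayload(
        KafkaMessage(
            payload = CustomerReadPayload("42"),
            key = "42",
            headers = emptyMap(),
            topic = "customer.updated.v1",
            partition = 0,
            offset = 0L,
            timestamp = 0L
        )
    )
    println(handler.seenIds) // [42]
}
```
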
The core parsing logic is stable and handles the structural validation of AsyncAPI documents.
- AsyncAPI YAML Support: Reads and parses YAML format.
- AsyncAPI JSON Support: Reads and parses JSON format.
- Context-Aware Error Handling: Provides precise error messages with line numbers and JSON paths.
- Reference Resolution: Supports internal and external file references.
- Components: Full parsing support for Schemas, Messages, Channels, Parameters, etc.
- YAML Schema: Fully supported (default format).
- Multi-Format Schemas:
  - JSON Schema: Support for parsing JSON schemas defined in AsyncAPI documents.
  - Avro Schema: Support for parsing Avro schemas defined in AsyncAPI documents.
  - Protobuf Schema: Support for parsing Protobuf schemas defined in AsyncAPI documents.
  - RAML Schema: Support for parsing RAML schemas defined in AsyncAPI documents.
- Structural Validation: Ensures the AsyncAPI document adheres to the AsyncAPI specification.
- Context-Aware Error Handling: Provides precise error messages with line numbers and context.
- Warnings: Provides warnings for best practices and potential issues.
- Formatted Warnings: Enhanced warning messages with suggestions for improvement.
- Stable Release: Move from BETA to stable release with comprehensive testing.
- CLI Tool: Publish the already made CLI module to package managers like brew and dnf.
- Documentation: Complete documentation with examples and guides.
- Additional Generators: Expand support for more programming languages and frameworks, e.g., Quarkus Kafka.
- Enhanced Schema Support: Full support for multi-format schemas including Avro and Protobuf.
- Serialization: Consider using kotlinx-serialization for writing bundled schemas to files.