
asyncapi-generator

The asyncapi-generator is an open-source tool for generating code and schemas from AsyncAPI YAML specifications.

The project is currently in BETA.

Supported generators

  • Kotlin - Data classes with Jakarta Validation annotations (see the sketch after this list)
  • Java - POJOs with Jakarta Validation annotations
  • Spring Kafka - Producer and Consumer templates for both Kotlin and Java
  • Avro - Schema generation from AsyncAPI schemas
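
For illustration, a generated Kotlin data class might look like the following sketch (the payload name and its fields are hypothetical, not taken from a real specification):

import jakarta.validation.constraints.NotNull
import jakarta.validation.constraints.Size

// Hypothetical example of a generated data class with Jakarta Validation annotations
data class CustomerReadPayload(
    @field:NotNull
    val customerId: String,

    @field:Size(max = 100)
    val email: String? = null
)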

The current documentation is still a draft; it can be found in the docs/ folder at the repository root.

Usage

Currently, the asyncapi-generator BETA version is available as a Maven plugin through Maven Central.

Example usage in your pom.xml:

<plugin>
    <groupId>dev.banking.asyncapi.generator</groupId>
    <artifactId>asyncapi-generator-maven-plugin</artifactId>
    <version>0.0.1</version> <!-- current BETA version -->
    <executions>
        <execution>
            <id>generate-example</id>
            <phase>generate-sources</phase>
            <goals>
                <goal>generate</goal>
            </goals>
            <configuration>
                <generatorName>kotlin</generatorName> <!-- options: kotlin, java - default kotlin -->
                <inputFile>path/to/my/asyncapi_specification.yaml</inputFile>
                <modelPackage>my.package.path.model</modelPackage> <!-- package name for data class/POJO -->
                <clientPackage>my.package.path.client</clientPackage> <!-- package name for kafka client - default modelPackage -->
                <schemaPackage>my.package.path.schema</schemaPackage> <!-- package name for avro schema - default modelPackage -->
                <configuration>
                    <generateModels>true</generateModels> <!-- can skip models - default is true -->
                    <generateSpringKafkaClient>true</generateSpringKafkaClient> <!-- default is false -->
                    <generateAvroSchema>true</generateAvroSchema> <!-- default is false -->
                </configuration>
            </configuration>
        </execution>
    </executions>
</plugin>

Spring Kafka Clients

For the Spring Kafka clients, we generate the Producer and/or Consumer depending on the operation defined for the event message in the specification. If no operation is specified, we generate both.

Producer

A Spring Kafka producer is named after the channel, with 'Producer' as a suffix.

Example:

channels:
  customerUpdated:
    address: customer.updated.v1
    messages:
      customerRead:
        $ref: '#/components/messages/customerReadPayload'
    description: My description

This YAML will generate a CustomerUpdatedProducer class that accepts a customerReadPayload message as payload. It can look something like:

import org.apache.kafka.clients.producer.ProducerRecord
import org.slf4j.LoggerFactory
import org.springframework.kafka.core.KafkaTemplate
import org.springframework.stereotype.Component

/**
 * Producer for topic 'customer.updated.v1'
 */
@Component
class CustomerUpdatedProducer(
    private val kafkaTemplate: KafkaTemplate<String, Any>
) {
    private val log = LoggerFactory.getLogger(CustomerUpdatedProducer::class.java)

    fun sendCustomerReadPayload(key: String, message: CustomerReadPayload) {
        log.info("Sending CustomerReadPayload to topic=customer.updated.v1, key={}", key)
        kafkaTemplate.send(ProducerRecord("customer.updated.v1", key, message))
    }
}

This class can then be injected into an application service class and used.
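
For example, a service could inject the generated producer and use it like this (the service class and its method are illustrative, not generated code):

import org.springframework.stereotype.Service

@Service
class CustomerUpdateService(
    private val customerUpdatedProducer: CustomerUpdatedProducer
) {
    fun publishCustomerRead(customerId: String, payload: CustomerReadPayload) {
        // Delegate to the generated producer; the customer id is used as the record key
        customerUpdatedProducer.sendCustomerReadPayload(customerId, payload)
    }
}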

Consumer

A Spring Kafka consumer is a bit different. Here, we generate a KafkaListener that the application does not have to 'run' anywhere; it simply picks up event messages from the specified topic. For the consumer, we generate two entities:

  • An interface with the suffix 'Handler'
  • A class that injects said 'Handler' interface and has a 'listen' method

Example:

channels:
  customerContactInfoUpdate:
    address: customer.contact-info-update.v1
    # This channel handles two different message types
    messages:
      updateEmail:
        $ref: '#/components/messages/updateEmailPayload'
      updatePhoneNumber:
        $ref: '#/components/messages/updatePhoneNumberPayload'
    description: My description

This YAML will generate an interface CustomerContactInfoUpdateHandler and a class CustomerContactInfoUpdateListener, which will look something like:

/**
 * Handler for messages on topic 'customer.contact-info-update.v1'
 */
interface CustomerContactInfoUpdateHandler {
    fun onUpdateEmailPayload(message: KafkaMessage<UpdateEmailPayload>)
    fun onUpdatePhoneNumberPayload(message: KafkaMessage<UpdatePhoneNumberPayload>)
}

and

import org.apache.kafka.clients.consumer.ConsumerRecord
import org.slf4j.LoggerFactory
import org.springframework.boot.autoconfigure.condition.ConditionalOnBean
import org.springframework.kafka.annotation.KafkaListener
import org.springframework.stereotype.Component
// imports for KafkaMessage and the generated payload classes depend on the configured packages

/**
 * Spring Kafka Listener for topic 'customer.contact-info-update.v1'
 */
@Component
@ConditionalOnBean(CustomerContactInfoUpdateHandler::class)
class CustomerContactInfoUpdateListener(
    private val handler: CustomerContactInfoUpdateHandler
) {

    private val log = LoggerFactory.getLogger(CustomerContactInfoUpdateListener::class.java)

    @KafkaListener(topics = ["customer.contact-info-update.v1"], groupId = "\${spring.kafka.consumer.group-id}")
    fun listen(record: ConsumerRecord<String, Any>) {
        val payload = record.value()

        // Convert headers to a Map<String, String>
        val headers = record.headers().associate { it.key() to String(it.value()) }

        when (payload) {
            is UpdateEmailPayload -> {
                log.debug("Dispatching UpdateEmailPayload from topic customer.contact-info-update.v1")

                // Wrap in KafkaMessage
                val message = KafkaMessage(
                    payload = payload,
                    key = record.key(),
                    headers = headers,
                    topic = record.topic(),
                    partition = record.partition(),
                    offset = record.offset(),
                    timestamp = record.timestamp()
                )

                handler.onUpdateEmailPayload(message)
            }
            is UpdatePhoneNumberPayload -> {
                log.debug("Dispatching UpdatePhoneNumberPayload from topic customer.contact-info-update.v1")

                // Wrap in KafkaMessage
                val message = KafkaMessage(
                    payload = payload,
                    key = record.key(),
                    headers = headers,
                    topic = record.topic(),
                    partition = record.partition(),
                    offset = record.offset(),
                    timestamp = record.timestamp()
                )

                handler.onUpdatePhoneNumberPayload(message)
            }
            else -> {
                log.warn("Received unknown message type on topic customer.contact-info-update.v1: {}", payload::class.java.name)
            }
        }
    }
}
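
The KafkaMessage wrapper used above is also part of the generated code. Judging from the fields set in the listener, it is roughly the following data class (a sketch inferred from the listener, not the exact generated source):

// Sketch of the KafkaMessage wrapper, inferred from the fields used in the listener above
data class KafkaMessage<T>(
    val payload: T,
    val key: String?,                 // record.key() can be null
    val headers: Map<String, String>,
    val topic: String,
    val partition: Int,
    val offset: Long,
    val timestamp: Long
)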

This is used by implementing the 'Handler' interface in an application service that does something with the received message.

Example:

import org.slf4j.LoggerFactory
import org.springframework.stereotype.Service

@Service
class CustomerContactInfoUpdateService : CustomerContactInfoUpdateHandler {

    private val log = LoggerFactory.getLogger(CustomerContactInfoUpdateService::class.java)

    override fun onUpdateEmailPayload(message: KafkaMessage<UpdateEmailPayload>) {
        log.info("Received email update message: {}", message)
    }

    override fun onUpdatePhoneNumberPayload(message: KafkaMessage<UpdatePhoneNumberPayload>) {
        log.info("Received phone number update message: {}", message)
    }
}

One of the reasons for this design is that we needed a way to 'activate' the listener. There can be use-cases where you inherit generated code from an AsyncAPI specification but are only interested in parts of it. If we generated the KafkaListener directly, it could create false positives and misleading metrics, because the application would be receiving messages without doing anything with them. By having this 'implement interface' step, the listener is only registered when a 'Handler' bean exists (via @ConditionalOnBean), and we avoid this.

Features supported

Parser

The core parsing logic is stable and handles the structural validation of AsyncAPI documents.

  • AsyncAPI YAML Support: Reads and parses YAML format.
  • AsyncAPI JSON Support: Reads and parses JSON format.
  • Context-Aware Error Handling: Provides precise error messages with line numbers and JSON paths.
  • Reference Resolution: Supports internal and external file references.
  • Components: Full parsing support for Schemas, Messages, Channels, Parameters, etc.

Schema Formats

  • YAML Schema: Fully supported (default format).
  • Multi-Format Schemas:
    • JSON Schema: Support for parsing JSON schemas defined in AsyncAPI documents.
    • Avro Schema: Support for parsing Avro schemas defined in AsyncAPI documents.
    • Protobuf Schema: Support for parsing Protobuf schemas defined in AsyncAPI documents.
    • RAML Schema: Support for parsing RAML schemas defined in AsyncAPI documents.

Validator

  • Structural Validation: Ensures the AsyncAPI document adheres to the AsyncAPI specification.
  • Context-Aware Error Handling: Provides precise error messages with line numbers and context.
  • Warnings: Provides warnings for best practices and potential issues.
  • Formatted Warnings: Enhanced warning messages with suggestions for improvement.

Future plans

  • Stable Release: Move from BETA to stable release with comprehensive testing.
  • CLI Tool: Publish the existing CLI module to package managers like brew and dnf.
  • Documentation: Complete documentation with examples and guides.
  • Additional Generators: Expand support for more programming languages and frameworks, e.g., Quarkus Kafka.
  • Enhanced Schema Support: Full support for multi-format schemas including Avro and Protobuf.
  • Serialization: Consider using kotlinx-serialization for writing bundled schemas to files.
