This module adds ability to work with Kafka via declarative approach or directly using MaaS API.
<dependency>
<groupId>com.netcracker.cloud.maas.declarative</groupId>
<artifactId>maas-declarative-kafka-client-spring</artifactId>
</dependency>- Add config properties to application.yml (All the config props described below)
maas:
kafka:
client:
consumer:
your_consumer_name:
topic:
actual-name: "actual_kafka_topic_name" # Optional
name: your_topic_name
template: "maas_template" # Optional
managedby: maas or self
namespace: namespace_name
on-topic-exists: merge or fail # Optional
is-tenant: true or false
pool-duration: 1000 # Overrides common pool duration
instance-count: 2 # used for concurrent handling
dedicated-thread-pool-size: 5
group:
id: your_group_id
# default kafka client properties
kafka-consumer:
property:
key:
deserializer: org.apache.kafka.common.serialization.StringDeserializer
value:
deserializer: org.apache.kafka.common.serialization.StringDeserializer
...
producer:
your_producer_name:
topic:
actual-name: "actual_kafka_topic_name" # Optional
name: your_topic_name
template: "maas_template" # Optional
managedby: maas or self
namespace: namespace_name
on-topic-exists: merge or fail # Optional
is-tenant: true or false
# default kafka client properties
kafka-producer:
property:
key:
serializer: org.apache.kafka.common.serialization.StringSerializer
value:
serializer: org.apache.kafka.common.serialization.StringSerializer
... - Configure, initialize and activate clients (consumer/producer)
2.1 Configure clients with serializers/deserializers described in application.yml
@Configuration
public class SomeConfigClass {
@Autowired
MaasKafkaClientFactory clientFactory;
@Bean
MaasKafkaProducer maasKafkaProducer() {
// Get producer according to config described above
MaasKafkaProducerCreationRequest producerCreationRequest =
MaasKafkaProducerCreationRequest.builder()
.setProducerDefinition(clientFactory.getProducerDefinition("your_producer_name"))
.build();
MaasKafkaProducer producer = clientFactory.createProducer(producerCreationRequest);
producer.initAsync()
.handle((v, e) -> {
if (e != null) {
// Handle error
} else {
// Post initialization logic
// for example let's activate producer
producer.activateAsync()
.handle((vd, ex) -> {
if (ex != null) {
// Handle error
} else {
// Post activation logic
}
return null;
});
}
return null;
});
return producer;
}
@Bean
MaasKafkaConsumer maasKafkaConsumer(final MaasKafkaProducer producer) {
Consumer<ConsumerRecord<String, String>> recordConsumer = record -> {
// Some logic
ProducerRecord<String, String> someRecord = new ProducerRecord(
null,
// any key,
// any data,
null,
null
);
producer.sendSync(someRecord); // Sending as example
// Some logic
};
// Optionally you can create custom errorhandler
ConsumerErrorHandler errorHandler = (exception, errorRecord, handledRecords, consumer) -> {
// Custom error handling logic
};
MaasKafkaConsumerCreationRequest consumerCreationRequest =
MaasKafkaConsumerCreationRequest.builder()
.setConsumerDefinition(clientFactory.getConsumerDefinition("your_consumer_name"))
.setErrorHandler(errorHandler) // If no set used default error handler
.setHandler(recordConsumer)
.build();
MaasKafkaConsumer consumer = clientFactory.createConsumer(consumerCreationRequest);
consumer.initAsync()
.handle((v, e) -> {
if (e != null) {
// Handle error
} else {
// Post initialization logic
// for example let's activate consumer
consumer.activateAsync()
.handle((vd, ex) -> {
if (ex != null) {
// Handle error
} else {
// Post activation logic
}
return null;
});
}
return null;
});
return consumer;
}
}2.2 Configure clients with serializers/deserializers as beans
If you fill serializers/deserializers in application.yml and still would like to use beans, in this case beans will be used instead (You can fill only value serializer/deserializer as bean and key serializer/deserializer set in application.yml or vice versa)
@Configuration
public class SomeConfigClass {
@Autowired
MaasKafkaClientFactory clientFactory;
@Autowired
@Qualifier(value = "keyDeserializer")
Deserializer<String> keyDeserializer;
@Autowired
@Qualifier(value = "valueDeserializer")
Deserializer<String> valueDeserializer;
@Autowired
@Qualifier(value = "keySerializer")
Serializer<String> keySerializer;
@Autowired
@Qualifier(value = "valueSerializer")
Serializer<String> valueSerializer;
@Bean
MaasKafkaProducer maasKafkaProducer() {
// Get producer according to config described above
MaasKafkaProducerCreationRequest producerCreationRequest =
MaasKafkaProducerCreationRequest.builder()
.setProducerDefinition(clientFactory.getProducerDefinition("your_producer_name"))
.setValueSerializer(valueSerializer)
.setKeySerializer(keySerializer)
.build();
MaasKafkaProducer producer = clientFactory.createProducer(producerCreationRequest);
producer.initAsync()
.handle((v, e) -> {
if (e != null) {
// Handle error
} else {
// Post initialization logic
// for example let's activate producer
producer.activateAsync()
.handle((vd, ex) -> {
if (ex != null) {
// Handle error
} else {
// Post activation logic
}
return null;
});
}
return null;
});
return producer;
}
@Bean
MaasKafkaConsumer maasKafkaConsumer(final MaasKafkaProducer producer) {
Consumer<ConsumerRecord<String, String>> recordConsumer = record -> {
// Some logic
ProducerRecord<String, String> someRecord = new ProducerRecord(
null,
// any key,
// any data,
null,
null
);
producer.sendSync(someRecord); // Sending as example
// Some logic
};
MaasKafkaConsumerCreationRequest consumerCreationRequest =
MaasKafkaConsumerCreationRequest.builder()
.setConsumerDefinition(clientFactory.getConsumerDefinition("your_consumer_name"))
.setHandler(recordConsumer)
.setValueDeserializer(valueDeserializer)
.setKeyDeserializer(keyDeserializer)
.build();
MaasKafkaConsumer consumer = clientFactory.createConsumer(consumerCreationRequest);
consumer.initAsync()
.handle((v, e) -> {
if (e != null) {
// Handle error
} else {
// Post initialization logic
// for example let's activate consumer
consumer.activateAsync()
.handle((vd, ex) -> {
if (ex != null) {
// Handle error
} else {
// Post activation logic
}
return null;
});
}
return null;
});
return consumer;
}
}- (Optional) External activation after initialization, activation this way move client from INITIALIZED state to ACTIVE, if this method will be called before client has been initialized, it will be waited until client became INITIALIZED ( more info described in javadocs)
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.context.event.ApplicationReadyEvent;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.event.EventListener;
@Configuration
public class SomeConfig {
@Autowired
ClientStateManagerService clientStateManagerService;
@EventListener(ApplicationReadyEvent.class)
public void activateKafkaClients() {
clientStateManagerService.activateInitializedClients(null);
}
}The approach above fits only if the client is produced as a MaasKafkaClient instance bean. Collecting clients from a Map or a Collection of MaasKafkaClient instances is not supported.
Also, you can initialize and activate client out of configuration
Also, you can create consumer with setting 'CustomProcessed' = true. If you create the client this way, both the event management process and multitenancy will be disabled
Instead of step 1 from 3.1, there is a way to configure clients at runtime only
All contracts(Mandatory/Optional property) described for application.yaml config implemented in builders
- Create client definition using builders
@Configuration
public class SomeConfigClass {
@Autowired
MaasKafkaClientFactory clientFactory;
// Create kafka producer
@Bean
MaasKafkaProducer maasKafkaProducer() {
// Create topic definition
TopicDefinition testTopicDefinition = TopicDefinition.builder()
.setName("test_topic_name")
.setNamespace("test_namespace")
.setManagedBy(ManagedBy.SELF)
.build();
// Create producer definition
MaasKafkaProducerDefinition producerDefinition = MaasKafkaProducerDefinition.builder()
.setTopic(testTopicDefinition)
.setTenant(true)
.build();
// Create producer creation request
MaasKafkaProducerCreationRequest producerCreationRequest =
MaasKafkaProducerCreationRequest.builder()
.setProducerDefinition(producerDefinition)
.build();
MaasKafkaProducer producer = clientFactory.createProducer(producerCreationRequest);
producer.initAsync()
.handle((v, e) -> {
if (e != null) {
// Handle error
} else {
// Post initialization logic
// for example let's activate producer
producer.activateAsync()
.handle((vd, ex) -> {
if (ex != null) {
// Handle error
} else {
// Post activation logic
}
return null;
});
}
return null;
});
return producer;
}
// Create kafka consumer
@Bean
MaasKafkaConsumer maasKafkaConsumer(final MaasKafkaProducer producer) {
Consumer<ConsumerRecord<String, String>> recordConsumer = record -> {
// Some logic
ProducerRecord<String, String> someRecord = new ProducerRecord(
null,
// any key,
// any data,
null,
null
);
producer.sendSync(someRecord); // Sending as example
// Some logic
};
// Create topic definition
TopicDefinition testTopicDefinition = TopicDefinition.builder()
.setName("test_topic_name")
.setNamespace("test_namespace")
.setManagedBy(ManagedBy.SELF)
.build();
// Create consumer definition
MaasKafkaConsumerDefinition consumerDefinition = MaasKafkaConsumerDefinition.builder()
.setTopic(testTopicDefinition)
.setTenant(true)
.setInstanceCount(1)
.setPollDuration(1)
.setGroupId("test-group")
.build();
// Create consumer creation request
MaasKafkaConsumerCreationRequest consumerCreationRequest =
MaasKafkaConsumerCreationRequest.builder()
.setConsumerDefinition(consumerDefinition)
.setHandler(recordConsumer)
.build();
MaasKafkaConsumer consumer = clientFactory.createConsumer(consumerCreationRequest);
consumer.initAsync()
.handle((v, e) -> {
if (e != null) {
// Handle error
} else {
// Post initialization logic
// for example let's activate consumer
consumer.activateAsync()
.handle((vd, ex) -> {
if (ex != null) {
// Handle error
} else {
// Post activation logic
}
return null;
});
}
return null;
});
return consumer;
}
}Other steps related with activation/deactivation and Serializer/Deserializer logic are the same as in 3.1
import org.springframework.beans.factory.annotation.Autowired;
public class SomeClass {
@Autowired
ClientStateManagerService clientStateManagerService;
void onDeactivation() {
clientStateManagerService.emitClientDeactivationEvent();
}
void onActivation() {
clientStateManagerService.emitClientActivationEvent();
}
}| Name | Type | Mandatory | Default value | Description |
|---|---|---|---|---|
| maas.opentracing.kafka.enabled | Boolean | false | true | Enables opentracing in kafka |
| maas.kafka.acceptable-tenants | String | false | | List of tenants to use in a multitenant environment (dev only) |
| maas.kafka.consumer-thread-pool-size | Integer | false | 2 | The number of threads in pool for kafka consumers |
| maas.kafka.consumer-pool-duration | Integer | false | 4000 | Common consumer poll timeout in milliseconds |
| maas.agent.url | String | false | http://maas-agent:8080 | Contains maas agent url |
| quarkus.jaeger.propagation | String | false | b3 | Context propagation type |
| maas.tenant-manager.url | String | false | tenant-manager:8080 | Contains Tenant Manager url |
| Name | Type | Mandatory | Default value | Description |
|---|---|---|---|---|
| maas.kafka.client.producer.[your-producer-name].topic.name | String | true | The name of kafka topic | |
| maas.kafka.client.producer.[your-producer-name].topic.actual-name | String | false | Actual topic name in the kafka broker (This property supports placeholders: {tenant-id} - tenants resolved from Tenant Manager, and {namespace} - resolved from property maas.kafka.client.producer.[your-producer-name].topic.namespace ) | |
| maas.kafka.client.producer.[your-producer-name].topic.on-topic-exist | String | false | fail | merge or fail. Defaults to fail, which makes applying the configuration fail on a topic collision. The merge option ignores the collision and inserts the registration into the MaaS database, linking it to the existing topic |
| maas.kafka.client.producer.[your-producer-name].topic.template | String | false | The template of maas topic | |
| maas.kafka.client.producer.[your-producer-name].topic.namespace | String | true | Service namespace | |
| maas.kafka.client.producer.[your-producer-name].topic.managedby | String | true | Used to indicate that the topic should be created by the microservice in case of self. There are situations when we need to dynamically create a topic and maas can't deal with it. For example ${quarkus.application.namespace}-${cpq.agm-core-integration.kafka.agreement-entity-type}_dbCleaningNotification-{tenant-id}. In this case we will create the topic from the microservice using the MaaS API. Available values: maas/self. Additional params like replication and partitions should be added in case of self topic creation | |
| maas.kafka.client.producer.[your-producer-name].topic.configs.[any-kafka-topic-property] | String | false | Any kafka topic property (Used only if managedby is "self") | |
| maas.kafka.client.producer.[your-producer-name].is-tenant | Boolean | true | Tells the producer that all topics used by it are tenant topics | |
| maas.kafka.client.producer.[your-producer-name].key.serializer | String | false | Serializer class for key that implements the org.apache.kafka.common.serialization.Serializer interface | |
| maas.kafka.client.producer.[your-producer-name].value.serializer | String | false | Serializer class for value that implements the org.apache.kafka.common.serialization.Serializer interface | |
| maas.kafka.client.producer.[your-producer-name].kafka-producer.property.[any-kafka-producer-property] | String | false | Any kafka producer property |
| Name | Type | Mandatory | Default value | Description |
|---|---|---|---|---|
| maas.kafka.client.consumer.[your-consumer-name].topic.name | String | true | The name of kafka topic | |
| maas.kafka.client.consumer.[your-consumer-name].topic.actual-name | String | false | Actual topic name in the kafka broker (This property supports placeholders: {tenant-id} - tenants resolved from Tenant Manager, and {namespace} - resolved from property maas.kafka.client.consumer.[your-consumer-name].topic.namespace ) | |
| maas.kafka.client.consumer.[your-consumer-name].topic.on-topic-exist | String | false | fail | merge or fail. Defaults to fail, which makes applying the configuration fail on a topic collision. The merge option ignores the collision and inserts the registration into the MaaS database, linking it to the existing topic |
| maas.kafka.client.consumer.[your-consumer-name].topic.template | String | false | The template of maas topic | |
| maas.kafka.client.consumer.[your-consumer-name].topic.namespace | String | true | Service namespace | |
| maas.kafka.client.consumer.[your-consumer-name].topic.managedby | String | true | Used to indicate that the topic should be created by the microservice in case of self. There are situations when we need to dynamically create a topic and maas can't deal with it. For example ${quarkus.application.namespace}-${cpq.agm-core-integration.kafka.agreement-entity-type}_dbCleaningNotification-{tenant-id}. In this case we will create the topic from the microservice using the MaaS API. Available values: maas/self. Additional params like replication and partitions should be added in case of self topic creation | |
| maas.kafka.client.consumer.[your-consumer-name].topic.configs.[any-kafka-topic-property] | String | false | Any kafka topic property (Used only if managedby is "self") | |
| maas.kafka.client.consumer.[your-consumer-name].is-tenant | Boolean | true | Tells the consumer that all topics used by it are tenant topics | |
| maas.kafka.client.consumer.[your-consumer-name].key.deserializer | String | false | Deserializer class for key that implements the org.apache.kafka.common.serialization.Deserializer interface | |
| maas.kafka.client.consumer.[your-consumer-name].value.deserializer | String | false | Deserializer class for value that implements the org.apache.kafka.common.serialization.Deserializer interface | |
| maas.kafka.client.consumer.[your-consumer-name].group.id | String | true | A unique string that identifies the consumer group this consumer belongs to. Can contain {{tenantId}} placeholder | |
| maas.kafka.client.consumer.[your-consumer-name].kafka-consumer.property.[any-kafka-consumer-property] | String | false | Any kafka consumer property | |
| maas.kafka.client.consumer.[your-consumer-name].dedicated-thread-pool-size | Integer | false | If defined - a separate thread pool will be created with defined number of threads |
In local development all cloud dependent implementations are replaced. However, if you want to use your custom or cloud implementations you can do what described below:
You can enable MaaS implementations in the development/test profile by setting maas.kafka.maas.enabled = true and maas.agent.url = &lt;your MaaS agent URL&gt;
You can enable local working with Tenant manager by providing bean of type TenantServiceProvider
with order = 0 (max priority)
You can provide your own credentials for topics by providing bean of class
CredExtractorProvider with order = 0 (max priority)
Multitenancy is supported by setting is-tenant=true in client definition. If at least one client is multitenant
you should not set maas.tenant.default-id to any value in your application. If after service start you see that
lib can't find topic, check that log about default tenant id looks like
[class=c.n.cloud.maas.kafka.tenant.impl.ActiveTenantInfoServiceImpl] [method=getActiveTenantIds] Default tenant id:
Blue Green functionality requires monitoring of the BG state from Consul. This monitoring is done via the BlueGreenStatePublisher bean. This bean is provided automatically by the dependency:
<dependency>
<groupId>com.netcracker.cloud</groupId>
<artifactId>blue-green-state-monitor-spring</artifactId>
</dependency>See the documentation of blue-green-state-monitor-spring for details on how to configure BlueGreenStatePublisher.