chore(next): Update starters and extract common reusable code to an additional module #119

Merged
merged 3 commits on Aug 27, 2024

1 change: 1 addition & 0 deletions async/async-commons/async-commons.gradle
@@ -9,6 +9,7 @@ dependencies {

compileOnly 'io.projectreactor:reactor-core'
api 'com.fasterxml.jackson.core:jackson-databind'
api 'com.fasterxml.jackson.datatype:jackson-datatype-jsr310'
implementation 'commons-io:commons-io:2.16.1'
implementation 'io.cloudevents:cloudevents-json-jackson:4.0.1'

@@ -1,24 +1,24 @@
package org.reactivecommons.async.rabbit.config;
package org.reactivecommons.async.commons;

import lombok.AccessLevel;
import lombok.NoArgsConstructor;
import lombok.extern.log4j.Log4j2;
import lombok.extern.java.Log;
import org.reactivecommons.async.api.DefaultCommandHandler;
import org.reactivecommons.async.api.HandlerRegistry;
import org.reactivecommons.async.api.handlers.registered.RegisteredCommandHandler;
import org.reactivecommons.async.api.handlers.registered.RegisteredEventListener;
import org.reactivecommons.async.api.handlers.registered.RegisteredQueryHandler;
import org.reactivecommons.async.commons.HandlerResolver;

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.logging.Level;
import java.util.stream.Stream;

import static org.reactivecommons.async.api.HandlerRegistry.DEFAULT_DOMAIN;

@NoArgsConstructor(access = AccessLevel.PRIVATE)
@Log4j2
@Log
public class HandlerResolverBuilder {

public static HandlerResolver buildResolver(String domain,
@@ -81,7 +81,7 @@ public <T, D> RegisteredCommandHandler<T, D> getCommandHandler(String path) {
if (r.getDomainEventListeners().containsKey(domain)) {
return Stream.concat(r.getDomainEventListeners().get(domain).stream(), getDynamics(domain, r));
}
log.warn("Domain " + domain + "does not have a connection defined in your configuration and you want to listen from it");
log.log(Level.WARNING, "Domain " + domain + " does not have a connection defined in your configuration, but you are trying to listen to events from it");
return Stream.empty();
})
.collect(ConcurrentHashMap::new, (map, handler) -> map.put(handler.getPath(), handler),
@@ -102,7 +102,7 @@ public <T, D> RegisteredCommandHandler<T, D> getCommandHandler(String path) {
if (r.getDomainEventListeners().containsKey(domain)) {
return r.getDomainEventListeners().get(domain).stream();
}
log.warn("Domain " + domain + "does not have a connection defined in your configuration and you want to listen from it");
log.log(Level.WARNING, "Domain " + domain + " does not have a connection defined in your configuration, but you are trying to listen to events from it");
return Stream.empty();
})
.collect(ConcurrentHashMap::new, (map, handler) -> map.put(handler.getPath(), handler),
@@ -2,6 +2,7 @@

import com.fasterxml.jackson.databind.DeserializationFeature;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule;
import io.cloudevents.jackson.JsonFormat;

public class DefaultObjectMapperSupplier implements ObjectMapperSupplier {
@@ -11,7 +12,8 @@ public ObjectMapper get() {
final ObjectMapper objectMapper = new ObjectMapper();
objectMapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
objectMapper.findAndRegisterModules();
objectMapper.registerModule(JsonFormat.getCloudEventJacksonModule()); // TODO: Review if this is necessary
objectMapper.registerModule(new JavaTimeModule());
objectMapper.registerModule(JsonFormat.getCloudEventJacksonModule());
return objectMapper;
}

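Registering `JavaTimeModule` (backed by the new `jackson-datatype-jsr310` dependency above) is what lets message payloads carry `java.time` fields. A minimal sketch of the effect, not part of the PR, using a made-up `OrderPlaced` payload and a mapper configured the same way the supplier now configures it:

```java
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule;

import java.time.Instant;

// Sketch only: OrderPlaced is a hypothetical payload type; the mapper below mirrors
// the JavaTimeModule registration added in DefaultObjectMapperSupplier.
public class JavaTimeSerializationExample {

    record OrderPlaced(String orderId, Instant occurredAt) {}

    public static void main(String[] args) throws Exception {
        ObjectMapper mapper = new ObjectMapper().registerModule(new JavaTimeModule());

        // Without jackson-datatype-jsr310, writing an Instant fails with
        // InvalidDefinitionException; with the module registered it round-trips cleanly.
        String json = mapper.writeValueAsString(
                new OrderPlaced("42", Instant.parse("2024-08-27T00:00:00Z")));
        OrderPlaced back = mapper.readValue(json, OrderPlaced.class);
        System.out.println(json + " -> " + back);
    }
}
```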
@@ -1,30 +1,33 @@
package org.reactivecommons.async.kafka.communications;

import lombok.AllArgsConstructor;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import reactor.core.publisher.Flux;
import reactor.kafka.receiver.KafkaReceiver;
import reactor.kafka.receiver.ReceiverOptions;
import reactor.kafka.receiver.ReceiverRecord;

import java.util.List;

import static org.apache.kafka.clients.consumer.ConsumerConfig.DEFAULT_MAX_POLL_RECORDS;
import static org.apache.kafka.clients.consumer.ConsumerConfig.GROUP_ID_CONFIG;
import static org.apache.kafka.clients.consumer.ConsumerConfig.MAX_POLL_RECORDS_CONFIG;


@AllArgsConstructor
public class ReactiveMessageListener {
private final ReceiverOptions<String, byte[]> receiverOptions;

public Flux<ReceiverRecord<String, byte[]>> listen(String groupId, List<String> topics) { // Notification events
ReceiverOptions<String, byte[]> options = receiverOptions.consumerProperty(ConsumerConfig.GROUP_ID_CONFIG, groupId);
ReceiverOptions<String, byte[]> options = receiverOptions.consumerProperty(GROUP_ID_CONFIG, groupId);
return KafkaReceiver.create(options.subscription(topics))
.receive();
}

public int getMaxConcurrency() {
Object property = receiverOptions.consumerProperty(ConsumerConfig.MAX_POLL_RECORDS_CONFIG);
Object property = receiverOptions.consumerProperty(MAX_POLL_RECORDS_CONFIG);
if (property instanceof Integer) {
return (int) property;
}
return ConsumerConfig.DEFAULT_MAX_POLL_RECORDS;
return DEFAULT_MAX_POLL_RECORDS;
}
}
@@ -19,15 +19,20 @@ public class TopologyCreator {
private final AdminClient adminClient;
private final KafkaCustomizations customizations;
private final Map<String, Boolean> existingTopics;
private final boolean checkTopics;

public TopologyCreator(AdminClient adminClient, KafkaCustomizations customizations) {
public TopologyCreator(AdminClient adminClient, KafkaCustomizations customizations, boolean checkTopics) {
this.adminClient = adminClient;
this.customizations = customizations;
this.checkTopics = checkTopics;
this.existingTopics = getTopics();
}

@SneakyThrows
public Map<String, Boolean> getTopics() {
if (!checkTopics) {
return Map.of();
}
ListTopicsResult topics = adminClient.listTopics(new ListTopicsOptions().timeoutMs(TIMEOUT_MS));
return topics.names().get().stream().collect(Collectors.toConcurrentMap(name -> name, name -> true));
}
@@ -68,7 +73,7 @@ protected NewTopic toNewTopic(TopicCustomization customization) {
}

public void checkTopic(String topicName) {
if (!existingTopics.containsKey(topicName)) {
if (checkTopics && !existingTopics.containsKey(topicName)) {
throw new TopicNotFoundException("Topic not found: " + topicName + ". Please create it before sending a message.");
// TODO: should refresh topics?? getTopics();
}
@@ -44,7 +44,7 @@ public ApplicationNotificationsListener(ReactiveMessageListener receiver,

@Override
protected Function<Message, Mono<Object>> rawMessageHandler(String executorPath) {
final RegisteredEventListener<Object, Object> handler = resolver.getEventListener(executorPath);
final RegisteredEventListener<Object, Object> handler = resolver.getNotificationListener(executorPath);

Function<Message, Object> converter = resolveConverter(handler);
final EventExecutor<Object> executor = new EventExecutor<>(handler.getHandler(), converter);
@@ -58,7 +58,7 @@ void shouldCreateTopics() {
create.complete(null);
doReturn(create).when(createTopicsResult).all();
when(adminClient.createTopics(any())).thenReturn(createTopicsResult);
creator = new TopologyCreator(adminClient, customizations);
creator = new TopologyCreator(adminClient, customizations, true);
// Act
Mono<Void> flow = creator.createTopics(List.of("topic1", "topic2"));
// Assert
@@ -73,7 +73,7 @@ void shouldCheckTopics() {
names.complete(Set.of("topic1", "topic2"));
doReturn(names).when(listTopicsResult).names();
when(adminClient.listTopics(any(ListTopicsOptions.class))).thenReturn(listTopicsResult);
creator = new TopologyCreator(adminClient, customizations);
creator = new TopologyCreator(adminClient, customizations, true);
// Act
creator.checkTopic("topic1");
// Assert
@@ -87,7 +87,7 @@ void shouldFailWhenCheckTopics() {
names.complete(Set.of("topic1", "topic2"));
doReturn(names).when(listTopicsResult).names();
when(adminClient.listTopics(any(ListTopicsOptions.class))).thenReturn(listTopicsResult);
creator = new TopologyCreator(adminClient, customizations);
creator = new TopologyCreator(adminClient, customizations, true);
// Assert
assertThrows(TopicNotFoundException.class, () ->
// Act
@@ -2,8 +2,6 @@

import com.rabbitmq.client.AMQP;
import io.cloudevents.CloudEvent;
import io.cloudevents.core.provider.EventFormatProvider;
import io.cloudevents.jackson.JsonFormat;
import lombok.extern.java.Log;
import org.reactivecommons.async.api.handlers.registered.RegisteredQueryHandler;
import org.reactivecommons.async.commons.DiscardNotifier;
@@ -34,7 +32,6 @@
import static org.reactivecommons.async.commons.Headers.SERVED_QUERY_ID;

@Log
//TODO: Organize type inference the same way as for commands and events
public class ApplicationQueryListener extends GenericMessageListener {
private final MessageConverter converter;
private final HandlerResolver handlerResolver;
@@ -14,7 +14,7 @@
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.ArgumentMatchers.longThat;
import static org.mockito.Mockito.verify;
import static reactor.core.publisher.Mono.*;
import static reactor.core.publisher.Mono.error;

@ExtendWith(MockitoExtension.class)
public class ApplicationCommandListenerTest extends ListenerReporterTestSuperClass{
@@ -34,7 +34,7 @@ void shouldSendErrorMetricToCustomErrorReporter() throws InterruptedException {
final HandlerRegistry registry = HandlerRegistry.register()
.handleCommand("app.command.test", m -> error(new RuntimeException("testEx")), DummyMessage.class);
assertSendErrorToCustomReporter(registry, createSource(Command::getName, command));
verify(errorReporter).reportMetric(eq("command"), eq("app.command.test"), longThat(time -> time > 0 ), eq(false));
verify(errorReporter).reportMetric(eq("command"), eq("app.command.test"), longThat(time -> time >= 0 ), eq(false));
}

@Test
144 changes: 142 additions & 2 deletions docs/docs/reactive-commons/1-getting-started.md
@@ -2,12 +2,13 @@
sidebar_position: 1
---

# Getting Started

import Tabs from '@theme/Tabs';
import TabItem from '@theme/TabItem';

<Tabs>
<TabItem value="rabbitmq" label="RabbitMQ" default>
# Getting Started

This quick start tutorial sets up a single node RabbitMQ and runs the sample reactive sender and consumer using Reactive
Commons.
@@ -119,7 +120,146 @@ If you want to use it, you should read the [Creating a CloudEvent guide](11-creating-a-cloud-event.md)

</TabItem>
<TabItem value="kafka" label="Kafka">
Comming soon...
This quick start tutorial sets up a single node Kafka and runs the sample reactive sender and consumer using Reactive
Commons.

## Requirements

You need Java JRE installed (Java 17 or later).

## Start Kafka

Start a Kafka broker on your local machine with all the defaults (e.g. port is 9092).

### Containerized

You can run it with Docker or Podman.

The following Docker Compose file includes a Kafka broker, a ZooKeeper instance, and a Kafka UI.

docker-compose.yml
```yaml
services:
  zookeeper:
    image: confluentinc/cp-zookeeper:7.4.1
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000
    ports:
      - "2181:2181"

  kafka:
    image: confluentinc/cp-kafka:7.4.1
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:9092
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
      KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
    ports:
      - "9092:9092"
    depends_on:
      - zookeeper

  kafka-ui:
    image: provectuslabs/kafka-ui:latest
    environment:
      KAFKA_CLUSTERS_0_NAME: local
      KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS: kafka:9092
      KAFKA_CLUSTERS_0_ZOOKEEPER: zookeeper:2181
    ports:
      - "8081:8080"
    depends_on:
      - kafka
```

```shell
docker-compose up
```

You may need to add the following entry to /etc/hosts (or its equivalent):

```txt
127.0.0.1 kafka
```

To open the Kafka UI, go to `http://localhost:8081` in your browser.

## Spring Boot Application

The Spring Boot sample publishes and consumes messages with the `DomainEventBus`. This application illustrates how to
configure Reactive Commons using Kafka in a Spring Boot environment.

To build your own application using the Reactive Commons API, you need to include a dependency on Reactive Commons.

### Current version

![Maven metadata URL](https://img.shields.io/maven-metadata/v?metadataUrl=https%3A%2F%2Frepo1.maven.org%2Fmaven2%2Forg%2Freactivecommons%2Fasync-commons-rabbit-starter%2Fmaven-metadata.xml)

### Dependency

```groovy
dependencies {
implementation "org.reactivecommons:async-kafka-starter:<version>"
}
```

### Configuration properties

You also need to include the name of your app in `application.properties`; this is important because the value will be
used to identify your application in Kafka (for example, as its consumer group):

```properties
spring.application.name=MyAppName
```

Or in your `application.yaml`

```yaml
spring:
application:
name: MyAppName
```

You can set the Kafka connection properties through Spring Boot with
the [`spring.kafka.*` properties](https://docs.spring.io/spring-boot/docs/current/reference/html/application-properties.html)

```yaml
spring:
kafka:
bootstrap-servers: localhost:9092
```

You can also set it at runtime, for example from a secret, by creating the `KafkaProperties` bean yourself:

```java title="MyKafkaConfig.java"

@Configuration
public class MyKafkaConfig {

    @Bean
    @Primary
    public KafkaProperties myRCKafkaProperties() {
        KafkaProperties properties = new KafkaProperties();
        properties.setBootstrapServers(List.of("localhost:9092"));
        return properties;
    }
}
```

### Multi Broker Instances of Kafka or Multi Domain support

This enables you to listen to events from different domains (separate Kafka broker instances), as sketched below.
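As an illustration only, a multi-domain listener could look roughly like this; the domain name, event name, and payload type are hypothetical, and the exact `HandlerRegistry` method signature for domain listeners should be verified against the API:

```java
import org.reactivecommons.async.api.HandlerRegistry;
import reactor.core.publisher.Mono;

// Assumption: HandlerRegistry exposes listenDomainEvent(domain, eventName, handler, type);
// "other-domain", "budget.updated" and BudgetUpdated are made-up example names.
public class MultiDomainConfig {

    public record BudgetUpdated(String id) {}

    public HandlerRegistry handlerRegistry() {
        return HandlerRegistry.register()
                .listenDomainEvent("other-domain", "budget.updated",
                        event -> Mono.empty(), BudgetUpdated.class);
    }
}
```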

### Cloud Events

Reactive Commons includes support for the CloudEvents specification.

If you want to use it, you should read the [Creating a CloudEvent guide](11-creating-a-cloud-event.md)
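For reference, a small sketch of building a CloudEvent with the `io.cloudevents` SDK; the event type, source, and payload below are made-up examples, and the linked guide covers how to publish it with Reactive Commons:

```java
import io.cloudevents.CloudEvent;
import io.cloudevents.core.builder.CloudEventBuilder;

import java.net.URI;
import java.nio.charset.StandardCharsets;
import java.time.OffsetDateTime;
import java.util.UUID;

// Sketch only: the type, source, and JSON payload are hypothetical examples.
public class CloudEventExample {

    public static CloudEvent samplePurchaseEvent() {
        return CloudEventBuilder.v1()
                .withId(UUID.randomUUID().toString())
                .withSource(URI.create("https://my-app.example.com"))
                .withType("purchase.created")
                .withTime(OffsetDateTime.now())
                .withDataContentType("application/json")
                .withData("{\"id\":\"42\"}".getBytes(StandardCharsets.UTF_8))
                .build();
    }
}
```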

</TabItem>
</Tabs>

3 changes: 2 additions & 1 deletion docs/docs/reactive-commons/10-wildcards.md
@@ -2,12 +2,13 @@
sidebar_position: 10
---

# Wildcards

import Tabs from '@theme/Tabs';
import TabItem from '@theme/TabItem';

<Tabs>
<TabItem value="rabbitmq" label="RabbitMQ" default>
# Wildcards

You may need to listen to variable event names that share the same structure; in that case you can use the
`handleDynamicEvents` method of the `HandlerRegistry` to specify a pattern with the '*' wildcard; it does not
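A minimal sketch of registering such a dynamic handler; the `purchase.*` pattern and `PurchaseEvent` payload are made-up examples:

```java
import org.reactivecommons.async.api.HandlerRegistry;
import reactor.core.publisher.Mono;

// Sketch only: "purchase.*" and PurchaseEvent illustrate a dynamic event pattern and
// its payload; handleDynamicEvents is the HandlerRegistry method described above.
public class DynamicEventsConfig {

    public record PurchaseEvent(String id) {}

    public HandlerRegistry handlerRegistry() {
        return HandlerRegistry.register()
                .handleDynamicEvents("purchase.*", event -> Mono.empty(), PurchaseEvent.class);
    }
}
```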