Skip to content

Commit

Permalink
eventuate-tram#1: Support ActiveMQ. Basic implementation.
Browse files Browse the repository at this point in the history
  • Loading branch information
dartartem committed Apr 25, 2018
1 parent 38a3dd9 commit 223dba7
Show file tree
Hide file tree
Showing 18 changed files with 334 additions and 43 deletions.
10 changes: 10 additions & 0 deletions docker-compose-postgres-polling.yml
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,16 @@ kafka:
- KAFKA_HEAP_OPTS=-Xmx320m -Xms320m
- ZOOKEEPER_SERVERS=zookeeper:2181

activemq:
image: webcenter/activemq:5.14.3
ports:
- 8161:8161
- 61616:61616
- 5672:5672
- 61613:61613
- 1883:1883
- 61614:61614

postgres:
build: ./postgres
ports:
Expand Down
1 change: 1 addition & 0 deletions eventuate-tram-cdc-mysql-connector/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ apply plugin: PrivateModulePlugin

dependencies {
compile project(":eventuate-tram-messaging")
compile project(":eventuate-tram-producer-activemq")
compile "io.eventuate.local.java:eventuate-local-java-cdc-connector-mysql-binlog:$eventuateLocalVersion"
compile "io.eventuate.local.java:eventuate-local-java-cdc-connector-polling:$eventuateLocalVersion"
compile "io.eventuate.local.java:eventuate-local-java-cdc-connector-postgres-wal:$eventuateLocalVersion"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,15 +4,17 @@
import io.eventuate.javaclient.spring.jdbc.EventuateSchema;
import io.eventuate.local.common.*;
import io.eventuate.local.db.log.common.*;
import io.eventuate.local.java.common.broker.DataProducerFactory;
import io.eventuate.local.java.kafka.EventuateKafkaConfigurationProperties;
import io.eventuate.local.java.kafka.producer.EventuateKafkaProducer;
import io.eventuate.local.mysql.binlog.*;
import io.eventuate.local.polling.PollingCdcKafkaPublisher;
import io.eventuate.local.polling.PollingCdcDataPublisher;
import io.eventuate.local.polling.PollingCdcProcessor;
import io.eventuate.local.polling.PollingDao;
import io.eventuate.local.polling.PollingDataProvider;
import io.eventuate.local.postgres.wal.PostgresWalClient;
import io.eventuate.local.postgres.wal.PostgresWalMessageParser;
import io.eventuate.tram.data.producer.activemq.EventuateActiveMQProducer;
import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
Expand Down Expand Up @@ -78,6 +80,18 @@ public EventuateKafkaProducer eventuateKafkaProducer(EventuateKafkaConfiguration
return new EventuateKafkaProducer(eventuateKafkaConfigurationProperties.getBootstrapServers());
}

@Bean
@Profile("ActiveMQ")
public DataProducerFactory activeMQDataProducerFactory(@Value("${activemq.url}") String activeMQURL) {
return () -> new EventuateActiveMQProducer(activeMQURL);
}

@Bean
@Profile("!ActiveMQ")
public DataProducerFactory kafkaDataProducerFactory(EventuateKafkaConfigurationProperties eventuateKafkaConfigurationProperties) {
return () -> new EventuateKafkaProducer(eventuateKafkaConfigurationProperties.getBootstrapServers());
}

@Bean
public PublishingStrategy<MessageWithDestination> publishingStrategy() {
return new MessageWithDestinationPublishingStrategy();
Expand All @@ -94,10 +108,10 @@ public DebeziumBinlogOffsetKafkaStore debeziumBinlogOffsetKafkaStore(EventuateCo

@Bean
public EventTableChangesToAggregateTopicTranslator<MessageWithDestination> eventTableChangesToAggregateTopicTranslator(EventuateConfigurationProperties eventuateConfigurationProperties,
CdcKafkaPublisher<MessageWithDestination> cdcKafkaPublisher,
CdcDataPublisher<MessageWithDestination> cdcDataPublisher,
CdcProcessor<MessageWithDestination> cdcProcessor,
CuratorFramework curatorFramework) {
return new EventTableChangesToAggregateTopicTranslator<>(cdcKafkaPublisher, cdcProcessor, curatorFramework, eventuateConfigurationProperties.getLeadershipLockPath());
return new EventTableChangesToAggregateTopicTranslator<>(cdcDataPublisher, cdcProcessor, curatorFramework, eventuateConfigurationProperties.getLeadershipLockPath());
}

@Bean(destroyMethod = "close")
Expand All @@ -108,11 +122,13 @@ public CuratorFramework curatorFramework(EventuateLocalZookeperConfigurationProp

@Bean
@Profile("!EventuatePolling")
public CdcKafkaPublisher<MessageWithDestination> cdcKafkaPublisher(EventuateKafkaConfigurationProperties eventuateKafkaConfigurationProperties,
public CdcDataPublisher<MessageWithDestination> cdcDataPublisher(DataProducerFactory dataProducerFactory,
EventuateKafkaConfigurationProperties eventuateKafkaConfigurationProperties,
DatabaseOffsetKafkaStore databaseOffsetKafkaStore,
PublishingStrategy<MessageWithDestination> publishingStrategy) {

return new DbLogBasedCdcKafkaPublisher<>(databaseOffsetKafkaStore,
return new DbLogBasedCdcDataPublisher<>(dataProducerFactory,
databaseOffsetKafkaStore,
eventuateKafkaConfigurationProperties.getBootstrapServers(),
publishingStrategy);
}
Expand All @@ -139,10 +155,10 @@ public DatabaseOffsetKafkaStore databaseOffsetKafkaStore(EventuateConfigurationP

@Bean
@Profile("EventuatePolling")
public CdcKafkaPublisher<MessageWithDestination> pollingCdcKafkaPublisher(EventuateKafkaConfigurationProperties eventuateKafkaConfigurationProperties,
PublishingStrategy<MessageWithDestination> publishingStrategy) {
public CdcDataPublisher<MessageWithDestination> pollingCdcDataPublisher(DataProducerFactory dataProducerFactory,
PublishingStrategy<MessageWithDestination> publishingStrategy) {

return new PollingCdcKafkaPublisher<>(eventuateKafkaConfigurationProperties.getBootstrapServers(), publishingStrategy);
return new PollingCdcDataPublisher<>(dataProducerFactory, publishingStrategy);
}

@Bean
Expand All @@ -156,8 +172,8 @@ public CdcProcessor<MessageWithDestination> pollingCdcProcessor(EventuateConfigu
@Bean
@Profile("EventuatePolling")
public PollingDao<PollingMessageBean, MessageWithDestination, String> pollingDao(PollingDataProvider<PollingMessageBean, MessageWithDestination, String> pollingDataProvider,
DataSource dataSource,
EventuateConfigurationProperties eventuateConfigurationProperties) {
DataSource dataSource,
EventuateConfigurationProperties eventuateConfigurationProperties) {

return new PollingDao<>(pollingDataProvider,
dataSource,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ apply plugin: PrivateModulePlugin
dependencies {
compile project(":eventuate-tram-producer-jdbc")
compile project(":eventuate-tram-consumer-kafka")
compile project(":eventuate-tram-consumer-activemq")
compile project(":eventuate-tram-commands")

testCompile project(":eventuate-tram-cdc-mysql-connector-autoconfigure")
Expand Down
8 changes: 8 additions & 0 deletions eventuate-tram-consumer-activemq/build.gradle
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
apply plugin: PublicModulePlugin

dependencies {
compile project(":eventuate-tram-consumer-common")
compile "org.apache.activemq:activemq-core:5.7.0"
}


Original file line number Diff line number Diff line change
@@ -0,0 +1,145 @@
package io.eventuate.tram.consumer.activemq;

import io.eventuate.javaclient.commonimpl.JSonMapper;
import io.eventuate.tram.consumer.common.DuplicateMessageDetector;
import io.eventuate.tram.messaging.common.Message;
import io.eventuate.tram.messaging.common.MessageImpl;
import io.eventuate.tram.messaging.consumer.MessageConsumer;
import io.eventuate.tram.messaging.consumer.MessageHandler;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.transaction.support.TransactionTemplate;

import javax.jms.*;
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicBoolean;

public class MessageConsumerActiveMQImpl implements MessageConsumer {

private Logger logger = LoggerFactory.getLogger(getClass());

@Autowired
private TransactionTemplate transactionTemplate;

@Autowired
private DuplicateMessageDetector duplicateMessageDetector;

private ActiveMQConnectionFactory connectionFactory;

private Connection connection;
private Session session;
private List<javax.jms.MessageConsumer> consumers = new ArrayList<>();
private List<Future<Void>> processingFutures = new ArrayList<>();

private AtomicBoolean runFlag = new AtomicBoolean(true);

public MessageConsumerActiveMQImpl(String url) {
connectionFactory = new ActiveMQConnectionFactory(url);
try {
connection = connectionFactory.createConnection();
connection.setExceptionListener(e -> logger.error(e.getMessage(), e));
connection.start();
session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
} catch (JMSException e) {
logger.error(e.getMessage(), e);
throw new RuntimeException(e);
}
}

@Override
public void subscribe(String subscriberId, Set<String> channels, MessageHandler handler) {
try {
for (String channel : channels) {
Destination destination = session.createQueue(channel);
javax.jms.MessageConsumer consumer = session.createConsumer(destination);
consumers.add(consumer);

processingFutures.add(CompletableFuture.supplyAsync(() -> process(subscriberId, consumer, handler)));
}
} catch (JMSException e) {
logger.error(e.getMessage(), e);
throw new RuntimeException(e);
}
}

private Void process(String subscriberId, javax.jms.MessageConsumer consumer, MessageHandler handler) {
while (runFlag.get()) {
try {
javax.jms.Message message = consumer.receive(100);

if (message == null) {
continue;
}

TextMessage textMessage = (TextMessage) message;
Message tramMessage = JSonMapper.fromJson(textMessage.getText(), MessageImpl.class);

transactionTemplate.execute(ts -> {
if (duplicateMessageDetector.isDuplicate(subscriberId, tramMessage.getId())) {
logger.trace("Duplicate message {} {}", subscriberId, tramMessage.getId());
acknowledge(textMessage);
return null;
}

try {
logger.trace("Invoking handler {} {}", subscriberId, tramMessage.getId());
handler.accept(tramMessage);
logger.trace("handled message {} {}", subscriberId, tramMessage.getId());
} catch (Throwable t) {
logger.trace("Got exception {} {}", subscriberId, tramMessage.getId());
logger.trace("Got exception ", t);
} finally {
acknowledge(textMessage);
}

return null;
});

} catch (JMSException e) {
logger.error(e.getMessage(), e);
}
}

try {
consumer.close();
} catch (JMSException e) {
logger.error(e.getMessage(), e);
}

return null;
}

private void acknowledge(TextMessage textMessage) {
try {
textMessage.acknowledge();
} catch (JMSException e) {
logger.error(e.getMessage(), e);
}
}

public void close() {
runFlag.set(false);

processingFutures.forEach(f -> {
try {
f.get();
} catch (InterruptedException | ExecutionException e) {
logger.error(e.getMessage(), e);
}
});

try {
session.close();
connection.close();
} catch (JMSException e) {
logger.error(e.getMessage(), e);
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
package io.eventuate.tram.consumer.activemq;

import io.eventuate.tram.consumer.common.TramConsumerCommonConfiguration;
import io.eventuate.tram.messaging.consumer.MessageConsumer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Import;

@Configuration
@Import(TramConsumerCommonConfiguration.class)
public class TramConsumerActiveMQConfiguration {
@Bean
public MessageConsumer messageConsumer(@Value("${activemq.url}") String activeMQURL) {
return new MessageConsumerActiveMQImpl(activeMQURL);
}
}
16 changes: 16 additions & 0 deletions eventuate-tram-consumer-common/build.gradle
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
apply plugin: PublicModulePlugin

dependencies {
compile project(":eventuate-tram-messaging")

compile "io.eventuate.local.java:eventuate-local-java-common:$eventuateLocalVersion"
compile "org.springframework.boot:spring-boot-starter-jdbc:$springBootVersion"
compile 'mysql:mysql-connector-java:5.1.36'
compile ('org.postgresql:postgresql:9.4-1200-jdbc41') {
exclude group: "org.slf4j", module: "slf4j-simple"
}
compile "io.eventuate.client.java:eventuate-client-java-common-impl:$eventuateClientVersion"
compile "io.eventuate.client.java:eventuate-client-java-jdbc-common:$eventuateClientVersion"
}


Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package io.eventuate.tram.consumer.kafka;
package io.eventuate.tram.consumer.common;

public interface DuplicateMessageDetector {
boolean isDuplicate(String consumerId, String messageId);
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package io.eventuate.tram.consumer.kafka;
package io.eventuate.tram.consumer.common;

import io.eventuate.javaclient.spring.jdbc.EventuateSchema;
import org.springframework.beans.factory.annotation.Autowired;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
package io.eventuate.tram.consumer.common;

import io.eventuate.javaclient.spring.jdbc.EventuateSchema;
import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class TramConsumerCommonConfiguration {

@Bean
public EventuateSchema eventuateSchema(@Value("${eventuate.database.schema:#{null}}") String eventuateDatabaseSchema) {
return new EventuateSchema(eventuateDatabaseSchema);
}

@Bean
@ConditionalOnMissingBean(DuplicateMessageDetector.class)
public DuplicateMessageDetector duplicateMessageDetector(EventuateSchema eventuateSchema) {
return new SqlTableBasedDuplicateMessageDetector(eventuateSchema);
}
}
12 changes: 1 addition & 11 deletions eventuate-tram-consumer-kafka/build.gradle
Original file line number Diff line number Diff line change
@@ -1,18 +1,8 @@
apply plugin: PublicModulePlugin

dependencies {
compile project(":eventuate-tram-messaging")

compile "io.eventuate.local.java:eventuate-local-java-common:$eventuateLocalVersion"
compile project(":eventuate-tram-consumer-common")
compile "io.eventuate.local.java:eventuate-local-java-kafka:$eventuateLocalVersion"
compile "org.springframework.boot:spring-boot-starter-jdbc:$springBootVersion"
compile 'mysql:mysql-connector-java:5.1.36'
compile ('org.postgresql:postgresql:9.4-1200-jdbc41') {
exclude group: "org.slf4j", module: "slf4j-simple"
}
compile "io.eventuate.client.java:eventuate-client-java-common-impl:$eventuateClientVersion"
compile "io.eventuate.client.java:eventuate-client-java-jdbc-common:$eventuateClientVersion"

testCompile "org.springframework.boot:spring-boot-starter-test:$springBootVersion"
}

Expand Down
Loading

0 comments on commit 223dba7

Please sign in to comment.