Skip to content

Commit

Permalink
Decoupled MessageConsumer from JDBC - using the SQL-based duplicate d…
Browse files Browse the repository at this point in the history
…etector is now optional
  • Loading branch information
cer committed Mar 20, 2019
1 parent b38a569 commit d21a8ab
Show file tree
Hide file tree
Showing 27 changed files with 107 additions and 48 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,9 @@ apply plugin: PrivateModulePlugin

dependencies {
compile project(":eventuate-tram-producer-jdbc")

compile project(":eventuate-tram-consumer-jdbc")

compile project(":eventuate-tram-consumer-kafka")
compile project(":eventuate-tram-consumer-activemq")
compile project(":eventuate-tram-commands")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import io.eventuate.tram.commands.common.DefaultChannelMapping;
import io.eventuate.tram.commands.consumer.CommandDispatcher;
import io.eventuate.tram.commands.producer.TramCommandProducerConfiguration;
import io.eventuate.tram.consumer.common.TramNoopDuplicateMessageDetectorConfiguration;
import io.eventuate.tram.consumer.kafka.TramConsumerKafkaConfiguration;
import io.eventuate.tram.messaging.consumer.MessageConsumer;
import io.eventuate.tram.messaging.producer.jdbc.TramMessageProducerJdbcConfiguration;
Expand All @@ -18,7 +19,7 @@
@EnableAutoConfiguration
@Import({TramConsumerKafkaConfiguration.class,
TramMessageProducerJdbcConfiguration.class,
TramCommandProducerConfiguration.class
TramCommandProducerConfiguration.class, TramNoopDuplicateMessageDetectorConfiguration.class
})
public class TramCommandsAndEventsIntegrationTestConfiguration {

Expand Down
4 changes: 4 additions & 0 deletions eventuate-tram-consumer-activemq/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,10 @@ apply plugin: PublicModulePlugin

dependencies {
compile project(":eventuate-tram-consumer-common")

// TODO this needs to be removed
compile project(":eventuate-tram-consumer-jdbc")

compile "org.apache.activemq:activemq-core:5.7.0"
}

Expand Down
11 changes: 3 additions & 8 deletions eventuate-tram-consumer-common/build.gradle
Original file line number Diff line number Diff line change
@@ -1,18 +1,13 @@
apply plugin: PublicModulePlugin

dependencies {
compile project(":eventuate-tram-jdbc")
// compile project(":eventuate-tram-jdbc")
compile project(":eventuate-tram-messaging")
compile project(":eventuate-tram-messaging-jdbc")

compile "io.eventuate.local.java:eventuate-local-java-common:$eventuateLocalVersion"
compile "org.springframework.boot:spring-boot-starter-jdbc:$springBootVersion"
compile 'mysql:mysql-connector-java:5.1.36'
compile ('org.postgresql:postgresql:9.4-1200-jdbc41') {
exclude group: "org.slf4j", module: "slf4j-simple"
}

compile "io.eventuate.client.java:eventuate-client-java-common-impl:$eventuateClientVersion"
compile "io.eventuate.client.java:eventuate-client-java-jdbc-common:$eventuateClientVersion"
// compile "io.eventuate.client.java:eventuate-client-java-jdbc-common:$eventuateClientVersion"
}


Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
package io.eventuate.tram.consumer.common;

public class NoopDuplicateMessageDetector implements DuplicateMessageDetector {
public class NoopDuplicateMessageDetector implements DuplicateMessageDetector {

@Override
public boolean isDuplicate(String consumerId, String messageId) {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,29 +1,14 @@
package io.eventuate.tram.consumer.common;

import io.eventuate.javaclient.spring.jdbc.EventuateSchema;
import io.eventuate.tram.jdbc.CommonJdbcMessagingConfiguration;
import io.eventuate.tram.messaging.common.MessageInterceptor;
import io.eventuate.tram.messaging.common.sql.SqlDialectConfiguration;
import io.eventuate.tram.messaging.common.sql.SqlDialectSelector;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Import;
import org.springframework.transaction.support.TransactionTemplate;

import java.util.List;

@Configuration
@Import({SqlDialectConfiguration.class, CommonJdbcMessagingConfiguration.class})
public class TramConsumerCommonConfiguration {
@Bean
@ConditionalOnMissingBean(DuplicateMessageDetector.class)
public DuplicateMessageDetector duplicateMessageDetector(EventuateSchema eventuateSchema,
SqlDialectSelector sqlDialectSelector, TransactionTemplate transactionTemplate) {
return new SqlTableBasedDuplicateMessageDetector(eventuateSchema,
sqlDialectSelector.getDialect().getCurrentTimeInMillisecondsExpression(), transactionTemplate);
}

@Autowired(required=false)
private MessageInterceptor[] messageInterceptors = new MessageInterceptor[0];
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
package io.eventuate.tram.consumer.common;

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class TramNoopDuplicateMessageDetectorConfiguration {

@Bean
public DuplicateMessageDetector duplicateMessageDetector() {
return new NoopDuplicateMessageDetector();
}
}
9 changes: 9 additions & 0 deletions eventuate-tram-consumer-jdbc/build.gradle
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@


dependencies {
compile project(":eventuate-tram-consumer-common")
compile project(":eventuate-tram-messaging-jdbc")

testCompile "org.springframework.boot:spring-boot-starter-test:$springBootCdcVersion"

}
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package io.eventuate.tram.consumer.common;
package io.eventuate.tram.consumer.jdbc;

import io.eventuate.javaclient.spring.jdbc.EventuateSchema;
import io.eventuate.tram.consumer.common.DuplicateMessageDetector;
import io.eventuate.tram.consumer.common.SubscriberIdAndMessage;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
package io.eventuate.tram.consumer.jdbc;

import io.eventuate.javaclient.spring.jdbc.EventuateSchema;
import io.eventuate.tram.consumer.common.DuplicateMessageDetector;
import io.eventuate.tram.jdbc.CommonJdbcMessagingConfiguration;
import io.eventuate.tram.messaging.common.sql.SqlDialectConfiguration;
import io.eventuate.tram.messaging.common.sql.SqlDialectSelector;
import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Import;
import org.springframework.transaction.support.TransactionTemplate;

@Configuration
@Import({SqlDialectConfiguration.class, CommonJdbcMessagingConfiguration.class})
public class TramConsumerJdbcConfiguration {

@Bean
@ConditionalOnMissingBean(DuplicateMessageDetector.class)
public DuplicateMessageDetector duplicateMessageDetector(EventuateSchema eventuateSchema,
SqlDialectSelector sqlDialectSelector, TransactionTemplate transactionTemplate) {
return new SqlTableBasedDuplicateMessageDetector(eventuateSchema,
sqlDialectSelector.getDialect().getCurrentTimeInMillisecondsExpression(), transactionTemplate);
}

}
Original file line number Diff line number Diff line change
@@ -1,22 +1,31 @@
package io.eventuate.tram.consumer.kafka;
package io.eventuate.tram.consumer.jdbc;

import io.eventuate.tram.consumer.common.DuplicateMessageDetector;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Import;
import org.springframework.test.context.junit4.SpringRunner;

import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;

@RunWith(SpringRunner.class)
@SpringBootTest(classes = DuplicateMessageDetectorTestConfiguration.class)
public class DuplicateMessageDetectorTest {
@SpringBootTest(classes = {SqlTableBasedDuplicateMessageDetectorTest.DuplicateMessageDetectorTestConfiguration.class}, webEnvironment = SpringBootTest.WebEnvironment.NONE)
public class SqlTableBasedDuplicateMessageDetectorTest {

@Autowired
private DuplicateMessageDetector duplicateMessageDetector;

@Configuration
@Import(TramConsumerJdbcConfiguration.class)
@EnableAutoConfiguration
static public class DuplicateMessageDetectorTestConfiguration {
}

@Test
public void shouldDetectDuplicate() {

Expand Down
1 change: 1 addition & 0 deletions eventuate-tram-consumer-kafka/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ apply plugin: PublicModulePlugin

dependencies {
compile project(":eventuate-tram-consumer-common")
compile project(":eventuate-tram-consumer-jdbc")
compile project(":eventuate-tram-jdbc")
compile project(":eventuate-tram-messaging")

Expand Down

This file was deleted.

3 changes: 3 additions & 0 deletions eventuate-tram-consumer-rabbitmq/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,9 @@ apply plugin: PublicModulePlugin

dependencies {
compile project(":eventuate-tram-consumer-common")
// TODO this needs to be removed
compile project(":eventuate-tram-consumer-jdbc")

compile group: 'com.rabbitmq', name: 'amqp-client', version: '5.2.0'

compile('org.apache.curator:curator-framework:2.11.0')
Expand Down
Original file line number Diff line number Diff line change
@@ -1,12 +1,13 @@
package io.eventuate.tram.e2e.tests.redis.messages;

import io.eventuate.jdbcredis.TramJdbcRedisConfiguration;
import io.eventuate.tram.consumer.common.NoopDuplicateMessageDetector;
import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Import;

@Configuration
@EnableAutoConfiguration
@Import({TramJdbcRedisConfiguration.class})
@Import({TramJdbcRedisConfiguration.class, NoopDuplicateMessageDetector.class})
public class JdbcRedisTramMessageTestConfiguration {
}
1 change: 1 addition & 0 deletions eventuate-tram-jdbc-activemq/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -3,5 +3,6 @@ apply plugin: PublicModulePlugin
dependencies {

compile project(":eventuate-tram-producer-jdbc")
compile project(":eventuate-tram-consumer-jdbc")
compile project(":eventuate-tram-consumer-activemq")
}
Original file line number Diff line number Diff line change
@@ -1,11 +1,12 @@
package io.eventuate.jdbcactivemq;

import io.eventuate.tram.consumer.activemq.TramConsumerActiveMQConfiguration;
import io.eventuate.tram.consumer.jdbc.TramConsumerJdbcConfiguration;
import io.eventuate.tram.messaging.producer.jdbc.TramMessageProducerJdbcConfiguration;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Import;

@Configuration
@Import({TramConsumerActiveMQConfiguration.class, TramMessageProducerJdbcConfiguration.class, })
@Import({TramConsumerActiveMQConfiguration.class, TramMessageProducerJdbcConfiguration.class, TramConsumerJdbcConfiguration.class})
public class TramJdbcActiveMQConfiguration {
}
Original file line number Diff line number Diff line change
@@ -1,11 +1,12 @@
package io.eventuate.jdbckafka;

import io.eventuate.tram.consumer.jdbc.TramConsumerJdbcConfiguration;
import io.eventuate.tram.consumer.kafka.TramConsumerKafkaConfiguration;
import io.eventuate.tram.messaging.producer.jdbc.TramMessageProducerJdbcConfiguration;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Import;

@Configuration
@Import({TramConsumerKafkaConfiguration.class, TramMessageProducerJdbcConfiguration.class, })
@Import({TramConsumerKafkaConfiguration.class, TramMessageProducerJdbcConfiguration.class, TramConsumerJdbcConfiguration.class})
public class TramJdbcKafkaConfiguration {
}
Original file line number Diff line number Diff line change
@@ -1,11 +1,12 @@
package io.eventuate.jdbcrabbitmq;

import io.eventuate.tram.consumer.jdbc.TramConsumerJdbcConfiguration;
import io.eventuate.tram.consumer.rabbitmq.TramConsumerRabbitMQConfiguration;
import io.eventuate.tram.messaging.producer.jdbc.TramMessageProducerJdbcConfiguration;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Import;

@Configuration
@Import({TramConsumerRabbitMQConfiguration.class, TramMessageProducerJdbcConfiguration.class})
@Import({TramConsumerRabbitMQConfiguration.class, TramMessageProducerJdbcConfiguration.class, TramConsumerJdbcConfiguration.class})
public class TramJdbcRabbitMQConfiguration {
}
1 change: 1 addition & 0 deletions eventuate-tram-jdbc-redis/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -3,5 +3,6 @@ apply plugin: PublicModulePlugin
dependencies {

compile project(":eventuate-tram-producer-jdbc")
compile project(":eventuate-tram-consumer-jdbc")
compile project(":eventuate-tram-consumer-redis")
}
Original file line number Diff line number Diff line change
@@ -1,12 +1,13 @@
package io.eventuate.jdbcredis;

import io.eventuate.tram.consumer.jdbc.TramConsumerJdbcConfiguration;
import io.eventuate.tram.consumer.redis.TramConsumerRedisConfiguration;
import io.eventuate.tram.messaging.producer.jdbc.TramMessageProducerJdbcConfiguration;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Import;

@Configuration
@Import({TramConsumerRedisConfiguration.class, TramMessageProducerJdbcConfiguration.class})
@Import({TramConsumerRedisConfiguration.class, TramMessageProducerJdbcConfiguration.class, TramConsumerJdbcConfiguration.class})
public class TramJdbcRedisConfiguration {

}
7 changes: 7 additions & 0 deletions eventuate-tram-messaging-jdbc/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,14 @@ apply plugin: PublicModulePlugin

dependencies {
compile project(":eventuate-tram-messaging")
compile project(":eventuate-tram-jdbc")

compile "org.springframework.boot:spring-boot-starter-jdbc:$springBootVersion"
compile 'mysql:mysql-connector-java:5.1.36'
compile ('org.postgresql:postgresql:9.4-1200-jdbc41') {
exclude group: "org.slf4j", module: "slf4j-simple"
}

compile "org.springframework.boot:spring-boot-starter:$springBootVersion"
testCompile "org.springframework.boot:spring-boot-starter-test:$springBootCdcVersion"
}
Original file line number Diff line number Diff line change
@@ -1,10 +1,13 @@
package io.eventuate.tram.messaging.common.sql;

import io.eventuate.tram.jdbc.CommonJdbcMessagingConfiguration;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Import;

@Configuration
@Import(CommonJdbcMessagingConfiguration.class)
public class SqlDialectConfiguration {

@Bean
Expand Down
1 change: 0 additions & 1 deletion eventuate-tram-producer-jdbc/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ dependencies {
compile project(":eventuate-tram-messaging-jdbc")

compile "org.springframework.boot:spring-boot-starter-jdbc:$springBootVersion"
compile 'mysql:mysql-connector-java:5.1.36'
compile "com.fasterxml.jackson.core:jackson-databind:2.8.3"
compile "io.eventuate.client.java:eventuate-client-java-jdbc-common:$eventuateClientVersion"

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import com.google.common.collect.ImmutableSet;
import io.eventuate.javaclient.commonimpl.JSonMapper;
import io.eventuate.tram.consumer.common.TramConsumerCommonConfiguration;
import io.eventuate.tram.consumer.common.TramNoopDuplicateMessageDetectorConfiguration;
import io.eventuate.tram.consumer.redis.MessageConsumerRedisImpl;
import io.eventuate.tram.consumer.redis.RedisCoordinatorFactory;
import io.eventuate.tram.consumer.redis.RedisCoordinatorFactoryImpl;
Expand Down Expand Up @@ -44,7 +45,7 @@ public class MessagingTest {

@Configuration
@EnableAutoConfiguration
@Import({CommonRedisConfiguration.class, TramConsumerCommonConfiguration.class})
@Import({CommonRedisConfiguration.class, TramConsumerCommonConfiguration.class, TramNoopDuplicateMessageDetectorConfiguration.class})
public static class Config {
}

Expand Down
3 changes: 2 additions & 1 deletion set-env-mysql-binlog.sh
Original file line number Diff line number Diff line change
Expand Up @@ -15,4 +15,5 @@ export RABBITMQ_URL=${DOCKER_HOST_IP}
export REDIS_SERVERS=${DOCKER_HOST_IP}:6379
export REDIS_PARTITIONS=2

unset SPRING_PROFILES_ACTIVE
export SPRING_PROFILES_ACTIVE=

1 change: 1 addition & 0 deletions settings.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ include 'eventuate-tram-consumer-kafka'
include 'eventuate-tram-consumer-activemq'
include 'eventuate-tram-consumer-rabbitmq'
include 'eventuate-tram-consumer-redis'
include 'eventuate-tram-consumer-jdbc'
include 'eventuate-tram-jdbc-kafka'
include 'eventuate-tram-jdbc-activemq'
include 'eventuate-tram-jdbc-rabbitmq'
Expand Down

0 comments on commit d21a8ab

Please sign in to comment.