Skip to content

Commit

Permalink
adapting to changes in eventuate local.
Browse files Browse the repository at this point in the history
  • Loading branch information
root committed Oct 12, 2017
1 parent b202851 commit 71bbef2
Show file tree
Hide file tree
Showing 4 changed files with 31 additions and 35 deletions.
3 changes: 2 additions & 1 deletion eventuate-tram-cdc-mysql-connector/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -2,5 +2,6 @@ apply plugin: PrivateModulePlugin

dependencies {
compile project(":eventuate-tram-messaging")
compile "io.eventuate.local.java:eventuate-local-java-cdc-mysql-connector:$eventuateLocalVersion"
compile "io.eventuate.local.java:eventuate-local-java-cdc-connector-mysql-binlog:$eventuateLocalVersion"
compile "io.eventuate.local.java:eventuate-local-java-cdc-connector-polling:$eventuateLocalVersion"
}
Original file line number Diff line number Diff line change
@@ -1,9 +1,14 @@
package io.eventuate.tram.cdc.mysql.connector;

import io.eventuate.javaclient.driver.EventuateDriverConfiguration;
import io.eventuate.local.common.*;
import io.eventuate.local.java.kafka.EventuateKafkaConfigurationProperties;
import io.eventuate.local.java.kafka.producer.EventuateKafkaProducer;
import io.eventuate.local.mysql.binlog.*;
import io.eventuate.local.polling.PollingCdcKafkaPublisher;
import io.eventuate.local.polling.PollingCdcProcessor;
import io.eventuate.local.polling.PollingDao;
import io.eventuate.local.polling.PollingDataProvider;
import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
Expand All @@ -20,15 +25,15 @@
import java.util.concurrent.TimeoutException;

@Configuration
@EnableConfigurationProperties({MySqlBinaryLogClientConfigurationProperties.class, EventuateLocalZookeperConfigurationProperties.class}
@EnableConfigurationProperties({EventuateConfigurationProperties.class, EventuateLocalZookeperConfigurationProperties.class}
)
@Import(EventuateDriverConfiguration.class)
public class MessageTableChangesToDestinationsConfiguration {

@Bean
@Profile("!EventuatePolling")
public SourceTableNameSupplier sourceTableNameSupplier(MySqlBinaryLogClientConfigurationProperties mySqlBinaryLogClientConfigurationProperties) {
return new SourceTableNameSupplier(mySqlBinaryLogClientConfigurationProperties.getSourceTableName(), MySQLTableConfig.EVENTS_TABLE_NAME);
public SourceTableNameSupplier sourceTableNameSupplier(EventuateConfigurationProperties eventuateConfigurationProperties) {
return new SourceTableNameSupplier(eventuateConfigurationProperties.getSourceTableName(), MySQLTableConfig.EVENTS_TABLE_NAME);
}

@Bean
Expand All @@ -41,16 +46,16 @@ public IWriteRowsEventDataParser eventDataParser(DataSource dataSource) {
@Profile("!EventuatePolling")

public MySqlBinaryLogClient<MessageWithDestination> mySqlBinaryLogClient(@Value("${spring.datasource.url}") String dataSourceURL,
MySqlBinaryLogClientConfigurationProperties mySqlBinaryLogClientConfigurationProperties,
EventuateConfigurationProperties eventuateConfigurationProperties,
SourceTableNameSupplier sourceTableNameSupplier,
IWriteRowsEventDataParser<MessageWithDestination> eventDataParser) throws IOException, TimeoutException {
JdbcUrl jdbcUrl = JdbcUrlParser.parse(dataSourceURL);
return new MySqlBinaryLogClient<>(eventDataParser,
mySqlBinaryLogClientConfigurationProperties.getDbUserName(),
mySqlBinaryLogClientConfigurationProperties.getDbPassword(),
eventuateConfigurationProperties.getDbUserName(),
eventuateConfigurationProperties.getDbPassword(),
jdbcUrl.getHost(),
jdbcUrl.getPort(),
mySqlBinaryLogClientConfigurationProperties.getBinlogClientId(),
eventuateConfigurationProperties.getBinlogClientId(),
sourceTableNameSupplier.getSourceTableName());
}

Expand All @@ -61,7 +66,7 @@ public EventuateKafkaProducer eventuateKafkaProducer(EventuateKafkaConfiguration

@Bean
@Profile("!EventuatePolling")
public MySQLCdcKafkaPublisher<MessageWithDestination> mySQLCdcKafkaPublisher(EventuateKafkaConfigurationProperties eventuateKafkaConfigurationProperties, DatabaseBinlogOffsetKafkaStore binlogOffsetKafkaStore, PublishingStrategy<MessageWithDestination> publishingStrategy) {
public CdcKafkaPublisher<MessageWithDestination> mySQLCdcKafkaPublisher(EventuateKafkaConfigurationProperties eventuateKafkaConfigurationProperties, DatabaseBinlogOffsetKafkaStore binlogOffsetKafkaStore, PublishingStrategy<MessageWithDestination> publishingStrategy) {
return new MySQLCdcKafkaPublisher<>(binlogOffsetKafkaStore, eventuateKafkaConfigurationProperties.getBootstrapServers(), publishingStrategy);
}

Expand All @@ -72,25 +77,24 @@ public PublishingStrategy<MessageWithDestination> publishingStrategy() {

@Bean
@Profile("!EventuatePolling")
public MySQLCdcProcessor<MessageWithDestination> mySQLCdcProcessor(MySqlBinaryLogClient<MessageWithDestination> mySqlBinaryLogClient, DatabaseBinlogOffsetKafkaStore binlogOffsetKafkaStore) {
public CdcProcessor<MessageWithDestination> mySQLCdcProcessor(MySqlBinaryLogClient<MessageWithDestination> mySqlBinaryLogClient, DatabaseBinlogOffsetKafkaStore binlogOffsetKafkaStore) {
return new MySQLCdcProcessor<>(mySqlBinaryLogClient, binlogOffsetKafkaStore);
}

@Bean
@Profile("!EventuatePolling")
public DatabaseBinlogOffsetKafkaStore binlogOffsetKafkaStore(EventuateKafkaConfigurationProperties eventuateKafkaConfigurationProperties,
MySqlBinaryLogClientConfigurationProperties mySqlBinaryLogClientConfigurationProperties,
EventuateConfigurationProperties eventuateConfigurationProperties,
MySqlBinaryLogClient mySqlBinaryLogClient,
EventuateKafkaProducer eventuateKafkaProducer) {
return new DatabaseBinlogOffsetKafkaStore(mySqlBinaryLogClientConfigurationProperties.getDbHistoryTopicName(), mySqlBinaryLogClient.getName(), eventuateKafkaProducer, eventuateKafkaConfigurationProperties);
return new DatabaseBinlogOffsetKafkaStore(eventuateConfigurationProperties.getDbHistoryTopicName(), mySqlBinaryLogClient.getName(), eventuateKafkaProducer, eventuateKafkaConfigurationProperties);
}

@Bean
@Profile("!EventuatePolling")
public EventTableChangesToAggregateTopicTranslator<MessageWithDestination> mysqlEventTableChangesToAggregateTopicTranslator(MySQLCdcKafkaPublisher<MessageWithDestination> mySQLCdcKafkaPublisher,
MySQLCdcProcessor<MessageWithDestination> mySQLCdcProcessor,
public EventTableChangesToAggregateTopicTranslator<MessageWithDestination> eventTableChangesToAggregateTopicTranslator(CdcKafkaPublisher<MessageWithDestination> cdcKafkaPublisher,
CdcProcessor<MessageWithDestination> cdcProcessor,
CuratorFramework curatorFramework) {
return new EventTableChangesToAggregateTopicTranslator<>(mySQLCdcKafkaPublisher, mySQLCdcProcessor, curatorFramework);
return new EventTableChangesToAggregateTopicTranslator<>(cdcKafkaPublisher, cdcProcessor, curatorFramework);
}

@Bean(destroyMethod = "close")
Expand All @@ -102,40 +106,31 @@ public CuratorFramework curatorFramework(EventuateLocalZookeperConfigurationProp

@Bean
@Profile("EventuatePolling")
public PollingCdcKafkaPublisher<MessageWithDestination> зollingCdcKafkaPublisher(EventuateKafkaConfigurationProperties eventuateKafkaConfigurationProperties,
PublishingStrategy<MessageWithDestination> publishingStrategy) {
public CdcKafkaPublisher<MessageWithDestination> pollingCdcKafkaPublisher(EventuateKafkaConfigurationProperties eventuateKafkaConfigurationProperties,
PublishingStrategy<MessageWithDestination> publishingStrategy) {

return new PollingCdcKafkaPublisher<>(eventuateKafkaConfigurationProperties.getBootstrapServers(), publishingStrategy);
}

@Bean
@Profile("EventuatePolling")
public PollingCdcProcessor<PollingMessageBean, MessageWithDestination, String> pollingCdcProcessor(MySqlBinaryLogClientConfigurationProperties mySqlBinaryLogClientConfigurationProperties,
public CdcProcessor<MessageWithDestination> pollingCdcProcessor(EventuateConfigurationProperties eventuateConfigurationProperties,
PollingDao<PollingMessageBean, MessageWithDestination, String> pollingDao) {

return new PollingCdcProcessor<>(pollingDao, mySqlBinaryLogClientConfigurationProperties.getPollingIntervalInMilliseconds());
}

@Bean
@Profile("EventuatePolling")
public EventTableChangesToAggregateTopicTranslator<MessageWithDestination> pollingEventTableChangesToAggregateTopicTranslator(PollingCdcKafkaPublisher<MessageWithDestination> pollingCdcKafkaPublisher,
PollingCdcProcessor<PollingMessageBean, MessageWithDestination, String> pollingCdcProcessor,
CuratorFramework curatorFramework) {

return new EventTableChangesToAggregateTopicTranslator<>(pollingCdcKafkaPublisher, pollingCdcProcessor, curatorFramework);
return new PollingCdcProcessor<>(pollingDao, eventuateConfigurationProperties.getPollingIntervalInMilliseconds());
}

@Bean
@Profile("EventuatePolling")
public PollingDao<PollingMessageBean, MessageWithDestination, String> pollingDao(PollingDataProvider<PollingMessageBean, MessageWithDestination, String> pollingDataProvider,
DataSource dataSource,
MySqlBinaryLogClientConfigurationProperties mySqlBinaryLogClientConfigurationProperties) {
EventuateConfigurationProperties eventuateConfigurationProperties) {

return new PollingDao<>(pollingDataProvider,
dataSource,
mySqlBinaryLogClientConfigurationProperties.getMaxEventsPerPolling(),
mySqlBinaryLogClientConfigurationProperties.getMaxAttemptsForPolling(),
mySqlBinaryLogClientConfigurationProperties.getPollingRetryIntervalInMilliseconds());
eventuateConfigurationProperties.getMaxEventsPerPolling(),
eventuateConfigurationProperties.getMaxAttemptsForPolling(),
eventuateConfigurationProperties.getPollingRetryIntervalInMilliseconds());
}

@Bean
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
package io.eventuate.tram.cdc.mysql.connector;

import io.eventuate.javaclient.commonimpl.JSonMapper;
import io.eventuate.local.mysql.binlog.PublishingStrategy;
import io.eventuate.local.common.PublishingStrategy;
import io.eventuate.tram.messaging.common.Message;

import java.util.Optional;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
package io.eventuate.tram.cdc.mysql.connector;

import io.eventuate.javaclient.commonimpl.JSonMapper;
import io.eventuate.local.mysql.binlog.PollingDataProvider;
import io.eventuate.local.polling.PollingDataProvider;
import io.eventuate.tram.messaging.common.MessageImpl;

import java.util.Map;
Expand Down

0 comments on commit 71bbef2

Please sign in to comment.