20
20
import com .booking .replication .controller .WebServer ;
21
21
import com .booking .replication .coordinator .Coordinator ;
22
22
import com .booking .replication .coordinator .ZookeeperCoordinator ;
23
+ import com .booking .replication .it .util .MySQLRunner ;
23
24
import com .booking .replication .supplier .Supplier ;
24
25
import com .booking .replication .supplier .mysql .binlog .BinaryLogSupplier ;
25
26
import com .fasterxml .jackson .databind .ObjectMapper ;
51
52
public class ReplicatorKafkaJSONTest {
52
53
53
54
private static final Logger LOG = LogManager .getLogger (ReplicatorKafkaJSONTest .class );
55
+ private static final MySQLRunner MYSQL_RUNNER = new MySQLRunner ();
54
56
55
57
private static final String ZOOKEEPER_LEADERSHIP_PATH = "/replicator/leadership" ;
56
58
private static final String ZOOKEEPER_CHECKPOINT_PATH = "/replicator/checkpoint" ;
57
59
58
60
private static final String CHECKPOINT_DEFAULT = "{\" timestamp\" : 0, \" serverId\" : 1, \" gtid\" : null, \" binlog\" : {\" filename\" : \" binlog.000001\" , \" position\" : 4}}" ;
59
61
62
+ private static MySQLConfiguration MYSQL_CONFIG = new MySQLConfiguration (
63
+ ReplicatorKafkaJSONTest .MYSQL_SCHEMA ,
64
+ ReplicatorKafkaJSONTest .MYSQL_USERNAME ,
65
+ ReplicatorKafkaJSONTest .MYSQL_PASSWORD ,
66
+ ReplicatorKafkaJSONTest .MYSQL_CONF_FILE ,
67
+ Collections .singletonList (ReplicatorKafkaJSONTest .MYSQL_INIT_SCRIPT ),
68
+ null ,
69
+ null
70
+ );
60
71
private static final String MYSQL_SCHEMA = "replicator" ;
61
72
private static final String MYSQL_ROOT_USERNAME = "root" ;
62
73
private static final String MYSQL_USERNAME = "replicator" ;
@@ -83,19 +94,8 @@ public class ReplicatorKafkaJSONTest {
83
94
@ BeforeClass
84
95
public static void before () {
85
96
ServicesProvider servicesProvider = ServicesProvider .build (ServicesProvider .Type .CONTAINERS );
86
-
87
97
ReplicatorKafkaJSONTest .zookeeper = servicesProvider .startZookeeper ();
88
98
89
- MySQLConfiguration mySQLConfiguration = new MySQLConfiguration (
90
- ReplicatorKafkaJSONTest .MYSQL_SCHEMA ,
91
- ReplicatorKafkaJSONTest .MYSQL_USERNAME ,
92
- ReplicatorKafkaJSONTest .MYSQL_PASSWORD ,
93
- ReplicatorKafkaJSONTest .MYSQL_CONF_FILE ,
94
- Collections .singletonList (ReplicatorKafkaJSONTest .MYSQL_INIT_SCRIPT ),
95
- null ,
96
- null
97
- );
98
-
99
99
MySQLConfiguration mySQLActiveSchemaConfiguration = new MySQLConfiguration (
100
100
ReplicatorKafkaJSONTest .MYSQL_ACTIVE_SCHEMA ,
101
101
ReplicatorKafkaJSONTest .MYSQL_USERNAME ,
@@ -106,7 +106,7 @@ public static void before() {
106
106
null
107
107
);
108
108
109
- ReplicatorKafkaJSONTest .mysqlBinaryLog = servicesProvider .startMySQL (mySQLConfiguration );
109
+ ReplicatorKafkaJSONTest .mysqlBinaryLog = servicesProvider .startMySQL (MYSQL_CONFIG );
110
110
ReplicatorKafkaJSONTest .mysqlActiveSchema = servicesProvider .startMySQL (mySQLActiveSchemaConfiguration );
111
111
Network network = Network .newNetwork ();
112
112
ReplicatorKafkaJSONTest .kafkaZk = servicesProvider .startZookeeper (network , "kafkaZk" );
@@ -121,7 +121,7 @@ public void testReplicator() throws Exception {
121
121
122
122
File file = new File ("src/test/resources/" + ReplicatorKafkaJSONTest .MYSQL_TEST_SCRIPT );
123
123
124
- runMysqlScripts ( this . getConfiguration (), file .getAbsolutePath ());
124
+ MYSQL_RUNNER . runMysqlScript ( ReplicatorKafkaJSONTest . mysqlBinaryLog , MYSQL_CONFIG , file .getAbsolutePath (), null , true );
125
125
126
126
replicator .wait (1L , TimeUnit .MINUTES );
127
127
@@ -211,67 +211,6 @@ public void testReplicator() throws Exception {
211
211
replicator .stop ();
212
212
}
213
213
214
- private boolean runMysqlScripts (Map <String , Object > configuration , String scriptFilePath ) {
215
- BufferedReader reader ;
216
- Statement statement ;
217
- BasicDataSource dataSource = initDatasource (configuration , Driver .class .getName ());
218
- try (Connection connection = dataSource .getConnection ()) {
219
- statement = connection .createStatement ();
220
- reader = new BufferedReader (new FileReader (scriptFilePath ));
221
- String line ;
222
- // read script line by line
223
- ReplicatorKafkaJSONTest .LOG .info ("Executing query from " + scriptFilePath );
224
- String s ;
225
- StringBuilder sb = new StringBuilder ();
226
-
227
- FileReader fr = new FileReader (new File (scriptFilePath ));
228
- BufferedReader br = new BufferedReader (fr );
229
- while ((s = br .readLine ()) != null ) {
230
- sb .append (s );
231
- }
232
- br .close ();
233
-
234
- String [] inst = sb .toString ().split (";" );
235
- for (String query : inst ) {
236
- if (!query .trim ().equals ("" )) {
237
- statement .execute (query );
238
- ReplicatorKafkaJSONTest .LOG .info (query );
239
- }
240
- }
241
- return true ;
242
- } catch (Exception exception ) {
243
- ReplicatorKafkaJSONTest .LOG .warn (String .format ("error executing query \" %s\" : %s" , scriptFilePath , exception .getMessage ()));
244
- return false ;
245
- }
246
-
247
- }
248
-
249
- private BasicDataSource initDatasource (Map <String , Object > configuration , Object driverClass ) {
250
- List <String > hostnames = (List <String >) configuration .get (BinaryLogSupplier .Configuration .MYSQL_HOSTNAME );
251
- Object port = configuration .getOrDefault (BinaryLogSupplier .Configuration .MYSQL_PORT , "3306" );
252
- Object schema = configuration .get (BinaryLogSupplier .Configuration .MYSQL_SCHEMA );
253
- Object username = configuration .get (BinaryLogSupplier .Configuration .MYSQL_USERNAME );
254
- Object password = configuration .get (BinaryLogSupplier .Configuration .MYSQL_PASSWORD );
255
-
256
- Objects .requireNonNull (hostnames , String .format ("Configuration required: %s" , BinaryLogSupplier .Configuration .MYSQL_HOSTNAME ));
257
- Objects .requireNonNull (schema , String .format ("Configuration required: %s" , BinaryLogSupplier .Configuration .MYSQL_SCHEMA ));
258
- Objects .requireNonNull (username , String .format ("Configuration required: %s" , BinaryLogSupplier .Configuration .MYSQL_USERNAME ));
259
- Objects .requireNonNull (password , String .format ("Configuration required: %s" , BinaryLogSupplier .Configuration .MYSQL_PASSWORD ));
260
-
261
- return this .getDataSource (driverClass .toString (), hostnames .get (0 ), Integer .parseInt (port .toString ()), schema .toString (), username .toString (), password .toString ());
262
- }
263
-
264
- private BasicDataSource getDataSource (String driverClass , String hostname , int port , String schema , String username , String password ) {
265
- BasicDataSource dataSource = new BasicDataSource ();
266
-
267
- dataSource .setDriverClassName (driverClass );
268
- dataSource .setUrl (String .format (CONNECTION_URL_FORMAT , hostname , port , schema ));
269
- dataSource .setUsername (username );
270
- dataSource .setPassword (password );
271
-
272
- return dataSource ;
273
- }
274
-
275
214
private Map <String , Object > getConfiguration () {
276
215
Map <String , Object > configuration = new HashMap <>();
277
216
0 commit comments