Skip to content

Commit

Permalink
Merge pull request #535 from smferguson/event_replay
Browse files Browse the repository at this point in the history
Issue #532 - add event replay (bootstrapping from a point-in-time)
  • Loading branch information
Ben Osheroff authored Jan 24, 2017
2 parents 1b9e068 + 7d26ef3 commit c6cc9d6
Show file tree
Hide file tree
Showing 13 changed files with 78 additions and 18 deletions.
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ include ETL, cache building/expiring, metrics collection, and search indexing.

advanced features:

- Can do `SELECT * from table` (bootstrapping) initial loads of a table
- Can do `SELECT * from table` (bootstrapping) initial loads of a table.
- supports automatic position recover on master promotion
- flexible partitioning schemes for Kakfa - by database, table, primary key, or column
- Maxwell pulls all this off by acting as a full mysql replica, including a SQL
Expand Down
8 changes: 8 additions & 0 deletions docs/docs/bootstrapping.md
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ option | description
--port PORT | mysql port
--database DATABASE | mysql database containing the table to bootstrap
--table TABLE | mysql table to bootstrap
--where WHERE_CLAUSE | where clause to restrict the rows bootstrapped from the specified table

### Using the maxwell.bootstrap table
***
Expand All @@ -19,6 +20,13 @@ Alternatively you can insert a row in the `maxwell.bootstrap` table to trigger a
```
mysql> insert into maxwell.bootstrap (database_name, table_name) values ('fooDB', 'barTable');
```
Optionally, you can include a where clause to replay part of the data.

bin/maxwell-bootstrap --config localhost.properties --database foobar --table test --log_level info

or

bin/maxwell-bootstrap --config localhost.properties --database foobar --table test --where "my_date >= '2017-01-07 00:00:00'" --log_level info

### Async vs Sync bootstrapping
***
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,10 @@ protected String bootstrapTable(RowMap rowmap) {
return (String) rowmap.getData("table_name");
}

protected String bootstrapWhere(RowMap rowmap) {
return (String) rowmap.getData("where_clause");
}

abstract public boolean shouldSkip(RowMap row) throws SQLException, IOException;

abstract public void startBootstrap(RowMap startBootstrapRow, AbstractProducer producer, Replicator replicator) throws Exception;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,8 +46,8 @@ private void run(String[] argv) throws Exception {
getInsertedRowsCount(connection, config.monitorBootstrapID);
rowId = config.monitorBootstrapID;
} else {
Long totalRows = calculateRowCount(connection, config.databaseName, config.tableName);
rowId = insertBootstrapStartRow(connection, config.databaseName, config.tableName, totalRows);
Long totalRows = calculateRowCount(connection, config.databaseName, config.tableName, config.whereClause);
rowId = insertBootstrapStartRow(connection, config.databaseName, config.tableName, config.whereClause, totalRows);
}

try {
Expand Down Expand Up @@ -151,22 +151,30 @@ private Long getTotalRowCount(Connection connection, Long bootstrapRowID) throws
}
}

private Long calculateRowCount(Connection connection, String db, String table) throws SQLException {
private Long calculateRowCount(Connection connection, String db, String table, String whereClause) throws SQLException {
LOGGER.info("counting rows");
String sql = String.format("select count(*) from `%s`.%s", db, table);
if ( whereClause != null ) {
sql += String.format(" where %s", whereClause);
}
PreparedStatement preparedStatement = connection.prepareStatement(sql);
ResultSet resultSet = preparedStatement.executeQuery();
resultSet.next();
return resultSet.getLong(1);
}

private long insertBootstrapStartRow(Connection connection, String db, String table, Long totalRows) throws SQLException {
private long insertBootstrapStartRow(Connection connection, String db, String table, String whereClause, Long totalRows) throws SQLException {
LOGGER.info("inserting bootstrap start row");
String sql = "insert into `bootstrap` (database_name, table_name, total_rows) values(?, ?, ?)";
String sql = null;
sql = "insert into `bootstrap` (database_name, table_name, where_clause, total_rows) values(?, ?, ?, ?)";

PreparedStatement preparedStatement = connection.prepareStatement(sql, Statement.RETURN_GENERATED_KEYS);
preparedStatement.setString(1, db);
preparedStatement.setString(2, table);
preparedStatement.setLong(3, totalRows);

preparedStatement.setString(3, whereClause);
preparedStatement.setLong(4, totalRows);

preparedStatement.execute();
ResultSet generatedKeys = preparedStatement.getGeneratedKeys();
generatedKeys.next();
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package com.zendesk.maxwell.bootstrap;

import joptsimple.*;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -20,6 +21,7 @@ public class MaxwellBootstrapUtilityConfig extends AbstractConfig {
public String databaseName;
public String schemaDatabaseName;
public String tableName;
public String whereClause;
public String log_level;

public Long abortBootstrapID;
Expand All @@ -40,6 +42,7 @@ protected OptionParser buildOptionParser() {
parser.accepts( "__separator_1", "" );
parser.accepts( "database", "database that contains the table to bootstrap").withRequiredArg();
parser.accepts( "table", "table to bootstrap").withRequiredArg();
parser.accepts( "where", "where clause to restrict the rows bootstrapped from the specified table. e.g. my_date >= '2017-01-01 11:07:13'").withOptionalArg();
parser.accepts( "__separator_2", "" );
parser.accepts( "abort", "bootstrap_id to abort" ).withRequiredArg();
parser.accepts( "monitor", "bootstrap_id to monitor" ).withRequiredArg();
Expand Down Expand Up @@ -129,6 +132,9 @@ else if ( !options.has("abort") && !options.has("monitor") )
this.tableName = (String) options.valueOf("table");
else if ( !options.has("abort") && !options.has("monitor") )
usage("please specify a table");

if ( options.has("where") && !StringUtils.isEmpty(((String) options.valueOf("where"))) )
this.whereClause = (String) options.valueOf("where");
}

private void parseFile(String filename, boolean abortOnMissing) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,13 @@ public void startBootstrap(RowMap startBootstrapRow, AbstractProducer producer,
String databaseName = bootstrapDatabase(startBootstrapRow);
String tableName = bootstrapTable(startBootstrapRow);

LOGGER.debug(String.format("bootstrapping request for %s.%s", databaseName, tableName));
String whereClause = bootstrapWhere(startBootstrapRow);

String logString = String.format("bootstrapping request for %s.%s", databaseName, tableName);
if ( whereClause != null ) {
logString += String.format(" with where clause %s", whereClause);
}
LOGGER.debug(logString);

Schema schema = replicator.getSchema();
Database database = findDatabase(schema, databaseName);
Expand All @@ -50,9 +56,9 @@ public void startBootstrap(RowMap startBootstrapRow, AbstractProducer producer,
try ( Connection connection = getConnection();
Connection streamingConnection = getStreamingConnection()) {
setBootstrapRowToStarted(startBootstrapRow, connection);
ResultSet resultSet = getAllRows(databaseName, tableName, schema, streamingConnection);
ResultSet resultSet = getAllRows(databaseName, tableName, schema, whereClause, streamingConnection);
int insertedRows = 0;
lastInsertedRowsUpdateTimeMillis = 0; // ensure updateInsertedRowsColumn is called at least once
lastInsertedRowsUpdateTimeMillis = 0; // ensure updateInsertedRowsColumn is called at least once
while ( resultSet.next() ) {
RowMap row = bootstrapEventRowMap("bootstrap-insert", table, position);
setRowValues(row, resultSet, table);
Expand Down Expand Up @@ -178,14 +184,22 @@ private void ensureTable(String tableName, Database database) {
findTable(tableName, database);
}

private ResultSet getAllRows(String databaseName, String tableName, Schema schema, Connection connection) throws SQLException, InterruptedException {
private ResultSet getAllRows(String databaseName, String tableName, Schema schema, String whereClause,
Connection connection) throws SQLException, InterruptedException {
Statement statement = createBatchStatement(connection);
String pk = schema.findDatabase(databaseName).findTable(tableName).getPKString();

String sql = String.format("select * from `%s`.%s", databaseName, tableName);

if ( whereClause != null && !whereClause.equals("") ) {
sql += String.format(" where %s", whereClause);
}

if ( pk != null && !pk.equals("") ) {
return statement.executeQuery(String.format("select * from `%s`.%s order by %s", databaseName, tableName, pk));
} else {
return statement.executeQuery(String.format("select * from `%s`.%s", databaseName, tableName));
sql += String.format(" order by %s", pk);
}

return statement.executeQuery(sql);
}

private Statement createBatchStatement(Connection connection) throws SQLException, InterruptedException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,9 @@ public static void upgradeSchemaStoreSchema(Connection c) throws SQLException, I
performAlter(c, "alter table `bootstrap` modify column inserted_rows bigint unsigned not null default 0");
}

if ( !getTableColumns("bootstrap", c).containsKey("where_clause") ) {
performAlter(c, "alter table `bootstrap` add column where_clause varchar(1024)");
}

HashMap<String, String> schemaColumns = getTableColumns("schemas", c);
if ( !schemaColumns.containsKey("charset")) {
Expand Down
2 changes: 1 addition & 1 deletion src/main/resources/sql/maxwell_schema_bootstrap.sql
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ CREATE TABLE IF NOT EXISTS `bootstrap` (
id int unsigned auto_increment NOT NULL primary key,
database_name varchar(255) NOT NULL,
table_name varchar(255) NOT NULL,
where_clause varchar(255),
is_complete tinyint(1) unsigned NOT NULL default 0,
inserted_rows bigint(20) unsigned NOT NULL DEFAULT 0,
total_rows bigint(20) unsigned NOT NULL DEFAULT 0,
Expand All @@ -11,4 +12,3 @@ CREATE TABLE IF NOT EXISTS `bootstrap` (
binlog_file varchar(255) default NULL,
binlog_position int unsigned default 0
);

Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,11 @@ public void testMultipleRowBootstrap() throws Exception {
runJSON("json/bootstrap-multiple-row");
}

@Test
public void testMultipleRowBootstrapWithWhereclause() throws Exception {
runJSON("json/bootstrap-multiple-row-with-whereclause");
}

@Test
public void testNoPkTableBootstrap() throws Exception {
runJSON("json/bootstrap-no-pk");
Expand Down
2 changes: 1 addition & 1 deletion src/test/resources/sql/json/bootstrap-multiple-row
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ insert into maxwell.bootstrap set database_name = 'shard_1', table_name = 'minim
-> { database: "shard_1", table: "minimal", type: "insert", data: {id: 1, account_id: 1, text_field: "hello"} }
-> { database: "shard_1", table: "minimal", type: "insert", data: {id: 2, account_id: 2, text_field: "bonjour"} }
-> { database: "shard_1", table: "minimal", type: "insert", data: {id: 3, account_id: 3, text_field: "goeiedag"} }
-> { database: "maxwell", table: "bootstrap", type: "insert", data: {id: 1, database_name: "shard_1", table_name: "minimal", is_complete: 0, inserted_rows: 0, binlog_position:0, total_rows: 0, created_at: null, started_at: null, completed_at: null, binlog_file: null }}
-> { database: "maxwell", table: "bootstrap", type: "insert", data: {id: 1, database_name: "shard_1", table_name: "minimal", is_complete: 0, inserted_rows: 0, binlog_position:0, total_rows: 0, created_at: null, started_at: null, completed_at: null, binlog_file: null, where_clause: null }}
-> { database: "shard_1", table: "minimal", type: "bootstrap-start", data: {} }
-> { database: "shard_1", table: "minimal", type: "bootstrap-insert", data: {id: 1, account_id: 1, text_field: "hello" } }
-> { database: "shard_1", table: "minimal", type: "bootstrap-insert", data: {id: 2, account_id: 2, text_field: "bonjour"} }
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
truncate table maxwell.bootstrap;
truncate table shard_1.minimal;

insert into minimal set account_id = 1, text_field='hello';
insert into minimal set account_id = 2, text_field='bonjour';
insert into minimal set account_id = 3, text_field='goeiedag';
insert into maxwell.bootstrap set database_name = 'shard_1', table_name = 'minimal', where_clause = 'id > 1';
-> { database: "maxwell", table: "bootstrap", type: "insert", data: {id: 1, database_name: "shard_1", table_name: "minimal", is_complete: 0, inserted_rows: 0, binlog_position:0, total_rows: 0, created_at: null, started_at: null, completed_at: null, binlog_file: null, where_clause: "id > 1" }}
-> { database: "shard_1", table: "minimal", type: "bootstrap-start", data: {} }
-> { database: "shard_1", table: "minimal", type: "bootstrap-insert", data: {id: 2, account_id: 2, text_field: "bonjour"} }
-> { database: "shard_1", table: "minimal", type: "bootstrap-insert", data: {id: 3, account_id: 3, text_field: "goeiedag"} }
-> { database: "shard_1", table: "minimal", type: "bootstrap-complete", data: {} }
2 changes: 1 addition & 1 deletion src/test/resources/sql/json/bootstrap-single-row
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ insert into minimal set account_id = 1, text_field='hello'
insert into maxwell.bootstrap set database_name = 'shard_1', table_name = 'minimal'

-> { database: "shard_1", table: "minimal", type:"insert", data: { id: 1, account_id: 1, text_field: "hello"} }
-> { database: "maxwell", table: "bootstrap",type: "insert", data: { database_name: "shard_1", inserted_rows: 0, total_rows: 0, binlog_position: 0, id: 1, is_complete: 0, table_name: "minimal", created_at: null, started_at: null, completed_at: null, binlog_file: null } }
-> { database: "maxwell", table: "bootstrap",type: "insert", data: { database_name: "shard_1", inserted_rows: 0, total_rows: 0, binlog_position: 0, id: 1, is_complete: 0, table_name: "minimal", created_at: null, started_at: null, completed_at: null, binlog_file: null, where_clause: null } }
-> { database: "shard_1", table: "minimal", type: "bootstrap-start", data: {} }
-> { database: "shard_1", table: "minimal", type: "bootstrap-insert", data: { id: 1, account_id: 1, text_field: "hello"} }
-> { database: "shard_1", table: "minimal", type: "bootstrap-complete", data: {}}
2 changes: 1 addition & 1 deletion src/test/resources/sql/json/bootstrap-whitelist
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ insert into shard_1.minimal set account_id = 1, text_field='hello'
insert into maxwell.bootstrap set database_name = 'shard_1', table_name = 'minimal'

-> { database: "shard_1", table: "minimal", type:"insert", data: { id: 1, account_id: 1, text_field: "hello"} }
-> { database: "maxwell", table: "bootstrap",type: "insert", data: { database_name: "shard_1", inserted_rows: 0, total_rows: 0, binlog_position: 0, id: 1, is_complete: 0, table_name: "minimal", created_at: null, started_at: null, completed_at: null, binlog_file: null } }
-> { database: "maxwell", table: "bootstrap",type: "insert", data: { database_name: "shard_1", inserted_rows: 0, total_rows: 0, binlog_position: 0, id: 1, is_complete: 0, table_name: "minimal", created_at: null, started_at: null, completed_at: null, binlog_file: null, where_clause: null } }
-> { database: "shard_1", table: "minimal", type: "bootstrap-start", data: {} }
-> { database: "shard_1", table: "minimal", type: "bootstrap-insert", data: { id: 1, account_id: 1, text_field: "hello"} }
-> { database: "shard_1", table: "minimal", type: "bootstrap-complete", data: {}}

0 comments on commit c6cc9d6

Please sign in to comment.