diff --git a/README.md b/README.md index 8a0d06914..c018b919e 100644 --- a/README.md +++ b/README.md @@ -10,7 +10,7 @@ include ETL, cache building/expiring, metrics collection, and search indexing. advanced features: -- Can do `SELECT * from table` (bootstrapping) initial loads of a table +- Can do `SELECT * from table` (bootstrapping) initial loads of a table. - supports automatic position recover on master promotion - flexible partitioning schemes for Kakfa - by database, table, primary key, or column - Maxwell pulls all this off by acting as a full mysql replica, including a SQL diff --git a/docs/docs/bootstrapping.md b/docs/docs/bootstrapping.md index fa704ef53..95d066c01 100644 --- a/docs/docs/bootstrapping.md +++ b/docs/docs/bootstrapping.md @@ -11,6 +11,7 @@ option | description --port PORT | mysql port --database DATABASE | mysql database containing the table to bootstrap --table TABLE | mysql table to bootstrap +--where WHERE_CLAUSE | where clause to restrict the rows bootstrapped from the specified table ### Using the maxwell.bootstrap table *** @@ -19,6 +20,13 @@ Alternatively you can insert a row in the `maxwell.bootstrap` table to trigger a ``` mysql> insert into maxwell.bootstrap (database_name, table_name) values ('fooDB', 'barTable'); ``` +Optionally, you can include a where clause to bootstrap only the matching rows of the table, either by setting the `where_clause` column of the `maxwell.bootstrap` row or by passing `--where` to `bin/maxwell-bootstrap`. For example, without and with a where clause:
+ +bin/maxwell-bootstrap --config localhost.properties --database foobar --table test --log_level info + +or + +bin/maxwell-bootstrap --config localhost.properties --database foobar --table test --where "my_date >= '2017-01-07 00:00:00'" --log_level info ### Async vs Sync bootstrapping *** diff --git a/src/main/java/com/zendesk/maxwell/bootstrap/AbstractBootstrapper.java b/src/main/java/com/zendesk/maxwell/bootstrap/AbstractBootstrapper.java index 3afb9e249..48a0d4113 100644 --- a/src/main/java/com/zendesk/maxwell/bootstrap/AbstractBootstrapper.java +++ b/src/main/java/com/zendesk/maxwell/bootstrap/AbstractBootstrapper.java @@ -41,6 +41,10 @@ protected String bootstrapTable(RowMap rowmap) { return (String) rowmap.getData("table_name"); } + protected String bootstrapWhere(RowMap rowmap) { + return (String) rowmap.getData("where_clause"); + } + abstract public boolean shouldSkip(RowMap row) throws SQLException, IOException; abstract public void startBootstrap(RowMap startBootstrapRow, AbstractProducer producer, Replicator replicator) throws Exception; diff --git a/src/main/java/com/zendesk/maxwell/bootstrap/MaxwellBootstrapUtility.java b/src/main/java/com/zendesk/maxwell/bootstrap/MaxwellBootstrapUtility.java index 4cb15a997..b6ad9d660 100644 --- a/src/main/java/com/zendesk/maxwell/bootstrap/MaxwellBootstrapUtility.java +++ b/src/main/java/com/zendesk/maxwell/bootstrap/MaxwellBootstrapUtility.java @@ -46,8 +46,8 @@ private void run(String[] argv) throws Exception { getInsertedRowsCount(connection, config.monitorBootstrapID); rowId = config.monitorBootstrapID; } else { - Long totalRows = calculateRowCount(connection, config.databaseName, config.tableName); - rowId = insertBootstrapStartRow(connection, config.databaseName, config.tableName, totalRows); + Long totalRows = calculateRowCount(connection, config.databaseName, config.tableName, config.whereClause); + rowId = insertBootstrapStartRow(connection, config.databaseName, config.tableName, config.whereClause, 
totalRows); } try { @@ -151,22 +151,30 @@ private Long getTotalRowCount(Connection connection, Long bootstrapRowID) throws } } - private Long calculateRowCount(Connection connection, String db, String table) throws SQLException { + private Long calculateRowCount(Connection connection, String db, String table, String whereClause) throws SQLException { LOGGER.info("counting rows"); String sql = String.format("select count(*) from `%s`.%s", db, table); + if ( whereClause != null ) { + sql += String.format(" where %s", whereClause); + } PreparedStatement preparedStatement = connection.prepareStatement(sql); ResultSet resultSet = preparedStatement.executeQuery(); resultSet.next(); return resultSet.getLong(1); } - private long insertBootstrapStartRow(Connection connection, String db, String table, Long totalRows) throws SQLException { + private long insertBootstrapStartRow(Connection connection, String db, String table, String whereClause, Long totalRows) throws SQLException { LOGGER.info("inserting bootstrap start row"); - String sql = "insert into `bootstrap` (database_name, table_name, total_rows) values(?, ?, ?)"; + String sql = null; + sql = "insert into `bootstrap` (database_name, table_name, where_clause, total_rows) values(?, ?, ?, ?)"; + PreparedStatement preparedStatement = connection.prepareStatement(sql, Statement.RETURN_GENERATED_KEYS); preparedStatement.setString(1, db); preparedStatement.setString(2, table); - preparedStatement.setLong(3, totalRows); + + preparedStatement.setString(3, whereClause); + preparedStatement.setLong(4, totalRows); + preparedStatement.execute(); ResultSet generatedKeys = preparedStatement.getGeneratedKeys(); generatedKeys.next(); diff --git a/src/main/java/com/zendesk/maxwell/bootstrap/MaxwellBootstrapUtilityConfig.java b/src/main/java/com/zendesk/maxwell/bootstrap/MaxwellBootstrapUtilityConfig.java index 05e563b38..c3e7bd5b6 100644 --- a/src/main/java/com/zendesk/maxwell/bootstrap/MaxwellBootstrapUtilityConfig.java +++ 
b/src/main/java/com/zendesk/maxwell/bootstrap/MaxwellBootstrapUtilityConfig.java @@ -1,6 +1,7 @@ package com.zendesk.maxwell.bootstrap; import joptsimple.*; +import org.apache.commons.lang3.StringUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -20,6 +21,7 @@ public class MaxwellBootstrapUtilityConfig extends AbstractConfig { public String databaseName; public String schemaDatabaseName; public String tableName; + public String whereClause; public String log_level; public Long abortBootstrapID; @@ -40,6 +42,7 @@ protected OptionParser buildOptionParser() { parser.accepts( "__separator_1", "" ); parser.accepts( "database", "database that contains the table to bootstrap").withRequiredArg(); parser.accepts( "table", "table to bootstrap").withRequiredArg(); + parser.accepts( "where", "where clause to restrict the rows bootstrapped from the specified table. e.g. my_date >= '2017-01-01 11:07:13'").withOptionalArg(); parser.accepts( "__separator_2", "" ); parser.accepts( "abort", "bootstrap_id to abort" ).withRequiredArg(); parser.accepts( "monitor", "bootstrap_id to monitor" ).withRequiredArg(); @@ -129,6 +132,9 @@ else if ( !options.has("abort") && !options.has("monitor") ) this.tableName = (String) options.valueOf("table"); else if ( !options.has("abort") && !options.has("monitor") ) usage("please specify a table"); + + if ( options.has("where") && !StringUtils.isEmpty(((String) options.valueOf("where"))) ) + this.whereClause = (String) options.valueOf("where"); } private void parseFile(String filename, boolean abortOnMissing) { diff --git a/src/main/java/com/zendesk/maxwell/bootstrap/SynchronousBootstrapper.java b/src/main/java/com/zendesk/maxwell/bootstrap/SynchronousBootstrapper.java index c98475395..2d10cdfd4 100644 --- a/src/main/java/com/zendesk/maxwell/bootstrap/SynchronousBootstrapper.java +++ b/src/main/java/com/zendesk/maxwell/bootstrap/SynchronousBootstrapper.java @@ -37,7 +37,13 @@ public void startBootstrap(RowMap startBootstrapRow, 
AbstractProducer producer, String databaseName = bootstrapDatabase(startBootstrapRow); String tableName = bootstrapTable(startBootstrapRow); - LOGGER.debug(String.format("bootstrapping request for %s.%s", databaseName, tableName)); + String whereClause = bootstrapWhere(startBootstrapRow); + + String logString = String.format("bootstrapping request for %s.%s", databaseName, tableName); + if ( whereClause != null ) { + logString += String.format(" with where clause %s", whereClause); + } + LOGGER.debug(logString); Schema schema = replicator.getSchema(); Database database = findDatabase(schema, databaseName); @@ -50,9 +56,9 @@ public void startBootstrap(RowMap startBootstrapRow, AbstractProducer producer, try ( Connection connection = getConnection(); Connection streamingConnection = getStreamingConnection()) { setBootstrapRowToStarted(startBootstrapRow, connection); - ResultSet resultSet = getAllRows(databaseName, tableName, schema, streamingConnection); + ResultSet resultSet = getAllRows(databaseName, tableName, schema, whereClause, streamingConnection); int insertedRows = 0; - lastInsertedRowsUpdateTimeMillis = 0; // ensure updateInsertedRowsColumn is called at least once + lastInsertedRowsUpdateTimeMillis = 0; // ensure updateInsertedRowsColumn is called at least once while ( resultSet.next() ) { RowMap row = bootstrapEventRowMap("bootstrap-insert", table, position); setRowValues(row, resultSet, table); @@ -178,14 +184,22 @@ private void ensureTable(String tableName, Database database) { findTable(tableName, database); } - private ResultSet getAllRows(String databaseName, String tableName, Schema schema, Connection connection) throws SQLException, InterruptedException { + private ResultSet getAllRows(String databaseName, String tableName, Schema schema, String whereClause, + Connection connection) throws SQLException, InterruptedException { Statement statement = createBatchStatement(connection); String pk = 
schema.findDatabase(databaseName).findTable(tableName).getPKString(); + + String sql = String.format("select * from `%s`.%s", databaseName, tableName); + + if ( whereClause != null && !whereClause.equals("") ) { + sql += String.format(" where %s", whereClause); + } + if ( pk != null && !pk.equals("") ) { - return statement.executeQuery(String.format("select * from `%s`.%s order by %s", databaseName, tableName, pk)); - } else { - return statement.executeQuery(String.format("select * from `%s`.%s", databaseName, tableName)); + sql += String.format(" order by %s", pk); } + + return statement.executeQuery(sql); } private Statement createBatchStatement(Connection connection) throws SQLException, InterruptedException { diff --git a/src/main/java/com/zendesk/maxwell/schema/SchemaStoreSchema.java b/src/main/java/com/zendesk/maxwell/schema/SchemaStoreSchema.java index d2309f0dd..b76842a39 100644 --- a/src/main/java/com/zendesk/maxwell/schema/SchemaStoreSchema.java +++ b/src/main/java/com/zendesk/maxwell/schema/SchemaStoreSchema.java @@ -110,6 +110,9 @@ public static void upgradeSchemaStoreSchema(Connection c) throws SQLException, I performAlter(c, "alter table `bootstrap` modify column inserted_rows bigint unsigned not null default 0"); } + if ( !getTableColumns("bootstrap", c).containsKey("where_clause") ) { + performAlter(c, "alter table `bootstrap` add column where_clause varchar(1024)"); + } HashMap schemaColumns = getTableColumns("schemas", c); if ( !schemaColumns.containsKey("charset")) { diff --git a/src/main/resources/sql/maxwell_schema_bootstrap.sql b/src/main/resources/sql/maxwell_schema_bootstrap.sql index a39fb9e7e..dfdb86ee2 100644 --- a/src/main/resources/sql/maxwell_schema_bootstrap.sql +++ b/src/main/resources/sql/maxwell_schema_bootstrap.sql @@ -2,6 +2,7 @@ CREATE TABLE IF NOT EXISTS `bootstrap` ( id int unsigned auto_increment NOT NULL primary key, database_name varchar(255) NOT NULL, table_name varchar(255) NOT NULL, + where_clause varchar(1024),
is_complete tinyint(1) unsigned NOT NULL default 0, inserted_rows bigint(20) unsigned NOT NULL DEFAULT 0, total_rows bigint(20) unsigned NOT NULL DEFAULT 0, @@ -11,4 +12,3 @@ CREATE TABLE IF NOT EXISTS `bootstrap` ( binlog_file varchar(255) default NULL, binlog_position int unsigned default 0 ); - diff --git a/src/test/java/com/zendesk/maxwell/BootstrapIntegrationTest.java b/src/test/java/com/zendesk/maxwell/BootstrapIntegrationTest.java index a0002994a..1a44fd758 100644 --- a/src/test/java/com/zendesk/maxwell/BootstrapIntegrationTest.java +++ b/src/test/java/com/zendesk/maxwell/BootstrapIntegrationTest.java @@ -20,6 +20,11 @@ public void testMultipleRowBootstrap() throws Exception { runJSON("json/bootstrap-multiple-row"); } + @Test + public void testMultipleRowBootstrapWithWhereclause() throws Exception { + runJSON("json/bootstrap-multiple-row-with-whereclause"); + } + @Test public void testNoPkTableBootstrap() throws Exception { runJSON("json/bootstrap-no-pk"); diff --git a/src/test/resources/sql/json/bootstrap-multiple-row b/src/test/resources/sql/json/bootstrap-multiple-row index a292b5f6b..5c117cd59 100644 --- a/src/test/resources/sql/json/bootstrap-multiple-row +++ b/src/test/resources/sql/json/bootstrap-multiple-row @@ -5,7 +5,7 @@ insert into maxwell.bootstrap set database_name = 'shard_1', table_name = 'minim -> { database: "shard_1", table: "minimal", type: "insert", data: {id: 1, account_id: 1, text_field: "hello"} } -> { database: "shard_1", table: "minimal", type: "insert", data: {id: 2, account_id: 2, text_field: "bonjour"} } -> { database: "shard_1", table: "minimal", type: "insert", data: {id: 3, account_id: 3, text_field: "goeiedag"} } --> { database: "maxwell", table: "bootstrap", type: "insert", data: {id: 1, database_name: "shard_1", table_name: "minimal", is_complete: 0, inserted_rows: 0, binlog_position:0, total_rows: 0, created_at: null, started_at: null, completed_at: null, binlog_file: null }} +-> { database: "maxwell", table: "bootstrap", 
type: "insert", data: {id: 1, database_name: "shard_1", table_name: "minimal", is_complete: 0, inserted_rows: 0, binlog_position:0, total_rows: 0, created_at: null, started_at: null, completed_at: null, binlog_file: null, where_clause: null }} -> { database: "shard_1", table: "minimal", type: "bootstrap-start", data: {} } -> { database: "shard_1", table: "minimal", type: "bootstrap-insert", data: {id: 1, account_id: 1, text_field: "hello" } } -> { database: "shard_1", table: "minimal", type: "bootstrap-insert", data: {id: 2, account_id: 2, text_field: "bonjour"} } diff --git a/src/test/resources/sql/json/bootstrap-multiple-row-with-whereclause b/src/test/resources/sql/json/bootstrap-multiple-row-with-whereclause new file mode 100644 index 000000000..f7ed75225 --- /dev/null +++ b/src/test/resources/sql/json/bootstrap-multiple-row-with-whereclause @@ -0,0 +1,12 @@ +truncate table maxwell.bootstrap; +truncate table shard_1.minimal; + +insert into minimal set account_id = 1, text_field='hello'; +insert into minimal set account_id = 2, text_field='bonjour'; +insert into minimal set account_id = 3, text_field='goeiedag'; +insert into maxwell.bootstrap set database_name = 'shard_1', table_name = 'minimal', where_clause = 'id > 1'; +-> { database: "maxwell", table: "bootstrap", type: "insert", data: {id: 1, database_name: "shard_1", table_name: "minimal", is_complete: 0, inserted_rows: 0, binlog_position:0, total_rows: 0, created_at: null, started_at: null, completed_at: null, binlog_file: null, where_clause: "id > 1" }} +-> { database: "shard_1", table: "minimal", type: "bootstrap-start", data: {} } +-> { database: "shard_1", table: "minimal", type: "bootstrap-insert", data: {id: 2, account_id: 2, text_field: "bonjour"} } +-> { database: "shard_1", table: "minimal", type: "bootstrap-insert", data: {id: 3, account_id: 3, text_field: "goeiedag"} } +-> { database: "shard_1", table: "minimal", type: "bootstrap-complete", data: {} } \ No newline at end of file diff --git 
a/src/test/resources/sql/json/bootstrap-single-row b/src/test/resources/sql/json/bootstrap-single-row index fa02814f6..841633082 100644 --- a/src/test/resources/sql/json/bootstrap-single-row +++ b/src/test/resources/sql/json/bootstrap-single-row @@ -3,7 +3,7 @@ insert into minimal set account_id = 1, text_field='hello' insert into maxwell.bootstrap set database_name = 'shard_1', table_name = 'minimal' -> { database: "shard_1", table: "minimal", type:"insert", data: { id: 1, account_id: 1, text_field: "hello"} } --> { database: "maxwell", table: "bootstrap",type: "insert", data: { database_name: "shard_1", inserted_rows: 0, total_rows: 0, binlog_position: 0, id: 1, is_complete: 0, table_name: "minimal", created_at: null, started_at: null, completed_at: null, binlog_file: null } } +-> { database: "maxwell", table: "bootstrap",type: "insert", data: { database_name: "shard_1", inserted_rows: 0, total_rows: 0, binlog_position: 0, id: 1, is_complete: 0, table_name: "minimal", created_at: null, started_at: null, completed_at: null, binlog_file: null, where_clause: null } } -> { database: "shard_1", table: "minimal", type: "bootstrap-start", data: {} } -> { database: "shard_1", table: "minimal", type: "bootstrap-insert", data: { id: 1, account_id: 1, text_field: "hello"} } -> { database: "shard_1", table: "minimal", type: "bootstrap-complete", data: {}} diff --git a/src/test/resources/sql/json/bootstrap-whitelist b/src/test/resources/sql/json/bootstrap-whitelist index 6c40017f0..0d8c03d8f 100644 --- a/src/test/resources/sql/json/bootstrap-whitelist +++ b/src/test/resources/sql/json/bootstrap-whitelist @@ -3,7 +3,7 @@ insert into shard_1.minimal set account_id = 1, text_field='hello' insert into maxwell.bootstrap set database_name = 'shard_1', table_name = 'minimal' -> { database: "shard_1", table: "minimal", type:"insert", data: { id: 1, account_id: 1, text_field: "hello"} } --> { database: "maxwell", table: "bootstrap",type: "insert", data: { database_name: "shard_1", 
inserted_rows: 0, total_rows: 0, binlog_position: 0, id: 1, is_complete: 0, table_name: "minimal", created_at: null, started_at: null, completed_at: null, binlog_file: null } } +-> { database: "maxwell", table: "bootstrap",type: "insert", data: { database_name: "shard_1", inserted_rows: 0, total_rows: 0, binlog_position: 0, id: 1, is_complete: 0, table_name: "minimal", created_at: null, started_at: null, completed_at: null, binlog_file: null, where_clause: null } } -> { database: "shard_1", table: "minimal", type: "bootstrap-start", data: {} } -> { database: "shard_1", table: "minimal", type: "bootstrap-insert", data: { id: 1, account_id: 1, text_field: "hello"} } -> { database: "shard_1", table: "minimal", type: "bootstrap-complete", data: {}} \ No newline at end of file