Skip to content

Commit 48c45ab

Browse files
abrarsheikhbaloo
authored andcommitted
Add support for ignored_tables and ignored_schema (julien-duponchelle#201)
* Add support for ignored_tables and ignored_schema * reverting style changes
1 parent ac32b48 commit 48c45ab

File tree

5 files changed

+88
-11
lines changed

5 files changed

+88
-11
lines changed

pymysqlreplication/binlogstream.py

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -130,7 +130,8 @@ def __init__(self, connection_settings, server_id, ctl_connection_settings=None,
130130
blocking=False, only_events=None, log_file=None, log_pos=None,
131131
filter_non_implemented_events=True,
132132
ignored_events=None, auto_position=None,
133-
only_tables=None, only_schemas=None,
133+
only_tables=None, ignored_tables=None,
134+
only_schemas=None, ignored_schemas=None,
134135
freeze_schema=False, skip_to_timestamp=None,
135136
report_slave=None, slave_uuid=None,
136137
pymysql_wrapper=None,
@@ -148,7 +149,9 @@ def __init__(self, connection_settings, server_id, ctl_connection_settings=None,
148149
log_pos: Set replication start log pos (resume_stream should be true)
149150
auto_position: Use master_auto_position gtid to set position
150151
only_tables: An array with the tables you want to watch
152+
ignored_tables: An array with the tables you want to skip
151153
only_schemas: An array with the schemas you want to watch
154+
ignored_schemas: An array with the schemas you want to skip
152155
freeze_schema: If true do not support ALTER TABLE. It's faster.
153156
skip_to_timestamp: Ignore all events until reaching specified timestamp.
154157
report_slave: Report slave in SHOW SLAVE HOSTS.
@@ -174,7 +177,9 @@ def __init__(self, connection_settings, server_id, ctl_connection_settings=None,
174177
self._ctl_connection_settings.setdefault("charset", "utf8")
175178

176179
self.__only_tables = only_tables
180+
self.__ignored_tables = ignored_tables
177181
self.__only_schemas = only_schemas
182+
self.__ignored_schemas = ignored_schemas
178183
self.__freeze_schema = freeze_schema
179184
self.__allowed_events = self._allowed_event_list(
180185
only_events, ignored_events, filter_non_implemented_events)
@@ -420,7 +425,9 @@ def fetchone(self):
420425
self.__use_checksum,
421426
self.__allowed_events_in_packet,
422427
self.__only_tables,
428+
self.__ignored_tables,
423429
self.__only_schemas,
430+
self.__ignored_schemas,
424431
self.__freeze_schema,
425432
self.__fail_on_table_metadata_unavailable)
426433

pymysqlreplication/event.py

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -8,10 +8,12 @@
88

99
class BinLogEvent(object):
1010
def __init__(self, from_packet, event_size, table_map, ctl_connection,
11-
only_tables = None,
12-
only_schemas = None,
13-
freeze_schema = False,
14-
fail_on_table_metadata_unavailable = False):
11+
only_tables=None,
12+
ignored_tables=None,
13+
only_schemas=None,
14+
ignored_schemas=None,
15+
freeze_schema=False,
16+
fail_on_table_metadata_unavailable=False):
1517
self.packet = from_packet
1618
self.table_map = table_map
1719
self.event_type = self.packet.event_type

pymysqlreplication/packet.py

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -88,7 +88,9 @@ class BinLogPacketWrapper(object):
8888
def __init__(self, from_packet, table_map, ctl_connection, use_checksum,
8989
allowed_events,
9090
only_tables,
91+
ignored_tables,
9192
only_schemas,
93+
ignored_schemas,
9294
freeze_schema,
9395
fail_on_table_metadata_unavailable):
9496
# -1 because we ignore the ok byte
@@ -129,10 +131,12 @@ def __init__(self, from_packet, table_map, ctl_connection, use_checksum,
129131
return
130132
self.event = event_class(self, event_size_without_header, table_map,
131133
ctl_connection,
132-
only_tables = only_tables,
133-
only_schemas = only_schemas,
134-
freeze_schema = freeze_schema,
135-
fail_on_table_metadata_unavailable = fail_on_table_metadata_unavailable)
134+
only_tables=only_tables,
135+
ignored_tables=ignored_tables,
136+
only_schemas=only_schemas,
137+
ignored_schemas=ignored_schemas,
138+
freeze_schema=freeze_schema,
139+
fail_on_table_metadata_unavailable=fail_on_table_metadata_unavailable)
136140
if self.event._processed == False:
137141
self.event = None
138142

pymysqlreplication/row_event.py

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,9 @@ def __init__(self, from_packet, event_size, table_map, ctl_connection, **kwargs)
2222
ctl_connection, **kwargs)
2323
self.__rows = None
2424
self.__only_tables = kwargs["only_tables"]
25+
self.__ignored_tables = kwargs["ignored_tables"]
2526
self.__only_schemas = kwargs["only_schemas"]
27+
self.__ignored_schemas = kwargs["ignored_schemas"]
2628

2729
#Header
2830
self.table_id = self._read_table_id()
@@ -39,9 +41,16 @@ def __init__(self, from_packet, event_size, table_map, ctl_connection, **kwargs)
3941
if self.__only_tables is not None and self.table not in self.__only_tables:
4042
self._processed = False
4143
return
44+
elif self.__ignored_tables is not None and self.table in self.__ignored_tables:
45+
self._processed = False
46+
return
47+
4248
if self.__only_schemas is not None and self.schema not in self.__only_schemas:
4349
self._processed = False
4450
return
51+
elif self.__ignored_schemas is not None and self.schema in self.__ignored_schemas:
52+
self._processed = False
53+
return
4554

4655

4756
#Event V2
@@ -527,7 +536,9 @@ def __init__(self, from_packet, event_size, table_map, ctl_connection, **kwargs)
527536
super(TableMapEvent, self).__init__(from_packet, event_size,
528537
table_map, ctl_connection, **kwargs)
529538
self.__only_tables = kwargs["only_tables"]
539+
self.__ignored_tables = kwargs["ignored_tables"]
530540
self.__only_schemas = kwargs["only_schemas"]
541+
self.__ignored_schemas = kwargs["ignored_schemas"]
531542
self.__freeze_schema = kwargs["freeze_schema"]
532543

533544
# Post-Header
@@ -549,9 +560,16 @@ def __init__(self, from_packet, event_size, table_map, ctl_connection, **kwargs)
549560
if self.__only_tables is not None and self.table not in self.__only_tables:
550561
self._processed = False
551562
return
563+
elif self.__ignored_tables is not None and self.table in self.__ignored_tables:
564+
self._processed = False
565+
return
566+
552567
if self.__only_schemas is not None and self.schema not in self.__only_schemas:
553568
self._processed = False
554569
return
570+
elif self.__ignored_schemas is not None and self.schema in self.__ignored_schemas:
571+
self._processed = False
572+
return
555573

556574
self.packet.advance(1)
557575
self.column_count = self.packet.read_length_coded_binary()

pymysqlreplication/tests/test_basic.py

Lines changed: 48 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -148,14 +148,15 @@ def test_filtering_ignore_events(self):
148148
event = self.stream.fetchone()
149149
self.assertIsInstance(event, RotateEvent)
150150

151-
def test_filtering_table_event(self):
151+
def test_filtering_table_event_with_only_tables(self):
152152
self.stream.close()
153153
self.assertEqual(self.bin_log_format(), "ROW")
154154
self.stream = BinLogStreamReader(
155155
self.database,
156156
server_id=1024,
157157
only_events=[WriteRowsEvent],
158-
only_tables = ["test_2"])
158+
only_tables = ["test_2"]
159+
)
159160

160161
query = "CREATE TABLE test_2 (id INT NOT NULL AUTO_INCREMENT, data VARCHAR (50) NOT NULL, PRIMARY KEY (id))"
161162
self.execute(query)
@@ -171,6 +172,51 @@ def test_filtering_table_event(self):
171172
event = self.stream.fetchone()
172173
self.assertEqual(event.table, "test_2")
173174

175+
def test_filtering_table_event_with_ignored_tables(self):
176+
self.stream.close()
177+
self.assertEqual(self.bin_log_format(), "ROW")
178+
self.stream = BinLogStreamReader(
179+
self.database,
180+
server_id=1024,
181+
only_events=[WriteRowsEvent],
182+
ignored_tables = ["test_2"]
183+
)
184+
185+
query = "CREATE TABLE test_2 (id INT NOT NULL AUTO_INCREMENT, data VARCHAR (50) NOT NULL, PRIMARY KEY (id))"
186+
self.execute(query)
187+
query = "CREATE TABLE test_3 (id INT NOT NULL AUTO_INCREMENT, data VARCHAR (50) NOT NULL, PRIMARY KEY (id))"
188+
self.execute(query)
189+
190+
self.execute("INSERT INTO test_2 (data) VALUES ('alpha')")
191+
self.execute("INSERT INTO test_3 (data) VALUES ('alpha')")
192+
self.execute("INSERT INTO test_2 (data) VALUES ('beta')")
193+
self.execute("COMMIT")
194+
event = self.stream.fetchone()
195+
self.assertEqual(event.table, "test_3")
196+
197+
def test_filtering_table_event_with_only_tables_and_ignored_tables(self):
198+
self.stream.close()
199+
self.assertEqual(self.bin_log_format(), "ROW")
200+
self.stream = BinLogStreamReader(
201+
self.database,
202+
server_id=1024,
203+
only_events=[WriteRowsEvent],
204+
only_tables = ["test_2"],
205+
ignored_tables = ["test_3"]
206+
)
207+
208+
query = "CREATE TABLE test_2 (id INT NOT NULL AUTO_INCREMENT, data VARCHAR (50) NOT NULL, PRIMARY KEY (id))"
209+
self.execute(query)
210+
query = "CREATE TABLE test_3 (id INT NOT NULL AUTO_INCREMENT, data VARCHAR (50) NOT NULL, PRIMARY KEY (id))"
211+
self.execute(query)
212+
213+
self.execute("INSERT INTO test_2 (data) VALUES ('alpha')")
214+
self.execute("INSERT INTO test_3 (data) VALUES ('alpha')")
215+
self.execute("INSERT INTO test_2 (data) VALUES ('beta')")
216+
self.execute("COMMIT")
217+
event = self.stream.fetchone()
218+
self.assertEqual(event.table, "test_2")
219+
174220
def test_write_row_event(self):
175221
query = "CREATE TABLE test (id INT NOT NULL AUTO_INCREMENT, data VARCHAR (50) NOT NULL, PRIMARY KEY (id))"
176222
self.execute(query)

0 commit comments

Comments
 (0)