#!/usr/bin/env python
#
# Copyright 2013, Google Inc. All rights reserved.
# Use of this source code is governed by a BSD-style license that can
# be found in the LICENSE file.
"""Tests the robustness and resiliency of vtworkers."""
from collections import namedtuple
import urllib
import urllib2
import logging
import unittest
from vtdb import keyrange_constants
import base_sharding
import environment
import tablet
import utils
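# The test keyspace is sharded on a uint64 `keyspace_id` column; this constant
# is passed as the sharding column type to CreateKeyspace in init_keyspace().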
KEYSPACE_ID_TYPE = keyrange_constants.KIT_UINT64
class ShardTablets(namedtuple('ShardTablets', 'master replicas rdonlys')):
"""ShardTablets is a container for all the tablet.Tablets of a shard.
`master` should be a single Tablet, while `replicas` and `rdonlys` should be
lists of Tablets of the appropriate types.
"""
@property
def all_tablets(self):
"""Returns a list of all the tablets of the shard.
Does not guarantee any ordering on the returned tablets.
Returns:
List of all tablets of the shard.
"""
return [self.master] + self.replicas + self.rdonlys
@property
def replica(self):
"""Returns the first replica Tablet instance for the shard, or None."""
if self.replicas:
return self.replicas[0]
else:
return None
@property
def rdonly(self):
"""Returns the first replica Tablet instance for the shard, or None."""
if self.rdonlys:
return self.rdonlys[0]
else:
return None
def __str__(self):
return """master %s
replicas:
%s
rdonlys:
%s
""" % (self.master,
'\n'.join(' %s' % replica for replica in self.replicas),
'\n'.join(' %s' % rdonly for rdonly in self.rdonlys))
# initial shard, covers everything
shard_master = tablet.Tablet()
shard_replica = tablet.Tablet()
shard_rdonly1 = tablet.Tablet()
# split shards
# range '' - 80
shard_0_master = tablet.Tablet()
shard_0_replica = tablet.Tablet()
shard_0_rdonly1 = tablet.Tablet()
# range 80 - ''
shard_1_master = tablet.Tablet()
shard_1_replica = tablet.Tablet()
shard_1_rdonly1 = tablet.Tablet()
all_shard_tablets = ShardTablets(shard_master, [shard_replica], [shard_rdonly1])
shard_0_tablets = ShardTablets(
shard_0_master, [shard_0_replica], [shard_0_rdonly1])
shard_1_tablets = ShardTablets(
shard_1_master, [shard_1_replica], [shard_1_rdonly1])
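# The tests below reshard the initial shard test_keyspace/0 (all_shard_tablets)
# into test_keyspace/-80 (shard_0_tablets) and test_keyspace/80- (shard_1_tablets).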
def init_keyspace():
"""Creates a `test_keyspace` keyspace with a sharding key."""
utils.run_vtctl(
['CreateKeyspace', '-sharding_column_name', 'keyspace_id',
'-sharding_column_type', KEYSPACE_ID_TYPE, 'test_keyspace'])
def setUpModule():
try:
environment.topo_server().setup()
setup_procs = [
shard_master.init_mysql(),
shard_replica.init_mysql(),
shard_rdonly1.init_mysql(),
shard_0_master.init_mysql(),
shard_0_replica.init_mysql(),
shard_0_rdonly1.init_mysql(),
shard_1_master.init_mysql(),
shard_1_replica.init_mysql(),
shard_1_rdonly1.init_mysql(),
]
utils.wait_procs(setup_procs)
init_keyspace()
logging.debug('environment set up with the following shards and tablets:')
logging.debug('=========================================================')
logging.debug('TABLETS: test_keyspace/0:\n%s', all_shard_tablets)
logging.debug('TABLETS: test_keyspace/-80:\n%s', shard_0_tablets)
logging.debug('TABLETS: test_keyspace/80-:\n%s', shard_1_tablets)
except:
tearDownModule()
raise
def tearDownModule():
utils.required_teardown()
if utils.options.skip_teardown:
return
teardown_procs = [
shard_master.teardown_mysql(),
shard_replica.teardown_mysql(),
shard_rdonly1.teardown_mysql(),
shard_0_master.teardown_mysql(),
shard_0_replica.teardown_mysql(),
shard_0_rdonly1.teardown_mysql(),
shard_1_master.teardown_mysql(),
shard_1_replica.teardown_mysql(),
shard_1_rdonly1.teardown_mysql(),
]
utils.wait_procs(teardown_procs, raise_on_error=False)
environment.topo_server().teardown()
utils.kill_sub_processes()
utils.remove_tmp_files()
shard_master.remove_tree()
shard_replica.remove_tree()
shard_rdonly1.remove_tree()
shard_0_master.remove_tree()
shard_0_replica.remove_tree()
shard_0_rdonly1.remove_tree()
shard_1_master.remove_tree()
shard_1_replica.remove_tree()
shard_1_rdonly1.remove_tree()
class TestBaseSplitClone(unittest.TestCase, base_sharding.BaseShardingTest):
"""Abstract test base class for testing the SplitClone worker."""
def __init__(self, *args, **kwargs):
super(TestBaseSplitClone, self).__init__(*args, **kwargs)
self.num_insert_rows = utils.options.num_insert_rows
def run_shard_tablets(
self, shard_name, shard_tablets, create_table=True):
"""Handles all the necessary work for initially running a shard's tablets.
This encompasses the following steps:
1. (optional) Creating the db
2. Starting the vttablets and letting them initialize themselves
3. Waiting for the appropriate vttablet state
4. Force-reparenting to the master tablet
5. Rebuilding the keyspace graph (RebuildKeyspaceGraph)
6. (optional) Running the initial schema setup
Args:
shard_name: the name of the shard to start tablets in
shard_tablets: an instance of ShardTablets for the given shard
create_table: boolean, True iff we should create a table on the tablets
"""
# Start tablets.
#
# NOTE: The future master has to be started with type 'replica'.
shard_tablets.master.start_vttablet(
wait_for_state=None, init_tablet_type='replica',
init_keyspace='test_keyspace', init_shard=shard_name)
for t in shard_tablets.replicas:
t.start_vttablet(
wait_for_state=None, init_tablet_type='replica',
init_keyspace='test_keyspace', init_shard=shard_name)
for t in shard_tablets.rdonlys:
t.start_vttablet(
wait_for_state=None, init_tablet_type='rdonly',
init_keyspace='test_keyspace', init_shard=shard_name)
# Block until tablets are up and we can enable replication.
# All tablets should be NOT_SERVING until we run InitShardMaster.
for t in shard_tablets.all_tablets:
t.wait_for_vttablet_state('NOT_SERVING')
# Reparent to choose an initial master and enable replication.
utils.run_vtctl(
['InitShardMaster', '-force', 'test_keyspace/%s' % shard_name,
shard_tablets.master.tablet_alias], auto_log=True)
utils.run_vtctl(['RebuildKeyspaceGraph', 'test_keyspace'], auto_log=True)
# Enforce a health check instead of waiting for the next periodic one.
# (saves up to 1 second execution time on average)
for t in shard_tablets.replicas:
utils.run_vtctl(['RunHealthCheck', t.tablet_alias])
for t in shard_tablets.rdonlys:
utils.run_vtctl(['RunHealthCheck', t.tablet_alias])
# Wait for tablet state to change after starting all tablets. This allows
# us to start all tablets at once, instead of sequentially waiting.
# NOTE: Replication has to be enabled first or the health check will
# set a replica or rdonly tablet back to NOT_SERVING.
for t in shard_tablets.all_tablets:
t.wait_for_vttablet_state('SERVING')
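# Minimal test schema; the columns match the rows written by
# _insert_values() further below: (id, msg, keyspace_id).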
create_table_sql = (
'create table worker_test('
'id bigint unsigned,'
'msg varchar(64),'
'keyspace_id bigint(20) unsigned not null,'
'primary key (id),'
'index by_msg (msg)'
') Engine=InnoDB'
)
if create_table:
utils.run_vtctl(['ApplySchema',
'-sql=' + create_table_sql,
'test_keyspace'],
auto_log=True)
def copy_schema_to_destination_shards(self):
for keyspace_shard in ('test_keyspace/-80', 'test_keyspace/80-'):
utils.run_vtctl(['CopySchemaShard',
'--exclude_tables', 'unrelated',
shard_rdonly1.tablet_alias,
keyspace_shard],
auto_log=True)
def _insert_values(self, vttablet, id_offset, msg, keyspace_id, num_values):
"""Inserts values into MySQL along with the required routing comments.
Args:
vttablet: the Tablet instance to modify.
id_offset: offset for the value of `id` column.
msg: the value of `msg` column.
keyspace_id: the value of `keyspace_id` column.
num_values: number of rows to be inserted.
"""
# For maximum performance, multiple values are inserted in one statement.
# However, when the statements are too long, queries will timeout and
# vttablet will kill them. Therefore, we chunk it into multiple statements.
def chunks(full_list, n):
"""Yield successive n-sized chunks from full_list."""
for i in xrange(0, len(full_list), n):
yield full_list[i:i+n]
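# e.g. chunks(range(1, 7), 3) yields [1, 2, 3] and then [4, 5, 6].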
max_chunk_size = 100*1000
k = utils.uint64_to_hex(keyspace_id)
for chunk in chunks(range(1, num_values+1), max_chunk_size):
logging.debug('Inserting values for range [%d, %d].', chunk[0], chunk[-1])
values_str = ''
for i in chunk:
if i != chunk[0]:
values_str += ','
values_str += "(%d, '%s', 0x%x)" % (id_offset + i, msg, keyspace_id)
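# The /* vtgate:: keyspace_id:... */ suffix is the routing comment referred to
# in the docstring above; it tags each statement with the keyspace_id of the
# rows it writes.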
vttablet.mquery(
'vt_test_keyspace', [
'begin',
'insert into worker_test(id, msg, keyspace_id) values%s '
'/* vtgate:: keyspace_id:%s */' % (values_str, k),
'commit'],
write=True)
def insert_values(self, vttablet, num_values, num_shards, offset=0,
keyspace_id_range=2**64):
"""Inserts simple values, one for each potential shard.
Each row is given a message that contains the shard number, so we can easily
verify that the source and destination shards have the same data.
Args:
vttablet: the Tablet instance to modify.
num_values: The number of values to insert.
num_shards: the number of shards that we expect to have.
offset: amount that we should offset the `id`s by. This is useful for
inserting values multiple times.
keyspace_id_range: the number of distinct values that the keyspace id
can have.
"""
shard_width = keyspace_id_range / num_shards
shard_offsets = [i * shard_width for i in xrange(num_shards)]
for shard_num in xrange(num_shards):
self._insert_values(
vttablet,
shard_offsets[shard_num] + offset,
'msg-shard-%d' % shard_num,
shard_offsets[shard_num],
num_values)
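# For example, insert_values(shard_master, 100, 2) writes 100 rows with
# msg 'msg-shard-0' and keyspace_id 0, plus 100 rows with msg 'msg-shard-1'
# and keyspace_id 2**63, i.e. one batch per eventual destination shard.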
def assert_shard_data_equal(
self, shard_num, source_tablet, destination_tablet):
"""Asserts source and destination tablets have identical shard data.
Args:
shard_num: The shard number of the shard that we want to verify.
source_tablet: Tablet instance of the source shard.
destination_tablet: Tablet instance of the destination shard.
"""
select_query = (
'select * from worker_test where msg="msg-shard-%s" order by id asc' %
shard_num)
# Make sure all the right rows made it from the source to the destination
source_rows = source_tablet.mquery('vt_test_keyspace', select_query)
destination_rows = destination_tablet.mquery(
'vt_test_keyspace', select_query)
self.assertEqual(source_rows, destination_rows)
# Make sure that there are no extra rows on the destination
count_query = 'select count(*) from worker_test'
destination_count = destination_tablet.mquery(
'vt_test_keyspace', count_query)[0][0]
self.assertEqual(destination_count, len(destination_rows))
def run_split_diff(self, keyspace_shard, source_tablets, destination_tablets):
"""Runs a vtworker SplitDiff on the given keyspace/shard.
Sets all former rdonly slaves back to rdonly.
Args:
keyspace_shard: keyspace/shard to run SplitDiff on (string)
source_tablets: ShardTablets instance for the source shard
destination_tablets: ShardTablets instance for the destination shard
"""
_ = source_tablets, destination_tablets
logging.debug('Running vtworker SplitDiff for %s', keyspace_shard)
_, _ = utils.run_vtworker(
['-cell', 'test_nj', 'SplitDiff',
'--min_healthy_rdonly_tablets', '1',
keyspace_shard], auto_log=True)
def setUp(self):
"""Creates shards, starts the tablets, and inserts some data."""
try:
self.run_shard_tablets('0', all_shard_tablets)
# create the split shards
self.run_shard_tablets(
'-80', shard_0_tablets, create_table=False)
self.run_shard_tablets(
'80-', shard_1_tablets, create_table=False)
logging.debug('Start inserting initial data: %s rows',
self.num_insert_rows)
self.insert_values(shard_master, self.num_insert_rows, 2)
logging.debug(
'Done inserting initial data, waiting for replication to catch up')
utils.wait_for_replication_pos(shard_master, shard_rdonly1)
logging.debug('Replication on source rdonly tablet is caught up')
except:
self.tearDown()
raise
def tearDown(self):
"""Does the minimum to reset topology and tablets to their initial states.
When benchmarked, this seemed to take around 30% of the time of
(setUpModule + tearDownModule).
FIXME(aaijazi): doing this in parallel greatly reduces the time it takes.
See the kill_tablets method in tablet.py.
"""
for shard_tablet in [all_shard_tablets, shard_0_tablets, shard_1_tablets]:
for t in shard_tablet.all_tablets:
t.reset_replication()
t.set_semi_sync_enabled(master=False)
t.clean_dbs()
t.kill_vttablet()
# we allow failures here as some tablets will be gone sometimes
# (the master tablets after an emergency reparent)
utils.run_vtctl(['DeleteTablet', '-allow_master', t.tablet_alias],
auto_log=True, raise_on_error=False)
utils.run_vtctl(['RebuildKeyspaceGraph', 'test_keyspace'], auto_log=True)
for shard in ['0', '-80', '80-']:
utils.run_vtctl(
['DeleteShard', 'test_keyspace/%s' % shard], auto_log=True)
class TestBaseSplitCloneResiliency(TestBaseSplitClone):
"""Tests that the SplitClone worker is resilient to particular failures."""
def setUp(self):
try:
super(TestBaseSplitCloneResiliency, self).setUp()
self.copy_schema_to_destination_shards()
except:
self.tearDown()
raise
def verify_successful_worker_copy_with_reparent(self, mysql_down=False):
"""Verifies that vtworker can successfully copy data for a SplitClone.
Order of operations:
1. Run a background vtworker
2. Wait until the worker successfully resolves the destination masters.
3. Reparent the destination tablets
4. Wait until the vtworker copy is finished
5. Verify that the worker was forced to re-resolve topology and retry writes
due to the reparent.
6. Verify that the data was copied successfully to both new shards
Args:
mysql_down: boolean. If True, we take down the MySQL instances on the
destination masters at first, then bring them back and reparent away.
Raises:
AssertionError if things didn't go as expected.
"""
if mysql_down:
logging.debug('Shutting down mysqld on destination masters.')
utils.wait_procs(
[shard_0_master.shutdown_mysql(),
shard_1_master.shutdown_mysql()])
worker_proc, worker_port, worker_rpc_port = utils.run_vtworker_bg(
['--cell', 'test_nj'],
auto_log=True)
# --max_tps is only specified to enable the throttler and ensure that the
# code is executed. But the intent here is not to throttle the test, hence
# the rate limit is set very high.
workerclient_proc = utils.run_vtworker_client_bg(
['SplitClone',
'--source_reader_count', '1',
'--destination_pack_count', '1',
'--destination_writer_count', '1',
'--min_healthy_rdonly_tablets', '1',
'--max_tps', '9999',
'test_keyspace/0'],
worker_rpc_port)
if mysql_down:
# If MySQL is down, we wait until vtworker retried at least once to make
# sure it reached the point where a write failed due to MySQL being down.
# There should be two retries at least, one for each destination shard.
utils.poll_for_vars(
'vtworker', worker_port,
'WorkerRetryCount >= 2',
condition_fn=lambda v: v.get('WorkerRetryCount') >= 2)
logging.debug('Worker has retried at least twice, starting reparent now')
# vtworker is blocked at this point. This is a good time to test that its
# throttler server is reacting to RPCs.
self.check_binlog_throttler('localhost:%d' % worker_rpc_port,
['test_keyspace/-80', 'test_keyspace/80-'],
9999)
# Bring back masters. Since we test with semi-sync now, we need at least
# one replica for the new master. This test is already quite expensive,
# so we bring back the old master as a replica rather than having a third
# replica up the whole time.
logging.debug('Restarting mysqld on destination masters')
utils.wait_procs(
[shard_0_master.start_mysql(),
shard_1_master.start_mysql()])
# Reparent away from the old masters.
utils.run_vtctl(
['PlannedReparentShard', 'test_keyspace/-80',
shard_0_replica.tablet_alias], auto_log=True)
utils.run_vtctl(
['PlannedReparentShard', 'test_keyspace/80-',
shard_1_replica.tablet_alias], auto_log=True)
else:
# NOTE: There is a race condition around this:
# It's possible that the SplitClone vtworker command finishes before the
# PlannedReparentShard vtctl command, which we start below, succeeds.
# Then the test would fail because vtworker did not have to retry.
#
# To work around this, the test takes a parameter to increase the number of
# rows that the worker has to copy (with the idea being to slow the worker
# down).
# You should choose a value for num_insert_rows such that this test
# passes in your environment (trial-and-error...).
# Make sure that vtworker got past the point where it picked a master
# for each destination shard ("finding targets" state).
utils.poll_for_vars(
'vtworker', worker_port,
'WorkerState == copying the data',
condition_fn=lambda v: v.get('WorkerState') == 'copying the data')
logging.debug('Worker is in copy state, starting reparent now')
utils.run_vtctl(
['PlannedReparentShard', 'test_keyspace/-80',
shard_0_replica.tablet_alias], auto_log=True)
utils.run_vtctl(
['PlannedReparentShard', 'test_keyspace/80-',
shard_1_replica.tablet_alias], auto_log=True)
utils.wait_procs([workerclient_proc])
# Verify that we were forced to re-resolve and retry.
worker_vars = utils.get_vars(worker_port)
# There should be two retries at least, one for each destination shard.
self.assertGreater(worker_vars['WorkerRetryCount'], 1)
self.assertNotEqual(worker_vars['WorkerRetryCount'], {},
"expected vtworker to retry, but it didn't")
utils.kill_sub_process(worker_proc, soft=True)
# Make sure that everything is caught up to the same replication point
self.run_split_diff('test_keyspace/-80', all_shard_tablets, shard_0_tablets)
self.run_split_diff('test_keyspace/80-', all_shard_tablets, shard_1_tablets)
self.assert_shard_data_equal(0, shard_master, shard_0_tablets.replica)
self.assert_shard_data_equal(1, shard_master, shard_1_tablets.replica)
class TestReparentDuringWorkerCopy(TestBaseSplitCloneResiliency):
def __init__(self, *args, **kwargs):
super(TestReparentDuringWorkerCopy, self).__init__(*args, **kwargs)
self.num_insert_rows = utils.options.num_insert_rows_before_reparent_test
def test_reparent_during_worker_copy(self):
"""Simulates a destination reparent during a worker SplitClone copy.
The SplitClone command should be able to gracefully handle the reparent and
end up with the correct data on the destination.
Note: this test has a small possibility of flaking, due to the timing issues
involved. It's possible for the worker to finish the copy step before the
reparent succeeds, in which case there are assertions that will fail. This
seems better than having the test silently pass.
"""
self.verify_successful_worker_copy_with_reparent()
class TestMysqlDownDuringWorkerCopy(TestBaseSplitCloneResiliency):
def test_mysql_down_during_worker_copy(self):
"""This test simulates MySQL being down on the destination masters."""
self.verify_successful_worker_copy_with_reparent(mysql_down=True)
class TestVtworkerWebinterface(unittest.TestCase):
def setUp(self):
# Run vtworker without any optional arguments to start in interactive mode.
self.worker_proc, self.worker_port, _ = utils.run_vtworker_bg([])
def tearDown(self):
utils.kill_sub_process(self.worker_proc)
def test_webinterface(self):
worker_base_url = 'http://localhost:%d' % int(self.worker_port)
# Wait for /status to become available.
timeout = 10
while True:
done = False
try:
urllib2.urlopen(worker_base_url + '/status').read()
done = True
except urllib2.URLError:
pass
if done:
break
timeout = utils.wait_step(
'worker /status webpage must be available', timeout)
# Run the command twice to make sure it's idempotent.
for _ in range(2):
# Run Ping command.
try:
urllib2.urlopen(
worker_base_url + '/Debugging/Ping',
data=urllib.urlencode({'message': 'pong'})).read()
raise Exception('Should have thrown an HTTPError for the redirect.')
except urllib2.HTTPError as e:
self.assertEqual(e.code, 307)
# Verify that the command logged something and it's available at /status.
status = urllib2.urlopen(worker_base_url + '/status').read()
self.assertIn(
"Ping command was called with message: 'pong'", status,
'Command did not log output to /status: %s' % status)
# Reset the job.
urllib2.urlopen(worker_base_url + '/reset').read()
status_after_reset = urllib2.urlopen(worker_base_url + '/status').read()
self.assertIn(
'This worker is idle.', status_after_reset,
'/status does not indicate that the reset was successful')
class TestMinHealthyRdonlyTablets(TestBaseSplitCloneResiliency):
def test_split_clone_fails_not_enough_healthy_rdonly_tablets(self):
"""Verify vtworker errors if there aren't enough healthy RDONLY tablets."""
_, stderr = utils.run_vtworker(
['-cell', 'test_nj',
'--wait_for_healthy_rdonly_tablets_timeout', '1s',
'SplitClone',
'--min_healthy_rdonly_tablets', '2',
'test_keyspace/0'],
auto_log=True,
expect_fail=True)
self.assertIn('findTargets() failed: FindWorkerTablet() failed for'
' test_nj/test_keyspace/0: not enough healthy RDONLY'
' tablets to choose from in (test_nj,test_keyspace/0),'
' have 1 healthy ones, need at least 2', stderr)
def add_test_options(parser):
parser.add_option(
'--num_insert_rows', type='int', default=100,
help='The number of rows, per shard, that we should insert before '
'resharding for this test.')
parser.add_option(
'--num_insert_rows_before_reparent_test', type='int', default=3000,
help='The number of rows, per shard, that we should insert before '
'running TestReparentDuringWorkerCopy (supersedes --num_insert_rows in '
'that test). There must be enough rows such that SplitClone takes '
'several seconds to run while we run a planned reparent.')
if __name__ == '__main__':
utils.main(test_options=add_test_options)
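# Example invocation (hypothetical; the test is normally driven by the Vitess
# Python test harness via utils.main above):
#   ./worker.py --num_insert_rows 100 --num_insert_rows_before_reparent_test 3000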