
Commit e630e3a

dschneller authored and timvaillancourt committed
Add new 'simple' oplog tailer method (#301)
* Add new 'simple' oplog tailer method

  Adds a new "simple" method for collecting the oplogs needed to construct a consistent backup. It implements an algorithm similar to what `mongodump --oplog` already does, albeit for multiple shards. It does _not_ begin tailing the oplogs of all shards at the beginning of the backup. Instead, it runs mongodump for all shards and waits until they have all finished. Then it collects the delta between the time each shard's dump ended and the time the last one finished. The following stages, especially the Resolver, which brings all shards' oplogs forward to a consistent point in time, are unchanged.

  Rationale for this addition: with the existing "tailer" approach, our backups very often failed with the error message "Tailer host changed". This appears to be a common problem with oplog tailing in general, judging from what you can find on the internet. It appears that for some reason the oplog tailing cursors get aborted by mongod with an error stating `"operation exceeded time limit", code: 50`. With this new, simpler oplog fetching method that apparently does not happen.

  The most important difference/drawback compared to the current tailer is that the simple approach fails if the oplog of one of the shards is so busy that, by the time the deltas are to be collected, it has rolled over and the operations are no longer available. This, however, will only be the case on very busy systems, where one might argue the oplog size should be increased anyway. In general the simple method should be a little less resource intensive, because there is no additional I/O while the mongodumps are running.

  This change is backwards compatible for callers. To use the new method, a new configuration parameter needs to be specified: `--oplog.tailer.method simple`. The default value for this option is `tailer`, which can also be set explicitly to select the classic implementation.

  Implementation notes:

  * Common functionality between the original Tailer and the new simple implementation was extracted into a new common base class `OplogTask`.
  * In a few places variables were extracted or renamed to (hopefully) make the code a little more readable, despite the additions.
  * In the Resolver class the thread pool's join() method is now called, to fix spurious (harmless) error messages like the following when finishing:

    ```
    Process PoolWorker-8:
    Traceback (most recent call last):
      File "/usr/lib/python2.7/multiprocessing/process.py", line 258, in _bootstrap
        self.run()
      File "/usr/lib/python2.7/multiprocessing/process.py", line 114, in run
        self._target(*self._args, **self._kwargs)
      File "/usr/lib/python2.7/multiprocessing/pool.py", line 102, in worker
        task = get()
      File "/usr/lib/python2.7/multiprocessing/queues.py", line 380, in get
        rrelease()
    ```

* Re-activate commented-out exception handler

  Was commented out for debugging and accidentally forgotten.

* Test new simple oplog method in Travis

* Fix case when there are no oplog changes

  If there were no oplog changes to be resolved, an exception would be thrown; in addition, the logging was broken, because it tried to access the `uri` variable before it was initialized. This commit fixes both issues.
1 parent 7e755b9 commit e630e3a
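For orientation, the sketch below restates the delta-collection idea from the commit message, assuming plain pymongo. It is not the committed SimpleOplogGetter code; the helper names (`collect_oplog_delta`, `collect_all_deltas`, `dump_end_per_shard`) are illustrative only.

```python
# Minimal sketch of the "simple" oplog delta collection described above.
# Not the project's SimpleOplogGetter; helper names are hypothetical.
from pymongo import CursorType, MongoClient


def collect_oplog_delta(uri, ts_from, ts_to):
    # Fetch every oplog entry on one shard between two BSON timestamps
    # with a plain, non-tailable cursor.
    oplog = MongoClient(uri)["local"]["oplog.rs"]
    query = {"ts": {"$gte": ts_from, "$lte": ts_to}}
    return list(oplog.find(query, cursor_type=CursorType.NON_TAILABLE, oplog_replay=True))


def collect_all_deltas(dump_end_per_shard):
    # dump_end_per_shard maps each shard URI to the timestamp at which its
    # mongodump finished; the common target is the latest of those.
    ts_to = max(dump_end_per_shard.values())
    return dict(
        (uri, collect_oplog_delta(uri, ts_from, ts_to))
        for uri, ts_from in dump_end_per_shard.items()
    )
```

The key point is that nothing is read from the oplogs while the dumps run; each shard is queried once, after the fact, for the window between its own dump end and the latest dump end.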

File tree

13 files changed: +486 / −78 lines


.travis.yml

Lines changed: 4 additions & 0 deletions
@@ -26,6 +26,10 @@ jobs:
     script:
       - docker load -i $TRAVIS_BUILD_DIR/docker/mongodb_consistent_backup.tar
       - $TRAVIS_BUILD_DIR/scripts/travis-ci/run-cluster.sh 3.4
+  - stage: test-cluster-3.4-simple-oplog
+    script:
+      - docker load -i $TRAVIS_BUILD_DIR/docker/mongodb_consistent_backup.tar
+      - $TRAVIS_BUILD_DIR/scripts/travis-ci/run-cluster.sh 3.4 --oplog.tailer.method simple
   - stage: test-replset-3.4
     script:
       - docker load -i $TRAVIS_BUILD_DIR/docker/mongodb_consistent_backup.tar

conf/mongodb-consistent-backup.example.conf

Lines changed: 1 addition & 0 deletions
@@ -44,6 +44,7 @@ production:
 #  resolver:
 #    threads: [1+] (default: 2 per CPU)
 #  tailer:
+#    method: [tailer|simple] (default: tailer)
 #    enabled: true
 #    status_interval: 30
   archive:
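For reference, a sketch of how the new option might look when actually enabled in a config file; it is based on the commented example above, and the surrounding keys are assumed rather than taken from this diff.

```yaml
# Illustrative configuration only; key nesting follows the commented
# example above, other settings omitted.
production:
  oplog:
    tailer:
      method: simple        # default: tailer
      enabled: true
      status_interval: 30
```

The same selection can be made on the command line with `--oplog.tailer.method simple`, as noted in the commit message.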

mongodb_consistent_backup/Common/DB.py

Lines changed: 21 additions & 2 deletions
@@ -2,7 +2,7 @@
 
 from bson.codec_options import CodecOptions
 from inspect import currentframe, getframeinfo
-from pymongo import DESCENDING, CursorType, MongoClient
+from pymongo import ASCENDING, DESCENDING, CursorType, MongoClient
 from pymongo.errors import ConfigurationError, ConnectionFailure, OperationFailure, ServerSelectionTimeoutError
 from ssl import CERT_REQUIRED, CERT_NONE
 from time import sleep
@@ -194,9 +194,19 @@ def get_oplog_rs(self):
         return db.oplog.rs.with_options(codec_options=CodecOptions(unicode_decode_error_handler="ignore"))
 
     def get_oplog_tail_ts(self):
-        logging.debug("Gathering oldest 'ts' in %s oplog" % self.uri)
+        logging.debug("Gathering youngest 'ts' in %s oplog" % self.uri)
         return self.get_oplog_rs().find_one(sort=[('$natural', DESCENDING)])['ts']
 
+    def get_oplog_head_ts(self):
+        logging.debug("Gathering oldest 'ts' in %s oplog" % self.uri)
+        return self.get_oplog_rs().find_one(sort=[('$natural', ASCENDING)])['ts']
+
+    def is_ts_covered_by_oplog(self, ts_to_check):
+        oldest_ts = self.get_oplog_head_ts()
+        covered = oldest_ts <= ts_to_check
+        logging.debug("Timestamp %s %s covered in %s oplog" % (ts_to_check, "is" if covered else "is NOT", self.uri))
+        return covered
+
     def get_oplog_cursor_since(self, caller, ts=None):
         frame = getframeinfo(currentframe().f_back)
         comment = "%s:%s;%s:%i" % (caller.__name__, frame.function, frame.filename, frame.lineno)
@@ -207,6 +217,15 @@ def get_oplog_cursor_since(self, caller, ts=None):
         # http://api.mongodb.com/python/current/examples/tailable.html
         return self.get_oplog_rs().find(query, cursor_type=CursorType.TAILABLE_AWAIT, oplog_replay=True).comment(comment)
 
+    def get_simple_oplog_cursor_from_to(self, caller, ts_from, ts_to=None):
+        frame = getframeinfo(currentframe().f_back)
+        comment = "%s:%s;%s:%i" % (caller.__name__, frame.function, frame.filename, frame.lineno)
+        if not ts_to:
+            ts_to = self.get_oplog_tail_ts()
+        query = {'ts': {'$gte': ts_from, '$lte': ts_to}}
+        logging.debug("Querying all oplog changes between %s and %s on %s with query: %s" % (ts_from, ts_to, self.uri, query))
+        return self.get_oplog_rs().find(query, cursor_type=CursorType.NON_TAILABLE, oplog_replay=True).comment(comment)
+
     def close(self):
         if self._conn:
             logging.debug("Closing connection to: %s" % self.uri)

mongodb_consistent_backup/Main.py

Lines changed: 50 additions & 23 deletions
@@ -11,7 +11,7 @@
 from Errors import NotifyError, OperationError
 from Logger import Logger
 from Notify import Notify
-from Oplog import Tailer, Resolver
+from Oplog import Tailer, Resolver, SimpleOplogGetter
 from Replication import Replset, ReplsetSharded
 from Rotate import Rotate
 from Sharding import Sharding
@@ -29,6 +29,7 @@ def __init__(self, prog_name="mongodb-consistent-backup"):
         self.replset_sharded = None
         self.notify = None
         self.oplogtailer = None
+        self.oploggetter = None
         self.resolver = None
         self.upload = None
         self.lock = None
@@ -162,7 +163,7 @@ def cleanup_and_exit(self, code, frame):
         logging.info("Starting cleanup procedure! Stopping running threads")
 
         # TODO Move submodules into self that populates as used?
-        submodules = ['replset', 'sharding', 'backup', 'oplogtailer', 'archive', 'upload']
+        submodules = ['replset', 'sharding', 'backup', 'oplogtailer', 'oploggetter', 'archive', 'upload']
         for submodule_name in submodules:
             try:
                 submodule = getattr(self, submodule_name)
@@ -325,19 +326,36 @@ def run(self):
         except Exception, e:
             self.exception("Problem stopping the balancer! Error: %s" % e, e)
 
-        # init the oplogtailers
-        try:
-            self.oplogtailer = Tailer(
-                self.manager,
-                self.config,
-                self.timer,
-                self.backup_root_subdirectory,
-                self.backup_directory,
-                self.replsets,
-                self.backup_stop
-            )
-        except Exception, e:
-            self.exception("Problem initializing oplog tailer! Error: %s" % e, e)
+        tailer_module = None
+        if self.config.oplog.tailer.method == "simple":
+            # init the oploggetters
+            try:
+                self.oploggetter = SimpleOplogGetter(
+                    self.manager,
+                    self.config,
+                    self.timer,
+                    self.backup_root_subdirectory,
+                    self.backup_directory,
+                    self.replsets,
+                    self.backup_stop
+                )
+                tailer_module = self.oploggetter
+            except Exception, e:
+                self.exception("Problem initializing oplog getter! Error: %s" % e, e)
+        else:
+            try:
+                self.oplogtailer = Tailer(
+                    self.manager,
+                    self.config,
+                    self.timer,
+                    self.backup_root_subdirectory,
+                    self.backup_directory,
+                    self.replsets,
+                    self.backup_stop
+                )
+                tailer_module = self.oplogtailer
+            except Exception, e:
+                self.exception("Problem initializing oplog tailer! Error: %s" % e, e)
 
         # init the backup
         try:
@@ -354,15 +372,16 @@ def run(self):
             if self.backup.is_compressed():
                 logging.info("Backup method supports compression, disabling compression in archive step and enabling oplog compression")
                 self.archive.compression('none')
-                self.oplogtailer.compression(self.backup.compression())
+                tailer_module.compression(self.backup.compression())
         except Exception, e:
             self.exception("Problem initializing backup! Error: %s" % e, e)
 
-        # start the oplog tailers, before the backups start
-        try:
-            self.oplogtailer.run()
-        except Exception, e:
-            self.exception("Failed to start oplog tailing threads! Error: %s" % e, e)
+        if self.oplogtailer:
+            # start the oplog tailers, before the backups start
+            try:
+                self.oplogtailer.run()
+            except Exception, e:
+                self.exception("Failed to start oplog tailing threads! Error: %s" % e, e)
 
         # run the backup(s)
         try:
@@ -376,6 +395,14 @@ def run(self):
             self.oplog_summary = self.oplogtailer.stop()
             self.state.set('tailer_oplog', self.oplog_summary)
             self.oplogtailer.close()
+        elif self.oploggetter:
+            try:
+                self.oploggetter.backup_summary = self.backup_summary
+                self.oplog_summary = self.oploggetter.run()
+                self.state.set('tailer_oplog', self.oplog_summary)
+                self.oploggetter.close()
+            except Exception, e:
+                self.exception("Failed to start oplog tailing threads! Error: %s" % e, e)
 
         # set balancer back to original value
         try:
@@ -401,7 +428,7 @@ def run(self):
             self.db.close()
 
         # resolve/merge tailed oplog into mongodump oplog.bson to a consistent point for all shards
-        if self.backup.task.lower() == "mongodump" and self.oplogtailer.enabled():
+        if self.backup.task.lower() == "mongodump" and tailer_module.enabled():
             self.resolver = Resolver(
                 self.manager,
                 self.config,
@@ -411,7 +438,7 @@ def run(self):
                 self.oplog_summary,
                 self.backup_summary
             )
-            self.resolver.compression(self.oplogtailer.compression())
+            self.resolver.compression(tailer_module.compression())
             resolver_summary = self.resolver.run()
             for shard in resolver_summary:
                 shard_dir = os.path.join(self.backup_directory, shard)
Lines changed: 58 additions & 0 deletions
@@ -0,0 +1,58 @@
+import logging
+import os
+from time import sleep
+
+from mongodb_consistent_backup.Pipeline import Task
+
+
+class OplogTask(Task):
+    def __init__(self, manager, config, timer, base_dir, backup_dir, replsets, backup_stop):
+        super(OplogTask, self).__init__(self.__class__.__name__, manager, config, timer, base_dir, backup_dir)
+
+        self.backup_name = self.config.name
+        self.user = self.config.username
+        self.password = self.config.password
+        self.authdb = self.config.authdb
+        self.status_secs = self.config.oplog.tailer.status_interval
+        self.replsets = replsets
+        self.backup_stop = backup_stop
+        self._enabled = self.config.oplog.tailer.enabled
+
+        self.compression_supported = ['none', 'gzip']
+        self.shards = {}
+        self._summary = {}
+
+    def enabled(self):
+        if isinstance(self._enabled, bool):
+            return self._enabled
+        elif isinstance(self._enabled, str) and self._enabled.strip().lower() != 'false':
+            return True
+        return False
+
+    def summary(self):
+        return self._summary
+
+    def get_summaries(self):
+        for shard in self.shards:
+            state = self.shards[shard].get('state')
+            self._summary[shard] = state.get().copy()
+
+    def prepare_oplog_files(self, shard_name):
+        oplog_dir = os.path.join(self.backup_dir, shard_name)
+        if not os.path.isdir(oplog_dir):
+            os.mkdir(oplog_dir)
+        oplog_file = os.path.join(oplog_dir, "oplog-tailed.bson")
+        return oplog_file
+
+    def close(self):
+        if not self.enabled():
+            return
+        for shard in self.shards:
+            try:
+                self.shards[shard]['stop'].set()
+                thread = self.shards[shard]['thread']
+                thread.terminate()
+                while thread.is_alive():
+                    sleep(0.5)
+            except Exception, e:
+                logging.error("Cannot stop oplog task thread: %s" % e)
Lines changed: 1 addition & 0 deletions
@@ -0,0 +1 @@
+from OplogTask import OplogTask # NOQA

mongodb_consistent_backup/Oplog/Resolver/Resolver.py

Lines changed: 25 additions & 7 deletions
@@ -78,6 +78,10 @@ def get_consistent_end_ts(self):
                 end_ts = last_ts
             if last_ts < bkp_end_ts:
                 end_ts = bkp_end_ts
+        if end_ts is None:
+            # Happens when there were _no_ oplog changes since the backup
+            # end, i. e. when all tailed-oplogs are empty
+            end_ts = bkp_end_ts
         return Timestamp(end_ts.time + 1, 0)
 
     def done(self, done_uri):
@@ -104,22 +108,30 @@ def wait(self, max_wait_secs=6 * 3600, poll_secs=2):
             raise OperationError("Waited more than %i seconds for Oplog resolver! I will assume there is a problem and exit")
 
     def run(self):
+        uri = None
         try:
             logging.info("Resolving oplogs (options: threads=%s, compression=%s)" % (self.threads(), self.compression()))
             self.timer.start(self.timer_name)
             self.running = True
-
+            consistent_end_ts = self.get_consistent_end_ts()
+            logging.info("Consistent end timestamp for all shards is %s" % consistent_end_ts)
             for shard in self.backup_oplogs:
                 backup_oplog = self.backup_oplogs[shard]
                 self.resolver_state[shard] = OplogState(self.manager, None, backup_oplog['file'])
                 uri = MongoUri(backup_oplog['uri']).get()
                 if shard in self.tailed_oplogs:
                     tailed_oplog = self.tailed_oplogs[shard]
-                    if backup_oplog['last_ts'] is None and tailed_oplog['last_ts'] is None:
+                    backup_last_ts = backup_oplog['last_ts']
+                    if 'last_ts' in tailed_oplog and tailed_oplog['last_ts'] is not None:
+                        tailed_last_ts = tailed_oplog['last_ts']
+                    else:
+                        tailed_last_ts = backup_last_ts
+                    if backup_last_ts is None and tailed_last_ts is None:
                         logging.info("No oplog changes to resolve for %s" % uri)
-                    elif backup_oplog['last_ts'] > tailed_oplog['last_ts']:
+                    elif backup_last_ts > tailed_last_ts:
                         logging.fatal(
-                            "Backup oplog is newer than the tailed oplog! This situation is unsupported. Please retry backup")
+                            "Backup oplog is newer (%s) than the tailed oplog (%s)! This situation is unsupported. Please retry backup" % (
+                                backup_last_ts, tailed_last_ts))
                         raise OperationError("Backup oplog is newer than the tailed oplog!")
                     else:
                         thread_name = uri.str()
@@ -130,17 +142,23 @@ def run(self):
                             uri,
                             tailed_oplog.copy(),
                             backup_oplog.copy(),
-                            self.get_consistent_end_ts(),
+                            consistent_end_ts,
                             self.compression()
                         ).run, callback=self.done)
                         self._pooled.append(thread_name)
                 else:
                     logging.info("No tailed oplog for host %s" % uri)
-            self.wait()
+            if len(self._pooled) > 0:
+                self.wait()
+                # Shut down the thread pool to avoid spurious exceptions
+                self._pool.join()
             self.completed = True
             logging.info("Oplog resolving completed in %.2f seconds" % self.timer.duration(self.timer_name))
         except Exception, e:
-            logging.error("Resolver failed for %s: %s" % (uri, e))
+            if uri is not None:
+                logging.error("Resolver failed for %s: %s" % (uri, e))
+            else:
+                logging.error("Resolver failed: %s" % e)
             raise e
         finally:
             self.timer.stop(self.timer_name)
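To illustrate the consistency point computed by get_consistent_end_ts() above, here is a small example with made-up values; only `bson.timestamp.Timestamp` is the real type used by the resolver, the variables are hypothetical.

```python
# Illustrative only: how the resolver derives the consistent end point.
# Values are made up; bson.timestamp.Timestamp is the real type used.
from bson.timestamp import Timestamp

bkp_end_ts = Timestamp(1500000000, 5)  # when the last mongodump finished
last_ts = None                         # no oplog changes were collected after the dump

# New fallback in this commit: with no oplog changes, fall back to the backup end.
end_ts = last_ts if last_ts is not None else bkp_end_ts

# Every shard is rolled forward to just before this point.
consistent_end_ts = Timestamp(end_ts.time + 1, 0)
print(consistent_end_ts)  # Timestamp(1500000001, 0)
```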

mongodb_consistent_backup/Oplog/Resolver/ResolverThread.py

Lines changed: 1 addition & 0 deletions
@@ -26,6 +26,7 @@ def __init__(self, config, state, uri, tailed_oplog, mongodump_oplog, max_end_ts
         self.flush_secs = self.config['oplog']['flush']['max_secs']
 
         self.oplogs = {}
+        self.last_ts = None
         self.changes = 0
         self.stopped = False
 