Add new 'simple' oplog tailer method #301

Merged · 4 commits · Feb 15, 2019
4 changes: 4 additions & 0 deletions .travis.yml
@@ -26,6 +26,10 @@ jobs:
script:
- docker load -i $TRAVIS_BUILD_DIR/docker/mongodb_consistent_backup.tar
- $TRAVIS_BUILD_DIR/scripts/travis-ci/run-cluster.sh 3.4
- stage: test-cluster-3.4-simple-oplog
script:
- docker load -i $TRAVIS_BUILD_DIR/docker/mongodb_consistent_backup.tar
- $TRAVIS_BUILD_DIR/scripts/travis-ci/run-cluster.sh 3.4 --oplog.tailer.method simple
- stage: test-replset-3.4
script:
- docker load -i $TRAVIS_BUILD_DIR/docker/mongodb_consistent_backup.tar
1 change: 1 addition & 0 deletions conf/mongodb-consistent-backup.example.conf
@@ -44,6 +44,7 @@ production:
# resolver:
# threads: [1+] (default: 2 per CPU)
# tailer:
# method: [tailer|simple] (default: tailer)
# enabled: true
# status_interval: 30
archive:
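The new method key mirrors the --oplog.tailer.method simple command-line flag exercised by the Travis stage added above; Main.py (below) reads this value to choose between the existing Tailer and the new SimpleOplogGetter.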
23 changes: 21 additions & 2 deletions mongodb_consistent_backup/Common/DB.py
@@ -2,7 +2,7 @@

from bson.codec_options import CodecOptions
from inspect import currentframe, getframeinfo
from pymongo import DESCENDING, CursorType, MongoClient
from pymongo import ASCENDING, DESCENDING, CursorType, MongoClient
from pymongo.errors import ConfigurationError, ConnectionFailure, OperationFailure, ServerSelectionTimeoutError
from ssl import CERT_REQUIRED, CERT_NONE
from time import sleep
@@ -194,9 +194,19 @@ def get_oplog_rs(self):
return db.oplog.rs.with_options(codec_options=CodecOptions(unicode_decode_error_handler="ignore"))

def get_oplog_tail_ts(self):
logging.debug("Gathering oldest 'ts' in %s oplog" % self.uri)
logging.debug("Gathering youngest 'ts' in %s oplog" % self.uri)
return self.get_oplog_rs().find_one(sort=[('$natural', DESCENDING)])['ts']

def get_oplog_head_ts(self):
logging.debug("Gathering oldest 'ts' in %s oplog" % self.uri)
return self.get_oplog_rs().find_one(sort=[('$natural', ASCENDING)])['ts']

def is_ts_covered_by_oplog(self, ts_to_check):
oldest_ts = self.get_oplog_head_ts()
covered = oldest_ts <= ts_to_check
logging.debug("Timestamp %s %s covered in %s oplog" % (ts_to_check, "is" if covered else "is NOT", self.uri))
return covered

def get_oplog_cursor_since(self, caller, ts=None):
frame = getframeinfo(currentframe().f_back)
comment = "%s:%s;%s:%i" % (caller.__name__, frame.function, frame.filename, frame.lineno)
@@ -207,6 +217,15 @@ def get_oplog_cursor_since(self, caller, ts=None):
# http://api.mongodb.com/python/current/examples/tailable.html
return self.get_oplog_rs().find(query, cursor_type=CursorType.TAILABLE_AWAIT, oplog_replay=True).comment(comment)

def get_simple_oplog_cursor_from_to(self, caller, ts_from, ts_to=None):
frame = getframeinfo(currentframe().f_back)
comment = "%s:%s;%s:%i" % (caller.__name__, frame.function, frame.filename, frame.lineno)
if not ts_to:
ts_to = self.get_oplog_tail_ts()
query = {'ts': {'$gte': ts_from, '$lte': ts_to}}
logging.debug("Querying all oplog changes between %s and %s on %s with query: %s" % (ts_from, ts_to, self.uri, query))
return self.get_oplog_rs().find(query, cursor_type=CursorType.NON_TAILABLE, oplog_replay=True).comment(comment)

def close(self):
if self._conn:
logging.debug("Closing connection to: %s" % self.uri)
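A minimal sketch (not code from this PR) of how a caller might combine these new helpers, first checking that the oplog still covers the backup's end timestamp and then fetching one bounded, non-tailable pass of changes:

    # Illustrative only: "db" is assumed to be a connected
    # mongodb_consistent_backup.Common.DB instance.
    def fetch_changes_since(db, backup_end_ts):
        # Bail out if the oplog has already rolled past the requested start point.
        if not db.is_ts_covered_by_oplog(backup_end_ts):
            raise Exception("Oplog on %s no longer covers ts %s" % (db.uri, backup_end_ts))
        # One bounded pass from the backup end up to the current oplog tail.
        for change in db.get_simple_oplog_cursor_from_to(fetch_changes_since, backup_end_ts):
            yield change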
73 changes: 50 additions & 23 deletions mongodb_consistent_backup/Main.py
@@ -11,7 +11,7 @@
from Errors import NotifyError, OperationError
from Logger import Logger
from Notify import Notify
from Oplog import Tailer, Resolver
from Oplog import Tailer, Resolver, SimpleOplogGetter
from Replication import Replset, ReplsetSharded
from Rotate import Rotate
from Sharding import Sharding
@@ -29,6 +29,7 @@ def __init__(self, prog_name="mongodb-consistent-backup"):
self.replset_sharded = None
self.notify = None
self.oplogtailer = None
self.oploggetter = None
self.resolver = None
self.upload = None
self.lock = None
@@ -162,7 +163,7 @@ def cleanup_and_exit(self, code, frame):
logging.info("Starting cleanup procedure! Stopping running threads")

# TODO Move submodules into self that populates as used?
submodules = ['replset', 'sharding', 'backup', 'oplogtailer', 'archive', 'upload']
submodules = ['replset', 'sharding', 'backup', 'oplogtailer', 'oploggetter', 'archive', 'upload']
for submodule_name in submodules:
try:
submodule = getattr(self, submodule_name)
@@ -325,19 +326,36 @@ def run(self):
except Exception, e:
self.exception("Problem stopping the balancer! Error: %s" % e, e)

# init the oplogtailers
try:
self.oplogtailer = Tailer(
self.manager,
self.config,
self.timer,
self.backup_root_subdirectory,
self.backup_directory,
self.replsets,
self.backup_stop
)
except Exception, e:
self.exception("Problem initializing oplog tailer! Error: %s" % e, e)
tailer_module = None
if self.config.oplog.tailer.method == "simple":
# init the oploggetters
try:
self.oploggetter = SimpleOplogGetter(
self.manager,
self.config,
self.timer,
self.backup_root_subdirectory,
self.backup_directory,
self.replsets,
self.backup_stop
)
tailer_module = self.oploggetter
except Exception, e:
self.exception("Problem initializing oplog getter! Error: %s" % e, e)
else:
try:
self.oplogtailer = Tailer(
self.manager,
self.config,
self.timer,
self.backup_root_subdirectory,
self.backup_directory,
self.replsets,
self.backup_stop
)
tailer_module = self.oplogtailer
except Exception, e:
self.exception("Problem initializing oplog tailer! Error: %s" % e, e)

# init the backup
try:
@@ -354,15 +372,16 @@
if self.backup.is_compressed():
logging.info("Backup method supports compression, disabling compression in archive step and enabling oplog compression")
self.archive.compression('none')
self.oplogtailer.compression(self.backup.compression())
tailer_module.compression(self.backup.compression())
except Exception, e:
self.exception("Problem initializing backup! Error: %s" % e, e)

# start the oplog tailers, before the backups start
try:
self.oplogtailer.run()
except Exception, e:
self.exception("Failed to start oplog tailing threads! Error: %s" % e, e)
if self.oplogtailer:
# start the oplog tailers, before the backups start
try:
self.oplogtailer.run()
except Exception, e:
self.exception("Failed to start oplog tailing threads! Error: %s" % e, e)

# run the backup(s)
try:
@@ -376,6 +395,14 @@
self.oplog_summary = self.oplogtailer.stop()
self.state.set('tailer_oplog', self.oplog_summary)
self.oplogtailer.close()
elif self.oploggetter:
try:
self.oploggetter.backup_summary = self.backup_summary
self.oplog_summary = self.oploggetter.run()
self.state.set('tailer_oplog', self.oplog_summary)
self.oploggetter.close()
except Exception, e:
self.exception("Failed to start oplog tailing threads! Error: %s" % e, e)

# set balancer back to original value
try:
@@ -401,7 +428,7 @@ def run(self):
self.db.close()

# resolve/merge tailed oplog into mongodump oplog.bson to a consistent point for all shards
if self.backup.task.lower() == "mongodump" and self.oplogtailer.enabled():
if self.backup.task.lower() == "mongodump" and tailer_module.enabled():
self.resolver = Resolver(
self.manager,
self.config,
@@ -411,7 +438,7 @@
self.oplog_summary,
self.backup_summary
)
self.resolver.compression(self.oplogtailer.compression())
self.resolver.compression(tailer_module.compression())
resolver_summary = self.resolver.run()
for shard in resolver_summary:
shard_dir = os.path.join(self.backup_directory, shard)
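Note the control-flow difference between the two methods: the existing Tailer is started before the backup and stopped once the backup summary is available, while the SimpleOplogGetter does nothing during the backup and is instead handed self.backup_summary and run in a single pass after mongodump completes. In both cases the chosen module is kept in tailer_module, so the compression setting and the Resolver step below work the same for either method.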
58 changes: 58 additions & 0 deletions mongodb_consistent_backup/Oplog/Common/OplogTask.py
@@ -0,0 +1,58 @@
import logging
import os
from time import sleep

from mongodb_consistent_backup.Pipeline import Task


class OplogTask(Task):
def __init__(self, manager, config, timer, base_dir, backup_dir, replsets, backup_stop):
super(OplogTask, self).__init__(self.__class__.__name__, manager, config, timer, base_dir, backup_dir)

self.backup_name = self.config.name
self.user = self.config.username
self.password = self.config.password
self.authdb = self.config.authdb
self.status_secs = self.config.oplog.tailer.status_interval
self.replsets = replsets
self.backup_stop = backup_stop
self._enabled = self.config.oplog.tailer.enabled

self.compression_supported = ['none', 'gzip']
self.shards = {}
self._summary = {}

def enabled(self):
if isinstance(self._enabled, bool):
return self._enabled
elif isinstance(self._enabled, str) and self._enabled.strip().lower() != 'false':
return True
return False

def summary(self):
return self._summary

def get_summaries(self):
for shard in self.shards:
state = self.shards[shard].get('state')
self._summary[shard] = state.get().copy()

def prepare_oplog_files(self, shard_name):
oplog_dir = os.path.join(self.backup_dir, shard_name)
if not os.path.isdir(oplog_dir):
os.mkdir(oplog_dir)
oplog_file = os.path.join(oplog_dir, "oplog-tailed.bson")
return oplog_file

def close(self):
if not self.enabled():
return
for shard in self.shards:
try:
self.shards[shard]['stop'].set()
thread = self.shards[shard]['thread']
thread.terminate()
while thread.is_alive():
sleep(0.5)
except Exception, e:
logging.error("Cannot stop oplog task thread: %s" % e)
1 change: 1 addition & 0 deletions mongodb_consistent_backup/Oplog/Common/__init__.py
@@ -0,0 +1 @@
from OplogTask import OplogTask # NOQA
32 changes: 25 additions & 7 deletions mongodb_consistent_backup/Oplog/Resolver/Resolver.py
@@ -78,6 +78,10 @@ def get_consistent_end_ts(self):
end_ts = last_ts
if last_ts < bkp_end_ts:
end_ts = bkp_end_ts
if end_ts is None:
# Happens when there were _no_ oplog changes since the backup
# end, i.e. when all tailed-oplogs are empty
end_ts = bkp_end_ts
return Timestamp(end_ts.time + 1, 0)

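A small worked example (illustrative values) of the rule above: the consistent point is one second past the newest timestamp seen, and with the new None fallback an all-empty set of tailed oplogs now resolves to just past the backup's own end:

    from bson.timestamp import Timestamp

    bkp_end_ts = Timestamp(1550000000, 5)  # newest ts across the mongodump oplogs
    last_ts = None                         # no tailed-oplog changes at all
    end_ts = last_ts if last_ts is not None and last_ts > bkp_end_ts else bkp_end_ts
    print(Timestamp(end_ts.time + 1, 0))   # -> Timestamp(1550000001, 0)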
def done(self, done_uri):
Expand All @@ -104,22 +108,30 @@ def wait(self, max_wait_secs=6 * 3600, poll_secs=2):
raise OperationError("Waited more than %i seconds for Oplog resolver! I will assume there is a problem and exit")

def run(self):
uri = None
try:
logging.info("Resolving oplogs (options: threads=%s, compression=%s)" % (self.threads(), self.compression()))
self.timer.start(self.timer_name)
self.running = True

consistent_end_ts = self.get_consistent_end_ts()
logging.info("Consistent end timestamp for all shards is %s" % consistent_end_ts)
for shard in self.backup_oplogs:
backup_oplog = self.backup_oplogs[shard]
self.resolver_state[shard] = OplogState(self.manager, None, backup_oplog['file'])
uri = MongoUri(backup_oplog['uri']).get()
if shard in self.tailed_oplogs:
tailed_oplog = self.tailed_oplogs[shard]
if backup_oplog['last_ts'] is None and tailed_oplog['last_ts'] is None:
backup_last_ts = backup_oplog['last_ts']
if 'last_ts' in tailed_oplog and tailed_oplog['last_ts'] is not None:
tailed_last_ts = tailed_oplog['last_ts']
else:
tailed_last_ts = backup_last_ts
if backup_last_ts is None and tailed_last_ts is None:
logging.info("No oplog changes to resolve for %s" % uri)
elif backup_oplog['last_ts'] > tailed_oplog['last_ts']:
elif backup_last_ts > tailed_last_ts:
logging.fatal(
"Backup oplog is newer than the tailed oplog! This situation is unsupported. Please retry backup")
"Backup oplog is newer (%s) than the tailed oplog (%s)! This situation is unsupported. Please retry backup" % (
backup_last_ts, tailed_last_ts))
raise OperationError("Backup oplog is newer than the tailed oplog!")
else:
thread_name = uri.str()
@@ -130,17 +142,23 @@
uri,
tailed_oplog.copy(),
backup_oplog.copy(),
self.get_consistent_end_ts(),
consistent_end_ts,
self.compression()
).run, callback=self.done)
self._pooled.append(thread_name)
else:
logging.info("No tailed oplog for host %s" % uri)
self.wait()
if len(self._pooled) > 0:
self.wait()
# Shut down the thread pool to avoid spurious exceptions
self._pool.join()
self.completed = True
logging.info("Oplog resolving completed in %.2f seconds" % self.timer.duration(self.timer_name))
except Exception, e:
logging.error("Resolver failed for %s: %s" % (uri, e))
if uri is not None:
logging.error("Resolver failed for %s: %s" % (uri, e))
else:
logging.error("Resolver failed: %s" % e)
raise e
finally:
self.timer.stop(self.timer_name)
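The remaining Resolver changes harden run() for the simple method: the consistent end timestamp is computed once and reused for every shard, a tailed-oplog summary with a missing or None last_ts now falls back to the backup oplog's own last_ts instead of tripping the "backup oplog is newer" error, the fatal log reports both timestamps, the thread pool is only waited on (and now joined) when resolver threads were actually queued, and the failure log no longer assumes a shard URI had already been parsed.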
1 change: 1 addition & 0 deletions mongodb_consistent_backup/Oplog/Resolver/ResolverThread.py
@@ -26,6 +26,7 @@ def __init__(self, config, state, uri, tailed_oplog, mongodump_oplog, max_end_ts
self.flush_secs = self.config['oplog']['flush']['max_secs']

self.oplogs = {}
self.last_ts = None
self.changes = 0
self.stopped = False
