S3 upload throttle: Add --upload.s3.target_mb_per_second parameter #294

Merged: 17 commits, Dec 7, 2018
13 changes: 8 additions & 5 deletions conf/mongodb-consistent-backup.example.conf
@@ -51,6 +51,7 @@ production:
# tar:
# compression: [none|gzip] (default: gzip, none if backup already compressed)
# threads: [1+] (default: 1 per CPU)
+#     binary: [path] (default: tar)
# zbackup:
# binary: [path] (default: /usr/bin/zbackup)
# cache_mb: [mb] (default: 128)
@@ -89,11 +90,13 @@ production:
# port: [SSH Port Number] (default: 22)
# delete: [true|false] (default: false)
# s3:
-#     region: [AWS S3 Region] (default: us-east-1)
+#     target_mb_per_second: [1+] (default: unlimited)
+#     region: [AWS S3 Region] (default: us-east-1)
# access_key: [AWS S3 Access Key]
# secret_key: [AWS S3 Secret Key]
# bucket_name: [AWS S3 Bucket Name]
-#     bucket_prefix: [prefix] (default: /)
-#     chunk_size_mb: [1+] (default: 50)
-#     secure: [true|false] (default: true)
-#     acl: [acl_str] (default: none)
+#     bucket_prefix: [prefix] (default: /)
+#     chunk_size_mb: [1+] (default: 50)
+#     secure: [true|false] (default: true)
+#     acl: [acl_str] (default: none)
+#     skip_bucket_validation: [true|false] (default: false)
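For reference, a minimal sketch of the new option in the example conf's own format; the 10 MB/s figure and bucket name are arbitrary placeholders, not values from this PR:

    upload:
      s3:
        target_mb_per_second: 10        # throttle each upload thread to ~10 MB/s
        region: us-east-1
        bucket_name: my-backup-bucket   # placeholder name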
6 changes: 5 additions & 1 deletion mongodb_consistent_backup/Upload/S3/S3.py
@@ -21,6 +21,9 @@ def __init__(self, manager, config, timer, base_dir, backup_dir, **kwargs):
self.secret_key = getattr(self.config.upload.s3, 'secret_key', None)
self.chunk_size_mb = self.config.upload.s3.chunk_size_mb
self.chunk_size = self.chunk_size_mb * 1024 * 1024
+self.target_bandwidth = None
+if self.config.upload.s3.target_mb_per_second is not None:
+    self.target_bandwidth = self.config.upload.s3.target_mb_per_second * 1024 * 1024
self.s3_acl = self.config.upload.s3.acl
self.key_prefix = base_dir
self.validate_bucket = not self.config.upload.s3.skip_bucket_validation
@@ -44,7 +47,8 @@ def __init__(self, manager, config, timer, base_dir, backup_dir, **kwargs):
self.remove_uploaded,
self.chunk_size,
self.s3_acl,
-validate_bucket=self.validate_bucket
+validate_bucket=self.validate_bucket,
+target_bandwidth=self.target_bandwidth
)

def get_key_name(self, file_path):
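The uploader converts the configured rate to bytes per second once, at construction time; a value of None keeps uploads unthrottled. A quick illustrative check of the arithmetic (the 10 MB/s value is hypothetical):

    # Hypothetical value; mirrors the conversion in S3.__init__ above.
    target_mb_per_second = 10
    target_bandwidth = None
    if target_mb_per_second is not None:
        target_bandwidth = target_mb_per_second * 1024 * 1024
    assert target_bandwidth == 10485760  # bytes/s handed to the upload pool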
2 changes: 2 additions & 0 deletions mongodb_consistent_backup/Upload/S3/S3UploadPool.py
@@ -38,6 +38,7 @@ def __init__(self, bucket_name, region, access_key, secret_key, threads=4, remov
self.chunk_bytes = chunk_bytes
self.key_acl = key_acl
self.validate_bucket = kwargs.get("validate_bucket")
+self.target_bandwidth = kwargs.get("target_bandwidth")
self.upload_file_regex = kwargs.get("upload_file_regex")

self.multipart_min_bytes = 5242880
@@ -192,6 +193,7 @@ def start(self, file_name, key_name, byte_count, mp_id=None, mp_num=None, mp_par
file_name,
key_name,
byte_count,
+self.target_bandwidth,
mp_id,
mp_num,
mp_parts,
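The pool hands the same target to every S3UploadThread it starts, and the CLI help below documents the limit as per upload thread, so the effective ceiling scales with the pool size. A rough illustration (the per-thread target is hypothetical; threads=4 is the pool default from the signature above):

    # Per-thread throttling: the aggregate ceiling grows with the pool size.
    threads = 4                     # S3UploadPool default
    target_mb_per_second = 10       # hypothetical per-thread target
    aggregate_mb_per_second = threads * target_mb_per_second
    print(aggregate_mb_per_second)  # -> 40, i.e. ~40 MB/s across the pool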
43 changes: 39 additions & 4 deletions mongodb_consistent_backup/Upload/S3/S3UploadThread.py
@@ -7,6 +7,7 @@
from filechunkio import FileChunkIO
from progress.bar import Bar
from time import sleep
+from time import time

from S3Session import S3Session

@@ -28,7 +29,7 @@ def status(self, line):


class S3UploadThread:
-def __init__(self, bucket_name, region, access_key, secret_key, file_name, key_name, byte_count, multipart_id=None,
+def __init__(self, bucket_name, region, access_key, secret_key, file_name, key_name, byte_count, target_bandwidth, multipart_id=None,
multipart_num=None, multipart_parts=None, multipart_offset=None, retries=5, secure=True, retry_sleep_secs=1):
self.bucket_name = bucket_name
self.region = region
@@ -37,6 +38,7 @@ def __init__(self, bucket_name, region, access_key, secret_key, file_name, key_n
self.file_name = file_name
self.key_name = key_name
self.byte_count = byte_count
+self.target_bandwidth = target_bandwidth
self.multipart_id = multipart_id
self.multipart_num = multipart_num
self.multipart_parts = multipart_parts
@@ -46,11 +48,14 @@ def __init__(self, bucket_name, region, access_key, secret_key, file_name, key_n
self.retry_sleep_secs = retry_sleep_secs
self.do_stop = False

+if self.target_bandwidth is not None:
+    logging.debug("Target bandwidth: %.2f" % self.target_bandwidth)
progress_key_name = self.short_key_name(self.key_name)
if self.multipart_num and self.multipart_parts:
progress_key_name = "%s %d/%d" % (self.short_key_name(self.key_name), self.multipart_num, self.multipart_parts)
self._progress = S3ProgressBar(progress_key_name, max=float(self.byte_count / 1024.00 / 1024.00))
self._last_bytes = None
+self._last_status_ts = None

try:
self.s3_conn = S3Session(self.region, self.access_key, self.secret_key, self.bucket_name, self.secure, self.retries)
@@ -80,6 +85,28 @@ def status(self, bytes_uploaded, bytes_total):
if update_bytes > 0:
self._progress.next(float(update_bytes / 1024.00 / 1024.00))
self._last_bytes = bytes_uploaded
+if self.target_bandwidth is not None:
+    self.throttle(update_bytes)
+
+def throttle(self, update_bytes):
+    if self._last_status_ts:
+        current_ts = float(time())
+        duration = current_ts - self._last_status_ts
+        logging.debug("Transferred %d bytes in %.2f seconds" % (update_bytes, duration))
+        actual_bytes_per_second = float(update_bytes / duration)
+        target_bytes_per_second = self.target_bandwidth
+        bps_factor = actual_bytes_per_second / target_bytes_per_second
+        if bps_factor > 1.0:
+            logging.debug("Actual speed is %.2f bytes/s vs target speed %.0f bytes/s. "
+                          "This is %.2f times too fast." % (actual_bytes_per_second, target_bytes_per_second, bps_factor))
+            throttle_secs_computed = float(duration * bps_factor)
+            throttle_secs_ceiling = 3
+            throttle_secs = min(throttle_secs_computed, throttle_secs_ceiling)
+            logging.debug("Sleeping for %.2fs (but %.2fs at most), trying to approximate target bandwidth." % (
+                throttle_secs_computed, throttle_secs_ceiling))
+            sleep(throttle_secs)
+    self._last_status_ts = float(time())

def run(self):
try:
@@ -99,9 +126,13 @@
self.multipart_parts,
float(self.byte_count / 1024.00 / 1024.00)
))
+callback_count = 10
+if self.target_bandwidth is not None:
+    # request a callback every 2MB to allow for somewhat decent throttling
+    callback_count = self.byte_count / 1024 / 1024 / 2
with FileChunkIO(self.file_name, 'r', offset=self.multipart_offset, bytes=self.byte_count) as fp:
-    mp.upload_part_from_file(fp=fp, cb=self.status, num_cb=10, part_num=self.multipart_num)
-    break
+    mp.upload_part_from_file(fp=fp, cb=self.status, num_cb=callback_count, part_num=self.multipart_num)
+    break
else:
key = None
try:
@@ -110,7 +141,11 @@
float(self.byte_count / 1024.00 / 1024.00)
))
key = Key(bucket=self.bucket, name=self.key_name)
-key.set_contents_from_filename(self.file_name, cb=self.status, num_cb=10)
+callback_count = 10
+if self.target_bandwidth is not None:
+    # request a callback every 2MB to allow for somewhat decent throttling
+    callback_count = self.byte_count / 1024 / 1024 / 2
+key.set_contents_from_filename(self.file_name, cb=self.status, num_cb=callback_count)
finally:
if key:
key.close()
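The throttle is callback-driven: boto calls status() roughly every 2 MB (the callback_count computed above), and throttle() compares the bytes moved since the previous callback against the target rate, sleeping proportionally when the transfer runs hot, capped at 3 seconds per callback. A self-contained sketch of the same idea outside the thread class (function and variable names here are illustrative):

    import logging
    from time import sleep, time

    def throttle(update_bytes, last_status_ts, target_bandwidth, ceiling_secs=3.0):
        # Mirror of S3UploadThread.throttle(): estimate the rate since the
        # last callback and sleep if it exceeds target_bandwidth (bytes/s).
        if last_status_ts is not None:
            duration = time() - last_status_ts
            actual_bps = update_bytes / float(duration)
            factor = actual_bps / float(target_bandwidth)
            if factor > 1.0:
                logging.debug("%.2fx over target, sleeping", factor)
                sleep(min(duration * factor, ceiling_secs))
        return time()  # becomes the next call's last_status_ts

    # A 2 MB callback arriving 0.1s after the previous one against a
    # 10 MB/s target overshoots 2x, so this sleeps ~0.2s before returning.
    ts = throttle(2 * 1024 * 1024, time() - 0.1, 10 * 1024 * 1024)

Because the sleep happens inside the boto progress callback, the throttle granularity is tied to that 2 MB callback interval.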
2 changes: 2 additions & 0 deletions mongodb_consistent_backup/Upload/S3/__init__.py
@@ -20,6 +20,8 @@ def config(parser):
help="S3 Uploader explicit storage key within the S3 bucket")
parser.add_argument("--upload.s3.chunk_size_mb", dest="upload.s3.chunk_size_mb", default=50, type=int,
help="S3 Uploader upload chunk size, in megabytes (default: 50)")
parser.add_argument("--upload.s3.target_mb_per_second", dest="upload.s3.target_mb_per_second", default=None,
type=int, help="S3 Uploader target bandwidth in MB/s per upload thread. (default: unlimited)")
parser.add_argument("--upload.s3.secure", dest="upload.s3.secure", default=True, action="store_false",
help="S3 Uploader connect over SSL (default: true)")
parser.add_argument("--upload.s3.acl", dest="upload.s3.acl", default=None, type=str,
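With the flag wired through, a run can be throttled from the command line via --upload.s3.target_mb_per_second=10 (an arbitrary example value); omitting the flag leaves the default of None, i.e. unthrottled uploads.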