Commit c5f3008

dschneller authored and timvaillancourt committed
S3 upload throttle: Add --upload.s3.target_mb_per_second parameter (#294)
* Ignore IntelliJ project file

* Make build.sh compatible with macOS

  There already was a switch in place for the Python executable, but both the readlink and cp commands use flags not present on the default macOS binaries. This commit adds an upfront check and aborts with a message that the coreutils package from Homebrew is needed to get the GNU variants of both commands.

* Add --archive.tar.binary parameter

  Allows specifying a custom location of the "tar" command to use. Also, the flags are now passed to "tar" individually (`tar -cf` becomes `tar -c -f`). This makes it easy to customize how the archiving is performed without having to add lots of new options. For example, you could encrypt backup data via a simple shell script and specify it for --archive.tar.binary:

  ```
  #!/bin/bash
  gpg_pubkey_id=XXXXXXX
  new_args=""
  while [ "${#}" -gt 0 ]; do
      case "$1" in
          -f) shift; original_output_file="${1}"
              shift
              new_args="${new_args} --to-stdout"
              ;;
          *) new_args="${new_args} ${1}"
             shift
             ;;
      esac
  done
  tar ${new_args} | gpg --always-trust --encrypt --recipient ${gpg_pubkey_id} -z 0 --output ${original_output_file}
  ```

  This has several advantages:

  * Backups are never written to disk unencrypted.
  * Encryption happens in one go, instead of incurring the potentially heavy additional I/O of a separate encryption step.
  * It is transparent to the upload stages, so you still benefit from the integrated S3 (or other) uploads.

* Option to fix "S3ResponseError: 403 Forbidden"

  The S3 uploader fails if bucket permissions are restricted to allow access only to certain prefixes within a bucket. The default behavior of boto's get_bucket() is to "validate" the bucket by accessing its root, needlessly breaking the uploader even though all necessary permissions might be present. This patch adds a new command line switch, --upload.s3.skip_bucket_validation, to disable this behavior.

* Related: Fix flake8: Make regex a raw string

* Related: Fix flake8: Make regex a raw string

* Related: Fix flake8: Make regex a raw string

* Fix indentation

* Add --upload.s3.target_mb_per_second parameter

  Boto2 unfortunately does not provide a bandwidth limiter for S3 uploads. Instead, it uploads a completed backup as quickly as possible, potentially consuming all available network bandwidth and therefore impacting other applications. This patch adds a very basic throttling mechanism for S3 uploads: it optionally hooks into the upload progress callbacks and determines the current bandwidth. If that exceeds the designated maximum, the upload thread pauses for a suitable amount of time (capped at 3 seconds) before resuming; a distilled sketch follows below. While this is far from ideal, it is an easy-to-understand and (in my experience) good-enough way to protect other network users from starvation. Note: the calculation happens per thread!

* Fix handling of unspecified bandwidth limit

* Add S3 upload bandwidth limit to example config

* Related: Add tar.binary to example config file

* Related: Add skip_bucket_validation to example conf
1 parent 3e1967d commit c5f3008
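
The throttling added by this commit boils down to: on every upload progress callback, measure how many bytes were transferred since the previous callback, derive the current rate, and sleep (capped at 3 seconds) when that rate exceeds the target. A minimal standalone sketch of the idea, with illustrative names rather than the patch's actual API:

```
import time


def throttle(update_bytes, last_ts, target_bytes_per_second, max_sleep_secs=3):
    """Sleep just long enough to pull the observed rate back toward the target.

    update_bytes: bytes transferred since the previous progress callback
    last_ts: timestamp of the previous callback (None on the first call)
    Returns the timestamp the caller should remember for the next callback.
    """
    now = time.time()
    if last_ts is not None:
        duration = now - last_ts
        actual_bps = update_bytes / duration
        overshoot = actual_bps / target_bytes_per_second
        if overshoot > 1.0:
            # Sleep proportionally to the overshoot, but never longer than the cap,
            # so one slow measurement cannot stall the upload thread for long.
            time.sleep(min(duration * overshoot, max_sleep_secs))
    return time.time()
```

Because each upload thread throttles itself independently, the effective limit is roughly target_mb_per_second multiplied by the number of upload threads.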

File tree

5 files changed (+56, -10 lines)


conf/mongodb-consistent-backup.example.conf

Lines changed: 8 additions & 5 deletions
```
@@ -51,6 +51,7 @@ production:
 # tar:
 # compression: [none|gzip] (default: gzip, none if backup already compressed)
 # threads: [1+] (default: 1 per CPU)
+# binary: [path] (default: tar)
 # zbackup:
 # binary: [path] (default: /usr/bin/zbackup)
 # cache_mb: [mb] (default: 128)
@@ -89,11 +90,13 @@ production:
 # port: [SSH Port Number] (default: 22)
 # delete: [true|false] (default: false)
 # s3:
-# region: [AWS S3 Region] (default: us-east-1)
+# target_mb_per_second: [1+] (default: unlimited)
+# region: [AWS S3 Region] (default: us-east-1)
 # access_key: [AWS S3 Access Key]
 # secret_key: [AWS S3 Secret Key]
 # bucket_name: [AWS S3 Bucket Name]
-# bucket_prefix: [prefix] (default: /)
-# chunk_size_mb: [1+] (default: 50)
-# secure: [true|false] (default: true)
-# acl: [acl_str] (default: none)
+# bucket_prefix: [prefix] (default: /)
+# chunk_size_mb: [1+] (default: 50)
+# secure: [true|false] (default: true)
+# acl: [acl_str] (default: none)
+# skip_bucket_validation: [true|false] (default: false)
```

mongodb_consistent_backup/Upload/S3/S3.py

Lines changed: 5 additions & 1 deletion
```
@@ -21,6 +21,9 @@ def __init__(self, manager, config, timer, base_dir, backup_dir, **kwargs):
         self.secret_key = getattr(self.config.upload.s3, 'secret_key', None)
         self.chunk_size_mb = self.config.upload.s3.chunk_size_mb
         self.chunk_size = self.chunk_size_mb * 1024 * 1024
+        self.target_bandwidth = None
+        if self.config.upload.s3.target_mb_per_second is not None:
+            self.target_bandwidth = self.config.upload.s3.target_mb_per_second * 1024 * 1024
         self.s3_acl = self.config.upload.s3.acl
         self.key_prefix = base_dir
         self.validate_bucket = not self.config.upload.s3.skip_bucket_validation
@@ -44,7 +47,8 @@ def __init__(self, manager, config, timer, base_dir, backup_dir, **kwargs):
             self.remove_uploaded,
             self.chunk_size,
             self.s3_acl,
-            validate_bucket=self.validate_bucket
+            validate_bucket=self.validate_bucket,
+            target_bandwidth=self.target_bandwidth
         )

     def get_key_name(self, file_path):
```
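
For context on the validate_bucket flag handed to the upload pool above: boto2's get_bucket() validates a bucket by default with a GET against the bucket root, which is exactly the request that fails under prefix-restricted credentials. Somewhere inside S3Session (not shown in this diff) the flag presumably ends up controlling the validate argument of that call. A hedged boto2 sketch, not the project's actual S3Session code, with placeholder credentials:

```
import boto.s3

# Illustrative values; in the project these come from the backup configuration.
conn = boto.s3.connect_to_region(
    "us-east-1",
    aws_access_key_id="AKIA...",
    aws_secret_access_key="...",
)

# validate=False skips the GET on the bucket root, so credentials that may only
# touch certain key prefixes no longer trigger "S3ResponseError: 403 Forbidden".
bucket = conn.get_bucket("my-backup-bucket", validate=False)
```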

mongodb_consistent_backup/Upload/S3/S3UploadPool.py

Lines changed: 2 additions & 0 deletions
```
@@ -38,6 +38,7 @@ def __init__(self, bucket_name, region, access_key, secret_key, threads=4, remov
         self.chunk_bytes = chunk_bytes
         self.key_acl = key_acl
         self.validate_bucket = kwargs.get("validate_bucket")
+        self.target_bandwidth = kwargs.get("target_bandwidth")
         self.upload_file_regex = kwargs.get("upload_file_regex")

         self.multipart_min_bytes = 5242880
@@ -192,6 +193,7 @@ def start(self, file_name, key_name, byte_count, mp_id=None, mp_num=None, mp_par
             file_name,
             key_name,
             byte_count,
+            self.target_bandwidth,
             mp_id,
             mp_num,
             mp_parts,
```

mongodb_consistent_backup/Upload/S3/S3UploadThread.py

Lines changed: 39 additions & 4 deletions
```
@@ -7,6 +7,7 @@
 from filechunkio import FileChunkIO
 from progress.bar import Bar
 from time import sleep
+from time import time

 from S3Session import S3Session

@@ -28,7 +29,7 @@ def status(self, line):


 class S3UploadThread:
-    def __init__(self, bucket_name, region, access_key, secret_key, file_name, key_name, byte_count, multipart_id=None,
+    def __init__(self, bucket_name, region, access_key, secret_key, file_name, key_name, byte_count, target_bandwidth, multipart_id=None,
                  multipart_num=None, multipart_parts=None, multipart_offset=None, retries=5, secure=True, retry_sleep_secs=1):
         self.bucket_name = bucket_name
         self.region = region
@@ -37,6 +38,7 @@ def __init__(self, bucket_name, region, access_key, secret_key, file_name, key_n
         self.file_name = file_name
         self.key_name = key_name
         self.byte_count = byte_count
+        self.target_bandwidth = target_bandwidth
         self.multipart_id = multipart_id
         self.multipart_num = multipart_num
         self.multipart_parts = multipart_parts
@@ -46,11 +48,14 @@ def __init__(self, bucket_name, region, access_key, secret_key, file_name, key_n
         self.retry_sleep_secs = retry_sleep_secs
         self.do_stop = False

+        if self.target_bandwidth is not None:
+            logging.debug("Target bandwidth: %.2f" % self.target_bandwidth)
         progress_key_name = self.short_key_name(self.key_name)
         if self.multipart_num and self.multipart_parts:
             progress_key_name = "%s %d/%d" % (self.short_key_name(self.key_name), self.multipart_num, self.multipart_parts)
         self._progress = S3ProgressBar(progress_key_name, max=float(self.byte_count / 1024.00 / 1024.00))
         self._last_bytes = None
+        self._last_status_ts = None

         try:
             self.s3_conn = S3Session(self.region, self.access_key, self.secret_key, self.bucket_name, self.secure, self.retries)
@@ -80,6 +85,28 @@ def status(self, bytes_uploaded, bytes_total):
         if update_bytes > 0:
             self._progress.next(float(update_bytes / 1024.00 / 1024.00))
         self._last_bytes = bytes_uploaded
+        if self.target_bandwidth is not None:
+            self.throttle(update_bytes)
+
+    def throttle(self, update_bytes):
+        if self._last_status_ts:
+            current_ts = float(time())
+            duration = current_ts - self._last_status_ts
+            logging.debug("Transferred %d bytes in %.2f seconds" % (update_bytes, duration))
+            actual_bytes_per_second = float(update_bytes / duration)
+            target_bytes_per_second = self.target_bandwidth
+            bps_factor = actual_bytes_per_second / target_bytes_per_second
+            if bps_factor > 1.0:
+                logging.debug("Actual speed is %.2f bytes/s vs target speed %.0f bytes/s. "
+                              "This is %.2f times too fast." % (actual_bytes_per_second, target_bytes_per_second,
+                                                                bps_factor))
+                throttle_secs_computed = float(duration * bps_factor)
+                throttle_secs_ceiling = 3
+                throttle_secs = min(throttle_secs_computed, throttle_secs_ceiling)
+                logging.debug("Sleeping for %.2fs (but %.2fs at most), trying to approximate target bandwidth." % (
+                    throttle_secs_computed, throttle_secs_ceiling))
+                sleep(throttle_secs)
+        self._last_status_ts = float(time())

     def run(self):
         try:
@@ -99,9 +126,13 @@ def run(self):
                     self.multipart_parts,
                     float(self.byte_count / 1024.00 / 1024.00)
                 ))
+                callback_count = 10
+                if self.target_bandwidth is not None:
+                    # request a callback every 2MB to allow for somewhat decent throttling
+                    callback_count = self.byte_count / 1024 / 1024 / 2
                 with FileChunkIO(self.file_name, 'r', offset=self.multipart_offset, bytes=self.byte_count) as fp:
-                    mp.upload_part_from_file(fp=fp, cb=self.status, num_cb=10, part_num=self.multipart_num)
-                    break
+                    mp.upload_part_from_file(fp=fp, cb=self.status, num_cb=callback_count, part_num=self.multipart_num)
+                    break
             else:
                 key = None
                 try:
@@ -110,7 +141,11 @@ def run(self):
                         float(self.byte_count / 1024.00 / 1024.00)
                     ))
                     key = Key(bucket=self.bucket, name=self.key_name)
-                    key.set_contents_from_filename(self.file_name, cb=self.status, num_cb=10)
+                    callback_count = 10
+                    if self.target_bandwidth is not None:
+                        # request a callback every 2MB to allow for somewhat decent throttling
+                        callback_count = self.byte_count / 1024 / 1024 / 2
+                    key.set_contents_from_filename(self.file_name, cb=self.status, num_cb=callback_count)
                 finally:
                     if key:
                         key.close()
```
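
To make the callback and throttle arithmetic above concrete, here is one worked example; the input values are illustrative, only the formulas mirror the diff:

```
# Illustrative values; only the formulas come from the diff above.
byte_count = 200 * 1024 * 1024              # a 200 MB part
target_bandwidth = 10 * 1024 * 1024         # target_mb_per_second=10, converted to bytes/s

callback_count = byte_count / 1024 / 1024 / 2   # one progress callback per ~2 MB -> 100
update_bytes = 2 * 1024 * 1024                  # bytes reported between two callbacks
duration = 0.1                                  # seconds between those callbacks
actual_bps = update_bytes / duration            # 20 MB/s, twice the target
bps_factor = actual_bps / target_bandwidth      # 2.0
throttle_secs = min(duration * bps_factor, 3)   # sleep 0.2 s, capped at 3 s

print(callback_count, bps_factor, throttle_secs)
```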

mongodb_consistent_backup/Upload/S3/__init__.py

Lines changed: 2 additions & 0 deletions
```
@@ -20,6 +20,8 @@ def config(parser):
                         help="S3 Uploader explicit storage key within the S3 bucket")
     parser.add_argument("--upload.s3.chunk_size_mb", dest="upload.s3.chunk_size_mb", default=50, type=int,
                         help="S3 Uploader upload chunk size, in megabytes (default: 50)")
+    parser.add_argument("--upload.s3.target_mb_per_second", dest="upload.s3.target_mb_per_second", default=None,
+                        type=int, help="S3 Uploader target bandwidth in MB/s per upload thread. (default: unlimited)")
     parser.add_argument("--upload.s3.secure", dest="upload.s3.secure", default=True, action="store_false",
                         help="S3 Uploader connect over SSL (default: true)")
     parser.add_argument("--upload.s3.acl", dest="upload.s3.acl", default=None, type=str,
```
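
The dotted dest strings above are plain argparse usage: the value is stored under the literal attribute name and has to be read back with getattr, which is presumably how it ends up as config.upload.s3.target_mb_per_second in S3.py. A small self-contained demonstration of the pattern, using a throwaway parser rather than the project's:

```
import argparse

parser = argparse.ArgumentParser()
parser.add_argument("--upload.s3.target_mb_per_second", dest="upload.s3.target_mb_per_second",
                    default=None, type=int,
                    help="S3 Uploader target bandwidth in MB/s per upload thread. (default: unlimited)")

args = parser.parse_args(["--upload.s3.target_mb_per_second", "10"])

# The dots rule out plain attribute access, so the value is fetched by its full name.
target_mb = getattr(args, "upload.s3.target_mb_per_second")
print(target_mb)                 # 10
print(target_mb * 1024 * 1024)   # 10485760 bytes/s, as computed in S3.py
```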
