Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Another fix for encoding issues (file.name / stream.name are...) #508

Merged
merged 1 commit into from
Mar 18, 2015
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 11 additions & 10 deletions S3/MultiPart.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
import sys
from stat import ST_SIZE
from logging import debug, info, warning, error
from Utils import getTextFromXml, getTreeFromXml, formatSize, deunicodise, calculateChecksum, parseNodes, encode_to_s3
from Utils import getTextFromXml, getTreeFromXml, formatSize, unicodise, deunicodise, calculateChecksum, parseNodes, encode_to_s3

class MultiPartUpload(object):

Expand Down Expand Up @@ -85,34 +85,35 @@ def upload_all_parts(self):
raise RuntimeError("Attempting to use a multipart upload that has not been initiated.")

self.chunk_size = self.s3.config.multipart_chunk_size_mb * 1024 * 1024
filename = unicodise(self.file.name)

if self.file.name != "<stdin>":
size_left = file_size = os.stat(deunicodise(self.file.name))[ST_SIZE]
if filename != "<stdin>":
size_left = file_size = os.stat(deunicodise(filename))[ST_SIZE]
nr_parts = file_size / self.chunk_size + (file_size % self.chunk_size and 1)
debug("MultiPart: Uploading %s in %d parts" % (self.file.name, nr_parts))
debug("MultiPart: Uploading %s in %d parts" % (filename, nr_parts))
else:
debug("MultiPart: Uploading from %s" % (self.file.name))
debug("MultiPart: Uploading from %s" % filename)

remote_statuses = dict()
if self.s3.config.put_continue:
remote_statuses = self.get_parts_information(self.uri, self.upload_id)

seq = 1
if self.file.name != "<stdin>":
if filename != "<stdin>":
while size_left > 0:
offset = self.chunk_size * (seq - 1)
current_chunk_size = min(file_size - offset, self.chunk_size)
size_left -= current_chunk_size
labels = {
'source' : self.file.name,
'source' : filename,
'destination' : self.uri.uri(),
'extra' : "[part %d of %d, %s]" % (seq, nr_parts, "%d%sB" % formatSize(current_chunk_size, human_readable = True))
}
try:
self.upload_part(seq, offset, current_chunk_size, labels, remote_status = remote_statuses.get(seq))
except:
error(u"\nUpload of '%s' part %d failed. Use\n %s abortmp %s %s\nto abort the upload, or\n %s --upload-id %s put ...\nto continue the upload."
% (self.file.name, seq, sys.argv[0], self.uri, self.upload_id, sys.argv[0], self.upload_id))
% (filename, seq, sys.argv[0], self.uri, self.upload_id, sys.argv[0], self.upload_id))
raise
seq += 1
else:
Expand All @@ -121,7 +122,7 @@ def upload_all_parts(self):
offset = 0 # send from start of the buffer
current_chunk_size = len(buffer)
labels = {
'source' : self.file.name,
'source' : filename,
'destination' : self.uri.uri(),
'extra' : "[part %d, %s]" % (seq, "%d%sB" % formatSize(current_chunk_size, human_readable = True))
}
Expand All @@ -131,7 +132,7 @@ def upload_all_parts(self):
self.upload_part(seq, offset, current_chunk_size, labels, buffer, remote_status = remote_statuses.get(seq))
except:
error(u"\nUpload of '%s' part %d failed. Use\n %s abortmp %s %s\nto abort, or\n %s --upload-id %s put ...\nto continue the upload."
% (self.file.name, seq, sys.argv[0], self.uri, self.upload_id, sys.argv[0], self.upload_id))
% (filename, seq, sys.argv[0], self.uri, self.upload_id, sys.argv[0], self.upload_id))
raise
seq += 1

Expand Down
24 changes: 13 additions & 11 deletions S3/S3.py
Original file line number Diff line number Diff line change
Expand Up @@ -591,7 +591,7 @@ def object_get(self, uri, stream, start_position = 0, extra_label = ""):
if uri.type != "s3":
raise ValueError("Expected URI type 's3', got '%s'" % uri.type)
request = self.create_request("OBJECT_GET", uri = uri)
labels = { 'source' : uri.uri(), 'destination' : stream.name, 'extra' : extra_label }
labels = { 'source' : uri.uri(), 'destination' : unicodise(stream.name), 'extra' : extra_label }
response = self.recv_file(request, stream, labels, start_position)
return response

Expand Down Expand Up @@ -1029,16 +1029,17 @@ def send_file(self, request, file, labels, buffer = '', throttle = 0, retries =
S3Request.region_map[request.resource['bucket']] = region

size_left = size_total = long(headers["content-length"])
filename = unicodise(file.name)
if self.config.progress_meter:
progress = self.config.progress_class(labels, size_total)
else:
info("Sending file '%s', please wait..." % file.name)
info("Sending file '%s', please wait..." % filename)
timestamp_start = time.time()

if buffer:
sha256_hash = checksum_sha256_buffer(buffer, offset, size_total)
else:
sha256_hash = checksum_sha256_file(file.name, offset, size_total)
sha256_hash = checksum_sha256_file(filename, offset, size_total)
request.body = sha256_hash
method_string, resource, headers = request.get_triplet()
try:
Expand Down Expand Up @@ -1066,7 +1067,7 @@ def send_file(self, request, file, labels, buffer = '', throttle = 0, retries =

try:
while (size_left > 0):
#debug("SendFile: Reading up to %d bytes from '%s' - remaining bytes: %s" % (self.config.send_chunk, file.name, size_left))
#debug("SendFile: Reading up to %d bytes from '%s' - remaining bytes: %s" % (self.config.send_chunk, filename, size_left))
l = min(self.config.send_chunk, size_left)
if buffer == '':
data = file.read(l)
Expand Down Expand Up @@ -1115,7 +1116,7 @@ def send_file(self, request, file, labels, buffer = '', throttle = 0, retries =
# Connection error -> same throttle value
return self.send_file(request, file, labels, buffer, throttle, retries - 1, offset, chunk_size)
else:
debug("Giving up on '%s' %s" % (file.name, e))
debug("Giving up on '%s' %s" % (filename, e))
raise S3UploadError("Upload failed for: %s" % resource['uri'])

timestamp_end = time.time()
Expand Down Expand Up @@ -1165,7 +1166,7 @@ def send_file(self, request, file, labels, buffer = '', throttle = 0, retries =
time.sleep(self._fail_wait(retries))
return self.send_file(request, file, labels, buffer, throttle, retries - 1, offset, chunk_size)
else:
warning("Too many failures. Giving up on '%s'" % (file.name))
warning("Too many failures. Giving up on '%s'" % (filename))
raise S3UploadError

## Non-recoverable error
Expand All @@ -1175,10 +1176,10 @@ def send_file(self, request, file, labels, buffer = '', throttle = 0, retries =
if response["headers"]["etag"].strip('"\'') != md5_hash.hexdigest():
warning("MD5 Sums don't match!")
if retries:
warning("Retrying upload of %s" % (file.name))
warning("Retrying upload of %s" % (filename))
return self.send_file(request, file, labels, buffer, throttle, retries - 1, offset, chunk_size)
else:
warning("Too many failures. Giving up on '%s'" % (file.name))
warning("Too many failures. Giving up on '%s'" % (filename))
raise S3UploadError

return response
Expand All @@ -1196,10 +1197,11 @@ def send_file_multipart(self, file, headers, uri, size):

def recv_file(self, request, stream, labels, start_position = 0, retries = _max_retries):
method_string, resource, headers = request.get_triplet()
filename = unicodise(stream.name)
if self.config.progress_meter:
progress = self.config.progress_class(labels, 0)
else:
info("Receiving file '%s', please wait..." % stream.name)
info("Receiving file '%s', please wait..." % filename)
timestamp_start = time.time()
try:
conn = ConnMan.get(self.get_hostname(resource['bucket']))
Expand Down Expand Up @@ -1334,10 +1336,10 @@ def recv_file(self, request, stream, labels, start_position = 0, retries = _max_
else:
# Otherwise try to compute MD5 of the output file
try:
response["md5"] = hash_file_md5(stream.name)
response["md5"] = hash_file_md5(filename)
except IOError, e:
if e.errno != errno.ENOENT:
warning("Unable to open file: %s: %s" % (stream.name, e))
warning("Unable to open file: %s: %s" % (filename, e))
warning("Unable to verify MD5. Assume it matches.")
response["md5"] = response["headers"]["etag"]

Expand Down