[Fix] Add retry for streaming download #18164

Merged · 5 commits · Apr 20, 2021
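This PR wraps the streaming reads in the blob downloaders in a bounded retry loop, so a connection dropped mid-stream is retried a few times instead of failing immediately. Below is a minimal standalone sketch of that pattern; the helper name read_stream_with_retry, its parameters, and the get_stream callable are illustrative only and not SDK API.

import time

import requests
from azure.core.exceptions import ServiceResponseError


def read_stream_with_retry(get_stream, retry_total=3, backoff=1):
    # get_stream is a callable that re-issues the (ranged) GET and returns
    # an iterable of bytes chunks.
    while True:
        stream = get_stream()
        try:
            # Joining the iterator forces the whole body to be read, so a
            # dropped connection surfaces here rather than at request time.
            return b"".join(stream)
        except (requests.exceptions.ChunkedEncodingError,
                requests.exceptions.ConnectionError) as error:
            retry_total -= 1
            if retry_total <= 0:
                raise ServiceResponseError(error, error=error)
            time.sleep(backoff)

The diff applies this idea to the per-chunk download and to the initial request, in both the sync and async downloaders (the async side retries on aiohttp's ClientPayloadError instead).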
161 changes: 92 additions & 69 deletions sdk/storage/azure-storage-blob/azure/storage/blob/_download.py
@@ -6,11 +6,15 @@

import sys
import threading
import time

import warnings
from io import BytesIO

from typing import Iterator
from azure.core.exceptions import HttpResponseError

import requests
from azure.core.exceptions import HttpResponseError, ServiceResponseError

from azure.core.tracing.common import with_current_context
from ._shared.encryption import decrypt_blob
from ._shared.request_handlers import validate_and_format_range_headers
@@ -44,10 +48,9 @@ def process_range_and_offset(start_range, end_range, length, encryption):
def process_content(data, start_offset, end_offset, encryption):
if data is None:
raise ValueError("Response cannot be None.")
try:
content = b"".join(list(data))
except Exception as error:
raise HttpResponseError(message="Download stream interrupted.", response=data.response, error=error)

content = b"".join(list(data))

if content and encryption.get("key") is not None or encryption.get("resolver") is not None:
try:
return decrypt_blob(
@@ -189,19 +192,29 @@ def _download_chunk(self, chunk_start, chunk_end):
check_content_md5=self.validate_content
)

try:
_, response = self.client.download(
range=range_header,
range_get_content_md5=range_validation,
validate_content=self.validate_content,
data_stream_total=self.total_size,
download_stream_current=self.progress_total,
**self.request_options
)
except HttpResponseError as error:
process_storage_error(error)
retry_active = True
retry_total = 3
while retry_active:
try:
_, response = self.client.download(
range=range_header,
range_get_content_md5=range_validation,
validate_content=self.validate_content,
data_stream_total=self.total_size,
download_stream_current=self.progress_total,
**self.request_options
)
except HttpResponseError as error:
process_storage_error(error)

chunk_data = process_content(response, offset[0], offset[1], self.encryption_options)
try:
chunk_data = process_content(response, offset[0], offset[1], self.encryption_options)
retry_active = False
except (requests.exceptions.ChunkedEncodingError, requests.exceptions.ConnectionError) as error:
retry_total -= 1
if retry_total <= 0:
raise ServiceResponseError(error, error=error)
time.sleep(1)

# This makes sure that if_match is set so that we can validate
# that subsequent downloads are to an unmodified blob
@@ -354,16 +367,6 @@ def __init__(
# TODO: Set to the stored MD5 when the service returns this
self.properties.content_md5 = None

if self.size == 0:
self._current_content = b""
else:
self._current_content = process_content(
self._response,
self._initial_offset[0],
self._initial_offset[1],
self._encryption_options
)

def __len__(self):
return self.size

@@ -376,51 +379,71 @@ def _initial_request(self):
check_content_md5=self._validate_content
)

try:
location_mode, response = self._clients.blob.download(
range=range_header,
range_get_content_md5=range_validation,
validate_content=self._validate_content,
data_stream_total=None,
download_stream_current=0,
**self._request_options
)
retry_active = True
retry_total = 3
while retry_active:
try:
location_mode, response = self._clients.blob.download(
range=range_header,
range_get_content_md5=range_validation,
validate_content=self._validate_content,
data_stream_total=None,
download_stream_current=0,
**self._request_options
)

# Check the location we read from to ensure we use the same one
# for subsequent requests.
self._location_mode = location_mode
# Check the location we read from to ensure we use the same one
# for subsequent requests.
self._location_mode = location_mode

# Parse the total file size and adjust the download size if ranges
# were specified
self._file_size = parse_length_from_content_range(response.properties.content_range)
if self._end_range is not None:
# Use the end range index unless it is over the end of the file
self.size = min(self._file_size, self._end_range - self._start_range + 1)
elif self._start_range is not None:
self.size = self._file_size - self._start_range
else:
self.size = self._file_size

# Parse the total file size and adjust the download size if ranges
# were specified
self._file_size = parse_length_from_content_range(response.properties.content_range)
if self._end_range is not None:
# Use the end range index unless it is over the end of the file
self.size = min(self._file_size, self._end_range - self._start_range + 1)
elif self._start_range is not None:
self.size = self._file_size - self._start_range
else:
self.size = self._file_size

except HttpResponseError as error:
if self._start_range is None and error.response.status_code == 416:
# Get range will fail on an empty file. If the user did not
# request a range, do a regular get request in order to get
# any properties.
try:
_, response = self._clients.blob.download(
validate_content=self._validate_content,
data_stream_total=0,
download_stream_current=0,
**self._request_options
)
except HttpResponseError as error:
except HttpResponseError as error:
if self._start_range is None and error.response.status_code == 416:
# Get range will fail on an empty file. If the user did not
# request a range, do a regular get request in order to get
# any properties.
try:
_, response = self._clients.blob.download(
validate_content=self._validate_content,
data_stream_total=0,
download_stream_current=0,
**self._request_options
)
except HttpResponseError as error:
process_storage_error(error)

# Set the download size to empty
self.size = 0
self._file_size = 0
else:
process_storage_error(error)

# Set the download size to empty
self.size = 0
self._file_size = 0
else:
process_storage_error(error)
try:
if self.size == 0:
self._current_content = b""
else:
self._current_content = process_content(
response,
self._initial_offset[0],
self._initial_offset[1],
self._encryption_options
)
retry_active = False
except (requests.exceptions.ChunkedEncodingError, requests.exceptions.ConnectionError) as error:
retry_total -= 1
if retry_total <= 0:
raise ServiceResponseError(error, error=error)
time.sleep(1)

# get page ranges to optimize downloading sparse page blob
if response.properties.blob_type == 'PageBlob':
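For orientation, a typical sync caller that exercises the code path above; the connection string, container, and blob names are placeholders, and the snippet assumes the azure-storage-blob v12 client.

from azure.storage.blob import BlobClient

# Placeholder connection details, for illustration only.
blob = BlobClient.from_connection_string(
    conn_str="<connection-string>",
    container_name="samples",
    blob_name="big-file.bin",
)

# download_blob() returns a StorageStreamDownloader; readall() drains the
# stream chunk by chunk, which is where the new retry loop takes effect
# if the connection drops mid-transfer.
downloader = blob.download_blob(max_concurrency=4)
data = downloader.readall()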
The second file in this diff is the async download module (the aio counterpart of _download.py); it applies the same retry pattern, catching aiohttp's ClientPayloadError while draining the response body.

@@ -10,9 +10,10 @@
from io import BytesIO
from itertools import islice
import warnings

from typing import AsyncIterator
from azure.core.exceptions import HttpResponseError

from aiohttp import ClientPayloadError
from azure.core.exceptions import HttpResponseError, ServiceResponseError
from .._shared.encryption import decrypt_blob
from .._shared.request_handlers import validate_and_format_range_headers
from .._shared.response_handlers import process_storage_error, parse_length_from_content_range
@@ -22,10 +23,7 @@
async def process_content(data, start_offset, end_offset, encryption):
if data is None:
raise ValueError("Response cannot be None.")
try:
content = data.response.body()
except Exception as error:
raise HttpResponseError(message="Download stream interrupted.", response=data.response, error=error)
content = data.response.body()
if encryption.get('key') is not None or encryption.get('resolver') is not None:
try:
return decrypt_blob(
@@ -91,20 +89,31 @@ async def _download_chunk(self, chunk_start, chunk_end):
download_range[1],
check_content_md5=self.validate_content
)
try:
_, response = await self.client.download(
range=range_header,
range_get_content_md5=range_validation,
validate_content=self.validate_content,
data_stream_total=self.total_size,
download_stream_current=self.progress_total,
**self.request_options
)
except HttpResponseError as error:
process_storage_error(error)
retry_active = True
retry_total = 3
while retry_active:
try:
_, response = await self.client.download(
range=range_header,
range_get_content_md5=range_validation,
validate_content=self.validate_content,
data_stream_total=self.total_size,
download_stream_current=self.progress_total,
**self.request_options
)
retry_active = False

except HttpResponseError as error:
process_storage_error(error)
except ClientPayloadError as error:
retry_total -= 1
if retry_total <= 0:
raise ServiceResponseError(error, error=error)
await asyncio.sleep(1)

chunk_data = await process_content(response, offset[0], offset[1], self.encryption_options)


# This makes sure that if_match is set so that we can validate
# that subsequent downloads are to an unmodified blob
if self.request_options.get('modified_access_conditions'):
@@ -277,49 +286,60 @@ async def _initial_request(self):
end_range_required=False,
check_content_md5=self._validate_content)

try:
location_mode, response = await self._clients.blob.download(
range=range_header,
range_get_content_md5=range_validation,
validate_content=self._validate_content,
data_stream_total=None,
download_stream_current=0,
**self._request_options)

# Check the location we read from to ensure we use the same one
# for subsequent requests.
self._location_mode = location_mode

# Parse the total file size and adjust the download size if ranges
# were specified
self._file_size = parse_length_from_content_range(response.properties.content_range)
if self._end_range is not None:
# Use the length unless it is over the end of the file
self.size = min(self._file_size, self._end_range - self._start_range + 1)
elif self._start_range is not None:
self.size = self._file_size - self._start_range
else:
self.size = self._file_size
retry_active = True
retry_total = 3
while retry_active:
try:
location_mode, response = await self._clients.blob.download(
range=range_header,
range_get_content_md5=range_validation,
validate_content=self._validate_content,
data_stream_total=None,
download_stream_current=0,
**self._request_options)

# Check the location we read from to ensure we use the same one
# for subsequent requests.
self._location_mode = location_mode

# Parse the total file size and adjust the download size if ranges
# were specified
self._file_size = parse_length_from_content_range(response.properties.content_range)
if self._end_range is not None:
# Use the length unless it is over the end of the file
self.size = min(self._file_size, self._end_range - self._start_range + 1)
elif self._start_range is not None:
self.size = self._file_size - self._start_range
else:
self.size = self._file_size
retry_active = False

except HttpResponseError as error:
if self._start_range is None and error.response.status_code == 416:
# Get range will fail on an empty file. If the user did not
# request a range, do a regular get request in order to get
# any properties.
try:
_, response = await self._clients.blob.download(
validate_content=self._validate_content,
data_stream_total=0,
download_stream_current=0,
**self._request_options)
except HttpResponseError as error:
except HttpResponseError as error:
if self._start_range is None and error.response.status_code == 416:
# Get range will fail on an empty file. If the user did not
# request a range, do a regular get request in order to get
# any properties.
try:
_, response = await self._clients.blob.download(
validate_content=self._validate_content,
data_stream_total=0,
download_stream_current=0,
**self._request_options)
retry_active = False
except HttpResponseError as error:
process_storage_error(error)

# Set the download size to empty
self.size = 0
self._file_size = 0
else:
process_storage_error(error)

# Set the download size to empty
self.size = 0
self._file_size = 0
else:
process_storage_error(error)
except ClientPayloadError as error:
retry_total -= 1
if retry_total <= 0:
raise ServiceResponseError(error, error=error)
await asyncio.sleep(1)

# get page ranges to optimize downloading sparse page blob
if response.properties.blob_type == 'PageBlob':
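And the async counterpart, where a ClientPayloadError raised mid-stream is now retried (up to three attempts in the code above) before surfacing as ServiceResponseError. The SAS URL is a placeholder; the snippet assumes the azure-storage-blob v12 aio client.

import asyncio

from azure.storage.blob.aio import BlobClient


async def fetch(blob_url_with_sas):
    # Placeholder URL, for illustration only.
    async with BlobClient.from_blob_url(blob_url_with_sas) as blob:
        downloader = await blob.download_blob()
        # readall() drains the aiohttp response stream; transient payload
        # errors during that read are retried by the downloader.
        return await downloader.readall()


# asyncio.run(fetch("https://<account>.blob.core.windows.net/<container>/<blob>?<sas>"))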