
Modifying DynamoDB example code to handle throttling exceptions #7490

Draft · wants to merge 1 commit into base: main
67 changes: 46 additions & 21 deletions python/example_code/dynamodb/batching/dynamo_batching.py
@@ -82,26 +82,45 @@ def do_batch_get(batch_keys):
     sleepy_time = 1  # Start with 1 second of sleep, then exponentially increase.
     retrieved = {key: [] for key in batch_keys}
     while tries < max_tries:
-        response = dynamodb.batch_get_item(RequestItems=batch_keys)
-        # Collect any retrieved items and retry unprocessed keys.
-        for key in response.get("Responses", []):
-            retrieved[key] += response["Responses"][key]
-        unprocessed = response["UnprocessedKeys"]
-        if len(unprocessed) > 0:
-            batch_keys = unprocessed
-            unprocessed_count = sum(
-                [len(batch_key["Keys"]) for batch_key in batch_keys.values()]
-            )
-            logger.info(
-                "%s unprocessed keys returned. Sleep, then retry.", unprocessed_count
-            )
-            tries += 1
-            if tries < max_tries:
-                logger.info("Sleeping for %s seconds.", sleepy_time)
-                time.sleep(sleepy_time)
-                sleepy_time = min(sleepy_time * 2, 32)
-        else:
-            break
+        try:
+            response = dynamodb.batch_get_item(RequestItems=batch_keys)
+            # Collect any retrieved items and retry unprocessed keys.
+            for key in response.get("Responses", []):
+                retrieved[key] += response["Responses"][key]
+            unprocessed = response["UnprocessedKeys"]
+            if len(unprocessed) > 0:
+                batch_keys = unprocessed
+                unprocessed_count = sum(
+                    [len(batch_key["Keys"]) for batch_key in batch_keys.values()]
+                )
+                logger.info(
+                    "%s unprocessed keys returned. Sleep, then retry.", unprocessed_count
+                )
+                tries += 1
+                if tries < max_tries:
+                    logger.info("Sleeping for %s seconds.", sleepy_time)
+                    time.sleep(sleepy_time)
+                    sleepy_time = min(sleepy_time * 2, 32)
+            else:
+                break
+        except ClientError as error:
+            if error.response["Error"]["Code"] in ["ProvisionedThroughputExceededException", "ThrottlingException", "GsiProvisionedThroughputExceeded", "RequestLimitExceeded"]:
+                # Check for the new throttlingReasons field.
+                if "throttlingReasons" in error.response:
+                    for reason in error.response["throttlingReasons"]:
+                        logger.warning(
+                            "Throttling detected - Reason: %s, Resource: %s", reason.get("reason"), reason.get("resource")
+                        )
+                else:
+                    # Fall back to the previous message.
+                    logger.warning("Throttling detected: %s", error.response["Error"]["Message"])
+                tries += 1
+                if tries < max_tries:
+                    logger.info("Sleeping for %s seconds.", sleepy_time)
+                    time.sleep(sleepy_time)
+                    sleepy_time = min(sleepy_time * 2, 32)
+            else:
+                raise

     return retrieved
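
A note on the retry loop above: the delay doubles from 1 second and is capped at 32 seconds. As an illustration only, using the same arithmetic as the handler (the actual number of sleeps is bounded by max_tries):

# Illustration of the schedule produced by sleepy_time = min(sleepy_time * 2, 32).
sleepy_time = 1
delays = []
for _ in range(7):
    delays.append(sleepy_time)
    sleepy_time = min(sleepy_time * 2, 32)
print(delays)  # [1, 2, 4, 8, 16, 32, 32]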

@@ -129,7 +148,9 @@ def fill_table(table, table_data):
             for item in table_data:
                 writer.put_item(Item=item)
         logger.info("Loaded data into table %s.", table.name)
-    except ClientError:
+    except ClientError as error:
+        if "throttlingReasons" in error.response:
+            logger.error("Batch write throttled with reasons: %s", error.response["throttlingReasons"])
         logger.exception("Couldn't load data into table %s.", table.name)
         raise
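
Because fill_table still re-raises after logging, a caller can inspect the same field. A minimal sketch, assuming `table` is a boto3 Table resource and that entries follow the {"reason", "resource"} shape used by the handler above (both are assumptions, not part of this change):

# Hypothetical caller; the item and the throttlingReasons entry shape are assumptions.
from botocore.exceptions import ClientError

try:
    fill_table(table, [{"year": 2020, "title": "Example Movie"}])
except ClientError as error:
    for reason in error.response.get("throttlingReasons", []):
        print(f"Throttled: {reason.get('reason')} on {reason.get('resource')}")
    raise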

@@ -215,6 +236,10 @@ def archive_movies(movie_table, movie_data):
"Got expected exception when trying to put duplicate records into the "
"archive table."
)
elif error.response["Error"]["Code"] in ["ProvisionedThroughputExceededException", "ThrottlingException"]:
if "throttlingReasons" in error.response:
logger.error("Archive operation throttled: %s", error.response["throttlingReasons"])
raise
else:
logger.exception(
"Got unexpected exception when trying to put duplicate records into "
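
Since real throttling is hard to trigger on demand, reviewers might exercise the new code paths with a hand-built ClientError. A sketch, with the throttlingReasons field shape assumed from the handlers above:

# Hypothetical test sketch: construct a fake ClientError carrying the new
# throttlingReasons field and confirm the branch that reads it.
from botocore.exceptions import ClientError

error_response = {
    "Error": {"Code": "ThrottlingException", "Message": "Rate exceeded"},
    "throttlingReasons": [  # entry shape assumed from the handlers above
        {"reason": "ProvisionedThroughputExceeded", "resource": "doc-example-table"}
    ],
}
error = ClientError(error_response, "BatchGetItem")
for reason in error.response.get("throttlingReasons", []):
    print(reason["reason"], reason["resource"])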