Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
29 changes: 19 additions & 10 deletions airflow/providers/amazon/aws/log/s3_task_handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -165,7 +165,7 @@ def s3_read(self, remote_log_location: str, return_error: bool = False) -> str:
return msg
return ''

def s3_write(self, log: str, remote_log_location: str, append: bool = True):
def s3_write(self, log: str, remote_log_location: str, append: bool = True, max_retry: int = 1):
"""
Writes the log to the remote_log_location. Fails silently if no hook
was created.
Expand All @@ -174,6 +174,7 @@ def s3_write(self, log: str, remote_log_location: str, append: bool = True):
:param remote_log_location: the log's location in remote storage
:param append: if False, any existing log file is overwritten. If True,
the new log is appended to any existing logs.
:param max_retry: Maximum number of times to retry on upload failure
"""
try:
if append and self.s3_log_exists(remote_log_location):
Expand All @@ -182,12 +183,20 @@ def s3_write(self, log: str, remote_log_location: str, append: bool = True):
except Exception:
self.log.exception('Could not verify previous log to append')

try:
self.hook.load_string(
log,
key=remote_log_location,
replace=True,
encrypt=conf.getboolean('logging', 'ENCRYPT_S3_LOGS'),
)
except Exception:
self.log.exception('Could not write logs to %s', remote_log_location)
# Default to a single retry attempt because s3 upload failures are
# rare but occasionally occur. Multiple retry attempts are unlikely
# to help as they usually indicate non-empheral errors.
for try_num in range(1 + max_retry):
try:
self.hook.load_string(
log,
key=remote_log_location,
replace=True,
encrypt=conf.getboolean('logging', 'ENCRYPT_S3_LOGS'),
)
break
except Exception:
if try_num < max_retry:
self.log.warning('Failed attempt to write logs to %s, will retry', remote_log_location)
else:
self.log.exception('Could not write logs to %s', remote_log_location)