Description
Specifying S3ResultHandler for a Flow running on Dask worker(s) with nthreads > 1 fails with KeyError: 'credential_provider'. This is likely caused by a race condition between threads that share the global boto3 session, which boto3.client uses implicitly.
Expected Behavior
S3ResultHandler should work on Dask workers that run more than one thread. In a multithreaded environment, boto3 recommends creating a separate session per thread rather than sharing the default session that module-level calls such as boto3.client rely on. See the boto3 documentation at: https://boto3.amazonaws.com/v1/documentation/api/latest/guide/resources.html?highlight=multithreading#multithreading-multiprocessing
This thread in the Prefect Community Slack describes successfully fixing a similar issue with boto3 in Prefect tasks by using this session-per-thread approach: https://prefect-community.slack.com/archives/CM28LL405/p1581434710167100
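A minimal sketch of the session-per-thread idea, for illustration only: the helper name ThreadLocalClient and its get() method are hypothetical, not part of Prefect or boto3. It caches one client per thread in a threading.local, so each thread builds its own client from its own factory call. With boto3 the factory would be something like lambda: boto3.session.Session().client("s3"), which avoids the shared default session that boto3.client uses. It is not a confirmed fix for S3ResultHandler.

```python
import threading
from typing import Callable, Generic, Optional, TypeVar

T = TypeVar("T")


class ThreadLocalClient(Generic[T]):
    """Build and cache one client per thread instead of sharing a global one.

    Hypothetical helper. `factory` is called at most once per thread; with
    boto3 it could be `lambda: boto3.session.Session().client("s3")`, so each
    thread gets its own Session instead of boto3's module-level default.
    """

    def __init__(self, factory: Callable[[], T]) -> None:
        self._factory = factory
        self._local = threading.local()  # attributes are private to each thread

    def get(self) -> T:
        client: Optional[T] = getattr(self._local, "client", None)
        if client is None:
            # First call on this thread: create a private client (and session).
            client = self._factory()
            self._local.client = client
        return client
```

Assuming boto3 is installed, usage would look like s3 = ThreadLocalClient(lambda: boto3.session.Session().client("s3")), after which each task calls s3.get().upload_fileobj(stream, Bucket=bucket, Key=key). Here bucket, key and stream are placeholders. Creating the Session inside the factory keeps credential resolution on the calling thread.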
Reproduction
This behavior should reproduce with any Flow whose Flow-level result_handler is set to S3ResultHandler and whose tasks can run in parallel (e.g. mapped tasks or independent Flow branches), when it runs on Dask workers with nthreads > 1.
Full stack trace:
February 29th 2020 at 8:09:43am | prefect.CloudTaskRunner | ERROR
Failed to set task state with error: KeyError('credential_provider')
Traceback (most recent call last):
  File "/opt/conda/lib/python3.7/site-packages/prefect/engine/cloud/task_runner.py", line 117, in call_runner_target_handlers
    cloud_state = prepare_state_for_cloud(new_state)
  File "/opt/conda/lib/python3.7/site-packages/prefect/engine/cloud/utilities.py", line 21, in prepare_state_for_cloud
    res.store_safe_value()
  File "/opt/conda/lib/python3.7/site-packages/prefect/engine/result.py", line 93, in store_safe_value
    value = self.result_handler.write(self.value)
  File "/opt/conda/lib/python3.7/site-packages/prefect/engine/result_handlers/s3_result_handler.py", line 103, in write
    self.client.upload_fileobj(stream, Bucket=self.bucket, Key=uri)
  File "/opt/conda/lib/python3.7/site-packages/prefect/engine/result_handlers/s3_result_handler.py", line 67, in client
    self.initialize_client()
  File "/opt/conda/lib/python3.7/site-packages/prefect/engine/result_handlers/s3_result_handler.py", line 60, in initialize_client
    aws_secret_access_key=aws_secret_access_key,
  File "/opt/conda/lib/python3.7/site-packages/boto3/__init__.py", line 91, in client
    return _get_default_session().client(*args, **kwargs)
  File "/opt/conda/lib/python3.7/site-packages/boto3/session.py", line 263, in client
    aws_session_token=aws_session_token, config=config)
  File "/opt/conda/lib/python3.7/site-packages/botocore/session.py", line 823, in create_client
    credentials = self.get_credentials()
  File "/opt/conda/lib/python3.7/site-packages/botocore/session.py", line 428, in get_credentials
    'credential_provider').load_credentials()
  File "/opt/conda/lib/python3.7/site-packages/botocore/session.py", line 923, in get_component
    del self._deferred[name]
KeyError: 'credential_provider'
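The last frames show botocore resolving the 'credential_provider' component lazily: get_component looks the name up in self._deferred, builds it, and then runs del self._deferred[name]. The sketch below is a simplified, stdlib-only model of that check-then-delete pattern. It is based on our reading of the traceback, not on botocore's actual source, and the names UnsafeLocator, LockedLocator and race are hypothetical. It shows how two threads sharing one session can both pass the membership check, so that the second del raises this exact KeyError. Serializing the lookup with a lock makes the error go away.

```python
import threading
from typing import Any, Callable, Dict, List


class UnsafeLocator:
    """Simplified model of the check-then-delete lazy init in the traceback."""

    def __init__(self) -> None:
        self._deferred: Dict[str, Callable[[], Any]] = {}
        self._components: Dict[str, Any] = {}

    def lazy_register(self, name: str, factory: Callable[[], Any]) -> None:
        self._deferred[name] = factory

    def get_component(self, name: str) -> Any:
        if name in self._deferred:  # two threads can both pass this check...
            factory = self._deferred[name]
            self._components[name] = factory()
            del self._deferred[name]  # ...so the second one to get here raises KeyError
        return self._components[name]


class LockedLocator(UnsafeLocator):
    """Same lookup, serialized so only one thread ever runs the factory."""

    def __init__(self) -> None:
        super().__init__()
        self._lock = threading.Lock()

    def get_component(self, name: str) -> Any:
        with self._lock:
            return super().get_component(name)


def race(locator: UnsafeLocator, factory: Callable[[], Any], n_threads: int = 2) -> List[BaseException]:
    """Look up one lazily registered component from several threads at once."""
    locator.lazy_register("credential_provider", factory)
    errors: List[BaseException] = []

    def worker() -> None:
        try:
            locator.get_component("credential_provider")
        except Exception as exc:  # collect instead of losing it inside the thread
            errors.append(exc)

    threads = [threading.Thread(target=worker) for _ in range(n_threads)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    return errors


if __name__ == "__main__":
    barrier = threading.Barrier(2, timeout=5)

    def rendezvous() -> object:
        # Both threads must be inside the factory (past the `in` check)
        # before either can reach `del`, which makes the race deterministic.
        barrier.wait()
        return object()

    print(race(UnsafeLocator(), rendezvous))  # [KeyError('credential_provider')]
    print(race(LockedLocator(), object))      # []
```

In a real worker the timing is not forced. This is consistent with the error appearing only intermittently and only when nthreads > 1.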
Environment
We run a long-running Dask cluster whose workers are started with --nprocs 1 --nthreads 3.
(Thanks to @JLouSRM for identifying this issue and capturing log evidence!)