
Flow S3ResultHandler Fails for Dask Worker with nthreads > 1 #2108

Closed
@joeschmid

Description


Specifying S3ResultHandler for a Flow running on Dask worker(s) with nthreads > 1 fails with KeyError: 'credential_provider'. This is most likely a race condition: the module-level boto3.client call shares the global boto3 session across threads, and that session is not thread-safe.

Expected Behavior

In a multithreaded environment, boto3 recommends creating a session per thread rather than sharing the default session (which the module-level boto3.client call uses under the hood). See the boto3 documentation: https://boto3.amazonaws.com/v1/documentation/api/latest/guide/resources.html?highlight=multithreading#multithreading-multiprocessing

This thread in Prefect's Community Slack describes using the session-per-thread approach to successfully fix a similar issue when calling boto3 inside Prefect tasks: https://prefect-community.slack.com/archives/CM28LL405/p1581434710167100
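As a sketch of that session-per-thread pattern: each thread lazily builds and caches its own client in thread-local storage, so no two threads ever share the default session's mutable state. This is stdlib-only and illustrative, not Prefect's actual code; make_client is a hypothetical placeholder where a real handler would call boto3.session.Session().client("s3").

```python
import threading

# Thread-local storage: each thread gets its own lazily created client.
_local = threading.local()

def make_client():
    # Hypothetical placeholder: a real implementation would create a fresh
    # boto3 Session here, e.g. boto3.session.Session().client("s3").
    return object()

def get_client():
    # Build the client once per thread, then reuse it within that thread.
    if not hasattr(_local, "client"):
        _local.client = make_client()
    return _local.client

# Demonstrate the isolation: call get_client() twice in each of 3 threads.
clients = {}

def worker(name):
    clients[name] = (get_client(), get_client())

threads = [threading.Thread(target=worker, args=(i,)) for i in range(3)]
for t in threads:
    t.start()
for t in threads:
    t.join()

# Within a thread, both calls return the same cached object...
assert all(a is b for a, b in clients.values())
# ...but each thread received a distinct client, never a shared one.
assert len({id(a) for a, _ in clients.values()}) == 3
```

Because the default session is never touched, the lazy initialization inside botocore can no longer be entered by two threads at once.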

Reproduction

A Flow whose tasks can run in parallel (e.g. mapped tasks or separate Flow branches), with the Flow-level result_handler set to S3ResultHandler, should reproduce this behavior.

Full stack trace:

February 29th 2020 at 8:09:43am | prefect.CloudTaskRunner
ERROR 
Failed to set task state with error: KeyError('credential_provider')
Traceback (most recent call last):
  File "/opt/conda/lib/python3.7/site-packages/prefect/engine/cloud/task_runner.py", line 117, in call_runner_target_handlers
    cloud_state = prepare_state_for_cloud(new_state)
  File "/opt/conda/lib/python3.7/site-packages/prefect/engine/cloud/utilities.py", line 21, in prepare_state_for_cloud
    res.store_safe_value()
  File "/opt/conda/lib/python3.7/site-packages/prefect/engine/result.py", line 93, in store_safe_value
    value = self.result_handler.write(self.value)
  File "/opt/conda/lib/python3.7/site-packages/prefect/engine/result_handlers/s3_result_handler.py", line 103, in write
    self.client.upload_fileobj(stream, Bucket=self.bucket, Key=uri)
  File "/opt/conda/lib/python3.7/site-packages/prefect/engine/result_handlers/s3_result_handler.py", line 67, in client
    self.initialize_client()
  File "/opt/conda/lib/python3.7/site-packages/prefect/engine/result_handlers/s3_result_handler.py", line 60, in initialize_client
    aws_secret_access_key=aws_secret_access_key,
  File "/opt/conda/lib/python3.7/site-packages/boto3/__init__.py", line 91, in client
    return _get_default_session().client(*args, **kwargs)
  File "/opt/conda/lib/python3.7/site-packages/boto3/session.py", line 263, in client
    aws_session_token=aws_session_token, config=config)
  File "/opt/conda/lib/python3.7/site-packages/botocore/session.py", line 823, in create_client
    credentials = self.get_credentials()
  File "/opt/conda/lib/python3.7/site-packages/botocore/session.py", line 428, in get_credentials
    'credential_provider').load_credentials()
  File "/opt/conda/lib/python3.7/site-packages/botocore/session.py", line 923, in get_component
    del self._deferred[name]
KeyError: 'credential_provider'
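For context, the final frames above are botocore's deferred-component lookup, where a lazily built component is removed from a dict on first use. The stdlib-only stand-in below (not botocore's actual code; class and method names are illustrative) uses a barrier to force the interleaving, showing how two threads sharing one session can both pass the membership check and then collide on the del:

```python
import threading

class DeferredComponents:
    """Minimal stand-in for a lazy component registry like botocore's."""

    def __init__(self):
        self._barrier = threading.Barrier(2)
        self._components = {}
        self._deferred = {"credential_provider": self._build}

    def _build(self):
        # Hold both threads here so each has already passed the membership
        # check below before either reaches the del statement.
        self._barrier.wait()
        return object()

    def get_component(self, name):
        if name in self._deferred:
            factory = self._deferred[name]
            self._components[name] = factory()
            del self._deferred[name]  # the second thread raises KeyError here
        return self._components[name]

session = DeferredComponents()
errors = []

def load_credentials():
    try:
        session.get_component("credential_provider")
    except KeyError as exc:
        errors.append(exc)

threads = [threading.Thread(target=load_credentials) for _ in range(2)]
for t in threads:
    t.start()
for t in threads:
    t.join()

# Exactly one thread hits KeyError('credential_provider'), as in the trace.
assert len(errors) == 1 and errors[0].args == ("credential_provider",)
```

In real workloads the interleaving is nondeterministic, which is why the failure only appears intermittently when nthreads > 1.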

Environment

We create a long-running Dask cluster where our Dask workers are started with --nprocs 1 --nthreads 3.

(Thanks to @JLouSRM for identifying this issue and capturing log evidence!)

Labels

bug (Something isn't working)