
[BUG]: DFP module pipelines creating dask cluster for every batch #1146

Closed
@efajardo-nv

Description

Version

23.11

Which installation method(s) does this occur on?

Docker

Describe the bug.

Running DFP module pipelines with the DEBUG log level shows that a new dask cluster is being created for every batch. This results in very slow processing, and the pipeline usually fails before completion.
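For scale, standing up even a local dask cluster takes on the order of seconds, so paying that cost once per batch dominates small batches (the 103-row batch in the log below takes roughly 23 seconds end to end). A minimal sketch, using plain dask.distributed as a stand-in for whatever cluster type the pipeline actually creates, to measure the one-time startup cost:

# Sketch only, not Morpheus code: measure local dask cluster startup time.
import time

from dask.distributed import Client, LocalCluster

start = time.time()
cluster = LocalCluster(processes=True)  # stand-in for the cluster the pipeline creates
client = Client(cluster)
print(f"Cluster startup took {time.time() - start:.1f} s")  # typically several seconds

client.close()
cluster.close()

Paying this once per pipeline is fine; paying it once per batch is not.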

Minimum reproducible example

  1. Follow the instructions here to build the DFP container.

  2. Create a bash shell in the container:

docker compose run morpheus_pipeline bash

  3. Download the example data:

python /workspace/examples/digital_fingerprinting/fetch_example_data.py all

  4. Run the module implementation of the Duo DFP training pipeline with the DEBUG log level (see the snippet after this list for one way to confirm the per-batch behavior):

cd /workspace/examples/digital_fingerprinting/production/morpheus
python ./dfp_integrated_training_batch_pipeline.py --use_cpp=true --source duo --start_time "2022-08-01" --duration "60d" --train_users generic --log_level DEBUG --input_file "./control_messages/duo_streaming_training.json"
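
One way to confirm the per-batch behavior from a captured run (a sketch; "pipeline.log" is an assumed redirect of the run's stdout/stderr):

# Sketch: compare cluster creations against completed batches in the log.
from collections import Counter

counts = Counter()
with open("pipeline.log") as f:  # assumed capture of the pipeline's output
    for line in f:
        if "Creating dask cluster... Done" in line:
            counts["clusters_created"] += 1
        if "S3 objects to DF complete" in line:
            counts["batches_completed"] += 1

# A healthy run creates one cluster total; clusters_created tracking
# batches_completed reproduces this bug.
print(counts)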

Relevant log output

Log snippet showing the dask cluster being created multiple times:

FileToDF [training_pipe]: 915 messages [02:54,  5.23 messages/s]
Creating dask cluster...
Preprocessed [training_pipe]: 1101 messages [02:53,  8.22 messages/s]
Elapsed time since last train is too short
/opt/conda/envs/morpheus/lib/python3.10/site-packages/merlin/dtypes/mappings/tf.py:52: UserWarning: Tensorflow dtype mappings did not load successfully due to an error: No module named 'tensorflow'
  warn(f"Tensorflow dtype mappings did not load successfully due to an error: {exc.msg}")
/opt/conda/envs/morpheus/lib/python3.10/site-packages/merlin/dtypes/mappings/tf.py:52: UserWarning: Tensorflow dtype mappings did not load successfully due to an error: No module named 'tensorflow'
  warn(f"Tensorflow dtype mappings did not load successfully due to an error: {exc.msg}")
2023-08-23 16:09:31,028 - distributed.preloading - INFO - Creating preload: dask_cuda.initialize
2023-08-23 16:09:31,029 - distributed.preloading - INFO - Import preload module: dask_cuda.initialize
2023-08-23 16:09:31,048 - distributed.preloading - INFO - Creating preload: dask_cuda.initialize
2023-08-23 16:09:31,048 - distributed.preloading - INFO - Import preload module: dask_cuda.initialize
Creating dask cluster... Done. Dashboard: http://127.0.0.1:40219/status
S3 objects to DF complete. Rows: 103, Cache: miss, Duration: 23103.997707366943 ms, Rate: 4.458102935456811 rows/s
FileToDF [training_pipe]: 1018 messages [03:17,  5.12 messages/s]
Creating dask cluster...
Elapsed time since last train is too short
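
The repeated "Creating dask cluster..." lines above suggest the cluster handle is rebuilt for every batch rather than cached. A sketch of the reuse pattern one would expect instead (get_dask_client and files_to_dataframe are invented names, and plain dask.distributed stands in for the dask_cuda cluster the preload lines hint at):

# Hypothetical sketch of the expected behavior: build the cluster once and
# hand the same client to every batch.
from dask.distributed import Client, LocalCluster

_client = None

def get_dask_client() -> Client:
    global _client
    if _client is None:
        # One-time cost for the whole pipeline instead of a per-batch cost.
        cluster = LocalCluster(processes=True)
        _client = Client(cluster)
        print(f"Creating dask cluster... Done. Dashboard: {cluster.dashboard_link}")
    return _client

def files_to_dataframe(files, load_one_file):
    client = get_dask_client()  # cached across batches
    return client.gather(client.map(load_one_file, files))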

Errors when the pipeline fails:

FileToDF [training_pipe]: 1287 messages [06:06,  3.77 messages/s]
Preprocessed [inference_pipe]: 0 messages [06:06, ? messages/s]
Saved [inference_pipe]: 0 messages [06:06, ? messages/s]
2023-08-22 16:16:21,024 - distributed.worker - ERROR - Failed to communicate with scheduler during heartbeat.
Traceback (most recent call last):
  File "/opt/conda/envs/morpheus/lib/python3.10/site-packages/tornado/iostream.py", line 861, in _read_to_buffer
    bytes_read = self.read_from_fd(buf)
  File "/opt/conda/envs/morpheus/lib/python3.10/site-packages/tornado/iostream.py", line 1116, in read_from_fd
    return self.socket.recv_into(buf, len(buf))
ConnectionResetError: [Errno 104] Connection reset by peer

The above exception was the direct cause of the following exception:
Traceback (most recent call last):
  File "/opt/conda/envs/morpheus/lib/python3.10/site-packages/distributed/worker.py", line 1244, in heartbeat
    response = await retry_operation(
  File "/opt/conda/envs/morpheus/lib/python3.10/site-packages/distributed/utils_comm.py", line 434, in retry_operation
    return await retry(
  File "/opt/conda/envs/morpheus/lib/python3.10/site-packages/distributed/utils_comm.py", line 413, in retry
    return await coro()
  File "/opt/conda/envs/morpheus/lib/python3.10/site-packages/distributed/core.py", line 1265, in send_recv_from_rpc
    return await send_recv(comm=comm, op=key, **kwargs)
  File "/opt/conda/envs/morpheus/lib/python3.10/site-packages/distributed/core.py", line 1024, in send_recv
    response = await comm.read(deserializers=deserializers)
  File "/opt/conda/envs/morpheus/lib/python3.10/site-packages/distributed/comm/tcp.py", line 241, in read
    convert_stream_closed_error(self, e)
  File "/opt/conda/envs/morpheus/lib/python3.10/site-packages/distributed/comm/tcp.py", line 142, in convert_stream_closed_error
    raise CommClosedError(f"in {obj}: {exc.__class__.__name__}: {exc}") from exc
distributed.comm.core.CommClosedError: in <TCP (closed) ConnectionPool.heartbeat_worker local=tcp://127.0.0.1:52758 remote=tcp://127.0.0.1:41399>: ConnectionResetError: [Errno 104] Connection reset by peer
2023-08-22 16:16:21,096 - distributed.worker - ERROR - Failed to communicate with scheduler during heartbeat.
Traceback (most recent call last):
  File "/opt/conda/envs/morpheus/lib/python3.10/site-packages/tornado/iostream.py", line 861, in _read_to_buffer
    bytes_read = self.read_from_fd(buf)
  File "/opt/conda/envs/morpheus/lib/python3.10/site-packages/tornado/iostream.py", line 1116, in read_from_fd
    return self.socket.recv_into(buf, len(buf))
ConnectionResetError: [Errno 104] Connection reset by peer
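
The heartbeat failures are consistent with cluster churn: a worker that outlives its scheduler gets exactly this ConnectionResetError when it tries to heartbeat. If short-lived clusters are ever intentional, scoping the lifetime and ordering the teardown keeps workers from outliving the scheduler (again a sketch, not Morpheus code):

# Sketch: scoped cluster lifetime with ordered teardown.
from contextlib import contextmanager

from dask.distributed import Client, LocalCluster

@contextmanager
def short_lived_cluster():
    cluster = LocalCluster(processes=True)
    client = Client(cluster)
    try:
        yield client
    finally:
        client.close()   # detach the client first
        cluster.close()  # then stop workers and scheduler together

That said, the real fix for this issue is reusing a single cluster, as sketched above.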

Full env printout

No response

Other/Misc.

No response

Code of Conduct

  • I agree to follow Morpheus' Code of Conduct
  • I have searched the open bugs and have found no duplicates for this bug report

Labels

bug: Something isn't working
dfp: [Workflow] Related to the Digital Fingerprinting (DFP) workflow
