Closed
Description
Version
23.11
Which installation method(s) does this occur on?
Docker
Describe the bug.
Running DFP module pipelines with the DEBUG log level shows that a new Dask cluster is being created for every batch. This results in very slow processing, and the pipeline usually fails before completion.
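From the DEBUG output it looks like a fresh local cluster is created (and presumably torn down) inside each batch. The sketch below shows the behaviour I would expect instead: create one cluster per process and reuse it for every batch. The function names, worker counts, and the process_batch placeholder are illustrative assumptions, not Morpheus internals.

# Illustrative sketch only: cache one Dask cluster per process and reuse it for
# every batch, instead of creating a new LocalCluster inside each batch.
# get_shared_client/process_batch are hypothetical names, not Morpheus APIs.
import atexit

from dask.distributed import Client, LocalCluster

_client = None


def get_shared_client() -> Client:
    """Create the Dask cluster once, then hand back the same client for all batches."""
    global _client
    if _client is None:
        cluster = LocalCluster(n_workers=4, threads_per_worker=2)  # sizes are assumptions
        _client = Client(cluster)
        # Shut everything down once at interpreter exit rather than after every batch.
        atexit.register(cluster.close)
        atexit.register(_client.close)
    return _client


def process_batch(file_paths: list[str]):
    """Per-batch work submits tasks to the shared client; no cluster churn."""
    client = get_shared_client()
    futures = client.map(len, file_paths)  # placeholder for the real per-file work
    return client.gather(futures)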
Minimum reproducible example
- Follow instructions here to build the DFP container.
- Create a bash shell in the container:
  docker compose run morpheus_pipeline bash
- Download the example data:
  python /workspace/examples/digital_fingerprinting/fetch_example_data.py all
- Run the module implementation of the Duo DFP training pipeline using the DEBUG log level (a quick way to confirm the repeated cluster creation is sketched after these steps):
  cd /workspace/examples/digital_fingerprinting/production/morpheus
  python ./dfp_integrated_training_batch_pipeline.py --use_cpp=true --source duo --start_time "2022-08-01" --duration "60d" --train_users generic --log_level DEBUG --input_file "./control_messages/duo_streaming_training.json"
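To quantify the repeated creation, the DEBUG output of the run above can be redirected to a file and the cluster-creation messages counted. A minimal sketch, assuming the output was captured to pipeline.log (the file name is an assumption):

# Count how many times "Creating dask cluster..." appears in a captured run log.
# "pipeline.log" is an assumed capture file, e.g. from redirecting the command above.
def count_cluster_creations(log_path: str = "pipeline.log") -> int:
    with open(log_path, "r", errors="replace") as fh:
        return sum("Creating dask cluster..." in line for line in fh)

if __name__ == "__main__":
    print(f"Dask cluster created {count_cluster_creations()} time(s) during the run")

With the behaviour described above, this count grows with the number of batches instead of staying at one.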
Relevant log output
Log snippet showing the Dask cluster being created multiple times:
FileToDF [training_pipe]: 915 messages [02:54, 5.23 messages/s]Creating dask cluster...
Preprocessed [training_pipe]: 1101 messages [02:53, 8.22 messagesElapsed time since last train is too short
FileToDF [training_pipe]: 915 messages [03:10, 5.23 messag/opt/conda/envs/morpheus/lib/python3.10/site-packages/merlin/dtypes/mappings/tf.py:52: UserWarning: Tensorflow dtype mappings did not load successfully due to an error: No module named 'tensorflow'
warn(f"Tensorflow dtype mappings did not load successfully due to an error: {exc.msg}")
/opt/conda/envs/morpheus/lib/python3.10/site-packages/merlin/dtypes/mappings/tf.py:52: UserWarning: Tensorflow dtype mappings did not load successfully due to an error: No module named 'tensorflow'
warn(f"Tensorflow dtype mappings did not load successfully due to an error: {exc.msg}")
FileToDF [training_pipe]: 915 messages [03:15, 5.23 messag2023-08-23 16:09:31,028 - distributed.preloading - INFO - Creating preload: dask_cuda.initialize
2023-08-23 16:09:31,029 - distributed.preloading - INFO - Import preload module: dask_cuda.initialize
2023-08-23 16:09:31,048 - distributed.preloading - INFO - Creating preload: dask_cuda.initialize
2023-08-23 16:09:31,048 - distributed.preloading - INFO - Import preload module: dask_cuda.initialize
FileToDF [training_pipe]: 915 messages [03:16, 5.23 messagCreating dask cluster... Done. Dashboard: http://127.0.0.1:40219/status
FileToDF [training_pipe]: 915 messages [03:17, 5.23 messagS3 objects to DF complete. Rows: 103, Cache: miss, Duration: 23103.997707366943 ms, Rate: 4.458102935456811 rows/s
FileToDF [training_pipe]: 1018 messages [03:17, 5.12 messages/s]Creating dask cluster...
Preprocessed [training_pipe]: 1101 messages [03:17, 8.22 messages/Elapsed time since last train is too short
Errors when the pipeline fails:
FileToDF [training_pipe]: 1287 messages [06:06, 3.77 me2023-08-22 16:16:21,024 - distributed.worker - ERROR - Failed to communicate with scheduler during heartbeat.
Traceback (most recent call last):essages [06:06, 3.77 messages/s]s]
File "/opt/conda/envs/morpheus/lib/python3.10/site-packages/tornado/iostream.py", line 861, in _read_to_buffer
bytes_read = self.read_from_fd(buf)ages [06:06, 8.32 messages/s]
File "/opt/conda/envs/morpheus/lib/python3.10/site-packages/tornado/iostream.py", line 1116, in read_from_fd
return self.socket.recv_into(buf, len(buf)) messages/s]s/s]
ConnectionResetError: [Errno 104] Connection reset by peer/s]
Preprocessed [inference_pipe]: 0 messages [06:06, ? messages/s]
The above exception was the direct cause of the following exception:
Saved [inference_pipe]: 0 messages [06:06, ? messages/s]
Traceback (most recent call last):
File "/opt/conda/envs/morpheus/lib/python3.10/site-packages/distributed/worker.py", line 1244, in heartbeat
response = await retry_operation(
File "/opt/conda/envs/morpheus/lib/python3.10/site-packages/distributed/utils_comm.py", line 434, in retry_operation
return await retry(
File "/opt/conda/envs/morpheus/lib/python3.10/site-packages/distributed/utils_comm.py", line 413, in retry
return await coro()
File "/opt/conda/envs/morpheus/lib/python3.10/site-packages/distributed/core.py", line 1265, in send_recv_from_rpc
return await send_recv(comm=comm, op=key, **kwargs)
File "/opt/conda/envs/morpheus/lib/python3.10/site-packages/distributed/core.py", line 1024, in send_recv
response = await comm.read(deserializers=deserializers)
File "/opt/conda/envs/morpheus/lib/python3.10/site-packages/distributed/comm/tcp.py", line 241, in read
convert_stream_closed_error(self, e)
File "/opt/conda/envs/morpheus/lib/python3.10/site-packages/distributed/comm/tcp.py", line 142, in convert_stream_closed_error
raise CommClosedError(f"in {obj}: {exc.__class__.__name__}: {exc}") from exc
distributed.comm.core.CommClosedError: in <TCP (closed) ConnectionPool.heartbeat_worker local=tcp://127.0.0.1:52758 remote=tcp://127.0.0.1:41399>: ConnectionResetError: [Errno 104] Connection reset by peer
2023-08-22 16:16:21,096 - distributed.worker - ERROR - Failed to communicate with scheduler during heartbeat.
Traceback (most recent call last):
File "/opt/conda/envs/morpheus/lib/python3.10/site-packages/tornado/iostream.py", line 861, in _read_to_buffer
bytes_read = self.read_from_fd(buf)
File "/opt/conda/envs/morpheus/lib/python3.10/site-packages/tornado/iostream.py", line 1116, in read_from_fd
return self.socket.recv_into(buf, len(buf))
ConnectionResetError: [Errno 104] Connection reset by peer
Full env printout
No response
Other/Misc.
No response
Code of Conduct
- I agree to follow Morpheus' Code of Conduct
- I have searched the open bugs and have found no duplicates for this bug report