
Script does not finish with dask distributed #257

Open
@drrmmng

I'm using a pipeline that reads text from files via Apache Tika, performs some pre-processing, and writes the results to MongoDB.
The following is a truncated version of my script.

import dask.distributed
from pymongo import MongoClient
from streamz import Stream

if __name__ == "__main__":
    mongo_client = MongoClient("mongodb://localhost:27017/")
    dask_client = dask.distributed.Client()
    file_stream_source = Stream()

    # Scatter each item to the dask workers, run the processing steps
    # there, and gather the results back for the local sink.
    file_stream = (
        file_stream_source.scatter()
        .map(add_filesize)
        .map(add_text)
        .map(add_text_lengths)
        .buffer(16)
        .gather()
    )

    file_stream.sink(write_file)

    # file_stream_source emit loop

Everything works well, but the last few documents are missing. It looks like the dask worker processes are killed before the remaining tasks have finished; the warnings/errors below support this.
Is this behavior expected and am I using the interface wrong, or is this a bug?
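
The only mitigation I can think of is to keep the process alive until the sink has handled every emitted item, and only then close the client. A minimal sketch of that idea (the `paths` list and the counting sink are illustrative placeholders, not from my real pipeline):

import time

processed = []
file_stream.sink(processed.append)  # extra sink, only to count completions

paths = [...]  # placeholder for the real list of input files
for path in paths:
    file_stream_source.emit(path)

# Block until everything emitted has come back through gather().
while len(processed) < len(paths):
    time.sleep(0.1)

dask_client.close()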

Update:
This does not happen when the same code is run in a Jupyter notebook. Could this be related to the event loop?
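
If the event loop is the difference, one way to test that hypothesis might be to drive the emits from Tornado's IOLoop, the way a notebook kernel effectively would. Again only a sketch, reusing the placeholder `paths` from above:

from tornado.ioloop import IOLoop

async def drive():
    # Emit from inside a running loop instead of the bare main thread.
    for path in paths:
        file_stream_source.emit(path)

IOLoop.current().run_sync(drive)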

distributed.process - WARNING - reaping stray process <ForkServerProcess(ForkServerProcess-2, started daemon)>
distributed.process - WARNING - reaping stray process <ForkServerProcess(ForkServerProcess-1, started daemon)>
distributed.process - WARNING - reaping stray process <ForkServerProcess(ForkServerProcess-3, started daemon)>
distributed.process - WARNING - reaping stray process <ForkServerProcess(ForkServerProcess-4, started daemon)>
distributed.nanny - WARNING - Worker process 15143 was killed by signal 15
distributed.nanny - WARNING - Worker process 15141 was killed by signal 15
Traceback (most recent call last):
  File "/home/dario/anaconda3/lib/python3.7/multiprocessing/queues.py", line 242, in _feed
    send_bytes(obj)
  File "/home/dario/anaconda3/lib/python3.7/multiprocessing/connection.py", line 200, in send_bytes
    self._send_bytes(m[offset:offset + size])
  File "/home/dario/anaconda3/lib/python3.7/multiprocessing/connection.py", line 404, in _send_bytes
    self._send(header + buf)
  File "/home/dario/anaconda3/lib/python3.7/multiprocessing/connection.py", line 368, in _send
    n = write(self._handle, buf)
BrokenPipeError: [Errno 32] Broken pipe
Traceback (most recent call last):
  File "/home/dario/anaconda3/lib/python3.7/multiprocessing/queues.py", line 242, in _feed
    send_bytes(obj)
  File "/home/dario/anaconda3/lib/python3.7/multiprocessing/connection.py", line 200, in send_bytes
    self._send_bytes(m[offset:offset + size])
  File "/home/dario/anaconda3/lib/python3.7/multiprocessing/connection.py", line 404, in _send_bytes
    self._send(header + buf)
  File "/home/dario/anaconda3/lib/python3.7/multiprocessing/connection.py", line 368, in _send
    n = write(self._handle, buf)
BrokenPipeError: [Errno 32] Broken pipe
distributed.nanny - WARNING - Worker process 15139 was killed by signal 15
distributed.nanny - WARNING - Worker process 15145 was killed by signal 15

Relevant package versions:

streamz                   0.5.1                      py_0    conda-forge
dask                      1.2.2                      py_0  
dask-core                 1.2.2                      py_0  
tornado                   6.0.2            py37h7b6447c_0 
