I'm using a pipeline that reads text from files via Apache Tika, performs some pre-processing, and writes the results to MongoDB.
The following is a truncated version of my script:
```python
import dask.distributed
from pymongo import MongoClient
from streamz import Stream

if __name__ == "__main__":
    mongo_client = MongoClient("mongodb://localhost:27017/")
    dask_client = dask.distributed.Client()

    file_stream_source = Stream()
    file_stream = (
        file_stream_source.scatter()   # move each element to a dask worker
        .map(add_filesize)
        .map(add_text)                 # Tika text extraction
        .map(add_text_lengths)
        .buffer(16)                    # allow up to 16 elements in flight
        .gather()                      # bring results back to the local process
    )
    file_stream.sink(write_file)
    # file_stream_source emit loop
```
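For context, the elided emit loop is essentially a plain loop over the input paths (`file_paths` is a placeholder for however my actual file list is produced):

```python
    for path in file_paths:
        file_stream_source.emit(path)
```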
Everything works well, but the last few documents are missing. It looks like the dask worker processes are killed before all tasks have finished; the warnings/errors below support this.
Is this expected behavior and I'm using the interface incorrectly, or is this a bug?
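One workaround that should rule out an early teardown is to block the main process until the sink has handled every emitted document. A minimal sketch, assuming `num_files` documents are emitted and `collection` is the pymongo collection (both are placeholders for my actual code):

```python
import time

processed = 0

def write_file(doc):
    global processed
    collection.insert_one(doc)  # write the pre-processed document
    processed += 1              # sink runs locally after gather()

# after the emit loop: wait until every document has reached the sink
while processed < num_files:
    time.sleep(0.1)
```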
Update:
This does not happen when the same code is run in a Jupyter notebook. Could this be related to the event loop?
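If the event loop is the culprit, driving the emits from an explicit Tornado IOLoop might behave differently. The streamz docs show an asynchronous pattern along these lines (the dask stages are omitted here for brevity; `run` and the toy pipeline are illustrative only):

```python
from tornado.ioloop import IOLoop
from streamz import Stream

async def run():
    # asynchronous=True makes emit() awaitable, so the loop below
    # gets backpressure from the downstream nodes
    source = Stream(asynchronous=True)
    source.map(len).sink(print)  # stand-in for the real pipeline

    for text in ["spam", "eggs"]:
        await source.emit(text)

IOLoop().run_sync(run)
```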
```
distributed.process - WARNING - reaping stray process <ForkServerProcess(ForkServerProcess-2, started daemon)>
distributed.process - WARNING - reaping stray process <ForkServerProcess(ForkServerProcess-1, started daemon)>
distributed.process - WARNING - reaping stray process <ForkServerProcess(ForkServerProcess-3, started daemon)>
distributed.process - WARNING - reaping stray process <ForkServerProcess(ForkServerProcess-4, started daemon)>
distributed.nanny - WARNING - Worker process 15143 was killed by signal 15
distributed.nanny - WARNING - Worker process 15141 was killed by signal 15
Traceback (most recent call last):
  File "/home/dario/anaconda3/lib/python3.7/multiprocessing/queues.py", line 242, in _feed
    send_bytes(obj)
  File "/home/dario/anaconda3/lib/python3.7/multiprocessing/connection.py", line 200, in send_bytes
    self._send_bytes(m[offset:offset + size])
  File "/home/dario/anaconda3/lib/python3.7/multiprocessing/connection.py", line 404, in _send_bytes
    self._send(header + buf)
  File "/home/dario/anaconda3/lib/python3.7/multiprocessing/connection.py", line 368, in _send
    n = write(self._handle, buf)
BrokenPipeError: [Errno 32] Broken pipe
Traceback (most recent call last):
  File "/home/dario/anaconda3/lib/python3.7/multiprocessing/queues.py", line 242, in _feed
    send_bytes(obj)
  File "/home/dario/anaconda3/lib/python3.7/multiprocessing/connection.py", line 200, in send_bytes
    self._send_bytes(m[offset:offset + size])
  File "/home/dario/anaconda3/lib/python3.7/multiprocessing/connection.py", line 404, in _send_bytes
    self._send(header + buf)
  File "/home/dario/anaconda3/lib/python3.7/multiprocessing/connection.py", line 368, in _send
    n = write(self._handle, buf)
BrokenPipeError: [Errno 32] Broken pipe
distributed.nanny - WARNING - Worker process 15139 was killed by signal 15
distributed.nanny - WARNING - Worker process 15145 was killed by signal 15
```
Relevant package versions:

```
streamz    0.5.1   py_0              conda-forge
dask       1.2.2   py_0
dask-core  1.2.2   py_0
tornado    6.0.2   py37h7b6447c_0
```