
Script does not finish with dask distributed #257

Open

drrmmng opened this issue Jun 11, 2019 · 9 comments

@drrmmng

drrmmng commented Jun 11, 2019

I'm using a pipeline that reads text from files via Apache Tika, performs some pre-processing, and writes the results into MongoDB.
The following is a truncated version of my script.

import dask.distributed
from pymongo import MongoClient
from streamz import Stream

# add_filesize, add_text, add_text_lengths and write_file are the
# pre-processing / MongoDB-writing helpers omitted from this excerpt

if __name__ == "__main__":
    mongo_client = MongoClient("mongodb://localhost:27017/")
    dask_client = dask.distributed.Client()
    file_stream_source = Stream()

    # scatter items to the dask workers, process them there,
    # then gather the results back into the local process
    file_stream = (
        file_stream_source.scatter()
        .map(add_filesize)
        .map(add_text)
        .map(add_text_lengths)
        .buffer(16)
        .gather()
    )

    file_stream.sink(write_file)

    # file_stream_source emit loop

Everything works, but the last few documents are missing. It looks like the dask worker processes are killed before all tasks have finished; the warnings/errors below support this.
Is this expected behavior and I'm using the interface incorrectly, or is this a bug?

Update:
This does not happen when used in a jupyter notebook. Could this be related to the event loop?

distributed.process - WARNING - reaping stray process <ForkServerProcess(ForkServerProcess-2, started daemon)>
distributed.process - WARNING - reaping stray process <ForkServerProcess(ForkServerProcess-1, started daemon)>
distributed.process - WARNING - reaping stray process <ForkServerProcess(ForkServerProcess-3, started daemon)>
distributed.process - WARNING - reaping stray process <ForkServerProcess(ForkServerProcess-4, started daemon)>
distributed.nanny - WARNING - Worker process 15143 was killed by signal 15
distributed.nanny - WARNING - Worker process 15141 was killed by signal 15
Traceback (most recent call last):
  File "/home/dario/anaconda3/lib/python3.7/multiprocessing/queues.py", line 242, in _feed
    send_bytes(obj)
  File "/home/dario/anaconda3/lib/python3.7/multiprocessing/connection.py", line 200, in send_bytes
    self._send_bytes(m[offset:offset + size])
  File "/home/dario/anaconda3/lib/python3.7/multiprocessing/connection.py", line 404, in _send_bytes
    self._send(header + buf)
  File "/home/dario/anaconda3/lib/python3.7/multiprocessing/connection.py", line 368, in _send
    n = write(self._handle, buf)
BrokenPipeError: [Errno 32] Broken pipe
Traceback (most recent call last):
  File "/home/dario/anaconda3/lib/python3.7/multiprocessing/queues.py", line 242, in _feed
    send_bytes(obj)
  File "/home/dario/anaconda3/lib/python3.7/multiprocessing/connection.py", line 200, in send_bytes
    self._send_bytes(m[offset:offset + size])
  File "/home/dario/anaconda3/lib/python3.7/multiprocessing/connection.py", line 404, in _send_bytes
    self._send(header + buf)
  File "/home/dario/anaconda3/lib/python3.7/multiprocessing/connection.py", line 368, in _send
    n = write(self._handle, buf)
BrokenPipeError: [Errno 32] Broken pipe
distributed.nanny - WARNING - Worker process 15139 was killed by signal 15
distributed.nanny - WARNING - Worker process 15145 was killed by signal 15

Relevant package versions:

streamz                   0.5.1                      py_0    conda-forge
dask                      1.2.2                      py_0  
dask-core                 1.2.2                      py_0  
tornado                   6.0.2            py37h7b6447c_0 
@martindurant
Member

Could it be that your main process is ending immediately after you send the last file for processing, so that the cluster gets shut down at that point and remaining processing tasks in flight are lost? In that case, you need your script to wait until all processing is done, perhaps by awaiting on the futures coming back from the pipeline, or sleeping, or polling the cluster to see if it's still working.
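For example, a minimal sketch of the polling approach (assuming the pipeline from the original post; paths stands in for the omitted emit loop):

import time

# collect the gathered results locally so the script can tell when
# everything emitted has actually come back from the cluster
results = file_stream.sink_to_list()

n_emitted = 0
for path in paths:
    file_stream_source.emit(path)
    n_emitted += 1

# keep the main process (and therefore the dask cluster) alive until every
# emitted item has made it all the way through the pipeline
while len(results) < n_emitted:
    time.sleep(0.5)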

@drrmmng
Author

drrmmng commented Jun 12, 2019

That is probably the issue I'm facing.

In that case, you need your script to wait until all processing is done, perhaps by awaiting on the futures coming back from the pipeline

How would I do this?
I hope this doesn't sound demanding, but that is what I expected the default behavior to be.

@kivo360

kivo360 commented Oct 6, 2019

Start emitting to the source inside a daemon thread. In the main thread, use a long-running while loop to hold the program open until everything has finished.
Example:

from threading import Thread
from time import sleep

from streamz import Source

source = Source()
source.sink(print)

def start_emitting(s):
    for i in range(1000):
        s.emit(i)

if __name__ == "__main__":
    # emit from a daemon thread so the main thread stays free to keep
    # the process alive until processing finishes
    activated_thread = Thread(daemon=True, target=start_emitting, args=(source,))
    activated_thread.start()
    while True:
        sleep(5)

@martindurant
Member

@kivo360 : your situation does not seem to involve distributed at all, perhaps a different issue? Please also provide the way in which you are stalling the main thread.

In any case, could it be that the print is simply being blocked? If you replace

source.sink(print)
->
l = source.sink_to_list()

I believe the list will be populated with values.
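Applied to the snippet above, that substitution would look something like this (a minimal sketch, same names as before):

from streamz import Source

source = Source()
l = source.sink_to_list()   # results accumulate here instead of going to print

# ... start the emitting thread as before, then inspect the list later,
# for example by printing len(l) inside the while loop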

@kivo360

kivo360 commented Oct 9, 2019

Nah, it is related to distributed. You need to leave the program time to finish. Often I default to giving the program effectively unlimited time by keeping the main process open indefinitely.

@martindurant
Member

What I mean is, your code doesn't invoke distributed.
But I now understand that you were providing a solution, not a new issue :)

You should be able to achieve something similar with event loops, but your way may be simpler when none of the source nodes need an event loop anyway (but distributed always has one!). There may perhaps be a way to say "run until done" on a source (i.e., stop once all of the events have been processed), which in the case with no timing or backpressure would happen immediately.

@CJ-Wright
Member

Does emit return the dask futures? Can those be awaited?

@martindurant
Member

In the original case of working with distributed, yes, you can wait on futures from emit to be processed, but not in the simpler case.
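For instance, a rough sketch of the distributed case, following the asynchronous usage pattern from the streamz docs (increment and the item count are placeholders):

import asyncio

from dask.distributed import Client
from streamz import Stream

def increment(x):   # stand-in for real per-item work done on the workers
    return x + 1

async def main():
    client = await Client(asynchronous=True)

    source = Stream(asynchronous=True)
    results = source.scatter().map(increment).gather().sink_to_list()

    for i in range(10):
        # with no buffer in the chain, awaiting emit only returns once this
        # item has been scattered, processed and gathered back
        await source.emit(i)

    await client.close()
    return results

if __name__ == "__main__":
    print(asyncio.run(main()))

Note that a buffer node breaks that end-to-end waiting: awaiting emit then only waits until the item is accepted by the buffer, so the script would still need to drain the in-flight items (for example by polling a sink_to_list result count, as sketched earlier) before closing the client.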

@CJ-Wright
Member

In the simpler case can the thread be joined? Does that thread respect backpressure?
