
Conversation

@TomAugspurger
Member

This changes exception handling in `BatchedSend`.

The problem: `BatchedSend.send` is a regular function. What happens if we
add a message to be sent, but that send fails at a later time? In the
current structure, that exception is logged and then lost to the void.

This PR makes things a bit more robust in two ways:

1. For unexpected errors, we retry a few times. In
   pangeo-data/pangeo#788 ("Dask tasks not being shared among workers"),
   manual retries did fix the issue, so we try that here automatically.
2. For CommClosedErrors or repeated unexpected errors, we close the
   BatchedSend object. By closing the BatchedSend after an exception,
   *subsequent* `BatchedSend.send` calls will fail (see the sketch below).

We still face the issue of a potential exception happening in the
background that *isn't* followed by another `BatchedSend.send` failing.
But I think that's unavoidable given the current design.

I think this better addresses the root problem than #4128. #4128 may still be useful, but I'll revisit it after testing this out.
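A minimal sketch of the retry-then-close pattern described above (hypothetical names and retry budget; not the actual implementation in `distributed.batched`):

```python
import asyncio
import logging

logger = logging.getLogger(__name__)


class CommClosedError(Exception):
    """Stand-in for distributed.comm.core.CommClosedError."""


class SketchBatchedSend:
    """Illustrative only: buffer messages and flush them in the background."""

    max_retries = 3  # hypothetical retry budget for unexpected errors

    def __init__(self, comm, interval=0.01):
        self.comm = comm
        self.interval = interval
        self.buffer = []
        self.closed = False

    def send(self, msg):
        # A regular (non-async) method: failures surface later, in the
        # background loop, unless the object has already been closed.
        if self.closed:
            raise CommClosedError("BatchedSend already closed")
        self.buffer.append(msg)

    async def _background_send(self):
        retries = 0
        while not self.closed:
            await asyncio.sleep(self.interval)
            if not self.buffer:
                continue
            payload, self.buffer = self.buffer, []
            try:
                await self.comm.write(payload)
                retries = 0
            except CommClosedError:
                self.closed = True  # make subsequent send() calls fail
            except Exception:
                retries += 1
                logger.warning("Error in batched write, retrying")
                self.buffer = payload + self.buffer  # retry these messages
                if retries >= self.max_retries:
                    self.closed = True
```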

The review comment below refers to this excerpt from the new test:

    # The reported issues (https://github.com/tornadoweb/tornado/pull/2008)
    # claim that the BufferError *should* only happen when the application
    # is incorrectly using threads. I haven't been able to construct an
@TomAugspurger
Member Author

TomAugspurger commented Sep 28, 2020


tornadoweb/tornado#2008 (comment) may actually be relevant here: StackImpact was calling sys._current_frames(). I wonder if our profiler might be causing similar issues? Let me try playing with that. That was apparently from a signal handler, though.
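For context, a statistical profiler along these lines reads frames from other threads roughly like this (a simplified sketch, not distributed's actual `profile.watch` implementation):

```python
import sys
import threading
import time
from collections import Counter


def sample_frames(interval=0.01, duration=1.0, counts=None):
    """Periodically sample every thread's current frame from a helper thread.

    The point is that sys._current_frames() hands out references to live
    frame objects belonging to other threads, which is the kind of
    cross-thread access the linked tornado issue discusses.
    """
    counts = Counter() if counts is None else counts
    deadline = time.monotonic() + duration
    while time.monotonic() < deadline:
        for thread_id, frame in sys._current_frames().items():
            counts[(frame.f_code.co_filename, frame.f_lineno)] += 1
        time.sleep(interval)
    return counts


sampler = threading.Thread(target=sample_frames, daemon=True)
sampler.start()
```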

@TomAugspurger
Member Author

Test failure on Windows, Python 3.6, with tornado 5.x:

================================== FAILURES ===================================
___________________________ test_handles_exceptions ___________________________

    @pytest.mark.asyncio
    async def test_handles_exceptions():
        # Ensure that we properly handle exceptions in BatchedSend.
        # https://github.com/pangeo-data/pangeo/issues/788
        # mentioned in https://github.com/dask/distributed/issues/4080, but
        # possibly distinct.
        #
        # The reported issues (https://github.com/tornadoweb/tornado/pull/2008)
        # claim that the BufferError *should* only happen when the application
        # is incorrectly using threads. I haven't been able to construct an
        # actual example, so we mock IOStream.write to raise and ensure that
        # BufferedSend handles things correctly. We don't (yet) test that
        # any *users* of BatchedSend correctly handle BatchedSend dropping
        # messages.
        async with EchoServer() as e:
            comm = await connect(e.address)
            b = BatchedSend(interval=10)
            b.start(comm)
            await asyncio.sleep(0.020)
            orig = comm.stream.write
    
            n = 0
    
            def raise_buffererror(*args, **kwargs):
                nonlocal n
                n += 1
    
                if n == 1:
                    raise BufferError("bad!")
                elif n == 2:
                    orig(*args, **kwargs)
                else:
                    raise CommClosedError
    
            with mock.patch.object(comm.stream, "write", wraps=raise_buffererror):
                b.send("hello")
                b.send("hello")
                b.send("world")
                await asyncio.sleep(0.020)
                result = await comm.read()
                assert result == ("hello", "hello", "world")
    
                b.send("raises when flushed")
                await asyncio.sleep(0.020)  # CommClosedError hit in callback
    
                with pytest.raises(CommClosedError):
>                   b.send("raises when sent")
E                   Failed: DID NOT RAISE <class 'distributed.comm.core.CommClosedError'>

I believe we test Python 3.6 and tornado 5 on Linux in https://travis-ci.org/github/dask/distributed/jobs/731248712, so it's likely just a Windows + tornado 5 / Python 3.6 issue. Going to re-run, but I'll likely just skip the test there.
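The skip would presumably look something like this (a sketch; the exact condition and marker placement are guesses):

```python
import sys

import pytest
import tornado

WINDOWS = sys.platform.startswith("win")


# Hypothetical marker for the flaky combination described above.
@pytest.mark.skipif(
    WINDOWS and tornado.version_info[0] < 6 and sys.version_info < (3, 7),
    reason="flaky on Windows with Python 3.6 / tornado 5.x",
)
@pytest.mark.asyncio
async def test_handles_exceptions():
    ...
```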

@mrocklin
Member

mrocklin commented Sep 29, 2020 via email

@TomAugspurger
Member Author

Yes, it took a bunch of tries but I think I have confirmation that it did!

2020-09-29T20:21:59.022379955Z dask_gateway.dask_cli - INFO - Requesting scale to 30 workers from 1
2020-09-29T20:22:08.019871943Z distributed.batched - WARNING - Error in batched write, retrying
2020-09-29T20:24:20.094309559Z dask_gateway.dask_cli - INFO - Requesting scale to 1 workers from 30

The warning only showed up once, so retrying soon after must have been successful, and we didn't have to retire the worker.

That was using https://binder.pangeo.io/v2/gh/TomAugspurger/pangeo-binder-test/bf5ec5e89fcf46737ec9fb2b6715989815fdf591.

I haven't made much progress on a reproducer. In an attempt to identify profile.watch as the culprit, I reduced the profiling frequency (increased the tick/profile interval) and ran the workload repeatedly. It took 3-ish tries to reproduce the issue with the default settings, and I didn't reproduce it after 10 tries with the reduced frequency. So I'd say that's weak evidence for the profiler being the issue, but it'd be nice to actually verify that.

@mrocklin
Member

mrocklin commented Sep 29, 2020 via email

@TomAugspurger
Member Author

> increase the frequency watch, or introduce a small sleep at an awkward time there if we wanted to trigger this issue.

Tried both of those already and (surprisingly?) I still couldn't reproduce locally. I really thought that introducing a sleep somewhere in `process` (maybe only if `"tornado" in ident`) would trigger the condition.

I will test if increasing the profile frequency increases the likelihood that it raises on the pangeo cluster.

@TomAugspurger
Member Author

TomAugspurger commented Sep 30, 2020

With higher tick frequencies

{
    "DASK_DISTRIBUTED__WORKER__PROFILE__INTERVAL": "1ms",
    "DASK_DISTRIBUTED__WORKER__PROFILE__CYCLE": "10ms"
}
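Those environment variables map to dask config values, so the same settings could also be applied in code (a sketch, assuming the standard `distributed.worker.profile.*` keys):

```python
import dask

# Equivalent to the environment variables above: sample the profiler
# every 1 ms and aggregate the samples every 10 ms.
dask.config.set(
    {
        "distributed.worker.profile.interval": "1ms",
        "distributed.worker.profile.cycle": "10ms",
    }
)
```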

I did see the failure again with dask 2.28.0, but it's still not occurring reliably. I don't know if that makes the profiler more or less likely to be the culprit.

Short-term, I think merging this would be OK, even without a real reproducer. I'm still uncertain how fruitful additional debugging will be here. I don't know if it matters, but one thing I noticed while looking at the output of profile is that on dask-gateway we're using SSL. If a socket write over SSL is slower (is that actually the case?), then we might be more likely to hit this (under the assumption that if `bytearray += data` takes longer, then we have more time to make an additional reference to it).
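To illustrate the `bytearray += data` hypothesis: resizing a `bytearray` while another object holds a buffer export over it is exactly what raises `BufferError` (a standalone illustration of the mechanism, not tornado's write-buffer code):

```python
buf = bytearray(b"queued message")
view = memoryview(buf)  # something else holds a reference to the buffer

try:
    buf += b" more data"  # resizing while exported is not allowed
except BufferError as exc:
    print(exc)  # e.g. "Existing exports of data: object cannot be re-sized"

view.release()
buf += b" more data"  # fine once the export is released
```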

@TomAugspurger
Member Author

@mrocklin coming back to this, are you comfortable merging this despite not having a concrete reproducer? #4135 (comment) has decent evidence that we've fixed the issue for pangeo's use case.

@mrocklin
Member

mrocklin commented Oct 6, 2020

Yes, that's fine with me. Are we good to merge now? If so, then please go ahead.

@TomAugspurger merged commit b9dd003 into dask:master Oct 6, 2020
@TomAugspurger
Member Author

Thanks!

@TomAugspurger deleted the pangeo-788-robust-comm branch October 6, 2020 14:56
lr4d pushed a commit to lr4d/distributed that referenced this pull request Oct 9, 2020
This changes exception handling in BatchedSend to be more robust to failures.