Flaky test for BatchedSend error handling #4163

@fjetter

We've encountered a flaky test in #4151: `test_handles_exceptions`, which covers the BatchedSend exception handling recently added in #4135.

Full build log
2_build (3.7).txt

Test traceback


    @pytest.mark.asyncio
    @pytest.mark.skipif(
        WINDOWS and not PY37 and not TORNADO6, reason="failing on windows, py36, tornado 5."
    )
    async def test_handles_exceptions():
        # Ensure that we properly handle exceptions in BatchedSend.
        # https://github.com/pangeo-data/pangeo/issues/788
        # mentioned in https://github.com/dask/distributed/issues/4080, but
        # possibly distinct.
        #
        # The reported issues (https://github.com/tornadoweb/tornado/pull/2008)
        # claim that the BufferError *should* only happen when the application
        # is incorrectly using threads. I haven't been able to construct an
        # actual example, so we mock IOStream.write to raise and ensure that
        # BufferedSend handles things correctly. We don't (yet) test that
        # any *users* of BatchedSend correctly handle BatchedSend dropping
        # messages.
        async with EchoServer() as e:
            comm = await connect(e.address)
            b = BatchedSend(interval=10)
            b.start(comm)
            await asyncio.sleep(0.020)
            orig = comm.stream.write

            n = 0

            def raise_buffererror(*args, **kwargs):
                nonlocal n
                n += 1

                if n == 1:
                    raise BufferError("bad!")
                elif n == 2:
                    orig(*args, **kwargs)
                else:
                    raise CommClosedError

            with mock.patch.object(comm.stream, "write", wraps=raise_buffererror):
                b.send("hello")
                b.send("hello")
                b.send("world")
                await asyncio.sleep(0.020)
                result = await comm.read()
                assert result == ("hello", "hello", "world")

                b.send("raises when flushed")
                await asyncio.sleep(0.020)  # CommClosedError hit in callback

                with pytest.raises(CommClosedError):
>                   b.send("raises when sent")
E                   Failed: DID NOT RAISE <class 'distributed.comm.core.CommClosedError'>
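
For readers unfamiliar with the mocking pattern the test relies on, here is a minimal standalone sketch of a call-counting replacement for `write`: it raises `BufferError` on the first call, passes the second through to the original, and raises on every later call. It uses only `unittest.mock`. `FakeStream` and `make_flaky_write` are hypothetical names invented for this illustration, the built-in `ConnectionError` stands in for `CommClosedError`, and `side_effect` is used here where the test uses `wraps`. It does not involve `BatchedSend` and says nothing about why the test is flaky.

```python
from unittest import mock


class FakeStream:
    """Hypothetical stand-in for a stream object with a write() method."""

    def __init__(self):
        self.written = []

    def write(self, data):
        self.written.append(data)


def make_flaky_write(orig):
    """Build a replacement for write() whose behavior depends on the call count."""
    calls = 0

    def flaky_write(*args, **kwargs):
        nonlocal calls
        calls += 1
        if calls == 1:
            # First call: simulate the BufferError from the stream.
            raise BufferError("bad!")
        elif calls == 2:
            # Second call: delegate to the real write().
            return orig(*args, **kwargs)
        # Third call onward: simulate a closed connection
        # (ConnectionError is an assumption standing in for CommClosedError).
        raise ConnectionError("closed")

    return flaky_write


stream = FakeStream()
outcomes = []

# stream.write is captured before the patch is applied, so the second call
# reaches the real method.
with mock.patch.object(stream, "write", side_effect=make_flaky_write(stream.write)):
    for payload in (b"a", b"b", b"c"):
        try:
            stream.write(payload)
            outcomes.append("ok")
        except BufferError:
            outcomes.append("BufferError")
        except ConnectionError:
            outcomes.append("ConnectionError")
```

In this sketch only the second payload reaches the real `write`, so the caller sees one failure, one success, and then a closed connection, which is the same sequence the test drives through `comm.stream.write`.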

cc @TomAugspurger
