Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Avoid deepcopy when submitting graph #8633

Open
wants to merge 6 commits into
base: main
Choose a base branch
from

use ToPickle

a990563
Select commit
Loading
Failed to load commit list.
Sign in for the full log view
Open

Avoid deepcopy when submitting graph #8633

use ToPickle
a990563
Select commit
Loading
Failed to load commit list.
GitHub Actions / Unit Test Results failed May 3, 2024 in 0s

1 errors, 779 fail, 130 skipped, 1 029 pass in 14h 59m 31s

    18 files   -      9     18 suites   - 9   14h 59m 31s ⏱️ + 4h 48m 8s
 1 939 tests  -  2 112  1 029 ✅  -  2 923  130 💤 +   33    779 ❌ +  777  1 🔥 +1 
16 066 runs   - 35 721  8 013 ✅  - 41 757  788 💤  - 1 227  7 256 ❌ +7 254  9 🔥 +9 

Results for commit a990563. ± Comparison against earlier commit 458eb98.

Annotations

Check warning on line 0 in distributed.shuffle.tests.test_merge

See this annotation in the file changed.

@github-actions github-actions / Unit Test Results

All 13 runs failed: test_basic_merge[inner] (distributed.shuffle.tests.test_merge)

artifacts/ubuntu-latest-3.10-default-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-3.11-default-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-3.12-default-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-3.9-default-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-3.9-no_expr-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-3.9-no_queue-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-mindeps-default-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-mindeps-numpy-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-mindeps-pandas-ci1/pytest.xml [took 0s]
artifacts/windows-latest-3.10-default-ci1/pytest.xml [took 0s]
artifacts/windows-latest-3.11-default-ci1/pytest.xml [took 0s]
artifacts/windows-latest-3.12-default-ci1/pytest.xml [took 0s]
artifacts/windows-latest-3.9-default-ci1/pytest.xml [took 0s]
Raw output
concurrent.futures._base.CancelledError: finalize-e631d8eccef279025e42dc005ac2ec27
c = <Client: No scheduler connected>
s = <Scheduler 'tcp://127.0.0.1:33055', workers: 0, cores: 0, tasks: 0>
a = Dask DataFrame Structure:
                   x      y
npartitions=2              
0              int64  int64
4                ...    ...
5                ...    ...
Dask Name: repartition-dataframe, 1 graph layer
b = Dask DataFrame Structure:
                   y      z
npartitions=2              
0              int64  int64
2                ...    ...
5                ...    ...
Dask Name: repartition-dataframe, 1 graph layer
how = 'inner'

    @pytest.mark.parametrize("how", ["inner", "left", "right", "outer"])
    @gen_cluster(client=True)
    async def test_basic_merge(c, s, a, b, how):
        A = pd.DataFrame({"x": [1, 2, 3, 4, 5, 6], "y": [1, 1, 2, 2, 3, 4]})
        a = dd.repartition(A, [0, 4, 5])
    
        B = pd.DataFrame({"y": [1, 3, 4, 4, 5, 6], "z": [6, 5, 4, 3, 2, 1]})
        b = dd.repartition(B, [0, 2, 5])
    
        joined = a.merge(b, left_on="y", right_on="y", how=how)
    
        if dd._dask_expr_enabled():
            # Ensure we're using a hash join
            from dask_expr._merge import HashJoinP2P
    
            assert any(
                isinstance(expr, HashJoinP2P) for expr in joined.optimize()._expr.walk()
            )
    
        expected = pd.merge(A, B, how, "y")
>       await list_eq(joined, expected)

distributed/shuffle/tests/test_merge.py:91: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
distributed/shuffle/tests/test_merge.py:36: in list_eq
    a = await c.compute(a) if isinstance(a, dd.DataFrame) else a
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <Future: cancelled, key: finalize-e631d8eccef279025e42dc005ac2ec27>
raiseit = True

    async def _result(self, raiseit=True):
        await self._state.wait()
        if self.status == "error":
            exc = clean_exception(self._state.exception, self._state.traceback)
            if raiseit:
                typ, exc, tb = exc
                raise exc.with_traceback(tb)
            else:
                return exc
        elif self.status == "cancelled":
            exception = CancelledError(self.key)
            if raiseit:
>               raise exception
E               concurrent.futures._base.CancelledError: finalize-e631d8eccef279025e42dc005ac2ec27

distributed/client.py:342: CancelledError

Check warning on line 0 in distributed.shuffle.tests.test_merge

See this annotation in the file changed.

@github-actions github-actions / Unit Test Results

All 13 runs failed: test_basic_merge[left] (distributed.shuffle.tests.test_merge)

artifacts/ubuntu-latest-3.10-default-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-3.11-default-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-3.12-default-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-3.9-default-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-3.9-no_expr-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-3.9-no_queue-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-mindeps-default-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-mindeps-numpy-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-mindeps-pandas-ci1/pytest.xml [took 0s]
artifacts/windows-latest-3.10-default-ci1/pytest.xml [took 0s]
artifacts/windows-latest-3.11-default-ci1/pytest.xml [took 0s]
artifacts/windows-latest-3.12-default-ci1/pytest.xml [took 0s]
artifacts/windows-latest-3.9-default-ci1/pytest.xml [took 0s]
Raw output
concurrent.futures._base.CancelledError: finalize-dd447fb22719f3743c3b573b730bd7f3
c = <Client: No scheduler connected>
s = <Scheduler 'tcp://127.0.0.1:46629', workers: 0, cores: 0, tasks: 0>
a = Dask DataFrame Structure:
                   x      y
npartitions=2              
0              int64  int64
4                ...    ...
5                ...    ...
Dask Name: repartition-dataframe, 1 graph layer
b = Dask DataFrame Structure:
                   y      z
npartitions=2              
0              int64  int64
2                ...    ...
5                ...    ...
Dask Name: repartition-dataframe, 1 graph layer
how = 'left'

    @pytest.mark.parametrize("how", ["inner", "left", "right", "outer"])
    @gen_cluster(client=True)
    async def test_basic_merge(c, s, a, b, how):
        A = pd.DataFrame({"x": [1, 2, 3, 4, 5, 6], "y": [1, 1, 2, 2, 3, 4]})
        a = dd.repartition(A, [0, 4, 5])
    
        B = pd.DataFrame({"y": [1, 3, 4, 4, 5, 6], "z": [6, 5, 4, 3, 2, 1]})
        b = dd.repartition(B, [0, 2, 5])
    
        joined = a.merge(b, left_on="y", right_on="y", how=how)
    
        if dd._dask_expr_enabled():
            # Ensure we're using a hash join
            from dask_expr._merge import HashJoinP2P
    
            assert any(
                isinstance(expr, HashJoinP2P) for expr in joined.optimize()._expr.walk()
            )
    
        expected = pd.merge(A, B, how, "y")
>       await list_eq(joined, expected)

distributed/shuffle/tests/test_merge.py:91: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
distributed/shuffle/tests/test_merge.py:36: in list_eq
    a = await c.compute(a) if isinstance(a, dd.DataFrame) else a
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <Future: cancelled, key: finalize-dd447fb22719f3743c3b573b730bd7f3>
raiseit = True

    async def _result(self, raiseit=True):
        await self._state.wait()
        if self.status == "error":
            exc = clean_exception(self._state.exception, self._state.traceback)
            if raiseit:
                typ, exc, tb = exc
                raise exc.with_traceback(tb)
            else:
                return exc
        elif self.status == "cancelled":
            exception = CancelledError(self.key)
            if raiseit:
>               raise exception
E               concurrent.futures._base.CancelledError: finalize-dd447fb22719f3743c3b573b730bd7f3

distributed/client.py:342: CancelledError

Check warning on line 0 in distributed.shuffle.tests.test_merge

See this annotation in the file changed.

@github-actions github-actions / Unit Test Results

All 13 runs failed: test_basic_merge[right] (distributed.shuffle.tests.test_merge)

artifacts/ubuntu-latest-3.10-default-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-3.11-default-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-3.12-default-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-3.9-default-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-3.9-no_expr-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-3.9-no_queue-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-mindeps-default-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-mindeps-numpy-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-mindeps-pandas-ci1/pytest.xml [took 0s]
artifacts/windows-latest-3.10-default-ci1/pytest.xml [took 0s]
artifacts/windows-latest-3.11-default-ci1/pytest.xml [took 0s]
artifacts/windows-latest-3.12-default-ci1/pytest.xml [took 0s]
artifacts/windows-latest-3.9-default-ci1/pytest.xml [took 0s]
Raw output
concurrent.futures._base.CancelledError: finalize-9627eb8818358f60662639d9ff830989
c = <Client: No scheduler connected>
s = <Scheduler 'tcp://127.0.0.1:35525', workers: 0, cores: 0, tasks: 0>
a = Dask DataFrame Structure:
                   x      y
npartitions=2              
0              int64  int64
4                ...    ...
5                ...    ...
Dask Name: repartition-dataframe, 1 graph layer
b = Dask DataFrame Structure:
                   y      z
npartitions=2              
0              int64  int64
2                ...    ...
5                ...    ...
Dask Name: repartition-dataframe, 1 graph layer
how = 'right'

    @pytest.mark.parametrize("how", ["inner", "left", "right", "outer"])
    @gen_cluster(client=True)
    async def test_basic_merge(c, s, a, b, how):
        A = pd.DataFrame({"x": [1, 2, 3, 4, 5, 6], "y": [1, 1, 2, 2, 3, 4]})
        a = dd.repartition(A, [0, 4, 5])
    
        B = pd.DataFrame({"y": [1, 3, 4, 4, 5, 6], "z": [6, 5, 4, 3, 2, 1]})
        b = dd.repartition(B, [0, 2, 5])
    
        joined = a.merge(b, left_on="y", right_on="y", how=how)
    
        if dd._dask_expr_enabled():
            # Ensure we're using a hash join
            from dask_expr._merge import HashJoinP2P
    
            assert any(
                isinstance(expr, HashJoinP2P) for expr in joined.optimize()._expr.walk()
            )
    
        expected = pd.merge(A, B, how, "y")
>       await list_eq(joined, expected)

distributed/shuffle/tests/test_merge.py:91: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
distributed/shuffle/tests/test_merge.py:36: in list_eq
    a = await c.compute(a) if isinstance(a, dd.DataFrame) else a
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <Future: cancelled, key: finalize-9627eb8818358f60662639d9ff830989>
raiseit = True

    async def _result(self, raiseit=True):
        await self._state.wait()
        if self.status == "error":
            exc = clean_exception(self._state.exception, self._state.traceback)
            if raiseit:
                typ, exc, tb = exc
                raise exc.with_traceback(tb)
            else:
                return exc
        elif self.status == "cancelled":
            exception = CancelledError(self.key)
            if raiseit:
>               raise exception
E               concurrent.futures._base.CancelledError: finalize-9627eb8818358f60662639d9ff830989

distributed/client.py:342: CancelledError

Check warning on line 0 in distributed.shuffle.tests.test_merge

See this annotation in the file changed.

@github-actions github-actions / Unit Test Results

All 13 runs failed: test_basic_merge[outer] (distributed.shuffle.tests.test_merge)

artifacts/ubuntu-latest-3.10-default-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-3.11-default-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-3.12-default-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-3.9-default-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-3.9-no_expr-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-3.9-no_queue-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-mindeps-default-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-mindeps-numpy-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-mindeps-pandas-ci1/pytest.xml [took 0s]
artifacts/windows-latest-3.10-default-ci1/pytest.xml [took 0s]
artifacts/windows-latest-3.11-default-ci1/pytest.xml [took 0s]
artifacts/windows-latest-3.12-default-ci1/pytest.xml [took 0s]
artifacts/windows-latest-3.9-default-ci1/pytest.xml [took 0s]
Raw output
concurrent.futures._base.CancelledError: finalize-30c90adae378a16295147c29fbcc84b2
c = <Client: No scheduler connected>
s = <Scheduler 'tcp://127.0.0.1:43845', workers: 0, cores: 0, tasks: 0>
a = Dask DataFrame Structure:
                   x      y
npartitions=2              
0              int64  int64
4                ...    ...
5                ...    ...
Dask Name: repartition-dataframe, 1 graph layer
b = Dask DataFrame Structure:
                   y      z
npartitions=2              
0              int64  int64
2                ...    ...
5                ...    ...
Dask Name: repartition-dataframe, 1 graph layer
how = 'outer'

    @pytest.mark.parametrize("how", ["inner", "left", "right", "outer"])
    @gen_cluster(client=True)
    async def test_basic_merge(c, s, a, b, how):
        A = pd.DataFrame({"x": [1, 2, 3, 4, 5, 6], "y": [1, 1, 2, 2, 3, 4]})
        a = dd.repartition(A, [0, 4, 5])
    
        B = pd.DataFrame({"y": [1, 3, 4, 4, 5, 6], "z": [6, 5, 4, 3, 2, 1]})
        b = dd.repartition(B, [0, 2, 5])
    
        joined = a.merge(b, left_on="y", right_on="y", how=how)
    
        if dd._dask_expr_enabled():
            # Ensure we're using a hash join
            from dask_expr._merge import HashJoinP2P
    
            assert any(
                isinstance(expr, HashJoinP2P) for expr in joined.optimize()._expr.walk()
            )
    
        expected = pd.merge(A, B, how, "y")
>       await list_eq(joined, expected)

distributed/shuffle/tests/test_merge.py:91: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
distributed/shuffle/tests/test_merge.py:36: in list_eq
    a = await c.compute(a) if isinstance(a, dd.DataFrame) else a
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <Future: cancelled, key: finalize-30c90adae378a16295147c29fbcc84b2>
raiseit = True

    async def _result(self, raiseit=True):
        await self._state.wait()
        if self.status == "error":
            exc = clean_exception(self._state.exception, self._state.traceback)
            if raiseit:
                typ, exc, tb = exc
                raise exc.with_traceback(tb)
            else:
                return exc
        elif self.status == "cancelled":
            exception = CancelledError(self.key)
            if raiseit:
>               raise exception
E               concurrent.futures._base.CancelledError: finalize-30c90adae378a16295147c29fbcc84b2

distributed/client.py:342: CancelledError

Check warning on line 0 in distributed.shuffle.tests.test_merge

See this annotation in the file changed.

@github-actions github-actions / Unit Test Results

All 13 runs failed: test_merge_p2p_shuffle_reused_dataframe_with_different_parameters (distributed.shuffle.tests.test_merge)

artifacts/ubuntu-latest-3.10-default-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-3.11-default-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-3.12-default-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-3.9-default-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-3.9-no_expr-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-3.9-no_queue-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-mindeps-default-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-mindeps-numpy-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-mindeps-pandas-ci1/pytest.xml [took 0s]
artifacts/windows-latest-3.10-default-ci1/pytest.xml [took 0s]
artifacts/windows-latest-3.11-default-ci1/pytest.xml [took 0s]
artifacts/windows-latest-3.12-default-ci1/pytest.xml [took 0s]
artifacts/windows-latest-3.9-default-ci1/pytest.xml [took 0s]
Raw output
concurrent.futures._base.CancelledError: finalize-87e18f46c7bf5aab9cc44adadf08aab4
c = <Client: No scheduler connected>
s = <Scheduler 'tcp://127.0.0.1:37149', workers: 0, cores: 0, tasks: 0>
a = <Worker 'tcp://127.0.0.1:40069', name: 0, status: closed, stored: 0, running: 0/1, ready: 0, comm: 0, waiting: 0>
b = <Worker 'tcp://127.0.0.1:43173', name: 1, status: closed, stored: 0, running: 0/2, ready: 0, comm: 0, waiting: 0>

    @gen_cluster(client=True)
    async def test_merge_p2p_shuffle_reused_dataframe_with_different_parameters(c, s, a, b):
        pdf1 = pd.DataFrame({"a": range(100), "b": range(0, 200, 2)})
        pdf2 = pd.DataFrame({"x": range(200), "y": [1, 2, 3, 4] * 50})
        ddf1 = dd.from_pandas(pdf1, npartitions=5)
        ddf2 = dd.from_pandas(pdf2, npartitions=10)
    
        with dask.config.set({"dataframe.shuffle.method": "p2p"}):
            out = (
                ddf1.merge(ddf2, left_on="a", right_on="x")
                # Vary the number of output partitions for the shuffles of ddf2
                .repartition(npartitions=20).merge(ddf2, left_on="b", right_on="x")
            )
        # Generate unique shuffle IDs if the input frame is the same but
        # parameters differ. Reusing shuffles in merges is dangerous because of the
        # required coordination and complexity introduced through dynamic clusters.
        assert sum(id_from_key(k) is not None for k in out.dask) == 4
>       result = await c.compute(out)

distributed/shuffle/tests/test_merge.py:126: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <Future: cancelled, key: finalize-87e18f46c7bf5aab9cc44adadf08aab4>
raiseit = True

    async def _result(self, raiseit=True):
        await self._state.wait()
        if self.status == "error":
            exc = clean_exception(self._state.exception, self._state.traceback)
            if raiseit:
                typ, exc, tb = exc
                raise exc.with_traceback(tb)
            else:
                return exc
        elif self.status == "cancelled":
            exception = CancelledError(self.key)
            if raiseit:
>               raise exception
E               concurrent.futures._base.CancelledError: finalize-87e18f46c7bf5aab9cc44adadf08aab4

distributed/client.py:342: CancelledError

Check warning on line 0 in distributed.shuffle.tests.test_merge

See this annotation in the file changed.

@github-actions github-actions / Unit Test Results

All 13 runs failed: test_merge_p2p_shuffle_reused_dataframe_with_same_parameters (distributed.shuffle.tests.test_merge)

artifacts/ubuntu-latest-3.10-default-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-3.11-default-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-3.12-default-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-3.9-default-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-3.9-no_expr-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-3.9-no_queue-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-mindeps-default-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-mindeps-numpy-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-mindeps-pandas-ci1/pytest.xml [took 0s]
artifacts/windows-latest-3.10-default-ci1/pytest.xml [took 0s]
artifacts/windows-latest-3.11-default-ci1/pytest.xml [took 0s]
artifacts/windows-latest-3.12-default-ci1/pytest.xml [took 0s]
artifacts/windows-latest-3.9-default-ci1/pytest.xml [took 0s]
Raw output
concurrent.futures._base.CancelledError: finalize-39b72a493975779752b4b99d408b24df
c = <Client: No scheduler connected>
s = <Scheduler 'tcp://127.0.0.1:34933', workers: 0, cores: 0, tasks: 0>
a = <Worker 'tcp://127.0.0.1:43809', name: 0, status: closed, stored: 0, running: 0/1, ready: 0, comm: 0, waiting: 0>
b = <Worker 'tcp://127.0.0.1:42905', name: 1, status: closed, stored: 0, running: 0/2, ready: 0, comm: 0, waiting: 0>

    @gen_cluster(client=True)
    async def test_merge_p2p_shuffle_reused_dataframe_with_same_parameters(c, s, a, b):
        pdf1 = pd.DataFrame({"a": range(100), "b": range(0, 200, 2)})
        pdf2 = pd.DataFrame({"x": range(200), "y": [1, 2, 3, 4] * 50})
        ddf1 = dd.from_pandas(pdf1, npartitions=5)
        ddf2 = dd.from_pandas(pdf2, npartitions=10)
    
        # This performs two shuffles:
        #   * ddf1 is shuffled on `a`
        #   * ddf2 is shuffled on `x`
        with dask.config.set({"dataframe.shuffle.method": "p2p"}):
            ddf3 = ddf1.merge(
                ddf2,
                left_on="a",
                right_on="x",
            )
    
        # This performs one shuffle:
        #   * ddf3 is shuffled on `b`
        # We can reuse the shuffle of ddf2 on `x` from the previous merge.
        with dask.config.set({"dataframe.shuffle.method": "p2p"}):
            out = ddf2.merge(
                ddf3,
                left_on="x",
                right_on="b",
            )
        # Generate unique shuffle IDs if the input frame is the same and all its
        # parameters match. Reusing shuffles in merges is dangerous because of the
        # required coordination and complexity introduced through dynamic clusters.
        assert sum(id_from_key(k) is not None for k in out.dask) == 4
>       result = await c.compute(out)

distributed/shuffle/tests/test_merge.py:163: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <Future: cancelled, key: finalize-39b72a493975779752b4b99d408b24df>
raiseit = True

    async def _result(self, raiseit=True):
        await self._state.wait()
        if self.status == "error":
            exc = clean_exception(self._state.exception, self._state.traceback)
            if raiseit:
                typ, exc, tb = exc
                raise exc.with_traceback(tb)
            else:
                return exc
        elif self.status == "cancelled":
            exception = CancelledError(self.key)
            if raiseit:
>               raise exception
E               concurrent.futures._base.CancelledError: finalize-39b72a493975779752b4b99d408b24df

distributed/client.py:342: CancelledError

Check warning on line 0 in distributed.shuffle.tests.test_merge

See this annotation in the file changed.

@github-actions github-actions / Unit Test Results

All 13 runs failed: test_merge[True-inner] (distributed.shuffle.tests.test_merge)

artifacts/ubuntu-latest-3.10-default-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-3.11-default-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-3.12-default-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-3.9-default-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-3.9-no_expr-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-3.9-no_queue-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-mindeps-default-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-mindeps-numpy-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-mindeps-pandas-ci1/pytest.xml [took 0s]
artifacts/windows-latest-3.10-default-ci1/pytest.xml [took 0s]
artifacts/windows-latest-3.11-default-ci1/pytest.xml [took 0s]
artifacts/windows-latest-3.12-default-ci1/pytest.xml [took 0s]
artifacts/windows-latest-3.9-default-ci1/pytest.xml [took 0s]
Raw output
concurrent.futures._base.CancelledError: finalize-43dfb617454cb1aa42c8a3ccdcb16329
c = <Client: No scheduler connected>
s = <Scheduler 'tcp://127.0.0.1:40515', workers: 0, cores: 0, tasks: 0>
a = Dask DataFrame Structure:
                   x      y
npartitions=2              
0              int64  int64
4                ...    ...
5                ...    ...
Dask Name: repartition-dataframe, 1 graph layer
b = Dask DataFrame Structure:
                   y      z
npartitions=2              
0              int64  int64
2                ...    ...
5                ...    ...
Dask Name: repartition-dataframe, 1 graph layer
how = 'inner', disk = True

    @pytest.mark.parametrize("how", ["inner", "outer", "left", "right"])
    @pytest.mark.parametrize("disk", [True, False])
    @gen_cluster(client=True)
    async def test_merge(c, s, a, b, how, disk):
        A = pd.DataFrame({"x": [1, 2, 3, 4, 5, 6], "y": [1, 1, 2, 2, 3, 4]})
        a = dd.repartition(A, [0, 4, 5])
    
        B = pd.DataFrame({"y": [1, 3, 4, 4, 5, 6], "z": [6, 5, 4, 3, 2, 1]})
        b = dd.repartition(B, [0, 2, 5])
    
        with dask.config.set({"dataframe.shuffle.method": "p2p"}):
            with dask.config.set({"distributed.p2p.disk": disk}):
                joined = dd.merge(a, b, left_index=True, right_index=True, how=how)
>           res = await c.compute(joined)

distributed/shuffle/tests/test_merge.py:183: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <Future: cancelled, key: finalize-43dfb617454cb1aa42c8a3ccdcb16329>
raiseit = True

    async def _result(self, raiseit=True):
        await self._state.wait()
        if self.status == "error":
            exc = clean_exception(self._state.exception, self._state.traceback)
            if raiseit:
                typ, exc, tb = exc
                raise exc.with_traceback(tb)
            else:
                return exc
        elif self.status == "cancelled":
            exception = CancelledError(self.key)
            if raiseit:
>               raise exception
E               concurrent.futures._base.CancelledError: finalize-43dfb617454cb1aa42c8a3ccdcb16329

distributed/client.py:342: CancelledError

Check warning on line 0 in distributed.shuffle.tests.test_merge

See this annotation in the file changed.

@github-actions github-actions / Unit Test Results

All 13 runs failed: test_merge[True-outer] (distributed.shuffle.tests.test_merge)

artifacts/ubuntu-latest-3.10-default-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-3.11-default-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-3.12-default-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-3.9-default-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-3.9-no_expr-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-3.9-no_queue-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-mindeps-default-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-mindeps-numpy-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-mindeps-pandas-ci1/pytest.xml [took 0s]
artifacts/windows-latest-3.10-default-ci1/pytest.xml [took 0s]
artifacts/windows-latest-3.11-default-ci1/pytest.xml [took 0s]
artifacts/windows-latest-3.12-default-ci1/pytest.xml [took 0s]
artifacts/windows-latest-3.9-default-ci1/pytest.xml [took 0s]
Raw output
concurrent.futures._base.CancelledError: finalize-af93dbc77b27048ec91353f09450d571
c = <Client: No scheduler connected>
s = <Scheduler 'tcp://127.0.0.1:43597', workers: 0, cores: 0, tasks: 0>
a = Dask DataFrame Structure:
                   x      y
npartitions=2              
0              int64  int64
4                ...    ...
5                ...    ...
Dask Name: repartition-dataframe, 1 graph layer
b = Dask DataFrame Structure:
                   y      z
npartitions=2              
0              int64  int64
2                ...    ...
5                ...    ...
Dask Name: repartition-dataframe, 1 graph layer
how = 'outer', disk = True

    @pytest.mark.parametrize("how", ["inner", "outer", "left", "right"])
    @pytest.mark.parametrize("disk", [True, False])
    @gen_cluster(client=True)
    async def test_merge(c, s, a, b, how, disk):
        A = pd.DataFrame({"x": [1, 2, 3, 4, 5, 6], "y": [1, 1, 2, 2, 3, 4]})
        a = dd.repartition(A, [0, 4, 5])
    
        B = pd.DataFrame({"y": [1, 3, 4, 4, 5, 6], "z": [6, 5, 4, 3, 2, 1]})
        b = dd.repartition(B, [0, 2, 5])
    
        with dask.config.set({"dataframe.shuffle.method": "p2p"}):
            with dask.config.set({"distributed.p2p.disk": disk}):
                joined = dd.merge(a, b, left_index=True, right_index=True, how=how)
>           res = await c.compute(joined)

distributed/shuffle/tests/test_merge.py:183: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <Future: cancelled, key: finalize-af93dbc77b27048ec91353f09450d571>
raiseit = True

    async def _result(self, raiseit=True):
        await self._state.wait()
        if self.status == "error":
            exc = clean_exception(self._state.exception, self._state.traceback)
            if raiseit:
                typ, exc, tb = exc
                raise exc.with_traceback(tb)
            else:
                return exc
        elif self.status == "cancelled":
            exception = CancelledError(self.key)
            if raiseit:
>               raise exception
E               concurrent.futures._base.CancelledError: finalize-af93dbc77b27048ec91353f09450d571

distributed/client.py:342: CancelledError

Check warning on line 0 in distributed.shuffle.tests.test_merge

See this annotation in the file changed.

@github-actions github-actions / Unit Test Results

All 13 runs failed: test_merge[True-left] (distributed.shuffle.tests.test_merge)

artifacts/ubuntu-latest-3.10-default-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-3.11-default-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-3.12-default-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-3.9-default-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-3.9-no_expr-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-3.9-no_queue-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-mindeps-default-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-mindeps-numpy-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-mindeps-pandas-ci1/pytest.xml [took 0s]
artifacts/windows-latest-3.10-default-ci1/pytest.xml [took 0s]
artifacts/windows-latest-3.11-default-ci1/pytest.xml [took 0s]
artifacts/windows-latest-3.12-default-ci1/pytest.xml [took 0s]
artifacts/windows-latest-3.9-default-ci1/pytest.xml [took 0s]
Raw output
concurrent.futures._base.CancelledError: finalize-10a1f67bc4de3d4535f8be22ca0a4a75
c = <Client: No scheduler connected>
s = <Scheduler 'tcp://127.0.0.1:33601', workers: 0, cores: 0, tasks: 0>
a = Dask DataFrame Structure:
                   x      y
npartitions=2              
0              int64  int64
4                ...    ...
5                ...    ...
Dask Name: repartition-dataframe, 1 graph layer
b = Dask DataFrame Structure:
                   y      z
npartitions=2              
0              int64  int64
2                ...    ...
5                ...    ...
Dask Name: repartition-dataframe, 1 graph layer
how = 'left', disk = True

    @pytest.mark.parametrize("how", ["inner", "outer", "left", "right"])
    @pytest.mark.parametrize("disk", [True, False])
    @gen_cluster(client=True)
    async def test_merge(c, s, a, b, how, disk):
        A = pd.DataFrame({"x": [1, 2, 3, 4, 5, 6], "y": [1, 1, 2, 2, 3, 4]})
        a = dd.repartition(A, [0, 4, 5])
    
        B = pd.DataFrame({"y": [1, 3, 4, 4, 5, 6], "z": [6, 5, 4, 3, 2, 1]})
        b = dd.repartition(B, [0, 2, 5])
    
        with dask.config.set({"dataframe.shuffle.method": "p2p"}):
            with dask.config.set({"distributed.p2p.disk": disk}):
                joined = dd.merge(a, b, left_index=True, right_index=True, how=how)
>           res = await c.compute(joined)

distributed/shuffle/tests/test_merge.py:183: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <Future: cancelled, key: finalize-10a1f67bc4de3d4535f8be22ca0a4a75>
raiseit = True

    async def _result(self, raiseit=True):
        await self._state.wait()
        if self.status == "error":
            exc = clean_exception(self._state.exception, self._state.traceback)
            if raiseit:
                typ, exc, tb = exc
                raise exc.with_traceback(tb)
            else:
                return exc
        elif self.status == "cancelled":
            exception = CancelledError(self.key)
            if raiseit:
>               raise exception
E               concurrent.futures._base.CancelledError: finalize-10a1f67bc4de3d4535f8be22ca0a4a75

distributed/client.py:342: CancelledError

Check warning on line 0 in distributed.shuffle.tests.test_merge

See this annotation in the file changed.

@github-actions github-actions / Unit Test Results

All 13 runs failed: test_merge[True-right] (distributed.shuffle.tests.test_merge)

artifacts/ubuntu-latest-3.10-default-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-3.11-default-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-3.12-default-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-3.9-default-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-3.9-no_expr-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-3.9-no_queue-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-mindeps-default-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-mindeps-numpy-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-mindeps-pandas-ci1/pytest.xml [took 0s]
artifacts/windows-latest-3.10-default-ci1/pytest.xml [took 0s]
artifacts/windows-latest-3.11-default-ci1/pytest.xml [took 0s]
artifacts/windows-latest-3.12-default-ci1/pytest.xml [took 0s]
artifacts/windows-latest-3.9-default-ci1/pytest.xml [took 0s]
Raw output
concurrent.futures._base.CancelledError: finalize-6901874377222dc3d690d647fc7da03d
c = <Client: No scheduler connected>
s = <Scheduler 'tcp://127.0.0.1:40417', workers: 0, cores: 0, tasks: 0>
a = Dask DataFrame Structure:
                   x      y
npartitions=2              
0              int64  int64
4                ...    ...
5                ...    ...
Dask Name: repartition-dataframe, 1 graph layer
b = Dask DataFrame Structure:
                   y      z
npartitions=2              
0              int64  int64
2                ...    ...
5                ...    ...
Dask Name: repartition-dataframe, 1 graph layer
how = 'right', disk = True

    @pytest.mark.parametrize("how", ["inner", "outer", "left", "right"])
    @pytest.mark.parametrize("disk", [True, False])
    @gen_cluster(client=True)
    async def test_merge(c, s, a, b, how, disk):
        A = pd.DataFrame({"x": [1, 2, 3, 4, 5, 6], "y": [1, 1, 2, 2, 3, 4]})
        a = dd.repartition(A, [0, 4, 5])
    
        B = pd.DataFrame({"y": [1, 3, 4, 4, 5, 6], "z": [6, 5, 4, 3, 2, 1]})
        b = dd.repartition(B, [0, 2, 5])
    
        with dask.config.set({"dataframe.shuffle.method": "p2p"}):
            with dask.config.set({"distributed.p2p.disk": disk}):
                joined = dd.merge(a, b, left_index=True, right_index=True, how=how)
>           res = await c.compute(joined)

distributed/shuffle/tests/test_merge.py:183: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <Future: cancelled, key: finalize-6901874377222dc3d690d647fc7da03d>
raiseit = True

    async def _result(self, raiseit=True):
        await self._state.wait()
        if self.status == "error":
            exc = clean_exception(self._state.exception, self._state.traceback)
            if raiseit:
                typ, exc, tb = exc
                raise exc.with_traceback(tb)
            else:
                return exc
        elif self.status == "cancelled":
            exception = CancelledError(self.key)
            if raiseit:
>               raise exception
E               concurrent.futures._base.CancelledError: finalize-6901874377222dc3d690d647fc7da03d

distributed/client.py:342: CancelledError

Check warning on line 0 in distributed.shuffle.tests.test_merge

See this annotation in the file changed.

@github-actions github-actions / Unit Test Results

All 13 runs failed: test_merge[False-inner] (distributed.shuffle.tests.test_merge)

artifacts/ubuntu-latest-3.10-default-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-3.11-default-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-3.12-default-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-3.9-default-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-3.9-no_expr-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-3.9-no_queue-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-mindeps-default-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-mindeps-numpy-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-mindeps-pandas-ci1/pytest.xml [took 0s]
artifacts/windows-latest-3.10-default-ci1/pytest.xml [took 0s]
artifacts/windows-latest-3.11-default-ci1/pytest.xml [took 0s]
artifacts/windows-latest-3.12-default-ci1/pytest.xml [took 0s]
artifacts/windows-latest-3.9-default-ci1/pytest.xml [took 0s]
Raw output
concurrent.futures._base.CancelledError: finalize-43dfb617454cb1aa42c8a3ccdcb16329
c = <Client: No scheduler connected>
s = <Scheduler 'tcp://127.0.0.1:43005', workers: 0, cores: 0, tasks: 0>
a = Dask DataFrame Structure:
                   x      y
npartitions=2              
0              int64  int64
4                ...    ...
5                ...    ...
Dask Name: repartition-dataframe, 1 graph layer
b = Dask DataFrame Structure:
                   y      z
npartitions=2              
0              int64  int64
2                ...    ...
5                ...    ...
Dask Name: repartition-dataframe, 1 graph layer
how = 'inner', disk = False

    @pytest.mark.parametrize("how", ["inner", "outer", "left", "right"])
    @pytest.mark.parametrize("disk", [True, False])
    @gen_cluster(client=True)
    async def test_merge(c, s, a, b, how, disk):
        A = pd.DataFrame({"x": [1, 2, 3, 4, 5, 6], "y": [1, 1, 2, 2, 3, 4]})
        a = dd.repartition(A, [0, 4, 5])
    
        B = pd.DataFrame({"y": [1, 3, 4, 4, 5, 6], "z": [6, 5, 4, 3, 2, 1]})
        b = dd.repartition(B, [0, 2, 5])
    
        with dask.config.set({"dataframe.shuffle.method": "p2p"}):
            with dask.config.set({"distributed.p2p.disk": disk}):
                joined = dd.merge(a, b, left_index=True, right_index=True, how=how)
>           res = await c.compute(joined)

distributed/shuffle/tests/test_merge.py:183: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <Future: cancelled, key: finalize-43dfb617454cb1aa42c8a3ccdcb16329>
raiseit = True

    async def _result(self, raiseit=True):
        await self._state.wait()
        if self.status == "error":
            exc = clean_exception(self._state.exception, self._state.traceback)
            if raiseit:
                typ, exc, tb = exc
                raise exc.with_traceback(tb)
            else:
                return exc
        elif self.status == "cancelled":
            exception = CancelledError(self.key)
            if raiseit:
>               raise exception
E               concurrent.futures._base.CancelledError: finalize-43dfb617454cb1aa42c8a3ccdcb16329

distributed/client.py:342: CancelledError

Check warning on line 0 in distributed.shuffle.tests.test_merge

See this annotation in the file changed.

@github-actions github-actions / Unit Test Results

All 13 runs failed: test_merge[False-outer] (distributed.shuffle.tests.test_merge)

artifacts/ubuntu-latest-3.10-default-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-3.11-default-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-3.12-default-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-3.9-default-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-3.9-no_expr-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-3.9-no_queue-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-mindeps-default-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-mindeps-numpy-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-mindeps-pandas-ci1/pytest.xml [took 0s]
artifacts/windows-latest-3.10-default-ci1/pytest.xml [took 0s]
artifacts/windows-latest-3.11-default-ci1/pytest.xml [took 0s]
artifacts/windows-latest-3.12-default-ci1/pytest.xml [took 0s]
artifacts/windows-latest-3.9-default-ci1/pytest.xml [took 0s]
Raw output
concurrent.futures._base.CancelledError: finalize-af93dbc77b27048ec91353f09450d571
c = <Client: No scheduler connected>
s = <Scheduler 'tcp://127.0.0.1:38005', workers: 0, cores: 0, tasks: 0>
a = Dask DataFrame Structure:
                   x      y
npartitions=2              
0              int64  int64
4                ...    ...
5                ...    ...
Dask Name: repartition-dataframe, 1 graph layer
b = Dask DataFrame Structure:
                   y      z
npartitions=2              
0              int64  int64
2                ...    ...
5                ...    ...
Dask Name: repartition-dataframe, 1 graph layer
how = 'outer', disk = False

    @pytest.mark.parametrize("how", ["inner", "outer", "left", "right"])
    @pytest.mark.parametrize("disk", [True, False])
    @gen_cluster(client=True)
    async def test_merge(c, s, a, b, how, disk):
        A = pd.DataFrame({"x": [1, 2, 3, 4, 5, 6], "y": [1, 1, 2, 2, 3, 4]})
        a = dd.repartition(A, [0, 4, 5])
    
        B = pd.DataFrame({"y": [1, 3, 4, 4, 5, 6], "z": [6, 5, 4, 3, 2, 1]})
        b = dd.repartition(B, [0, 2, 5])
    
        with dask.config.set({"dataframe.shuffle.method": "p2p"}):
            with dask.config.set({"distributed.p2p.disk": disk}):
                joined = dd.merge(a, b, left_index=True, right_index=True, how=how)
>           res = await c.compute(joined)

distributed/shuffle/tests/test_merge.py:183: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <Future: cancelled, key: finalize-af93dbc77b27048ec91353f09450d571>
raiseit = True

    async def _result(self, raiseit=True):
        await self._state.wait()
        if self.status == "error":
            exc = clean_exception(self._state.exception, self._state.traceback)
            if raiseit:
                typ, exc, tb = exc
                raise exc.with_traceback(tb)
            else:
                return exc
        elif self.status == "cancelled":
            exception = CancelledError(self.key)
            if raiseit:
>               raise exception
E               concurrent.futures._base.CancelledError: finalize-af93dbc77b27048ec91353f09450d571

distributed/client.py:342: CancelledError

Check warning on line 0 in distributed.shuffle.tests.test_merge

See this annotation in the file changed.

@github-actions github-actions / Unit Test Results

All 13 runs failed: test_merge[False-left] (distributed.shuffle.tests.test_merge)

artifacts/ubuntu-latest-3.10-default-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-3.11-default-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-3.12-default-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-3.9-default-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-3.9-no_expr-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-3.9-no_queue-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-mindeps-default-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-mindeps-numpy-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-mindeps-pandas-ci1/pytest.xml [took 0s]
artifacts/windows-latest-3.10-default-ci1/pytest.xml [took 0s]
artifacts/windows-latest-3.11-default-ci1/pytest.xml [took 0s]
artifacts/windows-latest-3.12-default-ci1/pytest.xml [took 0s]
artifacts/windows-latest-3.9-default-ci1/pytest.xml [took 0s]
Raw output
concurrent.futures._base.CancelledError: finalize-10a1f67bc4de3d4535f8be22ca0a4a75
c = <Client: No scheduler connected>
s = <Scheduler 'tcp://127.0.0.1:42753', workers: 0, cores: 0, tasks: 0>
a = Dask DataFrame Structure:
                   x      y
npartitions=2              
0              int64  int64
4                ...    ...
5                ...    ...
Dask Name: repartition-dataframe, 1 graph layer
b = Dask DataFrame Structure:
                   y      z
npartitions=2              
0              int64  int64
2                ...    ...
5                ...    ...
Dask Name: repartition-dataframe, 1 graph layer
how = 'left', disk = False

    @pytest.mark.parametrize("how", ["inner", "outer", "left", "right"])
    @pytest.mark.parametrize("disk", [True, False])
    @gen_cluster(client=True)
    async def test_merge(c, s, a, b, how, disk):
        A = pd.DataFrame({"x": [1, 2, 3, 4, 5, 6], "y": [1, 1, 2, 2, 3, 4]})
        a = dd.repartition(A, [0, 4, 5])
    
        B = pd.DataFrame({"y": [1, 3, 4, 4, 5, 6], "z": [6, 5, 4, 3, 2, 1]})
        b = dd.repartition(B, [0, 2, 5])
    
        with dask.config.set({"dataframe.shuffle.method": "p2p"}):
            with dask.config.set({"distributed.p2p.disk": disk}):
                joined = dd.merge(a, b, left_index=True, right_index=True, how=how)
>           res = await c.compute(joined)

distributed/shuffle/tests/test_merge.py:183: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <Future: cancelled, key: finalize-10a1f67bc4de3d4535f8be22ca0a4a75>
raiseit = True

    async def _result(self, raiseit=True):
        await self._state.wait()
        if self.status == "error":
            exc = clean_exception(self._state.exception, self._state.traceback)
            if raiseit:
                typ, exc, tb = exc
                raise exc.with_traceback(tb)
            else:
                return exc
        elif self.status == "cancelled":
            exception = CancelledError(self.key)
            if raiseit:
>               raise exception
E               concurrent.futures._base.CancelledError: finalize-10a1f67bc4de3d4535f8be22ca0a4a75

distributed/client.py:342: CancelledError

Check warning on line 0 in distributed.shuffle.tests.test_merge

See this annotation in the file changed.

@github-actions github-actions / Unit Test Results

All 13 runs failed: test_merge[False-right] (distributed.shuffle.tests.test_merge)

artifacts/ubuntu-latest-3.10-default-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-3.11-default-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-3.12-default-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-3.9-default-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-3.9-no_expr-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-3.9-no_queue-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-mindeps-default-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-mindeps-numpy-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-mindeps-pandas-ci1/pytest.xml [took 0s]
artifacts/windows-latest-3.10-default-ci1/pytest.xml [took 0s]
artifacts/windows-latest-3.11-default-ci1/pytest.xml [took 0s]
artifacts/windows-latest-3.12-default-ci1/pytest.xml [took 0s]
artifacts/windows-latest-3.9-default-ci1/pytest.xml [took 0s]
Raw output
concurrent.futures._base.CancelledError: finalize-6901874377222dc3d690d647fc7da03d
c = <Client: No scheduler connected>
s = <Scheduler 'tcp://127.0.0.1:42717', workers: 0, cores: 0, tasks: 0>
a = Dask DataFrame Structure:
                   x      y
npartitions=2              
0              int64  int64
4                ...    ...
5                ...    ...
Dask Name: repartition-dataframe, 1 graph layer
b = Dask DataFrame Structure:
                   y      z
npartitions=2              
0              int64  int64
2                ...    ...
5                ...    ...
Dask Name: repartition-dataframe, 1 graph layer
how = 'right', disk = False

    @pytest.mark.parametrize("how", ["inner", "outer", "left", "right"])
    @pytest.mark.parametrize("disk", [True, False])
    @gen_cluster(client=True)
    async def test_merge(c, s, a, b, how, disk):
        A = pd.DataFrame({"x": [1, 2, 3, 4, 5, 6], "y": [1, 1, 2, 2, 3, 4]})
        a = dd.repartition(A, [0, 4, 5])
    
        B = pd.DataFrame({"y": [1, 3, 4, 4, 5, 6], "z": [6, 5, 4, 3, 2, 1]})
        b = dd.repartition(B, [0, 2, 5])
    
        with dask.config.set({"dataframe.shuffle.method": "p2p"}):
            with dask.config.set({"distributed.p2p.disk": disk}):
                joined = dd.merge(a, b, left_index=True, right_index=True, how=how)
>           res = await c.compute(joined)

distributed/shuffle/tests/test_merge.py:183: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <Future: cancelled, key: finalize-6901874377222dc3d690d647fc7da03d>
raiseit = True

    async def _result(self, raiseit=True):
        await self._state.wait()
        if self.status == "error":
            exc = clean_exception(self._state.exception, self._state.traceback)
            if raiseit:
                typ, exc, tb = exc
                raise exc.with_traceback(tb)
            else:
                return exc
        elif self.status == "cancelled":
            exception = CancelledError(self.key)
            if raiseit:
>               raise exception
E               concurrent.futures._base.CancelledError: finalize-6901874377222dc3d690d647fc7da03d

distributed/client.py:342: CancelledError

Check warning on line 0 in distributed.shuffle.tests.test_merge

See this annotation in the file changed.

@github-actions github-actions / Unit Test Results

All 13 runs failed: test_merge_by_multiple_columns[inner] (distributed.shuffle.tests.test_merge)

artifacts/ubuntu-latest-3.10-default-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-3.11-default-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-3.12-default-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-3.9-default-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-3.9-no_expr-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-3.9-no_queue-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-mindeps-default-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-mindeps-numpy-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-mindeps-pandas-ci1/pytest.xml [took 0s]
artifacts/windows-latest-3.10-default-ci1/pytest.xml [took 0s]
artifacts/windows-latest-3.11-default-ci1/pytest.xml [took 0s]
artifacts/windows-latest-3.12-default-ci1/pytest.xml [took 0s]
artifacts/windows-latest-3.9-default-ci1/pytest.xml [took 0s]
Raw output
concurrent.futures._base.CancelledError: finalize-2920fb1e84930799e8a936f9714a6850
c = <Client: No scheduler connected>
s = <Scheduler 'tcp://127.0.0.1:45825', workers: 0, cores: 0, tasks: 0>
a = <Worker 'tcp://127.0.0.1:35315', name: 0, status: closed, stored: 0, running: 0/1, ready: 0, comm: 0, waiting: 0>
b = <Worker 'tcp://127.0.0.1:35547', name: 1, status: closed, stored: 0, running: 0/2, ready: 0, comm: 0, waiting: 0>
how = 'inner'

    @pytest.mark.slow
    @gen_cluster(client=True, timeout=120)
    @pytest.mark.parametrize("how", ["inner", "outer", "left", "right"])
    async def test_merge_by_multiple_columns(c, s, a, b, how):
        # warnings here from pandas
        pdf1l = pd.DataFrame(
            {
                "a": list("abcdefghij"),
                "b": list("abcdefghij"),
                "c": [1, 2, 3, 4, 5, 6, 7, 8, 9, 10],
            },
            index=list("abcdefghij"),
        )
        pdf1r = pd.DataFrame(
            {
                "d": list("abcdefghij"),
                "e": list("abcdefghij"),
                "f": [10, 9, 8, 7, 6, 5, 4, 3, 2, 1],
            },
            index=list("abcdefghij"),
        )
    
        pdf2l = pd.DataFrame(
            {
                "a": list("abcdeabcde"),
                "b": list("abcabcabca"),
                "c": [1, 2, 3, 4, 5, 6, 7, 8, 9, 10],
            },
            index=list("abcdefghij"),
        )
        pdf2r = pd.DataFrame(
            {
                "d": list("edcbaedcba"),
                "e": list("aaabbbcccd"),
                "f": [10, 9, 8, 7, 6, 5, 4, 3, 2, 1],
            },
            index=list("fghijklmno"),
        )
    
        pdf3l = pd.DataFrame(
            {
                "a": list("aaaaaaaaaa"),
                "b": list("aaaaaaaaaa"),
                "c": [1, 2, 3, 4, 5, 6, 7, 8, 9, 10],
            },
            index=list("abcdefghij"),
        )
        pdf3r = pd.DataFrame(
            {
                "d": list("aaabbbccaa"),
                "e": list("abbbbbbbbb"),
                "f": [10, 9, 8, 7, 6, 5, 4, 3, 2, 1],
            },
            index=list("ABCDEFGHIJ"),
        )
    
        for pdl, pdr in [(pdf1l, pdf1r), (pdf2l, pdf2r), (pdf3l, pdf3r)]:
            for lpart, rpart in [(2, 2), (3, 2), (2, 3)]:
                ddl = dd.from_pandas(pdl, lpart)
                ddr = dd.from_pandas(pdr, rpart)
    
                with dask.config.set({"dataframe.shuffle.method": "p2p"}):
                    expected = pdl.join(pdr, how=how)
                    assert_eq(
>                       await c.compute(ddl.join(ddr, how=how)),
                        expected,
                        # FIXME: There's a discrepancy with an empty index for
                        # pandas=2.0 (xref https://github.com/dask/dask/issues/9957).
                        # Temporarily avoid index check until the discrepancy is fixed.
                        check_index=not (PANDAS_GE_200 and expected.index.empty),
                    )

distributed/shuffle/tests/test_merge.py:292: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <Future: cancelled, key: finalize-2920fb1e84930799e8a936f9714a6850>
raiseit = True

    async def _result(self, raiseit=True):
        await self._state.wait()
        if self.status == "error":
            exc = clean_exception(self._state.exception, self._state.traceback)
            if raiseit:
                typ, exc, tb = exc
                raise exc.with_traceback(tb)
            else:
                return exc
        elif self.status == "cancelled":
            exception = CancelledError(self.key)
            if raiseit:
>               raise exception
E               concurrent.futures._base.CancelledError: finalize-2920fb1e84930799e8a936f9714a6850

distributed/client.py:342: CancelledError

Check warning on line 0 in distributed.shuffle.tests.test_merge

See this annotation in the file changed.

@github-actions github-actions / Unit Test Results

All 13 runs failed: test_merge_by_multiple_columns[outer] (distributed.shuffle.tests.test_merge)

artifacts/ubuntu-latest-3.10-default-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-3.11-default-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-3.12-default-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-3.9-default-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-3.9-no_expr-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-3.9-no_queue-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-mindeps-default-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-mindeps-numpy-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-mindeps-pandas-ci1/pytest.xml [took 0s]
artifacts/windows-latest-3.10-default-ci1/pytest.xml [took 0s]
artifacts/windows-latest-3.11-default-ci1/pytest.xml [took 0s]
artifacts/windows-latest-3.12-default-ci1/pytest.xml [took 0s]
artifacts/windows-latest-3.9-default-ci1/pytest.xml [took 0s]
Raw output
concurrent.futures._base.CancelledError: finalize-a048d71f41fa2758c0995d78b57592da
c = <Client: No scheduler connected>
s = <Scheduler 'tcp://127.0.0.1:45587', workers: 0, cores: 0, tasks: 0>
a = <Worker 'tcp://127.0.0.1:45663', name: 0, status: closed, stored: 0, running: 0/1, ready: 0, comm: 0, waiting: 0>
b = <Worker 'tcp://127.0.0.1:38155', name: 1, status: closed, stored: 0, running: 0/2, ready: 0, comm: 0, waiting: 0>
how = 'outer'

    @pytest.mark.slow
    @gen_cluster(client=True, timeout=120)
    @pytest.mark.parametrize("how", ["inner", "outer", "left", "right"])
    async def test_merge_by_multiple_columns(c, s, a, b, how):
        # warnings here from pandas
        pdf1l = pd.DataFrame(
            {
                "a": list("abcdefghij"),
                "b": list("abcdefghij"),
                "c": [1, 2, 3, 4, 5, 6, 7, 8, 9, 10],
            },
            index=list("abcdefghij"),
        )
        pdf1r = pd.DataFrame(
            {
                "d": list("abcdefghij"),
                "e": list("abcdefghij"),
                "f": [10, 9, 8, 7, 6, 5, 4, 3, 2, 1],
            },
            index=list("abcdefghij"),
        )
    
        pdf2l = pd.DataFrame(
            {
                "a": list("abcdeabcde"),
                "b": list("abcabcabca"),
                "c": [1, 2, 3, 4, 5, 6, 7, 8, 9, 10],
            },
            index=list("abcdefghij"),
        )
        pdf2r = pd.DataFrame(
            {
                "d": list("edcbaedcba"),
                "e": list("aaabbbcccd"),
                "f": [10, 9, 8, 7, 6, 5, 4, 3, 2, 1],
            },
            index=list("fghijklmno"),
        )
    
        pdf3l = pd.DataFrame(
            {
                "a": list("aaaaaaaaaa"),
                "b": list("aaaaaaaaaa"),
                "c": [1, 2, 3, 4, 5, 6, 7, 8, 9, 10],
            },
            index=list("abcdefghij"),
        )
        pdf3r = pd.DataFrame(
            {
                "d": list("aaabbbccaa"),
                "e": list("abbbbbbbbb"),
                "f": [10, 9, 8, 7, 6, 5, 4, 3, 2, 1],
            },
            index=list("ABCDEFGHIJ"),
        )
    
        for pdl, pdr in [(pdf1l, pdf1r), (pdf2l, pdf2r), (pdf3l, pdf3r)]:
            for lpart, rpart in [(2, 2), (3, 2), (2, 3)]:
                ddl = dd.from_pandas(pdl, lpart)
                ddr = dd.from_pandas(pdr, rpart)
    
                with dask.config.set({"dataframe.shuffle.method": "p2p"}):
                    expected = pdl.join(pdr, how=how)
                    assert_eq(
>                       await c.compute(ddl.join(ddr, how=how)),
                        expected,
                        # FIXME: There's a discrepancy with an empty index for
                        # pandas=2.0 (xref https://github.com/dask/dask/issues/9957).
                        # Temporarily avoid index check until the discrepancy is fixed.
                        check_index=not (PANDAS_GE_200 and expected.index.empty),
                    )

distributed/shuffle/tests/test_merge.py:292: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <Future: cancelled, key: finalize-a048d71f41fa2758c0995d78b57592da>
raiseit = True

    async def _result(self, raiseit=True):
        await self._state.wait()
        if self.status == "error":
            exc = clean_exception(self._state.exception, self._state.traceback)
            if raiseit:
                typ, exc, tb = exc
                raise exc.with_traceback(tb)
            else:
                return exc
        elif self.status == "cancelled":
            exception = CancelledError(self.key)
            if raiseit:
>               raise exception
E               concurrent.futures._base.CancelledError: finalize-a048d71f41fa2758c0995d78b57592da

distributed/client.py:342: CancelledError

Check warning on line 0 in distributed.shuffle.tests.test_merge

See this annotation in the file changed.

@github-actions github-actions / Unit Test Results

All 13 runs failed: test_merge_by_multiple_columns[left] (distributed.shuffle.tests.test_merge)

artifacts/ubuntu-latest-3.10-default-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-3.11-default-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-3.12-default-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-3.9-default-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-3.9-no_expr-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-3.9-no_queue-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-mindeps-default-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-mindeps-numpy-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-mindeps-pandas-ci1/pytest.xml [took 0s]
artifacts/windows-latest-3.10-default-ci1/pytest.xml [took 0s]
artifacts/windows-latest-3.11-default-ci1/pytest.xml [took 0s]
artifacts/windows-latest-3.12-default-ci1/pytest.xml [took 0s]
artifacts/windows-latest-3.9-default-ci1/pytest.xml [took 0s]
Raw output
concurrent.futures._base.CancelledError: finalize-67e9d3bfb23f954a3d341a4de5282f79
c = <Client: No scheduler connected>
s = <Scheduler 'tcp://127.0.0.1:38023', workers: 0, cores: 0, tasks: 0>
a = <Worker 'tcp://127.0.0.1:44615', name: 0, status: closed, stored: 0, running: 0/1, ready: 0, comm: 0, waiting: 0>
b = <Worker 'tcp://127.0.0.1:35981', name: 1, status: closed, stored: 0, running: 0/2, ready: 0, comm: 0, waiting: 0>
how = 'left'

    @pytest.mark.slow
    @gen_cluster(client=True, timeout=120)
    @pytest.mark.parametrize("how", ["inner", "outer", "left", "right"])
    async def test_merge_by_multiple_columns(c, s, a, b, how):
        # warnings here from pandas
        pdf1l = pd.DataFrame(
            {
                "a": list("abcdefghij"),
                "b": list("abcdefghij"),
                "c": [1, 2, 3, 4, 5, 6, 7, 8, 9, 10],
            },
            index=list("abcdefghij"),
        )
        pdf1r = pd.DataFrame(
            {
                "d": list("abcdefghij"),
                "e": list("abcdefghij"),
                "f": [10, 9, 8, 7, 6, 5, 4, 3, 2, 1],
            },
            index=list("abcdefghij"),
        )
    
        pdf2l = pd.DataFrame(
            {
                "a": list("abcdeabcde"),
                "b": list("abcabcabca"),
                "c": [1, 2, 3, 4, 5, 6, 7, 8, 9, 10],
            },
            index=list("abcdefghij"),
        )
        pdf2r = pd.DataFrame(
            {
                "d": list("edcbaedcba"),
                "e": list("aaabbbcccd"),
                "f": [10, 9, 8, 7, 6, 5, 4, 3, 2, 1],
            },
            index=list("fghijklmno"),
        )
    
        pdf3l = pd.DataFrame(
            {
                "a": list("aaaaaaaaaa"),
                "b": list("aaaaaaaaaa"),
                "c": [1, 2, 3, 4, 5, 6, 7, 8, 9, 10],
            },
            index=list("abcdefghij"),
        )
        pdf3r = pd.DataFrame(
            {
                "d": list("aaabbbccaa"),
                "e": list("abbbbbbbbb"),
                "f": [10, 9, 8, 7, 6, 5, 4, 3, 2, 1],
            },
            index=list("ABCDEFGHIJ"),
        )
    
        for pdl, pdr in [(pdf1l, pdf1r), (pdf2l, pdf2r), (pdf3l, pdf3r)]:
            for lpart, rpart in [(2, 2), (3, 2), (2, 3)]:
                ddl = dd.from_pandas(pdl, lpart)
                ddr = dd.from_pandas(pdr, rpart)
    
                with dask.config.set({"dataframe.shuffle.method": "p2p"}):
                    expected = pdl.join(pdr, how=how)
                    assert_eq(
>                       await c.compute(ddl.join(ddr, how=how)),
                        expected,
                        # FIXME: There's a discrepancy with an empty index for
                        # pandas=2.0 (xref https://github.com/dask/dask/issues/9957).
                        # Temporarily avoid index check until the discrepancy is fixed.
                        check_index=not (PANDAS_GE_200 and expected.index.empty),
                    )

distributed/shuffle/tests/test_merge.py:292: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <Future: cancelled, key: finalize-67e9d3bfb23f954a3d341a4de5282f79>
raiseit = True

    async def _result(self, raiseit=True):
        await self._state.wait()
        if self.status == "error":
            exc = clean_exception(self._state.exception, self._state.traceback)
            if raiseit:
                typ, exc, tb = exc
                raise exc.with_traceback(tb)
            else:
                return exc
        elif self.status == "cancelled":
            exception = CancelledError(self.key)
            if raiseit:
>               raise exception
E               concurrent.futures._base.CancelledError: finalize-67e9d3bfb23f954a3d341a4de5282f79

distributed/client.py:342: CancelledError

Check warning on line 0 in distributed.shuffle.tests.test_merge

See this annotation in the file changed.

@github-actions github-actions / Unit Test Results

All 13 runs failed: test_merge_by_multiple_columns[right] (distributed.shuffle.tests.test_merge)

artifacts/ubuntu-latest-3.10-default-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-3.11-default-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-3.12-default-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-3.9-default-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-3.9-no_expr-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-3.9-no_queue-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-mindeps-default-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-mindeps-numpy-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-mindeps-pandas-ci1/pytest.xml [took 0s]
artifacts/windows-latest-3.10-default-ci1/pytest.xml [took 0s]
artifacts/windows-latest-3.11-default-ci1/pytest.xml [took 0s]
artifacts/windows-latest-3.12-default-ci1/pytest.xml [took 0s]
artifacts/windows-latest-3.9-default-ci1/pytest.xml [took 0s]
Raw output
concurrent.futures._base.CancelledError: finalize-3bf5fa457df0231bc7228874bedf69e6
c = <Client: No scheduler connected>
s = <Scheduler 'tcp://127.0.0.1:44039', workers: 0, cores: 0, tasks: 0>
a = <Worker 'tcp://127.0.0.1:36643', name: 0, status: closed, stored: 0, running: 0/1, ready: 0, comm: 0, waiting: 0>
b = <Worker 'tcp://127.0.0.1:39061', name: 1, status: closed, stored: 0, running: 0/2, ready: 0, comm: 0, waiting: 0>
how = 'right'

    @pytest.mark.slow
    @gen_cluster(client=True, timeout=120)
    @pytest.mark.parametrize("how", ["inner", "outer", "left", "right"])
    async def test_merge_by_multiple_columns(c, s, a, b, how):
        # warnings here from pandas
        pdf1l = pd.DataFrame(
            {
                "a": list("abcdefghij"),
                "b": list("abcdefghij"),
                "c": [1, 2, 3, 4, 5, 6, 7, 8, 9, 10],
            },
            index=list("abcdefghij"),
        )
        pdf1r = pd.DataFrame(
            {
                "d": list("abcdefghij"),
                "e": list("abcdefghij"),
                "f": [10, 9, 8, 7, 6, 5, 4, 3, 2, 1],
            },
            index=list("abcdefghij"),
        )
    
        pdf2l = pd.DataFrame(
            {
                "a": list("abcdeabcde"),
                "b": list("abcabcabca"),
                "c": [1, 2, 3, 4, 5, 6, 7, 8, 9, 10],
            },
            index=list("abcdefghij"),
        )
        pdf2r = pd.DataFrame(
            {
                "d": list("edcbaedcba"),
                "e": list("aaabbbcccd"),
                "f": [10, 9, 8, 7, 6, 5, 4, 3, 2, 1],
            },
            index=list("fghijklmno"),
        )
    
        pdf3l = pd.DataFrame(
            {
                "a": list("aaaaaaaaaa"),
                "b": list("aaaaaaaaaa"),
                "c": [1, 2, 3, 4, 5, 6, 7, 8, 9, 10],
            },
            index=list("abcdefghij"),
        )
        pdf3r = pd.DataFrame(
            {
                "d": list("aaabbbccaa"),
                "e": list("abbbbbbbbb"),
                "f": [10, 9, 8, 7, 6, 5, 4, 3, 2, 1],
            },
            index=list("ABCDEFGHIJ"),
        )
    
        for pdl, pdr in [(pdf1l, pdf1r), (pdf2l, pdf2r), (pdf3l, pdf3r)]:
            for lpart, rpart in [(2, 2), (3, 2), (2, 3)]:
                ddl = dd.from_pandas(pdl, lpart)
                ddr = dd.from_pandas(pdr, rpart)
    
                with dask.config.set({"dataframe.shuffle.method": "p2p"}):
                    expected = pdl.join(pdr, how=how)
                    assert_eq(
>                       await c.compute(ddl.join(ddr, how=how)),
                        expected,
                        # FIXME: There's an discrepancy with an empty index for
                        # pandas=2.0 (xref https://github.com/dask/dask/issues/9957).
                        # Temporarily avoid index check until the discrepancy is fixed.
                        check_index=not (PANDAS_GE_200 and expected.index.empty),
                    )

distributed/shuffle/tests/test_merge.py:292: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <Future: cancelled, key: finalize-3bf5fa457df0231bc7228874bedf69e6>
raiseit = True

    async def _result(self, raiseit=True):
        await self._state.wait()
        if self.status == "error":
            exc = clean_exception(self._state.exception, self._state.traceback)
            if raiseit:
                typ, exc, tb = exc
                raise exc.with_traceback(tb)
            else:
                return exc
        elif self.status == "cancelled":
            exception = CancelledError(self.key)
            if raiseit:
>               raise exception
E               concurrent.futures._base.CancelledError: finalize-3bf5fa457df0231bc7228874bedf69e6

distributed/client.py:342: CancelledError

Check warning on line 0 in distributed.shuffle.tests.test_merge

See this annotation in the file changed.

@github-actions github-actions / Unit Test Results

All 13 runs failed: test_index_merge_p2p[inner] (distributed.shuffle.tests.test_merge)

artifacts/ubuntu-latest-3.10-default-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-3.11-default-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-3.12-default-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-3.9-default-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-3.9-no_expr-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-3.9-no_queue-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-mindeps-default-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-mindeps-numpy-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-mindeps-pandas-ci1/pytest.xml [took 0s]
artifacts/windows-latest-3.10-default-ci1/pytest.xml [took 0s]
artifacts/windows-latest-3.11-default-ci1/pytest.xml [took 0s]
artifacts/windows-latest-3.12-default-ci1/pytest.xml [took 0s]
artifacts/windows-latest-3.9-default-ci1/pytest.xml [took 0s]
Raw output
concurrent.futures._base.CancelledError: finalize-8bee82abf05338bfacf545f2eacbe0bc
c = <Client: No scheduler connected>
s = <Scheduler 'tcp://127.0.0.1:40853', workers: 0, cores: 0, tasks: 0>
a = <Worker 'tcp://127.0.0.1:46655', name: 0, status: closed, stored: 0, running: 0/1, ready: 0, comm: 0, waiting: 0>
b = <Worker 'tcp://127.0.0.1:35891', name: 1, status: closed, stored: 0, running: 0/2, ready: 0, comm: 0, waiting: 0>
how = 'inner'

    @pytest.mark.parametrize("how", ["inner", "left", "right", "outer"])
    @gen_cluster(client=True)
    async def test_index_merge_p2p(c, s, a, b, how):
        pdf_left = pd.DataFrame({"a": [4, 2, 3] * 10, "b": 1}).set_index("a")
        pdf_right = pd.DataFrame({"a": [4, 2, 3] * 10, "c": 1})
    
        left = dd.from_pandas(pdf_left, npartitions=5, sort=False)
        right = dd.from_pandas(pdf_right, npartitions=6)
    
        with dask.config.set({"dataframe.shuffle.method": "p2p"}):
            assert_eq(
>               await c.compute(left.merge(right, how=how, left_index=True, right_on="a")),
                pdf_left.merge(pdf_right, how=how, left_index=True, right_on="a"),
            )

distributed/shuffle/tests/test_merge.py:388: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <Future: cancelled, key: finalize-8bee82abf05338bfacf545f2eacbe0bc>
raiseit = True

    async def _result(self, raiseit=True):
        await self._state.wait()
        if self.status == "error":
            exc = clean_exception(self._state.exception, self._state.traceback)
            if raiseit:
                typ, exc, tb = exc
                raise exc.with_traceback(tb)
            else:
                return exc
        elif self.status == "cancelled":
            exception = CancelledError(self.key)
            if raiseit:
>               raise exception
E               concurrent.futures._base.CancelledError: finalize-8bee82abf05338bfacf545f2eacbe0bc

distributed/client.py:342: CancelledError

Check warning on line 0 in distributed.shuffle.tests.test_merge

See this annotation in the file changed.

@github-actions github-actions / Unit Test Results

All 13 runs failed: test_index_merge_p2p[left] (distributed.shuffle.tests.test_merge)

artifacts/ubuntu-latest-3.10-default-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-3.11-default-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-3.12-default-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-3.9-default-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-3.9-no_expr-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-3.9-no_queue-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-mindeps-default-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-mindeps-numpy-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-mindeps-pandas-ci1/pytest.xml [took 0s]
artifacts/windows-latest-3.10-default-ci1/pytest.xml [took 0s]
artifacts/windows-latest-3.11-default-ci1/pytest.xml [took 0s]
artifacts/windows-latest-3.12-default-ci1/pytest.xml [took 0s]
artifacts/windows-latest-3.9-default-ci1/pytest.xml [took 0s]
Raw output
concurrent.futures._base.CancelledError: finalize-75316c9754a970eda681ec3d7b465f99
c = <Client: No scheduler connected>
s = <Scheduler 'tcp://127.0.0.1:34807', workers: 0, cores: 0, tasks: 0>
a = <Worker 'tcp://127.0.0.1:40081', name: 0, status: closed, stored: 0, running: 0/1, ready: 0, comm: 0, waiting: 0>
b = <Worker 'tcp://127.0.0.1:41465', name: 1, status: closed, stored: 0, running: 0/2, ready: 0, comm: 0, waiting: 0>
how = 'left'

    @pytest.mark.parametrize("how", ["inner", "left", "right", "outer"])
    @gen_cluster(client=True)
    async def test_index_merge_p2p(c, s, a, b, how):
        pdf_left = pd.DataFrame({"a": [4, 2, 3] * 10, "b": 1}).set_index("a")
        pdf_right = pd.DataFrame({"a": [4, 2, 3] * 10, "c": 1})
    
        left = dd.from_pandas(pdf_left, npartitions=5, sort=False)
        right = dd.from_pandas(pdf_right, npartitions=6)
    
        with dask.config.set({"dataframe.shuffle.method": "p2p"}):
            assert_eq(
>               await c.compute(left.merge(right, how=how, left_index=True, right_on="a")),
                pdf_left.merge(pdf_right, how=how, left_index=True, right_on="a"),
            )

distributed/shuffle/tests/test_merge.py:388: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <Future: cancelled, key: finalize-75316c9754a970eda681ec3d7b465f99>
raiseit = True

    async def _result(self, raiseit=True):
        await self._state.wait()
        if self.status == "error":
            exc = clean_exception(self._state.exception, self._state.traceback)
            if raiseit:
                typ, exc, tb = exc
                raise exc.with_traceback(tb)
            else:
                return exc
        elif self.status == "cancelled":
            exception = CancelledError(self.key)
            if raiseit:
>               raise exception
E               concurrent.futures._base.CancelledError: finalize-75316c9754a970eda681ec3d7b465f99

distributed/client.py:342: CancelledError

Check warning on line 0 in distributed.shuffle.tests.test_merge

See this annotation in the file changed.

@github-actions github-actions / Unit Test Results

All 13 runs failed: test_index_merge_p2p[right] (distributed.shuffle.tests.test_merge)

artifacts/ubuntu-latest-3.10-default-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-3.11-default-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-3.12-default-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-3.9-default-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-3.9-no_expr-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-3.9-no_queue-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-mindeps-default-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-mindeps-numpy-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-mindeps-pandas-ci1/pytest.xml [took 0s]
artifacts/windows-latest-3.10-default-ci1/pytest.xml [took 0s]
artifacts/windows-latest-3.11-default-ci1/pytest.xml [took 0s]
artifacts/windows-latest-3.12-default-ci1/pytest.xml [took 0s]
artifacts/windows-latest-3.9-default-ci1/pytest.xml [took 0s]
Raw output
concurrent.futures._base.CancelledError: finalize-20004be3884007b7f7661bebdc70ffa7
c = <Client: No scheduler connected>
s = <Scheduler 'tcp://127.0.0.1:38833', workers: 0, cores: 0, tasks: 0>
a = <Worker 'tcp://127.0.0.1:41531', name: 0, status: closed, stored: 0, running: 0/1, ready: 0, comm: 0, waiting: 0>
b = <Worker 'tcp://127.0.0.1:35113', name: 1, status: closed, stored: 0, running: 0/2, ready: 0, comm: 0, waiting: 0>
how = 'right'

    @pytest.mark.parametrize("how", ["inner", "left", "right", "outer"])
    @gen_cluster(client=True)
    async def test_index_merge_p2p(c, s, a, b, how):
        pdf_left = pd.DataFrame({"a": [4, 2, 3] * 10, "b": 1}).set_index("a")
        pdf_right = pd.DataFrame({"a": [4, 2, 3] * 10, "c": 1})
    
        left = dd.from_pandas(pdf_left, npartitions=5, sort=False)
        right = dd.from_pandas(pdf_right, npartitions=6)
    
        with dask.config.set({"dataframe.shuffle.method": "p2p"}):
            assert_eq(
>               await c.compute(left.merge(right, how=how, left_index=True, right_on="a")),
                pdf_left.merge(pdf_right, how=how, left_index=True, right_on="a"),
            )

distributed/shuffle/tests/test_merge.py:388: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <Future: cancelled, key: finalize-20004be3884007b7f7661bebdc70ffa7>
raiseit = True

    async def _result(self, raiseit=True):
        await self._state.wait()
        if self.status == "error":
            exc = clean_exception(self._state.exception, self._state.traceback)
            if raiseit:
                typ, exc, tb = exc
                raise exc.with_traceback(tb)
            else:
                return exc
        elif self.status == "cancelled":
            exception = CancelledError(self.key)
            if raiseit:
>               raise exception
E               concurrent.futures._base.CancelledError: finalize-20004be3884007b7f7661bebdc70ffa7

distributed/client.py:342: CancelledError

Check warning on line 0 in distributed.shuffle.tests.test_merge

See this annotation in the file changed.

@github-actions github-actions / Unit Test Results

All 13 runs failed: test_index_merge_p2p[outer] (distributed.shuffle.tests.test_merge)

artifacts/ubuntu-latest-3.10-default-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-3.11-default-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-3.12-default-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-3.9-default-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-3.9-no_expr-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-3.9-no_queue-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-mindeps-default-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-mindeps-numpy-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-mindeps-pandas-ci1/pytest.xml [took 0s]
artifacts/windows-latest-3.10-default-ci1/pytest.xml [took 0s]
artifacts/windows-latest-3.11-default-ci1/pytest.xml [took 0s]
artifacts/windows-latest-3.12-default-ci1/pytest.xml [took 0s]
artifacts/windows-latest-3.9-default-ci1/pytest.xml [took 0s]
Raw output
concurrent.futures._base.CancelledError: finalize-c39a92473466263a55511f2615156ee2
c = <Client: No scheduler connected>
s = <Scheduler 'tcp://127.0.0.1:33269', workers: 0, cores: 0, tasks: 0>
a = <Worker 'tcp://127.0.0.1:32799', name: 0, status: closed, stored: 0, running: 0/1, ready: 0, comm: 0, waiting: 0>
b = <Worker 'tcp://127.0.0.1:46789', name: 1, status: closed, stored: 0, running: 0/2, ready: 0, comm: 0, waiting: 0>
how = 'outer'

    @pytest.mark.parametrize("how", ["inner", "left", "right", "outer"])
    @gen_cluster(client=True)
    async def test_index_merge_p2p(c, s, a, b, how):
        pdf_left = pd.DataFrame({"a": [4, 2, 3] * 10, "b": 1}).set_index("a")
        pdf_right = pd.DataFrame({"a": [4, 2, 3] * 10, "c": 1})
    
        left = dd.from_pandas(pdf_left, npartitions=5, sort=False)
        right = dd.from_pandas(pdf_right, npartitions=6)
    
        with dask.config.set({"dataframe.shuffle.method": "p2p"}):
            assert_eq(
>               await c.compute(left.merge(right, how=how, left_index=True, right_on="a")),
                pdf_left.merge(pdf_right, how=how, left_index=True, right_on="a"),
            )

distributed/shuffle/tests/test_merge.py:388: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <Future: cancelled, key: finalize-c39a92473466263a55511f2615156ee2>
raiseit = True

    async def _result(self, raiseit=True):
        await self._state.wait()
        if self.status == "error":
            exc = clean_exception(self._state.exception, self._state.traceback)
            if raiseit:
                typ, exc, tb = exc
                raise exc.with_traceback(tb)
            else:
                return exc
        elif self.status == "cancelled":
            exception = CancelledError(self.key)
            if raiseit:
>               raise exception
E               concurrent.futures._base.CancelledError: finalize-c39a92473466263a55511f2615156ee2

distributed/client.py:342: CancelledError

Check warning on line 0 in distributed.shuffle.tests.test_merge

See this annotation in the file changed.

@github-actions github-actions / Unit Test Results

All 13 runs failed: test_merge_with_npartitions[4] (distributed.shuffle.tests.test_merge)

artifacts/ubuntu-latest-3.10-default-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-3.11-default-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-3.12-default-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-3.9-default-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-3.9-no_expr-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-3.9-no_queue-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-mindeps-default-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-mindeps-numpy-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-mindeps-pandas-ci1/pytest.xml [took 0s]
artifacts/windows-latest-3.10-default-ci1/pytest.xml [took 0s]
artifacts/windows-latest-3.11-default-ci1/pytest.xml [took 0s]
artifacts/windows-latest-3.12-default-ci1/pytest.xml [took 0s]
artifacts/windows-latest-3.9-default-ci1/pytest.xml [took 0s]
Raw output
concurrent.futures._base.CancelledError: finalize-cb5dcf43490a0323d9216c8645e554f6
c = <Client: No scheduler connected>
s = <Scheduler 'tcp://127.0.0.1:35153', workers: 0, cores: 0, tasks: 0>
a = <Worker 'tcp://127.0.0.1:34587', name: 0, status: closed, stored: 0, running: 0/1, ready: 0, comm: 0, waiting: 0>
b = <Worker 'tcp://127.0.0.1:38797', name: 1, status: closed, stored: 0, running: 0/2, ready: 0, comm: 0, waiting: 0>
npartitions = 4

    @pytest.mark.parametrize("npartitions", [4, 5, 10, 20])
    @gen_cluster(client=True)
    async def test_merge_with_npartitions(c, s, a, b, npartitions):
        pdf = pd.DataFrame({"a": [1, 2, 3, 4] * 10, "b": 1})
    
        left = dd.from_pandas(pdf, npartitions=10)
        right = dd.from_pandas(pdf, npartitions=5)
    
        expected = pdf.merge(pdf)
        with dask.config.set({"dataframe.shuffle.method": "p2p"}):
>           result = await c.compute(left.merge(right, npartitions=npartitions))

distributed/shuffle/tests/test_merge.py:408: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <Future: cancelled, key: finalize-cb5dcf43490a0323d9216c8645e554f6>
raiseit = True

    async def _result(self, raiseit=True):
        await self._state.wait()
        if self.status == "error":
            exc = clean_exception(self._state.exception, self._state.traceback)
            if raiseit:
                typ, exc, tb = exc
                raise exc.with_traceback(tb)
            else:
                return exc
        elif self.status == "cancelled":
            exception = CancelledError(self.key)
            if raiseit:
>               raise exception
E               concurrent.futures._base.CancelledError: finalize-cb5dcf43490a0323d9216c8645e554f6

distributed/client.py:342: CancelledError

Check warning on line 0 in distributed.shuffle.tests.test_merge

See this annotation in the file changed.

@github-actions github-actions / Unit Test Results

All 13 runs failed: test_merge_with_npartitions[5] (distributed.shuffle.tests.test_merge)

artifacts/ubuntu-latest-3.10-default-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-3.11-default-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-3.12-default-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-3.9-default-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-3.9-no_expr-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-3.9-no_queue-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-mindeps-default-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-mindeps-numpy-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-mindeps-pandas-ci1/pytest.xml [took 0s]
artifacts/windows-latest-3.10-default-ci1/pytest.xml [took 0s]
artifacts/windows-latest-3.11-default-ci1/pytest.xml [took 0s]
artifacts/windows-latest-3.12-default-ci1/pytest.xml [took 0s]
artifacts/windows-latest-3.9-default-ci1/pytest.xml [took 0s]
Raw output
concurrent.futures._base.CancelledError: finalize-d6634a86dd53665a78003094935e45c4
c = <Client: No scheduler connected>
s = <Scheduler 'tcp://127.0.0.1:44341', workers: 0, cores: 0, tasks: 0>
a = <Worker 'tcp://127.0.0.1:40267', name: 0, status: closed, stored: 0, running: 0/1, ready: 0, comm: 0, waiting: 0>
b = <Worker 'tcp://127.0.0.1:40909', name: 1, status: closed, stored: 0, running: 0/2, ready: 0, comm: 0, waiting: 0>
npartitions = 5

    @pytest.mark.parametrize("npartitions", [4, 5, 10, 20])
    @gen_cluster(client=True)
    async def test_merge_with_npartitions(c, s, a, b, npartitions):
        pdf = pd.DataFrame({"a": [1, 2, 3, 4] * 10, "b": 1})
    
        left = dd.from_pandas(pdf, npartitions=10)
        right = dd.from_pandas(pdf, npartitions=5)
    
        expected = pdf.merge(pdf)
        with dask.config.set({"dataframe.shuffle.method": "p2p"}):
>           result = await c.compute(left.merge(right, npartitions=npartitions))

distributed/shuffle/tests/test_merge.py:408: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <Future: cancelled, key: finalize-d6634a86dd53665a78003094935e45c4>
raiseit = True

    async def _result(self, raiseit=True):
        await self._state.wait()
        if self.status == "error":
            exc = clean_exception(self._state.exception, self._state.traceback)
            if raiseit:
                typ, exc, tb = exc
                raise exc.with_traceback(tb)
            else:
                return exc
        elif self.status == "cancelled":
            exception = CancelledError(self.key)
            if raiseit:
>               raise exception
E               concurrent.futures._base.CancelledError: finalize-d6634a86dd53665a78003094935e45c4

distributed/client.py:342: CancelledError

Check warning on line 0 in distributed.shuffle.tests.test_merge

See this annotation in the file changed.

@github-actions github-actions / Unit Test Results

All 13 runs failed: test_merge_with_npartitions[10] (distributed.shuffle.tests.test_merge)

artifacts/ubuntu-latest-3.10-default-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-3.11-default-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-3.12-default-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-3.9-default-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-3.9-no_expr-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-3.9-no_queue-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-mindeps-default-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-mindeps-numpy-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-mindeps-pandas-ci1/pytest.xml [took 0s]
artifacts/windows-latest-3.10-default-ci1/pytest.xml [took 0s]
artifacts/windows-latest-3.11-default-ci1/pytest.xml [took 0s]
artifacts/windows-latest-3.12-default-ci1/pytest.xml [took 0s]
artifacts/windows-latest-3.9-default-ci1/pytest.xml [took 0s]
Raw output
concurrent.futures._base.CancelledError: finalize-9b5d2873c7675e6fbccafd7cf2117ac1
c = <Client: No scheduler connected>
s = <Scheduler 'tcp://127.0.0.1:44311', workers: 0, cores: 0, tasks: 0>
a = <Worker 'tcp://127.0.0.1:41895', name: 0, status: closed, stored: 0, running: 0/1, ready: 0, comm: 0, waiting: 0>
b = <Worker 'tcp://127.0.0.1:37915', name: 1, status: closed, stored: 0, running: 0/2, ready: 0, comm: 0, waiting: 0>
npartitions = 10

    @pytest.mark.parametrize("npartitions", [4, 5, 10, 20])
    @gen_cluster(client=True)
    async def test_merge_with_npartitions(c, s, a, b, npartitions):
        pdf = pd.DataFrame({"a": [1, 2, 3, 4] * 10, "b": 1})
    
        left = dd.from_pandas(pdf, npartitions=10)
        right = dd.from_pandas(pdf, npartitions=5)
    
        expected = pdf.merge(pdf)
        with dask.config.set({"dataframe.shuffle.method": "p2p"}):
>           result = await c.compute(left.merge(right, npartitions=npartitions))

distributed/shuffle/tests/test_merge.py:408: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <Future: cancelled, key: finalize-9b5d2873c7675e6fbccafd7cf2117ac1>
raiseit = True

    async def _result(self, raiseit=True):
        await self._state.wait()
        if self.status == "error":
            exc = clean_exception(self._state.exception, self._state.traceback)
            if raiseit:
                typ, exc, tb = exc
                raise exc.with_traceback(tb)
            else:
                return exc
        elif self.status == "cancelled":
            exception = CancelledError(self.key)
            if raiseit:
>               raise exception
E               concurrent.futures._base.CancelledError: finalize-9b5d2873c7675e6fbccafd7cf2117ac1

distributed/client.py:342: CancelledError