1 errors, 779 fail, 130 skipped, 1 029 pass in 14h 59m 31s
Annotations
Check warning on line 0 in distributed.shuffle.tests.test_merge
github-actions / Unit Test Results
All 13 runs failed: test_basic_merge[inner] (distributed.shuffle.tests.test_merge)
artifacts/ubuntu-latest-3.10-default-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-3.11-default-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-3.12-default-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-3.9-default-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-3.9-no_expr-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-3.9-no_queue-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-mindeps-default-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-mindeps-numpy-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-mindeps-pandas-ci1/pytest.xml [took 0s]
artifacts/windows-latest-3.10-default-ci1/pytest.xml [took 0s]
artifacts/windows-latest-3.11-default-ci1/pytest.xml [took 0s]
artifacts/windows-latest-3.12-default-ci1/pytest.xml [took 0s]
artifacts/windows-latest-3.9-default-ci1/pytest.xml [took 0s]
Raw output
concurrent.futures._base.CancelledError: finalize-e631d8eccef279025e42dc005ac2ec27
c = <Client: No scheduler connected>
s = <Scheduler 'tcp://127.0.0.1:33055', workers: 0, cores: 0, tasks: 0>
a = Dask DataFrame Structure:
x y
npartitions=2
0 int64 int64
4 ... ...
5 ... ...
Dask Name: repartition-dataframe, 1 graph layer
b = Dask DataFrame Structure:
y z
npartitions=2
0 int64 int64
2 ... ...
5 ... ...
Dask Name: repartition-dataframe, 1 graph layer
how = 'inner'
@pytest.mark.parametrize("how", ["inner", "left", "right", "outer"])
@gen_cluster(client=True)
async def test_basic_merge(c, s, a, b, how):
A = pd.DataFrame({"x": [1, 2, 3, 4, 5, 6], "y": [1, 1, 2, 2, 3, 4]})
a = dd.repartition(A, [0, 4, 5])
B = pd.DataFrame({"y": [1, 3, 4, 4, 5, 6], "z": [6, 5, 4, 3, 2, 1]})
b = dd.repartition(B, [0, 2, 5])
joined = a.merge(b, left_on="y", right_on="y", how=how)
if dd._dask_expr_enabled():
# Ensure we're using a hash join
from dask_expr._merge import HashJoinP2P
assert any(
isinstance(expr, HashJoinP2P) for expr in joined.optimize()._expr.walk()
)
expected = pd.merge(A, B, how, "y")
> await list_eq(joined, expected)
distributed/shuffle/tests/test_merge.py:91:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
distributed/shuffle/tests/test_merge.py:36: in list_eq
a = await c.compute(a) if isinstance(a, dd.DataFrame) else a
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
self = <Future: cancelled, key: finalize-e631d8eccef279025e42dc005ac2ec27>
raiseit = True
async def _result(self, raiseit=True):
await self._state.wait()
if self.status == "error":
exc = clean_exception(self._state.exception, self._state.traceback)
if raiseit:
typ, exc, tb = exc
raise exc.with_traceback(tb)
else:
return exc
elif self.status == "cancelled":
exception = CancelledError(self.key)
if raiseit:
> raise exception
E concurrent.futures._base.CancelledError: finalize-e631d8eccef279025e42dc005ac2ec27
distributed/client.py:342: CancelledError
Check warning on line 0 in distributed.shuffle.tests.test_merge
github-actions / Unit Test Results
All 13 runs failed: test_basic_merge[left] (distributed.shuffle.tests.test_merge)
artifacts/ubuntu-latest-3.10-default-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-3.11-default-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-3.12-default-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-3.9-default-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-3.9-no_expr-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-3.9-no_queue-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-mindeps-default-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-mindeps-numpy-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-mindeps-pandas-ci1/pytest.xml [took 0s]
artifacts/windows-latest-3.10-default-ci1/pytest.xml [took 0s]
artifacts/windows-latest-3.11-default-ci1/pytest.xml [took 0s]
artifacts/windows-latest-3.12-default-ci1/pytest.xml [took 0s]
artifacts/windows-latest-3.9-default-ci1/pytest.xml [took 0s]
Raw output
concurrent.futures._base.CancelledError: finalize-dd447fb22719f3743c3b573b730bd7f3
c = <Client: No scheduler connected>
s = <Scheduler 'tcp://127.0.0.1:46629', workers: 0, cores: 0, tasks: 0>
a = Dask DataFrame Structure:
x y
npartitions=2
0 int64 int64
4 ... ...
5 ... ...
Dask Name: repartition-dataframe, 1 graph layer
b = Dask DataFrame Structure:
y z
npartitions=2
0 int64 int64
2 ... ...
5 ... ...
Dask Name: repartition-dataframe, 1 graph layer
how = 'left'
@pytest.mark.parametrize("how", ["inner", "left", "right", "outer"])
@gen_cluster(client=True)
async def test_basic_merge(c, s, a, b, how):
A = pd.DataFrame({"x": [1, 2, 3, 4, 5, 6], "y": [1, 1, 2, 2, 3, 4]})
a = dd.repartition(A, [0, 4, 5])
B = pd.DataFrame({"y": [1, 3, 4, 4, 5, 6], "z": [6, 5, 4, 3, 2, 1]})
b = dd.repartition(B, [0, 2, 5])
joined = a.merge(b, left_on="y", right_on="y", how=how)
if dd._dask_expr_enabled():
# Ensure we're using a hash join
from dask_expr._merge import HashJoinP2P
assert any(
isinstance(expr, HashJoinP2P) for expr in joined.optimize()._expr.walk()
)
expected = pd.merge(A, B, how, "y")
> await list_eq(joined, expected)
distributed/shuffle/tests/test_merge.py:91:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
distributed/shuffle/tests/test_merge.py:36: in list_eq
a = await c.compute(a) if isinstance(a, dd.DataFrame) else a
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
self = <Future: cancelled, key: finalize-dd447fb22719f3743c3b573b730bd7f3>
raiseit = True
async def _result(self, raiseit=True):
await self._state.wait()
if self.status == "error":
exc = clean_exception(self._state.exception, self._state.traceback)
if raiseit:
typ, exc, tb = exc
raise exc.with_traceback(tb)
else:
return exc
elif self.status == "cancelled":
exception = CancelledError(self.key)
if raiseit:
> raise exception
E concurrent.futures._base.CancelledError: finalize-dd447fb22719f3743c3b573b730bd7f3
distributed/client.py:342: CancelledError
Check warning on line 0 in distributed.shuffle.tests.test_merge
github-actions / Unit Test Results
All 13 runs failed: test_basic_merge[right] (distributed.shuffle.tests.test_merge)
artifacts/ubuntu-latest-3.10-default-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-3.11-default-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-3.12-default-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-3.9-default-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-3.9-no_expr-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-3.9-no_queue-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-mindeps-default-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-mindeps-numpy-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-mindeps-pandas-ci1/pytest.xml [took 0s]
artifacts/windows-latest-3.10-default-ci1/pytest.xml [took 0s]
artifacts/windows-latest-3.11-default-ci1/pytest.xml [took 0s]
artifacts/windows-latest-3.12-default-ci1/pytest.xml [took 0s]
artifacts/windows-latest-3.9-default-ci1/pytest.xml [took 0s]
Raw output
concurrent.futures._base.CancelledError: finalize-9627eb8818358f60662639d9ff830989
c = <Client: No scheduler connected>
s = <Scheduler 'tcp://127.0.0.1:35525', workers: 0, cores: 0, tasks: 0>
a = Dask DataFrame Structure:
x y
npartitions=2
0 int64 int64
4 ... ...
5 ... ...
Dask Name: repartition-dataframe, 1 graph layer
b = Dask DataFrame Structure:
y z
npartitions=2
0 int64 int64
2 ... ...
5 ... ...
Dask Name: repartition-dataframe, 1 graph layer
how = 'right'
@pytest.mark.parametrize("how", ["inner", "left", "right", "outer"])
@gen_cluster(client=True)
async def test_basic_merge(c, s, a, b, how):
A = pd.DataFrame({"x": [1, 2, 3, 4, 5, 6], "y": [1, 1, 2, 2, 3, 4]})
a = dd.repartition(A, [0, 4, 5])
B = pd.DataFrame({"y": [1, 3, 4, 4, 5, 6], "z": [6, 5, 4, 3, 2, 1]})
b = dd.repartition(B, [0, 2, 5])
joined = a.merge(b, left_on="y", right_on="y", how=how)
if dd._dask_expr_enabled():
# Ensure we're using a hash join
from dask_expr._merge import HashJoinP2P
assert any(
isinstance(expr, HashJoinP2P) for expr in joined.optimize()._expr.walk()
)
expected = pd.merge(A, B, how, "y")
> await list_eq(joined, expected)
distributed/shuffle/tests/test_merge.py:91:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
distributed/shuffle/tests/test_merge.py:36: in list_eq
a = await c.compute(a) if isinstance(a, dd.DataFrame) else a
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
self = <Future: cancelled, key: finalize-9627eb8818358f60662639d9ff830989>
raiseit = True
async def _result(self, raiseit=True):
await self._state.wait()
if self.status == "error":
exc = clean_exception(self._state.exception, self._state.traceback)
if raiseit:
typ, exc, tb = exc
raise exc.with_traceback(tb)
else:
return exc
elif self.status == "cancelled":
exception = CancelledError(self.key)
if raiseit:
> raise exception
E concurrent.futures._base.CancelledError: finalize-9627eb8818358f60662639d9ff830989
distributed/client.py:342: CancelledError
Check warning on line 0 in distributed.shuffle.tests.test_merge
github-actions / Unit Test Results
All 13 runs failed: test_basic_merge[outer] (distributed.shuffle.tests.test_merge)
artifacts/ubuntu-latest-3.10-default-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-3.11-default-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-3.12-default-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-3.9-default-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-3.9-no_expr-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-3.9-no_queue-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-mindeps-default-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-mindeps-numpy-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-mindeps-pandas-ci1/pytest.xml [took 0s]
artifacts/windows-latest-3.10-default-ci1/pytest.xml [took 0s]
artifacts/windows-latest-3.11-default-ci1/pytest.xml [took 0s]
artifacts/windows-latest-3.12-default-ci1/pytest.xml [took 0s]
artifacts/windows-latest-3.9-default-ci1/pytest.xml [took 0s]
Raw output
concurrent.futures._base.CancelledError: finalize-30c90adae378a16295147c29fbcc84b2
c = <Client: No scheduler connected>
s = <Scheduler 'tcp://127.0.0.1:43845', workers: 0, cores: 0, tasks: 0>
a = Dask DataFrame Structure:
x y
npartitions=2
0 int64 int64
4 ... ...
5 ... ...
Dask Name: repartition-dataframe, 1 graph layer
b = Dask DataFrame Structure:
y z
npartitions=2
0 int64 int64
2 ... ...
5 ... ...
Dask Name: repartition-dataframe, 1 graph layer
how = 'outer'
@pytest.mark.parametrize("how", ["inner", "left", "right", "outer"])
@gen_cluster(client=True)
async def test_basic_merge(c, s, a, b, how):
A = pd.DataFrame({"x": [1, 2, 3, 4, 5, 6], "y": [1, 1, 2, 2, 3, 4]})
a = dd.repartition(A, [0, 4, 5])
B = pd.DataFrame({"y": [1, 3, 4, 4, 5, 6], "z": [6, 5, 4, 3, 2, 1]})
b = dd.repartition(B, [0, 2, 5])
joined = a.merge(b, left_on="y", right_on="y", how=how)
if dd._dask_expr_enabled():
# Ensure we're using a hash join
from dask_expr._merge import HashJoinP2P
assert any(
isinstance(expr, HashJoinP2P) for expr in joined.optimize()._expr.walk()
)
expected = pd.merge(A, B, how, "y")
> await list_eq(joined, expected)
distributed/shuffle/tests/test_merge.py:91:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
distributed/shuffle/tests/test_merge.py:36: in list_eq
a = await c.compute(a) if isinstance(a, dd.DataFrame) else a
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
self = <Future: cancelled, key: finalize-30c90adae378a16295147c29fbcc84b2>
raiseit = True
async def _result(self, raiseit=True):
await self._state.wait()
if self.status == "error":
exc = clean_exception(self._state.exception, self._state.traceback)
if raiseit:
typ, exc, tb = exc
raise exc.with_traceback(tb)
else:
return exc
elif self.status == "cancelled":
exception = CancelledError(self.key)
if raiseit:
> raise exception
E concurrent.futures._base.CancelledError: finalize-30c90adae378a16295147c29fbcc84b2
distributed/client.py:342: CancelledError
Check warning on line 0 in distributed.shuffle.tests.test_merge
github-actions / Unit Test Results
All 13 runs failed: test_merge_p2p_shuffle_reused_dataframe_with_different_parameters (distributed.shuffle.tests.test_merge)
artifacts/ubuntu-latest-3.10-default-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-3.11-default-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-3.12-default-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-3.9-default-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-3.9-no_expr-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-3.9-no_queue-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-mindeps-default-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-mindeps-numpy-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-mindeps-pandas-ci1/pytest.xml [took 0s]
artifacts/windows-latest-3.10-default-ci1/pytest.xml [took 0s]
artifacts/windows-latest-3.11-default-ci1/pytest.xml [took 0s]
artifacts/windows-latest-3.12-default-ci1/pytest.xml [took 0s]
artifacts/windows-latest-3.9-default-ci1/pytest.xml [took 0s]
Raw output
concurrent.futures._base.CancelledError: finalize-87e18f46c7bf5aab9cc44adadf08aab4
c = <Client: No scheduler connected>
s = <Scheduler 'tcp://127.0.0.1:37149', workers: 0, cores: 0, tasks: 0>
a = <Worker 'tcp://127.0.0.1:40069', name: 0, status: closed, stored: 0, running: 0/1, ready: 0, comm: 0, waiting: 0>
b = <Worker 'tcp://127.0.0.1:43173', name: 1, status: closed, stored: 0, running: 0/2, ready: 0, comm: 0, waiting: 0>
@gen_cluster(client=True)
async def test_merge_p2p_shuffle_reused_dataframe_with_different_parameters(c, s, a, b):
pdf1 = pd.DataFrame({"a": range(100), "b": range(0, 200, 2)})
pdf2 = pd.DataFrame({"x": range(200), "y": [1, 2, 3, 4] * 50})
ddf1 = dd.from_pandas(pdf1, npartitions=5)
ddf2 = dd.from_pandas(pdf2, npartitions=10)
with dask.config.set({"dataframe.shuffle.method": "p2p"}):
out = (
ddf1.merge(ddf2, left_on="a", right_on="x")
# Vary the number of output partitions for the shuffles of dd2
.repartition(npartitions=20).merge(ddf2, left_on="b", right_on="x")
)
# Generate unique shuffle IDs if the input frame is the same but
# parameters differ. Reusing shuffles in merges is dangerous because of the
# required coordination and complexity introduced through dynamic clusters.
assert sum(id_from_key(k) is not None for k in out.dask) == 4
> result = await c.compute(out)
distributed/shuffle/tests/test_merge.py:126:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
self = <Future: cancelled, key: finalize-87e18f46c7bf5aab9cc44adadf08aab4>
raiseit = True
async def _result(self, raiseit=True):
await self._state.wait()
if self.status == "error":
exc = clean_exception(self._state.exception, self._state.traceback)
if raiseit:
typ, exc, tb = exc
raise exc.with_traceback(tb)
else:
return exc
elif self.status == "cancelled":
exception = CancelledError(self.key)
if raiseit:
> raise exception
E concurrent.futures._base.CancelledError: finalize-87e18f46c7bf5aab9cc44adadf08aab4
distributed/client.py:342: CancelledError
Check warning on line 0 in distributed.shuffle.tests.test_merge
github-actions / Unit Test Results
All 13 runs failed: test_merge_p2p_shuffle_reused_dataframe_with_same_parameters (distributed.shuffle.tests.test_merge)
artifacts/ubuntu-latest-3.10-default-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-3.11-default-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-3.12-default-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-3.9-default-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-3.9-no_expr-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-3.9-no_queue-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-mindeps-default-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-mindeps-numpy-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-mindeps-pandas-ci1/pytest.xml [took 0s]
artifacts/windows-latest-3.10-default-ci1/pytest.xml [took 0s]
artifacts/windows-latest-3.11-default-ci1/pytest.xml [took 0s]
artifacts/windows-latest-3.12-default-ci1/pytest.xml [took 0s]
artifacts/windows-latest-3.9-default-ci1/pytest.xml [took 0s]
Raw output
concurrent.futures._base.CancelledError: finalize-39b72a493975779752b4b99d408b24df
c = <Client: No scheduler connected>
s = <Scheduler 'tcp://127.0.0.1:34933', workers: 0, cores: 0, tasks: 0>
a = <Worker 'tcp://127.0.0.1:43809', name: 0, status: closed, stored: 0, running: 0/1, ready: 0, comm: 0, waiting: 0>
b = <Worker 'tcp://127.0.0.1:42905', name: 1, status: closed, stored: 0, running: 0/2, ready: 0, comm: 0, waiting: 0>
@gen_cluster(client=True)
async def test_merge_p2p_shuffle_reused_dataframe_with_same_parameters(c, s, a, b):
pdf1 = pd.DataFrame({"a": range(100), "b": range(0, 200, 2)})
pdf2 = pd.DataFrame({"x": range(200), "y": [1, 2, 3, 4] * 50})
ddf1 = dd.from_pandas(pdf1, npartitions=5)
ddf2 = dd.from_pandas(pdf2, npartitions=10)
# This performs two shuffles:
# * ddf1 is shuffled on `a`
# * ddf2 is shuffled on `x`
with dask.config.set({"dataframe.shuffle.method": "p2p"}):
ddf3 = ddf1.merge(
ddf2,
left_on="a",
right_on="x",
)
# This performs one shuffle:
# * ddf3 is shuffled on `b`
# We can reuse the shuffle of dd2 on `x` from the previous merge.
with dask.config.set({"dataframe.shuffle.method": "p2p"}):
out = ddf2.merge(
ddf3,
left_on="x",
right_on="b",
)
# Generate unique shuffle IDs if the input frame is the same and all its
# parameters match. Reusing shuffles in merges is dangerous because of the
# required coordination and complexity introduced through dynamic clusters.
assert sum(id_from_key(k) is not None for k in out.dask) == 4
> result = await c.compute(out)
distributed/shuffle/tests/test_merge.py:163:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
self = <Future: cancelled, key: finalize-39b72a493975779752b4b99d408b24df>
raiseit = True
async def _result(self, raiseit=True):
await self._state.wait()
if self.status == "error":
exc = clean_exception(self._state.exception, self._state.traceback)
if raiseit:
typ, exc, tb = exc
raise exc.with_traceback(tb)
else:
return exc
elif self.status == "cancelled":
exception = CancelledError(self.key)
if raiseit:
> raise exception
E concurrent.futures._base.CancelledError: finalize-39b72a493975779752b4b99d408b24df
distributed/client.py:342: CancelledError
Check warning on line 0 in distributed.shuffle.tests.test_merge
github-actions / Unit Test Results
All 13 runs failed: test_merge[True-inner] (distributed.shuffle.tests.test_merge)
artifacts/ubuntu-latest-3.10-default-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-3.11-default-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-3.12-default-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-3.9-default-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-3.9-no_expr-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-3.9-no_queue-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-mindeps-default-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-mindeps-numpy-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-mindeps-pandas-ci1/pytest.xml [took 0s]
artifacts/windows-latest-3.10-default-ci1/pytest.xml [took 0s]
artifacts/windows-latest-3.11-default-ci1/pytest.xml [took 0s]
artifacts/windows-latest-3.12-default-ci1/pytest.xml [took 0s]
artifacts/windows-latest-3.9-default-ci1/pytest.xml [took 0s]
Raw output
concurrent.futures._base.CancelledError: finalize-43dfb617454cb1aa42c8a3ccdcb16329
c = <Client: No scheduler connected>
s = <Scheduler 'tcp://127.0.0.1:40515', workers: 0, cores: 0, tasks: 0>
a = Dask DataFrame Structure:
x y
npartitions=2
0 int64 int64
4 ... ...
5 ... ...
Dask Name: repartition-dataframe, 1 graph layer
b = Dask DataFrame Structure:
y z
npartitions=2
0 int64 int64
2 ... ...
5 ... ...
Dask Name: repartition-dataframe, 1 graph layer
how = 'inner', disk = True
@pytest.mark.parametrize("how", ["inner", "outer", "left", "right"])
@pytest.mark.parametrize("disk", [True, False])
@gen_cluster(client=True)
async def test_merge(c, s, a, b, how, disk):
A = pd.DataFrame({"x": [1, 2, 3, 4, 5, 6], "y": [1, 1, 2, 2, 3, 4]})
a = dd.repartition(A, [0, 4, 5])
B = pd.DataFrame({"y": [1, 3, 4, 4, 5, 6], "z": [6, 5, 4, 3, 2, 1]})
b = dd.repartition(B, [0, 2, 5])
with dask.config.set({"dataframe.shuffle.method": "p2p"}):
with dask.config.set({"distributed.p2p.disk": disk}):
joined = dd.merge(a, b, left_index=True, right_index=True, how=how)
> res = await c.compute(joined)
distributed/shuffle/tests/test_merge.py:183:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
self = <Future: cancelled, key: finalize-43dfb617454cb1aa42c8a3ccdcb16329>
raiseit = True
async def _result(self, raiseit=True):
await self._state.wait()
if self.status == "error":
exc = clean_exception(self._state.exception, self._state.traceback)
if raiseit:
typ, exc, tb = exc
raise exc.with_traceback(tb)
else:
return exc
elif self.status == "cancelled":
exception = CancelledError(self.key)
if raiseit:
> raise exception
E concurrent.futures._base.CancelledError: finalize-43dfb617454cb1aa42c8a3ccdcb16329
distributed/client.py:342: CancelledError
Check warning on line 0 in distributed.shuffle.tests.test_merge
github-actions / Unit Test Results
All 13 runs failed: test_merge[True-outer] (distributed.shuffle.tests.test_merge)
artifacts/ubuntu-latest-3.10-default-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-3.11-default-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-3.12-default-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-3.9-default-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-3.9-no_expr-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-3.9-no_queue-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-mindeps-default-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-mindeps-numpy-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-mindeps-pandas-ci1/pytest.xml [took 0s]
artifacts/windows-latest-3.10-default-ci1/pytest.xml [took 0s]
artifacts/windows-latest-3.11-default-ci1/pytest.xml [took 0s]
artifacts/windows-latest-3.12-default-ci1/pytest.xml [took 0s]
artifacts/windows-latest-3.9-default-ci1/pytest.xml [took 0s]
Raw output
concurrent.futures._base.CancelledError: finalize-af93dbc77b27048ec91353f09450d571
c = <Client: No scheduler connected>
s = <Scheduler 'tcp://127.0.0.1:43597', workers: 0, cores: 0, tasks: 0>
a = Dask DataFrame Structure:
x y
npartitions=2
0 int64 int64
4 ... ...
5 ... ...
Dask Name: repartition-dataframe, 1 graph layer
b = Dask DataFrame Structure:
y z
npartitions=2
0 int64 int64
2 ... ...
5 ... ...
Dask Name: repartition-dataframe, 1 graph layer
how = 'outer', disk = True
@pytest.mark.parametrize("how", ["inner", "outer", "left", "right"])
@pytest.mark.parametrize("disk", [True, False])
@gen_cluster(client=True)
async def test_merge(c, s, a, b, how, disk):
A = pd.DataFrame({"x": [1, 2, 3, 4, 5, 6], "y": [1, 1, 2, 2, 3, 4]})
a = dd.repartition(A, [0, 4, 5])
B = pd.DataFrame({"y": [1, 3, 4, 4, 5, 6], "z": [6, 5, 4, 3, 2, 1]})
b = dd.repartition(B, [0, 2, 5])
with dask.config.set({"dataframe.shuffle.method": "p2p"}):
with dask.config.set({"distributed.p2p.disk": disk}):
joined = dd.merge(a, b, left_index=True, right_index=True, how=how)
> res = await c.compute(joined)
distributed/shuffle/tests/test_merge.py:183:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
self = <Future: cancelled, key: finalize-af93dbc77b27048ec91353f09450d571>
raiseit = True
async def _result(self, raiseit=True):
await self._state.wait()
if self.status == "error":
exc = clean_exception(self._state.exception, self._state.traceback)
if raiseit:
typ, exc, tb = exc
raise exc.with_traceback(tb)
else:
return exc
elif self.status == "cancelled":
exception = CancelledError(self.key)
if raiseit:
> raise exception
E concurrent.futures._base.CancelledError: finalize-af93dbc77b27048ec91353f09450d571
distributed/client.py:342: CancelledError
Check warning on line 0 in distributed.shuffle.tests.test_merge
github-actions / Unit Test Results
All 13 runs failed: test_merge[True-left] (distributed.shuffle.tests.test_merge)
artifacts/ubuntu-latest-3.10-default-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-3.11-default-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-3.12-default-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-3.9-default-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-3.9-no_expr-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-3.9-no_queue-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-mindeps-default-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-mindeps-numpy-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-mindeps-pandas-ci1/pytest.xml [took 0s]
artifacts/windows-latest-3.10-default-ci1/pytest.xml [took 0s]
artifacts/windows-latest-3.11-default-ci1/pytest.xml [took 0s]
artifacts/windows-latest-3.12-default-ci1/pytest.xml [took 0s]
artifacts/windows-latest-3.9-default-ci1/pytest.xml [took 0s]
Raw output
concurrent.futures._base.CancelledError: finalize-10a1f67bc4de3d4535f8be22ca0a4a75
c = <Client: No scheduler connected>
s = <Scheduler 'tcp://127.0.0.1:33601', workers: 0, cores: 0, tasks: 0>
a = Dask DataFrame Structure:
x y
npartitions=2
0 int64 int64
4 ... ...
5 ... ...
Dask Name: repartition-dataframe, 1 graph layer
b = Dask DataFrame Structure:
y z
npartitions=2
0 int64 int64
2 ... ...
5 ... ...
Dask Name: repartition-dataframe, 1 graph layer
how = 'left', disk = True
@pytest.mark.parametrize("how", ["inner", "outer", "left", "right"])
@pytest.mark.parametrize("disk", [True, False])
@gen_cluster(client=True)
async def test_merge(c, s, a, b, how, disk):
A = pd.DataFrame({"x": [1, 2, 3, 4, 5, 6], "y": [1, 1, 2, 2, 3, 4]})
a = dd.repartition(A, [0, 4, 5])
B = pd.DataFrame({"y": [1, 3, 4, 4, 5, 6], "z": [6, 5, 4, 3, 2, 1]})
b = dd.repartition(B, [0, 2, 5])
with dask.config.set({"dataframe.shuffle.method": "p2p"}):
with dask.config.set({"distributed.p2p.disk": disk}):
joined = dd.merge(a, b, left_index=True, right_index=True, how=how)
> res = await c.compute(joined)
distributed/shuffle/tests/test_merge.py:183:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
self = <Future: cancelled, key: finalize-10a1f67bc4de3d4535f8be22ca0a4a75>
raiseit = True
async def _result(self, raiseit=True):
await self._state.wait()
if self.status == "error":
exc = clean_exception(self._state.exception, self._state.traceback)
if raiseit:
typ, exc, tb = exc
raise exc.with_traceback(tb)
else:
return exc
elif self.status == "cancelled":
exception = CancelledError(self.key)
if raiseit:
> raise exception
E concurrent.futures._base.CancelledError: finalize-10a1f67bc4de3d4535f8be22ca0a4a75
distributed/client.py:342: CancelledError
Check warning on line 0 in distributed.shuffle.tests.test_merge
github-actions / Unit Test Results
All 13 runs failed: test_merge[True-right] (distributed.shuffle.tests.test_merge)
artifacts/ubuntu-latest-3.10-default-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-3.11-default-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-3.12-default-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-3.9-default-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-3.9-no_expr-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-3.9-no_queue-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-mindeps-default-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-mindeps-numpy-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-mindeps-pandas-ci1/pytest.xml [took 0s]
artifacts/windows-latest-3.10-default-ci1/pytest.xml [took 0s]
artifacts/windows-latest-3.11-default-ci1/pytest.xml [took 0s]
artifacts/windows-latest-3.12-default-ci1/pytest.xml [took 0s]
artifacts/windows-latest-3.9-default-ci1/pytest.xml [took 0s]
Raw output
concurrent.futures._base.CancelledError: finalize-6901874377222dc3d690d647fc7da03d
c = <Client: No scheduler connected>
s = <Scheduler 'tcp://127.0.0.1:40417', workers: 0, cores: 0, tasks: 0>
a = Dask DataFrame Structure:
x y
npartitions=2
0 int64 int64
4 ... ...
5 ... ...
Dask Name: repartition-dataframe, 1 graph layer
b = Dask DataFrame Structure:
y z
npartitions=2
0 int64 int64
2 ... ...
5 ... ...
Dask Name: repartition-dataframe, 1 graph layer
how = 'right', disk = True
@pytest.mark.parametrize("how", ["inner", "outer", "left", "right"])
@pytest.mark.parametrize("disk", [True, False])
@gen_cluster(client=True)
async def test_merge(c, s, a, b, how, disk):
A = pd.DataFrame({"x": [1, 2, 3, 4, 5, 6], "y": [1, 1, 2, 2, 3, 4]})
a = dd.repartition(A, [0, 4, 5])
B = pd.DataFrame({"y": [1, 3, 4, 4, 5, 6], "z": [6, 5, 4, 3, 2, 1]})
b = dd.repartition(B, [0, 2, 5])
with dask.config.set({"dataframe.shuffle.method": "p2p"}):
with dask.config.set({"distributed.p2p.disk": disk}):
joined = dd.merge(a, b, left_index=True, right_index=True, how=how)
> res = await c.compute(joined)
distributed/shuffle/tests/test_merge.py:183:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
self = <Future: cancelled, key: finalize-6901874377222dc3d690d647fc7da03d>
raiseit = True
async def _result(self, raiseit=True):
await self._state.wait()
if self.status == "error":
exc = clean_exception(self._state.exception, self._state.traceback)
if raiseit:
typ, exc, tb = exc
raise exc.with_traceback(tb)
else:
return exc
elif self.status == "cancelled":
exception = CancelledError(self.key)
if raiseit:
> raise exception
E concurrent.futures._base.CancelledError: finalize-6901874377222dc3d690d647fc7da03d
distributed/client.py:342: CancelledError
Check warning on line 0 in distributed.shuffle.tests.test_merge
github-actions / Unit Test Results
All 13 runs failed: test_merge[False-inner] (distributed.shuffle.tests.test_merge)
artifacts/ubuntu-latest-3.10-default-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-3.11-default-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-3.12-default-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-3.9-default-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-3.9-no_expr-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-3.9-no_queue-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-mindeps-default-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-mindeps-numpy-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-mindeps-pandas-ci1/pytest.xml [took 0s]
artifacts/windows-latest-3.10-default-ci1/pytest.xml [took 0s]
artifacts/windows-latest-3.11-default-ci1/pytest.xml [took 0s]
artifacts/windows-latest-3.12-default-ci1/pytest.xml [took 0s]
artifacts/windows-latest-3.9-default-ci1/pytest.xml [took 0s]
Raw output
concurrent.futures._base.CancelledError: finalize-43dfb617454cb1aa42c8a3ccdcb16329
c = <Client: No scheduler connected>
s = <Scheduler 'tcp://127.0.0.1:43005', workers: 0, cores: 0, tasks: 0>
a = Dask DataFrame Structure:
x y
npartitions=2
0 int64 int64
4 ... ...
5 ... ...
Dask Name: repartition-dataframe, 1 graph layer
b = Dask DataFrame Structure:
y z
npartitions=2
0 int64 int64
2 ... ...
5 ... ...
Dask Name: repartition-dataframe, 1 graph layer
how = 'inner', disk = False
@pytest.mark.parametrize("how", ["inner", "outer", "left", "right"])
@pytest.mark.parametrize("disk", [True, False])
@gen_cluster(client=True)
async def test_merge(c, s, a, b, how, disk):
A = pd.DataFrame({"x": [1, 2, 3, 4, 5, 6], "y": [1, 1, 2, 2, 3, 4]})
a = dd.repartition(A, [0, 4, 5])
B = pd.DataFrame({"y": [1, 3, 4, 4, 5, 6], "z": [6, 5, 4, 3, 2, 1]})
b = dd.repartition(B, [0, 2, 5])
with dask.config.set({"dataframe.shuffle.method": "p2p"}):
with dask.config.set({"distributed.p2p.disk": disk}):
joined = dd.merge(a, b, left_index=True, right_index=True, how=how)
> res = await c.compute(joined)
distributed/shuffle/tests/test_merge.py:183:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
self = <Future: cancelled, key: finalize-43dfb617454cb1aa42c8a3ccdcb16329>
raiseit = True
async def _result(self, raiseit=True):
await self._state.wait()
if self.status == "error":
exc = clean_exception(self._state.exception, self._state.traceback)
if raiseit:
typ, exc, tb = exc
raise exc.with_traceback(tb)
else:
return exc
elif self.status == "cancelled":
exception = CancelledError(self.key)
if raiseit:
> raise exception
E concurrent.futures._base.CancelledError: finalize-43dfb617454cb1aa42c8a3ccdcb16329
distributed/client.py:342: CancelledError
Check warning on line 0 in distributed.shuffle.tests.test_merge
github-actions / Unit Test Results
All 13 runs failed: test_merge[False-outer] (distributed.shuffle.tests.test_merge)
artifacts/ubuntu-latest-3.10-default-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-3.11-default-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-3.12-default-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-3.9-default-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-3.9-no_expr-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-3.9-no_queue-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-mindeps-default-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-mindeps-numpy-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-mindeps-pandas-ci1/pytest.xml [took 0s]
artifacts/windows-latest-3.10-default-ci1/pytest.xml [took 0s]
artifacts/windows-latest-3.11-default-ci1/pytest.xml [took 0s]
artifacts/windows-latest-3.12-default-ci1/pytest.xml [took 0s]
artifacts/windows-latest-3.9-default-ci1/pytest.xml [took 0s]
Raw output
concurrent.futures._base.CancelledError: finalize-af93dbc77b27048ec91353f09450d571
c = <Client: No scheduler connected>
s = <Scheduler 'tcp://127.0.0.1:38005', workers: 0, cores: 0, tasks: 0>
a = Dask DataFrame Structure:
x y
npartitions=2
0 int64 int64
4 ... ...
5 ... ...
Dask Name: repartition-dataframe, 1 graph layer
b = Dask DataFrame Structure:
y z
npartitions=2
0 int64 int64
2 ... ...
5 ... ...
Dask Name: repartition-dataframe, 1 graph layer
how = 'outer', disk = False
@pytest.mark.parametrize("how", ["inner", "outer", "left", "right"])
@pytest.mark.parametrize("disk", [True, False])
@gen_cluster(client=True)
async def test_merge(c, s, a, b, how, disk):
A = pd.DataFrame({"x": [1, 2, 3, 4, 5, 6], "y": [1, 1, 2, 2, 3, 4]})
a = dd.repartition(A, [0, 4, 5])
B = pd.DataFrame({"y": [1, 3, 4, 4, 5, 6], "z": [6, 5, 4, 3, 2, 1]})
b = dd.repartition(B, [0, 2, 5])
with dask.config.set({"dataframe.shuffle.method": "p2p"}):
with dask.config.set({"distributed.p2p.disk": disk}):
joined = dd.merge(a, b, left_index=True, right_index=True, how=how)
> res = await c.compute(joined)
distributed/shuffle/tests/test_merge.py:183:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
self = <Future: cancelled, key: finalize-af93dbc77b27048ec91353f09450d571>
raiseit = True
async def _result(self, raiseit=True):
await self._state.wait()
if self.status == "error":
exc = clean_exception(self._state.exception, self._state.traceback)
if raiseit:
typ, exc, tb = exc
raise exc.with_traceback(tb)
else:
return exc
elif self.status == "cancelled":
exception = CancelledError(self.key)
if raiseit:
> raise exception
E concurrent.futures._base.CancelledError: finalize-af93dbc77b27048ec91353f09450d571
distributed/client.py:342: CancelledError
Check warning on line 0 in distributed.shuffle.tests.test_merge
github-actions / Unit Test Results
All 13 runs failed: test_merge[False-left] (distributed.shuffle.tests.test_merge)
artifacts/ubuntu-latest-3.10-default-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-3.11-default-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-3.12-default-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-3.9-default-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-3.9-no_expr-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-3.9-no_queue-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-mindeps-default-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-mindeps-numpy-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-mindeps-pandas-ci1/pytest.xml [took 0s]
artifacts/windows-latest-3.10-default-ci1/pytest.xml [took 0s]
artifacts/windows-latest-3.11-default-ci1/pytest.xml [took 0s]
artifacts/windows-latest-3.12-default-ci1/pytest.xml [took 0s]
artifacts/windows-latest-3.9-default-ci1/pytest.xml [took 0s]
Raw output
concurrent.futures._base.CancelledError: finalize-10a1f67bc4de3d4535f8be22ca0a4a75
c = <Client: No scheduler connected>
s = <Scheduler 'tcp://127.0.0.1:42753', workers: 0, cores: 0, tasks: 0>
a = Dask DataFrame Structure:
x y
npartitions=2
0 int64 int64
4 ... ...
5 ... ...
Dask Name: repartition-dataframe, 1 graph layer
b = Dask DataFrame Structure:
y z
npartitions=2
0 int64 int64
2 ... ...
5 ... ...
Dask Name: repartition-dataframe, 1 graph layer
how = 'left', disk = False
@pytest.mark.parametrize("how", ["inner", "outer", "left", "right"])
@pytest.mark.parametrize("disk", [True, False])
@gen_cluster(client=True)
async def test_merge(c, s, a, b, how, disk):
A = pd.DataFrame({"x": [1, 2, 3, 4, 5, 6], "y": [1, 1, 2, 2, 3, 4]})
a = dd.repartition(A, [0, 4, 5])
B = pd.DataFrame({"y": [1, 3, 4, 4, 5, 6], "z": [6, 5, 4, 3, 2, 1]})
b = dd.repartition(B, [0, 2, 5])
with dask.config.set({"dataframe.shuffle.method": "p2p"}):
with dask.config.set({"distributed.p2p.disk": disk}):
joined = dd.merge(a, b, left_index=True, right_index=True, how=how)
> res = await c.compute(joined)
distributed/shuffle/tests/test_merge.py:183:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
self = <Future: cancelled, key: finalize-10a1f67bc4de3d4535f8be22ca0a4a75>
raiseit = True
async def _result(self, raiseit=True):
await self._state.wait()
if self.status == "error":
exc = clean_exception(self._state.exception, self._state.traceback)
if raiseit:
typ, exc, tb = exc
raise exc.with_traceback(tb)
else:
return exc
elif self.status == "cancelled":
exception = CancelledError(self.key)
if raiseit:
> raise exception
E concurrent.futures._base.CancelledError: finalize-10a1f67bc4de3d4535f8be22ca0a4a75
distributed/client.py:342: CancelledError
Check warning on line 0 in distributed.shuffle.tests.test_merge
github-actions / Unit Test Results
All 13 runs failed: test_merge[False-right] (distributed.shuffle.tests.test_merge)
artifacts/ubuntu-latest-3.10-default-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-3.11-default-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-3.12-default-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-3.9-default-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-3.9-no_expr-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-3.9-no_queue-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-mindeps-default-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-mindeps-numpy-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-mindeps-pandas-ci1/pytest.xml [took 0s]
artifacts/windows-latest-3.10-default-ci1/pytest.xml [took 0s]
artifacts/windows-latest-3.11-default-ci1/pytest.xml [took 0s]
artifacts/windows-latest-3.12-default-ci1/pytest.xml [took 0s]
artifacts/windows-latest-3.9-default-ci1/pytest.xml [took 0s]
Raw output
concurrent.futures._base.CancelledError: finalize-6901874377222dc3d690d647fc7da03d
c = <Client: No scheduler connected>
s = <Scheduler 'tcp://127.0.0.1:42717', workers: 0, cores: 0, tasks: 0>
a = Dask DataFrame Structure:
x y
npartitions=2
0 int64 int64
4 ... ...
5 ... ...
Dask Name: repartition-dataframe, 1 graph layer
b = Dask DataFrame Structure:
y z
npartitions=2
0 int64 int64
2 ... ...
5 ... ...
Dask Name: repartition-dataframe, 1 graph layer
how = 'right', disk = False
@pytest.mark.parametrize("how", ["inner", "outer", "left", "right"])
@pytest.mark.parametrize("disk", [True, False])
@gen_cluster(client=True)
async def test_merge(c, s, a, b, how, disk):
A = pd.DataFrame({"x": [1, 2, 3, 4, 5, 6], "y": [1, 1, 2, 2, 3, 4]})
a = dd.repartition(A, [0, 4, 5])
B = pd.DataFrame({"y": [1, 3, 4, 4, 5, 6], "z": [6, 5, 4, 3, 2, 1]})
b = dd.repartition(B, [0, 2, 5])
with dask.config.set({"dataframe.shuffle.method": "p2p"}):
with dask.config.set({"distributed.p2p.disk": disk}):
joined = dd.merge(a, b, left_index=True, right_index=True, how=how)
> res = await c.compute(joined)
distributed/shuffle/tests/test_merge.py:183:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
self = <Future: cancelled, key: finalize-6901874377222dc3d690d647fc7da03d>
raiseit = True
async def _result(self, raiseit=True):
await self._state.wait()
if self.status == "error":
exc = clean_exception(self._state.exception, self._state.traceback)
if raiseit:
typ, exc, tb = exc
raise exc.with_traceback(tb)
else:
return exc
elif self.status == "cancelled":
exception = CancelledError(self.key)
if raiseit:
> raise exception
E concurrent.futures._base.CancelledError: finalize-6901874377222dc3d690d647fc7da03d
distributed/client.py:342: CancelledError
Check warning on line 0 in distributed.shuffle.tests.test_merge
github-actions / Unit Test Results
All 13 runs failed: test_merge_by_multiple_columns[inner] (distributed.shuffle.tests.test_merge)
artifacts/ubuntu-latest-3.10-default-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-3.11-default-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-3.12-default-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-3.9-default-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-3.9-no_expr-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-3.9-no_queue-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-mindeps-default-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-mindeps-numpy-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-mindeps-pandas-ci1/pytest.xml [took 0s]
artifacts/windows-latest-3.10-default-ci1/pytest.xml [took 0s]
artifacts/windows-latest-3.11-default-ci1/pytest.xml [took 0s]
artifacts/windows-latest-3.12-default-ci1/pytest.xml [took 0s]
artifacts/windows-latest-3.9-default-ci1/pytest.xml [took 0s]
Raw output
concurrent.futures._base.CancelledError: finalize-2920fb1e84930799e8a936f9714a6850
c = <Client: No scheduler connected>
s = <Scheduler 'tcp://127.0.0.1:45825', workers: 0, cores: 0, tasks: 0>
a = <Worker 'tcp://127.0.0.1:35315', name: 0, status: closed, stored: 0, running: 0/1, ready: 0, comm: 0, waiting: 0>
b = <Worker 'tcp://127.0.0.1:35547', name: 1, status: closed, stored: 0, running: 0/2, ready: 0, comm: 0, waiting: 0>
how = 'inner'
@pytest.mark.slow
@gen_cluster(client=True, timeout=120)
@pytest.mark.parametrize("how", ["inner", "outer", "left", "right"])
async def test_merge_by_multiple_columns(c, s, a, b, how):
# warnings here from pandas
pdf1l = pd.DataFrame(
{
"a": list("abcdefghij"),
"b": list("abcdefghij"),
"c": [1, 2, 3, 4, 5, 6, 7, 8, 9, 10],
},
index=list("abcdefghij"),
)
pdf1r = pd.DataFrame(
{
"d": list("abcdefghij"),
"e": list("abcdefghij"),
"f": [10, 9, 8, 7, 6, 5, 4, 3, 2, 1],
},
index=list("abcdefghij"),
)
pdf2l = pd.DataFrame(
{
"a": list("abcdeabcde"),
"b": list("abcabcabca"),
"c": [1, 2, 3, 4, 5, 6, 7, 8, 9, 10],
},
index=list("abcdefghij"),
)
pdf2r = pd.DataFrame(
{
"d": list("edcbaedcba"),
"e": list("aaabbbcccd"),
"f": [10, 9, 8, 7, 6, 5, 4, 3, 2, 1],
},
index=list("fghijklmno"),
)
pdf3l = pd.DataFrame(
{
"a": list("aaaaaaaaaa"),
"b": list("aaaaaaaaaa"),
"c": [1, 2, 3, 4, 5, 6, 7, 8, 9, 10],
},
index=list("abcdefghij"),
)
pdf3r = pd.DataFrame(
{
"d": list("aaabbbccaa"),
"e": list("abbbbbbbbb"),
"f": [10, 9, 8, 7, 6, 5, 4, 3, 2, 1],
},
index=list("ABCDEFGHIJ"),
)
for pdl, pdr in [(pdf1l, pdf1r), (pdf2l, pdf2r), (pdf3l, pdf3r)]:
for lpart, rpart in [(2, 2), (3, 2), (2, 3)]:
ddl = dd.from_pandas(pdl, lpart)
ddr = dd.from_pandas(pdr, rpart)
with dask.config.set({"dataframe.shuffle.method": "p2p"}):
expected = pdl.join(pdr, how=how)
assert_eq(
> await c.compute(ddl.join(ddr, how=how)),
expected,
# FIXME: There's an discrepancy with an empty index for
# pandas=2.0 (xref https://github.com/dask/dask/issues/9957).
# Temporarily avoid index check until the discrepancy is fixed.
check_index=not (PANDAS_GE_200 and expected.index.empty),
)
distributed/shuffle/tests/test_merge.py:292:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
self = <Future: cancelled, key: finalize-2920fb1e84930799e8a936f9714a6850>
raiseit = True
async def _result(self, raiseit=True):
await self._state.wait()
if self.status == "error":
exc = clean_exception(self._state.exception, self._state.traceback)
if raiseit:
typ, exc, tb = exc
raise exc.with_traceback(tb)
else:
return exc
elif self.status == "cancelled":
exception = CancelledError(self.key)
if raiseit:
> raise exception
E concurrent.futures._base.CancelledError: finalize-2920fb1e84930799e8a936f9714a6850
distributed/client.py:342: CancelledError
Check warning on line 0 in distributed.shuffle.tests.test_merge
github-actions / Unit Test Results
All 13 runs failed: test_merge_by_multiple_columns[outer] (distributed.shuffle.tests.test_merge)
artifacts/ubuntu-latest-3.10-default-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-3.11-default-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-3.12-default-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-3.9-default-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-3.9-no_expr-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-3.9-no_queue-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-mindeps-default-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-mindeps-numpy-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-mindeps-pandas-ci1/pytest.xml [took 0s]
artifacts/windows-latest-3.10-default-ci1/pytest.xml [took 0s]
artifacts/windows-latest-3.11-default-ci1/pytest.xml [took 0s]
artifacts/windows-latest-3.12-default-ci1/pytest.xml [took 0s]
artifacts/windows-latest-3.9-default-ci1/pytest.xml [took 0s]
Raw output
concurrent.futures._base.CancelledError: finalize-a048d71f41fa2758c0995d78b57592da
c = <Client: No scheduler connected>
s = <Scheduler 'tcp://127.0.0.1:45587', workers: 0, cores: 0, tasks: 0>
a = <Worker 'tcp://127.0.0.1:45663', name: 0, status: closed, stored: 0, running: 0/1, ready: 0, comm: 0, waiting: 0>
b = <Worker 'tcp://127.0.0.1:38155', name: 1, status: closed, stored: 0, running: 0/2, ready: 0, comm: 0, waiting: 0>
how = 'outer'
@pytest.mark.slow
@gen_cluster(client=True, timeout=120)
@pytest.mark.parametrize("how", ["inner", "outer", "left", "right"])
async def test_merge_by_multiple_columns(c, s, a, b, how):
# warnings here from pandas
pdf1l = pd.DataFrame(
{
"a": list("abcdefghij"),
"b": list("abcdefghij"),
"c": [1, 2, 3, 4, 5, 6, 7, 8, 9, 10],
},
index=list("abcdefghij"),
)
pdf1r = pd.DataFrame(
{
"d": list("abcdefghij"),
"e": list("abcdefghij"),
"f": [10, 9, 8, 7, 6, 5, 4, 3, 2, 1],
},
index=list("abcdefghij"),
)
pdf2l = pd.DataFrame(
{
"a": list("abcdeabcde"),
"b": list("abcabcabca"),
"c": [1, 2, 3, 4, 5, 6, 7, 8, 9, 10],
},
index=list("abcdefghij"),
)
pdf2r = pd.DataFrame(
{
"d": list("edcbaedcba"),
"e": list("aaabbbcccd"),
"f": [10, 9, 8, 7, 6, 5, 4, 3, 2, 1],
},
index=list("fghijklmno"),
)
pdf3l = pd.DataFrame(
{
"a": list("aaaaaaaaaa"),
"b": list("aaaaaaaaaa"),
"c": [1, 2, 3, 4, 5, 6, 7, 8, 9, 10],
},
index=list("abcdefghij"),
)
pdf3r = pd.DataFrame(
{
"d": list("aaabbbccaa"),
"e": list("abbbbbbbbb"),
"f": [10, 9, 8, 7, 6, 5, 4, 3, 2, 1],
},
index=list("ABCDEFGHIJ"),
)
for pdl, pdr in [(pdf1l, pdf1r), (pdf2l, pdf2r), (pdf3l, pdf3r)]:
for lpart, rpart in [(2, 2), (3, 2), (2, 3)]:
ddl = dd.from_pandas(pdl, lpart)
ddr = dd.from_pandas(pdr, rpart)
with dask.config.set({"dataframe.shuffle.method": "p2p"}):
expected = pdl.join(pdr, how=how)
assert_eq(
> await c.compute(ddl.join(ddr, how=how)),
expected,
# FIXME: There's an discrepancy with an empty index for
# pandas=2.0 (xref https://github.com/dask/dask/issues/9957).
# Temporarily avoid index check until the discrepancy is fixed.
check_index=not (PANDAS_GE_200 and expected.index.empty),
)
distributed/shuffle/tests/test_merge.py:292:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
self = <Future: cancelled, key: finalize-a048d71f41fa2758c0995d78b57592da>
raiseit = True
async def _result(self, raiseit=True):
await self._state.wait()
if self.status == "error":
exc = clean_exception(self._state.exception, self._state.traceback)
if raiseit:
typ, exc, tb = exc
raise exc.with_traceback(tb)
else:
return exc
elif self.status == "cancelled":
exception = CancelledError(self.key)
if raiseit:
> raise exception
E concurrent.futures._base.CancelledError: finalize-a048d71f41fa2758c0995d78b57592da
distributed/client.py:342: CancelledError
Check warning on line 0 in distributed.shuffle.tests.test_merge
github-actions / Unit Test Results
All 13 runs failed: test_merge_by_multiple_columns[left] (distributed.shuffle.tests.test_merge)
artifacts/ubuntu-latest-3.10-default-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-3.11-default-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-3.12-default-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-3.9-default-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-3.9-no_expr-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-3.9-no_queue-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-mindeps-default-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-mindeps-numpy-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-mindeps-pandas-ci1/pytest.xml [took 0s]
artifacts/windows-latest-3.10-default-ci1/pytest.xml [took 0s]
artifacts/windows-latest-3.11-default-ci1/pytest.xml [took 0s]
artifacts/windows-latest-3.12-default-ci1/pytest.xml [took 0s]
artifacts/windows-latest-3.9-default-ci1/pytest.xml [took 0s]
Raw output
concurrent.futures._base.CancelledError: finalize-67e9d3bfb23f954a3d341a4de5282f79
c = <Client: No scheduler connected>
s = <Scheduler 'tcp://127.0.0.1:38023', workers: 0, cores: 0, tasks: 0>
a = <Worker 'tcp://127.0.0.1:44615', name: 0, status: closed, stored: 0, running: 0/1, ready: 0, comm: 0, waiting: 0>
b = <Worker 'tcp://127.0.0.1:35981', name: 1, status: closed, stored: 0, running: 0/2, ready: 0, comm: 0, waiting: 0>
how = 'left'
@pytest.mark.slow
@gen_cluster(client=True, timeout=120)
@pytest.mark.parametrize("how", ["inner", "outer", "left", "right"])
async def test_merge_by_multiple_columns(c, s, a, b, how):
# warnings here from pandas
pdf1l = pd.DataFrame(
{
"a": list("abcdefghij"),
"b": list("abcdefghij"),
"c": [1, 2, 3, 4, 5, 6, 7, 8, 9, 10],
},
index=list("abcdefghij"),
)
pdf1r = pd.DataFrame(
{
"d": list("abcdefghij"),
"e": list("abcdefghij"),
"f": [10, 9, 8, 7, 6, 5, 4, 3, 2, 1],
},
index=list("abcdefghij"),
)
pdf2l = pd.DataFrame(
{
"a": list("abcdeabcde"),
"b": list("abcabcabca"),
"c": [1, 2, 3, 4, 5, 6, 7, 8, 9, 10],
},
index=list("abcdefghij"),
)
pdf2r = pd.DataFrame(
{
"d": list("edcbaedcba"),
"e": list("aaabbbcccd"),
"f": [10, 9, 8, 7, 6, 5, 4, 3, 2, 1],
},
index=list("fghijklmno"),
)
pdf3l = pd.DataFrame(
{
"a": list("aaaaaaaaaa"),
"b": list("aaaaaaaaaa"),
"c": [1, 2, 3, 4, 5, 6, 7, 8, 9, 10],
},
index=list("abcdefghij"),
)
pdf3r = pd.DataFrame(
{
"d": list("aaabbbccaa"),
"e": list("abbbbbbbbb"),
"f": [10, 9, 8, 7, 6, 5, 4, 3, 2, 1],
},
index=list("ABCDEFGHIJ"),
)
for pdl, pdr in [(pdf1l, pdf1r), (pdf2l, pdf2r), (pdf3l, pdf3r)]:
for lpart, rpart in [(2, 2), (3, 2), (2, 3)]:
ddl = dd.from_pandas(pdl, lpart)
ddr = dd.from_pandas(pdr, rpart)
with dask.config.set({"dataframe.shuffle.method": "p2p"}):
expected = pdl.join(pdr, how=how)
assert_eq(
> await c.compute(ddl.join(ddr, how=how)),
expected,
# FIXME: There's an discrepancy with an empty index for
# pandas=2.0 (xref https://github.com/dask/dask/issues/9957).
# Temporarily avoid index check until the discrepancy is fixed.
check_index=not (PANDAS_GE_200 and expected.index.empty),
)
distributed/shuffle/tests/test_merge.py:292:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
self = <Future: cancelled, key: finalize-67e9d3bfb23f954a3d341a4de5282f79>
raiseit = True
async def _result(self, raiseit=True):
await self._state.wait()
if self.status == "error":
exc = clean_exception(self._state.exception, self._state.traceback)
if raiseit:
typ, exc, tb = exc
raise exc.with_traceback(tb)
else:
return exc
elif self.status == "cancelled":
exception = CancelledError(self.key)
if raiseit:
> raise exception
E concurrent.futures._base.CancelledError: finalize-67e9d3bfb23f954a3d341a4de5282f79
distributed/client.py:342: CancelledError
Check warning on line 0 in distributed.shuffle.tests.test_merge
github-actions / Unit Test Results
All 13 runs failed: test_merge_by_multiple_columns[right] (distributed.shuffle.tests.test_merge)
artifacts/ubuntu-latest-3.10-default-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-3.11-default-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-3.12-default-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-3.9-default-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-3.9-no_expr-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-3.9-no_queue-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-mindeps-default-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-mindeps-numpy-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-mindeps-pandas-ci1/pytest.xml [took 0s]
artifacts/windows-latest-3.10-default-ci1/pytest.xml [took 0s]
artifacts/windows-latest-3.11-default-ci1/pytest.xml [took 0s]
artifacts/windows-latest-3.12-default-ci1/pytest.xml [took 0s]
artifacts/windows-latest-3.9-default-ci1/pytest.xml [took 0s]
Raw output
concurrent.futures._base.CancelledError: finalize-3bf5fa457df0231bc7228874bedf69e6
c = <Client: No scheduler connected>
s = <Scheduler 'tcp://127.0.0.1:44039', workers: 0, cores: 0, tasks: 0>
a = <Worker 'tcp://127.0.0.1:36643', name: 0, status: closed, stored: 0, running: 0/1, ready: 0, comm: 0, waiting: 0>
b = <Worker 'tcp://127.0.0.1:39061', name: 1, status: closed, stored: 0, running: 0/2, ready: 0, comm: 0, waiting: 0>
how = 'right'
@pytest.mark.slow
@gen_cluster(client=True, timeout=120)
@pytest.mark.parametrize("how", ["inner", "outer", "left", "right"])
async def test_merge_by_multiple_columns(c, s, a, b, how):
# warnings here from pandas
pdf1l = pd.DataFrame(
{
"a": list("abcdefghij"),
"b": list("abcdefghij"),
"c": [1, 2, 3, 4, 5, 6, 7, 8, 9, 10],
},
index=list("abcdefghij"),
)
pdf1r = pd.DataFrame(
{
"d": list("abcdefghij"),
"e": list("abcdefghij"),
"f": [10, 9, 8, 7, 6, 5, 4, 3, 2, 1],
},
index=list("abcdefghij"),
)
pdf2l = pd.DataFrame(
{
"a": list("abcdeabcde"),
"b": list("abcabcabca"),
"c": [1, 2, 3, 4, 5, 6, 7, 8, 9, 10],
},
index=list("abcdefghij"),
)
pdf2r = pd.DataFrame(
{
"d": list("edcbaedcba"),
"e": list("aaabbbcccd"),
"f": [10, 9, 8, 7, 6, 5, 4, 3, 2, 1],
},
index=list("fghijklmno"),
)
pdf3l = pd.DataFrame(
{
"a": list("aaaaaaaaaa"),
"b": list("aaaaaaaaaa"),
"c": [1, 2, 3, 4, 5, 6, 7, 8, 9, 10],
},
index=list("abcdefghij"),
)
pdf3r = pd.DataFrame(
{
"d": list("aaabbbccaa"),
"e": list("abbbbbbbbb"),
"f": [10, 9, 8, 7, 6, 5, 4, 3, 2, 1],
},
index=list("ABCDEFGHIJ"),
)
for pdl, pdr in [(pdf1l, pdf1r), (pdf2l, pdf2r), (pdf3l, pdf3r)]:
for lpart, rpart in [(2, 2), (3, 2), (2, 3)]:
ddl = dd.from_pandas(pdl, lpart)
ddr = dd.from_pandas(pdr, rpart)
with dask.config.set({"dataframe.shuffle.method": "p2p"}):
expected = pdl.join(pdr, how=how)
assert_eq(
> await c.compute(ddl.join(ddr, how=how)),
expected,
# FIXME: There's an discrepancy with an empty index for
# pandas=2.0 (xref https://github.com/dask/dask/issues/9957).
# Temporarily avoid index check until the discrepancy is fixed.
check_index=not (PANDAS_GE_200 and expected.index.empty),
)
distributed/shuffle/tests/test_merge.py:292:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
self = <Future: cancelled, key: finalize-3bf5fa457df0231bc7228874bedf69e6>
raiseit = True
async def _result(self, raiseit=True):
await self._state.wait()
if self.status == "error":
exc = clean_exception(self._state.exception, self._state.traceback)
if raiseit:
typ, exc, tb = exc
raise exc.with_traceback(tb)
else:
return exc
elif self.status == "cancelled":
exception = CancelledError(self.key)
if raiseit:
> raise exception
E concurrent.futures._base.CancelledError: finalize-3bf5fa457df0231bc7228874bedf69e6
distributed/client.py:342: CancelledError
Check warning on line 0 in distributed.shuffle.tests.test_merge
github-actions / Unit Test Results
All 13 runs failed: test_index_merge_p2p[inner] (distributed.shuffle.tests.test_merge)
artifacts/ubuntu-latest-3.10-default-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-3.11-default-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-3.12-default-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-3.9-default-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-3.9-no_expr-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-3.9-no_queue-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-mindeps-default-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-mindeps-numpy-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-mindeps-pandas-ci1/pytest.xml [took 0s]
artifacts/windows-latest-3.10-default-ci1/pytest.xml [took 0s]
artifacts/windows-latest-3.11-default-ci1/pytest.xml [took 0s]
artifacts/windows-latest-3.12-default-ci1/pytest.xml [took 0s]
artifacts/windows-latest-3.9-default-ci1/pytest.xml [took 0s]
Raw output
concurrent.futures._base.CancelledError: finalize-8bee82abf05338bfacf545f2eacbe0bc
c = <Client: No scheduler connected>
s = <Scheduler 'tcp://127.0.0.1:40853', workers: 0, cores: 0, tasks: 0>
a = <Worker 'tcp://127.0.0.1:46655', name: 0, status: closed, stored: 0, running: 0/1, ready: 0, comm: 0, waiting: 0>
b = <Worker 'tcp://127.0.0.1:35891', name: 1, status: closed, stored: 0, running: 0/2, ready: 0, comm: 0, waiting: 0>
how = 'inner'
@pytest.mark.parametrize("how", ["inner", "left", "right", "outer"])
@gen_cluster(client=True)
async def test_index_merge_p2p(c, s, a, b, how):
pdf_left = pd.DataFrame({"a": [4, 2, 3] * 10, "b": 1}).set_index("a")
pdf_right = pd.DataFrame({"a": [4, 2, 3] * 10, "c": 1})
left = dd.from_pandas(pdf_left, npartitions=5, sort=False)
right = dd.from_pandas(pdf_right, npartitions=6)
with dask.config.set({"dataframe.shuffle.method": "p2p"}):
assert_eq(
> await c.compute(left.merge(right, how=how, left_index=True, right_on="a")),
pdf_left.merge(pdf_right, how=how, left_index=True, right_on="a"),
)
distributed/shuffle/tests/test_merge.py:388:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
self = <Future: cancelled, key: finalize-8bee82abf05338bfacf545f2eacbe0bc>
raiseit = True
async def _result(self, raiseit=True):
await self._state.wait()
if self.status == "error":
exc = clean_exception(self._state.exception, self._state.traceback)
if raiseit:
typ, exc, tb = exc
raise exc.with_traceback(tb)
else:
return exc
elif self.status == "cancelled":
exception = CancelledError(self.key)
if raiseit:
> raise exception
E concurrent.futures._base.CancelledError: finalize-8bee82abf05338bfacf545f2eacbe0bc
distributed/client.py:342: CancelledError
Check warning on line 0 in distributed.shuffle.tests.test_merge
github-actions / Unit Test Results
All 13 runs failed: test_index_merge_p2p[left] (distributed.shuffle.tests.test_merge)
artifacts/ubuntu-latest-3.10-default-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-3.11-default-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-3.12-default-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-3.9-default-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-3.9-no_expr-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-3.9-no_queue-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-mindeps-default-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-mindeps-numpy-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-mindeps-pandas-ci1/pytest.xml [took 0s]
artifacts/windows-latest-3.10-default-ci1/pytest.xml [took 0s]
artifacts/windows-latest-3.11-default-ci1/pytest.xml [took 0s]
artifacts/windows-latest-3.12-default-ci1/pytest.xml [took 0s]
artifacts/windows-latest-3.9-default-ci1/pytest.xml [took 0s]
Raw output
concurrent.futures._base.CancelledError: finalize-75316c9754a970eda681ec3d7b465f99
c = <Client: No scheduler connected>
s = <Scheduler 'tcp://127.0.0.1:34807', workers: 0, cores: 0, tasks: 0>
a = <Worker 'tcp://127.0.0.1:40081', name: 0, status: closed, stored: 0, running: 0/1, ready: 0, comm: 0, waiting: 0>
b = <Worker 'tcp://127.0.0.1:41465', name: 1, status: closed, stored: 0, running: 0/2, ready: 0, comm: 0, waiting: 0>
how = 'left'
@pytest.mark.parametrize("how", ["inner", "left", "right", "outer"])
@gen_cluster(client=True)
async def test_index_merge_p2p(c, s, a, b, how):
pdf_left = pd.DataFrame({"a": [4, 2, 3] * 10, "b": 1}).set_index("a")
pdf_right = pd.DataFrame({"a": [4, 2, 3] * 10, "c": 1})
left = dd.from_pandas(pdf_left, npartitions=5, sort=False)
right = dd.from_pandas(pdf_right, npartitions=6)
with dask.config.set({"dataframe.shuffle.method": "p2p"}):
assert_eq(
> await c.compute(left.merge(right, how=how, left_index=True, right_on="a")),
pdf_left.merge(pdf_right, how=how, left_index=True, right_on="a"),
)
distributed/shuffle/tests/test_merge.py:388:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
self = <Future: cancelled, key: finalize-75316c9754a970eda681ec3d7b465f99>
raiseit = True
async def _result(self, raiseit=True):
await self._state.wait()
if self.status == "error":
exc = clean_exception(self._state.exception, self._state.traceback)
if raiseit:
typ, exc, tb = exc
raise exc.with_traceback(tb)
else:
return exc
elif self.status == "cancelled":
exception = CancelledError(self.key)
if raiseit:
> raise exception
E concurrent.futures._base.CancelledError: finalize-75316c9754a970eda681ec3d7b465f99
distributed/client.py:342: CancelledError
Check warning on line 0 in distributed.shuffle.tests.test_merge
github-actions / Unit Test Results
All 13 runs failed: test_index_merge_p2p[right] (distributed.shuffle.tests.test_merge)
artifacts/ubuntu-latest-3.10-default-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-3.11-default-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-3.12-default-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-3.9-default-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-3.9-no_expr-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-3.9-no_queue-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-mindeps-default-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-mindeps-numpy-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-mindeps-pandas-ci1/pytest.xml [took 0s]
artifacts/windows-latest-3.10-default-ci1/pytest.xml [took 0s]
artifacts/windows-latest-3.11-default-ci1/pytest.xml [took 0s]
artifacts/windows-latest-3.12-default-ci1/pytest.xml [took 0s]
artifacts/windows-latest-3.9-default-ci1/pytest.xml [took 0s]
Raw output
concurrent.futures._base.CancelledError: finalize-20004be3884007b7f7661bebdc70ffa7
c = <Client: No scheduler connected>
s = <Scheduler 'tcp://127.0.0.1:38833', workers: 0, cores: 0, tasks: 0>
a = <Worker 'tcp://127.0.0.1:41531', name: 0, status: closed, stored: 0, running: 0/1, ready: 0, comm: 0, waiting: 0>
b = <Worker 'tcp://127.0.0.1:35113', name: 1, status: closed, stored: 0, running: 0/2, ready: 0, comm: 0, waiting: 0>
how = 'right'
@pytest.mark.parametrize("how", ["inner", "left", "right", "outer"])
@gen_cluster(client=True)
async def test_index_merge_p2p(c, s, a, b, how):
pdf_left = pd.DataFrame({"a": [4, 2, 3] * 10, "b": 1}).set_index("a")
pdf_right = pd.DataFrame({"a": [4, 2, 3] * 10, "c": 1})
left = dd.from_pandas(pdf_left, npartitions=5, sort=False)
right = dd.from_pandas(pdf_right, npartitions=6)
with dask.config.set({"dataframe.shuffle.method": "p2p"}):
assert_eq(
> await c.compute(left.merge(right, how=how, left_index=True, right_on="a")),
pdf_left.merge(pdf_right, how=how, left_index=True, right_on="a"),
)
distributed/shuffle/tests/test_merge.py:388:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
self = <Future: cancelled, key: finalize-20004be3884007b7f7661bebdc70ffa7>
raiseit = True
async def _result(self, raiseit=True):
await self._state.wait()
if self.status == "error":
exc = clean_exception(self._state.exception, self._state.traceback)
if raiseit:
typ, exc, tb = exc
raise exc.with_traceback(tb)
else:
return exc
elif self.status == "cancelled":
exception = CancelledError(self.key)
if raiseit:
> raise exception
E concurrent.futures._base.CancelledError: finalize-20004be3884007b7f7661bebdc70ffa7
distributed/client.py:342: CancelledError
Check warning on line 0 in distributed.shuffle.tests.test_merge
github-actions / Unit Test Results
All 13 runs failed: test_index_merge_p2p[outer] (distributed.shuffle.tests.test_merge)
artifacts/ubuntu-latest-3.10-default-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-3.11-default-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-3.12-default-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-3.9-default-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-3.9-no_expr-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-3.9-no_queue-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-mindeps-default-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-mindeps-numpy-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-mindeps-pandas-ci1/pytest.xml [took 0s]
artifacts/windows-latest-3.10-default-ci1/pytest.xml [took 0s]
artifacts/windows-latest-3.11-default-ci1/pytest.xml [took 0s]
artifacts/windows-latest-3.12-default-ci1/pytest.xml [took 0s]
artifacts/windows-latest-3.9-default-ci1/pytest.xml [took 0s]
Raw output
concurrent.futures._base.CancelledError: finalize-c39a92473466263a55511f2615156ee2
c = <Client: No scheduler connected>
s = <Scheduler 'tcp://127.0.0.1:33269', workers: 0, cores: 0, tasks: 0>
a = <Worker 'tcp://127.0.0.1:32799', name: 0, status: closed, stored: 0, running: 0/1, ready: 0, comm: 0, waiting: 0>
b = <Worker 'tcp://127.0.0.1:46789', name: 1, status: closed, stored: 0, running: 0/2, ready: 0, comm: 0, waiting: 0>
how = 'outer'
@pytest.mark.parametrize("how", ["inner", "left", "right", "outer"])
@gen_cluster(client=True)
async def test_index_merge_p2p(c, s, a, b, how):
pdf_left = pd.DataFrame({"a": [4, 2, 3] * 10, "b": 1}).set_index("a")
pdf_right = pd.DataFrame({"a": [4, 2, 3] * 10, "c": 1})
left = dd.from_pandas(pdf_left, npartitions=5, sort=False)
right = dd.from_pandas(pdf_right, npartitions=6)
with dask.config.set({"dataframe.shuffle.method": "p2p"}):
assert_eq(
> await c.compute(left.merge(right, how=how, left_index=True, right_on="a")),
pdf_left.merge(pdf_right, how=how, left_index=True, right_on="a"),
)
distributed/shuffle/tests/test_merge.py:388:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
self = <Future: cancelled, key: finalize-c39a92473466263a55511f2615156ee2>
raiseit = True
async def _result(self, raiseit=True):
await self._state.wait()
if self.status == "error":
exc = clean_exception(self._state.exception, self._state.traceback)
if raiseit:
typ, exc, tb = exc
raise exc.with_traceback(tb)
else:
return exc
elif self.status == "cancelled":
exception = CancelledError(self.key)
if raiseit:
> raise exception
E concurrent.futures._base.CancelledError: finalize-c39a92473466263a55511f2615156ee2
distributed/client.py:342: CancelledError
Check warning on line 0 in distributed.shuffle.tests.test_merge
github-actions / Unit Test Results
All 13 runs failed: test_merge_with_npartitions[4] (distributed.shuffle.tests.test_merge)
artifacts/ubuntu-latest-3.10-default-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-3.11-default-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-3.12-default-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-3.9-default-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-3.9-no_expr-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-3.9-no_queue-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-mindeps-default-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-mindeps-numpy-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-mindeps-pandas-ci1/pytest.xml [took 0s]
artifacts/windows-latest-3.10-default-ci1/pytest.xml [took 0s]
artifacts/windows-latest-3.11-default-ci1/pytest.xml [took 0s]
artifacts/windows-latest-3.12-default-ci1/pytest.xml [took 0s]
artifacts/windows-latest-3.9-default-ci1/pytest.xml [took 0s]
Raw output
concurrent.futures._base.CancelledError: finalize-cb5dcf43490a0323d9216c8645e554f6
c = <Client: No scheduler connected>
s = <Scheduler 'tcp://127.0.0.1:35153', workers: 0, cores: 0, tasks: 0>
a = <Worker 'tcp://127.0.0.1:34587', name: 0, status: closed, stored: 0, running: 0/1, ready: 0, comm: 0, waiting: 0>
b = <Worker 'tcp://127.0.0.1:38797', name: 1, status: closed, stored: 0, running: 0/2, ready: 0, comm: 0, waiting: 0>
npartitions = 4
@pytest.mark.parametrize("npartitions", [4, 5, 10, 20])
@gen_cluster(client=True)
async def test_merge_with_npartitions(c, s, a, b, npartitions):
pdf = pd.DataFrame({"a": [1, 2, 3, 4] * 10, "b": 1})
left = dd.from_pandas(pdf, npartitions=10)
right = dd.from_pandas(pdf, npartitions=5)
expected = pdf.merge(pdf)
with dask.config.set({"dataframe.shuffle.method": "p2p"}):
> result = await c.compute(left.merge(right, npartitions=npartitions))
distributed/shuffle/tests/test_merge.py:408:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
self = <Future: cancelled, key: finalize-cb5dcf43490a0323d9216c8645e554f6>
raiseit = True
async def _result(self, raiseit=True):
await self._state.wait()
if self.status == "error":
exc = clean_exception(self._state.exception, self._state.traceback)
if raiseit:
typ, exc, tb = exc
raise exc.with_traceback(tb)
else:
return exc
elif self.status == "cancelled":
exception = CancelledError(self.key)
if raiseit:
> raise exception
E concurrent.futures._base.CancelledError: finalize-cb5dcf43490a0323d9216c8645e554f6
distributed/client.py:342: CancelledError
Check warning on line 0 in distributed.shuffle.tests.test_merge
github-actions / Unit Test Results
All 13 runs failed: test_merge_with_npartitions[5] (distributed.shuffle.tests.test_merge)
artifacts/ubuntu-latest-3.10-default-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-3.11-default-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-3.12-default-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-3.9-default-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-3.9-no_expr-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-3.9-no_queue-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-mindeps-default-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-mindeps-numpy-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-mindeps-pandas-ci1/pytest.xml [took 0s]
artifacts/windows-latest-3.10-default-ci1/pytest.xml [took 0s]
artifacts/windows-latest-3.11-default-ci1/pytest.xml [took 0s]
artifacts/windows-latest-3.12-default-ci1/pytest.xml [took 0s]
artifacts/windows-latest-3.9-default-ci1/pytest.xml [took 0s]
Raw output
concurrent.futures._base.CancelledError: finalize-d6634a86dd53665a78003094935e45c4
c = <Client: No scheduler connected>
s = <Scheduler 'tcp://127.0.0.1:44341', workers: 0, cores: 0, tasks: 0>
a = <Worker 'tcp://127.0.0.1:40267', name: 0, status: closed, stored: 0, running: 0/1, ready: 0, comm: 0, waiting: 0>
b = <Worker 'tcp://127.0.0.1:40909', name: 1, status: closed, stored: 0, running: 0/2, ready: 0, comm: 0, waiting: 0>
npartitions = 5
@pytest.mark.parametrize("npartitions", [4, 5, 10, 20])
@gen_cluster(client=True)
async def test_merge_with_npartitions(c, s, a, b, npartitions):
pdf = pd.DataFrame({"a": [1, 2, 3, 4] * 10, "b": 1})
left = dd.from_pandas(pdf, npartitions=10)
right = dd.from_pandas(pdf, npartitions=5)
expected = pdf.merge(pdf)
with dask.config.set({"dataframe.shuffle.method": "p2p"}):
> result = await c.compute(left.merge(right, npartitions=npartitions))
distributed/shuffle/tests/test_merge.py:408:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
self = <Future: cancelled, key: finalize-d6634a86dd53665a78003094935e45c4>
raiseit = True
async def _result(self, raiseit=True):
await self._state.wait()
if self.status == "error":
exc = clean_exception(self._state.exception, self._state.traceback)
if raiseit:
typ, exc, tb = exc
raise exc.with_traceback(tb)
else:
return exc
elif self.status == "cancelled":
exception = CancelledError(self.key)
if raiseit:
> raise exception
E concurrent.futures._base.CancelledError: finalize-d6634a86dd53665a78003094935e45c4
distributed/client.py:342: CancelledError
Check warning on line 0 in distributed.shuffle.tests.test_merge
github-actions / Unit Test Results
All 13 runs failed: test_merge_with_npartitions[10] (distributed.shuffle.tests.test_merge)
artifacts/ubuntu-latest-3.10-default-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-3.11-default-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-3.12-default-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-3.9-default-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-3.9-no_expr-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-3.9-no_queue-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-mindeps-default-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-mindeps-numpy-ci1/pytest.xml [took 0s]
artifacts/ubuntu-latest-mindeps-pandas-ci1/pytest.xml [took 0s]
artifacts/windows-latest-3.10-default-ci1/pytest.xml [took 0s]
artifacts/windows-latest-3.11-default-ci1/pytest.xml [took 0s]
artifacts/windows-latest-3.12-default-ci1/pytest.xml [took 0s]
artifacts/windows-latest-3.9-default-ci1/pytest.xml [took 0s]
Raw output
concurrent.futures._base.CancelledError: finalize-9b5d2873c7675e6fbccafd7cf2117ac1
c = <Client: No scheduler connected>
s = <Scheduler 'tcp://127.0.0.1:44311', workers: 0, cores: 0, tasks: 0>
a = <Worker 'tcp://127.0.0.1:41895', name: 0, status: closed, stored: 0, running: 0/1, ready: 0, comm: 0, waiting: 0>
b = <Worker 'tcp://127.0.0.1:37915', name: 1, status: closed, stored: 0, running: 0/2, ready: 0, comm: 0, waiting: 0>
npartitions = 10
@pytest.mark.parametrize("npartitions", [4, 5, 10, 20])
@gen_cluster(client=True)
async def test_merge_with_npartitions(c, s, a, b, npartitions):
pdf = pd.DataFrame({"a": [1, 2, 3, 4] * 10, "b": 1})
left = dd.from_pandas(pdf, npartitions=10)
right = dd.from_pandas(pdf, npartitions=5)
expected = pdf.merge(pdf)
with dask.config.set({"dataframe.shuffle.method": "p2p"}):
> result = await c.compute(left.merge(right, npartitions=npartitions))
distributed/shuffle/tests/test_merge.py:408:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
self = <Future: cancelled, key: finalize-9b5d2873c7675e6fbccafd7cf2117ac1>
raiseit = True
async def _result(self, raiseit=True):
await self._state.wait()
if self.status == "error":
exc = clean_exception(self._state.exception, self._state.traceback)
if raiseit:
typ, exc, tb = exc
raise exc.with_traceback(tb)
else:
return exc
elif self.status == "cancelled":
exception = CancelledError(self.key)
if raiseit:
> raise exception
E concurrent.futures._base.CancelledError: finalize-9b5d2873c7675e6fbccafd7cf2117ac1
distributed/client.py:342: CancelledError