Describe the bug
The dask_cudf merge function returns too few rows when the dtype of the column being merged on is mismatched (e.g. int64 on the left and int32 on the right) and npartitions > 1.
Steps/Code to reproduce bug
Here's a reproducer showing that, when the dtype is mismatched, the number of rows returned depends on the number of partitions in the dataframes being merged:
import cupy as cp
import cudf
import dask_cudf
dfa = cudf.DataFrame({"a":cp.random.randint(0,100,100000), "b":cp.random.normal(size=100000)})
dfb = cudf.DataFrame({"a":cp.random.randint(0,100,100000), "c":cp.random.normal(size=100000)})
dfa["a"] = dfa["a"].astype("int32")
dfb["a"] = dfb["a"].astype("int64")
ddfa = dask_cudf.from_cudf(dfa, npartitions=4)
ddfb = dask_cudf.from_cudf(dfb, npartitions=4)
print("npartitions:")
print("left: {}".format(ddfa.npartitions))
print("right: {}".format(ddfb.npartitions))
print("Number of rows in merge result:")
print(len(ddfa.merge(ddfb, how="inner", on="a")))
print("*"*30)
ddfa = ddfa.repartition(npartitions=3)
ddfb = ddfb.repartition(npartitions=3)
print("npartitions:")
print("left: {}".format(ddfa.npartitions))
print("right: {}".format(ddfb.npartitions))
print("Number of rows in merge result:")
print(len(ddfa.merge(ddfb, how="inner", on="a")))
print("*"*30)
ddfa = ddfa.repartition(npartitions=2)
ddfb = ddfb.repartition(npartitions=2)
print("npartitions:")
print("left: {}".format(ddfa.npartitions))
print("right: {}".format(ddfb.npartitions))
print("Number of rows in merge result:")
print(len(ddfa.merge(ddfb, how="inner", on="a")))
print("*"*30)
ddfa = ddfa.repartition(npartitions=1)
ddfb = ddfb.repartition(npartitions=1)
print("npartitions:")
print("left: {}".format(ddfa.npartitions))
print("right: {}".format(ddfb.npartitions))
print("Number of rows in merge result:")
print(len(ddfa.merge(ddfb, how="inner", on="a")))
print("*"*30)
This returns:
npartitions:
left: 4
right: 4
Number of rows in merge result:
16083716
******************************
npartitions:
left: 3
right: 3
Number of rows in merge result:
35342145
******************************
npartitions:
left: 2
right: 2
Number of rows in merge result:
46959990
******************************
npartitions:
left: 1
right: 1
Number of rows in merge result:
100006687
******************************
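One plausible explanation, which this report does not confirm, is that each side is hash-partitioned on the raw bytes of its key column, so the same value stored as int32 and as int64 can land in different partitions and never meet. The toy model below is a sketch of that idea only: it uses `zlib.crc32` over `struct`-packed keys as a stand-in hash, and `partition_of` and `partitioned_join_count` are hypothetical names, not dask_cudf internals.

```python
import struct
import zlib
from collections import Counter


def partition_of(key, fmt, npartitions):
    """Assign a key to a partition by hashing its raw bytes.

    fmt is a struct format: "<i" packs the key as int32, "<q" as int64.
    """
    return zlib.crc32(struct.pack(fmt, key)) % npartitions


def partitioned_join_count(left, right, left_fmt, right_fmt, npartitions):
    """Count inner-join rows when each side is shuffled independently
    and only partitions with the same index are joined to each other."""
    left_parts = [Counter() for _ in range(npartitions)]
    right_parts = [Counter() for _ in range(npartitions)]
    for k in left:
        left_parts[partition_of(k, left_fmt, npartitions)][k] += 1
    for k in right:
        right_parts[partition_of(k, right_fmt, npartitions)][k] += 1
    # Within a partition pair, each key contributes left_count * right_count rows.
    return sum(
        lp[k] * rp[k]
        for lp, rp in zip(left_parts, right_parts)
        for k in lp
    )


# 100 distinct keys, each appearing 10 times on both sides.
left = [i % 100 for i in range(1000)]
right = [(i * 7) % 100 for i in range(1000)]
full = partitioned_join_count(left, right, "<q", "<q", 1)

for n in (1, 2, 3, 4):
    same = partitioned_join_count(left, right, "<q", "<q", n)
    mixed = partitioned_join_count(left, right, "<i", "<q", n)
    print(f"npartitions={n}: same dtype={same}, mixed dtype={mixed}, full={full}")
```

In this model the matching-dtype count is independent of the partition count, while the mixed-dtype count can only equal the full count when a key happens to hash to the same partition on both sides. The reported counts for 2 and 3 partitions are roughly a half and a third of the single-partition result, which is at least consistent with this picture.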
Expected behavior
If we perform the same operation with just cudf we can see the expected result:
import cupy as cp
import cudf
dfa = cudf.DataFrame({"a":cp.random.randint(0,100,100000), "b":cp.random.normal(size=100000)})
dfb = cudf.DataFrame({"a":cp.random.randint(0,100,100000), "c":cp.random.normal(size=100000)})
dfa["a"] = dfa["a"].astype("int32")
dfb["a"] = dfb["a"].astype("int64")
print(len(dfa.merge(dfb, how="inner", on="a")))
which returns:
100006687
which is the same as the dask_cudf result when npartitions=1.
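The expected row count can also be checked without running a merge at all: an inner merge on a single key produces, for each key value, (rows with that key on the left) × (rows with that key on the right) rows. With 100 possible keys and 100000 rows per side, that is about 100 × 1000 × 1000 = 10^8 rows on average, which matches the order of magnitude above. A minimal sketch in plain Python, where `expected_inner_rows` is a hypothetical helper and the data is much smaller than in the reproducer:

```python
import random
from collections import Counter


def expected_inner_rows(left_keys, right_keys):
    """Row count of an inner merge on one key column, from key frequencies."""
    left_counts = Counter(left_keys)
    right_counts = Counter(right_keys)
    # Each key contributes (left occurrences) * (right occurrences) rows.
    return sum(n * right_counts[k] for k, n in left_counts.items())


random.seed(0)
left_keys = [random.randrange(100) for _ in range(1000)]
right_keys = [random.randrange(100) for _ in range(1000)]

# Brute-force count of matching (left, right) pairs for comparison.
brute_force = sum(1 for a in left_keys for b in right_keys if a == b)
print(expected_inner_rows(left_keys, right_keys), brute_force)
```

The same per-key counts could be obtained from `value_counts()` on the key columns of the real dataframes, giving a partition-independent number to compare each merge result against.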
Environment overview
- Environment location: Bare-metal
- Method of cuDF install: conda
Environment details
cudf 22.08.00a220629 cuda_11_py38_gff63c0a745_173 rapidsai-nightly
dask-cudf 22.08.00a220629 cuda_11_py38_gff63c0a745_173 rapidsai-nightly
libcudf 22.08.00a220629 cuda11_gff63c0a745_173 rapidsai-nightly
dask 2022.6.1 pyhd8ed1ab_0 conda-forge
dask-core 2022.6.1 pyhd8ed1ab_0 conda-forge
dask-cuda 22.08.00a220630 py38_21 rapidsai-nightly
distributed 2022.6.1 pyhd8ed1ab_0 conda-forge
Note
The same thing occurs when how is "left", "right", or "outer".
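A possible workaround until this is fixed is to cast the key column on both sides to a common dtype before merging. The sketch below is written against pandas so it can run without a GPU; `merge_with_common_key_dtype` is a hypothetical helper, and the assumption that it behaves the same way on dask_cudf dataframes (which also expose `astype` and `merge`) has not been verified against the versions listed above.

```python
import numpy as np
import pandas as pd


def merge_with_common_key_dtype(left, right, on, how="inner"):
    """Cast the key column on both sides to a shared dtype, then merge.

    Hypothetical helper, not part of cudf or dask_cudf.
    """
    # np.promote_types picks the smallest dtype that can hold both,
    # e.g. int32 and int64 promote to int64.
    common = np.promote_types(left[on].dtype, right[on].dtype)
    left = left.astype({on: common})
    right = right.astype({on: common})
    return left.merge(right, how=how, on=on)


left = pd.DataFrame(
    {"a": np.array([1, 2, 2, 3], dtype="int32"), "b": [0.1, 0.2, 0.3, 0.4]}
)
right = pd.DataFrame(
    {"a": np.array([2, 2, 3, 4], dtype="int64"), "c": [1.0, 2.0, 3.0, 4.0]}
)

result = merge_with_common_key_dtype(left, right, on="a")
print(len(result), result["a"].dtype)
```

In the reproducer, the equivalent one-off change would be to make both `astype` calls on column "a" use the same dtype before calling `dask_cudf.from_cudf`.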