[BUG] Dask_cudf merge function returns too few rows #11189

Open
@ChrisJar

Description

Describe the bug
The dask_cudf merge function returns too few rows when the dtypes of the column being merged on are mismatched (e.g., int64 on the left and int32 on the right) and npartitions > 1.

Steps/Code to reproduce bug
Here's a reproducer showing that, when the dtypes are mismatched, the number of rows returned depends on the number of partitions in the dataframes being merged:

import cupy as cp
import cudf
import dask_cudf

dfa = cudf.DataFrame({"a":cp.random.randint(0,100,100000), "b":cp.random.normal(size=100000)})
dfb = cudf.DataFrame({"a":cp.random.randint(0,100,100000), "c":cp.random.normal(size=100000)})

dfa["a"] = dfa["a"].astype("int32")
dfb["a"] = dfb["a"].astype("int64")

ddfa = dask_cudf.from_cudf(dfa, npartitions=4)
ddfb = dask_cudf.from_cudf(dfb, npartitions=4)
print("npartitions:")
print("left: {}".format(ddfa.npartitions))
print("right: {}".format(ddfb.npartitions))

print("Number of rows in merge result:")
print(len(ddfa.merge(ddfb, how="inner", on="a")))
print("*"*30)

ddfa = ddfa.repartition(npartitions=3)
ddfb = ddfb.repartition(npartitions=3)
print("npartitions:")
print("left: {}".format(ddfa.npartitions))
print("right: {}".format(ddfb.npartitions))

print("Number of rows in merge result:")
print(len(ddfa.merge(ddfb, how="inner", on="a")))
print("*"*30)

ddfa = ddfa.repartition(npartitions=2)
ddfb = ddfb.repartition(npartitions=2)
print("npartitions:")
print("left: {}".format(ddfa.npartitions))
print("right: {}".format(ddfb.npartitions))

print("Number of rows in merge result:")
print(len(ddfa.merge(ddfb, how="inner", on="a")))
print("*"*30)

ddfa = ddfa.repartition(npartitions=1)
ddfb = ddfb.repartition(npartitions=1)
print("npartitions:")
print("left: {}".format(ddfa.npartitions))
print("right: {}".format(ddfb.npartitions))

print("Number of rows in merge result:")
print(len(ddfa.merge(ddfb, how="inner", on="a")))
print("*"*30)

This returns:

npartitions:
left: 4
right: 4
Number of rows in merge result:
16083716
******************************
npartitions:
left: 3
right: 3
Number of rows in merge result:
35342145
******************************
npartitions:
left: 2
right: 2
Number of rows in merge result:
46959990
******************************
npartitions:
left: 1
right: 1
Number of rows in merge result:
100006687
******************************
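
The pattern in this output (fewer rows as npartitions grows, the full result at npartitions=1) would be consistent with the two sides being hash-partitioned differently because of their dtypes, so that equal keys land in different partitions and never meet. That is an assumption, not something confirmed in this report. Below is a pure-Python toy model of that idea. It uses no cudf or dask, the function names are hypothetical, and crc32 over the packed bytes merely stands in for whatever hash the real shuffle uses:

```python
import struct
import zlib
from collections import Counter


def partition_of(value, fmt, npartitions):
    """Toy hash partitioner: hash the raw bytes of `value` packed as `fmt`.

    "<i" packs a 32-bit int, "<q" a 64-bit int, so the same value can
    hash differently depending on its width.
    """
    return zlib.crc32(struct.pack(fmt, value)) % npartitions


def partitioned_inner_join_count(left, right, left_fmt, right_fmt, npartitions):
    """Count inner-join rows when each side is partitioned on its own and
    only partitions with the same index are joined with each other."""
    left_parts = [Counter() for _ in range(npartitions)]
    right_parts = [Counter() for _ in range(npartitions)]
    for v in left:
        left_parts[partition_of(v, left_fmt, npartitions)][v] += 1
    for v in right:
        right_parts[partition_of(v, right_fmt, npartitions)][v] += 1
    # Within a partition, a key present c1 times on the left and c2 times
    # on the right contributes c1 * c2 rows to the join.
    return sum(
        lp[k] * rp[k]
        for lp, rp in zip(left_parts, right_parts)
        for k in lp
    )


# 100 distinct keys, each appearing 10 times per side.
left = [i % 100 for i in range(1000)]
right = [i % 100 for i in range(1000)]

for n in (4, 3, 2, 1):
    mismatched = partitioned_inner_join_count(left, right, "<i", "<q", n)
    matched = partitioned_inner_join_count(left, right, "<q", "<q", n)
    print(n, "mismatched:", mismatched, "matched:", matched)
```

In this model the matched-width count is the same for every partition count, while the mismatched-width count can only be equal or smaller, and reaches the full count when there is a single partition.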

Expected behavior
If we perform the same operation with just cudf we can see the expected result:

import cupy as cp
import cudf

dfa = cudf.DataFrame({"a":cp.random.randint(0,100,100000), "b":cp.random.normal(size=100000)})
dfb = cudf.DataFrame({"a":cp.random.randint(0,100,100000), "c":cp.random.normal(size=100000)})

dfa["a"] = dfa["a"].astype("int32")
dfb["a"] = dfb["a"].astype("int64")

print(len(dfa.merge(dfb, how="inner", on="a")))

which returns:

100006687

which is the same as the dask_cudf version when npartitions=1

Environment overview (please complete the following information)

  • Environment location: Bare-metal
  • Method of cuDF install: conda

Environment details

cudf                      22.08.00a220629 cuda_11_py38_gff63c0a745_173    rapidsai-nightly
dask-cudf                 22.08.00a220629 cuda_11_py38_gff63c0a745_173    rapidsai-nightly
libcudf                   22.08.00a220629 cuda11_gff63c0a745_173    rapidsai-nightly
dask                      2022.6.1           pyhd8ed1ab_0    conda-forge
dask-core                 2022.6.1           pyhd8ed1ab_0    conda-forge
dask-cuda                 22.08.00a220630         py38_21    rapidsai-nightly
distributed               2022.6.1           pyhd8ed1ab_0    conda-forge

Note
The same thing occurs when how is "left", "right", or "outer".
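
Since the problem only shows up when the key dtypes differ, a possible workaround until this is fixed is to cast the key column on both sides to a common dtype before merging. The helper below is a minimal sketch: `align_key_dtype` is a hypothetical name, not part of cudf or dask_cudf, and it has not been verified against the versions listed above. It relies only on the `.dtype` attribute and on `astype` with a dict, which pandas-style dataframes provide:

```python
import numpy as np


def align_key_dtype(left, right, on):
    """Cast column `on` in both frames to their common NumPy dtype.

    Hypothetical helper; assumes both frames expose a pandas-like
    `frame[on].dtype` and `frame.astype({column: dtype})`.
    """
    common = np.result_type(left[on].dtype, right[on].dtype)
    return left.astype({on: common}), right.astype({on: common})


# Intended use with the frames from the reproducer (not run here):
# ddfa, ddfb = align_key_dtype(ddfa, ddfb, "a")
# print(len(ddfa.merge(ddfb, how="inner", on="a")))
```

For int32 and int64 keys, `np.result_type` picks int64, so no key values are truncated by the cast.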

Metadata

Labels

bug (Something isn't working), dask (Dask issue)