Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Make combine_similar less greedy for merge #334

Merged
merged 17 commits into from
Oct 16, 2023
Merged

Conversation

phofl
Copy link
Collaborator

@phofl phofl commented Oct 13, 2023

This makes combine_similar less greedy, we are currently removing projections on both sides and see if we are equal, we should do one after another to avoid pushing projections up unnecessarily. Shuffling is expensive, so this should help quite a bit.

This sits on top of #333, the previous structure made this change even harder that it was anyway

@@ -206,3 +207,26 @@ def test_merge_combine_similar(npartitions_left, npartitions_right):
expected["new"] = expected.b + expected.c
expected = expected.groupby(["a", "e", "x"]).new.sum()
assert_eq(query, expected)


def test_merge_combine_similar_intermediate_projections():
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is there a reproducer for the behavior being targeted by this PR? This test seems to pass on main for me.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah the bug was blocked by the additional layer in #333 but shows up now

@phofl
Copy link
Collaborator Author

phofl commented Oct 16, 2023

Previous tree with the reproducer:

Assign: key='new'
  Projection: columns=['b', 'x', 'y']
    Projection: columns=['b', 'x', 'd', 'y']
      HashJoinP2P: left_on='b' right_on='d' left_index=False right_index=False shuffle_left_on='b' shuffle_right_on='d'
        HashJoinP2P: left_on=['a'] right_on=['a'] left_index=False right_index=False shuffle_left_on=['a'] shuffle_right_on=['a']
          FromPandas: frame='<pandas>' npartitions=2 columns=['a', 'b']
          FromPandas: frame='<pandas>' npartitions=3
        FromPandas: frame='<pandas>' npartitions=3 columns=['d', 'y']
  Add:
    Projection: columns='b'
      Projection: columns=['b', 'a', 'd']
        HashJoinP2P: left_on='b' right_on='d' left_index=False right_index=False shuffle_left_on='b' shuffle_right_on='d'
          HashJoinP2P: left_on=['a'] right_on=['a'] left_index=False right_index=False shuffle_left_on=['a'] shuffle_right_on=['a']
            FromPandas: frame='<pandas>' npartitions=2 columns=['a', 'b']
            FromPandas: frame='<pandas>' npartitions=3
          FromPandas: frame='<pandas>' npartitions=3 columns=['d', 'y']
    Projection: columns='x'
      Projection: columns=['b', 'x', 'd']
        HashJoinP2P: left_on='b' right_on='d' left_index=False right_index=False shuffle_left_on='b' shuffle_right_on='d'
          HashJoinP2P: left_on=['a'] right_on=['a'] left_index=False right_index=False shuffle_left_on=['a'] shuffle_right_on=['a']
            FromPandas: frame='<pandas>' npartitions=2 columns=['a', 'b']
            FromPandas: frame='<pandas>' npartitions=3
          FromPandas: frame='<pandas>' npartitions=3 columns=['d', 'y']

We have the 2 Join layers right next to each other without dropping unnecessary columns

The new tree:

Assign: key='new'
  Projection: columns=['b', 'x', 'y']
    HashJoinP2P: left_on='b' right_on='d' left_index=False right_index=False shuffle_left_on='b' shuffle_right_on='d'
      Projection: columns=['b', 'x']
        HashJoinP2P: left_on=['a'] right_on=['a'] left_index=False right_index=False shuffle_left_on=['a'] shuffle_right_on=['a']
          FromPandas: frame='<pandas>' npartitions=2 columns=['a', 'b']
          FromPandas: frame='<pandas>' npartitions=3
      FromPandas: frame='<pandas>' npartitions=3 columns=['d', 'y']
  Add:
    Projection: columns='b'
      HashJoinP2P: left_on='b' right_on='d' left_index=False right_index=False shuffle_left_on='b' shuffle_right_on='d'
        Projection: columns=['b']
          HashJoinP2P: left_on=['a'] right_on=['a'] left_index=False right_index=False shuffle_left_on=['a'] shuffle_right_on=['a']
            FromPandas: frame='<pandas>' npartitions=2 columns=['a', 'b']
            Projection: columns=['a']
              FromPandas: frame='<pandas>' npartitions=3
        Projection: columns=['d']
          FromPandas: frame='<pandas>' npartitions=3 columns=['d', 'y']
    Projection: columns='x'
      HashJoinP2P: left_on='b' right_on='d' left_index=False right_index=False shuffle_left_on='b' shuffle_right_on='d'
        Projection: columns=['b', 'x']
          HashJoinP2P: left_on=['a'] right_on=['a'] left_index=False right_index=False shuffle_left_on=['a'] shuffle_right_on=['a']
            FromPandas: frame='<pandas>' npartitions=2 columns=['a', 'b']
            FromPandas: frame='<pandas>' npartitions=3
        Projection: columns=['d']
          FromPandas: frame='<pandas>' npartitions=3 columns=['d', 'y']

We have an intermediate Projection that drops unnecessary columns right away

Copy link
Member

@rjzamora rjzamora left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This logic is pretty hard to follow (mostly because it was already complicated), but I don't have any better ideas, and I don't think it is worth the effort to spend much time attempting a simplification until we have addressed #336

@@ -23,6 +23,7 @@
from dask_expr._util import _convert_to_list

_HASH_COLUMN_NAME = "__hash_partition"
_PARTITION_COLUMNS = "_partitions"
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit: Rename to _PARTITION_COLUMN for clarity.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh good point, thx

dask_expr/_expr.py Show resolved Hide resolved
@phofl phofl merged commit 8024327 into dask:main Oct 16, 2023
4 checks passed
@phofl phofl deleted the combine_similar branch October 16, 2023 18:52
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants