Conversation

@majosm (Collaborator) commented Dec 9, 2022

Bug fixes for #393. Depends on #417.

Original first post (now outdated):

I think the way we were identifying communication batches with parts wasn't quite right, because it still allowed deadlocks. For example, in the examples/distributed.py case, neither the send nor the receive depends on any prior local sends/receives, so they both end up in the first comm batch. But they can't both be executed in the first part, because the sends have to be done before the receives. This PR reinterprets communication batches as existing in between consecutive parts and then assembles the parts around them.
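
To make this concrete, here is a minimal sketch of the kind of exchange examples/distributed.py sets up (paraphrased, not the exact example; the ring-exchange pattern, shapes, and tag are assumptions). Neither the send nor the receive below depends on any earlier local communication, so a scheme that equates comm batches with parts puts both into the first part, even though the send must be posted before the matching receive can complete:

import numpy as np
import pytato as pt
from mpi4py import MPI

comm = MPI.COMM_WORLD
rank = comm.Get_rank()
size = comm.Get_size()

# Data local to this rank
x = pt.make_placeholder("x", shape=(10,), dtype=np.float64)

# Send to one neighbor, receive from the other; neither operation depends on
# a prior local send/receive, so both land in the first comm batch.
halo = pt.staple_distributed_send(
        x, dest_rank=(rank - 1) % size, comm_tag=42,
        stapled_to=pt.make_distributed_recv(
            src_rank=(rank + 1) % size, comm_tag=42,
            shape=(10,), dtype=np.float64))

outputs = pt.make_dict_of_named_arrays({"out": x + halo})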

With this change I can almost get the distributed example working. I get an error:

  File "distributed.py", line 56, in main
    final_res = context["out"].get(queue)
                ~~~~~~~^^^^^^^
KeyError: 'out'

which I was able to fix in a hacky way in order to get it to run successfully. I don't know how to fix this the proper way though.

When I try to run Kaushik's reproducer I get a different error:

Traceback (most recent call last):
  File "pytato/pytato/transform/__init__.py", line 151, in rec
    method = getattr(self, expr._mapper_method)
             ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
AttributeError: 'CodeGenMapper' object has no attribute 'map_distributed_recv'

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "<frozen runpy>", line 198, in _run_module_as_main
  File "<frozen runpy>", line 88, in _run_code
  File "mpi4py/__main__.py", line 7, in <module>
    main()
  File "mpi4py/run.py", line 198, in main
    run_command_line(args)
  File "mpi4py/run.py", line 47, in run_command_line
    run_path(sys.argv[0], run_name='__main__')
  File "<frozen runpy>", line 291, in run_path
  File "<frozen runpy>", line 98, in _run_module_code
  File "<frozen runpy>", line 88, in _run_code
  File "hang_reproducer.py", line 67, in <module>
    prg_per_partition = pt.generate_code_for_partition(distributed_partition)
                        ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "pytato/pytato/partition.py", line 407, in generate_code_for_partition
    part_id_to_prg[part.pid] = generate_loopy(d)
                               ^^^^^^^^^^^^^^^^^
  File "pytato/pytato/target/loopy/codegen.py", line 983, in generate_loopy
    insn_id = add_store(name, expr, cg_mapper(expr, state), state, cg_mapper)
                                    ^^^^^^^^^^^^^^^^^^^^^^
  File "pytato/pytato/transform/__init__.py", line 170, in __call__
    return self.rec(expr, *args, **kwargs)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "pytato/pytato/transform/__init__.py", line 161, in rec
    return self.handle_unsupported_array(expr, *args, **kwargs)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "pytato/pytato/transform/__init__.py", line 136, in handle_unsupported_array
    raise UnsupportedArrayError("%s cannot handle expressions of type %s"
pytato.transform.UnsupportedArrayError: CodeGenMapper cannot handle expressions of type <class 'pytato.distributed.nodes.DistributedRecv'>

and if I run the tests I get yet a third error:

Traceback (most recent call last):
  File "<frozen runpy>", line 198, in _run_module_as_main
  File "<frozen runpy>", line 88, in _run_code
  File "mpi4py/__main__.py", line 7, in <module>
    main()
  File "mpi4py/run.py", line 198, in main
    run_command_line(args)
  File "mpi4py/run.py", line 47, in run_command_line
    run_path(sys.argv[0], run_name='__main__')
  File "<frozen runpy>", line 291, in run_path
  File "<frozen runpy>", line 98, in _run_module_code
  File "<frozen runpy>", line 88, in _run_code
  File "pytato/test/test_distributed.py", line 437, in <module>
    run_test_with_mpi_inner()
  File "pytato/test/test_distributed.py", line 59, in run_test_with_mpi_inner
    f(cl.create_some_context, *args)
  File "pytato/test/test_distributed.py", line 169, in _do_test_distributed_execution_random_dag
    pt.verify_distributed_partition(comm, distributed_partition)
  File "pytato/pytato/distributed/verify.py", line 253, in verify_distributed_partition
    defining_pid = output_to_defining_pid[input_name]
                   ~~~~~~~~~~~~~~~~~~~~~~^^^^^^^^^^^^
KeyError: _DistributedName(rank=0, name='_pt_recv_id_0')

Any idea how to fix these?

@inducer (Owner) commented Dec 9, 2022

Just looked at the code some more. Like I said in the meeting, I agree with the analysis of what the problem is, but I'm not particularly fond of all the (fairly implicit) logic that flows from "sends live here, and receives live one floor down". I think this would look cleaner (and be simpler to reason about) if we made proper, separate nodes for sends and receives before entering into the topological sort. This could be as simple as:

@dataclass(frozen=True)
class _SendOperationIdentifier(_CommunicationOperationIdentifier):
    pass

@dataclass(frozen=True)
class _RecvOperationIdentifier(_CommunicationOperationIdentifier):
    pass

(Of course make sure that these don't end up comparing equal to one another.)
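
For instance (with a made-up base class, since the real _CommunicationOperationIdentifier's fields aren't shown here), the dataclass-generated __eq__ only compares instances of exactly the same class, so the two identifier types never compare equal:

from dataclasses import dataclass


@dataclass(frozen=True)
class _CommunicationOperationIdentifier:
    # toy stand-in for the real base class; these fields are assumptions
    part_id: int
    comm_tag: int


@dataclass(frozen=True)
class _SendOperationIdentifier(_CommunicationOperationIdentifier):
    pass


@dataclass(frozen=True)
class _RecvOperationIdentifier(_CommunicationOperationIdentifier):
    pass


# Same field values, different classes: the generated __eq__ returns
# NotImplemented across classes, so these do not compare equal.
assert _SendOperationIdentifier(0, 42) != _RecvOperationIdentifier(0, 42)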

@inducer (Owner) commented Dec 9, 2022

I'd attribute KeyError: 'out' to the naming of things within the namespace of distributed execution. I can see two possible solutions. First, since the output names are user-prescribed, make sure that those names are retained internally, i.e. the "actual" outputs of the DAG get "first dibs" on their (user-specified) names. The other option would be to maintain a mapping between internal and external names. The first option is likely simpler, because it saves one layer of indirection.
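
A hypothetical sketch of the "first dibs" idea (illustrative only, not the actual partitioner code), using pytools' UniqueNameGenerator, which the codegen already uses for variable names per the traceback above: reserve the user-prescribed output names before generating any internal names, so internal names can never claim "out".

from pytools import UniqueNameGenerator

user_output_names = ["out"]          # user-prescribed DAG output names

vng = UniqueNameGenerator()
vng.add_names(user_output_names)     # user names get first dibs

# Internally generated names are now guaranteed not to collide with "out".
internal_name = vng("out")           # yields something like "out_0", not plain "out"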

@inducer (Owner) commented Dec 9, 2022

pytato.transform.UnsupportedArrayError: CodeGenMapper cannot handle expressions of type <class 'pytato.distributed.nodes.DistributedRecv'>

means that a receive node "stuck around" in the DAG until code generation, when in fact the _DistributedInputReplacer should have gotten rid of it. Right now, that only seems to happen "by accident" if the receive is also an output:

name = self.roo_ary_to_name.get(expr)
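
For illustration, roughly what a receive-eliminating mapper looks like (a rough sketch only; the class name, constructor, and recv_to_name mapping are assumptions, not the actual _DistributedInputReplacer):

import pytato as pt
from pytato.transform import CopyMapper
from pytato.distributed.nodes import DistributedRecv


class _RecvToPlaceholder(CopyMapper):
    """Replace every DistributedRecv with a Placeholder so that code generation
    for a part never encounters a receive node."""

    def __init__(self, recv_to_name):
        super().__init__()
        # mapping: DistributedRecv -> name under which the received buffer
        # is passed into the part at execution time
        self.recv_to_name = recv_to_name

    def map_distributed_recv(self, expr: DistributedRecv):
        return pt.make_placeholder(
                self.recv_to_name[expr], expr.shape, expr.dtype)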

@inducer (Owner) commented Dec 9, 2022

KeyError: _DistributedName(rank=0, name='_pt_recv_id_0') points to us emitting a placeholder for a name that's not in fact a part output.

@majosm (Collaborator, Author) commented Dec 9, 2022

Added some fixes. The distributed example is working now, and Kaushik's reproducer almost works. Specifically, it works if I add a node after the final receive; there's still some issue for parts in which a receive is both an input and an output.

(I haven't looked at the test error yet, or the _Send/RecvOperationIdentifier stuff.)

@inducer force-pushed the dist-mem-part-2000 branch 3 times, most recently from 537486f to 7e452ae on January 9, 2023 at 20:28
@inducer force-pushed the dist-mem-part-2000-fixes branch 2 times, most recently from aefb354 to e0dba7c on January 9, 2023 at 21:59
@majosm force-pushed the dist-mem-part-2000-fixes branch from 61518e4 to a43973b on January 9, 2023 at 23:02
@inducer (Owner) commented Jan 10, 2023

Thanks for pushing that fix! (LGTM!) I'll take over these changes and squash them into #393. Once that's done, I'll get #393 ready for merge.

@majosm (Collaborator, Author) commented Jan 10, 2023

@inducer FYI, there's (at least) one more lingering issue. When I run Kaushik's hang reproducer, I see the following errors:

Traceback (most recent call last):
  File "<frozen runpy>", line 198, in _run_module_as_main
  File "<frozen runpy>", line 88, in _run_code
  File "mpi4py/__main__.py", line 7, in <module>
    main()
  File "mpi4py/run.py", line 198, in main
    run_command_line(args)
  File "mpi4py/run.py", line 47, in run_command_line
    run_path(sys.argv[0], run_name='__main__')
  File "<frozen runpy>", line 291, in run_path
  File "<frozen runpy>", line 98, in _run_module_code
  File "<frozen runpy>", line 88, in _run_code
  File "hang_reproducer.py", line 44, in <module>
    pt.verify_distributed_partition(comm, distributed_parts)
  File "pytato/distributed/verify.py", line 208, in verify_distributed_partition
    assert recv_name not in name_to_computing_pid
AssertionError

and

Traceback (most recent call last):
  File "<frozen runpy>", line 198, in _run_module_as_main
  File "<frozen runpy>", line 88, in _run_code
  File "mpi4py/__main__.py", line 7, in <module>
    main()
  File "mpi4py/run.py", line 198, in main
    run_command_line(args)
  File "mpi4py/run.py", line 47, in run_command_line
    run_path(sys.argv[0], run_name='__main__')
  File "<frozen runpy>", line 291, in run_path
  File "<frozen runpy>", line 98, in _run_module_code
  File "<frozen runpy>", line 88, in _run_code
  File "hang_reproducer.py", line 45, in <module>
    prg_per_partition = pt.generate_code_for_partition(distributed_parts)
                        ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "pytato/partition.py", line 407, in generate_code_for_partition
    part_id_to_prg[part.pid] = generate_loopy(d)
                               ^^^^^^^^^^^^^^^^^
  File "pytato/target/loopy/codegen.py", line 975, in generate_loopy
    state.var_name_gen.add_names(outputs)
  File "pytools/__init__.py", line 2319, in add_names
    self.add_name(name, conflicting_ok=conflicting_ok)
  File "pytools/__init__.py", line 2302, in add_name
    raise ValueError(f"name '{name}' conflicts with existing names")
ValueError: name '_pt_recv_id' conflicts with existing names

(I see both of them at once; I guess one rank is emitting one and the other rank is emitting the other?)

Both errors go away if I make the following change:

diff --git a/hang_reproducer_orig.py b/hang_reproducer_almost_orig.py
index 7d653ef..30e1446 100644
--- a/hang_reproducer_orig.py
+++ b/hang_reproducer_almost_orig.py
@@ -34,12 +34,12 @@ elif rank == 1:
     recv = pt.make_distributed_recv(
                 src_rank=recv_rank, comm_tag=43,
                 shape=(10,), dtype=np.float64)
-    out = pt.make_dict_of_named_arrays({"out1": send, "out2": recv})
+    out = pt.make_dict_of_named_arrays({"out1": send, "out2": 2*recv})
 else:
     raise AssertionError()

So I think it's something to do with the receive being both an input and an output.

@inducer (Owner) commented Jan 10, 2023

Good to know! I'll add that as a to-do to #393.

@inducer force-pushed the dist-mem-part-2000 branch 3 times, most recently from effd604 to 692e681 on January 12, 2023 at 01:58
@inducer force-pushed the dist-mem-part-2000 branch 2 times, most recently from c8db611 to f8dacd3 on January 13, 2023 at 15:03
@majosm (Collaborator, Author) commented Jan 13, 2023

@inducer, @matthiasdiener To get the new partitioning working on the prediction case, I had to change the code to avoid making some assumptions about the incoming arrays. Specifically:

  1. The arrays in outputs might not be unique. For example, in the Y2 case all of the species diffusivity arrays are the same. 95a5ea3
  2. The same array might be sent more than once. I didn't look into exactly where this was happening, but I think it's either a single array being sent to two different ranks, or to the same rank with two different tags (see the sketch after this list). Before the change, the post-partition verification reported missing sends. ae48471
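
A tiny illustration of item 2 (hypothetical data, not the actual partitioner code): if per-send bookkeeping is keyed by the sent array alone, sending the same array twice collapses into a single entry and one send goes missing; keying by (destination rank, tag) keeps both.

# One array sent to two destinations with the same tag.
sends = [("x", 1, 42), ("x", 2, 42)]        # (sent data, dest_rank, comm_tag)

by_array = {data: (dest, tag) for data, dest, tag in sends}
assert len(by_array) == 1                   # second send silently overwrites the first

by_dest_and_tag = {(dest, tag): data for data, dest, tag in sends}
assert len(by_dest_and_tag) == 2            # both sends retained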

@inducer I haven't incorporated your changes for the hang reproducer issue into this branch yet. Interestingly, with the above changes I no longer get the error; I'm not sure whether it's actually fixed or whether the non-determinism issue is still lurking somewhere.

Edit: Oh, and I also had to disable the check for partition disjointness (I was getting errors here, both with and without the other changes).

@majosm force-pushed the dist-mem-part-2000-fixes branch from b9eb8ad to da52e71 on January 18, 2023 at 16:58
@kaushikcfd (Collaborator) commented:

Looks like it was actually kaushikcfd@bfe1bc8 that was causing the majority of the performance improvement (kaushikcfd@5723775 seemed to make a small difference at most; maybe a minute or so).

The earlier commit ensures that FusionContractorArraycontext does not fall back to SingleWorkGroupPytatoArraycontext for a reasonably wide class of array expressions, and hence improves the kernel execution time by a significant factor. The latter commit is a minor optimization that amortizes some operations in the transformation pipeline and has no effect on the quality of the generated code.

@majosm (Collaborator, Author) commented Jan 19, 2023

Looks like it was actually kaushikcfd@bfe1bc8 that was causing the majority of the performance improvement (kaushikcfd@5723775 seemed to make a small difference at most; maybe a minute or so).

The earlier commit ensures that FusionContractorArraycontext does not fall back to SingleWorkGroupPytatoArraycontext for a reasonably wide class of array expressions, and hence improves the kernel execution time by a significant factor. The latter commit is a minor optimization that amortizes some operations in the transformation pipeline and has no effect on the quality of the generated code.

Sorry, I should have specified that I was looking at the compile-time performance improvement. 🙂 It was taking significantly longer without that pymbolic expressions commit.

@kaushikcfd (Collaborator) commented:

Sorry, I should have specified that I was looking at the compile-time performance improvement. 🙂 It was taking significantly longer without that pymbolic expressions commit.

Aah yes. SingleWorkGroupArrayContext takes a very long time to compile as well. That's one of the limitations of loopy's implementation (fortunately not a limitation of the IR itself).

@majosm force-pushed the dist-mem-part-2000-fixes branch from 607dd13 to acd764a on February 6, 2023 at 18:59
@majosm force-pushed the dist-mem-part-2000-fixes branch from acd764a to 42982b9 on February 6, 2023 at 19:09

# {{{ crude ordered set

class _OrderedSet(collections.abc.MutableSet[Any]):
A reviewer (Collaborator) commented on the code above:

Maybe:

Suggested change
class _OrderedSet(collections.abc.MutableSet[Any]):
class _OrderedSet(collections.abc.MutableSet[Hashable]):

@majosm (Collaborator, Author) replied:

I ran into some mypy errors when attempting this, and after struggling with it for a while I ultimately ended up with 7c3f47d.
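
For context, here is a minimal dict-backed sketch of the kind of ordered set being discussed (illustrative only; the actual _OrderedSet in this PR, and the typing fix that landed in 7c3f47d, may differ):

import collections.abc
from typing import Any, Dict, Iterator


class _OrderedSetSketch(collections.abc.MutableSet):
    """Insertion-ordered set: dict keys preserve order and enforce uniqueness."""

    def __init__(self, items=()):
        self._data: Dict[Any, None] = dict.fromkeys(items)

    def __contains__(self, item: Any) -> bool:
        return item in self._data

    def __iter__(self) -> Iterator[Any]:
        return iter(self._data)

    def __len__(self) -> int:
        return len(self._data)

    def add(self, item: Any) -> None:
        self._data[item] = None

    def discard(self, item: Any) -> None:
        self._data.pop(item, None)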

@majosm (Collaborator, Author) commented Feb 20, 2023

@inducer I think this is ready for review now. Note: the diff currently contains a couple of Kaushik's commits (see above), and the changes from #417 (also ready for review).

@inducer (Owner) commented May 4, 2023

Superseded by #434.

@inducer closed this May 4, 2023