Protocol for distributed data #7

Open · fschlimb opened this issue Oct 6, 2021 · 41 comments

fschlimb commented Oct 6, 2021

In order for an ndarray/dataframe system to interact with a variety of frameworks in a distributed environment (such as clusters of workstations) a stable description of the distribution characteristics is needed.

The __partitioned__ protocol accomplishes this by defining a structure which provides the necessary meta-information. Implementations show that it allows data exchange between different distributed frameworks without unnecessary data transmission or even copies.

The structure defines how the data is partitioned, where it is located and provides a function to access the local data. It does not define or provide any means of communication, messaging and/or resource management. It merely describes the current distribution state.

In a way, this is similar to the meta-data which dlpack provides for exchanging data within a single node (including the local GPU) - but it does it for data which lives on more than one process/node. It complements mechanisms for intra-node exchange, such as dlpack, __array_interface__ and the like.
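
For illustration, here is a minimal sketch of what the structure might look like for a 1D array of 200 elements split into two partitions on two nodes (the handles, IP addresses and the framework object are placeholders, not part of the spec):

__partitioned__ = {
  'shape': (200,),                  # global shape
  'partition_tiling': (2,),         # number of partitions along each dimension
  'partitions': {
      (0,): {'start': (0,),   'shape': (100,), 'data': handle0, 'location': ['10.0.0.1']},
      (1,): {'start': (100,), 'shape': (100,), 'data': handle1, 'location': ['10.0.0.2']},
  },
  'get': lambda handle: framework.get(handle),  # materialize a handle into a local array/DF
}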

The current lack of such a structure typically leads to one of the following scenarios when connecting different frameworks:

  • a data consumer implements a dedicated import functionality for every distributed data container it deems important enough. As an example, xgboost_ray implements a variety of data_sources.
    This PR lets xgboost_ray automatically work with any new container that supports __partitioned__ (the extra code for modin DF and Ray/MLDataSet is no longer needed once they support it, too)
  • the user needs to explicitly deal with the distributed nature of the data. This either leads to unnecessary data transfer and/or developers need to understand internals of the data container. In the latter case they get exposed to explicit parallelism/distribution while often the original intent of the producer was to hide exactly that.
    The implementation here avoids that by entirely hiding the distribution features while still allowing zero-copy data exchange between __partitioned__ producers (exemplified by modin (PR) and HeAT (PR)) and scikit-learn-intelex/daal4py.
  • frameworks like dask and ray wrap established APIs (such as pytorch, xgboost, etc.) and ask developers to switch to the new framework/API and to adopt their programming model.

A structure like __partitioned__ enables distributed data exchange between various frameworks which can be seamless to users while avoiding unnecessary data transfer.

The __partitioned__ protocol is an initial proposal; any feedback from this consortium will be highly appreciated. We would like to build a neutral, open governance that continues to oversee the evolution of the spec. For example, if some version of it receives a positive response from the community we would be more than happy to donate the __partitioned__ protocol to the data-api consortium or host it in a clean github org.


rgommers commented Oct 7, 2021

Thanks @fschlimb, this is quite interesting. I think there's significant interest in having a standardized distributed extension of __dlpack__ et al.

After reading the protocol description, I think I understand the data shape/partitioning, but I'm less sure about the "locality" information. All it says is:
"location': The location (or home) of the partition, given as a sequence of ip-addresses or ranks or..."
Shouldn't this be better specified for multiple libraries to be able to talk to each other?

What the data objects are is also a little tricky:
"'data': The actual data provided as ObjRef, Future, array or DF or..."
I would imagine that this should instead say (if we adopt this protocol here) that it should be any object that implements the interface of the array object in the array API standard.

For dataframes there is some overlap with the chunking concept in __dataframe__ (see https://data-apis.org/dataframe-protocol/latest/index.html), although the scope of that is much more limited.

I'd be curious to see what maintainers of libraries with distributed data structures think - @jakirkham, @devin-petersohn, @shwina any thoughts?


fschlimb commented Oct 7, 2021

Thanks @rgommers for your feedback.

After reading the protocol description, I think I understand the data shape/partitioning, but I'm less sure about the "locality" information. All it says is: "'location': The location (or home) of the partition, given as a sequence of ip-addresses or ranks or..." Shouldn't this be better specified for multiple libraries to be able to talk to each other?

Yes, that's not only a good question but also a hard issue to solve. Note that there is a paragraph below which gives more details. We started by not considering cross-library (e.g. cross ray and dask for example) exchange but we are now seeing that this will most likely be needed. Some of our prototypes work in such a heterogeneous environment. To make this officially supported we indeed require this to be more concrete.

Interchange between dask and ray could most likely be addressed by requiring ip-addresses and disallowing dask's more flexible identifiers. However, interchange with MPI is more difficult since MPI requires more than an ip-address. For going ray/dask->MPI the ip-address should be sufficient. Going the other direction (MPI->ray/dask) is trickier. Any idea?

What the data objects are is also a little tricky: "'data': The actual data provided as ObjRef, Future, array or DF or..." I would imagine that this should instead say (if we adopt this protocol here) that it should be any object that implements the interface of the array object in the array API standard.

Yes, this can use some re-wording and specification (also in the detailed paragraph). Currently, the intent of this protocol is to support both arrays and dataframes. I'd support the idea of requiring objects that either support the interface of the array object in the array API standard or the equivalent in the dataframe API standard.

For dataframes there is some overlap with the chunking concept in __dataframe__ (see https://data-apis.org/dataframe-protocol/latest/index.html), although the scope of that is much more limited.

Good point. Note, the protocol here can also describe objects that are partitioned on a single node. The name of the protocol here deliberately does not include the string 'distributed'. Maybe there is a way to bring this together?

I'd be curious to see what maintainers of libraries with distributed data structures think - @jakirkham, @devin-petersohn, @shwina any thoughts?

Yes! Just in case anyone wants to read a large part of discussion which led to the proposal: read here


kkraus14 commented Oct 7, 2021

Interchange between dask and ray could most likely be addressed by requiring ip-addresses and disallowing dask's more flexible identifiers. However, interchange with MPI is more difficult since MPI requires more than an ip-address. For going ray/dask->MPI the ip-address should be sufficient. Going the other direction (MPI->ray/dask) is trickier. Any idea?

I've been thinking about this and this feels like a huge can of worms. You'd ultimately want to handle every level in the memory hierarchy in this situation, so for CPUs you'd want something like:

  • host (IP address)
    • socket
      • numa node (may need something more granular than this)

For accelerators like GPUs you'd want to do similar but also handle the relevant stream / queue for the memory as well:

  • host (IP address)
    • device (may need something more granular than this)
      • stream / queue

The device and stream / queue specification seem to overlap quite a bit with work done for the array interchange protocol with dlpack.

Given the level of information down to a specific device stream / queue or NUMA node, you could presumably match that to the correlated MPI / SHMEM rank.

Yes, this can use some re-wording and specification (also in the detailed paragraph). Currently, the intent of this protocol is to support both arrays and dataframes. I'd support the idea of requiring objects that either support the interface of the array object in the array API standard or the equivalent in the dataframe API standard.

I think the challenge with this is that objects like dataframes are backed by multiple allocations where the dataframe could be partitioned by columns, rows, or both, and could live in different memory spaces. I think it would make more sense to have this protocol deal with memory and then have a distributed dataframe protocol built on top of it.


fschlimb commented Oct 7, 2021

Interchange between dask and ray could most likely be addressed by requiring ip-addresses and disallowing dask's more flexible identifiers. However, interchange with MPI is more difficult since MPI requires more than an ip-address. For going ray/dask->MPI the ip-address should be sufficient. Going the other direction (MPI->ray/dask) is trickier. Any idea?

I've been thinking about this and this feels like a huge can of worms. You'd ultimately want to handle every level in the memory hierarchy in this situation, so for CPUs you'd want something like:

  • host (IP address)
    • socket
      • numa node (may need something more granular than this)

For accelerators like GPUs you'd want to do similar but also handle the relevant stream / queue for the memory as well:

  • host (IP address)
    • device (may need something more granular than this)
      • stream / queue

The device and stream / queue specification seem to overlap quite a bit with work done for the array interchange protocol with dlpack.

Yes, one goal is definitely to minimize such overlap as much as possible. __partitioned__ tries to only define the "address" or "location" at which the local object is accessible. 'get' will return an object which can be handled in the usual intra-node manner. For example, it will conform to the array API and so have __dlpack__ and __dlpack_device__. With that we have everything needed to properly deal with devices, streams and such (otherwise these protocols would be broken). The protocol suggested here is meant to describe the cross-node level of the memory hierarchy only.
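
For illustration, a consumer-side sketch (assuming the consumer runs within the partition's 'location' and that 'get' returns an array-API-conforming object):

part = __partitioned__['partitions'][(0,)]
local_array = __partitioned__['get'](part['data'])        # local operation, no cross-node copy
device_type, device_id = local_array.__dlpack_device__()  # intra-node locality (CPU vs GPU)
capsule = local_array.__dlpack__()                        # continue with the usual intra-node exchange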

Given the level of information down to a specific device stream / queue or NUMA node, you could presumably match that to the correlated MPI / SHMEM rank.

Yes, this can use some re-wording and specification (also in the detailed paragraph). Currently, the intent of this protocol is to support both arrays and dataframes. I'd support the idea of requiring objects that either support the interface of the array object in the array API standard or the equivalent in the dataframe API standard.

I think the challenge with this is that objects like dataframes are backed by multiple allocations where the dataframe could be partitioned by columns, rows, or both, and could live in different memory spaces. I think it would make more sense to have this protocol deal with memory and then have a distributed dataframe protocol built on top of it.

@kkraus14 Not sure I fully understand. Is this related to the above observation that __dataframe__'s chunks represent a concept related to the partitions here? Producers of __partitioned__ are free to define partitions in alignment with their internal allocation strategy (which is in fact one of the main drivers behind this: if that is not possible we immediately lose the ability to use the data with zero-copy). How would your suggestion handle distributed arrays?


kkraus14 commented Oct 7, 2021

Yes, one goal is definitely to minimize such overlap as much as possible. __partitioned__ tries to only define the "address" or "location" at which the local object is accessible. 'get' will return an object which can be handled in the usual intra-node manner. For example, it will conform to the array API and so have __dlpack__ and __dlpack_device__. With that we have everything needed to properly deal with devices, streams and such (otherwise these protocols would be broken). The protocol suggested here is meant to describe the cross-node level of the memory hierarchy only.

I disagree. If I run something with a single process per socket or NUMA node or GPU then I need to know which chunk belongs to which process and I need more information than just an IP address in order to accomplish that. E.g. imagine I have a box with 4 sockets and I run something with 4 MPI ranks (each one pinned to a socket and its local memory) and within each rank I spin up a dask worker. All 4 dask workers and MPI ranks would have the same IP address, but I'd still want to align the workers and ranks.

@kkraus14 Not sure I fully understand. Is this related to the above observation that __dataframe__'s chunks represent a concept related to the partitions here? Producers of __partitioned__ are free to define partitions in alignment with their internal allocation strategy (which is in fact one of the main drivers behind this: if that is not possible we immediately lose the ability to use the data with zero-copy). How would your suggestion handle distributed arrays?

I think the problem with this is: what protocols are / aren't supported via __partitioned__? If someone creates a __graph__ protocol or a __some_arbitrary_thing__ protocol, are objects that support those protocols allowed for the data member? I'd argue that by limiting the __partitioned__ protocol to only a memory description, it more effectively allows other protocols like distributed arrays, distributed dataframes, distributed graphs, etc. to be defined on top of it.


fschlimb commented Oct 7, 2021

I disagree. If I run something with a single process per socket or NUMA node or GPU then I need to know which chunk belongs to which process and I need more information than just an IP address in order to accomplish that. E.g. imagine I have a box with 4 sockets and I run something with 4 MPI ranks (each one pinned to a socket and its local memory) and within each rank I spin up a dask worker. All 4 dask workers and MPI ranks would have the same IP address, but I'd still want to align the workers and ranks.

Yes, that's why the spec defines that for MPI(-likes) the location must be the rank. That's an inherent characteristic of MPI and that's why that information must be in the protocol. And yes, going MPI to dask is trickier than the other way around. In your example the MPI process and the dask worker process will be distinct, so why is the MPI rank not enough?

@kkraus14 Not sure I fully understand. Is this related to the above observation that __dataframe__'s chunks represent a concept related to the partitions here? Producers of __partitioned__ are free to define partitions in alignment with their internal allocation strategy (which is in fact one of the main drivers behind this: if that is not possible we immediately lose the ability to use the data with zero-copy). How would your suggestion handle distributed arrays?

I think the problem with this is: what protocols are / aren't supported via __partitioned__? If someone creates a __graph__ protocol or a __some_arbitrary_thing__ protocol, are objects that support those protocols allowed for the data member? I'd argue that by limiting the __partitioned__ protocol to only a memory description, it more effectively allows other protocols like distributed arrays, distributed dataframes, distributed graphs, etc. to be defined on top of it.

Sounds interesting. Could you give an example of what you have in mind? I am not sure I see how you could be that generic.


kkraus14 commented Oct 7, 2021

Yes, that's why the spec defines that for MPI(-likes) the location must be the rank. That's an inherent characteristic of MPI and that's why that information must be in the protocol. And yes, going MPI to dask is trickier than the other way around. In your example the MPI process and the dask worker process will be distinct, so why is the MPI rank not enough?

Not necessarily, they could be in the same process. Let's remove MPI and replace it with something like PySpark or Presto instead. There's no MPI in the mix so there's no rank, and it's likely that there are multiple workers running on a single node. There are more and more distributed systems nowadays that are not MPI/SHMEM based and cannot rely on using a rank to indicate a worker.

If everything was MPI / SHMEM based then in theory we could use the rank, but unfortunately that isn't the case and for making a general protocol we should capture the general case.

Sounds interesting. Could you give an example of what you have in mind? I am not sure I see how you could be that generic.

Let's take a stab at it. Let's say I have two distributed libraries which I want to connect, neither of which depends on MPI / SHMEM:

  • libdf
    • Distributed dataframe library, runs a process per GPU that leverages the NUMA local host resources as well
  • libml
    • Distributed ML library, runs a process per CPU NUMA node and a process per GPU

Given a cluster with 3 nodes, each with 2 sockets of 2 NUMA nodes each and 4 GPUs (1 GPU per NUMA node), the deployment would look like:

  • libdf
    • 4 Processes per Node, each process leverages 1 GPU and 1 NUMA node
  • libml
    • 8 Processes per Node
      • 4 Processes for leveraging the GPUs
      • 4 Processes for leveraging the CPU NUMA nodes

Now say that libdf has produced a distributed dataframe that we wish to hand to libml. The distributed dataframe can look like the following:

  • 2 columns, A and B, both string type without nulls (backed by two buffers per column, can't use array protocol)
    • column A is stored in GPU memory
    • column B is stored in host memory
  • The dataframes are partitioned row-wise per process
    • Given we have 3 nodes x 4 processes per node, this means we have 12 processes and 12 local dataframes to represent the distributed dataframe
      • Each local dataframe has 2 columns, each of which is backed by 2 buffers, so 4 buffers per process (2 on GPU, 2 on CPU) that need to be communicated
    • Let's assume each partition has 100 rows
  • libml wants column A in its GPU processes and column B in its CPU NUMA processes

Now say we want to use the __partitioned__ protocol to pass the data between the two libraries and a reference to each local dataframe is used as the data member. It would look something like:

__partitioned__ = {
  'shape': (1200,2),
  'partition_tiling': (12,),
  'partitions': {
      (0,):  {
        'start': (0,),
        'shape': (100,),
        'data': ObjRef0,
        'location': ['1.1.1.1'], 
      },
      (1,): {
        'start': (100,),
        'shape': (100,),
        'data': ObjRef1,
        'location': ['1.1.1.1'],
      },
      (2,): {
        'start': (200,),
        'shape': (100,),
        'data': ObjRef2,
        'location': ['1.1.1.1'],
      },
      (3,): {
        'start': (300,),
        'shape': (100,),
        'data': ObjRef3,
        'location': ['1.1.1.1'],
      },
      (4,):  {
        'start': (400,),
        'shape': (100,),
        'data': ObjRef4,
        'location': ['1.1.1.2'], 
      },
      (5,): {
        'start': (500,),
        'shape': (100,),
        'data': ObjRef5,
        'location': ['1.1.1.2'],
      },
      (6,): {
        'start': (600,),
        'shape': (100,),
        'data': ObjRef6,
        'location': ['1.1.1.2'],
      },
      (7,): {
        'start': (700,),
        'shape': (100,),
        'data': ObjRef7,
        'location': ['1.1.1.2'],
      },
      (8,):  {
        'start': (800,),
        'shape': (100,),
        'data': ObjRef8,
        'location': ['1.1.1.3'], 
      },
      (9,): {
        'start': (900,),
        'shape': (100,),
        'data': ObjRef9,
        'location': ['1.1.1.3'],
      },
      (10,): {
        'start': (1000,),
        'shape': (100,),
        'data': ObjRef10,
        'location': ['1.1.1.3'],
      },
      (11,): {
        'start': (1100,),
        'shape': (100,),
        'data': ObjRef11,
        'location': ['1.1.1.3'],
      }
  },
  'get': lambda x: libdf.get(x)
}
  • There's no way for libml to properly schedule things on the CPU vs GPU process here because it would need to first unpack the data objects.
  • There's no way for libml to know which process within a node to use to keep the data local to the correct GPU / NUMA node.

Now you could imagine using Column objects instead of DataFrame objects as the data members in the __partitioned__ protocol in this situation, but you could run into the same situation if the buffers within a column are in different memory spaces as well (would probably want them in the same process at this point though).

Now if you had something like __partitioned_dataframe__ and __partitioned_column__ protocols, and __partitioned__ was required to only send around buffers, then you could imagine a class structure that looked something like:

  • Partitioned DataFrame

This way you get all of the information down to the memory buffers, which enables the largest amount of flexibility for distributed frameworks to interchange between one another. It also allows for always getting down to the underlying memory of the objects that are distributed. In practice, I think this could be supported given the current definition of __partitioned__ and possibly we just need this additional specialization for DataFrames because they're complex and messy 😄.
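
A rough sketch of that layering (all names and fields here are hypothetical, only meant to make the idea concrete):

__partitioned_dataframe__ = {
  'columns': {
      'A': column_a,        # a __partitioned_column__ describing column A
      'B': column_b,        # a __partitioned_column__ describing column B
  },
  'partition_tiling': (12,),  # row-wise partitioning across processes
}

__partitioned_column__ = {
  'dtype': 'string',
  # each buffer is described by a buffer-level __partitioned__ structure
  'buffers': [offsets_partitioned, chars_partitioned],
}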

On the other hand, it may not be reasonable to require being able to get down to memory for all objects and could act as a barrier to entry to adopting a protocol like this.


fschlimb commented Oct 8, 2021

For better clarity some background: Ideally we want this to be as general as possible. It turned out to be not that simple to achieve generality, so we approached it by starting with concrete examples like MPI, ray, dask. It's exciting to get new input and to tackle the next step towards greater generality.

Not necessarily, they could be in the same process. Let's remove MPI and replace it with something like PySpark or Presto instead. There's no MPI in the mix so there's no rank, and it's likely that there are multiple workers running on a single node. There are more and more distributed systems nowadays that are not MPI/SHMEM based and cannot rely on using a rank to indicate a worker.

If everything was MPI / SHMEM based then in theory we could use the rank, but unfortunately that isn't the case and for making a general protocol we should capture the general case.

Agree. The generalized concept could be the process id. I had been thinking about replacing <ip-address or rank> in 'location' with a tuple of (<ip-address>, <process-id>). An additional, optional element could be added to provide a framework-specific identifier, like the MPI rank. To be general, this could be a string, like ('192.0.0.1', 477563, 'MPI:0').
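
A partition entry could then look roughly like this (a sketch only; handle0 and all values are placeholders):

partition_entry = {
  'start': (0,),
  'shape': (100,),
  'data': handle0,
  'location': [('192.0.0.1', 477563, 'MPI:0')],  # (ip-address, process-id, optional framework-specific id)
}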

Sounds interesting. Could you give an example of what you have in mind? I am not sure I see how you could be that generic.

Let's take a stab at it. Let's say I have two distributed libraries which I want to connect and neither depend on MPI / SHMEM:

[...]

  • There's no way for libml to properly schedule things on the CPU vs GPU process here because it would need to first unpack the data objects.

Yes and no. The handle in the 'data' field is expected to be retrieved by 'get' and only within the 'location' where the data object resides. Ideally 'get' will return the exact same object that the producer created, e.g. a dataframe or array. In any case, getting the object is a local operation which will not copy the actual data. The object will give you the information about intra-node locality (e.g. CPU vs GPU) through __dlpack__ et al.

  • There's no way for libml to know which process within a node to use to keep the data local to the correct GPU / NUMA node.

Would the above new 'location' spec sufficiently address your concern?

Now you could imagine using Column objects instead of DataFrame objects as the data members in the __partitioned__ protocol in this situation, but you could run into the same situation if the buffers within a column are in different memory spaces as well (would probably want them in the same process at this point though).

Now if you had something like __partitioned_dataframe__ and __partitioned_column__ protocols, and __partitioned__ was required to only send around buffers, then you could imagine a class structure that looked something like:
[...]

Interesting. This could also be viewed as 'specializations' of __partitioned__. @YarShev made similar comments about dataframes; they would benefit from additional, data-frame specific information.

Notice: We did not intend this spec to be viewed as a vehicle to send buffers. It's a structure only to describe data partitions and their locality. It's meant to avoid (and not to foster) messaging.

This way you get all of the information down to the memory buffers, which enables the largest amount of flexibility for distributed frameworks to interchange between one another. It also allows for always getting down to the underlying memory of the objects that are distributed. In practice, I think this could be supported given the current definition of __partitioned__ and possibly we just need this additional specialization for DataFrames because they're complex and messy 😄.

On the other hand, it may not be reasonable to require being able to get down to memory for all objects and could act as a barrier to entry to adopting a protocol like this.

😉 There is definitely a tradeoff to make.

I am not sure I see the need to be more generic than what dataframes and arrays require. What's nice about one common definition is that it allows us to have one spec which can be used in both worlds and even be used cross df/array (as long as ndims<=2). But maybe we can even have both (generality and df/array compatibility). Other thoughts?


kkraus14 commented Oct 8, 2021

Agree. The generalized concept could be the process id. I had been thinking about replacing <ip-address or rank> in 'location' with a tuple of (<ip-address>, <process-id>). An additional, optional element could be added to provide a framework-specific identifier, like the MPI rank. To be general, this could be a string, like ('192.0.0.1', 477563, 'MPI:0').

Would the above new 'location' spec sufficiently address your concern?

I don't think a process ID is granular enough. I'm also generally super wary of optional pieces to a protocol, because people then rely on those optional pieces instead of doing things the more general way that allows wider usage. In the case of providing an MPI Rank, I believe that needing it is an indication that the non-optional location pieces are underdefined. E.g. you could imagine two separate MPI based applications that are deployed separately and their ranks don't actually line up. You'd then need to fall back to location information and you don't have enough info to align them properly.

Additionally, say you have a distributed system that uses a single process per node, and within that process manages objects across multiple GPUs and CPUs. Now say you want to consume some object from a different distributed system that has a process per GPU; there's no easy way to align things properly.

This is somewhat handled in the array interchange protocol via __dlpack_device__, which returns a device type indicating if it's CPU vs GPU memory, and a device ID indicating which GPU it's on. This doesn't handle NUMA locality for now since dlpack is for sharing data within a process as opposed to handling cross process, but I think that's okay and it could be tackled later if needed.
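
For reference, a sketch of what that looks like on the consumer side (device type codes follow DLPack, e.g. CPU = 1, CUDA = 2; some_array is a placeholder for an array-API-conforming object):

device_type, device_id = some_array.__dlpack_device__()   # e.g. (2, 3) for the fourth CUDA device
if device_type == 2:  # CUDA
    # schedule the consuming work in the process that owns GPU `device_id`
    ...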

Yes and no. The handle in the 'data' field is expected to be retrieved by 'get' and only within the 'location' where the data object resides. Ideally 'get' will return the exact same object that the producer created, e.g. a dataframe or array. In any case, getting the object is a local operation which will not copy the actual data. The object will give you the information about intra-node locality (e.g. CPU vs GPU) through __dlpack__ et al.

What is defined as location in this case? If I don't know which GPU the data is on then I don't know which process in the consuming system to schedule the get call on and I may inadvertently call it from a process that doesn't have access to the GPU with the data on it.

Notice: We did not intend this spec to be viewed as a vehicle to send buffers. It's a structure only to describe data partitions and their locality. It's meant to avoid (and not to foster) messaging.

I guess this means that crossing process boundaries is out of scope? Is there a problem in practice if we're not crossing process boundaries? In my view having the get member be something that can be serialized to be run by a different process to get access to the data efficiently is a key piece.

I am not sure I see the need to be more generic than what dataframes and arrays require. What's nice about one common definition is that it allows us to have one spec which can be used in both worlds and even be used cross df/array (as long as ndims<=2). But maybe we can even have both (generality and df/array compatibility). Other thoughts?

I'm trying to talk myself out of boiling the ocean here 😄, but given frameworks like Dask, Ray, etc. let you have distributed collections of arbitrary Python objects, you could imagine wanting to exchange them. That being said, I think it would be reasonable to require those objects be serialized into something that could be sent over the wire anyway, which effectively makes them array-like.


YarShev commented Oct 11, 2021

@kkraus14
Thanks for your input, you made a great write-up. But I am not sure we should require such details (where a certain data buffer resides - I mean a device) from the protocol. I think this should be handled with __dlpack__ et al. as @fschlimb said.

As for the get field to get materialized data at a different location (not the one that is specified in location), you can get the data anywhere within your system, but it will involve a data copy that we want to avoid.

As for providing a process ID, why is that not granular enough? Can you elaborate?

@kkraus14

As for providing a process ID, why is that not granular enough? Can you elaborate?

It requires domain knowledge about the producing distributed framework from the consumer. There's no easy way for me to know how the producing distributed framework is deployed, which could inform where the object is located relative to a GPU or NUMA node or any other locality-relevant information. E.g. if I have Dask as a producer and Ray as a consumer, I can't guarantee that they are deployed in the same way and target the same resources, and as a consumer I in theory only have information about my framework.

@fschlimb

Sorry for the late reply, I was out.

As for providing a process ID, why is that not granular enough? Can you elaborate?

It requires domain knowledge about the producing distributed framework from the consumer. There's no easy way for me to know how the producing distributed framework is deployed, which could inform where the object is located relative to a GPU or NUMA node or any other locality-relevant information. E.g. if I have Dask as a producer and Ray as a consumer, I can't guarantee that they are deployed in the same way and target the same resources, and as a consumer I in theory only have information about my framework.

@kkraus14 I suspect there is still some disconnect. As mentioned earlier, the initial proposal does not intend to facilitate or even provide means for transferring data, it only provides a description of the current state, e.g. the partitioning and location of the data. If needed, it's up to the consumer to move data. That said, maybe what you're pointing at is that the consumer might not run "within" one of the given processes and so has no access to the data. Is this what your concern is about?

If it is, then I think we have 2 cases:

  1. SPMD (MPI) produced: I'd argue to simply not allow calling 'get' on non-local ranks.
  2. Controller-worker produced: 'get' must always work on the root node. Are you suggesting to make this more general and require 'get' to work on all workers (c/w only)? I guess dask and ray already do that...

There is definitely a need to properly orchestrate and launch an application using various frameworks. I'd argue that that's a different issue which should not be solved (and probably cannot) with such a protocol.

@fschlimb

I don't think a process ID is granular enough. I'm also generally super wary of optional pieces to a protocol, because people then rely on those optional pieces instead of doing things the more general way that allows wider usage. In the case of providing an MPI Rank, I believe that needing it is an indication that the non-optional location pieces are underdefined.

Good point. I guess the rank-part was meant to be a short-cut, similar to 'local' specifically for SPMD.

E.g. you could imagine two separate MPI based applications that are deployed separately and their ranks don't actually line up. You'd then need to fall back to location information and you don't have enough info to align them properly.

I think even '(node, ip)' provides sufficient information for your example. There are other challenges which make it super hard, no matter what. Most prominently, it's not possible in MPI to let the consumer get data from an independent MPI app without active action of the producer. If MPI_COMM_WORLD is not shared that's a challenge of its own. I'd argue such a scenario is out of scope (for MPI).

Additionally, say you have a distributed system that uses a single process per node, and within that process manages objects across multiple GPUs and CPUs. Now say you want to consume some object from a different distributed system that has a process per GPU; there's no easy way to align things properly.

That's interesting. We were not considering totally independent systems. Maybe we should, but my immediate reaction is that in such cases zero-copy is not possible anyway and some kind of messaging is required. I think the problem is not a lack of information in the protocol but rather a difficult setup to deal with ("no easy way" as you said).

I still believe that this protocol should mostly describe the state and not provide messaging capabilities. I was thinking about a utility package which consumes and produces objects implementing this protocol. It could provide re-partitioning, data-transfer and other things for various frameworks. It could also deal with supporting such independent cases.

What is defined as location in this case? If I don't know which GPU the data is on then I don't know which process in the consuming system to schedule the get call on and I may inadvertently call it from a process that doesn't have access to the GPU with the data on it.

I see. In your scenario the consuming system assigns GPUs to processes/workers and you would like to schedule on the corresponding process. The current proposal requires 2 calls/tasks to get this done (one to get the device id from __dlpack__ and a second to do the actual work on the corresponding device). At the cost of adding more redundancy and more complexity at the producer side we could extend 'location' to also include the device (node, ip, device). device would then align with __dlpack__'s nomenclature and your example case would work nicely. We had a discussion on that before so I expect others support the idea (@coquelin77); your use-case finally convinced me (and maybe also others) that it would be a good tradeoff.

I'm trying to talk myself out of boiling the ocean here 😄, but given frameworks like Dask, Ray, etc. let you have distributed collections of arbitrary Python objects, you could imagine wanting to exchange them. That being said, I think it would be reasonable to require those objects be serialized into something that could be sent over the wire anyway, which effectively makes them array-like.

Definitely, the objects/containers need to be serializable (e.g. pickle'able). And yes, we should not attempt to boil the ocean 😉.


YarShev commented Oct 28, 2021

2. Controller-worker produced: 'get' must always work on the root node. Are you suggesting to make this more general and require 'get' to work on all workers (c/w only)? I guess dask and ray already do that...

Ray allows passing a list of ObjectRef objects into a remote function, where you can call the get method to materialize data in a worker. There will be no data copy if this happens within a single node, since Ray uses a Plasma object store per node. On the other hand, Dask itself materializes Future objects of any sort of nesting (a list of Future-s, a list of lists of Future-s, etc.) passed into a remote function and doesn't encourage users to call the get method themselves to materialize data in a worker, since a data copy might occur. With respect to the latter, even though Ray allows users to materialize data in a worker themselves, passing ObjectRef and Future objects into a remote function to be materialized by Ray and Dask is the more appropriate way to transfer data between workers.
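
For illustration, a minimal Ray sketch of that pattern (obj_ref is a placeholder for a handle taken from __partitioned__['partitions'][...]['data']):

import ray

@ray.remote
def consume(partition_df):
    # Ray materializes the ObjectRef argument before this function runs; within a
    # node the object is served from the shared Plasma object store.
    return len(partition_df)

result = ray.get(consume.remote(obj_ref))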

Additionally, say you have a distributed system that uses a single process per node, and within that process manages objects across multiple GPUs and CPUs. Now say you want to consume some object from a different distributed system that has a process per GPU; there's no easy way to align things properly.

That's interesting. We were not considering totally independent systems. Maybe we should, but my immediate reaction is that in such cases zero-copy is not possible anyway and some kind of messaging is required. I think the problem is not a lack of information in the protocol but rather a difficult setup to deal with ("no easy way" as you said).

Yes, that is true.

I still believe that this protocol should mostly describe the state and not provide messaging capabilities. I was thinking about a utility package which consumes and produces objects implementing this protocol. It could provide re-partitioning, data-transfer and other things for various frameworks. It could also deal with supporting such independent cases.

As for data re-partitioning, I think such functionality should be provided by a framework supplying a data container, and data-transfer and other things might be provided by such a utility package.

What is defined as location in this case? If I don't know which GPU the data is on then I don't know which process in the consuming system to schedule the get call on and I may inadvertently call it from a process that doesn't have access to the GPU with the data on it.

I see. In your scenario the consuming system assigns GPUs to processes/workers and you would like to schedule on the corresponding process. The current proposal requires 2 calls/tasks to get this done (one to get the device id from __dlpack__ and a second to do the actual work on the corresponding device). At the cost of adding more redundancy and more complexity at the producer side we could extend 'location' to also include the device (node, ip, device). device would then align with __dlpack__'s nomenclature and your example case would work nicely. We had a discussion on that before so I expect others support the idea (@coquelin77); your use-case finally convinced me (and maybe also others) that it would be a good tradeoff.

I have been thinking about the location field and came up with the following. I agree that in order to be comprehensive on data location we should probably add both device, which is aligned with __dlpack__'s nomenclature, and process ID to the location field. Then, location would contain something like this: ["node_ip": {"process_id", "device_id"}].
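
In concrete terms, a partition entry might then look something like this (a sketch only; the exact layout is not settled and handle0 is a placeholder):

partition_entry = {
  'start': (0,),
  'shape': (100,),
  'data': handle0,
  'location': [{'node_ip': '192.0.0.1', 'process_id': 477563, 'device_id': 0}],
}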

@fschlimb

Ray allows passing a list of ObjectRef objects into a remote function, where you can call the get method to materialize data in a worker. There will be no data copy if this happens within a single node, since Ray uses a Plasma object store per node. On the other hand, Dask itself materializes Future objects of any sort of nesting (a list of Future-s, a list of lists of Future-s, etc.) passed into a remote function and doesn't encourage users to call the get method themselves to materialize data in a worker, since a data copy might occur. With respect to the latter, even though Ray allows users to materialize data in a worker themselves, passing ObjectRef and Future objects into a remote function to be materialized by Ray and Dask is the more appropriate way to transfer data between workers.

Good point. We can clarify that the intent of __partitioned__['get'] is to allow unified access to the materialized data. At the same time the consumer is free to use the handle in __partitioned__['partitions']['data'] directly if it knows how to deal with it (e.g. if it uses the same framework).
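
For illustration, a consumer-side sketch (assuming the producer is Ray; process_partition stands for a hypothetical remote task of the consumer):

import ray

part = __partitioned__['partitions'][(0,)]
if isinstance(part['data'], ray.ObjectRef):
    # same framework: use the native handle directly, e.g. pass it to a remote task
    future = process_partition.remote(part['data'])
else:
    # otherwise fall back to the unified accessor
    local_obj = __partitioned__['get'](part['data'])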

As for data re-partitioning, I think such functionality should be provided by a framework supplying a data container, and data-transfer and other things might be provided by such a utility package.

Re-partitioning is a core feature. Requiring consumers to understand the producer's package contradicts the major goal of the protocol. A utility package is free to implement things in any way it wants - including using modin or heat or whatnot.

I have been thinking about the location field and came up with the following. I agree that in order to be comprehensive on data location we should probably add both device, which is aligned with dlpack's nomenclature, and process ID to the location field. Then, location would contain something like this: ["node_ip": {"process_id", "device_id"}].

Sounds like we agree here!


YarShev commented Oct 28, 2021

Ray allows passing a list of ObjectRef objects into a remote function, where you can call the get method to materialize data in a worker. There will be no data copy if this happens within a single node, since Ray uses a Plasma object store per node. On the other hand, Dask itself materializes Future objects of any sort of nesting (a list of Future-s, a list of lists of Future-s, etc.) passed into a remote function and doesn't encourage users to call the get method themselves to materialize data in a worker, since a data copy might occur. With respect to the latter, even though Ray allows users to materialize data in a worker themselves, passing ObjectRef and Future objects into a remote function to be materialized by Ray and Dask is the more appropriate way to transfer data between workers.

Good point. We can clarify that the intent of __partitioned__['get'] is to allow unified access to the materialized data. At the same time the consumer is free to use the handle in __partitioned__['partitions']['data'] directly if it knows how to deal with it (e.g. if it uses the same framework).

Yes, that would be fine.

As for data re-partitioning, I think such functionality should be provided by a framework supplying a data container, and data-transfer and other things might be provided by such a utility package.

Re-partitioning is a core feature. Requiring consumers to understand the producer's package contradicts the major goal of the protocol. A utility package is free to implement things in any way it wants - including using modin or heat or whatnot.

Such a utility package likely sounds more reasonable.


leofang commented Nov 18, 2021

cc: @magnatelee @lightsighter for vis (I mentioned this discussion yesterday)

@lightsighter

@leofang There are two major issues with this protocol:

  1. It enshrines the concept of "tiles" into the protocol with the __partition_tiling member. There is neither a concept of tiles nor anything remotely similar to a concept of tiles in cuNumeric's distributed implementation. Instead cuNumeric operates on views of array data. Views can be incomplete, aliased, and/or have non-trivial intersections with other views. In general, you should never pick one canonical tiling for an array because it's likely the case that somewhere later in the program a different operation will want a different view onto the array.
  2. There is no support for asynchronous execution across nodes in this protocol. How should a consumer library get access to subsets of the data in an array when there are still outstanding effects (read: GPU kernels) running on devices to produce the array data? Blocking asynchronous execution is an immediate non-starter for us. Note you can't rely on streams here because they are not portable across processes. Furthermore, how do you handle asynchronous effects on different view subsets of data (see above)?

The distributed Legate Store protocol that we use to exchange data between Legate libraries has answers for all of those questions. Unless the protocol you are proposing here adapts to provide for such scenarios, I don't foresee cuNumeric ever supporting it. We'll happily ingest data from other libraries that want to pass us data in this format, but we're not going to condone sub-optimal data exchange practices that compromise performance on accelerated hardware.

As an aside: the idea of having a unified global view of an entire partition in the form of tiles is already naive when it comes to building distributed systems. As soon as you have to have meta-data from every node existing on every other node then you are just inviting scalability bottlenecks.

@fschlimb

@lightsighter Thanks for your feedback, very much appreciated!

We are open not only to changes/improvements to this initial proposal but also to other approaches. Let's see if/how we can come up with something that can also work with legate.

A quick search for "distributed Legate Store protocol" didn't yield anything consumable. @lightsighter, could you point me to a description of it?

For clarification: this protocol is meant to help exchange distributed data between entirely independent projects: at a given point in time, data needs to be passed from package A to package B. This is not about how each package/system manages data internally. It is meant to describe the current partitioned state of the data so that the consumer can access it with as little copying as possible. No matter if the producer or consumer operate directly on physical tiles or not: the data must somehow be physically partitioned and distributed. That's what the protocol tries to describe.

Similarly, this is not about prescribing whether a system has a global view or not. It is just a way to describe the current global view so others/consumers who do not understand the internals of the producer can efficiently access and use the data/memory.

Yes, if there was Ray only, or Dask only, or only legion/legate, or if MPI was the only choice users have, then this protocol would not be needed.

You're absolutely right that asynchronous exchange is helpful. Could you explain how the get callback is not sufficient for your case? We are confident that at least for Ray and Dask this would allow a good degree of asynchronicity.

@lightsighter

could you point me to a description of it?

https://github.com/nv-legate/legate.core#what-is-the-legate-core

It is meant to describe the current partitioned state of the data so that the consumer can access it with as little copying as possible.

The fact that you even assume that there is a "partitioned state of the data" is a design flaw in my opinion. In a good distributed system data should be both replicated and sharded to meet the needs of the application over time. Assuming that there is just one physical partition of the data is massively constraining for describing how data is distributed and how coherence is maintained.

It is just a way to describe the current global view

I think my point is that you should never have a single global view of the data anywhere in the system. That leads to scalability and resilience issues. Control and data logic should be both distributed and replicated.

You're absolutely right that asynchronous exchange is helpful. Could you explain how the get callback is not sufficient for your case?

Read about how Legion tracks data dependences between tasks: https://legion.stanford.edu/pdfs/oopsla2013.pdf Section 5 provides a formal operational semantics for computing dependences and maintaining coherence of data between tasks in a distributed environment. You'll find it has very little similarity to anything you've referenced, although if you've done work in distributed databases it might look somewhat familiar.

@fschlimb

The fact that you even assume that there is a "partitioned state of the data" is a design flaw in my opinion. In a good distributed system data should be both replicated and sharded to meet the needs of the application over time. Assuming that there is just one physical partition of the data is massively constraining for describing how data is distributed and how coherence is maintained.

The current proposal allows replicated partitions; it does not support partially overlapping partitions, though. Is that what you're concerned about?

It is just a way to describe the current global view

I think my point is that you should never have a single global view of the data anywhere in the system. That leads to scalability and resilience issues. Control and data logic should be both distributed and replicated.

In the abstract I certainly do agree. However, this here is not meant to define the ideal approach for distributed compute systems. We are trying to bridge between existing systems. Over time we might see convergence and once legate has taken over the world 😉 this protocol will no longer be needed. Until then, we have to accept reality and focus on data.

You're absolutely right that asynchronous exchange is helpful. Could you explain how the get callback is not sufficient for your case?

Read about how Legion tracks data dependences between tasks: https://legion.stanford.edu/pdfs/oopsla2013.pdf Section 5 provides a formal operational semantics for computing dependences and maintaining coherence of data between tasks in a distributed environment. You'll find it has very little similarity to anything you've referenced, although if you've done work in distributed databases it might look somewhat familiar.

We cannot assume that every system we want to support is using tasks. That's simply not the case.

It'd be really great if we could find a protocol that defines a bridge also between legate and others. This requires that we accept that every system is different and that exchanging data between them can be less optimal than within the same system. I suggest grounding this discussion on this mindset and avoiding a debate over the best distributed system.

So far I have heard nothing that would prevent legate from importing or exporting the protocol. Any proposal for an improved or different kind of protocol will be highly appreciated.

@lightsighter

Is that what you're concerned about?

No, I'm concerned that the API, as it stands, introduces inherent inefficiencies that ultimately prevent any implementation from getting to speed-of-light execution on accelerated hardware. That's a fundamental flaw and therefore a non-starter for those of us that work at certain companies.

We are trying to bridge between existing systems.

In my experience, asking different runtime systems to share resources is a recipe for poor code execution and disappointed users. The goal of the API shouldn't be to facilitate interactions between different runtimes. It should be to provide a common interface that different runtime systems can implement such that all libraries built on top of the interface can be executed in an accelerated way on different hardware platforms. Such an API should be highly productive for library developers, even if that adds complexity into the implementations of the runtimes that support it. Furthermore, such an API should contain no explicit mapping decisions about how code runs on the hardware so that runtimes can provide highly tuned and accelerated ways for that code to execute on different hardware platforms. A higher-level API will also enable more semantic information to be communicated from library developers down to the runtime systems, such that the runtimes can perform more sophisticated transformations and scheduling decisions when targeting heterogeneous hardware.

Over time we might see convergence and once legate has taken over the world 😉 this protocol will no longer be needed.

I never said that and I think it's disingenuous of you to suggest that that is my motivation. To reiterate my point above, I believe such an API should be higher level such that, first, it's easier for library developers to build libraries that can be run on any runtime, and second, multiple such runtimes can provide an implementation so users can pick the best one for different platforms. Starting at such a low level of abstraction as the current proposal does, though, immediately prevents any such architecture and severely restricts the design space of potential runtimes that can be built, ultimately leading to code that will execute well below the speed-of-light on future hardware.

We cannot assume that every system we want to support is using tasks. That's simply not the case.

When did I ever say anything about tasks being central to Legion's semantics? Tasks are just distinguished computations that the system uses to define the granularity of scheduling as they are in Dask and Ray. The important part (which is laid out in detail in the paper) is how dependence analysis is performed between the computations performed by different libraries. There's no discretization of the data in Legate/Legion like there is in the current API proposal. As we point out in the Legate Store documentation, tasks from different libraries can name completely arbitrary subsets of data and it is the runtime system's responsibility to compute dependences between tasks and maintain coherence of data. Allowing library developers to name arbitrary subsets of data for their computations gives them maximum flexibility to describe precisely the data they need to run any computation and be more productive as they don't have to worry about how data was partitioned by an earlier library that produced the input. Furthermore, there are no false dependences, as dependences only need to be introduced when there are true data dependences.

It'd be really great if we could find a protocol that defines a bridge also between legate and others.

Again, I think this is a waste of time. If you really cared about making it easy for library developers and users, you would work on designing an API that allows them to plug-and-play any runtime system underneath all the libraries, and not require user/library developers to figure out how to make multiple runtimes play nicely together. The Legate Store API has this property, there's nothing Legate specific about it. It should even be able to be implemented using the Dask or Ray runtimes. I'll even commit to the Legate project abandoning the Legate Store API if you propose a higher-level API with a data model that's at least as high-level as the one in the Legate Store API (feel free to go even higher-level if you want, we won't complain).

So far I have heard nothing that would prevent legate from importing or exporting the protocol.

Go back and look at the Legate Store API. There's no way to get access to raw pointers in Legate unless you are actually inside of the execution of a leaf task in the tree of tasks. So your API as it currently stands is fundamentally a non-starter. We exchange views of data between libraries in Legate which are logical descriptions of arrays/dataframes/sets/etc. These views support sparsity and aliasing, literally anything a library developer could ever want to do. I'd encourage you to raise the level of abstraction of your API so that users can plug-and-play any runtime under it and library developers don't have to worry about interoperability between different runtimes. Starting at the level of tiles, pointers, and memory layouts makes way too many assumptions about the implementation of the runtime for Legate to ever be able to support it.

As aside, I'll point out that standardizing on a higher-level data model API will also have the benefit of rendering most of the arguments in this thread so far moot, as it will enable runtimes to implement things like data layout, data movement, and synchronization however they want. There's no need for the runtimes to agree, because users and library developers will just be using a single runtime at a time and will never need to understand the details of what the runtimes are doing internally.

@fschlimb

I never said that and I think it's disingenuous of you to suggest that that is my motivation.

I did not want to suggest any motivation. I am sorry that my comment conveyed such an impression. No need for insult, though.

Let's pursue the direction you're proposing. I have a few clarifying questions:

  • I'd argue that dlpack is similarly restricted as the current proposal here. Would you agree?

  • There had been some discussion around a functional API (in contrast to a pure protocol) but for the sake of simplicity of implementations it was put aside. I understand that you argue in favor of a parametrized function call (or larger API) which returns something that

    • logically represents the requested sub-set of the data
    • allows materializing the respective data so that it can be operated on

    Would that go in the direction you're envisioning?

  • In the description of the LegateStore it says: "It is illegal to over-approximate dependences." I understand that this would be the ideal case, but why is this required? Think of programming systems like MPI: they do not have a runtime scheduler (there is nothing to be scheduled). All they could do is over-approximate.

  • Similarly, we'd need to clarify what "Any use of the LegateStore on any processor/node in the system must abide by the original sequential semantics of the program" means for systems like MPI which have no sequential program semantics.

@lightsighter

I'd argue that dlpack is similarly restricted as the current proposal here. Would you agree?

Yes, though they have it a little bit easier: since they assume shared-memory-only execution, they can make clients side-load things like synchronization primitives in type-unsafe ways.

Would that go in the direction you're envisioning?

That is a step in the right direction. Those two conditions are necessary but not sufficient for fully decoupling the specification of computation from how it is ultimately executed.

I understand that this would be the ideal case, but why is this required?

We require that in the LegateStore API because we believe that is a necessary feature for performance debugging by library and application developers. Unnecessary dependences (especially non-deterministic ones) create mystical performance effects that can be impossible to reason about and make it nigh impossible for users/library developers to tune their code. That said, I would be willing to accept a community standard that did not include this restriction. It's a performance guarantee, but one that is not strictly necessary for correctness.

Think of programming systems like MPI: they do not have a runtime scheduler (there is nothing to be scheduled)

I strongly disagree with this. Each MPI implementation has many scheduling heuristics: some are more latency-sensitive while others optimize for throughput. Here's an example of some of the scheduling trade-offs that OpenMPI makes in their implementation: https://www.open-mpi.org/papers/euro-pvmmpi-2006-hpc-protocols/euro-pvmmpi-2006-hpc-protocols.pdf Every MPI implementation carries such heuristics in its runtime for trading off message latency against message throughput and tag matching.

Similarly, we'd need to clarify what "Any use of the LegateStore on any processor/node in the system must abide by the original sequential semantics of the program" means for systems like MPI which have no sequential program semantics.

Python programs should always have sequential semantics. Anything employing explicit external communication between processes should be completely beyond the scope of the standard and the responsibility of the user to manage. The Python language standard has nothing to say on this front and therefore neither should we. Nowhere should the number of processes show up in the standard. Just because we're designing the responsibilities of an interface that should support distributed execution doesn't mean that distributed execution should explicitly show up in the interface.

There had been some discussion around a functional API (in contrast to a pure protocol) but for the sake of simplicity of implementations it was put aside

What was the justification for this? I don't see how anything else could be a good idea. Consider a historical example: the development of the MPI standard. Many, many years ago (late 1980s, early 1990s) there were dozens of different libraries that provided for communication between processes on supercomputers. Some of them were low-level and required users to manage endpoints, completion queues, and flow-control credits explicitly, while others were high-level and not that different from what ultimately became the MPI standard. The MPI community faced the same decision we now face: do you provide for interoperability between these different runtimes at the least-common denominator of abstraction, or do you standardize on a high-level interface that abstracts over low-level details and allows for a common interface on which to build composable software? They wisely opted for the latter choice. This had the following two positive effects on the HPC ecosystem:

  1. Rather than requiring users to have to understand all the nuanced details of a low-level standard, they provided a high-level way for doing things which made it much easier for users and library developers to productively develop code. This made the software built on top of the standard much easier to write, debug, maintain, and port. Importantly neither users nor library developers ever needed to worry about interoperability of communication runtimes, there was just MPI under everything.
  2. It aligned economic incentives for the ecosystem: it was now in the best interest of all the communication runtime developers to provide the best possible implementation of the common standard. This encouraged competition with quantifiable metrics that users could easily measure. Naturally communication runtimes were abandoned, combined, extended, and ultimately whittled down to the few MPI implementations that exist today that offer excellent performance and scalability to users. The users and library developers building on top of MPI didn't have to do any work to make that happen. It just occurred.

All this happened because the HPC community recognized that runtime systems almost never interoperate well and attempting to do so offloads tons of functional responsibility onto library developers and users. A high-level standard leads to a healthier ecosystem for everyone in the long-run.

@fschlimb

I disagree with your analogy, but I expect your high-level thoughts will mostly find agreement; at least I am mostly with you. However, we are getting lost in the abstract with little progress towards a practical proposal.

@lightsighter It would be very helpful if you could (here) sketch out an API/protocol that does what you have in mind. It'd be more targeted if we had another proposal providing an independent and different perspective.

@lightsighter

It would be very helpful if you could (here) sketch out an API/protocol that does what you have in mind. It'd be more targeted if we had another proposal providing an independent and different perspective.

This is exactly what the Legate Store and Operation APIs are. They are our version of what you are proposing. We are not tied to naming convention or syntax in any way, so please ignore that as we are completely flexible on those fronts. The crucial components are the data and execution models. These are both designed to be completely general and independent from any details of the Legate implementation and we believe are high-level enough to abstract over any of the possible runtimes involved in these discussions such that they could each provide their own implementation.

I disagree with your analogy

What parts do you disagree with?

@rgommers

rgommers commented Dec 6, 2021

This is a very interesting discussion, thanks all!

This is exactly what the Legate Store and Operation APIs are.

@lightsighter I'm curious about how well this maps to Dask, or if anyone has tried that yet. Given that RAPIDS relies heavily on Dask and y'all are @ NVIDIA, have you talked to the RAPIDS team about this already?

@mmccarty

mmccarty commented Dec 6, 2021

@quasiben would be the best person to answer that.

@quasiben

quasiben commented Dec 6, 2021

Dask assumes a partitioned data set which, as @lightsighter has noted, is not an assumption for Legate. We've briefly discussed a concurrent futures API for Dask which maybe Legate could also leverage, but the assumptions of the two libraries are quite distinct.

Something missing from this proposal is how spilled data may be handled. @kkraus14 noted several examples of the complexity when distributed libraries leverage hardware optimizations with NUMA pinning and such -- things may be more complicated still when we also have to fetch data from disk as this is also subject to hardware/process/network aware fetching

@lightsighter

I'm curious about how well this maps to Dask

The question of how well the interface maps to any given runtime should be at most a secondary constraint. The primary goal of the API should be to provide an interface for users and library developers to build composable software in the easiest way possible. That is the ethos of Python: high productivity. I believe we should be doing everything in our power to make users' and library developers' lives as simple as possible, even in distributed execution environments. After that we can ask the question: can mappings to different runtimes exist and provide efficient execution. Our primary constituency should always be the users first though and not ourselves.

That being said, I can see a multitude of different mappings for the Legate Store API down on to Dask that make various tradeoffs in terms of precision of dependence analysis and data coherence.

Dask assumes a partitioned data set which, as @lightsighter has noted, is not an assumption for Legate.

It's not that it's not an assumption for the Legate interface. The Legate interface has no opinion on how you implement it. You can build an implementation of the Legate Store API that leverages partitioned data sets. There's absolutely no reason that you can't do that. Just because we don't do that in our implementation doesn't mean you can't do it in yours. This is a perfect example of how the Legate interface is considerably less opinionated than the current proposed API which prescribes implementation details like this and offloads the management of those details onto users and library developers.

Something missing from this proposal is how spilled data may be handled...

Why does that need to show up in the interface? Why isn't that the responsibility of each implementation? Users and library developers are poorly positioned to be able to deal with such things as spilling because they have incomplete knowledge of what other parts of the program are doing. Only the runtime has the visibility needed to be able to answer such questions. Furthermore, if we try to expose such details through the API, then we all have to agree an interface and semantics for spilling that meshes with each runtime's execution model, the intersection of which I'm almost certain is the null set.

fschlimb added a commit to IntelPython/DPPY-Spec that referenced this issue Jan 27, 2022
Incorporated suggestions/feedback from data-apis/consortium-feedback#7
@fschlimb

fschlimb commented Jan 27, 2022

With my latest commit I tried to incorporate suggestions and feedback from @kkraus14 , @YarShev and @lightsighter. The most important changes are the following:

  • Reworded intro so that non-static and/or on-the-fly partitioning is not excluded and the goals are clearer
  • Made 'location' a tuple (IP, PID[, device]). This complicates creation and consumption a bit but makes it unique across frameworks and allows easier scheduling without inspecting the object itself (see the sketch right after this list).
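
To illustrate the scheduling point, here is a minimal, hypothetical sketch (made-up addresses and handles) of how a consumer could group partitions by node using only the (IP, PID[, device]) location tuples, without ever dereferencing the data handles:

from collections import defaultdict

# Stand-in for a producer's __partitioned__ dict; all values are made up.
parted = {
    'partitions': {
        (0,): {'data': 'handle0', 'location': [('1.1.1.1', 1234)]},
        (1,): {'data': 'handle1', 'location': [('1.1.1.2', 5678)]},
        (2,): {'data': 'handle2', 'location': [('1.1.1.1', 1234)]},
    },
}

# Group partition positions by node IP; the optional device entry is ignored here.
by_node = defaultdict(list)
for pos, part in parted['partitions'].items():
    ip, pid = part['location'][0][:2]
    by_node[ip].append(pos)

print(dict(by_node))  # {'1.1.1.1': [(0,), (2,)], '1.1.1.2': [(1,)]}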

As an alternative solution a functional approach could be considered. It could allow requesting a 'slice' of the global data from any location. The downside of it is that it can potentially imply unnecessary data movement in situations where several partitionings are acceptable for the consumer. For example: element-wise operations work equally well on any data distribution as long as it's balanced; it does not matter if the partitions are chunks of rows, columns or blocks. A way out would be to allow querying the "current state" (a simplified __partitioned__) and retrieving a specific partitioning (__get_chunk__(slice)). Not sure if/how device-to-device copying should be handled in this case.
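
A rough, single-process sketch of that functional alternative is below. The class and its internals are hypothetical; only the two entry points - a simplified __partitioned__ describing the current state and __get_chunk__(slice) for explicit retrieval - follow the idea above.

import numpy as np

class MockDistArray:
    """Toy stand-in for a distributed container holding row-wise blocks."""
    def __init__(self, blocks):
        self._blocks = blocks

    @property
    def __partitioned__(self):
        # Simplified "current state": global shape and tiling only, no data
        # handles -- enough for a consumer to decide whether the existing
        # distribution already suits it.
        rows = sum(b.shape[0] for b in self._blocks)
        return {'shape': (rows, self._blocks[0].shape[1]),
                'partition_tiling': (len(self._blocks), 1)}

    def __get_chunk__(self, slc):
        # Materialize the requested global row-slice; in a real implementation
        # this is where cross-node (or device-to-device) data movement happens.
        return np.concatenate(self._blocks, axis=0)[slc]

arr = MockDistArray([np.zeros((2, 3)), np.ones((2, 3))])
print(arr.__partitioned__)             # {'shape': (4, 3), 'partition_tiling': (2, 1)}
print(arr.__get_chunk__(slice(1, 3)))  # rows 1..2 of the global array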

What do others think?

With respect to the more fundamental approach suggested by @lightsighter: It seems to be a harder and longer-term endeavor and probably more difficult to find consensus. @lightsighter, if you have not done so, maybe you want to initiate a separate discussion? In the meanwhile, I believe we have to accept that different systems exist and a protocol which allows low-overhead data interchange at defined points can serve us well (even if not ideally).

@YarShev

YarShev commented Jan 31, 2022

@fschlimb

device: optional string identifying the device; if present, assumes underlying object exposes dlpack and the value conforms to dlpack. Default: 'kDLCPU'.

Does this imply that if the data field were a reference/future, the device field would be missing (not present)?

As an alternative solution a functional approach could be considered. It could allow requesting a 'slice' of the global data from any location. The downside of it is that it can potentially imply unnecessary data movement in situations where several partitionings are acceptable for the consumer. For example: element-wise operations work equally well on any data distribution as long as it's balanced; it does not matter if the partitions are chunks of rows, columns or blocks. A way out would be to allow querying the "current state" (a simplified __partitioned__) and retrieving a specific partitioning (__get_chunk__(slice)). Not sure if/how device-to-device copying should be handled in this case.

It seems to me this should lie with the consumer of the protocol because, as you said, it can potentially imply unnecessary data movement in situations where several partitionings are acceptable for the consumer.

@fschlimb

Does this imply that if the data field were a reference/future, the device field would be missing (not present)?

'data' is supposed to be an array (or DF) or a future/handle thereof. Such types always (in Python) have a host structure even if the raw data lives on the device. I do not see why a future would not allow device data. The main reason for having the device-id here (rather than relying solely on the data structure with its dlpack) is to allow (remote) scheduling without even touching the data item. To me, handles/futures and data-location are independent aspects; all combinations are possible.
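
As a purely hypothetical illustration (made-up address and device id, assuming Ray for the handle and the dlpack device naming quoted above), a partition entry could combine a future with a GPU location, so a scheduler can place work from the 'location' tuple alone and only a worker that actually needs the bytes calls 'get':

import ray

ray.init(ignore_reinit_error=True)
ref = ray.put([1, 2, 3])  # stand-in for a handle to a device-resident chunk

partition_entry = {
    'data': ref,                                       # future/handle, not raw data
    'location': [('1.1.1.1', 7755737, 'kDLCUDA', 0)],  # node, process, device, device-id
}

# Scheduling decisions can use 'location' without touching 'data';
# materialization happens only where the bytes are actually needed.
local_chunk = ray.get(partition_entry['data'])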

@YarShev

YarShev commented Jan 31, 2022

That makes sense! I just wanted to make sure the protocol allows having the device-id for a reference/future.

Aside: What do you mean by "a host structure"?

@fschlimb

fschlimb commented Jan 31, 2022

Aside: What do you mean by "a host structure"?

There is currently no way that the entire Python structure (like a numpy array or a modin/pandas DataFrame) can live entirely on the device (other than host/cpu devices of course). The device pointer (or some other identifier) points to the raw data on the device but a proxy pointer exists on the host; many other things live on the host, like the Python object itself.

@kkraus14

@fschlimb thanks for iterating on this! One comment:

device: optional string identifying the device; if present, assumes underlying object exposes dlpack and the value conforms to dlpack. Default: 'kDLCPU'.

If the object is forced to conform to dlpack, then it prevents things like DataFrames, which may have their own interchange protocol separate from dlpack, from using this specifier. Should we allow the use of the device attribute more generally?

Also, in addition to the device descriptor of CPU vs GPU vs something else, I think we need a device_id or something similar to allow specifying which CPU or GPU it is on.

@fschlimb

If the object is forced to conform to dlpack, then it prevents things like DataFrames, which may have their own interchange protocol separate from dlpack, from using this specifier. Should we allow the use of the device attribute more generally?

Good point. Any idea what this could look like? I would like to avoid having yet another way of referring to various devices. I also liked your earlier comment that maybe we need a specialization of this protocol for DFs (which we could add later but already prepare for).

Also, in addition to the device descriptor of CPU vs GPU vs something else, I think we need a device_id or something similar to allow specifying which CPU or GPU it is on.

Absolutely, yes, that is missing in the description. I'll add it. Such an id is actually already used in one of the examples.

@kkraus14

Good point. Any idea what this could look like? I would like to avoid having yet another way of referring to various devices. I also liked your earlier comment that maybe we need a specialization of this protocol for DFs (which we could add later but already prepare for).

Agreed that we'll likely ultimately want a specialization of this protocol for DataFrames.

I think what I'm seeing is that the protocol is already specialized today for things that are somewhat array-like. I.e., say I wanted to use this protocol for something like graph objects, which don't have a traditional n-dimensional shape, or even just for general opaque objects: it's not straightforward to do so today without essentially ignoring some of the attributes.

Along these lines, maybe a more generalized protocol looks something like:

  • Remove the shape attribute
  • Remove the partition_tiling attribute
  • Remove the start attribute from partitions
  • Remove the shape attribute from partitions

And move these to a more specialized array-like distributed protocol.

What this would leave is something that looks like:

__partitioned__ = {
  'partitions': {
      (0, 0): {
        'data': ObjRef0,
        'location': [('1.1.1.1', 7755737, 'CPU', 0)],
      },
      (0, 1): {
        'data': ObjRef1,
        'location': [('1.1.1.2', 7736336, 'CPU', 0)],
      },
      (1, 0): {
        'data': ObjRef2,
        'location': [('1.1.1.3', 64763578, 'CPU', 0)],
      },
      (1, 1): {
        'data': ObjRef3,
        'location': [('1.1.1.4', 117264534, 'CPU', 0)],
      },
  },
  'get': lambda x: ray.get(x)
}

This seems general enough that the objects for the data keys could be anything and we'd generally know where they are locality-wise. Then for arrays we could build a specialization that is either a superset of this protocol or built on top of it, which allows specifying the keys removed above and requires that the objects for the data keys support a protocol like dlpack or the Array API.
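
For concreteness, a hypothetical array specialization along those lines might look like the following (the name __partitioned_array__, the values, and the placeholder refs are illustrative only, not part of any proposal):

# Locally defined stand-ins for e.g. Ray ObjectRefs, so the snippet is self-contained.
ObjRef0, ObjRef1, ObjRef2, ObjRef3 = 'ref0', 'ref1', 'ref2', 'ref3'

__partitioned_array__ = {
    # array-specific, global view
    'shape': (8, 8),               # global shape of the distributed array
    'partition_tiling': (2, 2),    # 2x2 grid of tiles
    'partitions': {
        (0, 0): {
            # array-specific, per-tile view
            'start': (0, 0),       # global offset of this tile
            'shape': (4, 4),       # extent of this tile
            # generic part, identical to the base protocol above
            'data': ObjRef0,       # expected to support dlpack / the Array API
            'location': [('1.1.1.1', 7755737, 'CPU', 0)],
        },
        (0, 1): {
            'start': (0, 4), 'shape': (4, 4),
            'data': ObjRef1, 'location': [('1.1.1.2', 7736336, 'CPU', 0)],
        },
        (1, 0): {
            'start': (4, 0), 'shape': (4, 4),
            'data': ObjRef2, 'location': [('1.1.1.3', 64763578, 'CPU', 0)],
        },
        (1, 1): {
            'start': (4, 4), 'shape': (4, 4),
            'data': ObjRef3, 'location': [('1.1.1.4', 117264534, 'CPU', 0)],
        },
    },
    'get': lambda x: x,            # placeholder; would be e.g. ray.get in practice
}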

@YarShev

YarShev commented Feb 2, 2022

@kkraus14, what would (0,0), (0,1), (1,0), (1,1) keys mean for a general opaque object in that case?

@lightsighter

It seems to be a harder and longer-term endeavor and probably more difficult to find consensus.

Why do you think that? The point of my proposal is so that we don't have to find consensus between library developers, the vast majority of whom are not represented in this conversation despite the fact that they will be the primary users of such an API. You're proposing an API that will burden them with very challenging responsibilities when they develop their libraries. I'm arguing that we should do everything we can to give them the easiest possible thing to use such that we don't unnecessarily burden them because there are way more of them than there are of us.

In the meanwhile, I believe we have to accept that different systems exist and a protocol which allows low-overhead data interchange at defined points can serve us well

This isn't about us. It's about the people who are going to end up building libraries on top of this interface. We need to stop thinking about how to make our lives easier and do the right thing for the large and silent majority absent from this conversation. To not do so would risk losing what is great about the Python ecosystem: trivial interoperability between libraries without unnecessary library developer overhead. If library developers have to think about things like where existing data is currently located and how it is partitioned in order to make things work, we're going to end up with a bunch of broken code that doesn't live up to the Python ethos. Library developers are not us; they're not going to jump through the hoops to make things work that we might be willing to do, and users will never use libraries that don't trivially interoperate with reasonable performance.

if you have not done so, maybe you want to initiate a separate discussion?

I'm happy to start another issue if people think the conversation would be useful.

@fschlimb

fschlimb commented Mar 4, 2022

It seems to be a harder and longer-term endeavor and probably more difficult to find consensus.

Why do you think that? The point of my proposal is so that we don't have to find consensus between library developers, the vast majority of whom are not represented in this conversation despite the fact that they will be the primary users of such an API. You're proposing an API that will burden them with very challenging responsibilities when they develop their libraries. I'm arguing that we should do everything we can to give them the easiest possible thing to use such that we don't unnecessarily burden them because there are way more of them than there are of us.

I am not sure I understand who you think "we" are and how "we" differ from library developers.

In most cases library developers use off-the-shelf containers, such as a data-API conforming array or DataFrame implementation. The only burden this protocol puts on library developers - if any - is to check whether the input provides __partitioned__ and to explicitly call from_partitioned on the container they use anyway. A good data-API conforming implementation would not even require this as long as users apply the container only to features bundled with the container.
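
A rough sketch of that "only burden" from the library developer's side is below; everything here is hypothetical except the check for the __partitioned__ attribute and the idea of a from_partitioned constructor on whatever container the library already uses.

class MyContainer:
    """Stand-in for whatever array/DataFrame type the library already uses."""
    def __init__(self, chunks):
        self.chunks = chunks

    @classmethod
    def from_partitioned(cls, parted):
        # Pull partitions via the protocol's 'get' callable -- no knowledge of
        # the producing framework is needed. (A real implementation would only
        # fetch the partitions local to each worker.)
        get = parted['get']
        return cls([get(p['data']) for p in parted['partitions'].values()])

def train(X):
    # The only protocol-specific code in the library:
    if hasattr(X, '__partitioned__'):
        X = MyContainer.from_partitioned(X.__partitioned__)
    # ... from here on the library only ever sees its usual container type
    return X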

As a developer of a distributed container, supporting this protocol is also not very challenging. In particular, if it works with a smart runtime like Legion, the work needed is almost trivial. Even less advanced frameworks will have basic features to import a pre-distributed data structure and, if needed, even re-distribute it.

In the meanwhile, I believe we have to accept that different systems exist and a protocol which allows low-overhead data interchange at defined points can serve us well

This isn't about us. It's about the people who are going to end up building libraries on top of this interface. We need to stop thinking about how to make our lives easier and do the right thing for the large and silent majority absent from this conversation.

Please don't hesitate to invite related projects/people to this conversation.

To not do so would risk losing what is great about the Python ecosystem: trivial interoperability between libraries without unnecessary library developer overhead. If library developers have to think about things like where existing data is currently located and how it is partitioned in order to make things work, we're going to end up with a bunch of broken code that doesn't live up to the Python ethos. Library developers are not us; they're not going to jump through the hoops to make things work that we might be willing to do, and users will never use libraries that don't trivially interoperate with reasonable performance.

See above. The "burden" is on the container developer, not the library developer. The library developer chooses the execution layer (Legion, Dask, MPI, Ray, ...) and the containers (array, DF, ...). The protocol here does not change that. It only allows zero-copy data interchange between conforming objects/packages. It's up to these lower-level frameworks to make sure they interoperate or not. Without it, using a "foreign" container with a library will currently either not work, work incorrectly, or be inefficient. The protocol will help to build one automatic path for all the "inefficient" routes (and potentially make them better) and provide an easy way to enable the other two cases.

if you have not done so, maybe you want to initiate a separate discussion?

I'm happy to start another issue if people think the conversation would be useful.

I do.

