Protocol for distributed data #7
Thanks @fschlimb, this is quite interesting. I think there's significant interest in having a standardized distributed extension of After reading the protocol description, I think I understand the data shape/partitioning, but I'm less sure about the "locality" information. All it says is: What the data objects are is also a little tricky: For dataframes there is some overlap with the chunking concept in I'd be curious to see what maintainers of libraries with distributed data structures think - @jakirkham, @devin-petersohn, @shwina any thoughts? |
Thanks @rgommers for your feedback.
Yes, that's not only a good question but also a hard issue to solve. Note that there is a paragraph below which gives more details. We started by not considering cross-library (e.g. between ray and dask) exchange, but we are now seeing that this will most likely be needed. Some of our prototypes work in such a heterogeneous environment. To make this officially supported we indeed require this to be more concrete. Interchange between dask and ray could most likely be addressed by requiring ip-addresses and disallowing dask's more flexible identifiers. However, interchange with MPI is more difficult since MPI requires more than an ip-address. For going ray/dask->MPI the ip-address should be sufficient. Going the other direction (MPI->ray/dask) is trickier. Any idea?
Yes, this can use some re-wording and specification (also in the detailed paragraph). Currently, the intent of this protocol is to support both arrays and dataframes. I'd support the idea of requiring objects that either support the interface of the array object in the array API standard or the equivalent in the dataframe API standard.
Good point. Note, the protocol here can also describe objects that are partitioned on a single node. The name of the protocol here deliberately does not include the string 'distributed'. Maybe there is a way to bring this together?
Yes! Just in case anyone wants to read a large part of discussion which led to the proposal: read here |
I've been thinking about this and this feels like a huge can of worms. You'd ultimately want to handle every level in the memory hierarchy in this situation, so for CPUs you'd want something like:
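A minimal sketch of the idea (all field names below are illustrative only, not from any existing spec):

```python
# Illustrative sketch only: locality for a CPU-resident chunk, coarse to fine.
cpu_location = {
    "address": "10.0.0.1",  # node (IP / hostname)
    "process": 12345,       # worker / OS process owning the allocation
    "numa_node": 0,         # NUMA domain the memory is pinned to
}
```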
For accelerators like GPUs you'd want to do similar but also handle the relevant stream / queue for the memory as well:
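Again purely as an illustrative sketch (hypothetical field names), adding the device and stream/queue:

```python
# Illustrative sketch only: locality for an accelerator-resident chunk.
gpu_location = {
    "address": "10.0.0.1",
    "process": 12345,
    "device_type": "cuda",  # or "rocm", "sycl", ...
    "device_id": 2,
    "stream": 0x7f3a10,     # stream / queue on which the memory is valid
}
```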
The device and stream / queue specification seem to overlap quite a bit with work done for the array interchange protocol with dlpack. Given the level of information down to a specific device stream / queue or NUMA node, you could presumably match that to the correlated MPI / SHMEM rank.
I think the challenge with this is that objects like dataframes are backed by multiple allocations where the dataframe could be partitioned by columns, rows, or both, and could live in different memory spaces. I think it would make more sense to have this protocol deal with memory and then have a distributed dataframe protocol built on top of it. |
Yes, one goal is definitely to minimize such overlap as much as possible.
@kkraus14 Not sure I fully understand. Is this related to the above observation that |
I disagree. If I run something with a single process per socket or NUMA node or GPU then I need to know which chunk belongs to which process, and I need more information than just an IP address in order to accomplish that. I.E. imagine I have a box with 4 sockets and I run something with 4 MPI ranks (each one pinned to a socket and its local memory) and within each rank I spin up a dask worker. All 4 dask workers and MPI ranks would have the same IP address, but I'd still want to align the workers and ranks.
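To make that concrete, a hedged sketch with made-up values:

```python
# All four dask workers / MPI ranks report the same IP, so IP alone is ambiguous:
locations_ip_only = ["10.0.0.1", "10.0.0.1", "10.0.0.1", "10.0.0.1"]

# With socket / NUMA (or rank) information the alignment becomes unambiguous:
locations_detailed = [
    {"address": "10.0.0.1", "numa_node": 0, "mpi_rank": 0},
    {"address": "10.0.0.1", "numa_node": 1, "mpi_rank": 1},
    {"address": "10.0.0.1", "numa_node": 2, "mpi_rank": 2},
    {"address": "10.0.0.1", "numa_node": 3, "mpi_rank": 3},
]
```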
I think the problem with this is that what protocols are / aren't supported via |
Yes, that's why the spec defines that for MPI(-likes) the location must be the rank. That's an inherent characteristic of MPI and that's why that information must be in the protocol. And yes, going MPI to dask is trickier than the other way around. In your example the MPI process and the dask worker process will be distinct. In your example, why is the MPI rank not enough?
Sounds interesting. Could you give an example of what you have in mind? I am not sure I see how you could be that generic. |
Not necessarily, they could be in the same process. Let's remove MPI and replace it with something like PySpark or Presto instead. There's no MPI in the mix so there's no rank and it's likely that there are multiple workers running on a single node. There are more and more distributed systems nowadays that are not MPI/SHMEM based and cannot rely on using a rank to indicate a worker. If everything were MPI / SHMEM based then in theory we could use the rank, but unfortunately that isn't the case, and for making a general protocol we should capture the general case.
Let's take a stab at it. Let's say I have two distributed libraries which I want to connect and neither depend on MPI / SHMEM:
Given a cluster with 3 nodes, each with 2 sockets (2 NUMA nodes per socket) and 4 GPUs (1 GPU per NUMA node), the deployment would look like:
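(The original listing isn't preserved here; the numbers below are purely illustrative assumptions.)

```python
# Illustrative deployment, assuming one worker per NUMA node for libdf and one
# worker per GPU for libml; the two sets of workers are distinct processes.
deployment = {
    "node0": {"libdf_workers": 4, "libml_workers": 4, "gpus": [0, 1, 2, 3]},
    "node1": {"libdf_workers": 4, "libml_workers": 4, "gpus": [0, 1, 2, 3]},
    "node2": {"libdf_workers": 4, "libml_workers": 4, "gpus": [0, 1, 2, 3]},
}
```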
Now say that libdf has produced a distributed dataframe that we wish to hand to libml. The distributed dataframe can look like the following:
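Again hedged and illustrative (the exact structure from the original snippet isn't preserved), something along these lines:

```python
# Illustrative only: a libdf dataframe partitioned row-wise, one partition per
# NUMA node / GPU, with locality recorded down to the device level.
distributed_df_partitions = [
    {
        "data": None,  # placeholder for a framework-specific handle / future
        "location": {
            "address": "node0",
            "process": 11111,    # libdf worker owning the partition
            "numa_node": 0,
            "device": "cuda:0",  # where the column buffers actually live
        },
    },
    # ... one entry per partition across node0..node2 ...
]
```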
Now say we want to use the
Now you could imagine using Column objects instead of DataFrame objects as the Now if you had something like
This way you get all of the information down to the memory buffers, which enables the largest amount of flexibility for distributed frameworks to interchange between one another. It also allows for always getting down to the underlying memory of the objects that are distributed. In practice, I think this could be supported given the current definition of
On the other hand, it may not be reasonable to require being able to get down to memory for all objects and could act as a barrier to entry to adopting a protocol like this. |
For better clarity some background: Ideally we want this to be as general as possible. It turned out to be not that simple to achieve generality, so we approached it by starting with concrete examples like MPI, ray, dask. It's exciting to get new input and to tackle the next step towards greater generality.
Agree. The generalized concept could be the process id. I had been thinking about replacing
Yes and no. The handle in the
Would the above new
Interesting. This could also be viewed as 'specializations' of Notice: We did not intend this spec to be viewed as a vehicle to send buffers. It's a structure only to describe data partitions and their locality. It's meant to avoid (and not to foster) messaging.
😉 There is definitely a tradeoff to make. I am not sure I see the need to be more generic than what dataframes and arrays require. What's nice about one common definition is that it allows us to have one spec which can be used in both worlds and even be used cross df/array (as long as |
I don't think a process ID is granular enough. I'm also generally super wary of optional pieces in a protocol, because people then rely on those optional pieces instead of doing things the more general way that allows wider usage. In the case of providing an MPI Rank, I believe that needing it is an indication that the non-optional location pieces are underdefined. I.E. you could imagine two separate MPI based applications that are deployed separately and whose ranks don't actually line up. You'd then need to fall back to location information and you don't have enough info to align them properly. Additionally, say you have a distributed system that uses a single process per node, and within that process manages objects across multiple GPUs and CPUs. Now say you want to consume some object from a different distributed system that has a process per GPU; there's no easy way to align things properly. This is somewhat handled in the array interchange protocol via
What is defined as
I guess this means that crossing process boundaries is out of scope? Is there a problem in practice if we're not crossing process boundaries? In my view having the
I'm trying to talk myself out of boiling the ocean here 😄, but given frameworks like Dask, Ray, etc. let you have distributed collections of arbitrary Python objects, you could imagine wanting to exchange them. That being said, I think it would be reasonable to require those objects be serialized into something that could be sent over the wire anyway, which effectively makes them array-like. |
@kkraus14 As for As for providing a process ID, why is that not granular enough? Can you elaborate? |
It requires domain knowledge about the producing distributed framework from the consumer. There's no easy way for me to know how the producing distributed framework is deployed which could inform where the object is located relative to GPU or NUMA node or any other locality-relevant information. I.E. if I have Dask as a producer and Ray as a consumer, I can't guarantee that they are deployed in the same way and target the same resources and as a consumer I in theory only have information about my framework. |
Sorry for the late reply, I was out.
@kkraus14 I suspect there is still some disconnect. As mentioned earlier, the initial proposal does not intend to facilitate or even provide means for transferring data, it only provides a description of the current state, e.g. the partitioning and location of the data. If needed, it's up to the consumer to move data. That said, maybe what you're pointing at is that the consumer might not run "within" one of the given processes and so has no access to the data. Is this what your concern is about? If it is, then I think we have 2 cases:
There is definitely a need to properly orchestrate and launch an application using various frameworks. I'd argue that that's a different issue which should not be solved (and probably cannot) with such a protocol. |
Good point. I guess the rank-part was meant to be a short-cut, similar to 'local' specifically for SPMD.
I think even '(node, ip)' provides sufficient information for your example. There are other challenges which make it super hard, no matter what. Most prominently, it's not possible in MPI to let the consumer get data from an independent MPI app without active action of the producer. If
That's interesting. We were not considering totally independent systems. Maybe we should, but my immediate reaction is that in such cases zero-copy is not possible anyway and some kind of messaging is required. I think the problem is not a lack of information in the protocol but rather a difficult setup to deal with ("no easy way" as you said). I still believe that this protocol should mostly describe the state and not provide messaging capabilities. I was thinking about a utility package which consumes and produces objects implementing this protocol. It could provide re-partitioning, data-transfer and other things for various frameworks. It could also deal with supporting such independent cases.
I see. In your scenario the consuming system assigns GPUs to processes/workers and you would like to schedule on the corresponding process. The current proposal requires 2 calls/tasks to get this done (one to get the device id from
Definitely, the objects/containers need to be serializable (e.g. pickle'able). And yes, we should not attempt to boil the ocean 😉. |
Ray allows passing a list of
Yes, that is true.
As for data re-partitioning, I think such functionality should be provided by a framework supplying a data container, and data-transfer and other things might be provided by such a utility package.
I have been thinking on |
Good point. We can clarify that the intent of
Re-partitioning is a core feature. Requiring consumers to understand the producer's package contradicts the major goal of the protocol. A utility package is free to implement things in any way it wants - including using modin or heat or whatnot.
Sounds like we agree here! |
Yes, that would be fine.
Such a utility package likely sounds more reasonable. |
cc: @magnatelee @lightsighter for vis (I mentioned this discussion yesterday) |
@leofang There are two major issues with this protocol:
The distributed Legate Store protocol that we use to exchange data between Legate libraries has answers for all of those questions. Unless the protocol you are proposing here adapts to provide for such scenarios, I don't foresee cuNumeric ever supporting it. We'll happily ingest data from other libraries that want to pass us data in this format, but we're not going to condone sub-optimal data exchange practices that compromise performance on accelerated hardware. As an aside: the idea of having a unified global view of an entire partition in the form of tiles is already naive when it comes to building distributed systems. As soon as you have to have meta-data from every node existing on every other node then you are just inviting scalability bottlenecks. |
@lightsighter Thanks for your feedback, very much appreciated! We are open not only to changes/improvements to this initial proposal but also to other approaches. Let's see if/how we can come up with something that can also work with legate. A quick search for "distributed Legate Store protocol" didn't yield anything consumable. @lightsighter, could you point me to a description of it? For clarification: this protocol is meant to help exchange distributed data between entirely independent projects: at a given point in time data needs to be passed from package A to package B. This is not about how each package/system manages data internally. It is meant to describe the current partitioned state of the data so that the consumer can access it with as little copying as possible. No matter if the producer or consumer operate directly on physical tiles or not: the data must somehow be physically partitioned and distributed. That's what the protocol tries to describe. Similarly, this is not about prescribing whether a system has a global view or not. It is just a way to describe the current global view so others/consumers who do not understand the internals of the producer can efficiently access and use the data/memory. Yes, if there was Ray only, or Dask only, or only legion/legate, or if MPI was the only choice users have, then this protocol would not be needed. You're absolutely right that asynchronous exchange is helpful. Could you explain how the |
https://github.com/nv-legate/legate.core#what-is-the-legate-core
The fact that you even assume that there is a "partitioned state of the data" is a design flaw in my opinion. In a good distributed system data should be both replicated and sharded to meet the needs of the application over time. Assuming that there is just one physical partition of the data is massively constraining to describing how data is distributed and how coherence is maintained.
I think my point is that you should never have a single global view of the data anywhere in the system. That leads to scalability and resilience issues. Control and data logic should be both distributed and replicated.
Read about how Legion tracks data dependences between tasks: https://legion.stanford.edu/pdfs/oopsla2013.pdf Section 5 provides a formal operational semantics for computing dependences and maintaining coherence of data between tasks in a distributed environment. You'll find it has very little similarity to anything you've referenced, although if you've done work in distributed databases it might look somewhat familiar. |
The current proposal allows replicated partitions; it does not support partially overlapping partitions, though. Is that what you're concerned about?
In the abstract I certainly do agree. However, this here is not meant to define the ideal approach for distributed compute systems. We are trying to bridge between existing systems. Over time we might see convergence and once legate has taken over the world 😉 this protocol will no longer be needed. Until then, we have to accept reality and focus on data.
We cannot assume that every system we want to support is using tasks. That's simply not the case. It'd be really great if we could find a protocol that defines a bridge also between legate and others. This requires that we accept that every system is different and that exchanging data between them can be less optimal than within the same system. I suggest grounding this discussion on this mindset and avoiding a debate over the best distributed system. So far I have heard nothing that would prevent legate from importing or exporting the protocol. Any proposal for an improved or different kind of protocol will be highly appreciated. |
No, I'm concerned that the API, as it stands, introduces inherent inefficiencies that ultimately prevent any implementations from getting to the speed-of-light execution on accelerated hardware. That's a fundamental flaw and therefore a non-starter for those of us that work at certain companies.
In my experience, asking different runtime systems to share resources is a recipe for poor code execution and disappointed users. The goal of the API shouldn't be to facilitate interactions between different runtimes. It should be to provide a common interface that different runtime systems can implement such that all libraries built on top of the interface can be executed in an accelerated way on different hardware platforms. Such an API should be highly productive for library developers, even if that adds complexity into the implementations of the runtimes that support it. Furthermore, such an API should contain no explicit mapping decisions about how code runs on the hardware so that runtimes can provide highly tuned and accelerated ways for that code to execute on different hardware platforms. A higher-level API will also enable more semantic information to be communicated from library developers down to the runtime systems, such that the runtimes can perform more sophisticated transformations and scheduling decisions when targeting heterogeneous hardware.
I never said that and I think it's disingenuous of you to suggest that that is my motivation. To reiterate my point above, I believe such an API should be higher level such that, first, it's easier for library developers to build libraries that can be run on any runtime, and second, multiple such runtimes can provide an implementation so users can pick the best one for different platforms. Starting at such a low level of abstraction as the current proposal does though immediately prevents any such architecture and severely restricts the design space of potential runtimes that can be built, ultimately leading to code that will execute well below the speed-of-light on future hardware.
When did I ever say anything about tasks being central to Legion's semantics? Tasks are just distinguished computations that the system uses to define the granularity of scheduling, as they are in Dask and Ray. The important part (which is laid out in detail in the paper) is how dependence analysis is performed between the computations performed by different libraries. There's no discretization of the data in Legate/Legion like there is in the current API proposal. As we point out in the Legate Store documentation, tasks from different libraries can name completely arbitrary subsets of data and it is the runtime system's responsibility to compute dependences between tasks and maintain coherence of data. Allowing library developers to name arbitrary subsets of data for their computations gives them maximum flexibility to describe precisely the data they need to run any computation and be more productive as they don't have to worry about how data was partitioned by an earlier library that produced the input. Furthermore, there are no false dependences, as dependences only need to be introduced when there are true data dependences.
Again, I think this is a waste of time. If you really cared about making it easy for library developers and users, you would work on designing an API that allows them to plug-and-play any runtime system underneath all the libraries, and not require user/library developers to figure out how to make multiple runtimes play nicely together. The Legate Store API has this property, there's nothing Legate specific about it. It should even be able to be implemented using the Dask or Ray runtimes. I'll even commit to the Legate project abandoning the Legate Store API if you propose a higher-level API with a data model that's at least as high-level as the one in the Legate Store API (feel free to go even higher-level if you want, we won't complain).
Go back and look at the Legate Store API. There's no way to get access to raw pointers in Legate unless you are actually inside of the execution of a leaf task in the tree of tasks. So your API as it currently stands is fundamentally a non-starter. We exchange views of data between libraries in Legate which are logical descriptions of arrays/dataframes/sets/etc. These views support sparsity and aliasing, literally anything a library developer could ever want to do. I'd encourage you to raise the level of abstraction of your API so that users can plug-and-play any runtime under it and library developers don't have to worry about interoperability between different runtimes. Starting at the level of tiles, pointers, and memory layouts makes way too many assumptions about the implementation of the runtime for Legate to ever be able to support it. As an aside, I'll point out that standardizing on a higher-level data model API will also have the benefit of rendering most of the arguments in this thread so far moot, as it will enable runtimes to implement things like data layout, data movement, and synchronization however they want. There's no need for the runtimes to agree, because users and library developers will just be using a single runtime at a time and will never need to understand the details of what the runtimes are doing internally. |
I did not want to suggest any motivation. I am sorry that my comment conveyed such an impression. No need for insult, though. Let's pursue the direction you're proposing. I have a few clarifying questions:
|
Yes, they have it a little bit easier because they can make clients side-load things like synchronization primitives in type-unsafe ways because they assume shared-memory-only execution.
That is a step in the right direction. Those two conditions are necessary but not sufficient for fully decoupling the specification of computation from how it is ultimately executed.
We require that in the LegateStore API because we believe that is a necessary feature for performance debugging by library and application developers. Unnecessary dependences (especially non-deterministic ones) create mystical performance effects that can be impossible to reason about and make it nigh impossible for users/library developers to tune their code. That said, I would be willing to accept a community standard that did not include this restriction. It's a performance guarantee, but one that is not strictly necessary for correctness.
I strongly disagree with this. Each MPI implementation has many scheduling heuristics. Some are more latency sensitive while others optimize more for throughput. Here's an example of some of the scheduling trade-offs that OpenMPI makes in its implementation: https://www.open-mpi.org/papers/euro-pvmmpi-2006-hpc-protocols/euro-pvmmpi-2006-hpc-protocols.pdf Each MPI implementation has all sorts of scheduling heuristics in its runtime for trading off message latency vs message throughput and tag matching.
Python programs should always have sequential semantics. Anything employing explicit external communication between processes should be completely beyond the scope of the standard and the responsibility of the user to manage. The Python language standard has nothing to say on this front and therefore neither should we. Nowhere should the number of processes show up in the standard. Just because we're designing the responsibilities of an interface that should support distributed execution doesn't mean that distributed execution should explicitly show up in the interface.
What was the justification for this? I don't see how anything else could be a good idea. Consider a historical example: the development of the MPI standard. Many, many years ago (late 1980s, early 1990s) there were dozens of different libraries that provided for communication between processes on supercomputers. Some of them were low-level and required users to manage endpoints, completion queues, and control-flow credits explicitly, while others were high-level and not that different from what ultimately became the MPI standard. The MPI community faced the same decision we now face: do you provide for interoperability between these different runtimes at the least-common denominator of abstraction, or do you standardize on a high-level interface that abstracts over low-level details and allows for a common interface on which to build composable software. They wisely opted for the latter choice. This had the following two positive effects on the HPC ecosystem:
All this happened because the HPC community recognized that runtime systems almost never interoperate well and attempting to do so offloads tons of functional responsibility onto library developers and users. A high-level standard leads to a healthier ecosystem for everyone in the long-run. |
I disagree with your analogy, but I'd expect that your high-level thoughts mostly meet consensus. At least I am mostly with you. However, we are getting lost in the abstract with only little progress towards a practical proposal. @lightsighter It would be very helpful if you could (here) sketch out an API/protocol that does what you have in mind. It'd be more targeted if we had another proposal providing an independent and different perspective. |
This is exactly what the Legate Store and Operation APIs are. They are our version of what you are proposing. We are not tied to naming convention or syntax in any way, so please ignore that as we are completely flexible on those fronts. The crucial components are the data and execution models. These are both designed to be completely general and independent from any details of the Legate implementation and we believe are high-level enough to abstract over any of the possible runtimes involved in these discussions such that they could each provide their own implementation.
What parts do you disagree with? |
This is a very interesting discussion, thanks all!
@lightsighter I'm curious about how well this maps to Dask, or if anyone has tried that yet. Given that RAPIDS relies heavily on Dask and y'all are @ NVIDIA, have you talked to the RAPIDS team about this already? |
@quasiben would be the best person to answer that. |
Dask assumes a partitioned data set which, as @lightsighter has noted, is not an assumption for Legate. We've briefly discussed a concurrent futures API for Dask which maybe legate could also leverage but the assumption of the two
Something missing from this proposal is how spilled data may be handled. @kkraus14 noted several examples of the complexity when distributed libraries leverage hardware optimizations with NUMA pinning and such -- things may be more complicated still when we also have to fetch data from disk as this is also subject to hardware/process/network aware fetching |
The question of how well the interface maps to any given runtime should be at most a secondary constraint. The primary goal of the API should be to provide an interface for users and library developers to build composable software in the easiest way possible. That is the ethos of Python: high productivity. I believe we should be doing everything in our power to make users' and library developers' lives as simple as possible, even in distributed execution environments. After that we can ask the question: can mappings to different runtimes exist and provide efficient execution. Our primary constituency should always be the users first though and not ourselves. That being said, I can see a multitude of different mappings for the Legate Store API down on to Dask that make various tradeoffs in terms of precision of dependence analysis and data coherence.
It's not that it's not an assumption for the Legate interface. The Legate interface has no opinion on how you implement it. You can build an implementation of the Legate Store API that leverages partitioned data sets. There's absolutely no reason that you can't do that. Just because we don't do that in our implementation doesn't mean you can't do it in yours. This is a perfect example of how the Legate interface is considerably less opinionated than the current proposed API which prescribes implementation details like this and offloads the management of those details onto users and library developers.
Why does that need to show up in the interface? Why isn't that the responsibility of each implementation? Users and library developers are poorly positioned to be able to deal with such things as spilling because they have incomplete knowledge of what other parts of the program are doing. Only the runtime has the visibility needed to be able to answer such questions. Furthermore, if we try to expose such details through the API, then we all have to agree an interface and semantics for spilling that meshes with each runtime's execution model, the intersection of which I'm almost certain is the null set. |
Incorporated suggestions/feedback from data-apis/consortium-feedback#7
With my latest commit I tried to incorporate suggestions and feedback from @kkraus14 , @YarShev and @lightsighter. The most important changes are the following:
As an alternative solution a functional approach could be considered. It could allow requesting a 'slice' of the global data from any location. The downside of it is that it can potentially imply unnecessary data movement in situations where several partitionings are acceptable for the consumer. For example: element-wise operations work equally well on any data distribution as long as it's balanced; it does not matter if the partitions are chunks of rows, columns or blocks. A way out would allow querying the "current state" (a simplified What do others think?
With respect to the more fundamental approach suggested by @lightsighter: It seems to be a harder and longer-term endeavor and probably more difficult to find consensus. @lightsighter, if you have not done so, maybe you want to initiate a separate discussion? In the meanwhile, I believe we have to accept that different systems exist and a protocol which allows low-overhead data interchange at defined points can serve us well (even if not ideally). |
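A hedged sketch of what such a functional alternative could look like (names and signature are hypothetical):

```python
# Hypothetical functional interface: the consumer asks for an arbitrary slice
# and the producer materializes it locally, possibly moving data to do so.
def get_global_slice(dist_obj, rows, cols):
    """Return the requested slice of the global data in the calling process."""
    raise NotImplementedError  # sketch only

# Querying the "current state" first would let the consumer request slices that
# match the existing partitioning and thus avoid unnecessary data movement.
```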
Does this imply that if
It seems to me this should lie with a consumer of the protocol because, as you said, it can potentially imply unnecessary data movement in situations where several partitionings are acceptable for the consumer. |
'data' is supposed to be an array (or DF) or a future/handle thereof. Such types always (in Python) have a host structure even if the raw data lives on the device. I do not see why a future would not allow device data. The main reason for having the device-id here (and not relying solely on the data-structure with its dlpack) is to allow (remote) scheduling without even touching the data item. To me, handles/futures and data-location are independent aspects; all combinations are possible. |
That makes sense! I just wanted to make sure the protocol allows to have the device-id for a reference/future. Aside: What do you mean by "a host structure"? |
There is currently no way that the entire Python structure (like a numpy array or a modin/pandas DataFrame) can live entirely on the device (other than host/cpu devices of course). The device pointer (or some other identifier) points to the raw data on the device but a proxy pointer exists on the host; many other things live on the host, like the Python object itself. |
@fschlimb thanks for iterating on this! One comment:
If the object is forced to conform to Also, in addition to the device descriptor of CPU vs GPU vs something else, I think we need a |
Good point. Any idea how this could look like? I would like to avoid having yet another way of referring to various devices. I also liked your earlier comment that maybe we need a specialization of this protocol for DFs (which could add later but already prepare for it).
Absolutely, yes, that is missing in the description. I'll add it. Such an id is actually already used in one of the examples. |
Agreed that we'll likely ultimately want a specialization of this protocol for DataFrames. I think what I'm seeing is that the protocol is already specialized today for things that are somewhat array-like. I.E. say I wanted to use this protocol for something like graph objects which don't have a traditional n-dimensional shape, or even just general opaque objects, it's not straightforward to do so today without essentially ignoring some of the attributes. Along these lines, maybe a more generalized protocol looks something like:
And move these to a more specialized array-like distributed protocol. What this would leave is something that looks like:
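For illustration (the exact snippet isn't preserved here), the shape-agnostic remainder might look roughly like:

```python
# Illustrative sketch: only opaque partition objects, their locality, and a
# resolver for handles remain once the array-specific attributes are split out.
__partitioned__ = {
    "partitions": {
        0: {"data": None, "location": ["1.1.1.1"]},  # None = placeholder handle
        1: {"data": None, "location": ["1.1.1.2"]},
    },
    "get": lambda handle: handle,  # framework-specific handle resolution
}
```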
This seems general enough that the objects for the |
@kkraus14, what would |
Why do you think that? The point of my proposal is so that we don't have to find consensus between library developers, the vast majority of whom are not represented in this conversation despite the fact that they will be the primary users of such an API. You're proposing an API that will burden them with very challenging responsibilities when they develop their libraries. I'm arguing that we should do everything we can to give them the easiest possible thing to use such that we don't unnecessarily burden them, because there are way more of them than there are of us.
This isn't about us. It's about the people who are going to end up building libraries on top of this interface. We need to stop thinking about how to make our lives easier and do the right thing for the large and silent majority absent from this conversation. To not do so would risk losing what is great about the Python ecosystem: trivial interoperability between libraries without unnecessary library developer overhead. If library developers have to think about things like where existing data is currently located and how it is partitioned in order to make things work, we're going to end up with a bunch of broken code that doesn't live up to the Python ethos. Library developers are not us; they're not going to jump through the hoops to make things work that we might be willing to do, and users will never use libraries that don't trivially interoperate with reasonable performance.
I'm happy to start another issue if people think the conversation would be useful. |
I am not sure I understand who you think "we" are and how "we" differ from library developers. In most cases library developers use off-the-shelf containers, such as a data-API conforming array or DataFrame implementation. The only burden on lib developers which comes from this protocol - if any - is to check if the input is a
As a developer of a distributed container, supporting this protocol is also not very challenging. In particular, if it works with a smart runtime like legion the work needed is almost trivial. Even less advanced frameworks will have basic features to import a pre-distributed data structure and eventually even re-distribute it as/if needed.
Please don't hesitate to invite related projects/people to this conversation.
See above. The "burden" is on the container developer, not the library developer. The library developer choses the execution layer (legion, dask, mpi, ray, ...) and the containers (array, DF,...). The protocol here does not change that. It only allows zero-copy data interchange between conforming objects/packages. It's up to these lower-level frameworks to make sure they interoperate or not. Without, using a "foreign" container on a library will currently either not work, work incorrectly or be inefficient. The protocol will help to build a one automatic path for all the "inefficient" routes (and potentially make them better) and provide an easy way to enable the other two cases.
I do. |
In order for an ndarray/dataframe system to interact with a variety of frameworks in a distributed environment (such as clusters of workstations) a stable description of the distribution characteristics is needed.
The __partitioned__ protocol accomplishes this by defining a structure which provides the necessary meta-information. Implementations show that it allows data exchange between different distributed frameworks without unnecessary data transmission or even copies.
The structure defines how the data is partitioned, where it is located and provides a function to access the local data. It does not define or provide any means of communication, messaging and/or resource management. It merely describes the current distribution state.
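For orientation, a hedged sketch of such a structure for a (1000, 100) array split row-wise across two nodes (the key names reflect one reading of the proposal and may differ in detail from the actual spec):

```python
obj_ref_0 = obj_ref_1 = None  # placeholders for handles/futures (e.g. Ray ObjectRefs)

__partitioned__ = {
    "shape": (1000, 100),            # global shape
    "partition_tiling": (2, 1),      # grid of partitions: 2 along the first axis
    "partitions": {
        (0, 0): {
            "start": (0, 0),         # global offset of this partition
            "shape": (500, 100),     # shape of this partition
            "data": obj_ref_0,       # object/handle understood by 'get'
            "location": ["1.1.1.1"], # where the raw data lives
        },
        (1, 0): {
            "start": (500, 0),
            "shape": (500, 100),
            "data": obj_ref_1,
            "location": ["1.1.1.2"],
        },
    },
    "get": lambda ref: ref,          # resolves a handle to local data, e.g. ray.get for Ray
}
```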
In a way, this is similar to the meta-data which dlpack provides for exchanging data within a single node (including the local GPU) - but it does it for data which lives on more than one process/node. It complements mechanisms for intra-node exchange, such as dlpack, __array_interface__ and alike.
The current lack of such a structure typically leads to one of the following scenarios when connecting different frameworks:
This PR lets xgboost_ray automatically work with any new container that supports __partitioned__ (the extra code for modin DF and Ray/MLDataSet is no longer needed once they support it, too). The implementation here avoids that by entirely hiding the distribution features but still allowing zero-copy data exchange between __partitioned__ (exemplified by modin (PR) and HeAT (PR)) and scikit-learn-intelex/daal4py.
A structure like __partitioned__ enables distributed data exchange between various frameworks which can be seamless to users while avoiding unnecessary data transfer.
The __partitioned__ protocol is an initial proposal; any feedback from this consortium will be highly appreciated. We would like to build a neutral, open governance that continues to oversee the evolution of the spec. For example, if some version of it receives a positive response from the community we would be more than happy to donate the __partitioned__ protocol to the data-api consortium or host it in a clean github org.