__partitioned__ protocol for partitioned and distributed data containers #3
@YarShev @SmirnovEgorRu @rlnx @shssf @diptorupd @oleksandr-pavlyk I'd like to get your feedback/thoughts on this.
@PivovarA This should be of interest to your distributed sklearn work.
The first thing I would like to note is the mapping used in the examples. Another thing that would be good to have is metadata for each partition, such as its shape, index, columns, and dtypes.
That could look as follows:

__partitioned_interface__ = {
    "shape": (2, 2),
    "partitions": {
        "partition_id0": {
            "data": ObjectRef0,
            "position": (0, 0),
            "node_id": '1.1.1.1',
            "metadata": {
                "shape": (5, 5),
                # index/columns/dtypes are None, or the actual values when the
                # partition (e.g. a DataFrame) makes them available
                "index": None,
                "columns": None,
                "dtypes": None,
            },
        },
        "partition_id1": {
            "data": ObjectRef1,
            "position": (0, 1),
            "node_id": '1.1.1.2',
            "metadata": {
                "shape": (5, 5),
                "index": None,
                "columns": None,
                "dtypes": None,
            },
        },
        "partition_id2": {
            "data": ObjectRef2,
            "position": (1, 0),
            "node_id": '1.1.1.3',
            "metadata": {
                "shape": (5, 5),
                "index": None,
                "columns": None,
                "dtypes": None,
            },
        },
        "partition_id3": {
            "data": ObjectRef3,
            "position": (1, 1),
            "node_id": '1.1.1.4',
            "metadata": {
                "shape": (5, 5),
                "index": None,
                "columns": None,
                "dtypes": None,
            },
        },
    },
}
I am not sure if I understand that case. Where can we retrieve data for those positions from? I don't see how they are represented in the interface.
@YarShev Thanks for your comments. I corrected the tuples and their usage.

With respect to the metadata: I think I do understand the motivation for including such information. However, the information given in your examples is not really about the partitioning. I think we should cleanly separate the partitioning interface from anything that is related to the underlying data/data-structure itself. That's also why I suggested not providing the global shape of the data, only the shape of the partition grid. I do agree we should allow any given implementation (like Modin) to add more data to the dictionary than what's specified in the protocol. I am just not convinced that what you suggest should be a mandatory part of the protocol, because a) it's a different kind of information and b) it is not generally useful across different container types.

About the SPMD question: the last example assumes two ranks. The 'partitions' entries are for rank 0; the comments show what would be present on rank 1. The protocol does not provide means to get partitions which live on other ranks. If one needs a partition from a different rank, one needs to explicitly use whatever communication infrastructure is available (like MPI). Does this make sense?
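One possible reading of the two-rank case, as a minimal sketch (assumptions: mpi4py, (5, 5) NumPy blocks, node_id holding the owning rank, and data present only for the locally owned partition):

```python
# Illustrative sketch only; field names follow the dictionary shown above.
# Assumes two MPI ranks, each owning one block of a 1x2 partition grid.
import numpy as np
from mpi4py import MPI

rank = MPI.COMM_WORLD.Get_rank()
local_block = np.zeros((5, 5))  # the block this rank owns

__partitioned_interface__ = {
    "shape": (2, 1),  # shape of the partition grid, not of the global data
    "partitions": {
        "partition_id0": {
            "data": local_block if rank == 0 else None,  # only rank 0 holds this block
            "position": (0, 0),
            "node_id": 0,  # owning rank
        },
        "partition_id1": {
            "data": local_block if rank == 1 else None,  # only rank 1 holds this block
            "position": (1, 0),
            "node_id": 1,
        },
    },
}
```

Getting a remote block would require an explicit MPI transfer; the protocol itself only records who owns what.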
@fschlimb, about the metadata I specified: speaking of Modin, if we know that information, we can avoid triggering any computation on the Modin side when creating a Modin DataFrame. It also allows us to avoid undesired data movement when running remote calls, even though Ray and Dask try to perform calls as close to the data as possible. However, I agree with points a) and b). We probably want to generalize that information somehow.
I am not sure of … About the SPMD, I see, thanks! Do we want to use futures/promises for SPMD-like frameworks, as is done in MPI4PY, for instance?
If you have the starting global index and the size/shape of each local partition, then you already have the ability to express non-regular grids. I don't think anything should be imposed through the API or convention that enforces regularity.
The only correlation is that the tuples have the same length. The dimensionalities of the partition grid and the partitions themselves must be the same.
I see that this could be convenient. However, with currently available data-structures which support GPUs etc., this information is already available through the underlying data-structure. These data-structures must be able to handle this no matter what, and without an understanding of the underlying structure the data will not be consumable anyway. Could you give an example where duplicating the information here would significantly help?
I did not have a specific case in mind for the device location. However, where on the node the data resides (main memory, GPU memory, on disk, etc.) will affect the latency to fetch it, and thus may influence how one accesses it (e.g. granularity of access). If the underlying data structures all provide this in a uniform way, then it does not need to be replicated here. However, if this information can be accessed only at the remote node, then we may still want to replicate it here so any node that needs to access remote data can optimize its access pattern.
After reading over the conversation, as well as a discussion with @Markus-Goetz, we have a few thoughts. We both like this plan from a general perspective, as we think that it is intuitive. We have a few questions and comments which I collected; I'm not going to quote everyone because it would take too long.

Firstly, we think that the name … Secondly, there seems to be some dissonance about who would be using the interface.

The ambiguity of the term shape: in the development of Heat, we found that although the local and global shapes can be determined, both are very useful during development. We think that calling it simply shape makes it easy to misunderstand which shape is meant in the code. We think that a top-level global shape attribute (i.e. shape) is useful for both user and developer, but the local shape is very useful for developers. Having the local shape for all the tiles means that communication can be planned and pre-determined (in the example below, there is the …).

Offset-tuple vs. shape-tuple: the starting index of the data-block is very useful in functions like distribution and balancing.

Irregular distribution schemes: there was a mention of irregular distribution schemes. We would heavily warn against this. We think everything would be fine so long as the data partitions run through the entire dataset, i.e. the divisions occur on global indexes. The assumption that the data is regularly distributed amongst the available resources makes development much easier and faster. It also reduces the amount of data which needs to be sent over the comm network.

Grid keys: is there a set order in which the grid is defined? C-order (clockwise) or F-order (counter-clockwise)?

materialize: in our opinion, it is best to make each of the tiles a class which has all of the attributes we desire. This would make …

dtype/ptype, labels: in the interest of creating comm buffers, each tile/chunk should have its own dtype info in the 'partitions' dictionary. To receive data, a process should know how much data it will receive; for that we need shape and dtype.

Devices/dtype: in our view, the dtype and device should be set as global array attributes; however, each tile should have an attribute with the device and dtype info. For pd.DataFrames this would also let us have the local column names at hand easily.

Pandas / arrays with differing datatypes or column names: if data is partitioned along columns, then each partition should know which column titles / metadata it has.
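One motivation above for per-tile shape and dtype is pre-allocating communication buffers. A minimal sketch, assuming the tile metadata carries "shape" and "dtype" keys:

```python
# Sketch: pre-allocating a receive buffer from per-tile metadata.
import numpy as np

tile_meta = {"shape": (5, 5), "dtype": np.float64}  # assumed metadata keys

# Knowing shape and dtype up front lets the receiver allocate the exact
# buffer before any data arrives (e.g. for an MPI receive).
recv_buffer = np.empty(tile_meta["shape"], dtype=tile_meta["dtype"])
print(recv_buffer.nbytes)  # 200 bytes = 5 * 5 * 8
```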
Thanks so much for your comments, @coquelin77 @Markus-Goetz.

What about …? Yes, I think there is broad agreement that this protocol is intended for library/package developers, not for "end-users".

I think we need to be careful with overloading this interface. The goal is to provide meta-information related to the partitioning/distribution of the data, not to describe the global data fully within the interface. For example, I can see the argument that the global shape is directly related. However, the dtype is not directly related and can easily be extracted from the underlying data. I strongly suggest limiting this interface to a single property and not requiring more than …

The iterator for …

I am not sure I understand why C/Fortran grid order matters at this level. This is about Python, and the dimensions of the grid correspond/map to the dimensions of the underlying structure (through the start/shape of each partition). Can you explain why we need the partition order or how it could be useful?

I am not sure we can address devices here in a meaningful way. There is a lot of fragmentation in this space and we are probably better off handling this as an independent issue. I suggest we start with referring developers to using …
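A minimal sketch of how small the consumer-facing surface stays when the interface is limited to a single property (the attribute name here is an assumption; the thread has not settled on a final name):

```python
# Sketch: a consumer only needs to probe for one attribute.
def consume(obj):
    parts = getattr(obj, "__partitioned_interface__", None)  # assumed property name
    if parts is None:
        raise TypeError("object does not expose the partitioning protocol")
    # Everything about the data itself (dtype, columns, ...) is taken from the
    # underlying partition objects, not from the protocol dictionary.
    return parts["partitions"]
```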
I think …
I agree that we should be careful to avoid overloading this interface. Our idea was to use …
The idea was to handle the most common case: when you get locals, you probably also want to get the data from the processes.
This references how the tiling is determined in a general sense. It can be determined based on the starting indexes and the tile keys, but I think it would be useful to know it from the start.
Yes, this is another issue. For one, if there are tiles which move between devices, it will become difficult to communicate that to the other processes.
Are you referring to some comments that I made? I don't think the term "irregular" was used in this thread, and that term could be defined in multiple ways. Let's say I have a 2x3 array and want to partition it 3 ways. Here are various ways to do that. How would you label them?
In case #1, everything is partitioned across only one dimension. I think everyone would call that a "regular" distribution. As the number of dimensions or the size of dimensions grows, this principle can be extended such that all partitions have the same shape except maybe those at the maximum edges of the array, which may be smaller.

In case #2, the first partition is across one dimension but subsequent partitions are across the other dimension. To generalize this to more or larger dimensions, all the partitions are hyperrectangles, but theoretically each hyperrectangle could have a different shape. I think you would get different answers here as to whether this is "regular" or not. An individual hyperrectangle is a pretty "regular" shape, but is a set of hyperrectangles with different shapes "irregular"? What do you think?

In case #3, we're saying that one partition on one node gets disjoint sets of elements (that don't share an edge). I think everyone would call this irregular.

There is a 4th option, similar to #3, where we split the array into 6 partitions but still assign each partition to one of 3 nodes. In this case, each partition could be the same shape, but more than one partition is assigned to each node. I haven't heard anything in this thread that would prohibit that. I would still call that "regular", would you? We were talking about that just yesterday with the nums-package folks about how a block-cyclic distribution like this (with multiple disjoint blocks assigned to the same node) is useful for load balancing in things like LU decomposition.

My position is that we should support #2 with the ability to assign multiple disjoint partitions to the same node. #2 can still represent what you could call a "more regular" distribution like #1, but not vice versa. My fear is that #1 is overly restrictive. A compromise position could be a flag to indicate whether all hyperrectangles are the same size except at the edges.
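To make cases #1 and #2 concrete, here is a sketch (assuming per-partition "start" and "shape" 2-tuples, as discussed above) of the 2x3 array split three ways:

```python
# Case #1: split along one dimension only (columns) -> three 2x1 blocks.
case1 = [
    {"start": (0, 0), "shape": (2, 1)},
    {"start": (0, 1), "shape": (2, 1)},
    {"start": (0, 2), "shape": (2, 1)},
]

# Case #2: arbitrary hyperrectangles -> one 2x1 block, then two 1x2 blocks
# carved out of the remaining 2x2 region along the other dimension.
case2 = [
    {"start": (0, 0), "shape": (2, 1)},
    {"start": (0, 1), "shape": (1, 2)},
    {"start": (1, 1), "shape": (1, 2)},
]
```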
I see. The proposal here allows you to add more information if you want. So if your implementation provides the dtype, that's absolutely fine. I am not sure if this should be a required field, though. What do others think?
I prefer not to add redundancies just because we think we know what the most common case is. I am not even sure I would agree with your assessment, since I think in most cases you will require at least some of the other partition information as well.
I am confused. What do the numbers denote in this diagram? Following the proposal, you'd need to have a 2-tuple for each partition, not a scalar.
I would make no assumptions other than that the underlying data-structure needs to provide the necessary information to access/use the data. Whether it's on GPUs or CPUs only, or a mixed CPU/GPU distribution, and how to access it, depends on the implementation. Without adding any new requirements we can push this to the consumer. There is no new requirement, because even if the consumed data-structure is non-distributed, the consumer will need to know how to handle the device business.
@DrTodd13 I agree with everything that you said. My apologies for not defining "irregular" more clearly; I was already becoming concerned about the length of my comment. I think that option …
When I was talking about the tile/chunk divisions crossing all tiles, I imagined something like this:
If I remember correctly, this should still allow for many load-balancing techniques, but it has been some time since I looked over everything. EDIT: the second distribution scheme puts no limits on which blocks are on which processes.
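As a sketch of such a scheme (assuming a 6x6 array and an arbitrary tile-to-process assignment), divisions that run through the entire dataset can be written down as global split points per axis:

```python
# Sketch: a "semi-regular" tiling of a 6x6 array.
# Every division runs through the whole dataset, but grid rows/columns may
# have different sizes, and one process may own several tiles.
row_splits = [0, 2, 6]     # tile rows of heights 2 and 4
col_splits = [0, 1, 3, 6]  # tile columns of widths 1, 2 and 3

# grid position -> owning process (illustrative assignment)
tile_owner = {
    (0, 0): 0, (0, 1): 0, (0, 2): 1,
    (1, 0): 2, (1, 1): 2, (1, 2): 1,
}
```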
@fschlimb As for the tiling in C vs F layout, I was thinking about these tiles being numbered from 1 to 4 as partition-ids instead of by their coordinate-ids. I think this happened while I was thinking about which partitioning schemes to allow, and my wires got crossed. For the devices, it makes a lot of sense to leave the control up to the user/implementation. If a choice doesn't need to be made, then it might be best to leave the devices for the user to keep track of.
The example above is #2 and you could say it is less regular than the example below, which is #1. It still isn't clear to me if you want to allow or disallow the above example.
If we just consider the number of elements per partition, the more flexible arbitrary hyperrectangles can more evenly split the array in some cases. (Of course, #3 is what would be required to get a truly maximally balanced split, but we don't want to go there, and in practice I don't think people would use the #4 approach to get that level of balancing as it is probably not worth it.)

I guess the question is that some packages might natively want to split across only one certain dimension, and then they decide they want to accept external arrays with partitionings that could come from a package with a different philosophy. So, what are we saying to packages that produce or consume this interface? I really hesitate to start dictating a one-size-fits-all partitioning scheme, and at the same time I don't think a package should be forced to accept all possible partitioning schemes. As a package writer, it would be nice to be able to quickly tell if a given partitioning scheme is one that I'm prepared to support (versus having to inspect the partitioning in detail). If we err on the flexible side and let people support what they want to support, then I think we may get convergence through time and the marketplace of ideas, but again I hesitate to force convergence at this point.
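As a sketch of the kind of quick check a consuming package might want (assuming per-partition "start"/"shape" tuples, e.g. the case1/case2 lists from the earlier sketch), a consumer that only supports #1-style partitionings could test whether all partitions split along a single axis:

```python
# Sketch: accept only partitionings that split along a single axis.
def splits_single_axis(parts, global_shape):
    """True if every partition spans the full array in all but one axis."""
    split_axes = set()
    for p in parts:  # each p is assumed to carry "start" and "shape" tuples
        for ax, (extent, total) in enumerate(zip(p["shape"], global_shape)):
            if extent != total:
                split_axes.add(ax)
    return len(split_axes) <= 1

# With the earlier 2x3 sketches:
# splits_single_axis(case1, (2, 3)) -> True
# splits_single_axis(case2, (2, 3)) -> False
```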
@DrTodd13 I fully agree with your last comments. We should allow flexibility on both ends - we neither want to limit how an implementation (producer) partitions the data, nor can we demand that a consumer must digest all possible partitionings. At the same time it is worth mentioning explicitly that simple partitionings will be easier to consume, most likely by more packages. I guess that brings back the idea (@YarShev) of providing a mechanism to request a certain partitioning and/or a library which can do so. I tend to prefer a re-partitioning library because a) the proposal here allows zero-copy on the consumer side, b) there is no point in asking every producer to implement re-partitioning strategies, and c) it can easily grow over time.
I think that the point about flexibility is very valid. Allowing hyperrectangles is reasonable for some application use cases. However, the examples which @DrTodd13 gave previously (specifically case 2) show that the concept of coordinate-ids breaks down at this point. Alternatively, the tiles can be forced into a more regular grid, at which point two or more tiles can sit on a single process. Notably, matching these tiles for networked communication, especially in SPMD-like processing, will be challenging. However, this challenge can be pushed to the framework which uses the interface.

This leads to the next thought: a semi-regular grid, similar to what I have drawn previously. In this scheme, the divisions between tiles would occur on global indices, but all tiles need not be the same size. Furthermore, the rows and columns can all be different from one another; they only must be globally defined, similar to a spreadsheet in Excel (for a 2D array). This is the first case for which coordinate-ids of the tiles can be used in all cases. As previously stated by @fschlimb and myself, the tiles on a process do not need to share an axis. Beyond this step, the constraints are too restrictive for what we seek.

In my opinion, the last option is the best bet. It allows for a form of hyperrectangles when data sits on a single process, coordinate-id referencing, and it can easily grow over time. I do not propose that we outlaw free-form distribution schemes, but I believe they should be a more specific option and not the default.

As an example, modifying the previous cases shown by @DrTodd13 so that instead of a 2x3 array it is a 2x3 array of tiles:
All three of these examples would be allowed. I agree with @fschlimb that encapsulating partitioning into a library makes sense, especially when other packages should or could make use of it.

What I find unclear is the future usage of this spec. There are two clear avenues: a) a first (perhaps future-feature-incomplete) software, or b) a general-purpose standard to be released to the world. If a) is the case, I would go with a simple partitioning model that is easy to grasp, quick to implement and useful, i.e. covering most application use cases. This would mean that hyperrectangles on non-aligned grids would be disallowed, but hyperrectangles using multiple tiles on a semi-regular grid would be allowed. If b) is the case, we would need to rethink tile coordinates and carefully evaluate whether a first implementation needs to provide this feature.
Hi @coquelin77, I am sorry, but I am having trouble following your argument. You seem to suggest that you would like to express things which the current proposal disallows. If that's a correct understanding, could you please provide a concrete example of what you are missing, and maybe even a suggestion on how to address the gap?

With respect to where this should go, we'd like this to become some kind of a standard. Of course a standard is useful only when it's being used. So we need …

In my mind, good candidates for early prototypes of …
If we allow only partitions like the examples @coquelin77 shows, or example 1 @DrTodd13 showed, then there is a lot of redundant information in the partition data structure. It would be far simpler to indicate the global range, list the split points for each axis, and have a simple convention of mapping tile id to the grid of tiles formed by the axis split points. Then, we would need to map tile ids to locations.
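A minimal sketch of this simpler representation (the variable names and the row-major tile-id convention are assumptions):

```python
# Sketch: a grid of tiles defined by per-axis split points plus a tile-id -> location map.
import itertools

global_shape = (6, 6)
axis_splits = [(0, 2, 6), (0, 1, 3, 6)]  # split points per axis, including 0 and the end

# Convention: tile ids enumerate grid positions in row-major (C) order.
grid_positions = list(itertools.product(*(range(len(s) - 1) for s in axis_splits)))

def tile_bounds(tile_id):
    """Return ((row_start, row_stop), (col_start, col_stop)) for a tile id."""
    pos = grid_positions[tile_id]
    return tuple((axis_splits[ax][i], axis_splits[ax][i + 1]) for ax, i in enumerate(pos))

tile_location = {0: "1.1.1.1", 1: "1.1.1.1", 2: "1.1.1.2",
                 3: "1.1.1.3", 4: "1.1.1.3", 5: "1.1.1.2"}

print(tile_bounds(4))  # ((2, 6), (1, 3))
```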
As for the name of the protocol, I like …
As for the …, I do not fully understand what …

I agree we should try implementing the protocol at least in Modin and DAAL4PY soon. A good candidate for seeing the protocol in use would be the PR related to distributed KMeans in DAAL4PY (link).
@YarShev The … Could you explain what you mean by …?
@fschlimb , thanks!
Let's look at these examples.

Dask:

from distributed.client import Client
c = Client()
c.scheduler_info()["workers"].keys()
dict_keys(['tcp://1.1.1.1:54547', 'tcp://1.1.1.2:54550', 'tcp://1.1.1.1:54553', 'tcp://1.1.1.2:54556'])
f = c.scatter(1, broadcast=True)
c.who_has(f)
{'int-58e78e1b34eb49a68c65b54815d1b158': ('tcp://1.1.1.1:54553',
                                          'tcp://1.1.1.2:54550',
                                          'tcp://1.1.1.2:54556',
                                          'tcp://1.1.1.1:54547')}

Ray:

import ray
ray.init()
ref = ray.put(1)
# When scheduling a remote call, Ray can submit the task to a different node and copy `ref` to it if necessary.
# Then, the following call may show two nodes that own the data.
ray.objects(ref.hex())
{'ObjectRef': 'ffffffffffffffffffffffff0100000001000000',
 'Locations': ['5fdec4674c145fa35efc5df817c2fbb7c9eb0730', '5fdec4604c145fa15efc2df817c2fbb7c9eb1732']}

Each value of the `Locations` field matches the respective node IP address. That's why we should provide a list of IPs/hostnames/ranks (not a string) for the field describing the data location.
I created a new branch and put the above spec into a new file, which will allow tracking changes to the spec: https://github.com/IntelPython/DPPY-Spec/blob/draft/partitioned/Partitioned.md

Some more experiments made it apparent that the above-discussed callable to resolve a future/handle is needed. I added it to the spec (https://github.com/IntelPython/DPPY-Spec/blob/draft/partitioned/Partitioned.md).

Also, I think we should probably get rid of the partition-grid idea altogether. As discussed with @pspillai, @DrTodd13, @coquelin77 and @Markus-Goetz, it doesn't seem to add anything and would rather limit applicability. Additionally, it seems more practically useful to map locations to partition lists - and not positions to partitions. Thoughts? Objections?
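For illustration only, here is a sketch of what mapping locations to partition lists, together with a resolver callable, might look like (the field names are assumptions, not the spec text):

```python
# Sketch: 'partitions' keyed by location, each holding a list of partitions,
# plus a 'get' callable that turns a handle/future into local data.
import numpy as np

# Placeholders standing in for backend handles (e.g. Ray ObjectRefs or Dask futures).
h00, h01, h10, h11 = (np.ones((5, 5)) for _ in range(4))

__partitioned__ = {
    "partitions": {  # location -> list of partitions living there
        "1.1.1.1": [
            {"data": h00, "start": (0, 0), "shape": (5, 5)},
            {"data": h10, "start": (5, 0), "shape": (5, 5)},
        ],
        "1.1.1.2": [
            {"data": h01, "start": (0, 5), "shape": (5, 5)},
            {"data": h11, "start": (5, 5), "shape": (5, 5)},
        ],
    },
    # For a real backend this would be e.g. ray.get or distributed.Client.gather.
    "get": lambda handle: handle,
}
```

The grid position is still recoverable from the per-partition start/shape, even though the top-level key is the location.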
Are you talking about getting rid of the 'shape' field that is next to 'partitions'?
If we want to unify implementations across different execution backends (Ray, Dask, MPI), we should provide uniform objects for both futures and locations. Consumers themselves can map the list of partitions to a specific location.
That callable should be generic enough to materialize a list of futures, not just a single one.
Maybe yes, maybe no. I tend to agree with @coquelin77 and @Markus-Goetz that a regular grid shape is most likely all we need. If we can agree on that, we should keep the grid shape. Changing the dict to map locations to lists of partitions is, I think, a good idea in either case.
Not sure I understand your last comment, could you elaborate? In any case, I like uniform objects, but for this we should not define new data types/structures. I think we need to describe the distribution etc. in a uniform way, but not invent new types or the like. I think of this as a protocol, not a library/package.
Are you suggesting this to allow parallel execution?
I am okay with this.
Don't we lose the original positioning of the partitions in the grid in that case? That might be useful for consumers if they are more concerned about the positioning than about mapping locations to lists of partitions.
Something similar to ray.get(list_of_futures) and dask_client.gather(list_of_futures).
I agree this must be possible, and I don't think we lose the information. We provide …
Agree, that's what I also had in mind. If the consumer can deal with neither the handle nor the result of …
I am trying to understand why you suggest this. A list comprehension on the user side is easy to write:
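A sketch of such a list comprehension, assuming the resolver callable is exposed under a "get" key and partitions are grouped per location as sketched earlier:

```python
# Sketch: materializing all partition handles with a plain list comprehension.
def materialize_all(parted):
    get = parted["get"]          # the protocol's resolver callable (e.g. ray.get)
    return [get(p["data"])       # one get() call per handle
            for plist in parted["partitions"].values()
            for p in plist]
```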
Mapping partitions per rank/location also seems like the common case to me, but I am not sure what the protocol could look like in that case. Can you give an example of that? Since the protocol is going to provide exactly the same underlying partition structure (without any repartitioning), it seems to me that, for now, the information provided by the protocol looks more natural because consumers can see the partitioning at once.
Passing a future instead of a list of futures to .get(...) may make a dramatic difference.

Ray:

import ray
ray.init()

@ray.remote
def foo():
    from time import sleep
    sleep(10)
    return 1

%%time
ray.get([foo.remote() for _ in range(10)])
Wall time: 20.3 s
[1, 1, 1, 1, 1, 1, 1, 1, 1, 1]

%%time
[ray.get(foo.remote()) for _ in range(10)]
Wall time: 1min 40s
[1, 1, 1, 1, 1, 1, 1, 1, 1, 1]

Dask:

from distributed.client import Client
c = Client()

def foo():
    from time import sleep
    sleep(10)
    return 1

%%time
c.gather([c.submit(foo) for _ in range(10)])
Wall time: 10 s
[1, 1, 1, 1, 1, 1, 1, 1, 1, 1]

%%time
[c.gather(c.submit(foo)) for _ in range(10)]
Wall time: 30.1 s
[1, 1, 1, 1, 1, 1, 1, 1, 1, 1]

So we should provide a more general API to be able to pass multiple futures.
The current state of the specification can be found here: https://github.com/IntelPython/DPPY-Spec/blob/draft/partitioned/Partitioned.md