This repository has been archived by the owner on Aug 19, 2024. It is now read-only.

__partitioned__ protocol for partitioned and distributed data containers #3

Open
fschlimb opened this issue Mar 25, 2021 · 52 comments

Comments

@fschlimb

fschlimb commented Mar 25, 2021

The current state of the specification can be found here: https://github.com/IntelPython/DPPY-Spec/blob/draft/partitioned/Partitioned.md

@fschlimb
Author

@YarShev @SmirnovEgorRu @rlnx @shssf @diptorupd @oleksandr-pavlyk I'd like to get your feedback/thoughts on this.

@fschlimb
Author

@PivovarA This should be of interest to your distributed sklearn work

@fschlimb
Author

@adarshyoga

@YarShev

YarShev commented Mar 29, 2021

The first thing I would like to point out is that the examples map shape to a list instead of a tuple. The second is that references to data for the Dask backend should be dask.distributed.Future, not dask.Delayed. Please fix these.

The third thing that would be good to have is metadata on:

  1. partitions themselves (in case partitions are DataFrames it would be good to know their index, columns, dtypes)
  2. shape of partitions themselves

That could look as follows:
2d-structure, 2d-partition-grid, 4 partitions; partitions are of type ray.ObjectRef that refer to pandas DataFrames with shape (5, 5); Ray

__partitioned_interface__ = {
    "shape": (2, 2),
    "partitions": {
        "partition_id0": {
            "data": ObjectRef0,
            "position": (0, 0),
            "node_id": '1.1.1.1',
            "metadata": {
                "shape": (5, 5),
                "index": None,    # or the actual index, if available (e.g. "partition_id0" is a DataFrame)
                "columns": None,  # or the actual columns, if available (e.g. "partition_id0" is a DataFrame)
                "dtypes": None,   # or the actual dtypes, if available (e.g. "partition_id0" is a DataFrame)
            },
        },
        "partition_id1": {
            "data": ObjectRef1,
            "position": (0, 1),
            "node_id": '1.1.1.2',
            "metadata": {
                "shape": (5, 5),
                "index": None,    # or the actual index, if available (e.g. "partition_id1" is a DataFrame)
                "columns": None,  # or the actual columns, if available (e.g. "partition_id1" is a DataFrame)
                "dtypes": None,   # or the actual dtypes, if available (e.g. "partition_id1" is a DataFrame)
            },
        },
        "partition_id2": {
            "data": ObjectRef2,
            "position": (1, 0),
            "node_id": '1.1.1.3',
            "metadata": {
                "shape": (5, 5),
                "index": None,    # or the actual index, if available (e.g. "partition_id2" is a DataFrame)
                "columns": None,  # or the actual columns, if available (e.g. "partition_id2" is a DataFrame)
                "dtypes": None,   # or the actual dtypes, if available (e.g. "partition_id2" is a DataFrame)
            },
        },
        "partition_id3": {
            "data": ObjectRef3,
            "position": (1, 1),
            "node_id": '1.1.1.4',
            "metadata": {
                "shape": (5, 5),
                "index": None,    # or the actual index, if available (e.g. "partition_id3" is a DataFrame)
                "columns": None,  # or the actual columns, if available (e.g. "partition_id3" is a DataFrame)
                "dtypes": None,   # or the actual dtypes, if available (e.g. "partition_id3" is a DataFrame)
            },
        },
    },
}
  • For SPMD-MPI-like backends: partitions which are not locally available may be None. This is the recommended behavior unless the underlying backend supports references such as promises to avoid unnecessary data movement.

I am not sure I understand that case. Where can we retrieve the data for those positions from? I don't see how they are represented in the interface.

@fschlimb
Author

@YarShev Thanks for your comments. I corrected the tuples and switched to dask.distributed.Future.

With respect to the metadata: I think I do understand the motivation for including such information. However, the information given in your examples is not really about the partitioning. I think we should cleanly separate the partitioning interface from anything that is related to the underlying data/data-structure itself. That's also why I suggested not providing the global shape of the data, only the shape of the partition-grid.

I do agree we should allow any given implementation (like modin) to add more data to the dictionary than what's specified in the protocol. I am just not convinced that what you suggest should be a mandatory part of the protocol, because a) it's a different kind of information and b) it is not generally useful across different container types.

About the SPMD question: The last example assumes two ranks. The 'partitions' entries are for rank 0. The comments show what would be present on rank 1. The protocol does not provide means to get partitions which live on other ranks. If one needs to get a partition from a different rank one needs to explicitly use any available communication infrastructure (like MPI). Does this make sense?
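A minimal sketch of the SPMD convention just described, assuming the field names 'location' and 'data' used in the examples in this thread and a hypothetical helper `rank_view` (not part of the proposal): every rank sees the full partition grid, but only the partitions it owns carry data; all others are None.

```python
def rank_view(grid_shape, owners, local_data, rank):
    """Build the __partitioned__-style dict that `rank` would expose (hypothetical helper).

    owners:     grid position (tuple) -> owning rank (int), known on every rank
    local_data: grid position -> data object, only for partitions owned by `rank`
    """
    partitions = {}
    for pos, owner in owners.items():
        partitions[pos] = {
            "location": owner,
            # Partitions owned by other ranks are None; the protocol does not fetch
            # them, so a consumer must use MPI (or similar) explicitly if needed.
            "data": local_data.get(pos) if owner == rank else None,
        }
    return {"shape": grid_shape, "partitions": partitions}


# Two ranks, 4x1 partition grid, row blocks assigned cyclically.
owners = {(0, 0): 0, (1, 0): 1, (2, 0): 0, (3, 0): 1}
view0 = rank_view((4, 1), owners, {(0, 0): "block0", (2, 0): "block2"}, rank=0)
print([pos for pos, p in view0["partitions"].items() if p["data"] is not None])
# → [(0, 0), (2, 0)]
```

Strings stand in for real partition objects; on rank 1 the same call with rank=1 and that rank's local data yields the complementary view.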

@YarShev

YarShev commented Apr 14, 2021

@fschlimb, regarding the metadata I specified: in Modin's case, if we know that information, we can avoid triggering any computation on the Modin side when creating a Modin DataFrame. This avoids undesired data movement caused by remote calls, even though Ray and Dask try to run calls as close to the data as possible. However, I agree with points a) and b). Perhaps we can generalize that information somehow:

"metadata": {
    "shape": (5, 5),
    "dimensions": {
        0: dim0_values,  # values along dim 0; the key could also be "index" or "height"
        1: dim1_values,  # values along dim 1; the key could also be "columns" or "width"
    },
},

I am not sure about dtypes; we can probably drop them, because dtypes don't trigger computations on the Modin side when creating a Modin DataFrame. But if we later access dtypes (like df.dtypes), this will run remote calls to compute the actual dtypes.

About the SPMD case, I see, thanks! Do we want to use futures/promises for SPMD-like frameworks, as is done in MPI4PY, for instance?

@DrTodd13

If you have starting global index and size/shape of each local partition then you already have the ability to express non-regular grids. I don't think anything should be imposed through the API or convention that enforces regularity.
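To illustrate that a per-partition start and shape are enough to describe non-regular layouts, here is a brute-force sketch (hypothetical helper `covers_exactly`, only suitable for small examples) that checks whether a set of (start, shape) hyperrectangles tiles a global shape with no gaps and no overlaps. It accepts, for instance, a 2x3 array split into one 2x1 block and two 1x2 blocks, which is not a regular grid.

```python
from itertools import product
from math import prod


def covers_exactly(global_shape, parts):
    """True if the (start, shape) hyperrectangles tile global_shape exactly."""
    ndim = len(global_shape)
    seen = set()
    for start, shape in parts:
        if len(start) != ndim or len(shape) != ndim:
            return False  # partitions must have the same dimensionality as the data
        for idx in product(*(range(s, s + n) for s, n in zip(start, shape))):
            if any(i < 0 or i >= g for i, g in zip(idx, global_shape)) or idx in seen:
                return False  # out of bounds or overlapping another partition
            seen.add(idx)
    return len(seen) == prod(global_shape)  # no gaps


print(covers_exactly((2, 3), [((0, 0), (2, 1)), ((0, 1), (1, 2)), ((1, 1), (1, 2))]))
# → True
```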

@fschlimb
Author

@pspillai

Actually, is there any requirement that the index range of a particular part has any correlation with its id tuple?

The only correlation is that the tuples have the same length. The dimensionalities of the partition grid and the partitions themselves must be the same.

Finally, should location also support additional possibilities, like "on GPU", "on disk", "in NVM", etc.?

I see that this could be convenient. However, with currently available data-structures which support GPUs etc. this information is available already through the underlying data-structure. These data-structures must be able to handle this no matter what and without an understanding of the underlying structure the data will not be consumable anyway. Could you give an example where duplicating the information in here would significantly help?

@pspillai

I did not have a specific case in mind for the device location. However, where on the node the data resides (main memory, GPU memory, disk, etc.) affects the latency to fetch it, and thus may influence how one accesses it (e.g. the granularity of access). If the underlying data structures all provide this in a uniform way, then it does not need to be replicated here. However, if this information can only be accessed at the remote node, then we may still want to replicate it here so that any node that needs to access remote data can optimize its access pattern.

@coquelin77

After reading over the conversation, as well as a discussion with @Markus-Goetz, we have a few thoughts. We both like this plan from a general perspective, as we think that it is intuitive. We have a few questions and comments, which I collected. I'm not going to quote everyone because it would take too long.

Firstly, we think that the name __partitioned_interface__ is a bit too long and cumbersome. Some alternatives could be __dist_plan__ or __parts__ or __portions__ or __chunks__ or __chunk_map__ etc.

Secondly, there seems to be some dissonance about who would be using the __partitioned_interface__. We believe that it should be something intended for developers and not so much for users. We think that once a user moves to this level, they can read the docstring for it and learn how things are used there. This does have certain implications which I will get into later.

ambiguity of the term shape

In the development of Heat, we found that although both the local and global shapes can be computed, both are very useful during development. We think that calling it simply shape makes it easy to misunderstand which shape is meant in the code. We think that a top-level global shape attribute (i.e. shape) is useful for both users and developers, but the local shape is very, very useful for developers. Having the local shape of every tile means that communication can be planned and pre-determined.

In the example below, the shape attribute of the __partitioned_interface__ corresponds to the global shape, and each tile has its own shape attribute as well. This allows each process to know how much data it will receive for a tile that is not local.

offset-tuple v. shape-tuple

The starting index of the data-block is very useful in functions like __getitem__ and __setitem__, as well as in the backend of other functions. Also, it does not greatly clutter the dictionary.
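A sketch of why 'start' helps __getitem__: with each partition's 'start' and 'shape' (key names as in the example in this comment), a global index can be mapped to the owning partition and a local offset. The helper name `locate` and the linear scan are illustrative assumptions; a real implementation would more likely binary-search per-axis boundaries.

```python
def locate(partitions, index):
    """Map a global index to (grid position, local index within that partition)."""
    for pos, part in partitions.items():
        offset = tuple(i - s for i, s in zip(index, part["start"]))
        if all(0 <= o < n for o, n in zip(offset, part["shape"])):
            return pos, offset
    raise IndexError(f"index {index} is not covered by any partition")


# Row blocks of the 64x64 example: 4 partitions of shape (16, 64).
parts = {(i, 0): {"start": (16 * i, 0), "shape": (16, 64)} for i in range(4)}
print(locate(parts, (37, 5)))  # → ((2, 0), (5, 5))
```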

distribution and balancing

There was a mention of irregular distribution schemes. We would heavily warn against this. We think everything would be fine so long as the data partitions run through the entire dataset, i.e. the divisions occur on global indexes.

The assumption that the data is regularly distributed amongst the available resources makes development much easier and faster. It also reduces the amount of data which needs to be sent over the comm network.

grid keys

Is there a set order in which the grid is defined? C-order (row-major) or F-order (column-major)?

materialize

In our opinion, it is best to make each of the tiles a class which has all of the attributes we desire. This would make __getitem__ something which we can set for many different packages. This is not shown in the example, as it would become unwieldy.

The current dict format has the downside that calls to it can get long, which can make the code less clear.
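One way the per-tile class idea could look, as a sketch only: `Tile` is a hypothetical dataclass (not part of the proposal) whose fields mirror the dict keys used in this thread, with `materialize` taking a resolver function such as `ray.get` for Ray object references.

```python
from dataclasses import dataclass
from typing import Any, Callable, Optional, Tuple


@dataclass
class Tile:
    """Hypothetical per-partition record mirroring the dict keys in this thread."""
    start: Tuple[int, ...]
    shape: Tuple[int, ...]
    location: int
    data: Optional[Any] = None   # None if the partition is not local
    dtype: Optional[Any] = None  # None -> fall back to a global dtype

    @property
    def stop(self) -> Tuple[int, ...]:
        """Exclusive end index along each axis."""
        return tuple(s + n for s, n in zip(self.start, self.shape))

    def is_local(self) -> bool:
        return self.data is not None

    def materialize(self, getter: Callable[[Any], Any] = lambda ref: ref) -> Any:
        """Resolve `data` with `getter` (e.g. ray.get); identity by default."""
        if self.data is None:
            raise ValueError("partition is not local")
        return getter(self.data)


t = Tile(start=(16, 0), shape=(16, 64), location=1)
print(t.stop, t.is_local())  # → (32, 64) False
```

Attribute access (tile.start) instead of string keys is the main readability gain; the trade-off is that every producer and consumer must agree on the class.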

dtype/ptype, labels

In the interest of creating comm buffers, each tile/chunk should have its own dtype info in the 'partitions' dictionary. To receive data, a process should know how much data it will receive. For that, we need shape and dtype.
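A minimal sketch of the point above: the size of a receive buffer follows from the partition's shape and the element size implied by its dtype. The helper `alloc_recv_buffer` is hypothetical and uses only the standard library; with NumPy the equivalent allocation would be `np.empty(part['shape'], dtype=...)`.

```python
from math import prod


def alloc_recv_buffer(part, itemsize):
    """Allocate a byte buffer large enough to receive a remote partition."""
    nbytes = prod(part["shape"]) * itemsize  # element count * bytes per element
    return bytearray(nbytes)


buf = alloc_recv_buffer({"start": (16, 0), "shape": (16, 64)}, itemsize=8)  # e.g. float64
print(len(buf))  # → 8192
```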

Devices/dtype

In our view, the dtype and device should ideally be global array attributes. However, each tile should also have an attribute with its device and dtype info. For pd.DataFrames, this would also let us have the local column names at hand easily.

Pandas / arrays with differing datatypes or column names

If data is partitioned along columns, then each partition should know which column titles / metadata it has.

locals

Maybe the locals parameter should return an iterator over the local tile/chunk keys and data, for simplicity.


I compiled these into an example below. I took the "2d-structure (64 elements), 1d-partition-grid, 4 partitions on 2 ranks, row-block-cyclic distribution, ..." example as a guide but the others would look similar

Example

__dtype__ = dppy.dtype  # possibly a mirror of dpnp or np or ...      or    dict -> pd.DataFrame column_names/dtypes
__device__ = dppy.device  # if all on same device
__partitioned_interface__ = {
  'shape': (64, 64),
  'partition_order': 'C',  # 'C' (row-major, C-type) or 'F' (column-major, F-type)
  'partition_tiling': (4, 1),  # name t
  'partitions': {
      (0,0):  {
        'start': (0, 0),
        'shape': (16, 64),
        'data': df0,   # reference to local data
        'data.__getitem__': ray.get(df0) if ray else df0,  # materialized local data
        'location': 0,  # location of data
        'dtype': None,  # None -> global dtype     or    dict -> pd.DataFrame column_names/dtypes
        'device': None,  # None -> global device
      },
      (1,0):  {
        'start': (16, 0),
        'shape': (16, 64),
        'data': None,   # not local on rank 0
        'data.__getitem__': None,  # nothing to materialize on rank 0
        'location': 1,  # location of data
        'dtype': None,  # None -> global dtype     or    dict -> pd.DataFrame column_names/dtypes
      },
      (2,0):  {
        'start': (32, 0),
        'shape': (16, 64),
        'data': df2,   # reference to local data
        'data.__getitem__': ray.get(df2) if ray else df2,  # materialized local data
        'location': 0,  # location of data
        'dtype': None,  # None -> global dtype     or    dict -> pd.DataFrame column_names/dtypes
      },
      (3,0):  {
        'start': (48, 0),
        'shape': (16, 64),
        'data': None,  # not local on rank 0
        'data.__getitem__': None,  # nothing to materialize on rank 0
        'location': 1,  # location of data
        'dtype': None,  # None -> global dtype     or    dict -> pd.DataFrame column_names/dtypes
      },
  },
  'locals': iter([((0, 0), df0), ((2, 0), df2)]),  # this is for rank 0; for rank 1 it'd be keys (1, 0) and (3, 0)
}
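A sketch, under the assumption that every partition carries a 'location' entry as in the example above, showing that both forms of 'locals' discussed here can be derived from the 'partitions' dict: a plain list of local keys and an iterator of (key, data) pairs. The helper names `local_keys` and `iter_local` are hypothetical, and strings stand in for DataFrames.

```python
def local_keys(partitioned, rank):
    """Grid positions whose 'location' equals `rank`, as a plain list."""
    return [pos for pos, p in partitioned["partitions"].items() if p["location"] == rank]


def iter_local(partitioned, rank):
    """Yield (position, data) pairs for the local partitions."""
    parts = partitioned["partitions"]
    for pos in local_keys(partitioned, rank):
        yield pos, parts[pos]["data"]


example = {
    "shape": (64, 64),
    "partitions": {
        (0, 0): {"start": (0, 0), "shape": (16, 64), "location": 0, "data": "df0"},
        (1, 0): {"start": (16, 0), "shape": (16, 64), "location": 1, "data": None},
        (2, 0): {"start": (32, 0), "shape": (16, 64), "location": 0, "data": "df2"},
        (3, 0): {"start": (48, 0), "shape": (16, 64), "location": 1, "data": None},
    },
}
print(local_keys(example, 0))        # → [(0, 0), (2, 0)]
print(list(iter_local(example, 0)))  # → [((0, 0), 'df0'), ((2, 0), 'df2')]
```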

@fschlimb
Author

Thanks so much for your comments, @coquelin77 @Markus-Goetz
I mostly agree. Here are some additional and/or clarifying thoughts.

What about __partitioned__ instead of __partitioned_interface__?

Yes, I think there is a broad agreement that this protocol is intended for library/package developers, not for "end-users".

I think we need to be careful with overloading this interface. The goal is to provide meta-information related to the partitioning/distribution of the data, not to describe the global data fully within the interface. For example, I can see the argument that the global shape is directly related. However, the dtype is not directly related and can be easily extracted from the underlying data.

I strongly suggest limiting this interface to a single property and not require more than __partitioned__ in the global structure.

The iterator for locals looks a bit odd to me, mostly because it suggests redundant information which is super-easy to access with a simple list.

I am not sure I understand why C/Fortran grid order matters at this level. This is about Python and the dimensions of the grid correspond/map to the dimensions of the underlying structure (through start/shape of each partition). Can you explain why we need the partition order or how it could be useful?

I am not sure we can address devices here in a meaningful way. There is a lot of fragmentation in this space and we probably better handle this as an independent issue. I suggest we start with referring developers to using __usm_array_interface__, __dlpack__, and/or __cuda_array_interface__ as provided by the underlying data (@pspillai).

@coquelin77

What about __partitioned__ instead of __partitioned_interface__?

I think __partitions__ would also work, but this is a very minor difference. Whichever works best for the most people is fine with me.

I think we need to be careful with overloading this interface. The goal is to provide meta-information related to the partitioning/distribution of the data, not to describe the global data fully within the interface. For example, I can see the argument that the global shape is directly related. However, the dtype is not directly related and can be easily extracted from the underlying data.

I agree that we should be careful to avoid overloading this interface. Our idea was to use __partitions__ to create correctly sized buffers to receive data from other processes, as well as for the setters and getters. For that, we need to know the shape and dtype of the data. So if the dtype of all tiles is determined when __partitioned__ is created, then it doesn't need to be sent before every single communication function.

The iterator for locals looks a bit odd to me, mostly because it suggests redundant information which is super-easy to access with a simple list.

The idea was to handle the most common case that when you get locals, you probably also want to get the data from the processes.

I am not sure I understand why C/Fortran grid order matters at this level. This is about Python and the dimensions of the grid correspond/map to the dimensions of the underlying structure (through start/shape of each partition). Can you explain why we need the partition order or how it could be useful?

This refers to how the tiling is determined in a general sense. It can be determined based on the starting indices and the tile keys, but I think it would be useful to know it from the start.

C:         F:
1 | 2       1 | 3
-----       -----
3 | 4       2 | 4
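The numbering in this diagram corresponds to enumerating 2x2 grid positions in row-major (C) versus column-major (F) order. A short sketch with a hypothetical helper `grid_order`; the order only matters once tiles are identified by scalar ids rather than by their grid positions.

```python
from itertools import product


def grid_order(grid_shape, order="C"):
    """Enumerate grid positions in C (last axis fastest) or F (first axis fastest) order."""
    if order == "C":
        return list(product(*(range(n) for n in grid_shape)))
    if order == "F":
        # Iterate over the reversed axes, then flip each tuple back.
        return [idx[::-1] for idx in product(*(range(n) for n in reversed(grid_shape)))]
    raise ValueError("order must be 'C' or 'F'")


# 1-based tile numbers, matching the C and F diagrams.
c_num = {pos: k + 1 for k, pos in enumerate(grid_order((2, 2), "C"))}
f_num = {pos: k + 1 for k, pos in enumerate(grid_order((2, 2), "F"))}
print(grid_order((2, 2), "F"))  # → [(0, 0), (1, 0), (0, 1), (1, 1)]
```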

I am not sure we can address devices here in a meaningful way. There is a lot of fragmentation in this space and we probably better handle this as an independent issue. I suggest we start with referring developers to using __usm_array_interface__, __dlpack__, and/or __cuda_array_interface__ as provided by the underlying data (@pspillai).

Yes, this is another issue. For one, if there are tiles which move between devices, it will become difficult to communicate to the other processes.
However, we should decide now (or soon): we can either assume that the data exists on the corresponding devices now (i.e. all on GPU) and make the compatibility change later, or we can leave this open and allow the data to be on any device and transfer the data between devices at the top of a function within a check.

@DrTodd13

distribution and balancing

There was a mention of irregular distribution schemes. We would heavily warn against this. We think everything would be fine so long as the data partitions run through the entire dataset, i.e. the divisions occur on global indexes.

The assumption that the data is regularly distributed amongst the available resources makes development much easier and faster. It also reduces the amount of data which needs to be sent over the comm network.

Are you referring to some comments that I made? I don't think the term "irregular" was used in this thread, and that term could be defined in multiple ways. Let's say I have a 2x3 array and want to partition it 3 ways. Here are various ways to do that. How would you label them?

  1. Node 1 gets [0,0] and [1,0]
    Node 2 gets [0,1] and [1,1]
    Node 3 gets [0,2] and [1,2]

  2. Node 1 gets [0,0] and [1,0]
    Node 2 gets [0,1] and [0,2]
    Node 3 gets [1,1] and [1,2]

  3. Node 1 gets [0,0] and [0,2]
    Node 2 gets [0,1] and [1,0]
    Node 3 gets [1,1] and [1,2]

In case #1, everything is partitioned across only one dimension. I think everyone would call that a "regular" distribution. As the number of dimensions or the size of dimensions grows, this principle can be extended such that all partitions have the same shape except maybe those at the maximum edges of the array, which may be smaller.

In case #2, the first partition is across one dimension but then subsequent partitions are across the other dimension. To generalize this to more dimensions or larger dimensions, all the partitions are hyperrectangles but theoretically each hyperrectangle could have a different shape. I think you would get different answers here as to whether this is "regular" or not. An individual hyperrectangle is a pretty "regular" shape, but is a set of hyperrectangles with different shapes "irregular"? What do you think?

In case #3, we're saying that one partition on one node gets disjoint sets of elements (that don't share an edge). I think everyone would call this irregular. There is a 4th option similar to #3 where we split the array into 6 partitions but still assign each partition to one of 3 nodes. In this case, each partition could be the same shape but more than 1 partition is assigned to each node. I haven't heard anything in this thread that would prohibit that. I would still call that "regular", would you? We were talking about that just yesterday with the nums-package folks: a block-cyclic distribution like this (with multiple disjoint blocks assigned to the same node) is useful for load balancing in things like LU-decomposition.

My position is that we should support #2 with the ability to assign multiple disjoint partitions to the same node. #2 can still represent what you could call a "more regular" distribution like #1 but not vice versa. My fear is that #1 is overly restrictive. A compromise position could be a flag to indicate whether all hyperrectangles are the same size except at the edges.
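A sketch of what the suggested compromise flag could check, given each partition's (start, shape): along every axis, all partitions that do not touch the upper edge share one extent, and edge partitions are no larger. The helper `uniform_except_edges` is hypothetical; it accepts case #1 above and rejects case #2.

```python
def uniform_except_edges(global_shape, parts):
    """True if, per axis, non-edge partitions share one extent and edge ones are no larger."""
    parts = list(parts)
    for d, g in enumerate(global_shape):
        # Extents of partitions that end before the upper edge of axis d.
        interior = {shape[d] for start, shape in parts if start[d] + shape[d] < g}
        if len(interior) > 1:
            return False
        if interior:
            block = interior.pop()
            if any(shape[d] > block for _, shape in parts):
                return False
    return True


case1 = [((0, 0), (2, 1)), ((0, 1), (2, 1)), ((0, 2), (2, 1))]
case2 = [((0, 0), (2, 1)), ((0, 1), (1, 2)), ((1, 1), (1, 2))]
print(uniform_except_edges((2, 3), case1), uniform_except_edges((2, 3), case2))  # → True False
```

Such a flag could be computed once by the producer, letting a consumer reject unsupported layouts without inspecting every partition.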

@fschlimb
Author

I agree that we should be careful to avoid overloading this interface. Our idea was to use __partitions__ to create correctly sized buffers to receive data from other processes, as well as for the setters and getters. For that, we need to know the shape and dtype of the data. So if the dtype of all tiles is determined when __partitioned__ is created, then it doesn't need to be sent before every single communication function.

I see. The proposal here suggests you add more information if you want. So if your implementation provides the dtype, that's absolutely fine. I am not sure if this should be a required field, though. What do others think?

The iterator for locals looks a bit odd to me, mostly because it suggests redundant information which is super-easy to access with a simple list.

The idea was to handle the most common case that when you get locals, you probably also want to get the data from the processes.

I prefer to not add redundancies just because we think we know what the most common case is. I am not even sure I would agree to your assessment since I think in most cases you will require at least some of the other partition information as well.

I am not sure I understand why C/Fortran grid order matters at this level. This is about Python and the dimensions of the grid correspond/map to the dimensions of the underlying structure (through start/shape of each partition). Can you explain why we need the partition order or how it could be useful?

This refers to how the tiling is determined in a general sense. It can be determined based on the starting indices and the tile keys, but I think it would be useful to know it from the start.

C:         F:
1 | 2       1 | 3
-----       -----
3 | 4       2 | 4

I am confused. What do the numbers denote in this diagram? Following the proposal, you'd need to have a 2-tuple for each partition, not a scalar.
Maybe the disconnect here is that I purposely proposed a dictionary with grid-locations and not partition-ids. There is no implicit or explicit order, only locations. There is no need to assume any order and no need to think about an order.

I am not sure we can address devices here in a meaningful way. There is a lot of fragmentation in this space and we probably better handle this as an independent issue. I suggest we start with referring developers to using __usm_array_interface__, __dlpack__, and/or __cuda_array_interface__ as provided by the underlying data (@pspillai).

Yes, this is another issue. For one, if there are tiles which move between devices, it will become difficult to communicate to the other processes.
However, we should decide now (or soon): we can either assume that the data exists on the corresponding devices now (i.e. all on GPU) and make the compatibility change later, or we can leave this open and allow the data to be on any device and transfer the data between devices at the top of a function within a check.

I would make no assumptions other than that the underlying data-structure needs to provide the necessary information to access/use the data. Whether it's on GPUs or CPUs only, or if it's a mixed CPU/GPU distribution, and how to access it depends on the implementation. Without adding any new requirements, we can push this to the consumer. There is no new requirement, because even if the consumed data-structure is non-distributed, the consumer will need to know how to handle the device business.

@coquelin77

coquelin77 commented May 11, 2021

@DrTodd13 I agree with everything that you said. My apologies for not defining irregular more clearly; I was already becoming concerned about the length of my comment. I think that option #3 might cause issues down the road in the communication structures. When I was picturing irregular grids, I was thinking of things like this:

x x x x x x | x x x x
x x x x x x | x x x x
---------------------
x x x | x x x x x x x
x x x | x x x x x x x

When I was talking about the tile/chunk divisions crossing all tiles, I imagined something like this:

x x x x x x | x x x x
x x x x x x | x x x x
---------------------
x x x x x x | x x x x
x x x x x x | x x x x

If I remember correctly, this should still allow for many load-balancing techniques, but it has been some time since I looked over everything.

EDIT: the second distribution scheme puts no limits on which blocks are on which processes.

@coquelin77

@fschlimb
For the dtypes: this could very well be grouped into the implementation details.

For the locals attribute, that is a good point. I find that I keep going back to the idea of a class which would be the value in the partition dictionary. The locals could return the key and the class instance for the corresponding tile. Whether it is an iterator can also be an implementation detail.

As for the tiling in C vs F layout, I was thinking of these tiles as numbered 1 to 4 (partition-ids) instead of by their coordinate-ids. I think this happened when I was thinking about which partitioning schemes to allow, and my wires got crossed.

For the devices, it makes a lot of sense to leave the control up to the user/implementation. If a choice doesn't need to be made, then it might be best to leave the devices for the user to keep track of.

@DrTodd13

@DrTodd13 I agree with everything that you said. My apologies for not defining irregular more clearly; I was already becoming concerned about the length of my comment. I think that option #3 might cause issues down the road in the communication structures. When I was picturing irregular grids, I was thinking of things like this:

x x x x x x | x x x x
x x x x x x | x x x x
---------------------
x x x | x x x x x x x
x x x | x x x x x x x

The example above is #2 and you could say it is less regular than the example below, which is #1. It still isn't clear to me if you want to allow or disallow the above example.

When I was talking about the tile/chunk divisions crossing all tiles, I imagined something like this:

x x x x x x | x x x x
x x x x x x | x x x x
---------------------
x x x x x x | x x x x
x x x x x x | x x x x

If I remember correctly, this should still allow for many load-balancing techniques, but it has been some time since I looked over everything.

If we just consider the number of elements per partition, the more flexible arbitrary hyperrectangles can more evenly split the array in some cases. (Of course, #3 is what would be required to get a truly maximally balanced split but we don't want to go there and in practice I don't think that people would use the #4 approach to get that level of balancing as it is probably not worth it.)

I guess the question is that some packages might natively want to split across only one certain dimension, and then they decide they want to accept external arrays with partitioned that could come from a package with a different philosophy. So, what are we saying to packages that produce or consume this interface? I really hesitate to start dictating a one-size-fits-all partitioning scheme, and at the same time I don't think a package should be forced to accept all possible partitioning schemes. As a package writer, it would be nice to be able to quickly tell if a given partitioning scheme is one that I'm prepared to support (versus having to inspect the partitioning in detail). If we err on the flexible side and let people support what they want to support, then I think we may get convergence through time and the marketplace of ideas, but again I hesitate to force convergence at this point.

@fschlimb
Author

@DrTodd13 I fully agree with your last comments. We should allow flexibility on both ends: we don't want to limit how an implementation (producer) partitions the data, and we also cannot demand that a consumer must digest all possible partitionings.

At the same time, it is worth mentioning explicitly that simple partitionings will be easier to consume and will most likely be supported by more packages.

I guess that brings back the idea (@YarShev) of providing a mechanism to request a certain partitioning and/or a library which can do so. I tend to prefer a re-partitioning library because a) the proposal here allows zero-copy on the consumer side, b) there is no point in asking every producer to implement re-partitioning strategies, and c) it can easily grow over time.

@coquelin77

I think that the point about flexibility is very valid. Allowing hyperrectangles is reasonable for some application use cases. However, the examples which @DrTodd13 gave previously (specifically case 2) show that the concept of coordinate-ids breaks down at this point. Alternatively, the tiles can be forced into a more regular grid, at which point two or more tiles can sit on a single process. Notably, matching these tiles for networked communication, especially in SPMD-like processing, will be challenging. However, this challenge can be pushed to the framework which uses the interface.

This leads to the next thought: a semi-regular grid, similar to what I have drawn previously. In this scheme, the divisions between tiles would occur on global indices, however all tiles need not be the same size. Furthermore, the rows and columns can all be different from one another, they only must be globally defined, similar to a spreadsheet in Excel (for a 2D array). This is the first case for which coordinate-ids of the tiles can be used in all cases. As previously stated by @fschlimb and myself, the tiles on a process do not need to share an axis. Beyond this step, the constraints are too restrictive for what we seek.
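A sketch of how a consumer might test for such a semi-regular grid given each tile's (start, shape): collect the cut points along every axis and require exactly one tile per cell between consecutive cuts. If that holds, the returned mapping gives each tile's coordinate-id. The helper `semi_regular_grid` is hypothetical; the examples are the two 4x10 layouts drawn earlier in this thread.

```python
from math import prod


def semi_regular_grid(global_shape, parts):
    """Map each tile's start to its grid coordinate, or return None if not semi-regular."""
    parts = list(parts)
    cuts = []
    for d, g in enumerate(global_shape):
        axis = {0, g}
        for start, shape in parts:
            axis.update((start[d], start[d] + shape[d]))  # every tile boundary is a cut
        cuts.append(sorted(axis))
    if len(parts) != prod(len(c) - 1 for c in cuts):
        return None  # number of tiles must equal number of grid cells
    coords = {}
    for start, shape in parts:
        coord = []
        for d, c in enumerate(cuts):
            k = c.index(start[d])
            # The tile must end at the very next cut, i.e. span exactly one cell.
            if k + 1 >= len(c) or c[k + 1] != start[d] + shape[d]:
                return None
            coord.append(k)
        coords[start] = tuple(coord)
    if len(set(coords.values())) != len(parts):
        return None  # two tiles claim the same cell
    return coords


staggered = [((0, 0), (2, 6)), ((0, 6), (2, 4)), ((2, 0), (2, 3)), ((2, 3), (2, 7))]
aligned = [((0, 0), (2, 6)), ((0, 6), (2, 4)), ((2, 0), (2, 6)), ((2, 6), (2, 4))]
print(semi_regular_grid((4, 10), staggered))  # → None
print(semi_regular_grid((4, 10), aligned))    # → {(0, 0): (0, 0), (0, 6): (0, 1), (2, 0): (1, 0), (2, 6): (1, 1)}
```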

In my opinion, the last option is the best bet. It can allow for a form of hyperrectangles when data sits on a single process, coordinate-id referencing, and it can easily grow over time. I do not propose that we outlaw free-form distribution schemes; however, I believe that it should be a more specific option and not the default.

as an example:

To modify the previous cases shown by @DrTodd13: instead of a 2x3 array, consider a 2x3 array of tiles.
The number at the top center of each tile indicates the process it is on.

1.                     2.                       3.
x 0 | x 1 x | x 2 x    x 0 x | x 1 x | x 1 x    x 0 x | x 1 x | x 0 x
x x | x x x | x x x    x x x | x x x | x x x    x x x | x x x | x x x
x x | x x x | x x x    ---------------------    ---------------------
-------------------    x 0 x | x 2 x | x 2 x    x 1 x | x 2 x | x 2 x
x 0 | x 1 x | x 2 x    x x x | x x x | x x x    x x x | x x x | x x x

All three of these examples would be allowed.

I agree with @fschlimb that encapsulating partitioning into a library makes sense. Especially, when other packages should or could make use of it.

What I find unclear is the future usage of this spec. There are two clear avenues: a) a first (perhaps feature-incomplete) piece of software, or b) a general-purpose standard to be released to the world. If a) is the case, I would go with a simple partitioning model that is easy to grasp, quick to implement and useful, i.e. covering most application use cases. This would mean that hyperrectangles on non-aligned grids would be disallowed, but hyperrectangles made of multiple tiles on a semi-regular grid would be allowed. If b) is the case, we would need to rethink tile coordinates and carefully evaluate whether a first implementation needs to provide this feature.

@fschlimb
Author

Hi @coquelin77, I am sorry, but I am having trouble following your argument. You seem to suggest that you would like to express things which the current proposal disallows. If that's a correct understanding, could you please provide a concrete example of what you are missing and maybe even a suggestion on how to address the gap?

With respect to where this should go, we'd like this to become some kind of standard. Of course, a standard is useful only when it's being used. So we need to:

  1. get input from stakeholders in the abstract to understand differences early on
  2. implement prototypes in at least two (complementary) packages to gain practical insights

In my mind, good candidates for early prototypes of __partitioned__ would be daal4py, modin and HeAT. If all three teams (@YarShev @coquelin77 @PivovarA) are on board with the concept/idea, I would volunteer to spend time on this.

@pspillai

If we allow only partitions like the examples @coquelin77 shows, or example 1 that @DrTodd13 showed, then there is a lot of redundant information in the partition data structure. It would be far simpler to indicate the global range, list the split points for each axis, and have a simple convention for mapping tile ids to the grid of tiles formed by the axis split points. Then we would only need to map tile ids to locations.
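A minimal sketch of that compact description, assuming the global shape plus per-axis interior split points, with tile ids assigned in row-major order over the tile grid (the convention and the names `tiles_from_splits` and `locations` are illustrative assumptions). Every tile's start and shape is derived, so only the tile-id-to-location map has to be stored explicitly.

```python
from itertools import product


def tiles_from_splits(global_shape, splits):
    """Enumerate tiles of a grid defined by per-axis interior split points.

    Tile ids follow a row-major (C-order) convention over the tile grid,
    which is an assumption made for illustration.
    """
    # Per axis, turn the split points into (start, length) intervals.
    intervals = []
    for n, cuts in zip(global_shape, splits):
        bounds = [0, *cuts, n]
        intervals.append([(b, e - b) for b, e in zip(bounds[:-1], bounds[1:])])
    tiles = {}
    # product() varies the last axis fastest, i.e. row-major order.
    for tile_id, combo in enumerate(product(*intervals)):
        start = tuple(s for s, _ in combo)
        shape = tuple(length for _, length in combo)
        tiles[tile_id] = {"start": start, "shape": shape}
    return tiles


tiles = tiles_from_splits((4, 8), [[3], [2, 5]])
# The only per-tile information left to transmit: tile id -> process.
locations = {0: 0, 1: 1, 2: 2, 3: 0, 4: 1, 5: 2}
```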

@YarShev

YarShev commented May 14, 2021

@fschlimb

As for the name of the protocol, I like __partitions__, __shards__, __chunks__, __dist_plan__.

I agree that we should be careful to avoid overloading this interface. Our idea was to use __partitions__ to create correctly sized buffers for receiving data from other processes, as well as for the setters and getters. For that, we need to know the shape and dtype of the data. So if the dtype of all tiles is determined when __partitioned__ is created, it doesn't need to be sent before every single communication call.

I see. The proposal here suggests you add more information if you want. So if your implementation provides the dtype, that's absolutely fine. I am not sure if this should be a required field, though. What do others think?

dtype should probably be an optional field. If a producer provides it, a consumer won't need to compute it by scheduling remote operations. I also agree this might help with scheduling work better.
In addition to dtype, the labels of the partitions themselves should be optional fields, so that a consumer is able to construct the global labels of the underlying data.
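A sketch of how a consumer might use optional per-partition labels, assuming partitions keyed by grid position and a hypothetical 'labels' field holding each partition's row labels (column labels would work the same way along the other axis). When the field is missing, the helper returns None so the consumer can fall back to computing the labels itself.

```python
def global_row_labels(partitions):
    """Concatenate optional per-partition row labels into global row labels.

    `partitions` maps grid position -> partition dict. Returns None if any
    partition in the first grid column lacks the (hypothetical) 'labels'
    field, signalling that the consumer must compute labels itself.
    """
    first_column = sorted(
        (item for item in partitions.items() if item[0][1] == 0),
        key=lambda item: item[0],
    )
    labels = []
    for _, part in first_column:
        if "labels" not in part:
            return None
        labels.extend(part["labels"])
    return labels


partitions = {
    (0, 0): {"shape": (2, 2), "dtype": "float64", "labels": ["a", "b"]},
    (0, 1): {"shape": (2, 3), "dtype": "int64", "labels": ["a", "b"]},
    (1, 0): {"shape": (1, 2), "dtype": "float64", "labels": ["c"]},
    (1, 1): {"shape": (1, 3), "dtype": "int64", "labels": ["c"]},
}
print(global_row_labels(partitions))  # ['a', 'b', 'c']
```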

As for the location field, we should provide a list of IPs/hostnames/ranks (not a string) because multiple workers can own data.

I don't fully understand what the start field is required for. Can you explain, or point me to the comment above if it exists (I probably missed something)?

I agree we should try implementing the protocol at least in Modin and DAAL4PY soon. A good candidate for showcasing the protocol's usage would be the PR related to distributed KMeans in DAAL4PY (link).

@fschlimb
Author

@YarShev The start can be seen as convenience information. It can be derived from the shapes alone, but it is such fundamental information that several people think it should be provided directly (rather than asking consumers to compute it each time).
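For illustration, a sketch of the derivation every consumer would otherwise repeat on a regular grid, assuming the partition extents along each axis are known (`starts_from_shapes` is a hypothetical helper): the starts are exclusive prefix sums of the extents.

```python
from itertools import accumulate


def starts_from_shapes(block_sizes):
    """Derive per-axis start offsets from partition extents on a regular grid.

    block_sizes[axis] lists the partition extents along that axis,
    e.g. [3, 1] for two rows of partitions with heights 3 and 1.
    """
    # Exclusive prefix sums: the offset of each block along each axis.
    return [[0, *accumulate(sizes)][:-1] for sizes in block_sizes]


offsets = starts_from_shapes([[3, 1], [2, 3, 3]])
# The partition at grid position (1, 2) therefore starts at:
start = (offsets[0][1], offsets[1][2])
print(start)  # (3, 5)
```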

Could you explain what you mean by "multiple workers can own data"?

@YarShev

YarShev commented May 26, 2021

@fschlimb , thanks!

Could you explain what you mean by "multiple workers can own data"?

Let's look at these examples:

Dask

from distributed.client import Client
c = Client()
c.scheduler_info()["workers"].keys()
dict_keys(['tcp://1.1.1.1:54547', 'tcp://1.1.1.2:54550', 'tcp://1.1.1.1:54553', 'tcp://1.1.1.2:54556'])
f = c.scatter(1, broadcast=True)
c.who_has(f)
{'int-58e78e1b34eb49a68c65b54815d1b158': ('tcp://1.1.1.1:54553',
  'tcp://1.1.1.2:54550',
  'tcp://1.1.1.2:54556',
  'tcp://1.1.1.1:54547')}

Ray

import ray
ray.init()
ref = ray.put(1)
# When scheduling a remote call, Ray can submit the task to a different node and copy `ref` to it if necessary.
# The following call may then show two nodes that own the data.
ray.objects(ref.hex())
{'ObjectRef': 'ffffffffffffffffffffffff0100000001000000',
 'Locations': ['5fdec4674c145fa35efc5df817c2fbb7c9eb0730', '5fdec4604c145fa15efc2df817c2fbb7c9eb1732']}
# Each value in the `Locations` field is the ID of a node holding a copy; each node ID maps to that node's IP address.

That's why we should provide a list of IPs/hostnames/ranks (not a string) for the location field.
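A sketch of why a list helps: with every owner listed, a consumer can place work next to any copy of the data. The 'location' field name comes from this discussion; the greedy placement policy and `assign_to_workers` are hypothetical.

```python
def assign_to_workers(partitions, workers):
    """Greedily assign each partition to a worker, preferring workers that hold a copy."""
    load = {w: 0 for w in workers}
    assignment = {}
    for pid, part in partitions.items():
        # Workers that already own a copy of this partition, if any.
        local = [w for w in part["location"] if w in load]
        candidates = local or workers
        # Among the candidates, pick the least-loaded worker.
        chosen = min(candidates, key=lambda w: load[w])
        load[chosen] += 1
        assignment[pid] = chosen
    return assignment


partitions = {
    "p1": {"location": ["1.1.1.1"]},
    "p0": {"location": ["1.1.1.1", "1.1.1.2"]},  # replicated on two nodes
    "p2": {"location": ["1.1.1.3"]},  # not on any available worker
}
# p0 can use its second replica on 1.1.1.2 because both owners are listed.
print(assign_to_workers(partitions, ["1.1.1.1", "1.1.1.2"]))
```

With a single string per partition, p0 would have to go to 1.1.1.1 or be copied elsewhere.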

@fschlimb
Author

fschlimb commented Jul 9, 2021

I created a new branch and put the above spec into a new file which will allow tracking changes to the spec: https://github.com/IntelPython/DPPY-Spec/blob/draft/partitioned/Partitioned.md

Some more experiments made it apparent that the callable discussed above to resolve a future/handle is needed. I added it to the spec (https://github.com/IntelPython/DPPY-Spec/blob/draft/partitioned/Partitioned.md).
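A minimal sketch of the idea, using the standard library's concurrent.futures as a stand-in backend (the dict layout, the 'get' key holding a resolver, and `make_partitioned` are assumptions for illustration): the producer ships its handles together with a callable that resolves them, so the consumer never needs to know the handle type.

```python
from concurrent.futures import ThreadPoolExecutor


def double(block):
    # Stands in for remote work that produces a partition.
    return [2 * x for x in block]


def make_partitioned(executor, blocks):
    """Producer side: expose partition handles plus a resolver callable."""
    partitions = {
        pos: {"data": executor.submit(double, block), "location": ["localhost"]}
        for pos, block in blocks.items()
    }
    # For this backend a handle is a Future; Ray or Dask would supply
    # their own resolver here instead.
    return {"partitions": partitions, "get": lambda handle: handle.result()}


with ThreadPoolExecutor() as pool:
    parted = make_partitioned(pool, {(0,): [1, 2], (1,): [3]})
    # Consumer side: resolve each handle through the producer's callable.
    values = {pos: parted["get"](p["data"]) for pos, p in parted["partitions"].items()}

print(values)  # {(0,): [2, 4], (1,): [6]}
```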

Also, I think we should probably get rid of the partition-grid idea altogether. As discussed with @pspillai, @DrTodd13, @coquelin77 and @Markus-Goetz it doesn't seem to add anything and would rather limit applicability. Additionally, it seems more practically useful to map locations to partition-lists - and not positions to partitions.
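A sketch of that regrouping, assuming a position-keyed 'partitions' dict whose entries carry 'start', 'shape' and a list-valued 'location' (field names from this discussion; the exact layout and `by_location` are assumptions). Because each entry keeps its start and shape, nothing about the grid is lost by keying on location instead.

```python
from collections import defaultdict


def by_location(partitions):
    """Regroup position -> partition into location -> list of partitions.

    A partition replicated on several locations is listed under each of them.
    """
    grouped = defaultdict(list)
    for part in partitions.values():
        for loc in part["location"]:
            grouped[loc].append(part)
    return dict(grouped)


partitions = {
    (0, 0): {"start": (0, 0), "shape": (3, 2), "location": [0]},
    (0, 1): {"start": (0, 2), "shape": (3, 6), "location": [1]},
    (1, 0): {"start": (3, 0), "shape": (1, 2), "location": [0]},
    (1, 1): {"start": (3, 2), "shape": (1, 6), "location": [1, 2]},
}
grouped = by_location(partitions)
print({loc: [p["start"] for p in parts] for loc, parts in grouped.items()})
# {0: [(0, 0), (3, 0)], 1: [(0, 2), (3, 2)], 2: [(3, 2)]}
```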

Thoughts? Objections?

@YarShev

YarShev commented Jul 14, 2021

Also, I think we should probably get rid of the partition-grid idea altogether. As discussed with @pspillai, @DrTodd13, @coquelin77 and @Markus-Goetz it doesn't seem to add anything and would rather limit applicability.

Are you talking about getting rid of the 'shape' field next to 'partitions'?

Additionally, it seems more practically useful to map locations to partition-lists - and not positions to partitions.

If we want to unify implementations across different execution backends (Ray, Dask, MPI), we should provide uniform objects for both futures and locations. Consumers themselves can map a list of partitions to a specific location.

Some more experiments made it apparent that the callable discussed above to resolve a future/handle is needed.

That callable should be generic enough to materialize a list of futures, not just one.

@fschlimb
Author

Also, I think we should probably get rid of the partition-grid idea altogether. As discussed with @pspillai, @DrTodd13, @coquelin77 and @Markus-Goetz it doesn't seem to add anything and would rather limit applicability.

Are you talking about getting rid of the 'shape' field next to 'partitions'?

Maybe yes, maybe no.
As discussed earlier, the current grid-shape implies that every cut of any dimension fully spans all other dimensions. In other words, no partition will ever have more than one neighbor on either side along any dimension. @DrTodd13 seems to propose being more flexible, which would mean we either interpret the grid-shape more flexibly or need a different (or no) scheme to describe the grid-space.
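One way to state the regular-grid condition precisely, as a sketch: assuming the partitions already tile the array without overlap, they form a regular grid exactly when their set equals the Cartesian product of the distinct per-axis intervals. The (start, shape) tuples and `is_regular_grid` are illustrative.

```python
from itertools import product


def is_regular_grid(parts):
    """Check that every cut spans all other dimensions.

    `parts` is a list of (start, shape) tuples that are assumed to tile the
    array without overlap.
    """
    ndim = len(parts[0][0])
    # Distinct (start, extent) intervals along each axis.
    axis_intervals = [{(start[a], shape[a]) for start, shape in parts} for a in range(ndim)]
    expected = {
        (tuple(s for s, _ in combo), tuple(n for _, n in combo))
        for combo in product(*axis_intervals)
    }
    return set(parts) == expected


regular = [((0, 0), (2, 2)), ((0, 2), (2, 3)), ((2, 0), (1, 2)), ((2, 2), (1, 3))]
# Left column cut at row 2, right column cut at row 1: the upper-left
# block has two neighbors on its right-hand side.
irregular = [((0, 0), (2, 2)), ((2, 0), (2, 2)), ((0, 2), (1, 2)), ((1, 2), (3, 2))]
print(is_regular_grid(regular), is_regular_grid(irregular))  # True False
```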

I tend to agree with @coquelin77 and @Markus-Goetz that a regular grid-shape is most likely all we need. If we can agree on that, we should keep the grid-shape.

I think changing the dict to map locations to lists of partitions is a good idea in either case.

Additionally, it seems more practically useful to map locations to partition-lists - and not positions to partitions.

If we want to unify implementations across different execution backends (Ray, Dask, MPI), we should provide uniform objects for both futures and locations. Consumers themselves can map a list of partitions to a specific location.

I'm not sure I understand your last comment. Could you elaborate?
Also, could you give an example of what such a uniform location/future object could look like and how it would be beneficial in a potential use case? Note that we cannot assume a uniform runtime that everybody uses, i.e., we explicitly target the real world, where different packages use different backends.

In any case, I like uniform objects, but we should not define new data types/structures for this. I think we need to describe the distribution etc. in a uniform way, but not invent new types or the like. I think of this as a protocol, not a library/package.

Some more experiments made it apparent that the callable discussed above to resolve a future/handle is needed.

That callable should be generic enough to materialize a list of futures, not just one.

Are you suggesting this to allow parallel execution?

@YarShev

YarShev commented Jul 15, 2021

I tend to agree with @coquelin77 and @Markus-Goetz that a regular grid-shape is most likely all we need. If we can agree on that, we should keep the grid-shape.

I am okay with this.

I think changing the dict to map locations to lists of partitions is a good idea in either case.

Additionally, it seems more practically useful to map locations to partition-lists - and not positions to partitions.

If we want to unify implementations across different execution backends (Ray, Dask, MPI), we should provide uniform objects for both futures and locations. Consumers themselves can map a list of partitions to a specific location.

I'm not sure I understand your last comment. Could you elaborate?
Also, could you give an example of what such a uniform location/future object could look like and how it would be beneficial in a potential use case? Note that we cannot assume a uniform runtime that everybody uses, i.e., we explicitly target the real world, where different packages use different backends.

Don't we lose the original positioning of the partitions in the grid in that case? That might be useful for consumers who care more about the positioning than about mapping locations to lists of partitions.
As for uniform objects for futures and locations, consumers may or may not want to check the types of these futures and locations and call the respective APIs on them. Consumers may call future.result() (in a Dask environment) or ray.get(future) (in a Ray environment). The same applies to locations. However, if we require consumers to throw an exception when they do not support the information provided by the protocol (explicit checks), that would be ok.

Some more experiments made it apparent that the callable discussed above to resolve a future/handle is needed.

That callable should be generic enough to materialize a list of futures, not just one.

Are you suggesting this to allow parallel execution?

Something similar to ray.get(list_of_futures) and dask_client.gather(list_of_futures).

@fschlimb
Author

Don't we lose the original positioning of the partitions in the grid in that case? That might be useful for consumers who care more about the positioning than about mapping locations to lists of partitions.

I agree this must be possible, and I don't think we lose the information. We provide start and shape for each partition, which implicitly provides the position in the grid. The question is which case we think is more common. My experience shows that getting partitions per rank/location is the common case.
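As a sketch of that claim for a regular grid (the helper name is hypothetical): the sorted distinct start offsets along each axis index the grid, so each partition's position follows from its 'start' alone.

```python
def grid_positions(starts):
    """Recover grid positions from partition start offsets on a regular grid."""
    ndim = len(starts[0])
    # Sorted distinct offsets per axis; a start's rank along each axis is
    # its grid coordinate.
    axis_offsets = [sorted({s[a] for s in starts}) for a in range(ndim)]
    return {s: tuple(axis_offsets[a].index(s[a]) for a in range(ndim)) for s in starts}


print(grid_positions([(0, 0), (0, 2), (3, 0), (3, 2)]))
# {(0, 0): (0, 0), (0, 2): (0, 1), (3, 0): (1, 0), (3, 2): (1, 1)}
```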

As for uniform objects for futures and locations, consumers may or may not want to check the types of these futures and locations and call the respective APIs on them. Consumers may call future.result() (in a Dask environment) or ray.get(future) (in a Ray environment). The same applies to locations. However, if we require consumers to throw an exception when they do not support the information provided by the protocol (explicit checks), that would be ok.

Agreed. That's what I had in mind as well. If the consumer can deal with neither the handle nor the result of get(handle), it should throw an exception (or use whatever other error handling it uses).
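A minimal consumer-side sketch of that rule, assuming the producer supplies a 'get' resolver (the helper `materialize` and the `accepted` types are hypothetical; a stdlib Future stands in for a backend handle): use the handle directly if the consumer understands it, otherwise resolve it, and raise if neither form is usable.

```python
from concurrent.futures import Future


def materialize(parted, handle, accepted=(list,)):
    """Return partition data in a form the consumer supports, or raise.

    `accepted` lists the types this (hypothetical) consumer can work with.
    """
    if isinstance(handle, accepted):
        return handle  # the consumer can use the handle as-is
    value = parted["get"](handle)  # resolve through the producer's callable
    if isinstance(value, accepted):
        return value
    raise TypeError(f"unsupported partition type: {type(value).__name__}")


ok = Future()
ok.set_result([1, 2, 3])  # stands in for a remote handle
bad = Future()
bad.set_result("not a list")
parted = {"get": lambda handle: handle.result()}

print(materialize(parted, ok))  # [1, 2, 3]
try:
    materialize(parted, bad)
except TypeError as err:
    print(err)  # unsupported partition type: str
```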

Are you suggesting this to allow parallel execution?

Something similar to ray.get(list_of_futures) and dask_client.gather(list_of_futures).

I am trying to understand why you suggest this. A list comprehension on the user side is easy to write: [p['get'](x) for x in list_of_futures].

@YarShev

YarShev commented Jul 19, 2021

Don't we lose the original positioning of the partitions in the grid in that case? That might be useful for consumers who care more about the positioning than about mapping locations to lists of partitions.

I agree this must be possible, and I don't think we lose the information. We provide start and shape for each partition, which implicitly provides the position in the grid. The question is which case we think is more common. My experience shows that getting partitions per rank/location is the common case.

Mapping partitions per rank/location seems to me to be the common case as well, but I am not sure what the protocol would look like in that case. Can you give an example? Since the protocol is going to provide exactly the same underlying partition structure (without any repartitioning), it seems to me that, for now, the information provided by the protocol looks more natural, because consumers can see the whole partitioning at once.

Are you suggesting this to allow parallel execution?

Something similar to ray.get(list_of_futures) and dask_client.gather(list_of_futures).

I am trying to understand why you suggest this. A list comprehension on the user side is easy to write: [p['get'](x) for x in list_of_futures].

Passing a single future rather than a list of futures to .get(...) can make a dramatic difference:

Ray

import ray
ray.init()

@ray.remote
def foo():
    from time import sleep
    sleep(10)
    return 1

%%time
ray.get([foo.remote() for _ in range(10)])
Wall time: 20.3 s
[1, 1, 1, 1, 1, 1, 1, 1, 1, 1]

%%time
[ray.get(foo.remote()) for _ in range(10)]
Wall time: 1min 40s
[1, 1, 1, 1, 1, 1, 1, 1, 1, 1]

Dask

from distributed.client import Client
c = Client()
def foo():
    from time import sleep
    sleep(10)
    return 1

%%time
c.gather([c.submit(foo) for _ in range(10)])
Wall time: 10 s
[1, 1, 1, 1, 1, 1, 1, 1, 1, 1]

%%time
[c.gather(c.submit(foo)) for _ in range(10)]
Wall time: 30.1 s
[1, 1, 1, 1, 1, 1, 1, 1, 1, 1]

So we should provide a more general API that can accept multiple futures.
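A sketch of a 'get' callable that accepts either one handle or a list, using the standard library's concurrent.futures as a stand-in backend (the function and timings are illustrative, not measurements from Ray or Dask). With Ray or Dask, the list branch would map onto a single batched call such as ray.get(list_of_futures) or client.gather(list_of_futures). As in the timings above, the gap here comes from waiting on each future before submitting the next one; resolving the whole batch in one call avoids that.

```python
import time
from concurrent.futures import Future, ThreadPoolExecutor, wait


def get(handles):
    """Resolve one handle or a list of handles (hypothetical 'get' callable)."""
    if isinstance(handles, Future):
        return handles.result()
    wait(handles)  # block once for the whole batch
    return [h.result() for h in handles]


def foo():
    time.sleep(0.1)
    return 1


with ThreadPoolExecutor(max_workers=10) as pool:
    t0 = time.perf_counter()
    # Submit everything first, then resolve the batch in one call.
    batched = get([pool.submit(foo) for _ in range(10)])
    t_batched = time.perf_counter() - t0

    t0 = time.perf_counter()
    # Submitting and waiting one at a time serializes the work.
    serial = [get(pool.submit(foo)) for _ in range(10)]
    t_serial = time.perf_counter() - t0

print(f"batched: {t_batched:.2f}s, one at a time: {t_serial:.2f}s")
```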

@fschlimb changed the title from "__partitioned_interface__ protocol for distributed data containers" to "__partitioned__ protocol for partitioned and distributed data containers" on Aug 23, 2021