Update Partitioned.md
Incorporated suggestions/feedback from data-apis/consortium-feedback#7
fschlimb authored Jan 27, 2022
1 parent e126088 commit 495e944
Showing 1 changed file with 30 additions and 25 deletions.
**Draft**

# Motivation
When operating in distributed memory systems, operations on a data container (such as a tensor or data-frame) need to work on several subsets of the data, each assigned to a distinct (distributed) address space. An implementation of operations defined specifically for such a distributed data container will likely need to exploit specifics of that structure to provide good performance; most prominently, making use of data locality can be vital. For a consumer of such a partitioned container it is inconvenient (if not impossible) to write specific code for every possible partitioned and/or distributed data container. On the other hand, as with the array API, it is much better to have a standard way of extracting the metadata about the current partitioned and distributed state of a given container.

The goal of the `__partitioned__` protocol is to allow partitioned and distributed data containers to expose information to consumers so that unnecessary copying can be avoided as much as possible, ideally achieving zero-copy.

# Scope
Currently the focus is on dense data structures with rectangular shapes, such as dense nd-arrays and DataFrames. These data structures decompose their data-(index-)space into several rectangular partitions which form a multi-dimensional grid.
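
As a purely local illustration (not part of the protocol itself), here is a minimal sketch using `numpy` of how a dense 8x8 array decomposes into a 2x2 grid of rectangular partitions, each identified by its position in the grid and its offset in the global index space:

```python
import numpy as np

# Illustrative only: decompose an 8x8 array into a 2x2 partition grid.
a = np.arange(64).reshape(8, 8)
grid = (2, 2)                                           # shape of the partition grid
block = (a.shape[0] // grid[0], a.shape[1] // grid[1])  # shape of each partition

partitions = {}
for i in range(grid[0]):
    for j in range(grid[1]):
        start = (i * block[0], j * block[1])            # offset in the global index space
        partitions[(i, j)] = a[start[0]:start[0] + block[0],
                               start[1]:start[1] + block[1]]

# e.g. partitions[(1, 0)] holds rows 4..7 and columns 0..3 of the global array
```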

While the protocol is designed to be generic enough to cover any distribution backend, the currently considered backends are Ray, Dask and MPI-like (SPMD assigning ids to each process/rank/PE).

# Non-Goals
This protocol accepts that multiple runtime systems for distributed computation exist which do not easily compose. A standard API for defining data- and control-flow on distributed systems would allow even better composability, but that is a much bigger effort and not a goal of this protocol.

Currently, neither non-rectangular data structures, non-rectangular partitions, nor irregular partition grids are considered.

# Partitioned Protocol
A conforming implementation of the partitioned protocol standard must provide and support a data structure object having a `__partitioned__` property which returns a Python dictionary with the following fields:
* `shape`: a tuple defining the number of elements for each dimension of the global data-(index-)space.
Each key/position in the partition grid maps to
* 'start': The offset of the starting element in the partition from the first element in the global index space of the underlying data-structure, given as a tuple
* 'shape': Shape of the partition (same dimensionality as the shape of the partition grid) given as a tuple
* 'data': The actual data provided as ObjRef, Future, array or DF or...
* 'location': The location (or home) of the partition within the memory hierarchy, provided as a tuple (IP, PID[, device])

A consumer must verify it supports the provided object types and locality information; it must throw an exception if not.

* For SPMD-MPI-like backends: partitions which are not locally available may be `None`. This is the recommended behavior unless the underlying backend supports references (such as promises/futures) to avoid unnecessary data movement.
### `location`
* A sequence of locations where the data can be accessed locally, i.e. without extra communication
* The location information includes all necessary data to uniquely identify the location of the partition/data within the memory hierarchy. It is represented as a tuple (IP, PID[, device]):
  * IP: a string identifying the address of the node in the network
  * PID: an integer identifying the unique process ID of the process as assigned by the OS
  * device: an optional string identifying the device; if present, the underlying object is assumed to expose `dlpack` and the value must conform to `dlpack`. Default: 'kDLCPU'.
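
A minimal consumer-side sketch (illustrative only; `gather_partitions` is a hypothetical helper, not part of the protocol) of how a consumer might validate the location information above and materialize partition data via the provided `get` callable:

```python
def gather_partitions(dist_obj):
    # Hypothetical consumer sketch: validate locality information and
    # materialize all locally resolvable partitions via `get`.
    meta = dist_obj.__partitioned__
    get = meta['get']
    gathered = {}
    for pos, part in meta['partitions'].items():
        for loc in part['location']:
            # loc is (IP, PID[, device]); this simple consumer only handles CPU data
            if len(loc) > 2 and not loc[2].startswith('kDLCPU'):
                raise TypeError(f"unsupported device location: {loc}")
        if part['data'] is None:
            continue  # partition not locally available (SPMD/MPI-like backends)
        # 'data' may be an ObjRef, Future, array, DataFrame, ...; `get` resolves it
        gathered[pos] = get(part['data'])
    return gathered
```

For the Ray example below, `get` is `ray.get`, so this sketch would pull every partition to the calling process; a locality-aware consumer would instead dispatch work to the listed (IP, PID) locations.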

## Examples
### 1d-data-structure (64 elements), 1d-partition-grid, 4 partitions on 4 nodes, blocked distribution, partitions are of type `Ray.ObjRef`, Ray
```python
__partitioned__ = {
    'shape': (64,),
    'partitions': {
        (0,): {
            'start': (0,),
            'shape': (16,),
            'data': ObjRef0,
            'location': [('1.1.1.1', 7755737)], },
        (1,): {
            'start': (16,),
            'shape': (16,),
            'data': ObjRef1,
            'location': [('1.1.1.2', 7736336)], },
        (2,): {
            'start': (32,),
            'shape': (16,),
            'data': ObjRef2,
            'location': [('1.1.1.3', 64763578)], },
        (3,): {
            'start': (48,),
            'shape': (16,),
            'data': ObjRef3,
            'location': [('1.1.1.4', 117264534)], },
    },
    'get': lambda x: ray.get(x)
}
```
### 2d-structure (64 elements), 2d-partition-grid, 4 partitions on 2 nodes with 2 workers each, block-cyclic distribution, partitions are of type `dask.Future`, dask
```python
__partitioned__ = {
    'shape': (8, 8),
    'partitions': {
        (1,1): {
            'start': (4, 4),
            'shape': (4, 4),
            'data': future0,
            'location': [('1.1.1.1', 77463764)], },
        (1,0): {
            'start': (4, 0),
            'shape': (4, 4),
            'data': future1,
            'location': [('1.1.1.2', 9174756892)], },
        (0,1): {
            'start': (0, 4),
            'shape': (4, 4),
            'data': future2,
            'location': [('1.1.1.1', 29384572)], },
        (0,0): {
            'start': (0, 0),
            'shape': (4, 4),
            'data': future3,
            'location': [('1.1.1.2', 847236952)], },
    },
    'get': lambda x: distributed.get_client().gather(x)
}
```
### 2d-structure (64 elements), 1d-partition-grid, 4 partitions on 2 ranks, row-block-cyclic distribution, partitions are arrays allocated on devices, MPI
```python
__partitioned__ = {
    'shape': (8, 8),
    'partitions': {
        (0,0): {
            'start': (0, 0),
            'shape': (2, 8),
            'data': ary0,  # this is for rank 0, for rank 1 it'd be None
            'location': [('1.1.1.1', 29384572, 'kDLOneAPI:0')], },
        (1,0): {
            'start': (2, 0),
            'shape': (2, 8),
            'data': None,  # this is for rank 0, for rank 1 it'd be ary1
            'location': [('1.1.1.2', 575812655, 'kDLOneAPI:0')], },
        (2,0): {
            'start': (4, 0),
            'shape': (2, 8),
            'data': ary2,  # this is for rank 0, for rank 1 it'd be None
            'location': [('1.1.1.1', 29384572, 'kDLOneAPI:1')], },
        (3,0): {
            'start': (6, 0),
            'shape': (2, 8),
            'data': None,  # this is for rank 0, for rank 1 it'd be ary3
            'location': [('1.1.1.2', 575812655, 'kDLOneAPI:1')], },
    },
    'get': lambda x: x,
    'locals': [(0,0), (2,0)]  # this is for rank 0, for rank 1 it'd be [(1,0), (3,0)]
}
```
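
For the SPMD/MPI case above, a minimal consumer sketch (assuming `mpi4py`; `process_block` is a hypothetical per-partition function) of how each rank restricts its work to the partitions listed in `locals`:

```python
from mpi4py import MPI

def process_local_partitions(dist_obj, process_block):
    # Hypothetical SPMD consumer sketch: each rank touches only the
    # partitions that `locals` reports as locally available.
    meta = dist_obj.__partitioned__
    get = meta['get']                  # identity for MPI-like backends
    results = []
    for pos in meta['locals']:
        part = meta['partitions'][pos]
        block = get(part['data'])      # local data, no communication needed
        results.append(process_block(block, part['start'], part['shape']))
    print(f"rank {MPI.COMM_WORLD.Get_rank()} processed {len(results)} local partition(s)")
    return results
```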
