From 495e944f300251a19c1e8321814548a7862fddc1 Mon Sep 17 00:00:00 2001
From: Frank Schlimbach
Date: Thu, 27 Jan 2022 12:12:42 +0100
Subject: [PATCH] Update Partitioned.md

Incorporated suggestions/feedback from
https://github.com/data-apis/consortium-feedback/issues/7
---
 Partitioned.md | 55 +++++++++++++++++++++++++++-----------------------
 1 file changed, 30 insertions(+), 25 deletions(-)

diff --git a/Partitioned.md b/Partitioned.md
index c7e1f11..8c90573 100644
--- a/Partitioned.md
+++ b/Partitioned.md
@@ -1,15 +1,20 @@
 **Draft**
 # Motivation
-When operating in distributed memory systems a data container (such as tensors or data-frames) may be partitioned into several smaller chunks which might be allocated in distinct (distributed) address spaces. An implementation of operations defined specifically for such a partitioned data container will likely need to use specifics of the structure to provide good performance. Most prominently, making use of the data locality can be vital. For a consumer of such a partitioned container it will be inconvenient (if not impossible) to write specific code for any possible partitioned and distributed data container. On the other hand, like with the array/dataframe API, it would be much better to have a standard way of extracting the meta data about the partitioned and distributed nature of a given container.
+When operating in distributed memory systems, a data container (such as a tensor or data-frame) needs to operate on several subsets of the data, each assigned to a distinct (distributed) address space. An implementation of operations defined specifically for such a distributed data container will likely need to use specifics of the structure to provide good performance. Most prominently, making use of data locality can be vital. For a consumer of such a partitioned container, it will be inconvenient (if not impossible) to write specific code for every possible partitioned and/or distributed data container.
+On the other hand, as with the array API, it would be much better to have a standard way of extracting metadata about the current partitioned and distributed state of a given container.
 
-The goal of the `__partitioned__` protocol is to allow partitioned and distributed data containers to expose information to consumers so that unnecessary copying can be avoided as much as possible.
+The goal of the `__partitioned__` protocol is to allow partitioned and distributed data containers to expose information to consumers so that unnecessary copying can be avoided as much as possible, ideally enabling zero-copy access.
 
 # Scope
 Currently the focus is dense data structures with rectangular shapes, such as dense nd-arrays and DataFrames. The data structures compose their data-(index-)space into several rectangular partitions which form a multi-dimensional grid.
 
 While the protocol is designed to be generic enough to cover any distribution backend, the currently considered backends are Ray, Dask and MPI-like (SPMD assigning ids to each process/rank/PE).
 
+# Non-Goals
+This protocol accepts that multiple runtime systems for distributed computation exist which do not easily compose. A standard API for defining data- and control-flow on distributed systems would allow even better composability; that is a much bigger effort and not a goal of this protocol.
+
+Currently, neither non-rectangular data structures, non-rectangular partitions, nor irregular partition grids are considered.
+
 # Partitioned Protocol
 A conforming implementation of the partitioned protocol standard must provide and support a data structure object having a `__partitioned__` property which returns a Python dictionary with the following fields:
 * `shape`: a tuple defining the number of elements for each dimension of the global data-(index-)space.
@@ -41,7 +46,7 @@ Each key/position maps to
 * 'start': The offset of the starting element in the partition from the first element in the global index space of the underlying data-structure, given as a tuple
 * 'shape': Shape of the partition (same dimensionality as the shape of the partition grid) given as a tuple
 * 'data': The actual data provided as ObjRef, Future, array or DF or...
-* 'location': The location (or home) of the partition, given as a sequence of ip-addresses or ranks or...
+* 'location': The location (or home) of the partition within the memory hierarchy, provided as a tuple (IP, PID[, device])
 
 A consumer must verify it supports the provided object types and locality information; it must throw an exception if not.
 
@@ -61,10 +66,10 @@ A consumer must verify it supports the provided object types and locality inform
 * For SPMD-MPI-like backends: partitions which are not locally available may be `None`. This is the recommended behavior unless the underlying backend supports references (such as promises/futures) to avoid unnecessary data movement.
 ### `location`
 * A sequence of locations where data can be accessed locally, e.g. without extra communication
-* The location information must include all necessary data to uniquely identify the location of the partition/data. The exact information depends on the underlying distribution system:
-  * Ray: ip-address
-  * Dask: worker-Id (name, ip, or ip:port)
-  * SPMD/MPI-like frameworks such as MPI, SHMEM etc.: rank
+* The location information includes all necessary data to uniquely identify the location of the partition/data within the memory hierarchy. It is represented as a tuple: (IP, PID[, device])
+  * IP: a string identifying the address of the node in the network
+  * PID: an integer identifying the unique process ID of the process as assigned by the OS
+  * device: an optional string identifying the device; if present, the underlying object is assumed to expose `dlpack` and the value must conform to `dlpack`.
Default: 'kDLCPU'.
 
 ## Examples
 ### 1d-data-structure (64 elements), 1d-partition-grid, 4 partitions on 4 nodes, blocked distribution, partitions are of type `Ray.ObjRef`, Ray
@@ -77,27 +82,27 @@ __partitioned__ = {
     'start': (0,),
     'shape': (16,),
     'data': ObjRef0,
-    'location': ['1.1.1.1’], }
+    'location': [('1.1.1.1', 7755737)], }
   (1,): {
     'start': (16,),
     'shape': (16,),
     'data': ObjRef1,
-    'location': ['1.1.1.2’], }
+    'location': [('1.1.1.2', 7736336)], }
   (2,): {
     'start': (32,),
     'shape': (16,),
     'data': ObjRef2,
-    'location': ['1.1.1.3’], }
+    'location': [('1.1.1.3', 64763578)], }
   (3,): {
     'start': (48,),
     'shape': (16,),
     'data': ObjRef3,
-    'location': ['1.1.1.4’], }
+    'location': [('1.1.1.4', 117264534)], }
   },
   'get': lambda x: ray.get(x)
 }
 ```
 
-### 2d-structure (64 elements), 2d-partition-grid, 4 partitions on 2 nodes, block-cyclic distribution, partitions are of type `dask.Future`, dask
+### 2d-structure (64 elements), 2d-partition-grid, 4 partitions on 2 nodes with 2 workers each, block-cyclic distribution, partitions are of type `dask.Future`, Dask
 ```python
 __partitioned__ = {
   'shape’: (8, 8),
@@ -107,27 +112,27 @@ __partitioned__ = {
     'start': (4, 4),
     'shape': (4, 4),
     'data': future0,
-    'location': ['Alice’], },
+    'location': [('1.1.1.1', 77463764)], },
   (1,0): {
     'start': (4, 0),
     'shape': (4, 4),
     'data': future1,
-    'location': ['1.1.1.2:55667’], },
+    'location': [('1.1.1.2', 9174756892)], },
   (0,1): {
     'start': (0, 4),
     'shape': (4, 4),
     'data': future2,
-    'location': ['Alice’], },
+    'location': [('1.1.1.1', 29384572)], },
   (0,0): {
     'start': (0,0),
     'shape': (4, 4),
     'data': future3,
-    'location': ['1.1.1.2:55667’], },
+    'location': [('1.1.1.2', 847236952)], },
   }
   'get': lambda x: distributed.get_client().gather(x)
 }
 ```
 
-### 2d-structure (64 elements), 1d-partition-grid, 4 partitions on 2 ranks, row-block-cyclic distribution, partitions are of type `pandas.DataFrame`, MPI
+### 2d-structure (64 elements), 1d-partition-grid, 4 partitions on 2 ranks, row-block-cyclic
distribution, partitions are arrays allocated on devices, MPI
 ```python
 __partitioned__ = {
   'shape’: (8, 8),
@@ -136,23 +141,23 @@ __partitioned__ = {
   (0,0): {
     'start': (0, 0),
     'shape': (2, 8),
-    'data': df0, # this is for rank 0, for rank 1 it'd be None
-    'location': [0], },
+    'data': ary0, # this is for rank 0, for rank 1 it'd be None
+    'location': [('1.1.1.1', 29384572, 'kDLOneAPI:0')], },
   (1,0): {
     'start': (2, 0),
     'shape': (2, 8),
-    'data': None, # this is for rank 0, for rank 1 it'd be df1
-    'location': [1], },
+    'data': None, # this is for rank 0, for rank 1 it'd be ary1
+    'location': [('1.1.1.2', 575812655, 'kDLOneAPI:0')], },
   (2,0): {
     'start': (4, 0),
     'shape': (2, 8),
-    'data': df2, # this is for rank 0, for rank 1 it'd be None
-    'location': [0], },
+    'data': ary2, # this is for rank 0, for rank 1 it'd be None
+    'location': [('1.1.1.1', 29384572, 'kDLOneAPI:1')], },
   (3,0): {
     'start': (6, 0),
     'shape': (2, 8),
-    'data': None, # this is for rank 0, for rank 1 it'd be df3
-    'location': [1], },
+    'data': None, # this is for rank 0, for rank 1 it'd be ary3
+    'location': [('1.1.1.2', 575812655, 'kDLOneAPI:1')], },
   },
   'get': lambda x: x,
   'locals': [(0,0), (2,0)] # this is for rank 0, for rank 1 it'd be [(1,0), (3,0)]
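
To make the consumer side of the examples above concrete, here is a minimal, single-process sketch: a hypothetical `SimpleContainer` class (not part of the spec or the patch) exposes `__partitioned__` over plain Python lists, with a loopback IP and a fake PID, and a consumer reassembles the global data via `'get'`. It uses only the fields shown in the examples above.

```python
# Hypothetical single-process illustration of the __partitioned__ protocol.
# SimpleContainer, the loopback IP and the PID value are illustrative only;
# partitions are plain Python lists, so 'get' is the identity function.

class SimpleContainer:
    """A 1d container of n elements split into 2 blocked partitions."""

    def __init__(self, data):
        self._data = list(data)

    @property
    def __partitioned__(self):
        half = len(self._data) // 2
        return {
            'shape': (len(self._data),),
            'partitions': {
                (0,): {
                    'start': (0,),
                    'shape': (half,),
                    'data': self._data[:half],
                    'location': [('127.0.0.1', 0)],  # (IP, PID); PID faked here
                },
                (1,): {
                    'start': (half,),
                    'shape': (len(self._data) - half,),
                    'data': self._data[half:],
                    'location': [('127.0.0.1', 0)],
                },
            },
            'get': lambda x: x,  # data is already local: identity "get"
        }


def gather(container):
    """Consumer: reassemble the global data from the partitions dict."""
    parts = container.__partitioned__
    get = parts['get']
    out = [None] * parts['shape'][0]
    for meta in parts['partitions'].values():
        start = meta['start'][0]
        chunk = get(meta['data'])
        out[start:start + len(chunk)] = chunk
    return out


print(gather(SimpleContainer(range(8))))  # -> [0, 1, 2, 3, 4, 5, 6, 7]
```

A real consumer would additionally check each partition's 'location' against its own node/process (and raise if the object types or locality information are unsupported, as the spec requires), and 'get' would resolve backend references such as Ray ObjRefs or Dask Futures.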