diff --git a/python/ray/data/dataset.py b/python/ray/data/dataset.py
index 417fb503d37e0..77b25ea8c89c4 100644
--- a/python/ray/data/dataset.py
+++ b/python/ray/data/dataset.py
@@ -101,20 +101,56 @@
 
 @PublicAPI
 class Dataset(Generic[T]):
-    """Implements a distributed Arrow dataset.
-
-    Datasets are implemented as a list of ``ObjectRef[Block]``. The block
-    also determines the unit of parallelism. The default block type is the
-    ``pyarrow.Table``. Arrow-incompatible objects are held in ``list`` blocks.
+    """A Dataset is a distributed data collection for data loading and processing.
+
+    Datasets are implemented as a list of ``ObjectRef[Block]``, where each block
+    holds an ordered collection of items, representing a shard of the overall
+    data collection. The block can be either a ``pyarrow.Table`` or a Python list.
+    The block also determines the unit of parallelism.
+
+    Datasets can be created in multiple ways: from synthetic data via ``range_*()``
+    APIs, from existing in-memory data via ``from_*()`` APIs, or from external storage
+    systems such as local disk, S3, HDFS, etc. via the ``read_*()`` APIs. The
+    (potentially processed) Dataset can be saved back to external storage systems via
+    the ``write_*()`` APIs.
+
+    Examples:
+        >>> import ray
+        >>> # Create dataset from synthetic data.
+        >>> ds = ray.data.range(1000) # doctest: +SKIP
+        >>> # Create dataset from in-memory data.
+        >>> ds = ray.data.from_items( # doctest: +SKIP
+        ...     [{"col1": i, "col2": i * 2} for i in range(1000)])
+        >>> # Create dataset from external storage system.
+        >>> ds = ray.data.read_parquet("s3://bucket/path") # doctest: +SKIP
+        >>> # Save dataset back to external storage system.
+        >>> ds.write_csv("s3://bucket/output") # doctest: +SKIP
+
+    Datasets support parallel processing at scale: transformations such as
+    :py:meth:`.map_batches()`, aggregations such as
+    :py:meth:`.min()`/:py:meth:`.max()`/:py:meth:`.mean()`, grouping via
+    :py:meth:`.groupby()`, shuffling operations such as :py:meth:`.sort()`,
+    :py:meth:`.random_shuffle()`, and :py:meth:`.repartition()`.
+
+    Examples:
+        >>> import ray
+        >>> ds = ray.data.range(1000) # doctest: +SKIP
+        >>> # Transform in parallel with map_batches().
+        >>> ds.map_batches(lambda batch: [v * 2 for v in batch]) # doctest: +SKIP
+        >>> # Compute max.
+        >>> ds.max() # doctest: +SKIP
+        >>> # Group the data.
+        >>> ds.groupby(lambda x: x % 3).count() # doctest: +SKIP
+        >>> # Shuffle this dataset randomly.
+        >>> ds.random_shuffle() # doctest: +SKIP
+        >>> # Sort it back in order.
+        >>> ds.sort() # doctest: +SKIP
 
     Since Datasets are just lists of Ray object refs, they can be passed
-    between Ray tasks and actors just like any other object. Datasets support
+    between Ray tasks and actors without incurring a copy. Datasets support
     conversion to/from several more featureful dataframe libraries (e.g.,
     Spark, Dask, Modin, MARS), and are also compatible with distributed
     TensorFlow / PyTorch.
-
-    Dataset supports parallel transformations such as .map(), .map_batches(),
-    and simple repartition, but currently not aggregations and joins.
     """
 
     def __init__(
@@ -355,8 +391,6 @@ def add_column(
         A function generating the new column values given the batch in
         pandas format must be specified.
 
-        This is a convenience wrapper over ``.map_batches()``.
-
         Examples:
             >>> import ray
             >>> ds = ray.data.range_table(100) # doctest: +SKIP
@@ -698,8 +732,8 @@ def split(
                 number of records. This may drop records if they cannot be
                 divided equally among the splits.
             locality_hints: A list of Ray actor handles of size ``n``. The
-                system will try to co-locate the blocks of the ith dataset
-                with the ith actor to maximize data locality.
+                system will try to co-locate the blocks of the i-th dataset
+                with the i-th actor to maximize data locality.
 
         Returns:
             A list of ``n`` disjoint dataset splits.
diff --git a/python/ray/data/dataset_pipeline.py b/python/ray/data/dataset_pipeline.py
index 79737305acde9..20c29c1546835 100644
--- a/python/ray/data/dataset_pipeline.py
+++ b/python/ray/data/dataset_pipeline.py
@@ -57,7 +57,6 @@
 class DatasetPipeline(Generic[T]):
     """Implements a pipeline of Datasets.
 
-    Unlike Datasets, which execute all transformations synchronously,
     DatasetPipelines implement pipelined execution. This allows for the
     overlapped execution of data input (e.g., reading files), computation
     (e.g. feature preprocessing), and output (e.g., distributed ML training).
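The ``split()`` hunk above only adjusts wording (``ith`` to ``i-th``), but the ``locality_hints`` behavior it documents is easiest to see end to end. The following is a minimal sketch, assuming the Ray 1.x ``Dataset.split(n, locality_hints=...)`` signature described in the docstring; the ``Worker`` actor and its ``consume()`` method are hypothetical stand-ins for a real consumer.

    import ray

    @ray.remote
    class Worker:
        # Hypothetical consumer actor. Only the actor handle matters for
        # locality_hints: the system tries to place the i-th split's blocks
        # on the node running the i-th actor.
        def consume(self, shard) -> int:
            # Iterate over the shard locally and count its rows.
            return sum(1 for _ in shard.iter_rows())

    workers = [Worker.remote() for _ in range(4)]
    ds = ray.data.range(1000)

    # One shard per worker, with shard i co-located with worker i.
    shards = ds.split(len(workers), locality_hints=workers)

    # Each worker consumes the shard that was placed near it.
    counts = ray.get([w.consume.remote(s) for w, s in zip(workers, shards)])
    print(counts)  # e.g. [250, 250, 250, 250]

Passing each shard as a task argument works because, as the docstring notes, Datasets are just lists of object refs and can be sent to tasks and actors without incurring a copy.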
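Likewise, the ``dataset_pipeline.py`` hunk only trims a sentence, so here is a small sketch of the pipelined execution that docstring describes. It assumes the Ray 1.x ``Dataset.window()`` API for turning a Dataset into a ``DatasetPipeline``; the parallelism, window size, and doubling transform are arbitrary choices for illustration.

    import ray

    # With a pipeline, reading, transformation, and consumption of successive
    # windows overlap rather than each stage running to completion first.
    pipe = (
        ray.data.range(10000, parallelism=100)
        .window(blocks_per_window=10)  # execute 10 blocks at a time
        .map_batches(lambda batch: [v * 2 for v in batch])
    )

    total = 0
    for batch in pipe.iter_batches(batch_size=1000):
        # While this batch is consumed here, later windows are still being
        # read and transformed in the background.
        total += len(batch)
    print(total)  # 10000 (doubling values does not change the row count)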