Skip to content

Commit

Permalink
[Data] Move _fetch_metadata_parallel to file_meta_provider (ray-p…
Browse files Browse the repository at this point in the history
…roject#40295)

_fetch_metadata_parallel() is only used in file_meta_provider.py. To keep the function close to where it's actually used, this PR moves it from file_based_datasource.py to file_meta_provider.py.

---------

Signed-off-by: Balaji Veeramani <balaji@anyscale.com>
  • Loading branch information
bveeramani authored Oct 19, 2023
1 parent 318fd57 commit f9de855
Show file tree
Hide file tree
Showing 2 changed files with 44 additions and 36 deletions.
32 changes: 0 additions & 32 deletions python/ray/data/datasource/file_based_datasource.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
import itertools
import os
import pathlib
import sys
Expand All @@ -14,7 +13,6 @@
Literal,
Optional,
Tuple,
TypeVar,
Union,
)

Expand All @@ -24,8 +22,6 @@
from ray.air._internal.remote_storage import _is_local_windows_path
from ray.data._internal.dataset_logger import DatasetLogger
from ray.data._internal.execution.interfaces import TaskContext
from ray.data._internal.progress_bar import ProgressBar
from ray.data._internal.remote_fn import cached_remote_fn
from ray.data._internal.util import (
_check_pyarrow_version,
_resolve_custom_scheme,
Expand Down Expand Up @@ -844,34 +840,6 @@ def _resolve_kwargs(
return kwargs


Uri = TypeVar("Uri")
Meta = TypeVar("Meta")


def _fetch_metadata_parallel(
    uris: List[Uri],
    fetch_func: Callable[[List[Uri]], List[Meta]],
    desired_uris_per_task: int,
    **ray_remote_args,
) -> Iterator[Meta]:
    """Fetch file metadata in parallel using Ray tasks.

    Args:
        uris: URIs whose metadata should be fetched.
        fetch_func: Function that maps a batch of URIs to their metadata.
        desired_uris_per_task: Target batch size per Ray task.
        **ray_remote_args: Extra options forwarded to the remote function.

    Yields:
        Metadata entries, one per input URI, in chunk order.
    """
    remote_fetch = cached_remote_fn(fetch_func, num_cpus=0.5)
    if ray_remote_args:
        remote_fetch = remote_fetch.options(**ray_remote_args)
    # Size the task count so per-task fetch work dominates Ray's task
    # overhead, but never launch fewer than 2 tasks so fetches overlap.
    num_tasks = max(len(uris) // desired_uris_per_task, 2)
    progress = ProgressBar("Metadata Fetch Progress", total=num_tasks)
    # np.array_split may yield empty chunks when len(uris) < num_tasks;
    # skip those rather than launching no-op tasks.
    refs = [
        remote_fetch.remote(chunk)
        for chunk in np.array_split(uris, num_tasks)
        if len(chunk) > 0
    ]
    for metadata_batch in progress.fetch_until_complete(refs):
        yield from metadata_batch


def _open_file_with_retry(
file_path: str,
open_file: Callable[[], "pyarrow.NativeFile"],
Expand Down
48 changes: 44 additions & 4 deletions python/ray/data/datasource/file_meta_provider.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,22 @@
import os
import pathlib
import re
from typing import TYPE_CHECKING, Any, Iterator, List, Optional, Tuple, Union

from typing import (
TYPE_CHECKING,
Any,
Callable,
Iterator,
List,
Optional,
Tuple,
TypeVar,
Union,
)

import numpy as np

from ray.data._internal.progress_bar import ProgressBar
from ray.data._internal.remote_fn import cached_remote_fn
from ray.data.block import BlockMetadata
from ray.data.datasource.partitioning import Partitioning
from ray.util.annotations import DeveloperAPI
Expand Down Expand Up @@ -315,7 +329,6 @@ def prefetch_file_metadata(
fragments: List["pyarrow.dataset.ParquetFileFragment"],
**ray_remote_args,
) -> Optional[List["pyarrow.parquet.FileMetaData"]]:
from ray.data.datasource.file_based_datasource import _fetch_metadata_parallel
from ray.data.datasource.parquet_datasource import (
FRAGMENTS_PER_META_FETCH,
PARALLELIZE_META_FETCH_THRESHOLD,
Expand Down Expand Up @@ -477,7 +490,6 @@ def _get_file_infos_parallel(
) -> Iterator[Tuple[str, int]]:
from ray.data.datasource.file_based_datasource import (
PATHS_PER_FILE_SIZE_FETCH_TASK,
_fetch_metadata_parallel,
_unwrap_s3_serialization_workaround,
_wrap_s3_serialization_workaround,
)
Expand Down Expand Up @@ -506,6 +518,34 @@ def _file_infos_fetcher(paths: List[str]) -> List[Tuple[str, int]]:
)


Uri = TypeVar("Uri")
Meta = TypeVar("Meta")


def _fetch_metadata_parallel(
    uris: List[Uri],
    fetch_func: Callable[[List[Uri]], List[Meta]],
    desired_uris_per_task: int,
    **ray_remote_args,
) -> Iterator[Meta]:
    """Fetch file metadata in parallel using Ray tasks.

    Args:
        uris: URIs whose metadata should be fetched.
        fetch_func: Function that maps a batch of URIs to their metadata.
        desired_uris_per_task: Target batch size per Ray task.
        **ray_remote_args: Extra options forwarded to the remote function.

    Yields:
        Metadata entries, one per input URI, in chunk order.
    """
    remote_fetch = cached_remote_fn(fetch_func, num_cpus=0.5)
    if ray_remote_args:
        remote_fetch = remote_fetch.options(**ray_remote_args)
    # Size the task count so per-task fetch work dominates Ray's task
    # overhead, but never launch fewer than 2 tasks so fetches overlap.
    num_tasks = max(len(uris) // desired_uris_per_task, 2)
    progress = ProgressBar("Metadata Fetch Progress", total=num_tasks)
    # np.array_split may yield empty chunks when len(uris) < num_tasks;
    # skip those rather than launching no-op tasks.
    refs = [
        remote_fetch.remote(chunk)
        for chunk in np.array_split(uris, num_tasks)
        if len(chunk) > 0
    ]
    for metadata_batch in progress.fetch_until_complete(refs):
        yield from metadata_batch


def _get_file_infos(
path: str, filesystem: "pyarrow.fs.FileSystem", ignore_missing_path: bool = False
) -> List[Tuple[str, int]]:
Expand Down

0 comments on commit f9de855

Please sign in to comment.