
Commit

FIX-modin-project#6558: Normalize the number of column partitions after '.read_parquet()'

Signed-off-by: Dmitry Chigarev <dmitry.chigarev@intel.com>
dchigarev committed Sep 14, 2023
1 parent 9886c01 commit 9462bcc
Showing 2 changed files with 114 additions and 69 deletions.
16 changes: 11 additions & 5 deletions modin/core/io/column_stores/column_store_dispatcher.py
@@ -143,14 +143,16 @@ def build_index(cls, partition_ids):
return index, row_lengths

@classmethod
def build_columns(cls, columns):
def build_columns(cls, columns, num_splits=None):
"""
Split columns into chunks that should be read by workers.
Parameters
----------
columns : list
List of columns that should be read from file.
num_splits : int, optional
Desired number of column partitions.
Returns
-------
@@ -163,11 +165,15 @@ def build_columns(cls, columns):
columns_length = len(columns)
if columns_length == 0:
return [], []
num_partitions = NPartitions.get()
num_splits = (
NPartitions.get()
if num_splits is None
else min(NPartitions.get(), num_splits)
)
column_splits = (
columns_length // num_partitions
if columns_length % num_partitions == 0
else columns_length // num_partitions + 1
columns_length // num_splits
if columns_length % num_splits == 0
else columns_length // num_splits + 1
)
col_partitions = [
columns[i : i + column_splits]
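The chunking above boils down to a ceiling division followed by fixed-size slicing. A minimal standalone sketch of that arithmetic (split_columns is an illustrative helper, not part of the dispatcher API):

import math

def split_columns(columns, num_splits):
    # Same ceiling division as 'column_splits' above, then fixed-size slices.
    if not columns:
        return []
    chunk = math.ceil(len(columns) / num_splits)
    return [columns[i : i + chunk] for i in range(0, len(columns), chunk)]

# 10 columns split 4 ways -> partitions of 3, 3, 3 and 1 columns
print([len(part) for part in split_columns([f"c{i}" for i in range(10)], 4)])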
167 changes: 103 additions & 64 deletions modin/core/io/column_stores/parquet_dispatcher.py
@@ -20,19 +20,23 @@
import fsspec
from fsspec.core import url_to_fs
from fsspec.spec import AbstractBufferedFile
from modin.error_message import ErrorMessage
import numpy as np
from pandas.io.common import stringify_path
import pandas
import pandas._libs.lib as lib
from packaging import version
from typing import TYPE_CHECKING

from modin.core.storage_formats.pandas.utils import compute_chunksize
from modin.config import NPartitions


from modin.core.io.column_stores.column_store_dispatcher import ColumnStoreDispatcher
from modin.utils import _inherit_docstrings

if TYPE_CHECKING:
from modin.core.storage_formats.pandas.parsers import ParquetFileToRead


class ColumnStoreDataset:
"""
@@ -351,19 +355,102 @@ def get_dataset(cls, path, engine, storage_options):
raise ValueError("engine must be one of 'pyarrow', 'fastparquet'")

@classmethod
def call_deploy(cls, dataset, col_partitions, storage_options, **kwargs):
def _determine_partitioning(
cls, dataset: ColumnStoreDataset
) -> "list[list[ParquetFileToRead]]":
"""
Determine which files/row groups of the dataset each partition will read.
Parameters
----------
dataset : ColumnStoreDataset
Returns
-------
list[list[ParquetFileToRead]]
Each element in the returned list describes a list of files that a partition has to read.
"""
from modin.core.storage_formats.pandas.parsers import ParquetFileToRead

row_groups_per_file = dataset.row_groups_per_file
num_row_groups = sum(row_groups_per_file)

if num_row_groups == 0:
return []

num_splits = min(NPartitions.get(), num_row_groups)
parquet_files = dataset.files
step = num_row_groups // num_splits
# If 'num_splits' does not evenly divide 'num_row_groups', the original 'step'
# alone cannot cover all of the row groups: 'remainder' partitions have to read
# 'step + 1' row groups instead.
remainder = num_row_groups % num_splits
part_sizes = [step] * (num_splits - remainder) + [step + 1] * remainder

partition_files = []
file_idx = 0
row_group_idx = 0
row_groups_left_in_current_file = row_groups_per_file[file_idx]
# used for a sanity check at the end, verifying that we indeed added all of the row groups
total_row_groups_added = 0
for size in part_sizes:
row_groups_taken = 0
part_files = []
while row_groups_taken != size:
if row_groups_left_in_current_file < 1:
file_idx += 1
row_group_idx = 0
row_groups_left_in_current_file = row_groups_per_file[file_idx]

to_take = min(size - row_groups_taken, row_groups_left_in_current_file)
part_files.append(
ParquetFileToRead(
parquet_files[file_idx],
row_group_start=row_group_idx,
row_group_end=row_group_idx + to_take,
)
)
row_groups_left_in_current_file -= to_take
row_groups_taken += to_take
row_group_idx += to_take

total_row_groups_added += row_groups_taken
partition_files.append(part_files)

sanity_check = (
len(partition_files) == num_splits
and total_row_groups_added == num_row_groups
)
ErrorMessage.catch_bugs_and_request_email(
failure_condition=not sanity_check,
extra_log="row groups added does not match total num of row groups across parquet files",
)
return partition_files
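A worked example of the distribution above, with made-up row-group counts and NPartitions assumed to be 4; a partition whose share crosses a file boundary simply gets more than one ParquetFileToRead entry:

# Made-up row-group counts for three parquet files.
row_groups_per_file = [5, 2, 4]
num_row_groups = sum(row_groups_per_file)            # 11
num_splits = min(4, num_row_groups)                  # assuming NPartitions.get() == 4
step = num_row_groups // num_splits                  # 2
remainder = num_row_groups % num_splits              # 3
part_sizes = [step] * (num_splits - remainder) + [step + 1] * remainder
print(part_sizes)  # [2, 3, 3, 3] -- sums back to 11, so every row group is read exactly once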

@classmethod
def call_deploy(
cls,
partition_files: "list[list[ParquetFileToRead]]",
col_partitions: "list[list[str]]",
storage_options: dict,
engine: str,
**kwargs,
):
"""
Deploy remote tasks to the workers with passed parameters.
Parameters
----------
dataset : Dataset
Dataset object of Parquet file/files.
col_partitions : list
partition_files : list[list[ParquetFileToRead]]
List of arrays with files that should be read by each partition.
col_partitions : list[list[str]]
List of arrays with column names that should be read
by each partition.
storage_options : dict
Parameters for specific storage engine.
engine : {"auto", "pyarrow", "fastparquet"}
Parquet library to use for reading.
**kwargs : dict
Parameters of deploying read_* function.
@@ -372,67 +459,11 @@ def call_deploy(cls, dataset, col_partitions, storage_options, **kwargs):
List
Array with references to the task deploy result for each partition.
"""
from modin.core.storage_formats.pandas.parsers import ParquetFileToRead

# If we don't have any columns to read, we should just return an empty
# set of references.
if len(col_partitions) == 0:
return []

row_groups_per_file = dataset.row_groups_per_file
num_row_groups = sum(row_groups_per_file)
parquet_files = dataset.files

# step determines how many row groups are going to be in a partition
step = compute_chunksize(
num_row_groups,
NPartitions.get(),
min_block_size=1,
)
current_partition_size = 0
file_index = 0
partition_files = [] # 2D array - each element contains list of chunks to read
row_groups_used_in_current_file = 0
total_row_groups_added = 0
# On each iteration, we add a chunk of one file. That will
# take us either to the end of a partition, or to the end
# of a file.
while total_row_groups_added < num_row_groups:
if current_partition_size == 0:
partition_files.append([])
partition_file = partition_files[-1]
file_path = parquet_files[file_index]
row_group_start = row_groups_used_in_current_file
row_groups_left_in_file = (
row_groups_per_file[file_index] - row_groups_used_in_current_file
)
row_groups_left_for_this_partition = step - current_partition_size
if row_groups_left_for_this_partition <= row_groups_left_in_file:
# File has at least what we need to finish partition
# So finish this partition and start a new one.
num_row_groups_to_add = row_groups_left_for_this_partition
current_partition_size = 0
else:
# File doesn't have enough to complete this partition. Add
# it into current partition and go to next file.
num_row_groups_to_add = row_groups_left_in_file
current_partition_size += num_row_groups_to_add
if num_row_groups_to_add == row_groups_left_in_file:
file_index += 1
row_groups_used_in_current_file = 0
else:
row_groups_used_in_current_file += num_row_groups_to_add
partition_file.append(
ParquetFileToRead(
file_path, row_group_start, row_group_start + num_row_groups_to_add
)
)
total_row_groups_added += num_row_groups_to_add

assert (
total_row_groups_added == num_row_groups
), "row groups added does not match total num of row groups across parquet files"

all_partitions = []
for files_to_read in partition_files:
all_partitions.append(
Expand All @@ -442,7 +473,7 @@ def call_deploy(cls, dataset, col_partitions, storage_options, **kwargs):
f_kwargs={
"files_for_parser": files_to_read,
"columns": cols,
"engine": dataset.engine,
"engine": engine,
"storage_options": storage_options,
**kwargs,
},
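For orientation, the partition_files argument that call_deploy now receives holds one list per row partition, each listing the file chunks (file path plus a row-group range) that partition reads. A hedged illustration using a stand-in tuple type rather than the real ParquetFileToRead from modin.core.storage_formats.pandas.parsers:

from typing import NamedTuple

class FileChunk(NamedTuple):
    # Stand-in for ParquetFileToRead; the field names here are assumptions for illustration.
    path: str
    row_group_start: int
    row_group_end: int

# Two row partitions: the first reads row groups 0-2 of one file, the second reads
# the last row group of that file plus both row groups of the next one.
partition_files = [
    [FileChunk("part-0.parquet", 0, 3)],
    [FileChunk("part-0.parquet", 3, 4), FileChunk("part-1.parquet", 0, 2)],
]
print(partition_files)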
@@ -627,9 +658,17 @@ def build_query_compiler(cls, dataset, columns, index_columns, **kwargs):
storage_options = kwargs.pop("storage_options", {}) or {}
filters = kwargs.get("filters", None)

col_partitions, column_widths = cls.build_columns(columns)
partition_files = cls._determine_partitioning(dataset)
col_partitions, column_widths = cls.build_columns(
columns,
num_splits=(
0
if len(partition_files) == 0
else round(NPartitions.get() / len(partition_files))
),
)
partition_ids = cls.call_deploy(
dataset, col_partitions, storage_options, **kwargs
partition_files, col_partitions, storage_options, dataset.engine, **kwargs
)
index, sync_index = cls.build_index(
dataset, partition_ids, index_columns, filters
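The num_splits passed to build_columns here is what normalizes the partition grid: the more row partitions _determine_partitioning produces, the fewer column splits are requested, so the total stays close to NPartitions instead of NPartitions times the number of row partitions. A sketch with hypothetical numbers:

# Hypothetical numbers for the normalization in build_query_compiler().
NPARTITIONS = 16                                      # NPartitions.get()
num_row_parts = 4                                     # len(partition_files)

num_col_splits = round(NPARTITIONS / num_row_parts)   # 4; the old code always targeted NPartitions column splits
total_parts = num_row_parts * num_col_splits          # 16, close to NPartitions
print(num_col_splits, total_parts)                    # previously this frame would get 4 * 16 = 64 partitions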

