Skip to content

Commit

Permalink
FIX-#6558: Normalize the number of partitions after .read_parquet() (
Browse files Browse the repository at this point in the history
…#6559)

Signed-off-by: Dmitry Chigarev <dmitry.chigarev@intel.com>
Co-authored-by: Anatoly Myachev <anatoliimyachev@mail.com>
  • Loading branch information
dchigarev and anmyachev authored Sep 16, 2023
1 parent 0e9cfdb commit 2880990
Show file tree
Hide file tree
Showing 2 changed files with 116 additions and 71 deletions.
24 changes: 17 additions & 7 deletions modin/core/io/column_stores/column_store_dispatcher.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
import numpy as np
import pandas

from modin.config import NPartitions
from modin.config import MinPartitionSize, NPartitions
from modin.core.io.file_dispatcher import FileDispatcher
from modin.core.storage_formats.pandas.utils import compute_chunksize

Expand Down Expand Up @@ -143,14 +143,18 @@ def build_index(cls, partition_ids):
return index, row_lengths

@classmethod
def build_columns(cls, columns):
def build_columns(cls, columns, num_row_parts=None):
"""
Split columns into chunks that should be read by workers.
Parameters
----------
columns : list
List of columns that should be read from file.
num_row_parts : int, optional
Number of parts the dataset is split into. This parameter is used
to align the column partitioning with it so we won't end up with an
over partitioned frame.
Returns
-------
Expand All @@ -163,11 +167,17 @@ def build_columns(cls, columns):
columns_length = len(columns)
if columns_length == 0:
return [], []
num_partitions = NPartitions.get()
column_splits = (
columns_length // num_partitions
if columns_length % num_partitions == 0
else columns_length // num_partitions + 1
if num_row_parts is None:
# in column formats we mostly read columns in parallel rather than rows,
# so we try to chunk columns as much as possible
min_block_size = 1
else:
num_remaining_parts = round(NPartitions.get() / num_row_parts)
min_block_size = min(
columns_length // num_remaining_parts, MinPartitionSize.get()
)
column_splits = compute_chunksize(
columns_length, NPartitions.get(), max(1, min_block_size)
)
col_partitions = [
columns[i : i + column_splits]
Expand Down
163 changes: 99 additions & 64 deletions modin/core/io/column_stores/parquet_dispatcher.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import json
import os
import re
from typing import TYPE_CHECKING

import fsspec
import numpy as np
Expand All @@ -28,9 +29,12 @@

from modin.config import NPartitions
from modin.core.io.column_stores.column_store_dispatcher import ColumnStoreDispatcher
from modin.core.storage_formats.pandas.utils import compute_chunksize
from modin.error_message import ErrorMessage
from modin.utils import _inherit_docstrings

if TYPE_CHECKING:
from modin.core.storage_formats.pandas.parsers import ParquetFileToRead


class ColumnStoreDataset:
"""
Expand Down Expand Up @@ -349,19 +353,102 @@ def get_dataset(cls, path, engine, storage_options):
raise ValueError("engine must be one of 'pyarrow', 'fastparquet'")

@classmethod
def call_deploy(cls, dataset, col_partitions, storage_options, **kwargs):
def _determine_partitioning(
cls, dataset: ColumnStoreDataset
) -> "list[list[ParquetFileToRead]]":
"""
Determine which partition will read certain files/row groups of the dataset.
Parameters
----------
dataset : ColumnStoreDataset
Returns
-------
list[list[ParquetFileToRead]]
Each element in the returned list describes a list of files that a partition has to read.
"""
from modin.core.storage_formats.pandas.parsers import ParquetFileToRead

parquet_files = dataset.files
row_groups_per_file = dataset.row_groups_per_file
num_row_groups = sum(row_groups_per_file)

if num_row_groups == 0:
return []

num_splits = min(NPartitions.get(), num_row_groups)
part_size = num_row_groups // num_splits
# If 'num_splits' does not divide 'num_row_groups' then we can't cover all of
# the row groups using the original 'part_size'. According to the 'reminder'
# there has to be that number of partitions that should read 'part_size + 1'
# number of row groups.
reminder = num_row_groups % num_splits
part_sizes = [part_size] * (num_splits - reminder) + [part_size + 1] * reminder

partition_files = []
file_idx = 0
row_group_idx = 0
row_groups_left_in_current_file = row_groups_per_file[file_idx]
# this is used for sanity check at the end, verifying that we indeed added all of the row groups
total_row_groups_added = 0
for size in part_sizes:
row_groups_taken = 0
part_files = []
while row_groups_taken != size:
if row_groups_left_in_current_file < 1:
file_idx += 1
row_group_idx = 0
row_groups_left_in_current_file = row_groups_per_file[file_idx]

to_take = min(size - row_groups_taken, row_groups_left_in_current_file)
part_files.append(
ParquetFileToRead(
parquet_files[file_idx],
row_group_start=row_group_idx,
row_group_end=row_group_idx + to_take,
)
)
row_groups_left_in_current_file -= to_take
row_groups_taken += to_take
row_group_idx += to_take

total_row_groups_added += row_groups_taken
partition_files.append(part_files)

sanity_check = (
len(partition_files) == num_splits
and total_row_groups_added == num_row_groups
)
ErrorMessage.catch_bugs_and_request_email(
failure_condition=not sanity_check,
extra_log="row groups added does not match total num of row groups across parquet files",
)
return partition_files

@classmethod
def call_deploy(
cls,
partition_files: "list[list[ParquetFileToRead]]",
col_partitions: "list[list[str]]",
storage_options: dict,
engine: str,
**kwargs,
):
"""
Deploy remote tasks to the workers with passed parameters.
Parameters
----------
dataset : Dataset
Dataset object of Parquet file/files.
col_partitions : list
partition_files : list[list[ParquetFileToRead]]
List of arrays with files that should be read by each partition.
col_partitions : list[list[str]]
List of arrays with columns names that should be read
by each partition.
storage_options : dict
Parameters for specific storage engine.
engine : {"auto", "pyarrow", "fastparquet"}
Parquet library to use for reading.
**kwargs : dict
Parameters of deploying read_* function.
Expand All @@ -370,67 +457,11 @@ def call_deploy(cls, dataset, col_partitions, storage_options, **kwargs):
List
Array with references to the task deploy result for each partition.
"""
from modin.core.storage_formats.pandas.parsers import ParquetFileToRead

# If we don't have any columns to read, we should just return an empty
# set of references.
if len(col_partitions) == 0:
return []

row_groups_per_file = dataset.row_groups_per_file
num_row_groups = sum(row_groups_per_file)
parquet_files = dataset.files

# step determines how many row groups are going to be in a partition
step = compute_chunksize(
num_row_groups,
NPartitions.get(),
min_block_size=1,
)
current_partition_size = 0
file_index = 0
partition_files = [] # 2D array - each element contains list of chunks to read
row_groups_used_in_current_file = 0
total_row_groups_added = 0
# On each iteration, we add a chunk of one file. That will
# take us either to the end of a partition, or to the end
# of a file.
while total_row_groups_added < num_row_groups:
if current_partition_size == 0:
partition_files.append([])
partition_file = partition_files[-1]
file_path = parquet_files[file_index]
row_group_start = row_groups_used_in_current_file
row_groups_left_in_file = (
row_groups_per_file[file_index] - row_groups_used_in_current_file
)
row_groups_left_for_this_partition = step - current_partition_size
if row_groups_left_for_this_partition <= row_groups_left_in_file:
# File has at least what we need to finish partition
# So finish this partition and start a new one.
num_row_groups_to_add = row_groups_left_for_this_partition
current_partition_size = 0
else:
# File doesn't have enough to complete this partition. Add
# it into current partition and go to next file.
num_row_groups_to_add = row_groups_left_in_file
current_partition_size += num_row_groups_to_add
if num_row_groups_to_add == row_groups_left_in_file:
file_index += 1
row_groups_used_in_current_file = 0
else:
row_groups_used_in_current_file += num_row_groups_to_add
partition_file.append(
ParquetFileToRead(
file_path, row_group_start, row_group_start + num_row_groups_to_add
)
)
total_row_groups_added += num_row_groups_to_add

assert (
total_row_groups_added == num_row_groups
), "row groups added does not match total num of row groups across parquet files"

all_partitions = []
for files_to_read in partition_files:
all_partitions.append(
Expand All @@ -440,7 +471,7 @@ def call_deploy(cls, dataset, col_partitions, storage_options, **kwargs):
f_kwargs={
"files_for_parser": files_to_read,
"columns": cols,
"engine": dataset.engine,
"engine": engine,
"storage_options": storage_options,
**kwargs,
},
Expand Down Expand Up @@ -625,9 +656,13 @@ def build_query_compiler(cls, dataset, columns, index_columns, **kwargs):
storage_options = kwargs.pop("storage_options", {}) or {}
filters = kwargs.get("filters", None)

col_partitions, column_widths = cls.build_columns(columns)
partition_files = cls._determine_partitioning(dataset)
col_partitions, column_widths = cls.build_columns(
columns,
num_row_parts=len(partition_files),
)
partition_ids = cls.call_deploy(
dataset, col_partitions, storage_options, **kwargs
partition_files, col_partitions, storage_options, dataset.engine, **kwargs
)
index, sync_index = cls.build_index(
dataset, partition_ids, index_columns, filters
Expand Down

0 comments on commit 2880990

Please sign in to comment.