FEAT-modin-project#2627: merging all files into one main file
Signed-off-by: Anatoly Myachev <anatoly.myachev@intel.com>
anmyachev committed Aug 24, 2021
1 parent eb29b4d commit fdf9c4f
Showing 1 changed file with 73 additions and 13 deletions.
86 changes: 73 additions & 13 deletions modin/experimental/engines/pandas_on_ray/io_exp.py
@@ -91,6 +91,7 @@ class ExperimentalPandasOnRayIO(PandasOnRayIO):
        "", (RayTask, PandasCSVGlobParser, CSVGlobDispatcher), build_args
    )._read
    read_parquet_remote_task = _read_parquet_columns
    # version of the on-disk format produced by `to_pickle` below
    format_modin_pickle_files = 0.1

    @classmethod
    def read_sql(
@@ -218,25 +219,74 @@ def read_sql(
        new_query_compiler._modin_frame.synchronize_labels(axis=0)
        return new_query_compiler

    @classmethod
    def create_header_pattern(cls, num_partitions):
        locations_pattern = " ".join(["{:12}"] * num_partitions * 2)
        # header layout: "modin_format_of_pickle_files <format> <num_partitions>
        # <position> <length> ... <position> <length>",
        # with one (position, length) pair per partition
        header_pattern = "modin_format_of_pickle_files {:3} {:3} " + locations_pattern
        return header_pattern

    @classmethod
    def get_header_size(cls, num_partitions):
        # starts with `modin_format_of_pickle_files` - 28 bytes in utf-8 encoding
        # format version - 3 bytes, num_partitions - 3 bytes
        # per partition: position - 12 bytes, length - 12 bytes
        # all fields are separated by a single whitespace
        count_whitespaces = (3 + num_partitions * 2) - 1
        return 28 + 6 + 24 * num_partitions + count_whitespaces
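
A worked example of the two helpers above, as a sketch with hypothetical values (two partitions; the byte counts follow from the comments in `get_header_size`):

    # Header for two partitions: (position, length) = (88, 1000) and (1088, 2000).
    pattern = "modin_format_of_pickle_files {:3} {:3} " + " ".join(["{:12}"] * 4)
    header = pattern.format(0.1, 2, 88, 1000, 1088, 2000)
    # 28 (prefix) + 6 (two 3-byte fields) + 24 * 2 (location fields)
    # + 6 separators = 88 bytes
    assert len(header) == 88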

    @classmethod
    def to_pickle(cls, qc, **kwargs):
        root_dir = kwargs["path"]
        if not os.path.exists(root_dir):
            os.mkdir(root_dir)
        kwargs["path"] = root_dir + "/" + kwargs["path"]
        num_partitions = 4  # hardcoded for this experimental path
        header_size = cls.get_header_size(num_partitions)

        def func(df, **kw):
            partition_idx = kw["partition_idx"]
            if partition_idx == 0:
                with open(kwargs["path"], mode="wb") as dst:
                    # reserve space for the header; the real header is
                    # written once all partition locations are known
                    dst.write(b"X" * header_size)
                    kwargs["path"] = dst
                    df.to_pickle(**kwargs)
            else:
                kwargs["path"] = kwargs["path"] + str(partition_idx)
                df.to_pickle(**kwargs)
            return pandas.DataFrame()

        result = qc._modin_frame._apply_full_axis(
            1, func, new_index=[], new_columns=[], enumerate_partitions=True
        )
        import shutil

        header_pattern = cls.create_header_pattern(num_partitions)
        # blocking operation: wait until all partitions are written
        result.to_pandas()

        locations = []
        with open(kwargs["path"], mode="ab+") as dst:
            # account for the first partition, which was written
            # together with the dummy header
            locations.append(header_size)
            locations.append(dst.tell() - header_size)
            for idx in range(1, num_partitions):
                cur_pos = dst.tell()
                with open(kwargs["path"] + str(idx), mode="rb") as src:
                    shutil.copyfileobj(src, dst)
                os.remove(kwargs["path"] + str(idx))
                locations.append(cur_pos)
                locations.append(dst.tell() - cur_pos)

        header = header_pattern.format(
            cls.format_modin_pickle_files, num_partitions, *locations
        )
        with open(kwargs["path"], mode="rb+") as dst:
            # overwrite the dummy header at the beginning of the file
            dst.write(header.encode())
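
After the merge step the target directory holds a single combined file (note it is created at `<path>/<path>`). Its layout under the format above is roughly the following sketch, not an authoritative specification:

    # [ header: get_header_size(num_partitions) bytes, whitespace-separated ]
    # [ pickle bytes of partition 0 ]
    # [ pickle bytes of partition 1 ]
    # ...
    # [ pickle bytes of partition num_partitions - 1 ]
    #
    # The header stores a (position, length) pair per partition, so a reader
    # can seek straight to any partition without unpickling the others.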

    @classmethod
    def read_pickle(cls, filepath_or_buffer, compression="infer"):
        if not isinstance(filepath_or_buffer, str):
@@ -251,13 +301,16 @@ def read_pickle(cls, filepath_or_buffer, compression="infer"):
        partition_ids = []
        lengths_ids = []
        widths_ids = []

        header_size = cls.get_header_size(num_partitions)
        for idx_file in range(num_partitions):
            partition_id = _read_pickle_files_in_folder._remote(
                args=(
                    filepath_or_buffer,
                    compression,
                    idx_file,
                    num_splits,
                    header_size,
                ),
                num_returns=num_splits + 2,
            )
@@ -304,23 +357,30 @@ def _read_pickle_files_in_folder(
    filepath: str,
    compression: str,
    idx_file: int,
    num_splits: int,
    header_size: int,
):  # pragma: no cover
    """
    Use a Ray task to read one pickled partition from the combined file at `filepath`.

    TODO: add parameter descriptions.
    """
    from io import BytesIO
    import re

    with open(filepath, mode="rb") as src:
        header = re.split(rb"\s+", src.read(header_size))
        # header tokens: prefix, format version, num_partitions,
        # then a (position, length) pair per partition
        position = int(header[3 + idx_file * 2])
        length = int(header[4 + idx_file * 2])
        src.seek(position)
        df = pandas.read_pickle(BytesIO(src.read(length)), compression)
    length = len(df)
    width = len(df.columns)

    # force a single split per file for now, overriding the argument
    num_splits = 1
    return _split_result_for_readers(1, num_splits, df) + [length, width]
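
The header parse above can be checked standalone; a sketch reusing the hypothetical numbers from the worked example after `get_header_size` (two partitions):

    import re

    header = ("modin_format_of_pickle_files {:3} {:3} "
              + " ".join(["{:12}"] * 4)).format(0.1, 2, 88, 1000, 1088, 2000)
    tokens = re.split(rb"\s+", header.encode())
    assert int(tokens[2]) == 2             # num_partitions
    assert int(tokens[3 + 0 * 2]) == 88    # partition 0 starts right after the header
    assert int(tokens[4 + 0 * 2]) == 1000  # partition 0 occupies 1000 bytes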


