28 | 28 | def read_parquet(path, engine='auto', columns=None, **kwargs): |
29 | 29 | """Load a parquet object from the file path, returning a DataFrame. |
30 | 30 | Ray DataFrame only supports pyarrow engine for now. |
31 | | -
|
32 | 31 | Args: |
33 | 32 | path: The filepath of the parquet file. |
34 | 33 | We only support local files for now. |
35 | 34 | engine: Ray only supports the pyarrow reader. |
36 | 35 | This argument doesn't do anything for now. |
37 | 36 | kwargs: Passed into parquet's read_pandas function. |
38 | | -
|
39 | 37 | Notes: |
40 | 38 | ParquetFile API is used. Please refer to the documentation here |
41 | 39 | https://arrow.apache.org/docs/python/parquet.html |
42 | 40 | """ |
| 41 | + return _read_parquet_pandas_on_ray(path, engine, columns, **kwargs) |
| 42 | + |
| 43 | + |
| 44 | +def _read_parquet_pandas_on_ray(path, engine, columns, **kwargs): |
| 45 | + from pyarrow.parquet import ParquetFile |
| 46 | + |
43 | 47 | if not columns: |
44 | 48 | pf = ParquetFile(path) |
45 | 49 | columns = [ |
46 | 50 | name for name in pf.metadata.schema.names |
47 | 51 | if not PQ_INDEX_REGEX.match(name) |
48 | 52 | ] |
49 | | - |
| 53 | + num_splits = min( |
| 54 | + len(columns), RayBlockPartitions._compute_num_partitions()) |
50 | 55 | # Each item in this list will be a column of the original df |
51 | 56 | # partitioned into smaller pieces along the rows. |
52 | 57 | # We need to transpose the oids array to fit our schema. |
53 | | - blk_partitions = [ |
54 | | - ray.get(_read_parquet_column.remote(path, col, kwargs)) |
55 | | - for col in columns |
56 | | - ] |
57 | | - blk_partitions = np.array(blk_partitions).T |
58 | | - |
59 | | - return DataFrame(block_partitions=blk_partitions, columns=columns) |
| 58 | + blk_partitions = np.array([ |
| 59 | + _read_parquet_column._submit( |
| 60 | + args=(path, col, num_splits, kwargs), |
| 61 | + num_return_vals=num_splits + 1) for col in columns |
| 62 | + ]).T |
| 63 | + remote_partitions = np.array([[RayRemotePartition(obj) for obj in row] |
| 64 | + for row in blk_partitions[:-1]]) |
| 65 | + index_len = ray.get(blk_partitions[-1][0]) |
| 66 | + index = pandas.RangeIndex(index_len) |
| 67 | + new_manager = PandasDataManager( |
| 68 | + RayBlockPartitions(remote_partitions), index, columns) |
| 69 | + df = DataFrame(data_manager=new_manager) |
| 70 | + return df |
60 | 71 |
|
61 | 72 |
|
62 | 73 | # CSV |
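The grid built in the hunk above is worth spelling out: each _submit call returns num_splits row blocks for one column plus one extra object holding the index length, so stacking the per-column lists and transposing yields a (num_splits + 1) x len(columns) grid whose last row carries the index lengths. The sketch below reproduces that layout with plain numpy and placeholder strings in place of Ray ObjectIDs; the names and values are illustrative only.

    import numpy as np

    num_splits = 2
    columns = ["a", "b", "c"]
    # One list per column: num_splits row blocks followed by the index length.
    per_column = np.array(
        [["%s_block%d" % (col, i) for i in range(num_splits)] + ["3"]
         for col in columns])
    grid = per_column.T        # shape (num_splits + 1, len(columns))
    row_blocks = grid[:-1]     # the partition grid handed to RayBlockPartitions
    index_lengths = grid[-1]   # every entry carries the same index length
    print(row_blocks)
    print(index_lengths[0])
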
@@ -555,6 +566,13 @@ def _read_csv_with_offset_pandas_on_ray(fname, num_splits, start, end, kwargs, h |
555 | 566 |
|
556 | 567 | @ray.remote |
557 | | -def _read_parquet_column(path, column, kwargs={}): |
| 568 | +def _read_parquet_column(path, column, num_splits, kwargs): |
| 569 | + import pyarrow.parquet as pq |
| 570 | + df = pq.read_pandas(path, columns=[column], **kwargs).to_pandas() |
| 571 | + # Append the length of the index here to build it externally |
| 572 | + return split_result_of_axis_func_pandas(0, num_splits, |
| 573 | + df) + [len(df.index)] |
| 574 | + # IMPORTANT: DO NOT DELETE THE CODE BELOW |
| 575 | + # Deleting this code for some reason causes workers to crash on high-CPU machines. |
558 | 576 | df = pq.read_pandas(path, columns=[column], **kwargs).to_pandas() |
559 | 577 | oids = _partition_pandas_dataframe(df, num_partitions=get_npartitions()) |
560 | 578 | return oids |
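
For readers without a Ray cluster at hand, the contract of _read_parquet_column can be sketched locally. The helper below is a hypothetical, Ray-free stand-in: it reads a single column with pyarrow, splits it along the rows, and appends the index length, with numpy.array_split standing in as a rough approximation of split_result_of_axis_func_pandas.

    import numpy as np
    import pyarrow.parquet as pq

    def read_parquet_column_sketch(path, column, num_splits, kwargs=None):
        # Ray-free sketch of the per-column reader's return contract:
        # num_splits row chunks, then the index length so the caller can
        # rebuild a RangeIndex without shipping the full index.
        kwargs = kwargs or {}
        df = pq.read_pandas(path, columns=[column], **kwargs).to_pandas()
        return list(np.array_split(df, num_splits)) + [len(df.index)]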
|