[FEATURE] Pivot implementation #1645
Conversation
Codecov Report
```diff
@@            Coverage Diff             @@
##           master    #1645      +/-   ##
==========================================
+ Coverage   81.88%   81.93%   +0.05%
==========================================
  Files          79       79
  Lines        9353     9379      +26
==========================================
+ Hits         7659     7685      +26
  Misses       1694     1694
```
Continue to review full report at Codecov.
devin-petersohn left a comment
I would prefer to move the logic in dataframe.py to the _query_compiler. Some implementations have an optimized pivot operation, so we will want the control to be in the query compiler layer.
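To illustrate the layering being requested, here is a minimal sketch of the delegation pattern, assuming Modin's usual `__constructor__`/`_query_compiler` structure (the `pivot` method on the query compiler is hypothetical until this PR defines it):

```python
# modin/pandas/dataframe.py -- keep only a thin API layer here.
def pivot(self, index=None, columns=None, values=None):
    # Delegate to the query compiler so engine-specific backends
    # can substitute an optimized pivot implementation.
    return self.__constructor__(
        query_compiler=self._query_compiler.pivot(
            index=index, columns=columns, values=values
        )
    )
```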
Ok @dchigarev I will place a blocked tag on this PR. Feel free to remove it once #1649 is merged.
devin-petersohn left a comment
Very nice, this is quite clean and should perform very well.
@devin-petersohn I've added some fixes. But there is some mismatch with pandas in this implementation: if our specified …
@dchigarev pandas 1.1 (August 1) will have a new feature added in pandas-dev/pandas#30584 to keep or drop NaN group keys in groupby.
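For reference, a small sketch of that pandas >= 1.1 feature, the `dropna` argument to `groupby` (the sample data is made up):

```python
import numpy as np
import pandas as pd  # requires pandas >= 1.1

df = pd.DataFrame({"key": ["a", np.nan, "a"], "val": [1, 2, 3]})
# Default (and pre-1.1 behavior): rows with NaN keys are dropped.
print(df.groupby("key", dropna=True).sum())
# New in 1.1: keep NaN as its own group.
print(df.groupby("key", dropna=False).sum())
```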
@devin-petersohn I've finished implementing it.
devin-petersohn left a comment
I ran some timings to see how this implementation performs, and I noticed that CPU utilization is very high (good), but the overall time is a bit slow.
On 30,000 rows and 3 columns, pandas takes 166 ms and this PR takes >30 s. The performance is highly dependent on the number of unique values in the index column. In this case, defaulting to pandas takes 250 ms.
Did you measure performance on this yourself? I would not expect this performance given the implementation and knowing how groupby is implemented.
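For context, here is one way such a timing could be reproduced (a sketch; the exact shape and column names are assumptions, chosen so the (index, columns) pairs are unique and `pivot` applies):

```python
import numpy as np
import pandas as pd
from timeit import default_timer as timer

# 30,000 rows x 3 columns, 10,000 unique index values.
df = pd.DataFrame({
    "index": np.repeat(np.arange(10_000), 3),
    "columns": np.tile(["a", "b", "c"], 10_000),
    "values": np.random.rand(30_000),
})

t = timer()
df.pivot(index="index", columns="columns", values="values")
print("pivot:", timer() - t)
# Swap in `import modin.pandas as pd` to time the Modin implementation.
```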
@devin-petersohn Yeah, the problem seems to appear with an increasing amount of time spent in the Reduce part of `groupby_reduce`. I measured it like this (please note if this method of measurement is incorrect):

```python
# Instrumented groupby_reduce with timers around each phase.
# `ray`, `np`, `func`, and `PandasOnRayFramePartition` come from the
# surrounding Modin module.
@classmethod
def groupby_reduce(
    cls, axis, partitions, by, map_func, reduce_func
):  # pragma: no cover
    from timeit import default_timer as timer

    t1 = timer()
    map_func = ray.put(map_func)
    by_parts = np.squeeze(by)
    if len(by_parts.shape) == 0:
        by_parts = np.array([by_parts.item()])
    print("GRP_preparations:", timer() - t1)

    t1 = timer()
    new_partitions = np.array(
        [
            [
                PandasOnRayFramePartition(
                    func.remote(
                        part.oid,
                        by_parts[col_idx].oid if axis else by_parts[row_idx].oid,
                        map_func,
                        part.call_queue,
                        by_parts[col_idx].call_queue
                        if axis
                        else by_parts[row_idx].call_queue,
                    )
                )
                for col_idx, part in enumerate(partitions[row_idx])
            ]
            for row_idx in range(len(partitions))
        ]
    )
    # Block on the map results so the phase timing is accurate.
    [y.get() for x in new_partitions for y in x]
    print("GRP_map:", timer() - t1)

    t1 = timer()
    res = cls.map_axis_partitions(axis, new_partitions, reduce_func)
    # Block on the reduce results as well.
    [y.get() for x in res for y in x]
    print("GRP_reduce:", timer() - t1)
    return res
```

I've got results where the reduce part dominates. And if you look at CPU usage during the reduce part, probably only one core would be used. Let's look at two corner cases:
Let's say we have …

Adding this line before the reduce part, to trigger re-splitting along the rows axis, gives a 5x performance increase:

```python
new_partitions = cls.map_axis_partitions(1, new_partitions, lambda df: df)
```

Execution time for …
It seems that groupby + apply on high-dimensional data is pretty hard for pandas, for example:

```python
by = pd_df["_index"]
pd_df.drop(columns=["_index", "_columns"]).groupby(by).apply(lambda df: df)
```

So I think unsplit data being passed to the reduce function is the problem here. P.S. I also tested the previous implementation; 766cc14 still has places to optimize and requires refactoring, and it uses a draft …
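To make the "unsplit data" point concrete, a sketch of the kind of frame the reduce step sees, with few rows per group but many columns (sizes and names are made up for illustration):

```python
import numpy as np
import pandas as pd
from timeit import default_timer as timer

# A wide frame like the one reaching the reduce step unsplit.
pd_df = pd.DataFrame(np.random.rand(3_000, 1_000))
pd_df["_index"] = np.random.randint(0, 1_000, 3_000)
pd_df["_columns"] = np.random.randint(0, 3, 3_000)

by = pd_df["_index"]
t = timer()
pd_df.drop(columns=["_index", "_columns"]).groupby(by).apply(lambda df: df)
print("groupby + apply:", timer() - t)
```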
@devin-petersohn I've refactored the implementation to use `unstack`.
devin-petersohn left a comment
@dchigarev Using unstack here looks reasonable; I will leave comments about the unstack implementation on #1649.
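As a sketch of the equivalence under discussion: pandas itself expresses `pivot` roughly as `set_index` + `unstack`, which is why the quality of the distributed `unstack` matters here:

```python
import pandas as pd

def pivot_via_unstack(df, index, columns, values):
    # Roughly what pandas does internally for DataFrame.pivot:
    # make (index, columns) a MultiIndex, then unstack the
    # `columns` level into the column axis.
    return df.set_index([index, columns])[values].unstack(columns)

df = pd.DataFrame({
    "foo": ["one", "one", "two"],
    "bar": ["A", "B", "A"],
    "baz": [1, 2, 3],
})
print(pivot_via_unstack(df, "foo", "bar", "baz"))
```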
@devin-petersohn since #1649 is merged, this is unblocked and ready for review.
devin-petersohn left a comment
This first implementation will not be very fast, but we will tune stack and unstack when #1975 is resolved.
What do these changes do?

Implements `pivot`. The implementation uses the `unstack` function, which defaults to pandas in the current master, but it will no longer do so after #1649 is merged.

- passes `flake8 modin`
- passes `black --check modin`
- signed commit with `git commit -s`
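Once merged, usage mirrors `pandas.DataFrame.pivot`; a hypothetical example with made-up data:

```python
import modin.pandas as pd

df = pd.DataFrame({
    "foo": ["one", "one", "two", "two"],
    "bar": ["A", "B", "A", "B"],
    "baz": [1, 2, 3, 4],
})
# Same API as pandas; the heavy lifting happens in the query
# compiler (via unstack once #1649 is in).
print(df.pivot(index="foo", columns="bar", values="baz"))
```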