Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add daft and spark #81

Closed
wants to merge 2 commits into from
Closed
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
cudf updates
upstream API changes...
  • Loading branch information
Martin Durant committed Nov 19, 2024
commit a680533b8a4e515ff1962409e074c271f4a04668
8 changes: 4 additions & 4 deletions docs/demo/akimbo-demo.ipynb
Original file line number Diff line number Diff line change
Expand Up @@ -945,7 +945,7 @@
"id": "5c253d82-c1bf-4535-9e61-fa4b8bde799c",
"metadata": {},
"source": [
"We can convert back to an awkward Record array with merge:"
"We can convert back to an awkward Record array with pack:"
]
},
{
Expand All @@ -955,7 +955,7 @@
"metadata": {},
"outputs": [],
"source": [
"merged = akimbo.merge(df[[\"run\", \"luminosityBlock\"]])"
"merged = akimbo.pack(df[[\"run\", \"luminosityBlock\"]])"
]
},
{
Expand Down Expand Up @@ -1024,7 +1024,7 @@
}
],
"source": [
"ak.to_parquet(akimbo.merge(df[df.luminosityBlock == maybe_strange_lumiblock]), \"strange.parquet\")"
"ak.to_parquet(akimbo.pack(df[df.luminosityBlock == maybe_strange_lumiblock]), \"strange.parquet\")"
]
},
{
Expand All @@ -1051,7 +1051,7 @@
}
],
"source": [
"ak.to_parquet(akimbo.merge(df[df.luminosityBlock == maybe_strange_lumiblock]).values._data, \"strange.parquet\")"
"ak.to_parquet(akimbo.pack(df[df.luminosityBlock == maybe_strange_lumiblock]).values._data, \"strange.parquet\")"
]
},
{
Expand Down
6 changes: 3 additions & 3 deletions docs/quickstart.ipynb
Original file line number Diff line number Diff line change
Expand Up @@ -351,7 +351,7 @@
"id": "d60887e8-582b-474f-a79f-173bc62c4bd1",
"metadata": {},
"source": [
"Of these, two are \"normal\" fields - they can be made into dataframe columns containing no nesting. To unwrap the top record-like structure of the data, we can use ``unmerge``."
"Of these, two are \"normal\" fields - they can be made into dataframe columns containing no nesting. To unwrap the top record-like structure of the data, we can use ``unpack``."
]
},
{
Expand All @@ -361,7 +361,7 @@
"metadata": {},
"outputs": [],
"source": [
"df = data.ak.unmerge()"
"df = data.ak.unpack()"
]
},
{
Expand Down Expand Up @@ -591,7 +591,7 @@
"metadata": {},
"outputs": [],
"source": [
"s = df.ak.merge()"
"s = df.ak.pack()"
]
},
{
Expand Down
2 changes: 1 addition & 1 deletion example/cudf-ak.ipynb
Original file line number Diff line number Diff line change
Expand Up @@ -273,7 +273,7 @@
" 'typestr',\n",
" 'typetracer',\n",
" 'unflatten',\n",
" 'unmerge',\n",
" 'unpack',\n",
" 'unzip',\n",
" 'validity_error',\n",
" 'values_astype',\n",
Expand Down
31 changes: 28 additions & 3 deletions src/akimbo/apply_tree.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,11 +8,22 @@
import pyarrow as pa


def match_any(*layout, **_):
return True


def leaf(*layout, **_):
"""True for the lowest elements of any akwward layout tree"""
return layout[0].is_leaf


def numeric(*layout, **_):
return layout[0].is_leaf and layout[0].parameters.get("__array__", None) not in {
"string",
"char",
}


def run_with_transform(
arr: ak.Array,
op,
Expand All @@ -24,6 +35,8 @@ def run_with_transform(
**kw,
) -> ak.Array:
def func(layout, **kwargs):
from akimbo.utils import match_string

if not isinstance(layout, tuple):
layout = (layout,)
if all(match(lay, **(match_kwargs or {})) for lay in layout):
Expand All @@ -34,11 +47,23 @@ def func(layout, **kwargs):
elif inmode == "numpy":
# works on numpy/cupy contents
out = op(*(lay.data for lay in layout), **kw, **(match_kwargs or {}))
else:
elif inmode == "ak":
out = op(*layout, **kw, **(match_kwargs or {}))
return outtype(out) if callable(outtype) else out
else:
out = op(
*(ak.Array(lay) for lay in layout), **kw, **(match_kwargs or {})
)
if callable(outtype):
return outtype(out)
elif isinstance(out, ak.Array):
return out.layout
else:
return out
if match_string(*layout):
# non-string op may fail to descend into string
return layout[0]

return ak.transform(func, arr, *others)
return ak.transform(func, arr, *others, allow_records=True)


def dec(
Expand Down
11 changes: 8 additions & 3 deletions src/akimbo/cudf.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ def dec_cu(op, match=match_string):
def f(lay, **kwargs):
# op(column, ...)->column
col = op(lay._to_cudf(cudf, None, len(lay)), **kwargs)
return from_cudf(cudf.Series(col)).layout
return from_cudf(cudf.Series._from_column(col)).layout

return dec(func=f, match=match, inmode="ak")

Expand All @@ -61,7 +61,7 @@ def f(lay, method=meth, **kwargs):
# this is different from dec_cu, because we need to instantiate StringMethods
# before getting the method from it
col = getattr(
StringMethods(cudf.Series(lay._to_cudf(cudf, None, len(lay)))), method
StringMethods(cudf.Series._from_column(lay._to_cudf(cudf, None, len(lay)))), method
)(**kwargs)
return from_cudf(col).layout

Expand All @@ -87,7 +87,7 @@ def f(lay, method=meth, **kwargs):
else:
# attributes giving components
col = m
return from_cudf(cudf.Series(col)).layout
return from_cudf(cudf.Series._from_column(col)).layout

if isinstance(getattr(DatetimeColumn, meth), property):
setattr(
Expand All @@ -103,6 +103,11 @@ class CudfAwkwardAccessor(Accessor):
series_type = Series
dataframe_type = DataFrame

@classmethod
def _arrow_to_series(cls, data):
# this implies CPU->GPU copy
return Series(data)

@classmethod
def _to_output(cls, arr):
if isinstance(arr, ak.Array):
Expand Down
2 changes: 1 addition & 1 deletion src/akimbo/io.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ def ak_to_series(ds, backend="pandas", extract=True):
else:
raise ValueError("Backend must be in {'pandas', 'polars', 'dask'}")
if extract and ds.fields:
return s.ak.unmerge()
return s.ak.unpack()
return s


Expand Down
139 changes: 111 additions & 28 deletions src/akimbo/mixin.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,14 +7,15 @@
import awkward as ak
import pyarrow.compute as pc

from akimbo.apply_tree import dec, run_with_transform
from akimbo.apply_tree import dec, match_any, numeric, run_with_transform
from akimbo.utils import rec_list_swap, to_ak_layout

methods = [
_ for _ in (dir(ak)) if not _.startswith(("_", "ak_")) and not _[0].isupper()
] + ["apply", "array", "explode", "dt", "str"]

df_methods = sorted(methods + ["merge"])
series_methods = sorted(methods + ["unmerge"])
df_methods = sorted(methods + ["pack"])
series_methods = sorted(methods + ["unpack"])


def radd(left, right):
Expand Down Expand Up @@ -164,11 +165,19 @@ def is_dataframe(cls, data):
return isinstance(data, cls.dataframe_type)

@classmethod
def _to_output(cls, data):
# TODO: clarify protocol here; can data be in arrow already?
def _arrow_to_series(cls, data):
"""How to make a series from arrow data"""
raise NotImplementedError

@classmethod
def _to_output(cls, data):
"""How to make a series from ak or arrow"""
if isinstance(data, ak.Array):
data = ak.to_arrow(data, extensionarray=False)
return cls._arrow_to_series(data)

def to_output(self, data=None):
"""Data returned as a series"""
data = data if data is not None else self.array
if not isinstance(data, Iterable):
return data
Expand All @@ -179,6 +188,9 @@ def apply(self, fn: Callable, where=None, **kwargs):

The function should take an ak array as input and produce an
ak array or scalar.

Unlike ``transform``, the function takes and returns ak.Array instances
and acts on a whole schema tree.
"""
if where:
bits = tuple(where.split("."))
Expand All @@ -190,6 +202,44 @@ def apply(self, fn: Callable, where=None, **kwargs):
final = fn(self.array)
return self.to_output(final)

def transform(
self, fn: Callable, *others, where=None, match=match_any, inmode="ak", **kwargs
):
"""Perform arbitrary function to selected parts of the data tree

This process walks thought the data's schema tree, and applies the given
function only on the matching nodes.

Parameters
----------
fn: the operation you want to perform. Typically unary or binary, and may take
extra kwargs
others: extra arguments, perhaps other akimbo series
where: path in the schema tree to apply this
match: when walking the schema, this determines if a node should be processed;
it will be a function taking one or more ak.contents classes. ak.apaply_tree
contains convenience matchers macth_any, leaf and numeric, and more matchers
can be found in the string and datetime modules
inmode: data should be passed to the given function as:
"arrow" | "numpy" (includes cupy) | "ak" layout | "array" high-level ak.Array
kwargs: passed to the operation, except those that are taken by ``run_with_transform``.
"""
if where:
bits = tuple(where.split("."))
arr = self.array
part = arr.__getitem__(bits)
# TODO: apply ``where`` to any arrays in others
# other = [to_ak_layout(ar) for ar in others]
out = run_with_transform(
part, fn, match=match, others=others, inmode=inmode, **kwargs
)
final = ak.with_field(arr, out, where=where)
else:
final = run_with_transform(
self.array, fn, match=match, others=others, inmode=inmode, **kwargs
)
return self.to_output(final)

def __getitem__(self, item):
out = self.array.__getitem__(item)
return self.to_output(out)
Expand Down Expand Up @@ -271,26 +321,54 @@ def rename(self, where, to):
parent.fields[this] = to
return self.to_output(ak.Array(lay))

def merge(self):
def pack(self):
"""Make a single complex series out of the columns of a dataframe"""
if not self.is_dataframe(self._obj):
raise ValueError("Can only merge on a dataframe")
raise ValueError("Can only pack on a dataframe")
out = {}
for k in self._obj.columns:
# TODO: partial merge when column names are like "record.field"
# TODO: partial pack when column names are like "record.field"
out[k] = self._obj[k].ak.array
arr = ak.Array(out)
return self.to_output(arr)

def unmerge(self):
def unpack(self):
"""Make dataframe out of a series of record type"""
# TODO: what to do when passed a dataframe, partial unpack of record fields?
arr = self.array
if not arr.fields:
raise ValueError("Not array-of-records")
# TODO: partial unmerge when (some) fields are records
out = {k: self.to_output(arr[k]) for k in arr.fields}
return self.dataframe_type(out)

def unexplode(self, *cols, outname="grouped"):
"""Repack "exploded" form dataframes into lists of structs

This is the inverse of the regular dataframe explode() process.
"""
# TODO: this does not work on cuDF as here we use arrow directly
# TODO: pandas indexes are pre-grouped cat-like structures
cols = list(cols)
arr = self.arrow
if set(cols) - set(arr.column_names):
raise ValueError(
"One or more rouping column (%s) not in available columns %s",
cols,
arr.column_names,
)
outcols = [(_, "list") for _ in arr.column_names if _ not in cols]
if not outcols:
raise ValueError("Cannot group on all available columns")
outcols2 = [f"{_[0]}_list" for _ in outcols]
grouped = arr.group_by(cols).aggregate(outcols)
akarr = ak.from_arrow(grouped)
akarr2 = akarr[outcols2]
akarr2.layout._fields = [_[0] for _ in outcols]
struct = rec_list_swap(akarr2)
final = ak.with_field(akarr[cols], struct, outname)

return self._to_output(final).ak.unpack()

def join(
self,
other,
Expand Down Expand Up @@ -331,26 +409,31 @@ def join(
def _create_op(cls, op):
"""Make functions to perform all the arithmetic, logical and comparison ops"""

def op2(*arg, **kwargs):
return op(*[ak.Array(_) for _ in arg], **kwargs).layout

def run(self, *args, **kwargs):
ar3 = []
for ar in args:
if hasattr(ar, "ak"):
ar3.append(ar.ak.array)
elif isinstance(ar, cls):
ar3.append(ar.array)
elif isinstance(ar, (ak.Array)):
ar3.appen(ar)
else:
ar3.append(ak.Array(ak.to_layout(ar)))
out = run_with_transform(
self.array, op=op2, inmode="ak", others=ar3, **kwargs
def op2(*args, extra=None, **kw):
args = list(args) + list(extra or [])
return op(*args, **kw)

def f(self, *args, **kw):
# TODO: test here is for literals, but really we want "don't know how to
# array that" condition
extra = (_ for _ in args if isinstance(_, (str, int, float)))
args = (
to_ak_layout(_) for _ in args if not isinstance(_, (str, int, float))
)
out = self.transform(
op2,
*args,
match=numeric,
inmode="numpy",
extra=extra,
outtype=ak.contents.NumpyArray,
**kw,
)
return self.to_output(out)
if isinstance(self._obj, self.dataframe_type):
return out.ak.unpack()
return out

return run
return f

def __getattr__(self, item):
arr = self.array
Expand Down
6 changes: 2 additions & 4 deletions src/akimbo/pandas.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,10 +35,8 @@ def to_arrow(cls, data):
return pa.table(data)

@classmethod
def _to_output(cls, data):
return pd.Series(
pd.arrays.ArrowExtensionArray(ak.to_arrow(data, extensionarray=False))
)
def _arrow_to_series(cls, data):
return pd.Series(pd.arrays.ArrowExtensionArray(data))

def to_output(self, data=None):
# override to apply index
Expand Down
Loading
Loading