Commit 3fb3848
Add user documentation for UDWF
1 parent d997df1 commit 3fb3848

File tree

1 file changed: +178 −17 lines

docs/source/user-guide/common-operations/udf-and-udfa.rst

@@ -18,8 +18,21 @@
User Defined Functions
======================

DataFusion provides powerful expressions and functions, reducing the need for custom Python
functions. However, you can still incorporate your own functions, i.e. User-Defined Functions
(UDFs).

Scalar Functions
----------------

A user defined function that operates on a row by row basis is called a Scalar Function. You can
define your own scalar function by calling :py:func:`~datafusion.udf.ScalarUDF.udf`.

The basic definition of a scalar UDF is a Python function that takes one or more
`pyarrow <https://arrow.apache.org/docs/python/index.html>`_ arrays and returns a single array as
output. DataFusion scalar UDFs operate on an entire batch of records at a time, though the
evaluation of those records should be on a row by row basis. In the following example, we compute
whether each value of the input array is null.

.. ipython:: python
@@ -35,14 +48,70 @@ However you can still incorporate your own functions, i.e. User-Defined Function
    ctx = datafusion.SessionContext()

    batch = pyarrow.RecordBatch.from_arrays(
        [pyarrow.array([1, None, 3]), pyarrow.array([4, 5, 6])],
        names=["a", "b"],
    )
    df = ctx.create_dataframe([[batch]], name="batch_array")

    df.select(col("a"), is_null_arr(col("a")).alias("is_null")).show()

In the previous example, we used the fact that pyarrow provides a variety of built in array
functions such as ``is_null()``. Additional pyarrow
`compute functions <https://arrow.apache.org/docs/python/compute.html>`_ are available. When
possible, it is highly recommended to use these functions because they perform computations
without copying data out of the original arrays, which leads to greatly improved performance.
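For illustration, these kernels can be exercised directly on an array outside of DataFusion; the
calls below operate on the Arrow buffers without creating per-row Python objects.

.. code-block:: python

    import pyarrow
    import pyarrow.compute

    arr = pyarrow.array([1, None, 3])

    # the same kernel used by the UDF above
    mask = arr.is_null()                      # [False, True, False]

    # other vectorized kernels live under pyarrow.compute
    shifted = pyarrow.compute.add(arr, 10)    # [11, None, 13]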

If you need to perform an operation in Python that is not available with the pyarrow compute
functions, you will need to convert the record batch into Python values, perform your operation,
and construct an array. Converting the array's built in data types into Python objects can be one
of the slowest operations in DataFusion, so it should be done sparingly.

The following example performs the same ``is_null`` check as before, but demonstrates converting
to Python objects to do the evaluation.

.. ipython:: python

    import pyarrow
    import datafusion
    from datafusion import udf, col

    def is_null(array: pyarrow.Array) -> pyarrow.Array:
        results = []
        for value in array:
            results.append(value.as_py() is None)
        return pyarrow.array(results)

    is_null_arr = udf(is_null, [pyarrow.int64()], pyarrow.bool_(), 'stable')

    ctx = datafusion.SessionContext()

    batch = pyarrow.RecordBatch.from_arrays(
        [pyarrow.array([1, None, 3]), pyarrow.array([4, 5, 6])],
        names=["a", "b"],
    )
    df = ctx.create_dataframe([[batch]], name="batch_array")

    df.select(col("a"), is_null_arr(col("a")).alias("is_null")).show()
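Scalar UDFs may also take more than one array. As a sketch (the function name and registration
line here are hypothetical, mirroring the ``udf`` calls above), a two-column addition can stay
entirely in pyarrow compute kernels:

.. code-block:: python

    import pyarrow
    import pyarrow.compute

    def add_cols(a: pyarrow.Array, b: pyarrow.Array) -> pyarrow.Array:
        # vectorized kernel, no per-row Python conversion
        return pyarrow.compute.add(a, b)

    # hypothetical registration, mirroring the examples above:
    # add_cols_udf = udf(add_cols, [pyarrow.int64(), pyarrow.int64()], pyarrow.int64(), 'stable')

    result = add_cols(pyarrow.array([1, None, 3]), pyarrow.array([4, 5, 6]))
    # result: [5, None, 9] -- nulls propagate through the kernel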

Aggregate Functions
-------------------

The :py:func:`~datafusion.udf.AggregateUDF.udaf` function allows you to define User-Defined
Aggregate Functions (UDAFs). To use this you must implement an
:py:class:`~datafusion.udf.Accumulator` that determines how the aggregation is performed.

When defining a UDAF there are four methods you need to implement. ``update`` takes the array(s)
of input and updates the internal state of the accumulator; define this function with as many
input arguments as you will pass when calling the UDAF. Since aggregation may be split across
multiple batches, we need a way to combine them. For this there are two functions: ``state``
returns an array of scalar values containing the current state of a single batch accumulation,
and ``merge`` combines the results of these different states. Finally, ``evaluate`` returns the
final result after the ``merge`` is complete.

In the following example we define a custom aggregate function that returns the difference
between the sums of two columns. The state can be represented by a single value, and we can see
how the inputs to ``update`` and ``merge`` differ.

.. code-block:: python
@@ -57,30 +126,122 @@ Additionally the :py:func:`~datafusion.udf.AggregateUDF.udaf` function allows yo
        Interface of a user-defined accumulation.
        """

        def __init__(self):
            self._sum = 0.0

        def update(self, values_a: pyarrow.Array, values_b: pyarrow.Array) -> None:
            self._sum = self._sum + pyarrow.compute.sum(values_a).as_py() - pyarrow.compute.sum(values_b).as_py()

        def merge(self, states: List[pyarrow.Array]) -> None:
            self._sum = self._sum + pyarrow.compute.sum(states[0]).as_py()

        def state(self) -> pyarrow.Array:
            return pyarrow.array([self._sum])

        def evaluate(self) -> pyarrow.Scalar:
            return pyarrow.scalar(self._sum)

    ctx = datafusion.SessionContext()
    df = ctx.from_pydict(
        {
            "a": [4, 5, 6],
            "b": [1, 2, 3],
        }
    )

    my_udaf = udaf(MyAccumulator, [pyarrow.float64(), pyarrow.float64()], pyarrow.float64(), [pyarrow.float64()], 'stable')

    df.aggregate([], [my_udaf(col("a"), col("b")).alias("col_diff")])
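To see how the four methods fit together, the accumulator lifecycle can be exercised by hand.
This standalone sketch mirrors ``MyAccumulator`` above (``SumDiff`` is a name invented here), with
two batches that DataFusion might have split:

.. code-block:: python

    import pyarrow
    import pyarrow.compute

    class SumDiff:
        # mirrors MyAccumulator: the state is the running sum of (a - b)
        def __init__(self):
            self._sum = 0.0

        def update(self, values_a, values_b):
            self._sum += pyarrow.compute.sum(values_a).as_py() - pyarrow.compute.sum(values_b).as_py()

        def state(self):
            return pyarrow.array([self._sum])

        def merge(self, states):
            self._sum += pyarrow.compute.sum(states[0]).as_py()

        def evaluate(self):
            return self._sum

    # each accumulator sees one batch, then the partial states are merged
    acc1, acc2 = SumDiff(), SumDiff()
    acc1.update(pyarrow.array([4.0, 5.0]), pyarrow.array([1.0, 2.0]))  # partial: 6.0
    acc2.update(pyarrow.array([6.0]), pyarrow.array([3.0]))            # partial: 3.0
    acc1.merge([acc2.state()])
    final = acc1.evaluate()  # 9.0 == (4 + 5 + 6) - (1 + 2 + 3)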
Window Functions
----------------

To implement a User-Defined Window Function (UDWF) you must call the
:py:func:`~datafusion.udf.WindowUDF.udwf` function using a class that implements the abstract
class :py:class:`~datafusion.udf.WindowEvaluator`.

There are three methods of evaluation for UDWFs.

- ``evaluate`` is the simplest case, where you are given an array and are expected to calculate
  the value for a single row of that array. It is also the least performant.
- ``evaluate_all`` computes the values for all rows of an input array at a single time.
- ``evaluate_all_with_rank`` computes the values for all rows, but you only have the rank
  information for the rows.

Which method you implement is based upon which of these options are set.

.. list-table:: Which function to implement
   :header-rows: 1

   * - ``uses_window_frame``
     - ``supports_bounded_execution``
     - ``include_rank``
     - function_to_implement
   * - False (default)
     - False (default)
     - False (default)
     - ``evaluate_all``
   * - False
     - True
     - False
     - ``evaluate``
   * - False
     - True/False
     - True
     - ``evaluate_all_with_rank``
   * - True
     - True/False
     - True/False
     - ``evaluate``
UDWF options
^^^^^^^^^^^^

When you define your UDWF you can override the functions that return these values; they determine
which evaluate functions are called.

- ``uses_window_frame`` is set for functions that compute based on the specified window frame. If
  your function depends upon the specified frame, set this to ``True``.
- ``supports_bounded_execution`` specifies if your function can be incrementally computed.
- ``include_rank`` is set to ``True`` for window functions that can be computed using only the
  rank information.
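For instance, a rank-only window function would override ``include_rank``. The sketch below is
illustrative: the class name and the exact ``evaluate_all_with_rank`` signature (``num_rows`` plus
a list of ``(start, stop)`` rank ranges) are assumptions, shown standalone rather than wired into
DataFusion:

.. code-block:: python

    # illustrative sketch; in DataFusion this class would subclass WindowEvaluator
    class FirstRankFlag:
        def include_rank(self) -> bool:
            # opt in to evaluate_all_with_rank
            return True

        def evaluate_all_with_rank(self, num_rows, ranks_in_partition):
            # assumed shape: ranks_in_partition is a list of (start, stop) row
            # ranges, one per rank group; flag rows in the first group with 1.0
            results = [0.0] * num_rows
            if ranks_in_partition:
                start, stop = ranks_in_partition[0]
                for idx in range(start, stop):
                    results[idx] = 1.0
            return results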
The following example implements an exponential smoothing window function using ``evaluate_all``.

.. code-block:: python

    import pyarrow as pa
    from datafusion import udwf, col, SessionContext
    from datafusion.udf import WindowEvaluator

    class ExponentialSmooth(WindowEvaluator):
        def __init__(self, alpha: float) -> None:
            self.alpha = alpha

        def evaluate_all(self, values: list[pa.Array], num_rows: int) -> pa.Array:
            results = []
            curr_value = 0.0
            values = values[0]
            for idx in range(num_rows):
                if idx == 0:
                    curr_value = values[idx].as_py()
                else:
                    curr_value = values[idx].as_py() * self.alpha + curr_value * (
                        1.0 - self.alpha
                    )
                results.append(curr_value)

            return pa.array(results)

    exp_smooth = udwf(
        ExponentialSmooth(0.9),
        pa.float64(),
        pa.float64(),
        volatility="immutable",
    )

    ctx = SessionContext()

    df = ctx.from_pydict({
        "a": [1.0, 2.1, 2.9, 4.0, 5.1, 6.0, 6.9, 8.0]
    })

    df.select("a", exp_smooth(col("a")).alias("smooth_a")).show()
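Because ``evaluate_all`` above uses only pyarrow, the smoothing arithmetic can be checked
standalone; the first two values for ``alpha = 0.9`` work out as:

.. code-block:: python

    import pyarrow as pa

    alpha = 0.9
    values = pa.array([1.0, 2.1, 2.9])

    curr = values[0].as_py()                               # 1.0 (first row passes through)
    curr = values[1].as_py() * alpha + curr * (1 - alpha)  # 0.9 * 2.1 + 0.1 * 1.0 = 1.99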
