Add source & stage decorators #1364

Merged
merged 116 commits into from
Nov 29, 2023
7fd1c4e
Remove unused method
dagardner-nv Nov 3, 2023
fcac874
Update out of date comments and variable names
dagardner-nv Nov 3, 2023
19acdf0
Don't run milvus tests by default, running 'pytest --run_slow' now ta…
dagardner-nv Oct 17, 2023
8e82ba3
Consolidate milvus fixtures
dagardner-nv Oct 17, 2023
5c9379b
Set an explicit storage dir for milvus, ensuring that we always get a…
dagardner-nv Oct 17, 2023
5513dad
Defining MORPHEUS_MILVUS_URI env var avoids milvus startup, milvus_se…
dagardner-nv Oct 18, 2023
e849600
Add _get_random_port method
dagardner-nv Nov 3, 2023
2c48a07
Use new tuple type-hint
dagardner-nv Nov 4, 2023
f79475a
Update code snippets, todo: Updating text
dagardner-nv Nov 4, 2023
bbde0a8
Update to reflect recent changes [no ci]
dagardner-nv Nov 6, 2023
e906235
Use newer tuple type hint
dagardner-nv Nov 6, 2023
42e1388
Update docs for RecipientFeaturesStage
dagardner-nv Nov 6, 2023
4c49102
wip [no ci]
dagardner-nv Nov 6, 2023
65ae66e
wip [no ci]
dagardner-nv Nov 6, 2023
e289018
wip [no ci]
dagardner-nv Nov 6, 2023
9d71d75
Set log level to INFO to ensure monitor output [no ci]
dagardner-nv Nov 6, 2023
efedc35
wip [no ci]
dagardner-nv Nov 6, 2023
077af27
Fix long-standing bug in pass-thru example, the C++ impl only works w…
dagardner-nv Nov 6, 2023
04f2396
Update the example to include two instances of the PassThruStage, the…
dagardner-nv Nov 6, 2023
a154a04
WIP [no ci]
dagardner-nv Nov 6, 2023
f65d8b4
Remove uneeded return statement
dagardner-nv Nov 6, 2023
fa7738c
WIP [no ci]
dagardner-nv Nov 6, 2023
e776419
WIP [no ci]
dagardner-nv Nov 6, 2023
ff76387
No need to rename the stage
dagardner-nv Nov 6, 2023
d2ee5a7
WIP [no ci]
dagardner-nv Nov 6, 2023
e9b7d44
Remove pylint directive from docs [no ci]
dagardner-nv Nov 6, 2023
70d16a5
Remove unused imports and formatting [no ci]
dagardner-nv Nov 6, 2023
480c86c
formatting [no ci]
dagardner-nv Nov 6, 2023
3e91fb7
WIP [no ci]
dagardner-nv Nov 6, 2023
27135a9
Formatting [no ci]
dagardner-nv Nov 7, 2023
442da9b
WIP [no ci]
dagardner-nv Nov 7, 2023
73b3797
WIP
dagardner-nv Nov 7, 2023
4d74e25
WIP [no ci]
dagardner-nv Nov 7, 2023
e090091
Ensure the module instance always receives a unique name by prependin…
dagardner-nv Nov 7, 2023
fcc8844
Add missing module config to builder.load_module calls, update module…
dagardner-nv Nov 7, 2023
affa261
Update to use a simple df of floats
dagardner-nv Nov 7, 2023
e3546ff
wip: [no ci]
dagardner-nv Nov 7, 2023
5231c81
Formatting [no ci]
dagardner-nv Nov 7, 2023
15fad5a
formatting [no ci]
dagardner-nv Nov 7, 2023
b6bd48f
formatting [no ci]
dagardner-nv Nov 7, 2023
d5f9201
Update code snippets to match api changes [no ci]
dagardner-nv Nov 7, 2023
50293db
Add end-to-end test for the code snippets used in the python_modules …
dagardner-nv Nov 7, 2023
262800f
Formatting [no ci]
dagardner-nv Nov 7, 2023
9bf6508
Make task names lower-case to be consistent [no ci]
dagardner-nv Nov 7, 2023
8d3336d
Rename out_streams to out_nodes
dagardner-nv Nov 7, 2023
b9290af
Fix path to renamed script
dagardner-nv Nov 7, 2023
57ecb18
Fix import order
dagardner-nv Nov 7, 2023
e1d75a7
First pass at a stage decorator
dagardner-nv Nov 14, 2023
7dab9f0
Add source decorator
dagardner-nv Nov 15, 2023
aef5ece
Sort imports
dagardner-nv Nov 15, 2023
5d5bcfc
WIP
dagardner-nv Nov 15, 2023
5ef9326
Merge branch 'branch-23.11' of github.com:nv-morpheus/Morpheus into d…
dagardner-nv Nov 15, 2023
250c330
Docstring cleanups [no ci]
dagardner-nv Nov 15, 2023
1ecaa96
WIP [no ci]
dagardner-nv Nov 16, 2023
37d7926
WIP docstrings [no ci]
dagardner-nv Nov 16, 2023
d3699c2
WIP docstrings [no ci]
dagardner-nv Nov 16, 2023
5155dcf
First pass at source decorator [no ci]
dagardner-nv Nov 16, 2023
83f290a
lint cleanups [no ci]
dagardner-nv Nov 16, 2023
45725b5
Fix annotations [no ci]
dagardner-nv Nov 16, 2023
78c3ec3
Disable pylint warnings [no ci]
dagardner-nv Nov 16, 2023
d15f7dc
Fix handling of *args, and more tests [no ci]
dagardner-nv Nov 17, 2023
a430eff
wip [no ci]
dagardner-nv Nov 17, 2023
a7e35db
WIP: [no ci]
dagardner-nv Nov 17, 2023
ab16c49
wip [no ci]
dagardner-nv Nov 17, 2023
113e642
wip [no ci]
dagardner-nv Nov 17, 2023
618444a
wip [no ci]
dagardner-nv Nov 17, 2023
97f7dcd
wip [no ci]
dagardner-nv Nov 17, 2023
6e5cd52
Formatting fixes
dagardner-nv Nov 17, 2023
943e8b7
Merge branch 'branch-23.11' of github.com:nv-morpheus/Morpheus into d…
dagardner-nv Nov 17, 2023
b1f783f
Expose the needed_columns functionality to the stage decorator
dagardner-nv Nov 20, 2023
054a150
wip
dagardner-nv Nov 20, 2023
9a08075
wip
dagardner-nv Nov 20, 2023
44a2c70
Expose the needed_columns functionality to the stage decorator
dagardner-nv Nov 20, 2023
a317299
Add decorated version of recipient_features_stage
dagardner-nv Nov 21, 2023
90acd43
Optionally use the decorator version of the recipient features stage
dagardner-nv Nov 21, 2023
749fa09
Bind default values for keyword arguments to method
dagardner-nv Nov 21, 2023
ee3ae31
Remove binding the keyword argument
dagardner-nv Nov 21, 2023
46c69f5
Test default values for keyword args
dagardner-nv Nov 21, 2023
5763504
Merge branch 'david-stage-decorator' into david-stage-decorator-docs
dagardner-nv Nov 21, 2023
af67576
Make sep_token an explicit keyword only argument
dagardner-nv Nov 21, 2023
f92d44b
Add test for the decorated version of the pass-thru stage
dagardner-nv Nov 21, 2023
a0bbdac
Expand tests to test the decorated version of the recipient features …
dagardner-nv Nov 21, 2023
6e5d83a
Document stage decorator
dagardner-nv Nov 21, 2023
d1dd3ae
Expose the other options as command line flags
dagardner-nv Nov 21, 2023
d08b9d0
WIP
dagardner-nv Nov 21, 2023
824a59a
Rename use_decorator to use_stage_function
dagardner-nv Nov 21, 2023
ce4286b
Remove unused import
dagardner-nv Nov 22, 2023
d321771
Merge branch 'branch-23.11' into david-stage-decorator
dagardner-nv Nov 22, 2023
7163dba
Merge branch 'branch-23.11' into david-23.11-docs-1252
dagardner-nv Nov 22, 2023
c708b4c
Merge branch 'branch-23.11' into david-stage-decorator
dagardner-nv Nov 22, 2023
bbaaa20
Merge branch 'david-stage-decorator' into david-stage-decorator-docs
dagardner-nv Nov 22, 2023
979cc03
Document that C++ impls are available to classes only
dagardner-nv Nov 22, 2023
0c356d6
Merge branch 'david-23.11-docs-1252' into david-stage-decorator-docs
dagardner-nv Nov 22, 2023
6b577b1
Merge branch 'branch-23.11' of github.com:nv-morpheus/Morpheus into d…
dagardner-nv Nov 22, 2023
1b6ea45
Merge branch 'branch-23.11' of github.com:nv-morpheus/Morpheus into d…
dagardner-nv Nov 27, 2023
7b72d03
Merge branch 'david-stage-decorator' into david-stage-decorator-docs …
dagardner-nv Nov 27, 2023
3420efa
Make type annotations a requirement [no ci]
dagardner-nv Nov 27, 2023
9989a79
wip [no ci]
dagardner-nv Nov 27, 2023
b16a25e
wip [no ci]
dagardner-nv Nov 27, 2023
48a22fb
wip [no ci]
dagardner-nv Nov 27, 2023
fe0f7a4
Handle return type of typing.Any [no ci]
dagardner-nv Nov 27, 2023
4d11732
Cleanups [no ci]
dagardner-nv Nov 27, 2023
b4b4a20
Allow config to be specified positionally [no ci]
dagardner-nv Nov 27, 2023
762501f
Allow config to be a positional argument
dagardner-nv Nov 27, 2023
6907139
Update tests [no ci]
dagardner-nv Nov 27, 2023
73ba7a0
WIP: Updating tests [no ci]
dagardner-nv Nov 28, 2023
fac9826
Fill out tests
dagardner-nv Nov 28, 2023
77f67c5
linting fixes
dagardner-nv Nov 28, 2023
02983e8
linting fixes
dagardner-nv Nov 28, 2023
ee10ad0
Merge branch 'branch-23.11' into david-stage-decorator
dagardner-nv Nov 28, 2023
650cee0
Updates for developer guide
dagardner-nv Nov 28, 2023
d18ea97
misc cleanups
dagardner-nv Nov 28, 2023
e6c1407
Document usage of the source decorator [no ci]
dagardner-nv Nov 28, 2023
0787b9b
document the name argument
dagardner-nv Nov 28, 2023
739f887
Remove explicit name, so that when we run the pipeline with both pass…
dagardner-nv Nov 28, 2023
cc7cb00
lint fix
dagardner-nv Nov 28, 2023
wip [no ci]
dagardner-nv committed Nov 27, 2023
commit 48a22fb351e014589da4da2d9a842964124701e2
242 changes: 133 additions & 109 deletions morpheus/pipeline/stage_decorator.py
@@ -35,6 +35,7 @@

logger = logging.getLogger(__name__)
GeneratorType = typing.Callable[..., collections.abc.Iterator[typing.Any]]
ComputeSchemaType = typing.Callable[[StageSchema], None]


def _get_name_from_fn(fn: typing.Callable) -> str:
@@ -48,26 +49,6 @@ def _get_name_from_fn(fn: typing.Callable) -> str:
return str(fn)


def _determine_return_type(gen_fn: GeneratorType) -> type:
"""
Unpacks return type annotations like:
def source() -> typing.Generator[MessageMeta, None, None]:
....
"""
signature = inspect.signature(gen_fn)
return_type = signature.return_annotation
if return_type is signature.empty:
raise ValueError("Wrapped source functions must have a return type annotation")

# When someone uses collections.abc.Generator or collections.abc.Iterator the return type is an instance of
# typing.GenericAlias, however when someone uses typing.Generator or typing.Iterator the return type is an
# instance of typing._GenericAlias. We need to check for both.
if isinstance(return_type, (typing.GenericAlias, typing._GenericAlias)):
return_type = return_type.__args__[0]

return return_type


def _is_dataframe_containing_type(type_: type) -> bool:
return type_ in (pd.DataFrame, cudf.DataFrame, MessageMeta, MultiMessage)

@@ -119,36 +100,33 @@ class WrappedFunctionSourceStage(SingleOutputSource):
----------
config : `morpheus.config.Config`
Pipeline configuration instance.
name: `str`
Name of the stage.
gen_fn : `GeneratorType`
Generator function to use as the source of messages.
return_type : `type`, optional
Return type of `gen_fn` if not provided the stage will use the return type annotation of `gen_fn` as the output
if not provided the stage will use `typing.Any` as the output type.
**gen_fn_kwargs : `typing.Any`
Additional keyword arguments to bind to `gen_fn` via `functools.partial`.
compute_schema_fn : `ComputeSchemaType`
Function to use for computing the schema of the stage.
"""

def __init__(self, *, config: Config, gen_fn: GeneratorType, return_type: type = None, **gen_fn_kwargs):
def __init__(self, *, config: Config, name: str, gen_fn: GeneratorType, compute_schema_fn: ComputeSchemaType):
super().__init__(config)
# collections.abc.Generator is a subclass of collections.abc.Iterator
if not inspect.isgeneratorfunction(gen_fn):
raise ValueError("Wrapped source functions must be generator functions")

self._gen_fn_name = _get_name_from_fn(gen_fn)
_validate_keyword_arguments(self._gen_fn_name, inspect.signature(gen_fn), gen_fn_kwargs)
self._gen_fn = functools.partial(gen_fn, **gen_fn_kwargs)

self._return_type = return_type or _determine_return_type(gen_fn)
self._name = name
self._gen_fn = gen_fn
self._compute_schema_fn = compute_schema_fn

@property
def name(self) -> str:
return self._gen_fn_name
return self._name

def supports_cpp_node(self) -> bool:
return False

def compute_schema(self, schema: StageSchema):
schema.output_schema.set_type(self._return_type)
self._compute_schema_fn(schema)

def _build_source(self, builder: mrc.Builder) -> mrc.SegmentObject:
return builder.make_source(self.unique_name, self._gen_fn)
@@ -169,31 +147,26 @@ class PreAllocatedWrappedFunctionStage(PreallocatorMixin, WrappedFunctionSourceS
----------
config : `morpheus.config.Config`
Pipeline configuration instance.
name: `str`
Name of the stage.
gen_fn : `GeneratorType`
Generator function to use as the source of messages.
return_type : `type`, optional
Return type of `gen_fn` if not provided the stage will use the return type annotation of `gen_fn` as the output.
*gen_args : `typing.Any`
Additional arguments to bind to `gen_fn` via `functools.partial`.
**gen_fn_kwargs : `typing.Any`
Additional keyword arguments to bind to `gen_fn` via `functools.partial`.
compute_schema_fn : `ComputeSchemaType`
Function to use for computing the schema of the stage.
"""

def __init__(self, config: Config, gen_fn: GeneratorType, *gen_args, return_type: type = None, **gen_fn_kwargs):
super().__init__(*gen_args, config=config, gen_fn=gen_fn, return_type=return_type, **gen_fn_kwargs)
if not _is_dataframe_containing_type(self._return_type):
raise ValueError("PreAllocatedWrappedFunctionStage can only be used with DataFrame containing types")
def __init__(self, *, config: Config, name: str, gen_fn: GeneratorType, compute_schema_fn: ComputeSchemaType):
super().__init__(config=config, name=name, gen_fn=gen_fn, compute_schema_fn=compute_schema_fn)


def source(gen_fn: GeneratorType):
def source(gen_fn: GeneratorType = None, *, name: str = None, compute_schema_fn: ComputeSchemaType = None):
"""
Decorator for wrapping a function as a source stage. The function must be a generator method.

It is required to use a return type annotation, as this will be used by the stage as the output type. If
no return type annotation is provided, the stage will use `typing.Any` as the output type.
When `compute_schema_fn` is `None`, the return type annotation will be used by the stage as the output type.

When invoked the wrapped function will return a source stage, any additional arguments passed in aside from the
config, will be bound to the wrapped function via `functools.partial`.
When invoked the wrapped function will return a source stage, any additional keyword arguments passed in aside from
the config, will be bound to the wrapped function via `functools.partial`.

Examples
--------
@@ -207,22 +180,52 @@ def source(gen_fn: GeneratorType):

>>> pipe.set_source(source_gen(config, dataframes=[df]))
"""
if gen_fn is None:
return functools.partial(source, name=name, compute_schema_fn=compute_schema_fn)

# Use wraps to ensure users don't lose their function name and docstrings; however, we do want to override the
# annotations to reflect that the returned function requires a config and returns a stage
@functools.wraps(gen_fn, assigned=('__module__', '__name__', '__qualname__', '__doc__'))
def wrapper(config: Config, *args, **kwargs) -> WrappedFunctionSourceStage:
return_type = _determine_return_type(gen_fn)
def wrapper(*, config: Config, **kwargs) -> WrappedFunctionSourceStage:
nonlocal name
nonlocal compute_schema_fn

if name is None:
name = _get_name_from_fn(gen_fn)

signature = inspect.signature(gen_fn)

if compute_schema_fn is None:
return_type = signature.return_annotation
if return_type is signature.empty:
raise ValueError(
"Source functions must have either a return type annotation or specify a compute_schema_fn")

# We need to unpack generator and iterator return types to get the actual type of the yielded type.
# When someone uses collections.abc.Generator or collections.abc.Iterator the return type is an instance of
# typing.GenericAlias, however when someone uses typing.Generator or typing.Iterator the return type is an
# instance of typing._GenericAlias. We need to check for both.
if isinstance(return_type, (typing.GenericAlias, typing._GenericAlias)):
return_type = return_type.__args__[0]

def compute_schema_fn(schema: StageSchema):
schema.output_schema.set_type(return_type)

_validate_keyword_arguments(name, signature, kwargs)

bound_gen_fn = functools.partial(gen_fn, **kwargs)

# If the return type supports pre-allocation we use the pre-allocating source
if _is_dataframe_containing_type(return_type):
return PreAllocatedWrappedFunctionStage(*args,
config=config,
gen_fn=gen_fn,
return_type=return_type,
**kwargs)
return PreAllocatedWrappedFunctionStage(config=config,
name=name,
gen_fn=bound_gen_fn,
compute_schema_fn=compute_schema_fn)

return WrappedFunctionSourceStage(*args, config=config, gen_fn=gen_fn, return_type=return_type, **kwargs)
return WrappedFunctionSourceStage(config=config,
name=name,
gen_fn=bound_gen_fn,
compute_schema_fn=compute_schema_fn)

return wrapper
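
The `source` changes above rely on two techniques: returning `functools.partial(source, ...)` when the decorator is called with keyword arguments only, and unpacking the yielded type from a `Generator`/`Iterator` return annotation. A minimal sketch of both follows, with no Morpheus dependency. The names `source_sketch` and `yielded_type` are hypothetical, the returned `dict` merely stands in for a stage object, and `typing.get_origin`/`typing.get_args` are swapped in for the `isinstance` checks against `typing.GenericAlias` and `typing._GenericAlias` used in the commit.

```python
import collections.abc
import functools
import inspect
import typing


def yielded_type(gen_fn: typing.Callable) -> typing.Any:
    """Return the type a generator function is annotated to yield."""
    return_type = inspect.signature(gen_fn).return_annotation
    if return_type is inspect.Signature.empty:
        raise ValueError("Source functions must have a return type annotation")

    # get_origin() is non-None for parameterized generics such as
    # typing.Generator[int, None, None] or collections.abc.Iterator[int];
    # the first type argument is the yielded type.
    if typing.get_origin(return_type) is not None:
        type_args = typing.get_args(return_type)
        if type_args:
            return type_args[0]

    return return_type


def source_sketch(gen_fn: typing.Callable = None, *, name: str = None):
    """Hypothetical stand-in for a source decorator usable with or without arguments."""
    if gen_fn is None:
        # Called as @source_sketch(name=...): defer until the function arrives
        return functools.partial(source_sketch, name=name)

    # Resolve once at decoration time; nothing is rebound on later calls
    stage_name = name if name is not None else gen_fn.__name__
    output_type = yielded_type(gen_fn)

    @functools.wraps(gen_fn, assigned=('__module__', '__name__', '__qualname__', '__doc__'))
    def wrapper(**kwargs) -> dict:
        # A plain dict stands in for the stage instance in this sketch
        return {
            "name": stage_name,
            "output_type": output_type,
            "gen_fn": functools.partial(gen_fn, **kwargs),
        }

    return wrapper


@source_sketch
def counter(*, count: int = 3) -> collections.abc.Iterator[int]:
    yield from range(count)


@source_sketch(name="letters")
def letter_gen() -> typing.Generator[str, None, None]:
    yield "a"
```

Calling `counter(count=2)` yields a description whose `gen_fn` is the original generator with `count` already bound, which is the role `bound_gen_fn` plays in the diff.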

@@ -245,56 +248,39 @@ class WrappedFunctionStage(SinglePortStage):
----------
config : `morpheus.config.Config`
Pipeline configuration instance.
name: `str`
Name of the stage.
on_data_fn : `typing.Callable`
Function to be used for processing messages.
accept_type : `type`, optional
Type of message to accept, if not provided the stage will use the type annotation of the first parameter of
`on_data_fn` as the accept type.
return_type : `type`, optional
Return type of `gen_fn` if not provided the stage will use the return type annotation of `gen_fn` as the output.
**on_data_kwargs : `typing.Any`
Additional keyword arguments to bind to `on_data_fn` via `functools.partial`.
compute_schema_fn : `ComputeSchemaType`
Function to use for computing the schema of the stage.
needed_columns : `dict[str, TypeId]`, optional
Dictionary of column names and types that the function requires to be present in the DataFrame. This is used
by the `PreAllocatedWrappedFunctionStage` to ensure the DataFrame has the needed columns allocated.
"""

def __init__(self,
*,
config: Config,
on_data_fn: typing.Callable,
accept_type: type = None,
return_type: type = None,
needed_columns: dict[str, TypeId] = None,
**on_data_kwargs):
def __init__(
self,
*,
config: Config,
name: str = None,
on_data_fn: typing.Callable,
accept_type: type,
compute_schema_fn: ComputeSchemaType,
needed_columns: dict[str, TypeId] = None,
):
super().__init__(config)
self._on_data_fn_name = _get_name_from_fn(on_data_fn)
self._name = name
self._on_data_fn = on_data_fn
self._accept_type = accept_type
self._compute_schema_fn = compute_schema_fn

if needed_columns is not None:
self._needed_columns.update(needed_columns)

# Even if both accept_type and return_type are provided, we should still need to inspect the function signature
# to verify it is callable with at least one argument
signature = inspect.signature(on_data_fn)
param_iter = iter(signature.parameters.values())

try:
first_param = next(param_iter)
self._accept_type = accept_type or first_param.annotation
if self._accept_type is signature.empty:
raise ValueError(f"{first_param.name} argument of {self._on_data_fn_name} has no type annotation")
except StopIteration as e:
raise ValueError(f"Wrapped stage functions {self._on_data_fn_name} must have at least one parameter") from e

_validate_keyword_arguments(self._on_data_fn_name, signature, on_data_kwargs, param_iter)

self._return_type = return_type or signature.return_annotation
if self._return_type is signature.empty:
raise ValueError("Wrapped stage functions must have a return type annotation")

# Finally bind any additional arguments to the function
self._on_data_fn = functools.partial(on_data_fn, **on_data_kwargs)

@property
def name(self) -> str:
return self._on_data_fn_name
return self._name

def accepted_types(self) -> typing.Tuple:
return (self._accept_type, )
@@ -303,12 +289,7 @@ def supports_cpp_node(self) -> bool:
return False

def compute_schema(self, schema: StageSchema):
if self._return_type is not typing.Any:
return_type = self._return_type
else:
return_type = schema.input_schema.get_type()

schema.output_schema.set_type(return_type)
self._compute_schema_fn(schema)

def _build_single(self, builder: mrc.Builder, input_node: mrc.SegmentObject) -> mrc.SegmentObject:
node = builder.make_node(self.unique_name, ops.map(self._on_data_fn))
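
With this change `compute_schema` simply delegates to an injected `compute_schema_fn`, so behaviors such as "fixed output type" or "same type as the input" become interchangeable callables. The sketch below illustrates the idea under stated assumptions: `PortSchemaSketch`, `StageSchemaSketch` and `StageSketch` are hypothetical stand-ins, not the Morpheus classes, and only the `set_type`/`get_type` calls visible in the diff are imitated.

```python
import typing


class PortSchemaSketch:
    """Hypothetical stand-in for one port of a stage schema."""

    def __init__(self, type_: typing.Any = None):
        self._type = type_

    def set_type(self, type_: typing.Any) -> None:
        self._type = type_

    def get_type(self) -> typing.Any:
        return self._type


class StageSchemaSketch:
    """Hypothetical stand-in exposing input_schema and output_schema."""

    def __init__(self, input_type: typing.Any):
        self.input_schema = PortSchemaSketch(input_type)
        self.output_schema = PortSchemaSketch()


ComputeSchemaFn = typing.Callable[[StageSchemaSketch], None]


class StageSketch:
    """Delegates schema computation to the callable it was constructed with."""

    def __init__(self, *, name: str, compute_schema_fn: ComputeSchemaFn):
        self._name = name
        self._compute_schema_fn = compute_schema_fn

    def compute_schema(self, schema: StageSchemaSketch) -> None:
        self._compute_schema_fn(schema)


def fixed_output(type_: typing.Any) -> ComputeSchemaFn:
    """Build a compute_schema_fn that always reports `type_` as the output type."""

    def compute_schema_fn(schema: StageSchemaSketch) -> None:
        schema.output_schema.set_type(type_)

    return compute_schema_fn


def pass_through(schema: StageSchemaSketch) -> None:
    """Report the input type as the output type."""
    schema.output_schema.set_type(schema.input_schema.get_type())
```

The `pass_through` callable reproduces, as a user-supplied function, the behavior of the removed `typing.Any` branch that copied `schema.input_schema.get_type()` to the output.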
@@ -317,7 +298,12 @@ def _build_single(self, builder: mrc.Builder, input_node: mrc.SegmentObject) ->
return node


def stage(on_data_fn: typing.Callable = None, *, needed_columns: dict[str, TypeId] = None):
def stage(on_data_fn: typing.Callable = None,
*,
name: str = None,
accept_type: type = None,
compute_schema_fn: ComputeSchemaType = None,
needed_columns: dict[str, TypeId] = None):
"""
Decorator for wrapping a function as a stage. The function must receive at least one argument, the first argument
must be the incoming message, and must return a value.
@@ -349,16 +335,54 @@ def stage(on_data_fn: typing.Callable = None, *, needed_columns: dict[str, TypeI
"""

if on_data_fn is None:
return functools.partial(stage, needed_columns=needed_columns)
return functools.partial(stage,
name=name,
accept_type=accept_type,
compute_schema_fn=compute_schema_fn,
needed_columns=needed_columns)

# Use wraps to ensure users don't lose their function name and docstrings; however, we do want to override the
# annotations to reflect that the returned function requires a config and returns a stage
@functools.wraps(on_data_fn, assigned=('__module__', '__name__', '__qualname__', '__doc__'))
def wrapper(config: Config, *args, **kwargs) -> WrappedFunctionStage:
return WrappedFunctionStage(*args,
config=config,
on_data_fn=on_data_fn,
needed_columns=needed_columns,
**kwargs)
def wrapper(*, config: Config, **kwargs) -> WrappedFunctionStage:
nonlocal name
nonlocal accept_type
nonlocal compute_schema_fn

if name is None:
name = _get_name_from_fn(on_data_fn)

# Even if both accept_type and compute_schema_fn are provided, we still need to inspect the function
# signature to verify it is callable with at least one argument
signature = inspect.signature(on_data_fn)
param_iter = iter(signature.parameters.values())

try:
first_param = next(param_iter)
accept_type = accept_type or first_param.annotation
if accept_type is signature.empty:
raise ValueError(f"{first_param.name} argument of {name} has no type annotation")
except StopIteration as e:
raise ValueError(f"Stage function {name} must have at least one parameter") from e

if compute_schema_fn is None:
return_type = signature.return_annotation
if return_type is signature.empty:
raise ValueError(
"Stage functions must have either a return type annotation or specify a compute_schema_fn")

def compute_schema_fn(schema: StageSchema):
schema.output_schema.set_type(return_type)

_validate_keyword_arguments(name, signature, kwargs, param_iter=param_iter)

bound_on_data_fn = functools.partial(on_data_fn, **kwargs)

return WrappedFunctionStage(config=config,
name=name,
on_data_fn=bound_on_data_fn,
accept_type=accept_type,
compute_schema_fn=compute_schema_fn,
needed_columns=needed_columns)

return wrapper
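
The `stage` wrapper above infers the accepted type from the first parameter's annotation, checks the remaining keyword arguments against the function signature, and binds them with `functools.partial`. The following is a minimal, Morpheus-free sketch of that flow. `stage_sketch` is a hypothetical name, the returned `dict` stands in for a `WrappedFunctionStage`, and the keyword check is an assumption about what `_validate_keyword_arguments` does, since its body is not part of this diff.

```python
import functools
import inspect
import typing


def stage_sketch(on_data_fn: typing.Callable = None, *, name: str = None, accept_type: type = None):
    """Hypothetical stand-in for a stage decorator usable with or without arguments."""
    if on_data_fn is None:
        return functools.partial(stage_sketch, name=name, accept_type=accept_type)

    stage_name = name if name is not None else on_data_fn.__name__
    signature = inspect.signature(on_data_fn)
    params = list(signature.parameters.values())
    if not params:
        raise ValueError(f"Stage function {stage_name} must have at least one parameter")

    # The first parameter receives the incoming message
    first_param, extra_params = params[0], params[1:]
    resolved_accept_type = accept_type if accept_type is not None else first_param.annotation
    if resolved_accept_type is inspect.Parameter.empty:
        raise ValueError(f"{first_param.name} argument of {stage_name} has no type annotation")

    return_type = signature.return_annotation
    if return_type is inspect.Signature.empty:
        raise ValueError(f"Stage function {stage_name} must have a return type annotation")

    # Assumption: only names declared after the first parameter may be bound
    allowed_names = {param.name for param in extra_params}

    @functools.wraps(on_data_fn, assigned=('__module__', '__name__', '__qualname__', '__doc__'))
    def wrapper(**kwargs) -> dict:
        unknown = sorted(set(kwargs) - allowed_names)
        if unknown:
            raise ValueError(f"{stage_name} does not accept keyword arguments: {unknown}")

        # A plain dict stands in for the stage instance in this sketch
        return {
            "name": stage_name,
            "accept_type": resolved_accept_type,
            "return_type": return_type,
            "on_data_fn": functools.partial(on_data_fn, **kwargs),
        }

    return wrapper


@stage_sketch
def scale(value: float, *, factor: float = 2.0) -> float:
    return value * factor
```

In this sketch the name and types are resolved once at decoration time and stored in local variables rather than rebound with `nonlocal` on each call, so repeated calls to the wrapper cannot affect one another.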