Closed
Description
While attempting to use Prefect to build a Flow for some feature engineering work, I ran into difficulty passing unmapped arguments to a map operation:
clean_data = impute.map(data_fields, replacement_dict=unmapped({np.nan: 0.}))
I had made an impute Task that takes a dictionary of "old: new" value pairs for data imputation. This raised the following error:
---------------------------------------------------------------------------
TypeError Traceback (most recent call last)
<ipython-input-23-8cc2655f441d> in <module>
----> 1 clean = impute.map(data_fields, replacement_dict=unmapped({np.nan: 0.}))
/usr/local/lib/python3.6/dist-packages/prefect/utilities/tasks.py in __init__(self, task)
197
198 def __init__(self, task: "prefect.Task"):
--> 199 self.task = as_task(task)
200
201
/usr/local/lib/python3.6/dist-packages/prefect/utilities/tasks.py in as_task(x)
66 return prefect.tasks.core.collections.Set().bind(*x)
67 elif isinstance(x, dict):
---> 68 return prefect.tasks.core.collections.Dict().bind(**x)
69
70 # functions
TypeError: bind() keywords must be strings
It seems not to like np.nan as a key: as_task unpacks the dict with Dict().bind(**x), and keyword names must be strings.
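The same failure can be reproduced in plain Python, without Prefect. This is only a minimal sketch: bind here is a hypothetical stand-in for any function that receives a dict through ** unpacking, and float("nan") stands in for np.nan.

```python
def bind(**kwargs):
    """Hypothetical stand-in for a function that receives a dict via ** unpacking."""
    return kwargs


# A dict keyed by NaN, like the replacement_dict in the report.
replacement_dict = {float("nan"): 0.0}

try:
    bind(**replacement_dict)
    error_message = None
except TypeError as exc:
    # Python rejects any non-string key used as a keyword name.
    error_message = str(exc)

print(error_message)
```

Any non-string key (a float, an int, a tuple) triggers the same TypeError, so the problem is not specific to NaN.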
Next, I attempted to pass a data type as an argument, in a cast Task:
clean_data = cast.map(clean_data, new_type=unmapped(np.int))
unmapped(np.int) raised the following:
---------------------------------------------------------------------------
ValueError Traceback (most recent call last)
/usr/lib/python3.6/inspect.py in getfullargspec(func)
1123 skip_bound_arg=False,
-> 1124 sigcls=Signature)
1125 except Exception as ex:
/usr/lib/python3.6/inspect.py in _signature_from_callable(obj, follow_wrapper_chains, skip_bound_arg, sigcls)
2346 raise ValueError(
-> 2347 'no signature found for builtin type {!r}'.format(obj))
2348
ValueError: no signature found for builtin type <class 'int'>
The above exception was the direct cause of the following exception:
TypeError Traceback (most recent call last)
<ipython-input-21-fbdd941b881e> in <module>
21 # clean data
22 clean = impute.map(data_fields, replacement_pairs=unmapped([(np.nan, 0.)]))
---> 23 clean = cast.map(clean, new_type=unmapped(int))
24
25 # stats
/usr/local/lib/python3.6/dist-packages/prefect/utilities/tasks.py in __init__(self, task)
197
198 def __init__(self, task: "prefect.Task"):
--> 199 self.task = as_task(task)
200
201
/usr/local/lib/python3.6/dist-packages/prefect/utilities/tasks.py in as_task(x)
70 # functions
71 elif callable(x):
---> 72 return prefect.tasks.core.function.FunctionTask(fn=x)
73
74 # constants
/usr/local/lib/python3.6/dist-packages/prefect/tasks/core/function.py in __init__(self, fn, name, **kwargs)
44 name = getattr(fn, "__name__", type(self).__name__)
45
---> 46 prefect.core.task._validate_run_signature(fn) # type: ignore
47 self.run = fn
48
/usr/local/lib/python3.6/dist-packages/prefect/core/task.py in _validate_run_signature(run)
24 def _validate_run_signature(run: Callable) -> None:
25 func = getattr(run, "__wrapped__", run)
---> 26 run_sig = inspect.getfullargspec(func)
27 if run_sig.varargs:
28 raise ValueError(
/usr/lib/python3.6/inspect.py in getfullargspec(func)
1128 # else. So to be fully backwards compatible, we catch all
1129 # possible exceptions here, and reraise a TypeError.
-> 1130 raise TypeError('unsupported callable') from ex
1131
1132 args = []
TypeError: unsupported callable
I was able to get around the first error by replacing the dictionary with a list of tuples. The second error seems harder to work around: as_task treats any callable, including a type such as int, as a function to wrap in a FunctionTask, and passing the target data type as an argument is key to keeping the Task flexible.
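For reference, here is a minimal sketch of both workarounds in plain Python, outside of Prefect. The names impute_values, cast_values, and SUPPORTED_TYPES are hypothetical and exist only for illustration. Passing the type by name instead of as a type object is an assumption about what might sidestep the callable check; it has not been confirmed against Prefect.

```python
import math

# Hypothetical lookup table: the caller passes a string, never the type itself.
SUPPORTED_TYPES = {"int": int, "float": float, "str": str}


def _matches(value, old):
    # NaN never equals itself, so it is compared with math.isnan instead of ==.
    if isinstance(old, float) and math.isnan(old):
        return isinstance(value, float) and math.isnan(value)
    return value == old


def impute_values(values, replacement_pairs):
    """Replace values using a list of (old, new) tuples instead of a dict."""
    result = []
    for value in values:
        for old, new in replacement_pairs:
            if _matches(value, old):
                value = new
                break
        result.append(value)
    return result


def cast_values(values, new_type_name):
    """Cast every value to the type registered under new_type_name."""
    new_type = SUPPORTED_TYPES[new_type_name]
    return [new_type(value) for value in values]


cleaned = impute_values([1.0, float("nan"), 3.0], [(float("nan"), 0.0)])
casted = cast_values(cleaned, "int")
print(cleaned, casted)
```

The string lookup costs some flexibility, since only registered types can be requested, but a string is not callable and so would not be mistaken for a function.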