Current behavior
When caching a node whose output is a polars DataFrame (via @cache(format="parquet")), a TypeError is raised.
Stack Traces
********************************************************************************
>[post-node-execute] hello [test_module.hello()] encountered an error <
> Node inputs:
{}
********************************************************************************
Traceback (most recent call last):
File "/home/eisler/miniconda3/envs/test/lib/python3.11/site-packages/hamilton/execution/graph_functions.py", line 318, in execute_lifecycle_for_node
__adapter.call_all_lifecycle_hooks_sync(
File "/home/eisler/miniconda3/envs/test/lib/python3.11/site-packages/hamilton/lifecycle/base.py", line 915, in call_all_lifecycle_hooks_sync
getattr(adapter, hook_name)(**kwargs)
File "/home/eisler/miniconda3/envs/test/lib/python3.11/site-packages/hamilton/caching/adapter.py", line 1446, in post_node_execute
self.result_store.set(
File "/home/eisler/miniconda3/envs/test/lib/python3.11/site-packages/hamilton/caching/stores/file.py", line 71, in set
saver = saver_cls(path=str(materialized_path.absolute()))
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
TypeError: PolarsParquetWriter.__init__() got an unexpected keyword argument 'path'
Full traceback from the notebook:
---------------------------------------------------------------------------
TypeError Traceback (most recent call last)
Cell In[4], line 1
----> 1 dr.execute(final_vars=["hello"])
File ~/miniconda3/envs/test/lib/python3.11/site-packages/hamilton/driver.py:637, in Driver.execute(self, final_vars, overrides, display_graph, inputs)
635 error_execution = e
636 error_telemetry = telemetry.sanitize_error(*sys.exc_info())
--> 637 raise e
638 finally:
639 if self.adapter.does_hook("post_graph_execute", is_async=False):
File ~/miniconda3/envs/test/lib/python3.11/site-packages/hamilton/driver.py:623, in Driver.execute(self, final_vars, overrides, display_graph, inputs)
614 self.adapter.call_all_lifecycle_hooks_sync(
615 \"pre_graph_execute\",
616 run_id=run_id,
(...)
620 overrides=overrides,
621 )
622 try:
--> 623 outputs = self.__raw_execute(
624 _final_vars, overrides, display_graph, inputs=inputs, _run_id=run_id
625 )
626 if self.adapter.does_method("do_build_result", is_async=False):
627 # Build the result if we have a result builder
628 outputs = self.adapter.call_lifecycle_method_sync(
629 \"do_build_result\", outputs=outputs
630 )
File ~/miniconda3/envs/test/lib/python3.11/site-packages/hamilton/driver.py:834, in Driver.__raw_execute(self, final_vars, overrides, display_graph, inputs, _fn_graph, _run_id)
832 return results
833 except Exception as e:
--> 834 raise e
File ~/miniconda3/envs/test/lib/python3.11/site-packages/hamilton/driver.py:825, in Driver.__raw_execute(self, final_vars, overrides, display_graph, inputs, _fn_graph, _run_id)
823 results = None
824 try:
--> 825 results = self.graph_executor.execute(
826 function_graph,
827 final_vars,
828 overrides if overrides is not None else {},
829 inputs if inputs is not None else {},
830 run_id,
831 )
832 return results
833 except Exception as e:
File ~/miniconda3/envs/test/lib/python3.11/site-packages/hamilton/driver.py:175, in DefaultGraphExecutor.execute(self, fg, final_vars, overrides, inputs, run_id)
173 memoized_computation = dict() # memoized storage
174 nodes = [fg.nodes[node_name] for node_name in final_vars if node_name in fg.nodes]
--> 175 fg.execute(nodes, memoized_computation, overrides, inputs, run_id=run_id)
176 outputs = {
177 # we do this here to enable inputs to also be used as outputs
178 # putting inputs into memoized before execution doesn't work due to some graphadapter assumptions.
179 final_var: memoized_computation.get(final_var, inputs.get(final_var))
180 for final_var in final_vars
181 } # only want request variables in df.
182 del memoized_computation # trying to cleanup some memory
File ~/miniconda3/envs/test/lib/python3.11/site-packages/hamilton/graph.py:1099, in FunctionGraph.execute(self, nodes, computed, overrides, inputs, run_id)
1097 run_id = str(uuid.uuid4())
1098 inputs = graph_functions.combine_config_and_inputs(self.config, inputs)
-> 1099 return graph_functions.execute_subdag(
1100 nodes=nodes,
1101 inputs=inputs,
1102 adapter=self.adapter,
1103 computed=computed,
1104 overrides=overrides,
1105 run_id=run_id,
1106 )
File ~/miniconda3/envs/test/lib/python3.11/site-packages/hamilton/execution/graph_functions.py:250, in execute_subdag(nodes, inputs, adapter, computed, overrides, run_id, task_id)
247 if final_var_node.user_defined:
248 # from the top level, we don't know if this UserInput is required. So mark as optional.
249 dep_type = node.DependencyType.OPTIONAL
--> 250 dfs_traverse(final_var_node, dep_type)
251 return computed
File ~/miniconda3/envs/test/lib/python3.11/site-packages/hamilton/execution/graph_functions.py:222, in execute_subdag.<locals>.dfs_traverse(node_, dependency_type)
215 result = adapter.call_lifecycle_method_sync(
216 \"do_remote_execute\",
217 node=node_,
218 execute_lifecycle_for_node=execute_lifecycle_for_node_partial,
219 **kwargs,
220 )
221 else:
--> 222 result = execute_lifecycle_for_node_partial(**kwargs)
224 computed[node_.name] = result
225 # > pruning the graph
226 # This doesn't narrow it down to the entire space of the graph
227 # E.G. if something is not needed by this current execution due to
228 # the selection of nodes to run it might not prune everything.
229 # to do this we'd need to first determine all nodes on the path, then prune
230 # We may also want to use a reference counter for slightly cleaner/more efficient memory management
File ~/miniconda3/envs/test/lib/python3.11/site-packages/hamilton/execution/graph_functions.py:318, in execute_lifecycle_for_node(__node_, __adapter, __run_id, __task_id, **__kwargs)
314 if not pre_node_execute_errored and __adapter.does_hook(
315 \"post_node_execute\", is_async=False
316 ):
317 try:
--> 318 __adapter.call_all_lifecycle_hooks_sync(
319 \"post_node_execute\",
320 run_id=__run_id,
321 node_=__node_,
322 kwargs=__kwargs,
323 success=success,
324 error=error,
325 result=result,
326 task_id=__task_id,
327 )
328 except Exception:
329 message = create_error_message(__kwargs, __node_, "[post-node-execute]")
File ~/miniconda3/envs/test/lib/python3.11/site-packages/hamilton/lifecycle/base.py:915, in LifecycleAdapterSet.call_all_lifecycle_hooks_sync(self, hook_name, **kwargs)
909 \"\"\"Calls all the lifecycle hooks in this group, by hook name (stage)
910
911 :param hook_name: Name of the hooks to call
912 :param kwargs: Keyword arguments to pass into the hook
913 \"\"\"
914 for adapter in self.sync_hooks.get(hook_name, []):
--> 915 getattr(adapter, hook_name)(**kwargs)
File ~/miniconda3/envs/test/lib/python3.11/site-packages/hamilton/caching/adapter.py:1446, in HamiltonCacheAdapter.post_node_execute(self, run_id, node_, result, success, error, task_id, **future_kwargs)
1444 result_missing = not self.result_store.exists(data_version)
1445 if result_missing or materialized_path_missing:
-> 1446 self.result_store.set(
1447 data_version=data_version,
1448 result=result,
1449 saver_cls=saver_cls,
1450 loader_cls=loader_cls,
1451 )
1452 self._log_event(
1453 run_id=run_id,
1454 node_name=node_name,
(...)
1458 value=data_version,
1459 )
File ~/miniconda3/envs/test/lib/python3.11/site-packages/hamilton/caching/stores/file.py:71, in FileResultStore.set(self, data_version, result, saver_cls, loader_cls)
68 if saver_cls is not None:
69 # materialized_path
70 materialized_path = self._materialized_path(data_version, saver_cls)
---> 71 saver = saver_cls(path=str(materialized_path.absolute()))
72 loader = loader_cls(path=str(materialized_path.absolute()))
73 else:
TypeError: PolarsParquetWriter.__init__() got an unexpected keyword argument 'path'
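Reading the trace, the failure looks mechanical: FileResultStore.set always instantiates the saver as saver_cls(path=str(materialized_path.absolute())) (caching/stores/file.py, line 71), while PolarsParquetWriter.__init__() does not accept a path keyword. If I read the polars plugin correctly, its savers declare a file parameter instead; that parameter name is my assumption, but the path failure is exactly what the trace shows. A minimal sketch of the mismatch:

# Sketch of the apparent keyword mismatch. The `file` parameter name is an
# assumption about hamilton.plugins.polars_extensions; only the `path`
# failure is confirmed by the traceback above.
from hamilton.plugins.polars_extensions import PolarsParquetWriter

PolarsParquetWriter(file="hello.parquet")  # presumably fine
PolarsParquetWriter(path="hello.parquet")  # TypeError: unexpected keyword argument 'path'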
Steps to replicate behavior
Write and run a Jupyter notebook with the following cells:
%load_ext hamilton.plugins.jupyter_magic

%%cell_to_module -m test_module --display --rebuild-drivers
import polars as pl
from hamilton.function_modifiers import cache

@cache(format="parquet")
def hello() -> pl.DataFrame:
    return pl.DataFrame({"a": [1, 2]})

from hamilton import driver
import test_module

dr = (
    driver
    .Builder()
    .with_config({})
    .with_modules(test_module)
    .with_cache(path=".")
    .build()
)
dr.execute(final_vars=["hello"])
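For completeness, the failure should not be Jupyter-specific: an equivalent plain script (assuming the hello() function above lives in test_module.py) goes through the same caching code path.

# repro.py - plain-script equivalent of the notebook cells above (sketch)
from hamilton import driver

import test_module  # contains the @cache(format="parquet") hello() node

dr = (
    driver.Builder()
    .with_modules(test_module)
    .with_cache(path=".")
    .build()
)
dr.execute(final_vars=["hello"])  # raises the TypeError shown above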
Library & System Information
python=3.11.8; sf-hamilton=1.81.0 and 1.83.2 (reproduced on both); polars=1.10.0
Expected behavior
I expected the node output to be persisted to disk in Parquet format.
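As a temporary workaround (my assumption, not verified), dropping format="parquet" should sidestep the saver entirely: with_cache() still caches the node, but through the default pickle branch of FileResultStore.set (the else at line 73 in the trace), which never constructs a saver class.

import polars as pl

# Same node without @cache(format="parquet"); caching via with_cache() still
# applies, but uses the default pickle materialization, so no saver is built.
def hello() -> pl.DataFrame:
    return pl.DataFrame({"a": [1, 2]})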