Skip to content

Caching for async driver #1263

@nemilentsau

Description

@nemilentsau

I am getting a value error when trying to use caching with an async_driver

Current behavior

dr = await (
async_driver.Builder().with_modules(pdf_processor_async).with_cache().build()
)

ValueError Traceback (most recent call last)
/home/sagemaker-user/ai-experiment/llm-frameworks/hamilton/test_hamilton_async.ipynb Cell 6 line 1
----> 1 dr = await (
2 async_driver.Builder().with_modules(pdf_processor_async).with_cache().build()
3 )

ValueError: Multiple adapters cannot (currently) implement the same lifecycle method. Sync methods: ['do_node_execute']. Async methods: []

Stack Traces


ValueError Traceback (most recent call last)
/home/sagemaker-user/ai-experiment/llm-frameworks/hamilton/test_hamilton_async.ipynb Cell 6 line 1
----> 1 dr = await (
2 async_driver.Builder().with_modules(pdf_processor_async).with_cache().build()
3 )

File /opt/conda/lib/python3.10/site-packages/hamilton/async_driver.py:512, in Builder.build(self)
504 async def build(self):
505 """Builds the async driver. This also initializes it, hence the async definition.
506 If you don't want to use async, you can use build_without_init and call ainit later,
507 but we recommend using this in an asynchronous lifespan management function (E.G. in fastAPI),
(...)
510 :return: The fully
511 """
--> 512 dr = self.build_without_init()
513 return await dr.ainit()

File /opt/conda/lib/python3.10/site-packages/hamilton/async_driver.py:496, in Builder.build_without_init(self)
494 result_builders = [adapter for adapter in adapters if isinstance(adapter, base.ResultMixin)]
495 specified_result_builder = base.DictResult() if len(result_builders) == 0 else None
--> 496 return AsyncDriver(
497 self.config,
498 *self.modules,
499 adapters=self.adapters,
500 result_builder=specified_result_builder,
501 allow_module_overrides=self._allow_module_overrides,
502 )

File /opt/conda/lib/python3.10/site-packages/hamilton/async_driver.py:237, in AsyncDriver.init(self, config, result_builder, adapters, allow_module_overrides, *modules)
235 # it will be defaulted by the graph adapter
236 result_builder = result_builders[0] if len(result_builders) == 1 else None
--> 237 super(AsyncDriver, self).init(
238 config,
239 *modules,
240 adapter=[
241 # We pass in the async adapters here as this can call node-level hooks
242 # Otherwise we trust the driver/fn graph to call sync adapters
243 AsyncGraphAdapter(
244 result_builder=result_builder,
245 async_lifecycle_adapters=lifecycle_base.LifecycleAdapterSet(*async_adapters),
246 ),
247 # We pass in the sync adapters here as this can call
248 *sync_adapters,
249 *async_adapters, # note async adapters will not be called during synchronous execution -- this is for access later
250 ],
251 allow_module_overrides=allow_module_overrides,
252 )
253 self.initialized = False

File /opt/conda/lib/python3.10/site-packages/hamilton/driver.py:434, in Driver.init(self, config, adapter, allow_module_overrides, _materializers, _graph_executor, _use_legacy_adapter, *modules)
413 """Constructor: creates a DAG given the configuration & modules to crawl.
414
415 :param config: This is a dictionary of initial data & configuration.
(...)
430
431 """
433 self.driver_run_id = uuid.uuid4()
--> 434 adapter = self.normalize_adapter_input(adapter, use_legacy_adapter=_use_legacy_adapter)
435 if adapter.does_hook("pre_do_anything", is_async=False):
436 adapter.call_all_lifecycle_hooks_sync("pre_do_anything")

File /opt/conda/lib/python3.10/site-packages/hamilton/driver.py:341, in Driver.normalize_adapter_input(adapter, use_legacy_adapter)
339 if use_legacy_adapter:
340 adapter.append(base.PandasDataFrameResult())
--> 341 return lifecycle_base.LifecycleAdapterSet(*adapter)

File /opt/conda/lib/python3.10/site-packages/hamilton/lifecycle/base.py:770, in LifecycleAdapterSet.init(self, *adapters)
768 self._adapters = self._uniqify_adapters(adapters)
769 self.sync_hooks, self.async_hooks = self._get_lifecycle_hooks()
--> 770 self.sync_methods, self.async_methods = self._get_lifecycle_methods()
771 self.sync_validators = self._get_lifecycle_validators()

File /opt/conda/lib/python3.10/site-packages/hamilton/lifecycle/base.py:838, in LifecycleAdapterSet._get_lifecycle_methods(self)
834 multiple_implementations_async = [
835 method for method, adapters in async_methods.items() if len(adapters) > 1
836 ]
837 if len(multiple_implementations_sync) > 0 or len(multiple_implementations_async) > 0:
--> 838 raise ValueError(
839 f"Multiple adapters cannot (currently) implement the same lifecycle method. "
840 f"Sync methods: {multiple_implementations_sync}. "
841 f"Async methods: {multiple_implementations_async}"
842 )
843 return (
844 {method: list(adapters) for method, adapters in sync_methods.items()},
845 {method: list(adapters) for method, adapters in async_methods.items()},
846 )

ValueError: Multiple adapters cannot (currently) implement the same lifecycle method. Sync methods: ['do_node_execute']. Async methods: []

Steps to replicate behavior

  1. dr = await (
    async_driver.Builder().with_modules(pdf_processor_async).with_cache().build()
    )

Library & System Information

python=3.10.14
hamilton=1.85.1

Metadata

Metadata

Assignees

No one assigned

    Labels

    triagelabel for issues that need to be triaged.

    Type

    No type

    Projects

    No projects

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions