
Cache read in thread constructor #157

Open
kosstbarz wants to merge 1 commit into main from kosst/cache-load

Conversation

@kosstbarz
Contributor

Implemented the simplest use case for the cache:

agent = databao.new_agent(cache=DiskCache())
thread = agent.thread(name="thread1")
thread.ask("q1").df()
# After kernel rerun:
agent = databao.new_agent(cache=DiskCache())
thread = agent.thread(name="thread1")
# Thread contains messages as well as result
# System -> Q1 -> A1
thread.text() # Must work
thread.df() # Must work
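
For reference, the cache surface this example relies on boils down to three calls (scoped, get, put), as used later in the diff. The sketch below is illustrative only and is not the actual databao Cache/DiskCache API:

from typing import Any, Protocol

class Cache(Protocol):
    """Minimal cache interface assumed by the example above (illustrative sketch)."""

    def scoped(self, prefix: str) -> "Cache":
        """Return a view of the cache whose keys are namespaced by prefix."""
        ...

    def get(self, key: str, default: Any = None) -> Any:
        """Return the value stored under key, or default if it is missing."""
        ...

    def put(self, key: str, value: Any) -> None:
        """Persist value under key."""
        ...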

@kosstbarz requested review from a team, Rauf-Kurbanov, and mare5x on December 18, 2025 at 11:46
def thread(
    self,
    *,
    name: str | None = None,
Contributor:
Now we have a Thread.name and an Agent.name. I think we should remove Agent.name as we don't use it anywhere and it can cause confusion with Thread.name.

self._default_rows_limit = rows_limit

self._lazy_mode = lazy
self._name = name or f"thread-{uuid.uuid4()!s}"
Contributor:
We should probably expose the name publicly as well, like we have with Agent.name.
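
For illustration, a read-only property would be enough (sketch; assumes the attribute stays _name):

@property
def name(self) -> str:
    """Public, read-only thread name."""
    return self._name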

Comment on lines +65 to 83
self._cache = self._agent.cache.scoped(self._name)
state = self._cache.get("state", default=None)
if state:
    if overwrite:
        self._cache.put("state", {})
        print(f"Overwrote existing state for thread {self._name}. History is empty.")
    else:
        print(f"Loaded existing state for thread {self._name}.\nOperations:")
        self._opas = state.get("operations", [])
        self._opas_processed_count = len(self._opas)
        messages = state.get("messages", [])
        self._meta = {"messages": messages}
        counter = 1
        for opa_group in self._opas:
            for opa in opa_group:
                print(f"- Op {counter}: {opa.query}")
                counter += 1
        self._data_result = self._agent.executor.get_result(messages)

Contributor:

I find it strange to assume the existence of these cache keys in Thread; I would expect them to be specific to an Executor. Maybe it would be cleaner to have a thread-specific cache scope and an Executor-specific cache scope instead. That way, the Thread cache would save _opas and _data_result without relying on the Executor.
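
For illustration, the split could look roughly like this; restore() is a hypothetical Executor hook, not an existing method:

# Thread-owned scope: the Thread persists only what it needs itself.
thread_cache = self._agent.cache.scoped(self._name)
thread_cache.put("operations", self._opas)

# Executor-owned scope: the Executor picks its own keys (e.g. messages)
# through a hypothetical restore() hook, so Thread never touches them.
executor_cache = thread_cache.scoped("executor")
self._data_result = self._agent.executor.restore(executor_cache)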

self._graph_recursion_limit = 50

- def _process_opas(self, opas: list[Opa], cache: Cache) -> list[Any]:
+ def _process_opas(self, opas: list[Opa], cache: Cache) -> dict[str, Any]:
Contributor:

I think these changes break ReactDuckDBExecutor
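
For illustration, a caller written against the old list return type would break on the new dict (hypothetical snippet, not actual ReactDuckDBExecutor code):

results = self._process_opas(opas, cache)
last = results[-1]  # works on a list; on a dict this raises KeyError(-1)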

"""Update message history in cache with final messages from graph execution."""
if final_messages:
cache.put("state", {"messages": final_messages})
def _update_message_history(self, cache: Cache, state: dict[str, Any]) -> None:
Contributor:

I would rename the method now to something like _save_state.
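
For illustration, the renamed method could look like this (sketch; assumes the body just persists the whole state dict):

def _save_state(self, cache: Cache, state: dict[str, Any]) -> None:
    """Persist the full thread state (messages, operations) under one key."""
    cache.put("state", state)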

@kosstbarz linked an issue on Jan 7, 2026 that may be closed by this pull request

Development

Successfully merging this pull request may close these issues.

Caching of opas results for rerun of the Notebooks

2 participants