Best practices for file ingestion and state management in workflows #2941
Great questions, @dragospadurariu. Here are the recommended patterns for file ingestion and state management in workflows:

State Management: References vs. Content

Best practice: Store content in shared state, pass lightweight references (IDs/paths) in messages. Large payloads (file content, images, parsed data) should be stored in shared state:

    from uuid import uuid4

    import aiofiles
    from agent_framework import WorkflowContext, executor

    # detect_mime_type is a helper assumed to be defined elsewhere.

    @executor(id="ingest_file")
    async def ingest_file(file_path: str, ctx: WorkflowContext[str]) -> None:
        # Load file content once
        async with aiofiles.open(file_path, "rb") as f:
            content = await f.read()
        # Store in shared state with unique key
        file_id = str(uuid4())
        await ctx.set_shared_state(f"file:{file_id}", {
            "path": file_path,
            "content": content,
            "mime_type": detect_mime_type(file_path),
        })
        # Pass only the lightweight reference downstream
        await ctx.send_message(file_id)

Downstream executors retrieve the content from shared state by looking up the same file:{file_id} key. See shared_states_with_agents.py for a complete example of this pattern.

Multi-modality: Carrying Image Data

For multimodal workflows where downstream agents need image access:
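One way to carry image data under the reference pattern is sketched below. This is only an illustration, not the framework's API: the module-level `shared_state` dict stands in for the workflow's shared state, and `detect_mime_type`, `store_image`, and `image_data_uri` are hypothetical helpers. The idea is to keep the raw bytes in state and build a base64 data URI only at the step that actually hands the image to an agent.

```python
import base64
import mimetypes
from uuid import uuid4

# Stand-in for the workflow's shared state (an assumption for this sketch).
shared_state: dict[str, dict] = {}


def detect_mime_type(file_path: str) -> str:
    """Hypothetical helper: guess the MIME type from the file extension."""
    mime, _ = mimetypes.guess_type(file_path)
    return mime or "application/octet-stream"


def store_image(file_path: str, content: bytes) -> str:
    """Store the raw bytes once and return a lightweight reference."""
    file_id = str(uuid4())
    shared_state[f"file:{file_id}"] = {
        "path": file_path,
        "content": content,
        "mime_type": detect_mime_type(file_path),
    }
    return file_id


def image_data_uri(file_id: str) -> str:
    """Resolve a reference into a data URI only where the image is needed."""
    entry = shared_state[f"file:{file_id}"]
    encoded = base64.b64encode(entry["content"]).decode("ascii")
    return f"data:{entry['mime_type']};base64,{encoded}"
```

Messages between steps then carry only the `file_id` string; the base64 expansion (roughly a third larger than the raw bytes) happens once, at the consumer.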
Patterns: Single Ingestor vs. Specialized Parsers

Recommendation: Use a dispatcher pattern with fan-out to specialized parsers.

    # FileRef is a small message type with `id` and `mime` fields;
    # detect_mime_type is a helper assumed to be defined elsewhere.

    @executor(id="file_dispatcher")
    async def file_dispatcher(file_path: str, ctx: WorkflowContext[FileRef]) -> None:
        mime = detect_mime_type(file_path)
        file_id = str(uuid4())
        # Store the file path and MIME type under a unique key
        await ctx.set_shared_state(f"file:{file_id}", {"path": file_path, "mime": mime})
        # Send a typed reference; the edges below route it to the right parser
        await ctx.send_message(FileRef(id=file_id, mime=mime))

    # Use conditional edges to route by mime type
    # (FileDispatcher, PDFParser and ImageParser are executor classes
    # assumed to be defined elsewhere)
    workflow = (
        WorkflowBuilder()
        .register_executor(lambda: FileDispatcher(id="dispatcher"), name="dispatcher")
        .register_executor(lambda: PDFParser(id="pdf_parser"), name="pdf_parser")
        .register_executor(lambda: ImageParser(id="image_parser"), name="image_parser")
        .add_conditional_edge(
            "dispatcher",
            [("pdf_parser", lambda m: m.mime == "application/pdf"),
             ("image_parser", lambda m: m.mime.startswith("image/"))],
        )
        .set_start_executor("dispatcher")
        # ...
        .build()
    )

This approach:
For large-scale file processing with parallel map/reduce, see map_reduce_and_visualization.py, which demonstrates passing file paths through shared state to bound memory usage.
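The memory-bounding idea can be sketched without the framework. The example below is not taken from map_reduce_and_visualization.py; it is a plain asyncio illustration in which `count_words`, `map_reduce`, and the `max_concurrency` parameter are hypothetical names. Only file paths are fanned out, each mapper reads its own file and returns a small summary, and a semaphore caps how many files are being read at once.

```python
import asyncio
from collections import Counter
from pathlib import Path


def count_words(path: str) -> Counter:
    """Map step: read one file from disk and return a small summary."""
    return Counter(Path(path).read_text(encoding="utf-8").split())


async def map_reduce(paths: list[str], max_concurrency: int = 4) -> Counter:
    """Fan out over file paths, then merge the per-file counts."""
    semaphore = asyncio.Semaphore(max_concurrency)

    async def mapper(path: str) -> Counter:
        # At most `max_concurrency` files are being read at the same time.
        async with semaphore:
            return await asyncio.to_thread(count_words, path)

    partials = await asyncio.gather(*(mapper(p) for p in paths))

    # Reduce step: merge the small per-file results.
    total: Counter = Counter()
    for partial in partials:
        total.update(partial)
    return total
```

Calling `asyncio.run(map_reduce(paths))` returns the merged counts. Because the mappers receive paths rather than content, no file's bytes travel through messages, and raw content is only held while its mapper is running.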
Hi Team,
I am building a workflow using agent-framework that needs to ingest and analyze various files (PDFs, images, DOCX, etc.). I'm looking for guidance on the best way to handle this in a graph-based workflow:
The current workflow samples focus on text inputs, so any advice on handling file-heavy state would be helpful. Thanks!