This project demonstrates task lifecycle management in FastMCP using the experimental sep-1686 branch. Learn how background tasks enable long-running operations that survive client disconnects, can be monitored from multiple clients, and persist beyond the initial connection.
This demo shows:
- ✅ Concurrent execution - Tasks run in parallel (3s, 2s, and 1s tasks finish in ~3s total, not 6s)
- ✅ Status polling - Monitor task progress in real-time
- ✅ Fire-and-forget patterns - Submit work and check back later
- ✅ Comparison with asyncio.gather - Understand when to use each approach
Imagine you're building an MCP tool for molecular analysis in drug discovery. A single analysis takes 3 hours. What happens if:
- Your laptop goes to sleep at hour 2?
- Your network connection drops?
- You need to check the status from your phone?
- Another team member needs to see if the analysis is done?
With traditional MCP tool calls, the work is lost. The client must stay connected for the entire duration, and only that client can access the result.
It's not about concurrency - both `asyncio.gather` and `task=True` give you parallel execution (~3s in our demo).
It's about task lifecycle management:
```python
# Submit work and disconnect
task = await client.call_tool("analyze_molecules", {...}, task=True)
task_id = task.task_id  # Save this!

# Close laptop, go home, reconnect tomorrow
# Work continues on the server

# Check back later from any device
status = await client.get_task_status(task_id)
result = await client.get_task_result(task_id)
```

If your client crashes at hour 2 of a 3-hour analysis:
- With normal calls: Work is lost, start over (3 more hours)
- With background tasks: Work continues, just reconnect and retrieve result
```python
# Get the result on your laptop
result = await task.result()

# Later, get the same result on your phone
result = await client.get_task_result(task_id)  # Same result!

# Share the task_id with a teammate;
# they can also retrieve the result
```

```python
# Team member A submits the analysis
task = await client.call_tool("run_tests", {...}, task=True)
# Shares task_id: "abc-123"
# Team member B (maybe in a dashboard)
status = await client.get_task_status("abc-123")
# "Still running... 45% complete"
# Team member C (mobile app)
status = await client.get_task_status("abc-123")
if status.status == "completed":
    notify_slack("Tests passed!")
```

The server can:
- Queue tasks when resources are limited
- Prioritize important work
- Rate-limit per-user submissions
- Manage connection pools centrally
Healthcare & Life Sciences: Molecular property analysis processing 100,000+ data points through multiple models. Takes hours. Cannot re-run if connection drops.
Enterprise Automation: Code migrations across multiple repositories. Analyze dependencies, transform code, validate changes. Takes 20 minutes to 2 hours. Need to resume if laptop sleeps.
Test Infrastructure: Comprehensive test suites with thousands of cases. Takes hours. Need to stream logs while tests run, see which tests failed without waiting for entire suite.
Deep Research: Multi-agent research systems that spawn multiple research agents internally. Takes 10-30 minutes. Model can't "wait" in a single turn.
CI/CD Pipelines: Build, test, deploy workflows. Takes 15-45 minutes. Multiple team members need visibility into build status.
Use asyncio.gather when:
- Operations complete quickly (<30 seconds)
- Client will definitely stay connected
- Only one client needs the result
- Simple fan-out/fan-in pattern
```python
# Perfect for asyncio.gather
results = await asyncio.gather(
    client.call_tool("quick_lookup", {...}),
    client.call_tool("quick_search", {...}),
    client.call_tool("quick_parse", {...}),
)
```

Use background tasks (`task=True`) when:
- Operations take minutes or hours
- Client might disconnect (mobile, laptop sleep, network issues)
- Multiple clients need task visibility
- Need to retrieve results multiple times
- Building dashboards showing task progress
- Server needs to manage resources/rate limits
```python
# Perfect for background tasks
task = await client.call_tool("run_comprehensive_tests", {...}, task=True)

# Do other work; the client can disconnect/reconnect

# Check back later from anywhere
result = await task.result()
```

Both `asyncio.gather` and `task=True` give you concurrency (~3s either way in our demo). The difference is task lifecycle management: tasks persist beyond the connection, survive client crashes, can be queried by multiple clients, and results can be retrieved multiple times.
Think of tasks like submitting a job to a queue, not just making concurrent API calls.
Critical requirement: You must enable experimental settings:
```python
from fastmcp import settings

settings.experimental.enable_docket = True
settings.experimental.enable_tasks = True
```

Without these, tasks will execute synchronously.
Basic Demos:
- `demo.py` - Standalone demo showing all task patterns
- `server.py` - FastMCP server with task-enabled tools
- `client.py` - Client demonstrating three different task usage patterns
Deep Research Demos (SEP-1686 Use Case):
- `deep_research_mock_server.py` - Mock research server (no API keys required)
- `deep_research_mock_client.py` - Client demonstrating 4 task patterns
- `deep_research_server.py` - Real AI research with pydantic_ai (requires API keys)
Configuration:
- `pyproject.toml` - Project dependencies (fastmcp from the sep-1686 branch)
This project uses uv for dependency management:
```bash
# Install dependencies
uv sync
```

This will install fastmcp from the sep-1686 branch on GitHub.
Run the standalone demo that shows all task patterns:
```bash
uv run python demo.py
```

This demonstrates:

0. Three Approaches Comparison - the same 3 tools called in different ways:
   - Sequential: 6.01s (one at a time)
   - `asyncio.gather`: 3.00s (client-side concurrency)
   - `task=True`: 3.05s (server-side background tasks)
   - Key finding: both concurrent approaches are ~2x faster than sequential
   - Unique value of tasks: status polling, fire-and-forget, observability
1. What tasks enable - polling status while working (impossible with gather)
2. Direct await - call a task and immediately wait for its result
3. Concurrent tasks - launch multiple tasks simultaneously
4. Status polling - monitor 3 tasks with real-time state transitions
5. Instant tools - compare with non-task tools
Run the client, which connects to the server:

```bash
uv run python client.py
```

This shows three usage patterns:
- Pattern 1: Direct Await - Simplest approach
- Pattern 2: Status Polling - Monitor progress
- Pattern 3: Manual Result Fetching - Do other work while the task runs (sketched below)
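Pattern 3 might look like the following sketch, assuming the task-handle API shown later in this README (`call_tool(..., task=True)` and `task.result()`); the tool name and the `do_other_work` helper are placeholders:

```python
# Pattern 3 (sketch): submit the task, work on something else, then fetch the result
task = await client.call_tool("slow_tool", {"duration": 5}, task=True)

await do_other_work()  # hypothetical helper; runs while the task executes on the server

result = await task.result()  # fetch the result whenever you're ready
print(result.data)
```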
This demo implements the SEP-1686 use case: "Deep research tools that spawn multiple research agents... takes 10-30 minutes"
Mock Version (No API Keys Required):
```bash
uv run python deep_research_mock_client.py
```

Demonstrates all 4 key patterns:
- Fire-and-forget: Submit research, do other work while it runs (~23s)
- Status monitoring: Poll progress during execution
- Multiple retrieval: Get the same result from different devices/sessions (see the sketch after this list)
- Concurrent research: Run 3 research tasks in parallel
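The multiple-retrieval pattern, for example, might look like this sketch, assuming `get_task_result` accepts a saved task id as shown earlier; the tool name and arguments are placeholders:

```python
# Session 1: submit the research and save the task id
task = await client.call_tool("deep_research", {"topic": "MCP background tasks"}, task=True)
task_id = task.task_id

# Session 2 (later, possibly from another device): retrieve the same result
result = await client.get_task_result(task_id)
print(result.data)
```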
Real Version (Requires API Keys):
```bash
# Install optional dependencies
uv sync --extra deep-research

# Set up environment variables
cp .env.example .env

# Edit .env and add your API keys:
# ANTHROPIC_API_KEY=your-anthropic-key
# GOOGLE_VERTEX_PROJECT=your-gcp-project

# Run with real AI agents (takes 10-30 minutes)
python deep_research_server.py
```

Uses real pydantic_ai agents:
- Claude Sonnet 4.5 for planning and analysis
- Gemini 2.5 Flash for web searches
- Demonstrates true long-running operations (minutes)
Why This Matters:
- Operations taking 10-30 minutes cannot use regular tool calls
- Client must be able to disconnect/reconnect
- Multiple team members need visibility into research status
- Results must persist beyond the original connection
- This is impossible with `asyncio.gather` - the client must stay connected the entire time
Critical: You must enable these settings before creating your server:
```python
from fastmcp import FastMCP, settings

# Enable experimental task support
settings.experimental.enable_docket = True
settings.experimental.enable_tasks = True

server = FastMCP(name="my-server")
```

Tools are marked as task-enabled using `@server.tool(task=True)`:
```python
@server.tool(task=True)
async def long_running_task(duration: int) -> dict:
    """This tool can execute as a background task."""
    await asyncio.sleep(duration)  # use async operations
    return {"status": "completed"}
```
Calling a task-enabled tool from the client (direct await):

```python
from fastmcp import Client

async with Client(server) as client:
    # Call the tool with task=True
    task = await client.call_tool("my_tool", {...}, task=True)

    # Await the result
    result = await task
    print(result.data)
```
task = await client.call_tool("my_tool", {...}, task=True)
# Poll status
while True:
status = await task.status()
print(f"Status: {status.status}")
if status.status in ["completed", "failed", "cancelled"]:
break
await asyncio.sleep(1)
# Get result
result = await task.result()# Launch multiple tasks
Concurrent tasks:

```python
# Launch multiple tasks
task_a = await client.call_tool("tool_1", {...}, task=True)
task_b = await client.call_tool("tool_2", {...}, task=True)
task_c = await client.call_tool("tool_3", {...}, task=True)

# Wait for all to complete
results = await asyncio.gather(task_a, task_b, task_c)
```

Current Limitations:

- In-memory only - Tasks run in a single process, not distributed across machines
- FastMCP client only - Tasks can currently be invoked only through the fastmcp client
- Experimental settings required - Must explicitly enable `enable_docket` and `enable_tasks`
- Experimental feature - Under active development; the API may change
Despite being in-memory/single-process, tasks do execute concurrently within that process using asyncio and the Docket task queue.
Task Object Methods:
- `await task` - Wait for the result (shorthand for `task.result()`)
- `await task.result()` - Get the final result
- `await task.status()` - Check the current task status
- `await task.wait()` - Wait until the task reaches a terminal state
- `await task.cancel()` - Request task cancellation
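Combining `wait()` and `cancel()` gives a timeout pattern; a sketch, assuming the methods behave as listed above:

```python
import asyncio

task = await client.call_tool("my_tool", {"duration": 120}, task=True)
try:
    # Give the task up to 60 seconds to reach a terminal state
    await asyncio.wait_for(task.wait(), timeout=60)
    result = await task.result()
except asyncio.TimeoutError:
    # Took too long; ask the server to cancel it
    await task.cancel()
```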
Result Access:
Results are returned as `CallToolResult` objects. Access the actual data via `.data`:

```python
result = await task
print(result.data)  # the actual return value from your tool
```

Status Values:
- `submitted` - Task has been submitted
- `working` - Task is currently executing
- `completed` - Task finished successfully
- `failed` - Task encountered an error
- `cancelled` - Task was cancelled
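Since only the last three states are terminal, a small helper (a sketch built on the `task.status()` call shown earlier) can wrap the check:

```python
TERMINAL_STATES = {"completed", "failed", "cancelled"}

async def is_done(task) -> bool:
    """Return True once a task has reached a terminal state."""
    status = await task.status()
    return status.status in TERMINAL_STATES
```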
```python
# Each call waits for the previous one to complete
result1 = await client.call_tool("my_tool", {"duration": 3}, task=False)
result2 = await client.call_tool("my_tool", {"duration": 2}, task=False)
result3 = await client.call_tool("my_tool", {"duration": 1}, task=False)
# Total: 3 + 2 + 1 = 6 seconds
```

```python
# Client-side concurrency - all calls run in parallel!
results = await asyncio.gather(
    client.call_tool("my_tool", {"duration": 3}, task=False),
    client.call_tool("my_tool", {"duration": 2}, task=False),
    client.call_tool("my_tool", {"duration": 1}, task=False),
)
# Total: max(3, 2, 1) = ~3 seconds
```

```python
# Server-side background tasks - same speed, more features
task1 = await client.call_tool("my_tool", {"duration": 3}, task=True)
task2 = await client.call_tool("my_tool", {"duration": 2}, task=True)
task3 = await client.call_tool("my_tool", {"duration": 1}, task=True)
results = await asyncio.gather(task1, task2, task3)
results = await asyncio.gather(task1, task2, task3)
# Total: max(3, 2, 1) = ~3 seconds
```

When running `demo.py`, you'll see output like:

```
COMPARISON: Serial (task=False) vs Concurrent (task=True)
======================================================================
Part A: Serial execution (task=False) - Traditional approach
----------------------------------------------------------------------
[CLIENT] Calling 3 tools serially (waiting for each to complete)...
[SERVER] Task 'serial-A' starting, will run for 3s
[SERVER] Task 'serial-A' completed after 3.00s
[SERVER] Task 'serial-B' starting, will run for 2s
[SERVER] Task 'serial-B' completed after 2.00s
[SERVER] Task 'serial-C' starting, will run for 1s
[SERVER] Task 'serial-C' completed after 1.00s
[CLIENT] ⏱️ Serial execution took: 6.01s
(Expected: 3+2+1 = 6 seconds)
Part B: Concurrent execution (task=True) - Background tasks
----------------------------------------------------------------------
[CLIENT] Launching 3 tasks concurrently...
[SERVER] Task 'concurrent-A' starting, will run for 3s
[SERVER] Task 'concurrent-B' starting, will run for 2s
[SERVER] Task 'concurrent-C' starting, will run for 1s
[SERVER] Task 'concurrent-C' completed after 1.00s
[SERVER] Task 'concurrent-B' completed after 2.00s
[SERVER] Task 'concurrent-A' completed after 3.00s
[CLIENT] ⏱️ Concurrent execution took: 3.05s
(Expected: max(3,2,1) = ~3 seconds)
🚀 Speedup: 1.97x faster with background tasks!
   Time saved: 2.96 seconds
```
This demo project follows the same license as FastMCP.