Conversation
@v1r3n Overall looks good, though could you explain why we continue using threads for async workers, instead of a single thread with an event loop handling everything?
https://github.com/conductor-oss/python-sdk/blob/async_io_workers/docs/design/WORKER_ARCHITECTURE.md One thing to consider is that marking a task async makes it run in an asyncio pool -- alternatively we can make it explicit, but I don't really see much benefit; it introduces another flag that you have to maintain, and then creates the complication that you now have to have an all-or-nothing implementation for workers.
Totally agree that the current design (one process per worker, threads inside for poll/execute/update) solves the “multi-worker GIL contention” problem well. My question was a bit narrower: within a single worker that's already marked `async def`, we still keep the per-task threads (`ThreadPoolExecutor.submit(...)` + `BackgroundEventLoop`) even though the work after polling is 100% asyncio. That thread handoff is what adds context-switch overhead and keeps the poll loop in a different thread from the async execution:

Main thread: poll → submit to ThreadPoolExecutor

So the initial suggestion is for sync workers to keep the current code path (polling thread + worker threads), and for async workers to get “one process = one thread + asyncio loop”, which removes the extra thread switches and the `BackgroundEventLoop` plumbing while still keeping process isolation. That was the motivation behind the question.
These are good points. I updated the PR to take care of this; the new architecture:
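To illustrate the “one process = one thread + asyncio loop” shape discussed above, here is a minimal sketch. The `poll_task`, `update_task`, and `run_worker` names are hypothetical stand-ins for the SDK's HTTP poll/update calls, not the actual Conductor SDK API; the point is that poll, execute, and update all run on the same event loop with no thread handoff.

```python
import asyncio

# Sketch only: poll_task/update_task are hypothetical stand-ins for the
# SDK's HTTP poll and update-result calls against the Conductor server.
async def poll_task(worker_name: str):
    await asyncio.sleep(0.01)  # simulate the network poll
    return {"taskId": "t1", "input": {}}

async def update_task(result):
    await asyncio.sleep(0.01)  # simulate posting the result back

async def run_worker(worker_name: str, execute, iterations: int = 3):
    # Poll, execute, and update all on one event loop: no ThreadPoolExecutor
    # handoff, no BackgroundEventLoop plumbing.
    results = []
    for _ in range(iterations):
        task = await poll_task(worker_name)
        if task is None:
            continue
        result = await execute(task)
        await update_task(result)
        results.append(result)
    return results

async def my_worker(task):
    await asyncio.sleep(0)  # the async task body
    return {"status": "COMPLETED", "taskId": task["taskId"]}

results = asyncio.run(run_worker("example_worker", my_worker))
```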
```shell
DOCKER_BUILDKIT=1 docker build . \
    --target python_test_base \
    -t conductor-sdk-test:latest
python -m pip install --upgrade pip
```
```python
"""
# Simulate async I/O operation
# Print execution info to verify parallel execution
timestamp = datetime.now().strftime('%Y-%m-%d %H:%M:%S.%f')[:-3]  # milliseconds
```
`round(time.time() * 1000)` will get you ms
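The two approaches side by side: the reviewer's integer-epoch version versus the formatted-string version from the diff. Both have millisecond resolution; which one fits depends on whether the timestamp is for arithmetic or for display.

```python
import time
from datetime import datetime

# Integer milliseconds since the epoch (reviewer's suggestion):
ms_epoch = round(time.time() * 1000)

# Formatted string with millisecond precision (version from the diff):
ms_text = datetime.now().strftime('%Y-%m-%d %H:%M:%S.%f')[:-3]
```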
```python
def execute_shell(command: str, args: List[str]) -> str:
```
How do we handle errors here? Ideally, unless there are hard API requirements, we probably want to return a tuple of (exit_code, str(result.stdout), str(result.stderr)). If the requirement is for the output to be a string only, using sentinel characters may help.
This is an example created in response to someone asking for an example shell worker in the past. Not really meant to be used in a production env.
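A minimal sketch of the reviewer's tuple suggestion, assuming the worker is free to change its return type. The function name mirrors the one in the diff, but the signature and body here are illustrative, not the SDK example's actual implementation.

```python
import subprocess
from typing import List, Tuple

def execute_shell(command: str, args: List[str]) -> Tuple[int, str, str]:
    # Return exit code, stdout, and stderr separately so callers can
    # distinguish failure modes instead of parsing a single string.
    result = subprocess.run([command, *args], capture_output=True, text=True)
    return result.returncode, result.stdout, result.stderr

code, out, err = execute_shell("echo", ["hello"])
```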
```python
else:
    # Job still running - poll again in 30 seconds
    ctx.add_log(f"Job {job_id} still running, will check again in 30s")
    ctx.set_callback_after(30)
```
Perhaps it's my lack of familiarity with the naming convention, but how this works is unclear to me. Perhaps a reference to the event loop would be helpful?
This part is not related to the event loop in async, but rather to Conductor's ability to schedule the task again later, for long-running tasks.
```python
    return {"status": "success", "operation": operation}
else:
    ctx.add_log("Operation failed, will retry")
    raise Exception("Operation failed")
```
It is generally recommended to throw specific exceptions instead of the base class. Easier to catch the ones you want.
Yes, this is an example only. I would expect users to actually create exceptions and use them in production code.
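A short sketch of what that production pattern might look like. The exception names and the `run_operation` helper are hypothetical, invented for illustration; the point is that a dedicated exception class lets callers catch exactly the failures they expect.

```python
# Hypothetical exception hierarchy for a production worker; names are illustrative.
class OperationError(Exception):
    """Base class for worker operation failures."""

class RetryableOperationError(OperationError):
    """Raised when the operation failed but can safely be retried."""

def run_operation(succeed: bool):
    if not succeed:
        raise RetryableOperationError("Operation failed, will retry")
    return {"status": "success"}

try:
    run_operation(False)
except RetryableOperationError as e:
    # Only retryable failures are caught here; anything else propagates.
    caught = str(e)
```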
```python
results = []

try:
    import httpx
```
This should be at the top.
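One common way to move the import to the top while keeping the dependency optional is a guarded module-level import; a sketch, assuming `httpx` is meant to stay an optional dependency of the example:

```python
# Guarded module-level import: the import lives at the top of the file,
# and code below checks the flag instead of importing inside a function.
try:
    import httpx
    HTTPX_AVAILABLE = True
except ImportError:
    httpx = None
    HTTPX_AVAILABLE = False
```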
```python
        ctx.add_log(f"✗ {url} - Error: {e}")

except Exception as e:
    ctx.add_log(f"Fatal error: {e}")
```
Unless there is a possibility of AsyncClient object creation failing, I am not sure you will ever get here.
```python
"""
if n <= 1:
    return n
return await calculate_fibonacci(n - 1) + await calculate_fibonacci(n - 2)
```
While you could make CPU-bound work async, I am not sure what this gets us? Also I think this may not be correct :) f(<0) should be 0 and f(1)/f(2) should be 1.
Nothing, just an example.
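For reference, a version with the base cases the review asks for (f(n) = 0 for n ≤ 0, f(1) = f(2) = 1), keeping the async recursion from the example even though, as noted, async buys nothing for CPU-bound work:

```python
import asyncio

async def calculate_fibonacci(n: int) -> int:
    # Corrected base cases per the review comment.
    if n <= 0:
        return 0
    if n <= 2:
        return 1
    return await calculate_fibonacci(n - 1) + await calculate_fibonacci(n - 2)

fib10 = asyncio.run(calculate_fibonacci(10))
```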
Worker Configuration, Event-Driven Observability & Metrics
Overview
Introduces event-driven observability with Prometheus metrics, hierarchical worker configuration, runtime pausing, and startup logging.
Key Features
1. Event-Driven Observability
Zero-coupling architecture for metrics and monitoring:
Benefits: Multiple backends (Prometheus, DataDog, custom), protocol-based, non-blocking
2. Built-in Prometheus Metrics
HTTP server with automatic multiprocess aggregation:
Metrics: API latency (p50-p99), task execution time, error rates, queue saturation
3. Worker Configuration
Single-line startup logging + hierarchical env overrides:
4. Runtime Worker Pausing
Environment-only control (no code changes):
5. Async functions can now be used with @worker_task annotations
When a function is marked as async, it is executed on a background asyncio event loop.
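The generic pattern behind a background asyncio event loop looks like the sketch below: a dedicated thread runs the loop, and coroutines are submitted to it from other threads. This is an illustration of the technique only; the SDK's actual `BackgroundEventLoop` and `@worker_task` integration may differ in detail.

```python
import asyncio
import threading

class BackgroundEventLoop:
    """Sketch: an event loop running on its own daemon thread."""

    def __init__(self):
        self.loop = asyncio.new_event_loop()
        self._thread = threading.Thread(target=self.loop.run_forever, daemon=True)
        self._thread.start()

    def run(self, coro):
        # Submit a coroutine from any thread and block until it completes.
        return asyncio.run_coroutine_threadsafe(coro, self.loop).result()

    def stop(self):
        self.loop.call_soon_threadsafe(self.loop.stop)

async def async_task(x):
    await asyncio.sleep(0.01)  # stand-in for real async I/O
    return x * 2

bg = BackgroundEventLoop()
value = bg.run(async_task(21))
bg.stop()
```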
Related Documentation