Asyncio workers #361
Conversation
```python
# If the function is async (coroutine), run it synchronously using asyncio.run()
# This allows async workers to work in multiprocessing mode
if inspect.iscoroutine(task_output):
```
This is going to be abysmally slow, even more so in non-async code.
agreed...
For each task, this is happening hundreds (thousands?) of times per second (see the sketch after this list):
- Create event loop (expensive)
- Set up signal handlers
- Run coroutine
- Cancel all tasks
- Shutdown generators
- Close loop (expensive)
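One way to avoid that churn (a minimal sketch, not code from this PR; `run_on_worker_loop` is a hypothetical helper): create a single long-lived event loop per worker process and reuse it for every task, instead of paying asyncio.run()'s setup/teardown each time.

```python
import asyncio

# Hypothetical: one persistent loop per worker process
_worker_loop = asyncio.new_event_loop()

def run_on_worker_loop(coro):
    # Reuses the long-lived loop; no per-task loop creation or shutdown
    return _worker_loop.run_until_complete(coro)
```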
```python
# This allows async workers to work in multiprocessing mode
if inspect.iscoroutine(task_output):
    import asyncio
    task_output = asyncio.run(task_output)
```
Can't do this in SDKs. asyncio.run() starts a new event loop. If I am a dev writing my own Python code that has its own event loop, this will break. Moreover, asyncio.run() is meant to be called from sync code, IIRC at the top level of a program.
If someone is using async calls, they likely have their own event loop already running. When this exits, it will finalize all generators and break everything. I will need to test this to be certain because it's been a while; Python async has some subtle weirdnesses.
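For reference, one pattern that sidesteps this (a sketch, not what the PR does): keep a dedicated loop on a background thread and hand coroutines to it with asyncio.run_coroutine_threadsafe, which is safe to call whether or not the caller's thread already has a running loop.

```python
import asyncio
import threading

# Hypothetical helper: one dedicated background loop for all coroutines
_loop = asyncio.new_event_loop()
threading.Thread(target=_loop.run_forever, daemon=True).start()

def run_async(coro, timeout=None):
    # Thread-safe: never creates, closes, or collides with the caller's loop
    future = asyncio.run_coroutine_threadsafe(coro, _loop)
    return future.result(timeout)
```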
Yes, this will not work and the tests generated for coverage won't catch this.
Also, highly recommend putting "don't use in-function imports" in your CLAUDE.md
This is the legacy worker - we can remove this and throw an error if that makes sense.
The error will be something like "use asyncio task handler for async functions".
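A minimal sketch of that guard (the exact message wording is illustrative):

```python
import inspect

if inspect.iscoroutine(task_output):
    # Fail fast instead of spinning up a throwaway event loop
    task_output.close()  # suppress the "coroutine was never awaited" warning
    raise RuntimeError(
        "Async worker functions are not supported by the legacy Worker; "
        "use the asyncio task handler for async functions."
    )
```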
Also, just for my knowledge -- I tested this, it works, why would it not work?
When you say 'legacy worker', do you mean:
- The asyncio.run() code (lines 106-108) is legacy and we should remove support for async workers in multiprocessing mode?
  - If so, I agree: throw a clear error instead.
- The Worker class itself is legacy and we're moving to a different architecture?
  - If so, what's the new recommended pattern?
  - What's the migration path for existing users?
- Direct Worker instantiation is legacy and users should only use the @worker_task decorator?
  - If so, we should document this clearly and maybe deprecate the public API.

The key question: does TaskHandler (multiprocessing mode) still need Worker.execute() to work? (Currently yes - line 187 in task_runner.py)
```python
try:
    logger.debug(f"Scanning module: {module_name}")
    module = importlib.import_module(module_name)
```
Is the registration expected to happen in the module/__init__.py?
This is experimental - I don't think we need this. But short answer is yes.
I'm prepared to be proven wrong, but here's what I see.
Critical Issue #1: Event Loop Management in Worker.execute()

From the PR's worker.py:

```python
# If the function is async (coroutine), run it synchronously using asyncio.run()
# This allows async workers to work in multiprocessing mode
if inspect.iscoroutine(task_output):
    import asyncio
    task_output = asyncio.run(task_output)
```
Problem 1A: Cannot Call from an Existing Event Loop

asyncio.run() creates a new event loop, runs the coroutine, then closes the loop. This fails if an event loop is already running:

```python
async def main():
    # An event loop is running here
    worker = Worker("my_task", my_async_func)
    result = worker.execute(task)  # CRASH!
    # RuntimeError: asyncio.run() cannot be called from a running event loop
```
Problem 1B: Breaks Shared Resources

Even when asyncio.run() succeeds (in multiprocessing mode), it creates a fresh event loop that is isolated from the calling context:

```python
# User's worker code
db_pool = await asyncpg.create_pool(...)  # pool created in the main loop

async def worker_func(task):
    # This pool was created in a DIFFERENT event loop
    async with db_pool.acquire() as conn:
        result = await conn.fetch(...)  # ERROR!
    return result

# In a multiprocessing worker:
worker = Worker("db_task", worker_func)
worker.execute(task)  # asyncio.run() creates a NEW loop
# Error: the pool's event loop is closed/different
```
Real-world examples that break (see the working-pattern sketch after this list):
- Database connection pools (asyncpg, databases)
- HTTP client connection pooling (httpx, aiohttp)
- Context variables (request IDs, tracing)
- Background task coordination
- Async generators
- Message queue connections
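For contrast, a sketch of the pattern async libraries expect (the DSN is a placeholder): shared resources are created and used inside the same long-lived loop, with a single top-level asyncio.run().

```python
import asyncio
import asyncpg  # same library as the example above

async def main():
    # Pool is created and consumed inside one long-lived loop
    db_pool = await asyncpg.create_pool(dsn="postgres://...")
    async with db_pool.acquire() as conn:
        await conn.fetch("SELECT 1")
    await db_pool.close()

asyncio.run(main())  # the only asyncio.run() in the program, at the top level
```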
Problem 1C: Performance Overhead

Creating and destroying an event loop for every task execution (see the micro-benchmark sketch below):

```python
# For each task (potentially thousands per second):
# 1. Create event loop (~1-5ms)
# 2. Install signal handlers
# 3. Run coroutine
# 4. Cancel remaining tasks
# 5. Shutdown generators
# 6. Close loop (~1-5ms)
```
Using the PR's claimed throughput (multiprocessing, I/O-bound):
- Target: 400 tasks/sec
- Event loop overhead: 1-5ms per task
- Total overhead: 400 × 1-5ms = 400-2000ms per wall-clock second, i.e. loop setup/teardown alone can consume the entire time budget.
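A quick way to put numbers on this (a sketch; exact figures vary by platform and Python version):

```python
import asyncio
import time

async def noop():
    pass

# Fresh loop per call: what the PR's code path does
start = time.perf_counter()
for _ in range(1000):
    asyncio.run(noop())
per_run = (time.perf_counter() - start) / 1000

# One loop reused across calls
loop = asyncio.new_event_loop()
start = time.perf_counter()
for _ in range(1000):
    loop.run_until_complete(noop())
per_reuse = (time.perf_counter() - start) / 1000
loop.close()

print(f"asyncio.run(): {per_run * 1000:.3f} ms/call, reused loop: {per_reuse * 1000:.3f} ms/call")
```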
Critical Issue #2: Test Coverage Doesn't Validate Real Usage

The test suite has 65 tests and a 100% pass rate, but the tests bypass the broken code.

All test workers extend WorkerInterface directly:

```python
# tests/unit/resources/workers.py
class AsyncWorker(WorkerInterface):  # NOT the Worker class!
    async def execute(self, task: Task) -> TaskResult:
        await asyncio.sleep(0.01)  # Trivial operation
        return task_result
```

The AsyncIO runner bypasses Worker.execute():

```python
# src/conductor/client/automator/task_runner_asyncio.py:947
execute_func = self.worker._execute_function  # Direct access!
# NOT calling self.worker.execute(task)
```
What's Not Tested

1. Worker Class with Async Functions (a test sketch that would catch this follows the snippet)

```python
# This pattern is NEVER tested but is how users would use it:
async def my_worker(task):
    return await do_work()

worker = Worker('my_task', my_worker)  # Uses the Worker class
result = worker.execute(task)  # Calls asyncio.run() - untested!
```
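A test along these lines would exercise the broken path (a sketch only; the import paths and Task construction are assumptions based on the PR's layout):

```python
import asyncio
import pytest
from conductor.client.worker.worker import Worker  # assumed import path
from conductor.client.http.models.task import Task  # assumed import path

async def my_worker(task):
    return {"ok": True}

def test_execute_async_worker_inside_running_loop():
    worker = Worker("my_task", my_worker)

    async def call_from_running_loop():
        # Problem 1A: asyncio.run() inside a running loop must raise
        with pytest.raises(RuntimeError):
            worker.execute(Task())

    asyncio.run(call_from_running_loop())
```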
2. Shared Async Resources

A resource bound to one event loop can't be used from a different loop:

```python
# Real user pattern (not represented in this PR's tests):
client = httpx.AsyncClient()  # Shared connection pool

async def fetch_data(url: str):
    response = await client.get(url)  # Uses the shared resource
    return response.json()

worker = Worker('fetch', fetch_data)
# Would fail: the client's event loop differs from asyncio.run()'s loop
```
3. Context Variables

It's not 100% clear how people are meant to access context variables in this architecture, but they wouldn't be available anyway (a minimal demonstration follows):

```python
# This pattern isn't represented in the tests either:
import contextvars

request_id = contextvars.ContextVar('request_id')

async def worker_func(task):
    rid = request_id.get()  # Would fail - different context
    logger.info(f"Processing {task.id} for request {rid}")
```
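A minimal demonstration of the failure mode (plain contextvars, no SDK code; the fresh Context stands in for the new loop/process):

```python
import contextvars

request_id = contextvars.ContextVar("request_id")
request_id.set("req-123")

# Visible in the context where it was set:
assert request_id.get() == "req-123"

# Not visible in a fresh, empty context (analogous to a new loop/process):
try:
    contextvars.Context().run(request_id.get)
except LookupError:
    print("ContextVar not visible in a fresh context")
```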
What the tests actually do:

```python
class SimpleAsyncWorker(WorkerInterface):
    async def execute(self, task: Task) -> TaskResult:
        await asyncio.sleep(0.01)  # Just sleeps
        return TaskResult(status='COMPLETED')
```

What users will do:

```python
db = await asyncpg.create_pool(dsn)
cache = aioredis.from_url('redis://localhost')
http_client = httpx.AsyncClient()

@worker_task(task_definition_name='process_order')
async def process_order(order_id: str):
    # Uses shared resources created outside this function
    async with db.acquire() as conn:
        order = await conn.fetchrow('SELECT ... whatever', order_id)
    async with http_client as client:
        await client.post('https://api.example.com/notify', json=order)
    return {'status': 'processed'}

# This will break in multiprocessing mode!
```
Claude is bad at Python async
Most of the code that Claude generates for async in Python is terrible, and this is probably why.
Draft for the AsyncIO workers
Pending Items