
Conversation

@v1r3n
Contributor

@v1r3n v1r3n commented Nov 9, 2025

Draft for the AsyncIO workers

  • See examples/asyncio_workers.py for example
  • Design: WORKER_CONCURRENCY_DESIGN.md

Pending Items

  • Tests
  • Performance tests against a server, running for at least 7 days to ensure no memory leaks or other issues
  • Benchmark against sync workers and publish the results
  • More examples

@codecov

codecov bot commented Nov 9, 2025

Codecov Report

❌ Patch coverage is 80.09479% with 294 lines in your changes missing coverage. Please review.

| Files with missing lines | Patch % | Lines |
|---|---|---|
| .../conductor/client/automator/task_runner_asyncio.py | 80.90% | 105 Missing ⚠️ |
| src/conductor/client/worker/worker_loader.py | 0.00% | 94 Missing ⚠️ |
| ...conductor/client/automator/task_handler_asyncio.py | 78.45% | 39 Missing ⚠️ |
| ...rc/conductor/client/telemetry/metrics_collector.py | 82.50% | 21 Missing ⚠️ |
| src/conductor/client/event/listeners.py | 67.64% | 11 Missing ⚠️ |
| src/conductor/client/event/listener_register.py | 66.66% | 10 Missing ⚠️ |
| src/conductor/client/worker/worker_config.py | 89.55% | 7 Missing ⚠️ |
| src/conductor/client/context/task_context.py | 95.00% | 3 Missing ⚠️ |
| src/conductor/client/event/event_dispatcher.py | 95.55% | 2 Missing ⚠️ |
| src/conductor/client/workflow/task/task.py | 0.00% | 2 Missing ⚠️ |
| Files with missing lines | Coverage Δ |
|---|---|
| src/conductor/client/automator/task_handler.py | 97.56% <100.00%> (+32.22%) ⬆️ |
| src/conductor/client/automator/task_runner.py | 100.00% <100.00%> (+20.27%) ⬆️ |
| src/conductor/client/event/conductor_event.py | 100.00% <100.00%> (ø) |
| src/conductor/client/event/task_events.py | 100.00% <100.00%> (ø) |
| src/conductor/client/event/task_runner_events.py | 100.00% <100.00%> (ø) |
| src/conductor/client/event/workflow_events.py | 100.00% <100.00%> (ø) |
| src/conductor/client/http/api_client.py | 98.97% <100.00%> (+44.17%) ⬆️ |
| ...rc/conductor/client/http/models/integration_api.py | 97.79% <ø> (-0.14%) ⬇️ |
| src/conductor/client/http/models/schema_def.py | 91.81% <ø> (-0.15%) ⬇️ |
| src/conductor/client/http/models/workflow_def.py | 85.19% <ø> (-0.38%) ⬇️ |
| ... and 16 more | |

... and 4 files with indirect coverage changes


@v1r3n v1r3n marked this pull request as draft November 9, 2025 08:54

# If the function is async (coroutine), run it synchronously using asyncio.run()
# This allows async workers to work in multiprocessing mode
if inspect.iscoroutine(task_output):
Collaborator


This is going to be abysmally slow, especially more so in non-async code.


agreed...

For each task, this is happening hundreds (thousands?) of times per second:

  1. Create event loop (expensive)
  2. Set up signal handlers
  3. Run coroutine
  4. Cancel all tasks
  5. Shutdown generators
  6. Close loop (expensive)

# This allows async workers to work in multiprocessing mode
if inspect.iscoroutine(task_output):
    import asyncio
    task_output = asyncio.run(task_output)
Collaborator

@am-orkes am-orkes Nov 11, 2025


Can't do this in SDKs. This starts a new async event loop. If I am a dev writing my own Python code that has its own async event loop, this will break. Moreover, asyncio.run() is meant to be called from sync code or at the top level, iirc.

If someone is using async calls they likely have their own event loop already running. When this exits, it will finalize all generators and break everything. I will need to test this to be certain because it's been a while. Python async has some subtle weirdnesses.


Yes, this will not work and the tests generated for coverage won't catch this.

Also, highly recommend putting "don't use in-function imports" in your CLAUDE.md

Contributor Author


This is the legacy worker - we can remove this and throw an error if that makes sense.
The error will be something like "use asyncio task handler for async functions".
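
A guard along those lines could look like the sketch below. This is a minimal stand-in for the relevant part of Worker.execute(); the function name, wording of the error, and exception type are all illustrative, not the SDK's actual API:

```python
import asyncio


def execute_task(execute_function, task):
    # stand-in for the legacy (multiprocessing) Worker.execute() path
    task_output = execute_function(task)
    if asyncio.iscoroutine(task_output):
        task_output.close()  # suppress the "coroutine was never awaited" warning
        raise TypeError(
            "async worker functions are not supported by the legacy worker; "
            "use the asyncio task handler for async functions"
        )
    return task_output
```

Failing fast here avoids silently running the coroutine on a throwaway event loop.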

Also, just for my knowledge -- I tested this, it works, why would it not work?


When you say 'legacy worker', do you mean:

  1. The asyncio.run() code (lines 106-108) is legacy and we should remove support for async workers in multiprocessing mode?
    - If so, I agree - throw a clear error instead
  2. The Worker class itself is legacy and we're moving to a different architecture?
    - If so, what's the new recommended pattern?
    - What's the migration path for existing users?
  3. Direct Worker instantiation is legacy and users should only use @worker_task decorator?
    - If so, we should document this clearly and maybe deprecate the public API

The key question: Does TaskHandler (multiprocessing mode) still need Worker.execute() to work? (Currently yes - line 187 in task_runner.py)


try:
    logger.debug(f"Scanning module: {module_name}")
    module = importlib.import_module(module_name)
Collaborator


Is the registration expected to happen in the module/__init__.py?

Contributor Author


This is experimental - I don't think we need this. But short answer is yes.
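
For reference, import-time registration usually looks something like this sketch; the registry and decorator internals here are illustrative, not the SDK's actual implementation:

```python
WORKER_REGISTRY = {}


def worker_task(task_definition_name):
    # decorating a function registers it as a side effect of importing its module
    def wrap(fn):
        WORKER_REGISTRY[task_definition_name] = fn
        return fn
    return wrap


# A module/__init__.py would define decorated workers, e.g.:
@worker_task("greet")
def greet(task):
    return "hello"


# ...so that importlib.import_module("some_module") populates WORKER_REGISTRY
print(sorted(WORKER_REGISTRY))  # ['greet']
```

This is why the scanner above only needs to import each module: the decorator does the registering.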


@nthmost-orkes nthmost-orkes left a comment


I'm prepared to be proven wrong, but here's what I see.


Critical Issue #1: Event Loop Management in Worker.execute()

From PR's worker.py --

  # If the function is async (coroutine), run it synchronously using asyncio.run()
  # This allows async workers to work in multiprocessing mode
  if inspect.iscoroutine(task_output):
      import asyncio
      task_output = asyncio.run(task_output)

Problem 1A: Cannot Call from Existing Event Loop

asyncio.run() creates a new event loop, runs the coroutine, then closes it. This fails if an event loop is already running:

async def main():
    # Event loop is running here
    worker = Worker("my_task", my_async_func)
    result = worker.execute(task)  # CRASH!
    # RuntimeError: asyncio.run() cannot be called from a running event loop

Problem 1B: Breaks Shared Resources

Even when asyncio.run() succeeds (in multiprocessing mode), it creates a fresh event loop that's isolated from the calling context:

# User's worker code
db_pool = await asyncpg.create_pool(...)  # Pool created in main loop

async def worker_func(task):
    # This pool was created in a DIFFERENT event loop
    async with db_pool.acquire() as conn:
        result = await conn.fetch(...)  # ERROR!
    return result

# In multiprocessing worker:
worker = Worker("db_task", worker_func)
worker.execute(task)  # asyncio.run() creates NEW loop
# Error: Pool's event loop is closed/different

Real-world examples that break:

  • Database connection pools (asyncpg, databases)
  • HTTP client connection pooling (httpx, aiohttp)
  • Context variables (request IDs, tracing)
  • Background task coordination
  • Async generators
  • Message queue connections
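
The "attached to a different loop" failure can be reproduced with the standard library alone, no third-party pools needed. In this minimal sketch, a pending Future stands in for a pool or connection bound to the first loop:

```python
import asyncio


async def make_future():
    # a Future bound to the first event loop (stands in for a pool/connection)
    return asyncio.get_running_loop().create_future()


fut = asyncio.run(make_future())  # the first loop is now closed


async def use_future():
    return await fut  # awaiting it from a second, different loop


err = None
try:
    asyncio.run(use_future())  # fresh loop, like Worker.execute() would create
except RuntimeError as e:
    err = e

print(type(err).__name__)  # RuntimeError ("...attached to a different loop")
```

Any async library that caches its loop at construction time fails the same way.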

Problem 1C: Performance Overhead

Creating and destroying an event loop for every task execution:

  # For each task (potentially thousands per second):
  # 1. Create event loop        (~1-5ms)
  # 2. Install signal handlers
  # 3. Run coroutine
  # 4. Cancel remaining tasks
  # 5. Shutdown generators
  # 6. Close loop               (~1-5ms)

Using the PR's claimed throughput (multiprocessing, I/O-bound):

  • Target: 400 tasks/sec
  • Event loop overhead: 1-5ms per task
  • Total overhead per second: 400-2000ms
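
The per-task cost is easy to measure directly; a rough micro-benchmark (absolute numbers vary by machine and Python version, so treat these as orders of magnitude, not the PR's actual figures):

```python
import asyncio
import time


async def noop():
    return None


N = 500

# asyncio.run(): fresh event loop created and torn down per call,
# as in the PR's Worker.execute() path
t0 = time.perf_counter()
for _ in range(N):
    asyncio.run(noop())
per_run = (time.perf_counter() - t0) / N

# one long-lived loop reused for every coroutine
loop = asyncio.new_event_loop()
t0 = time.perf_counter()
for _ in range(N):
    loop.run_until_complete(noop())
per_reuse = (time.perf_counter() - t0) / N
loop.close()

print(f"asyncio.run: {per_run * 1e3:.3f} ms/task, "
      f"reused loop: {per_reuse * 1e3:.3f} ms/task")
```

The gap between the two numbers is pure loop setup/teardown overhead that the reused-loop design avoids.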

Critical Issue #2: Test Coverage Doesn't Validate Real Usage

The test suite has 65 tests and 100% pass rate, but tests bypass the broken code.

All test workers extend WorkerInterface directly:

# tests/unit/resources/workers.py
# tests/unit/resources/workers.py
class AsyncWorker(WorkerInterface):  # NOT Worker class!
    async def execute(self, task: Task) -> TaskResult:
        await asyncio.sleep(0.01)  # Trivial operation
        return task_result

The AsyncIO runner bypasses Worker.execute():

# src/conductor/client/automator/task_runner_asyncio.py:947
execute_func = self.worker._execute_function  # Direct access!
# NOT calling self.worker.execute(task)

What's Not Tested

1. Worker Class with Async Functions

  # This pattern is NEVER tested but is how users would use it:
  async def my_worker(task):
      return await do_work()

  worker = Worker('my_task', my_worker)  # Uses Worker class
  result = worker.execute(task)  # Calls asyncio.run() - untested!
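
Such a test is straightforward to write. Sketched here with a minimal stand-in for Worker.execute() (the real class lives in the PR; this only mirrors its asyncio.run() call):

```python
import asyncio


def execute(func, task):
    # mirrors the PR's asyncio.run() path in Worker.execute()
    out = func(task)
    if asyncio.iscoroutine(out):
        out = asyncio.run(out)
    return out


async def my_worker(task):
    await asyncio.sleep(0)
    return {"ok": True}


# passes at the top level, where no loop is running
assert execute(my_worker, None) == {"ok": True}


# fails when called from inside a running loop -- the untested path
async def call_from_running_loop():
    try:
        execute(my_worker, None)
    except RuntimeError as e:
        return e


err = asyncio.run(call_from_running_loop())
assert isinstance(err, RuntimeError)  # "cannot be called from a running event loop"
```

The second assertion is exactly the scenario the current test suite never exercises.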

2. Shared Async Resources

You can't use a resource that is bound to one event loop from a different one.

  # Real user pattern (which is not represented in tests in this PR):
  client = httpx.AsyncClient()  # Shared connection pool

  async def fetch_data(url: str):
      response = await client.get(url)  # Uses shared resource
      return response.json()

  worker = Worker('fetch', fetch_data)
  # Would fail: client's event loop is different from asyncio.run()'s loop

3. Context Variables

It's not 100% clear how people are meant to access context variables in this architecture, but they wouldn't be available anyway.

# This pattern isn't represented in the tests either:
import contextvars

request_id = contextvars.ContextVar('request_id')

async def worker_func(task):
    rid = request_id.get()  # Would fail - different context
    logger.info(f"Processing {task.id} for request {rid}")
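
The isolation is easy to see: asyncio.run() executes the coroutine in a copy of the current context, so ContextVar changes made inside it never escape. A quick self-contained demonstration:

```python
import asyncio
import contextvars

request_id = contextvars.ContextVar("request_id")


async def set_rid():
    # this set() happens in a copied context owned by asyncio.run()
    request_id.set("abc-123")


asyncio.run(set_rid())

# back outside, the variable is still unset
print(request_id.get("missing"))  # missing
```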

What tests actually do:

class SimpleAsyncWorker(WorkerInterface):
    async def execute(self, task: Task) -> TaskResult:
        await asyncio.sleep(0.01)  # Just sleeps
        return TaskResult(status='COMPLETED')

What users will do:

db = await asyncpg.create_pool(dsn)
cache = aioredis.from_url('redis://localhost')
http_client = httpx.AsyncClient()

@worker_task(task_definition_name='process_order')
async def process_order(order_id: str):
    # Uses shared resources created outside this function
    async with db.acquire() as conn:
        order = await conn.fetchrow('SELECT ... whatever', order_id)

    async with http_client as client:
        await client.post('https://api.example.com/notify', json=order)

    return {'status': 'processed'}

# This will break in multiprocessing mode!

Claude is bad at Python async

Most of the code that Claude generates for async in Python is terrible, and this is probably why:

https://news.ycombinator.com/item?id=45106189

