Skip to content

Asynchronous multiprocessing.Pipe reads throw when using uvloop #686

@judilsteve

Description

@judilsteve

Summary

When using an asyncio.Event() with loop.add_reader() to asynchronously read from a multiprocessing.Pipe, the call to recv_bytes() throws:

Process reader:
Traceback (most recent call last):
  File "/home/james/.pyenv/versions/3.13.5/lib/python3.13/multiprocessing/process.py", line 313, in _bootstrap
    self.run()
    ~~~~~~~~^^
  File "/home/james/.pyenv/versions/3.13.5/lib/python3.13/multiprocessing/process.py", line 108, in run
    self._target(*self._args, **self._kwargs)
    ~~~~~~~~~~~~^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/james/code/connection_test.py", line 25, in reader_wrapper
    uvloop.run(async_reader(read_pipe))
    ~~~~~~~~~~^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/james/code/.venv/lib/python3.13/site-packages/uvloop/__init__.py", line 109, in run
    return __asyncio.run(
           ~~~~~~~~~~~~~^
        wrapper(),
        ^^^^^^^^^^
    ...<2 lines>...
        **run_kwargs
        ^^^^^^^^^^^^
    )
    ^
  File "/home/james/.pyenv/versions/3.13.5/lib/python3.13/asyncio/runners.py", line 195, in run
    return runner.run(main)
           ~~~~~~~~~~^^^^^^
  File "/home/james/.pyenv/versions/3.13.5/lib/python3.13/asyncio/runners.py", line 118, in run
    return self._loop.run_until_complete(task)
           ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~^^^^^^
  File "uvloop/loop.pyx", line 1518, in uvloop.loop.Loop.run_until_complete
  File "/home/james/code/.venv/lib/python3.13/site-packages/uvloop/__init__.py", line 61, in wrapper
    return await main
           ^^^^^^^^^^
  File "/home/james/code/connection_test.py", line 20, in async_reader
    message = read_pipe.recv_bytes()
  File "/home/james/.pyenv/versions/3.13.5/lib/python3.13/multiprocessing/connection.py", line 216, in recv_bytes
    buf = self._recv_bytes(maxlength)
  File "/home/james/.pyenv/versions/3.13.5/lib/python3.13/multiprocessing/connection.py", line 437, in _recv_bytes
    return self._recv(size)
           ~~~~~~~~~~^^^^^^
  File "/home/james/.pyenv/versions/3.13.5/lib/python3.13/multiprocessing/connection.py", line 395, in _recv
    chunk = read(handle, remaining)
BlockingIOError: [Errno 11] Resource temporarily unavailable

This does not occur when running with asyncio

Environment

Python: 3.13.5
uvloop: 0.21.0

Reproduction

import asyncio
import uvloop
from multiprocessing import Process
from multiprocessing.connection import Connection, Pipe, wait

def writer(write_pipe: Connection):
    message = b'!' * 65536
    while True:
        write_pipe.send_bytes(message)

async def async_reader(read_pipe: Connection):
    message_available = asyncio.Event()
    asyncio.get_running_loop().add_reader(
        read_pipe.fileno(), message_available.set
    )

    while True:
        await message_available.wait()
        while read_pipe.poll():
            message = read_pipe.recv_bytes()
            print('Got message')
        message_available.clear()

def reader_wrapper(read_pipe: Connection):
    uvloop.run(async_reader(read_pipe))

if __name__ == '__main__':
    read_pipe, write_pipe = Pipe(duplex=False)

    reader_process = Process(
        name="reader", target=reader_wrapper, args=(read_pipe,)
    )
    reader_process.start()

    writer_process = Process(
        name="writer", target=writer, args=(write_pipe,)
    )
    writer_process.start()

    wait([reader_process.sentinel, writer_process.sentinel])

Metadata

Metadata

Assignees

No one assigned

    Labels

    No labels
    No labels

    Type

    No type

    Projects

    No projects

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions