diff --git a/dspy/streaming/messages.py b/dspy/streaming/messages.py index 4170a0a496..458e1742cb 100644 --- a/dspy/streaming/messages.py +++ b/dspy/streaming/messages.py @@ -1,5 +1,4 @@ import asyncio -import concurrent.futures from dataclasses import dataclass from typing import Any @@ -33,18 +32,10 @@ async def _send(): try: asyncio.get_running_loop() - # If we're in an event loop, offload to a new thread with its own event loop - def run_in_new_loop(): - new_loop = asyncio.new_event_loop() - asyncio.set_event_loop(new_loop) - try: - return new_loop.run_until_complete(_send()) - finally: - new_loop.close() - - with concurrent.futures.ThreadPoolExecutor(max_workers=1) as executor: - future = executor.submit(run_in_new_loop) - return future.result() + # If we're in an event loop, use send_nowait to avoid blocking + # The anyio MemoryObjectSendStream has a synchronous send_nowait method + stream.send_nowait(message) + return None except RuntimeError: # Not in an event loop, safe to use a new event loop in this thread return syncify(_send)() diff --git a/tests/streaming/test_streaming.py b/tests/streaming/test_streaming.py index 89a84250ff..0760687c71 100644 --- a/tests/streaming/test_streaming.py +++ b/tests/streaming/test_streaming.py @@ -806,6 +806,68 @@ async def aforward(self, question, **kwargs): assert timestamps[1] - timestamps[0] >= 1 +@pytest.mark.anyio +async def test_async_tool_streaming_no_deadlock(): + """Test that async tools with streaming don't cause deadlock. + + This is a regression test for the issue where sync_send_to_stream would block + the event loop when async tool callbacks tried to send status messages, causing + the stream to freeze after "Tool calling finished!" message. + """ + async def async_tool_with_delay(x: str): + """An async tool that simulates MCP or ReAct tool calls.""" + await asyncio.sleep(0.3) + return f"Processed: {x}" + + class ProgramWithAsyncTools(dspy.Module): + def __init__(self): + super().__init__() + self.async_tool = dspy.Tool(async_tool_with_delay, name="async_processor") + self.predict = dspy.Predict("input->output") + + async def aforward(self, input_text: str, **kwargs): + # Call async tool - this should trigger on_tool_start and on_tool_end callbacks + result = await self.async_tool.acall(x=input_text) + # Then call predict + return await self.predict.acall(input=result, **kwargs) + + lm = dspy.utils.DummyLM([{"output": "final result"}]) + + with dspy.context(lm=lm): + program = dspy.streamify( + ProgramWithAsyncTools(), + status_message_provider=StatusMessageProvider(), + is_async_program=True + ) + + output = program(input_text="test") + + status_messages = [] + final_prediction = None + start_time = time.time() + + # This should complete without deadlock + async for value in output: + if isinstance(value, dspy.streaming.StatusMessage): + status_messages.append(value.message) + elif isinstance(value, dspy.Prediction): + final_prediction = value + + duration = time.time() - start_time + + # Verify we got both tool start and end messages + assert len(status_messages) == 2, f"Expected 2 status messages, got {len(status_messages)}: {status_messages}" + assert "async_processor" in status_messages[0], f"First message should mention tool: {status_messages[0]}" + assert "finished" in status_messages[1].lower(), f"Second message should indicate completion: {status_messages[1]}" + + # Verify we got the final prediction (stream didn't freeze) + assert final_prediction is not None, "Should receive final prediction without deadlock" + assert final_prediction.output == "final result" + + # Verify the async delay worked (tool actually executed) + assert duration >= 0.3, f"Duration {duration} suggests tool didn't execute" + + @pytest.mark.anyio async def test_stream_listener_allow_reuse(): class MyProgram(dspy.Module):