From 3215e5e786214fc42068d6d84c18bfdc3bc6d8ea Mon Sep 17 00:00:00 2001
From: Devdatta Talele
Date: Sun, 17 Aug 2025 16:51:23 +0530
Subject: [PATCH 1/2] Fix Ollama client streaming issue with stream=True

Resolves issue #299, where OllamaClient failed with a "'generator' object
has no attribute 'raw_response'" error when using stream=True.

Changes:
- Modified OllamaClient.parse_chat_completion to return raw generators
  directly for streaming
- Updated Generator error handling to prevent generator objects from
  landing in the raw_response field
- Added proper type checking for both sync and async generators
- Updated tests to reflect the correct streaming behavior

The fix ensures that streaming generators are handled by the Generator
component directly rather than being incorrectly wrapped in
GeneratorOutput.raw_response.
---
 .../components/model_client/ollama_client.py | 11 +++++-----
 adalflow/adalflow/core/generator.py          | 12 ++++++++--
 adalflow/tests/test_ollama_client.py         | 22 +++++++++----------
 3 files changed, 26 insertions(+), 19 deletions(-)

diff --git a/adalflow/adalflow/components/model_client/ollama_client.py b/adalflow/adalflow/components/model_client/ollama_client.py
index 48174cd7..106e4400 100644
--- a/adalflow/adalflow/components/model_client/ollama_client.py
+++ b/adalflow/adalflow/components/model_client/ollama_client.py
@@ -345,14 +345,15 @@ def parse_chat_completion(
         # Check for async generator (async streaming)
         if hasattr(completion, '__aiter__'):
             log.debug("Async streaming response detected")
-            # For streaming, return GeneratorOutput with the generator in raw_response
-            # This matches the OpenAI client pattern
-            return GeneratorOutput(data=None, raw_response=completion, api_response=completion)
+            # For streaming, return the raw async generator directly.
+            # The Generator component will iterate over this and wrap each chunk.
+            return completion
         # Check for sync generator (sync streaming)
         elif isinstance(completion, GeneratorType):
             log.debug("Sync streaming response detected")
-            # For streaming, return GeneratorOutput with the generator in raw_response
-            return GeneratorOutput(data=None, raw_response=completion, api_response=completion)
+            # For streaming, return the raw sync generator directly.
+            # The Generator component will iterate over this and wrap each chunk.
+            return completion
         # Non-streaming generate API
         elif self.generate:
             return parse_generate_response(completion)

diff --git a/adalflow/adalflow/core/generator.py b/adalflow/adalflow/core/generator.py
index 0384b3a7..e6ee3a9c 100644
--- a/adalflow/adalflow/core/generator.py
+++ b/adalflow/adalflow/core/generator.py
@@ -1266,8 +1266,12 @@ def call(
             output = self._post_call(completion)
         except Exception as e:
             log.error(f"Error processing the output: {e}")
+            # Check if completion is a generator to avoid placing a generator object in raw_response
+            from typing import Generator as GeneratorType
+            from collections.abc import AsyncGenerator as AsyncGeneratorABC
+            raw_response = None if isinstance(completion, (GeneratorType, AsyncGeneratorABC)) else str(completion)
             output = GeneratorOutput(
-                raw_response=str(completion), error=str(e), id=id, input=prompt_str
+                raw_response=raw_response, error=str(e), id=id, input=prompt_str
             )

         # User only need to use one of them, no need to use them all.
@@ -1356,8 +1360,12 @@ async def acall(
             output = await self._async_post_call(completion)
         except Exception as e:
             log.error(f"Error processing the output: {e}")
+            # Check if completion is a generator to avoid placing a generator object in raw_response
+            from typing import Generator as GeneratorType
+            from collections.abc import AsyncGenerator as AsyncGeneratorABC
+            raw_response = None if isinstance(completion, (GeneratorType, AsyncGeneratorABC)) else str(completion)
             output = GeneratorOutput(
-                raw_response=str(completion), error=str(e), id=id, input=prompt_str
+                raw_response=raw_response, error=str(e), id=id, input=prompt_str
             )

         # User only need to use one of them, no need to use them all.

diff --git a/adalflow/tests/test_ollama_client.py b/adalflow/tests/test_ollama_client.py
index a4d6f0e1..322e802f 100644
--- a/adalflow/tests/test_ollama_client.py
+++ b/adalflow/tests/test_ollama_client.py
@@ -125,14 +125,13 @@ def mock_stream_generator():
         # Parse the result
         parsed = ollama_client.parse_chat_completion(result)

-        # For streaming, the parsed result should be a GeneratorOutput with raw_response containing the generator
-        self.assertIsInstance(parsed, GeneratorOutput)
-        self.assertIsNotNone(parsed.raw_response)
-        self.assertEqual(parsed.api_response, result)
+        # For streaming, the parsed result should be the raw generator directly
+        self.assertTrue(hasattr(parsed, '__iter__'))
+        self.assertNotIsInstance(parsed, GeneratorOutput)

-        # Verify we can iterate through the raw_response
+        # Verify we can iterate through the raw generator
         content_parts = []
-        for chunk in parsed.raw_response:
+        for chunk in parsed:
             if "message" in chunk:
                 content_parts.append(chunk["message"]["content"])

@@ -173,14 +172,13 @@ async def mock_async_stream_generator():
         # Parse the result
         parsed = ollama_client.parse_chat_completion(result)

-        # For streaming, the parsed result should be a GeneratorOutput with raw_response containing the async generator
-        self.assertIsInstance(parsed, GeneratorOutput)
-        self.assertIsNotNone(parsed.raw_response)
-        self.assertEqual(parsed.api_response, result)
+        # For streaming, the parsed result should be the raw async generator directly
+        self.assertTrue(hasattr(parsed, '__aiter__'))
+        self.assertNotIsInstance(parsed, GeneratorOutput)

-        # Verify we can iterate through the raw_response asynchronously
+        # Verify we can iterate through the raw async generator
         content_parts = []
-        async for chunk in parsed.raw_response:
+        async for chunk in parsed:
             if "message" in chunk:
                 content_parts.append(chunk["message"]["content"])
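
To see the contract this first patch establishes in isolation, here is a
minimal sketch of the parse path, mirroring the mocked sync test above
(the chunk shape follows the mocked Ollama chat responses in the tests;
default OllamaClient construction is assumed, and the helper name is
hypothetical):

    from adalflow.components.model_client.ollama_client import OllamaClient

    # Hypothetical chunk stream shaped like the mocked Ollama chat responses.
    def mock_chunks():
        for part in ["Hello", ", ", "world"]:
            yield {"message": {"role": "assistant", "content": part}}

    client = OllamaClient()  # default construction assumed
    parsed = client.parse_chat_completion(mock_chunks())

    # Under this patch, parsed is the raw generator itself, not a
    # GeneratorOutput, so the caller iterates over it directly.
    text = "".join(c["message"]["content"] for c in parsed if "message" in c)
    assert text == "Hello, world"
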
From 4837c24c43e819a140e1b963a585ea5dd7863187 Mon Sep 17 00:00:00 2001
From: Devdatta Talele
Date: Sun, 17 Aug 2025 20:48:07 +0530
Subject: [PATCH 2/2] Fix architectural issues in Ollama streaming implementation

The previous implementation broke interface consistency and created
architectural problems.

Corrected approach:
- OllamaClient consistently returns GeneratorOutput for all cases
- raw_response contains the streaming generator (following the Anthropic
  client pattern)
- data remains None until the final output is processed
- Removed the incorrect type checking from the Generator core component
- Maintains polymorphism across all model clients

This follows the established contract:
- raw_response = streaming chunks/iterator
- data = finalized complete output (processed later)

The fix maintains full compatibility with the Generator component and
preserves all existing functionality (processors, tracking, caching).
All tests pass, and integration with the Generator component has been
verified.
---
 .../components/model_client/ollama_client.py | 10 ++---
 adalflow/adalflow/core/generator.py          | 44 ++++++++++---------
 adalflow/tests/test_ollama_client.py         | 24 +++++-----
 3 files changed, 42 insertions(+), 36 deletions(-)

diff --git a/adalflow/adalflow/components/model_client/ollama_client.py b/adalflow/adalflow/components/model_client/ollama_client.py
index 106e4400..5cc832f2 100644
--- a/adalflow/adalflow/components/model_client/ollama_client.py
+++ b/adalflow/adalflow/components/model_client/ollama_client.py
@@ -345,15 +345,13 @@ def parse_chat_completion(
         # Check for async generator (async streaming)
         if hasattr(completion, '__aiter__'):
             log.debug("Async streaming response detected")
-            # For streaming, return the raw async generator directly.
-            # The Generator component will iterate over this and wrap each chunk.
-            return completion
+            # For streaming, wrap the generator in GeneratorOutput with proper streaming support
+            return GeneratorOutput(data=None, raw_response=completion, api_response=completion)
         # Check for sync generator (sync streaming)
         elif isinstance(completion, GeneratorType):
             log.debug("Sync streaming response detected")
-            # For streaming, return the raw sync generator directly.
-            # The Generator component will iterate over this and wrap each chunk.
-            return completion
+            # For streaming, wrap the generator in GeneratorOutput with proper streaming support
+            return GeneratorOutput(data=None, raw_response=completion, api_response=completion)
         # Non-streaming generate API
         elif self.generate:
             return parse_generate_response(completion)

diff --git a/adalflow/adalflow/core/generator.py b/adalflow/adalflow/core/generator.py
index e6ee3a9c..1b12a267 100644
--- a/adalflow/adalflow/core/generator.py
+++ b/adalflow/adalflow/core/generator.py
@@ -364,17 +364,29 @@ def _post_call(self, completion: Any) -> GeneratorOutput:
         # Now adding the data field to the output
         data = output.raw_response

-        # TODO implement support for synchronous iterator in the future
-        if self.output_processors:
-            if data:
-                try:
-                    data = self.output_processors(data)
-                    output.data = data
-                except Exception as e:
-                    log.error(f"Error processing the output processors: {e}")
-                    output.error = str(e)
+        # Check if this is a streaming response (generator/iterator)
+        from typing import Generator as GeneratorType
+        from collections.abc import AsyncGenerator as AsyncGeneratorABC
+
+        is_streaming = isinstance(data, (GeneratorType, AsyncGeneratorABC)) or (hasattr(data, '__iter__') and not isinstance(data, str))
+
+        if is_streaming:
+            # For streaming responses, don't process with output_processors immediately;
+            # the streaming data should be consumed by the caller
+            log.debug("Streaming response detected, skipping output processors")
+            output.data = None  # Will be populated when the stream is consumed
         else:
-            output.data = data
+            # Non-streaming response processing
+            if self.output_processors:
+                if data:
+                    try:
+                        data = self.output_processors(data)
+                        output.data = data
+                    except Exception as e:
+                        log.error(f"Error processing the output processors: {e}")
+                        output.error = str(e)
+            else:
+                output.data = data

         return output

@@ -1266,12 +1278,8 @@ def call(
             output = self._post_call(completion)
         except Exception as e:
             log.error(f"Error processing the output: {e}")
-            # Check if completion is a generator to avoid placing a generator object in raw_response
-            from typing import Generator as GeneratorType
-            from collections.abc import AsyncGenerator as AsyncGeneratorABC
-            raw_response = None if isinstance(completion, (GeneratorType, AsyncGeneratorABC)) else str(completion)
             output = GeneratorOutput(
-                raw_response=raw_response, error=str(e), id=id, input=prompt_str
+                raw_response=str(completion), error=str(e), id=id, input=prompt_str
             )

         # User only need to use one of them, no need to use them all.
@@ -1360,12 +1368,8 @@ async def acall(
             output = await self._async_post_call(completion)
         except Exception as e:
             log.error(f"Error processing the output: {e}")
-            # Check if completion is a generator to avoid placing a generator object in raw_response
-            from typing import Generator as GeneratorType
-            from collections.abc import AsyncGenerator as AsyncGeneratorABC
-            raw_response = None if isinstance(completion, (GeneratorType, AsyncGeneratorABC)) else str(completion)
             output = GeneratorOutput(
-                raw_response=raw_response, error=str(e), id=id, input=prompt_str
+                raw_response=str(completion), error=str(e), id=id, input=prompt_str
             )

         # User only need to use one of them, no need to use them all.

diff --git a/adalflow/tests/test_ollama_client.py b/adalflow/tests/test_ollama_client.py
index 322e802f..48aa9f59 100644
--- a/adalflow/tests/test_ollama_client.py
+++ b/adalflow/tests/test_ollama_client.py
@@ -125,13 +125,15 @@ def mock_stream_generator():
         # Parse the result
         parsed = ollama_client.parse_chat_completion(result)

-        # For streaming, the parsed result should be the raw generator directly
-        self.assertTrue(hasattr(parsed, '__iter__'))
-        self.assertNotIsInstance(parsed, GeneratorOutput)
+        # For streaming, the parsed result should be a GeneratorOutput with raw_response containing the generator
+        self.assertIsInstance(parsed, GeneratorOutput)
+        self.assertIsNotNone(parsed.raw_response)
+        self.assertIsNone(parsed.data)  # data should be None for streaming until consumed
+        self.assertEqual(parsed.api_response, result)

-        # Verify we can iterate through the raw generator
+        # Verify we can iterate through the raw_response
         content_parts = []
-        for chunk in parsed:
+        for chunk in parsed.raw_response:
             if "message" in chunk:
                 content_parts.append(chunk["message"]["content"])

@@ -172,13 +174,15 @@ async def mock_async_stream_generator():
         # Parse the result
         parsed = ollama_client.parse_chat_completion(result)

-        # For streaming, the parsed result should be the raw async generator directly
-        self.assertTrue(hasattr(parsed, '__aiter__'))
-        self.assertNotIsInstance(parsed, GeneratorOutput)
+        # For streaming, the parsed result should be a GeneratorOutput with raw_response containing the async generator
+        self.assertIsInstance(parsed, GeneratorOutput)
+        self.assertIsNotNone(parsed.raw_response)
+        self.assertIsNone(parsed.data)  # data should be None for streaming until consumed
+        self.assertEqual(parsed.api_response, result)

-        # Verify we can iterate through the raw async generator
+        # Verify we can iterate through the raw_response asynchronously
         content_parts = []
-        async for chunk in parsed:
+        async for chunk in parsed.raw_response:
             if "message" in chunk:
                 content_parts.append(chunk["message"]["content"])
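
With both patches applied, streaming and non-streaming calls share a single
shape: a GeneratorOutput whose raw_response carries the chunk iterator while
data stays None until the stream is consumed. A minimal end-to-end sketch of
that contract (the host, model name, and input_str prompt variable are
illustrative assumptions, not taken from the patches):

    from adalflow.core.generator import Generator
    from adalflow.components.model_client.ollama_client import OllamaClient

    # Illustrative setup; host and model name are assumptions, not from the patches.
    gen = Generator(
        model_client=OllamaClient(host="http://localhost:11434"),
        model_kwargs={"model": "llama3", "stream": True},
    )

    output = gen.call(prompt_kwargs={"input_str": "Say hello."})

    # Per the patch-2 contract: raw_response holds the chunk iterator,
    # and data remains None until the stream has been consumed downstream.
    assert output.data is None
    for chunk in output.raw_response:
        if "message" in chunk:
            print(chunk["message"]["content"], end="", flush=True)
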