diff --git a/adalflow/adalflow/components/model_client/ollama_client.py b/adalflow/adalflow/components/model_client/ollama_client.py
index 48174cd7..5cc832f2 100644
--- a/adalflow/adalflow/components/model_client/ollama_client.py
+++ b/adalflow/adalflow/components/model_client/ollama_client.py
@@ -345,13 +345,12 @@ def parse_chat_completion(
         # Check for async generator (async streaming)
         if hasattr(completion, '__aiter__'):
             log.debug("Async streaming response detected")
-            # For streaming, return GeneratorOutput with the generator in raw_response
-            # This matches the OpenAI client pattern
+            # For streaming, wrap the generator in GeneratorOutput with proper streaming support
             return GeneratorOutput(data=None, raw_response=completion, api_response=completion)
         # Check for sync generator (sync streaming)
         elif isinstance(completion, GeneratorType):
             log.debug("Sync streaming response detected")
-            # For streaming, return GeneratorOutput with the generator in raw_response
+            # For streaming, wrap the generator in GeneratorOutput with proper streaming support
             return GeneratorOutput(data=None, raw_response=completion, api_response=completion)
         # Non-streaming generate API
         elif self.generate:
diff --git a/adalflow/adalflow/core/generator.py b/adalflow/adalflow/core/generator.py
index 0384b3a7..1b12a267 100644
--- a/adalflow/adalflow/core/generator.py
+++ b/adalflow/adalflow/core/generator.py
@@ -364,17 +364,29 @@ def _post_call(self, completion: Any) -> GeneratorOutput:
 
         # Now adding the data field to the output
         data = output.raw_response
 
-        # TODO implement support for synchronous iterator in the future
-        if self.output_processors:
-            if data:
-                try:
-                    data = self.output_processors(data)
-                    output.data = data
-                except Exception as e:
-                    log.error(f"Error processing the output processors: {e}")
-                    output.error = str(e)
+        # Check if this is a streaming response (generator/iterator)
+        from typing import Generator as GeneratorType
+        from collections.abc import AsyncGenerator as AsyncGeneratorABC
+
+        is_streaming = isinstance(data, (GeneratorType, AsyncGeneratorABC)) or (hasattr(data, '__iter__') and not isinstance(data, str))
+
+        if is_streaming:
+            # For streaming responses, don't run output_processors immediately;
+            # the stream is meant to be consumed by the caller
+            log.debug("Streaming response detected, skipping output processors")
+            output.data = None  # Will be populated once the stream is consumed
         else:
-            output.data = data
+            # Non-streaming response processing
+            if self.output_processors:
+                if data:
+                    try:
+                        data = self.output_processors(data)
+                        output.data = data
+                    except Exception as e:
+                        log.error(f"Error processing the output processors: {e}")
+                        output.error = str(e)
+            else:
+                output.data = data
 
         return output
diff --git a/adalflow/tests/test_ollama_client.py b/adalflow/tests/test_ollama_client.py
index a4d6f0e1..48aa9f59 100644
--- a/adalflow/tests/test_ollama_client.py
+++ b/adalflow/tests/test_ollama_client.py
@@ -128,6 +128,7 @@ def mock_stream_generator():
         # For streaming, the parsed result should be a GeneratorOutput with raw_response containing the generator
         self.assertIsInstance(parsed, GeneratorOutput)
         self.assertIsNotNone(parsed.raw_response)
+        self.assertIsNone(parsed.data)  # data should be None for streaming until consumed
         self.assertEqual(parsed.api_response, result)
 
         # Verify we can iterate through the raw_response
@@ -176,6 +177,7 @@ async def mock_async_stream_generator():
         # For streaming, the parsed result should be a GeneratorOutput with raw_response containing the async generator
         self.assertIsInstance(parsed, GeneratorOutput)
         self.assertIsNotNone(parsed.raw_response)
+        self.assertIsNone(parsed.data)  # data should be None for streaming until consumed
         self.assertEqual(parsed.api_response, result)
 
         # Verify we can iterate through the raw_response asynchronously
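
With this change, a streaming call returns a `GeneratorOutput` whose `raw_response` holds the still-unconsumed generator and whose `data` stays `None` until the caller drains the stream. The sketch below shows what consuming the sync stream could look like. It is a minimal example, not pinned down by this diff: the host, model name, and chunk shape (`chunk["message"]["content"]`, per Ollama's chat API) are all assumptions.

```python
from adalflow.core import Generator
from adalflow.components.model_client import OllamaClient

# Assumed host/model; point these at whatever your Ollama instance serves.
generator = Generator(
    model_client=OllamaClient(host="http://localhost:11434"),
    model_kwargs={"model": "llama3", "stream": True},
)

output = generator(prompt_kwargs={"input_str": "Why is the sky blue?"})

# Per this diff: data is None for streaming responses, and raw_response
# carries the raw chunk generator for the caller to drain.
assert output.data is None

parts = []
for chunk in output.raw_response:
    # Chunk shape depends on the endpoint: the chat API nests text under
    # message.content, while the generate API uses a top-level "response".
    parts.append(chunk["message"]["content"])

print("".join(parts))
```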
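The async path is symmetrical: when `raw_response` is an async generator (the case the `__aiter__` branch and the second test exercise), the caller drains it with `async for`. Again a sketch under the same assumptions, plus the assumption that `Generator.acall` routes through the async client path:

```python
import asyncio

from adalflow.core import Generator
from adalflow.components.model_client import OllamaClient


async def main() -> None:
    generator = Generator(
        model_client=OllamaClient(host="http://localhost:11434"),
        model_kwargs={"model": "llama3", "stream": True},
    )
    output = await generator.acall(prompt_kwargs={"input_str": "Why is the sky blue?"})

    # raw_response is an async generator here; data is again None until
    # the stream has been consumed.
    parts = []
    async for chunk in output.raw_response:
        parts.append(chunk["message"]["content"])
    print("".join(parts))


asyncio.run(main())
```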