Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
39 changes: 29 additions & 10 deletions adalflow/adalflow/components/model_client/bedrock_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,10 +26,29 @@

log = logging.getLogger(__name__)

bedrock_runtime_exceptions = boto3.client(
service_name="bedrock-runtime",
region_name=os.getenv("AWS_REGION_NAME", "us-east-1"),
).exceptions
# Lazy initialization of bedrock exceptions to avoid AWS credential issues during import
_bedrock_runtime_exceptions = None


def get_bedrock_runtime_exceptions():
"""Get bedrock runtime exceptions, creating the client lazily if needed."""
global _bedrock_runtime_exceptions
if _bedrock_runtime_exceptions is None:
try:
_bedrock_runtime_exceptions = boto3.client(
service_name="bedrock-runtime",
region_name=os.getenv("AWS_REGION_NAME", "us-east-1"),
).exceptions
except Exception as e:
log.warning(f"Could not initialize bedrock client: {e}")

# Create a mock exceptions object to prevent import failures
class MockExceptions:
def __getattr__(self, name):
return Exception

_bedrock_runtime_exceptions = MockExceptions()
return _bedrock_runtime_exceptions


def get_first_message_content(completion: Dict) -> str:
Expand All @@ -41,7 +60,7 @@ def get_first_message_content(completion: Dict) -> str:
__all__ = [
"BedrockAPIClient",
"get_first_message_content",
"bedrock_runtime_exceptions",
"get_bedrock_runtime_exceptions",
]


Expand Down Expand Up @@ -262,11 +281,11 @@ def convert_inputs_to_api_kwargs(
@backoff.on_exception(
backoff.expo,
(
bedrock_runtime_exceptions.ThrottlingException,
bedrock_runtime_exceptions.ModelTimeoutException,
bedrock_runtime_exceptions.InternalServerException,
bedrock_runtime_exceptions.ModelErrorException,
bedrock_runtime_exceptions.ValidationException,
get_bedrock_runtime_exceptions().ThrottlingException,
get_bedrock_runtime_exceptions().ModelTimeoutException,
get_bedrock_runtime_exceptions().InternalServerException,
get_bedrock_runtime_exceptions().ModelErrorException,
get_bedrock_runtime_exceptions().ValidationException,
),
max_time=2,
)
Expand Down
122 changes: 63 additions & 59 deletions adalflow/adalflow/components/model_client/openai_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,7 @@ class ParsedResponseContent:
code_outputs: Outputs from code interpreter
raw_output: The original output array for advanced processing
"""

text: Optional[str] = None
images: Optional[Union[str, List[str]]] = None
tool_calls: Optional[List[Dict[str, Any]]] = None
Expand All @@ -92,13 +93,9 @@ class ParsedResponseContent:

def __bool__(self) -> bool:
"""Check if there's any content."""
return any([
self.text,
self.images,
self.tool_calls,
self.reasoning,
self.code_outputs
])
return any(
[self.text, self.images, self.tool_calls, self.reasoning, self.code_outputs]
)


# OLD CHAT COMPLETION PARSING FUNCTIONS (COMMENTED OUT)
Expand Down Expand Up @@ -135,14 +132,14 @@ def parse_response_output(response: Response) -> ParsedResponseContent:
content = ParsedResponseContent()

# Store raw output for advanced users
if hasattr(response, 'output'):
if hasattr(response, "output"):
content.raw_output = response.output

# First try to use output_text if available (SDK convenience property)
if hasattr(response, 'output_text') and response.output_text:
if hasattr(response, "output_text") and response.output_text:
content.text = response.output_text
# Parse the output array manually if no output_text
if hasattr(response, 'output') and response.output:
if hasattr(response, "output") and response.output:
parsed = _parse_output_array(response.output)
content.text = content.text or parsed.get("text")
content.images = parsed.get("images", [])
Expand All @@ -153,7 +150,6 @@ def parse_response_output(response: Response) -> ParsedResponseContent:
return content



def _parse_message(item) -> Dict[str, Any]:
"""Parse a message item from the output array.

Expand All @@ -165,19 +161,21 @@ def _parse_message(item) -> Dict[str, Any]:
"""
result = {"text": None}

if hasattr(item, 'content') and isinstance(item.content, list):
# now pick the longer response
if hasattr(item, "content") and isinstance(item.content, list):
# now pick the longer response
text_parts = []

for content_item in item.content:
content_type = getattr(content_item, 'type', None)
content_type = getattr(content_item, "type", None)

if content_type == "output_text":
if hasattr(content_item, 'text'):
if hasattr(content_item, "text"):
text_parts.append(content_item.text)

if text_parts:
result["text"] = max(text_parts, key=len) if len(text_parts) > 1 else text_parts[0]
result["text"] = (
max(text_parts, key=len) if len(text_parts) > 1 else text_parts[0]
)

return result

Expand All @@ -194,11 +192,11 @@ def _parse_reasoning(item) -> Dict[str, Any]:
result = {"reasoning": None}

# Extract text from reasoning summary if available
if hasattr(item, 'summary') and isinstance(item.summary, list):
if hasattr(item, "summary") and isinstance(item.summary, list):
summary_texts = []
for summary_item in item.summary:
if hasattr(summary_item, 'type') and summary_item.type == "summary_text":
if hasattr(summary_item, 'text'):
if hasattr(summary_item, "type") and summary_item.type == "summary_text":
if hasattr(summary_item, "text"):
summary_texts.append(summary_item.text)

if summary_texts:
Expand All @@ -219,7 +217,7 @@ def _parse_image(item) -> Dict[str, Any]:
"""
result = {"images": None}

if hasattr(item, 'result'):
if hasattr(item, "result"):
# The result contains the base64 image data or URL
result["images"] = item.result

Expand All @@ -235,23 +233,18 @@ def _parse_tool_call(item) -> Dict[str, Any]:
Returns:
Dict with tool call information
"""
item_type = getattr(item, 'type', None)
item_type = getattr(item, "type", None)

if item_type == "image_generation_call":
# Handle image generation - extract the result which contains the image data
if hasattr(item, 'result'):
if hasattr(item, "result"):
# The result contains the base64 image data or URL
return {"images": item.result}
elif item_type == "code_interpreter_tool_call":
return {"code_outputs": [_serialize_item(item)]}
else:
# Generic tool call
return {
"tool_calls": [{
"type": item_type,
"content": _serialize_item(item)
}]
}
return {"tool_calls": [{"type": item_type, "content": _serialize_item(item)}]}

return {}

Expand All @@ -272,7 +265,7 @@ def _parse_output_array(output_array) -> Dict[str, Any]:
"images": None,
"tool_calls": None,
"reasoning": None,
"code_outputs": None
"code_outputs": None,
}

if not output_array:
Expand All @@ -286,7 +279,7 @@ def _parse_output_array(output_array) -> Dict[str, Any]:
text = None

for item in output_array:
item_type = getattr(item, 'type', None)
item_type = getattr(item, "type", None)

if item_type == "reasoning":
# Parse reasoning item
Expand All @@ -306,16 +299,17 @@ def _parse_output_array(output_array) -> Dict[str, Any]:
if parsed.get("images"):
all_images.append(parsed["images"])

elif item_type and ('call' in item_type or 'tool' in item_type):
elif item_type and ("call" in item_type or "tool" in item_type):
# Parse other tool calls
parsed = _parse_tool_call(item)
if parsed.get("tool_calls"):
all_tool_calls.extend(parsed["tool_calls"])
if parsed.get("code_outputs"):
all_code_outputs.extend(parsed["code_outputs"])


result["text"] = text if text else None # TODO: they can potentially send multiple complete text messages, we might need to save all of them and only return the first that can convert to outpu parser
result["text"] = (
text if text else None
) # TODO: they can potentially send multiple complete text messages, we might need to save all of them and only return the first that can convert to outpu parser

# Set other fields if they have content
result["images"] = all_images
Expand All @@ -333,7 +327,7 @@ def _serialize_item(item) -> Dict[str, Any]:
"""Convert an output item to a serializable dict."""
result = {}
for attr in dir(item):
if not attr.startswith('_'):
if not attr.startswith("_"):
value = getattr(item, attr, None)
if value is not None and not callable(value):
result[attr] = value
Expand Down Expand Up @@ -406,8 +400,6 @@ def handle_streaming_response_sync(stream: Iterable) -> GeneratorType:
yield event




class OpenAIClient(ModelClient):
__doc__ = r"""A component wrapper for the OpenAI API client.

Expand Down Expand Up @@ -783,11 +775,15 @@ def parse_chat_completion(
"""Parse the Response API completion and put it into the raw_response.
Fully migrated to Response API only."""

parser = self.response_parser
log.info(f"completion/response: {completion}, parser: {parser}")

# Check if this is a Response with complex output (tools, images, etc.)
# Determine parser dynamically based on completion type instead of relying on instance variable
# This fixes the issue where streaming/non-streaming modes get mixed up
if isinstance(completion, Response):
# Non-streaming Response object
parser = self.non_streaming_response_parser
log.info(
f"completion/response: {completion}, parser: {parser} (non-streaming)"
)

parsed_content = parse_response_output(completion)
usage = self.track_completion_usage(completion)

Expand All @@ -797,22 +793,41 @@ def parse_chat_completion(
if parsed_content.reasoning:
thinking = str(parsed_content.reasoning)


return GeneratorOutput(
data=data, # only text
thinking=thinking,
images=parsed_content.images, # List of image data (base64 or URLs)
tool_use=None, # Will be populated when we handle function tool calls
error=None,
raw_response=data,
usage=usage
usage=usage,
)
elif hasattr(completion, "__aiter__"):
# Async streaming (AsyncIterable)
parser = self.streaming_response_parser_async
log.info(
f"completion/response: {completion}, parser: {parser} (async streaming)"
)
elif hasattr(completion, "__iter__") and not isinstance(
completion, (str, bytes, dict)
):
# Sync streaming (Iterable) - exclude basic types that have __iter__ but aren't streams
parser = self.streaming_response_parser_sync
log.info(
f"completion/response: {completion}, parser: {parser} (sync streaming)"
)
else:
# Fallback to non-streaming parser (includes strings, dicts, etc.)
parser = self.non_streaming_response_parser
log.info(
f"completion/response: {completion}, parser: {parser} (fallback non-streaming)"
)

# Regular response handling (streaming or other)
data = parser(completion)
usage = self.track_completion_usage(completion)
return GeneratorOutput(data=None, error=None, raw_response=data, usage=usage)


# NEW RESPONSE API ONLY FUNCTION
def track_completion_usage(
self,
Expand Down Expand Up @@ -965,12 +980,7 @@ def convert_inputs_to_api_kwargs(
content = format_content_for_response_api(input, images)

# For responses.create API, wrap in user message format
final_model_kwargs["input"] = [
{
"role": "user",
"content": content
}
]
final_model_kwargs["input"] = [{"role": "user", "content": content}]
else:
# Text-only input
final_model_kwargs["input"] = input
Expand Down Expand Up @@ -1034,13 +1044,11 @@ def call(self, api_kwargs: Dict = {}, model_type: ModelType = ModelType.UNDEFINE
elif model_type == ModelType.LLM_REASONING or model_type == ModelType.LLM:
if "stream" in api_kwargs and api_kwargs.get("stream", False):
log.debug("streaming call")
self.response_parser = (
self.streaming_response_parser_sync
) # Use sync streaming parser
# No longer setting self.response_parser - parser will be determined dynamically
return self.sync_client.responses.create(**api_kwargs)
else:
log.debug("non-streaming call")
self.response_parser = self.non_streaming_response_parser
# No longer setting self.response_parser - parser will be determined dynamically
return self.sync_client.responses.create(**api_kwargs)

else:
Expand Down Expand Up @@ -1089,15 +1097,11 @@ async def acall(
elif model_type == ModelType.LLM or model_type == ModelType.LLM_REASONING:
if "stream" in api_kwargs and api_kwargs.get("stream", False):
log.debug("async streaming call")
self.response_parser = (
self.streaming_response_parser_async
) # Use async streaming parser
# setting response parser as async streaming parser for Response API
# No longer setting self.response_parser - parser will be determined dynamically
return await self.async_client.responses.create(**api_kwargs)
else:
log.debug("async non-streaming call")
self.response_parser = self.non_streaming_response_parser
# setting response parser as async non-streaming parser for Response API
# No longer setting self.response_parser - parser will be determined dynamically
return await self.async_client.responses.create(**api_kwargs)
elif model_type == ModelType.IMAGE_GENERATION:
# Determine which image API to call based on the presence of image/mask
Expand Down
5 changes: 2 additions & 3 deletions adalflow/adalflow/optim/gradient.py
Original file line number Diff line number Diff line change
Expand Up @@ -91,9 +91,8 @@ def __init__(
self.from_response_id = from_response.id
self.to_pred_id = to_pred.id
self.score = score
self.data_id = data_id
if self.data_id is None:
raise ValueError("The data_id should not be None.")
# Use provided data_id or generate a default one to prevent tutorial errors
self.data_id = data_id or f"gradient_{self.id}"
self.data = data
self.order = None

Expand Down
Loading
Loading