16 changes: 13 additions & 3 deletions adalflow/adalflow/components/agent/agent.py
@@ -18,6 +18,8 @@
from adalflow.core.tool_manager import ToolManager
from adalflow.core.prompt_builder import Prompt
from adalflow.core.types import GeneratorOutput, ModelType, Function
from adalflow.core.base_data_class import DataClass, DataClassFormatType

from adalflow.optim.parameter import Parameter, ParameterType
from adalflow.components.output_parsers import JsonOutputParser
from adalflow.utils import printc
@@ -54,7 +56,7 @@

# '


# TODO: replace the agent with a pydantic Function model, but it can't control the fields
# the context will wrap the whole component
def create_default_tool_manager(
# Tool manager parameters
@@ -154,9 +156,17 @@ def create_default_planner(
include_fields = ["name", "kwargs", "_is_answer_final", "_answer"]
else:
include_fields = ["thought", "name", "kwargs", "_is_answer_final", "_answer"]

examples = [
Function(
name="example_function",
kwargs={"param1": "value1", "param2": "value2"},
_is_answer_final=False,
_answer=None,
)
]
output_parser = JsonOutputParser(
data_class=ouput_data_class,
examples=None,
examples=examples,
# examples=self._examples, # TODO: add examples
return_data_class=True,
include_fields=include_fields,
@@ -169,7 +179,7 @@

prompt_kwargs = {
"tools": tool_manager.yaml_definitions,
"output_format_str": output_parser.format_instructions(),
"output_format_str": output_parser.format_instructions(format_type=DataClassFormatType.SIGNATURE_JSON),
"task_desc": Parameter(
name="react_agent_task_desc",
data=task_desc,
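Note on the planner changes above: the parser is now seeded with a concrete `Function` example instead of `examples=None`, and the prompt embeds `SIGNATURE_JSON`-style format instructions. A minimal sketch of how those pieces compose, assuming only the adalflow API surface visible in this diff (using `Function` directly as a stand-in for `ouput_data_class`, with illustrative parameter values):

```python
from adalflow.components.output_parsers import JsonOutputParser
from adalflow.core.types import Function
from adalflow.core.base_data_class import DataClassFormatType

# Illustrative example mirroring the one added above: a tool call whose
# answer is not yet final, so the agent keeps planning.
examples = [
    Function(
        name="example_function",
        kwargs={"param1": "value1", "param2": "value2"},
        _is_answer_final=False,
        _answer=None,
    )
]

parser = JsonOutputParser(
    data_class=Function,  # stand-in for the planner's ouput_data_class
    examples=examples,
    return_data_class=True,
    include_fields=["thought", "name", "kwargs", "_is_answer_final", "_answer"],
)

# SIGNATURE_JSON renders the schema as a compact JSON signature, which the
# planner prompt now embeds as {{output_format_str}}.
print(parser.format_instructions(format_type=DataClassFormatType.SIGNATURE_JSON))
```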
3 changes: 2 additions & 1 deletion adalflow/adalflow/components/agent/prompts.py
@@ -28,7 +28,8 @@
- If the last observation starts with "Run into error", you should try to fix the error in the next step.
<END_OF_TASK_SPEC>
"""
# TODO: decide whether to expose the max steps in the agent prompt
# Chat history should be user: message, assistant: message + metadata {step_history}
# step_history holds the observations.
DEFAULT_ADALFLOW_AGENT_SYSTEM_PROMPT = r"""<START_OF_SYSTEM_PROMPT>
{{task_desc}}
- You can't use more than {{max_steps}} steps. At the {{max_steps}}th step, you must set `_is_answer_final` to True and provide the answer.
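The system prompt above is a Jinja2-style template ({{task_desc}}, {{max_steps}}). A small sketch of how it can be rendered with the `Prompt` class imported in `agent.py`, assuming `Prompt` accepts a `template` and renders on call; the filled-in values are illustrative:

```python
from adalflow.core.prompt_builder import Prompt

template = r"""<START_OF_SYSTEM_PROMPT>
{{task_desc}}
- You can't use more than {{max_steps}} steps. At the {{max_steps}}th step, you must set `_is_answer_final` to True and provide the answer.
<END_OF_SYSTEM_PROMPT>"""

prompt = Prompt(template=template)
# The runner supplies these values at call time; here they are placeholders.
print(prompt(task_desc="Answer the user's question using the tools.", max_steps=10))
```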
127 changes: 87 additions & 40 deletions adalflow/adalflow/components/agent/runner.py
@@ -20,7 +20,7 @@
AsyncIterable,
)
from typing_extensions import TypeAlias
import sys
import uuid


from adalflow.optim.parameter import Parameter
@@ -323,7 +323,7 @@ def _check_last_step(self, step: Function) -> bool:

def _get_final_answer(self, function: Function) -> Any:
"""Get and process the final answer from the function."""
if hasattr(function, "_answer"):
if hasattr(function, "_answer") or (hasattr(function, "_is_answer_final") and function._is_answer_final):
return self._process_data(function._answer)
return None
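The widened guard above also returns an answer when only the `_is_answer_final` flag is set. A hedged, self-contained sketch of the behavior (the classes below are hypothetical stand-ins, and `getattr` with a default replaces the runner's `_process_data` so the sketch stays safe when `_answer` is absent):

```python
class FakeFunction:
    """Hypothetical stand-in for adalflow's Function output."""
    def __init__(self, answer=None, is_final=False):
        self._answer = answer
        self._is_answer_final = is_final

def get_final_answer(function):
    # Mirrors the widened condition: either an _answer attribute exists,
    # or the planner explicitly flagged the answer as final.
    if hasattr(function, "_answer") or (
        hasattr(function, "_is_answer_final") and function._is_answer_final
    ):
        return getattr(function, "_answer", None)
    return None

assert get_final_answer(FakeFunction(answer="42", is_final=True)) == "42"
assert get_final_answer(FakeFunction()) is None
```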

@@ -338,30 +338,44 @@ def _create_runner_result(self, answer: Any, step_history, error: Optional[str]
)
def _create_execution_complete_stream_event(self, streaming_result: RunnerStreamingResult, final_output_item: FinalOutputItem):
"""Complete the streaming execution by adding a sentinel."""
final_output_event = RunItemStreamEvent(
name="agent.execution_complete",
item=final_output_item,
)
streaming_result.put_nowait(final_output_event)
try:
final_output_event = RunItemStreamEvent(
name="agent.execution_complete",
item=final_output_item,
)
streaming_result.put_nowait(final_output_event)

runner_result: RunnerResult = final_output_item.data
runner_result: RunnerResult = final_output_item.data

# set up the final answer
streaming_result.answer = runner_result.answer if runner_result else None
streaming_result.step_history = self.step_history.copy()
streaming_result._is_complete = True
# set up the final answer
streaming_result.answer = runner_result.answer if runner_result else None
streaming_result.step_history = self.step_history.copy()
streaming_result._is_complete = True

except Exception as e:
log.error(f"Failed to create execution complete stream event: {e}")
raise e

def _add_assistant_response_to_memory(self, final_output_item: FinalOutputItem):
def _add_assistant_response_to_memory(self, final_output_item: FinalOutputItem, turn_id: Any = None):
# add the assistant response to the conversation memory
if self.use_conversation_memory and self.conversation_memory._pending_user_query is not None:
self.conversation_memory.add_assistant_response(
AssistantResponse(
response_str=final_output_item.data.answer,
metadata={
"step_history": final_output_item.data.step_history.copy()
},
try:
if self.use_conversation_memory and turn_id is not None:
# Only add if we have a valid turn_id
self.conversation_memory.add_assistant_response(
AssistantResponse(
response_str=final_output_item.data.answer,
metadata={
"step_history": final_output_item.data.step_history.copy()
},
),
turn_id=turn_id
)
)
elif self.use_conversation_memory:
log.warning("Skipping add_assistant_response - no turn_id available")
except Exception as e:
log.error(f"Failed to add assistant response to memory: {e}")
# Don't re-raise, just log the error
return
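The `turn_id` threading above (and in `call`, `acall`, and `impl_astream` below) ties each assistant response back to the user query that opened the turn: `add_user_query` now returns a turn id that is passed into `add_assistant_response`. A minimal sketch of the handshake, with a hypothetical in-memory stand-in for the real conversation memory:

```python
import uuid

class TurnMemory:
    """Hypothetical stand-in showing only the turn-id handshake."""
    def __init__(self):
        self.turns = {}

    def add_user_query(self, query_str: str) -> str:
        turn_id = str(uuid.uuid4())
        self.turns[turn_id] = {"user": query_str, "assistant": None}
        return turn_id

    def add_assistant_response(self, response_str: str, turn_id: str) -> None:
        if turn_id not in self.turns:
            raise KeyError(f"unknown turn_id: {turn_id}")
        self.turns[turn_id]["assistant"] = response_str

memory = TurnMemory()
turn_id = memory.add_user_query("What is 2 + 2?")
# ... agent executes its steps ...
memory.add_assistant_response("4", turn_id=turn_id)
```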

def create_response_span(self, runner_result, step_count: int, streaming_result: RunnerStreamingResult, runner_span_instance, workflow_status: str = "stream_completed"):

@@ -395,7 +409,8 @@ async def _process_stream_final_step(
answer: Any,
step_count: int,
streaming_result,
runner_span_instance
runner_span_instance,
turn_id: Any = None,
) -> FinalOutputItem:
"""Process the final step and trace it."""
# processed_data = self._get_final_answer(function)
@@ -422,8 +437,13 @@
self._create_execution_complete_stream_event(
streaming_result, final_output_item
)

# Ensure event is in queue by yielding control briefly
# This allows the event loop to process the put_nowait before we return
await asyncio.sleep(0) # Pure yield (not a delay) so the event is enqueued before returning

# add the assistant response to the conversation memory
self._add_assistant_response_to_memory(final_output_item)
self._add_assistant_response_to_memory(final_output_item, turn_id)
return final_output_item

# TODO: improved after the finish function is refactored
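`await asyncio.sleep(0)` after `put_nowait` (used here and at the other queue writes in this file) is a pure yield: it gives the event loop one pass so a consumer blocked on `queue.get()` can observe the event before the producer continues. A self-contained sketch of that handoff:

```python
import asyncio

async def main():
    queue: asyncio.Queue = asyncio.Queue()

    async def consumer():
        while True:
            event = await queue.get()
            print("consumed:", event)
            if event == "agent.execution_complete":
                break

    task = asyncio.create_task(consumer())
    queue.put_nowait("agent.step_complete")
    await asyncio.sleep(0)  # yield so the consumer runs before we continue
    queue.put_nowait("agent.execution_complete")
    await task

asyncio.run(main())
```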
@@ -561,6 +581,7 @@ def call(
self.step_history
) # a reference to the step history

turn_id = None
if self.use_conversation_memory:
# Reset any pending query state before starting a new query
self.conversation_memory.reset_pending_query()
@@ -570,7 +591,7 @@

# meta data is all keys in the list of context_str
query_metadata = {"context_str": prompt_kwargs.get("context_str", None)}
self.conversation_memory.add_user_query(
turn_id = self.conversation_memory.add_user_query(
UserQuery(
query_str=prompt_kwargs.get("input_str", None),
metadata=query_metadata,
@@ -672,14 +693,15 @@ def call(
)

# Add assistant response to conversation memory
if self.use_conversation_memory:
if self.use_conversation_memory and turn_id is not None:
self.conversation_memory.add_assistant_response(
AssistantResponse(
response_str=processed_data,
metadata={
"step_history": self.step_history.copy()
},
)
),
turn_id=turn_id
)

step_count += 1 # Increment step count before breaking
@@ -917,6 +939,7 @@ async def acall(
self.step_history
) # a reference to the step history

turn_id = None
if self.use_conversation_memory:
# Reset any pending query state before starting a new query
self.conversation_memory.reset_pending_query()
@@ -926,7 +949,7 @@

# meta data is all keys in the list of context_str
query_metadata = {"context_str": prompt_kwargs.get("context_str", None)}
self.conversation_memory.add_user_query(
turn_id = self.conversation_memory.add_user_query(
UserQuery(
query_str=prompt_kwargs.get("input_str", None),
metadata=query_metadata,
@@ -1043,14 +1066,15 @@ async def acall(
)

# Add assistant response to conversation memory
if self.use_conversation_memory:
if self.use_conversation_memory and turn_id is not None:
self.conversation_memory.add_assistant_response(
AssistantResponse(
response_str=answer,
metadata={
"step_history": self.step_history.copy()
},
)
),
turn_id=turn_id
)


@@ -1268,6 +1292,7 @@ async def impl_astream(
"""
workflow_status: Literal["streaming", "stream_completed", "stream_failed", "stream_incomplete"] = "streaming"
# Create runner span for tracing streaming execution
turn_id = None
with runner_span(
runner_id=id or f"stream_runner_{hash(str(prompt_kwargs))}",
max_steps=self.max_steps,
@@ -1288,7 +1313,7 @@

# meta data is all keys in the list of context_str
query_metadata = {"context_str": prompt_kwargs.get("context_str", None)}
self.conversation_memory.add_user_query(
turn_id = self.conversation_memory.add_user_query(
UserQuery(
query_str=prompt_kwargs.get("input_str", None),
metadata=query_metadata,
@@ -1386,10 +1411,10 @@ async def impl_astream(

else: # non-streaming cases
# yield the final planner response
if output.data is None:
if output.data is None or (not isinstance(output.data, Function)):

# recoverable errors, continue to create stepout
current_error = output.error
current_error = f"Error: {output.error} - data: {output.data}, raw_response: {output.raw_response}"
# wrap the error in a RawResponsesStreamEvent
wrapped_event = RawResponsesStreamEvent(
data=None, # no data in this case
@@ -1411,6 +1436,10 @@
item=step_item,
)
streaming_result.put_nowait(step_complete_event)

# Ensure event is processed before continuing
await asyncio.sleep(0) # Yield control to allow queue processing

self.step_history.append(step_output)

if output.error is not None:
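The widened check above treats planner output whose `data` is missing or not a `Function` as a recoverable error, and folds `data` and `raw_response` into the error string for easier debugging. A hedged sketch of that validation, using hypothetical minimal stand-ins for adalflow's types:

```python
from dataclasses import dataclass
from typing import Any, Optional

@dataclass
class Function:  # hypothetical minimal stand-in
    name: str

@dataclass
class GeneratorOutput:  # hypothetical minimal stand-in
    data: Any = None
    error: Optional[str] = None
    raw_response: Optional[str] = None

def validate(output: GeneratorOutput) -> Optional[str]:
    """Return an error string for recoverable failures, else None."""
    if output.data is None or not isinstance(output.data, Function):
        return (
            f"Error: {output.error} - data: {output.data}, "
            f"raw_response: {output.raw_response}"
        )
    return None

assert validate(GeneratorOutput(data=Function(name="search"))) is None
assert validate(GeneratorOutput(raw_response="not json")) is not None
```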
@@ -1467,14 +1496,26 @@ async def impl_astream(
log.debug(f"function: {function}")

if self._check_last_step(function): # skip stepoutput
answer = self._get_final_answer(function)
try:
answer = self._get_final_answer(function)
except Exception as e:
# If processing the final answer fails, use the raw answer
log.warning(f"Failed to process final answer: {e}. Using raw answer.")
answer = function._answer if hasattr(function, "_answer") else str(e)

final_output_item = await self._process_stream_final_step(
answer=answer,
step_count=step_count,
streaming_result=streaming_result,
runner_span_instance=runner_span_instance,
turn_id=turn_id,
)
workflow_status = "stream_completed"

# Ensure the queue has processed the execution_complete event
# Add a small yield to allow the event loop to process the queued events
await asyncio.sleep(0) # Yield control to allow queue processing

break

# Check if permission is required and emit permission event
@@ -1556,6 +1597,10 @@ async def impl_astream(
name="agent.step_complete", item=step_item
)
streaming_result.put_nowait(step_event)

# Ensure event is processed before continuing
await asyncio.sleep(0) # Yield control to allow queue processing

step_count += 1

except asyncio.CancelledError:
Expand All @@ -1580,7 +1625,7 @@ async def impl_astream(
streaming_result._is_complete = True

# Add cancellation response to conversation memory
if self.use_conversation_memory:
if self.use_conversation_memory and turn_id is not None:
self.conversation_memory.add_assistant_response(
AssistantResponse(
response_str="I apologize, but the execution was cancelled by the user.",
Expand All @@ -1589,7 +1634,8 @@ async def impl_astream(
"status": "cancelled",
"timestamp": datetime.now().isoformat()
}
)
),
turn_id=turn_id
)

# Signal completion and break
@@ -1627,6 +1673,12 @@ async def impl_astream(
workflow_status = "stream_incomplete"
current_error = f"No output generated after {step_count} steps (max_steps: {self.max_steps})"

# Only emit execution_complete if we created a new final_output_item
# (i.e., when the loop ended without a final answer)
self._create_execution_complete_stream_event(
streaming_result=streaming_result,
final_output_item=final_output_item,
)

runner_span_instance.span_data.update_attributes(
{
Expand All @@ -1644,11 +1696,6 @@ async def impl_astream(
error=current_error,
)

self._create_execution_complete_stream_event(
streaming_result=streaming_result,
final_output_item=final_output_item,
)

# create response span for final output
# if workflow_status in ["stream_incomplete", "stream_failed"]:
self.create_response_span(