diff --git a/src/uipath_langchain/_cli/_runtime/_conversation.py b/src/uipath_langchain/_cli/_runtime/_conversation.py index 6a588951..0f596e26 100644 --- a/src/uipath_langchain/_cli/_runtime/_conversation.py +++ b/src/uipath_langchain/_cli/_runtime/_conversation.py @@ -1,3 +1,4 @@ +import logging import uuid from datetime import datetime from typing import Any, Dict, List, Optional @@ -9,6 +10,7 @@ HumanMessage, ToolMessage, ) +from pydantic import TypeAdapter, ValidationError from uipath.agent.conversation import ( UiPathConversationContentPartChunkEvent, UiPathConversationContentPartEndEvent, @@ -26,6 +28,15 @@ UiPathInlineValue, ) +from uipath_langchain.chat.content_blocks import ( + ContentBlock, + TextContent, + ToolCallChunkContent, + ToolCallContent, +) + +logger = logging.getLogger(__name__) + def _new_id() -> str: return str(uuid.uuid4()) @@ -125,54 +136,68 @@ def map_message( ) elif isinstance(message.content, list) and message.content: - for chunk in message.content: - if not isinstance(chunk, dict): + content_adapter = TypeAdapter(ContentBlock) + + for raw_chunk in message.content: + if not isinstance(raw_chunk, dict): continue - idx = chunk.get("index", 0) - ctype = chunk.get("type") - id = chunk.get("id", f"chunk-{message.id}-{idx}") - - # Start of a tool call - if ctype == "tool_use": - msg_event.tool_call = UiPathConversationToolCallEvent( - tool_call_id=id, - start=UiPathConversationToolCallStartEvent( - tool_name=chunk.get("name") or "", - arguments=UiPathInlineValue(inline=""), - timestamp=timestamp, - ), - ) - # JSON args streaming (content part for tool args) - elif ctype == "input_json_delta": - text = chunk.get("partial_json", "") - # first delta: emit content part start + first chunk - if text == "": + try: + # Parse chunk + chunk = content_adapter.validate_python(raw_chunk) + + if isinstance(chunk, TextContent): + chunk_id = raw_chunk.get("id", f"chunk-{message.id}-0") msg_event.content_part = UiPathConversationContentPartEvent( - content_part_id=id, - start=UiPathConversationContentPartStartEvent( - mime_type="application/json" + content_part_id=chunk_id, + chunk=UiPathConversationContentPartChunkEvent( + data=chunk.text, + content_part_sequence=0, ), ) - else: - msg_event.content_part = UiPathConversationContentPartEvent( - content_part_id=id, - chunk=UiPathConversationContentPartChunkEvent( - data=text, - content_part_sequence=idx, + + elif isinstance(chunk, ToolCallContent): + # Complete tool call (non-streaming) + msg_event.tool_call = UiPathConversationToolCallEvent( + tool_call_id=chunk.id, + start=UiPathConversationToolCallStartEvent( + tool_name=chunk.name, + arguments=UiPathInlineValue(inline=str(chunk.args)), + timestamp=timestamp, ), + end=UiPathConversationToolCallEndEvent(timestamp=timestamp), ) - # Plain text from assistant - elif ctype == "text": - text = chunk.get("text", "") - msg_event.content_part = UiPathConversationContentPartEvent( - content_part_id=id, - chunk=UiPathConversationContentPartChunkEvent( - data=text, - content_part_sequence=idx, - ), + elif isinstance(chunk, ToolCallChunkContent): + # Streaming tool call chunk + chunk_id = chunk.id or f"chunk-{message.id}-{chunk.index or 0}" + + if chunk.name and not chunk.args: + # Tool call start + msg_event.tool_call = UiPathConversationToolCallEvent( + tool_call_id=chunk_id, + start=UiPathConversationToolCallStartEvent( + tool_name=chunk.name, + arguments=UiPathInlineValue(inline=""), + timestamp=timestamp, + ), + ) + elif chunk.args: + # Streaming tool arguments + msg_event.content_part = UiPathConversationContentPartEvent( + content_part_id=chunk_id, + chunk=UiPathConversationContentPartChunkEvent( + data=str(chunk.args), + content_part_sequence=chunk.index or 0, + ), + ) + + except ValidationError as e: + # Log and skip unknown/invalid chunk types + logger.warning( + f"Failed to parse content chunk: {raw_chunk}. Error: {e}" ) + continue elif isinstance(message.content, str) and message.content: msg_event.content_part = UiPathConversationContentPartEvent( content_part_id=f"content-{message.id}", diff --git a/src/uipath_langchain/_cli/_runtime/_runtime.py b/src/uipath_langchain/_cli/_runtime/_runtime.py index 6fc20e78..e4cd642d 100644 --- a/src/uipath_langchain/_cli/_runtime/_runtime.py +++ b/src/uipath_langchain/_cli/_runtime/_runtime.py @@ -25,7 +25,7 @@ UiPathAgentStateEvent, UiPathRuntimeEvent, ) - +from ._conversation import map_message from .._utils._schema import generate_schema_from_graph from ._context import LangGraphRuntimeContext from ._exception import LangGraphErrorCode, LangGraphRuntimeError @@ -159,11 +159,24 @@ async def stream( if chunk_type == "messages": if isinstance(data, tuple): message, _ = data - event = UiPathAgentMessageEvent( - payload=message, - execution_id=self.context.execution_id, + + # Use stored conversation/exchange IDs from input, or fallback to execution_id + conversation_id = getattr(self.context, "conversation_id", None) or self.context.execution_id + exchange_id = getattr(self.context, "exchange_id", None) or self.context.execution_id + + conversation_event = map_message( + message=message, + exchange_id=exchange_id, + conversation_id=conversation_id, ) - yield event + + # Only emit if conversion was successful + if conversation_event: + event = UiPathAgentMessageEvent( + payload=conversation_event, + execution_id=self.context.execution_id, + ) + yield event # Emit UiPathAgentStateEvent for state updates elif chunk_type == "updates": diff --git a/src/uipath_langchain/_cli/_utils/_config.py b/src/uipath_langchain/_cli/_utils/_config.py new file mode 100644 index 00000000..f41d4cfa --- /dev/null +++ b/src/uipath_langchain/_cli/_utils/_config.py @@ -0,0 +1,43 @@ +import json +import logging +import os +from typing import Any, Dict, Optional + +logger = logging.getLogger(__name__) + + +class UiPathConfig: + """Configuration from uipath.json""" + + def __init__(self, config_path: str = "uipath.json"): + self.config_path = config_path + self._config: Optional[Dict[str, Any]] = None + + @property + def exists(self) -> bool: + """Check if uipath.json exists""" + return os.path.exists(self.config_path) + + def load_config(self) -> Dict[str, Any]: + """Load and validate configuration""" + if not self.exists: + raise FileNotFoundError(f"Config file not found: {self.config_path}") + + try: + with open(self.config_path, "r") as f: + config = json.load(f) + + self._config = config + return config + except Exception as e: + logger.error(f"Failed to load uipath.json: {str(e)}") + raise + + @property + def is_conversational(self) -> bool: + """Check if the agent is conversational""" + if not self._config: + self.load_config() + + # Check isConversational at root level (testing purposes only) + return self._config.get("isConversational", False) if self._config else False diff --git a/src/uipath_langchain/_cli/cli_run.py b/src/uipath_langchain/_cli/cli_run.py index 819c2ae9..34b3d5cf 100644 --- a/src/uipath_langchain/_cli/cli_run.py +++ b/src/uipath_langchain/_cli/cli_run.py @@ -1,5 +1,7 @@ import asyncio +import logging import os +import json from typing import Optional from openinference.instrumentation.langchain import ( @@ -7,13 +9,17 @@ get_current_span, ) from uipath._cli._debug._bridge import ConsoleDebugBridge, UiPathDebugBridge +from uipath._cli._conversational._bridge import get_conversation_bridge +from uipath._cli._conversational._runtime import UiPathConversationRuntime from uipath._cli._runtime._contracts import ( UiPathRuntimeFactory, UiPathRuntimeResult, ) from uipath._cli.middlewares import MiddlewareResult -from uipath._events._events import UiPathAgentStateEvent +from uipath._events._events import UiPathAgentStateEvent, UiPathAgentMessageEvent from uipath.tracing import JsonLinesFileExporter, LlmOpsHttpExporter +from uipath.agent.conversation import UiPathConversationMessage +from pydantic import TypeAdapter from .._tracing import ( _instrument_traceable_attributes, @@ -23,9 +29,12 @@ LangGraphRuntimeContext, LangGraphScriptRuntime, ) +from ._utils._config import UiPathConfig from ._utils._graph import LangGraphConfig +logger = logging.getLogger(__name__) + def langgraph_run_middleware( entrypoint: Optional[str], input: Optional[str], @@ -50,6 +59,31 @@ async def execute(): context.execution_id = context.job_id or "default" _instrument_traceable_attributes() + # Check if this is a conversational agent + uipath_config = UiPathConfig() + is_conversational = False + if uipath_config.exists: + is_conversational = uipath_config.is_conversational + context.is_conversational = is_conversational + + if is_conversational and context.input: + try: + input_dict = json.loads(context.input) + + conversation_id = input_dict.get("conversation_id") or input_dict.get("conversationId") + exchange_id = input_dict.get("exchange_id") or input_dict.get("exchangeId") + + # Store IDs in context for reuse in output + if conversation_id: + context.conversation_id = conversation_id + if exchange_id: + context.exchange_id = exchange_id + + context.input_message = TypeAdapter(UiPathConversationMessage).validate_python(input_dict) + logger.info(f"Parsed conversational input: message_id={context.input_message.message_id}, conversation_id={conversation_id}, exchange_id={exchange_id}") + except Exception as e: + logger.warning(f"Failed to parse input as UiPathConversationMessage: {e}. Using as plain JSON.") + def generate_runtime( ctx: LangGraphRuntimeContext, ) -> LangGraphScriptRuntime: @@ -64,6 +98,7 @@ def generate_runtime( LangGraphScriptRuntime, LangGraphRuntimeContext, runtime_generator=generate_runtime, + context_generator=lambda: context, ) runtime_factory.add_instrumentor(LangChainInstrumentor, get_current_span) @@ -75,8 +110,22 @@ def generate_runtime( runtime_factory.add_span_exporter( LlmOpsHttpExporter(extra_process_spans=True) ) + + # Handle conversational agents + if is_conversational: + conversation_bridge = get_conversation_bridge(context) + async with UiPathConversationRuntime.from_conversation_context( + context=context, + factory=runtime_factory, + conversation_bridge=conversation_bridge, + ) as conversation_runtime: + await conversation_runtime.execute() + # Handle non-conversational agents + elif context.job_id: + # Cloud execution - direct runtime execution await runtime_factory.execute(context) else: + # Local execution - stream with debug bridge for visibility debug_bridge: UiPathDebugBridge = ConsoleDebugBridge() await debug_bridge.emit_execution_started(context.execution_id) async for event in runtime_factory.stream(context): diff --git a/src/uipath_langchain/chat/__init__.py b/src/uipath_langchain/chat/__init__.py index 78e3305d..ae35872f 100644 --- a/src/uipath_langchain/chat/__init__.py +++ b/src/uipath_langchain/chat/__init__.py @@ -1,6 +1,8 @@ +from .content_blocks import ContentBlock from .models import UiPathAzureChatOpenAI, UiPathChat __all__ = [ "UiPathChat", "UiPathAzureChatOpenAI", + "ContentBlock" ] diff --git a/src/uipath_langchain/chat/content_blocks.py b/src/uipath_langchain/chat/content_blocks.py new file mode 100644 index 00000000..ae0bf7bb --- /dev/null +++ b/src/uipath_langchain/chat/content_blocks.py @@ -0,0 +1,109 @@ +from typing import Literal, Optional, List, Any, Dict, Annotated, Union + +from pydantic import BaseModel, Field + + +class TextContent(BaseModel): + type: Literal["text"] + text: str = Field(alias="text") + annotations: Optional[List[Any]] = Field(default=None, alias="annotations") + extras: Optional[Dict[str, Any]] = Field(default=None, alias="extras") + +class ReasoningContent(BaseModel): + type: Literal["reasoning"] + reasoning: str = Field(alias="reasoning") + extras: Optional[Dict[str, Any]] = Field(default=None, alias="extras") + +class ImageContent(BaseModel): + type: Literal["image"] + url: Optional[str] = Field(default=None, alias="url") + base64: Optional[str] = Field(default=None, alias="base64") + id: Optional[str] = Field(default=None, alias="id") + mime_type: Optional[str] = Field(default=None, alias="mime_type") + +class AudioContent(BaseModel): + type: Literal["audio"] + url: Optional[str] = Field(default=None, alias="url") + base64: Optional[str] = Field(default=None, alias="base64") + id: Optional[str] = Field(default=None, alias="id") + mime_type: Optional[str] = Field(default=None, alias="mime_type") + +class VideoContent(BaseModel): + type: Literal["video"] + url: Optional[str] = Field(default=None, alias="url") + base64: Optional[str] = Field(default=None, alias="base64") + id: Optional[str] = Field(default=None, alias="id") + mime_type: Optional[str] = Field(default=None, alias="mime_type") + +class FileContent(BaseModel): + type: Literal["file"] + url: Optional[str] = Field(default=None, alias="url") + base64: Optional[str] = Field(default=None, alias="base64") + id: Optional[str] = Field(default=None, alias="id") + mime_type: Optional[str] = Field(default=None, alias="mime_type") + +class PlainTextContent(BaseModel): + type: Literal["text-plain"] + text: Optional[str] = Field(default=None, alias="text-plain") + mime_type: Optional[str] = Field(default=None, alias="mime_type") + +class ToolCallContent(BaseModel): + type: Literal["tool_call"] + name: str = Field(alias="name") + args: Dict[str, Any] = Field(alias="args") + id: str = Field(alias="id") + +class ToolCallChunkContent(BaseModel): + type: Literal["tool_call_chunk"] + name: Optional[str] = Field(default=None, alias="name") + args: Optional[str] = Field(default=None, alias="args") + id: Optional[str] = Field(default=None, alias="id") + index: Optional[int | str] = Field(default=None, alias="index") + +class InvalidToolCallContent(BaseModel): + type: Literal["invalid_tool_call"] + name: Optional[str] = Field(default=None, alias="name") + args: Optional[Dict[str, Any]] = Field(default=None, alias="args") + error: Optional[str] = Field(default=None, alias="error") + +class ServerToolCallContent(BaseModel): + type: Literal["server_tool_call"] + id: str = Field(alias="id") + name: str = Field(alias="name") + args: Dict[str, Any] = Field(default=None, alias="args") + +class ServerToolCallChunkContent(BaseModel): + type: Literal["server_tool_call_chunk"] + id: Optional[str] = Field(default=None, alias="id") + name: Optional[str] = Field(default=None, alias="name") + args: Optional[Dict[str, Any]] = Field(default=None, alias="args") + index: Optional[int | str] = Field(default=None, alias="index") + +class ServerToolResultContent(BaseModel): + type: Literal["server_tool_result"] + toll_call_id: str = Field(alias="toll_call_id") + id: Optional[str] = Field(default=None, alias="id") + status: str = Field(alias="status") + output: Optional[Any] = Field(default=None, alias="output") + +ContentBlock = Annotated[ + Union[ + TextContent, + ReasoningContent, + ImageContent, + AudioContent, + VideoContent, + FileContent, + PlainTextContent, + ToolCallContent, + ToolCallChunkContent, + InvalidToolCallContent, + ServerToolCallContent, + ServerToolCallChunkContent, + ServerToolResultContent, + InvalidToolCallContent, + ServerToolResultContent + ], + Field(discriminator="type") +] +