103 changes: 64 additions & 39 deletions src/uipath_langchain/_cli/_runtime/_conversation.py
@@ -1,3 +1,4 @@
+import logging
 import uuid
 from datetime import datetime
 from typing import Any, Dict, List, Optional
@@ -9,6 +10,7 @@
     HumanMessage,
     ToolMessage,
 )
+from pydantic import TypeAdapter, ValidationError
 from uipath.agent.conversation import (
     UiPathConversationContentPartChunkEvent,
     UiPathConversationContentPartEndEvent,
@@ -26,6 +28,15 @@
     UiPathInlineValue,
 )
 
+from uipath_langchain.chat.content_blocks import (
+    ContentBlock,
+    TextContent,
+    ToolCallChunkContent,
+    ToolCallContent,
+)
+
+logger = logging.getLogger(__name__)
+
 
 def _new_id() -> str:
     return str(uuid.uuid4())
@@ -125,54 +136,68 @@ def map_message(
             )
 
         elif isinstance(message.content, list) and message.content:
-            for chunk in message.content:
-                if not isinstance(chunk, dict):
-                    continue
-                idx = chunk.get("index", 0)
-                ctype = chunk.get("type")
-                id = chunk.get("id", f"chunk-{message.id}-{idx}")
-
-                # Start of a tool call
-                if ctype == "tool_use":
-                    msg_event.tool_call = UiPathConversationToolCallEvent(
-                        tool_call_id=id,
-                        start=UiPathConversationToolCallStartEvent(
-                            tool_name=chunk.get("name") or "",
-                            arguments=UiPathInlineValue(inline=""),
-                            timestamp=timestamp,
-                        ),
-                    )
-
-                # JSON args streaming (content part for tool args)
-                elif ctype == "input_json_delta":
-                    text = chunk.get("partial_json", "")
-                    # first delta: emit content part start + first chunk
-                    if text == "":
-                        msg_event.content_part = UiPathConversationContentPartEvent(
-                            content_part_id=id,
-                            start=UiPathConversationContentPartStartEvent(
-                                mime_type="application/json"
-                            ),
-                        )
-                    else:
-                        msg_event.content_part = UiPathConversationContentPartEvent(
-                            content_part_id=id,
-                            chunk=UiPathConversationContentPartChunkEvent(
-                                data=text,
-                                content_part_sequence=idx,
-                            ),
-                        )
-
-                # Plain text from assistant
-                elif ctype == "text":
-                    text = chunk.get("text", "")
-                    msg_event.content_part = UiPathConversationContentPartEvent(
-                        content_part_id=id,
-                        chunk=UiPathConversationContentPartChunkEvent(
-                            data=text,
-                            content_part_sequence=idx,
-                        ),
-                    )
+            content_adapter = TypeAdapter(ContentBlock)
+
+            for raw_chunk in message.content:
+                if not isinstance(raw_chunk, dict):
+                    continue
+
+                try:
+                    # Parse chunk
+                    chunk = content_adapter.validate_python(raw_chunk)
+
+                    if isinstance(chunk, TextContent):
+                        chunk_id = raw_chunk.get("id", f"chunk-{message.id}-0")
+                        msg_event.content_part = UiPathConversationContentPartEvent(
+                            content_part_id=chunk_id,
+                            chunk=UiPathConversationContentPartChunkEvent(
+                                data=chunk.text,
+                                content_part_sequence=0,
+                            ),
+                        )
+
+                    elif isinstance(chunk, ToolCallContent):
+                        # Complete tool call (non-streaming)
+                        msg_event.tool_call = UiPathConversationToolCallEvent(
+                            tool_call_id=chunk.id,
+                            start=UiPathConversationToolCallStartEvent(
+                                tool_name=chunk.name,
+                                arguments=UiPathInlineValue(inline=str(chunk.args)),
+                                timestamp=timestamp,
+                            ),
+                            end=UiPathConversationToolCallEndEvent(timestamp=timestamp),
+                        )
+
+                    elif isinstance(chunk, ToolCallChunkContent):
+                        # Streaming tool call chunk
+                        chunk_id = chunk.id or f"chunk-{message.id}-{chunk.index or 0}"
+
+                        if chunk.name and not chunk.args:
+                            # Tool call start
+                            msg_event.tool_call = UiPathConversationToolCallEvent(
+                                tool_call_id=chunk_id,
+                                start=UiPathConversationToolCallStartEvent(
+                                    tool_name=chunk.name,
+                                    arguments=UiPathInlineValue(inline=""),
+                                    timestamp=timestamp,
+                                ),
+                            )
+                        elif chunk.args:
+                            # Streaming tool arguments
+                            msg_event.content_part = UiPathConversationContentPartEvent(
+                                content_part_id=chunk_id,
+                                chunk=UiPathConversationContentPartChunkEvent(
+                                    data=str(chunk.args),
+                                    content_part_sequence=chunk.index or 0,
+                                ),
+                            )
+
+                except ValidationError as e:
+                    # Log and skip unknown/invalid chunk types
+                    logger.warning(
+                        f"Failed to parse content chunk: {raw_chunk}. Error: {e}"
+                    )
+                    continue
         elif isinstance(message.content, str) and message.content:
            msg_event.content_part = UiPathConversationContentPartEvent(
                content_part_id=f"content-{message.id}",
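
Note on the new parsing path: a minimal sketch of how the TypeAdapter-based dispatch is expected to behave, assuming ContentBlock is a discriminated union of TextContent, ToolCallContent, and ToolCallChunkContent keyed on each dict's "type" field; the sample payloads are illustrative, not taken from this PR.

    from pydantic import TypeAdapter, ValidationError
    from uipath_langchain.chat.content_blocks import ContentBlock, TextContent

    adapter = TypeAdapter(ContentBlock)

    block = adapter.validate_python({"type": "text", "text": "Hello"})
    assert isinstance(block, TextContent)  # raw dict becomes a typed block

    try:
        adapter.validate_python({"type": "unknown_block"})
    except ValidationError:
        pass  # map_message logs a warning and skips chunks like this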
23 changes: 18 additions & 5 deletions src/uipath_langchain/_cli/_runtime/_runtime.py
@@ -25,7 +25,7 @@
     UiPathAgentStateEvent,
     UiPathRuntimeEvent,
 )
-
+from ._conversation import map_message
 from .._utils._schema import generate_schema_from_graph
 from ._context import LangGraphRuntimeContext
 from ._exception import LangGraphErrorCode, LangGraphRuntimeError
@@ -159,11 +159,24 @@ async def stream(
                 if chunk_type == "messages":
                     if isinstance(data, tuple):
                         message, _ = data
-                        event = UiPathAgentMessageEvent(
-                            payload=message,
-                            execution_id=self.context.execution_id,
-                        )
-                        yield event
+
+                        # Use stored conversation/exchange IDs from input, or fallback to execution_id
+                        conversation_id = getattr(self.context, "conversation_id", None) or self.context.execution_id
+                        exchange_id = getattr(self.context, "exchange_id", None) or self.context.execution_id
+
+                        conversation_event = map_message(
+                            message=message,
+                            exchange_id=exchange_id,
+                            conversation_id=conversation_id,
+                        )
+
+                        # Only emit if conversion was successful
+                        if conversation_event:
+                            event = UiPathAgentMessageEvent(
+                                payload=conversation_event,
+                                execution_id=self.context.execution_id,
+                            )
+                            yield event
 
                 # Emit UiPathAgentStateEvent for state updates
                 elif chunk_type == "updates":
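
Note: the `if conversation_event:` guard assumes map_message can return a falsy result for messages it cannot map (for example, list content containing no dict chunks); a sketch of the intended behavior, with illustrative IDs:

    conversation_event = map_message(
        message=message,
        exchange_id="exchange-1",          # illustrative
        conversation_id="conversation-1",  # illustrative
    )
    if not conversation_event:
        pass  # nothing is emitted; the message is dropped from the event stream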
43 changes: 43 additions & 0 deletions src/uipath_langchain/_cli/_utils/_config.py
@@ -0,0 +1,43 @@
+import json
+import logging
+import os
+from typing import Any, Dict, Optional
+
+logger = logging.getLogger(__name__)
+
+
+class UiPathConfig:
+    """Configuration from uipath.json"""
+
+    def __init__(self, config_path: str = "uipath.json"):
+        self.config_path = config_path
+        self._config: Optional[Dict[str, Any]] = None
+
+    @property
+    def exists(self) -> bool:
+        """Check if uipath.json exists"""
+        return os.path.exists(self.config_path)
+
+    def load_config(self) -> Dict[str, Any]:
+        """Load and validate configuration"""
+        if not self.exists:
+            raise FileNotFoundError(f"Config file not found: {self.config_path}")
+
+        try:
+            with open(self.config_path, "r") as f:
+                config = json.load(f)
+
+            self._config = config
+            return config
+        except Exception as e:
+            logger.error(f"Failed to load uipath.json: {str(e)}")
+            raise
+
+    @property
+    def is_conversational(self) -> bool:
+        """Check if the agent is conversational"""
+        if not self._config:
+            self.load_config()
+
+        # Check isConversational at root level (testing purposes only)
+        return self._config.get("isConversational", False) if self._config else False
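
Note: a minimal usage sketch of the new helper, assuming a uipath.json like the illustrative one below sits in the working directory:

    # uipath.json: {"isConversational": true}
    config = UiPathConfig()
    if config.exists:
        # is_conversational lazily calls load_config(), then reads the root-level flag
        print(config.is_conversational)  # True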
51 changes: 50 additions & 1 deletion src/uipath_langchain/_cli/cli_run.py
@@ -1,19 +1,25 @@
 import asyncio
+import logging
 import os
+import json
 from typing import Optional
 
 from openinference.instrumentation.langchain import (
     LangChainInstrumentor,
     get_current_span,
 )
 from uipath._cli._debug._bridge import ConsoleDebugBridge, UiPathDebugBridge
+from uipath._cli._conversational._bridge import get_conversation_bridge
+from uipath._cli._conversational._runtime import UiPathConversationRuntime
 from uipath._cli._runtime._contracts import (
     UiPathRuntimeFactory,
     UiPathRuntimeResult,
 )
 from uipath._cli.middlewares import MiddlewareResult
-from uipath._events._events import UiPathAgentStateEvent
+from uipath._events._events import UiPathAgentStateEvent, UiPathAgentMessageEvent
 from uipath.tracing import JsonLinesFileExporter, LlmOpsHttpExporter
+from uipath.agent.conversation import UiPathConversationMessage
+from pydantic import TypeAdapter
 
 from .._tracing import (
     _instrument_traceable_attributes,
@@ -23,9 +29,12 @@
     LangGraphRuntimeContext,
     LangGraphScriptRuntime,
 )
+from ._utils._config import UiPathConfig
 from ._utils._graph import LangGraphConfig
 
 
+logger = logging.getLogger(__name__)
+
 def langgraph_run_middleware(
     entrypoint: Optional[str],
     input: Optional[str],
@@ -50,6 +59,31 @@ async def execute():
         context.execution_id = context.job_id or "default"
         _instrument_traceable_attributes()
 
+        # Check if this is a conversational agent
+        uipath_config = UiPathConfig()
+        is_conversational = False
+        if uipath_config.exists:
+            is_conversational = uipath_config.is_conversational
+        context.is_conversational = is_conversational
+
+        if is_conversational and context.input:
+            try:
+                input_dict = json.loads(context.input)
+
+                conversation_id = input_dict.get("conversation_id") or input_dict.get("conversationId")
+                exchange_id = input_dict.get("exchange_id") or input_dict.get("exchangeId")
+
+                # Store IDs in context for reuse in output
+                if conversation_id:
+                    context.conversation_id = conversation_id
+                if exchange_id:
+                    context.exchange_id = exchange_id
+
+                context.input_message = TypeAdapter(UiPathConversationMessage).validate_python(input_dict)
+                logger.info(f"Parsed conversational input: message_id={context.input_message.message_id}, conversation_id={conversation_id}, exchange_id={exchange_id}")
+            except Exception as e:
+                logger.warning(f"Failed to parse input as UiPathConversationMessage: {e}. Using as plain JSON.")
+
         def generate_runtime(
             ctx: LangGraphRuntimeContext,
         ) -> LangGraphScriptRuntime:
@@ -64,6 +98,7 @@ def generate_runtime(
             LangGraphScriptRuntime,
             LangGraphRuntimeContext,
             runtime_generator=generate_runtime,
+            context_generator=lambda: context,
        )
 
        runtime_factory.add_instrumentor(LangChainInstrumentor, get_current_span)
@@ -75,8 +110,22 @@ def generate_runtime(
         runtime_factory.add_span_exporter(
             LlmOpsHttpExporter(extra_process_spans=True)
         )
 
+        # Handle conversational agents
+        if is_conversational:
+            conversation_bridge = get_conversation_bridge(context)
+            async with UiPathConversationRuntime.from_conversation_context(
+                context=context,
+                factory=runtime_factory,
+                conversation_bridge=conversation_bridge,
+            ) as conversation_runtime:
+                await conversation_runtime.execute()
+        # Handle non-conversational agents
+        elif context.job_id:
+            # Cloud execution - direct runtime execution
             await runtime_factory.execute(context)
         else:
+            # Local execution - stream with debug bridge for visibility
             debug_bridge: UiPathDebugBridge = ConsoleDebugBridge()
             await debug_bridge.emit_execution_started(context.execution_id)
             async for event in runtime_factory.stream(context):
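
Note: a sketch of the conversational input this branch expects, assuming the payload is a serialized UiPathConversationMessage carrying its IDs in either snake_case or camelCase (the field set below is illustrative, not the full schema):

    input_dict = {
        "messageId": "msg-1",
        "conversationId": "conv-123",
        "exchangeId": "exch-456",
    }
    conversation_id = input_dict.get("conversation_id") or input_dict.get("conversationId")  # "conv-123"
    exchange_id = input_dict.get("exchange_id") or input_dict.get("exchangeId")  # "exch-456"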
2 changes: 2 additions & 0 deletions src/uipath_langchain/chat/__init__.py
@@ -1,6 +1,8 @@
+from .content_blocks import ContentBlock
 from .models import UiPathAzureChatOpenAI, UiPathChat
 
 __all__ = [
     "UiPathChat",
     "UiPathAzureChatOpenAI",
+    "ContentBlock"
 ]