Skip to content

Commit 3d444cd

Browse files
authored
Merge pull request #227 from UiPath/fix/refactor_runtime
fix: refactor runtime execute
2 parents 38324ad + 015d49b commit 3d444cd

File tree

10 files changed

+3070
-2167
lines changed

10 files changed

+3070
-2167
lines changed

pyproject.toml

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,11 @@
11
[project]
22
name = "uipath-langchain"
3-
version = "0.0.140"
3+
version = "0.0.141"
44
description = "UiPath Langchain"
55
readme = { file = "README.md", content-type = "text/markdown" }
66
requires-python = ">=3.10"
77
dependencies = [
8-
"uipath>=2.1.76, <2.2.0",
8+
"uipath>=2.1.96, <2.2.0",
99
"langgraph>=0.5.0, <0.7.0",
1010
"langchain-core>=0.3.34",
1111
"langgraph-checkpoint-sqlite>=2.0.3",
Lines changed: 1 addition & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
from typing import Any, Optional, Union
1+
from typing import Optional
22

33
from langgraph.checkpoint.sqlite.aio import AsyncSqliteSaver
44
from uipath._cli._runtime._contracts import UiPathRuntimeContext
@@ -7,10 +7,4 @@
77
class LangGraphRuntimeContext(UiPathRuntimeContext):
88
"""Context information passed throughout the runtime execution."""
99

10-
output: Optional[Any] = None
11-
state: Optional[Any] = (
12-
None # TypedDict issue, the actual type is: Optional[langgraph.types.StateSnapshot]
13-
)
1410
memory: Optional[AsyncSqliteSaver] = None
15-
langsmith_tracing_enabled: Union[str, bool, None] = False
16-
resume_triggers_table: str = "__uipath_resume_triggers"
Lines changed: 125 additions & 117 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
import logging
22
from typing import Any, Optional, cast
33

4+
from langgraph.checkpoint.sqlite.aio import AsyncSqliteSaver
45
from langgraph.types import Command
56
from uipath._cli._runtime._contracts import (
67
UiPathApiTrigger,
@@ -17,123 +18,130 @@
1718
logger = logging.getLogger(__name__)
1819

1920

20-
class LangGraphInputProcessor:
21+
async def get_graph_input(
22+
context: LangGraphRuntimeContext,
23+
memory: AsyncSqliteSaver,
24+
resume_triggers_table: str = "__uipath_resume_triggers",
25+
) -> Any:
2126
"""
22-
Handles input processing for graph execution, including resume scenarios
23-
where it needs to fetch data from UiPath.
27+
Process the input data for graph execution, handling both fresh starts and resume scenarios.
28+
29+
This method determines whether the graph is being executed fresh or resumed from a previous state.
30+
For fresh executions, it returns the input JSON directly. For resume scenarios, it fetches
31+
the latest trigger information from the database and constructs a Command object with the
32+
appropriate resume data.
33+
34+
The method handles different types of resume triggers:
35+
- API triggers: Creates an UiPathApiTrigger with inbox_id and request payload
36+
- Other triggers: Uses the HitlReader to process the resume data
37+
38+
Args:
39+
context: The runtime context for the graph execution.
40+
memory: AsyncSqliteSaver. The async database saver used to fetch resume trigger data.
41+
resume_triggers_table: str, optional. The name of the database table containing resume triggers (default: "__uipath_resume_triggers").
42+
43+
Returns:
44+
Any: For fresh executions, returns the input JSON data directly.
45+
For resume scenarios, returns a Command object containing the resume data
46+
processed through the appropriate trigger handler.
47+
48+
Raises:
49+
LangGraphRuntimeError: If there's an error fetching trigger data from the database
50+
during resume processing.
2451
"""
25-
26-
def __init__(self, context: LangGraphRuntimeContext):
27-
"""
28-
Initialize the LangGraphInputProcessor.
29-
30-
Args:
31-
context: The runtime context for the graph execution.
32-
"""
33-
self.context = context
34-
35-
async def process(self) -> Any:
36-
"""
37-
Process the input data for graph execution, handling both fresh starts and resume scenarios.
38-
39-
This method determines whether the graph is being executed fresh or resumed from a previous state.
40-
For fresh executions, it returns the input JSON directly. For resume scenarios, it fetches
41-
the latest trigger information from the database and constructs a Command object with the
42-
appropriate resume data.
43-
44-
The method handles different types of resume triggers:
45-
- API triggers: Creates an UiPathApiTrigger with inbox_id and request payload
46-
- Other triggers: Uses the HitlReader to process the resume data
47-
48-
Returns:
49-
Any: For fresh executions, returns the input JSON data directly.
50-
For resume scenarios, returns a Command object containing the resume data
51-
processed through the appropriate trigger handler.
52-
53-
Raises:
54-
LangGraphRuntimeError: If there's an error fetching trigger data from the database
55-
during resume processing.
56-
"""
57-
logger.debug(f"Resumed: {self.context.resume} Input: {self.context.input_json}")
58-
59-
if not self.context.resume:
60-
if self.context.input_message:
61-
return {
62-
"messages": uipath_to_human_messages(self.context.input_message)
63-
}
64-
return self.context.input_json
65-
66-
if self.context.input_json:
67-
return Command(resume=self.context.input_json)
68-
69-
trigger = await self._get_latest_trigger()
70-
if not trigger:
71-
return Command(resume=self.context.input_json)
72-
73-
trigger_type, key, folder_path, folder_key, payload = trigger
74-
resume_trigger = UiPathResumeTrigger(
75-
trigger_type=trigger_type,
76-
item_key=key,
77-
folder_path=folder_path,
78-
folder_key=folder_key,
79-
payload=payload,
52+
logger.debug(f"Resumed: {context.resume} Input: {context.input_json}")
53+
54+
# Fresh execution - return input directly
55+
if not context.resume:
56+
if context.input_message:
57+
return {"messages": uipath_to_human_messages(context.input_message)}
58+
return context.input_json
59+
60+
# Resume with explicit input provided
61+
if context.input_json:
62+
return Command(resume=context.input_json)
63+
64+
# Resume from database trigger
65+
trigger = await _get_latest_trigger(
66+
memory, resume_triggers_table=resume_triggers_table
67+
)
68+
if not trigger:
69+
return Command(resume=context.input_json)
70+
71+
trigger_type, key, folder_path, folder_key, payload = trigger
72+
resume_trigger = UiPathResumeTrigger(
73+
trigger_type=trigger_type,
74+
item_key=key,
75+
folder_path=folder_path,
76+
folder_key=folder_key,
77+
payload=payload,
78+
)
79+
logger.debug(f"ResumeTrigger: {trigger_type} {key}")
80+
81+
# Populate back expected fields for api_triggers
82+
if resume_trigger.trigger_type == UiPathResumeTriggerType.API:
83+
resume_trigger.api_resume = UiPathApiTrigger(
84+
inbox_id=resume_trigger.item_key, request=resume_trigger.payload
8085
)
81-
logger.debug(f"ResumeTrigger: {trigger_type} {key}")
82-
83-
# populate back expected fields for api_triggers
84-
if resume_trigger.trigger_type == UiPathResumeTriggerType.API:
85-
resume_trigger.api_resume = UiPathApiTrigger(
86-
inbox_id=resume_trigger.item_key, request=resume_trigger.payload
87-
)
88-
return Command(resume=await HitlReader.read(resume_trigger))
89-
90-
async def _get_latest_trigger(self) -> Optional[tuple[str, str, str, str, str]]:
91-
"""
92-
Fetch the most recent resume trigger from the database.
93-
94-
This private method queries the resume triggers table to retrieve the latest trigger
95-
information based on timestamp. It handles database connection setup and executes
96-
a SQL query to fetch trigger data needed for resume operations.
97-
98-
The method returns trigger information as a tuple containing:
99-
- type: The type of trigger (e.g., 'API', 'MANUAL', etc.)
100-
- key: The unique identifier for the trigger/item
101-
- folder_path: The path to the folder containing the trigger
102-
- folder_key: The unique identifier for the folder
103-
- payload: The serialized payload data associated with the trigger
104-
105-
Returns:
106-
Optional[tuple[str, str, str, str, str]]: A tuple containing (type, key, folder_path,
107-
folder_key, payload) for the most recent trigger, or None if no triggers are found
108-
or if the memory context is not available.
109-
110-
Raises:
111-
LangGraphRuntimeError: If there's an error during database connection setup, query
112-
execution, or result fetching. The original exception is wrapped with context
113-
about the database operation failure.
114-
"""
115-
if self.context.memory is None:
116-
return None
117-
try:
118-
await self.context.memory.setup()
119-
async with (
120-
self.context.memory.lock,
121-
self.context.memory.conn.cursor() as cur,
122-
):
123-
await cur.execute(f"""
124-
SELECT type, key, folder_path, folder_key, payload
125-
FROM {self.context.resume_triggers_table}
126-
ORDER BY timestamp DESC
127-
LIMIT 1
128-
""")
129-
result = await cur.fetchone()
130-
if result is None:
131-
return None
132-
return cast(tuple[str, str, str, str, str], tuple(result))
133-
except Exception as e:
134-
raise LangGraphRuntimeError(
135-
"DB_QUERY_FAILED",
136-
"Database query failed",
137-
f"Error querying resume trigger information: {str(e)}",
138-
UiPathErrorCategory.SYSTEM,
139-
) from e
86+
87+
return Command(resume=await HitlReader.read(resume_trigger))
88+
89+
90+
async def _get_latest_trigger(
91+
memory: AsyncSqliteSaver,
92+
resume_triggers_table: str = "__uipath_resume_triggers",
93+
) -> Optional[tuple[str, str, str, str, str]]:
94+
"""
95+
Fetch the most recent resume trigger from the database.
96+
97+
This private method queries the resume triggers table to retrieve the latest trigger
98+
information based on timestamp. It handles database connection setup and executes
99+
a SQL query to fetch trigger data needed for resume operations.
100+
101+
The method returns trigger information as a tuple containing:
102+
- type: The type of trigger (e.g., 'API', 'MANUAL', etc.)
103+
- key: The unique identifier for the trigger/item
104+
- folder_path: The path to the folder containing the trigger
105+
- folder_key: The unique identifier for the folder
106+
- payload: The serialized payload data associated with the trigger
107+
108+
Args:
109+
memory: The AsyncSqliteSaver instance used to access the database.
110+
resume_triggers_table: The name of the table containing resume triggers (default: "__uipath_resume_triggers").
111+
112+
Returns:
113+
Optional[tuple[str, str, str, str, str]]: A tuple containing (type, key, folder_path,
114+
folder_key, payload) for the most recent trigger, or None if no triggers are found
115+
or if the memory context is not available.
116+
117+
Raises:
118+
LangGraphRuntimeError: If there's an error during database connection setup, query
119+
execution, or result fetching. The original exception is wrapped with context
120+
about the database operation failure.
121+
"""
122+
if memory is None:
123+
return None
124+
125+
try:
126+
await memory.setup()
127+
async with (
128+
memory.lock,
129+
memory.conn.cursor() as cur,
130+
):
131+
await cur.execute(f"""
132+
SELECT type, key, folder_path, folder_key, payload
133+
FROM {resume_triggers_table}
134+
ORDER BY timestamp DESC
135+
LIMIT 1
136+
""")
137+
result = await cur.fetchone()
138+
if result is None:
139+
return None
140+
return cast(tuple[str, str, str, str, str], tuple(result))
141+
except Exception as e:
142+
raise LangGraphRuntimeError(
143+
"DB_QUERY_FAILED",
144+
"Database query failed",
145+
f"Error querying resume trigger information: {str(e)}",
146+
UiPathErrorCategory.SYSTEM,
147+
) from e

0 commit comments

Comments
 (0)