Skip to content

Commit a561d90

Browse files
Updated dynamic for task to set params in right way (#352)
1 parent c37dff1 commit a561d90

File tree

5 files changed

+619
-2
lines changed

5 files changed

+619
-2
lines changed

src/conductor/asyncio_client/workflow/task/dynamic_fork_task.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,7 @@ def __init__(
2828

2929
def to_workflow_task(self) -> List[WorkflowTaskAdapter]:
3030
wf_task = super().to_workflow_task()
31-
wf_task.dynamic_fork_join_tasks_param = self.tasks_param
31+
wf_task.dynamic_fork_tasks_param = self.tasks_param
3232
wf_task.dynamic_fork_tasks_input_param_name = self.tasks_input_param_name
3333

3434
tasks = [wf_task]

src/conductor/client/workflow/task/dynamic_fork_task.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@ def __init__(self, task_ref_name: str, tasks_param: str = "dynamicTasks", tasks_
2020

2121
def to_workflow_task(self) -> WorkflowTask:
2222
wf_task = super().to_workflow_task()
23-
wf_task.dynamic_fork_join_tasks_param = self.tasks_param
23+
wf_task.dynamic_fork_tasks_param = self.tasks_param
2424
wf_task.dynamic_fork_tasks_input_param_name = self.tasks_input_param_name
2525
tasks = [
2626
wf_task,

tests/integration/async/workflow/__init__.py

Whitespace-only changes.
Lines changed: 311 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,311 @@
1+
import os
2+
import time
3+
import uuid
4+
5+
import pytest
6+
7+
from conductor.asyncio_client.adapters.api_client_adapter import (
8+
ApiClientAdapter as ApiClient,
9+
)
10+
from conductor.asyncio_client.adapters.models.extended_task_def_adapter import (
11+
ExtendedTaskDefAdapter as ExtendedTaskDef,
12+
)
13+
from conductor.asyncio_client.configuration.configuration import Configuration
14+
from conductor.asyncio_client.orkes.orkes_metadata_client import OrkesMetadataClient
15+
from conductor.asyncio_client.orkes.orkes_workflow_client import OrkesWorkflowClient
16+
from conductor.asyncio_client.workflow.conductor_workflow import AsyncConductorWorkflow
17+
from conductor.asyncio_client.workflow.executor.workflow_executor import (
18+
AsyncWorkflowExecutor,
19+
)
20+
from conductor.asyncio_client.workflow.task.dynamic_fork_task import DynamicForkTask
21+
from conductor.asyncio_client.workflow.task.join_task import JoinTask
22+
23+
24+
@pytest.fixture(scope="module")
25+
def configuration():
26+
config = Configuration()
27+
config.debug = os.getenv("CONDUCTOR_DEBUG", "false").lower() == "true"
28+
config.apply_logging_config()
29+
return config
30+
31+
32+
@pytest.fixture(scope="module")
33+
def test_suffix():
34+
return str(uuid.uuid4())[:8]
35+
36+
37+
@pytest.fixture(scope="module")
38+
def test_task_name(test_suffix):
39+
return f"async_dynamic_fork_test_task_{test_suffix}"
40+
41+
42+
@pytest.fixture(scope="module")
43+
def test_workflow_name(test_suffix):
44+
return f"async_dynamic_fork_test_workflow_{test_suffix}"
45+
46+
47+
@pytest.fixture(scope="module")
48+
def test_task_def(test_task_name):
49+
return ExtendedTaskDef(
50+
name=test_task_name,
51+
description="Test task for async dynamic fork integration test",
52+
retry_count=3,
53+
retry_logic="FIXED",
54+
retry_delay_seconds=1,
55+
timeout_seconds=60,
56+
response_timeout_seconds=60,
57+
owner_email="test@example.com",
58+
)
59+
60+
61+
@pytest.mark.v5_2_6
62+
@pytest.mark.v4_1_73
63+
@pytest.mark.v3_21_16
64+
@pytest.mark.asyncio
65+
async def test_async_dynamic_fork_task_with_separate_params(
66+
configuration,
67+
test_workflow_name,
68+
test_task_name,
69+
test_task_def,
70+
):
71+
workflow_id = None
72+
73+
async with ApiClient(configuration) as api_client:
74+
metadata_client = OrkesMetadataClient(configuration, api_client=api_client)
75+
workflow_client = OrkesWorkflowClient(configuration, api_client=api_client)
76+
workflow_executor = AsyncWorkflowExecutor(configuration, api_client=api_client)
77+
78+
try:
79+
await metadata_client.register_task_def(test_task_def)
80+
time.sleep(1)
81+
82+
join_task = JoinTask(task_ref_name="async_dynamic_join")
83+
dynamic_fork = DynamicForkTask(
84+
task_ref_name="async_dynamic_fork",
85+
tasks_param="dynamicTasks",
86+
tasks_input_param_name="dynamicTasksInputs",
87+
join_task=join_task,
88+
)
89+
90+
dynamic_fork.input_parameters["dynamicTasks"] = [
91+
{
92+
"name": test_task_name,
93+
"taskReferenceName": f"{test_task_name}_1",
94+
"type": "SIMPLE",
95+
},
96+
{
97+
"name": test_task_name,
98+
"taskReferenceName": f"{test_task_name}_2",
99+
"type": "SIMPLE",
100+
},
101+
]
102+
103+
dynamic_fork.input_parameters["dynamicTasksInputs"] = [
104+
{"task_input_1": "value1"},
105+
{"task_input_2": "value2"},
106+
]
107+
108+
workflow = (
109+
AsyncConductorWorkflow(
110+
executor=workflow_executor,
111+
name=test_workflow_name,
112+
description="Async test workflow for DynamicForkTask with separate params",
113+
version=1,
114+
)
115+
.owner_email("test@example.com")
116+
.add(dynamic_fork)
117+
)
118+
119+
await workflow.register(overwrite=True)
120+
time.sleep(2)
121+
122+
registered_workflow = await metadata_client.get_workflow_def(
123+
test_workflow_name, version=1
124+
)
125+
assert registered_workflow is not None
126+
assert registered_workflow.name == test_workflow_name
127+
128+
dynamic_fork_tasks = [
129+
task
130+
for task in registered_workflow.tasks
131+
if task.type == "FORK_JOIN_DYNAMIC"
132+
]
133+
assert len(dynamic_fork_tasks) == 1
134+
135+
dynamic_fork_task = dynamic_fork_tasks[0]
136+
assert dynamic_fork_task.dynamic_fork_tasks_param == "dynamicTasks"
137+
assert (
138+
dynamic_fork_task.dynamic_fork_tasks_input_param_name
139+
== "dynamicTasksInputs"
140+
)
141+
assert dynamic_fork_task.dynamic_fork_join_tasks_param is None
142+
143+
workflow_id = await workflow_client.start_workflow_by_name(
144+
name=test_workflow_name,
145+
version=1,
146+
input_data={},
147+
)
148+
149+
assert workflow_id is not None
150+
assert isinstance(workflow_id, str)
151+
assert len(workflow_id) > 0
152+
153+
time.sleep(2)
154+
155+
workflow_execution = await workflow_client.get_workflow(
156+
workflow_id, include_tasks=True
157+
)
158+
assert workflow_execution.workflow_id == workflow_id
159+
assert workflow_execution.workflow_name == test_workflow_name
160+
161+
except Exception as e:
162+
print(f"Test failed with error: {str(e)}")
163+
raise
164+
165+
finally:
166+
if workflow_id:
167+
try:
168+
await workflow_client.terminate_workflow(
169+
workflow_id,
170+
reason="Integration test cleanup",
171+
trigger_failure_workflow=False,
172+
)
173+
await workflow_client.delete_workflow(
174+
workflow_id, archive_workflow=True
175+
)
176+
except Exception as cleanup_error:
177+
print(
178+
f"Warning: Failed to cleanup workflow {workflow_id}: {cleanup_error}"
179+
)
180+
181+
try:
182+
await metadata_client.unregister_workflow_def(
183+
test_workflow_name, version=1
184+
)
185+
except Exception as cleanup_error:
186+
print(
187+
f"Warning: Failed to cleanup workflow definition: {cleanup_error}"
188+
)
189+
190+
try:
191+
await metadata_client.unregister_task_def(test_task_name)
192+
except Exception as cleanup_error:
193+
print(f"Warning: Failed to cleanup task definition: {cleanup_error}")
194+
195+
196+
@pytest.mark.v5_2_6
197+
@pytest.mark.v4_1_73
198+
@pytest.mark.v3_21_16
199+
@pytest.mark.asyncio
200+
async def test_async_dynamic_fork_task_with_combined_param(
201+
configuration,
202+
test_workflow_name,
203+
test_task_name,
204+
test_task_def,
205+
):
206+
workflow_id = None
207+
workflow_name_combined = f"{test_workflow_name}_combined"
208+
209+
async with ApiClient(configuration) as api_client:
210+
metadata_client = OrkesMetadataClient(configuration, api_client=api_client)
211+
workflow_client = OrkesWorkflowClient(configuration, api_client=api_client)
212+
workflow_executor = AsyncWorkflowExecutor(configuration, api_client=api_client)
213+
214+
try:
215+
await metadata_client.register_task_def(test_task_def)
216+
time.sleep(1)
217+
218+
workflow = AsyncConductorWorkflow(
219+
executor=workflow_executor,
220+
name=workflow_name_combined,
221+
description="Async test workflow for DynamicForkTask with combined param",
222+
version=1,
223+
).owner_email("test@example.com")
224+
225+
join_task = JoinTask(task_ref_name="async_dynamic_join_combined")
226+
dynamic_fork = DynamicForkTask(
227+
task_ref_name="async_dynamic_fork_combined",
228+
tasks_param="dynamicForkJoinTasks",
229+
join_task=join_task,
230+
)
231+
232+
dynamic_fork.input_parameters["dynamicForkJoinTasks"] = [
233+
{
234+
"task": {
235+
"name": test_task_name,
236+
"taskReferenceName": f"{test_task_name}_combined_1",
237+
"type": "SIMPLE",
238+
},
239+
"input": {"combined_input_1": "value1"},
240+
},
241+
{
242+
"task": {
243+
"name": test_task_name,
244+
"taskReferenceName": f"{test_task_name}_combined_2",
245+
"type": "SIMPLE",
246+
},
247+
"input": {"combined_input_2": "value2"},
248+
},
249+
]
250+
251+
workflow.add(dynamic_fork)
252+
await workflow.register(overwrite=True)
253+
time.sleep(2)
254+
255+
registered_workflow = await metadata_client.get_workflow_def(
256+
workflow_name_combined, version=1
257+
)
258+
assert registered_workflow is not None
259+
assert registered_workflow.name == workflow_name_combined
260+
261+
workflow_id = await workflow_client.start_workflow_by_name(
262+
name=workflow_name_combined,
263+
version=1,
264+
input_data={},
265+
)
266+
267+
assert workflow_id is not None
268+
assert isinstance(workflow_id, str)
269+
assert len(workflow_id) > 0
270+
271+
time.sleep(2)
272+
273+
workflow_execution = await workflow_client.get_workflow(
274+
workflow_id, include_tasks=True
275+
)
276+
assert workflow_execution.workflow_id == workflow_id
277+
assert workflow_execution.workflow_name == workflow_name_combined
278+
279+
except Exception as e:
280+
print(f"Test failed with error: {str(e)}")
281+
raise
282+
283+
finally:
284+
if workflow_id:
285+
try:
286+
await workflow_client.terminate_workflow(
287+
workflow_id,
288+
reason="Integration test cleanup",
289+
trigger_failure_workflow=False,
290+
)
291+
await workflow_client.delete_workflow(
292+
workflow_id, archive_workflow=True
293+
)
294+
except Exception as cleanup_error:
295+
print(
296+
f"Warning: Failed to cleanup workflow {workflow_id}: {cleanup_error}"
297+
)
298+
299+
try:
300+
await metadata_client.unregister_workflow_def(
301+
workflow_name_combined, version=1
302+
)
303+
except Exception as cleanup_error:
304+
print(
305+
f"Warning: Failed to cleanup workflow definition: {cleanup_error}"
306+
)
307+
308+
try:
309+
await metadata_client.unregister_task_def(test_task_name)
310+
except Exception as cleanup_error:
311+
print(f"Warning: Failed to cleanup task definition: {cleanup_error}")

0 commit comments

Comments
 (0)