Commit 4f29502

Worker task definition configuration from decorators (#345)
1 parent 1f57c1e commit 4f29502

File tree

16 files changed: +1768 -3 lines changed

Lines changed: 212 additions & 0 deletions
@@ -0,0 +1,212 @@
# Task Options Decorator Examples

The `@task_options` decorator provides a declarative way to configure task execution parameters directly on your worker functions.

## Quick Start

```python
from conductor.client.worker.worker_task import worker_task
from conductor.shared.worker.task_options import task_options


@task_options(
    timeout_seconds=3600,
    response_timeout_seconds=120,
    retry_count=3,
    retry_logic="EXPONENTIAL_BACKOFF"
)
@worker_task(task_definition_name="my_task")
def my_task(task):
    return {"result": "success"}
```

## Available Parameters

### Timeout Settings

| Parameter | Type | Description |
| -------------------------- | ---- | ------------------------------------------------------------ |
| `timeout_seconds` | int | Maximum time (in seconds) for task execution |
| `response_timeout_seconds` | int | Time to wait for task response (must be < `timeout_seconds`) |
| `poll_timeout_seconds` | int | Timeout for polling the task |

**Important**: `response_timeout_seconds` should be less than `timeout_seconds`, otherwise the response timeout will never trigger and the setting has no effect.
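Since a bad combination silently disables the response timeout, it can be worth failing fast at startup. The helper below is a hypothetical pre-flight check (not part of the SDK) that enforces the constraint described above:

```python
def check_timeouts(timeout_seconds: int, response_timeout_seconds: int) -> None:
    # Hypothetical sanity check, not an SDK function: reject configurations
    # where the response timeout could never trigger.
    if response_timeout_seconds >= timeout_seconds:
        raise ValueError(
            "response_timeout_seconds must be less than timeout_seconds"
        )


check_timeouts(3600, 120)  # OK: matches the Quick Start example
```

Calling it with an invalid pair, e.g. `check_timeouts(60, 120)`, raises `ValueError` instead of deploying a task whose response timeout is dead configuration.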

### Retry Configuration

| Parameter | Type | Values | Description |
| ---------------------- | ---- | ------------------------------------------------ | ---------------------------------- |
| `retry_count` | int | ≥ 0 | Number of retry attempts |
| `retry_logic` | str | `FIXED`, `LINEAR_BACKOFF`, `EXPONENTIAL_BACKOFF` | Retry strategy |
| `retry_delay_seconds` | int | ≥ 0 | Initial delay between retries |
| `backoff_scale_factor` | int | ≥ 1 | Multiplier for exponential backoff |

### Rate Limiting

| Parameter | Type | Description |
| --------------------------------- | ---- | ------------------------------ |
| `rate_limit_per_frequency` | int | Max executions per time window |
| `rate_limit_frequency_in_seconds` | int | Time window for rate limiting |
| `concurrent_exec_limit` | int | Max concurrent executions |

### Other Options

| Parameter | Type | Values | Description |
| ---------------- | ---- | ------------------------------------ | ----------------- |
| `timeout_policy` | str | `TIME_OUT_WF`, `ALERT_ONLY`, `RETRY` | Action on timeout |
| `owner_email` | str | - | Task owner email |
| `description` | str | - | Task description |

## Examples

### 1. Simple Task with Retry

```python
@task_options(
    timeout_seconds=3600,
    response_timeout_seconds=120,
    retry_count=3,
    retry_logic="FIXED",
    retry_delay_seconds=5
)
@worker_task(task_definition_name="simple_task")
def simple_task(task):
    return {"status": "completed"}
```

### 2. High Throughput Task with Rate Limiting

```python
@task_options(
    timeout_seconds=3600,
    response_timeout_seconds=60,
    concurrent_exec_limit=100,
    rate_limit_per_frequency=1000,
    rate_limit_frequency_in_seconds=60,
    description="High throughput task with rate limiting"
)
@worker_task(task_definition_name="bulk_process")
def bulk_process(task):
    items = task.input_data.get("items", [])
    return {"processed": len(items)}
```

### 3. Aggressive Retry with Exponential Backoff

```python
@task_options(
    timeout_seconds=7200,
    response_timeout_seconds=300,
    retry_count=10,
    retry_logic="EXPONENTIAL_BACKOFF",
    retry_delay_seconds=5,
    backoff_scale_factor=3,
    timeout_policy="RETRY"
)
@worker_task(task_definition_name="critical_task")
def critical_task(task):
    # Critical operation that needs aggressive retry
    return {"status": "completed"}
```

### 4. Alert Only on Timeout

```python
@task_options(
    timeout_seconds=600,
    response_timeout_seconds=60,
    timeout_policy="ALERT_ONLY",
    description="Non-critical task"
)
@worker_task(task_definition_name="monitoring_task")
def monitoring_task(task):
    # This will alert but not fail the workflow on timeout
    return {"metrics": {...}}
```

## Retry Logic Comparison

### FIXED

- Same delay between each retry
- Example: 5s → 5s → 5s

### LINEAR_BACKOFF

- Linearly increasing delay
- Example: 5s → 10s → 15s

### EXPONENTIAL_BACKOFF

- Exponentially increasing delay (uses `backoff_scale_factor`)
- Example with scale factor 2: 5s → 10s → 20s → 40s

## Timeout Policy Comparison

### TIME_OUT_WF

- Timeout causes the entire workflow to fail
- Use for critical tasks

### ALERT_ONLY

- Timeout generates an alert but doesn't fail the workflow
- Use for monitoring/metrics tasks

### RETRY

- Timeout triggers a retry attempt
- Use when temporary issues might resolve

## Running the Examples

### Simple Example

```bash
python examples/task_options_simple.py
```

### Comprehensive Example

```bash
python examples/task_options_example.py
```

## Best Practices

1. **Always set both timeout values**: Set `response_timeout_seconds` < `timeout_seconds` to avoid validation errors

2. **Choose appropriate retry logic**:

   - Use `FIXED` for predictable retry intervals
   - Use `LINEAR_BACKOFF` for gradual backoff
   - Use `EXPONENTIAL_BACKOFF` for aggressive retry with longer delays

3. **Set rate limits for high-volume tasks**: Prevent overwhelming downstream systems

4. **Use concurrent execution limits**: Control resource usage

5. **Add descriptions**: Document task purpose for better maintenance

## Integration with Task Registration

The `@task_options` decorator works seamlessly with task registration. When a task is registered with the metadata service, the options are automatically applied:

```python
from conductor.client.http.models.task_def import TaskDef
from conductor.client.orkes.orkes_metadata_client import OrkesMetadataClient
from conductor.shared.worker.task_definition_helper import apply_task_options_to_task_def
from conductor.shared.worker.task_options import get_task_options

# Get options from decorated function
task_opts = get_task_options(my_task)

# Create task definition
task_def = TaskDef(name="my_task")

# Apply options
apply_task_options_to_task_def(task_def, task_opts)

# Register (config is an existing Configuration instance)
metadata_client = OrkesMetadataClient(config)
metadata_client.register_task_def(task_def)
```
Lines changed: 140 additions & 0 deletions
@@ -0,0 +1,140 @@
"""
Async example demonstrating the @task_options decorator with async worker tasks.

The @task_options decorator works the same way with async tasks as it does with
synchronous tasks.
"""

import asyncio

from conductor.asyncio_client.automator.task_handler import TaskHandler
from conductor.asyncio_client.configuration.configuration import Configuration
from conductor.asyncio_client.worker.worker_task import worker_task
from conductor.shared.worker.task_options import task_options


@task_options(
    timeout_seconds=3600,
    response_timeout_seconds=300,
    retry_count=3,
    retry_logic="EXPONENTIAL_BACKOFF",
    retry_delay_seconds=10,
    backoff_scale_factor=2,
)
@worker_task(task_definition_name="async_process_payment")
async def async_process_payment(task):
    payment_id = task.input_data.get("payment_id")
    amount = task.input_data.get("amount")

    print(f"Processing payment {payment_id} for ${amount}")

    # Simulate an async payment-gateway call
    await asyncio.sleep(0.1)

    return {
        "status": "completed",
        "payment_id": payment_id,
        "confirmation": f"CONF-{payment_id}",
    }


@task_options(
    timeout_seconds=7200,
    response_timeout_seconds=600,
    retry_count=5,
    retry_logic="LINEAR_BACKOFF",
    retry_delay_seconds=30,
    concurrent_exec_limit=10,
    rate_limit_per_frequency=100,
    rate_limit_frequency_in_seconds=60,
    description="Async notification sender with rate limiting",
)
@worker_task(task_definition_name="async_send_notification")
async def async_send_notification(task):
    recipient = task.input_data.get("email")
    message = task.input_data.get("message")

    print(f"Sending notification to {recipient}: {message}")

    # Simulate an async delivery call
    await asyncio.sleep(0.1)

    return {"status": "sent", "recipient": recipient, "sent_at": "2025-10-13T10:00:00Z"}


@task_options(
    timeout_seconds=1800,
    response_timeout_seconds=120,
    retry_count=2,
    retry_logic="FIXED",
    retry_delay_seconds=5,
    timeout_policy="RETRY",
    description="Fast async task with minimal retry",
)
@worker_task(task_definition_name="async_validate_data")
async def async_validate_data(task):
    data = task.input_data.get("data")

    print(f"Validating data: {data}")

    await asyncio.sleep(0.05)

    if not data:
        return {"status": "failed", "error": "No data provided"}

    return {"status": "valid", "validated_data": data}


@task_options(
    timeout_seconds=3600,
    response_timeout_seconds=300,
    retry_count=10,
    retry_logic="EXPONENTIAL_BACKOFF",
    retry_delay_seconds=5,
    backoff_scale_factor=3,
    timeout_policy="TIME_OUT_WF",
    concurrent_exec_limit=5,
    description="Heavy async processing task with aggressive retry",
)
@worker_task(task_definition_name="async_heavy_computation")
async def async_heavy_computation(task):
    iterations = task.input_data.get("iterations", 1000)

    print(f"Running heavy computation with {iterations} iterations")

    await asyncio.sleep(0.1)

    result = sum(range(iterations))

    return {"status": "completed", "result": result, "iterations": iterations}


async def main():
    config = Configuration()
    config.apply_logging_config()

    print("Starting async workers with task options...")
    print("\nConfigured async tasks:")
    print("1. async_process_payment - EXPONENTIAL_BACKOFF retry with 3 attempts")
    print(
        "2. async_send_notification - LINEAR_BACKOFF retry with rate limiting (100 req/min)"
    )
    print("3. async_validate_data - FIXED retry with 2 attempts")
    print(
        "4. async_heavy_computation - EXPONENTIAL_BACKOFF with a concurrency limit of 5\n"
    )

    # Discover the decorated workers above via annotation scanning
    task_handler = TaskHandler(
        workers=[],
        configuration=config,
        scan_for_annotated_workers=True,
        import_modules=[],
    )

    try:
        task_handler.start_processes()
        task_handler.join_processes()
    finally:
        task_handler.stop_processes()


if __name__ == "__main__":
    asyncio.run(main())
