Commit 8b86643

Merge pull request #3 from flamingo-run/feature/try-and-retry

Workflow Try & Retry

2 parents 7b2fa14 + df49371 → commit 8b86643

29 files changed: +2503 −77 lines
.cursor/rules/writing-tests.mdc

Lines changed: 33 additions & 0 deletions
@@ -0,0 +1,33 @@
---
description:
globs:
alwaysApply: true
---
Tests should:
- be placed in the `tests` directory.
- be named like `test_<name>.py`.
- not cheat by using if/else or try/except statements to assert different outcomes.
- be deterministic.
- not be too redundant with other tests.

Example workflows should:
- be simple and focused
- cover a specific feature
- have a friendly name
- be creative/funny whenever possible

Unit tests should:
- be placed in the `tests/unit` directory.
- assert small portions of the code, such as validation logic or input/output shapes.

Codegen tests should:
- be placed in the `tests/codegen` directory.
- build the workflow to cover a specific feature
- assert the YAML emitted by the CLI (stored in `tests/codegen/fixtures/yaml`)
- assert the mermaid graphs emitted by the CLI (stored in `tests/codegen/fixtures/mermaid`)

Smoke tests should:
- be placed in the `tests/smoke` directory.
- deploy the workflow to GCP
- assert the workflow runs successfully
- assert the workflow output is as expected
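
As a concrete illustration of the codegen convention above, a minimal snapshot test could look like the sketch below. The `emit_yaml` helper, the `examples.jokes` module path, and the fixture filename are assumptions for illustration only; the project's actual helpers may differ.

```python
# Sketch of a codegen snapshot test; emit_yaml and the fixture name are
# hypothetical stand-ins for whatever the CLI/codegen layer actually exposes.
from pathlib import Path

FIXTURES = Path("tests/codegen/fixtures/yaml")


def test_joke_flow_yaml_snapshot():
    from examples.jokes import JOKE_FLOW          # assumed export of the built workflow
    from tests.codegen.helpers import emit_yaml   # hypothetical YAML emitter

    generated = emit_yaml(JOKE_FLOW)              # serialize the built workflow to YAML
    expected = (FIXTURES / "joke-flow.yaml").read_text()

    # Full-file equality keeps the test deterministic and avoids if/else cheating.
    assert generated == expected
```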

CONTRIBUTING.md

Lines changed: 1 addition & 1 deletion
@@ -23,7 +23,7 @@ uv sync
Run end-to-end against deployed Workflows (requires Google Cloud project and a deployed example app):
```
export GOOGLE_CLOUD_PROJECT=<your-project>
-uv run -q python tests/smoke/run_smoke.py --region us-central1
+uv run -q python tests/smoke/test_run_smoke.py --region us-central1
```

### Project structure

README.md

Lines changed: 24 additions & 13 deletions
@@ -93,11 +93,6 @@ graph TD
 - `post_story.py` → post-story-flow: build story → POST external → summarize
 - `jokes.py` → joke-flow: fetch → split → rate

-## Codegen & tests
-- Codegen snapshots: `uv run -q pytest -q tests/codegen` (full-file YAML equality)
-- Unit tests: `uv run -q pytest -q tests/unit` (hits `/steps/<name>` endpoints with TestClient)
-- Smoke tests: `uv run -q python tests/smoke/run_smoke.py --region us-central1` (requires GCP & deployed example)
-
 ## Supported features (Cloud Workflows)

 | Feature | Status | Notes |
@@ -110,15 +105,31 @@ graph TD
 | Sequential composition | ✅ | `workflow(...) >> step_a >> step_b` |
 | Workflow input/output | ✅ | single `payload` param; final `return: ${payload}` |
 | Error surfacing | ✅ | HTTP errors propagate; FastAPI returns typed 4xx/5xx |
-| Retries | | planned `RetryPolicy` emission |
-| Try/catch | | not yet |
-| Conditionals / switch | | not yet |
-| Loops | | not yet |
-| Parallel branches / join | | not yet |
-| Subworkflows / call other workflows | | not yet |
-| GCP connectors / direct service calls | | not yet |
+| **Retries** | ✅ | `RetryPolicy` with backoff, predicates, max attempts |
+| **Try/catch** | ✅ | `TryCatchStep` with exception handling and optional re-raise |
+
+## Roadmap (Prioritized)
+
+Priority order for upcoming features:
+
+| Priority | Feature | Status | Notes |
+| --- | --- | --- | --- |
+| 1 | **Retries** | ✅ Completed | `RetryPolicy` with configurable backoff and predicates |
+| 2 | **Try/catch** | ✅ Completed | `TryCatchStep` for exception handling with fallback flows |
+| 3 | **Subworkflows** | 📋 Planned | Call other workflows, composition patterns |
+| 4 | **GCP connectors** | 📋 Planned | Direct service calls, native Cloud Workflows connectors |
+| 5 | **Deployment API** | 📋 Planned | Programmatic deployment of workflows & EventArc triggers via GCP APIs |
+| 6 | **Loops** | 📋 Planned | For/while constructs, iteration over collections |
+| 7 | **Conditionals / switch** | 📋 Planned | Branching logic, switch statements |
+| 8 | **Parallel branches / join** | 📋 Planned | Concurrent execution, fork/join patterns |
+
+### Deployment API (Planned)
+Beyond YAML generation, the framework will provide APIs to:
+- Deploy workflows programmatically to Google Cloud
+- Create and manage EventArc triggers
+- Configure IAM permissions and service accounts
+- Orchestrate complete workflow deployment pipelines

 ## Next steps
 - Open CONTRIBUTING.md for local setup, structure, and contribution checklist
 - Use descriptive names for workflows/steps and prefer multi-step workflows that show transformations
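
As a rough sketch of what the planned Deployment API could wrap, the existing `google-cloud-workflows` client can already deploy generated YAML today. The project, region, workflow id, and YAML source below are placeholders; none of this is part of the library yet.

```python
# Rough sketch only: deploying generated YAML with the existing
# google-cloud-workflows client. The planned Deployment API would wrap
# calls like these; names and values below are placeholders.
from google.cloud import workflows_v1


def deploy_workflow(project: str, region: str, workflow_id: str, yaml_source: str) -> None:
    client = workflows_v1.WorkflowsClient()
    parent = f"projects/{project}/locations/{region}"

    wf = workflows_v1.Workflow(source_contents=yaml_source)

    # create_workflow returns a long-running operation; block until it finishes.
    operation = client.create_workflow(
        request={"parent": parent, "workflow": wf, "workflow_id": workflow_id}
    )
    operation.result()


# Example usage (placeholders):
# deploy_workflow("my-project", "us-central1", "data-pipeline-flow", yaml_text)
```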
Lines changed: 197 additions & 0 deletions
@@ -0,0 +1,197 @@
"""Example data pipeline workflow with error handling and retries."""

from __future__ import annotations

from datetime import timedelta

from pydantic import BaseModel

from fastapi_cloudflow import (
    Context,
    HttpStep,
    RetryPolicy,
    TryCatchStep,
    step,
    workflow,
)


class DataSource(BaseModel):
    source_id: str
    endpoint: str
    format: str = "json"


class RawData(BaseModel):
    data: list[dict]
    source_id: str
    record_count: int


class ProcessedData(BaseModel):
    processed_records: int
    failed_records: int
    source_id: str
    status: str


class PipelineResult(BaseModel):
    total_processed: int
    total_failed: int
    sources_processed: list[str]
    status: str


# Data extraction with retries for unreliable sources
@step(
    name="extract-data",
    retry=RetryPolicy(
        max_retries=3,
        initial_delay_s=5.0,
        max_delay_s=60.0,
        multiplier=2.0,
        predicate="http.default_retry_predicate",
    ),
    timeout=timedelta(seconds=120),
)
async def extract_data(ctx: Context, data: DataSource) -> RawData:
    """Extract data from source with automatic retries."""
    # Simulate data extraction
    import random

    # Simulate occasional extraction failures
    if random.random() < 0.2:
        raise Exception(f"Failed to extract from {data.source_id}")

    # Mock extracted data
    mock_data = [{"id": i, "value": f"record_{i}", "source": data.source_id} for i in range(10)]

    return RawData(data=mock_data, source_id=data.source_id, record_count=len(mock_data))


@step(name="validate-data")
async def validate_data(ctx: Context, data: RawData) -> RawData:
    """Validate extracted data."""
    # Filter out invalid records
    valid_data = [record for record in data.data if record.get("id") is not None and record.get("value") is not None]

    if not valid_data:
        raise ValueError(f"No valid records found in {data.source_id}")

    return RawData(data=valid_data, source_id=data.source_id, record_count=len(valid_data))


@step(
    name="transform-data",
    retry=RetryPolicy(
        max_retries=2, initial_delay_s=1.0, max_delay_s=5.0, multiplier=2.0, predicate="http.default_retry_predicate"
    ),
)
async def transform_data(ctx: Context, data: RawData) -> ProcessedData:
    """Transform and enrich data."""
    processed_count = 0
    failed_count = 0

    for record in data.data:
        try:
            # Simulate transformation
            record["transformed"] = True
            record["timestamp"] = "2024-01-01T00:00:00Z"
            processed_count += 1
        except Exception:
            failed_count += 1

    return ProcessedData(
        processed_records=processed_count, failed_records=failed_count, source_id=data.source_id, status="transformed"
    )


# External data quality service
data_quality_check = HttpStep(
    name="quality-check",
    input_model=ProcessedData,
    output_model=ProcessedData,
    method="POST",
    url="https://jsonplaceholder.typicode.com/posts",  # Mock endpoint
    retry=RetryPolicy.idempotent_http(),
    timeout=timedelta(seconds=30),
)


@step(name="load-data")
async def load_data(ctx: Context, data: ProcessedData) -> PipelineResult:
    """Load data to destination."""
    # Simulate data loading
    return PipelineResult(
        total_processed=data.processed_records,
        total_failed=data.failed_records,
        sources_processed=[data.source_id],
        status="loaded",
    )


@step(name="handle-extraction-error")
async def handle_extraction_error(ctx: Context, data: DataSource) -> RawData:
    """Handle extraction errors by returning empty dataset."""
    return RawData(data=[], source_id=data.source_id, record_count=0)


@step(name="handle-transform-error")
async def handle_transform_error(ctx: Context, data: RawData) -> ProcessedData:
    """Handle transformation errors."""
    return ProcessedData(
        processed_records=0, failed_records=data.record_count, source_id=data.source_id, status="transform_failed"
    )


@step(name="cleanup-on-error")
async def cleanup_on_error(ctx: Context, data: DataSource) -> PipelineResult:
    """Cleanup resources on pipeline failure."""
    return PipelineResult(total_processed=0, total_failed=0, sources_processed=[], status="pipeline_failed_cleaned")


def build_data_pipeline():
    """Build a resilient data pipeline with multiple try/catch blocks."""

    # Extract phase with error handling
    extract_try = workflow("extract-phase") >> extract_data >> validate_data
    extract_with_recovery = TryCatchStep(
        name="extract-with-recovery",
        input_model=DataSource,
        output_model=RawData,
        try_steps=extract_try.nodes,
        except_steps=[handle_extraction_error],
        error_var="extract_error",
        raise_on_error=False,
    )

    # Transform phase with error handling
    transform_try = workflow("transform-phase") >> transform_data >> data_quality_check
    transform_with_recovery = TryCatchStep(
        name="transform-with-recovery",
        input_model=RawData,
        output_model=ProcessedData,
        try_steps=transform_try.nodes,
        except_steps=[handle_transform_error],
        error_var="transform_error",
        raise_on_error=False,
    )

    # Complete pipeline with outer try/catch
    pipeline_steps = [extract_with_recovery, transform_with_recovery, load_data]

    pipeline_with_cleanup = TryCatchStep(
        name="pipeline-with-cleanup",
        input_model=DataSource,
        output_model=PipelineResult,
        try_steps=pipeline_steps,
        except_steps=[cleanup_on_error],
        error_var="pipeline_error",
        raise_on_error=False,
    )

    return (workflow("data-pipeline-flow") >> pipeline_with_cleanup).build()


# Export workflow
DATA_PIPELINE_FLOW = build_data_pipeline()
