Skip to content

Commit 231db8b

Browse files
authored
Merge pull request #5 from flamingo-run/feature/pubsub
Enables Pub/Sub connector for workflows
2 parents dfde083 + 8def965 commit 231db8b

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

42 files changed

+1049
-833
lines changed

README.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -117,7 +117,7 @@ Priority order for upcoming features:
117117
| 1 | **Retries** | ✅ Completed | `RetryPolicy` with configurable backoff and predicates |
118118
| 2 | **Try/catch** | ✅ Completed | `TryCatchStep` for exception handling with fallback flows |
119119
| 3 | **Subworkflows** | ✅ Completed | Call other workflows with `>> WORKFLOW`, dependency tracking |
120-
| 4 | **GCP connectors** | 📋 Planned | Direct service calls, native Cloud Workflows connectors |
120+
| 4 | **GCP connectors** | ✅ Completed | Direct service calls using Cloud Workflows connectors (Pub/Sub first) |
121121
| 5 | **Deployment API** | 📋 Planned | Programmatic deployment of workflows & EventArc triggers via GCP APIs |
122122
| 6 | **Loops** | 📋 Planned | For/while constructs, iteration over collections |
123123
| 7 | **Conditionals / switch** | 📋 Planned | Branching logic, switch statements |

conftest.py

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,14 @@
1+
from __future__ import annotations
2+
3+
import sys
4+
from pathlib import Path
5+
6+
ROOT_DIR = Path(__file__).resolve().parent
7+
PROJECT_ROOT = ROOT_DIR
8+
EXAMPLES_DIR = PROJECT_ROOT / "examples"
9+
SRC_DIR = PROJECT_ROOT / "src"
10+
11+
for path in [SRC_DIR, EXAMPLES_DIR, PROJECT_ROOT]:
12+
path_str = str(path)
13+
if path_str not in sys.path:
14+
sys.path.insert(0, path_str)

examples/__init__.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
"""Example application package for fastapi-cloudflow demos."""

examples/app/__init__.py

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,12 @@
1+
from .flows.resilient_payment import GATEWAY_PAYMENT_FLOW, RESILIENT_PAYMENT_FLOW
2+
from .flows.retry_contract import RETRY_DEMO_WORKFLOW
3+
from .flows.subworkflow_contract import SUBWORKFLOW_DEMO
4+
from .flows.try_catch_contract import TRY_CATCH_DEMO
5+
6+
__all__ = [
7+
"GATEWAY_PAYMENT_FLOW",
8+
"RESILIENT_PAYMENT_FLOW",
9+
"RETRY_DEMO_WORKFLOW",
10+
"SUBWORKFLOW_DEMO",
11+
"TRY_CATCH_DEMO",
12+
]

examples/app/flows/__init__.py

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,17 @@
1+
"""Example flow definitions for FastAPI Cloudflow demos."""
2+
3+
__all__ = [
4+
"data_pipeline",
5+
"echo_name",
6+
"jokes",
7+
"order",
8+
"order_with_subworkflow",
9+
"payments",
10+
"post_story",
11+
"pubsub_example",
12+
"resilient_payment",
13+
"retry_contract",
14+
"subworkflow_contract",
15+
"try_catch_contract",
16+
"user",
17+
]
Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,33 @@
1+
"""Example workflow demonstrating Pub/Sub connector support."""
2+
3+
from __future__ import annotations
4+
5+
from pydantic import BaseModel
6+
7+
from fastapi_cloudflow import Context, step, workflow
8+
from fastapi_cloudflow.core.connectors import pubsub_publish_step
9+
10+
11+
class PublishRequest(BaseModel):
12+
topic: str
13+
payload: str
14+
15+
16+
class PublishResult(BaseModel):
17+
message_ids: list[str]
18+
19+
20+
@step(name="prepare-message")
21+
async def prepare_message(ctx: Context, data: PublishRequest) -> PublishRequest:
22+
return data
23+
24+
25+
publish_step = pubsub_publish_step(
26+
name="publish-topic",
27+
topic="projects/demo/topics/example",
28+
input_model=PublishRequest,
29+
data_field="payload",
30+
)
31+
32+
33+
PUBSUB_EXAMPLE_FLOW = (workflow("pubsub-example-flow") >> prepare_message >> publish_step).build()
Lines changed: 57 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,57 @@
1+
"""Retry-focused workflow example used for documentation and contract tests."""
2+
3+
from __future__ import annotations
4+
5+
from datetime import timedelta
6+
7+
from pydantic import BaseModel
8+
9+
from fastapi_cloudflow import Context, HttpStep, RetryPolicy, step, workflow
10+
11+
12+
class RetryInput(BaseModel):
13+
value: int
14+
15+
16+
class RetryResult(BaseModel):
17+
result: int
18+
19+
20+
@step(
21+
name="retry-entry",
22+
retry=RetryPolicy(
23+
max_retries=3,
24+
initial_delay_s=2.0,
25+
max_delay_s=10.0,
26+
multiplier=2.0,
27+
predicate="http.default_retry_predicate",
28+
),
29+
timeout=timedelta(seconds=30),
30+
)
31+
async def retry_entry(ctx: Context, data: RetryInput) -> RetryInput:
32+
return RetryInput(value=data.value + 1)
33+
34+
35+
PAYMENT_GATEWAY = HttpStep(
36+
name="retry-gateway",
37+
input_model=RetryInput,
38+
output_model=RetryResult,
39+
method="POST",
40+
url="https://api.example.com/payments",
41+
retry=RetryPolicy.idempotent_http(),
42+
timeout=timedelta(seconds=60),
43+
)
44+
45+
46+
@step(name="retry-finalize")
47+
async def retry_finalize(ctx: Context, data: RetryResult) -> RetryResult:
48+
return RetryResult(result=data.result)
49+
50+
51+
def build_retry_demo_workflow():
52+
"""Create a workflow showcasing HTTP and step-level retry policies."""
53+
54+
return (workflow("retry-demo") >> retry_entry >> PAYMENT_GATEWAY >> retry_finalize).build()
55+
56+
57+
RETRY_DEMO_WORKFLOW = build_retry_demo_workflow()
Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,35 @@
1+
from __future__ import annotations
2+
3+
from pydantic import BaseModel
4+
5+
from fastapi_cloudflow import Context, SubworkflowStep, step, workflow
6+
7+
8+
class SubworkflowInput(BaseModel):
9+
value: int
10+
11+
12+
class SubworkflowResult(BaseModel):
13+
result: int
14+
15+
16+
@step(name="subwf-prepare")
17+
async def subwf_prepare(ctx: Context, data: SubworkflowInput) -> SubworkflowInput:
18+
return SubworkflowInput(value=data.value + 2)
19+
20+
21+
SUB_WORKFLOW = SubworkflowStep(
22+
workflow_id="subwf-child",
23+
input_model=SubworkflowInput,
24+
output_model=SubworkflowResult,
25+
input_mapping={"custom_field": "${payload.value}"},
26+
output_mapping={"result": "custom_result"},
27+
)
28+
29+
30+
@step(name="subwf-finalize")
31+
async def subwf_finalize(ctx: Context, data: SubworkflowResult) -> SubworkflowResult:
32+
return SubworkflowResult(result=data.result + 5)
33+
34+
35+
SUBWORKFLOW_DEMO = (workflow("subworkflow-demo") >> subwf_prepare >> SUB_WORKFLOW >> subwf_finalize).build()
Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,31 @@
1+
from __future__ import annotations
2+
3+
from pydantic import BaseModel
4+
5+
from fastapi_cloudflow import Context, step, try_catch, workflow
6+
7+
8+
class TryCatchInput(BaseModel):
9+
value: int
10+
11+
12+
class TryCatchResult(BaseModel):
13+
result: int
14+
15+
16+
@step(name="tc-risky")
17+
async def tc_risky(ctx: Context, data: TryCatchInput) -> TryCatchResult:
18+
return TryCatchResult(result=data.value * 2)
19+
20+
21+
@step(name="tc-handler")
22+
async def tc_handler(ctx: Context, data: TryCatchInput) -> TryCatchResult:
23+
return TryCatchResult(result=0)
24+
25+
26+
TRY_BLOCK = workflow("tc-try") >> tc_risky
27+
EXCEPT_BLOCK = workflow("tc-except") >> tc_handler
28+
TRY_CATCH_STEP = try_catch("tc-safe").try_block(TRY_BLOCK).except_block(EXCEPT_BLOCK).raise_error(True).build()
29+
30+
31+
TRY_CATCH_DEMO = (workflow("try-catch-demo") >> TRY_CATCH_STEP).build()

examples/app/main.py

Lines changed: 12 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,14 +1,23 @@
11
from fastapi import FastAPI
2-
from flows import data_pipeline, echo_name, jokes, order, payments, post_story, resilient_payment, user # noqa: F401
32

3+
from examples.app.flows import ( # noqa: F401
4+
data_pipeline,
5+
echo_name,
6+
jokes,
7+
order,
8+
payments,
9+
post_story,
10+
resilient_payment,
11+
user,
12+
)
413
from fastapi_cloudflow import attach_to_fastapi
514

615

716
def create_app() -> FastAPI:
817
app = FastAPI()
918
# Import types used for stub endpoints
10-
from flows.payments import PSPReq, PSPRes
11-
from flows.user import IdentityReq, IdentityRes
19+
from examples.app.flows.payments import PSPReq, PSPRes
20+
from examples.app.flows.user import IdentityReq, IdentityRes
1221

1322
@app.get("/health")
1423
def health() -> dict[str, str]: # noqa: D401

0 commit comments

Comments
 (0)