Commits (35)
9a9b001
first draft
StreetLamb Sep 19, 2025
95487d7
make temporal plugin work with asyncio execution engine and mcp-agent…
StreetLamb Sep 20, 2025
d211248
Add more passthrough modules
StreetLamb Sep 20, 2025
29d35c7
draft scripts
StreetLamb Sep 20, 2025
2ea85fd
Update `configure_worker` to work with temporal execution engine only
StreetLamb Sep 20, 2025
8cd266c
Change prompt of example workflow
StreetLamb Sep 20, 2025
9b03bb9
Refactor workflow and worker
StreetLamb Sep 20, 2025
5051dd3
Update configuration for temporal execution engine
StreetLamb Sep 21, 2025
6a75bde
Add single-file temporal plugin example implementation
StreetLamb Sep 21, 2025
0e5f157
Update temporal plugin example readme
StreetLamb Sep 21, 2025
b3b9f56
Move plugin to client instead of worker
StreetLamb Sep 21, 2025
5de198a
Improve MCPAgentPlugin docstring, simplify plugin initialisation, upd…
StreetLamb Sep 21, 2025
9494112
delete obsolete workflow library
StreetLamb Sep 21, 2025
b9fd945
Move MCPAgentPlugin under executor.temporal directory. Refactor MCPAg…
StreetLamb Sep 21, 2025
b5fca11
Reduce passthrough modules
StreetLamb Sep 21, 2025
b24ec16
add examples for orchestrator and parallel_agent workflow patterns
StreetLamb Sep 21, 2025
ed56e3f
wip evaluator_optimizer and router workflow example
StreetLamb Sep 21, 2025
529dee3
Move upstream session handling from Workflow class to MCPAgentPlugin
StreetLamb Sep 22, 2025
d467cda
remove unused import in evaluator_optimizer
StreetLamb Sep 22, 2025
de7603b
Update MCPAgentPlugin to configure interceptors, workflows, and runne…
StreetLamb Sep 23, 2025
04aab67
Rename example workflow classes for clarity
StreetLamb Sep 23, 2025
e4588cd
Add temporal replay test example
StreetLamb Sep 23, 2025
9d64710
draft support for registering temporal workflows via MCPApp
StreetLamb Sep 26, 2025
02735f0
Rename register_workflows to register_temporal_workflows and update l…
StreetLamb Sep 26, 2025
d596208
Deduplicate workflows registered via Worker and register_temporal_wor…
StreetLamb Sep 26, 2025
46f7a00
Update README and add basic_agent_server.py for MCP server integratio…
StreetLamb Sep 26, 2025
efb282c
Refactor MCPAgentPlugin to register unregistered Temporal workflows w…
StreetLamb Sep 27, 2025
fc3613c
Add execution status filter to workflow listing in replay.py
StreetLamb Sep 27, 2025
ab21d00
Update README to clarify MCP server setup and introduce workflow repl…
StreetLamb Sep 27, 2025
0c453f9
Fix variable name in EvaluatorOptimizerWorkflow
StreetLamb Sep 29, 2025
3dd5722
Fix AnthropicAugmentedLLM to use executor for streaming completion
StreetLamb Sep 30, 2025
d22223f
Fix router example return type
StreetLamb Sep 30, 2025
af39096
Merge branch 'main' of https://github.com/lastmile-ai/mcp-agent into …
StreetLamb Sep 30, 2025
aef8aa3
Fix broken AnthropicAugmentedLLM tests
StreetLamb Sep 30, 2025
0a97ce1
Refactor instructions in ParallelAgentWorkflow and update workflow im…
StreetLamb Sep 30, 2025
93 changes: 93 additions & 0 deletions examples/temporal_plugin/.gitignore
@@ -0,0 +1,93 @@
# MCP-Agent
mcp_agent.secrets.yaml
*.secrets.yaml
.mcp-agent/

# Python
__pycache__/
*.py[cod]
*$py.class
*.so
.Python
build/
develop-eggs/
dist/
downloads/
eggs/
.eggs/
lib/
lib64/
parts/
sdist/
var/
wheels/
share/python-wheels/
*.egg-info/
.installed.cfg
*.egg
MANIFEST
pip-log.txt
pip-delete-this-directory.txt

# Virtual Environment
.env
.venv
env/
venv/
ENV/
env.bak/
venv.bak/

# PyCharm
.idea/

# VS Code
.vscode/
*.code-workspace

# Vim
[._]*.s[a-v][a-z]
[._]*.sw[a-p]
[._]s[a-rt-v][a-z]
[._]ss[a-gi-z]
[._]sw[a-p]
*~

# Logs
logs/
*.log
*.jsonl

# OS
.DS_Store
.DS_Store?
._*
.Spotlight-V100
.Trashes
ehthumbs.db
Thumbs.db

# Testing
.pytest_cache/
.coverage
htmlcov/
.tox/
.hypothesis/

# Jupyter Notebook
.ipynb_checkpoints

# pyenv
.python-version

# mypy
.mypy_cache/
.dmypy.json
dmypy.json

# Pyre type checker
.pyre/

# Local environment variables
.env.local
.env.*.local
155 changes: 155 additions & 0 deletions examples/temporal_plugin/README.md
@@ -0,0 +1,155 @@
# MCP-Agent with Temporal Plugin

This example demonstrates multiple ways to use the Temporal plugin with MCP-Agent for workflow orchestration.

## Prerequisites

1. **Temporal Server**: Ensure you have a Temporal server running locally:
```bash
temporal server start-dev
```
This starts a development server at `localhost:7233`.

2. **API Keys**: Add your API keys to `mcp_agent.secrets.yaml`:
```yaml
OPENAI_API_KEY: "your-key-here"
ANTHROPIC_API_KEY: "your-key-here" # optional
```

3. **Configuration**: Set the execution engine to `temporal` in `mcp_agent.config.yaml`:
```yaml
execution_engine: temporal

temporal:
host: "localhost:7233"
namespace: "default"
task_queue: "mcp-agent"
```

## Usage Methods

### Method 1: Separate Worker and Workflow Files

This approach separates the worker and the workflow client into different processes, which is useful for distributed deployments.

**Step 1: Define your workflow** (`basic_workflow.py`):
```python
from temporalio import workflow
from mcp_agent.agents.agent import Agent
from mcp_agent.workflows.llm.augmented_llm_openai import OpenAIAugmentedLLM

@workflow.defn
class BasicWorkflow:
@workflow.run
async def run(self, prompt: str) -> str:
simple_agent = Agent(
name="finder",
instruction="You are a helpful agent",
server_names=["fetch"],
)

async with simple_agent:
llm = await simple_agent.attach_llm(OpenAIAugmentedLLM)
result = await llm.generate_str(prompt)
return result
```

**Step 2: Run the worker** (`run_worker.py`):
```bash
uv run run_worker.py
```
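
The worker script boils down to connecting a plugin-enabled Temporal client and running a `Worker` on the configured task queue. A minimal sketch (assuming `main.py` exposes the `MCPApp` instance as `app`; the actual `run_worker.py` may differ in details):

```python
import asyncio

from temporalio.client import Client
from temporalio.worker import Worker

from basic_workflow import BasicWorkflow
from main import app  # assumption: main.py exposes the MCPApp as `app`
from mcp_agent.executor.temporal.plugin import MCPAgentPlugin


async def main():
    async with app.run() as running_app:
        # The plugin wires mcp-agent's activities, interceptors, and
        # passthrough modules into the Temporal client.
        client = await Client.connect(
            running_app.config.temporal.host,
            plugins=[MCPAgentPlugin(running_app)],
        )

        # Poll the shared task queue until interrupted (Ctrl+C).
        worker = Worker(
            client,
            task_queue=running_app.config.temporal.task_queue,
            workflows=[BasicWorkflow],
        )
        await worker.run()


if __name__ == "__main__":
    asyncio.run(main())
```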

**Step 3: Execute the workflow** (in another terminal):
```bash
uv run run_basic_workflow.py
```
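
`run_basic_workflow.py` is then just a Temporal client that starts the workflow on the same task queue. A sketch under the same assumptions (the prompt string is illustrative):

```python
import asyncio
from uuid import uuid4

from temporalio.client import Client

from basic_workflow import BasicWorkflow
from main import app  # assumption: main.py exposes the MCPApp as `app`
from mcp_agent.executor.temporal.plugin import MCPAgentPlugin


async def main():
    async with app.run() as running_app:
        client = await Client.connect(
            running_app.config.temporal.host,
            plugins=[MCPAgentPlugin(running_app)],
        )

        # The worker started in the other terminal picks this up from the task queue.
        output = await client.execute_workflow(
            BasicWorkflow.run,
            "Summarize the content of https://modelcontextprotocol.io/introduction",  # illustrative prompt
            id=f"basic-workflow-{uuid4()}",
            task_queue=running_app.config.temporal.task_queue,
        )
        print(output)


if __name__ == "__main__":
    asyncio.run(main())
```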

### Method 2: Single File Execution (temporal_agent.py)

This approach combines worker and workflow execution in a single file, ideal for simpler deployments or testing.

```bash
uv run temporal_agent.py
```

This file:
- Defines the workflow
- Starts the worker
- Executes the workflow
- Runs all of the above in the same process using `async with Worker(...)`

**Key difference**: The single-file approach runs both the worker and client in the same process:
```python
async with Worker(
client,
task_queue=running_app.config.temporal.task_queue,
workflows=[BasicWorkflow],
):
# Execute workflow while worker is running
output = await client.execute_workflow(...)
```

## Important Configuration Notes

### Execution Engine Setting

The `execution_engine` in `mcp_agent.config.yaml` **MUST** be set to `temporal` for the Temporal plugin to work:

```yaml
execution_engine: temporal # Required for Temporal plugin
```

Without this setting, MCP-Agent will use the default `asyncio` engine and Temporal features won't be available.

### Temporal Configuration

Configure Temporal settings in `mcp_agent.config.yaml`:

```yaml
temporal:
host: "localhost:7233" # Temporal server address
namespace: "default" # Temporal namespace
task_queue: "mcp-agent" # Task queue name
max_concurrent_activities: 10 # Concurrency limit
rpc_metadata:
X-Client-Name: "mcp-agent" # Client identification
```

## File Structure

```
temporal_plugin/
├── basic_workflow.py # Workflow definition
├── run_worker.py # Worker process (Method 1)
├── run_basic_workflow.py # Workflow client (Method 1)
├── temporal_agent.py # Single-file approach (Method 2)
├── main.py # MCP-Agent app setup
├── mcp_agent.config.yaml # Configuration (MUST set execution_engine: temporal)
└── mcp_agent.secrets.yaml # API keys
```

## When to Use Each Method

- **Separate Files (Method 1)**: Use when you need:
- Distributed workers across multiple machines
- Independent scaling of workers and clients
- Clear separation of concerns
- Production deployments

- **Single File (Method 2)**: Use when you need:
- Quick prototyping and testing
- Simple deployments
- All-in-one execution for demos
- Development and debugging

## Troubleshooting

1. **Temporal not working**: Ensure `execution_engine: temporal` in config
2. **Connection refused**: Start Temporal server with `temporal server start-dev`
3. **Task queue mismatch**: Verify task queue names match between worker and client

## Further Resources

- [Temporal Documentation](https://docs.temporal.io/)
- [MCP-Agent Documentation](https://docs.mcp-agent.com/)
- [MCP-Agent GitHub](https://github.com/lastmile-ai/mcp-agent)
19 changes: 19 additions & 0 deletions examples/temporal_plugin/basic_workflow.py
@@ -0,0 +1,19 @@
from temporalio import workflow
from mcp_agent.workflows.llm.augmented_llm_openai import OpenAIAugmentedLLM
from mcp_agent.agents.agent import Agent


@workflow.defn
class BasicWorkflow:
@workflow.run
async def run(self, prompt: str) -> str:
simple_agent = Agent(
name="finder",
instruction="You are a helpful agent",
server_names=["fetch"],
)

async with simple_agent:
llm = await simple_agent.attach_llm(OpenAIAugmentedLLM)
result = await llm.generate_str(prompt)
return result
Comment on lines +10 to +19

⚠️ Potential issue

🧩 Analysis chain

Workflow determinism and I/O safety: ensure all LLM/network calls run as activities (not in workflow code).

Agent(...); async with simple_agent; llm.generate_str(...) will perform I/O unless your executor reroutes to activities under execution_engine: temporal. If not fully offloaded, this violates Temporal workflow rules and will fail nondeterminism/sandbox checks.

Please confirm your executor uses workflow.execute_activity/local activities for model calls and MCP tool use when execution_engine=temporal. If not, we should refactor the workflow to call explicit activities.


🌐 Web query:

Are network calls allowed inside Temporal Python workflows? What is the recommended pattern to perform external I/O (e.g., HTTP/LLM) from workflows?

💡 Result:

Short answer: No — Temporal Python Workflows must not perform network I/O directly. Use Activities (or Local Activities / child workflows / async Activities) to do external I/O such as HTTP calls or LLM requests. Workflows remain deterministic and only orchestrate; Activities execute non‑deterministic, blocking, or network work. [1][2][3]

Recommended pattern (concise):

  • Keep network/HTTP/LLM calls in Activities registered with a Worker. Call them from the Workflow via workflow.execute_activity (or execute_local_activity for very low‑latency local work). Activities can be sync or async and run with their own executors/retries/heartbeats. [1][2][3]

Sources:

  • Temporal Python docs — Workflow constraints (no network I/O). [1]
  • Temporal Python SDK (Activities guidance). [2]
  • Temporal blog: Local Activities / patterns to reduce latency. [3]

Critical: Move LLM/HTTP/network I/O out of the workflow — invoke via Activities

Temporal Python workflows must not perform network I/O. examples/temporal_plugin/basic_workflow.py (lines 10–19) calls simple_agent.attach_llm(...) and llm.generate_str(...) inside the workflow; this performs external I/O and will break Temporal determinism/sandbox checks. Refactor those LLM/HTTP calls into Activities (or execute_local_activity) and invoke them from the workflow using workflow.execute_activity / workflow.execute_local_activity. If an executor already transparently offloads these calls to Temporal Activities, point to that implementation; otherwise refactor.

🤖 Prompt for AI Agents
In examples/temporal_plugin/basic_workflow.py around lines 10 to 19, the
workflow is directly calling simple_agent.attach_llm(...) and
llm.generate_str(...), which perform network/HTTP I/O and must not run inside a
Temporal workflow; refactor by extracting the LLM attach and generation logic
into a separate Activity function (or local activity) that performs the
OpenAI/HTTP calls, register that Activity, and from the workflow replace the
direct calls with workflow.execute_activity(...) or
workflow.execute_local_activity(...) to invoke the Activity (or, if your
codebase already provides an executor that transparently runs these operations
as Activities, update the workflow to call that executor API and add a reference
to the executor implementation instead of in-workflow I/O).
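
For illustration, a refactor along the lines suggested above could look like this (a sketch only: the activity name `generate_with_finder` is hypothetical, and the activity would also need to be registered on the worker via `activities=[generate_with_finder]`):

```python
from datetime import timedelta

from temporalio import activity, workflow

from mcp_agent.agents.agent import Agent
from mcp_agent.workflows.llm.augmented_llm_openai import OpenAIAugmentedLLM


@activity.defn
async def generate_with_finder(prompt: str) -> str:
    # All network I/O (MCP tool calls, LLM requests) happens here, outside the workflow sandbox.
    finder = Agent(
        name="finder",
        instruction="You are a helpful agent",
        server_names=["fetch"],
    )
    async with finder:
        llm = await finder.attach_llm(OpenAIAugmentedLLM)
        return await llm.generate_str(prompt)


@workflow.defn
class BasicWorkflow:
    @workflow.run
    async def run(self, prompt: str) -> str:
        # The workflow only orchestrates; the LLM call runs as an activity with retries/timeouts.
        return await workflow.execute_activity(
            generate_with_finder,
            prompt,
            start_to_close_timeout=timedelta(minutes=5),
        )
```

Depending on sandbox settings, the `mcp_agent` imports may need to go inside `workflow.unsafe.imports_passed_through()`; and if `MCPAgentPlugin` already reroutes these calls to activities transparently, pointing to that implementation is enough.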

115 changes: 115 additions & 0 deletions examples/temporal_plugin/evaluator_optimizer.py
@@ -0,0 +1,115 @@
import asyncio
from uuid import uuid4
from temporalio import workflow
from mcp_agent.core.context import get_current_context
from mcp_agent.workflows.evaluator_optimizer.evaluator_optimizer import (
EvaluatorOptimizerLLM,
QualityRating,
)
from mcp_agent.workflows.llm.augmented_llm import RequestParams
from mcp_agent.workflows.llm.augmented_llm_openai import OpenAIAugmentedLLM
from mcp_agent.agents.agent import Agent
from temporalio.client import Client
from mcp_agent.executor.temporal.plugin import MCPAgentPlugin
from mcp_agent.app import MCPApp
from temporalio.worker import Worker

app = MCPApp(name="mcp_basic_agent")


@workflow.defn
class EvaluatorOptimizerWorkflow:
@workflow.run
async def run(self, prompt: str) -> str:
context = get_current_context()
logger = context.app.logger

logger.info("Current config:", data=context.config.model_dump())

Comment on lines +27 to +28

⚠️ Potential issue

Avoid dumping full config to logs (secrets risk)

context.config.model_dump() may include API keys. Don’t log entire config.

-        logger.info("Current config:", data=context.config.model_dump())
+        logger.info("Config initialized")
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
logger.info("Current config:", data=context.config.model_dump())
logger.info("Config initialized")
🤖 Prompt for AI Agents
In examples/temporal_plugin/evaluator_optimizer.py around lines 28-29, logging
the full config via context.config.model_dump() risks exposing secrets (API
keys); remove the full dump and instead log only non-sensitive metadata or a
redacted/config-summary view. Replace the call with either explicit safe fields
(e.g., build a dict of allowed keys and log that), or use model_dump with an
explicit exclude list of secret fields (or an include list of safe fields), and
ensure any secret-like keys (api_key, token, password, secret, etc.) are omitted
or masked before logging.
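
For example, a small redaction helper along these lines would keep the log useful without exposing secrets (a sketch that reuses `logger` and `context` from the workflow code above; the marker list is illustrative):

```python
SENSITIVE_MARKERS = ("api_key", "token", "password", "secret")


def redact(value, key: str = ""):
    # Recursively mask any value whose key looks secret-like.
    if isinstance(value, dict):
        return {k: redact(v, k) for k, v in value.items()}
    if any(marker in key.lower() for marker in SENSITIVE_MARKERS):
        return "***"
    return value


logger.info("Current config:", data=redact(context.config.model_dump()))
```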

optimizer = Agent(
name="optimizer",
instruction="""You are a career coach specializing in cover letter writing.
You are tasked with generating a compelling cover letter given the job posting,
candidate details, and company information. Tailor the response to the company and job requirements.
""",
server_names=["fetch"],
)

evaluator = Agent(
name="evaluator",
instruction="""Evaluate the following response based on the criteria below:
1. Clarity: Is the language clear, concise, and grammatically correct?
2. Specificity: Does the response include relevant and concrete details tailored to the job description?
3. Relevance: Does the response align with the prompt and avoid unnecessary information?
4. Tone and Style: Is the tone professional and appropriate for the context?
5. Persuasiveness: Does the response effectively highlight the candidate's value?
6. Grammar and Mechanics: Are there any spelling or grammatical issues?
7. Feedback Alignment: Has the response addressed feedback from previous iterations?

For each criterion:
- Provide a rating (EXCELLENT, GOOD, FAIR, or POOR).
- Offer specific feedback or suggestions for improvement.

Summarize your evaluation as a structured response with:
- Overall quality rating.
- Specific feedback and areas for improvement.""",
)

evaluator_optimizer = EvaluatorOptimizerLLM(
optimizer=optimizer,
evaluator=evaluator,
llm_factory=OpenAIAugmentedLLM,
min_rating=QualityRating.EXCELLENT,
context=context,
)
Comment on lines +24 to +64

⚠️ Potential issue | 🔴 Critical

Critical: Non-deterministic operations in workflow violate Temporal guarantees.

Temporal workflows must be deterministic to support replay. This workflow performs several non-deterministic operations directly in the workflow method:

  1. Line 24: get_current_context() accesses global/thread-local state that may not be available or consistent during replay.
  2. Lines 29-56: Creating Agent instances inside the workflow may involve I/O, configuration loading, or other non-deterministic operations.
  3. Lines 58-64: Instantiating EvaluatorOptimizerLLM with agents creates complex state that cannot be safely replayed.

These operations will break Temporal's replay mechanism and cause workflow failures when the worker restarts or the workflow history is replayed.

Solution: Move all agent/LLM initialization and API calls into Temporal activities. The workflow should only orchestrate activity calls and handle deterministic control flow.

Based on learnings about Temporal SDK best practices: workflows must be deterministic and avoid external I/O or state access.


result = await evaluator_optimizer.generate_str(
message=prompt,
request_params=RequestParams(model="gpt-4o"),
)

return result


async def main():
async with app.run() as running_app:
plugin = MCPAgentPlugin(running_app)

client = await Client.connect(
running_app.config.temporal.host,
plugins=[plugin],
)

async with Worker(
client,
task_queue=running_app.config.temporal.task_queue,
workflows=[EvaluatorOptimizerWorkflow],
):
job_posting = (
"Software Engineer at LastMile AI. Responsibilities include developing AI systems, "
"collaborating with cross-functional teams, and enhancing scalability. Skills required: "
"Python, distributed systems, and machine learning."
)
candidate_details = (
"Alex Johnson, 3 years in machine learning, contributor to open-source AI projects, "
"proficient in Python and TensorFlow. Motivated by building scalable AI systems to solve real-world problems."
)

# This should trigger a 'fetch' call to get the company information
company_information = (
"Look up from the LastMile AI page: https://lastmileai.dev"
)

task = f"Write a cover letter for the following job posting: {job_posting}\n\nCandidate Details: {candidate_details}\n\nCompany information: {company_information}"

output = await client.execute_workflow(
EvaluatorOptimizerWorkflow.run,
task,
id=f"basic-workflow-{uuid4()}",
task_queue=running_app.config.temporal.task_queue,
)
print(output)


if __name__ == "__main__":
asyncio.run(main())