Temporal plugin implementation #518
Conversation
….config.yaml; set passthrough modules
…entPlugin import paths across examples
Note: Other AI code review bot(s) detected. CodeRabbit has detected other AI code review bot(s) in this pull request and will avoid duplicating their findings in the review comments. This may lead to a less comprehensive review.

Walkthrough: Adds a Temporal executor plugin and app-side workflow registration/patching, removes automatic upstream SessionProxy exposure, refactors Anthropic streaming to an executor task, updates the temporalio pin, and adds many Temporal example workflows, runners, configs, docs, and tests for the Temporal integration.

Changes
Sequence Diagram(s)

```mermaid
sequenceDiagram
  autonumber
  participant Dev as Developer
  participant App as MCPApp
  participant Plugin as MCPAgentPlugin
  participant TClient as Temporal Client
  participant TWorker as Temporal Worker
  participant TServer as Temporal Server
  Dev->>App: import / start (module-level `app`)
  App->>Plugin: instantiate(app)
  Dev->>TClient: Client.connect(host, namespace, rpc_metadata)
  Plugin-->>TClient: configure client (data converter, interceptors, namespace)
  Dev->>TWorker: create Worker(task_queue)
  Plugin-->>TWorker: configure worker (register workflows, activities, passthrough)
  TWorker->>TServer: poll tasks → execute activities/workflows
  note right of TWorker: MCP context prepared before worker.run()
```

```mermaid
sequenceDiagram
  autonumber
  participant Runner as run_basic_workflow.py
  participant TClient as Temporal Client
  participant Wf as BasicWorkflow
  participant Agent as Agent
  participant LLM as OpenAIAugmentedLLM
  Runner->>TClient: execute_workflow(BasicWorkflow.run, prompt, id, task_queue)
  TClient->>Wf: start workflow
  Wf->>Agent: create("finder", servers)
  Wf->>LLM: attach OpenAIAugmentedLLM
  Wf->>LLM: generate_str(prompt)
  LLM-->>Wf: text result
  Wf-->>TClient: complete with result
  TClient-->>Runner: return result
```
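For reference, the plugin wiring these diagrams describe looks roughly like the sketch below. This is a minimal, hedged example that reuses names from this PR's examples (module-level `app` in `main.py`, `BasicWorkflow`, `MCPAgentPlugin`); the import paths and prompt string are assumptions, not the exact example code.

```python
import asyncio
from uuid import uuid4

from temporalio.client import Client
from temporalio.worker import Worker

from mcp_agent.executor.temporal.plugin import MCPAgentPlugin
from basic_workflow import BasicWorkflow  # assumed example module layout
from main import app  # assumed module-level MCPApp


async def main() -> None:
    async with app.run() as running_app:
        plugin = MCPAgentPlugin(running_app)

        # The plugin configures the data converter, interceptors, and namespace on the client
        client = await Client.connect(
            running_app.config.temporal.host,
            plugins=[plugin],
        )

        # The plugin registers MCP Agent activities and sandbox passthrough on the worker
        async with Worker(
            client,
            task_queue=running_app.config.temporal.task_queue,
            workflows=[BasicWorkflow],
            plugins=[plugin],
        ):
            result = await client.execute_workflow(
                BasicWorkflow.run,
                "Summarize https://modelcontextprotocol.io/introduction",  # placeholder prompt
                id=f"basic-workflow-{uuid4()}",
                task_queue=running_app.config.temporal.task_queue,
            )
            print(result)


if __name__ == "__main__":
    asyncio.run(main())
```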
Estimated code review effort: 🎯 4 (Complex) | ⏱️ ~60 minutes
Actionable comments posted: 3
🧹 Nitpick comments (11)
examples/temporal_plugin/.gitignore (3)
Lines 1-5: Harden secret ignores (catch more variants). Broaden patterns so JSON/YAML/YML or nested secret files don't slip in.

Apply this diff:

```diff
 # MCP-Agent
 mcp_agent.secrets.yaml
 *.secrets.yaml
 .mcp-agent/
+*.secrets.json
+*.secrets.*  # catch .yaml/.yml/.json variants
+.secrets/    # if any folder-based secrets are used
```
Lines 6-31: Add common Python/tooling caches (coverage, ruff, nox, PEP 582). These are frequently created locally and shouldn't be committed.

Apply this diff:

```diff
 # Python
 __pycache__/
 *.py[cod]
 *$py.class
 *.so
 .Python
 build/
 develop-eggs/
 dist/
 downloads/
 eggs/
 .eggs/
 lib/
 lib64/
 parts/
 sdist/
 var/
 wheels/
 share/python-wheels/
 *.egg-info/
 .installed.cfg
 *.egg
 MANIFEST
 pip-log.txt
 pip-delete-this-directory.txt
+__pypackages__/
+.cache/

 # Testing
 .pytest_cache/
 .coverage
 htmlcov/
 .tox/
 .hypothesis/
+.nox/
+.coverage.*
+coverage.xml

 # mypy
 .mypy_cache/
 .dmypy.json
 dmypy.json
+
+# Ruff
+.ruff_cache/
+
+# Pytype
+.pytype/
```

Also applies to: 70-76, 83-90
Lines 32-40: Unify .env handling and support direnv. You already ignore ".env" (covers file/dir). Add common variants and direnv.

Apply this diff:

```diff
 # Local environment variables
-.env.local
-.env.*.local
+.env
+.env.*
+.env.local
+.env.*.local
+.envrc
+.direnv/
```

Also applies to: 91-93
examples/temporal_plugin/basic_workflow.py (1)
Lines 10-14: Optional: reuse agent definitions from config. Consider constructing the agent from `agents.definitions` in `mcp_agent.config.yaml` to keep behavior/config in one place.

examples/temporal_plugin/README.md (2)
Lines 5-17: Add install step for Temporal extras. Readers need the optional dependency installed; otherwise `temporalio` and the plugin won't be available.

Add before "Prerequisites":

```bash
uv pip install -e ".[temporal]"  # or pip install -e ".[temporal]"
```
Lines 120-129: Fix markdownlint: specify language for the file tree code block. Annotate the fence with `text` to satisfy MD040.

````diff
-```
+```text
 temporal_plugin/
 ├── basic_workflow.py  # Workflow definition
 ...
````

examples/temporal_plugin/mcp_agent.config.yaml (2)

Lines 5-7: Broken/incorrect reference path. The comment points to `examples/temporal/README.md` which doesn't match this PR's `examples/temporal_plugin`. Update to reduce confusion.

```diff
-# For temporal mode, see: https://github.com/lastmile-ai/mcp-agent/blob/main/examples/temporal/README.md
+# For temporal mode, see: https://github.com/lastmile-ai/mcp-agent/blob/main/examples/temporal_plugin/README.md
```
Line 3: Use canonical raw URL for $schema. The current URL resolves, but use the canonical form (without "refs/heads") for tooling compatibility: https://raw.githubusercontent.com/lastmile-ai/mcp-agent/main/schema/mcp-agent.config.schema.json

File: examples/temporal_plugin/mcp_agent.config.yaml, line 3
examples/temporal_plugin/run_basic_workflow.py (1)
Lines 25-31: Optional: add a run timeout to avoid hanging if no worker is available. Helps fail fast when the worker isn't running.

```diff
 from temporalio.client import Client
+from datetime import timedelta
 ...
     result = await client.execute_workflow(
         BasicWorkflow.run,
         "Print the first 2 paragraphs of https://modelcontextprotocol.io/introduction",
         id=workflow_id,
         task_queue=task_queue,
+        run_timeout=timedelta(minutes=2),
     )
```

src/mcp_agent/executor/temporal/plugin.py (2)
Lines 176-178: Consider upgrading warning to exception for missing dependencies. The warning for missing context and app may not be sufficient to prevent runtime issues. Consider whether this should be an exception instead, as the plugin cannot function properly without these dependencies.

```diff
-        else:
-            warnings.warn("No context and app - Activities not registered.")
+        else:
+            raise ValueError("Plugin requires both context and app to register activities")
```
Lines 242-253: Consider making data converter replacement more explicit. The data converter replacement logic warns when replacing non-default converters but always returns the Pydantic converter. Consider making this behavior more configurable or providing an option to preserve custom converters in specific scenarios.

The current implementation always replaces non-default converters. You might want to add a configuration option to allow preserving custom converters:

```python
def _get_new_data_converter(
    self, converter: DataConverter | None, force_pydantic: bool = True
) -> DataConverter:
    """Get or create a Pydantic data converter, warning if replacing a custom one."""
    if not force_pydantic and converter and converter.payload_converter_class not in (
        DefaultPayloadConverter,
        PydanticPayloadConverter,
    ):
        return converter  # Preserve custom converter if explicitly requested

    if converter and converter.payload_converter_class not in (
        DefaultPayloadConverter,
        PydanticPayloadConverter,
    ):
        warnings.warn(
            "A non-default Temporal data converter was provided but has been replaced "
            "with the Pydantic data converter for MCP Agent compatibility."
        )

    return pydantic_data_converter
```
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
⛔ Files ignored due to path filters (1)

- `uv.lock` is excluded by `!**/*.lock`

📒 Files selected for processing (10)

- examples/temporal_plugin/.gitignore (1 hunks)
- examples/temporal_plugin/README.md (1 hunks)
- examples/temporal_plugin/basic_workflow.py (1 hunks)
- examples/temporal_plugin/main.py (1 hunks)
- examples/temporal_plugin/mcp_agent.config.yaml (1 hunks)
- examples/temporal_plugin/run_basic_workflow.py (1 hunks)
- examples/temporal_plugin/run_worker.py (1 hunks)
- examples/temporal_plugin/temporal_agent.py (1 hunks)
- pyproject.toml (1 hunks)
- src/mcp_agent/executor/temporal/plugin.py (1 hunks)
🧰 Additional context used
🧠 Learnings (4)
📓 Common learnings
Learnt from: rholinshead
PR: lastmile-ai/mcp-agent#414
File: src/mcp_agent/logging/logger.py:18-19
Timestamp: 2025-09-05T14:31:48.139Z
Learning: In the mcp-agent logging module (src/mcp_agent/logging/logger.py), temporalio should be imported lazily with try/except ImportError to avoid making it a hard dependency. Use temporalio.workflow.in_workflow() instead of isinstance checks on internal classes like _WorkflowInstanceImpl.
📚 Learning: 2025-09-05T14:31:48.139Z
Learnt from: rholinshead
PR: lastmile-ai/mcp-agent#414
File: src/mcp_agent/logging/logger.py:18-19
Timestamp: 2025-09-05T14:31:48.139Z
Learning: In the mcp-agent logging module (src/mcp_agent/logging/logger.py), temporalio should be imported lazily with try/except ImportError to avoid making it a hard dependency. Use temporalio.workflow.in_workflow() instead of isinstance checks on internal classes like _WorkflowInstanceImpl.
Applied to files:
examples/temporal_plugin/README.md
examples/temporal_plugin/run_worker.py
📚 Learning: 2025-07-22T18:59:49.368Z
Learnt from: CR
PR: lastmile-ai/mcp-agent#0
File: examples/usecases/reliable_conversation/CLAUDE.md:0-0
Timestamp: 2025-07-22T18:59:49.368Z
Learning: Applies to examples/usecases/reliable_conversation/examples/reliable_conversation/src/utils/config.py : Configuration values such as quality_threshold, max_refinement_attempts, consolidation_interval, and evaluator_model_provider must be loaded from mcp_agent.config.yaml.
Applied to files:
examples/temporal_plugin/mcp_agent.config.yaml
📚 Learning: 2025-07-22T18:59:49.368Z
Learnt from: CR
PR: lastmile-ai/mcp-agent#0
File: examples/usecases/reliable_conversation/CLAUDE.md:0-0
Timestamp: 2025-07-22T18:59:49.368Z
Learning: Applies to examples/usecases/reliable_conversation/examples/reliable_conversation/src/**/*.py : Use mcp-agent's Agent abstraction for ALL LLM interactions, including quality evaluation, to ensure consistent tool access, logging, and error handling.
Applied to files:
src/mcp_agent/executor/temporal/plugin.py
examples/temporal_plugin/main.py
🧬 Code graph analysis (6)
examples/temporal_plugin/temporal_agent.py (2)
- src/mcp_agent/agents/agent.py (2): Agent (62-1162), attach_llm (157-193)
- src/mcp_agent/executor/temporal/plugin.py (1): MCPAgentPlugin (38-253)

examples/temporal_plugin/basic_workflow.py (2)
- src/mcp_agent/workflows/llm/augmented_llm_openai.py (1): OpenAIAugmentedLLM (83-922)
- src/mcp_agent/agents/agent.py (1): attach_llm (157-193)

examples/temporal_plugin/run_basic_workflow.py (2)
- examples/temporal_plugin/basic_workflow.py (2): BasicWorkflow (7-19), run (9-19)
- src/mcp_agent/executor/temporal/plugin.py (1): MCPAgentPlugin (38-253)

examples/temporal_plugin/run_worker.py (6)
- examples/temporal_plugin/temporal_agent.py (3): BasicWorkflow (13-25), main (28-52), run (15-25)
- examples/temporal_plugin/basic_workflow.py (2): BasicWorkflow (7-19), run (9-19)
- src/mcp_agent/executor/temporal/plugin.py (1): MCPAgentPlugin (38-253)
- examples/temporal_plugin/run_basic_workflow.py (1): main (9-31)
- src/mcp_agent/server/app_server.py (1): app (135-137)
- src/mcp_agent/app.py (1): config (168-169)

src/mcp_agent/executor/temporal/plugin.py (6)
- src/mcp_agent/executor/temporal/interceptor.py (1): ContextPropagationInterceptor (58-93)
- src/mcp_agent/app.py (2): logger (204-221), config (168-169)
- src/mcp_agent/logging/logger.py (1): get_logger (513-541)
- src/mcp_agent/agents/agent.py (8): AgentTasks (1286-1598), call_tool_task (1489-1500), get_capabilities_task (1526-1553), get_prompt_task (1513-1524), initialize_aggregator_task (1385-1436), list_prompts_task (1502-1511), list_tools_task (1478-1487), shutdown_aggregator_task (1438-1476)
- src/mcp_agent/executor/temporal/system_activities.py (5): SystemActivities (16-103), forward_log (20-38), request_user_input (41-62), relay_notify (65-89), relay_request (92-103)
- src/mcp_agent/executor/task_registry.py (2): list_activities (33-34), get_activity (25-28)

examples/temporal_plugin/main.py (1)
- src/mcp_agent/server/app_server.py (1): app (135-137)
🪛 markdownlint-cli2 (0.18.1)
examples/temporal_plugin/README.md
120-120: Fenced code blocks should have a language specified
(MD040, fenced-code-language)
🔇 Additional comments (8)
examples/temporal_plugin/.gitignore (1)
Lines 1-93: Solid baseline .gitignore for the example. Covers secrets, Python artifacts, venvs, editors, logs, and test caches. Nice.
examples/temporal_plugin/main.py (1)
Lines 1-4: LGTM: simple, reusable app bootstrap. This module-level app makes the examples consistent and avoids duplicate setup.
examples/temporal_plugin/temporal_agent.py (1)
Lines 39-43: Pass the plugin into Worker or confirm it implements the Worker plugin interface. Worker inherits Client plugins only if the plugin implements the worker Plugin interface; if it is client-only, construct the Worker with plugins=[plugin].

File: examples/temporal_plugin/temporal_agent.py, lines 39-43

```diff
-        async with Worker(
+        async with Worker(
             client,
             task_queue=running_app.config.temporal.task_queue,
-            workflows=[BasicWorkflow],
+            workflows=[BasicWorkflow],
+            plugins=[plugin],
         ):
```

pyproject.toml (1)
Line 39: Pin Temporal dependency with an upper bound to avoid 2.x breakage. Confirm the minimum required version for your features; the opentelemetry extra is still correct (latest temporalio 1.18.0).

```diff
-temporal = [
-    "temporalio[opentelemetry]>=1.17.0",
-]
+temporal = [
+    "temporalio[opentelemetry]>=1.17,<2",
+]
```

src/mcp_agent/executor/temporal/plugin.py (4)
Lines 76-80: LGTM! Plugin initialization follows standard pattern. The plugin initialization correctly stores the next plugin references according to the Temporal plugin chain pattern where plugins should call the next plugin and maintain the plugin chain.

Lines 82-105: LGTM! Client configuration is well-structured. The client configuration correctly handles data converter replacement, adds interceptors conditionally based on tracing settings, and applies namespace configuration from the temporal config. The implementation follows the standard pattern for client plugins to intercept and modify client configuration.

Lines 107-127: LGTM! Service client configuration applies all necessary settings. The service client configuration correctly applies all Temporal connection settings from the config, including host, namespace, API key, TLS, and RPC metadata merging. The implementation properly handles optional settings and merges metadata appropriately.

Lines 134-175: Activity registration with app.workflow_task() is correct. Pattern matches the app.workflow_task implementation, GlobalWorkflowTaskRegistry registration, and TemporalExecutor usage — no changes required.
```python
simple_agent = Agent(
    name="finder",
    instruction="You are a helpful agent",
    server_names=["fetch"],
)

async with simple_agent:
    llm = await simple_agent.attach_llm(OpenAIAugmentedLLM)
    result = await llm.generate_str(prompt)
    return result
```
🧩 Analysis chain
Workflow determinism/I-O safety: ensure all LLM/network calls run as activities (not in workflow code).
`Agent(...)`, `async with simple_agent`, and `llm.generate_str(...)` will perform I/O unless your executor reroutes to activities under `execution_engine: temporal`. If not fully offloaded, this violates Temporal workflow rules and will fail nondeterminism/sandbox checks.

Please confirm your executor uses `workflow.execute_activity` / local activities for model calls and MCP tool use when `execution_engine=temporal`. If not, we should refactor the workflow to call explicit activities.
🌐 Web query:
Are network calls allowed inside Temporal Python workflows? What is the recommended pattern to perform external I/O (e.g., HTTP/LLM) from workflows?
💡 Result:
Short answer: No — Temporal Python Workflows must not perform network I/O directly. Use Activities (or Local Activities / child workflows / async Activities) to do external I/O such as HTTP calls or LLM requests. Workflows remain deterministic and only orchestrate; Activities execute non‑deterministic, blocking, or network work. [1][2][3]
Recommended pattern (concise):
- Keep network/HTTP/LLM calls in Activities registered with a Worker. Call them from the Workflow via workflow.execute_activity (or execute_local_activity for very low‑latency local work). Activities can be sync or async and run with their own executors/retries/heartbeats. [1][2][3]
Sources:
- Temporal Python docs — Workflow constraints (no network I/O). [1]
- Temporal Python SDK (Activities guidance). [2]
- Temporal blog: Local Activities / patterns to reduce latency. [3]
Critical: Move LLM/HTTP/network I/O out of the workflow — invoke via Activities
Temporal Python workflows must not perform network I/O. examples/temporal_plugin/basic_workflow.py (lines 10–19) calls simple_agent.attach_llm(...) and llm.generate_str(...) inside the workflow; this performs external I/O and will break Temporal determinism/sandbox checks. Refactor those LLM/HTTP calls into Activities (or execute_local_activity) and invoke them from the workflow using workflow.execute_activity / workflow.execute_local_activity. If an executor already transparently offloads these calls to Temporal Activities, point to that implementation; otherwise refactor.
🤖 Prompt for AI Agents
In examples/temporal_plugin/basic_workflow.py around lines 10 to 19, the
workflow is directly calling simple_agent.attach_llm(...) and
llm.generate_str(...), which perform network/HTTP I/O and must not run inside a
Temporal workflow; refactor by extracting the LLM attach and generation logic
into a separate Activity function (or local activity) that performs the
OpenAI/HTTP calls, register that Activity, and from the workflow replace the
direct calls with workflow.execute_activity(...) or
workflow.execute_local_activity(...) to invoke the Activity (or, if your
codebase already provides an executor that transparently runs these operations
as Activities, update the workflow to call that executor API and add a reference
to the executor implementation instead of in-workflow I/O).
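To make the suggested refactor concrete, a rough sketch of the activity-based shape described above is shown below; the activity name and timeout are illustrative, and this is not the PR's actual implementation (the PR's executor may already offload these calls transparently).

```python
from datetime import timedelta

from temporalio import activity, workflow

from mcp_agent.agents.agent import Agent
from mcp_agent.workflows.llm.augmented_llm_openai import OpenAIAugmentedLLM


@activity.defn
async def generate_with_finder_agent(prompt: str) -> str:
    # All network I/O (MCP "fetch" server, OpenAI calls) happens inside the activity
    finder = Agent(
        name="finder",
        instruction="You are a helpful agent",
        server_names=["fetch"],
    )
    async with finder:
        llm = await finder.attach_llm(OpenAIAugmentedLLM)
        return await llm.generate_str(prompt)


@workflow.defn
class BasicWorkflow:
    @workflow.run
    async def run(self, prompt: str) -> str:
        # The workflow only orchestrates; the non-deterministic work runs as an activity
        return await workflow.execute_activity(
            generate_with_finder_agent,
            prompt,
            start_to_close_timeout=timedelta(minutes=5),
        )
```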
```python
worker = Worker(
    client,
    task_queue=running_app.config.temporal.task_queue,
    workflows=[BasicWorkflow],
)
```
🧩 Analysis chain
Worker isn’t receiving the plugin.
Same as above: pass the plugin to ensure activities, interceptors, and sandbox settings are applied.
Apply:
```diff
-        worker = Worker(
+        worker = Worker(
             client,
             task_queue=running_app.config.temporal.task_queue,
-            workflows=[BasicWorkflow],
+            workflows=[BasicWorkflow],
+            plugins=[plugin],
         )
```
Run to find any remaining Workers without `plugins=`:
🏁 Script executed:
```bash
#!/bin/bash
# Find Worker(...) calls missing plugins=
rg -nP -C2 '\bWorker\s*\(\s*client\b(?![^)]*plugins\s*=)' --type=py
```
Length of output: 83
🏁 Script executed:
```bash
#!/bin/bash
set -e
python3 - <<'PY'
import ast, os, sys

def iter_py_files():
    for root, dirs, files in os.walk('.'):
        dirs[:] = [d for d in dirs if d not in {'.git','__pycache__','venv','env','node_modules','build','dist','.venv'}]
        for fname in files:
            if fname.endswith('.py'):
                yield os.path.join(root, fname)

missing = []
for path in iter_py_files():
    try:
        src = open(path, 'r', encoding='utf-8').read()
    except Exception as e:
        print(f"ERROR reading {path}: {e}", file=sys.stderr)
        continue
    try:
        tree = ast.parse(src, filename=path)
    except Exception as e:
        print(f"SKIP parse {path}: {e}", file=sys.stderr)
        continue

    class Finder(ast.NodeVisitor):
        def visit_Call(self, node):
            func = node.func
            name = None
            if isinstance(func, ast.Name):
                name = func.id
            elif isinstance(func, ast.Attribute):
                name = func.attr
            if name == 'Worker':
                has_plugins = any(isinstance(k, ast.keyword) and k.arg == 'plugins' for k in node.keywords)
                if not has_plugins:
                    lines = src.splitlines()
                    start = max(0, node.lineno - 4)
                    end = min(len(lines), node.lineno + 3)
                    snippet = "\n".join(lines[start:end])
                    missing.append((path, node.lineno, snippet))
            self.generic_visit(node)

    Finder().visit(tree)

if missing:
    for p, ln, sn in missing:
        print(f"FOUND: {p}:{ln}")
        print("----")
        print(sn)
        print("----")
    sys.exit(1)
else:
    print("OK: no Worker(...) calls missing 'plugins='")
PY
```
Length of output: 863
Add plugin to Worker instantiations
Worker calls are missing plugins=[plugin]; add it so activities, interceptors, and sandbox settings are applied.
- examples/temporal_plugin/run_worker.py:19 — apply the patch below.
```diff
-        worker = Worker(
+        worker = Worker(
             client,
             task_queue=running_app.config.temporal.task_queue,
-            workflows=[BasicWorkflow],
+            workflows=[BasicWorkflow],
+            plugins=[plugin],
         )
```
- examples/temporal_plugin/temporal_agent.py:39 — async with Worker(...) is missing plugins=[plugin]; add plugins=[plugin] to the call.
- src/mcp_agent/executor/temporal/init.py:540 — Worker(...) is missing plugins=[plugin]; add plugins=[plugin] to the call.
📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
```diff
 worker = Worker(
     client,
     task_queue=running_app.config.temporal.task_queue,
     workflows=[BasicWorkflow],
+    plugins=[plugin],
 )
```
```python
# Configure workflow sandbox to allow MCP Agent modules
from temporalio.worker.workflow_sandbox import SandboxedWorkflowRunner
from dataclasses import replace

runner = config.get("workflow_runner")
if isinstance(runner, SandboxedWorkflowRunner):
    # Disable most restrictions for MCP Agent workflows
    # This is necessary because MCP Agent code uses many libraries that aren't workflow-safe by default
    config["workflow_runner"] = replace(
        runner,
        restrictions=runner.restrictions.with_passthrough_modules(
            "mcp_agent",
            "mcp",
            "rich",
            "logging",
            "opentelemetry",
            "httpx",
            "aiohttp",
            "attrs",
            "numpy",
            "pydantic",
        ),
    )
```
🧩 Analysis chain
Verify sandbox restrictions are appropriate for production use.
The workflow sandbox restrictions are disabled for multiple modules including mcp_agent, mcp, httpx, aiohttp, etc. While this enables functionality, it may compromise the deterministic execution guarantees that Temporal workflows require.
Let me check if this broad passthrough configuration is aligned with Temporal best practices:
🌐 Web query:
Temporal Python workflow sandbox restrictions passthrough modules best practices
💡 Result:
Here are concise best practices for using Temporal Python workflow sandbox passthrough modules and restrictions:
- Only passthrough truly deterministic, side‑effect‑free modules (standard library is passed by default). Prefer passing through heavy serialization/typing libs (e.g., pydantic/dataclasses) used by workflows/activities to avoid repeated reloads. [1][2][3]
- Prefer whitelisting specific modules via SandboxRestrictions.with_passthrough_modules(...) or the imports_passed_through() context manager in the workflow file rather than enabling passthrough_all_modules. passthrough_all_modules disables per-run reloads and is unsafe unless you are certain no code is non‑deterministic. [1][2]
- Use invalid_module_members (restricted member-level checks) instead of invalid_modules when possible so unused modules can still be imported but dangerous members are blocked. Keep the default invalid_module_members set unless you have a reason to relax it. [0][1]
- Keep workflow files minimal (few non‑stdlib imports). Define activities, models, and heavy imports in separate modules you mark as passthrough to reduce per-run CPU and memory overhead. Tune worker cache/concurrency if memory grows. [2][3]
- Avoid passthrough for modules that mutate global state, perform I/O on import, use process-global singletons, or rely on non‑deterministic behavior (random, time, system calls) unless those calls are explicitly restricted. If you need non‑sandboxed behavior, prefer scoped options: sandbox_unrestricted() for a block, @workflow.defn(sandboxed=False) for a workflow, or UnsandboxedWorkflowRunner for a worker—use sparingly. [1][2]
- If you must passthrough a module but want safer behavior, disable lazy sys.modules passthrough (disable_lazy_sys_module_passthrough) so passthrough modules must be explicitly imported (reduces accidental passthrough). [0]
Sources:
- Temporal Python SDK docs — sandbox overview, passthrough modules, imports_passed_through, skipping sandbox. [1]
- temporalio.worker.workflow_sandbox.SandboxRestrictions API (fields: passthrough_modules, passthrough_all_modules, invalid_module_members, disable_lazy_sys_module_passthrough). [0]
- Temporal SDK (README/guide) and PyPI packaging notes on passthrough/performance recommendations. [2][3]
References:
[0] temporalio.worker.workflow_sandbox.SandboxRestrictions docs.
[1] Temporal docs — Python SDK sandbox (docs.temporal.io).
[2] Temporal Python SDK repo / README (github.com/temporalio/sdk-python).
[3] temporalio PyPI package description.
Restrict sandbox passthrough — tighten the whitelist in plugin.py
Disabling sandbox protections for many modules (mcp_agent, mcp, httpx, aiohttp, numpy, pydantic, etc.) undermines Temporal workflow determinism and is unsafe for production.
Location: src/mcp_agent/executor/temporal/plugin.py lines 196–218.
- Remove network/I/O libraries (httpx, aiohttp) and any modules that perform I/O or mutate global state on import from the passthrough list.
- Limit passthrough to truly deterministic, side‑effect‑free modules (e.g., small serialization/typing libs when proven safe); narrow the list to the minimum required.
- Prefer imports_passed_through(...) or SandboxRestrictions.with_passthrough_modules with an explicit minimal set; prefer invalid_module_members to block dangerous members rather than whole modules.
- Enable disable_lazy_sys_module_passthrough and move heavy/non-workflow code into separate modules marked explicitly passthrough.
- If non‑sandboxed behavior is necessary for specific workflows, use @workflow.defn(sandboxed=False), sandbox_unrestricted() blocks, or an UnsandboxedWorkflowRunner scoped to those workflows instead of worker-wide passthrough.
- Action: update the replace(...) call to a minimal whitelist, remove httpx/aiohttp, and add deterministic replay tests to validate correctness.
🤖 Prompt for AI Agents
In src/mcp_agent/executor/temporal/plugin.py around lines 196–218 the workflow
sandbox passthrough list is overly permissive (includes network/I/O and other
non‑deterministic modules); tighten this whitelist by removing httpx, aiohttp
and any other modules that perform I/O or mutate global state, replace the broad
list with a minimal set of proven deterministic, side‑effect‑free modules only
(or use imports_passed_through()/invalid_module_members to expose only safe
members), and instead mark heavy or non‑replayable code as explicitly
passthrough or run those workflows with
sandboxed=False/UnsandboxedWorkflowRunner; update the replace(...) call to use
the minimal whitelist and add deterministic replay tests to validate behavior.
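For illustration only, a tightened version of this passthrough configuration might look like the sketch below; the specific allowlist is an assumption and should be derived from what the workflows actually import rather than copied as-is.

```python
from dataclasses import replace

from temporalio.worker.workflow_sandbox import SandboxedWorkflowRunner


def restrict_sandbox(config: dict) -> None:
    """Narrow the sandbox passthrough allowlist on a worker config dict (sketch)."""
    runner = config.get("workflow_runner")
    if isinstance(runner, SandboxedWorkflowRunner):
        config["workflow_runner"] = replace(
            runner,
            restrictions=runner.restrictions.with_passthrough_modules(
                # deterministic, side-effect-free modules only; no httpx/aiohttp
                "mcp_agent",
                "pydantic",
                "attrs",
            ),
        )
```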
Actionable comments posted: 2
🧹 Nitpick comments (3)
examples/temporal_plugin/graded_report.md (1)
Line 3: Address Markdown heading style issues identified by linter. The static analysis tool identifies trailing punctuation in headings, which affects document formatting standards.

Apply this diff to remove trailing colons from headings:

```diff
-### Proofreading Feedback:
+### Proofreading Feedback
-### Factual Consistency & Logical Coherence:
+### Factual Consistency & Logical Coherence
-### APA Style Adherence:
+### APA Style Adherence
-### Suggestions:
+### Suggestions
```

Also applies to: 18-18, 27-27, 32-32
examples/temporal_plugin/parallel_agent.py (1)
Lines 13-33: Consider extracting the story content to reduce workflow memory overhead. The Temporal SDK documentation notes that non-standard-library imports and large data structures in workflow files add CPU and memory overhead for each workflow run. The large string constant directly embedded in the workflow file will be cached for each workflow execution.

Consider moving the story to an external file and reading it as needed:

```diff
-SHORT_STORY = """
-The Battle of Glimmerwood
-...
-"""
+# Move to a separate file, e.g., story_data.md
+# and read it in the workflow or pass as parameter
```

Alternatively, pass the story as a parameter to the workflow rather than defining it as a module constant.
examples/temporal_plugin/orchestrator.py (1)
Lines 105-109: Hardcoded task instructions may limit reusability. The task instructions are hardcoded with specific file references (`short_story.md`, `graded_report.md`) and URL, which reduces the workflow's reusability.

Consider parameterizing the task details:

```diff
-        task = """Load the student's short story from short_story.md,
-        and generate a report with feedback across proofreading,
-        factuality/logical consistency and style adherence. Use the style rules from
-        https://owl.purdue.edu/owl/research_and_citation/apa_style/apa_formatting_and_style_guide/general_format.html.
-        Write the graded report to graded_report.md as soon as you complete your task. Don't take too many steps."""
+        # Consider making this configurable or parameterized
+        task = f"""Load the student's short story from {input_file},
+        and generate a report with feedback across proofreading,
+        factuality/logical consistency and style adherence. Use the style rules from {style_guide_url}.
+        Write the graded report to {output_file} as soon as you complete your task."""
```
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (4)
- examples/temporal_plugin/graded_report.md (1 hunks)
- examples/temporal_plugin/orchestrator.py (1 hunks)
- examples/temporal_plugin/parallel_agent.py (1 hunks)
- examples/temporal_plugin/short_story.md (1 hunks)
✅ Files skipped from review due to trivial changes (1)
- examples/temporal_plugin/short_story.md
🧰 Additional context used
🧠 Learnings (1)
📓 Common learnings
Learnt from: rholinshead
PR: lastmile-ai/mcp-agent#414
File: src/mcp_agent/logging/logger.py:18-19
Timestamp: 2025-09-05T14:31:48.139Z
Learning: In the mcp-agent logging module (src/mcp_agent/logging/logger.py), temporalio should be imported lazily with try/except ImportError to avoid making it a hard dependency. Use temporalio.workflow.in_workflow() instead of isinstance checks on internal classes like _WorkflowInstanceImpl.
🧬 Code graph analysis (2)
examples/temporal_plugin/parallel_agent.py (5)
- src/mcp_agent/core/context.py (1): get_current_context (270-300)
- src/mcp_agent/workflows/llm/augmented_llm_openai.py (1): OpenAIAugmentedLLM (83-922)
- src/mcp_agent/agents/agent.py (1): Agent (62-1162)
- src/mcp_agent/executor/temporal/plugin.py (1): MCPAgentPlugin (38-253)
- src/mcp_agent/workflows/parallel/parallel_llm.py (1): ParallelLLM (25-282)

examples/temporal_plugin/orchestrator.py (6)
- src/mcp_agent/core/context.py (2): get_current_context (270-300), mcp (102-103)
- src/mcp_agent/workflows/llm/augmented_llm.py (1): RequestParams (126-196)
- src/mcp_agent/workflows/llm/augmented_llm_openai.py (1): OpenAIAugmentedLLM (83-922)
- src/mcp_agent/agents/agent.py (1): Agent (62-1162)
- src/mcp_agent/executor/temporal/plugin.py (1): MCPAgentPlugin (38-253)
- src/mcp_agent/workflows/orchestrator/orchestrator.py (1): Orchestrator (116-716)
🪛 markdownlint-cli2 (0.18.1)
examples/temporal_plugin/graded_report.md
3-3: Trailing punctuation in heading
Punctuation: ':'
(MD026, no-trailing-punctuation)
18-18: Trailing punctuation in heading
Punctuation: ':'
(MD026, no-trailing-punctuation)
27-27: Trailing punctuation in heading
Punctuation: ':'
(MD026, no-trailing-punctuation)
32-32: Trailing punctuation in heading
Punctuation: ':'
(MD026, no-trailing-punctuation)
🔇 Additional comments (3)
examples/temporal_plugin/graded_report.md (1)
Lines 1-36: Example document demonstrates temporal plugin workflow output. This appears to be the generated output from the temporal plugin workflow that evaluates the short story, providing structured feedback across grammar, factual consistency, and style adherence as requested. The content aligns well with the orchestrated agent approach shown in the accompanying Python files.
examples/temporal_plugin/parallel_agent.py (1)
Lines 39-85: LGTM! Clean parallel agent orchestration pattern. The workflow correctly uses the ParallelLLM pattern to orchestrate multiple agents (proofreader, fact_checker, style_enforcer) with the grader as the fan-in agent. The implementation follows best practices for Temporal workflows and properly integrates with the MCP Agent framework.
examples/temporal_plugin/orchestrator.py (1)
Lines 18-84: Well-structured orchestrator setup with comprehensive agent roles. The workflow demonstrates a good use of the Orchestrator pattern with clearly defined agent roles:

- `finder_agent`: Filesystem and URL access with appropriate server names
- `writer_agent`: Filesystem writing capability
- `proofreader`, `fact_checker`, `style_enforcer`: Analysis agents with fetch server access

The RequestParams configuration with model="gpt-4o" and max_iterations=100 provides reasonable limits for the orchestration workflow.
```python
running_app.context.config.mcp.servers["filesystem"].args.extend(
    [os.getcwd()]
)
```
Runtime filesystem configuration modification raises concerns.
Modifying the filesystem server configuration at runtime by extending args with os.getcwd()
could lead to unexpected behavior if the current working directory changes or if multiple workflows run concurrently.
Consider these alternatives:
- Configure filesystem path in config file: Add the working directory to the MCP configuration file
- Pass path as workflow parameter: Include the working directory as a parameter to the workflow
- Use absolute paths: Explicitly set the filesystem server to use specific absolute paths
```diff
-        running_app.context.config.mcp.servers["filesystem"].args.extend(
-            [os.getcwd()]
-        )
+        # Move this to configuration or pass as parameter
```
Committable suggestion skipped: line range outside the PR's diff.
🤖 Prompt for AI Agents
In examples/temporal_plugin/orchestrator.py around lines 101-103, avoid mutating
running_app.context.config.mcp.servers["filesystem"].args at runtime; remove the
extend(os.getcwd()) call and instead supply the filesystem path explicitly
before app startup or via workflow parameters: either (a) add the desired
absolute path to the MCP configuration file so the server is configured before
the app starts, or (b) change the workflow invocation to accept a
filesystem_path argument and pass os.getcwd() (or another absolute path) into
the workflow, ensuring all code uses that provided path rather than mutating
shared runtime config.
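A hedged sketch of option (b), passing the path as a workflow argument, is shown below; the two-argument `run()` signature and workflow name are hypothetical, not the PR's current code.

```python
from temporalio import workflow


@workflow.defn
class OrchestratorWorkflow:
    @workflow.run
    async def run(self, task: str, filesystem_path: str) -> str:
        # Use filesystem_path when wiring the filesystem MCP server for this run,
        # instead of reading os.getcwd() from mutated shared config.
        ...


# Caller side (inside the runner's async main), sketched:
#   await client.execute_workflow(
#       OrchestratorWorkflow.run,
#       args=[task, os.getcwd()],
#       id=f"orchestrator-{uuid4()}",
#       task_queue=running_app.config.temporal.task_queue,
#   )
```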
```python
async def main():
    async with app.run() as running_app:
        plugin = MCPAgentPlugin(running_app)

        client = await Client.connect(
            running_app.config.temporal.host,
            plugins=[plugin],
        )

        async with Worker(
            client,
            task_queue=running_app.config.temporal.task_queue,
            workflows=[BasicWorkflow],
        ):
            output = await client.execute_workflow(
                BasicWorkflow.run,
                args=[SHORT_STORY],
                id=f"basic-workflow-{uuid4()}",
                task_queue=running_app.config.temporal.task_queue,
            )
            print(output)
```
🧩 Analysis chain
Verify Temporal configuration and connection parameters.
The workflow execution uses configuration from running_app.config.temporal
but there's no validation that these settings are properly configured.
Verify that the temporal configuration exists and is valid:
🏁 Script executed:
```bash
#!/bin/bash
# Description: Check for temporal configuration files and verify structure

# Look for temporal configuration files
fd -t f "config" --exec grep -l "temporal" {} \;

# Check if temporal configuration is defined in the codebase
rg -nP -A5 -B5 "class.*TemporalSettings|temporal.*=" --type=py
```
Length of output: 31433
Validate Temporal config before accessing running_app.config.temporal
TemporalSettings is defined (src/mcp_agent/config.py) and the example config exists (examples/temporal_plugin/mcp_agent.config.yaml), but the top-level config declares temporal: TemporalSettings | None = None. The example (examples/temporal_plugin/parallel_agent.py — lines 88–108) accesses running_app.config.temporal.host/task_queue without a None-check — add an explicit validation or fail-fast error before using these fields.
🤖 Prompt for AI Agents
In examples/temporal_plugin/parallel_agent.py around lines 88–108, the code uses
running_app.config.temporal.host and .task_queue without checking that
running_app.config.temporal is not None; add an explicit validation before using
these fields (e.g., if running_app.config.temporal is None: raise a clear
RuntimeError/ValueError with instructions to configure Temporal) or use an
assertion/guard to fail fast, then proceed to call Client.connect/Worker using
the validated temporal settings.
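A minimal fail-fast guard along the lines suggested above might look like this; it reuses the example's names (`app`, `MCPAgentPlugin`, `Client`) and the import paths are assumptions, not the PR's actual code.

```python
from temporalio.client import Client

from mcp_agent.executor.temporal.plugin import MCPAgentPlugin
from main import app  # assumed module-level MCPApp from the examples


async def main() -> None:
    async with app.run() as running_app:
        # Fail fast if Temporal settings are missing (config.temporal may be None)
        if running_app.config.temporal is None:
            raise RuntimeError(
                "Temporal is not configured: set `temporal.host` and `temporal.task_queue` "
                "in mcp_agent.config.yaml (execution_engine: temporal)."
            )

        plugin = MCPAgentPlugin(running_app)
        client = await Client.connect(
            running_app.config.temporal.host,
            plugins=[plugin],
        )
        # ... continue with Worker / execute_workflow using the validated settings
```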
Actionable comments posted: 7
🧹 Nitpick comments (5)
examples/temporal_plugin/router.py (3)
Lines 64-71: Honor app config in LLM by wiring context. Pass context so default model/params come from config; the current instance ignores it.

```diff
-        llm = OpenAIAugmentedLLM(name="openai_router", instruction="You are a router")
+        llm = OpenAIAugmentedLLM(
+            name="openai_router",
+            instruction="You are a router",
+            context=context,
+        )
```
Lines 73-82: Guard against empty routing results. Indexing results[0] will raise if routing returns nothing.

```diff
-        results = await router.route_to_agent(
+        results = await router.route_to_agent(
             request="Print the contents of mcp_agent.config.yaml verbatim", top_k=1
         )
-
-        logger.info("Router Results:", data=results)
-
-        # We can use the agent returned by the router
-        agent = results[0].result
+        logger.info("Router Results:", data=results)
+        if not results:
+            raise RuntimeError("Router returned no agent for the request")
+        # We can use the agent returned by the router
+        agent = results[0].result
```
Lines 86-93: Redundant unsafe block. Workflow is already declared sandboxed=False; this unrestricted block is unnecessary. If you keep the workflow sandboxed, keep the block; otherwise remove it for clarity.

```diff
-        with workflow.unsafe.sandbox_unrestricted():
-            config_path = str(os.path.join(os.getcwd(), "mcp_agent.config.yaml"))
+        config_path = str(os.path.join(os.getcwd(), "mcp_agent.config.yaml"))
```

examples/temporal_plugin/evaluator_optimizer.py (1)
Line 2: Fix pipeline: remove unused import. Ruff F401 flagged `os` as unused.

```diff
-import os
```
src/mcp_agent/executor/temporal/plugin.py (1)
Lines 176-178: Improve warning with stacklevel for easier debugging.

```diff
-            warnings.warn("No context and app - Activities not registered.")
+            warnings.warn(
+                "No context and app - Activities not registered.", stacklevel=2
+            )
```
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (3)
- examples/temporal_plugin/evaluator_optimizer.py (1 hunks)
- examples/temporal_plugin/router.py (1 hunks)
- src/mcp_agent/executor/temporal/plugin.py (1 hunks)
🧰 Additional context used
🧠 Learnings (2)
📚 Learning: 2025-09-05T14:31:48.139Z
Learnt from: rholinshead
PR: lastmile-ai/mcp-agent#414
File: src/mcp_agent/logging/logger.py:18-19
Timestamp: 2025-09-05T14:31:48.139Z
Learning: In the mcp-agent logging module (src/mcp_agent/logging/logger.py), temporalio should be imported lazily with try/except ImportError to avoid making it a hard dependency. Use temporalio.workflow.in_workflow() instead of isinstance checks on internal classes like _WorkflowInstanceImpl.
Applied to files:
src/mcp_agent/executor/temporal/plugin.py
📚 Learning: 2025-07-22T18:59:49.368Z
Learnt from: CR
PR: lastmile-ai/mcp-agent#0
File: examples/usecases/reliable_conversation/CLAUDE.md:0-0
Timestamp: 2025-07-22T18:59:49.368Z
Learning: Applies to examples/usecases/reliable_conversation/examples/reliable_conversation/src/**/*.py : Use mcp-agent's Agent abstraction for ALL LLM interactions, including quality evaluation, to ensure consistent tool access, logging, and error handling.
Applied to files:
src/mcp_agent/executor/temporal/plugin.py
🧬 Code graph analysis (3)
examples/temporal_plugin/router.py (5)
- src/mcp_agent/core/context.py (2): get_current_context (270-300), mcp (102-103)
- src/mcp_agent/agents/agent.py (1): Agent (62-1162)
- src/mcp_agent/executor/temporal/plugin.py (1): MCPAgentPlugin (38-255)
- src/mcp_agent/workflows/router/router_llm.py (1): LLMRouter (96-570)
- src/mcp_agent/workflows/router/router_llm_anthropic.py (1): AnthropicLLMRouter (12-69)

examples/temporal_plugin/evaluator_optimizer.py (5)
- src/mcp_agent/core/context.py (1): get_current_context (270-300)
- src/mcp_agent/workflows/llm/augmented_llm.py (1): RequestParams (126-196)
- src/mcp_agent/workflows/llm/augmented_llm_openai.py (1): OpenAIAugmentedLLM (83-922)
- src/mcp_agent/executor/temporal/plugin.py (1): MCPAgentPlugin (38-255)
- examples/temporal_plugin/router.py (3): BasicWorkflow (33-128), run (35-128), main (131-154)

src/mcp_agent/executor/temporal/plugin.py (6)
- src/mcp_agent/executor/temporal/interceptor.py (1): ContextPropagationInterceptor (58-93)
- src/mcp_agent/app.py (2): logger (204-221), config (168-169)
- src/mcp_agent/logging/logger.py (1): get_logger (513-541)
- src/mcp_agent/agents/agent.py (8): AgentTasks (1286-1598), call_tool_task (1489-1500), get_capabilities_task (1526-1553), get_prompt_task (1513-1524), initialize_aggregator_task (1385-1436), list_prompts_task (1502-1511), list_tools_task (1478-1487), shutdown_aggregator_task (1438-1476)
- src/mcp_agent/executor/temporal/system_activities.py (5): SystemActivities (16-103), forward_log (20-38), request_user_input (41-62), relay_notify (65-89), relay_request (92-103)
- src/mcp_agent/executor/task_registry.py (2): list_activities (33-34), get_activity (25-28)
🪛 GitHub Actions: Pull Request Checks
examples/temporal_plugin/evaluator_optimizer.py
[error] 2-2: F401: 'os' imported but unused. Remove unused import. Found 1 error. Command 'ruff check' exited with non-zero status.
🔇 Additional comments (4)
examples/temporal_plugin/router.py (2)
Lines 103-111: Workflow side-effects (print) are non-deterministic. Calling print from a workflow function is a side-effect and may replay unpredictably. Prefer logging via a workflow activity or just log the intent.
Lines 32-35: Unsandboxed workflow — acceptable for examples, risky for prod. sandboxed=False disables deterministic guards. If this example is used beyond demos, prefer sandboxed workflows and move I/O into activities.
examples/temporal_plugin/evaluator_optimizer.py (1)
Lines 63-65: Tight constraint may cause extra iterations. min_rating=EXCELLENT could loop until max_iterations; confirm desired behavior given default max_iterations.
src/mcp_agent/executor/temporal/plugin.py (1)
Lines 200-220: Sandbox passthrough list is overly permissive (network/I/O modules). Including httpx/httpcore/sniffio/aiohttp undermines workflow determinism. Narrow the allowlist.

```diff
-            restrictions=runner.restrictions.with_passthrough_modules(
+            restrictions=runner.restrictions.with_passthrough_modules(
                 "mcp_agent",
                 "mcp",
                 "rich",
                 "logging",
                 "opentelemetry",
-                "httpx",
-                "httpcore",
-                "sniffio",
-                "aiohttp",
                 "attrs",
                 "numpy",
                 "pydantic",
             ),
```
logger.info("Current config:", data=context.config.model_dump()) | ||
|
Avoid dumping full config to logs (secrets risk)
context.config.model_dump() may include API keys. Don’t log entire config.
- logger.info("Current config:", data=context.config.model_dump())
+ logger.info("Config initialized")
📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
logger.info("Current config:", data=context.config.model_dump()) | |
logger.info("Config initialized") |
🤖 Prompt for AI Agents
In examples/temporal_plugin/evaluator_optimizer.py around lines 28-29, logging
the full config via context.config.model_dump() risks exposing secrets (API
keys); remove the full dump and instead log only non-sensitive metadata or a
redacted/config-summary view. Replace the call with either explicit safe fields
(e.g., build a dict of allowed keys and log that), or use model_dump with an
explicit exclude list of secret fields (or an include list of safe fields), and
ensure any secret-like keys (api_key, token, password, secret, etc.) are omitted
or masked before logging.
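As a sketch of the redaction described above, one could mask secret-looking keys before logging; the key list below is illustrative, and `logger`/`context` are the example's own names rather than new APIs.

```python
SENSITIVE_KEYS = {"api_key", "token", "password", "secret"}


def redact(value):
    """Recursively mask values whose keys look secret-bearing."""
    if isinstance(value, dict):
        return {
            k: "***" if k.lower() in SENSITIVE_KEYS else redact(v)
            for k, v in value.items()
        }
    if isinstance(value, list):
        return [redact(v) for v in value]
    return value


logger.info("Current config (redacted):", data=redact(context.config.model_dump()))
```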
```python
async with Worker(
    client,
    task_queue=running_app.config.temporal.task_queue,
    workflows=[BasicWorkflow],
):
```
Worker plugin not attached — activities won’t be registered
Same issue as in router example; attach the plugin to Worker.
```diff
-        async with Worker(
-            client,
-            task_queue=running_app.config.temporal.task_queue,
-            workflows=[BasicWorkflow],
-        ):
+        async with Worker(
+            client,
+            task_queue=running_app.config.temporal.task_queue,
+            workflows=[BasicWorkflow],
+            plugins=[plugin],
+        ):
```
📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
```diff
 async with Worker(
     client,
     task_queue=running_app.config.temporal.task_queue,
     workflows=[BasicWorkflow],
+    plugins=[plugin],
 ):
```
🤖 Prompt for AI Agents
In examples/temporal_plugin/evaluator_optimizer.py around lines 84 to 88, the
Worker is created without attaching the plugin so activities are not registered;
update the Worker invocation to include the plugin (e.g., pass
plugins=[your_plugin_instance] or plugins=[TemporalPlugin(...)] as appropriate
for this project) so the plugin is attached and activities get registered when
the Worker starts.
```python
result = await agent.call_tool(
    name="read_file",
    arguments={"path": config_path},
)
logger.info("read_file result:", data=result.model_dump())

# We can also use a router already configured with a particular LLM
```
Return type mismatch: run() -> str but returning a Pydantic object
Return a string (e.g., JSON) and avoid reusing the generic “result” name.
```diff
-        result = await agent.call_tool(
+        read_result = await agent.call_tool(
             name="read_file",
             arguments={"path": config_path},
         )
-        logger.info("read_file result:", data=result.model_dump())
+        logger.info("read_file result:", data=read_result.model_dump())
 ...
-        return result
+        return read_result.model_dump_json(indent=2)
```
Also applies to: 128-128
🤖 Prompt for AI Agents
In examples/temporal_plugin/router.py around lines 89-95 (and also at 128), the
run() signature promises to return a str but the code logs/returns a Pydantic
model object named result; change handling to produce and return a string: after
agent.call_tool, rename the variable to something specific (e.g.,
read_file_response) and serialize the Pydantic object to JSON (use
.model_dump_json() or .json()) for logging and to satisfy the str return type,
then return that JSON string instead of the raw model.
```python
async with Worker(
    client,
    task_queue=running_app.config.temporal.task_queue,
    workflows=[BasicWorkflow],
):
```
Worker plugin not attached — activities won’t be registered
Without passing the plugin to Worker, MCP Agent activities (list_tools, call_tool, etc.) won’t be available; workflow calls will fail at runtime.
Apply:
```diff
-        async with Worker(
-            client,
-            task_queue=running_app.config.temporal.task_queue,
-            workflows=[BasicWorkflow],
-        ):
+        async with Worker(
+            client,
+            task_queue=running_app.config.temporal.task_queue,
+            workflows=[BasicWorkflow],
+            plugins=[plugin],
+        ):
```
📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
```diff
 async with Worker(
     client,
     task_queue=running_app.config.temporal.task_queue,
     workflows=[BasicWorkflow],
+    plugins=[plugin],
 ):
```
🤖 Prompt for AI Agents
In examples/temporal_plugin/router.py around lines 140-144, the Worker is
created without the MCP plugin so MCP Agent activities (list_tools, call_tool,
etc.) are never registered; import or construct the MCP plugin instance (e.g.,
mcp_plugin = MCPPlugin(...) or obtain it from running_app.plugins) and pass it
into the Worker via the plugins parameter (plugins=[mcp_plugin]) when calling
Worker so the activities are registered and available to workflows.
```python
from temporalio.contrib.opentelemetry import TracingInterceptor
```
Make TracingInterceptor optional and add to worker when enabled
Importing TracingInterceptor unconditionally makes opentelemetry a hard dep and it’s only added on client, not worker.
```diff
-from temporalio.contrib.opentelemetry import TracingInterceptor
+try:
+    from temporalio.contrib.opentelemetry import TracingInterceptor  # type: ignore
+except Exception:  # pragma: no cover
+    TracingInterceptor = None  # type: ignore
@@
-        if self.context and getattr(self.context, "tracing_enabled", False):
-            interceptors.append(TracingInterceptor())
+        if (
+            self.context
+            and getattr(self.context, "tracing_enabled", False)
+            and TracingInterceptor
+        ):
+            interceptors.append(TracingInterceptor())
@@
-        config["interceptors"] = list(config.get("interceptors") or []) + [
-            ContextPropagationInterceptor()
-        ]
+        worker_interceptors = list(config.get("interceptors") or [])
+        if (
+            self.context
+            and getattr(self.context, "tracing_enabled", False)
+            and TracingInterceptor
+        ):
+            worker_interceptors.append(TracingInterceptor())
+        worker_interceptors.append(ContextPropagationInterceptor())
+        config["interceptors"] = worker_interceptors
```
Also applies to: 92-95, 181-185
🤖 Prompt for AI Agents
In src/mcp_agent/executor/temporal/plugin.py around lines 27-28 (and similarly
at 92-95 and 181-185): the TracingInterceptor is imported unconditionally which
forces opentelemetry as a hard dependency; change this to an optional import and
only attach the interceptor when tracing is enabled and the package is present.
Implement a try/except ImportError (or lazy import) that sets TracingInterceptor
to None if unavailable, and when building client/worker options check the
tracing-enabled flag and TracingInterceptor is not None before adding the
interceptor to the interceptors list; ensure code continues to work if
opentelemetry is absent and do not add it to the worker unless explicitly
enabled.
```python
if converter and converter.payload_converter_class not in (
    DefaultPayloadConverter,
    PydanticPayloadConverter,
):
    warnings.warn(
        "A non-default Temporal data converter was provided but has been replaced "
        "with the Pydantic data converter for MCP Agent compatibility."
    )

return pydantic_data_converter
```
DataConverter introspection bug — attribute doesn’t exist
DataConverter doesn’t expose payload_converter_class; this will AttributeError when a custom converter is provided.
```diff
-        if converter and converter.payload_converter_class not in (
-            DefaultPayloadConverter,
-            PydanticPayloadConverter,
-        ):
+        pc = getattr(converter, "payload_converter", None)
+        if converter and pc and not isinstance(
+            pc, (DefaultPayloadConverter, PydanticPayloadConverter)
+        ):
             warnings.warn(
                 "A non-default Temporal data converter was provided but has been replaced "
                 "with the Pydantic data converter for MCP Agent compatibility."
             )
-
-        return pydantic_data_converter
+        return pydantic_data_converter
```
📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
```diff
-if converter and converter.payload_converter_class not in (
-    DefaultPayloadConverter,
-    PydanticPayloadConverter,
-):
+pc = getattr(converter, "payload_converter", None)
+if converter and pc and not isinstance(
+    pc, (DefaultPayloadConverter, PydanticPayloadConverter)
+):
     warnings.warn(
         "A non-default Temporal data converter was provided but has been replaced "
         "with the Pydantic data converter for MCP Agent compatibility."
     )
 return pydantic_data_converter
```
🤖 Prompt for AI Agents
In src/mcp_agent/executor/temporal/plugin.py around lines 246 to 255, the code
accesses converter.payload_converter_class which doesn’t exist and will raise
AttributeError; change the check to detect non-default converters by class/type
instead (e.g., use if converter and not isinstance(converter,
(DefaultPayloadConverter, PydanticPayloadConverter)): or compare
converter.__class__ against those classes) and keep the warnings.warn and return
pydantic_data_converter as before.
Actionable comments posted: 0
🧹 Nitpick comments (1)
examples/temporal_plugin/evaluator_optimizer.py (1)
Lines 105-111: Consider adding timeouts to execute_workflow (hardens the demo). Prevents hangs and improves demos.

```diff
     output = await client.execute_workflow(
         BasicWorkflow.run,
         task,
         id=f"basic-workflow-{uuid4()}",
         task_queue=running_app.config.temporal.task_queue,
+        run_timeout=timedelta(minutes=1),
     )
```

If you adopt this, add `from datetime import timedelta`; the Temporal Python SDK expects a timedelta for run_timeout. Verify the parameter name in your SDK version.
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (3)
- examples/temporal_plugin/evaluator_optimizer.py (1 hunks)
- src/mcp_agent/executor/temporal/plugin.py (1 hunks)
- src/mcp_agent/executor/workflow.py (0 hunks)
💤 Files with no reviewable changes (1)
- src/mcp_agent/executor/workflow.py
🧰 Additional context used
🧠 Learnings (3)
📓 Common learnings
Learnt from: rholinshead
PR: lastmile-ai/mcp-agent#414
File: src/mcp_agent/logging/logger.py:18-19
Timestamp: 2025-09-05T14:31:48.139Z
Learning: In the mcp-agent logging module (src/mcp_agent/logging/logger.py), temporalio should be imported lazily with try/except ImportError to avoid making it a hard dependency. Use temporalio.workflow.in_workflow() instead of isinstance checks on internal classes like _WorkflowInstanceImpl.
📚 Learning: 2025-09-05T14:31:48.139Z
Learnt from: rholinshead
PR: lastmile-ai/mcp-agent#414
File: src/mcp_agent/logging/logger.py:18-19
Timestamp: 2025-09-05T14:31:48.139Z
Learning: In the mcp-agent logging module (src/mcp_agent/logging/logger.py), temporalio should be imported lazily with try/except ImportError to avoid making it a hard dependency. Use temporalio.workflow.in_workflow() instead of isinstance checks on internal classes like _WorkflowInstanceImpl.
Applied to files:
src/mcp_agent/executor/temporal/plugin.py
📚 Learning: 2025-07-22T18:59:49.368Z
Learnt from: CR
PR: lastmile-ai/mcp-agent#0
File: examples/usecases/reliable_conversation/CLAUDE.md:0-0
Timestamp: 2025-07-22T18:59:49.368Z
Learning: Applies to examples/usecases/reliable_conversation/examples/reliable_conversation/src/**/*.py : Use mcp-agent's Agent abstraction for ALL LLM interactions, including quality evaluation, to ensure consistent tool access, logging, and error handling.
Applied to files:
src/mcp_agent/executor/temporal/plugin.py
🧬 Code graph analysis (2)
src/mcp_agent/executor/temporal/plugin.py (8)
src/mcp_agent/executor/workflow.py (1)
executor
(118-123)src/mcp_agent/executor/temporal/interceptor.py (1)
ContextPropagationInterceptor
(58-93)src/mcp_agent/executor/temporal/session_proxy.py (1)
SessionProxy
(23-307)src/mcp_agent/app.py (2)
logger
(204-221)config
(168-169)src/mcp_agent/logging/logger.py (1)
get_logger
(513-541)src/mcp_agent/agents/agent.py (8)
AgentTasks
(1286-1598)call_tool_task
(1489-1500)get_capabilities_task
(1526-1553)get_prompt_task
(1513-1524)initialize_aggregator_task
(1385-1436)list_prompts_task
(1502-1511)list_tools_task
(1478-1487)shutdown_aggregator_task
(1438-1476)src/mcp_agent/executor/temporal/system_activities.py (5)
SystemActivities
(16-103)forward_log
(20-38)request_user_input
(41-62)relay_notify
(65-89)relay_request
(92-103)src/mcp_agent/executor/task_registry.py (2)
list_activities
(33-34)get_activity
(25-28)
examples/temporal_plugin/evaluator_optimizer.py (5)
src/mcp_agent/core/context.py (1)
get_current_context
(270-300)src/mcp_agent/workflows/evaluator_optimizer/evaluator_optimizer.py (2)
EvaluatorOptimizerLLM
(49-478)QualityRating
(25-31)src/mcp_agent/workflows/llm/augmented_llm.py (1)
RequestParams
(126-196)src/mcp_agent/workflows/llm/augmented_llm_openai.py (1)
OpenAIAugmentedLLM
(83-922)src/mcp_agent/executor/temporal/plugin.py (1)
MCPAgentPlugin
(39-270)
🔇 Additional comments (8)
examples/temporal_plugin/evaluator_optimizer.py (3)
27-28
: Don’t log full config (secrets risk).Remove the dump; keep a generic init message or a redacted summary.
- logger.info("Current config:", data=context.config.model_dump()) + logger.info("Config initialized")
66-69
: Bug: passing builtin input instead of the workflow arg.This sends the built-in function input, breaking generation. Use the prompt variable.
- result = await evaluator_optimizer.generate_str( - message=input, - request_params=RequestParams(model="gpt-4o"), - ) + result = await evaluator_optimizer.generate_str( + message=prompt, + request_params=RequestParams(model="gpt-4o"), + )
83-87
: Worker plugin not attached — activities won’t register.Attach the plugin to Worker so Agent/System activities are available.
async with Worker( client, task_queue=running_app.config.temporal.task_queue, workflows=[BasicWorkflow], + plugins=[plugin], ):
src/mcp_agent/executor/temporal/plugin.py (5)
27-28
: Make TracingInterceptor optional (don’t hard‑depend on opentelemetry).Optional import; proceed only when tracing is enabled and package exists.
-from temporalio.contrib.opentelemetry import TracingInterceptor +try: + from temporalio.contrib.opentelemetry import TracingInterceptor # type: ignore +except Exception: # pragma: no cover + TracingInterceptor = None # type: ignore
107-114
: Gate tracing on availability; keep context propagation.Avoid constructing TracingInterceptor when not installed.
- # Add tracing if enabled - if self.context and getattr(self.context, "tracing_enabled", False): - interceptors.append(TracingInterceptor()) + # Add tracing if enabled and available + if ( + self.context + and getattr(self.context, "tracing_enabled", False) + and TracingInterceptor + ): + interceptors.append(TracingInterceptor())
196-199
: Add tracing to worker interceptors when enabled (client-only isn’t enough).Attach TracingInterceptor (if present) on worker too; always add context propagation.
- # Add interceptors - config["interceptors"] = list(config.get("interceptors") or []) + [ - ContextPropagationInterceptor() - ] + # Add interceptors + worker_interceptors = list(config.get("interceptors") or []) + if ( + self.context + and getattr(self.context, "tracing_enabled", False) + and TracingInterceptor + ): + worker_interceptors.append(TracingInterceptor()) + worker_interceptors.append(ContextPropagationInterceptor()) + config["interceptors"] = worker_interceptors
211-235
: Sandbox passthrough is overly permissive; tighten for determinism. Remove I/O/network libs and keep a minimal whitelist (see the sketch after these comments).
- config["workflow_runner"] = replace( - runner, - restrictions=runner.restrictions.with_passthrough_modules( - "mcp_agent", - "mcp", - "rich", - "logging", - "opentelemetry", - "httpx", - "httpcore", - "sniffio", - "aiohttp", - "attrs", - "numpy", - "pydantic", - ), - ) + from dataclasses import replace as dc_replace + safe = runner.restrictions.with_passthrough_modules( + "mcp_agent", + "mcp", + "pydantic", + ) + config["workflow_runner"] = dc_replace( + runner, + restrictions=dc_replace( + safe, + disable_lazy_sys_module_passthrough=True, + ), + )If you truly need any removed modules, add narrowly and prove determinism with replay tests.
259-270
: DataConverter check uses non‑existent attribute; will AttributeError.Use payload_converter instance for type check before warning/replacing.
- if converter and converter.payload_converter_class not in ( - DefaultPayloadConverter, - PydanticPayloadConverter, - ): - warnings.warn( + pc = getattr(converter, "payload_converter", None) + if converter and pc and not isinstance( + pc, (DefaultPayloadConverter, PydanticPayloadConverter) + ): + warnings.warn( "A non-default Temporal data converter was provided but has been replaced " "with the Pydantic data converter for MCP Agent compatibility." ) return pydantic_data_converter
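Following up on the sandbox passthrough item above: besides trimming the worker-level whitelist, the standard temporalio pattern is to mark known-deterministic imports inside each workflow file with workflow.unsafe.imports_passed_through(). A rough sketch, with the specific modules shown as illustrative assumptions rather than a vetted list:

from temporalio import workflow

with workflow.unsafe.imports_passed_through():
    # These imports bypass the sandbox importer; only do this for modules that are
    # deterministic and side-effect free (e.g. type and model definitions)
    import pydantic  # noqa: F401
    from mcp_agent.workflows.llm.augmented_llm import RequestParams  # noqa: F401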
def configure_replayer(self, config: ReplayerConfig) -> ReplayerConfig:
    """Configure the replayer with MCP Agent settings."""
    config["data_converter"] = self._get_new_data_converter(
I believe you need to add ContextPropagationInterceptor
here as well, and potentially workflow_runner
.
It might be worth setting up a test that runs Replayer
so that users who are doing replay tests can do them.
Thank you for the feedback! Updated configure_replayer
to configure workflow_runner, workflows and interceptors as well.
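For anyone wiring up such a test, here is a rough sketch of a Replayer-based replay check. Replayer and WorkflowHistory are standard temporalio APIs and the plugins argument mirrors examples/temporal_plugin/replay.py; the history file name, the mcp_agent_plugin fixture, and the BasicWorkflow import are illustrative assumptions:

import pytest
from temporalio.client import WorkflowHistory
from temporalio.worker import Replayer

from basic_workflow import BasicWorkflow  # the workflow under test (module name assumed)

@pytest.mark.asyncio
async def test_basic_workflow_replays_cleanly(mcp_agent_plugin):
    # History exported beforehand, e.g. via `temporal workflow show ... --output json`
    with open("basic_workflow_history.json") as f:
        history = WorkflowHistory.from_json("basic-workflow", f.read())
    replayer = Replayer(workflows=[BasicWorkflow], plugins=[mcp_agent_plugin])
    # Raises if the current workflow code diverges from the recorded history (non-determinism)
    await replayer.replay_workflow(history)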
…rs in configure_replayer; register activities at init
Actionable comments posted: 1
🧹 Nitpick comments (1)
examples/temporal_plugin/replay.py (1)
50-58
: Validate replay results and handle errorsThe replay verification should check for failures and provide meaningful feedback.
if args.mode == "verify": start_time = (datetime.now(timezone.utc) - timedelta(hours=10)).isoformat() workflows = client.list_workflows( f"TaskQueue='{task_queue}' AND StartTime > '{start_time}'", limit=100 ) histories = workflows.map_histories() replayer = Replayer(workflows=my_workflows, plugins=[plugin]) results = await replayer.replay_workflows(histories) - return results + failures = [r for r in results if r.replay_failure] + if failures: + print(f"❌ Replay verification failed! {len(failures)} workflow(s) had determinism issues:") + for failure in failures: + print(f" - {failure.workflow_id}: {failure.replay_failure}") + return 1 + else: + print(f"✅ Replay verification passed! {len(list(results))} workflow(s) verified successfully.") + return 0
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (6)
- examples/temporal_plugin/evaluator_optimizer.py (1 hunks)
- examples/temporal_plugin/orchestrator.py (1 hunks)
- examples/temporal_plugin/parallel_agent.py (1 hunks)
- examples/temporal_plugin/replay.py (1 hunks)
- examples/temporal_plugin/router.py (1 hunks)
- src/mcp_agent/executor/temporal/plugin.py (1 hunks)
🧰 Additional context used
🧠 Learnings (2)
📓 Common learnings
Learnt from: rholinshead
PR: lastmile-ai/mcp-agent#414
File: src/mcp_agent/logging/logger.py:18-19
Timestamp: 2025-09-05T14:31:48.139Z
Learning: In the mcp-agent logging module (src/mcp_agent/logging/logger.py), temporalio should be imported lazily with try/except ImportError to avoid making it a hard dependency. Use temporalio.workflow.in_workflow() instead of isinstance checks on internal classes like _WorkflowInstanceImpl.
📚 Learning: 2025-09-05T14:31:48.139Z
Learnt from: rholinshead
PR: lastmile-ai/mcp-agent#414
File: src/mcp_agent/logging/logger.py:18-19
Timestamp: 2025-09-05T14:31:48.139Z
Learning: In the mcp-agent logging module (src/mcp_agent/logging/logger.py), temporalio should be imported lazily with try/except ImportError to avoid making it a hard dependency. Use temporalio.workflow.in_workflow() instead of isinstance checks on internal classes like _WorkflowInstanceImpl.
Applied to files:
examples/temporal_plugin/parallel_agent.py
examples/temporal_plugin/router.py
src/mcp_agent/executor/temporal/plugin.py
🧬 Code graph analysis (6)
examples/temporal_plugin/parallel_agent.py (3)
src/mcp_agent/core/context.py (1)
get_current_context
(270-300)src/mcp_agent/executor/temporal/plugin.py (1)
MCPAgentPlugin
(39-315)src/mcp_agent/workflows/parallel/parallel_llm.py (1)
ParallelLLM
(25-282)
examples/temporal_plugin/router.py (6)
src/mcp_agent/core/context.py (2)
get_current_context
(270-300)mcp
(102-103)src/mcp_agent/agents/agent.py (1)
Agent
(62-1162)src/mcp_agent/executor/temporal/plugin.py (1)
MCPAgentPlugin
(39-315)src/mcp_agent/workflows/router/router_llm.py (1)
LLMRouter
(96-570)src/mcp_agent/workflows/router/router_llm_anthropic.py (1)
AnthropicLLMRouter
(12-69)src/mcp_agent/app.py (2)
logger
(204-221)config
(168-169)
src/mcp_agent/executor/temporal/plugin.py (8)
src/mcp_agent/executor/workflow.py (1)
executor
(118-123)src/mcp_agent/executor/temporal/interceptor.py (1)
ContextPropagationInterceptor
(58-93)src/mcp_agent/executor/temporal/session_proxy.py (1)
SessionProxy
(23-307)src/mcp_agent/app.py (2)
logger
(204-221)config
(168-169)src/mcp_agent/logging/logger.py (1)
get_logger
(513-541)src/mcp_agent/agents/agent.py (8)
AgentTasks
(1286-1598)call_tool_task
(1489-1500)get_capabilities_task
(1526-1553)get_prompt_task
(1513-1524)initialize_aggregator_task
(1385-1436)list_prompts_task
(1502-1511)list_tools_task
(1478-1487)shutdown_aggregator_task
(1438-1476)src/mcp_agent/executor/temporal/system_activities.py (5)
SystemActivities
(16-103)forward_log
(20-38)request_user_input
(41-62)relay_notify
(65-89)relay_request
(92-103)src/mcp_agent/executor/task_registry.py (2)
list_activities
(33-34)get_activity
(25-28)
examples/temporal_plugin/replay.py (3)
examples/temporal_plugin/orchestrator.py (1)
OrchestratorWorkflow
(19-84)examples/temporal_plugin/parallel_agent.py (1)
ParallelAgentWorkflow
(40-85)src/mcp_agent/executor/temporal/plugin.py (1)
MCPAgentPlugin
(39-315)
examples/temporal_plugin/orchestrator.py (3)
src/mcp_agent/workflows/llm/augmented_llm_openai.py (1)
OpenAIAugmentedLLM
(83-922)src/mcp_agent/agents/agent.py (1)
Agent
(62-1162)src/mcp_agent/workflows/orchestrator/orchestrator.py (1)
Orchestrator
(116-716)
examples/temporal_plugin/evaluator_optimizer.py (7)
src/mcp_agent/core/context.py (1)
get_current_context
(270-300)src/mcp_agent/workflows/evaluator_optimizer/evaluator_optimizer.py (2)
EvaluatorOptimizerLLM
(49-478)QualityRating
(25-31)src/mcp_agent/workflows/llm/augmented_llm.py (1)
RequestParams
(126-196)src/mcp_agent/workflows/llm/augmented_llm_openai.py (1)
OpenAIAugmentedLLM
(83-922)src/mcp_agent/agents/agent.py (1)
Agent
(62-1162)src/mcp_agent/executor/temporal/plugin.py (1)
MCPAgentPlugin
(39-315)src/mcp_agent/app.py (2)
logger
(204-221)config
(168-169)
🔇 Additional comments (10)
examples/temporal_plugin/evaluator_optimizer.py (3)
27-27
: ** Avoid dumping full config to logs (secrets risk)**Logging the full configuration via
context.config.model_dump()
risks exposing sensitive information like API keys.- logger.info("Current config:", data=context.config.model_dump()) + logger.info("Config initialized")
67-67
: ** Bug: passing builtin input instead of prompt**This sends the builtin function
input
, not the workflow argument; generation will fail.- message=input, + message=prompt,
83-87
: ** Worker plugin not attached — activities won't be registered**Without the plugin, MCP Agent activities won't be registered and workflow execution will fail.
async with Worker( client, task_queue=running_app.config.temporal.task_queue, workflows=[EvaluatorOptimizerWorkflow], + plugins=[plugin], ):
examples/temporal_plugin/parallel_agent.py (1)
92-95
: ** Validate Temporal config before accessing running_app.config.temporal**The example accesses
running_app.config.temporal.host
and.task_queue
without checking iftemporal
is None.+ if running_app.config.temporal is None: + raise RuntimeError("Temporal configuration not found. Configure temporal settings in your config file.") + client = await Client.connect( running_app.config.temporal.host, plugins=[plugin], )src/mcp_agent/executor/temporal/plugin.py (3)
27-28
: ** Make TracingInterceptor optional and add to worker when enabled**The TracingInterceptor is imported unconditionally which makes opentelemetry a hard dependency.Based on the web search results, I can see that:
Tracing support requires the optional opentelemetry dependencies which are part of the opentelemetry extra. When using pip, running pip install 'temporalio[opentelemetry]' will install needed dependencies. Then the temporalio.contrib.opentelemetry.TracingInterceptor can be created and set as an interceptor on the interceptors argument of Client.connect.
The TracingInterceptor import should indeed be optional since opentelemetry is an optional dependency.
-from temporalio.contrib.opentelemetry import TracingInterceptor +try: + from temporalio.contrib.opentelemetry import TracingInterceptor +except ImportError: + TracingInterceptor = NoneAlso applies to lines 111-112, 252-253.
306-309
: ** DataConverter introspection bug — attribute doesn't exist**
DataConverter
doesn't exposepayload_converter_class
; this will cause anAttributeError
when a custom converter is provided.- if converter and converter.payload_converter_class not in ( - DefaultPayloadConverter, - PydanticPayloadConverter, - ): + pc = getattr(converter, "payload_converter", None) + if converter and pc and not isinstance( + pc, (DefaultPayloadConverter, PydanticPayloadConverter) + ):
228-242
: ** Restrict sandbox passthrough — tighten the whitelist in plugin.py**Disabling sandbox protections for network/I/O libraries (httpx, aiohttp) and other modules undermines Temporal workflow determinism.
Let me verify what's considered safe for Temporal workflow sandboxing:Based on the search results, I can see that:
For performance and behavior reasons, users are encouraged to pass through all modules whose calls will be deterministic.
For this reason, it's recommended to only pass through modules that are known to be deterministic, meaning they will always produce the same output given the same input.
In particular, this advice extends to modules containing the activities to be referenced in workflows, and modules containing dataclasses and Pydantic models, which can be particularly expensive to import.
Network and I/O libraries like httpx and aiohttp are problematic: temporalio.worker.workflow_sandbox._restrictions.RestrictedWorkflowAccessError: Cannot access http.server.BaseHTTPRequestHandler.responses from inside a workflow.
The current whitelist includes network libraries (httpx, aiohttp) that can perform I/O operations, which violates determinism requirements. Here's a safer approach:
config["workflow_runner"] = replace( runner, restrictions=runner.restrictions.with_passthrough_modules( "mcp_agent", - "mcp", - "rich", - "logging", - "opentelemetry", - "httpx", - "httpcore", - "sniffio", - "aiohttp", - "attrs", - "numpy", "pydantic", ), )Remove network/I/O libraries and limit to truly deterministic modules for MCP Agent functionality.
examples/temporal_plugin/router.py (2)
89-93
: ** Return type mismatch: run() -> str but returning a Pydantic object**The method signature promises
str
but returns aCallToolResult
Pydantic object.- result = await agent.call_tool( + call_result = await agent.call_tool( name="read_file", arguments={"path": config_path}, ) - logger.info("read_file result:", data=result.model_dump()) + logger.info("read_file result:", data=call_result.model_dump()) + return call_result.model_dump_json(indent=2)Also applies to Line 128.
140-144
: ** Worker plugin not attached — activities won't be registered**Without the plugin, MCP Agent activities won't be available, causing workflow execution to fail.
async with Worker( client, task_queue=running_app.config.temporal.task_queue, workflows=[RouterWorkflow], + plugins=[plugin], ):
examples/temporal_plugin/orchestrator.py (1)
101-103
: ** Runtime filesystem configuration modification raises concerns**Mutating
running_app.context.config.mcp.servers["filesystem"].args
at runtime could lead to unexpected behavior with concurrent workflows.- running_app.context.config.mcp.servers["filesystem"].args.extend( - [os.getcwd()] - ) + # Move this to configuration or pass as parameterConsider configuring the filesystem path in the MCP configuration file before app startup or passing it as a workflow parameter.
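A rough sketch of the pass-it-as-a-parameter alternative, assuming the workflow's run method is extended to accept a directory argument (OrchestratorWorkflow and the surrounding names mirror the example; the signature change itself is not shown):

import os
from uuid import uuid4

async def start_orchestrator(client, task_queue, task):
    return await client.execute_workflow(
        OrchestratorWorkflow.run,
        args=[task, os.getcwd()],  # the directory travels with the workflow input, no shared-config mutation
        id=f"orchestrator-workflow-{uuid4()}",
        task_queue=task_queue,
    )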
…n with Temporal workflows
Actionable comments posted: 1
🧹 Nitpick comments (3)
src/mcp_agent/app.py (1)
468-472
: Use logger instead of print for duplicate registrationPrinting bypasses app logging and structured context.
Apply this diff:
- if hasattr(workflow_cls, "_app"): - print( - f"Warning: {workflow_cls.__name__} already has MCPApp integration, skipping..." - ) + if hasattr(workflow_cls, "_app"): + try: + self.logger.warning( + "Workflow already has MCPApp integration, skipping...", + data={"workflow": workflow_cls.__name__}, + ) + except Exception: + pass continueexamples/temporal_plugin/README.md (2)
77-78
: Format the bare URLAvoid bare URL per markdownlint; wrap in angle brackets.
Apply this diff:
-- Monitor workflow execution in the Temporal UI at http://localhost:8233 +- Monitor workflow execution in the Temporal UI at <http://localhost:8233>
148-148
: Add language to fenced code blockSpecify a language for the file tree block to satisfy MD040.
Apply this diff:
-``` +```text
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (4)
- examples/temporal_plugin/README.md (1 hunks)
- examples/temporal_plugin/basic_agent_server.py (1 hunks)
- src/mcp_agent/app.py (1 hunks)
- src/mcp_agent/executor/temporal/plugin.py (1 hunks)
🧰 Additional context used
🧠 Learnings (2)
📓 Common learnings
Learnt from: rholinshead
PR: lastmile-ai/mcp-agent#414
File: src/mcp_agent/logging/logger.py:18-19
Timestamp: 2025-09-05T14:31:48.139Z
Learning: In the mcp-agent logging module (src/mcp_agent/logging/logger.py), temporalio should be imported lazily with try/except ImportError to avoid making it a hard dependency. Use temporalio.workflow.in_workflow() instead of isinstance checks on internal classes like _WorkflowInstanceImpl.
📚 Learning: 2025-09-05T14:31:48.139Z
Learnt from: rholinshead
PR: lastmile-ai/mcp-agent#414
File: src/mcp_agent/logging/logger.py:18-19
Timestamp: 2025-09-05T14:31:48.139Z
Learning: In the mcp-agent logging module (src/mcp_agent/logging/logger.py), temporalio should be imported lazily with try/except ImportError to avoid making it a hard dependency. Use temporalio.workflow.in_workflow() instead of isinstance checks on internal classes like _WorkflowInstanceImpl.
Applied to files:
src/mcp_agent/app.py
src/mcp_agent/executor/temporal/plugin.py
examples/temporal_plugin/README.md
🧬 Code graph analysis (3)
examples/temporal_plugin/basic_agent_server.py (2)
src/mcp_agent/server/app_server.py (2)
app
(135-137)create_mcp_server_for_app
(313-1300)src/mcp_agent/app.py (2)
logger
(204-221)register_temporal_workflows
(433-478)
src/mcp_agent/app.py (4)
src/mcp_agent/server/app_server.py (2)
workflows
(140-142)run
(1678-1683)src/mcp_agent/executor/workflow.py (7)
executor
(118-123)WorkflowExecution
(64-71)create
(141-160)run_async
(193-364)run_id
(133-138)id
(126-130)run
(163-174)src/mcp_agent/executor/temporal/__init__.py (2)
uuid
(472-479)start_workflow
(296-411)src/mcp_agent/core/context.py (2)
run_async
(282-285)get_current_context
(270-300)
src/mcp_agent/executor/temporal/plugin.py (6)
src/mcp_agent/executor/temporal/interceptor.py (1)
ContextPropagationInterceptor
(58-93)src/mcp_agent/executor/temporal/session_proxy.py (1)
SessionProxy
(23-307)src/mcp_agent/app.py (1)
logger
(204-221)src/mcp_agent/logging/logger.py (1)
get_logger
(513-541)src/mcp_agent/executor/temporal/system_activities.py (5)
SystemActivities
(16-103)forward_log
(20-38)request_user_input
(41-62)relay_notify
(65-89)relay_request
(92-103)src/mcp_agent/executor/task_registry.py (2)
list_activities
(33-34)get_activity
(25-28)
🪛 markdownlint-cli2 (0.18.1)
examples/temporal_plugin/README.md
77-77: Bare URL used
(MD034, no-bare-urls)
148-148: Fenced code blocks should have a language specified
(MD040, fenced-code-language)
🔇 Additional comments (5)
src/mcp_agent/executor/temporal/plugin.py (4)
292-306
: Replayer configuration now includes workflows, runner, and interceptors — LGTMAddresses replay readiness and interceptor parity with worker/client.
27-28
: Make TracingInterceptor optional; only add when tracing is enabled and package presentAvoid forcing OpenTelemetry as a hard dependency and add to both client and worker conditionally.
Apply this diff:
-from temporalio.contrib.opentelemetry import TracingInterceptor +try: + from temporalio.contrib.opentelemetry import TracingInterceptor # type: ignore +except Exception: # pragma: no cover + TracingInterceptor = None # type: ignore @@ - # Add tracing if enabled - if self.context and getattr(self.context, "tracing_enabled", False): - interceptors.append(TracingInterceptor()) + # Add tracing if enabled and available + if ( + self.context + and getattr(self.context, "tracing_enabled", False) + and TracingInterceptor + ): + interceptors.append(TracingInterceptor()) @@ - # Add tracing if enabled - if self.context and getattr(self.context, "tracing_enabled", False): - interceptors.append(TracingInterceptor()) + # Add tracing if enabled and available + if ( + self.context + and getattr(self.context, "tracing_enabled", False) + and TracingInterceptor + ): + interceptors.append(TracingInterceptor())Based on learnings
Also applies to: 111-116, 262-269
235-253
: Workflow sandbox passthrough is overly permissive; tighten whitelistPassing through network/I/O and large libs (httpx, aiohttp, numpy, etc.) undermines Temporal determinism.
Apply this safer baseline (expand only as needed per workflow):
- config["workflow_runner"] = replace( - runner, - restrictions=runner.restrictions.with_passthrough_modules( - "mcp_agent", - "mcp", - "rich", - "logging", - "opentelemetry", - "httpx", - "httpcore", - "sniffio", - "aiohttp", - "attrs", - "numpy", - "pydantic", - ), - ) + config["workflow_runner"] = replace( + runner, + restrictions=runner.restrictions.with_passthrough_modules( + # Minimal, deterministic set; grow only as justified + "mcp_agent.executor.temporal", + "mcp_agent.logging", + "pydantic", + ), + )Consider moving any non-deterministic calls to activities and keeping passthrough narrowly scoped. Add replay tests for covered workflows.
315-326
: DataConverter introspection bug: use payload_converter type, not payload_converter_classDataConverter has .payload_converter; accessing payload_converter_class will raise AttributeError.
Apply this diff:
- if converter and converter.payload_converter_class not in (
-     DefaultPayloadConverter,
-     PydanticPayloadConverter,
- ):
+ pc = getattr(converter, "payload_converter", None)
+ if converter and pc and not isinstance(
+     pc, (DefaultPayloadConverter, PydanticPayloadConverter)
+ ):
      warnings.warn(
          "A non-default Temporal data converter was provided but has been replaced "
          "with the Pydantic data converter for MCP Agent compatibility."
      )
  return pydantic_data_converter

#!/bin/bash
# Verify no other incorrect uses remain
rg -n -C2 'payload_converter_class' --type=py

examples/temporal_plugin/basic_agent_server.py (1)
11-12
: Registering workflows before app.run() — approved
Registration/patching doesn’t require initialized context and is safe here.
Run the server (from examples/temporal_plugin): `uvicorn basic_agent_server:app`. Confirm you see the workflow listed and the SSE server listening.
async def run_async(self, *args, **kwargs):
    """Run the workflow asynchronously and return WorkflowExecution."""
    # Generate IDs for this execution
    workflow_id = f"{self.name}-{uuid.uuid4().hex[:8]}"
    run_id = f"{workflow_id}-run-{uuid.uuid4().hex[:8]}"

    self._workflow_id = workflow_id
    self._run_id = run_id

    # For pure Temporal workflows, we need to use the executor to run them
    # But since we're in the mcp-agent context, we'll delegate to the executor
    from mcp_agent.core.context import get_current_context

    try:
        context = get_current_context()
        if context and context.executor:
            # Extract special system parameters
            workflow_memo = kwargs.pop("__mcp_agent_workflow_memo", None)
            provided_workflow_id = kwargs.pop("__mcp_agent_workflow_id", None)
            provided_task_queue = kwargs.pop("__mcp_agent_task_queue", None)

            if provided_workflow_id:
                workflow_id = provided_workflow_id
                self._workflow_id = workflow_id

            # Start the workflow using the executor
            handle = await context.executor.start_workflow(
                self.__class__.__name__,
                *args,
                workflow_id=workflow_id,
                task_queue=provided_task_queue,
                workflow_memo=workflow_memo,
                **kwargs,
            )

            return WorkflowExecution(
                workflow_id=handle.id,
                run_id=handle.id,  # Temporal uses the same ID for both
            )
        else:
Fix Temporal run_async: use registered workflow name and correct run_id
Start via the app-registered workflow name (supports custom IDs) and return the actual Temporal run_id. Currently passes the class name and returns handle.id for run_id, which is wrong and breaks custom workflow_ids.
Apply this diff:
@@
- async def run_async(self, *args, **kwargs):
+ async def run_async(self, *args, **kwargs):
@@
- workflow_id = f"{self.name}-{uuid.uuid4().hex[:8]}"
- run_id = f"{workflow_id}-run-{uuid.uuid4().hex[:8]}"
+ workflow_id = f"{self.name}-{uuid.uuid4().hex[:8]}"
+ run_id = f"{workflow_id}-run-{uuid.uuid4().hex[:8]}"
@@
- if provided_workflow_id:
- workflow_id = provided_workflow_id
- self._workflow_id = workflow_id
+ if provided_workflow_id:
+ workflow_id = provided_workflow_id
+ self._workflow_id = workflow_id
@@
- # Start the workflow using the executor
- handle = await context.executor.start_workflow(
- self.__class__.__name__,
+ # Start the workflow using the executor (prefer app-registered name)
+ workflow_type = getattr(self, "name", self.__class__.__name__)
+ handle = await context.executor.start_workflow(
+ workflow_type,
*args,
workflow_id=workflow_id,
task_queue=provided_task_queue,
workflow_memo=workflow_memo,
**kwargs,
)
-
- return WorkflowExecution(
- workflow_id=handle.id,
- run_id=handle.id, # Temporal uses the same ID for both
- )
+ # Use actual Temporal run ID when available
+ resolved_run_id = getattr(handle, "result_run_id", None) or getattr(handle, "run_id", None) or run_id
+ self._workflow_id = handle.id
+ self._run_id = resolved_run_id
+ return WorkflowExecution(
+ workflow_id=handle.id,
+ run_id=resolved_run_id,
+ )
🤖 Prompt for AI Agents
In src/mcp_agent/app.py around lines 513-552, the executor.start_workflow call
uses the class name and the returned WorkflowExecution sets run_id to handle.id;
change the call to start the registered workflow name (use self.name) so custom
workflow IDs work, and return the actual Temporal run id from the handle (use
handle.run_id) instead of handle.id; thus call
context.executor.start_workflow(self.name, ... ) and construct
WorkflowExecution(workflow_id=handle.id, run_id=handle.run_id).
Actionable comments posted: 4
♻️ Duplicate comments (2)
examples/temporal_plugin/evaluator_optimizer.py (2)
27-27
: Avoid logging full config (secrets risk).Logging
context.config.model_dump()
may expose API keys and other sensitive configuration values.- logger.info("Current config:", data=context.config.model_dump()) + logger.info("Workflow started")
83-87
: Critical: Worker not running and plugin not attached.Two critical issues:
Plugin not attached (duplicate of past review): The
plugin
is passed to the Client but not to the Worker. Worker-side activities and interceptors won't be registered.Worker never started: The Worker is created in an async context manager but
.run()
is never called, so the worker never starts polling for tasks. The workflow execution on line 105 will timeout because no worker is available to execute it.Fix:
- async with Worker( - client, - task_queue=running_app.config.temporal.task_queue, - workflows=[EvaluatorOptimizerWorkflow], - ): + worker = Worker( + client, + task_queue=running_app.config.temporal.task_queue, + workflows=[EvaluatorOptimizerWorkflow], + plugins=[plugin], + ) + + async with worker: job_posting = ( "Software Engineer at LastMile AI. Responsibilities include developing AI systems, " "collaborating with cross-functional teams, and enhancing scalability. Skills required: " "Python, distributed systems, and machine learning." ) + + # Start worker in background + worker_task = asyncio.create_task(worker.run()) + + try: + # Give worker time to start + await asyncio.sleep(1) + - candidate_details = ( - "Alex Johnson, 3 years in machine learning, contributor to open-source AI projects, " - "proficient in Python and TensorFlow. Motivated by building scalable AI systems to solve real-world problems." - ) - - # This should trigger a 'fetch' call to get the company information - company_information = ( - "Look up from the LastMile AI page: https://lastmileai.dev" - ) - - task = f"Write a cover letter for the following job posting: {job_posting}\n\nCandidate Details: {candidate_details}\n\nCompany information: {company_information}" - - output = await client.execute_workflow( - EvaluatorOptimizerWorkflow.run, - task, - id=f"basic-workflow-{uuid4()}", - task_queue=running_app.config.temporal.task_queue, - ) - print(output) + candidate_details = ( + "Alex Johnson, 3 years in machine learning, contributor to open-source AI projects, " + "proficient in Python and TensorFlow. Motivated by building scalable AI systems to solve real-world problems." + ) + + # This should trigger a 'fetch' call to get the company information + company_information = ( + "Look up from the LastMile AI page: https://lastmileai.dev" + ) + + task = f"Write a cover letter for the following job posting: {job_posting}\n\nCandidate Details: {candidate_details}\n\nCompany information: {company_information}" + + output = await client.execute_workflow( + EvaluatorOptimizerWorkflow.run, + task, + id=f"basic-workflow-{uuid4()}", + task_queue=running_app.config.temporal.task_queue, + ) + print(output) + finally: + worker_task.cancel() + try: + await worker_task + except asyncio.CancelledError: + pass
🧹 Nitpick comments (2)
examples/temporal_plugin/evaluator_optimizer.py (1)
17-17
: Consider renaming app to match workflow purpose.The app name
"mcp_basic_agent"
doesn't align with the evaluator-optimizer workflow demonstrated in this example. Consider renaming to"evaluator_optimizer_agent"
for clarity.-app = MCPApp(name="mcp_basic_agent") +app = MCPApp(name="evaluator_optimizer_agent")src/mcp_agent/workflows/llm/augmented_llm_anthropic.py (1)
86-88
: Consider adding field documentation and type constraints.The
RequestStreamingCompletionRequest
model mirrorsRequestCompletionRequest
but lacks field documentation. Consider adding docstrings and type annotations for thepayload
field to improve maintainability.Apply this diff to add documentation:
class RequestStreamingCompletionRequest(BaseModel): + """Request model for streaming completions from Anthropic's API.""" config: AnthropicSettings - payload: dict + payload: dict # API parameters for messages.stream()
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (3)
- examples/temporal_plugin/evaluator_optimizer.py (1 hunks)
- examples/temporal_plugin/router.py (1 hunks)
- src/mcp_agent/workflows/llm/augmented_llm_anthropic.py (3 hunks)
🚧 Files skipped from review as they are similar to previous changes (1)
- examples/temporal_plugin/router.py
🧰 Additional context used
🧬 Code graph analysis (2)
src/mcp_agent/workflows/llm/augmented_llm_anthropic.py (5)
src/mcp_agent/app.py (3)
config
(168-169)context
(160-165)executor
(176-177)src/mcp_agent/config.py (1)
AnthropicSettings
(190-221)src/mcp_agent/executor/executor.py (2)
execute
(86-92)execute
(293-316)src/mcp_agent/utils/common.py (1)
ensure_serializable
(42-53)src/mcp_agent/tracing/telemetry.py (1)
traced
(33-86)
examples/temporal_plugin/evaluator_optimizer.py (5)
src/mcp_agent/core/context.py (1)
get_current_context
(270-300)src/mcp_agent/workflows/evaluator_optimizer/evaluator_optimizer.py (2)
EvaluatorOptimizerLLM
(49-478)QualityRating
(25-31)src/mcp_agent/workflows/llm/augmented_llm.py (1)
RequestParams
(126-196)src/mcp_agent/workflows/llm/augmented_llm_openai.py (1)
OpenAIAugmentedLLM
(83-922)src/mcp_agent/executor/temporal/plugin.py (1)
MCPAgentPlugin
(39-342)
context = get_current_context()
logger = context.app.logger

logger.info("Current config:", data=context.config.model_dump())

optimizer = Agent(
    name="optimizer",
    instruction="""You are a career coach specializing in cover letter writing.
    You are tasked with generating a compelling cover letter given the job posting,
    candidate details, and company information. Tailor the response to the company and job requirements.
    """,
    server_names=["fetch"],
)

evaluator = Agent(
    name="evaluator",
    instruction="""Evaluate the following response based on the criteria below:
    1. Clarity: Is the language clear, concise, and grammatically correct?
    2. Specificity: Does the response include relevant and concrete details tailored to the job description?
    3. Relevance: Does the response align with the prompt and avoid unnecessary information?
    4. Tone and Style: Is the tone professional and appropriate for the context?
    5. Persuasiveness: Does the response effectively highlight the candidate's value?
    6. Grammar and Mechanics: Are there any spelling or grammatical issues?
    7. Feedback Alignment: Has the response addressed feedback from previous iterations?
    For each criterion:
    - Provide a rating (EXCELLENT, GOOD, FAIR, or POOR).
    - Offer specific feedback or suggestions for improvement.
    Summarize your evaluation as a structured response with:
    - Overall quality rating.
    - Specific feedback and areas for improvement.""",
)

evaluator_optimizer = EvaluatorOptimizerLLM(
    optimizer=optimizer,
    evaluator=evaluator,
    llm_factory=OpenAIAugmentedLLM,
    min_rating=QualityRating.EXCELLENT,
    context=context,
)
Critical: Non-deterministic operations in workflow violate Temporal guarantees.
Temporal workflows must be deterministic to support replay. This workflow performs several non-deterministic operations directly in the workflow method:
- Line 24:
get_current_context()
accesses global/thread-local state that may not be available or consistent during replay. - Lines 29-56: Creating
Agent
instances inside the workflow may involve I/O, configuration loading, or other non-deterministic operations. - Lines 58-64: Instantiating
EvaluatorOptimizerLLM
with agents creates complex state that cannot be safely replayed.
These operations will break Temporal's replay mechanism and cause workflow failures when the worker restarts or the workflow history is replayed.
Solution: Move all agent/LLM initialization and API calls into Temporal activities. The workflow should only orchestrate activity calls and handle deterministic control flow.
Based on learnings about Temporal SDK best practices: workflows must be deterministic and avoid external I/O or state access.
result = await evaluator_optimizer.generate_str(
    message=prompt,
    request_params=RequestParams(model="gpt-4o"),
)
Critical: LLM generation must be a Temporal activity, not a direct workflow call.
Line 67 directly awaits evaluator_optimizer.generate_str()
, which makes API calls to OpenAI. This is non-deterministic and violates Temporal's core requirement that workflows be deterministic and replayable.
All LLM calls, API requests, and I/O operations must be wrapped in Temporal activities. The workflow should only orchestrate these activities.
Recommended approach:
- Define a Temporal activity for LLM generation:
@activity.defn
async def generate_cover_letter_activity(prompt: str, model: str) -> str:
context = get_current_context()
# Initialize agents and LLM here
evaluator_optimizer = EvaluatorOptimizerLLM(...)
return await evaluator_optimizer.generate_str(
message=prompt,
request_params=RequestParams(model=model)
)
- Call the activity from the workflow:
@workflow.run
async def run(self, prompt: str) -> str:
result = await workflow.execute_activity(
generate_cover_letter_activity,
args=[prompt, "gpt-4o"],
start_to_close_timeout=timedelta(minutes=5)
)
return result
Based on learnings about Temporal SDK best practices.
🤖 Prompt for AI Agents
In examples/temporal_plugin/evaluator_optimizer.py around lines 66 to 69, the
workflow directly awaits evaluator_optimizer.generate_str() which makes external
LLM/API calls and breaks Temporal determinism; move that LLM call into a
Temporal activity function that initializes the LLM/agents and performs
generate_str, then replace the direct await in the workflow with
workflow.execute_activity(...) (passing prompt and model, and appropriate
timeouts/retry options) so all I/O and non-deterministic operations run as
activities while the workflow only orchestrates them.
config = self.context.config
request = RequestStreamingCompletionRequest(
    config=config.anthropic,
    payload=args,
)

async with client:
    async with client.messages.stream(**args) as stream:
        final = await stream.get_final_message()
final: Message = await self.executor.execute(
    AnthropicCompletionTasks.request_streaming_completion_task,
    ensure_serializable(request),
)
Add error handling for executor result.
The code doesn't check if executor.execute
returned an exception. The generate()
method (lines 249-258) checks isinstance(response, BaseException)
after executor calls. The same pattern should be applied here to handle errors gracefully.
Apply this diff to add error handling:
final: Message = await self.executor.execute(
AnthropicCompletionTasks.request_streaming_completion_task,
ensure_serializable(request),
)
+
+ if isinstance(final, BaseException):
+ self.logger.error(f"Error in streaming completion: {final}")
+ span.record_exception(final)
+ span.set_status(trace.Status(trace.StatusCode.ERROR))
+ raise final
🤖 Prompt for AI Agents
In src/mcp_agent/workflows/llm/augmented_llm_anthropic.py around lines 469 to
478, the result of executor.execute is assigned to `final` but not checked for
errors; mirror the pattern used in generate() by checking if the returned value
is an instance of BaseException and handling it accordingly (e.g., return/raise
the exception or convert to appropriate error response). Update the code to
store the executor result in a variable, perform `if isinstance(final,
BaseException):` and then propagate or handle the error consistently with the
generate() method before proceeding with using `final`.
…feat/temporal-plugin
proofreader = Agent(
    name="proofreader",
    instruction=""""Review the short story for grammar, spelling, and punctuation errors.
There's an extra double quote in the instruction string initialization. The string starts with four double quotes instead of three:
instruction=""""Review the short story...
Should be changed to:
instruction="""Review the short story...
This will prevent potential string parsing issues.
instruction=""""Review the short story for grammar, spelling, and punctuation errors. | |
instruction="""Review the short story for grammar, spelling, and punctuation errors. |
Spotted by Diamond
Actionable comments posted: 0
♻️ Duplicate comments (1)
src/mcp_agent/app.py (1)
543-556
: Fix Temporal run_async: use registered workflow name and correct run_idThe workflow is started with
self.__class__.__name__
(line 545) instead of the app-registered workflow name, which breaks custom workflow IDs. Additionally, line 555 incorrectly returnshandle.id
for bothworkflow_id
andrun_id
, but Temporal provides distinct identifiers viahandle.result_run_id
orhandle.run_id
.Apply this diff:
# Start the workflow using the executor handle = await context.executor.start_workflow( - self.__class__.__name__, + self.name, *args, workflow_id=workflow_id, task_queue=provided_task_queue, workflow_memo=workflow_memo, **kwargs, ) + # Use actual Temporal run ID from handle + resolved_run_id = handle.result_run_id or handle.run_id + self._workflow_id = handle.id + self._run_id = resolved_run_id + return WorkflowExecution( workflow_id=handle.id, - run_id=handle.id, # Temporal uses the same ID for both + run_id=resolved_run_id, )
🧹 Nitpick comments (1)
src/mcp_agent/app.py (1)
474-477
: Use logger instead of print() for consistency.The warning message uses
print()
rather thanself.logger.warning()
, which is inconsistent with logging patterns used throughout the codebase (e.g., lines 267, 289, 298, 318).Apply this diff:
- if hasattr(workflow_cls, "_app"): - print( - f"Warning: {workflow_cls.__name__} already has MCPApp integration, skipping..." - ) - continue + if hasattr(workflow_cls, "_app"): + self.logger.warning( + f"{workflow_cls.__name__} already has MCPApp integration, skipping...", + data={"workflow_class": workflow_cls.__name__} + ) + continue
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
⛔ Files ignored due to path filters (1)
- uv.lock is excluded by !**/*.lock

📒 Files selected for processing (4)
- pyproject.toml (1 hunks)
- src/mcp_agent/app.py (1 hunks)
- src/mcp_agent/executor/workflow.py (0 hunks)
- src/mcp_agent/workflows/llm/augmented_llm_anthropic.py (3 hunks)
💤 Files with no reviewable changes (1)
- src/mcp_agent/executor/workflow.py
🚧 Files skipped from review as they are similar to previous changes (2)
- pyproject.toml
- src/mcp_agent/workflows/llm/augmented_llm_anthropic.py
🧰 Additional context used
🧠 Learnings (1)
📚 Learning: 2025-09-05T14:31:48.139Z
Learnt from: rholinshead
PR: lastmile-ai/mcp-agent#414
File: src/mcp_agent/logging/logger.py:18-19
Timestamp: 2025-09-05T14:31:48.139Z
Learning: In the mcp-agent logging module (src/mcp_agent/logging/logger.py), temporalio should be imported lazily with try/except ImportError to avoid making it a hard dependency. Use temporalio.workflow.in_workflow() instead of isinstance checks on internal classes like _WorkflowInstanceImpl.
Applied to files:
src/mcp_agent/app.py
🧬 Code graph analysis (1)
src/mcp_agent/app.py (3)
src/mcp_agent/executor/workflow.py (7)
executor
(125-130)WorkflowExecution
(71-78)create
(148-167)run_async
(200-371)run_id
(140-145)id
(133-137)run
(170-181)src/mcp_agent/executor/temporal/__init__.py (2)
uuid
(472-479)start_workflow
(296-411)src/mcp_agent/core/context.py (2)
run_async
(282-285)get_current_context
(270-300)
…ports in replay.py; optimize response handling in AnthropicCompletionTasks
Actionable comments posted: 1
♻️ Duplicate comments (3)
examples/temporal_plugin/parallel_agent.py (1)
88-108
: Add validation for temporal configuration before accessing nested attributes.The code accesses
running_app.config.temporal.host
(line 93) andrunning_app.config.temporal.task_queue
(lines 99, 106) without checking thatrunning_app.config.temporal
is notNone
. According tosrc/mcp_agent/config.py
, the temporal field is typed asTemporalSettings | None = None
, so this will raise anAttributeError
if the configuration is missing.Apply this diff to add validation:
async def main(): async with app.run() as running_app: + if running_app.config.temporal is None: + raise ValueError( + "Temporal configuration is required. " + "Please configure 'temporal' section in your config file " + "with 'host' and 'task_queue' settings." + ) + plugin = MCPAgentPlugin(running_app) client = await Client.connect(Based on learnings (temporal config structure from TemporalSettings).
examples/temporal_plugin/replay.py (1)
57-60
: Plugin configuration is now correct.The
plugins=[plugin]
argument is now properly passed toClient.connect
, ensuring the data converter and interceptors are configured. This addresses the previous review feedback.src/mcp_agent/workflows/llm/augmented_llm_anthropic.py (1)
473-482
: Add error handling for executor result.The executor result assigned to
final
is not checked for exceptions. Thegenerate()
method (lines 258-262) demonstrates the pattern of checkingisinstance(response, BaseException)
after executor calls and handling errors appropriately. The same pattern should be applied here.Apply this diff to add error handling:
final: Message = await self.executor.execute( AnthropicCompletionTasks.request_streaming_completion_task, ensure_serializable(request), ) + + if isinstance(final, BaseException): + self.logger.error(f"Error in streaming completion: {final}") + span.record_exception(final) + span.set_status(trace.Status(trace.StatusCode.ERROR)) + raise final
🧹 Nitpick comments (1)
examples/temporal_plugin/parallel_agent.py (1)
43-43
: Consider passing context explicitly instead of calling get_current_context().The synchronous
get_current_context()
call may create a global context and trigger warnings in multithreaded scenarios (persrc/mcp_agent/core/context.py
). Sincemain()
already has access torunning_app.context
, consider passing the context as a workflow parameter or accessing it through a more explicit mechanism.Apply this diff to accept context as a workflow parameter:
@workflow.defn class ParallelAgentWorkflow: @workflow.run - async def run(self, prompt: str) -> str: - context = get_current_context() + async def run(self, prompt: str, context=None) -> str: + if context is None: + context = get_current_context()Then update the workflow invocation in
main()
to passrunning_app.context
:output = await client.execute_workflow( ParallelAgentWorkflow.run, args=[SHORT_STORY, running_app.context], id=f"basic-workflow-{uuid4()}", task_queue=running_app.config.temporal.task_queue, )
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (3)
- examples/temporal_plugin/parallel_agent.py (1 hunks)
- examples/temporal_plugin/replay.py (1 hunks)
- src/mcp_agent/workflows/llm/augmented_llm_anthropic.py (3 hunks)
🧰 Additional context used
🧬 Code graph analysis (3)
examples/temporal_plugin/replay.py (2)
examples/temporal_plugin/parallel_agent.py (3)
ParallelAgentWorkflow
(40-85)main
(88-108)run
(42-85)src/mcp_agent/executor/temporal/plugin.py (1)
MCPAgentPlugin
(39-342)
src/mcp_agent/workflows/llm/augmented_llm_anthropic.py (5)
src/mcp_agent/app.py (4)
config
(173-174)context
(165-170)executor
(181-182)workflow_task
(999-1076)src/mcp_agent/config.py (1)
AnthropicSettings
(190-221)src/mcp_agent/executor/executor.py (2)
execute
(86-92)execute
(293-316)src/mcp_agent/utils/common.py (1)
ensure_serializable
(42-53)src/mcp_agent/tracing/telemetry.py (1)
traced
(33-86)
examples/temporal_plugin/parallel_agent.py (5)
src/mcp_agent/core/context.py (1)
get_current_context
(270-300)src/mcp_agent/executor/temporal/plugin.py (1)
MCPAgentPlugin
(39-342)src/mcp_agent/workflows/parallel/parallel_llm.py (1)
ParallelLLM
(25-282)examples/temporal_plugin/router.py (2)
run
(35-128)main
(131-154)src/mcp_agent/app.py (1)
config
(173-174)
🔇 Additional comments (8)

examples/temporal_plugin/parallel_agent.py (4)

1-11: LGTM! Imports are well-organized and all necessary for the workflow implementation.

13-33: LGTM! The SHORT_STORY constant contains intentional errors (grammar, spelling, punctuation) that effectively demonstrate the parallel agent workflow's proofreading, fact-checking, and style enforcement capabilities.

36-36: LGTM! Standard MCPApp instantiation for an example workflow.

111-112: LGTM! Standard and correct entry point for running the async main function.

examples/temporal_plugin/replay.py (2)

1-35: LGTM! Clear documentation and proper imports. The docstring effectively explains the two-phase deployment pattern for safe Temporal deployments, and all necessary imports are present.

37-56: LGTM! Proper setup of app and plugin. The argument parsing is correct, and the MCPApp is properly instantiated and run within an async context manager. The MCPAgentPlugin is created with the running app instance, which aligns with the plugin's expected initialization.

src/mcp_agent/workflows/llm/augmented_llm_anthropic.py (2)

86-88: LGTM! Model structure is consistent. The RequestStreamingCompletionRequest model correctly mirrors the structure of RequestCompletionRequest, providing a consistent pattern for streaming requests.

791-822: LGTM! Streaming task implementation is correct. The implementation properly handles both Anthropic and non-Anthropic providers:

- Type handling: ensure_serializable(final) correctly accepts and returns a Message (BaseModel), matching the return type signature.
- Provider-specific configuration: The base_url is only used for the Anthropic provider path (line 803), which is appropriate. Non-Anthropic providers (Bedrock, VertexAI) created via create_anthropic_instance use their own configuration patterns and don't require base_url.
- Async vs sync execution: The code correctly uses AsyncAnthropic for the Anthropic provider and runs synchronous provider clients in an executor to avoid blocking the event loop (see the sketch following this comment).

The previous review concerns about type mismatches and base_url handling appear to have been based on a misunderstanding; the current implementation is correct.
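The async-vs-sync point above is worth spelling out. The following is a minimal illustrative sketch of that pattern, not the actual code in augmented_llm_anthropic.py: it assumes an Anthropic-style client exposing messages.stream (async) and messages.create (sync), and the function and variable names are hypothetical.

```python
import asyncio

from anthropic import AsyncAnthropic


async def stream_with_async_client(request_args: dict):
    # Anthropic path: the async client streams natively on the event loop,
    # so no executor hand-off is needed.
    client = AsyncAnthropic()
    async with client.messages.stream(**request_args) as stream:
        return await stream.get_final_message()


async def complete_with_sync_client(sync_client, request_args: dict):
    # Bedrock/VertexAI-style path: the provider client is synchronous, so the
    # blocking call is pushed onto the default thread-pool executor to keep
    # the event loop responsive.
    loop = asyncio.get_running_loop()
    return await loop.run_in_executor(
        None, lambda: sync_client.messages.create(**request_args)
    )
```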
```python
else:
    worker = Worker(
        client,
        task_queue=task_queue,
        workflows=my_workflows,
        activities=my_activities,
    )
    await worker.run()
```
🛠️ Refactor suggestion | 🟠 Major

🧩 Analysis chain

Verify Worker plugin configuration in run mode.

The Worker is created with workflows and activities but does not pass plugins=[plugin]. According to MCPAgentPlugin.configure_worker (in src/mcp_agent/executor/temporal/plugin.py), the plugin performs important configuration:

- Merges activities from the task registry
- Configures the workflow runner for sandbox module passthrough
- Adds tracing and context propagation interceptors

Without the plugin, these configurations may not be applied to the worker.

Please verify whether the Worker should be created as:

```diff
 else:
     worker = Worker(
         client,
         task_queue=task_queue,
         workflows=my_workflows,
         activities=my_activities,
+        plugins=[plugin],
     )
     await worker.run()
```

Run this script to check how other examples configure Workers:

🏁 Script executed:

```shell
#!/bin/bash
# Description: Check Worker instantiation patterns in other examples
# Search for Worker creation patterns
rg -A5 -B2 'Worker\(' examples/temporal_plugin/ --type py
```

Length of output: 4452

🏁 Script executed:

```shell
#!/bin/bash
# Check how the plugin instance is used within replay.py
rg -n 'plugin' -C5 examples/temporal_plugin/replay.py
```

Length of output: 1381
Pass the MCPAgentPlugin to the Worker in run mode

Add plugins=[plugin] so the same plugin configuration that is applied to the client and Replayer is also applied to the Worker:

```diff
 else:
     worker = Worker(
         client,
         task_queue=task_queue,
         workflows=my_workflows,
         activities=my_activities,
+        plugins=[plugin],
     )
     await worker.run()
```
📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
```python
else:
    worker = Worker(
        client,
        task_queue=task_queue,
        workflows=my_workflows,
        activities=my_activities,
        plugins=[plugin],
    )
    await worker.run()
```
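To show how the suggested change fits the surrounding setup, here is a minimal run-mode sketch. It is illustrative only: it assumes the temporalio version pinned in this PR accepts a plugins= argument on both Client.connect and Worker (the Worker part is what the suggestion above adds), that MCPAgentPlugin takes the running app instance as described in the replay.py comments, and that app, my_workflows, my_activities, and the localhost target are placeholders rather than the example's real values.

```python
import asyncio

from temporalio.client import Client
from temporalio.worker import Worker

from mcp_agent.executor.temporal.plugin import MCPAgentPlugin


async def run_worker(app, my_workflows, my_activities, task_queue: str):
    # `app` stands in for the module-level MCPApp used by the example.
    async with app.run() as running_app:
        # The plugin is created with the running app instance and reused so
        # the client and the worker share the same configuration.
        plugin = MCPAgentPlugin(running_app)

        client = await Client.connect(
            "localhost:7233",   # placeholder Temporal target
            plugins=[plugin],   # assumed supported by the pinned temporalio
        )

        worker = Worker(
            client,
            task_queue=task_queue,
            workflows=my_workflows,
            activities=my_activities,
            plugins=[plugin],   # the change suggested above
        )
        await worker.run()
```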
🤖 Prompt for AI Agents
In examples/temporal_plugin/replay.py around lines 72 to 79, the Worker is
created without the MCPAgentPlugin so the plugin configuration used for the
client and Replayer is not applied to the Worker; update the Worker construction
to include the plugin by adding plugins=[plugin] to the Worker(...) arguments so
the same plugin instance is passed into the Worker before calling await
worker.run().
src/mcp_agent/executor/temporal/plugin.py:

```python
logger = get_logger(__name__)


class MCPAgentPlugin(ClientPlugin, WorkerPlugin):
```
FYI we are working on a simpler plugin interface here: https://github.com/temporalio/sdk-python/pull/1139/files?diff=split&w=1
Summary by CodeRabbit

- New Features
  - Improved Anthropic streaming
- Examples
- Documentation
- Chores
- Refactor