diff --git a/MCPForUnity/Editor/Helpers/PortManager.cs b/MCPForUnity/Editor/Helpers/PortManager.cs
index 09d85798..e7c48919 100644
--- a/MCPForUnity/Editor/Helpers/PortManager.cs
+++ b/MCPForUnity/Editor/Helpers/PortManager.cs
@@ -60,14 +60,17 @@ public static int GetPortWithFallback()
if (IsDebugEnabled()) Debug.Log($"MCP-FOR-UNITY: Stored port {storedConfig.unity_port} became available after short wait");
return storedConfig.unity_port;
}
- // Prefer sticking to the same port; let the caller handle bind retries/fallbacks
- return storedConfig.unity_port;
+ // Port is still busy after waiting - find a new available port instead
+ if (IsDebugEnabled()) Debug.Log($"MCP-FOR-UNITY: Stored port {storedConfig.unity_port} is occupied by another instance, finding alternative...");
+ int newPort = FindAvailablePort();
+ SavePort(newPort);
+ return newPort;
}
// If no valid stored port, find a new one and save it
- int newPort = FindAvailablePort();
- SavePort(newPort);
- return newPort;
+ int foundPort = FindAvailablePort();
+ SavePort(foundPort);
+ return foundPort;
}
///
diff --git a/MCPForUnity/Editor/MCPForUnityBridge.cs b/MCPForUnity/Editor/MCPForUnityBridge.cs
index 5fb9f694..85748df2 100644
--- a/MCPForUnity/Editor/MCPForUnityBridge.cs
+++ b/MCPForUnity/Editor/MCPForUnityBridge.cs
@@ -362,7 +362,22 @@ public static void Start()
}
catch (SocketException se) when (se.SocketErrorCode == SocketError.AddressAlreadyInUse && attempt >= maxImmediateRetries)
{
+ // Port is occupied by another instance, get a new available port
+ int oldPort = currentUnityPort;
currentUnityPort = PortManager.GetPortWithFallback();
+
+ // Safety check: ensure we got a different port
+ if (currentUnityPort == oldPort)
+ {
+ McpLog.Error($"Port {oldPort} is occupied and no alternative port available");
+ throw;
+ }
+
+ if (IsDebugEnabled())
+ {
+ McpLog.Info($"Port {oldPort} occupied, switching to port {currentUnityPort}");
+ }
+
listener = new TcpListener(IPAddress.Loopback, currentUnityPort);
listener.Server.SetSocketOption(
SocketOptionLevel.Socket,
@@ -474,6 +489,22 @@ public static void Stop()
try { AssemblyReloadEvents.afterAssemblyReload -= OnAfterAssemblyReload; } catch { }
try { EditorApplication.quitting -= Stop; } catch { }
+ // Clean up status file when Unity stops
+ try
+ {
+ string statusDir = Path.Combine(Environment.GetFolderPath(Environment.SpecialFolder.UserProfile), ".unity-mcp");
+ string statusFile = Path.Combine(statusDir, $"unity-mcp-status-{ComputeProjectHash(Application.dataPath)}.json");
+ if (File.Exists(statusFile))
+ {
+ File.Delete(statusFile);
+ if (IsDebugEnabled()) McpLog.Info($"Deleted status file: {statusFile}");
+ }
+ }
+ catch (Exception ex)
+ {
+ if (IsDebugEnabled()) McpLog.Warn($"Failed to delete status file: {ex.Message}");
+ }
+
if (IsDebugEnabled()) McpLog.Info("MCPForUnityBridge stopped.");
}
@@ -1184,6 +1215,29 @@ private static void WriteHeartbeat(bool reloading, string reason = null)
}
Directory.CreateDirectory(dir);
string filePath = Path.Combine(dir, $"unity-mcp-status-{ComputeProjectHash(Application.dataPath)}.json");
+
+ // Extract project name from path
+ string projectName = "Unknown";
+ try
+ {
+ string projectPath = Application.dataPath;
+ if (!string.IsNullOrEmpty(projectPath))
+ {
+ // Remove trailing /Assets or \Assets
+ projectPath = projectPath.TrimEnd('/', '\\');
+ if (projectPath.EndsWith("Assets", StringComparison.OrdinalIgnoreCase))
+ {
+ projectPath = projectPath.Substring(0, projectPath.Length - 6).TrimEnd('/', '\\');
+ }
+ projectName = Path.GetFileName(projectPath);
+ if (string.IsNullOrEmpty(projectName))
+ {
+ projectName = "Unknown";
+ }
+ }
+ }
+ catch { }
+
var payload = new
{
unity_port = currentUnityPort,
@@ -1191,6 +1245,8 @@ private static void WriteHeartbeat(bool reloading, string reason = null)
reason = reason ?? (reloading ? "reloading" : "ready"),
seq = heartbeatSeq,
project_path = Application.dataPath,
+ project_name = projectName,
+ unity_version = Application.unityVersion,
last_heartbeat = DateTime.UtcNow.ToString("O")
};
File.WriteAllText(filePath, JsonConvert.SerializeObject(payload), new System.Text.UTF8Encoding(false));
diff --git a/MCPForUnity/UnityMcpServer~/src/connection_pool.py b/MCPForUnity/UnityMcpServer~/src/connection_pool.py
new file mode 100644
index 00000000..c7cd2028
--- /dev/null
+++ b/MCPForUnity/UnityMcpServer~/src/connection_pool.py
@@ -0,0 +1,237 @@
+"""
+Connection pool for managing multiple Unity Editor instances.
+"""
+import logging
+import os
+import threading
+import time
+
+from models import UnityInstanceInfo
+from port_discovery import PortDiscovery
+
+logger = logging.getLogger(__name__)
+
+
+class UnityConnectionPool:
+ """Manages connections to multiple Unity Editor instances"""
+
+ def __init__(self):
+ # Import here to avoid circular dependency
+ from unity_connection import UnityConnection
+ self._UnityConnection = UnityConnection
+
+ self._connections: dict[str, "UnityConnection"] = {}
+ self._known_instances: dict[str, UnityInstanceInfo] = {}
+ self._last_full_scan: float = 0
+ self._scan_interval: float = 5.0 # Cache for 5 seconds
+ self._pool_lock = threading.Lock()
+ self._default_instance_id: str | None = None
+
+ # Check for default instance from environment
+ env_default = os.environ.get("UNITY_MCP_DEFAULT_INSTANCE", "").strip()
+ if env_default:
+ self._default_instance_id = env_default
+ logger.info(f"Default Unity instance set from environment: {env_default}")
+
+ def discover_all_instances(self, force_refresh: bool = False) -> list[UnityInstanceInfo]:
+ """
+ Discover all running Unity Editor instances.
+
+ Args:
+ force_refresh: If True, bypass cache and scan immediately
+
+ Returns:
+ List of UnityInstanceInfo objects
+ """
+ now = time.time()
+
+ # Return cached results if valid
+ if not force_refresh and (now - self._last_full_scan) < self._scan_interval:
+ logger.debug(f"Returning cached Unity instances (age: {now - self._last_full_scan:.1f}s)")
+ return list(self._known_instances.values())
+
+ # Scan for instances
+ logger.debug("Scanning for Unity instances...")
+ instances = PortDiscovery.discover_all_unity_instances()
+
+ # Update cache
+ with self._pool_lock:
+ self._known_instances = {inst.id: inst for inst in instances}
+ self._last_full_scan = now
+
+ logger.info(f"Found {len(instances)} Unity instances: {[inst.id for inst in instances]}")
+ return instances
+
+ def _resolve_instance_id(self, instance_identifier: str | None, instances: list[UnityInstanceInfo]) -> UnityInstanceInfo:
+ """
+ Resolve an instance identifier to a specific Unity instance.
+
+ Args:
+ instance_identifier: User-provided identifier (name, hash, name@hash, path, port, or None)
+ instances: List of available instances
+
+ Returns:
+ Resolved UnityInstanceInfo
+
+ Raises:
+ ConnectionError: If instance cannot be resolved
+ """
+ if not instances:
+ raise ConnectionError(
+ "No Unity Editor instances found. Please ensure Unity is running with MCP for Unity bridge."
+ )
+
+ # Use default instance if no identifier provided
+ if instance_identifier is None:
+ if self._default_instance_id:
+ instance_identifier = self._default_instance_id
+ logger.debug(f"Using default instance: {instance_identifier}")
+ else:
+ # Use the most recently active instance
+ # Instances with no heartbeat (None) should be sorted last (use 0.0 as sentinel)
+ sorted_instances = sorted(
+ instances,
+ key=lambda inst: inst.last_heartbeat.timestamp() if inst.last_heartbeat else 0.0,
+ reverse=True,
+ )
+ logger.info(f"No instance specified, using most recent: {sorted_instances[0].id}")
+ return sorted_instances[0]
+
+ identifier = instance_identifier.strip()
+
+ # Try exact ID match first
+ for inst in instances:
+ if inst.id == identifier:
+ return inst
+
+ # Try project name match
+ name_matches = [inst for inst in instances if inst.name == identifier]
+ if len(name_matches) == 1:
+ return name_matches[0]
+ elif len(name_matches) > 1:
+ # Multiple projects with same name - return helpful error
+ suggestions = [
+ {
+ "id": inst.id,
+ "path": inst.path,
+ "port": inst.port,
+ "suggest": f"Use unity_instance='{inst.id}'"
+ }
+ for inst in name_matches
+ ]
+ raise ConnectionError(
+ f"Project name '{identifier}' matches {len(name_matches)} instances. "
+ f"Please use the full format (e.g., '{name_matches[0].id}'). "
+ f"Available instances: {suggestions}"
+ )
+
+ # Try hash match
+ hash_matches = [inst for inst in instances if inst.hash == identifier or inst.hash.startswith(identifier)]
+ if len(hash_matches) == 1:
+ return hash_matches[0]
+ elif len(hash_matches) > 1:
+ raise ConnectionError(
+ f"Hash '{identifier}' matches multiple instances: {[inst.id for inst in hash_matches]}"
+ )
+
+ # Try composite format: Name@Hash or Name@Port
+ if "@" in identifier:
+ name_part, hint_part = identifier.split("@", 1)
+ composite_matches = [
+ inst for inst in instances
+ if inst.name == name_part and (
+ inst.hash.startswith(hint_part) or str(inst.port) == hint_part
+ )
+ ]
+ if len(composite_matches) == 1:
+ return composite_matches[0]
+
+ # Try port match (as string)
+ try:
+ port_num = int(identifier)
+ port_matches = [inst for inst in instances if inst.port == port_num]
+ if len(port_matches) == 1:
+ return port_matches[0]
+ except ValueError:
+ pass
+
+ # Try path match
+ path_matches = [inst for inst in instances if inst.path == identifier]
+ if len(path_matches) == 1:
+ return path_matches[0]
+
+ # Nothing matched
+ available_ids = [inst.id for inst in instances]
+ raise ConnectionError(
+ f"Unity instance '{identifier}' not found. "
+ f"Available instances: {available_ids}. "
+ f"Use the unity_instances resource to see all instances."
+ )
+
+ def get_connection(self, instance_identifier: str | None = None):
+ """
+ Get or create a connection to a Unity instance.
+
+ Args:
+ instance_identifier: Optional identifier (name, hash, name@hash, etc.)
+ If None, uses default or most recent instance
+
+ Returns:
+ UnityConnection to the specified instance
+
+ Raises:
+ ConnectionError: If instance cannot be found or connected
+ """
+ # Refresh instance list if cache expired
+ instances = self.discover_all_instances()
+
+ # Resolve identifier to specific instance
+ target = self._resolve_instance_id(instance_identifier, instances)
+
+ # Return existing connection or create new one
+ with self._pool_lock:
+ if target.id not in self._connections:
+ logger.info(f"Creating new connection to Unity instance: {target.id} (port {target.port})")
+ conn = self._UnityConnection(port=target.port, instance_id=target.id)
+ if not conn.connect():
+ raise ConnectionError(
+ f"Failed to connect to Unity instance '{target.id}' on port {target.port}. "
+ f"Ensure the Unity Editor is running."
+ )
+ self._connections[target.id] = conn
+ else:
+ # Update existing connection with instance_id and port if changed
+ conn = self._connections[target.id]
+ conn.instance_id = target.id
+ if conn.port != target.port:
+ logger.info(f"Updating cached port for {target.id}: {conn.port} -> {target.port}")
+ conn.port = target.port
+ logger.debug(f"Reusing existing connection to: {target.id}")
+
+ return self._connections[target.id]
+
+ def disconnect_all(self):
+ """Disconnect all active connections"""
+ with self._pool_lock:
+ for instance_id, conn in self._connections.items():
+ try:
+ logger.info(f"Disconnecting from Unity instance: {instance_id}")
+ conn.disconnect()
+ except Exception:
+ logger.exception(f"Error disconnecting from {instance_id}")
+ self._connections.clear()
+
+
+# Global Unity connection pool
+_unity_connection_pool: UnityConnectionPool | None = None
+_pool_init_lock = threading.Lock()
+
+
+def get_unity_connection_pool() -> UnityConnectionPool:
+ """Get or create the global Unity connection pool."""
+ global _unity_connection_pool
+ if _unity_connection_pool is None:
+ with _pool_init_lock:
+ if _unity_connection_pool is None:
+ _unity_connection_pool = UnityConnectionPool()
+ return _unity_connection_pool
diff --git a/MCPForUnity/UnityMcpServer~/src/models.py b/MCPForUnity/UnityMcpServer~/src/models.py
index cf1d33da..7c56327c 100644
--- a/MCPForUnity/UnityMcpServer~/src/models.py
+++ b/MCPForUnity/UnityMcpServer~/src/models.py
@@ -1,4 +1,5 @@
from typing import Any
+from datetime import datetime
from pydantic import BaseModel
@@ -7,3 +8,28 @@ class MCPResponse(BaseModel):
message: str | None = None
error: str | None = None
data: Any | None = None
+
+
+class UnityInstanceInfo(BaseModel):
+ """Information about a Unity Editor instance"""
+ id: str # "ProjectName@hash" or fallback to hash
+ name: str # Project name extracted from path
+ path: str # Full project path (Assets folder)
+ hash: str # 8-char hash of project path
+ port: int # TCP port
+ status: str # "running", "reloading", "offline"
+ last_heartbeat: datetime | None = None
+ unity_version: str | None = None
+
+ def to_dict(self) -> dict[str, Any]:
+ """Convert to dictionary for JSON serialization"""
+ return {
+ "id": self.id,
+ "name": self.name,
+ "path": self.path,
+ "hash": self.hash,
+ "port": self.port,
+ "status": self.status,
+ "last_heartbeat": self.last_heartbeat.isoformat() if self.last_heartbeat else None,
+ "unity_version": self.unity_version
+ }
diff --git a/MCPForUnity/UnityMcpServer~/src/port_discovery.py b/MCPForUnity/UnityMcpServer~/src/port_discovery.py
index c759e745..e0f0ed27 100644
--- a/MCPForUnity/UnityMcpServer~/src/port_discovery.py
+++ b/MCPForUnity/UnityMcpServer~/src/port_discovery.py
@@ -14,9 +14,14 @@
import glob
import json
import logging
+import os
+import struct
+from datetime import datetime
from pathlib import Path
import socket
-from typing import Optional, List
+from typing import Optional, List, Dict
+
+from models import UnityInstanceInfo
logger = logging.getLogger("mcp-for-unity-server")
@@ -56,22 +61,55 @@ def list_candidate_files() -> List[Path]:
@staticmethod
def _try_probe_unity_mcp(port: int) -> bool:
"""Quickly check if a MCP for Unity listener is on this port.
- Tries a short TCP connect, sends 'ping', expects Unity bridge welcome message.
+ Uses Unity's framed protocol: receives handshake, sends framed ping, expects framed pong.
"""
try:
with socket.create_connection(("127.0.0.1", port), PortDiscovery.CONNECT_TIMEOUT) as s:
s.settimeout(PortDiscovery.CONNECT_TIMEOUT)
try:
- s.sendall(b"ping")
- data = s.recv(512)
- # Check for Unity bridge welcome message format
- if data and (b"WELCOME UNITY-MCP" in data or b'"message":"pong"' in data):
- return True
- except Exception:
+ # 1. Receive handshake from Unity
+ handshake = s.recv(512)
+ if not handshake or b"FRAMING=1" not in handshake:
+ # Try legacy mode as fallback
+ s.sendall(b"ping")
+ data = s.recv(512)
+ return data and b'"message":"pong"' in data
+
+ # 2. Send framed ping command
+ # Frame format: 8-byte length header (big-endian uint64) + payload
+ payload = b"ping"
+ header = struct.pack('>Q', len(payload))
+ s.sendall(header + payload)
+
+ # 3. Receive framed response
+ # Helper to receive exact number of bytes
+ def _recv_exact(expected: int) -> bytes | None:
+ chunks = bytearray()
+ while len(chunks) < expected:
+ chunk = s.recv(expected - len(chunks))
+ if not chunk:
+ return None
+ chunks.extend(chunk)
+ return bytes(chunks)
+
+ response_header = _recv_exact(8)
+ if response_header is None:
+ return False
+
+ response_length = struct.unpack('>Q', response_header)[0]
+ if response_length > 10000: # Sanity check
+ return False
+
+ response = _recv_exact(response_length)
+ if response is None:
+ return False
+ return b'"message":"pong"' in response
+ except Exception as e:
+ logger.debug(f"Port probe failed for {port}: {e}")
return False
- except Exception:
+ except Exception as e:
+ logger.debug(f"Connection failed for port {port}: {e}")
return False
- return False
@staticmethod
def _read_latest_status() -> Optional[dict]:
@@ -158,3 +196,112 @@ def get_port_config() -> Optional[dict]:
logger.warning(
f"Could not read port configuration {path}: {e}")
return None
+
+ @staticmethod
+ def _extract_project_name(project_path: str) -> str:
+ """Extract project name from Assets path.
+
+ Examples:
+ /Users/sakura/Projects/MyGame/Assets -> MyGame
+ C:\\Projects\\TestProject\\Assets -> TestProject
+ """
+ if not project_path:
+ return "Unknown"
+
+ try:
+ # Remove trailing /Assets or \Assets
+ path = project_path.rstrip('/\\')
+ if path.endswith('Assets'):
+ path = path[:-6].rstrip('/\\')
+
+ # Get the last directory name
+ name = os.path.basename(path)
+ return name if name else "Unknown"
+ except Exception:
+ return "Unknown"
+
+ @staticmethod
+ def discover_all_unity_instances() -> List[UnityInstanceInfo]:
+ """
+ Discover all running Unity Editor instances by scanning status files.
+
+ Returns:
+ List of UnityInstanceInfo objects for all discovered instances
+ """
+ instances_by_port: Dict[int, tuple[UnityInstanceInfo, float]] = {}
+ base = PortDiscovery.get_registry_dir()
+
+ # Scan all status files
+ status_pattern = str(base / "unity-mcp-status-*.json")
+ status_files = glob.glob(status_pattern)
+
+ for status_file_path in status_files:
+ try:
+ status_path = Path(status_file_path)
+ file_mtime = status_path.stat().st_mtime
+
+ with status_path.open('r') as f:
+ data = json.load(f)
+
+ # Extract hash from filename: unity-mcp-status-{hash}.json
+ filename = os.path.basename(status_file_path)
+ hash_value = filename.replace('unity-mcp-status-', '').replace('.json', '')
+
+ # Extract information
+ project_path = data.get('project_path', '')
+ project_name = PortDiscovery._extract_project_name(project_path)
+ port = data.get('unity_port')
+ is_reloading = data.get('reloading', False)
+
+ # Parse last_heartbeat
+ last_heartbeat = None
+ heartbeat_str = data.get('last_heartbeat')
+ if heartbeat_str:
+ try:
+ last_heartbeat = datetime.fromisoformat(heartbeat_str.replace('Z', '+00:00'))
+ except Exception:
+ pass
+
+ # Verify port is actually responding
+ is_alive = PortDiscovery._try_probe_unity_mcp(port) if isinstance(port, int) else False
+
+ if not is_alive:
+ logger.debug(f"Instance {project_name}@{hash_value} has heartbeat but port {port} not responding")
+ continue
+
+ freshness = last_heartbeat.timestamp() if last_heartbeat else file_mtime
+
+ existing = instances_by_port.get(port)
+ if existing:
+ _, existing_freshness = existing
+ if existing_freshness >= freshness:
+ logger.debug(
+ "Skipping stale status entry %s in favor of more recent data for port %s",
+ status_path.name,
+ port,
+ )
+ continue
+
+ # Create instance info
+ instance = UnityInstanceInfo(
+ id=f"{project_name}@{hash_value}",
+ name=project_name,
+ path=project_path,
+ hash=hash_value,
+ port=port,
+ status="reloading" if is_reloading else "running",
+ last_heartbeat=last_heartbeat,
+ unity_version=data.get('unity_version') # May not be available in current version
+ )
+
+ instances_by_port[port] = (instance, freshness)
+ logger.debug(f"Discovered Unity instance: {instance.id} on port {instance.port}")
+
+ except Exception as e:
+ logger.debug(f"Failed to parse status file {status_file_path}: {e}")
+ continue
+
+ deduped_instances = [entry[0] for entry in sorted(instances_by_port.values(), key=lambda item: item[1], reverse=True)]
+
+ logger.info(f"Discovered {len(deduped_instances)} Unity instances (after de-duplication by port)")
+ return deduped_instances
diff --git a/MCPForUnity/UnityMcpServer~/src/pyproject.toml b/MCPForUnity/UnityMcpServer~/src/pyproject.toml
index 6dd28065..709c6e32 100644
--- a/MCPForUnity/UnityMcpServer~/src/pyproject.toml
+++ b/MCPForUnity/UnityMcpServer~/src/pyproject.toml
@@ -6,7 +6,7 @@ readme = "README.md"
requires-python = ">=3.10"
dependencies = [
"httpx>=0.27.2",
- "fastmcp>=2.12.5",
+ "fastmcp>=2.13.0",
"mcp>=1.16.0",
"pydantic>=2.12.0",
"tomli>=2.3.0",
diff --git a/MCPForUnity/UnityMcpServer~/src/resources/__init__.py b/MCPForUnity/UnityMcpServer~/src/resources/__init__.py
index a3577891..eeefea74 100644
--- a/MCPForUnity/UnityMcpServer~/src/resources/__init__.py
+++ b/MCPForUnity/UnityMcpServer~/src/resources/__init__.py
@@ -2,6 +2,7 @@
MCP Resources package - Auto-discovers and registers all resources in this directory.
"""
import logging
+import inspect
from pathlib import Path
from fastmcp import FastMCP
@@ -16,6 +17,31 @@
__all__ = ['register_all_resources']
+def _create_fixed_wrapper(original_func, has_other_params):
+ """
+ Factory function to create a wrapper that calls original_func with unity_instance=None.
+ This avoids closure issues in loops and preserves sync/async nature of the original function.
+ """
+ is_async = inspect.iscoroutinefunction(original_func)
+
+ if has_other_params:
+ if is_async:
+ async def fixed_wrapper(*args, **kwargs):
+ return await original_func(*args, **kwargs, unity_instance=None)
+ else:
+ def fixed_wrapper(*args, **kwargs):
+ return original_func(*args, **kwargs, unity_instance=None)
+ else:
+ if is_async:
+ async def fixed_wrapper():
+ return await original_func(unity_instance=None)
+ else:
+ def fixed_wrapper():
+ return original_func(unity_instance=None)
+
+ return fixed_wrapper
+
+
def register_all_resources(mcp: FastMCP):
"""
Auto-discover and register all resources in the resources/ directory.
@@ -36,6 +62,7 @@ def register_all_resources(mcp: FastMCP):
logger.warning("No MCP resources registered!")
return
+ registered_count = 0
for resource_info in resources:
func = resource_info['func']
uri = resource_info['uri']
@@ -43,11 +70,68 @@ def register_all_resources(mcp: FastMCP):
description = resource_info['description']
kwargs = resource_info['kwargs']
- # Apply the @mcp.resource decorator and telemetry
- wrapped = telemetry_resource(resource_name)(func)
- wrapped = mcp.resource(uri=uri, name=resource_name,
- description=description, **kwargs)(wrapped)
- resource_info['func'] = wrapped
- logger.debug(f"Registered resource: {resource_name} - {description}")
+ # Check if URI contains query parameters (e.g., {?unity_instance})
+ has_query_params = '{?' in uri
+
+ if has_query_params:
+ # Register two versions for backward compatibility:
+ # 1. Template version with query parameters (for multi-instance)
+ wrapped_template = telemetry_resource(resource_name)(func)
+ wrapped_template = mcp.resource(uri=uri, name=resource_name,
+ description=description, **kwargs)(wrapped_template)
+ logger.debug(f"Registered resource template: {resource_name} - {uri}")
+ registered_count += 1
+
+ # 2. Fixed version without query parameters (for single-instance/default)
+ # Remove query parameters from URI
+ fixed_uri = uri.split('{?')[0]
+ fixed_name = f"{resource_name}_default"
+ fixed_description = f"{description} (default instance)"
+
+ # Create a wrapper function that doesn't accept unity_instance parameter
+ # This wrapper will call the original function with unity_instance=None
+ sig = inspect.signature(func)
+ params = list(sig.parameters.values())
+
+ # Filter out unity_instance parameter
+ fixed_params = [p for p in params if p.name != 'unity_instance']
+
+ # Create wrapper using factory function to avoid closure issues
+ has_other_params = len(fixed_params) > 0
+ fixed_wrapper = _create_fixed_wrapper(func, has_other_params)
+
+ # Update signature to match filtered parameters
+ if has_other_params:
+ fixed_wrapper.__signature__ = sig.replace(parameters=fixed_params)
+ fixed_wrapper.__annotations__ = {
+ k: v for k, v in getattr(func, '__annotations__', {}).items()
+ if k != 'unity_instance'
+ }
+ else:
+ fixed_wrapper.__signature__ = inspect.Signature(parameters=[])
+ fixed_wrapper.__annotations__ = {
+ k: v for k, v in getattr(func, '__annotations__', {}).items()
+ if k == 'return'
+ }
+
+ # Preserve function metadata
+ fixed_wrapper.__name__ = fixed_name
+ fixed_wrapper.__doc__ = func.__doc__
+
+ wrapped_fixed = telemetry_resource(fixed_name)(fixed_wrapper)
+ wrapped_fixed = mcp.resource(uri=fixed_uri, name=fixed_name,
+ description=fixed_description, **kwargs)(wrapped_fixed)
+ logger.debug(f"Registered resource (fixed): {fixed_name} - {fixed_uri}")
+ registered_count += 1
+
+ resource_info['func'] = wrapped_template
+ else:
+ # No query parameters, register as-is
+ wrapped = telemetry_resource(resource_name)(func)
+ wrapped = mcp.resource(uri=uri, name=resource_name,
+ description=description, **kwargs)(wrapped)
+ resource_info['func'] = wrapped
+ logger.debug(f"Registered resource: {resource_name} - {description}")
+ registered_count += 1
- logger.info(f"Registered {len(resources)} MCP resources")
+ logger.info(f"Registered {registered_count} MCP resources ({len(resources)} unique)")
diff --git a/MCPForUnity/UnityMcpServer~/src/resources/menu_items.py b/MCPForUnity/UnityMcpServer~/src/resources/menu_items.py
index d3724659..f0469d30 100644
--- a/MCPForUnity/UnityMcpServer~/src/resources/menu_items.py
+++ b/MCPForUnity/UnityMcpServer~/src/resources/menu_items.py
@@ -8,18 +8,21 @@ class GetMenuItemsResponse(MCPResponse):
@mcp_for_unity_resource(
- uri="mcpforunity://menu-items",
+ uri="mcpforunity://menu-items{?unity_instance}",
name="get_menu_items",
description="Provides a list of all menu items."
)
-async def get_menu_items() -> GetMenuItemsResponse:
- """Provides a list of all menu items."""
- # Later versions of FastMCP support these as query parameters
- # See: https://gofastmcp.com/servers/resources#query-parameters
+async def get_menu_items(unity_instance: str | None = None) -> GetMenuItemsResponse:
+ """Provides a list of all menu items.
+
+ Args:
+ unity_instance: Target Unity instance (project name, hash, or 'Name@hash').
+ If not specified, uses default instance.
+ """
params = {
"refresh": True,
"search": "",
}
- response = await async_send_command_with_retry("get_menu_items", params)
+ response = await async_send_command_with_retry("get_menu_items", params, instance_id=unity_instance)
return GetMenuItemsResponse(**response) if isinstance(response, dict) else response
diff --git a/MCPForUnity/UnityMcpServer~/src/resources/tests.py b/MCPForUnity/UnityMcpServer~/src/resources/tests.py
index 4268a143..17ed7c4f 100644
--- a/MCPForUnity/UnityMcpServer~/src/resources/tests.py
+++ b/MCPForUnity/UnityMcpServer~/src/resources/tests.py
@@ -17,15 +17,29 @@ class GetTestsResponse(MCPResponse):
data: list[TestItem] = []
-@mcp_for_unity_resource(uri="mcpforunity://tests", name="get_tests", description="Provides a list of all tests.")
-async def get_tests() -> GetTestsResponse:
- """Provides a list of all tests."""
- response = await async_send_command_with_retry("get_tests", {})
+@mcp_for_unity_resource(uri="mcpforunity://tests{?unity_instance}", name="get_tests", description="Provides a list of all tests.")
+async def get_tests(unity_instance: str | None = None) -> GetTestsResponse:
+ """Provides a list of all tests.
+
+ Args:
+ unity_instance: Target Unity instance (project name, hash, or 'Name@hash').
+ If not specified, uses default instance.
+ """
+ response = await async_send_command_with_retry("get_tests", {}, instance_id=unity_instance)
return GetTestsResponse(**response) if isinstance(response, dict) else response
-@mcp_for_unity_resource(uri="mcpforunity://tests/{mode}", name="get_tests_for_mode", description="Provides a list of tests for a specific mode.")
-async def get_tests_for_mode(mode: Annotated[Literal["EditMode", "PlayMode"], Field(description="The mode to filter tests by.")]) -> GetTestsResponse:
- """Provides a list of tests for a specific mode."""
- response = await async_send_command_with_retry("get_tests_for_mode", {"mode": mode})
+@mcp_for_unity_resource(uri="mcpforunity://tests/{mode}{?unity_instance}", name="get_tests_for_mode", description="Provides a list of tests for a specific mode.")
+async def get_tests_for_mode(
+ mode: Annotated[Literal["EditMode", "PlayMode"], Field(description="The mode to filter tests by.")],
+ unity_instance: str | None = None
+) -> GetTestsResponse:
+ """Provides a list of tests for a specific mode.
+
+ Args:
+ mode: The test mode to filter by (EditMode or PlayMode).
+ unity_instance: Target Unity instance (project name, hash, or 'Name@hash').
+ If not specified, uses default instance.
+ """
+ response = await async_send_command_with_retry("get_tests_for_mode", {"mode": mode}, instance_id=unity_instance)
return GetTestsResponse(**response) if isinstance(response, dict) else response
diff --git a/MCPForUnity/UnityMcpServer~/src/resources/unity_instances.py b/MCPForUnity/UnityMcpServer~/src/resources/unity_instances.py
new file mode 100644
index 00000000..4bddd5a5
--- /dev/null
+++ b/MCPForUnity/UnityMcpServer~/src/resources/unity_instances.py
@@ -0,0 +1,63 @@
+"""
+Resource for listing all available Unity Editor instances.
+"""
+from typing import Any
+
+from registry import mcp_for_unity_resource
+from unity_connection import get_unity_connection_pool
+
+
+@mcp_for_unity_resource(
+ uri="mcpforunity://unity-instances",
+ name="unity_instances",
+ description="Provides a list of all running Unity Editor instances with their details."
+)
+def get_unity_instances() -> dict[str, Any]:
+ """
+ List all available Unity Editor instances.
+
+ Returns information about each instance including:
+ - id: Unique identifier (ProjectName@hash)
+ - name: Project name
+ - path: Full project path
+ - hash: 8-character hash of project path
+ - port: TCP port number
+ - status: Current status (running, reloading, etc.)
+ - last_heartbeat: Last heartbeat timestamp
+ - unity_version: Unity version (if available)
+
+ Returns:
+ Dictionary containing list of instances and metadata
+ """
+ try:
+ pool = get_unity_connection_pool()
+ instances = pool.discover_all_instances(force_refresh=True)
+
+ # Check for duplicate project names
+ name_counts = {}
+ for inst in instances:
+ name_counts[inst.name] = name_counts.get(inst.name, 0) + 1
+
+ duplicates = [name for name, count in name_counts.items() if count > 1]
+
+ result = {
+ "success": True,
+ "instance_count": len(instances),
+ "instances": [inst.to_dict() for inst in instances],
+ }
+
+ if duplicates:
+ result["warning"] = (
+ f"Multiple instances found with duplicate project names: {duplicates}. "
+ f"Use full format (e.g., 'ProjectName@hash') to specify which instance."
+ )
+
+ return result
+
+ except Exception as e:
+ return {
+ "success": False,
+ "error": f"Failed to list Unity instances: {str(e)}",
+ "instance_count": 0,
+ "instances": []
+ }
diff --git a/MCPForUnity/UnityMcpServer~/src/server.py b/MCPForUnity/UnityMcpServer~/src/server.py
index 11053ac8..c3dd10a2 100644
--- a/MCPForUnity/UnityMcpServer~/src/server.py
+++ b/MCPForUnity/UnityMcpServer~/src/server.py
@@ -3,12 +3,13 @@
import logging
from logging.handlers import RotatingFileHandler
import os
+import argparse
from contextlib import asynccontextmanager
from typing import AsyncIterator, Dict, Any
from config import config
from tools import register_all_tools
from resources import register_all_resources
-from unity_connection import get_unity_connection, UnityConnection
+from connection_pool import get_unity_connection_pool
import time
# Configure logging using settings from config
@@ -61,16 +62,14 @@
except Exception:
pass
-# Global connection state
-_unity_connection: UnityConnection = None
-
-
@asynccontextmanager
async def server_lifespan(server: FastMCP) -> AsyncIterator[Dict[str, Any]]:
"""Handle server startup and shutdown."""
- global _unity_connection
logger.info("MCP for Unity Server starting up")
+ # Initialize connection pool variable
+ pool = None
+
# Record server startup telemetry
start_time = time.time()
start_clk = time.perf_counter()
@@ -101,22 +100,35 @@ def _emit_startup():
logger.info(
"Skipping Unity connection on startup (UNITY_MCP_SKIP_STARTUP_CONNECT=1)")
else:
- _unity_connection = get_unity_connection()
- logger.info("Connected to Unity on startup")
-
- # Record successful Unity connection (deferred)
- import threading as _t
- _t.Timer(1.0, lambda: record_telemetry(
- RecordType.UNITY_CONNECTION,
- {
- "status": "connected",
- "connection_time_ms": (time.perf_counter() - start_clk) * 1000,
- }
- )).start()
+ # Initialize connection pool and discover instances
+ pool = get_unity_connection_pool()
+ instances = pool.discover_all_instances()
+
+ if instances:
+ logger.info(f"Discovered {len(instances)} Unity instance(s): {[i.id for i in instances]}")
+
+ # Try to connect to default instance
+ try:
+ pool.get_connection()
+ logger.info("Connected to default Unity instance on startup")
+
+ # Record successful Unity connection (deferred)
+ import threading as _t
+ _t.Timer(1.0, lambda: record_telemetry(
+ RecordType.UNITY_CONNECTION,
+ {
+ "status": "connected",
+ "connection_time_ms": (time.perf_counter() - start_clk) * 1000,
+ "instance_count": len(instances)
+ }
+ )).start()
+ except Exception as e:
+ logger.warning("Could not connect to default Unity instance: %s", e)
+ else:
+ logger.warning("No Unity instances found on startup")
except ConnectionError as e:
logger.warning("Could not connect to Unity on startup: %s", e)
- _unity_connection = None
# Record connection failure (deferred)
import threading as _t
@@ -132,7 +144,6 @@ def _emit_startup():
except Exception as e:
logger.warning(
"Unexpected error connecting to Unity on startup: %s", e)
- _unity_connection = None
import threading as _t
_err_msg = str(e)[:200]
_t.Timer(1.0, lambda: record_telemetry(
@@ -145,13 +156,12 @@ def _emit_startup():
)).start()
try:
- # Yield the connection object so it can be attached to the context
- # The key 'bridge' matches how tools like read_console expect to access it (ctx.bridge)
- yield {"bridge": _unity_connection}
+ # Yield the connection pool so it can be attached to the context
+ # Note: Tools will use get_unity_connection_pool() directly
+ yield {"pool": pool}
finally:
- if _unity_connection:
- _unity_connection.disconnect()
- _unity_connection = None
+ if pool:
+ pool.disconnect_all()
logger.info("MCP for Unity Server shut down")
# Initialize MCP server
@@ -188,6 +198,38 @@ def _emit_startup():
def main():
"""Entry point for uvx and console scripts."""
+ parser = argparse.ArgumentParser(
+ description="MCP for Unity Server",
+ formatter_class=argparse.RawDescriptionHelpFormatter,
+ epilog="""
+Environment Variables:
+ UNITY_MCP_DEFAULT_INSTANCE Default Unity instance to target (project name, hash, or 'Name@hash')
+ UNITY_MCP_SKIP_STARTUP_CONNECT Skip initial Unity connection attempt (set to 1/true/yes/on)
+ UNITY_MCP_TELEMETRY_ENABLED Enable telemetry (set to 1/true/yes/on)
+
+Examples:
+ # Use specific Unity project as default
+ python -m src.server --default-instance "MyProject"
+
+ # Or use environment variable
+ UNITY_MCP_DEFAULT_INSTANCE="MyProject" python -m src.server
+ """
+ )
+ parser.add_argument(
+ "--default-instance",
+ type=str,
+ metavar="INSTANCE",
+ help="Default Unity instance to target (project name, hash, or 'Name@hash'). "
+ "Overrides UNITY_MCP_DEFAULT_INSTANCE environment variable."
+ )
+
+ args = parser.parse_args()
+
+ # Set environment variable if --default-instance is provided
+ if args.default_instance:
+ os.environ["UNITY_MCP_DEFAULT_INSTANCE"] = args.default_instance
+ logger.info(f"Using default Unity instance from command-line: {args.default_instance}")
+
mcp.run(transport='stdio')
diff --git a/MCPForUnity/UnityMcpServer~/src/tools/execute_menu_item.py b/MCPForUnity/UnityMcpServer~/src/tools/execute_menu_item.py
index a1489c59..23471230 100644
--- a/MCPForUnity/UnityMcpServer~/src/tools/execute_menu_item.py
+++ b/MCPForUnity/UnityMcpServer~/src/tools/execute_menu_item.py
@@ -17,9 +17,11 @@ async def execute_menu_item(
ctx: Context,
menu_path: Annotated[str,
"Menu path for 'execute' or 'exists' (e.g., 'File/Save Project')"] | None = None,
+ unity_instance: Annotated[str,
+ "Target Unity instance (project name, hash, or 'Name@hash'). If not specified, uses default instance."] | None = None,
) -> MCPResponse:
await ctx.info(f"Processing execute_menu_item: {menu_path}")
params_dict: dict[str, Any] = {"menuPath": menu_path}
params_dict = {k: v for k, v in params_dict.items() if v is not None}
- result = await async_send_command_with_retry("execute_menu_item", params_dict)
+ result = await async_send_command_with_retry("execute_menu_item", params_dict, instance_id=unity_instance)
return MCPResponse(**result) if isinstance(result, dict) else result
diff --git a/MCPForUnity/UnityMcpServer~/src/tools/manage_asset.py b/MCPForUnity/UnityMcpServer~/src/tools/manage_asset.py
index a577e94d..a9a012c8 100644
--- a/MCPForUnity/UnityMcpServer~/src/tools/manage_asset.py
+++ b/MCPForUnity/UnityMcpServer~/src/tools/manage_asset.py
@@ -31,9 +31,11 @@ async def manage_asset(
filter_date_after: Annotated[str,
"Date after which to filter"] | None = None,
page_size: Annotated[int | float | str, "Page size for pagination"] | None = None,
- page_number: Annotated[int | float | str, "Page number for pagination"] | None = None
+ page_number: Annotated[int | float | str, "Page number for pagination"] | None = None,
+ unity_instance: Annotated[str,
+ "Target Unity instance (project name, hash, or 'Name@hash'). If not specified, uses default instance."] | None = None
) -> dict[str, Any]:
- ctx.info(f"Processing manage_asset: {action}")
+ ctx.info(f"Processing manage_asset: {action} (unity_instance={unity_instance or 'default'})")
# Coerce 'properties' from JSON string to dict for client compatibility
if isinstance(properties, str):
try:
@@ -86,7 +88,7 @@ def _coerce_int(value, default=None):
# Get the current asyncio event loop
loop = asyncio.get_running_loop()
- # Use centralized async retry helper to avoid blocking the event loop
- result = await async_send_command_with_retry("manage_asset", params_dict, loop=loop)
+ # Use centralized async retry helper with instance routing
+ result = await async_send_command_with_retry("manage_asset", params_dict, instance_id=unity_instance, loop=loop)
# Return the result obtained from Unity
return result if isinstance(result, dict) else {"success": False, "message": str(result)}
diff --git a/MCPForUnity/UnityMcpServer~/src/tools/manage_editor.py b/MCPForUnity/UnityMcpServer~/src/tools/manage_editor.py
index f7911458..637fc678 100644
--- a/MCPForUnity/UnityMcpServer~/src/tools/manage_editor.py
+++ b/MCPForUnity/UnityMcpServer~/src/tools/manage_editor.py
@@ -21,6 +21,8 @@ def manage_editor(
"Tag name when adding and removing tags"] | None = None,
layer_name: Annotated[str,
"Layer name when adding and removing layers"] | None = None,
+ unity_instance: Annotated[str,
+ "Target Unity instance (project name, hash, or 'Name@hash'). If not specified, uses default instance."] | None = None,
) -> dict[str, Any]:
ctx.info(f"Processing manage_editor: {action}")
@@ -62,8 +64,8 @@ def _coerce_bool(value, default=None):
}
params = {k: v for k, v in params.items() if v is not None}
- # Send command using centralized retry helper
- response = send_command_with_retry("manage_editor", params)
+ # Send command using centralized retry helper with instance routing
+ response = send_command_with_retry("manage_editor", params, instance_id=unity_instance)
# Preserve structured failure data; unwrap success into a friendlier shape
if isinstance(response, dict) and response.get("success"):
diff --git a/MCPForUnity/UnityMcpServer~/src/tools/manage_gameobject.py b/MCPForUnity/UnityMcpServer~/src/tools/manage_gameobject.py
index 794013b9..2896e309 100644
--- a/MCPForUnity/UnityMcpServer~/src/tools/manage_gameobject.py
+++ b/MCPForUnity/UnityMcpServer~/src/tools/manage_gameobject.py
@@ -64,6 +64,8 @@ def manage_gameobject(
# Controls whether serialization of private [SerializeField] fields is included
includeNonPublicSerialized: Annotated[bool | str,
"Controls whether serialization of private [SerializeField] fields is included (accepts true/false or 'true'/'false')"] | None = None,
+ unity_instance: Annotated[str,
+ "Target Unity instance (project name, hash, or 'Name@hash'). If not specified, uses default instance."] | None = None,
) -> dict[str, Any]:
ctx.info(f"Processing manage_gameobject: {action}")
@@ -195,8 +197,8 @@ def _to_vec3(parts):
params.pop("prefabFolder", None)
# --------------------------------
- # Use centralized retry helper
- response = send_command_with_retry("manage_gameobject", params)
+ # Use centralized retry helper with instance routing
+ response = send_command_with_retry("manage_gameobject", params, instance_id=unity_instance)
# Check if the response indicates success
# If the response is not successful, raise an exception with the error message
diff --git a/MCPForUnity/UnityMcpServer~/src/tools/manage_prefabs.py b/MCPForUnity/UnityMcpServer~/src/tools/manage_prefabs.py
index 2540e9f2..010ceada 100644
--- a/MCPForUnity/UnityMcpServer~/src/tools/manage_prefabs.py
+++ b/MCPForUnity/UnityMcpServer~/src/tools/manage_prefabs.py
@@ -28,6 +28,8 @@ def manage_prefabs(
"Allow replacing an existing prefab at the same path"] | None = None,
search_inactive: Annotated[bool,
"Include inactive objects when resolving the target name"] | None = None,
+ unity_instance: Annotated[str,
+ "Target Unity instance (project name, hash, or 'Name@hash'). If not specified, uses default instance."] | None = None,
) -> dict[str, Any]:
ctx.info(f"Processing manage_prefabs: {action}")
try:
@@ -45,7 +47,7 @@ def manage_prefabs(
params["allowOverwrite"] = bool(allow_overwrite)
if search_inactive is not None:
params["searchInactive"] = bool(search_inactive)
- response = send_command_with_retry("manage_prefabs", params)
+ response = send_command_with_retry("manage_prefabs", params, instance_id=unity_instance)
if isinstance(response, dict) and response.get("success"):
return {
diff --git a/MCPForUnity/UnityMcpServer~/src/tools/manage_scene.py b/MCPForUnity/UnityMcpServer~/src/tools/manage_scene.py
index 50927ca9..a5748196 100644
--- a/MCPForUnity/UnityMcpServer~/src/tools/manage_scene.py
+++ b/MCPForUnity/UnityMcpServer~/src/tools/manage_scene.py
@@ -15,6 +15,8 @@ def manage_scene(
"Asset path for scene operations (default: 'Assets/')"] | None = None,
build_index: Annotated[int | str,
"Build index for load/build settings actions (accepts int or string, e.g., 0 or '0')"] | None = None,
+ unity_instance: Annotated[str,
+ "Target Unity instance (project name, hash, or 'Name@hash'). If not specified, uses default instance."] | None = None,
) -> dict[str, Any]:
ctx.info(f"Processing manage_scene: {action}")
try:
@@ -44,8 +46,8 @@ def _coerce_int(value, default=None):
if coerced_build_index is not None:
params["buildIndex"] = coerced_build_index
- # Use centralized retry helper
- response = send_command_with_retry("manage_scene", params)
+ # Use centralized retry helper with instance routing
+ response = send_command_with_retry("manage_scene", params, instance_id=unity_instance)
# Preserve structured failure data; unwrap success into a friendlier shape
if isinstance(response, dict) and response.get("success"):
diff --git a/MCPForUnity/UnityMcpServer~/src/tools/manage_script.py b/MCPForUnity/UnityMcpServer~/src/tools/manage_script.py
index 6ed8cbca..831afa9e 100644
--- a/MCPForUnity/UnityMcpServer~/src/tools/manage_script.py
+++ b/MCPForUnity/UnityMcpServer~/src/tools/manage_script.py
@@ -85,6 +85,8 @@ def apply_text_edits(
"Optional strict flag, used to enforce strict mode"] | None = None,
options: Annotated[dict[str, Any],
"Optional options, used to pass additional options to the script editor"] | None = None,
+ unity_instance: Annotated[str,
+ "Target Unity instance (project name, hash, or 'Name@hash'). If not specified, uses default instance."] | None = None,
) -> dict[str, Any]:
ctx.info(f"Processing apply_text_edits: {uri}")
name, directory = _split_uri(uri)
@@ -107,7 +109,7 @@ def _needs_normalization(arr: list[dict[str, Any]]) -> bool:
"action": "read",
"name": name,
"path": directory,
- })
+ }, instance_id=unity_instance)
if not (isinstance(read_resp, dict) and read_resp.get("success")):
return read_resp if isinstance(read_resp, dict) else {"success": False, "message": str(read_resp)}
data = read_resp.get("data", {})
@@ -304,7 +306,7 @@ def _le(a: tuple[int, int], b: tuple[int, int]) -> bool:
"options": opts,
}
params = {k: v for k, v in params.items() if v is not None}
- resp = unity_connection.send_command_with_retry("manage_script", params)
+ resp = unity_connection.send_command_with_retry("manage_script", params, instance_id=unity_instance)
if isinstance(resp, dict):
data = resp.setdefault("data", {})
data.setdefault("normalizedEdits", normalized_edits)
@@ -341,6 +343,7 @@ def _flip_async():
{"menuPath": "MCP/Flip Reload Sentinel"},
max_retries=0,
retry_ms=0,
+ instance_id=unity_instance,
)
except Exception:
pass
@@ -359,6 +362,8 @@ def create_script(
contents: Annotated[str, "Contents of the script to create. Note, this is Base64 encoded over transport."],
script_type: Annotated[str, "Script type (e.g., 'C#')"] | None = None,
namespace: Annotated[str, "Namespace for the script"] | None = None,
+ unity_instance: Annotated[str,
+ "Target Unity instance (project name, hash, or 'Name@hash'). If not specified, uses default instance."] | None = None,
) -> dict[str, Any]:
ctx.info(f"Processing create_script: {path}")
name = os.path.splitext(os.path.basename(path))[0]
@@ -386,14 +391,16 @@ def create_script(
contents.encode("utf-8")).decode("utf-8")
params["contentsEncoded"] = True
params = {k: v for k, v in params.items() if v is not None}
- resp = unity_connection.send_command_with_retry("manage_script", params)
+ resp = unity_connection.send_command_with_retry("manage_script", params, instance_id=unity_instance)
return resp if isinstance(resp, dict) else {"success": False, "message": str(resp)}
@mcp_for_unity_tool(description=("Delete a C# script by URI or Assets-relative path."))
def delete_script(
ctx: Context,
- uri: Annotated[str, "URI of the script to delete under Assets/ directory, unity://path/Assets/... or file://... or Assets/..."]
+ uri: Annotated[str, "URI of the script to delete under Assets/ directory, unity://path/Assets/... or file://... or Assets/..."],
+ unity_instance: Annotated[str,
+ "Target Unity instance (project name, hash, or 'Name@hash'). If not specified, uses default instance."] | None = None,
) -> dict[str, Any]:
"""Delete a C# script by URI."""
ctx.info(f"Processing delete_script: {uri}")
@@ -401,7 +408,7 @@ def delete_script(
if not directory or directory.split("/")[0].lower() != "assets":
return {"success": False, "code": "path_outside_assets", "message": "URI must resolve under 'Assets/'."}
params = {"action": "delete", "name": name, "path": directory}
- resp = unity_connection.send_command_with_retry("manage_script", params)
+ resp = unity_connection.send_command_with_retry("manage_script", params, instance_id=unity_instance)
return resp if isinstance(resp, dict) else {"success": False, "message": str(resp)}
@@ -412,7 +419,9 @@ def validate_script(
level: Annotated[Literal['basic', 'standard'],
"Validation level"] = "basic",
include_diagnostics: Annotated[bool,
- "Include full diagnostics and summary"] = False
+ "Include full diagnostics and summary"] = False,
+ unity_instance: Annotated[str,
+ "Target Unity instance (project name, hash, or 'Name@hash'). If not specified, uses default instance."] | None = None,
) -> dict[str, Any]:
ctx.info(f"Processing validate_script: {uri}")
name, directory = _split_uri(uri)
@@ -426,7 +435,7 @@ def validate_script(
"path": directory,
"level": level,
}
- resp = unity_connection.send_command_with_retry("manage_script", params)
+ resp = unity_connection.send_command_with_retry("manage_script", params, instance_id=unity_instance)
if isinstance(resp, dict) and resp.get("success"):
diags = resp.get("data", {}).get("diagnostics", []) or []
warnings = sum(1 for d in diags if str(
@@ -450,6 +459,8 @@ def manage_script(
script_type: Annotated[str, "Script type (e.g., 'C#')",
"Type hint (e.g., 'MonoBehaviour')"] | None = None,
namespace: Annotated[str, "Namespace for the script"] | None = None,
+ unity_instance: Annotated[str,
+ "Target Unity instance (project name, hash, or 'Name@hash'). If not specified, uses default instance."] | None = None,
) -> dict[str, Any]:
ctx.info(f"Processing manage_script: {action}")
try:
@@ -473,7 +484,7 @@ def manage_script(
params = {k: v for k, v in params.items() if v is not None}
- response = unity_connection.send_command_with_retry("manage_script", params)
+ response = unity_connection.send_command_with_retry("manage_script", params, instance_id=unity_instance)
if isinstance(response, dict):
if response.get("success"):
@@ -535,13 +546,15 @@ def manage_script_capabilities(ctx: Context) -> dict[str, Any]:
@mcp_for_unity_tool(description="Get SHA256 and basic metadata for a Unity C# script without returning file contents")
def get_sha(
ctx: Context,
- uri: Annotated[str, "URI of the script to edit under Assets/ directory, unity://path/Assets/... or file://... or Assets/..."]
+ uri: Annotated[str, "URI of the script to edit under Assets/ directory, unity://path/Assets/... or file://... or Assets/..."],
+ unity_instance: Annotated[str,
+ "Target Unity instance (project name, hash, or 'Name@hash'). If not specified, uses default instance."] | None = None,
) -> dict[str, Any]:
ctx.info(f"Processing get_sha: {uri}")
try:
name, directory = _split_uri(uri)
params = {"action": "get_sha", "name": name, "path": directory}
- resp = unity_connection.send_command_with_retry("manage_script", params)
+ resp = unity_connection.send_command_with_retry("manage_script", params, instance_id=unity_instance)
if isinstance(resp, dict) and resp.get("success"):
data = resp.get("data", {})
minimal = {"sha256": data.get(
diff --git a/MCPForUnity/UnityMcpServer~/src/tools/manage_shader.py b/MCPForUnity/UnityMcpServer~/src/tools/manage_shader.py
index 19b94550..625f4cb0 100644
--- a/MCPForUnity/UnityMcpServer~/src/tools/manage_shader.py
+++ b/MCPForUnity/UnityMcpServer~/src/tools/manage_shader.py
@@ -16,6 +16,8 @@ def manage_shader(
path: Annotated[str, "Asset path (default: \"Assets/\")"],
contents: Annotated[str,
"Shader code for 'create'/'update'"] | None = None,
+ unity_instance: Annotated[str,
+ "Target Unity instance (project name, hash, or 'Name@hash'). If not specified, uses default instance."] | None = None,
) -> dict[str, Any]:
ctx.info(f"Processing manage_shader: {action}")
try:
@@ -39,8 +41,8 @@ def manage_shader(
# Remove None values so they don't get sent as null
params = {k: v for k, v in params.items() if v is not None}
- # Send command via centralized retry helper
- response = send_command_with_retry("manage_shader", params)
+ # Send command via centralized retry helper with instance routing
+ response = send_command_with_retry("manage_shader", params, instance_id=unity_instance)
# Process response from Unity
if isinstance(response, dict) and response.get("success"):
diff --git a/MCPForUnity/UnityMcpServer~/src/tools/read_console.py b/MCPForUnity/UnityMcpServer~/src/tools/read_console.py
index d922982c..0dbdf465 100644
--- a/MCPForUnity/UnityMcpServer~/src/tools/read_console.py
+++ b/MCPForUnity/UnityMcpServer~/src/tools/read_console.py
@@ -23,7 +23,9 @@ def read_console(
format: Annotated[Literal['plain', 'detailed',
'json'], "Output format"] | None = None,
include_stacktrace: Annotated[bool | str,
- "Include stack traces in output (accepts true/false or 'true'/'false')"] | None = None
+ "Include stack traces in output (accepts true/false or 'true'/'false')"] | None = None,
+ unity_instance: Annotated[str,
+ "Target Unity instance (project name, hash, or 'Name@hash'). If not specified, uses default instance."] | None = None
) -> dict[str, Any]:
ctx.info(f"Processing read_console: {action}")
# Set defaults if values are None
@@ -87,8 +89,8 @@ def _coerce_int(value, default=None):
if 'count' not in params_dict:
params_dict['count'] = None
- # Use centralized retry helper
- resp = send_command_with_retry("read_console", params_dict)
+ # Use centralized retry helper with instance routing
+ resp = send_command_with_retry("read_console", params_dict, instance_id=unity_instance)
if isinstance(resp, dict) and resp.get("success") and not include_stacktrace:
# Strip stacktrace fields from returned lines if present
try:
diff --git a/MCPForUnity/UnityMcpServer~/src/tools/resource_tools.py b/MCPForUnity/UnityMcpServer~/src/tools/resource_tools.py
index d84bf7be..08c3035d 100644
--- a/MCPForUnity/UnityMcpServer~/src/tools/resource_tools.py
+++ b/MCPForUnity/UnityMcpServer~/src/tools/resource_tools.py
@@ -42,7 +42,7 @@ def _coerce_int(value: Any, default: int | None = None, minimum: int | None = No
return default
-def _resolve_project_root(override: str | None) -> Path:
+def _resolve_project_root(override: str | None, unity_instance: str | None = None) -> Path:
# 1) Explicit override
if override:
pr = Path(override).expanduser().resolve()
@@ -60,7 +60,7 @@ def _resolve_project_root(override: str | None) -> Path:
# 3) Ask Unity via manage_editor.get_project_root
try:
resp = send_command_with_retry(
- "manage_editor", {"action": "get_project_root"})
+ "manage_editor", {"action": "get_project_root"}, instance_id=unity_instance)
if isinstance(resp, dict) and resp.get("success"):
pr = Path(resp.get("data", {}).get(
"projectRoot", "")).expanduser().resolve()
@@ -141,10 +141,12 @@ async def list_resources(
"Folder under project root, default is Assets"] = "Assets",
limit: Annotated[int, "Page limit"] = 200,
project_root: Annotated[str, "Project path"] | None = None,
+ unity_instance: Annotated[str,
+ "Target Unity instance (project name, hash, or 'Name@hash'). If not specified, uses default instance."] | None = None,
) -> dict[str, Any]:
ctx.info(f"Processing list_resources: {pattern}")
try:
- project = _resolve_project_root(project_root)
+ project = _resolve_project_root(project_root, unity_instance)
base = (project / under).resolve()
try:
base.relative_to(project)
@@ -201,6 +203,8 @@ async def read_resource(
project_root: Annotated[str,
"The project root directory"] | None = None,
request: Annotated[str, "The request ID"] | None = None,
+ unity_instance: Annotated[str,
+ "Target Unity instance (project name, hash, or 'Name@hash'). If not specified, uses default instance."] | None = None,
) -> dict[str, Any]:
ctx.info(f"Processing read_resource: {uri}")
try:
@@ -266,7 +270,7 @@ async def read_resource(
sha = hashlib.sha256(spec_json.encode("utf-8")).hexdigest()
return {"success": True, "data": {"text": spec_json, "metadata": {"sha256": sha}}}
- project = _resolve_project_root(project_root)
+ project = _resolve_project_root(project_root, unity_instance)
p = _resolve_safe_path_from_uri(uri, project)
if not p or not p.exists() or not p.is_file():
return {"success": False, "error": f"Resource not found: {uri}"}
@@ -356,10 +360,12 @@ async def find_in_file(
"The project root directory"] | None = None,
max_results: Annotated[int,
"Cap results to avoid huge payloads"] = 200,
+ unity_instance: Annotated[str,
+ "Target Unity instance (project name, hash, or 'Name@hash'). If not specified, uses default instance."] | None = None,
) -> dict[str, Any]:
ctx.info(f"Processing find_in_file: {uri}")
try:
- project = _resolve_project_root(project_root)
+ project = _resolve_project_root(project_root, unity_instance)
p = _resolve_safe_path_from_uri(uri, project)
if not p or not p.exists() or not p.is_file():
return {"success": False, "error": f"Resource not found: {uri}"}
diff --git a/MCPForUnity/UnityMcpServer~/src/tools/run_tests.py b/MCPForUnity/UnityMcpServer~/src/tools/run_tests.py
index e70fd00c..6e3e3960 100644
--- a/MCPForUnity/UnityMcpServer~/src/tools/run_tests.py
+++ b/MCPForUnity/UnityMcpServer~/src/tools/run_tests.py
@@ -45,6 +45,8 @@ async def run_tests(
description="Unity test mode to run")] = "edit",
timeout_seconds: Annotated[str, Field(
description="Optional timeout in seconds for the Unity test run (string, e.g. '30')")] | None = None,
+ unity_instance: Annotated[str,
+ "Target Unity instance (project name, hash, or 'Name@hash'). If not specified, uses default instance."] | None = None,
) -> RunTestsResponse:
await ctx.info(f"Processing run_tests: mode={mode}")
@@ -69,6 +71,6 @@ def _coerce_int(value, default=None):
if ts is not None:
params["timeoutSeconds"] = ts
- response = await async_send_command_with_retry("run_tests", params)
+ response = await async_send_command_with_retry("run_tests", params, instance_id=unity_instance)
await ctx.info(f'Response {response}')
return RunTestsResponse(**response) if isinstance(response, dict) else response
diff --git a/MCPForUnity/UnityMcpServer~/src/tools/script_apply_edits.py b/MCPForUnity/UnityMcpServer~/src/tools/script_apply_edits.py
index e339a754..d99770a8 100644
--- a/MCPForUnity/UnityMcpServer~/src/tools/script_apply_edits.py
+++ b/MCPForUnity/UnityMcpServer~/src/tools/script_apply_edits.py
@@ -365,6 +365,8 @@ def script_apply_edits(
"Type of the script to edit"] = "MonoBehaviour",
namespace: Annotated[str,
"Namespace of the script to edit"] | None = None,
+ unity_instance: Annotated[str,
+ "Target Unity instance (project name, hash, or 'Name@hash'). If not specified, uses default instance."] | None = None,
) -> dict[str, Any]:
ctx.info(f"Processing script_apply_edits: {name}")
# Normalize locator first so downstream calls target the correct script file.
@@ -586,7 +588,7 @@ def error_with_hint(message: str, expected: dict[str, Any], suggestion: dict[str
"options": opts2,
}
resp_struct = send_command_with_retry(
- "manage_script", params_struct)
+ "manage_script", params_struct, instance_id=unity_instance)
if isinstance(resp_struct, dict) and resp_struct.get("success"):
pass # Optional sentinel reload removed (deprecated)
return _with_norm(resp_struct if isinstance(resp_struct, dict) else {"success": False, "message": str(resp_struct)}, normalized_for_echo, routing="structured")
@@ -598,7 +600,7 @@ def error_with_hint(message: str, expected: dict[str, Any], suggestion: dict[str
"path": path,
"namespace": namespace,
"scriptType": script_type,
- })
+ }, instance_id=unity_instance)
if not isinstance(read_resp, dict) or not read_resp.get("success"):
return read_resp if isinstance(read_resp, dict) else {"success": False, "message": str(read_resp)}
@@ -722,7 +724,7 @@ def _expand_dollars(rep: str, _m=m) -> str:
"options": {"refresh": (options or {}).get("refresh", "debounced"), "validate": (options or {}).get("validate", "standard"), "applyMode": ("atomic" if len(at_edits) > 1 else (options or {}).get("applyMode", "sequential"))}
}
resp_text = send_command_with_retry(
- "manage_script", params_text)
+ "manage_script", params_text, instance_id=unity_instance)
if not (isinstance(resp_text, dict) and resp_text.get("success")):
return _with_norm(resp_text if isinstance(resp_text, dict) else {"success": False, "message": str(resp_text)}, normalized_for_echo, routing="mixed/text-first")
# Optional sentinel reload removed (deprecated)
@@ -743,7 +745,7 @@ def _expand_dollars(rep: str, _m=m) -> str:
"options": opts2
}
resp_struct = send_command_with_retry(
- "manage_script", params_struct)
+ "manage_script", params_struct, instance_id=unity_instance)
if isinstance(resp_struct, dict) and resp_struct.get("success"):
pass # Optional sentinel reload removed (deprecated)
return _with_norm(resp_struct if isinstance(resp_struct, dict) else {"success": False, "message": str(resp_struct)}, normalized_for_echo, routing="mixed/text-first")
@@ -871,7 +873,7 @@ def _expand_dollars(rep: str, _m=m) -> str:
"applyMode": ("atomic" if len(at_edits) > 1 else (options or {}).get("applyMode", "sequential"))
}
}
- resp = send_command_with_retry("manage_script", params)
+ resp = send_command_with_retry("manage_script", params, instance_id=unity_instance)
if isinstance(resp, dict) and resp.get("success"):
pass # Optional sentinel reload removed (deprecated)
return _with_norm(
@@ -955,7 +957,7 @@ def _expand_dollars(rep: str, _m=m) -> str:
"options": options or {"validate": "standard", "refresh": "debounced"},
}
- write_resp = send_command_with_retry("manage_script", params)
+ write_resp = send_command_with_retry("manage_script", params, instance_id=unity_instance)
if isinstance(write_resp, dict) and write_resp.get("success"):
pass # Optional sentinel reload removed (deprecated)
return _with_norm(
diff --git a/MCPForUnity/UnityMcpServer~/src/unity_connection.py b/MCPForUnity/UnityMcpServer~/src/unity_connection.py
index f0e06b76..916229ee 100644
--- a/MCPForUnity/UnityMcpServer~/src/unity_connection.py
+++ b/MCPForUnity/UnityMcpServer~/src/unity_connection.py
@@ -11,9 +11,10 @@
import struct
import threading
import time
-from typing import Any, Dict
+from typing import Any
from models import MCPResponse
+from connection_pool import get_unity_connection_pool
# Configure logging using settings from config
@@ -37,6 +38,7 @@ class UnityConnection:
port: int = None # Will be set dynamically
sock: socket.socket = None # Socket for Unity communication
use_framing: bool = False # Negotiated per-connection
+ instance_id: str | None = None # Instance identifier for reconnection
def __post_init__(self):
"""Set port from discovery if not explicitly provided"""
@@ -223,40 +225,63 @@ def receive_full_response(self, sock, buffer_size=config.buffer_size) -> bytes:
logger.error(f"Error during receive: {str(e)}")
raise
- def send_command(self, command_type: str, params: Dict[str, Any] = None) -> Dict[str, Any]:
+ def send_command(self, command_type: str, params: dict[str, Any] | None = None) -> dict[str, Any]:
"""Send a command with retry/backoff and port rediscovery. Pings only when requested."""
# Defensive guard: catch empty/placeholder invocations early
if not command_type:
raise ValueError("MCP call missing command_type")
if params is None:
- return MCPResponse(success=False, error="MCP call received with no parameters (client placeholder?)")
+ params = {}
attempts = max(config.max_retries, 5)
base_backoff = max(0.5, config.retry_delay)
- def read_status_file() -> dict | None:
+ def read_status_file(target_hash: str | None = None) -> dict | None:
try:
- status_files = sorted(Path.home().joinpath(
- '.unity-mcp').glob('unity-mcp-status-*.json'), key=lambda p: p.stat().st_mtime, reverse=True)
+ base_path = Path.home().joinpath('.unity-mcp')
+ status_files = sorted(
+ base_path.glob('unity-mcp-status-*.json'),
+ key=lambda p: p.stat().st_mtime,
+ reverse=True,
+ )
if not status_files:
return None
- latest = status_files[0]
- with latest.open('r') as f:
+ if target_hash:
+ for status_path in status_files:
+ if status_path.stem.endswith(target_hash):
+ with status_path.open('r') as f:
+ return json.load(f)
+ # Target hash not found - don't fallback to avoid reading wrong instance's status
+ logger.debug(f"Status file for hash '{target_hash}' not found, available: {[p.stem for p in status_files[:3]]}")
+ return None
+ # Only return most recent when no specific hash requested
+ with status_files[0].open('r') as f:
return json.load(f)
except Exception:
return None
last_short_timeout = None
+ # Extract hash suffix from instance id (e.g., Project@hash)
+ target_hash: str | None = None
+ if self.instance_id:
+ if '@' in self.instance_id:
+ maybe_hash = self.instance_id.split('@', 1)[1].strip()
+ if maybe_hash:
+ target_hash = maybe_hash
+ else:
+ # instance_id is just the hash (fallback format)
+ target_hash = self.instance_id.strip() or None
+
# Preflight: if Unity reports reloading, return a structured hint so clients can retry politely
try:
- status = read_status_file()
+ status = read_status_file(target_hash)
if status and (status.get('reloading') or status.get('reason') == 'reloading'):
- return MCPResponse(
- success=False,
- error="Unity domain reload in progress, please try again shortly",
- data={"state": "reloading", "retry_after_ms": int(
- config.reload_retry_ms)}
- )
+ return {
+ "success": False,
+ "state": "reloading",
+ "retry_after_ms": int(config.reload_retry_ms),
+ "error": "Unity domain reload in progress, please try again shortly",
+ }
except Exception:
pass
@@ -328,9 +353,28 @@ def read_status_file() -> dict | None:
finally:
self.sock = None
- # Re-discover port each time
+ # Re-discover the port for this specific instance
try:
- new_port = PortDiscovery.discover_unity_port()
+ new_port: int | None = None
+ if self.instance_id:
+ # Try to rediscover the specific instance
+ pool = get_unity_connection_pool()
+ refreshed = pool.discover_all_instances(force_refresh=True)
+ match = next((inst for inst in refreshed if inst.id == self.instance_id), None)
+ if match:
+ new_port = match.port
+ logger.debug(f"Rediscovered instance {self.instance_id} on port {new_port}")
+ else:
+ logger.warning(f"Instance {self.instance_id} not found during reconnection")
+
+ # Fallback to generic port discovery if instance-specific discovery failed
+ if new_port is None:
+ if self.instance_id:
+ raise ConnectionError(
+ f"Unity instance '{self.instance_id}' could not be rediscovered"
+ ) from e
+ new_port = PortDiscovery.discover_unity_port()
+
if new_port != self.port:
logger.info(
f"Unity port changed {self.port} -> {new_port}")
@@ -340,7 +384,7 @@ def read_status_file() -> dict | None:
if attempt < attempts:
# Heartbeat-aware, jittered backoff
- status = read_status_file()
+ status = read_status_file(target_hash)
# Base exponential backoff
backoff = base_backoff * (2 ** attempt)
# Decorrelated jitter multiplier
@@ -371,32 +415,22 @@ def read_status_file() -> dict | None:
raise
-# Global Unity connection
-_unity_connection = None
+# Backwards compatibility: keep old single-connection function
+def get_unity_connection(instance_identifier: str | None = None) -> UnityConnection:
+ """Retrieve or establish a Unity connection.
+
+ Args:
+ instance_identifier: Optional identifier for specific Unity instance.
+ If None, uses default or most recent instance.
-def get_unity_connection() -> UnityConnection:
- """Retrieve or establish a persistent Unity connection.
+ Returns:
+ UnityConnection to the specified or default Unity instance
- Note: Do NOT ping on every retrieval to avoid connection storms. Rely on
- send_command() exceptions to detect broken sockets and reconnect there.
+ Note: This function now uses the connection pool internally.
"""
- global _unity_connection
- if _unity_connection is not None:
- return _unity_connection
-
- # Double-checked locking to avoid concurrent socket creation
- with _connection_lock:
- if _unity_connection is not None:
- return _unity_connection
- logger.info("Creating new Unity connection")
- _unity_connection = UnityConnection()
- if not _unity_connection.connect():
- _unity_connection = None
- raise ConnectionError(
- "Could not connect to Unity. Ensure the Unity Editor and MCP Bridge are running.")
- logger.info("Connected to Unity on startup")
- return _unity_connection
+ pool = get_unity_connection_pool()
+ return pool.get_connection(instance_identifier)
# -----------------------------
@@ -413,13 +447,30 @@ def _is_reloading_response(resp: dict) -> bool:
return "reload" in message_text
-def send_command_with_retry(command_type: str, params: Dict[str, Any], *, max_retries: int | None = None, retry_ms: int | None = None) -> Dict[str, Any]:
- """Send a command via the shared connection, waiting politely through Unity reloads.
+def send_command_with_retry(
+ command_type: str,
+ params: dict[str, Any],
+ *,
+ instance_id: str | None = None,
+ max_retries: int | None = None,
+ retry_ms: int | None = None
+) -> dict[str, Any]:
+ """Send a command to a Unity instance, waiting politely through Unity reloads.
+
+ Args:
+ command_type: The command type to send
+ params: Command parameters
+ instance_id: Optional Unity instance identifier (name, hash, name@hash, etc.)
+ max_retries: Maximum number of retries for reload states
+ retry_ms: Delay between retries in milliseconds
+
+ Returns:
+ Response dictionary from Unity
Uses config.reload_retry_ms and config.reload_max_retries by default. Preserves the
structured failure if retries are exhausted.
"""
- conn = get_unity_connection()
+ conn = get_unity_connection(instance_id)
if max_retries is None:
max_retries = getattr(config, "reload_max_retries", 40)
if retry_ms is None:
@@ -436,8 +487,28 @@ def send_command_with_retry(command_type: str, params: Dict[str, Any], *, max_re
return response
-async def async_send_command_with_retry(command_type: str, params: dict[str, Any], *, loop=None, max_retries: int | None = None, retry_ms: int | None = None) -> dict[str, Any] | MCPResponse:
- """Async wrapper that runs the blocking retry helper in a thread pool."""
+async def async_send_command_with_retry(
+ command_type: str,
+ params: dict[str, Any],
+ *,
+ instance_id: str | None = None,
+ loop=None,
+ max_retries: int | None = None,
+ retry_ms: int | None = None
+) -> dict[str, Any] | MCPResponse:
+ """Async wrapper that runs the blocking retry helper in a thread pool.
+
+ Args:
+ command_type: The command type to send
+ params: Command parameters
+ instance_id: Optional Unity instance identifier
+ loop: Optional asyncio event loop
+ max_retries: Maximum number of retries for reload states
+ retry_ms: Delay between retries in milliseconds
+
+ Returns:
+ Response dictionary or MCPResponse on error
+ """
try:
import asyncio # local import to avoid mandatory asyncio dependency for sync callers
if loop is None:
@@ -445,7 +516,7 @@ async def async_send_command_with_retry(command_type: str, params: dict[str, Any
return await loop.run_in_executor(
None,
lambda: send_command_with_retry(
- command_type, params, max_retries=max_retries, retry_ms=retry_ms),
+ command_type, params, instance_id=instance_id, max_retries=max_retries, retry_ms=retry_ms),
)
except Exception as e:
return MCPResponse(success=False, error=str(e))
diff --git a/Server/connection_pool.py b/Server/connection_pool.py
new file mode 100644
index 00000000..c7cd2028
--- /dev/null
+++ b/Server/connection_pool.py
@@ -0,0 +1,237 @@
+"""
+Connection pool for managing multiple Unity Editor instances.
+"""
+import logging
+import os
+import threading
+import time
+
+from models import UnityInstanceInfo
+from port_discovery import PortDiscovery
+
+logger = logging.getLogger(__name__)
+
+
+class UnityConnectionPool:
+ """Manages connections to multiple Unity Editor instances"""
+
+ def __init__(self):
+ # Import here to avoid circular dependency
+ from unity_connection import UnityConnection
+ self._UnityConnection = UnityConnection
+
+ self._connections: dict[str, "UnityConnection"] = {}
+ self._known_instances: dict[str, UnityInstanceInfo] = {}
+ self._last_full_scan: float = 0
+ self._scan_interval: float = 5.0 # Cache for 5 seconds
+ self._pool_lock = threading.Lock()
+ self._default_instance_id: str | None = None
+
+ # Check for default instance from environment
+ env_default = os.environ.get("UNITY_MCP_DEFAULT_INSTANCE", "").strip()
+ if env_default:
+ self._default_instance_id = env_default
+ logger.info(f"Default Unity instance set from environment: {env_default}")
+
+ def discover_all_instances(self, force_refresh: bool = False) -> list[UnityInstanceInfo]:
+ """
+ Discover all running Unity Editor instances.
+
+ Args:
+ force_refresh: If True, bypass cache and scan immediately
+
+ Returns:
+ List of UnityInstanceInfo objects
+ """
+ now = time.time()
+
+ # Return cached results if valid
+ if not force_refresh and (now - self._last_full_scan) < self._scan_interval:
+ logger.debug(f"Returning cached Unity instances (age: {now - self._last_full_scan:.1f}s)")
+ return list(self._known_instances.values())
+
+ # Scan for instances
+ logger.debug("Scanning for Unity instances...")
+ instances = PortDiscovery.discover_all_unity_instances()
+
+ # Update cache
+ with self._pool_lock:
+ self._known_instances = {inst.id: inst for inst in instances}
+ self._last_full_scan = now
+
+ logger.info(f"Found {len(instances)} Unity instances: {[inst.id for inst in instances]}")
+ return instances
+
+ def _resolve_instance_id(self, instance_identifier: str | None, instances: list[UnityInstanceInfo]) -> UnityInstanceInfo:
+ """
+ Resolve an instance identifier to a specific Unity instance.
+
+ Args:
+ instance_identifier: User-provided identifier (name, hash, name@hash, path, port, or None)
+ instances: List of available instances
+
+ Returns:
+ Resolved UnityInstanceInfo
+
+ Raises:
+ ConnectionError: If instance cannot be resolved
+ """
+ if not instances:
+ raise ConnectionError(
+ "No Unity Editor instances found. Please ensure Unity is running with MCP for Unity bridge."
+ )
+
+ # Use default instance if no identifier provided
+ if instance_identifier is None:
+ if self._default_instance_id:
+ instance_identifier = self._default_instance_id
+ logger.debug(f"Using default instance: {instance_identifier}")
+ else:
+ # Use the most recently active instance
+ # Instances with no heartbeat (None) should be sorted last (use 0.0 as sentinel)
+ sorted_instances = sorted(
+ instances,
+ key=lambda inst: inst.last_heartbeat.timestamp() if inst.last_heartbeat else 0.0,
+ reverse=True,
+ )
+ logger.info(f"No instance specified, using most recent: {sorted_instances[0].id}")
+ return sorted_instances[0]
+
+ identifier = instance_identifier.strip()
+
+ # Try exact ID match first
+ for inst in instances:
+ if inst.id == identifier:
+ return inst
+
+ # Try project name match
+ name_matches = [inst for inst in instances if inst.name == identifier]
+ if len(name_matches) == 1:
+ return name_matches[0]
+ elif len(name_matches) > 1:
+ # Multiple projects with same name - return helpful error
+ suggestions = [
+ {
+ "id": inst.id,
+ "path": inst.path,
+ "port": inst.port,
+ "suggest": f"Use unity_instance='{inst.id}'"
+ }
+ for inst in name_matches
+ ]
+ raise ConnectionError(
+ f"Project name '{identifier}' matches {len(name_matches)} instances. "
+ f"Please use the full format (e.g., '{name_matches[0].id}'). "
+ f"Available instances: {suggestions}"
+ )
+
+ # Try hash match
+ hash_matches = [inst for inst in instances if inst.hash == identifier or inst.hash.startswith(identifier)]
+ if len(hash_matches) == 1:
+ return hash_matches[0]
+ elif len(hash_matches) > 1:
+ raise ConnectionError(
+ f"Hash '{identifier}' matches multiple instances: {[inst.id for inst in hash_matches]}"
+ )
+
+ # Try composite format: Name@Hash or Name@Port
+ if "@" in identifier:
+ name_part, hint_part = identifier.split("@", 1)
+ composite_matches = [
+ inst for inst in instances
+ if inst.name == name_part and (
+ inst.hash.startswith(hint_part) or str(inst.port) == hint_part
+ )
+ ]
+ if len(composite_matches) == 1:
+ return composite_matches[0]
+
+ # Try port match (as string)
+ try:
+ port_num = int(identifier)
+ port_matches = [inst for inst in instances if inst.port == port_num]
+ if len(port_matches) == 1:
+ return port_matches[0]
+ except ValueError:
+ pass
+
+ # Try path match
+ path_matches = [inst for inst in instances if inst.path == identifier]
+ if len(path_matches) == 1:
+ return path_matches[0]
+
+ # Nothing matched
+ available_ids = [inst.id for inst in instances]
+ raise ConnectionError(
+ f"Unity instance '{identifier}' not found. "
+ f"Available instances: {available_ids}. "
+ f"Use the unity_instances resource to see all instances."
+ )
+
+ def get_connection(self, instance_identifier: str | None = None):
+ """
+ Get or create a connection to a Unity instance.
+
+ Args:
+ instance_identifier: Optional identifier (name, hash, name@hash, etc.)
+ If None, uses default or most recent instance
+
+ Returns:
+ UnityConnection to the specified instance
+
+ Raises:
+ ConnectionError: If instance cannot be found or connected
+ """
+ # Refresh instance list if cache expired
+ instances = self.discover_all_instances()
+
+ # Resolve identifier to specific instance
+ target = self._resolve_instance_id(instance_identifier, instances)
+
+ # Return existing connection or create new one
+ with self._pool_lock:
+ if target.id not in self._connections:
+ logger.info(f"Creating new connection to Unity instance: {target.id} (port {target.port})")
+ conn = self._UnityConnection(port=target.port, instance_id=target.id)
+ if not conn.connect():
+ raise ConnectionError(
+ f"Failed to connect to Unity instance '{target.id}' on port {target.port}. "
+ f"Ensure the Unity Editor is running."
+ )
+ self._connections[target.id] = conn
+ else:
+ # Update existing connection with instance_id and port if changed
+ conn = self._connections[target.id]
+ conn.instance_id = target.id
+ if conn.port != target.port:
+ logger.info(f"Updating cached port for {target.id}: {conn.port} -> {target.port}")
+ conn.port = target.port
+ logger.debug(f"Reusing existing connection to: {target.id}")
+
+ return self._connections[target.id]
+
+ def disconnect_all(self):
+ """Disconnect all active connections"""
+ with self._pool_lock:
+ for instance_id, conn in self._connections.items():
+ try:
+ logger.info(f"Disconnecting from Unity instance: {instance_id}")
+ conn.disconnect()
+ except Exception:
+ logger.exception(f"Error disconnecting from {instance_id}")
+ self._connections.clear()
+
+
+# Global Unity connection pool
+_unity_connection_pool: UnityConnectionPool | None = None
+_pool_init_lock = threading.Lock()
+
+
+def get_unity_connection_pool() -> UnityConnectionPool:
+ """Get or create the global Unity connection pool."""
+ global _unity_connection_pool
+ if _unity_connection_pool is None:
+ with _pool_init_lock:
+ if _unity_connection_pool is None:
+ _unity_connection_pool = UnityConnectionPool()
+ return _unity_connection_pool
diff --git a/Server/models.py b/Server/models.py
index cf1d33da..7c56327c 100644
--- a/Server/models.py
+++ b/Server/models.py
@@ -1,4 +1,5 @@
from typing import Any
+from datetime import datetime
from pydantic import BaseModel
@@ -7,3 +8,28 @@ class MCPResponse(BaseModel):
message: str | None = None
error: str | None = None
data: Any | None = None
+
+
+class UnityInstanceInfo(BaseModel):
+ """Information about a Unity Editor instance"""
+ id: str # "ProjectName@hash" or fallback to hash
+ name: str # Project name extracted from path
+ path: str # Full project path (Assets folder)
+ hash: str # 8-char hash of project path
+ port: int # TCP port
+ status: str # "running", "reloading", "offline"
+ last_heartbeat: datetime | None = None
+ unity_version: str | None = None
+
+ def to_dict(self) -> dict[str, Any]:
+ """Convert to dictionary for JSON serialization"""
+ return {
+ "id": self.id,
+ "name": self.name,
+ "path": self.path,
+ "hash": self.hash,
+ "port": self.port,
+ "status": self.status,
+ "last_heartbeat": self.last_heartbeat.isoformat() if self.last_heartbeat else None,
+ "unity_version": self.unity_version
+ }
diff --git a/Server/port_discovery.py b/Server/port_discovery.py
index c759e745..e0f0ed27 100644
--- a/Server/port_discovery.py
+++ b/Server/port_discovery.py
@@ -14,9 +14,14 @@
import glob
import json
import logging
+import os
+import struct
+from datetime import datetime
from pathlib import Path
import socket
-from typing import Optional, List
+from typing import Optional, List, Dict
+
+from models import UnityInstanceInfo
logger = logging.getLogger("mcp-for-unity-server")
@@ -56,22 +61,55 @@ def list_candidate_files() -> List[Path]:
@staticmethod
def _try_probe_unity_mcp(port: int) -> bool:
"""Quickly check if a MCP for Unity listener is on this port.
- Tries a short TCP connect, sends 'ping', expects Unity bridge welcome message.
+ Uses Unity's framed protocol: receives handshake, sends framed ping, expects framed pong.
"""
try:
with socket.create_connection(("127.0.0.1", port), PortDiscovery.CONNECT_TIMEOUT) as s:
s.settimeout(PortDiscovery.CONNECT_TIMEOUT)
try:
- s.sendall(b"ping")
- data = s.recv(512)
- # Check for Unity bridge welcome message format
- if data and (b"WELCOME UNITY-MCP" in data or b'"message":"pong"' in data):
- return True
- except Exception:
+ # 1. Receive handshake from Unity
+ handshake = s.recv(512)
+ if not handshake or b"FRAMING=1" not in handshake:
+ # Try legacy mode as fallback
+ s.sendall(b"ping")
+ data = s.recv(512)
+ return data and b'"message":"pong"' in data
+
+ # 2. Send framed ping command
+ # Frame format: 8-byte length header (big-endian uint64) + payload
+ payload = b"ping"
+ header = struct.pack('>Q', len(payload))
+ s.sendall(header + payload)
+
+ # 3. Receive framed response
+ # Helper to receive exact number of bytes
+ def _recv_exact(expected: int) -> bytes | None:
+ chunks = bytearray()
+ while len(chunks) < expected:
+ chunk = s.recv(expected - len(chunks))
+ if not chunk:
+ return None
+ chunks.extend(chunk)
+ return bytes(chunks)
+
+ response_header = _recv_exact(8)
+ if response_header is None:
+ return False
+
+ response_length = struct.unpack('>Q', response_header)[0]
+ if response_length > 10000: # Sanity check
+ return False
+
+ response = _recv_exact(response_length)
+ if response is None:
+ return False
+ return b'"message":"pong"' in response
+ except Exception as e:
+ logger.debug(f"Port probe failed for {port}: {e}")
return False
- except Exception:
+ except Exception as e:
+ logger.debug(f"Connection failed for port {port}: {e}")
return False
- return False
@staticmethod
def _read_latest_status() -> Optional[dict]:
@@ -158,3 +196,112 @@ def get_port_config() -> Optional[dict]:
logger.warning(
f"Could not read port configuration {path}: {e}")
return None
+
+ @staticmethod
+ def _extract_project_name(project_path: str) -> str:
+ """Extract project name from Assets path.
+
+ Examples:
+ /Users/sakura/Projects/MyGame/Assets -> MyGame
+ C:\\Projects\\TestProject\\Assets -> TestProject
+ """
+ if not project_path:
+ return "Unknown"
+
+ try:
+ # Remove trailing /Assets or \Assets
+ path = project_path.rstrip('/\\')
+ if path.endswith('Assets'):
+ path = path[:-6].rstrip('/\\')
+
+ # Get the last directory name
+ name = os.path.basename(path)
+ return name if name else "Unknown"
+ except Exception:
+ return "Unknown"
+
+ @staticmethod
+ def discover_all_unity_instances() -> List[UnityInstanceInfo]:
+ """
+ Discover all running Unity Editor instances by scanning status files.
+
+ Returns:
+ List of UnityInstanceInfo objects for all discovered instances
+ """
+ instances_by_port: Dict[int, tuple[UnityInstanceInfo, float]] = {}
+ base = PortDiscovery.get_registry_dir()
+
+ # Scan all status files
+ status_pattern = str(base / "unity-mcp-status-*.json")
+ status_files = glob.glob(status_pattern)
+
+ for status_file_path in status_files:
+ try:
+ status_path = Path(status_file_path)
+ file_mtime = status_path.stat().st_mtime
+
+ with status_path.open('r') as f:
+ data = json.load(f)
+
+ # Extract hash from filename: unity-mcp-status-{hash}.json
+ filename = os.path.basename(status_file_path)
+ hash_value = filename.replace('unity-mcp-status-', '').replace('.json', '')
+
+ # Extract information
+ project_path = data.get('project_path', '')
+ project_name = PortDiscovery._extract_project_name(project_path)
+ port = data.get('unity_port')
+ is_reloading = data.get('reloading', False)
+
+ # Parse last_heartbeat
+ last_heartbeat = None
+ heartbeat_str = data.get('last_heartbeat')
+ if heartbeat_str:
+ try:
+ last_heartbeat = datetime.fromisoformat(heartbeat_str.replace('Z', '+00:00'))
+ except Exception:
+ pass
+
+ # Verify port is actually responding
+ is_alive = PortDiscovery._try_probe_unity_mcp(port) if isinstance(port, int) else False
+
+ if not is_alive:
+ logger.debug(f"Instance {project_name}@{hash_value} has heartbeat but port {port} not responding")
+ continue
+
+ freshness = last_heartbeat.timestamp() if last_heartbeat else file_mtime
+
+ existing = instances_by_port.get(port)
+ if existing:
+ _, existing_freshness = existing
+ if existing_freshness >= freshness:
+ logger.debug(
+ "Skipping stale status entry %s in favor of more recent data for port %s",
+ status_path.name,
+ port,
+ )
+ continue
+
+ # Create instance info
+ instance = UnityInstanceInfo(
+ id=f"{project_name}@{hash_value}",
+ name=project_name,
+ path=project_path,
+ hash=hash_value,
+ port=port,
+ status="reloading" if is_reloading else "running",
+ last_heartbeat=last_heartbeat,
+ unity_version=data.get('unity_version') # May not be available in current version
+ )
+
+ instances_by_port[port] = (instance, freshness)
+ logger.debug(f"Discovered Unity instance: {instance.id} on port {instance.port}")
+
+ except Exception as e:
+ logger.debug(f"Failed to parse status file {status_file_path}: {e}")
+ continue
+
+ deduped_instances = [entry[0] for entry in sorted(instances_by_port.values(), key=lambda item: item[1], reverse=True)]
+
+ logger.info(f"Discovered {len(deduped_instances)} Unity instances (after de-duplication by port)")
+ return deduped_instances
diff --git a/Server/pyproject.toml b/Server/pyproject.toml
index 6dd28065..709c6e32 100644
--- a/Server/pyproject.toml
+++ b/Server/pyproject.toml
@@ -6,7 +6,7 @@ readme = "README.md"
requires-python = ">=3.10"
dependencies = [
"httpx>=0.27.2",
- "fastmcp>=2.12.5",
+ "fastmcp>=2.13.0",
"mcp>=1.16.0",
"pydantic>=2.12.0",
"tomli>=2.3.0",
diff --git a/Server/resources/__init__.py b/Server/resources/__init__.py
index a3577891..eeefea74 100644
--- a/Server/resources/__init__.py
+++ b/Server/resources/__init__.py
@@ -2,6 +2,7 @@
MCP Resources package - Auto-discovers and registers all resources in this directory.
"""
import logging
+import inspect
from pathlib import Path
from fastmcp import FastMCP
@@ -16,6 +17,31 @@
__all__ = ['register_all_resources']
+def _create_fixed_wrapper(original_func, has_other_params):
+ """
+ Factory function to create a wrapper that calls original_func with unity_instance=None.
+ This avoids closure issues in loops and preserves sync/async nature of the original function.
+ """
+ is_async = inspect.iscoroutinefunction(original_func)
+
+ if has_other_params:
+ if is_async:
+ async def fixed_wrapper(*args, **kwargs):
+ return await original_func(*args, **kwargs, unity_instance=None)
+ else:
+ def fixed_wrapper(*args, **kwargs):
+ return original_func(*args, **kwargs, unity_instance=None)
+ else:
+ if is_async:
+ async def fixed_wrapper():
+ return await original_func(unity_instance=None)
+ else:
+ def fixed_wrapper():
+ return original_func(unity_instance=None)
+
+ return fixed_wrapper
+
+
def register_all_resources(mcp: FastMCP):
"""
Auto-discover and register all resources in the resources/ directory.
@@ -36,6 +62,7 @@ def register_all_resources(mcp: FastMCP):
logger.warning("No MCP resources registered!")
return
+ registered_count = 0
for resource_info in resources:
func = resource_info['func']
uri = resource_info['uri']
@@ -43,11 +70,68 @@ def register_all_resources(mcp: FastMCP):
description = resource_info['description']
kwargs = resource_info['kwargs']
- # Apply the @mcp.resource decorator and telemetry
- wrapped = telemetry_resource(resource_name)(func)
- wrapped = mcp.resource(uri=uri, name=resource_name,
- description=description, **kwargs)(wrapped)
- resource_info['func'] = wrapped
- logger.debug(f"Registered resource: {resource_name} - {description}")
+ # Check if URI contains query parameters (e.g., {?unity_instance})
+ has_query_params = '{?' in uri
+
+ if has_query_params:
+ # Register two versions for backward compatibility:
+ # 1. Template version with query parameters (for multi-instance)
+ wrapped_template = telemetry_resource(resource_name)(func)
+ wrapped_template = mcp.resource(uri=uri, name=resource_name,
+ description=description, **kwargs)(wrapped_template)
+ logger.debug(f"Registered resource template: {resource_name} - {uri}")
+ registered_count += 1
+
+ # 2. Fixed version without query parameters (for single-instance/default)
+ # Remove query parameters from URI
+ fixed_uri = uri.split('{?')[0]
+ fixed_name = f"{resource_name}_default"
+ fixed_description = f"{description} (default instance)"
+
+ # Create a wrapper function that doesn't accept unity_instance parameter
+ # This wrapper will call the original function with unity_instance=None
+ sig = inspect.signature(func)
+ params = list(sig.parameters.values())
+
+ # Filter out unity_instance parameter
+ fixed_params = [p for p in params if p.name != 'unity_instance']
+
+ # Create wrapper using factory function to avoid closure issues
+ has_other_params = len(fixed_params) > 0
+ fixed_wrapper = _create_fixed_wrapper(func, has_other_params)
+
+ # Update signature to match filtered parameters
+ if has_other_params:
+ fixed_wrapper.__signature__ = sig.replace(parameters=fixed_params)
+ fixed_wrapper.__annotations__ = {
+ k: v for k, v in getattr(func, '__annotations__', {}).items()
+ if k != 'unity_instance'
+ }
+ else:
+ fixed_wrapper.__signature__ = inspect.Signature(parameters=[])
+ fixed_wrapper.__annotations__ = {
+ k: v for k, v in getattr(func, '__annotations__', {}).items()
+ if k == 'return'
+ }
+
+ # Preserve function metadata
+ fixed_wrapper.__name__ = fixed_name
+ fixed_wrapper.__doc__ = func.__doc__
+
+ wrapped_fixed = telemetry_resource(fixed_name)(fixed_wrapper)
+ wrapped_fixed = mcp.resource(uri=fixed_uri, name=fixed_name,
+ description=fixed_description, **kwargs)(wrapped_fixed)
+ logger.debug(f"Registered resource (fixed): {fixed_name} - {fixed_uri}")
+ registered_count += 1
+
+ resource_info['func'] = wrapped_template
+ else:
+ # No query parameters, register as-is
+ wrapped = telemetry_resource(resource_name)(func)
+ wrapped = mcp.resource(uri=uri, name=resource_name,
+ description=description, **kwargs)(wrapped)
+ resource_info['func'] = wrapped
+ logger.debug(f"Registered resource: {resource_name} - {description}")
+ registered_count += 1
- logger.info(f"Registered {len(resources)} MCP resources")
+ logger.info(f"Registered {registered_count} MCP resources ({len(resources)} unique)")
diff --git a/Server/resources/menu_items.py b/Server/resources/menu_items.py
index d3724659..f0469d30 100644
--- a/Server/resources/menu_items.py
+++ b/Server/resources/menu_items.py
@@ -8,18 +8,21 @@ class GetMenuItemsResponse(MCPResponse):
@mcp_for_unity_resource(
- uri="mcpforunity://menu-items",
+ uri="mcpforunity://menu-items{?unity_instance}",
name="get_menu_items",
description="Provides a list of all menu items."
)
-async def get_menu_items() -> GetMenuItemsResponse:
- """Provides a list of all menu items."""
- # Later versions of FastMCP support these as query parameters
- # See: https://gofastmcp.com/servers/resources#query-parameters
+async def get_menu_items(unity_instance: str | None = None) -> GetMenuItemsResponse:
+ """Provides a list of all menu items.
+
+ Args:
+ unity_instance: Target Unity instance (project name, hash, or 'Name@hash').
+ If not specified, uses default instance.
+ """
params = {
"refresh": True,
"search": "",
}
- response = await async_send_command_with_retry("get_menu_items", params)
+ response = await async_send_command_with_retry("get_menu_items", params, instance_id=unity_instance)
return GetMenuItemsResponse(**response) if isinstance(response, dict) else response
diff --git a/Server/resources/tests.py b/Server/resources/tests.py
index 4268a143..17ed7c4f 100644
--- a/Server/resources/tests.py
+++ b/Server/resources/tests.py
@@ -17,15 +17,29 @@ class GetTestsResponse(MCPResponse):
data: list[TestItem] = []
-@mcp_for_unity_resource(uri="mcpforunity://tests", name="get_tests", description="Provides a list of all tests.")
-async def get_tests() -> GetTestsResponse:
- """Provides a list of all tests."""
- response = await async_send_command_with_retry("get_tests", {})
+@mcp_for_unity_resource(uri="mcpforunity://tests{?unity_instance}", name="get_tests", description="Provides a list of all tests.")
+async def get_tests(unity_instance: str | None = None) -> GetTestsResponse:
+ """Provides a list of all tests.
+
+ Args:
+ unity_instance: Target Unity instance (project name, hash, or 'Name@hash').
+ If not specified, uses default instance.
+ """
+ response = await async_send_command_with_retry("get_tests", {}, instance_id=unity_instance)
return GetTestsResponse(**response) if isinstance(response, dict) else response
-@mcp_for_unity_resource(uri="mcpforunity://tests/{mode}", name="get_tests_for_mode", description="Provides a list of tests for a specific mode.")
-async def get_tests_for_mode(mode: Annotated[Literal["EditMode", "PlayMode"], Field(description="The mode to filter tests by.")]) -> GetTestsResponse:
- """Provides a list of tests for a specific mode."""
- response = await async_send_command_with_retry("get_tests_for_mode", {"mode": mode})
+@mcp_for_unity_resource(uri="mcpforunity://tests/{mode}{?unity_instance}", name="get_tests_for_mode", description="Provides a list of tests for a specific mode.")
+async def get_tests_for_mode(
+ mode: Annotated[Literal["EditMode", "PlayMode"], Field(description="The mode to filter tests by.")],
+ unity_instance: str | None = None
+) -> GetTestsResponse:
+ """Provides a list of tests for a specific mode.
+
+ Args:
+ mode: The test mode to filter by (EditMode or PlayMode).
+ unity_instance: Target Unity instance (project name, hash, or 'Name@hash').
+ If not specified, uses default instance.
+ """
+ response = await async_send_command_with_retry("get_tests_for_mode", {"mode": mode}, instance_id=unity_instance)
return GetTestsResponse(**response) if isinstance(response, dict) else response
diff --git a/Server/resources/unity_instances.py b/Server/resources/unity_instances.py
new file mode 100644
index 00000000..4bddd5a5
--- /dev/null
+++ b/Server/resources/unity_instances.py
@@ -0,0 +1,63 @@
+"""
+Resource for listing all available Unity Editor instances.
+"""
+from typing import Any
+
+from registry import mcp_for_unity_resource
+from unity_connection import get_unity_connection_pool
+
+
+@mcp_for_unity_resource(
+ uri="mcpforunity://unity-instances",
+ name="unity_instances",
+ description="Provides a list of all running Unity Editor instances with their details."
+)
+def get_unity_instances() -> dict[str, Any]:
+ """
+ List all available Unity Editor instances.
+
+ Returns information about each instance including:
+ - id: Unique identifier (ProjectName@hash)
+ - name: Project name
+ - path: Full project path
+ - hash: 8-character hash of project path
+ - port: TCP port number
+ - status: Current status (running, reloading, etc.)
+ - last_heartbeat: Last heartbeat timestamp
+ - unity_version: Unity version (if available)
+
+ Returns:
+ Dictionary containing list of instances and metadata
+ """
+ try:
+ pool = get_unity_connection_pool()
+ instances = pool.discover_all_instances(force_refresh=True)
+
+ # Check for duplicate project names
+ name_counts = {}
+ for inst in instances:
+ name_counts[inst.name] = name_counts.get(inst.name, 0) + 1
+
+ duplicates = [name for name, count in name_counts.items() if count > 1]
+
+ result = {
+ "success": True,
+ "instance_count": len(instances),
+ "instances": [inst.to_dict() for inst in instances],
+ }
+
+ if duplicates:
+ result["warning"] = (
+ f"Multiple instances found with duplicate project names: {duplicates}. "
+ f"Use full format (e.g., 'ProjectName@hash') to specify which instance."
+ )
+
+ return result
+
+ except Exception as e:
+ return {
+ "success": False,
+ "error": f"Failed to list Unity instances: {str(e)}",
+ "instance_count": 0,
+ "instances": []
+ }
diff --git a/Server/server.py b/Server/server.py
index 11053ac8..c3dd10a2 100644
--- a/Server/server.py
+++ b/Server/server.py
@@ -3,12 +3,13 @@
import logging
from logging.handlers import RotatingFileHandler
import os
+import argparse
from contextlib import asynccontextmanager
from typing import AsyncIterator, Dict, Any
from config import config
from tools import register_all_tools
from resources import register_all_resources
-from unity_connection import get_unity_connection, UnityConnection
+from connection_pool import get_unity_connection_pool
import time
# Configure logging using settings from config
@@ -61,16 +62,14 @@
except Exception:
pass
-# Global connection state
-_unity_connection: UnityConnection = None
-
-
@asynccontextmanager
async def server_lifespan(server: FastMCP) -> AsyncIterator[Dict[str, Any]]:
"""Handle server startup and shutdown."""
- global _unity_connection
logger.info("MCP for Unity Server starting up")
+ # Initialize connection pool variable
+ pool = None
+
# Record server startup telemetry
start_time = time.time()
start_clk = time.perf_counter()
@@ -101,22 +100,35 @@ def _emit_startup():
logger.info(
"Skipping Unity connection on startup (UNITY_MCP_SKIP_STARTUP_CONNECT=1)")
else:
- _unity_connection = get_unity_connection()
- logger.info("Connected to Unity on startup")
-
- # Record successful Unity connection (deferred)
- import threading as _t
- _t.Timer(1.0, lambda: record_telemetry(
- RecordType.UNITY_CONNECTION,
- {
- "status": "connected",
- "connection_time_ms": (time.perf_counter() - start_clk) * 1000,
- }
- )).start()
+ # Initialize connection pool and discover instances
+ pool = get_unity_connection_pool()
+ instances = pool.discover_all_instances()
+
+ if instances:
+ logger.info(f"Discovered {len(instances)} Unity instance(s): {[i.id for i in instances]}")
+
+ # Try to connect to default instance
+ try:
+ pool.get_connection()
+ logger.info("Connected to default Unity instance on startup")
+
+ # Record successful Unity connection (deferred)
+ import threading as _t
+ _t.Timer(1.0, lambda: record_telemetry(
+ RecordType.UNITY_CONNECTION,
+ {
+ "status": "connected",
+ "connection_time_ms": (time.perf_counter() - start_clk) * 1000,
+ "instance_count": len(instances)
+ }
+ )).start()
+ except Exception as e:
+ logger.warning("Could not connect to default Unity instance: %s", e)
+ else:
+ logger.warning("No Unity instances found on startup")
except ConnectionError as e:
logger.warning("Could not connect to Unity on startup: %s", e)
- _unity_connection = None
# Record connection failure (deferred)
import threading as _t
@@ -132,7 +144,6 @@ def _emit_startup():
except Exception as e:
logger.warning(
"Unexpected error connecting to Unity on startup: %s", e)
- _unity_connection = None
import threading as _t
_err_msg = str(e)[:200]
_t.Timer(1.0, lambda: record_telemetry(
@@ -145,13 +156,12 @@ def _emit_startup():
)).start()
try:
- # Yield the connection object so it can be attached to the context
- # The key 'bridge' matches how tools like read_console expect to access it (ctx.bridge)
- yield {"bridge": _unity_connection}
+ # Yield the connection pool so it can be attached to the context
+ # Note: Tools will use get_unity_connection_pool() directly
+ yield {"pool": pool}
finally:
- if _unity_connection:
- _unity_connection.disconnect()
- _unity_connection = None
+ if pool:
+ pool.disconnect_all()
logger.info("MCP for Unity Server shut down")
# Initialize MCP server
@@ -188,6 +198,38 @@ def _emit_startup():
def main():
"""Entry point for uvx and console scripts."""
+ parser = argparse.ArgumentParser(
+ description="MCP for Unity Server",
+ formatter_class=argparse.RawDescriptionHelpFormatter,
+ epilog="""
+Environment Variables:
+ UNITY_MCP_DEFAULT_INSTANCE Default Unity instance to target (project name, hash, or 'Name@hash')
+ UNITY_MCP_SKIP_STARTUP_CONNECT Skip initial Unity connection attempt (set to 1/true/yes/on)
+ UNITY_MCP_TELEMETRY_ENABLED Enable telemetry (set to 1/true/yes/on)
+
+Examples:
+ # Use specific Unity project as default
+ python -m src.server --default-instance "MyProject"
+
+ # Or use environment variable
+ UNITY_MCP_DEFAULT_INSTANCE="MyProject" python -m src.server
+ """
+ )
+ parser.add_argument(
+ "--default-instance",
+ type=str,
+ metavar="INSTANCE",
+ help="Default Unity instance to target (project name, hash, or 'Name@hash'). "
+ "Overrides UNITY_MCP_DEFAULT_INSTANCE environment variable."
+ )
+
+ args = parser.parse_args()
+
+ # Set environment variable if --default-instance is provided
+ if args.default_instance:
+ os.environ["UNITY_MCP_DEFAULT_INSTANCE"] = args.default_instance
+ logger.info(f"Using default Unity instance from command-line: {args.default_instance}")
+
mcp.run(transport='stdio')
diff --git a/Server/tools/execute_menu_item.py b/Server/tools/execute_menu_item.py
index a1489c59..23471230 100644
--- a/Server/tools/execute_menu_item.py
+++ b/Server/tools/execute_menu_item.py
@@ -17,9 +17,11 @@ async def execute_menu_item(
ctx: Context,
menu_path: Annotated[str,
"Menu path for 'execute' or 'exists' (e.g., 'File/Save Project')"] | None = None,
+ unity_instance: Annotated[str,
+ "Target Unity instance (project name, hash, or 'Name@hash'). If not specified, uses default instance."] | None = None,
) -> MCPResponse:
await ctx.info(f"Processing execute_menu_item: {menu_path}")
params_dict: dict[str, Any] = {"menuPath": menu_path}
params_dict = {k: v for k, v in params_dict.items() if v is not None}
- result = await async_send_command_with_retry("execute_menu_item", params_dict)
+ result = await async_send_command_with_retry("execute_menu_item", params_dict, instance_id=unity_instance)
return MCPResponse(**result) if isinstance(result, dict) else result
diff --git a/Server/tools/manage_asset.py b/Server/tools/manage_asset.py
index a577e94d..a9a012c8 100644
--- a/Server/tools/manage_asset.py
+++ b/Server/tools/manage_asset.py
@@ -31,9 +31,11 @@ async def manage_asset(
filter_date_after: Annotated[str,
"Date after which to filter"] | None = None,
page_size: Annotated[int | float | str, "Page size for pagination"] | None = None,
- page_number: Annotated[int | float | str, "Page number for pagination"] | None = None
+ page_number: Annotated[int | float | str, "Page number for pagination"] | None = None,
+ unity_instance: Annotated[str,
+ "Target Unity instance (project name, hash, or 'Name@hash'). If not specified, uses default instance."] | None = None
) -> dict[str, Any]:
- ctx.info(f"Processing manage_asset: {action}")
+ ctx.info(f"Processing manage_asset: {action} (unity_instance={unity_instance or 'default'})")
# Coerce 'properties' from JSON string to dict for client compatibility
if isinstance(properties, str):
try:
@@ -86,7 +88,7 @@ def _coerce_int(value, default=None):
# Get the current asyncio event loop
loop = asyncio.get_running_loop()
- # Use centralized async retry helper to avoid blocking the event loop
- result = await async_send_command_with_retry("manage_asset", params_dict, loop=loop)
+ # Use centralized async retry helper with instance routing
+ result = await async_send_command_with_retry("manage_asset", params_dict, instance_id=unity_instance, loop=loop)
# Return the result obtained from Unity
return result if isinstance(result, dict) else {"success": False, "message": str(result)}
diff --git a/Server/tools/manage_editor.py b/Server/tools/manage_editor.py
index f7911458..637fc678 100644
--- a/Server/tools/manage_editor.py
+++ b/Server/tools/manage_editor.py
@@ -21,6 +21,8 @@ def manage_editor(
"Tag name when adding and removing tags"] | None = None,
layer_name: Annotated[str,
"Layer name when adding and removing layers"] | None = None,
+ unity_instance: Annotated[str,
+ "Target Unity instance (project name, hash, or 'Name@hash'). If not specified, uses default instance."] | None = None,
) -> dict[str, Any]:
ctx.info(f"Processing manage_editor: {action}")
@@ -62,8 +64,8 @@ def _coerce_bool(value, default=None):
}
params = {k: v for k, v in params.items() if v is not None}
- # Send command using centralized retry helper
- response = send_command_with_retry("manage_editor", params)
+ # Send command using centralized retry helper with instance routing
+ response = send_command_with_retry("manage_editor", params, instance_id=unity_instance)
# Preserve structured failure data; unwrap success into a friendlier shape
if isinstance(response, dict) and response.get("success"):
diff --git a/Server/tools/manage_gameobject.py b/Server/tools/manage_gameobject.py
index 794013b9..2896e309 100644
--- a/Server/tools/manage_gameobject.py
+++ b/Server/tools/manage_gameobject.py
@@ -64,6 +64,8 @@ def manage_gameobject(
# Controls whether serialization of private [SerializeField] fields is included
includeNonPublicSerialized: Annotated[bool | str,
"Controls whether serialization of private [SerializeField] fields is included (accepts true/false or 'true'/'false')"] | None = None,
+ unity_instance: Annotated[str,
+ "Target Unity instance (project name, hash, or 'Name@hash'). If not specified, uses default instance."] | None = None,
) -> dict[str, Any]:
ctx.info(f"Processing manage_gameobject: {action}")
@@ -195,8 +197,8 @@ def _to_vec3(parts):
params.pop("prefabFolder", None)
# --------------------------------
- # Use centralized retry helper
- response = send_command_with_retry("manage_gameobject", params)
+ # Use centralized retry helper with instance routing
+ response = send_command_with_retry("manage_gameobject", params, instance_id=unity_instance)
# Check if the response indicates success
# If the response is not successful, raise an exception with the error message
diff --git a/Server/tools/manage_prefabs.py b/Server/tools/manage_prefabs.py
index 2540e9f2..010ceada 100644
--- a/Server/tools/manage_prefabs.py
+++ b/Server/tools/manage_prefabs.py
@@ -28,6 +28,8 @@ def manage_prefabs(
"Allow replacing an existing prefab at the same path"] | None = None,
search_inactive: Annotated[bool,
"Include inactive objects when resolving the target name"] | None = None,
+ unity_instance: Annotated[str,
+ "Target Unity instance (project name, hash, or 'Name@hash'). If not specified, uses default instance."] | None = None,
) -> dict[str, Any]:
ctx.info(f"Processing manage_prefabs: {action}")
try:
@@ -45,7 +47,7 @@ def manage_prefabs(
params["allowOverwrite"] = bool(allow_overwrite)
if search_inactive is not None:
params["searchInactive"] = bool(search_inactive)
- response = send_command_with_retry("manage_prefabs", params)
+ response = send_command_with_retry("manage_prefabs", params, instance_id=unity_instance)
if isinstance(response, dict) and response.get("success"):
return {
diff --git a/Server/tools/manage_scene.py b/Server/tools/manage_scene.py
index 50927ca9..a5748196 100644
--- a/Server/tools/manage_scene.py
+++ b/Server/tools/manage_scene.py
@@ -15,6 +15,8 @@ def manage_scene(
"Asset path for scene operations (default: 'Assets/')"] | None = None,
build_index: Annotated[int | str,
"Build index for load/build settings actions (accepts int or string, e.g., 0 or '0')"] | None = None,
+ unity_instance: Annotated[str,
+ "Target Unity instance (project name, hash, or 'Name@hash'). If not specified, uses default instance."] | None = None,
) -> dict[str, Any]:
ctx.info(f"Processing manage_scene: {action}")
try:
@@ -44,8 +46,8 @@ def _coerce_int(value, default=None):
if coerced_build_index is not None:
params["buildIndex"] = coerced_build_index
- # Use centralized retry helper
- response = send_command_with_retry("manage_scene", params)
+ # Use centralized retry helper with instance routing
+ response = send_command_with_retry("manage_scene", params, instance_id=unity_instance)
# Preserve structured failure data; unwrap success into a friendlier shape
if isinstance(response, dict) and response.get("success"):
diff --git a/Server/tools/manage_script.py b/Server/tools/manage_script.py
index 6ed8cbca..831afa9e 100644
--- a/Server/tools/manage_script.py
+++ b/Server/tools/manage_script.py
@@ -85,6 +85,8 @@ def apply_text_edits(
"Optional strict flag, used to enforce strict mode"] | None = None,
options: Annotated[dict[str, Any],
"Optional options, used to pass additional options to the script editor"] | None = None,
+ unity_instance: Annotated[str,
+ "Target Unity instance (project name, hash, or 'Name@hash'). If not specified, uses default instance."] | None = None,
) -> dict[str, Any]:
ctx.info(f"Processing apply_text_edits: {uri}")
name, directory = _split_uri(uri)
@@ -107,7 +109,7 @@ def _needs_normalization(arr: list[dict[str, Any]]) -> bool:
"action": "read",
"name": name,
"path": directory,
- })
+ }, instance_id=unity_instance)
if not (isinstance(read_resp, dict) and read_resp.get("success")):
return read_resp if isinstance(read_resp, dict) else {"success": False, "message": str(read_resp)}
data = read_resp.get("data", {})
@@ -304,7 +306,7 @@ def _le(a: tuple[int, int], b: tuple[int, int]) -> bool:
"options": opts,
}
params = {k: v for k, v in params.items() if v is not None}
- resp = unity_connection.send_command_with_retry("manage_script", params)
+ resp = unity_connection.send_command_with_retry("manage_script", params, instance_id=unity_instance)
if isinstance(resp, dict):
data = resp.setdefault("data", {})
data.setdefault("normalizedEdits", normalized_edits)
@@ -341,6 +343,7 @@ def _flip_async():
{"menuPath": "MCP/Flip Reload Sentinel"},
max_retries=0,
retry_ms=0,
+ instance_id=unity_instance,
)
except Exception:
pass
@@ -359,6 +362,8 @@ def create_script(
contents: Annotated[str, "Contents of the script to create. Note, this is Base64 encoded over transport."],
script_type: Annotated[str, "Script type (e.g., 'C#')"] | None = None,
namespace: Annotated[str, "Namespace for the script"] | None = None,
+ unity_instance: Annotated[str,
+ "Target Unity instance (project name, hash, or 'Name@hash'). If not specified, uses default instance."] | None = None,
) -> dict[str, Any]:
ctx.info(f"Processing create_script: {path}")
name = os.path.splitext(os.path.basename(path))[0]
@@ -386,14 +391,16 @@ def create_script(
contents.encode("utf-8")).decode("utf-8")
params["contentsEncoded"] = True
params = {k: v for k, v in params.items() if v is not None}
- resp = unity_connection.send_command_with_retry("manage_script", params)
+ resp = unity_connection.send_command_with_retry("manage_script", params, instance_id=unity_instance)
return resp if isinstance(resp, dict) else {"success": False, "message": str(resp)}
@mcp_for_unity_tool(description=("Delete a C# script by URI or Assets-relative path."))
def delete_script(
ctx: Context,
- uri: Annotated[str, "URI of the script to delete under Assets/ directory, unity://path/Assets/... or file://... or Assets/..."]
+ uri: Annotated[str, "URI of the script to delete under Assets/ directory, unity://path/Assets/... or file://... or Assets/..."],
+ unity_instance: Annotated[str,
+ "Target Unity instance (project name, hash, or 'Name@hash'). If not specified, uses default instance."] | None = None,
) -> dict[str, Any]:
"""Delete a C# script by URI."""
ctx.info(f"Processing delete_script: {uri}")
@@ -401,7 +408,7 @@ def delete_script(
if not directory or directory.split("/")[0].lower() != "assets":
return {"success": False, "code": "path_outside_assets", "message": "URI must resolve under 'Assets/'."}
params = {"action": "delete", "name": name, "path": directory}
- resp = unity_connection.send_command_with_retry("manage_script", params)
+ resp = unity_connection.send_command_with_retry("manage_script", params, instance_id=unity_instance)
return resp if isinstance(resp, dict) else {"success": False, "message": str(resp)}
@@ -412,7 +419,9 @@ def validate_script(
level: Annotated[Literal['basic', 'standard'],
"Validation level"] = "basic",
include_diagnostics: Annotated[bool,
- "Include full diagnostics and summary"] = False
+ "Include full diagnostics and summary"] = False,
+ unity_instance: Annotated[str,
+ "Target Unity instance (project name, hash, or 'Name@hash'). If not specified, uses default instance."] | None = None,
) -> dict[str, Any]:
ctx.info(f"Processing validate_script: {uri}")
name, directory = _split_uri(uri)
@@ -426,7 +435,7 @@ def validate_script(
"path": directory,
"level": level,
}
- resp = unity_connection.send_command_with_retry("manage_script", params)
+ resp = unity_connection.send_command_with_retry("manage_script", params, instance_id=unity_instance)
if isinstance(resp, dict) and resp.get("success"):
diags = resp.get("data", {}).get("diagnostics", []) or []
warnings = sum(1 for d in diags if str(
@@ -450,6 +459,8 @@ def manage_script(
script_type: Annotated[str, "Script type (e.g., 'C#')",
"Type hint (e.g., 'MonoBehaviour')"] | None = None,
namespace: Annotated[str, "Namespace for the script"] | None = None,
+ unity_instance: Annotated[str,
+ "Target Unity instance (project name, hash, or 'Name@hash'). If not specified, uses default instance."] | None = None,
) -> dict[str, Any]:
ctx.info(f"Processing manage_script: {action}")
try:
@@ -473,7 +484,7 @@ def manage_script(
params = {k: v for k, v in params.items() if v is not None}
- response = unity_connection.send_command_with_retry("manage_script", params)
+ response = unity_connection.send_command_with_retry("manage_script", params, instance_id=unity_instance)
if isinstance(response, dict):
if response.get("success"):
@@ -535,13 +546,15 @@ def manage_script_capabilities(ctx: Context) -> dict[str, Any]:
@mcp_for_unity_tool(description="Get SHA256 and basic metadata for a Unity C# script without returning file contents")
def get_sha(
ctx: Context,
- uri: Annotated[str, "URI of the script to edit under Assets/ directory, unity://path/Assets/... or file://... or Assets/..."]
+ uri: Annotated[str, "URI of the script to edit under Assets/ directory, unity://path/Assets/... or file://... or Assets/..."],
+ unity_instance: Annotated[str,
+ "Target Unity instance (project name, hash, or 'Name@hash'). If not specified, uses default instance."] | None = None,
) -> dict[str, Any]:
ctx.info(f"Processing get_sha: {uri}")
try:
name, directory = _split_uri(uri)
params = {"action": "get_sha", "name": name, "path": directory}
- resp = unity_connection.send_command_with_retry("manage_script", params)
+ resp = unity_connection.send_command_with_retry("manage_script", params, instance_id=unity_instance)
if isinstance(resp, dict) and resp.get("success"):
data = resp.get("data", {})
minimal = {"sha256": data.get(
diff --git a/Server/tools/manage_shader.py b/Server/tools/manage_shader.py
index 19b94550..625f4cb0 100644
--- a/Server/tools/manage_shader.py
+++ b/Server/tools/manage_shader.py
@@ -16,6 +16,8 @@ def manage_shader(
path: Annotated[str, "Asset path (default: \"Assets/\")"],
contents: Annotated[str,
"Shader code for 'create'/'update'"] | None = None,
+ unity_instance: Annotated[str,
+ "Target Unity instance (project name, hash, or 'Name@hash'). If not specified, uses default instance."] | None = None,
) -> dict[str, Any]:
ctx.info(f"Processing manage_shader: {action}")
try:
@@ -39,8 +41,8 @@ def manage_shader(
# Remove None values so they don't get sent as null
params = {k: v for k, v in params.items() if v is not None}
- # Send command via centralized retry helper
- response = send_command_with_retry("manage_shader", params)
+ # Send command via centralized retry helper with instance routing
+ response = send_command_with_retry("manage_shader", params, instance_id=unity_instance)
# Process response from Unity
if isinstance(response, dict) and response.get("success"):
diff --git a/Server/tools/read_console.py b/Server/tools/read_console.py
index d922982c..0dbdf465 100644
--- a/Server/tools/read_console.py
+++ b/Server/tools/read_console.py
@@ -23,7 +23,9 @@ def read_console(
format: Annotated[Literal['plain', 'detailed',
'json'], "Output format"] | None = None,
include_stacktrace: Annotated[bool | str,
- "Include stack traces in output (accepts true/false or 'true'/'false')"] | None = None
+ "Include stack traces in output (accepts true/false or 'true'/'false')"] | None = None,
+ unity_instance: Annotated[str,
+ "Target Unity instance (project name, hash, or 'Name@hash'). If not specified, uses default instance."] | None = None
) -> dict[str, Any]:
ctx.info(f"Processing read_console: {action}")
# Set defaults if values are None
@@ -87,8 +89,8 @@ def _coerce_int(value, default=None):
if 'count' not in params_dict:
params_dict['count'] = None
- # Use centralized retry helper
- resp = send_command_with_retry("read_console", params_dict)
+ # Use centralized retry helper with instance routing
+ resp = send_command_with_retry("read_console", params_dict, instance_id=unity_instance)
if isinstance(resp, dict) and resp.get("success") and not include_stacktrace:
# Strip stacktrace fields from returned lines if present
try:
diff --git a/Server/tools/resource_tools.py b/Server/tools/resource_tools.py
index d84bf7be..08c3035d 100644
--- a/Server/tools/resource_tools.py
+++ b/Server/tools/resource_tools.py
@@ -42,7 +42,7 @@ def _coerce_int(value: Any, default: int | None = None, minimum: int | None = No
return default
-def _resolve_project_root(override: str | None) -> Path:
+def _resolve_project_root(override: str | None, unity_instance: str | None = None) -> Path:
# 1) Explicit override
if override:
pr = Path(override).expanduser().resolve()
@@ -60,7 +60,7 @@ def _resolve_project_root(override: str | None) -> Path:
# 3) Ask Unity via manage_editor.get_project_root
try:
resp = send_command_with_retry(
- "manage_editor", {"action": "get_project_root"})
+ "manage_editor", {"action": "get_project_root"}, instance_id=unity_instance)
if isinstance(resp, dict) and resp.get("success"):
pr = Path(resp.get("data", {}).get(
"projectRoot", "")).expanduser().resolve()
@@ -141,10 +141,12 @@ async def list_resources(
"Folder under project root, default is Assets"] = "Assets",
limit: Annotated[int, "Page limit"] = 200,
project_root: Annotated[str, "Project path"] | None = None,
+ unity_instance: Annotated[str,
+ "Target Unity instance (project name, hash, or 'Name@hash'). If not specified, uses default instance."] | None = None,
) -> dict[str, Any]:
ctx.info(f"Processing list_resources: {pattern}")
try:
- project = _resolve_project_root(project_root)
+ project = _resolve_project_root(project_root, unity_instance)
base = (project / under).resolve()
try:
base.relative_to(project)
@@ -201,6 +203,8 @@ async def read_resource(
project_root: Annotated[str,
"The project root directory"] | None = None,
request: Annotated[str, "The request ID"] | None = None,
+ unity_instance: Annotated[str,
+ "Target Unity instance (project name, hash, or 'Name@hash'). If not specified, uses default instance."] | None = None,
) -> dict[str, Any]:
ctx.info(f"Processing read_resource: {uri}")
try:
@@ -266,7 +270,7 @@ async def read_resource(
sha = hashlib.sha256(spec_json.encode("utf-8")).hexdigest()
return {"success": True, "data": {"text": spec_json, "metadata": {"sha256": sha}}}
- project = _resolve_project_root(project_root)
+ project = _resolve_project_root(project_root, unity_instance)
p = _resolve_safe_path_from_uri(uri, project)
if not p or not p.exists() or not p.is_file():
return {"success": False, "error": f"Resource not found: {uri}"}
@@ -356,10 +360,12 @@ async def find_in_file(
"The project root directory"] | None = None,
max_results: Annotated[int,
"Cap results to avoid huge payloads"] = 200,
+ unity_instance: Annotated[str,
+ "Target Unity instance (project name, hash, or 'Name@hash'). If not specified, uses default instance."] | None = None,
) -> dict[str, Any]:
ctx.info(f"Processing find_in_file: {uri}")
try:
- project = _resolve_project_root(project_root)
+ project = _resolve_project_root(project_root, unity_instance)
p = _resolve_safe_path_from_uri(uri, project)
if not p or not p.exists() or not p.is_file():
return {"success": False, "error": f"Resource not found: {uri}"}
diff --git a/Server/tools/run_tests.py b/Server/tools/run_tests.py
index e70fd00c..6e3e3960 100644
--- a/Server/tools/run_tests.py
+++ b/Server/tools/run_tests.py
@@ -45,6 +45,8 @@ async def run_tests(
description="Unity test mode to run")] = "edit",
timeout_seconds: Annotated[str, Field(
description="Optional timeout in seconds for the Unity test run (string, e.g. '30')")] | None = None,
+ unity_instance: Annotated[str,
+ "Target Unity instance (project name, hash, or 'Name@hash'). If not specified, uses default instance."] | None = None,
) -> RunTestsResponse:
await ctx.info(f"Processing run_tests: mode={mode}")
@@ -69,6 +71,6 @@ def _coerce_int(value, default=None):
if ts is not None:
params["timeoutSeconds"] = ts
- response = await async_send_command_with_retry("run_tests", params)
+ response = await async_send_command_with_retry("run_tests", params, instance_id=unity_instance)
await ctx.info(f'Response {response}')
return RunTestsResponse(**response) if isinstance(response, dict) else response
diff --git a/Server/tools/script_apply_edits.py b/Server/tools/script_apply_edits.py
index e339a754..d99770a8 100644
--- a/Server/tools/script_apply_edits.py
+++ b/Server/tools/script_apply_edits.py
@@ -365,6 +365,8 @@ def script_apply_edits(
"Type of the script to edit"] = "MonoBehaviour",
namespace: Annotated[str,
"Namespace of the script to edit"] | None = None,
+ unity_instance: Annotated[str,
+ "Target Unity instance (project name, hash, or 'Name@hash'). If not specified, uses default instance."] | None = None,
) -> dict[str, Any]:
ctx.info(f"Processing script_apply_edits: {name}")
# Normalize locator first so downstream calls target the correct script file.
@@ -586,7 +588,7 @@ def error_with_hint(message: str, expected: dict[str, Any], suggestion: dict[str
"options": opts2,
}
resp_struct = send_command_with_retry(
- "manage_script", params_struct)
+ "manage_script", params_struct, instance_id=unity_instance)
if isinstance(resp_struct, dict) and resp_struct.get("success"):
pass # Optional sentinel reload removed (deprecated)
return _with_norm(resp_struct if isinstance(resp_struct, dict) else {"success": False, "message": str(resp_struct)}, normalized_for_echo, routing="structured")
@@ -598,7 +600,7 @@ def error_with_hint(message: str, expected: dict[str, Any], suggestion: dict[str
"path": path,
"namespace": namespace,
"scriptType": script_type,
- })
+ }, instance_id=unity_instance)
if not isinstance(read_resp, dict) or not read_resp.get("success"):
return read_resp if isinstance(read_resp, dict) else {"success": False, "message": str(read_resp)}
@@ -722,7 +724,7 @@ def _expand_dollars(rep: str, _m=m) -> str:
"options": {"refresh": (options or {}).get("refresh", "debounced"), "validate": (options or {}).get("validate", "standard"), "applyMode": ("atomic" if len(at_edits) > 1 else (options or {}).get("applyMode", "sequential"))}
}
resp_text = send_command_with_retry(
- "manage_script", params_text)
+ "manage_script", params_text, instance_id=unity_instance)
if not (isinstance(resp_text, dict) and resp_text.get("success")):
return _with_norm(resp_text if isinstance(resp_text, dict) else {"success": False, "message": str(resp_text)}, normalized_for_echo, routing="mixed/text-first")
# Optional sentinel reload removed (deprecated)
@@ -743,7 +745,7 @@ def _expand_dollars(rep: str, _m=m) -> str:
"options": opts2
}
resp_struct = send_command_with_retry(
- "manage_script", params_struct)
+ "manage_script", params_struct, instance_id=unity_instance)
if isinstance(resp_struct, dict) and resp_struct.get("success"):
pass # Optional sentinel reload removed (deprecated)
return _with_norm(resp_struct if isinstance(resp_struct, dict) else {"success": False, "message": str(resp_struct)}, normalized_for_echo, routing="mixed/text-first")
@@ -871,7 +873,7 @@ def _expand_dollars(rep: str, _m=m) -> str:
"applyMode": ("atomic" if len(at_edits) > 1 else (options or {}).get("applyMode", "sequential"))
}
}
- resp = send_command_with_retry("manage_script", params)
+ resp = send_command_with_retry("manage_script", params, instance_id=unity_instance)
if isinstance(resp, dict) and resp.get("success"):
pass # Optional sentinel reload removed (deprecated)
return _with_norm(
@@ -955,7 +957,7 @@ def _expand_dollars(rep: str, _m=m) -> str:
"options": options or {"validate": "standard", "refresh": "debounced"},
}
- write_resp = send_command_with_retry("manage_script", params)
+ write_resp = send_command_with_retry("manage_script", params, instance_id=unity_instance)
if isinstance(write_resp, dict) and write_resp.get("success"):
pass # Optional sentinel reload removed (deprecated)
return _with_norm(
diff --git a/Server/unity_connection.py b/Server/unity_connection.py
index f0e06b76..916229ee 100644
--- a/Server/unity_connection.py
+++ b/Server/unity_connection.py
@@ -11,9 +11,10 @@
import struct
import threading
import time
-from typing import Any, Dict
+from typing import Any
from models import MCPResponse
+from connection_pool import get_unity_connection_pool
# Configure logging using settings from config
@@ -37,6 +38,7 @@ class UnityConnection:
port: int = None # Will be set dynamically
sock: socket.socket = None # Socket for Unity communication
use_framing: bool = False # Negotiated per-connection
+ instance_id: str | None = None # Instance identifier for reconnection
def __post_init__(self):
"""Set port from discovery if not explicitly provided"""
@@ -223,40 +225,63 @@ def receive_full_response(self, sock, buffer_size=config.buffer_size) -> bytes:
logger.error(f"Error during receive: {str(e)}")
raise
- def send_command(self, command_type: str, params: Dict[str, Any] = None) -> Dict[str, Any]:
+ def send_command(self, command_type: str, params: dict[str, Any] | None = None) -> dict[str, Any]:
"""Send a command with retry/backoff and port rediscovery. Pings only when requested."""
# Defensive guard: catch empty/placeholder invocations early
if not command_type:
raise ValueError("MCP call missing command_type")
if params is None:
- return MCPResponse(success=False, error="MCP call received with no parameters (client placeholder?)")
+ params = {}
attempts = max(config.max_retries, 5)
base_backoff = max(0.5, config.retry_delay)
- def read_status_file() -> dict | None:
+ def read_status_file(target_hash: str | None = None) -> dict | None:
try:
- status_files = sorted(Path.home().joinpath(
- '.unity-mcp').glob('unity-mcp-status-*.json'), key=lambda p: p.stat().st_mtime, reverse=True)
+ base_path = Path.home().joinpath('.unity-mcp')
+ status_files = sorted(
+ base_path.glob('unity-mcp-status-*.json'),
+ key=lambda p: p.stat().st_mtime,
+ reverse=True,
+ )
if not status_files:
return None
- latest = status_files[0]
- with latest.open('r') as f:
+ if target_hash:
+ for status_path in status_files:
+ if status_path.stem.endswith(target_hash):
+ with status_path.open('r') as f:
+ return json.load(f)
+ # Target hash not found - don't fallback to avoid reading wrong instance's status
+ logger.debug(f"Status file for hash '{target_hash}' not found, available: {[p.stem for p in status_files[:3]]}")
+ return None
+ # Only return most recent when no specific hash requested
+ with status_files[0].open('r') as f:
return json.load(f)
except Exception:
return None
last_short_timeout = None
+ # Extract hash suffix from instance id (e.g., Project@hash)
+ target_hash: str | None = None
+ if self.instance_id:
+ if '@' in self.instance_id:
+ maybe_hash = self.instance_id.split('@', 1)[1].strip()
+ if maybe_hash:
+ target_hash = maybe_hash
+ else:
+ # instance_id is just the hash (fallback format)
+ target_hash = self.instance_id.strip() or None
+
# Preflight: if Unity reports reloading, return a structured hint so clients can retry politely
try:
- status = read_status_file()
+ status = read_status_file(target_hash)
if status and (status.get('reloading') or status.get('reason') == 'reloading'):
- return MCPResponse(
- success=False,
- error="Unity domain reload in progress, please try again shortly",
- data={"state": "reloading", "retry_after_ms": int(
- config.reload_retry_ms)}
- )
+ return {
+ "success": False,
+ "state": "reloading",
+ "retry_after_ms": int(config.reload_retry_ms),
+ "error": "Unity domain reload in progress, please try again shortly",
+ }
except Exception:
pass
@@ -328,9 +353,28 @@ def read_status_file() -> dict | None:
finally:
self.sock = None
- # Re-discover port each time
+ # Re-discover the port for this specific instance
try:
- new_port = PortDiscovery.discover_unity_port()
+ new_port: int | None = None
+ if self.instance_id:
+ # Try to rediscover the specific instance
+ pool = get_unity_connection_pool()
+ refreshed = pool.discover_all_instances(force_refresh=True)
+ match = next((inst for inst in refreshed if inst.id == self.instance_id), None)
+ if match:
+ new_port = match.port
+ logger.debug(f"Rediscovered instance {self.instance_id} on port {new_port}")
+ else:
+ logger.warning(f"Instance {self.instance_id} not found during reconnection")
+
+ # Fallback to generic port discovery if instance-specific discovery failed
+ if new_port is None:
+ if self.instance_id:
+ raise ConnectionError(
+ f"Unity instance '{self.instance_id}' could not be rediscovered"
+ ) from e
+ new_port = PortDiscovery.discover_unity_port()
+
if new_port != self.port:
logger.info(
f"Unity port changed {self.port} -> {new_port}")
@@ -340,7 +384,7 @@ def read_status_file() -> dict | None:
if attempt < attempts:
# Heartbeat-aware, jittered backoff
- status = read_status_file()
+ status = read_status_file(target_hash)
# Base exponential backoff
backoff = base_backoff * (2 ** attempt)
# Decorrelated jitter multiplier
@@ -371,32 +415,22 @@ def read_status_file() -> dict | None:
raise
-# Global Unity connection
-_unity_connection = None
+# Backwards compatibility: keep old single-connection function
+def get_unity_connection(instance_identifier: str | None = None) -> UnityConnection:
+ """Retrieve or establish a Unity connection.
+
+ Args:
+ instance_identifier: Optional identifier for specific Unity instance.
+ If None, uses default or most recent instance.
-def get_unity_connection() -> UnityConnection:
- """Retrieve or establish a persistent Unity connection.
+ Returns:
+ UnityConnection to the specified or default Unity instance
- Note: Do NOT ping on every retrieval to avoid connection storms. Rely on
- send_command() exceptions to detect broken sockets and reconnect there.
+ Note: This function now uses the connection pool internally.
"""
- global _unity_connection
- if _unity_connection is not None:
- return _unity_connection
-
- # Double-checked locking to avoid concurrent socket creation
- with _connection_lock:
- if _unity_connection is not None:
- return _unity_connection
- logger.info("Creating new Unity connection")
- _unity_connection = UnityConnection()
- if not _unity_connection.connect():
- _unity_connection = None
- raise ConnectionError(
- "Could not connect to Unity. Ensure the Unity Editor and MCP Bridge are running.")
- logger.info("Connected to Unity on startup")
- return _unity_connection
+ pool = get_unity_connection_pool()
+ return pool.get_connection(instance_identifier)
# -----------------------------
@@ -413,13 +447,30 @@ def _is_reloading_response(resp: dict) -> bool:
return "reload" in message_text
-def send_command_with_retry(command_type: str, params: Dict[str, Any], *, max_retries: int | None = None, retry_ms: int | None = None) -> Dict[str, Any]:
- """Send a command via the shared connection, waiting politely through Unity reloads.
+def send_command_with_retry(
+ command_type: str,
+ params: dict[str, Any],
+ *,
+ instance_id: str | None = None,
+ max_retries: int | None = None,
+ retry_ms: int | None = None
+) -> dict[str, Any]:
+ """Send a command to a Unity instance, waiting politely through Unity reloads.
+
+ Args:
+ command_type: The command type to send
+ params: Command parameters
+ instance_id: Optional Unity instance identifier (name, hash, name@hash, etc.)
+ max_retries: Maximum number of retries for reload states
+ retry_ms: Delay between retries in milliseconds
+
+ Returns:
+ Response dictionary from Unity
Uses config.reload_retry_ms and config.reload_max_retries by default. Preserves the
structured failure if retries are exhausted.
"""
- conn = get_unity_connection()
+ conn = get_unity_connection(instance_id)
if max_retries is None:
max_retries = getattr(config, "reload_max_retries", 40)
if retry_ms is None:
@@ -436,8 +487,28 @@ def send_command_with_retry(command_type: str, params: Dict[str, Any], *, max_re
return response
-async def async_send_command_with_retry(command_type: str, params: dict[str, Any], *, loop=None, max_retries: int | None = None, retry_ms: int | None = None) -> dict[str, Any] | MCPResponse:
- """Async wrapper that runs the blocking retry helper in a thread pool."""
+async def async_send_command_with_retry(
+ command_type: str,
+ params: dict[str, Any],
+ *,
+ instance_id: str | None = None,
+ loop=None,
+ max_retries: int | None = None,
+ retry_ms: int | None = None
+) -> dict[str, Any] | MCPResponse:
+ """Async wrapper that runs the blocking retry helper in a thread pool.
+
+ Args:
+ command_type: The command type to send
+ params: Command parameters
+ instance_id: Optional Unity instance identifier
+ loop: Optional asyncio event loop
+ max_retries: Maximum number of retries for reload states
+ retry_ms: Delay between retries in milliseconds
+
+ Returns:
+ Response dictionary or MCPResponse on error
+ """
try:
import asyncio # local import to avoid mandatory asyncio dependency for sync callers
if loop is None:
@@ -445,7 +516,7 @@ async def async_send_command_with_retry(command_type: str, params: dict[str, Any
return await loop.run_in_executor(
None,
lambda: send_command_with_retry(
- command_type, params, max_retries=max_retries, retry_ms=retry_ms),
+ command_type, params, instance_id=instance_id, max_retries=max_retries, retry_ms=retry_ms),
)
except Exception as e:
return MCPResponse(success=False, error=str(e))