Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 5 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -165,7 +165,11 @@ cython_debug/
# be found at https://github.com/github/gitignore/blob/main/Global/JetBrains.gitignore
# and can be added to the global gitignore or merged into this file. For a more nuclear
# option (not recommended) you can uncomment the following to ignore the entire idea folder.
#.idea/
.idea/

.vscode/

.claude/

# Ruff stuff:
.ruff_cache/
Expand Down
4 changes: 4 additions & 0 deletions samples/mcp-math-server/bindings.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
{
"version": "2.0",
"resources": []
}
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

is this change needed?

2 changes: 1 addition & 1 deletion samples/mcp-math-server/mcp.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"servers": {
"math-server": {
"coded-math-mcp": {
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

is this related to the watchdog PR?

"transport": "stdio",
"command": "python",
"args": ["server.py"]
Expand Down
2 changes: 1 addition & 1 deletion samples/mcp-math-server/pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,6 @@ version = "0.0.1"
description = "Advanced Math Operations MCP Server"
authors = [{ name = "John Doe" }]
dependencies = [
"uipath-mcp>=0.0.101",
"uipath-mcp>=0.1.4",
]
requires-python = ">=3.11"
2,698 changes: 1,387 additions & 1,311 deletions samples/mcp-math-server/uv.lock

Large diffs are not rendered by default.

198 changes: 109 additions & 89 deletions src/uipath_mcp/_cli/_runtime/_runtime.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,8 +37,14 @@
from .._utils._config import McpServer
from ._context import UiPathServerType
from ._exception import McpErrorCode, UiPathMcpRuntimeError
from ._session import BaseSessionServer, StdioSessionServer, StreamableHttpSessionServer
from ._session import (
BaseSessionServer,
SessionHealthInfo,
StdioSessionServer,
StreamableHttpSessionServer,
)
from ._token_refresh import TokenRefresher
from ._watchdog import SessionWatchdog

logger = logging.getLogger(__name__)
tracer = trace.get_tracer(__name__)
Expand Down Expand Up @@ -86,6 +92,7 @@ def __init__(
self._http_stderr_drain_task: asyncio.Task[None] | None = None
self._http_server_stderr_lines: list[str] = []
self._uipath = UiPath()
self._watchdog: SessionWatchdog | None = None
self._token_refresher: TokenRefresher | None = None
self._cleanup_done = False

Expand Down Expand Up @@ -118,6 +125,38 @@ def _validate_auth(self) -> None:
UiPathErrorCategory.SYSTEM,
)

def get_sessions(self) -> dict[str, SessionHealthInfo]:
"""Return health info for all active sessions (SessionProvider protocol)."""
return {
sid: session.get_health_info()
for sid, session in self._session_servers.items()
}

async def remove_session(self, session_id: str, reason: str) -> None:
"""Pop, stop, and clean up a single session (SessionProvider protocol)."""
session_server = self._session_servers.pop(session_id, None)
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this looks like a pre-existing race widened by this PR. pop() removes the entry before await session_Server.stop(), and stop() can hold for up to 3s. During that time, a new messageReceived for the same session id will pass the if session_id not in self._session_servers check in _handle_signalr_message and create a fresh session server racing the dying one. Since this new watchdog will trigger the removal path more oftern than the old code, I think we should fix it now.

if session_server is None:
return

logger.warning(f"Removing session {session_id}: {reason}")

try:
await session_server.stop()
except Exception:
logger.error(
f"Error stopping session {session_id}",
exc_info=True,
)

if session_server.output:
if self.sandboxed:
self._session_output = session_server.output
else:
logger.info(f"Session {session_id} output: {session_server.output}")

if self.sandboxed:
self._cancel_event.set()
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This unifies four previously-distinct paths into one, which is a real improvement, but the trailing if self.sandboxed: self._cancel_event.set() is a behavior change for some of them. Before, only _handle_signalr_session_closed cancelled the sandboxed runtime. Now the watchdog (dead task or idle timeout) and _monitor_http_server_process (HTTP crash) will also cancel it, and _cleanup's own loop will redundantly fire _cancel_event.set() on every iteration.

For HTTP-crash and SignalR-closed this is correct. For watchdog dead-task: in a sandboxed runtime that hosts multiple session servers, one task dying with an exception will now kill the whole runtime. Is this intended?

Maybe we can make it explicit, idk:
async def remove_session(self, session_id: str, reason: str, cancel_runtime_on_sandbox: bool = True) -> None:
...
if self.sandboxed and cancel_runtime_on_sandbox:
self._cancel_event.set()


async def get_schema(self) -> UiPathRuntimeSchema:
"""Get schema for this MCP runtime.

Expand Down Expand Up @@ -240,6 +279,9 @@ async def _run_server(self) -> UiPathRuntimeResult:
run_task = asyncio.create_task(self._signalr_client.run())
cancel_task = asyncio.create_task(self._cancel_event.wait())
self._keep_alive_task = asyncio.create_task(self._keep_alive())

self._watchdog = SessionWatchdog(self)
self._watchdog.start()
self._token_refresher.start()

try:
Expand All @@ -253,8 +295,8 @@ async def _run_server(self) -> UiPathRuntimeResult:
)
self._cancel_event.set()
finally:
# Cancel any pending tasks gracefully
for task in [run_task, cancel_task, self._keep_alive_task]:
# Cancel pending tasks
for task in [run_task, cancel_task]:
if task and not task.done():
task.cancel()
try:
Expand All @@ -280,7 +322,7 @@ async def _run_server(self) -> UiPathRuntimeResult:
except Exception as e:
if isinstance(e, UiPathMcpRuntimeError):
raise
detail = f"Error: {str(e)}"
detail = f"Error: {e}"
raise UiPathMcpRuntimeError(
UiPathErrorCode.EXECUTION_ERROR,
"MCP Runtime execution failed",
Expand Down Expand Up @@ -312,11 +354,12 @@ async def _cleanup(self) -> None:
except asyncio.CancelledError:
pass

for session_id, session_server in list(self._session_servers.items()):
try:
await session_server.stop()
except Exception as e:
logger.error(f"Error cleaning up session server {session_id}: {str(e)}")
if self._watchdog:
await self._watchdog.stop()
self._watchdog = None
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The watchdog should be stopped before the long awaits


for session_id in list(self._session_servers.keys()):
await self.remove_session(session_id, reason="runtime shutdown")

# Stop the shared HTTP server process (streamable-http only)
await self._stop_http_server_process()
Expand All @@ -327,46 +370,30 @@ async def _cleanup(self) -> None:
try:
await transport._ws.close()
except Exception as e:
logger.error(f"Error closing SignalR WebSocket: {str(e)}")
logger.error(f"Error closing SignalR WebSocket: {e}")

# Add a small delay to allow the server to shut down gracefully
if sys.platform == "win32":
await asyncio.sleep(0.5)

async def _handle_signalr_session_closed(self, args: list[str]) -> None:
"""
Handle session closed by server.
"""
"""Handle session closed by server."""
if self._cleanup_done:
return

if len(args) < 1:
logger.error(f"Received invalid websocket message arguments: {args}")
return

session_id = args[0]

logger.info(f"Received closed signal for session {session_id}")

try:
session_server = self._session_servers.pop(session_id, None)
if session_server:
await session_server.stop()
if session_server.output:
if self.sandboxed:
self._session_output = session_server.output
else:
logger.info(
f"Session {session_id} output: {session_server.output}"
)
# If this is a sandboxed runtime for a specific session, cancel the execution
if self.sandboxed:
self._cancel_event.set()

except Exception as e:
logger.error(f"Error terminating session {session_id}: {str(e)}")
await self.remove_session(session_id, reason="server closed")

async def _handle_signalr_message(self, args: list[str]) -> None:
"""
Handle incoming SignalR messages.
"""
"""Handle incoming SignalR messages."""
if self._cleanup_done:
return

if len(args) < 2:
logger.error(f"Received invalid websocket message arguments: {args}")
return
Expand All @@ -392,7 +419,7 @@ async def _handle_signalr_message(self, args: list[str]) -> None:
await session_server.start()
except Exception as e:
logger.error(
f"Error starting session server for session {session_id}: {str(e)}"
f"Error starting session server for session {session_id}: {e}"
)
await self._on_session_start_error(session_id)
raise
Expand All @@ -406,7 +433,7 @@ async def _handle_signalr_message(self, args: list[str]) -> None:

except Exception as e:
logger.error(
f"Error handling websocket notification for session {session_id}: {str(e)}"
f"Error handling websocket notification for session {session_id}: {e}"
)

async def _handle_signalr_error(self, error: Any) -> None:
Expand All @@ -421,17 +448,21 @@ async def _handle_signalr_close(self) -> None:
"""Handle SignalR connection close event."""
logger.info("Websocket connection closed.")

async def _start_http_server_process(self) -> None:
"""Spawn the streamable-http server process.

The process is started once and shared across all sessions.
"""
def _get_server_env(self) -> dict[str, str]:
"""Return server env vars, with os.environ merged in for Coded servers."""
env_vars = self._server.env.copy()
if self.server_type is UiPathServerType.Coded:
for name, value in os.environ.items():
if name not in env_vars:
env_vars[name] = value
return env_vars

async def _start_http_server_process(self) -> None:
"""Spawn the streamable-http server process.

The process is started once and shared across all sessions.
"""
env_vars = self._get_server_env()
merged_env = {**os.environ, **env_vars} if env_vars else None
self._http_server_stderr_lines = []
self._http_server_process = await asyncio.create_subprocess_exec(
Expand Down Expand Up @@ -472,7 +503,12 @@ async def _wait_for_http_server_ready(

url = self._server.url
if not url:
raise ValueError("streamable-http transport requires url in config")
raise UiPathMcpRuntimeError(
McpErrorCode.CONFIGURATION_ERROR,
"Missing URL for streamable-http server",
"Please specify a 'url' in the server configuration for streamable-http transport.",
UiPathErrorCategory.SYSTEM,
)

for attempt in range(max_retries):
# Check if process has crashed
Expand Down Expand Up @@ -561,13 +597,9 @@ async def _monitor_http_server_process(self) -> None:
# Stop all HTTP sessions, they will fail on next request anyway
for session_id, session_server in list(self._session_servers.items()):
if isinstance(session_server, StreamableHttpSessionServer):
try:
await session_server.stop()
except Exception as e:
logger.error(
f"Error stopping session {session_id} after process crash: {e}"
)
self._session_servers.pop(session_id, None)
await self.remove_session(
session_id, reason="http process crash"
)
except asyncio.CancelledError:
pass

Expand All @@ -577,14 +609,6 @@ async def _register(self) -> None:
initialization_successful = False
tools_result: ListToolsResult | None = None
server_stderr_output = ""
env_vars = self._server.env

# if server is Coded, include environment variables
if self.server_type is UiPathServerType.Coded:
for name, value in os.environ.items():
# config env variables should have precedence over system ones
if name not in env_vars:
env_vars[name] = value

try:
if self._server.is_streamable_http:
Expand Down Expand Up @@ -624,7 +648,7 @@ async def _register(self) -> None:
server_params = StdioServerParameters(
command=self._server.command,
args=self._server.args,
env=env_vars,
env=self._get_server_env(),
)

with tempfile.TemporaryFile(mode="w+b") as stderr_temp_binary:
Expand Down Expand Up @@ -754,41 +778,39 @@ async def _on_session_start_error(self, session_id: str) -> None:
f"Error sending session dispose signal to UiPath MCP Server: {e}"
)

async def _on_keep_alive_response(self, response: CompletionMessage) -> None:
"""Handle keep-alive response: log session state, detect orphaned sandboxed runtimes."""
if response.error:
logger.error(f"Error during keep-alive: {response.error}")
return
session_ids = response.result
logger.info(f"Server active sessions: {session_ids}")
runtime_sessions = {}
for sid, s in self._session_servers.items():
health = s.get_health_info()
runtime_sessions[sid] = {
"task_done": health.task_done,
"active_requests": len(s._active_requests),
Copy link

Copilot AI Mar 23, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

_on_keep_alive_response() reaches into BaseSessionServer's private _active_requests to log active request counts. This tightly couples the runtime to session internals and makes refactors risky (especially since SessionHealthInfo was introduced for health reporting). Consider exposing an official API for this (e.g., add an active_request_count field back to SessionHealthInfo or add a public method/property on BaseSessionServer) and use that instead of accessing _active_requests directly.

Suggested change
"active_requests": len(s._active_requests),
"active_requests": health.active_request_count,

Copilot uses AI. Check for mistakes.
Copy link
Copy Markdown
Collaborator

@PopescuTudor PopescuTudor May 13, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Reaching into the session's private attribute defeats the purpose of SessionHealthInfo. The PR description says active_request_count was removed from SessionHealthInfo, but this is the use. Maybe we should restore it

Also, I see queue_size from SessionHealthInfo is never consumed.

}
logger.info(f"Runtime active sessions: {runtime_sessions}")
# If there are no active sessions and this is a sandbox environment
# We need to cancel the runtime
# eg: when user kills the agent that triggered the runtime, before we subscribe to events
if not session_ids and self.sandboxed and not self._cancel_event.is_set():
logger.warning("No active sessions, cancelling sandboxed runtime...")
self._cancel_event.set()

async def _keep_alive(self) -> None:
"""
Heartbeat to keep the runtime available.
"""
"""Heartbeat to keep the runtime available."""
try:
while not self._cancel_event.is_set():
try:

async def on_keep_alive_response(
response: CompletionMessage,
) -> None:
if response.error:
logger.error(f"Error during keep-alive: {response.error}")
return
session_ids = response.result
logger.info(f"Active sessions: {session_ids}")
# If there are no active sessions and this is a sandbox environment
# We need to cancel the runtime
# eg: when user kills the agent that triggered the runtime, before we subscribe to events
if (
not session_ids
and self.sandboxed
and not self._cancel_event.is_set()
):
logger.error(
"No active sessions, cancelling sandboxed runtime..."
)
self._cancel_event.set()

if self._signalr_client:
logger.info("Sending keep-alive ping...")
await self._signalr_client.send(
method="OnKeepAlive",
arguments=[],
on_invocation=on_keep_alive_response, # type: ignore
on_invocation=self._on_keep_alive_response, # type: ignore
)
else:
logger.error("SignalR client not initialized during keep-alive")
Expand All @@ -806,9 +828,7 @@ async def on_keep_alive_response(
raise

async def _on_runtime_abort(self) -> None:
"""
Sends a runtime abort signalr to terminate all connected sessions.
"""
"""Send a runtime abort request to terminate all connected sessions."""
try:
response = await self._uipath.api_client.request_async(
"POST",
Expand All @@ -821,7 +841,7 @@ async def _on_runtime_abort(self) -> None:
)
else:
logger.error(
f"Error sending runtime abort signalr to UiPath MCP Server: {response.status_code} - {response.text}"
f"Error sending runtime abort to UiPath MCP Server: {response.status_code} - {response.text}"
)
except Exception as e:
logger.error(
Expand Down
Loading
Loading