Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 14 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -239,3 +239,17 @@ outputs
evaluation/data/
test_add_pipeline.py
test_file_pipeline.py
data/
config.yaml

# ── Runtime / ephemeral (Violet addition 2026-05-27) ──
daemon/bridge.pid
memos-local/
bridge-status.json
apps/memos-local-plugin/daemon/
apps/memos-local-plugin/memos-local/
apps/memos-local-plugin/.memos-node-bin
apps/memos-local-plugin/bridge-status.json
apps/memos-local-plugin/prod_check.cjs
data.stale-backup/
scripts/trigger-scoring.py
123 changes: 93 additions & 30 deletions apps/memos-local-plugin/adapters/hermes/memos_provider/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -63,8 +63,8 @@
if str(_PLUGIN_DIR) not in sys.path:
sys.path.insert(0, str(_PLUGIN_DIR))

from bridge_client import BridgeError, MemosBridgeClient # noqa: E402
from daemon_manager import ensure_bridge_running, ensure_viewer_daemon # noqa: E402
from bridge_client import BridgeError, MemosBridgeClient, MemosHttpClient # noqa: E402
from daemon_manager import ensure_bridge_running, ensure_viewer_daemon, probe_viewer_status, kill_zombie_bridges, startup_lock_active # noqa: E402


try: # pragma: no cover — host-provided base class, absent in unit tests
Expand Down Expand Up @@ -243,7 +243,7 @@ class MemTensorProvider(MemoryProvider):
"""

def __init__(self) -> None:
self._bridge: MemosBridgeClient | None = None
self._bridge: MemosBridgeClient | MemosHttpClient | None = None
self._reconnect_lock = threading.Lock()
self._session_id: str = ""
self._episode_id: str = ""
Expand Down Expand Up @@ -293,6 +293,23 @@ def is_available(self) -> bool: # type: ignore[override]

# ─── Lifecycle ────────────────────────────────────────────────────────

def _connect_http_bridge(self, session_id: str, *, timeout: float = 60.0) -> bool:
    """Attempt to attach to the bridge over HTTP.

    A fresh ``MemosHttpClient`` is created, the ``host.llm.complete``
    handler is registered on it, it is installed as ``self._bridge`` and a
    session is opened through ``self._open_session``.

    Args:
        session_id: Session identifier forwarded to ``_open_session``.
        timeout: Timeout forwarded to ``_open_session``.

    Returns:
        ``True`` when the session was opened; ``False`` when any step
        raised, in which case the client is closed (best effort) and
        ``self._bridge`` is reset to ``None``.
    """
    client: MemosHttpClient | None = None
    try:
        client = MemosHttpClient()
        client.register_host_handler(
            "host.llm.complete",
            self._handle_host_llm_complete,
        )
        # Install the client before opening the session: the session is
        # opened through whatever ``self._bridge`` currently points at.
        self._bridge = client
        self._open_session(session_id, timeout=timeout)
    except Exception as err:
        logger.warning("MemOS: HTTP bridge failed, falling back to stdio — %s", err)
        if client is not None:
            # Closing is best effort; a failure here must not mask the
            # fallback path.
            try:
                client.close()
            except Exception:
                pass
        self._bridge = None
        return False
    return True

def initialize(self, session_id: str, **kwargs: Any) -> None: # type: ignore[override]
"""Called once at agent startup.

Expand All @@ -314,34 +331,73 @@ def initialize(self, session_id: str, **kwargs: Any) -> None: # type: ignore[ov
except Exception as err:
logger.warning("MemOS: failed to start bridge — %s", err)
return

# Kill zombie bridges from previous sessions before deciding
# how to connect.
try:
ensure_viewer_daemon()
except Exception as err:
logger.warning("MemOS: viewer daemon check failed — %s", err)
new_bridge: MemosBridgeClient | None = None
try:
new_bridge = MemosBridgeClient()
# Register the fallback LLM handler BEFORE we open the
# session so it is available the very first time the
# plugin's facade asks for help (e.g. on the first
# `turn.start` retrieval call).
new_bridge.register_host_handler(
"host.llm.complete",
self._handle_host_llm_complete,
)
self._bridge = new_bridge
self._open_session(session_id)
logger.info(
"MemOS: bridge ready session=%s platform=%s (episode deferred)",
self._session_id,
self._platform,
)
except Exception as err:
logger.warning("MemOS: bridge init failed — %s", err)
if new_bridge is not None:
with contextlib.suppress(Exception):
new_bridge.close()
self._bridge = None
zombies = kill_zombie_bridges()
if zombies:
logger.info("MemOS: killed %d zombie bridge(s)", zombies)
except Exception:
pass

# If the daemon is already running on the viewer port, connect
# to it over HTTP instead of spawning a new stdio bridge. This
# eliminates zombie bridge accumulation.
viewer_status = probe_viewer_status()
if viewer_status == "running_memos":
if self._connect_http_bridge(session_id):
logger.info(
"MemOS: bridge ready (HTTP) session=%s platform=%s (episode deferred)",
self._session_id,
self._platform,
)
else:
viewer_status = "free" # force stdio fallback below
elif viewer_status == "free":
# Re-probe after a short wait only when another process may be
# mid-startup (startup lock is held). On a cold first-launch the
# lock doesn't exist, so we skip the delay entirely.
if startup_lock_active():
time.sleep(1.0)
viewer_status = probe_viewer_status()
if viewer_status == "running_memos":
if self._connect_http_bridge(session_id):
logger.info(
"MemOS: bridge ready (HTTP, late probe) session=%s platform=%s (episode deferred)",
self._session_id,
self._platform,
)

if self._bridge is None:
try:
ensure_viewer_daemon()
except Exception as err:
logger.warning("MemOS: viewer daemon check failed — %s", err)
new_bridge: MemosBridgeClient | None = None
try:
new_bridge = MemosBridgeClient()
# Register the fallback LLM handler BEFORE we open the
# session so it is available the very first time the
# plugin's facade asks for help (e.g. on the first
# `turn.start` retrieval call).
new_bridge.register_host_handler(
"host.llm.complete",
self._handle_host_llm_complete,
)
self._bridge = new_bridge
self._open_session(session_id, timeout=60.0)
logger.info(
"MemOS: bridge ready (stdio) session=%s platform=%s (episode deferred)",
self._session_id,
self._platform,
)
except Exception as err:
logger.warning("MemOS: bridge init failed — %s", err)
if new_bridge is not None:
with contextlib.suppress(Exception):
new_bridge.close()
self._bridge = None
# Register a Hermes plugin hook to capture tool calls as they
# happen. The `post_tool_call` hook fires after every tool
# dispatch (write_file, terminal, search_files, etc.) with the
Expand Down Expand Up @@ -1746,6 +1802,13 @@ def _reconnect_bridge(self, session_id: str = "", *, timeout: float = 30.0) -> N
logger.info("MemOS: old bridge closed (pid=%s)", old_pid)

ensure_bridge_running()
# Try HTTP first if daemon is running
viewer_status = probe_viewer_status()
if viewer_status == "running_memos":
if self._connect_http_bridge(session_id, timeout=timeout):
logger.info("MemOS: reconnected via HTTP")
return

try:
ensure_viewer_daemon()
except Exception as err:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,9 @@
import shutil
import subprocess
import threading
import time
import urllib.error
import urllib.request

from pathlib import Path
from typing import TYPE_CHECKING, Any
Expand Down Expand Up @@ -419,4 +422,123 @@ def _resolve(self, msg: dict[str, Any]) -> None:
entry["error"] = msg["error"]
else:
entry["result"] = msg.get("result")
entry["event"].set()


class MemosHttpClient:
"""JSON-RPC 2.0 client that talks to the daemon bridge over HTTP.

Drop-in replacement for ``MemosBridgeClient`` when a daemon is already
running on the viewer port. Instead of spawning a new subprocess, this
client POSTs JSON-RPC envelopes to ``/api/v1/rpc`` on the daemon's HTTP
server. This eliminates zombie bridge accumulation.

Limitations vs. stdio client:
- No reverse-direction RPC (``host.llm.complete``). The daemon's own
stdio bridge handles host LLM fallback internally.
- No ``notify()`` (notifications have no response; use ``request()``
for all calls — the daemon handles both).
"""

def __init__(
self,
*,
port: int = 18800,
host: str = "127.0.0.1",
api_key: str | None = None,
) -> None:
self._base_url = f"http://{host}:{port}/api/v1/rpc"
self._lock = threading.Lock()
self._next_id = 1
self._closed = False
self._api_key = api_key
self._host_handlers: dict[str, Callable[[dict[str, Any]], Any]] = {}

@property
def pid(self) -> int:
"""Return 0 — HTTP client has no subprocess."""
return 0

# ─── Public API (matches MemosBridgeClient) ──

def request(
self,
method: str,
params: Any = None,
*,
timeout: float = 30.0,
) -> dict[str, Any]:
if self._closed:
raise BridgeError("transport_closed", "HTTP client is closed")
with self._lock:
rpc_id = self._next_id
self._next_id += 1

envelope = {
"jsonrpc": "2.0",
"id": rpc_id,
"method": method,
"params": params,
}
payload = json.dumps(envelope, ensure_ascii=False).encode("utf-8")

req = urllib.request.Request(
self._base_url,
data=payload,
headers={"Content-Type": "application/json"},
method="POST",
)
if self._api_key:
req.add_header("Authorization", f"Bearer {self._api_key}")

try:
with urllib.request.urlopen(req, timeout=timeout) as resp:
body = resp.read(4_194_304) # 4 MiB safety cap
except urllib.error.HTTPError as exc:
# Try to read the JSON-RPC error body
try:
err_body = exc.read(4_194_304)
err_json = json.loads(err_body.decode("utf-8", errors="replace"))
err_obj = err_json.get("error") or {}
raise BridgeError(
err_obj.get("data", {}).get("code") or str(err_obj.get("code", "internal")),
err_obj.get("message", f"HTTP {exc.code}"),
err_obj.get("data"),
) from exc
except (json.JSONDecodeError, UnicodeDecodeError):
raise BridgeError("internal", f"HTTP {exc.code}: {exc.reason}") from exc
except urllib.error.URLError as exc:
raise BridgeError("transport_closed", str(exc)) from exc

result = json.loads(body.decode("utf-8", errors="replace"))
if "error" in result:
e = result["error"]
raise BridgeError(
(e.get("data") or {}).get("code") or str(e.get("code", "internal")),
e.get("message", "unknown error"),
e.get("data"),
)
return result.get("result") or {}

def notify(self, method: str, params: Any = None) -> None:
"""No-op for HTTP — use request() for all calls."""
try:
self.request(method, params, timeout=5.0)
except Exception:
pass

def on_event(self, cb: Callable[[dict[str, Any]], None]) -> None:
"""No-op — SSE events not supported over HTTP transport."""

def on_log(self, cb: Callable[[dict[str, Any]], None]) -> None:
"""No-op — SSE logs not supported over HTTP transport."""

def register_host_handler(
self,
method: str,
handler: Callable[[dict[str, Any]], Any],
) -> None:
"""Store the handler but it won't be called (daemon handles host LLM internally)."""
self._host_handlers[method] = handler

def close(self) -> None:
self._closed = True
Loading