Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions deploy_tee/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,5 +29,14 @@ def configure(argv: tuple[str, ...]) -> None:
forward(configure_mod.main, "seismic-tee configure", argv)


@app.command(name="status", context_settings=PASSTHROUGH, add_help_option=False)
@click.argument("argv", nargs=-1, type=click.UNPROCESSED)
def status(argv: tuple[str, ...]) -> None:
    """Watch a node's first-boot LUKS provisioning progress."""
    # Every command-line token is collected unparsed into `argv`
    # (nargs=-1, click.UNPROCESSED) and handed to deploy_tee.status.main via
    # `forward`, together with the "seismic-tee status" program name.
    # NOTE(review): add_help_option=False presumably lets `--help` reach the
    # forwarded argparse parser instead of click — confirm against `forward`.
    # The status module is imported inside the command body, not at file top.
    from deploy_tee import status as status_mod

    forward(status_mod.main, "seismic-tee status", argv)


if __name__ == "__main__":
app()
26 changes: 25 additions & 1 deletion deploy_tee/configure.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
from deploy_tee import manifest as manifest_mod
from deploy_tee.descriptor import load_descriptor, require
from deploy_tee.proxy import ProxyClient
from deploy_tee.status import watch_luks_provisioning
from deploy_tee.utils.logging_setup import setup_logging

logger = logging.getLogger(__name__)
Expand Down Expand Up @@ -198,6 +199,16 @@ def parse_args() -> argparse.Namespace:
default=False,
help="Print proxy/verify logs as they run",
)
parser.add_argument(
"--no-wait",
action="store_true",
default=False,
help=(
"Don't watch first-boot LUKS provisioning after POSTing. Default "
"is to watch (ctrl-C to stop); use this for CI/headless runs where "
"there's no TTY to interrupt and the wipe can take 1h+."
),
)
args = parser.parse_args()
if not args.node.is_file():
raise SystemExit(f"--node descriptor not found: {args.node}")
Expand Down Expand Up @@ -233,8 +244,21 @@ def main() -> None:

logger.info(f"Configuring node {fqdn} ({public_ip})...")
post_config_to_tdx_init(public_ip, merged_config)
logger.info("Attestation verification is not yet wired (pending attested-tls).")
logger.info("config delivered to tdx-init.")

if not args.no_wait:
# Watch the first-boot LUKS wipe — the long, otherwise-opaque phase.
# Purely local observability: the POST already landed, so ctrl-C here
# only stops watching; the node keeps provisioning in the background.
try:
watch_luks_provisioning(public_ip)
except KeyboardInterrupt:
print("\nStopped watching — node still provisioning in the background.")

_print_summary(fqdn, public_ip)


def _print_summary(fqdn: str, public_ip: str) -> None:
print("\n" + "=" * 80)
print("NODE CONFIGURED")
print("=" * 80)
Expand Down
224 changes: 224 additions & 0 deletions deploy_tee/status.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,224 @@
#!/usr/bin/env python3
"""Watch a node's first-boot LUKS-provisioning progress.

enclave-server serves `getLuksProvisioningStatus` on :7878 (JSON-RPC) for
the duration of the first-boot disk wipe — the one long (1h+), otherwise
opaque phase. This module polls it and renders a progress bar, and is the
shared poller behind both `seismic-tee status` and `configure`'s default
post-POST wait.

States (see enclave/crates/tdx-init's `LuksProvisioningStatus`):
provisioning {bytes_done, bytes_total, eta_seconds?} | idle | error {error} | unknown

Watch-completion is deliberately conservative about `idle`: right after a
POST the server may be down (connection refused) or up-but-idle *before* the
wipe starts, which looks identical to idle-because-finished. So we treat
idle as "done" only after we've seen provisioning; otherwise we wait a short
grace for the wipe to begin and, if it never does, conclude there's no wipe
(already finished, or a fast-unlock restart). This watches only the wipe —
it is NOT a node-readiness gate (summit/reth/genesis come later).
"""

import argparse
import json
import sys
import time
from pathlib import Path

import requests

from deploy_tee.descriptor import load_descriptor, require
from deploy_tee.utils.logging_setup import setup_logging

# Port that fetch_status() POSTs the getLuksProvisioningStatus JSON-RPC call to.
ENCLAVE_PORT = 7878
# Default number of seconds between polls; also the default of the
# `--interval` command-line flag.
POLL_INTERVAL_SECONDS = 5
# Max wait for :7878 to first respond — covers enclave-server startup (and, on
# a joiner, the root_key fetch from peers that precedes the listener coming up).
CONNECT_TIMEOUT_SECONDS = 180
# Once reachable and idle, how long to wait for the wipe to begin before
# concluding none is in progress. The enclave-server-up→first-wipe-tick gap
# (udev settle, disk discovery, luksFormat warm-up) is seconds; a fast-unlock
# restart stays idle forever, so this bounds the wait instead of hanging.
IDLE_GRACE_SECONDS = 60


def fetch_status(public_ip: str, *, timeout: int = 10) -> dict:
    """Issue a single `getLuksProvisioningStatus` JSON-RPC call.

    Args:
        public_ip: Host part of the `http://<ip>:<ENCLAVE_PORT>` endpoint.
        timeout: Passed straight through to `requests.post`.

    Returns:
        The `result` member of the JSON-RPC response (its `state` plus any
        state-specific fields).

    Raises:
        requests.RequestException: the POST or `raise_for_status()` failed,
            e.g. the server isn't reachable (normal while enclave-server is
            still coming up).
        RuntimeError: the response body carries an `error` member.
    """
    endpoint = f"http://{public_ip}:{ENCLAVE_PORT}"
    request_body = {
        "jsonrpc": "2.0",
        "id": 1,
        "method": "getLuksProvisioningStatus",
        "params": [],
    }

    response = requests.post(endpoint, json=request_body, timeout=timeout)
    response.raise_for_status()

    envelope = response.json()
    if "error" not in envelope:
        return envelope["result"]
    raise RuntimeError(f"RPC error from {endpoint}: {envelope['error']}")


def _gib(n: int) -> str:
return f"{n / 2**30:.1f}"


def _duration(seconds: int) -> str:
seconds = int(seconds)
hours, rem = divmod(seconds, 3600)
minutes, secs = divmod(rem, 60)
if hours:
return f"{hours}h{minutes:02d}m"
if minutes:
return f"{minutes}m{secs:02d}s"
return f"{secs}s"


def _bar(pct: float, width: int = 30) -> str:
filled = max(0, min(width, int(pct / 100 * width)))
return "[" + "#" * filled + "-" * (width - filled) + "]"


def format_provisioning(status: dict) -> str:
    """Turn a `provisioning` status dict into a one-line progress string.

    Reads `bytes_done`, `bytes_total` and the optional `eta_seconds`. A
    missing or zero `bytes_total` is the 'just started, no measurement yet'
    marker and gives a fixed placeholder line, which also avoids dividing by
    zero. The eta suffix is added only when `eta_seconds` is truthy.
    """
    total = status.get("bytes_total", 0)
    if not total:
        return "provisioning: starting (no measurement yet)"

    done = status.get("bytes_done", 0)
    pct = 100.0 * done / total
    progress = f"{_bar(pct)} {pct:5.1f}% {_gib(done)}/{_gib(total)} GiB"

    eta = status.get("eta_seconds")
    return f"{progress} eta {_duration(eta)}" if eta else progress


def watch_luks_provisioning(
    public_ip: str, *, interval: int = POLL_INTERVAL_SECONDS
) -> int:
    """Poll getLuksProvisioningStatus and render progress until the wipe
    finishes, errors, or we conclude none is in progress.

    Renders an in-place bar on a TTY; plain lines otherwise (so CI logs stay
    readable).

    Args:
        public_ip: Address handed to fetch_status() on every poll.
        interval: Seconds to sleep between polls.

    Returns:
        A process exit code: 0 when the wipe finished or none is in progress;
        1 when the server reported a wipe error, or when every poll failed
        for more than CONNECT_TIMEOUT_SECONDS in a row (at startup or
        mid-watch).

    Raises:
        KeyboardInterrupt: not caught here; propagates to the caller on ctrl-C.
        RuntimeError: propagated from fetch_status() when the server answers
            with a JSON-RPC error.
    """
    isatty = sys.stdout.isatty()
    first_reachable: float | None = None
    # Start of the current run of consecutive failed polls; None while the
    # most recent poll succeeded. Seeded with "now" so the startup window is
    # measured from the moment watching began. Timing each outage on its own
    # (instead of from function start) keeps one transient network error late
    # in a 1h+ wipe from aborting the watch as "never became reachable".
    unreachable_since: float | None = time.monotonic()
    seen_provisioning = False
    on_bar_line = False  # a TTY bar (no trailing newline) is currently shown

    def emit(line: str, *, transient: bool) -> None:
        """Print `line`; transient lines overwrite each other on a TTY."""
        nonlocal on_bar_line
        if isatty and transient:
            # \r returns to column 0 and ESC[K erases the rest of the line.
            print(f"\r\033[K{line}", end="", flush=True)
            on_bar_line = True
        else:
            if on_bar_line:
                print()  # close the in-place bar before a permanent line
                on_bar_line = False
            print(line, flush=True)

    while True:
        try:
            status = fetch_status(public_ip)
        except requests.RequestException:
            now = time.monotonic()
            if unreachable_since is None:
                unreachable_since = now  # first failure of a new outage
            if now - unreachable_since > CONNECT_TIMEOUT_SECONDS:
                if first_reachable is None:
                    emit(
                        f"enclave-server :{ENCLAVE_PORT} never became reachable "
                        f"after {CONNECT_TIMEOUT_SECONDS}s — is the node up?",
                        transient=False,
                    )
                else:
                    emit(
                        f"enclave-server :{ENCLAVE_PORT} stopped responding for "
                        f"more than {CONNECT_TIMEOUT_SECONDS}s — giving up.",
                        transient=False,
                    )
                return 1
            emit(f"waiting for enclave-server :{ENCLAVE_PORT} ...", transient=True)
            time.sleep(interval)
            continue

        # A successful poll ends any outage in progress.
        unreachable_since = None
        if first_reachable is None:
            first_reachable = time.monotonic()
        state = status.get("state")

        if state == "provisioning":
            seen_provisioning = True
            emit(format_provisioning(status), transient=True)
        elif state == "error":
            emit(
                f"LUKS provisioning failed: {status.get('error', '?')}",
                transient=False,
            )
            return 1
        elif state == "unknown":
            emit("status pipeline returned 'unknown' — still polling", transient=False)
        elif state == "idle":
            # Idle only means "finished" once provisioning has been observed.
            if seen_provisioning:
                emit("disk provisioning complete.", transient=False)
                return 0
            # Idle before any provisioning: give the wipe a bounded grace
            # period to start before concluding there is none.
            if time.monotonic() - first_reachable > IDLE_GRACE_SECONDS:
                emit(
                    "no first-boot wipe in progress (already finished, or a "
                    "fast-unlock restart).",
                    transient=False,
                )
                return 0
            emit("waiting for provisioning to start ...", transient=True)
        else:
            emit(f"unexpected status {state!r} — still polling", transient=False)

        time.sleep(interval)


def parse_args() -> argparse.Namespace:
    """Parse and validate the `status` command line.

    Returns:
        The parsed namespace with `node` (Path), `once` (bool) and
        `interval` (int seconds).

    Raises:
        SystemExit: `--node` is not an existing file, or `--interval` is not
            a positive number of seconds. argparse raises it too for
            malformed arguments.
    """
    parser = argparse.ArgumentParser(
        description="Watch a node's first-boot LUKS provisioning progress."
    )
    parser.add_argument(
        "--node",
        type=Path,
        required=True,
        metavar="DESCRIPTOR",
        help="Node descriptor JSON (provides public_ip); see deploy_tee/descriptor.py.",
    )
    parser.add_argument(
        "--once",
        action="store_true",
        help="Print the current status as JSON and exit (no polling).",
    )
    parser.add_argument(
        "--interval",
        type=int,
        default=POLL_INTERVAL_SECONDS,
        metavar="SECONDS",
        help=f"Poll interval (default {POLL_INTERVAL_SECONDS}s).",
    )
    args = parser.parse_args()
    if not args.node.is_file():
        raise SystemExit(f"--node descriptor not found: {args.node}")
    # The interval is used as the sleep between polls: 0 would hammer the node
    # in a tight loop and a negative value makes time.sleep() raise
    # ValueError, so reject both up front.
    if args.interval <= 0:
        raise SystemExit(
            f"--interval must be a positive number of seconds, got {args.interval}"
        )
    return args


def main() -> None:
    """Entry point for the status command.

    With `--once`, prints one status snapshot as JSON and returns, exiting
    with a message instead if the server can't be reached. Otherwise watches
    provisioning and exits with the watcher's return code, or with 130 when
    interrupted by ctrl-C.
    """
    setup_logging()
    args = parse_args()
    descriptor = load_descriptor(args.node)
    public_ip = require(descriptor, "public_ip", args.node)

    if not args.once:
        try:
            exit_code = watch_luks_provisioning(public_ip, interval=args.interval)
        except KeyboardInterrupt:
            print("\nStopped watching.")
            raise SystemExit(130) from None
        raise SystemExit(exit_code)

    try:
        snapshot = fetch_status(public_ip)
    except requests.RequestException as e:
        raise SystemExit(
            f"enclave-server :{ENCLAVE_PORT} not reachable: {e}"
        ) from None
    print(json.dumps(snapshot))


if __name__ == "__main__":
main()
70 changes: 70 additions & 0 deletions deploy_tee/tests/test_status.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
"""Tests for deploy_tee.status (stdlib unittest; no test deps in this repo).

Run with:
uv run python -m unittest discover -s deploy_tee/tests -v
"""

import unittest
from unittest import mock

from deploy_tee import status


class FormatTests(unittest.TestCase):
    """Checks for the pure rendering helpers in deploy_tee.status."""

    def test_provisioning_with_eta(self):
        gib = 2**30
        rendered = status.format_provisioning(
            {"bytes_done": gib, "bytes_total": 4 * gib, "eta_seconds": 291}
        )
        for fragment in ("25.0%", "1.0/4.0 GiB", "eta 4m51s"):
            self.assertIn(fragment, rendered)

    def test_provisioning_without_eta(self):
        sample = {"bytes_done": 0, "bytes_total": 2**30}
        rendered = status.format_provisioning(sample)
        self.assertIn("0.0%", rendered)
        self.assertNotIn("eta", rendered)

    def test_zero_total_is_indeterminate(self):
        # A bytes_total of 0 marks "just started, no measurement yet"; the
        # formatter has to cope with it without dividing by zero.
        sample = {"bytes_done": 0, "bytes_total": 0}
        self.assertIn("starting", status.format_provisioning(sample))

    def test_duration_formats(self):
        expectations = ((45, "45s"), (291, "4m51s"), (3725, "1h02m"))
        for seconds, text in expectations:
            self.assertEqual(status._duration(seconds), text)

    def test_bar_is_clamped(self):
        self.assertEqual(status._bar(0, width=10), "[----------]")
        self.assertEqual(status._bar(100, width=10), "[##########]")
        # Percentages outside 0..100 must still give a bar of width + 2
        # characters (the cells plus the two brackets).
        for out_of_range in (150, -5):
            self.assertEqual(len(status._bar(out_of_range, width=10)), 12)


class FetchStatusTests(unittest.TestCase):
    """Checks for fetch_status with `requests.post` patched out."""

    def _resp(self, body: dict):
        # Stand-in for a requests response: raise_for_status() is a no-op and
        # json() returns the canned body.
        return mock.Mock(
            **{"raise_for_status.return_value": None, "json.return_value": body}
        )

    def test_extracts_result_and_builds_request(self):
        canned = self._resp({"jsonrpc": "2.0", "id": 1, "result": {"state": "idle"}})
        with mock.patch.object(status.requests, "post", return_value=canned) as post:
            result = status.fetch_status("1.2.3.4")
        self.assertEqual(result, {"state": "idle"})
        sent = post.call_args
        self.assertEqual(sent.args[0], "http://1.2.3.4:7878")
        self.assertEqual(sent.kwargs["json"]["method"], "getLuksProvisioningStatus")

    def test_raises_on_rpc_error(self):
        canned = self._resp({"jsonrpc": "2.0", "id": 1, "error": {"message": "boom"}})
        with (
            mock.patch.object(status.requests, "post", return_value=canned),
            self.assertRaises(RuntimeError),
        ):
            status.fetch_status("1.2.3.4")


if __name__ == "__main__":
unittest.main()
Loading