Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
45 changes: 12 additions & 33 deletions src/api/dependencies.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,15 +10,14 @@
import hashlib
import hmac
import logging
import time
from collections import defaultdict
from typing import Optional

from fastapi import Depends, HTTPException, Request, status
from fastapi.security import HTTPAuthorizationCredentials, HTTPBearer
from jose import JWTError, jwt

from src.config import settings
from src.database.control_plane_store import control_plane_store
from src.database.api_key_store import APIKeyStore
from src.database.user_store import UserStore
from src.pipelines.ingest import IngestPipeline
Expand Down Expand Up @@ -175,7 +174,13 @@ async def require_api_key(
return user

# 2. Check MongoDB for user-generated API keys
key_doc = _api_key_store.validate_api_key(token)
try:
key_doc = _api_key_store.validate_api_key(token)
except RuntimeError as exc:
raise HTTPException(
status_code=status.HTTP_503_SERVICE_UNAVAILABLE,
detail=str(exc),
)
if key_doc:
user_id = key_doc.get("user_id")
if user_id:
Expand Down Expand Up @@ -276,40 +281,14 @@ async def require_user(current_user: Optional[dict] = Depends(get_current_user))
return current_user


# ═══════════════════════════════════════════════════════════════════════════
# Sliding-window rate limiter (in-process, per-key)
# ═══════════════════════════════════════════════════════════════════════════

class _SlidingWindowRateLimiter:
"""Thread-safe sliding-window counter keyed by API identity."""

def __init__(self, max_requests: int, window_seconds: int = 60):
self.max_requests = max_requests
self.window = window_seconds
self._hits: dict[str, list[float]] = defaultdict(list)
self._lock = asyncio.Lock()
class _ControlPlaneRateLimiter:
"""Rate limiter backed by shared control-plane storage."""

async def check(self, key: str) -> tuple[bool, int]:
"""Return (allowed, remaining) for *key*."""
now = time.monotonic()
cutoff = now - self.window

async with self._lock:
timestamps = self._hits[key]
self._hits[key] = [t for t in timestamps if t > cutoff]

if len(self._hits[key]) >= self.max_requests:
return False, 0

self._hits[key].append(now)
remaining = self.max_requests - len(self._hits[key])
return True, remaining
return control_plane_store.check_rate_limit(key, settings.rate_limit, 60)


_rate_limiter = _SlidingWindowRateLimiter(
max_requests=settings.rate_limit,
window_seconds=60,
)
_rate_limiter = _ControlPlaneRateLimiter()


async def enforce_rate_limit(
Expand Down
39 changes: 25 additions & 14 deletions src/api/routes/admin.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@

from src.config import settings
from src.config.analytics import analytics
from src.database.control_plane_store import control_plane_store

logger = logging.getLogger("xmem.api.admin")

Expand All @@ -45,7 +46,6 @@
# ═══════════════════════════════════════════════════════════════════════════

_admin_collection = None
_admin_sessions: Dict[str, Dict[str, Any]] = {} # token → {user, expires}


def _get_admin_collection():
Expand Down Expand Up @@ -88,15 +88,18 @@ def _verify_admin_token(request: Request) -> Dict[str, Any]:
if auth.startswith("Bearer "):
token = auth[7:]

if not token or token not in _admin_sessions:
if not token:
raise HTTPException(status_code=401, detail="Not authenticated")

session = _admin_sessions[token]
if datetime.now(timezone.utc) > session["expires"]:
del _admin_sessions[token]
try:
session_user = control_plane_store.get_admin_session(token)
except RuntimeError:
raise HTTPException(status_code=503, detail="Admin session storage unavailable")

if not session_user:
raise HTTPException(status_code=401, detail="Session expired")

return session["user"]
return session_user


# ═══════════════════════════════════════════════════════════════════════════
Expand All @@ -115,11 +118,10 @@ async def admin_login(req: AdminLoginRequest):
raise HTTPException(status_code=401, detail="Invalid credentials")

# Generate session token
token = hashlib.sha256(f"{req.username}{time.time()}".encode()).hexdigest()
_admin_sessions[token] = {
"user": {"username": user["username"], "role": user.get("role", "admin")},
"expires": datetime.now(timezone.utc) + timedelta(hours=24),
}
token = control_plane_store.create_admin_session(
{"username": user["username"], "role": user.get("role", "admin")},
ttl_hours=24,
)

response = JSONResponse({"status": "ok", "token": token, "username": user["username"]})
response.set_cookie(
Expand All @@ -135,8 +137,11 @@ async def admin_login(req: AdminLoginRequest):
@router.post("/api/logout")
async def admin_logout(request: Request):
token = request.cookies.get("xmem_admin_token")
if token and token in _admin_sessions:
del _admin_sessions[token]
if token:
try:
control_plane_store.delete_admin_session(token)
except RuntimeError:
pass
response = JSONResponse({"status": "ok"})
response.delete_cookie("xmem_admin_token")
return response
Expand Down Expand Up @@ -220,7 +225,13 @@ async def ws_live_logs(websocket: WebSocket):

# Validate auth token from query param
token = websocket.query_params.get("token", "")
if token not in _admin_sessions:
try:
session_user = control_plane_store.get_admin_session(token)
except RuntimeError:
await websocket.close(code=1011, reason="Admin session storage unavailable")
return

if not session_user:
await websocket.close(code=4001, reason="Not authenticated")
return

Expand Down
46 changes: 29 additions & 17 deletions src/api/routes/api_keys.py
Original file line number Diff line number Diff line change
Expand Up @@ -87,10 +87,13 @@ async def list_api_keys(

Returns metadata about each key but NOT the actual key values.
"""
keys = api_key_store.get_user_api_keys(
user_id=current_user["id"],
include_inactive=include_inactive
)
try:
keys = api_key_store.get_user_api_keys(
user_id=current_user["id"],
include_inactive=include_inactive
)
except RuntimeError as exc:
raise HTTPException(status_code=status.HTTP_503_SERVICE_UNAVAILABLE, detail=str(exc))

# Convert to response model
key_responses = [
Expand Down Expand Up @@ -118,10 +121,13 @@ async def create_api_key(
WARNING: The full API key is only returned once in this response.
Make sure to save it securely - it cannot be retrieved again.
"""
result = api_key_store.create_api_key(
user_id=current_user["id"],
name=request.name,
)
try:
result = api_key_store.create_api_key(
user_id=current_user["id"],
name=request.name,
)
except RuntimeError as exc:
raise HTTPException(status_code=status.HTTP_503_SERVICE_UNAVAILABLE, detail=str(exc))

return APIKeyCreateResponse(
key=result["key"],
Expand All @@ -138,11 +144,14 @@ async def update_api_key(
current_user: dict = Depends(require_auth),
):
"""Update an API key's name."""
success = api_key_store.update_api_key_name(
user_id=current_user["id"],
key_id=key_id,
new_name=request.name,
)
try:
success = api_key_store.update_api_key_name(
user_id=current_user["id"],
key_id=key_id,
new_name=request.name,
)
except RuntimeError as exc:
raise HTTPException(status_code=status.HTTP_503_SERVICE_UNAVAILABLE, detail=str(exc))

if not success:
raise HTTPException(
Expand Down Expand Up @@ -180,10 +189,13 @@ async def revoke_api_key(
Once revoked, the key cannot be used for authentication.
This action is reversible - you can reactivate a key if needed (not implemented yet).
"""
success = api_key_store.revoke_api_key(
user_id=current_user["id"],
key_id=key_id,
)
try:
success = api_key_store.revoke_api_key(
user_id=current_user["id"],
key_id=key_id,
)
except RuntimeError as exc:
raise HTTPException(status_code=status.HTTP_503_SERVICE_UNAVAILABLE, detail=str(exc))

if not success:
raise HTTPException(
Expand Down
Loading
Loading