Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
__pycache__/
5 changes: 5 additions & 0 deletions sentinel/.trace-tests-config.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
trace_tests:
enabled: true
test_data_dir: ./test_data
output_dir: ./test_reports
max_traces_per_run: 100
13 changes: 13 additions & 0 deletions sentinel/docker-compose.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
version: '3.8'

services:
sentinel:
build: .
ports:
- "8001:8001"
environment:
- QUARANTINE_THRESHOLD=0.7
- TRACE_PRIVATE_KEY_PEM=optional
volumes:
- ./data:/app/data
restart: unless-stopped
4 changes: 4 additions & 0 deletions sentinel/integration.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,11 @@ metadata:
category: governance
maintainer:
name: Akhilesh Warik
<<<<<<< HEAD
email: warik.akhilesh@gmail.com
=======
email: akhilesh.warik@example.com
>>>>>>> upstream/main
github: a1k7
license: MIT
version: 2.0.0
Expand Down
6 changes: 5 additions & 1 deletion sentinel/src/detectors/collusion_detector.py
Original file line number Diff line number Diff line change
Expand Up @@ -64,4 +64,8 @@ def _risk_level(self, score: float) -> RiskLevel:
if score < 0.3: return RiskLevel.LOW
if score < 0.6: return RiskLevel.MEDIUM
if score < 0.8: return RiskLevel.HIGH
return RiskLevel.CRITICAL
return RiskLevel.CRITICAL
if __name__ == "__main__":
# Quick test
from ..models import DetectionResult, RiskLevel
print("CollusionDetector loaded successfully.")
6 changes: 6 additions & 0 deletions sentinel/src/trace_claim_generator.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,19 @@
import base64
from datetime import datetime
from typing import Dict, Any
<<<<<<< HEAD

from src.models import DetectionResult, TraceClaim, DetectionType

=======
from src.models import DetectionResult, TraceClaim, DetectionType

import time
import hashlib
from datetime import datetime
from src.models import DetectionResult, TraceClaim

>>>>>>> upstream/main
def generate_trace_claim(agent_id: str, detection: DetectionResult, decision: str = "ADMIT") -> TraceClaim:
claim_id = f"sentinel-{int(time.time())}-{hashlib.md5(f'{agent_id}{detection.detection_type}'.encode()).hexdigest()[:8]}"
claim_payload = {
Expand Down
27 changes: 27 additions & 0 deletions sentinel/tests/test_detectors.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
import pytest
from src.detectors import DelegationEscalationDetector
from src.models import DetectionResult, DetectionType, RiskLevel

def test_delegation_escalation_detector():
detector = DelegationEscalationDetector()
# Simulate a trace with delegation chain length > 2
trace = {
"agent_id": "test-agent",
"delegation_chain": ["root", "admin", "superadmin"], # length 3 > threshold 2
"policy_version": "v1"
}
result = detector.detect(trace)
assert result is not None
assert result.detection_type == DetectionType.DELEGATION_ESCALATION
assert result.risk_score > 0.5
assert result.risk_level in (RiskLevel.MEDIUM, RiskLevel.HIGH)

def test_delegation_escalation_detector_clean():
detector = DelegationEscalationDetector()
trace = {
"agent_id": "test-agent",
"delegation_chain": ["root", "admin"], # length 2 allowed
"policy_version": "v1"
}
result = detector.detect(trace)
assert result is None or result.risk_score < 0.5
20 changes: 20 additions & 0 deletions sentinel/tests/test_integration.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
import pytest
import json
from src.cli import main as cli_main
from src.server import app
from fastapi.testclient import TestClient

client = TestClient(app)

def test_health_endpoint():
response = client.get("/health")
assert response.status_code == 200
assert response.json() == {"status": "ok"}

def test_cli_import():
# Smoke test: ensure CLI can be imported without errors
try:
from src.cli import main
assert callable(main)
except ImportError:
pytest.fail("CLI module not importable")
Loading