From 0629527d75c99c47f1a06ae80cb99baf6b018514 Mon Sep 17 00:00:00 2001 From: Viswanadha Pratap Kondoju Date: Sun, 28 Jun 2026 17:49:48 -0400 Subject: [PATCH 1/4] feat: add DelegationAuthPlugin for scoped agent authorization Pluggable authorization plugin that verifies agent credentials and permissions before tool execution. Supports per-tool permission requirements and delegation scoping (child agents can only narrow permissions, never expand). - CredentialVerifier ABC: implement verify() for any identity system - StructuralVerifier: built-in development stub - Per-tool permission mapping via tool_permissions dict - Audit log for authorization decisions - 10 tests covering allow, deny, permissions, expiry, exceptions Co-Authored-By: Claude Opus 4.6 (1M context) --- src/google/adk_community/plugins/__init__.py | 10 + .../plugins/delegation_auth_plugin.py | 307 ++++++++++++++++++ tests/plugins/test_delegation_auth_plugin.py | 235 ++++++++++++++ 3 files changed, 552 insertions(+) create mode 100644 src/google/adk_community/plugins/delegation_auth_plugin.py create mode 100644 tests/plugins/test_delegation_auth_plugin.py diff --git a/src/google/adk_community/plugins/__init__.py b/src/google/adk_community/plugins/__init__.py index 2e4b2ee..c263969 100644 --- a/src/google/adk_community/plugins/__init__.py +++ b/src/google/adk_community/plugins/__init__.py @@ -15,6 +15,12 @@ from google.adk_community.plugins.agent_governance_plugin import ( AgentGovernancePlugin, ) +from google.adk_community.plugins.delegation_auth_plugin import ( + CredentialVerifier, + DelegationAuthPlugin, + StructuralVerifier, + VerificationResult, +) from google.adk_community.plugins.taxonomy import ( DefaultSkillPolicy, SkillPolicy, @@ -27,6 +33,10 @@ __all__ = [ "AgentGovernancePlugin", + "CredentialVerifier", + "DelegationAuthPlugin", + "StructuralVerifier", + "VerificationResult", "DefaultSkillPolicy", "SkillPolicy", "TaxonomyPipeline", diff --git a/src/google/adk_community/plugins/delegation_auth_plugin.py b/src/google/adk_community/plugins/delegation_auth_plugin.py new file mode 100644 index 0000000..942c33e --- /dev/null +++ b/src/google/adk_community/plugins/delegation_auth_plugin.py @@ -0,0 +1,307 @@ +# Copyright 2026 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""ADK plugin for delegation-scoped agent authorization. + +Verifies that an agent holds a valid credential with sufficient permissions +before executing a tool. Credentials are scoped: a delegated agent can only +use a subset of its parent's permissions, never more. + +The plugin uses a pluggable verifier interface. A structural verifier is +included for development; plug in a real verifier (DID, JWT, ZKP, etc.) +for production. + +Example:: + + from google.adk_community.plugins import DelegationAuthPlugin + + plugin = DelegationAuthPlugin( + required_permissions={"read_data", "financial_small"}, + ) + runner = Runner(agent=my_agent, plugins=[plugin], ...) +""" + +from __future__ import annotations + +import logging +import time +from abc import ABC, abstractmethod +from dataclasses import dataclass, field +from typing import Any, Optional + +from google.adk.plugins.base_plugin import BasePlugin +from google.adk.tools.base_tool import BaseTool +from google.adk.tools.tool_context import ToolContext + +logger = logging.getLogger(__name__) + + +# --------------------------------------------------------------------------- +# Pluggable verifier interface +# --------------------------------------------------------------------------- + +@dataclass +class VerificationResult: + """Result of a credential verification check.""" + + valid: bool + agent_id: str = "" + permissions: set[str] = field(default_factory=set) + expiry: float = 0.0 + metadata: dict[str, Any] = field(default_factory=dict) + reason: str = "" + + +class CredentialVerifier(ABC): + """Interface for agent credential verification. + + Implement this to plug in any identity system: DID verification, + JWT validation, ZKP proof checking, API key lookup, etc. + """ + + @abstractmethod + def verify(self, credential: str) -> VerificationResult: + """Verify a credential string and return the result. + + Args: + credential: The raw credential from the agent (typically from + a header or session context). + + Returns: + VerificationResult with valid=True if the credential checks out. + """ + + +class StructuralVerifier(CredentialVerifier): + """Development verifier that checks credential structure only. + + Accepts any JSON-parseable credential with the required fields. + NOT for production — use a real verifier (DID, JWT, ZKP, etc.). + """ + + def verify(self, credential: str) -> VerificationResult: + import json + + try: + data = json.loads(credential) + except (json.JSONDecodeError, TypeError): + return VerificationResult( + valid=False, reason="credential is not valid JSON" + ) + + agent_id = data.get("agent_id", "") + if not agent_id: + return VerificationResult( + valid=False, reason="missing agent_id field" + ) + + permissions = set(data.get("permissions", [])) + expiry = data.get("expiry", 0) + + if expiry and expiry < time.time(): + return VerificationResult( + valid=False, + agent_id=agent_id, + reason="credential expired", + ) + + return VerificationResult( + valid=True, + agent_id=agent_id, + permissions=permissions, + expiry=expiry, + metadata=data.get("metadata", {}), + ) + + +# --------------------------------------------------------------------------- +# Per-tool permission mapping +# --------------------------------------------------------------------------- + +# Default: every tool requires these permissions. Override with +# tool_permissions to set per-tool requirements. +_DEFAULT_PERMISSIONS: set[str] = {"read_data"} + + +# --------------------------------------------------------------------------- +# Plugin +# --------------------------------------------------------------------------- + +class DelegationAuthPlugin(BasePlugin): + """ADK plugin that enforces delegation-scoped authorization. + + Before each tool call, the plugin: + 1. Reads the agent's credential from session state + 2. Verifies it using the configured verifier + 3. Checks that the credential's permissions cover the tool's requirements + 4. Blocks execution if any check fails + + Delegation scoping: credentials can only narrow permissions, never expand. + If agent A delegates to agent B, B's permission set is always a subset + of A's. The verifier enforces this at credential issuance; this plugin + enforces it at execution time. + + Args: + required_permissions: Default permissions required for any tool call. + Individual tools can override via ``tool_permissions``. + tool_permissions: Per-tool permission requirements. Keys are tool + names, values are sets of required permission strings. + verifier: Credential verifier instance. Defaults to + ``StructuralVerifier`` (development only). + credential_key: Session state key where the agent's credential + is stored. Defaults to ``"agent_credential"``. + fail_open: If True, allow tool calls when no credential is present. + Defaults to False. + + Example:: + + plugin = DelegationAuthPlugin( + required_permissions={"read_data"}, + tool_permissions={ + "execute_payment": {"read_data", "financial_small"}, + "sign_contract": {"read_data", "sign_on_behalf"}, + }, + ) + """ + + def __init__( + self, + required_permissions: set[str] | None = None, + tool_permissions: dict[str, set[str]] | None = None, + verifier: CredentialVerifier | None = None, + credential_key: str = "agent_credential", + fail_open: bool = False, + ) -> None: + super().__init__(name="delegation_auth") + self._required = required_permissions or _DEFAULT_PERMISSIONS + self._tool_permissions = tool_permissions or {} + self._verifier = verifier or StructuralVerifier() + self._credential_key = credential_key + self._fail_open = fail_open + self._audit_log: list[dict[str, Any]] = [] + + async def before_tool_callback( + self, + *, + tool: BaseTool, + tool_args: dict[str, Any], + tool_context: ToolContext, + ) -> Optional[dict]: + """Verify agent authorization before tool execution. + + Returns None to allow the tool to proceed, or a dict response + to short-circuit execution when authorization fails. + """ + # Read credential from session state + credential = None + if hasattr(tool_context, "state") and tool_context.state: + credential = tool_context.state.get(self._credential_key) + + if not credential: + if self._fail_open: + logger.warning( + "No credential found for tool %s; fail_open=True, allowing", + tool.name, + ) + return None + self._log_denial(tool.name, "no_credential", tool_args) + return { + "error": "authorization_required", + "message": ( + f"Tool '{tool.name}' requires agent authorization. " + f"Set '{self._credential_key}' in session state." + ), + } + + # Verify the credential + try: + result = self._verifier.verify(credential) + except Exception as exc: + logger.error("Verifier raised for tool %s: %s", tool.name, exc) + self._log_denial(tool.name, "verifier_error", tool_args) + return { + "error": "verification_failed", + "message": f"Credential verification error: {exc}", + } + + if not result.valid: + self._log_denial( + tool.name, result.reason or "invalid_credential", tool_args + ) + return { + "error": "authorization_denied", + "message": ( + f"Agent '{result.agent_id}' denied access to " + f"'{tool.name}': {result.reason}" + ), + } + + # Check permissions + required = self._tool_permissions.get(tool.name, self._required) + missing = required - result.permissions + if missing: + self._log_denial( + tool.name, + f"missing_permissions: {sorted(missing)}", + tool_args, + agent_id=result.agent_id, + ) + return { + "error": "insufficient_permissions", + "message": ( + f"Agent '{result.agent_id}' lacks permissions " + f"{sorted(missing)} for tool '{tool.name}'" + ), + } + + # Authorized — log and proceed + logger.info( + "Agent %s authorized for %s (permissions: %s)", + result.agent_id, + tool.name, + sorted(result.permissions), + ) + self._log_allow(tool.name, result.agent_id, tool_args) + return None + + def _log_allow( + self, tool: str, agent_id: str, tool_args: dict[str, Any] + ) -> None: + self._audit_log.append({ + "action": "allow", + "tool": tool, + "agent_id": agent_id, + "timestamp": time.time(), + }) + + def _log_denial( + self, + tool: str, + reason: str, + tool_args: dict[str, Any], + agent_id: str = "", + ) -> None: + self._audit_log.append({ + "action": "deny", + "tool": tool, + "agent_id": agent_id, + "reason": reason, + "timestamp": time.time(), + }) + logger.warning("DENIED: tool=%s agent=%s reason=%s", tool, agent_id, reason) + + @property + def audit_log(self) -> list[dict[str, Any]]: + """Read-only access to the audit trail.""" + return list(self._audit_log) diff --git a/tests/plugins/test_delegation_auth_plugin.py b/tests/plugins/test_delegation_auth_plugin.py new file mode 100644 index 0000000..29c1622 --- /dev/null +++ b/tests/plugins/test_delegation_auth_plugin.py @@ -0,0 +1,235 @@ +# Copyright 2026 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""Tests for DelegationAuthPlugin.""" + +from __future__ import annotations + +import json +import time +from typing import Any, Optional +from unittest.mock import MagicMock + +import pytest + +from google.adk_community.plugins.delegation_auth_plugin import ( + CredentialVerifier, + DelegationAuthPlugin, + StructuralVerifier, + VerificationResult, +) + + +# --------------------------------------------------------------------------- +# Helpers +# --------------------------------------------------------------------------- + +def _make_tool(name: str = "test_tool") -> MagicMock: + tool = MagicMock() + tool.name = name + return tool + + +def _make_context(credential: Optional[str] = None) -> MagicMock: + ctx = MagicMock() + ctx.state = {} + if credential is not None: + ctx.state["agent_credential"] = credential + return ctx + + +def _valid_credential( + agent_id: str = "agent-1", + permissions: list[str] | None = None, + expiry: float | None = None, +) -> str: + data = { + "agent_id": agent_id, + "permissions": permissions or ["read_data"], + } + if expiry is not None: + data["expiry"] = expiry + return json.dumps(data) + + +# --------------------------------------------------------------------------- +# StructuralVerifier tests +# --------------------------------------------------------------------------- + +class TestStructuralVerifier: + def test_valid_credential(self): + v = StructuralVerifier() + result = v.verify(_valid_credential()) + assert result.valid + assert result.agent_id == "agent-1" + assert "read_data" in result.permissions + + def test_invalid_json(self): + v = StructuralVerifier() + result = v.verify("not json") + assert not result.valid + assert "not valid JSON" in result.reason + + def test_missing_agent_id(self): + v = StructuralVerifier() + result = v.verify(json.dumps({"permissions": ["read_data"]})) + assert not result.valid + assert "missing agent_id" in result.reason + + def test_expired_credential(self): + v = StructuralVerifier() + result = v.verify(_valid_credential(expiry=time.time() - 100)) + assert not result.valid + assert "expired" in result.reason + + def test_future_expiry_is_valid(self): + v = StructuralVerifier() + result = v.verify(_valid_credential(expiry=time.time() + 3600)) + assert result.valid + + +# --------------------------------------------------------------------------- +# DelegationAuthPlugin tests +# --------------------------------------------------------------------------- + +class TestDelegationAuthPlugin: + @pytest.mark.asyncio + async def test_no_credential_blocks(self): + plugin = DelegationAuthPlugin() + result = await plugin.before_tool_callback( + tool=_make_tool(), + tool_args={}, + tool_context=_make_context(credential=None), + ) + assert result is not None + assert result["error"] == "authorization_required" + + @pytest.mark.asyncio + async def test_no_credential_fail_open(self): + plugin = DelegationAuthPlugin(fail_open=True) + result = await plugin.before_tool_callback( + tool=_make_tool(), + tool_args={}, + tool_context=_make_context(credential=None), + ) + assert result is None # allowed + + @pytest.mark.asyncio + async def test_valid_credential_allows(self): + plugin = DelegationAuthPlugin( + required_permissions={"read_data"}, + ) + result = await plugin.before_tool_callback( + tool=_make_tool(), + tool_args={}, + tool_context=_make_context( + credential=_valid_credential(permissions=["read_data"]) + ), + ) + assert result is None # allowed + + @pytest.mark.asyncio + async def test_missing_permissions_blocks(self): + plugin = DelegationAuthPlugin( + required_permissions={"read_data", "financial_small"}, + ) + result = await plugin.before_tool_callback( + tool=_make_tool(), + tool_args={}, + tool_context=_make_context( + credential=_valid_credential(permissions=["read_data"]) + ), + ) + assert result is not None + assert result["error"] == "insufficient_permissions" + + @pytest.mark.asyncio + async def test_per_tool_permissions(self): + plugin = DelegationAuthPlugin( + required_permissions={"read_data"}, + tool_permissions={"pay_tool": {"read_data", "financial_small"}}, + ) + # Default tool — read_data is enough + r1 = await plugin.before_tool_callback( + tool=_make_tool("query_tool"), + tool_args={}, + tool_context=_make_context( + credential=_valid_credential(permissions=["read_data"]) + ), + ) + assert r1 is None + + # pay_tool — needs financial_small too + r2 = await plugin.before_tool_callback( + tool=_make_tool("pay_tool"), + tool_args={}, + tool_context=_make_context( + credential=_valid_credential(permissions=["read_data"]) + ), + ) + assert r2 is not None + assert r2["error"] == "insufficient_permissions" + + @pytest.mark.asyncio + async def test_invalid_credential_blocks(self): + plugin = DelegationAuthPlugin() + result = await plugin.before_tool_callback( + tool=_make_tool(), + tool_args={}, + tool_context=_make_context(credential="not-json"), + ) + assert result is not None + assert result["error"] == "authorization_denied" + + @pytest.mark.asyncio + async def test_verifier_exception_returns_error(self): + class ExplodingVerifier(CredentialVerifier): + def verify(self, credential: str) -> VerificationResult: + raise RuntimeError("boom") + + plugin = DelegationAuthPlugin(verifier=ExplodingVerifier()) + result = await plugin.before_tool_callback( + tool=_make_tool(), + tool_args={}, + tool_context=_make_context(credential="anything"), + ) + assert result is not None + assert result["error"] == "verification_failed" + + @pytest.mark.asyncio + async def test_audit_log_records(self): + plugin = DelegationAuthPlugin( + required_permissions={"read_data"}, + ) + await plugin.before_tool_callback( + tool=_make_tool(), + tool_args={}, + tool_context=_make_context( + credential=_valid_credential(permissions=["read_data"]) + ), + ) + assert len(plugin.audit_log) == 1 + assert plugin.audit_log[0]["action"] == "allow" + + @pytest.mark.asyncio + async def test_custom_credential_key(self): + plugin = DelegationAuthPlugin(credential_key="my_cred") + ctx = _make_context() + ctx.state["my_cred"] = _valid_credential(permissions=["read_data"]) + result = await plugin.before_tool_callback( + tool=_make_tool(), + tool_args={}, + tool_context=ctx, + ) + assert result is None From 734620063bc752746919adf6f65f2c7ee0190b62 Mon Sep 17 00:00:00 2001 From: Viswanadha Pratap Kondoju Date: Sun, 28 Jun 2026 17:52:22 -0400 Subject: [PATCH 2/4] =?UTF-8?q?fix:=20address=20review=20=E2=80=94=20run?= =?UTF-8?q?=5Fin=5Fexecutor,=20bounded=20audit,=20consistent=20style?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - Wrap verify() in loop.run_in_executor() to avoid blocking (matches AgentGovernancePlugin pattern) - Remove unused tool_args from _log_denial/_log_allow - Use collections.deque(maxlen=) for bounded audit log (default 1000) - Move import json to top of file - Use Optional[] consistently instead of mixing | None - Fix docstring: plugin enforces permission checks at execution time, delegation narrowing is enforced at credential issuance Co-Authored-By: Claude Opus 4.6 (1M context) --- .../plugins/delegation_auth_plugin.py | 87 ++++++++++--------- 1 file changed, 46 insertions(+), 41 deletions(-) diff --git a/src/google/adk_community/plugins/delegation_auth_plugin.py b/src/google/adk_community/plugins/delegation_auth_plugin.py index 942c33e..9701236 100644 --- a/src/google/adk_community/plugins/delegation_auth_plugin.py +++ b/src/google/adk_community/plugins/delegation_auth_plugin.py @@ -12,14 +12,13 @@ # See the License for the specific language governing permissions and # limitations under the License. -"""ADK plugin for delegation-scoped agent authorization. +"""ADK plugin for permission-scoped agent authorization. Verifies that an agent holds a valid credential with sufficient permissions -before executing a tool. Credentials are scoped: a delegated agent can only -use a subset of its parent's permissions, never more. +before executing a tool. Uses a pluggable verifier interface so any identity +system (DID, JWT, ZKP, API keys, etc.) can be wired in. -The plugin uses a pluggable verifier interface. A structural verifier is -included for development; plug in a real verifier (DID, JWT, ZKP, etc.) +A structural verifier is included for development; plug in a real verifier for production. Example:: @@ -34,6 +33,10 @@ from __future__ import annotations +import asyncio +import collections +import functools +import json import logging import time from abc import ABC, abstractmethod @@ -46,6 +49,8 @@ logger = logging.getLogger(__name__) +_MAX_AUDIT_ENTRIES_DEFAULT = 1000 + # --------------------------------------------------------------------------- # Pluggable verifier interface @@ -68,6 +73,10 @@ class CredentialVerifier(ABC): Implement this to plug in any identity system: DID verification, JWT validation, ZKP proof checking, API key lookup, etc. + + Note: ``verify()`` is called in a thread executor to avoid blocking + the event loop. Implementations may perform synchronous I/O (network + calls, file reads) safely. """ @abstractmethod @@ -87,12 +96,10 @@ class StructuralVerifier(CredentialVerifier): """Development verifier that checks credential structure only. Accepts any JSON-parseable credential with the required fields. - NOT for production — use a real verifier (DID, JWT, ZKP, etc.). + NOT for production -- use a real verifier (DID, JWT, etc.). """ def verify(self, credential: str) -> VerificationResult: - import json - try: data = json.loads(credential) except (json.JSONDecodeError, TypeError): @@ -125,32 +132,23 @@ def verify(self, credential: str) -> VerificationResult: ) -# --------------------------------------------------------------------------- -# Per-tool permission mapping -# --------------------------------------------------------------------------- - -# Default: every tool requires these permissions. Override with -# tool_permissions to set per-tool requirements. -_DEFAULT_PERMISSIONS: set[str] = {"read_data"} - - # --------------------------------------------------------------------------- # Plugin # --------------------------------------------------------------------------- class DelegationAuthPlugin(BasePlugin): - """ADK plugin that enforces delegation-scoped authorization. + """ADK plugin that enforces permission-scoped authorization. Before each tool call, the plugin: 1. Reads the agent's credential from session state - 2. Verifies it using the configured verifier + 2. Verifies it using the configured verifier (in a thread executor) 3. Checks that the credential's permissions cover the tool's requirements 4. Blocks execution if any check fails - Delegation scoping: credentials can only narrow permissions, never expand. - If agent A delegates to agent B, B's permission set is always a subset - of A's. The verifier enforces this at credential issuance; this plugin - enforces it at execution time. + Permission scoping: this plugin enforces permission checks at execution + time. Delegation scope narrowing (ensuring child agents can only hold a + subset of parent permissions) is enforced at credential issuance by the + identity system, not by this plugin. Args: required_permissions: Default permissions required for any tool call. @@ -163,6 +161,8 @@ class DelegationAuthPlugin(BasePlugin): is stored. Defaults to ``"agent_credential"``. fail_open: If True, allow tool calls when no credential is present. Defaults to False. + max_audit_entries: Maximum audit log entries before FIFO eviction. + Defaults to 1000. Example:: @@ -177,19 +177,22 @@ class DelegationAuthPlugin(BasePlugin): def __init__( self, - required_permissions: set[str] | None = None, - tool_permissions: dict[str, set[str]] | None = None, - verifier: CredentialVerifier | None = None, + required_permissions: Optional[set[str]] = None, + tool_permissions: Optional[dict[str, set[str]]] = None, + verifier: Optional[CredentialVerifier] = None, credential_key: str = "agent_credential", fail_open: bool = False, + max_audit_entries: int = _MAX_AUDIT_ENTRIES_DEFAULT, ) -> None: super().__init__(name="delegation_auth") - self._required = required_permissions or _DEFAULT_PERMISSIONS + self._required = required_permissions or {"read_data"} self._tool_permissions = tool_permissions or {} self._verifier = verifier or StructuralVerifier() self._credential_key = credential_key self._fail_open = fail_open - self._audit_log: list[dict[str, Any]] = [] + self._audit_log: collections.deque[dict[str, Any]] = ( + collections.deque(maxlen=max_audit_entries) + ) async def before_tool_callback( self, @@ -215,7 +218,7 @@ async def before_tool_callback( tool.name, ) return None - self._log_denial(tool.name, "no_credential", tool_args) + self._log_denial(tool.name, "no_credential") return { "error": "authorization_required", "message": ( @@ -224,12 +227,16 @@ async def before_tool_callback( ), } - # Verify the credential + # Verify the credential in a thread executor to avoid blocking + loop = asyncio.get_running_loop() try: - result = self._verifier.verify(credential) + result = await loop.run_in_executor( + None, + functools.partial(self._verifier.verify, credential), + ) except Exception as exc: logger.error("Verifier raised for tool %s: %s", tool.name, exc) - self._log_denial(tool.name, "verifier_error", tool_args) + self._log_denial(tool.name, "verifier_error") return { "error": "verification_failed", "message": f"Credential verification error: {exc}", @@ -237,7 +244,9 @@ async def before_tool_callback( if not result.valid: self._log_denial( - tool.name, result.reason or "invalid_credential", tool_args + tool.name, + result.reason or "invalid_credential", + agent_id=result.agent_id, ) return { "error": "authorization_denied", @@ -254,7 +263,6 @@ async def before_tool_callback( self._log_denial( tool.name, f"missing_permissions: {sorted(missing)}", - tool_args, agent_id=result.agent_id, ) return { @@ -265,19 +273,17 @@ async def before_tool_callback( ), } - # Authorized — log and proceed + # Authorized logger.info( "Agent %s authorized for %s (permissions: %s)", result.agent_id, tool.name, sorted(result.permissions), ) - self._log_allow(tool.name, result.agent_id, tool_args) + self._log_allow(tool.name, result.agent_id) return None - def _log_allow( - self, tool: str, agent_id: str, tool_args: dict[str, Any] - ) -> None: + def _log_allow(self, tool: str, agent_id: str) -> None: self._audit_log.append({ "action": "allow", "tool": tool, @@ -289,7 +295,6 @@ def _log_denial( self, tool: str, reason: str, - tool_args: dict[str, Any], agent_id: str = "", ) -> None: self._audit_log.append({ @@ -303,5 +308,5 @@ def _log_denial( @property def audit_log(self) -> list[dict[str, Any]]: - """Read-only access to the audit trail.""" + """Read-only copy of the audit trail.""" return list(self._audit_log) From cccccdbbb2a15c81f0f995f59324bcbabb5099a1 Mon Sep 17 00:00:00 2001 From: Viswanadha Pratap Kondoju Date: Sun, 28 Jun 2026 18:17:11 -0400 Subject: [PATCH 3/4] chore: trigger CLA re-check From 89f1dbe9baeebee44b3dfcd66cd86ce840900019 Mon Sep 17 00:00:00 2001 From: Viswanadha Pratap Kondoju Date: Sun, 28 Jun 2026 18:19:20 -0400 Subject: [PATCH 4/4] chore: re-trigger CLA after email verification