Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
114 changes: 114 additions & 0 deletions src/google/adk/agents/remote_a2a_agent.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
from a2a.client.client_factory import ClientFactory as A2AClientFactory
from a2a.client.errors import A2AClientHTTPError
from a2a.types import AgentCard
from a2a.types import Artifact as A2AArtifact
from a2a.types import Message as A2AMessage
from a2a.types import Part as A2APart
from a2a.types import Role
Expand Down Expand Up @@ -485,6 +486,97 @@ def _construct_message_parts_from_session(

return message_parts, context_id

async def _save_a2a_artifacts_to_session(
self,
artifacts: Optional[list[A2AArtifact]],
event: Event,
ctx: InvocationContext,
part_converter: Optional[A2APartToGenAIPartConverter] = None,
) -> None:
"""Persists A2A artifacts into the orchestrator session's artifact service.

When a remote A2A agent returns artifacts (e.g. files) in its response,
they are saved into the (parent/orchestrator) session's artifact service so
that downstream agents can load them via ``context.load_artifact(...)``. The
saved versions are recorded on the event's ``artifact_delta`` so the rest of
the ADK runtime is aware of them.

This is best-effort: if no artifact service is configured, or an individual
artifact cannot be converted/saved, it is skipped without failing the
overall A2A response handling.

Args:
artifacts: The A2A artifacts to persist. May be None or empty.
event: The ADK event produced for this A2A response. Its
``actions.artifact_delta`` is updated with the saved filenames/versions.
ctx: The invocation context, providing the session artifact service.
part_converter: Optional A2A-to-GenAI part converter. Defaults to the
agent's configured ``a2a_part_converter``.
"""
if not artifacts or ctx.artifact_service is None:
return

part_converter = part_converter or self._a2a_part_converter

for artifact in artifacts:
if not artifact or not artifact.parts:
continue

# Prefer the human-readable artifact name (this is the original filename
# when the remote is an ADK A2A server), falling back to the artifact id.
filename = artifact.name or artifact.artifact_id
if not filename:
logger.warning(
"Skipping A2A artifact without a name or id for agent %s",
self.name,
)
continue

for a2a_part in artifact.parts:
converted = part_converter(a2a_part)
if not isinstance(converted, list):
converted = [converted] if converted else []

# Only blob-like parts (files / inline data) are saved as artifacts.
genai_part = next(
(
part
for part in converted
if part is not None
and (part.inline_data is not None or part.file_data is not None)
),
None,
)
if genai_part is None:
continue

try:
version = await ctx.artifact_service.save_artifact(
app_name=ctx.app_name,
user_id=ctx.user_id,
session_id=ctx.session.id,
filename=filename,
artifact=genai_part,
)
except Exception as e: # pylint: disable=broad-except
logger.warning(
"Failed to save A2A artifact %s for agent %s: %s",
filename,
self.name,
e,
)
break

event.actions.artifact_delta[filename] = version
logger.debug(
"Saved A2A artifact %s (version %s) to session for agent %s",
filename,
version,
self.name,
)
# One artifact maps to a single saved file; ignore any extra parts.
break

async def _handle_a2a_response(
self, a2a_response: A2AClientEvent | A2AMessage, ctx: InvocationContext
) -> Optional[Event]:
Expand Down Expand Up @@ -570,6 +662,17 @@ async def _handle_a2a_response(
task.context_id
)

# Persist any artifacts returned by the remote agent into the
# orchestrator session so downstream agents can load them. Full task
# responses carry them on ``task.artifacts``; streaming artifact updates
# carry a single artifact on ``update.artifact``.
if isinstance(update, A2ATaskArtifactUpdateEvent):
update_artifact = getattr(update, "artifact", None)
artifacts = [update_artifact] if update_artifact else None
else:
artifacts = getattr(task, "artifacts", None) if task else None
await self._save_a2a_artifacts_to_session(artifacts, event, ctx)

# Otherwise, it's a regular A2AMessage for non-streaming responses.
elif isinstance(a2a_response, A2AMessage):
event = convert_a2a_message_to_event(
Expand Down Expand Up @@ -642,6 +745,17 @@ async def _handle_a2a_response_v2(
task.context_id
)

# Persist any artifacts returned by the remote agent into the
# orchestrator session so downstream agents can load them.
if isinstance(update, A2ATaskArtifactUpdateEvent):
update_artifact = getattr(update, "artifact", None)
artifacts = [update_artifact] if update_artifact else None
else:
artifacts = getattr(task, "artifacts", None) if task else None
await self._save_a2a_artifacts_to_session(
artifacts, event, ctx, self._config.a2a_part_converter
)

# Otherwise, it's a regular A2AMessage.
elif isinstance(a2a_response, A2AMessage):
event = self._config.a2a_message_converter(
Expand Down
Loading