feat: add ArtifactStreamer for streaming artifact updates#942
feat: add ArtifactStreamer for streaming artifact updates#942mtsmyassin wants to merge 1 commit intoa2aproject:mainfrom
Conversation
Adds a stateful streaming helper to a2a.utils that maintains a stable artifact_id across chunks, enabling correct append=True semantics for TaskArtifactUpdateEvent. Closes a2aproject#833
There was a problem hiding this comment.
Code Review
This pull request introduces the ArtifactStreamer utility class to facilitate streaming artifact updates with a stable ID, ensuring consistency when appending data. The changes include the new class implementation, its export in the utility module, and a suite of unit tests. Feedback was provided to refactor the event creation logic into a private helper method to eliminate code duplication between the append and finalize methods.
| def append(self, text: str) -> TaskArtifactUpdateEvent: | ||
| """Create an append event for the next chunk of text. | ||
|
|
||
| Args: | ||
| text: The text content to append. | ||
|
|
||
| Returns: | ||
| A ``TaskArtifactUpdateEvent`` with ``append=True`` and | ||
| ``last_chunk=False``. | ||
|
|
||
| Raises: | ||
| RuntimeError: If ``finalize()`` has already been called. | ||
| """ | ||
| if self._finalized: | ||
| raise RuntimeError( | ||
| 'Cannot append after finalize() has been called.' | ||
| ) | ||
| return TaskArtifactUpdateEvent( | ||
| context_id=self._context_id, | ||
| task_id=self._task_id, | ||
| append=True, | ||
| last_chunk=False, | ||
| artifact=Artifact( | ||
| artifact_id=self._artifact_id, | ||
| name=self._name, | ||
| description=self._description, | ||
| parts=[Part(root=TextPart(text=text))], | ||
| ), | ||
| ) | ||
|
|
||
| def finalize(self, text: str = '') -> TaskArtifactUpdateEvent: | ||
| """Create the final chunk event, closing the stream. | ||
|
|
||
| Args: | ||
| text: Optional final text content. Defaults to empty string. | ||
|
|
||
| Returns: | ||
| A ``TaskArtifactUpdateEvent`` with ``append=True`` and | ||
| ``last_chunk=True``. | ||
|
|
||
| Raises: | ||
| RuntimeError: If ``finalize()`` has already been called. | ||
| """ | ||
| if self._finalized: | ||
| raise RuntimeError('finalize() has already been called.') | ||
| self._finalized = True | ||
| return TaskArtifactUpdateEvent( | ||
| context_id=self._context_id, | ||
| task_id=self._task_id, | ||
| append=True, | ||
| last_chunk=True, | ||
| artifact=Artifact( | ||
| artifact_id=self._artifact_id, | ||
| name=self._name, | ||
| description=self._description, | ||
| parts=[Part(root=TextPart(text=text))], | ||
| ), | ||
| ) |
There was a problem hiding this comment.
The logic for creating the TaskArtifactUpdateEvent and its nested Artifact object is duplicated between the append and finalize methods. Refactoring this into a private helper method would improve maintainability and reduce the risk of inconsistencies if the event structure changes in the future.
def _create_event(self, text: str, last_chunk: bool) -> TaskArtifactUpdateEvent:
return TaskArtifactUpdateEvent(
context_id=self._context_id,
task_id=self._task_id,
append=True,
last_chunk=last_chunk,
artifact=Artifact(
artifact_id=self._artifact_id,
name=self._name,
description=self._description,
parts=[Part(root=TextPart(text=text))],
),
)
def append(self, text: str) -> TaskArtifactUpdateEvent:
"""Create an append event for the next chunk of text.
Args:
text: The text content to append.
Returns:
A TaskArtifactUpdateEvent with append=True and
last_chunk=False.
Raises:
RuntimeError: If finalize() has already been called.
"""
if self._finalized:
raise RuntimeError(
'Cannot append after finalize() has been called.'
)
return self._create_event(text, last_chunk=False)
def finalize(self, text: str = '') -> TaskArtifactUpdateEvent:
"""Create the final chunk event, closing the stream.
Args:
text: Optional final text content. Defaults to empty string.
Returns:
A TaskArtifactUpdateEvent with append=True and
last_chunk=True.
Raises:
RuntimeError: If finalize() has already been called.
"""
if self._finalized:
raise RuntimeError('finalize() has already been called.')
self._finalized = True
return self._create_event(text, last_chunk=True)
🧪 Code Coverage (vs
|
Summary
Adds
ArtifactStreamertoa2a.utils— a stateful helper that maintains astable
artifact_idacross streaming chunks, enabling correctappend=Truesemantics for
TaskArtifactUpdateEvent.Problem
new_text_artifactgenerates a fresh UUID on every call. When used in astreaming loop, each chunk gets a different
artifact_id, makingappend=Trueunusable. Clients see N separate artifacts instead of one progressively
streamed response. (See the
travel_planner_agentsample for this exact issue.)Solution