-
Notifications
You must be signed in to change notification settings - Fork 33
feat: add client side tools to mapper and runtime #819
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,85 @@ | ||
| """Factory for creating client-side tools that execute on the client SDK.""" | ||
|
|
||
| import json | ||
| from typing import Annotated, Any | ||
|
|
||
| from langchain_core.messages import ToolMessage | ||
| from langchain_core.tools import InjectedToolCallId, StructuredTool | ||
| from uipath.agent.models.agent import AgentClientSideToolResourceConfig | ||
| from uipath.eval.mocks import mockable | ||
|
|
||
| from uipath_langchain._utils.durable_interrupt import durable_interrupt | ||
| from uipath_langchain.agent.react.jsonschema_pydantic_converter import ( | ||
| create_model as create_model_from_schema, | ||
| ) | ||
| from uipath_langchain.chat.hitl import CLIENT_SIDE_TOOL_MARKER | ||
|
|
||
| from .utils import sanitize_tool_name | ||
|
|
||
|
|
||
| def create_client_side_tool( | ||
| resource: AgentClientSideToolResourceConfig, | ||
| ) -> StructuredTool: | ||
| """Create a client-side tool that pauses the graph and waits for the client to execute it. | ||
|
|
||
| The tool uses @durable_interrupt to suspend the graph. The client SDK receives | ||
| an executingToolCall event, runs its registered handler, and sends endToolCall | ||
| back through CAS. The bridge routes that endToolCall to wait_for_resume(), | ||
| which unblocks the graph with the client's result. | ||
| """ | ||
| tool_name = sanitize_tool_name(resource.name) | ||
| input_model = create_model_from_schema(resource.input_schema) | ||
|
|
||
| async def client_side_tool_fn( | ||
| *, tool_call_id: Annotated[str, InjectedToolCallId], **kwargs: Any | ||
| ) -> Any: | ||
| @mockable( | ||
| name=resource.name, | ||
| description=resource.description, | ||
| input_schema=input_model.model_json_schema(), | ||
| output_schema=(resource.output_schema or {}), | ||
| example_calls=getattr(resource.properties, "example_calls", None), | ||
| ) | ||
| async def execute_tool() -> dict[str, Any]: | ||
| """Execute client-side tool, pausing for client response.""" | ||
|
|
||
| @durable_interrupt | ||
| async def wait_for_client_execution() -> dict[str, Any]: | ||
| return { | ||
| "tool_call_id": tool_call_id, | ||
| "tool_name": tool_name, | ||
| "input": kwargs, | ||
| "is_execution_phase": True, | ||
| } | ||
|
|
||
| result = await wait_for_client_execution() | ||
| return result.get("output", result) if isinstance(result, dict) else result | ||
|
|
||
| result = await execute_tool() | ||
|
|
||
| if isinstance(result, dict): | ||
| try: | ||
| content = json.dumps(result) | ||
| except TypeError: | ||
| content = str(result) | ||
| else: | ||
| content = str(result) if result is not None else "" | ||
|
|
||
| return ToolMessage( | ||
| content=content, | ||
| tool_call_id=tool_call_id, | ||
| response_metadata={CLIENT_SIDE_TOOL_MARKER: True}, | ||
| ) | ||
|
|
||
| tool = StructuredTool( | ||
| name=tool_name, | ||
| description=resource.description or f"Client-side tool: {tool_name}", | ||
| args_schema=input_model, | ||
| coroutine=client_side_tool_fn, | ||
| metadata={ | ||
| CLIENT_SIDE_TOOL_MARKER: True, | ||
| "output_schema": resource.output_schema, | ||
| }, | ||
| ) | ||
|
|
||
| return tool | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -24,6 +24,7 @@ | |
| UiPathConversationContentPartEndEvent, | ||
| UiPathConversationContentPartEvent, | ||
| UiPathConversationContentPartStartEvent, | ||
| UiPathConversationExecutingToolCallEvent, | ||
| UiPathConversationMessage, | ||
| UiPathConversationMessageData, | ||
| UiPathConversationMessageEndEvent, | ||
|
|
@@ -39,6 +40,8 @@ | |
| ) | ||
| from uipath.runtime import UiPathRuntimeStorageProtocol | ||
|
|
||
| from uipath_langchain.chat.hitl import CLIENT_SIDE_TOOL_MARKER | ||
|
|
||
| from ._citations import CitationStreamProcessor, extract_citations_from_text | ||
|
|
||
| logger = logging.getLogger(__name__) | ||
|
|
@@ -60,6 +63,7 @@ def __init__(self, runtime_id: str, storage: UiPathRuntimeStorageProtocol | None | |
| self.storage = storage | ||
| self.current_message: AIMessageChunk | AIMessage | ||
| self.tools_requiring_confirmation: dict[str, Any] = {} | ||
| self.client_side_tools: dict[str, Any] = {} # {tool_name: output_schema} | ||
| self.seen_message_ids: set[str] = set() | ||
| self._storage_lock = asyncio.Lock() | ||
| self._citation_stream_processor = CitationStreamProcessor() | ||
|
|
@@ -436,15 +440,40 @@ async def map_current_message_to_start_tool_call_events(self): | |
| tool_name in self.tools_requiring_confirmation | ||
| ) | ||
| input_schema = self.tools_requiring_confirmation.get(tool_name) | ||
| is_client_side = tool_name in self.client_side_tools | ||
| output_schema = ( | ||
| self.client_side_tools.get(tool_name) | ||
| if is_client_side | ||
| else None | ||
| ) | ||
| events.append( | ||
| self.map_tool_call_to_tool_call_start_event( | ||
| self.current_message.id, | ||
| tool_call, | ||
| require_confirmation=require_confirmation or None, | ||
| input_schema=input_schema, | ||
| is_client_side_tool=is_client_side or None, | ||
| output_schema=output_schema, | ||
| ) | ||
| ) | ||
|
|
||
| # Emit executingToolCall from MessageMapper for tools without | ||
| # a durable interrupt. Tools with interrupts (client-side, HITL) | ||
| # get executingToolCall from the bridge instead. | ||
| if not require_confirmation and not is_client_side: | ||
| events.append( | ||
| UiPathConversationMessageEvent( | ||
| message_id=self.current_message.id, | ||
| tool_call=UiPathConversationToolCallEvent( | ||
| tool_call_id=tool_call["id"], | ||
| executing=UiPathConversationExecutingToolCallEvent( | ||
| tool_name=tool_call["name"], | ||
| input=tool_call["args"], | ||
|
Comment on lines
+460
to
+471
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Hm, I guess related to my comment here. Personally would prefer if a single place creates/outputs the executingToolCall event, especially since right now it seems to be split across two repos and could add some confusion. Is there a code-place where any path (including tools with interrupts) eventually converge to once they are ready to run the tool?
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I wasnt able to get them to fire in a single place, because the MessageMapper processes the AIMessage before the tool node runs (MessageMapper sees tool calls at stream time), but the graph isn't ready to receive responses until interrupt time. For tools that don't need a response from the user, this doesn't matter. For tools that pause and wait, the emission must happen after the pause (tool confirmation and client side tools). I added it in both places to keep the events consistent for tool calls, but it's a no-op for non interrupt tools anyways. I think if it's causing issues, removing it from non interrupt tools might be the better case so that users wont get confused and try to handle that event. |
||
| ), | ||
| ), | ||
| ) | ||
| ) | ||
|
|
||
| if self.storage is not None: | ||
| await self.storage.set_value( | ||
| self.runtime_id, | ||
|
|
@@ -476,19 +505,24 @@ async def map_tool_message_to_events( | |
| # Keep as string if not valid JSON | ||
| pass | ||
|
|
||
| events = [ | ||
| UiPathConversationMessageEvent( | ||
| message_id=message_id, | ||
| tool_call=UiPathConversationToolCallEvent( | ||
| tool_call_id=message.tool_call_id, | ||
| end=UiPathConversationToolCallEndEvent( | ||
| timestamp=self.get_timestamp(), | ||
| output=content_value, | ||
| is_error=message.status == "error", | ||
| # Suppress endToolCall for client-side tools — the client already has the result (it produced it). | ||
| is_client_side = message.response_metadata.get(CLIENT_SIDE_TOOL_MARKER, False) | ||
| events: list[UiPathConversationMessageEvent] = [] | ||
|
|
||
| if not is_client_side: | ||
| events.append( | ||
|
Comment on lines
+508
to
+513
|
||
| UiPathConversationMessageEvent( | ||
| message_id=message_id, | ||
| tool_call=UiPathConversationToolCallEvent( | ||
| tool_call_id=message.tool_call_id, | ||
| end=UiPathConversationToolCallEndEvent( | ||
| timestamp=self.get_timestamp(), | ||
| output=content_value, | ||
| is_error=message.status == "error", | ||
| ), | ||
| ), | ||
| ), | ||
| ) | ||
| ) | ||
| ] | ||
|
|
||
| if is_last_tool_call: | ||
| events.append(self.map_to_message_end_event(message_id)) | ||
|
|
@@ -546,6 +580,8 @@ def map_tool_call_to_tool_call_start_event( | |
| *, | ||
| require_confirmation: bool | None = None, | ||
| input_schema: Any | None = None, | ||
| is_client_side_tool: bool | None = None, | ||
| output_schema: Any | None = None, | ||
| ) -> UiPathConversationMessageEvent: | ||
| return UiPathConversationMessageEvent( | ||
| message_id=message_id, | ||
|
|
@@ -557,6 +593,8 @@ def map_tool_call_to_tool_call_start_event( | |
| input=tool_call["args"], | ||
| require_confirmation=require_confirmation, | ||
| input_schema=input_schema, | ||
| is_client_side_tool=is_client_side_tool, | ||
| output_schema=output_schema, | ||
| ), | ||
| ), | ||
| ) | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Maybe a more consistent name compared to the other markers, which include
CONVERSATIONAL? I'm thinkingIS_CONVERSATIONAL_CLIENT_SIDE_TOOL?