Conversation
Greptile SummaryThis PR introduces async module support: an
Confidence Score: 3/5Not safe to merge as-is: two P1 logic bugs and a committed debug scratch file need to be resolved first. Two P1 findings: the
Important Files Changed
Sequence DiagramsequenceDiagram
participant SyncCaller as Sync caller
participant Wrapper as arpc wrapper
participant ModLoop as module._loop
participant AsyncFn as async fn
participant Proxy as AsyncSpecProxy
participant Executor as ThreadPoolExecutor
SyncCaller->>Wrapper: call method(args)
Wrapper->>Wrapper: compare running loop vs self._loop
alt caller is on self._loop
Wrapper-->>AsyncFn: return coroutine for await
else caller is on another thread
Wrapper->>ModLoop: schedule coroutine
ModLoop->>AsyncFn: run
AsyncFn-->>ModLoop: result
ModLoop-->>Wrapper: future result blocks
Wrapper-->>SyncCaller: return result
end
Note over Proxy,Executor: Cross-module call when spec declares async def
AsyncFn->>Proxy: await self._ref.method(x)
Proxy->>Executor: offload blocking RPC
Executor-->>Proxy: result
Proxy-->>AsyncFn: result
Reviews (1): Last reviewed commit: "feat(modules): async modules" | Re-trigger Greptile |
| if i == 3: | ||
| raise Exception("asdf") | ||
|
|
||
| @rpc | ||
| def stop(self) -> None: | ||
| if self._timer_future is not None: | ||
| self._timer_future.cancel() | ||
| self._timer_future = None |
There was a problem hiding this comment.
Debug scratch file should not be committed
dimos/asdf.py is clearly a temporary scratch file: the filename is a keyboard-mash, and the _timer_loop method deliberately raises raise Exception("asdf") after two iterations. This file is in the main dimos/ package directory (importable by any consumer), not in a test directory, and the PR description does not mention it as intended shipped code. It should be removed or moved to a dedicated example/test location before merging.
| def wrapper(self, *args: Any, **kwargs: Any) -> Any: | ||
| loop = getattr(self, "_loop", None) | ||
| try: | ||
| running = asyncio.get_running_loop() | ||
| except RuntimeError: | ||
| running = None | ||
| if running is loop: | ||
| return fn(self, *args, **kwargs) | ||
| future = asyncio.run_coroutine_threadsafe(fn(self, *args, **kwargs), loop) | ||
| return future.result() |
There was a problem hiding this comment.
None is None makes the same-loop guard a false positive
When self._loop has not been set yet (class-level default is None) and the caller is not inside any running event loop (asyncio.get_running_loop() raises, so running = None), the guard running is loop evaluates as None is None → True. The wrapper then executes return fn(self, *args, **kwargs), which returns an unawaited coroutine object to the sync caller — silently, with no error.
A minimal guard prevents this:
if loop is None:
raise RuntimeError(
f"@arpc method called before {type(self).__name__}._loop is initialised"
)| def _log_async_handler_error(self, fut: Any) -> None: | ||
| try: | ||
| fut.result() | ||
| except (asyncio.CancelledError, RuntimeError): | ||
| pass # loop stopped or task cancelled during shutdown | ||
| except BaseException as e: | ||
| # Include exception type+message in the event string so it is | ||
| # visible on consoles whose formatters strip exc_info/traceback. | ||
| logger.exception( | ||
| f"Unhandled error in async task on {type(self).__name__}._loop: " | ||
| f"{type(e).__name__}: {e}" | ||
| ) |
There was a problem hiding this comment.
RuntimeError catch swallows user exceptions from spawned tasks
The handler silently discards any RuntimeError raised inside user-written coroutines scheduled via spawn() or process_observable(). RuntimeError is extremely common in Python (e.g., StopIteration propagation, generator misuse, and countless library errors). The intent — catching "event loop is closed" — is better served by narrowing the RuntimeError arm:
import concurrent.futures
def _log_async_handler_error(self, fut: Any) -> None:
try:
fut.result()
except (asyncio.CancelledError, concurrent.futures.CancelledError):
pass # task cancelled during shutdown
except RuntimeError as e:
if "event loop" in str(e).lower() or "loop is closed" in str(e).lower():
pass # loop shut down before task completed
else:
logger.exception(
f"Unhandled error in async task on {type(self).__name__}._loop: "
f"{type(e).__name__}: {e}"
)
except BaseException as e:
logger.exception(
f"Unhandled error in async task on {type(self).__name__}._loop: "
f"{type(e).__name__}: {e}"
)As written, any RuntimeError in a spawned task is swallowed, defeating the whole purpose of spawn() over bare run_coroutine_threadsafe.
Problem
Closes DIM-XXX
Solution
Breaking Changes
How to Test
Contributor License Agreement