Skip to content

feat(modules): async modules#1920

Open
paul-nechifor wants to merge 1 commit intodevfrom
paul/feat/async-modules
Open

feat(modules): async modules#1920
paul-nechifor wants to merge 1 commit intodevfrom
paul/feat/async-modules

Conversation

@paul-nechifor
Copy link
Copy Markdown
Contributor

Problem

Closes DIM-XXX

Solution

Breaking Changes

How to Test

Contributor License Agreement

  • I have read and approved the CLA.

@greptile-apps
Copy link
Copy Markdown
Contributor

greptile-apps Bot commented Apr 26, 2026

Greptile Summary

This PR introduces async module support: an @arpc decorator for async RPC methods, auto-binding of async def handle_<input> handlers at start(), process_observable / spawn helpers, AsyncSpecProxy for awaitable cross-module calls, and a logging task factory to surface silenced exceptions. Two P1 logic bugs need fixing before merge:

  • arpc false-positive same-loop guard (core.py lines 59–68): when self._loop is None and the caller is outside any event loop, running is loopNone is NoneTrue, so the wrapper returns an unawaited coroutine to sync callers instead of executing it.
  • _log_async_handler_error swallows all RuntimeError (module.py line 267): any RuntimeError raised by user code inside a spawned task or async handler is silently dropped, negating the error-surfacing guarantee that is the main reason to use spawn() over bare run_coroutine_threadsafe.

Confidence Score: 3/5

Not safe to merge as-is: two P1 logic bugs and a committed debug scratch file need to be resolved first.

Two P1 findings: the @arpc None-loop guard silently returns unawaited coroutines in edge cases, and _log_async_handler_error swallows all RuntimeError from user tasks — both undermine correctness of the core async primitives. A P2 debug file (asdf.py) is also committed to the package directory.

dimos/core/core.py (arpc guard), dimos/core/module.py (_log_async_handler_error), and dimos/asdf.py (debug file) need attention before merge.

Important Files Changed

Filename Overview
dimos/asdf.py Debug scratch file with keyboard-mash name and an intentional raise Exception("asdf"); should not be committed to the package.
dimos/core/core.py New arpc decorator with dual-mode dispatch; None is None false-positive in the same-loop guard can return an unawaited coroutine to sync callers when _loop is uninitialized.
dimos/core/module.py Adds _auto_bind_handlers, process_observable, spawn, and async task logging; _log_async_handler_error silently swallows all RuntimeError from user coroutines.
dimos/core/rpc_client.py Adds AsyncSpecProxy that wraps an RPCClient and offloads blocking RPC calls to a thread executor for async callers; implementation looks correct.
dimos/core/coordination/module_coordinator.py Detects async methods on the consumer's declared Spec and wraps the target proxy in AsyncSpecProxy; logic is sound.
dimos/core/test_async_module_handles.py Integration test for auto-bound async handlers; 100 ms queue timeout may be flaky on slow CI machines.
dimos/core/test_async_module_rpc.py Integration test for @arpc cross-module RPC; covers the async-caller path correctly.
dimos/core/test_async_module_rpc_sync_to_async.py Tests mixed sync/async RPC interop; logic appears correct.
dimos/core/test_async_module_process_observable.py Tests process_observable with rx.interval; 4-second queue timeout for 26 letters is reasonable.
docs/usage/modules.md Well-written documentation covering all new async primitives with clear examples and caveats section.

Sequence Diagram

sequenceDiagram
    participant SyncCaller as Sync caller
    participant Wrapper as arpc wrapper
    participant ModLoop as module._loop
    participant AsyncFn as async fn
    participant Proxy as AsyncSpecProxy
    participant Executor as ThreadPoolExecutor

    SyncCaller->>Wrapper: call method(args)
    Wrapper->>Wrapper: compare running loop vs self._loop
    alt caller is on self._loop
        Wrapper-->>AsyncFn: return coroutine for await
    else caller is on another thread
        Wrapper->>ModLoop: schedule coroutine
        ModLoop->>AsyncFn: run
        AsyncFn-->>ModLoop: result
        ModLoop-->>Wrapper: future result blocks
        Wrapper-->>SyncCaller: return result
    end

    Note over Proxy,Executor: Cross-module call when spec declares async def
    AsyncFn->>Proxy: await self._ref.method(x)
    Proxy->>Executor: offload blocking RPC
    Executor-->>Proxy: result
    Proxy-->>AsyncFn: result
Loading

Reviews (1): Last reviewed commit: "feat(modules): async modules" | Re-trigger Greptile

Comment thread dimos/asdf.py
Comment on lines +63 to +70
if i == 3:
raise Exception("asdf")

@rpc
def stop(self) -> None:
if self._timer_future is not None:
self._timer_future.cancel()
self._timer_future = None
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P2 Debug scratch file should not be committed

dimos/asdf.py is clearly a temporary scratch file: the filename is a keyboard-mash, and the _timer_loop method deliberately raises raise Exception("asdf") after two iterations. This file is in the main dimos/ package directory (importable by any consumer), not in a test directory, and the PR description does not mention it as intended shipped code. It should be removed or moved to a dedicated example/test location before merging.

Comment thread dimos/core/core.py
Comment on lines +59 to +68
def wrapper(self, *args: Any, **kwargs: Any) -> Any:
loop = getattr(self, "_loop", None)
try:
running = asyncio.get_running_loop()
except RuntimeError:
running = None
if running is loop:
return fn(self, *args, **kwargs)
future = asyncio.run_coroutine_threadsafe(fn(self, *args, **kwargs), loop)
return future.result()
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P1 None is None makes the same-loop guard a false positive

When self._loop has not been set yet (class-level default is None) and the caller is not inside any running event loop (asyncio.get_running_loop() raises, so running = None), the guard running is loop evaluates as None is None → True. The wrapper then executes return fn(self, *args, **kwargs), which returns an unawaited coroutine object to the sync caller — silently, with no error.

A minimal guard prevents this:

if loop is None:
    raise RuntimeError(
        f"@arpc method called before {type(self).__name__}._loop is initialised"
    )

Comment thread dimos/core/module.py
Comment on lines +264 to +275
def _log_async_handler_error(self, fut: Any) -> None:
try:
fut.result()
except (asyncio.CancelledError, RuntimeError):
pass # loop stopped or task cancelled during shutdown
except BaseException as e:
# Include exception type+message in the event string so it is
# visible on consoles whose formatters strip exc_info/traceback.
logger.exception(
f"Unhandled error in async task on {type(self).__name__}._loop: "
f"{type(e).__name__}: {e}"
)
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P1 RuntimeError catch swallows user exceptions from spawned tasks

The handler silently discards any RuntimeError raised inside user-written coroutines scheduled via spawn() or process_observable(). RuntimeError is extremely common in Python (e.g., StopIteration propagation, generator misuse, and countless library errors). The intent — catching "event loop is closed" — is better served by narrowing the RuntimeError arm:

import concurrent.futures

def _log_async_handler_error(self, fut: Any) -> None:
    try:
        fut.result()
    except (asyncio.CancelledError, concurrent.futures.CancelledError):
        pass  # task cancelled during shutdown
    except RuntimeError as e:
        if "event loop" in str(e).lower() or "loop is closed" in str(e).lower():
            pass  # loop shut down before task completed
        else:
            logger.exception(
                f"Unhandled error in async task on {type(self).__name__}._loop: "
                f"{type(e).__name__}: {e}"
            )
    except BaseException as e:
        logger.exception(
            f"Unhandled error in async task on {type(self).__name__}._loop: "
            f"{type(e).__name__}: {e}"
        )

As written, any RuntimeError in a spawned task is swallowed, defeating the whole purpose of spawn() over bare run_coroutine_threadsafe.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant