diff --git a/include/boost/capy/ex/thread_pool.hpp b/include/boost/capy/ex/thread_pool.hpp
index 45eb60871..29097ec60 100644
--- a/include/boost/capy/ex/thread_pool.hpp
+++ b/include/boost/capy/ex/thread_pool.hpp
@@ -196,22 +196,22 @@ class thread_pool::executor_type
 
     /** Dispatch a continuation for execution.
 
-        Posts the continuation to the thread pool for execution on a
-        worker thread and returns `std::noop_coroutine()`. Thread
-        pools never execute inline because no single thread "owns"
-        the pool.
-
-        @param c The continuation to execute. Must remain at a
-                 stable address until dequeued and resumed.
-
-        @return `std::noop_coroutine()` always.
+        If the calling thread is a worker of this pool, returns
+        `c.h` for symmetric transfer so the caller can resume the
+        continuation inline. Otherwise, posts the continuation to
+        the pool for execution on a worker thread and returns
+        `std::noop_coroutine()`.
+
+        @param c The continuation to execute. On the post path,
+                 must remain at a stable address until dequeued
+                 and resumed.
+
+        @return `c.h` when the calling thread is a pool worker;
+                `std::noop_coroutine()` otherwise.
     */
+    BOOST_CAPY_DECL
     std::coroutine_handle<>
-    dispatch(continuation& c) const
-    {
-        post(c);
-        return std::noop_coroutine();
-    }
+    dispatch(continuation& c) const;
 
     /** Post a continuation to the thread pool.
 
diff --git a/src/ex/thread_pool.cpp b/src/ex/thread_pool.cpp
index 8d5d79d03..393a397c9 100644
--- a/src/ex/thread_pool.cpp
+++ b/src/ex/thread_pool.cpp
@@ -10,6 +10,7 @@
 
 #include
 #include
+#include
 #include
 #include
 #include
@@ -51,6 +52,12 @@ namespace capy {
 
 class thread_pool::impl
 {
+    // Identifies the pool owning the current worker thread, or
+    // nullptr if the calling thread is not a pool worker. Checked
+    // by dispatch() to decide between symmetric transfer (inline
+    // resume) and post.
+    static inline detail::thread_local_ptr<impl const> current_;
+
     // Intrusive queue of continuations via continuation::next.
     // No per-post allocation: the continuation is owned by the caller.
     continuation* head_ = nullptr;
@@ -96,6 +103,13 @@ class thread_pool::impl
 public:
     ~impl() = default;
 
+    // True when the calling thread is a worker thread of this pool.
+    bool
+    running_in_this_thread() const noexcept
+    {
+        return current_.get() == this;
+    }
+
     // Destroy abandoned coroutine frames. Must be called
     // before execution_context::shutdown()/destroy() so
     // that suspended-frame destructors (e.g. delay_awaitable
@@ -213,6 +227,14 @@ class thread_pool::impl
         std::snprintf(name, sizeof(name), "%s%zu", thread_name_prefix_, index);
         set_current_thread_name(name);
 
+        // Mark this thread as a worker of this pool so dispatch()
+        // can symmetric-transfer when called from within pool work.
+        struct scoped_pool
+        {
+            explicit scoped_pool(impl const* p) noexcept { current_.set(p); }
+            ~scoped_pool() noexcept { current_.set(nullptr); }
+        } guard(this);
+
         for(;;)
         {
             continuation* c = nullptr;
@@ -297,5 +319,15 @@ post(continuation& c) const
     pool_->impl_->post(c);
 }
 
+std::coroutine_handle<>
+thread_pool::executor_type::
+dispatch(continuation& c) const
+{
+    if(pool_->impl_->running_in_this_thread())
+        return c.h;
+    pool_->impl_->post(c);
+    return std::noop_coroutine();
+}
+
 } // capy
 } // boost
diff --git a/test/unit/ex/thread_pool.cpp b/test/unit/ex/thread_pool.cpp
index fb012decf..25d561f1b 100644
--- a/test/unit/ex/thread_pool.cpp
+++ b/test/unit/ex/thread_pool.cpp
@@ -49,6 +49,46 @@ struct test_service : execution_context::service
     void shutdown() override {}
 };
 
+// Probe coroutine starts suspended; resuming it completes and
+// auto-destroys the frame (suspend_never final). If never
+// resumed, probe_coro's dtor destroys it.
+struct probe_coro
+{
+    struct promise_type
+    {
+        probe_coro
+        get_return_object() noexcept
+        {
+            return probe_coro{
+                std::coroutine_handle<promise_type>::from_promise(*this)};
+        }
+        std::suspend_always initial_suspend() noexcept { return {}; }
+        std::suspend_never final_suspend() noexcept { return {}; }
+        void return_void() noexcept {}
+        void unhandled_exception() { std::terminate(); }
+    };
+
+    std::coroutine_handle<promise_type> h_;
+
+    ~probe_coro() { if(h_) h_.destroy(); }
+
+    probe_coro(probe_coro&& other) noexcept
+        : h_(other.h_) { other.h_ = nullptr; }
+
+    std::coroutine_handle<promise_type> handle() const noexcept { return h_; }
+    void release() noexcept { h_ = nullptr; }
+
+private:
+    explicit probe_coro(std::coroutine_handle<promise_type> h)
+        : h_(h) {}
+};
+
+inline probe_coro
+make_probe()
+{
+    co_return;
+}
+
 #if defined(BOOST_CAPY_TEST_CAN_GET_THREAD_NAME)
 // Result storage for thread name check
 struct name_check_result
@@ -189,12 +229,97 @@ struct thread_pool_test
     void
     testDispatch()
     {
-        continuation c{std::noop_coroutine()};
-        thread_pool pool(1);
-        auto ex = pool.get_executor();
+        // From outside any pool, dispatch() posts.
+        auto probe = make_probe();
+        auto probe_h = probe.handle();
+        auto* target = new continuation{probe_h};
+
+        std::coroutine_handle<> returned;
+        {
+            thread_pool pool(1);
+            auto ex = pool.get_executor();
+            returned = ex.dispatch(*target);
+        }
+
+        BOOST_TEST(returned != probe_h);
+        // Posted: the pool already resumed or drained the probe frame.
+        if(returned != probe_h)
+            probe.release();
+        delete target;
+    }
+
+    void
+    testDispatchSymmetricTransfer()
+    {
+        // From a worker thread of the same pool, dispatch()
+        // returns c.h for symmetric transfer and does not
+        // enqueue the continuation.
+        auto probe = make_probe();
+        auto probe_h = probe.handle();
+
+        // Heap-allocated so target outlives the pool if a buggy
+        // implementation erroneously posts it.
+        auto* target = new continuation{probe_h};
+
+        std::atomic<bool> done{false};
+        std::coroutine_handle<> returned;
+
+        {
+            thread_pool pool(1);
+            auto ex = pool.get_executor();
+
+            run_async(ex, [&]{
+                returned = ex.dispatch(*target);
+                done.store(true);
+            })(void_task());
+
+            BOOST_TEST(wait_for([&]{ return done.load(); }));
+        }
+
+        // On symmetric transfer the returned handle equals the
+        // target's handle and the probe is never enqueued.
+        BOOST_TEST(returned == probe_h);
+
+        // If the dispatch posted (buggy), the pool destructor's
+        // drain_abandoned already destroyed probe_h; release so
+        // the probe_coro dtor does not double-destroy.
+        if(returned != probe_h)
+            probe.release();
+        delete target;
+    }
+
+    void
+    testDispatchCrossPool()
+    {
+        // Worker threads of pool A are not workers of pool B:
+        // dispatch() on B from an A worker must post, not
+        // symmetric-transfer.
+        auto probe = make_probe();
+        auto probe_h = probe.handle();
+        auto* target = new continuation{probe_h};
+
+        std::atomic<bool> done{false};
+        std::coroutine_handle<> returned;
+
+        {
+            thread_pool pool_a(1);
+            thread_pool pool_b(1);
+            auto ex_a = pool_a.get_executor();
+            auto ex_b = pool_b.get_executor();
+
+            run_async(ex_a, [&]{
+                returned = ex_b.dispatch(*target);
+                done.store(true);
+            })(void_task());
+
+            BOOST_TEST(wait_for([&]{ return done.load(); }));
+        }
 
-        // dispatch() always posts for thread_pool (returns void)
-        ex.dispatch(c);
+        BOOST_TEST(returned != probe_h);
+        // Posted: the pool already resumed or drained the probe frame.
+        if(returned != probe_h)
+            probe.release();
+        delete target;
     }
 
     void
@@ -584,6 +709,8 @@ struct thread_pool_test
         testPostWork();
         testWorkCounting();
         testDispatch();
+        testDispatchSymmetricTransfer();
+        testDispatchCrossPool();
         testServiceManagement();
         testMakeService();
         testConcurrentPost();