From bf0eba59c50b6d32f9074558528cdf704e67631b Mon Sep 17 00:00:00 2001 From: Steve Gerbino Date: Wed, 22 Apr 2026 19:38:02 +0200 Subject: [PATCH 1/4] Add failing tests for executor-hopping bug in run() Pins the required semantics of capy::run at a cross-executor boundary: the forward trip must enqueue the target task, and the return trip must enqueue the caller's continuation. Covers five scenarios: - run(pe)(inner) from a handler on pe must not let inner cut the queue ahead of other pending work. - When the target runs synchronously, the return trip must still tick the caller's executor, so higher-priority work there runs before the caller resumes. - run(inner)(work) from inside a strand must release the strand while work runs. - A handler that does co_await run(strand)(task) must be outside the strand after the await returns. - An io loop that does co_await run(compute_pool)(task) must resume on the io thread, not on a compute worker. All five fail against the current dispatch-based run() and will pass once run() posts on both trips. Adds a test-only priority_executor support header used by the first three. --- test/unit/ex/priority_executor.hpp | 241 ++++++++++++++++++++++++++++ test/unit/ex/run.cpp | 32 ++++ test/unit/ex/run_priority.cpp | 242 +++++++++++++++++++++++++++++ test/unit/ex/strand.cpp | 29 ++++ 4 files changed, 544 insertions(+) create mode 100644 test/unit/ex/priority_executor.hpp create mode 100644 test/unit/ex/run_priority.cpp diff --git a/test/unit/ex/priority_executor.hpp b/test/unit/ex/priority_executor.hpp new file mode 100644 index 000000000..cdbfb9fc0 --- /dev/null +++ b/test/unit/ex/priority_executor.hpp @@ -0,0 +1,241 @@ +// +// Copyright (c) 2026 Steve Gerbino +// +// Distributed under the Boost Software License, Version 1.0. (See accompanying +// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) +// +// Official repository: https://github.com/cppalliance/capy +// + +#ifndef BOOST_CAPY_TEST_UNIT_EX_PRIORITY_EXECUTOR_HPP +#define BOOST_CAPY_TEST_UNIT_EX_PRIORITY_EXECUTOR_HPP + +#include +#include +#include +#include +#include + +#include +#include +#include +#include +#include +#include +#include + +namespace boost { +namespace capy { +namespace test { + +/** Test-only strand-shaped executor that drains high before low. +*/ +struct priority_executor_state +{ + std::mutex mutex; + continuation* high_head = nullptr; + continuation* high_tail = nullptr; + continuation* low_head = nullptr; + continuation* low_tail = nullptr; + bool locked = false; + std::atomic dispatch_thread{}; +}; + +namespace detail { + +struct priority_invoker +{ + struct promise_type + { + continuation self; + + priority_invoker get_return_object() noexcept + { + return {std::coroutine_handle::from_promise(*this)}; + } + + std::suspend_always initial_suspend() noexcept { return {}; } + std::suspend_never final_suspend() noexcept { return {}; } + void return_void() noexcept {} + void unhandled_exception() { std::terminate(); } + }; + + std::coroutine_handle h_; +}; + +inline void +drain_list(continuation* head) noexcept +{ + while(head) + { + continuation* c = head; + head = c->next; + c->next = nullptr; + ::boost::capy::safe_resume(c->h); + } +} + +inline priority_invoker +make_priority_invoker(priority_executor_state* s) +{ + for(;;) + { + s->dispatch_thread.store( + std::this_thread::get_id(), + std::memory_order_release); + + continuation* high_head; + continuation* low_head; + { + std::lock_guard lk(s->mutex); + high_head = s->high_head; + low_head = s->low_head; + s->high_head = nullptr; + s->high_tail = nullptr; + s->low_head = nullptr; + s->low_tail = nullptr; + } + + drain_list(high_head); + drain_list(low_head); + + { + std::lock_guard lk(s->mutex); + if(!s->high_head && !s->low_head) + { + s->locked = false; + s->dispatch_thread.store( + std::thread::id{}, + std::memory_order_release); + co_return; + } + } + } +} + +} // namespace detail + +/** Executor view over priority_executor_state. Dispatch has the same + thread-check fast path as strand; post defaults to the low queue. +*/ +template +class priority_executor +{ + priority_executor_state* state_; + Ex inner_ex_; + + enum class priority { high, low }; + + void + enqueue_under_lock(continuation& c, priority p) const noexcept + { + c.next = nullptr; + if(p == priority::high) + { + if(state_->high_tail) state_->high_tail->next = &c; + else state_->high_head = &c; + state_->high_tail = &c; + } + else + { + if(state_->low_tail) state_->low_tail->next = &c; + else state_->low_head = &c; + state_->low_tail = &c; + } + } + + void + post_with_priority(continuation& c, priority p) const + { + bool first; + { + std::lock_guard lk(state_->mutex); + enqueue_under_lock(c, p); + first = !state_->locked; + if(first) state_->locked = true; + } + if(first) + post_invoker(); + } + + void + post_invoker() const + { + auto inv = detail::make_priority_invoker(state_); + auto& self = inv.h_.promise().self; + self.h = inv.h_; + self.next = nullptr; + inner_ex_.post(self); + } + +public: + priority_executor(priority_executor_state& state, Ex inner) noexcept( + std::is_nothrow_move_constructible_v) + : state_(&state) + , inner_ex_(std::move(inner)) + { + } + + priority_executor(priority_executor const&) noexcept( + std::is_nothrow_copy_constructible_v) = default; + priority_executor(priority_executor&&) noexcept( + std::is_nothrow_move_constructible_v) = default; + priority_executor& operator=(priority_executor const&) = default; + priority_executor& operator=(priority_executor&&) noexcept( + std::is_nothrow_move_assignable_v) = default; + + bool + operator==(priority_executor const& other) const noexcept + { + return state_ == other.state_; + } + + auto& + context() const noexcept + { + return inner_ex_.context(); + } + + void on_work_started() const noexcept { inner_ex_.on_work_started(); } + void on_work_finished() const noexcept { inner_ex_.on_work_finished(); } + + bool + running_in_this_thread() const noexcept + { + return state_->dispatch_thread.load(std::memory_order_acquire) + == std::this_thread::get_id(); + } + + std::coroutine_handle<> + dispatch(continuation& c) const + { + if(running_in_this_thread()) + return c.h; + post_with_priority(c, priority::low); + return std::noop_coroutine(); + } + + void + post(continuation& c) const + { + post_with_priority(c, priority::low); + } + + void + post_high(continuation& c) const + { + post_with_priority(c, priority::high); + } + + void + post_low(continuation& c) const + { + post_with_priority(c, priority::low); + } +}; + +} // namespace test +} // namespace capy +} // namespace boost + +#endif diff --git a/test/unit/ex/run.cpp b/test/unit/ex/run.cpp index a968ef609..5c67645ef 100644 --- a/test/unit/ex/run.cpp +++ b/test/unit/ex/run.cpp @@ -21,9 +21,11 @@ #include #include +#include #include #include +#include namespace boost { namespace capy { @@ -500,6 +502,35 @@ struct run_test pool.join(); } + // co_await run(compute_exec)(...) from an io loop must return + // the caller to the io thread, not leave it on a compute worker. + void + testHopsBackToIoThread() + { + thread_pool compute_pool(2, "compute-"); + + std::thread::id io_tid = std::this_thread::get_id(); + std::thread::id compute_tid{}; + std::thread::id parent_tid_after_run{}; + + test::run_blocking()([&]() -> task { + auto compute_exec = compute_pool.get_executor(); + + co_await capy::run(compute_exec)([&]() -> task { + compute_tid = std::this_thread::get_id(); + co_return; + }()); + + parent_tid_after_run = std::this_thread::get_id(); + }()); + + BOOST_TEST(compute_tid != std::thread::id{}); + BOOST_TEST(compute_tid != io_tid); + BOOST_TEST_EQ(parent_tid_after_run, io_tid); + + compute_pool.join(); + } + void run() { @@ -523,6 +554,7 @@ struct run_test testAllocatorPropagation(); testAllocatorPropagationThroughRun(); testRunExStrandFirstInstruction(); + testHopsBackToIoThread(); } }; diff --git a/test/unit/ex/run_priority.cpp b/test/unit/ex/run_priority.cpp new file mode 100644 index 000000000..b890936fc --- /dev/null +++ b/test/unit/ex/run_priority.cpp @@ -0,0 +1,242 @@ +// +// Copyright (c) 2026 Steve Gerbino +// +// Distributed under the Boost Software License, Version 1.0. (See accompanying +// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) +// +// Official repository: https://github.com/cppalliance/capy +// + +// Tests that fail against the current run()'s use of dispatch at +// cross-executor boundaries; pass once run() posts on both trips. + +#include +#include +#include +#include +#include +#include + +#include "priority_executor.hpp" +#include "test/unit/test_helpers.hpp" +#include "test_suite.hpp" + +#include +#include +#include +#include +#include +#include + +namespace boost { +namespace capy { + +static_assert(Executor>, + "priority_executor must satisfy Executor concept"); + +namespace { + +// Bare coroutine that appends a message and ends. Posted directly. +struct log_coro +{ + struct promise_type + { + std::vector* log; + std::string msg; + + log_coro get_return_object() noexcept + { + return log_coro{ + std::coroutine_handle::from_promise(*this)}; + } + + std::suspend_always initial_suspend() noexcept { return {}; } + std::suspend_never final_suspend() noexcept { return {}; } + void return_void() noexcept {} + void unhandled_exception() { std::terminate(); } + }; + + std::coroutine_handle h_; + + ~log_coro() { if(h_) h_.destroy(); } + + log_coro(log_coro&& o) noexcept : h_(o.h_) { o.h_ = nullptr; } + + std::coroutine_handle handle() const noexcept { return h_; } + + void release() noexcept { h_ = nullptr; } + +private: + explicit log_coro(std::coroutine_handle h) : h_(h) {} +}; + +inline log_coro +make_log_coro(std::vector& log, std::string msg) +{ + return [](std::vector* log, std::string msg) -> log_coro { + log->push_back(std::move(msg)); + co_return; + }(&log, std::move(msg)); +} + +inline void +pump(std::queue>& q) +{ + while(!q.empty()) + { + auto h = q.front(); + q.pop(); + h.resume(); + } +} + +} // namespace + +struct run_priority_test +{ + // run(pe)(inner) from a handler on pe must enqueue inner + // behind other work already in pe's queue, not cut in line. + void + testForwardCrossing() + { + std::queue> q; + queuing_executor qe(q); + test::priority_executor_state state; + test::priority_executor pe(state, qe); + + std::vector log; + + auto inner_task_fn = [&]() -> task { + log.push_back("inner"); + co_return; + }; + + auto outer_task_fn = [&]() -> task { + log.push_back("outer_start"); + co_await capy::run(pe)(inner_task_fn()); + log.push_back("outer_end"); + }; + + bool outer_done = false; + run_async(pe, [&]() { outer_done = true; })(outer_task_fn()); + + auto sibling_coro = make_log_coro(log, "sibling"); + continuation sibling_cont{sibling_coro.handle()}; + pe.post(sibling_cont); + sibling_coro.release(); + + pump(q); + + BOOST_TEST(outer_done); + + BOOST_TEST_EQ(log.size(), std::size_t(4)); + if(log.size() == 4) + { + BOOST_TEST_EQ(log[0], std::string("outer_start")); + BOOST_TEST_EQ(log[1], std::string("sibling")); + BOOST_TEST_EQ(log[2], std::string("inner")); + BOOST_TEST_EQ(log[3], std::string("outer_end")); + } + } + + // The return trip must post the caller back to its executor, + // giving pe a tick to drain higher-priority work before the + // caller resumes. inline_ex is chosen as the target so the + // forward trip is trivial and only the return trip is observed. + void + testReturnTripParentWrongFrame() + { + std::queue> q; + queuing_executor qe(q); + test::priority_executor_state state; + test::priority_executor pe(state, qe); + + std::vector log; + + auto inner_task_fn = [&]() -> task { + log.push_back("inner"); + co_return; + }; + + auto outer_task_fn = [&]() -> task { + log.push_back("outer_start"); + + auto pending_high_coro = make_log_coro(log, "pending_high"); + continuation pending_high_cont{pending_high_coro.handle()}; + pe.post_high(pending_high_cont); + pending_high_coro.release(); + + int dummy = 0; + test_executor inline_ex(1, dummy); + co_await capy::run(inline_ex)(inner_task_fn()); + + log.push_back("outer_end"); + }; + + bool outer_done = false; + run_async(pe, [&]() { outer_done = true; })(outer_task_fn()); + + pump(q); + + BOOST_TEST(outer_done); + + BOOST_TEST_EQ(log.size(), std::size_t(4)); + if(log.size() == 4) + { + BOOST_TEST_EQ(log[0], std::string("outer_start")); + BOOST_TEST_EQ(log[1], std::string("inner")); + BOOST_TEST_EQ(log[2], std::string("pending_high")); + BOOST_TEST_EQ(log[3], std::string("outer_end")); + } + } + + // run(inner)(work) from inside a strand must actually release + // the strand while work runs, not nest work in the strand's frame. + void + testExitStrandOverPriority() + { + std::queue> q; + queuing_executor qe(q); + test::priority_executor_state state; + test::priority_executor pe(state, qe); + + strand> s(pe); + + bool s_running_inside_work = false; + bool work_ran = false; + + auto work_task_fn = [&]() -> task { + s_running_inside_work = s.running_in_this_thread(); + work_ran = true; + co_return; + }; + + auto outer_task_fn = [&]() -> task { + co_await capy::run(pe)(work_task_fn()); + }; + + bool outer_done = false; + run_async(s, [&]() { outer_done = true; })(outer_task_fn()); + + pump(q); + + BOOST_TEST(outer_done); + BOOST_TEST(work_ran); + BOOST_TEST(!s_running_inside_work); + } + + void + run() + { + testForwardCrossing(); + testReturnTripParentWrongFrame(); + testExitStrandOverPriority(); + } +}; + +TEST_SUITE( + run_priority_test, + "boost.capy.run.priority"); + +} // namespace capy +} // namespace boost diff --git a/test/unit/ex/strand.cpp b/test/unit/ex/strand.cpp index 38ecaa616..d3795ab89 100644 --- a/test/unit/ex/strand.cpp +++ b/test/unit/ex/strand.cpp @@ -12,7 +12,11 @@ #include #include +#include +#include #include +#include +#include #include "test_suite.hpp" @@ -566,6 +570,30 @@ struct strand_test BOOST_TEST_EQ(completed.load(), N); } + // After co_await run(strand)(...) returns, caller must be outside + // the strand. User-reported bug: today it is still inside. + void + testExitStrandAfterRun() + { + bool running_in_strand_after_run = true; + bool inner_ran = false; + + test::run_blocking()([&]() -> task { + auto ex = co_await this_coro::executor; + auto str = capy::strand(ex); + + co_await capy::run(str)([&]() -> task { + inner_ran = true; + co_return; + }()); + + running_in_strand_after_run = str.running_in_this_thread(); + }()); + + BOOST_TEST(inner_ran); + BOOST_TEST(!running_in_strand_after_run); + } + void testAnyExecutor() { @@ -637,6 +665,7 @@ struct strand_test testConcurrentPost(); testFifoOrder(); testSerialization(); + testExitStrandAfterRun(); testAnyExecutor(); } }; From c0ea43eadb2ac9c4653bc79d354847c1875b0174 Mon Sep 17 00:00:00 2001 From: Steve Gerbino Date: Thu, 23 Apr 2026 16:29:47 +0200 Subject: [PATCH 2/4] Make run() post on both trips at cross-executor boundaries run_awaitable_ex::await_suspend and the return trampoline's final_suspend both called dispatch(). On executors with a thread-check fast path (strand, blocking_executor) dispatch can fire an inline symmetric transfer, which does not enqueue the target and does not give the caller's executor a fresh tick on the return. run(ex)(task) then fails to actually run task on ex and leaves the caller resuming on the wrong frame. Switch both trips to post + std::noop_coroutine(). Also rename dispatch_trampoline to boundary_trampoline; the type's purpose is bridging the executor boundary, and the old name named a mechanism that no longer applies. The five previously-failing tests in boost.capy.run.priority, boost.capy.ex.run, and boost.capy.strand now pass; full suite green at 76743 assertions. --- doc/continuation-rationale.md | 2 +- .../capy/detail/await_suspend_helper.hpp | 7 ++-- include/boost/capy/ex/run.hpp | 32 ++++++++++--------- 3 files changed, 22 insertions(+), 19 deletions(-) diff --git a/doc/continuation-rationale.md b/doc/continuation-rationale.md index 544e30d1c..c69d3c97a 100644 --- a/doc/continuation-rationale.md +++ b/doc/continuation-rationale.md @@ -432,7 +432,7 @@ their own `continuation`: - `when_all_core::continuation_` (parent handle for combinator) - `when_any_core::continuation_` (same) -- `dispatch_trampoline::parent_` (cross-executor trampoline) +- `boundary_trampoline::parent_` (cross-executor trampoline) - `run_awaitable_ex::task_cont_` (initial task dispatch) - `run_async_trampoline::task_cont_` (same) diff --git a/include/boost/capy/detail/await_suspend_helper.hpp b/include/boost/capy/detail/await_suspend_helper.hpp index 3d72c7c64..c2d704ccb 100644 --- a/include/boost/capy/detail/await_suspend_helper.hpp +++ b/include/boost/capy/detail/await_suspend_helper.hpp @@ -35,9 +35,10 @@ namespace detail { The return value is written to the now-destroyed frame. @li `await_suspend` hands the continuation to another thread - via `executor::dispatch()`, which may resume the parent. - The parent can destroy this frame before the runtime reads - `__$ReturnUdt$` (e.g. `dispatch_trampoline` final_suspend). + via an executor handoff (e.g. `post()` or `dispatch()`), + which may resume the parent. The parent can destroy this + frame before the runtime reads `__$ReturnUdt$` (e.g. + `boundary_trampoline` final_suspend). On MSVC this function calls `h.resume()` on the current stack and returns `void`, causing unconditional suspension. The diff --git a/include/boost/capy/ex/run.hpp b/include/boost/capy/ex/run.hpp index 9cb02dab3..d0699ac14 100644 --- a/include/boost/capy/ex/run.hpp +++ b/include/boost/capy/ex/run.hpp @@ -70,7 +70,7 @@ namespace boost::capy::detail { The trampoline never touches the task's result. */ -struct BOOST_CAPY_CORO_DESTROY_WHEN_COMPLETE dispatch_trampoline +struct BOOST_CAPY_CORO_DESTROY_WHEN_COMPLETE boundary_trampoline { struct promise_type : frame_alloc_mixin @@ -78,9 +78,9 @@ struct BOOST_CAPY_CORO_DESTROY_WHEN_COMPLETE dispatch_trampoline executor_ref caller_ex_; continuation parent_; - dispatch_trampoline get_return_object() noexcept + boundary_trampoline get_return_object() noexcept { - return dispatch_trampoline{ + return boundary_trampoline{ std::coroutine_handle::from_promise(*this)}; } @@ -96,8 +96,9 @@ struct BOOST_CAPY_CORO_DESTROY_WHEN_COMPLETE dispatch_trampoline auto await_suspend( std::coroutine_handle<>) noexcept { + p_->caller_ex_.post(p_->parent_); return detail::symmetric_transfer( - p_->caller_ex_.dispatch(p_->parent_)); + std::noop_coroutine()); } void await_resume() const noexcept {} @@ -111,20 +112,20 @@ struct BOOST_CAPY_CORO_DESTROY_WHEN_COMPLETE dispatch_trampoline std::coroutine_handle h_{nullptr}; - dispatch_trampoline() noexcept = default; + boundary_trampoline() noexcept = default; - ~dispatch_trampoline() + ~boundary_trampoline() { if(h_) h_.destroy(); } - dispatch_trampoline(dispatch_trampoline const&) = delete; - dispatch_trampoline& operator=(dispatch_trampoline const&) = delete; + boundary_trampoline(boundary_trampoline const&) = delete; + boundary_trampoline& operator=(boundary_trampoline const&) = delete; - dispatch_trampoline(dispatch_trampoline&& o) noexcept + boundary_trampoline(boundary_trampoline&& o) noexcept : h_(std::exchange(o.h_, nullptr)) {} - dispatch_trampoline& operator=(dispatch_trampoline&& o) noexcept + boundary_trampoline& operator=(boundary_trampoline&& o) noexcept { if(this != &o) { @@ -135,11 +136,11 @@ struct BOOST_CAPY_CORO_DESTROY_WHEN_COMPLETE dispatch_trampoline } private: - explicit dispatch_trampoline(std::coroutine_handle h) noexcept + explicit boundary_trampoline(std::coroutine_handle h) noexcept : h_(h) {} }; -inline dispatch_trampoline make_dispatch_trampoline() +inline boundary_trampoline make_boundary_trampoline() { co_return; } @@ -170,7 +171,7 @@ struct [[nodiscard]] run_awaitable_ex frame_memory_resource resource_; std::conditional_t st_; io_env env_; - dispatch_trampoline tr_; + boundary_trampoline tr_; continuation task_cont_; Task inner_; // Last: destroyed first, while env_ is still valid @@ -224,7 +225,7 @@ struct [[nodiscard]] run_awaitable_ex std::coroutine_handle<> await_suspend(std::coroutine_handle<> cont, io_env const* caller_env) { - tr_ = make_dispatch_trampoline(); + tr_ = make_boundary_trampoline(); tr_.h_.promise().caller_ex_ = caller_env->executor; tr_.h_.promise().parent_.h = cont; @@ -245,7 +246,8 @@ struct [[nodiscard]] run_awaitable_ex p.set_environment(&env_); task_cont_.h = h; - return ex_.dispatch(task_cont_); + ex_.post(task_cont_); + return std::noop_coroutine(); } // Non-copyable From c80f00cee2a4dc00fba77ff3c7e393777a43f6b4 Mon Sep 17 00:00:00 2001 From: Steve Gerbino Date: Thu, 23 Apr 2026 17:22:15 +0200 Subject: [PATCH 3/4] Fix TSan race in blocking_context::enqueue enqueue was calling impl_->cv.notify_one() outside the lock scope. A foreign thread could still be inside notify_one() when the main thread drained the queue, completed the task, saw signal_done, and destroyed the context. TSan flagged the read against cv during cond_signal after the waiter had released it. Move notify_one inside the lock_guard scope, matching signal_done. The race was latent before run() was switched to post on the return trip: no foreign thread ever called blocking_executor::post, so enqueue only ran on the pumping thread. testHopsBackToIoThread exercises a compute-pool worker posting back to blocking_executor via the return trampoline, which is what revealed the race under TSan. --- src/test/run_blocking.cpp | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/src/test/run_blocking.cpp b/src/test/run_blocking.cpp index 58a1e7f61..143dc5a26 100644 --- a/src/test/run_blocking.cpp +++ b/src/test/run_blocking.cpp @@ -87,10 +87,8 @@ void blocking_context::enqueue( std::coroutine_handle<> h) { - { - std::lock_guard lock(impl_->mtx); - impl_->queue.push(h); - } + std::lock_guard lock(impl_->mtx); + impl_->queue.push(h); impl_->cv.notify_one(); } From f089a1bdc137f3ee134d8e8d09774ff242adbe73 Mon Sep 17 00:00:00 2001 From: Steve Gerbino Date: Thu, 23 Apr 2026 17:58:09 +0200 Subject: [PATCH 4/4] Make codecov coverage checks informational Codecov's default behavior is to fail CI on any decrease from the base commit's coverage. Small refactors can move the denominator enough to trip this (recently a 0.12% drop) without the change meaningfully reducing test quality. Mark project and patch status as informational so coverage is reported on PRs but never gates merge. Matches corosio's configuration. --- .codecov.yml | 10 ++++++++++ 1 file changed, 10 insertions(+) diff --git a/.codecov.yml b/.codecov.yml index 4e80d1cac..e67db52b9 100644 --- a/.codecov.yml +++ b/.codecov.yml @@ -12,6 +12,16 @@ codecov: after_n_builds: 1 wait_for_ci: yes +# Make coverage checks informational (report but never fail CI) +coverage: + status: + project: + default: + informational: true + patch: + default: + informational: true + # Change how pull request comments look comment: layout: "reach,diff,flags,files,footer"