diff --git a/.codecov.yml b/.codecov.yml index 4e80d1cac..e67db52b9 100644 --- a/.codecov.yml +++ b/.codecov.yml @@ -12,6 +12,16 @@ codecov: after_n_builds: 1 wait_for_ci: yes +# Make coverage checks informational (report but never fail CI) +coverage: + status: + project: + default: + informational: true + patch: + default: + informational: true + # Change how pull request comments look comment: layout: "reach,diff,flags,files,footer" diff --git a/doc/continuation-rationale.md b/doc/continuation-rationale.md index 544e30d1c..c69d3c97a 100644 --- a/doc/continuation-rationale.md +++ b/doc/continuation-rationale.md @@ -432,7 +432,7 @@ their own `continuation`: - `when_all_core::continuation_` (parent handle for combinator) - `when_any_core::continuation_` (same) -- `dispatch_trampoline::parent_` (cross-executor trampoline) +- `boundary_trampoline::parent_` (cross-executor trampoline) - `run_awaitable_ex::task_cont_` (initial task dispatch) - `run_async_trampoline::task_cont_` (same) diff --git a/include/boost/capy/detail/await_suspend_helper.hpp b/include/boost/capy/detail/await_suspend_helper.hpp index 3d72c7c64..c2d704ccb 100644 --- a/include/boost/capy/detail/await_suspend_helper.hpp +++ b/include/boost/capy/detail/await_suspend_helper.hpp @@ -35,9 +35,10 @@ namespace detail { The return value is written to the now-destroyed frame. @li `await_suspend` hands the continuation to another thread - via `executor::dispatch()`, which may resume the parent. - The parent can destroy this frame before the runtime reads - `__$ReturnUdt$` (e.g. `dispatch_trampoline` final_suspend). + via an executor handoff (e.g. `post()` or `dispatch()`), + which may resume the parent. The parent can destroy this + frame before the runtime reads `__$ReturnUdt$` (e.g. + `boundary_trampoline` final_suspend). On MSVC this function calls `h.resume()` on the current stack and returns `void`, causing unconditional suspension. The diff --git a/include/boost/capy/ex/run.hpp b/include/boost/capy/ex/run.hpp index 9cb02dab3..d0699ac14 100644 --- a/include/boost/capy/ex/run.hpp +++ b/include/boost/capy/ex/run.hpp @@ -70,7 +70,7 @@ namespace boost::capy::detail { The trampoline never touches the task's result. */ -struct BOOST_CAPY_CORO_DESTROY_WHEN_COMPLETE dispatch_trampoline +struct BOOST_CAPY_CORO_DESTROY_WHEN_COMPLETE boundary_trampoline { struct promise_type : frame_alloc_mixin @@ -78,9 +78,9 @@ struct BOOST_CAPY_CORO_DESTROY_WHEN_COMPLETE dispatch_trampoline executor_ref caller_ex_; continuation parent_; - dispatch_trampoline get_return_object() noexcept + boundary_trampoline get_return_object() noexcept { - return dispatch_trampoline{ + return boundary_trampoline{ std::coroutine_handle::from_promise(*this)}; } @@ -96,8 +96,9 @@ struct BOOST_CAPY_CORO_DESTROY_WHEN_COMPLETE dispatch_trampoline auto await_suspend( std::coroutine_handle<>) noexcept { + p_->caller_ex_.post(p_->parent_); return detail::symmetric_transfer( - p_->caller_ex_.dispatch(p_->parent_)); + std::noop_coroutine()); } void await_resume() const noexcept {} @@ -111,20 +112,20 @@ struct BOOST_CAPY_CORO_DESTROY_WHEN_COMPLETE dispatch_trampoline std::coroutine_handle h_{nullptr}; - dispatch_trampoline() noexcept = default; + boundary_trampoline() noexcept = default; - ~dispatch_trampoline() + ~boundary_trampoline() { if(h_) h_.destroy(); } - dispatch_trampoline(dispatch_trampoline const&) = delete; - dispatch_trampoline& operator=(dispatch_trampoline const&) = delete; + boundary_trampoline(boundary_trampoline const&) = delete; + boundary_trampoline& operator=(boundary_trampoline const&) = delete; - dispatch_trampoline(dispatch_trampoline&& o) noexcept + boundary_trampoline(boundary_trampoline&& o) noexcept : h_(std::exchange(o.h_, nullptr)) {} - dispatch_trampoline& operator=(dispatch_trampoline&& o) noexcept + boundary_trampoline& operator=(boundary_trampoline&& o) noexcept { if(this != &o) { @@ -135,11 +136,11 @@ struct BOOST_CAPY_CORO_DESTROY_WHEN_COMPLETE dispatch_trampoline } private: - explicit dispatch_trampoline(std::coroutine_handle h) noexcept + explicit boundary_trampoline(std::coroutine_handle h) noexcept : h_(h) {} }; -inline dispatch_trampoline make_dispatch_trampoline() +inline boundary_trampoline make_boundary_trampoline() { co_return; } @@ -170,7 +171,7 @@ struct [[nodiscard]] run_awaitable_ex frame_memory_resource resource_; std::conditional_t st_; io_env env_; - dispatch_trampoline tr_; + boundary_trampoline tr_; continuation task_cont_; Task inner_; // Last: destroyed first, while env_ is still valid @@ -224,7 +225,7 @@ struct [[nodiscard]] run_awaitable_ex std::coroutine_handle<> await_suspend(std::coroutine_handle<> cont, io_env const* caller_env) { - tr_ = make_dispatch_trampoline(); + tr_ = make_boundary_trampoline(); tr_.h_.promise().caller_ex_ = caller_env->executor; tr_.h_.promise().parent_.h = cont; @@ -245,7 +246,8 @@ struct [[nodiscard]] run_awaitable_ex p.set_environment(&env_); task_cont_.h = h; - return ex_.dispatch(task_cont_); + ex_.post(task_cont_); + return std::noop_coroutine(); } // Non-copyable diff --git a/src/test/run_blocking.cpp b/src/test/run_blocking.cpp index 58a1e7f61..143dc5a26 100644 --- a/src/test/run_blocking.cpp +++ b/src/test/run_blocking.cpp @@ -87,10 +87,8 @@ void blocking_context::enqueue( std::coroutine_handle<> h) { - { - std::lock_guard lock(impl_->mtx); - impl_->queue.push(h); - } + std::lock_guard lock(impl_->mtx); + impl_->queue.push(h); impl_->cv.notify_one(); } diff --git a/test/unit/ex/priority_executor.hpp b/test/unit/ex/priority_executor.hpp new file mode 100644 index 000000000..cdbfb9fc0 --- /dev/null +++ b/test/unit/ex/priority_executor.hpp @@ -0,0 +1,241 @@ +// +// Copyright (c) 2026 Steve Gerbino +// +// Distributed under the Boost Software License, Version 1.0. (See accompanying +// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) +// +// Official repository: https://github.com/cppalliance/capy +// + +#ifndef BOOST_CAPY_TEST_UNIT_EX_PRIORITY_EXECUTOR_HPP +#define BOOST_CAPY_TEST_UNIT_EX_PRIORITY_EXECUTOR_HPP + +#include +#include +#include +#include +#include + +#include +#include +#include +#include +#include +#include +#include + +namespace boost { +namespace capy { +namespace test { + +/** Test-only strand-shaped executor that drains high before low. +*/ +struct priority_executor_state +{ + std::mutex mutex; + continuation* high_head = nullptr; + continuation* high_tail = nullptr; + continuation* low_head = nullptr; + continuation* low_tail = nullptr; + bool locked = false; + std::atomic dispatch_thread{}; +}; + +namespace detail { + +struct priority_invoker +{ + struct promise_type + { + continuation self; + + priority_invoker get_return_object() noexcept + { + return {std::coroutine_handle::from_promise(*this)}; + } + + std::suspend_always initial_suspend() noexcept { return {}; } + std::suspend_never final_suspend() noexcept { return {}; } + void return_void() noexcept {} + void unhandled_exception() { std::terminate(); } + }; + + std::coroutine_handle h_; +}; + +inline void +drain_list(continuation* head) noexcept +{ + while(head) + { + continuation* c = head; + head = c->next; + c->next = nullptr; + ::boost::capy::safe_resume(c->h); + } +} + +inline priority_invoker +make_priority_invoker(priority_executor_state* s) +{ + for(;;) + { + s->dispatch_thread.store( + std::this_thread::get_id(), + std::memory_order_release); + + continuation* high_head; + continuation* low_head; + { + std::lock_guard lk(s->mutex); + high_head = s->high_head; + low_head = s->low_head; + s->high_head = nullptr; + s->high_tail = nullptr; + s->low_head = nullptr; + s->low_tail = nullptr; + } + + drain_list(high_head); + drain_list(low_head); + + { + std::lock_guard lk(s->mutex); + if(!s->high_head && !s->low_head) + { + s->locked = false; + s->dispatch_thread.store( + std::thread::id{}, + std::memory_order_release); + co_return; + } + } + } +} + +} // namespace detail + +/** Executor view over priority_executor_state. Dispatch has the same + thread-check fast path as strand; post defaults to the low queue. +*/ +template +class priority_executor +{ + priority_executor_state* state_; + Ex inner_ex_; + + enum class priority { high, low }; + + void + enqueue_under_lock(continuation& c, priority p) const noexcept + { + c.next = nullptr; + if(p == priority::high) + { + if(state_->high_tail) state_->high_tail->next = &c; + else state_->high_head = &c; + state_->high_tail = &c; + } + else + { + if(state_->low_tail) state_->low_tail->next = &c; + else state_->low_head = &c; + state_->low_tail = &c; + } + } + + void + post_with_priority(continuation& c, priority p) const + { + bool first; + { + std::lock_guard lk(state_->mutex); + enqueue_under_lock(c, p); + first = !state_->locked; + if(first) state_->locked = true; + } + if(first) + post_invoker(); + } + + void + post_invoker() const + { + auto inv = detail::make_priority_invoker(state_); + auto& self = inv.h_.promise().self; + self.h = inv.h_; + self.next = nullptr; + inner_ex_.post(self); + } + +public: + priority_executor(priority_executor_state& state, Ex inner) noexcept( + std::is_nothrow_move_constructible_v) + : state_(&state) + , inner_ex_(std::move(inner)) + { + } + + priority_executor(priority_executor const&) noexcept( + std::is_nothrow_copy_constructible_v) = default; + priority_executor(priority_executor&&) noexcept( + std::is_nothrow_move_constructible_v) = default; + priority_executor& operator=(priority_executor const&) = default; + priority_executor& operator=(priority_executor&&) noexcept( + std::is_nothrow_move_assignable_v) = default; + + bool + operator==(priority_executor const& other) const noexcept + { + return state_ == other.state_; + } + + auto& + context() const noexcept + { + return inner_ex_.context(); + } + + void on_work_started() const noexcept { inner_ex_.on_work_started(); } + void on_work_finished() const noexcept { inner_ex_.on_work_finished(); } + + bool + running_in_this_thread() const noexcept + { + return state_->dispatch_thread.load(std::memory_order_acquire) + == std::this_thread::get_id(); + } + + std::coroutine_handle<> + dispatch(continuation& c) const + { + if(running_in_this_thread()) + return c.h; + post_with_priority(c, priority::low); + return std::noop_coroutine(); + } + + void + post(continuation& c) const + { + post_with_priority(c, priority::low); + } + + void + post_high(continuation& c) const + { + post_with_priority(c, priority::high); + } + + void + post_low(continuation& c) const + { + post_with_priority(c, priority::low); + } +}; + +} // namespace test +} // namespace capy +} // namespace boost + +#endif diff --git a/test/unit/ex/run.cpp b/test/unit/ex/run.cpp index a968ef609..5c67645ef 100644 --- a/test/unit/ex/run.cpp +++ b/test/unit/ex/run.cpp @@ -21,9 +21,11 @@ #include #include +#include #include #include +#include namespace boost { namespace capy { @@ -500,6 +502,35 @@ struct run_test pool.join(); } + // co_await run(compute_exec)(...) from an io loop must return + // the caller to the io thread, not leave it on a compute worker. + void + testHopsBackToIoThread() + { + thread_pool compute_pool(2, "compute-"); + + std::thread::id io_tid = std::this_thread::get_id(); + std::thread::id compute_tid{}; + std::thread::id parent_tid_after_run{}; + + test::run_blocking()([&]() -> task { + auto compute_exec = compute_pool.get_executor(); + + co_await capy::run(compute_exec)([&]() -> task { + compute_tid = std::this_thread::get_id(); + co_return; + }()); + + parent_tid_after_run = std::this_thread::get_id(); + }()); + + BOOST_TEST(compute_tid != std::thread::id{}); + BOOST_TEST(compute_tid != io_tid); + BOOST_TEST_EQ(parent_tid_after_run, io_tid); + + compute_pool.join(); + } + void run() { @@ -523,6 +554,7 @@ struct run_test testAllocatorPropagation(); testAllocatorPropagationThroughRun(); testRunExStrandFirstInstruction(); + testHopsBackToIoThread(); } }; diff --git a/test/unit/ex/run_priority.cpp b/test/unit/ex/run_priority.cpp new file mode 100644 index 000000000..b890936fc --- /dev/null +++ b/test/unit/ex/run_priority.cpp @@ -0,0 +1,242 @@ +// +// Copyright (c) 2026 Steve Gerbino +// +// Distributed under the Boost Software License, Version 1.0. (See accompanying +// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) +// +// Official repository: https://github.com/cppalliance/capy +// + +// Tests that fail against the current run()'s use of dispatch at +// cross-executor boundaries; pass once run() posts on both trips. + +#include +#include +#include +#include +#include +#include + +#include "priority_executor.hpp" +#include "test/unit/test_helpers.hpp" +#include "test_suite.hpp" + +#include +#include +#include +#include +#include +#include + +namespace boost { +namespace capy { + +static_assert(Executor>, + "priority_executor must satisfy Executor concept"); + +namespace { + +// Bare coroutine that appends a message and ends. Posted directly. +struct log_coro +{ + struct promise_type + { + std::vector* log; + std::string msg; + + log_coro get_return_object() noexcept + { + return log_coro{ + std::coroutine_handle::from_promise(*this)}; + } + + std::suspend_always initial_suspend() noexcept { return {}; } + std::suspend_never final_suspend() noexcept { return {}; } + void return_void() noexcept {} + void unhandled_exception() { std::terminate(); } + }; + + std::coroutine_handle h_; + + ~log_coro() { if(h_) h_.destroy(); } + + log_coro(log_coro&& o) noexcept : h_(o.h_) { o.h_ = nullptr; } + + std::coroutine_handle handle() const noexcept { return h_; } + + void release() noexcept { h_ = nullptr; } + +private: + explicit log_coro(std::coroutine_handle h) : h_(h) {} +}; + +inline log_coro +make_log_coro(std::vector& log, std::string msg) +{ + return [](std::vector* log, std::string msg) -> log_coro { + log->push_back(std::move(msg)); + co_return; + }(&log, std::move(msg)); +} + +inline void +pump(std::queue>& q) +{ + while(!q.empty()) + { + auto h = q.front(); + q.pop(); + h.resume(); + } +} + +} // namespace + +struct run_priority_test +{ + // run(pe)(inner) from a handler on pe must enqueue inner + // behind other work already in pe's queue, not cut in line. + void + testForwardCrossing() + { + std::queue> q; + queuing_executor qe(q); + test::priority_executor_state state; + test::priority_executor pe(state, qe); + + std::vector log; + + auto inner_task_fn = [&]() -> task { + log.push_back("inner"); + co_return; + }; + + auto outer_task_fn = [&]() -> task { + log.push_back("outer_start"); + co_await capy::run(pe)(inner_task_fn()); + log.push_back("outer_end"); + }; + + bool outer_done = false; + run_async(pe, [&]() { outer_done = true; })(outer_task_fn()); + + auto sibling_coro = make_log_coro(log, "sibling"); + continuation sibling_cont{sibling_coro.handle()}; + pe.post(sibling_cont); + sibling_coro.release(); + + pump(q); + + BOOST_TEST(outer_done); + + BOOST_TEST_EQ(log.size(), std::size_t(4)); + if(log.size() == 4) + { + BOOST_TEST_EQ(log[0], std::string("outer_start")); + BOOST_TEST_EQ(log[1], std::string("sibling")); + BOOST_TEST_EQ(log[2], std::string("inner")); + BOOST_TEST_EQ(log[3], std::string("outer_end")); + } + } + + // The return trip must post the caller back to its executor, + // giving pe a tick to drain higher-priority work before the + // caller resumes. inline_ex is chosen as the target so the + // forward trip is trivial and only the return trip is observed. + void + testReturnTripParentWrongFrame() + { + std::queue> q; + queuing_executor qe(q); + test::priority_executor_state state; + test::priority_executor pe(state, qe); + + std::vector log; + + auto inner_task_fn = [&]() -> task { + log.push_back("inner"); + co_return; + }; + + auto outer_task_fn = [&]() -> task { + log.push_back("outer_start"); + + auto pending_high_coro = make_log_coro(log, "pending_high"); + continuation pending_high_cont{pending_high_coro.handle()}; + pe.post_high(pending_high_cont); + pending_high_coro.release(); + + int dummy = 0; + test_executor inline_ex(1, dummy); + co_await capy::run(inline_ex)(inner_task_fn()); + + log.push_back("outer_end"); + }; + + bool outer_done = false; + run_async(pe, [&]() { outer_done = true; })(outer_task_fn()); + + pump(q); + + BOOST_TEST(outer_done); + + BOOST_TEST_EQ(log.size(), std::size_t(4)); + if(log.size() == 4) + { + BOOST_TEST_EQ(log[0], std::string("outer_start")); + BOOST_TEST_EQ(log[1], std::string("inner")); + BOOST_TEST_EQ(log[2], std::string("pending_high")); + BOOST_TEST_EQ(log[3], std::string("outer_end")); + } + } + + // run(inner)(work) from inside a strand must actually release + // the strand while work runs, not nest work in the strand's frame. + void + testExitStrandOverPriority() + { + std::queue> q; + queuing_executor qe(q); + test::priority_executor_state state; + test::priority_executor pe(state, qe); + + strand> s(pe); + + bool s_running_inside_work = false; + bool work_ran = false; + + auto work_task_fn = [&]() -> task { + s_running_inside_work = s.running_in_this_thread(); + work_ran = true; + co_return; + }; + + auto outer_task_fn = [&]() -> task { + co_await capy::run(pe)(work_task_fn()); + }; + + bool outer_done = false; + run_async(s, [&]() { outer_done = true; })(outer_task_fn()); + + pump(q); + + BOOST_TEST(outer_done); + BOOST_TEST(work_ran); + BOOST_TEST(!s_running_inside_work); + } + + void + run() + { + testForwardCrossing(); + testReturnTripParentWrongFrame(); + testExitStrandOverPriority(); + } +}; + +TEST_SUITE( + run_priority_test, + "boost.capy.run.priority"); + +} // namespace capy +} // namespace boost diff --git a/test/unit/ex/strand.cpp b/test/unit/ex/strand.cpp index 38ecaa616..d3795ab89 100644 --- a/test/unit/ex/strand.cpp +++ b/test/unit/ex/strand.cpp @@ -12,7 +12,11 @@ #include #include +#include +#include #include +#include +#include #include "test_suite.hpp" @@ -566,6 +570,30 @@ struct strand_test BOOST_TEST_EQ(completed.load(), N); } + // After co_await run(strand)(...) returns, caller must be outside + // the strand. User-reported bug: today it is still inside. + void + testExitStrandAfterRun() + { + bool running_in_strand_after_run = true; + bool inner_ran = false; + + test::run_blocking()([&]() -> task { + auto ex = co_await this_coro::executor; + auto str = capy::strand(ex); + + co_await capy::run(str)([&]() -> task { + inner_ran = true; + co_return; + }()); + + running_in_strand_after_run = str.running_in_this_thread(); + }()); + + BOOST_TEST(inner_ran); + BOOST_TEST(!running_in_strand_after_run); + } + void testAnyExecutor() { @@ -637,6 +665,7 @@ struct strand_test testConcurrentPost(); testFifoOrder(); testSerialization(); + testExitStrandAfterRun(); testAnyExecutor(); } };