Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions .codecov.yml
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,16 @@ codecov:
after_n_builds: 1
wait_for_ci: yes

# Make coverage checks informational (report but never fail CI)
coverage:
status:
project:
default:
informational: true
patch:
default:
informational: true

# Change how pull request comments look
comment:
layout: "reach,diff,flags,files,footer"
Expand Down
2 changes: 1 addition & 1 deletion doc/continuation-rationale.md
Original file line number Diff line number Diff line change
Expand Up @@ -432,7 +432,7 @@ their own `continuation`:

- `when_all_core::continuation_` (parent handle for combinator)
- `when_any_core::continuation_` (same)
- `dispatch_trampoline::parent_` (cross-executor trampoline)
- `boundary_trampoline::parent_` (cross-executor trampoline)
- `run_awaitable_ex::task_cont_` (initial task dispatch)
- `run_async_trampoline::task_cont_` (same)

Expand Down
7 changes: 4 additions & 3 deletions include/boost/capy/detail/await_suspend_helper.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -35,9 +35,10 @@ namespace detail {
The return value is written to the now-destroyed frame.

@li `await_suspend` hands the continuation to another thread
via `executor::dispatch()`, which may resume the parent.
The parent can destroy this frame before the runtime reads
`__$ReturnUdt$` (e.g. `dispatch_trampoline` final_suspend).
via an executor handoff (e.g. `post()` or `dispatch()`),
which may resume the parent. The parent can destroy this
frame before the runtime reads `__$ReturnUdt$` (e.g.
`boundary_trampoline` final_suspend).

On MSVC this function calls `h.resume()` on the current stack
and returns `void`, causing unconditional suspension. The
Expand Down
32 changes: 17 additions & 15 deletions include/boost/capy/ex/run.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -70,17 +70,17 @@ namespace boost::capy::detail {

The trampoline never touches the task's result.
*/
struct BOOST_CAPY_CORO_DESTROY_WHEN_COMPLETE dispatch_trampoline
struct BOOST_CAPY_CORO_DESTROY_WHEN_COMPLETE boundary_trampoline
{
struct promise_type
: frame_alloc_mixin
{
executor_ref caller_ex_;
continuation parent_;

dispatch_trampoline get_return_object() noexcept
boundary_trampoline get_return_object() noexcept
{
return dispatch_trampoline{
return boundary_trampoline{
std::coroutine_handle<promise_type>::from_promise(*this)};
}

Expand All @@ -96,8 +96,9 @@ struct BOOST_CAPY_CORO_DESTROY_WHEN_COMPLETE dispatch_trampoline
auto await_suspend(
std::coroutine_handle<>) noexcept
{
p_->caller_ex_.post(p_->parent_);
return detail::symmetric_transfer(
p_->caller_ex_.dispatch(p_->parent_));
std::noop_coroutine());
}

void await_resume() const noexcept {}
Expand All @@ -111,20 +112,20 @@ struct BOOST_CAPY_CORO_DESTROY_WHEN_COMPLETE dispatch_trampoline

std::coroutine_handle<promise_type> h_{nullptr};

dispatch_trampoline() noexcept = default;
boundary_trampoline() noexcept = default;

~dispatch_trampoline()
~boundary_trampoline()
{
if(h_) h_.destroy();
}

dispatch_trampoline(dispatch_trampoline const&) = delete;
dispatch_trampoline& operator=(dispatch_trampoline const&) = delete;
boundary_trampoline(boundary_trampoline const&) = delete;
boundary_trampoline& operator=(boundary_trampoline const&) = delete;

dispatch_trampoline(dispatch_trampoline&& o) noexcept
boundary_trampoline(boundary_trampoline&& o) noexcept
: h_(std::exchange(o.h_, nullptr)) {}

dispatch_trampoline& operator=(dispatch_trampoline&& o) noexcept
boundary_trampoline& operator=(boundary_trampoline&& o) noexcept
{
if(this != &o)
{
Expand All @@ -135,11 +136,11 @@ struct BOOST_CAPY_CORO_DESTROY_WHEN_COMPLETE dispatch_trampoline
}

private:
explicit dispatch_trampoline(std::coroutine_handle<promise_type> h) noexcept
explicit boundary_trampoline(std::coroutine_handle<promise_type> h) noexcept
: h_(h) {}
};

inline dispatch_trampoline make_dispatch_trampoline()
inline boundary_trampoline make_boundary_trampoline()
{
co_return;
}
Expand Down Expand Up @@ -170,7 +171,7 @@ struct [[nodiscard]] run_awaitable_ex
frame_memory_resource<Alloc> resource_;
std::conditional_t<InheritStopToken, std::monostate, std::stop_token> st_;
io_env env_;
dispatch_trampoline tr_;
boundary_trampoline tr_;
continuation task_cont_;
Task inner_; // Last: destroyed first, while env_ is still valid

Expand Down Expand Up @@ -224,7 +225,7 @@ struct [[nodiscard]] run_awaitable_ex

std::coroutine_handle<> await_suspend(std::coroutine_handle<> cont, io_env const* caller_env)
{
tr_ = make_dispatch_trampoline();
tr_ = make_boundary_trampoline();
tr_.h_.promise().caller_ex_ = caller_env->executor;
tr_.h_.promise().parent_.h = cont;

Expand All @@ -245,7 +246,8 @@ struct [[nodiscard]] run_awaitable_ex

p.set_environment(&env_);
task_cont_.h = h;
return ex_.dispatch(task_cont_);
ex_.post(task_cont_);
return std::noop_coroutine();
}

// Non-copyable
Expand Down
6 changes: 2 additions & 4 deletions src/test/run_blocking.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -87,10 +87,8 @@ void
blocking_context::enqueue(
std::coroutine_handle<> h)
{
{
std::lock_guard<std::mutex> lock(impl_->mtx);
impl_->queue.push(h);
}
std::lock_guard<std::mutex> lock(impl_->mtx);
impl_->queue.push(h);
impl_->cv.notify_one();
}

Expand Down
241 changes: 241 additions & 0 deletions test/unit/ex/priority_executor.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,241 @@
//
// Copyright (c) 2026 Steve Gerbino
//
// Distributed under the Boost Software License, Version 1.0. (See accompanying
// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
//
// Official repository: https://github.com/cppalliance/capy
//

#ifndef BOOST_CAPY_TEST_UNIT_EX_PRIORITY_EXECUTOR_HPP
#define BOOST_CAPY_TEST_UNIT_EX_PRIORITY_EXECUTOR_HPP

#include <boost/capy/concept/executor.hpp>
#include <boost/capy/continuation.hpp>
#include <boost/capy/ex/execution_context.hpp>
#include <boost/capy/ex/executor_ref.hpp>
#include <boost/capy/ex/frame_allocator.hpp>

#include <atomic>
#include <coroutine>
#include <exception>
#include <mutex>
#include <thread>
#include <type_traits>
#include <utility>

namespace boost {
namespace capy {
namespace test {

/** Test-only strand-shaped executor that drains high before low.
*/
struct priority_executor_state
{
std::mutex mutex;
continuation* high_head = nullptr;
continuation* high_tail = nullptr;
continuation* low_head = nullptr;
continuation* low_tail = nullptr;
bool locked = false;
std::atomic<std::thread::id> dispatch_thread{};
};

namespace detail {

struct priority_invoker
{
struct promise_type
{
continuation self;

priority_invoker get_return_object() noexcept
{
return {std::coroutine_handle<promise_type>::from_promise(*this)};
}

std::suspend_always initial_suspend() noexcept { return {}; }
std::suspend_never final_suspend() noexcept { return {}; }
void return_void() noexcept {}
void unhandled_exception() { std::terminate(); }
};

std::coroutine_handle<promise_type> h_;
};

inline void
drain_list(continuation* head) noexcept
{
while(head)
{
continuation* c = head;
head = c->next;
c->next = nullptr;
::boost::capy::safe_resume(c->h);
}
}

inline priority_invoker
make_priority_invoker(priority_executor_state* s)
{
for(;;)
{
s->dispatch_thread.store(
std::this_thread::get_id(),
std::memory_order_release);

continuation* high_head;
continuation* low_head;
{
std::lock_guard<std::mutex> lk(s->mutex);
high_head = s->high_head;
low_head = s->low_head;
s->high_head = nullptr;
s->high_tail = nullptr;
s->low_head = nullptr;
s->low_tail = nullptr;
}

drain_list(high_head);
drain_list(low_head);

{
std::lock_guard<std::mutex> lk(s->mutex);
if(!s->high_head && !s->low_head)
{
s->locked = false;
s->dispatch_thread.store(
std::thread::id{},
std::memory_order_release);
co_return;
}
}
}
}

} // namespace detail

/** Executor view over priority_executor_state. Dispatch has the same
thread-check fast path as strand; post defaults to the low queue.
*/
template<class Ex>
class priority_executor
{
priority_executor_state* state_;
Ex inner_ex_;

enum class priority { high, low };

void
enqueue_under_lock(continuation& c, priority p) const noexcept
{
c.next = nullptr;
if(p == priority::high)
{
if(state_->high_tail) state_->high_tail->next = &c;
else state_->high_head = &c;
state_->high_tail = &c;
}
else
{
if(state_->low_tail) state_->low_tail->next = &c;
else state_->low_head = &c;
state_->low_tail = &c;
}
}

void
post_with_priority(continuation& c, priority p) const
{
bool first;
{
std::lock_guard<std::mutex> lk(state_->mutex);
enqueue_under_lock(c, p);
first = !state_->locked;
if(first) state_->locked = true;
}
if(first)
post_invoker();
}

void
post_invoker() const
{
auto inv = detail::make_priority_invoker(state_);
auto& self = inv.h_.promise().self;
self.h = inv.h_;
self.next = nullptr;
inner_ex_.post(self);
}

public:
priority_executor(priority_executor_state& state, Ex inner) noexcept(
std::is_nothrow_move_constructible_v<Ex>)
: state_(&state)
, inner_ex_(std::move(inner))
{
}

priority_executor(priority_executor const&) noexcept(
std::is_nothrow_copy_constructible_v<Ex>) = default;
priority_executor(priority_executor&&) noexcept(
std::is_nothrow_move_constructible_v<Ex>) = default;
priority_executor& operator=(priority_executor const&) = default;
priority_executor& operator=(priority_executor&&) noexcept(
std::is_nothrow_move_assignable_v<Ex>) = default;

bool
operator==(priority_executor const& other) const noexcept
{
return state_ == other.state_;
}

auto&
context() const noexcept
{
return inner_ex_.context();
}

void on_work_started() const noexcept { inner_ex_.on_work_started(); }
void on_work_finished() const noexcept { inner_ex_.on_work_finished(); }

bool
running_in_this_thread() const noexcept
{
return state_->dispatch_thread.load(std::memory_order_acquire)
== std::this_thread::get_id();
}

std::coroutine_handle<>
dispatch(continuation& c) const
{
if(running_in_this_thread())
return c.h;
post_with_priority(c, priority::low);
return std::noop_coroutine();
}

void
post(continuation& c) const
{
post_with_priority(c, priority::low);
}

void
post_high(continuation& c) const
{
post_with_priority(c, priority::high);
}

void
post_low(continuation& c) const
{
post_with_priority(c, priority::low);
}
};

} // namespace test
} // namespace capy
} // namespace boost

#endif
Loading
Loading