Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 14 additions & 14 deletions include/boost/capy/ex/thread_pool.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -196,22 +196,22 @@ class thread_pool::executor_type

/** Dispatch a continuation for execution.

Posts the continuation to the thread pool for execution on a
worker thread and returns `std::noop_coroutine()`. Thread
pools never execute inline because no single thread "owns"
the pool.

@param c The continuation to execute. Must remain at a
stable address until dequeued and resumed.

@return `std::noop_coroutine()` always.
If the calling thread is a worker of this pool, returns
`c.h` for symmetric transfer so the caller can resume the
continuation inline. Otherwise, posts the continuation to
the pool for execution on a worker thread and returns
`std::noop_coroutine()`.

@param c The continuation to execute. On the post path,
must remain at a stable address until dequeued
and resumed.

@return `c.h` when the calling thread is a pool worker;
`std::noop_coroutine()` otherwise.
*/
BOOST_CAPY_DECL
std::coroutine_handle<>
dispatch(continuation& c) const
{
post(c);
return std::noop_coroutine();
}
dispatch(continuation& c) const;

/** Post a continuation to the thread pool.

Expand Down
31 changes: 31 additions & 0 deletions src/ex/thread_pool.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@

#include <boost/capy/ex/thread_pool.hpp>
#include <boost/capy/continuation.hpp>
#include <boost/capy/detail/thread_local_ptr.hpp>
#include <boost/capy/ex/frame_allocator.hpp>
#include <boost/capy/test/thread_name.hpp>
#include <algorithm>
Expand Down Expand Up @@ -51,6 +52,12 @@ namespace capy {

class thread_pool::impl
{
// Identifies the pool owning the current worker thread, or
// nullptr if the calling thread is not a pool worker. Checked
// by dispatch() to decide between symmetric transfer (inline
// resume) and post.
static inline detail::thread_local_ptr<impl const> current_;

// Intrusive queue of continuations via continuation::next.
// No per-post allocation: the continuation is owned by the caller.
continuation* head_ = nullptr;
Expand Down Expand Up @@ -96,6 +103,12 @@ class thread_pool::impl
public:
~impl() = default;

bool
running_in_this_thread() const noexcept
{
return current_.get() == this;
}

// Destroy abandoned coroutine frames. Must be called
// before execution_context::shutdown()/destroy() so
// that suspended-frame destructors (e.g. delay_awaitable
Expand Down Expand Up @@ -213,6 +226,14 @@ class thread_pool::impl
std::snprintf(name, sizeof(name), "%s%zu", thread_name_prefix_, index);
set_current_thread_name(name);

// Mark this thread as a worker of this pool so dispatch()
// can symmetric-transfer when called from within pool work.
struct scoped_pool
{
scoped_pool(impl const* p) noexcept { current_.set(p); }
~scoped_pool() noexcept { current_.set(nullptr); }
} guard(this);

for(;;)
{
continuation* c = nullptr;
Expand Down Expand Up @@ -297,5 +318,15 @@ post(continuation& c) const
pool_->impl_->post(c);
}

std::coroutine_handle<>
thread_pool::executor_type::
dispatch(continuation& c) const
{
if(pool_->impl_->running_in_this_thread())
return c.h;
pool_->impl_->post(c);
return std::noop_coroutine();
}

} // capy
} // boost
135 changes: 130 additions & 5 deletions test/unit/ex/thread_pool.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,46 @@ struct test_service : execution_context::service
void shutdown() override {}
};

// Probe coroutine starts suspended; resuming it completes and
// auto-destroys the frame (suspend_never final). If never
// resumed, probe_coro's dtor destroys it.
struct probe_coro
{
struct promise_type
{
probe_coro
get_return_object() noexcept
{
return probe_coro{
std::coroutine_handle<promise_type>::from_promise(*this)};
}
std::suspend_always initial_suspend() noexcept { return {}; }
std::suspend_never final_suspend() noexcept { return {}; }
void return_void() noexcept {}
void unhandled_exception() { std::terminate(); }
};

std::coroutine_handle<promise_type> h_;

~probe_coro() { if(h_) h_.destroy(); }

probe_coro(probe_coro&& other) noexcept
: h_(other.h_) { other.h_ = nullptr; }

std::coroutine_handle<void> handle() const noexcept { return h_; }
void release() noexcept { h_ = nullptr; }

private:
explicit probe_coro(std::coroutine_handle<promise_type> h)
: h_(h) {}
};

inline probe_coro
make_probe()
{
co_return;
}

#if defined(BOOST_CAPY_TEST_CAN_GET_THREAD_NAME)
// Result storage for thread name check
struct name_check_result
Expand Down Expand Up @@ -189,12 +229,95 @@ struct thread_pool_test
void
testDispatch()
{
continuation c{std::noop_coroutine()};
thread_pool pool(1);
auto ex = pool.get_executor();
// From outside any pool, dispatch() posts.
auto probe = make_probe();
auto probe_h = probe.handle();
auto* target = new continuation{probe_h};

std::coroutine_handle<> returned;
{
thread_pool pool(1);
auto ex = pool.get_executor();
returned = ex.dispatch(*target);
}

BOOST_TEST(returned != probe_h);
if(returned != probe_h)
probe.release();
delete target;
}

void
testDispatchSymmetricTransfer()
{
// From a worker thread of the same pool, dispatch()
// returns c.h for symmetric transfer and does not
// enqueue the continuation.
auto probe = make_probe();
auto probe_h = probe.handle();

// Heap-allocated so target outlives the pool if a buggy
// implementation erroneously posts it.
auto* target = new continuation{probe_h};

std::atomic<bool> done{false};
std::coroutine_handle<> returned;

{
thread_pool pool(1);
auto ex = pool.get_executor();

run_async(ex, [&]{
returned = ex.dispatch(*target);
done.store(true);
})(void_task());

BOOST_TEST(wait_for([&]{ return done.load(); }));
}

// On symmetric transfer the returned handle equals the
// target's handle and the probe is never enqueued.
BOOST_TEST(returned == probe_h);

// If the dispatch posted (buggy), the pool destructor's
// drain_abandoned already destroyed probe_h; release so
// the probe_coro dtor does not double-destroy.
if(returned != probe_h)
probe.release();
delete target;
}

void
testDispatchCrossPool()
{
// Worker threads of pool A are not workers of pool B:
// dispatch() on B from an A worker must post, not
// symmetric-transfer.
auto probe = make_probe();
auto probe_h = probe.handle();
auto* target = new continuation{probe_h};

std::atomic<bool> done{false};
std::coroutine_handle<> returned;

{
thread_pool pool_a(1);
thread_pool pool_b(1);
auto ex_a = pool_a.get_executor();
auto ex_b = pool_b.get_executor();

run_async(ex_a, [&]{
returned = ex_b.dispatch(*target);
done.store(true);
})(void_task());

BOOST_TEST(wait_for([&]{ return done.load(); }));
}

// dispatch() always posts for thread_pool (returns void)
ex.dispatch(c);
BOOST_TEST(returned != probe_h);
if(returned != probe_h)
probe.release();
delete target;
}

void
Expand Down Expand Up @@ -584,6 +707,8 @@ struct thread_pool_test
testPostWork();
testWorkCounting();
testDispatch();
testDispatchSymmetricTransfer();
testDispatchCrossPool();
testServiceManagement();
testMakeService();
testConcurrentPost();
Expand Down
Loading