From 42fa1794afe87d626422390f8b000284fa70c2ca Mon Sep 17 00:00:00 2001 From: Steve Gerbino Date: Fri, 17 Apr 2026 19:17:38 +0200 Subject: [PATCH 1/2] perf/bench: align asio benches with corosio for apples-to-apples comparison MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Add _lockless variants to asio and asio_callback across socket_throughput, socket_latency, local_socket_throughput, local_socket_latency, fan_out, accept_churn, http_server, and timer, mirroring corosio's single-threaded configurations by constructing the io_context with BOOST_ASIO_CONCURRENCY_HINT_UNSAFE. Follows the existing pattern in io_context_bench.cpp. Move throughput byte accounting out of the read loop into a local int64_t accumulator, calling state.add_bytes once after ioc.run() returns. The previous per-read state.add_bytes was an atomic fetch_add on every completion, which added ~20ns × N_reads to the measured elapsed time and structurally disadvantaged the faster library at small chunk sizes. Multithread benches still use atomic aggregation (required for correctness across N runner threads). --- .../asio/callback/accept_churn_bench.cpp | 66 +++++ perf/bench/asio/callback/fan_out_bench.cpp | 155 ++++++++++ .../bench/asio/callback/http_server_bench.cpp | 29 ++ .../callback/local_socket_latency_bench.cpp | 82 ++++++ .../local_socket_throughput_bench.cpp | 91 ++++++ .../asio/callback/socket_latency_bench.cpp | 83 ++++++ .../asio/callback/socket_throughput_bench.cpp | 91 ++++++ perf/bench/asio/callback/timer_bench.cpp | 60 ++++ .../asio/coroutine/accept_churn_bench.cpp | 162 +++++++++++ perf/bench/asio/coroutine/fan_out_bench.cpp | 268 ++++++++++++++++++ .../asio/coroutine/http_server_bench.cpp | 28 ++ .../coroutine/local_socket_latency_bench.cpp | 87 ++++++ .../local_socket_throughput_bench.cpp | 188 +++++++++++- .../asio/coroutine/socket_latency_bench.cpp | 87 ++++++ .../coroutine/socket_throughput_bench.cpp | 188 +++++++++++- perf/bench/asio/coroutine/timer_bench.cpp | 74 +++++ .../corosio/local_socket_throughput_bench.cpp | 22 +- .../bench/corosio/socket_throughput_bench.cpp | 22 +- 18 files changed, 1765 insertions(+), 18 deletions(-) diff --git a/perf/bench/asio/callback/accept_churn_bench.cpp b/perf/bench/asio/callback/accept_churn_bench.cpp index dad5bbaad..1d7bd8ca8 100644 --- a/perf/bench/asio/callback/accept_churn_bench.cpp +++ b/perf/bench/asio/callback/accept_churn_bench.cpp @@ -16,6 +16,7 @@ #include #include #include +#include #include #include @@ -187,6 +188,35 @@ bench_sequential_churn(bench::state& state) acc.close(); } +void +bench_sequential_churn_lockless(bench::state& state) +{ + asio::io_context ioc(BOOST_ASIO_CONCURRENCY_HINT_UNSAFE); + auto acc = make_churn_acceptor( ioc ); + auto ep = tcp::endpoint( asio::ip::address_v4::loopback(), acc.local_endpoint().port() ); + + std::atomic running{true}; + + sequential_churn_op op{ioc, acc, ep, running, state.latency(), + state.ops(), {}, {}, {}}; + + perf::stopwatch total_sw; + + op.start(); + + std::thread timer([&]() { + std::this_thread::sleep_for(std::chrono::duration(state.duration())); + running.store(false, std::memory_order_relaxed); + ioc.stop(); + }); + + ioc.run(); + timer.join(); + + state.set_elapsed(total_sw.elapsed_seconds()); + acc.close(); +} + // N independent accept loops on separate listeners. Reveals whether // fd allocation or acceptor state scales linearly under callbacks. void @@ -344,6 +374,39 @@ bench_burst_churn(bench::state& state) acc.close(); } +void +bench_burst_churn_lockless(bench::state& state) +{ + int burst_size = static_cast(state.range(0)); + state.counters["burst_size"] = burst_size; + + asio::io_context ioc(BOOST_ASIO_CONCURRENCY_HINT_UNSAFE); + auto acc = make_churn_acceptor( ioc ); + auto ep = tcp::endpoint( asio::ip::address_v4::loopback(), acc.local_endpoint().port() ); + + std::atomic running{true}; + + burst_churn_op op{ioc, acc, ep, running, state.latency(), + state.ops(), burst_size, {}, {}, {}, + {}}; + + perf::stopwatch total_sw; + + op.start(); + + std::thread stopper([&]() { + std::this_thread::sleep_for(std::chrono::duration(state.duration())); + running.store(false, std::memory_order_relaxed); + ioc.stop(); + }); + + ioc.run(); + stopper.join(); + + state.set_elapsed(total_sw.elapsed_seconds()); + acc.close(); +} + } // anonymous namespace bench::benchmark_suite @@ -352,9 +415,12 @@ make_accept_churn_suite() using F = bench::bench_flags; return bench::benchmark_suite("accept_churn", F::needs_conntrack_drain) .add("sequential", bench_sequential_churn) + .add("sequential_lockless", bench_sequential_churn_lockless) .add("concurrent", bench_concurrent_churn) .args({1, 4, 16}) .add("burst", bench_burst_churn) + .args({10, 100}) + .add("burst_lockless", bench_burst_churn_lockless) .args({10, 100}); } diff --git a/perf/bench/asio/callback/fan_out_bench.cpp b/perf/bench/asio/callback/fan_out_bench.cpp index 6584934d4..35a86df86 100644 --- a/perf/bench/asio/callback/fan_out_bench.cpp +++ b/perf/bench/asio/callback/fan_out_bench.cpp @@ -16,6 +16,7 @@ #include #include #include +#include #include #include @@ -466,6 +467,154 @@ bench_concurrent_parents(bench::state& state) state.set_elapsed(sw.elapsed_seconds()); } +void +bench_fork_join_lockless(bench::state& state) +{ + int fan_out = static_cast(state.range(0)); + state.counters["fan_out"] = fan_out; + + asio::io_context ioc(BOOST_ASIO_CONCURRENCY_HINT_UNSAFE); + + std::vector clients; + std::vector servers; + clients.reserve(fan_out); + servers.reserve(fan_out); + + for (int i = 0; i < fan_out; ++i) + { + auto [c, s] = asio_bench::make_socket_pair(ioc); + clients.push_back(std::move(c)); + servers.push_back(std::move(s)); + } + + for (int i = 0; i < fan_out; ++i) + { + auto echo = std::make_shared(servers[i]); + echo->start(); + } + + fork_join_op op{ioc, clients, servers, fan_out, state, {}, {}}; + + op.start(); + + std::thread stopper([&]() { + std::this_thread::sleep_for( + std::chrono::duration(state.duration())); + state.stop(); + }); + + perf::stopwatch sw; + ioc.run(); + stopper.join(); + + state.set_elapsed(sw.elapsed_seconds()); +} + +void +bench_nested_lockless(bench::state& state) +{ + int groups = static_cast(state.range(0)); + int subs_per_group = 4; + int total_subs = groups * subs_per_group; + + state.counters["groups"] = groups; + state.counters["subs_per_group"] = subs_per_group; + + asio::io_context ioc(BOOST_ASIO_CONCURRENCY_HINT_UNSAFE); + + std::vector clients; + std::vector servers; + clients.reserve(total_subs); + servers.reserve(total_subs); + + for (int i = 0; i < total_subs; ++i) + { + auto [c, s] = asio_bench::make_socket_pair(ioc); + clients.push_back(std::move(c)); + servers.push_back(std::move(s)); + } + + for (int i = 0; i < total_subs; ++i) + { + auto echo = std::make_shared(servers[i]); + echo->start(); + } + + nested_op op{ioc, clients, servers, groups, subs_per_group, + state, {}, {}, {}}; + + op.start(); + + std::thread stopper([&]() { + std::this_thread::sleep_for( + std::chrono::duration(state.duration())); + state.stop(); + }); + + perf::stopwatch sw; + ioc.run(); + stopper.join(); + + state.set_elapsed(sw.elapsed_seconds()); +} + +void +bench_concurrent_parents_lockless(bench::state& state) +{ + int num_parents = static_cast(state.range(0)); + int fan_out = 16; + int total_subs = num_parents * fan_out; + + state.counters["num_parents"] = num_parents; + state.counters["fan_out"] = fan_out; + + asio::io_context ioc(BOOST_ASIO_CONCURRENCY_HINT_UNSAFE); + + std::vector clients; + std::vector servers; + clients.reserve(total_subs); + servers.reserve(total_subs); + + for (int i = 0; i < total_subs; ++i) + { + auto [c, s] = asio_bench::make_socket_pair(ioc); + clients.push_back(std::move(c)); + servers.push_back(std::move(s)); + } + + for (int i = 0; i < total_subs; ++i) + { + auto echo = std::make_shared(servers[i]); + echo->start(); + } + + std::atomic parents_done{0}; + + std::vector> parent_ops; + parent_ops.reserve(num_parents); + + for (int p = 0; p < num_parents; ++p) + { + parent_ops.push_back( + std::make_unique( + ioc, clients, servers, p * fan_out, fan_out, num_parents, + state, parents_done)); + parent_ops.back()->start(); + } + + std::thread stopper([&]() { + std::this_thread::sleep_for( + std::chrono::duration(state.duration())); + state.stop(); + }); + + perf::stopwatch sw; + ioc.run(); + stopper.join(); + + state.set_elapsed(sw.elapsed_seconds()); +} + } // anonymous namespace bench::benchmark_suite @@ -475,9 +624,15 @@ make_fan_out_suite() return bench::benchmark_suite("fan_out", F::needs_conntrack_drain) .add("fork_join", bench_fork_join) .args({1, 4, 16, 64}) + .add("fork_join_lockless", bench_fork_join_lockless) + .args({1, 4, 16, 64}) .add("nested", bench_nested) .args({4, 16}) + .add("nested_lockless", bench_nested_lockless) + .args({4, 16}) .add("concurrent_parents", bench_concurrent_parents) + .args({1, 4, 16}) + .add("concurrent_parents_lockless", bench_concurrent_parents_lockless) .args({1, 4, 16}); } diff --git a/perf/bench/asio/callback/http_server_bench.cpp b/perf/bench/asio/callback/http_server_bench.cpp index 38507b1a9..233832463 100644 --- a/perf/bench/asio/callback/http_server_bench.cpp +++ b/perf/bench/asio/callback/http_server_bench.cpp @@ -14,6 +14,7 @@ #include #include #include +#include #include #include @@ -181,6 +182,33 @@ bench_single_connection(bench::state& state) server.close(); } +void +bench_single_connection_lockless(bench::state& state) +{ + asio::io_context ioc(BOOST_ASIO_CONCURRENCY_HINT_UNSAFE); + auto [client, server] = asio_bench::make_socket_pair(ioc); + + server_op sop{server, {}}; + client_op cop{client, state, {}, {}}; + + sop.start(); + cop.start(); + + std::thread timer([&]() { + std::this_thread::sleep_for( + std::chrono::duration(state.duration())); + state.stop(); + }); + + perf::stopwatch sw; + ioc.run(); + timer.join(); + + state.set_elapsed(sw.elapsed_seconds()); + client.close(); + server.close(); +} + void bench_concurrent_connections(bench::state& state) { @@ -335,6 +363,7 @@ make_http_server_suite() s.close(); }) .add("single_conn", bench_single_connection) + .add("single_conn_lockless", bench_single_connection_lockless) .add("concurrent", bench_concurrent_connections) .args({1, 4, 16, 32}) .add("multithread", bench_multithread) diff --git a/perf/bench/asio/callback/local_socket_latency_bench.cpp b/perf/bench/asio/callback/local_socket_latency_bench.cpp index 623d44140..89e393d7f 100644 --- a/perf/bench/asio/callback/local_socket_latency_bench.cpp +++ b/perf/bench/asio/callback/local_socket_latency_bench.cpp @@ -13,6 +13,7 @@ #include #include #include +#include #include #include @@ -199,6 +200,83 @@ bench_concurrent_latency(bench::state& state) s.close(); } +void +bench_pingpong_latency_lockless(bench::state& state) +{ + auto message_size = static_cast(state.range(0)); + state.counters["message_size"] = static_cast(message_size); + + asio::io_context ioc(BOOST_ASIO_CONCURRENCY_HINT_UNSAFE); + auto [client, server] = asio_bench::make_local_socket_pair(ioc); + + unix_pingpong_op op(client, server, message_size, state); + + op.start(); + + std::thread timer([&]() { + std::this_thread::sleep_for( + std::chrono::duration(state.duration())); + state.stop(); + }); + + perf::stopwatch sw; + ioc.run(); + timer.join(); + + state.set_elapsed(sw.elapsed_seconds()); + client.close(); + server.close(); +} + +void +bench_concurrent_latency_lockless(bench::state& state) +{ + int num_pairs = static_cast(state.range(0)); + state.counters["num_pairs"] = num_pairs; + + asio::io_context ioc(BOOST_ASIO_CONCURRENCY_HINT_UNSAFE); + + std::vector clients; + std::vector servers; + + clients.reserve(num_pairs); + servers.reserve(num_pairs); + + for (int i = 0; i < num_pairs; ++i) + { + auto [c, s] = asio_bench::make_local_socket_pair(ioc); + clients.push_back(std::move(c)); + servers.push_back(std::move(s)); + } + + std::vector> ops; + ops.reserve(num_pairs); + for (int p = 0; p < num_pairs; ++p) + { + ops.push_back( + std::make_unique( + clients[p], servers[p], 64, state)); + ops.back()->start(); + } + + std::thread timer([&]() { + std::this_thread::sleep_for( + std::chrono::duration(state.duration())); + state.stop(); + }); + + perf::stopwatch sw; + ioc.run(); + timer.join(); + + state.set_elapsed(sw.elapsed_seconds()); + + for (auto& c : clients) + c.close(); + for (auto& s : servers) + s.close(); +} + } // anonymous namespace bench::benchmark_suite @@ -220,7 +298,11 @@ make_local_socket_latency_suite() }) .add("pingpong", bench_pingpong_latency) .args({1, 64, 1024}) + .add("pingpong_lockless", bench_pingpong_latency_lockless) + .args({1, 64, 1024}) .add("concurrent", bench_concurrent_latency) + .args({1, 4, 16}) + .add("concurrent_lockless", bench_concurrent_latency_lockless) .args({1, 4, 16}); } diff --git a/perf/bench/asio/callback/local_socket_throughput_bench.cpp b/perf/bench/asio/callback/local_socket_throughput_bench.cpp index 6707edceb..d4b3ea667 100644 --- a/perf/bench/asio/callback/local_socket_throughput_bench.cpp +++ b/perf/bench/asio/callback/local_socket_throughput_bench.cpp @@ -13,6 +13,7 @@ #include #include #include +#include #include #include @@ -154,6 +155,92 @@ bench_bidirectional_throughput(bench::state& state) sock2.close(); } +void +bench_throughput_lockless(bench::state& state) +{ + auto chunk_size = static_cast(state.range(0)); + state.counters["chunk_size"] = static_cast(chunk_size); + + asio::io_context ioc(BOOST_ASIO_CONCURRENCY_HINT_UNSAFE); + auto [writer, reader] = asio_bench::make_local_socket_pair(ioc); + + std::vector write_buf(chunk_size, 'x'); + std::vector read_buf(chunk_size); + + std::atomic running{true}; + std::size_t total_read = 0; + + unix_write_op wop{writer, write_buf, chunk_size, running}; + unix_read_op rop{reader, read_buf, total_read}; + + perf::stopwatch sw; + + wop.start(); + rop.start(); + + std::thread timer([&]() { + std::this_thread::sleep_for( + std::chrono::duration(state.duration())); + running.store(false, std::memory_order_relaxed); + }); + + ioc.run(); + timer.join(); + + state.set_elapsed(sw.elapsed_seconds()); + state.add_bytes(static_cast(total_read)); + + writer.close(); + reader.close(); +} + +void +bench_bidirectional_throughput_lockless(bench::state& state) +{ + auto chunk_size = static_cast(state.range(0)); + state.counters["chunk_size"] = static_cast(chunk_size); + + asio::io_context ioc(BOOST_ASIO_CONCURRENCY_HINT_UNSAFE); + auto [sock1, sock2] = asio_bench::make_local_socket_pair(ioc); + + std::vector buf1(chunk_size, 'a'); + std::vector buf2(chunk_size, 'b'); + std::vector rbuf1(chunk_size); + std::vector rbuf2(chunk_size); + + std::atomic running{true}; + std::size_t read1 = 0; + std::size_t read2 = 0; + + unix_write_op wop1{sock1, buf1, chunk_size, running}; + unix_read_op rop1{sock2, rbuf1, read1}; + + unix_write_op wop2{sock2, buf2, chunk_size, running}; + unix_read_op rop2{sock1, rbuf2, read2}; + + perf::stopwatch sw; + + wop1.start(); + rop1.start(); + wop2.start(); + rop2.start(); + + std::thread timer([&]() { + std::this_thread::sleep_for( + std::chrono::duration(state.duration())); + running.store(false, std::memory_order_relaxed); + }); + + ioc.run(); + timer.join(); + + state.set_elapsed(sw.elapsed_seconds()); + state.add_bytes(static_cast(read1 + read2)); + + sock1.close(); + sock2.close(); +} + } // anonymous namespace bench::benchmark_suite @@ -172,7 +259,11 @@ make_local_socket_throughput_suite() }) .add("unidirectional", bench_throughput) .range(1024, 1048576, 4) + .add("unidirectional_lockless", bench_throughput_lockless) + .range(1024, 1048576, 4) .add("bidirectional", bench_bidirectional_throughput) + .range(1024, 1048576, 4) + .add("bidirectional_lockless", bench_bidirectional_throughput_lockless) .range(1024, 1048576, 4); } diff --git a/perf/bench/asio/callback/socket_latency_bench.cpp b/perf/bench/asio/callback/socket_latency_bench.cpp index 8a3498201..f40320ab3 100644 --- a/perf/bench/asio/callback/socket_latency_bench.cpp +++ b/perf/bench/asio/callback/socket_latency_bench.cpp @@ -13,6 +13,7 @@ #include #include #include +#include #include #include @@ -201,6 +202,84 @@ bench_concurrent_latency(bench::state& state) s.close(); } +void +bench_pingpong_latency_lockless(bench::state& state) +{ + auto message_size = static_cast(state.range(0)); + state.counters["message_size"] = static_cast(message_size); + + asio::io_context ioc(BOOST_ASIO_CONCURRENCY_HINT_UNSAFE); + auto [client, server] = asio_bench::make_socket_pair(ioc); + + pingpong_op op(client, server, message_size, state); + + op.start(); + + std::thread timer([&]() { + std::this_thread::sleep_for( + std::chrono::duration(state.duration())); + state.stop(); + }); + + perf::stopwatch sw; + ioc.run(); + timer.join(); + + state.set_elapsed(sw.elapsed_seconds()); + client.close(); + server.close(); +} + +void +bench_concurrent_latency_lockless(bench::state& state) +{ + int num_pairs = static_cast(state.range(0)); + state.counters["num_pairs"] = num_pairs; + + asio::io_context ioc(BOOST_ASIO_CONCURRENCY_HINT_UNSAFE); + + std::vector clients; + std::vector servers; + + clients.reserve(num_pairs); + servers.reserve(num_pairs); + + for (int i = 0; i < num_pairs; ++i) + { + auto [c, s] = asio_bench::make_socket_pair(ioc); + clients.push_back(std::move(c)); + servers.push_back(std::move(s)); + } + + // Stable addresses needed for concurrent ops + std::vector> ops; + ops.reserve(num_pairs); + for (int p = 0; p < num_pairs; ++p) + { + ops.push_back( + std::make_unique( + clients[p], servers[p], 64, state)); + ops.back()->start(); + } + + std::thread timer([&]() { + std::this_thread::sleep_for( + std::chrono::duration(state.duration())); + state.stop(); + }); + + perf::stopwatch sw; + ioc.run(); + timer.join(); + + state.set_elapsed(sw.elapsed_seconds()); + + for (auto& c : clients) + c.close(); + for (auto& s : servers) + s.close(); +} + } // anonymous namespace bench::benchmark_suite @@ -222,7 +301,11 @@ make_socket_latency_suite() }) .add("pingpong", bench_pingpong_latency) .args({1, 64, 1024}) + .add("pingpong_lockless", bench_pingpong_latency_lockless) + .args({1, 64, 1024}) .add("concurrent", bench_concurrent_latency) + .args({1, 4, 16}) + .add("concurrent_lockless", bench_concurrent_latency_lockless) .args({1, 4, 16}); } diff --git a/perf/bench/asio/callback/socket_throughput_bench.cpp b/perf/bench/asio/callback/socket_throughput_bench.cpp index 923539972..145fb1d77 100644 --- a/perf/bench/asio/callback/socket_throughput_bench.cpp +++ b/perf/bench/asio/callback/socket_throughput_bench.cpp @@ -13,6 +13,7 @@ #include #include #include +#include #include #include @@ -155,6 +156,92 @@ bench_bidirectional_throughput(bench::state& state) sock2.close(); } +void +bench_throughput_lockless(bench::state& state) +{ + auto chunk_size = static_cast(state.range(0)); + state.counters["chunk_size"] = static_cast(chunk_size); + + asio::io_context ioc(BOOST_ASIO_CONCURRENCY_HINT_UNSAFE); + auto [writer, reader] = asio_bench::make_socket_pair(ioc); + + std::vector write_buf(chunk_size, 'x'); + std::vector read_buf(chunk_size); + + std::atomic running{true}; + std::size_t total_read = 0; + + write_op wop{writer, write_buf, chunk_size, running}; + read_op rop{reader, read_buf, total_read}; + + perf::stopwatch sw; + + wop.start(); + rop.start(); + + std::thread timer([&]() { + std::this_thread::sleep_for(std::chrono::duration(state.duration())); + running.store(false, std::memory_order_relaxed); + }); + + ioc.run(); + timer.join(); + + state.set_elapsed(sw.elapsed_seconds()); + state.add_bytes(static_cast(total_read)); + + writer.close(); + reader.close(); +} + +void +bench_bidirectional_throughput_lockless(bench::state& state) +{ + auto chunk_size = static_cast(state.range(0)); + state.counters["chunk_size"] = static_cast(chunk_size); + + asio::io_context ioc(BOOST_ASIO_CONCURRENCY_HINT_UNSAFE); + auto [sock1, sock2] = asio_bench::make_socket_pair(ioc); + + std::vector buf1(chunk_size, 'a'); + std::vector buf2(chunk_size, 'b'); + std::vector rbuf1(chunk_size); + std::vector rbuf2(chunk_size); + + std::atomic running{true}; + std::size_t read1 = 0; + std::size_t read2 = 0; + + // sock1 writes, sock2 reads (direction 1) + write_op wop1{sock1, buf1, chunk_size, running}; + read_op rop1{sock2, rbuf1, read1}; + + // sock2 writes, sock1 reads (direction 2) + write_op wop2{sock2, buf2, chunk_size, running}; + read_op rop2{sock1, rbuf2, read2}; + + perf::stopwatch sw; + + wop1.start(); + rop1.start(); + wop2.start(); + rop2.start(); + + std::thread timer([&]() { + std::this_thread::sleep_for(std::chrono::duration(state.duration())); + running.store(false, std::memory_order_relaxed); + }); + + ioc.run(); + timer.join(); + + state.set_elapsed(sw.elapsed_seconds()); + state.add_bytes(static_cast(read1 + read2)); + + sock1.close(); + sock2.close(); +} + struct mt_write_op { tcp_socket& sock; @@ -314,8 +401,12 @@ make_socket_throughput_suite() }) .add("unidirectional", bench_throughput) .range(1024, 1048576, 4) + .add("unidirectional_lockless", bench_throughput_lockless) + .range(1024, 1048576, 4) .add("bidirectional", bench_bidirectional_throughput) .range(1024, 1048576, 4) + .add("bidirectional_lockless", bench_bidirectional_throughput_lockless) + .range(1024, 1048576, 4) .add("multithread", bench_multithread_throughput) .args({2, 4, 8}); } diff --git a/perf/bench/asio/callback/timer_bench.cpp b/perf/bench/asio/callback/timer_bench.cpp index 4ed1bff4b..d689d4e43 100644 --- a/perf/bench/asio/callback/timer_bench.cpp +++ b/perf/bench/asio/callback/timer_bench.cpp @@ -12,6 +12,7 @@ #include #include +#include #include #include @@ -113,6 +114,63 @@ bench_fire_rate(bench::state& state) state.add_items(counter); } +void +bench_schedule_cancel_lockless(bench::state& state) +{ + asio::io_context ioc(BOOST_ASIO_CONCURRENCY_HINT_UNSAFE); + int64_t counter = 0; + int constexpr batch_size = 1000; + + perf::stopwatch sw; + auto deadline = std::chrono::steady_clock::now() + + std::chrono::duration(state.duration()); + + while (std::chrono::steady_clock::now() < deadline) + { + for (int i = 0; i < batch_size; ++i) + { + timer_type t(ioc.get_executor()); + t.expires_after(std::chrono::hours(1)); + t.cancel(); + ++counter; + } + + ioc.poll(); + ioc.restart(); + } + + ioc.run(); + + state.set_elapsed(sw.elapsed_seconds()); + state.add_items(counter); +} + +void +bench_fire_rate_lockless(bench::state& state) +{ + asio::io_context ioc(BOOST_ASIO_CONCURRENCY_HINT_UNSAFE); + std::atomic running{true}; + int64_t counter = 0; + + fire_rate_op op(ioc, running, counter); + + perf::stopwatch sw; + + op.start(); + + std::thread timer([&]() { + std::this_thread::sleep_for( + std::chrono::duration(state.duration())); + running.store(false, std::memory_order_relaxed); + }); + + ioc.run(); + timer.join(); + + state.set_elapsed(sw.elapsed_seconds()); + state.add_items(counter); +} + struct concurrent_timer_op { timer_type timer; @@ -205,7 +263,9 @@ make_timer_suite() { return bench::benchmark_suite("timer") .add("schedule_cancel", bench_schedule_cancel) + .add("schedule_cancel_lockless", bench_schedule_cancel_lockless) .add("fire_rate", bench_fire_rate) + .add("fire_rate_lockless", bench_fire_rate_lockless) .add("concurrent", bench_concurrent_timers) .args({10, 100, 1000}); } diff --git a/perf/bench/asio/coroutine/accept_churn_bench.cpp b/perf/bench/asio/coroutine/accept_churn_bench.cpp index b2e172bc9..53008ac99 100644 --- a/perf/bench/asio/coroutine/accept_churn_bench.cpp +++ b/perf/bench/asio/coroutine/accept_churn_bench.cpp @@ -17,6 +17,7 @@ #include #include #include +#include #include #include @@ -133,6 +134,76 @@ bench_sequential_churn(bench::state& state) acc.close(); } +void +bench_sequential_churn_lockless(bench::state& state) +{ + asio::io_context ioc(BOOST_ASIO_CONCURRENCY_HINT_UNSAFE); + auto acc = make_churn_acceptor(ioc); + auto ep = tcp::endpoint( + asio::ip::address_v4::loopback(), acc.local_endpoint().port()); + + std::atomic running{true}; + + auto task = [&]() -> asio::awaitable { + try + { + while (running.load(std::memory_order_relaxed)) + { + auto lp = state.lap(); + + auto client = std::make_unique(ioc); + auto server = std::make_unique(ioc); + boost::system::error_code ec; + ec = client->open(tcp::v4(), ec); + if (ec) + continue; + configure_churn_socket(*client); + + asio::co_spawn( + ioc, + [](tcp_socket& c, tcp::endpoint ep) + -> asio::awaitable { + co_await c.async_connect(ep, asio::deferred); + }(*client, ep), + asio::detached); + + *server = co_await acc.async_accept(asio::deferred); + + char byte = 'X'; + co_await asio::async_write( + *client, asio::buffer(&byte, 1), asio::deferred); + + char recv = 0; + co_await asio::async_read( + *server, asio::buffer(&recv, 1), asio::deferred); + + client->close(); + server->close(); + } + } + catch (std::exception const&) + { + } + }; + + perf::stopwatch sw; + + asio::co_spawn(ioc, task(), asio::detached); + + std::thread timer([&]() { + std::this_thread::sleep_for( + std::chrono::duration(state.duration())); + running.store(false, std::memory_order_relaxed); + ioc.stop(); + }); + + ioc.run(); + timer.join(); + + state.set_elapsed(sw.elapsed_seconds()); + acc.close(); +} + // N independent accept loops on separate listeners void bench_concurrent_churn(bench::state& state) @@ -303,6 +374,94 @@ bench_burst_churn(bench::state& state) acc.close(); } +void +bench_burst_churn_lockless(bench::state& state) +{ + int burst_size = static_cast(state.range(0)); + state.counters["burst_size"] = burst_size; + + asio::io_context ioc(BOOST_ASIO_CONCURRENCY_HINT_UNSAFE); + auto acc = make_churn_acceptor(ioc); + auto ep = tcp::endpoint( + asio::ip::address_v4::loopback(), acc.local_endpoint().port()); + + std::atomic running{true}; + + auto task = [&]() -> asio::awaitable { + try + { + while (running.load(std::memory_order_relaxed)) + { + auto lp = state.lap(); + + std::vector> clients; + std::vector servers; + clients.reserve(burst_size); + servers.reserve(burst_size); + + bool open_ok = true; + for (int i = 0; i < burst_size; ++i) + { + clients.push_back(std::make_unique(ioc)); + boost::system::error_code ec; + ec = clients.back()->open(tcp::v4(), ec); + if (ec) + { + clients.clear(); + open_ok = false; + break; + } + configure_churn_socket(*clients.back()); + } + if (!open_ok) + continue; + + for (int i = 0; i < burst_size; ++i) + { + asio::co_spawn( + ioc, + [](tcp_socket& c, tcp::endpoint ep) + -> asio::awaitable { + co_await c.async_connect(ep, asio::deferred); + }(*clients[i], ep), + asio::detached); + } + + for (int i = 0; i < burst_size; ++i) + { + servers.push_back( + co_await acc.async_accept(asio::deferred)); + } + + for (auto& c : clients) + c->close(); + for (auto& s : servers) + s.close(); + } + } + catch (std::exception const&) + { + } + }; + + perf::stopwatch sw; + + asio::co_spawn(ioc, task(), asio::detached); + + std::thread stopper([&]() { + std::this_thread::sleep_for( + std::chrono::duration(state.duration())); + running.store(false, std::memory_order_relaxed); + ioc.stop(); + }); + + ioc.run(); + stopper.join(); + + state.set_elapsed(sw.elapsed_seconds()); + acc.close(); +} + } // anonymous namespace bench::benchmark_suite @@ -311,9 +470,12 @@ make_accept_churn_suite() using F = bench::bench_flags; return bench::benchmark_suite("accept_churn", F::needs_conntrack_drain) .add("sequential", bench_sequential_churn) + .add("sequential_lockless", bench_sequential_churn_lockless) .add("concurrent", bench_concurrent_churn) .args({1, 4, 16}) .add("burst", bench_burst_churn) + .args({10, 100}) + .add("burst_lockless", bench_burst_churn_lockless) .args({10, 100}); } diff --git a/perf/bench/asio/coroutine/fan_out_bench.cpp b/perf/bench/asio/coroutine/fan_out_bench.cpp index b35676987..fa375f95f 100644 --- a/perf/bench/asio/coroutine/fan_out_bench.cpp +++ b/perf/bench/asio/coroutine/fan_out_bench.cpp @@ -18,6 +18,7 @@ #include #include #include +#include #include #include @@ -333,6 +334,267 @@ bench_concurrent_parents(bench::state& state) state.set_elapsed(sw.elapsed_seconds()); } +void +bench_fork_join_lockless(bench::state& state) +{ + int fan_out = static_cast(state.range(0)); + state.counters["fan_out"] = fan_out; + + asio::io_context ioc(BOOST_ASIO_CONCURRENCY_HINT_UNSAFE); + + std::vector clients; + std::vector servers; + clients.reserve(fan_out); + servers.reserve(fan_out); + + for (int i = 0; i < fan_out; ++i) + { + auto [c, s] = make_socket_pair(ioc); + clients.push_back(std::move(c)); + servers.push_back(std::move(s)); + } + + for (int i = 0; i < fan_out; ++i) + asio::co_spawn(ioc, echo_server(servers[i]), asio::detached); + + std::atomic running{true}; + + auto parent = [&]() -> asio::awaitable { + timer_type t(ioc); + try + { + while (running.load(std::memory_order_relaxed)) + { + auto lp = state.lap(); + + std::atomic remaining{fan_out}; + for (int i = 0; i < fan_out; ++i) + asio::co_spawn( + ioc, sub_request(clients[i], remaining), + asio::detached); + + while (remaining.load(std::memory_order_acquire) > 0) + { + t.expires_after(std::chrono::nanoseconds(0)); + co_await t.async_wait(asio::deferred); + } + } + } + catch (std::exception const&) + { + } + + for (auto& c : clients) + c.close(); + for (auto& s : servers) + s.close(); + }; + + perf::stopwatch sw; + + asio::co_spawn(ioc, parent(), asio::detached); + + std::thread stopper([&]() { + std::this_thread::sleep_for( + std::chrono::duration(state.duration())); + running.store(false, std::memory_order_relaxed); + }); + + ioc.run(); + stopper.join(); + + state.set_elapsed(sw.elapsed_seconds()); +} + +void +bench_nested_lockless(bench::state& state) +{ + int groups = static_cast(state.range(0)); + int subs_per_group = 4; + int total_subs = groups * subs_per_group; + + state.counters["groups"] = groups; + state.counters["subs_per_group"] = subs_per_group; + + asio::io_context ioc(BOOST_ASIO_CONCURRENCY_HINT_UNSAFE); + + std::vector clients; + std::vector servers; + clients.reserve(total_subs); + servers.reserve(total_subs); + + for (int i = 0; i < total_subs; ++i) + { + auto [c, s] = make_socket_pair(ioc); + clients.push_back(std::move(c)); + servers.push_back(std::move(s)); + } + + for (int i = 0; i < total_subs; ++i) + asio::co_spawn(ioc, echo_server(servers[i]), asio::detached); + + std::atomic running{true}; + + auto group_task = [&](int base_idx, int n, + std::atomic& groups_remaining) + -> asio::awaitable { + std::atomic subs_remaining{n}; + for (int i = 0; i < n; ++i) + asio::co_spawn( + ioc, sub_request(clients[base_idx + i], subs_remaining), + asio::detached); + + timer_type t(ioc); + try + { + while (subs_remaining.load(std::memory_order_acquire) > 0) + { + t.expires_after(std::chrono::nanoseconds(0)); + co_await t.async_wait(asio::deferred); + } + } + catch (std::exception const&) + { + } + + groups_remaining.fetch_sub(1, std::memory_order_release); + }; + + auto parent = [&]() -> asio::awaitable { + timer_type t(ioc); + try + { + while (running.load(std::memory_order_relaxed)) + { + auto lp = state.lap(); + + std::atomic groups_remaining{groups}; + for (int g = 0; g < groups; ++g) + asio::co_spawn( + ioc, + group_task( + g * subs_per_group, subs_per_group, + groups_remaining), + asio::detached); + + while (groups_remaining.load(std::memory_order_acquire) > 0) + { + t.expires_after(std::chrono::nanoseconds(0)); + co_await t.async_wait(asio::deferred); + } + } + } + catch (std::exception const&) + { + } + + for (auto& c : clients) + c.close(); + for (auto& s : servers) + s.close(); + }; + + perf::stopwatch sw; + + asio::co_spawn(ioc, parent(), asio::detached); + + std::thread stopper([&]() { + std::this_thread::sleep_for( + std::chrono::duration(state.duration())); + running.store(false, std::memory_order_relaxed); + }); + + ioc.run(); + stopper.join(); + + state.set_elapsed(sw.elapsed_seconds()); +} + +void +bench_concurrent_parents_lockless(bench::state& state) +{ + int num_parents = static_cast(state.range(0)); + int fan_out = 16; + int total_subs = num_parents * fan_out; + + state.counters["num_parents"] = num_parents; + state.counters["fan_out"] = fan_out; + + asio::io_context ioc(BOOST_ASIO_CONCURRENCY_HINT_UNSAFE); + + std::vector clients; + std::vector servers; + clients.reserve(total_subs); + servers.reserve(total_subs); + + for (int i = 0; i < total_subs; ++i) + { + auto [c, s] = make_socket_pair(ioc); + clients.push_back(std::move(c)); + servers.push_back(std::move(s)); + } + + for (int i = 0; i < total_subs; ++i) + asio::co_spawn(ioc, echo_server(servers[i]), asio::detached); + + std::atomic running{true}; + std::atomic parents_done{0}; + + auto parent_task = + [&](int parent_idx) -> asio::awaitable { + int base = parent_idx * fan_out; + timer_type t(ioc); + + try + { + while (running.load(std::memory_order_relaxed)) + { + auto lp = state.lap(); + + std::atomic remaining{fan_out}; + for (int i = 0; i < fan_out; ++i) + asio::co_spawn( + ioc, sub_request(clients[base + i], remaining), + asio::detached); + + while (remaining.load(std::memory_order_acquire) > 0) + { + t.expires_after(std::chrono::nanoseconds(0)); + co_await t.async_wait(asio::deferred); + } + } + } + catch (std::exception const&) + { + } + + if (parents_done.fetch_add(1, std::memory_order_acq_rel) == + num_parents - 1) + { + for (auto& c : clients) + c.close(); + for (auto& s : servers) + s.close(); + } + }; + + perf::stopwatch sw; + + for (int p = 0; p < num_parents; ++p) + asio::co_spawn(ioc, parent_task(p), asio::detached); + + std::thread stopper([&]() { + std::this_thread::sleep_for( + std::chrono::duration(state.duration())); + running.store(false, std::memory_order_relaxed); + }); + + ioc.run(); + stopper.join(); + + state.set_elapsed(sw.elapsed_seconds()); +} + } // anonymous namespace bench::benchmark_suite @@ -342,9 +604,15 @@ make_fan_out_suite() return bench::benchmark_suite("fan_out", F::needs_conntrack_drain) .add("fork_join", bench_fork_join) .args({1, 4, 16, 64}) + .add("fork_join_lockless", bench_fork_join_lockless) + .args({1, 4, 16, 64}) .add("nested", bench_nested) .args({4, 16}) + .add("nested_lockless", bench_nested_lockless) + .args({4, 16}) .add("concurrent_parents", bench_concurrent_parents) + .args({1, 4, 16}) + .add("concurrent_parents_lockless", bench_concurrent_parents_lockless) .args({1, 4, 16}); } diff --git a/perf/bench/asio/coroutine/http_server_bench.cpp b/perf/bench/asio/coroutine/http_server_bench.cpp index 9ae44b3a7..5c5adf0d3 100644 --- a/perf/bench/asio/coroutine/http_server_bench.cpp +++ b/perf/bench/asio/coroutine/http_server_bench.cpp @@ -18,6 +18,7 @@ #include #include #include +#include #include #include @@ -141,6 +142,32 @@ bench_single_connection(bench::state& state) server.close(); } +void +bench_single_connection_lockless(bench::state& state) +{ + asio::io_context ioc(BOOST_ASIO_CONCURRENCY_HINT_UNSAFE); + auto [client, server] = make_socket_pair(ioc); + + asio::co_spawn( + ioc, server_task(server), asio::detached); + asio::co_spawn( + ioc, client_task(client, state), asio::detached); + + std::thread timer([&]() { + std::this_thread::sleep_for( + std::chrono::duration(state.duration())); + state.stop(); + }); + + perf::stopwatch sw; + ioc.run(); + timer.join(); + + state.set_elapsed(sw.elapsed_seconds()); + client.close(); + server.close(); +} + void bench_concurrent_connections(bench::state& state) { @@ -278,6 +305,7 @@ make_http_server_suite() s.close(); }) .add("single_conn", bench_single_connection) + .add("single_conn_lockless", bench_single_connection_lockless) .add("concurrent", bench_concurrent_connections) .args({1, 4, 16, 32}) .add("multithread", bench_multithread) diff --git a/perf/bench/asio/coroutine/local_socket_latency_bench.cpp b/perf/bench/asio/coroutine/local_socket_latency_bench.cpp index 2bcbce1bb..cefee0744 100644 --- a/perf/bench/asio/coroutine/local_socket_latency_bench.cpp +++ b/perf/bench/asio/coroutine/local_socket_latency_bench.cpp @@ -17,6 +17,7 @@ #include #include #include +#include #include #include @@ -149,6 +150,88 @@ bench_concurrent_latency(bench::state& state) s.close(); } +void +bench_pingpong_latency_lockless(bench::state& state) +{ + auto message_size = static_cast(state.range(0)); + state.counters["message_size"] = static_cast(message_size); + + asio::io_context ioc(BOOST_ASIO_CONCURRENCY_HINT_UNSAFE); + auto [client, server] = make_local_socket_pair(ioc); + + std::atomic running{true}; + + asio::co_spawn( + ioc, + pingpong_client_task( + client, server, message_size, running, state), + asio::detached); + + std::thread timer([&]() { + std::this_thread::sleep_for( + std::chrono::duration(state.duration())); + running.store(false, std::memory_order_relaxed); + }); + + perf::stopwatch sw; + ioc.run(); + timer.join(); + + state.set_elapsed(sw.elapsed_seconds()); + client.close(); + server.close(); +} + +void +bench_concurrent_latency_lockless(bench::state& state) +{ + int num_pairs = static_cast(state.range(0)); + state.counters["num_pairs"] = num_pairs; + + asio::io_context ioc(BOOST_ASIO_CONCURRENCY_HINT_UNSAFE); + + std::vector clients; + std::vector servers; + + clients.reserve(num_pairs); + servers.reserve(num_pairs); + + for (int i = 0; i < num_pairs; ++i) + { + auto [c, s] = make_local_socket_pair(ioc); + clients.push_back(std::move(c)); + servers.push_back(std::move(s)); + } + + std::atomic running{true}; + + for (int p = 0; p < num_pairs; ++p) + { + asio::co_spawn( + ioc, + pingpong_client_task( + clients[p], servers[p], 64, running, state), + asio::detached); + } + + std::thread timer([&]() { + std::this_thread::sleep_for( + std::chrono::duration(state.duration())); + running.store(false, std::memory_order_relaxed); + }); + + perf::stopwatch sw; + ioc.run(); + timer.join(); + + state.set_elapsed(sw.elapsed_seconds()); + + for (auto& c : clients) + c.close(); + for (auto& s : servers) + s.close(); +} + } // anonymous namespace bench::benchmark_suite @@ -169,7 +252,11 @@ make_local_socket_latency_suite() }) .add("pingpong", bench_pingpong_latency) .args({1, 64, 1024}) + .add("pingpong_lockless", bench_pingpong_latency_lockless) + .args({1, 64, 1024}) .add("concurrent", bench_concurrent_latency) + .args({1, 4, 16}) + .add("concurrent_lockless", bench_concurrent_latency_lockless) .args({1, 4, 16}); } diff --git a/perf/bench/asio/coroutine/local_socket_throughput_bench.cpp b/perf/bench/asio/coroutine/local_socket_throughput_bench.cpp index 802d6ac32..85f048277 100644 --- a/perf/bench/asio/coroutine/local_socket_throughput_bench.cpp +++ b/perf/bench/asio/coroutine/local_socket_throughput_bench.cpp @@ -17,6 +17,7 @@ #include #include #include +#include #include #include @@ -39,6 +40,7 @@ bench_throughput(bench::state& state) std::vector read_buf(chunk_size); std::atomic running{true}; + int64_t total_bytes = 0; auto write_task = [&]() -> asio::awaitable { try @@ -65,7 +67,7 @@ bench_throughput(bench::state& state) asio::deferred); if (n == 0) break; - state.add_bytes(static_cast(n)); + total_bytes += static_cast(n); } } catch (std::exception const&) @@ -88,6 +90,7 @@ bench_throughput(bench::state& state) timer.join(); state.set_elapsed(sw.elapsed_seconds()); + state.add_bytes(total_bytes); writer.close(); reader.close(); } @@ -105,6 +108,8 @@ bench_bidirectional_throughput(bench::state& state) std::vector buf2(chunk_size, 'b'); std::atomic running{true}; + int64_t read1_bytes = 0; + int64_t read2_bytes = 0; auto write1_task = [&]() -> asio::awaitable { try @@ -131,7 +136,7 @@ bench_bidirectional_throughput(bench::state& state) asio::buffer(rbuf.data(), rbuf.size()), asio::deferred); if (n == 0) break; - state.add_bytes(static_cast(n)); + read1_bytes += static_cast(n); } } catch (std::exception const&) @@ -164,7 +169,7 @@ bench_bidirectional_throughput(bench::state& state) asio::buffer(rbuf.data(), rbuf.size()), asio::deferred); if (n == 0) break; - state.add_bytes(static_cast(n)); + read2_bytes += static_cast(n); } } catch (std::exception const&) @@ -189,6 +194,179 @@ bench_bidirectional_throughput(bench::state& state) timer.join(); state.set_elapsed(sw.elapsed_seconds()); + state.add_bytes(read1_bytes + read2_bytes); + sock1.close(); + sock2.close(); +} + +void +bench_throughput_lockless(bench::state& state) +{ + auto chunk_size = static_cast(state.range(0)); + state.counters["chunk_size"] = static_cast(chunk_size); + + asio::io_context ioc(BOOST_ASIO_CONCURRENCY_HINT_UNSAFE); + auto [writer, reader] = make_local_socket_pair(ioc); + + std::vector write_buf(chunk_size, 'x'); + std::vector read_buf(chunk_size); + + std::atomic running{true}; + int64_t total_bytes = 0; + + auto write_task = [&]() -> asio::awaitable { + try + { + while (running.load(std::memory_order_relaxed)) + { + co_await writer.async_write_some( + asio::buffer(write_buf.data(), chunk_size), asio::deferred); + } + writer.shutdown(local_socket::shutdown_send); + } + catch (std::exception const&) + { + } + }; + + auto read_task = [&]() -> asio::awaitable { + try + { + for (;;) + { + auto n = co_await reader.async_read_some( + asio::buffer(read_buf.data(), read_buf.size()), + asio::deferred); + if (n == 0) + break; + total_bytes += static_cast(n); + } + } + catch (std::exception const&) + { + } + }; + + perf::stopwatch sw; + + asio::co_spawn(ioc, write_task(), asio::detached); + asio::co_spawn(ioc, read_task(), asio::detached); + + std::thread timer([&]() { + std::this_thread::sleep_for( + std::chrono::duration(state.duration())); + running.store(false, std::memory_order_relaxed); + }); + + ioc.run(); + timer.join(); + + state.set_elapsed(sw.elapsed_seconds()); + state.add_bytes(total_bytes); + writer.close(); + reader.close(); +} + +void +bench_bidirectional_throughput_lockless(bench::state& state) +{ + auto chunk_size = static_cast(state.range(0)); + state.counters["chunk_size"] = static_cast(chunk_size); + + asio::io_context ioc(BOOST_ASIO_CONCURRENCY_HINT_UNSAFE); + auto [sock1, sock2] = make_local_socket_pair(ioc); + + std::vector buf1(chunk_size, 'a'); + std::vector buf2(chunk_size, 'b'); + + std::atomic running{true}; + int64_t read1_bytes = 0; + int64_t read2_bytes = 0; + + auto write1_task = [&]() -> asio::awaitable { + try + { + while (running.load(std::memory_order_relaxed)) + { + co_await sock1.async_write_some( + asio::buffer(buf1.data(), chunk_size), asio::deferred); + } + sock1.shutdown(local_socket::shutdown_send); + } + catch (std::exception const&) + { + } + }; + + auto read1_task = [&]() -> asio::awaitable { + try + { + std::vector rbuf(chunk_size); + for (;;) + { + auto n = co_await sock2.async_read_some( + asio::buffer(rbuf.data(), rbuf.size()), asio::deferred); + if (n == 0) + break; + read1_bytes += static_cast(n); + } + } + catch (std::exception const&) + { + } + }; + + auto write2_task = [&]() -> asio::awaitable { + try + { + while (running.load(std::memory_order_relaxed)) + { + co_await sock2.async_write_some( + asio::buffer(buf2.data(), chunk_size), asio::deferred); + } + sock2.shutdown(local_socket::shutdown_send); + } + catch (std::exception const&) + { + } + }; + + auto read2_task = [&]() -> asio::awaitable { + try + { + std::vector rbuf(chunk_size); + for (;;) + { + auto n = co_await sock1.async_read_some( + asio::buffer(rbuf.data(), rbuf.size()), asio::deferred); + if (n == 0) + break; + read2_bytes += static_cast(n); + } + } + catch (std::exception const&) + { + } + }; + + perf::stopwatch sw; + + asio::co_spawn(ioc, write1_task(), asio::detached); + asio::co_spawn(ioc, read1_task(), asio::detached); + asio::co_spawn(ioc, write2_task(), asio::detached); + asio::co_spawn(ioc, read2_task(), asio::detached); + + std::thread timer([&]() { + std::this_thread::sleep_for( + std::chrono::duration(state.duration())); + running.store(false, std::memory_order_relaxed); + }); + + ioc.run(); + timer.join(); + + state.set_elapsed(sw.elapsed_seconds()); + state.add_bytes(read1_bytes + read2_bytes); sock1.close(); sock2.close(); } @@ -210,7 +388,11 @@ make_local_socket_throughput_suite() }) .add("unidirectional", bench_throughput) .range(1024, 1048576, 4) + .add("unidirectional_lockless", bench_throughput_lockless) + .range(1024, 1048576, 4) .add("bidirectional", bench_bidirectional_throughput) + .range(1024, 1048576, 4) + .add("bidirectional_lockless", bench_bidirectional_throughput_lockless) .range(1024, 1048576, 4); } diff --git a/perf/bench/asio/coroutine/socket_latency_bench.cpp b/perf/bench/asio/coroutine/socket_latency_bench.cpp index b0a997cbd..854fc3549 100644 --- a/perf/bench/asio/coroutine/socket_latency_bench.cpp +++ b/perf/bench/asio/coroutine/socket_latency_bench.cpp @@ -17,6 +17,7 @@ #include #include #include +#include #include #include @@ -149,6 +150,88 @@ bench_concurrent_latency(bench::state& state) s.close(); } +void +bench_pingpong_latency_lockless(bench::state& state) +{ + auto message_size = static_cast(state.range(0)); + state.counters["message_size"] = static_cast(message_size); + + asio::io_context ioc(BOOST_ASIO_CONCURRENCY_HINT_UNSAFE); + auto [client, server] = make_socket_pair(ioc); + + std::atomic running{true}; + + asio::co_spawn( + ioc, + pingpong_client_task( + client, server, message_size, running, state), + asio::detached); + + std::thread timer([&]() { + std::this_thread::sleep_for( + std::chrono::duration(state.duration())); + running.store(false, std::memory_order_relaxed); + }); + + perf::stopwatch sw; + ioc.run(); + timer.join(); + + state.set_elapsed(sw.elapsed_seconds()); + client.close(); + server.close(); +} + +void +bench_concurrent_latency_lockless(bench::state& state) +{ + int num_pairs = static_cast(state.range(0)); + state.counters["num_pairs"] = num_pairs; + + asio::io_context ioc(BOOST_ASIO_CONCURRENCY_HINT_UNSAFE); + + std::vector clients; + std::vector servers; + + clients.reserve(num_pairs); + servers.reserve(num_pairs); + + for (int i = 0; i < num_pairs; ++i) + { + auto [c, s] = make_socket_pair(ioc); + clients.push_back(std::move(c)); + servers.push_back(std::move(s)); + } + + std::atomic running{true}; + + for (int p = 0; p < num_pairs; ++p) + { + asio::co_spawn( + ioc, + pingpong_client_task( + clients[p], servers[p], 64, running, state), + asio::detached); + } + + std::thread timer([&]() { + std::this_thread::sleep_for( + std::chrono::duration(state.duration())); + running.store(false, std::memory_order_relaxed); + }); + + perf::stopwatch sw; + ioc.run(); + timer.join(); + + state.set_elapsed(sw.elapsed_seconds()); + + for (auto& c : clients) + c.close(); + for (auto& s : servers) + s.close(); +} + } // anonymous namespace bench::benchmark_suite @@ -170,7 +253,11 @@ make_socket_latency_suite() }) .add("pingpong", bench_pingpong_latency) .args({1, 64, 1024}) + .add("pingpong_lockless", bench_pingpong_latency_lockless) + .args({1, 64, 1024}) .add("concurrent", bench_concurrent_latency) + .args({1, 4, 16}) + .add("concurrent_lockless", bench_concurrent_latency_lockless) .args({1, 4, 16}); } diff --git a/perf/bench/asio/coroutine/socket_throughput_bench.cpp b/perf/bench/asio/coroutine/socket_throughput_bench.cpp index afca2033f..c302449bc 100644 --- a/perf/bench/asio/coroutine/socket_throughput_bench.cpp +++ b/perf/bench/asio/coroutine/socket_throughput_bench.cpp @@ -17,6 +17,7 @@ #include #include #include +#include #include #include @@ -39,6 +40,7 @@ bench_throughput(bench::state& state) std::vector read_buf(chunk_size); std::atomic running{true}; + int64_t total_bytes = 0; auto write_task = [&]() -> asio::awaitable { try @@ -65,7 +67,7 @@ bench_throughput(bench::state& state) asio::deferred); if (n == 0) break; - state.add_bytes(static_cast(n)); + total_bytes += static_cast(n); } } catch (std::exception const&) @@ -88,6 +90,7 @@ bench_throughput(bench::state& state) timer.join(); state.set_elapsed(sw.elapsed_seconds()); + state.add_bytes(total_bytes); writer.close(); reader.close(); } @@ -105,6 +108,8 @@ bench_bidirectional_throughput(bench::state& state) std::vector buf2(chunk_size, 'b'); std::atomic running{true}; + int64_t read1_bytes = 0; + int64_t read2_bytes = 0; auto write1_task = [&]() -> asio::awaitable { try @@ -131,7 +136,7 @@ bench_bidirectional_throughput(bench::state& state) asio::buffer(rbuf.data(), rbuf.size()), asio::deferred); if (n == 0) break; - state.add_bytes(static_cast(n)); + read1_bytes += static_cast(n); } } catch (std::exception const&) @@ -164,7 +169,7 @@ bench_bidirectional_throughput(bench::state& state) asio::buffer(rbuf.data(), rbuf.size()), asio::deferred); if (n == 0) break; - state.add_bytes(static_cast(n)); + read2_bytes += static_cast(n); } } catch (std::exception const&) @@ -189,6 +194,179 @@ bench_bidirectional_throughput(bench::state& state) timer.join(); state.set_elapsed(sw.elapsed_seconds()); + state.add_bytes(read1_bytes + read2_bytes); + sock1.close(); + sock2.close(); +} + +void +bench_throughput_lockless(bench::state& state) +{ + auto chunk_size = static_cast(state.range(0)); + state.counters["chunk_size"] = static_cast(chunk_size); + + asio::io_context ioc(BOOST_ASIO_CONCURRENCY_HINT_UNSAFE); + auto [writer, reader] = make_socket_pair(ioc); + + std::vector write_buf(chunk_size, 'x'); + std::vector read_buf(chunk_size); + + std::atomic running{true}; + int64_t total_bytes = 0; + + auto write_task = [&]() -> asio::awaitable { + try + { + while (running.load(std::memory_order_relaxed)) + { + co_await writer.async_write_some( + asio::buffer(write_buf.data(), chunk_size), asio::deferred); + } + writer.shutdown(tcp_socket::shutdown_send); + } + catch (std::exception const&) + { + } + }; + + auto read_task = [&]() -> asio::awaitable { + try + { + for (;;) + { + auto n = co_await reader.async_read_some( + asio::buffer(read_buf.data(), read_buf.size()), + asio::deferred); + if (n == 0) + break; + total_bytes += static_cast(n); + } + } + catch (std::exception const&) + { + } + }; + + perf::stopwatch sw; + + asio::co_spawn(ioc, write_task(), asio::detached); + asio::co_spawn(ioc, read_task(), asio::detached); + + std::thread timer([&]() { + std::this_thread::sleep_for( + std::chrono::duration(state.duration())); + running.store(false, std::memory_order_relaxed); + }); + + ioc.run(); + timer.join(); + + state.set_elapsed(sw.elapsed_seconds()); + state.add_bytes(total_bytes); + writer.close(); + reader.close(); +} + +void +bench_bidirectional_throughput_lockless(bench::state& state) +{ + auto chunk_size = static_cast(state.range(0)); + state.counters["chunk_size"] = static_cast(chunk_size); + + asio::io_context ioc(BOOST_ASIO_CONCURRENCY_HINT_UNSAFE); + auto [sock1, sock2] = make_socket_pair(ioc); + + std::vector buf1(chunk_size, 'a'); + std::vector buf2(chunk_size, 'b'); + + std::atomic running{true}; + int64_t read1_bytes = 0; + int64_t read2_bytes = 0; + + auto write1_task = [&]() -> asio::awaitable { + try + { + while (running.load(std::memory_order_relaxed)) + { + co_await sock1.async_write_some( + asio::buffer(buf1.data(), chunk_size), asio::deferred); + } + sock1.shutdown(tcp_socket::shutdown_send); + } + catch (std::exception const&) + { + } + }; + + auto read1_task = [&]() -> asio::awaitable { + try + { + std::vector rbuf(chunk_size); + for (;;) + { + auto n = co_await sock2.async_read_some( + asio::buffer(rbuf.data(), rbuf.size()), asio::deferred); + if (n == 0) + break; + read1_bytes += static_cast(n); + } + } + catch (std::exception const&) + { + } + }; + + auto write2_task = [&]() -> asio::awaitable { + try + { + while (running.load(std::memory_order_relaxed)) + { + co_await sock2.async_write_some( + asio::buffer(buf2.data(), chunk_size), asio::deferred); + } + sock2.shutdown(tcp_socket::shutdown_send); + } + catch (std::exception const&) + { + } + }; + + auto read2_task = [&]() -> asio::awaitable { + try + { + std::vector rbuf(chunk_size); + for (;;) + { + auto n = co_await sock1.async_read_some( + asio::buffer(rbuf.data(), rbuf.size()), asio::deferred); + if (n == 0) + break; + read2_bytes += static_cast(n); + } + } + catch (std::exception const&) + { + } + }; + + perf::stopwatch sw; + + asio::co_spawn(ioc, write1_task(), asio::detached); + asio::co_spawn(ioc, read1_task(), asio::detached); + asio::co_spawn(ioc, write2_task(), asio::detached); + asio::co_spawn(ioc, read2_task(), asio::detached); + + std::thread timer([&]() { + std::this_thread::sleep_for( + std::chrono::duration(state.duration())); + running.store(false, std::memory_order_relaxed); + }); + + ioc.run(); + timer.join(); + + state.set_elapsed(sw.elapsed_seconds()); + state.add_bytes(read1_bytes + read2_bytes); sock1.close(); sock2.close(); } @@ -338,8 +516,12 @@ make_socket_throughput_suite() }) .add("unidirectional", bench_throughput) .range(1024, 1048576, 4) + .add("unidirectional_lockless", bench_throughput_lockless) + .range(1024, 1048576, 4) .add("bidirectional", bench_bidirectional_throughput) .range(1024, 1048576, 4) + .add("bidirectional_lockless", bench_bidirectional_throughput_lockless) + .range(1024, 1048576, 4) .add("multithread", bench_multithread_throughput) .args({2, 4, 8}); } diff --git a/perf/bench/asio/coroutine/timer_bench.cpp b/perf/bench/asio/coroutine/timer_bench.cpp index 423e64d86..a764ff3f4 100644 --- a/perf/bench/asio/coroutine/timer_bench.cpp +++ b/perf/bench/asio/coroutine/timer_bench.cpp @@ -15,6 +15,7 @@ #include #include #include +#include #include #include @@ -103,6 +104,77 @@ bench_fire_rate(bench::state& state) state.add_items(counter); } +void +bench_schedule_cancel_lockless(bench::state& state) +{ + asio::io_context ioc(BOOST_ASIO_CONCURRENCY_HINT_UNSAFE); + int64_t counter = 0; + int constexpr batch_size = 1000; + + perf::stopwatch sw; + auto deadline = std::chrono::steady_clock::now() + + std::chrono::duration(state.duration()); + + while (std::chrono::steady_clock::now() < deadline) + { + for (int i = 0; i < batch_size; ++i) + { + timer_type t(ioc); + t.expires_after(std::chrono::hours(1)); + t.cancel(); + ++counter; + } + + ioc.poll(); + ioc.restart(); + } + + ioc.run(); + + state.set_elapsed(sw.elapsed_seconds()); + state.add_items(counter); +} + +void +bench_fire_rate_lockless(bench::state& state) +{ + asio::io_context ioc(BOOST_ASIO_CONCURRENCY_HINT_UNSAFE); + std::atomic running{true}; + int64_t counter = 0; + + auto task = [&]() -> asio::awaitable { + timer_type t(ioc); + try + { + while (running.load(std::memory_order_relaxed)) + { + t.expires_after(std::chrono::nanoseconds(0)); + co_await t.async_wait(asio::deferred); + ++counter; + } + } + catch (std::exception const&) + { + } + }; + + perf::stopwatch sw; + + asio::co_spawn(ioc, task(), asio::detached); + + std::thread timer([&]() { + std::this_thread::sleep_for( + std::chrono::duration(state.duration())); + running.store(false, std::memory_order_relaxed); + }); + + ioc.run(); + timer.join(); + + state.set_elapsed(sw.elapsed_seconds()); + state.add_items(counter); +} + // N timers with staggered intervals (100us–1000us) firing concurrently. // Stresses the timer queue under contention and reveals wake accuracy // degradation as the number of pending timers grows. @@ -168,7 +240,9 @@ make_timer_suite() { return bench::benchmark_suite("timer") .add("schedule_cancel", bench_schedule_cancel) + .add("schedule_cancel_lockless", bench_schedule_cancel_lockless) .add("fire_rate", bench_fire_rate) + .add("fire_rate_lockless", bench_fire_rate_lockless) .add("concurrent", bench_concurrent_timers) .args({10, 100, 1000}); } diff --git a/perf/bench/corosio/local_socket_throughput_bench.cpp b/perf/bench/corosio/local_socket_throughput_bench.cpp index 62927a10d..027feba22 100644 --- a/perf/bench/corosio/local_socket_throughput_bench.cpp +++ b/perf/bench/corosio/local_socket_throughput_bench.cpp @@ -45,6 +45,7 @@ bench_unix_throughput(bench::state& state) std::vector read_buf(chunk_size); std::atomic running{true}; + int64_t total_bytes = 0; auto write_task = [&]() -> capy::task<> { while (running.load(std::memory_order_relaxed)) @@ -64,7 +65,7 @@ bench_unix_throughput(bench::state& state) capy::mutable_buffer(read_buf.data(), read_buf.size())); if (ec || n == 0) break; - state.add_bytes(static_cast(n)); + total_bytes += static_cast(n); } }; @@ -83,6 +84,7 @@ bench_unix_throughput(bench::state& state) timer.join(); state.set_elapsed(sw.elapsed_seconds()); + state.add_bytes(total_bytes); writer.close(); reader.close(); } @@ -101,6 +103,8 @@ bench_unix_bidirectional_throughput(bench::state& state) std::vector buf2(chunk_size, 'b'); std::atomic running{true}; + int64_t read1_bytes = 0; + int64_t read2_bytes = 0; auto write1_task = [&]() -> capy::task<> { while (running.load(std::memory_order_relaxed)) @@ -121,7 +125,7 @@ bench_unix_bidirectional_throughput(bench::state& state) capy::mutable_buffer(rbuf.data(), rbuf.size())); if (ec || n == 0) break; - state.add_bytes(static_cast(n)); + read1_bytes += static_cast(n); } }; @@ -144,7 +148,7 @@ bench_unix_bidirectional_throughput(bench::state& state) capy::mutable_buffer(rbuf.data(), rbuf.size())); if (ec || n == 0) break; - state.add_bytes(static_cast(n)); + read2_bytes += static_cast(n); } }; @@ -165,6 +169,7 @@ bench_unix_bidirectional_throughput(bench::state& state) timer.join(); state.set_elapsed(sw.elapsed_seconds()); + state.add_bytes(read1_bytes + read2_bytes); sock1.close(); sock2.close(); } @@ -185,6 +190,7 @@ bench_unix_throughput_lockless(bench::state& state) std::vector read_buf(chunk_size); std::atomic running{true}; + int64_t total_bytes = 0; auto write_task = [&]() -> capy::task<> { while (running.load(std::memory_order_relaxed)) @@ -204,7 +210,7 @@ bench_unix_throughput_lockless(bench::state& state) capy::mutable_buffer(read_buf.data(), read_buf.size())); if (ec || n == 0) break; - state.add_bytes(static_cast(n)); + total_bytes += static_cast(n); } }; @@ -223,6 +229,7 @@ bench_unix_throughput_lockless(bench::state& state) timer.join(); state.set_elapsed(sw.elapsed_seconds()); + state.add_bytes(total_bytes); writer.close(); reader.close(); } @@ -243,6 +250,8 @@ bench_unix_bidirectional_throughput_lockless(bench::state& state) std::vector buf2(chunk_size, 'b'); std::atomic running{true}; + int64_t read1_bytes = 0; + int64_t read2_bytes = 0; auto write1_task = [&]() -> capy::task<> { while (running.load(std::memory_order_relaxed)) @@ -263,7 +272,7 @@ bench_unix_bidirectional_throughput_lockless(bench::state& state) capy::mutable_buffer(rbuf.data(), rbuf.size())); if (ec || n == 0) break; - state.add_bytes(static_cast(n)); + read1_bytes += static_cast(n); } }; @@ -286,7 +295,7 @@ bench_unix_bidirectional_throughput_lockless(bench::state& state) capy::mutable_buffer(rbuf.data(), rbuf.size())); if (ec || n == 0) break; - state.add_bytes(static_cast(n)); + read2_bytes += static_cast(n); } }; @@ -307,6 +316,7 @@ bench_unix_bidirectional_throughput_lockless(bench::state& state) timer.join(); state.set_elapsed(sw.elapsed_seconds()); + state.add_bytes(read1_bytes + read2_bytes); sock1.close(); sock2.close(); } diff --git a/perf/bench/corosio/socket_throughput_bench.cpp b/perf/bench/corosio/socket_throughput_bench.cpp index c906d7e8a..8ae49b896 100644 --- a/perf/bench/corosio/socket_throughput_bench.cpp +++ b/perf/bench/corosio/socket_throughput_bench.cpp @@ -73,6 +73,7 @@ bench_throughput(bench::state& state) std::vector read_buf(chunk_size); std::atomic running{true}; + int64_t total_bytes = 0; auto write_task = [&]() -> capy::task<> { while (running.load(std::memory_order_relaxed)) @@ -92,7 +93,7 @@ bench_throughput(bench::state& state) capy::mutable_buffer(read_buf.data(), read_buf.size())); if (ec || n == 0) break; - state.add_bytes(static_cast(n)); + total_bytes += static_cast(n); } }; @@ -111,6 +112,7 @@ bench_throughput(bench::state& state) timer.join(); state.set_elapsed(sw.elapsed_seconds()); + state.add_bytes(total_bytes); writer.close(); reader.close(); } @@ -135,6 +137,8 @@ bench_bidirectional_throughput(bench::state& state) std::vector buf2(chunk_size, 'b'); std::atomic running{true}; + int64_t read1_bytes = 0; + int64_t read2_bytes = 0; auto write1_task = [&]() -> capy::task<> { while (running.load(std::memory_order_relaxed)) @@ -155,7 +159,7 @@ bench_bidirectional_throughput(bench::state& state) capy::mutable_buffer(rbuf.data(), rbuf.size())); if (ec || n == 0) break; - state.add_bytes(static_cast(n)); + read1_bytes += static_cast(n); } }; @@ -178,7 +182,7 @@ bench_bidirectional_throughput(bench::state& state) capy::mutable_buffer(rbuf.data(), rbuf.size())); if (ec || n == 0) break; - state.add_bytes(static_cast(n)); + read2_bytes += static_cast(n); } }; @@ -199,6 +203,7 @@ bench_bidirectional_throughput(bench::state& state) timer.join(); state.set_elapsed(sw.elapsed_seconds()); + state.add_bytes(read1_bytes + read2_bytes); sock1.close(); sock2.close(); } @@ -261,6 +266,7 @@ bench_throughput_lockless(bench::state& state) std::vector read_buf(chunk_size); std::atomic running{true}; + int64_t total_bytes = 0; auto write_task = [&]() -> capy::task<> { while (running.load(std::memory_order_relaxed)) @@ -280,7 +286,7 @@ bench_throughput_lockless(bench::state& state) capy::mutable_buffer(read_buf.data(), read_buf.size())); if (ec || n == 0) break; - state.add_bytes(static_cast(n)); + total_bytes += static_cast(n); } }; @@ -299,6 +305,7 @@ bench_throughput_lockless(bench::state& state) timer.join(); state.set_elapsed(sw.elapsed_seconds()); + state.add_bytes(total_bytes); writer.close(); reader.close(); } @@ -325,6 +332,8 @@ bench_bidirectional_throughput_lockless(bench::state& state) std::vector buf2(chunk_size, 'b'); std::atomic running{true}; + int64_t read1_bytes = 0; + int64_t read2_bytes = 0; auto write1_task = [&]() -> capy::task<> { while (running.load(std::memory_order_relaxed)) @@ -345,7 +354,7 @@ bench_bidirectional_throughput_lockless(bench::state& state) capy::mutable_buffer(rbuf.data(), rbuf.size())); if (ec || n == 0) break; - state.add_bytes(static_cast(n)); + read1_bytes += static_cast(n); } }; @@ -368,7 +377,7 @@ bench_bidirectional_throughput_lockless(bench::state& state) capy::mutable_buffer(rbuf.data(), rbuf.size())); if (ec || n == 0) break; - state.add_bytes(static_cast(n)); + read2_bytes += static_cast(n); } }; @@ -389,6 +398,7 @@ bench_bidirectional_throughput_lockless(bench::state& state) timer.join(); state.set_elapsed(sw.elapsed_seconds()); + state.add_bytes(read1_bytes + read2_bytes); sock1.close(); sock2.close(); } From c596fb4a6b9d66fcf312f5404977e6dc227f5bac Mon Sep 17 00:00:00 2001 From: Steve Gerbino Date: Fri, 17 Apr 2026 19:29:28 +0200 Subject: [PATCH 2/2] perf/bench: replace suite warmup lambdas with per-bench self-warmup Add --warmup flag (default 0, disabled) that runs every benchmark once with a throwaway state before the real measurement. The self-warmup exercises the exact code path being measured, eliminating the sync-vs-async warmup asymmetry in the previous scheme where corosio's set_warmup lambdas exercised async I/O paths while asio's used synchronous asio::write/read. Removes benchmark_suite::set_warmup and all 18 .set_warmup([]{...}) call sites across corosio, asio coroutine, and asio callback. For benches with needs_conntrack_drain, drain runs before both warmup and real measurement. --- .../bench/asio/callback/http_server_bench.cpp | 24 ----- perf/bench/asio/callback/io_context_bench.cpp | 7 -- .../callback/local_socket_latency_bench.cpp | 12 --- .../local_socket_throughput_bench.cpp | 9 -- .../asio/callback/socket_latency_bench.cpp | 12 --- .../asio/callback/socket_throughput_bench.cpp | 9 -- .../asio/coroutine/http_server_bench.cpp | 22 ----- .../bench/asio/coroutine/io_context_bench.cpp | 7 -- .../coroutine/local_socket_latency_bench.cpp | 12 --- .../local_socket_throughput_bench.cpp | 9 -- .../asio/coroutine/socket_latency_bench.cpp | 12 --- .../coroutine/socket_throughput_bench.cpp | 9 -- perf/bench/common/suite.hpp | 89 +++++++++++-------- perf/bench/corosio/http_server_bench.cpp | 34 +------ perf/bench/corosio/io_context_bench.cpp | 8 -- .../corosio/local_socket_latency_bench.cpp | 18 ---- .../corosio/local_socket_throughput_bench.cpp | 15 ---- perf/bench/corosio/socket_latency_bench.cpp | 22 +---- .../bench/corosio/socket_throughput_bench.cpp | 19 +--- perf/bench/main.cpp | 27 ++++++ 20 files changed, 81 insertions(+), 295 deletions(-) diff --git a/perf/bench/asio/callback/http_server_bench.cpp b/perf/bench/asio/callback/http_server_bench.cpp index 233832463..8404fea42 100644 --- a/perf/bench/asio/callback/http_server_bench.cpp +++ b/perf/bench/asio/callback/http_server_bench.cpp @@ -338,30 +338,6 @@ make_http_server_suite() { using F = bench::bench_flags; return bench::benchmark_suite("http_server", F::needs_conntrack_drain) - .set_warmup([] { - asio::io_context ioc; - auto [c, s] = asio_bench::make_socket_pair(ioc); - char buf[256] = {}; - for (int i = 0; i < 10; ++i) - { - asio::write( - c, - asio::buffer( - bench::http::small_request, - bench::http::small_request_size)); - asio::read( - s, asio::buffer(buf, bench::http::small_request_size)); - asio::write( - s, - asio::buffer( - bench::http::small_response, - bench::http::small_response_size)); - asio::read( - c, asio::buffer(buf, bench::http::small_response_size)); - } - c.close(); - s.close(); - }) .add("single_conn", bench_single_connection) .add("single_conn_lockless", bench_single_connection_lockless) .add("concurrent", bench_concurrent_connections) diff --git a/perf/bench/asio/callback/io_context_bench.cpp b/perf/bench/asio/callback/io_context_bench.cpp index b279fa33a..c7e16e966 100644 --- a/perf/bench/asio/callback/io_context_bench.cpp +++ b/perf/bench/asio/callback/io_context_bench.cpp @@ -236,13 +236,6 @@ make_io_context_suite() { using F = bench::bench_flags; return bench::benchmark_suite("io_context", F::is_microbenchmark) - .set_warmup([] { - asio::io_context ioc; - int64_t counter = 0; - for (int i = 0; i < 1000; ++i) - asio::post(ioc, [&counter] { ++counter; }); - ioc.run(); - }) .add("single_threaded", bench_single_threaded_post) .add("multithreaded", bench_multithreaded_scaling) .args({8}) diff --git a/perf/bench/asio/callback/local_socket_latency_bench.cpp b/perf/bench/asio/callback/local_socket_latency_bench.cpp index 89e393d7f..da44a50e2 100644 --- a/perf/bench/asio/callback/local_socket_latency_bench.cpp +++ b/perf/bench/asio/callback/local_socket_latency_bench.cpp @@ -284,18 +284,6 @@ make_local_socket_latency_suite() { using F = bench::bench_flags; return bench::benchmark_suite("local_socket_latency", F::none) - .set_warmup([] { - asio::io_context ioc; - auto [c, s] = asio_bench::make_local_socket_pair(ioc); - char buf[64] = {}; - for (int i = 0; i < 100; ++i) - { - asio::write(c, asio::buffer(buf)); - asio::read(s, asio::buffer(buf)); - } - c.close(); - s.close(); - }) .add("pingpong", bench_pingpong_latency) .args({1, 64, 1024}) .add("pingpong_lockless", bench_pingpong_latency_lockless) diff --git a/perf/bench/asio/callback/local_socket_throughput_bench.cpp b/perf/bench/asio/callback/local_socket_throughput_bench.cpp index d4b3ea667..e6b47f59d 100644 --- a/perf/bench/asio/callback/local_socket_throughput_bench.cpp +++ b/perf/bench/asio/callback/local_socket_throughput_bench.cpp @@ -248,15 +248,6 @@ make_local_socket_throughput_suite() { using F = bench::bench_flags; return bench::benchmark_suite("local_socket_throughput", F::none) - .set_warmup([] { - asio::io_context ioc; - auto [w, r] = asio_bench::make_local_socket_pair(ioc); - std::vector buf(4096, 'w'); - asio::write(w, asio::buffer(buf)); - asio::read(r, asio::buffer(buf)); - w.close(); - r.close(); - }) .add("unidirectional", bench_throughput) .range(1024, 1048576, 4) .add("unidirectional_lockless", bench_throughput_lockless) diff --git a/perf/bench/asio/callback/socket_latency_bench.cpp b/perf/bench/asio/callback/socket_latency_bench.cpp index f40320ab3..afef42497 100644 --- a/perf/bench/asio/callback/socket_latency_bench.cpp +++ b/perf/bench/asio/callback/socket_latency_bench.cpp @@ -287,18 +287,6 @@ make_socket_latency_suite() { using F = bench::bench_flags; return bench::benchmark_suite("socket_latency", F::needs_conntrack_drain) - .set_warmup([] { - asio::io_context ioc; - auto [c, s] = asio_bench::make_socket_pair(ioc); - char buf[64] = {}; - for (int i = 0; i < 100; ++i) - { - asio::write(c, asio::buffer(buf)); - asio::read(s, asio::buffer(buf)); - } - c.close(); - s.close(); - }) .add("pingpong", bench_pingpong_latency) .args({1, 64, 1024}) .add("pingpong_lockless", bench_pingpong_latency_lockless) diff --git a/perf/bench/asio/callback/socket_throughput_bench.cpp b/perf/bench/asio/callback/socket_throughput_bench.cpp index 145fb1d77..d4c539166 100644 --- a/perf/bench/asio/callback/socket_throughput_bench.cpp +++ b/perf/bench/asio/callback/socket_throughput_bench.cpp @@ -390,15 +390,6 @@ make_socket_throughput_suite() { using F = bench::bench_flags; return bench::benchmark_suite("socket_throughput", F::needs_conntrack_drain) - .set_warmup([] { - asio::io_context ioc; - auto [w, r] = asio_bench::make_socket_pair(ioc); - std::vector buf(4096, 'w'); - asio::write(w, asio::buffer(buf)); - asio::read(r, asio::buffer(buf)); - w.close(); - r.close(); - }) .add("unidirectional", bench_throughput) .range(1024, 1048576, 4) .add("unidirectional_lockless", bench_throughput_lockless) diff --git a/perf/bench/asio/coroutine/http_server_bench.cpp b/perf/bench/asio/coroutine/http_server_bench.cpp index 5c5adf0d3..f9d1aba0a 100644 --- a/perf/bench/asio/coroutine/http_server_bench.cpp +++ b/perf/bench/asio/coroutine/http_server_bench.cpp @@ -282,28 +282,6 @@ make_http_server_suite() using F = bench::bench_flags; return bench::benchmark_suite("http_server", F::needs_conntrack_drain) - .set_warmup([]{ - asio::io_context ioc; - auto [c, s] = make_socket_pair(ioc); - char buf[256] = {}; - for (int i = 0; i < 10; ++i) - { - asio::write( - c, - asio::buffer( - bench::http::small_request, - bench::http::small_request_size)); - asio::read(s, asio::buffer(buf, bench::http::small_request_size)); - asio::write( - s, - asio::buffer( - bench::http::small_response, - bench::http::small_response_size)); - asio::read(c, asio::buffer(buf, bench::http::small_response_size)); - } - c.close(); - s.close(); - }) .add("single_conn", bench_single_connection) .add("single_conn_lockless", bench_single_connection_lockless) .add("concurrent", bench_concurrent_connections) diff --git a/perf/bench/asio/coroutine/io_context_bench.cpp b/perf/bench/asio/coroutine/io_context_bench.cpp index 851f1d6dc..6fa7a6ea8 100644 --- a/perf/bench/asio/coroutine/io_context_bench.cpp +++ b/perf/bench/asio/coroutine/io_context_bench.cpp @@ -248,13 +248,6 @@ make_io_context_suite() { using F = bench::bench_flags; return bench::benchmark_suite("io_context", F::is_microbenchmark) - .set_warmup([] { - asio::io_context ioc; - int64_t counter = 0; - for (int i = 0; i < 1000; ++i) - asio::co_spawn(ioc, increment_task(counter), asio::detached); - ioc.run(); - }) .add("single_threaded", bench_single_threaded_post) .add("multithreaded", bench_multithreaded_scaling) .args({8}) diff --git a/perf/bench/asio/coroutine/local_socket_latency_bench.cpp b/perf/bench/asio/coroutine/local_socket_latency_bench.cpp index cefee0744..79819c30b 100644 --- a/perf/bench/asio/coroutine/local_socket_latency_bench.cpp +++ b/perf/bench/asio/coroutine/local_socket_latency_bench.cpp @@ -238,18 +238,6 @@ bench::benchmark_suite make_local_socket_latency_suite() { return bench::benchmark_suite("local_socket_latency") - .set_warmup([] { - asio::io_context ioc; - auto [c, s] = make_local_socket_pair(ioc); - char buf[64] = {}; - for (int i = 0; i < 100; ++i) - { - asio::write(c, asio::buffer(buf)); - asio::read(s, asio::buffer(buf)); - } - c.close(); - s.close(); - }) .add("pingpong", bench_pingpong_latency) .args({1, 64, 1024}) .add("pingpong_lockless", bench_pingpong_latency_lockless) diff --git a/perf/bench/asio/coroutine/local_socket_throughput_bench.cpp b/perf/bench/asio/coroutine/local_socket_throughput_bench.cpp index 85f048277..1174a8140 100644 --- a/perf/bench/asio/coroutine/local_socket_throughput_bench.cpp +++ b/perf/bench/asio/coroutine/local_socket_throughput_bench.cpp @@ -377,15 +377,6 @@ bench::benchmark_suite make_local_socket_throughput_suite() { return bench::benchmark_suite("local_socket_throughput") - .set_warmup([] { - asio::io_context ioc; - auto [w, r] = make_local_socket_pair(ioc); - std::vector buf(4096, 'w'); - asio::write(w, asio::buffer(buf)); - asio::read(r, asio::buffer(buf)); - w.close(); - r.close(); - }) .add("unidirectional", bench_throughput) .range(1024, 1048576, 4) .add("unidirectional_lockless", bench_throughput_lockless) diff --git a/perf/bench/asio/coroutine/socket_latency_bench.cpp b/perf/bench/asio/coroutine/socket_latency_bench.cpp index 854fc3549..d5e18acf8 100644 --- a/perf/bench/asio/coroutine/socket_latency_bench.cpp +++ b/perf/bench/asio/coroutine/socket_latency_bench.cpp @@ -239,18 +239,6 @@ make_socket_latency_suite() { using F = bench::bench_flags; return bench::benchmark_suite("socket_latency", F::needs_conntrack_drain) - .set_warmup([] { - asio::io_context ioc; - auto [c, s] = make_socket_pair(ioc); - char buf[64] = {}; - for (int i = 0; i < 100; ++i) - { - asio::write(c, asio::buffer(buf)); - asio::read(s, asio::buffer(buf)); - } - c.close(); - s.close(); - }) .add("pingpong", bench_pingpong_latency) .args({1, 64, 1024}) .add("pingpong_lockless", bench_pingpong_latency_lockless) diff --git a/perf/bench/asio/coroutine/socket_throughput_bench.cpp b/perf/bench/asio/coroutine/socket_throughput_bench.cpp index c302449bc..5f714af1f 100644 --- a/perf/bench/asio/coroutine/socket_throughput_bench.cpp +++ b/perf/bench/asio/coroutine/socket_throughput_bench.cpp @@ -505,15 +505,6 @@ make_socket_throughput_suite() { using F = bench::bench_flags; return bench::benchmark_suite("socket_throughput", F::needs_conntrack_drain) - .set_warmup([] { - asio::io_context ioc; - auto [w, r] = make_socket_pair(ioc); - std::vector buf(4096, 'w'); - asio::write(w, asio::buffer(buf)); - asio::read(r, asio::buffer(buf)); - w.close(); - r.close(); - }) .add("unidirectional", bench_throughput) .range(1024, 1048576, 4) .add("unidirectional_lockless", bench_throughput_lockless) diff --git a/perf/bench/common/suite.hpp b/perf/bench/common/suite.hpp index c298f5cc6..2f1b3684b 100644 --- a/perf/bench/common/suite.hpp +++ b/perf/bench/common/suite.hpp @@ -273,12 +273,10 @@ class benchmark_suite std::string library_; std::string category_; bench_flags flags_; - std::function warmup_; std::vector entries_; public: - using bench_fn = std::function; - using warmup_fn = std::function; + using bench_fn = std::function; explicit benchmark_suite( std::string category, bench_flags flags = bench_flags::none) @@ -316,20 +314,12 @@ class benchmark_suite return *this; } - /// Set a warmup function that runs once before the first benchmark. - benchmark_suite& set_warmup(warmup_fn fn) - { - warmup_ = std::move(fn); - return *this; - } - /// Set the library name (called by the runner). void set_library(std::string lib) { library_ = std::move(lib); } std::string const& library() const { return library_; } std::string const& category() const { return category_; } bench_flags flags() const { return flags_; } - warmup_fn const& warmup() const { return warmup_; } std::vector const& entries() const { return entries_; } }; @@ -338,6 +328,7 @@ class benchmark_runner { std::string backend_; double duration_s_; + double warmup_duration_s_ = 0.0; std::vector suites_; result_collector collector_; @@ -350,6 +341,27 @@ class benchmark_runner collector_.set_duration(duration_s); } + /** Set the per-benchmark warmup duration in seconds. + + When greater than zero, every benchmark runs once with a throwaway + `state` for this duration before the real measurement. The warmup + exercises the exact code path being measured, priming kernel + buffers, allocator pools, CPU caches, and library-specific + dispatch paths. Results from the warmup run are discarded. + + @par Rationale + A bench-specific lambda warmup primes the wrong paths if the + lambda uses a different I/O style than the benchmark (e.g. sync + I/O vs async). Self-warmup guarantees the primed code path is + the measured code path. + */ + void set_warmup_duration(double seconds) + { + warmup_duration_s_ = seconds; + } + + double warmup_duration() const { return warmup_duration_s_; } + /// Add a suite to the runner. void add_suite(benchmark_suite suite) { @@ -426,28 +438,22 @@ class benchmark_runner !explicit_cat && !enable_microbenchmarks) continue; - bool warmup_done = false; - for (auto const& entry : suite.entries()) { - bool suite_drain = has_flag( - suite.flags(), bench_flags::needs_conntrack_drain); - bool entry_drain = has_flag( - entry.flags, bench_flags::needs_conntrack_drain); + bool needs_drain = + has_flag(suite.flags(), + bench_flags::needs_conntrack_drain) || + has_flag(entry.flags, + bench_flags::needs_conntrack_drain); if (entry.args.empty()) { if (!want_bench(entry.name)) continue; - run_warmup(suite, warmup_done); - - if (suite_drain || entry_drain) - perf::await_conntrack_drain(); - run_entry( suite.library(), suite.category(), - entry.name, entry.fn, {}); + entry.name, entry.fn, {}, needs_drain); } else { @@ -459,14 +465,9 @@ class benchmark_runner !want_bench(full_name)) continue; - run_warmup(suite, warmup_done); - - if (suite_drain || entry_drain) - perf::await_conntrack_drain(); - run_entry( suite.library(), suite.category(), - full_name, entry.fn, {v}); + full_name, entry.fn, {v}, needs_drain); } } } @@ -480,21 +481,33 @@ class benchmark_runner } private: - void run_warmup(benchmark_suite const& suite, bool& done) - { - if (done || !suite.warmup()) - return; - suite.warmup()(); - done = true; - } - void run_entry( std::string const& library, std::string const& category, std::string const& name, std::function const& fn, - std::vector ranges) + std::vector ranges, + bool needs_drain) { + auto maybe_drain = [&] { + if (needs_drain) + perf::await_conntrack_drain(); + }; + + // Self-warmup: run the benchmark with a throwaway state so the + // exact code path being measured is primed (kernel buffers, + // allocator pools, CPU caches, library dispatch). Drain again + // between warmup and real run to clear any socket state the + // warmup left behind. + if (warmup_duration_s_ > 0.0) + { + maybe_drain(); + state warmup_st(warmup_duration_s_, ranges); + fn(warmup_st); + } + + maybe_drain(); + std::string header; if (!library.empty()) header += "(" + library + ") "; diff --git a/perf/bench/corosio/http_server_bench.cpp b/perf/bench/corosio/http_server_bench.cpp index 8d90546b2..a7db90507 100644 --- a/perf/bench/corosio/http_server_bench.cpp +++ b/perf/bench/corosio/http_server_bench.cpp @@ -305,41 +305,9 @@ template bench::benchmark_suite make_http_server_suite() { - using F = bench::bench_flags; - using socket_type = corosio::native_tcp_socket; + using F = bench::bench_flags; return bench::benchmark_suite("http_server", F::needs_conntrack_drain) - .set_warmup([]{ - corosio::native_io_context ioc; - auto [c, s] = corosio::test::make_socket_pair< - socket_type, corosio::native_tcp_acceptor>(ioc); - char buf[256] = {}; - auto task = [&]() -> capy::task<> { - for (int i = 0; i < 10; ++i) - { - (void)co_await capy::write( - c, - capy::const_buffer( - bench::http::small_request, - bench::http::small_request_size)); - (void)co_await s.read_some( - capy::mutable_buffer( - buf, bench::http::small_request_size)); - (void)co_await capy::write( - s, - capy::const_buffer( - bench::http::small_response, - bench::http::small_response_size)); - (void)co_await c.read_some( - capy::mutable_buffer( - buf, bench::http::small_response_size)); - } - }; - capy::run_async(ioc.get_executor())(task()); - ioc.run(); - c.close(); - s.close(); - }) .add("single_conn", bench_single_connection) .add("single_conn_lockless", bench_single_connection_lockless) .add("concurrent", bench_concurrent_connections) diff --git a/perf/bench/corosio/io_context_bench.cpp b/perf/bench/corosio/io_context_bench.cpp index 15aab9051..1a39cf4db 100644 --- a/perf/bench/corosio/io_context_bench.cpp +++ b/perf/bench/corosio/io_context_bench.cpp @@ -330,14 +330,6 @@ make_io_context_suite() { using F = bench::bench_flags; return bench::benchmark_suite("io_context", F::is_microbenchmark) - .set_warmup([] { - corosio::native_io_context ioc; - auto ex = ioc.get_executor(); - int64_t counter = 0; - for (int i = 0; i < 1000; ++i) - capy::run_async(ex)(increment_task(counter)); - ioc.run(); - }) .add("single_threaded", bench_single_threaded_post) .add("multithreaded", bench_multithreaded_scaling) .args({8}) diff --git a/perf/bench/corosio/local_socket_latency_bench.cpp b/perf/bench/corosio/local_socket_latency_bench.cpp index d6690d2ea..cc99561a4 100644 --- a/perf/bench/corosio/local_socket_latency_bench.cpp +++ b/perf/bench/corosio/local_socket_latency_bench.cpp @@ -233,24 +233,6 @@ make_local_socket_latency_suite() using F = bench::bench_flags; return bench::benchmark_suite("local_socket_latency", F::none) - .set_warmup([]{ - corosio::native_io_context ioc; - auto [c, s] = corosio::make_local_stream_pair(ioc); - char buf[64] = {}; - auto task = [&]() -> capy::task<> { - for (int i = 0; i < 100; ++i) - { - (void)co_await c.write_some( - capy::const_buffer(buf, sizeof(buf))); - (void)co_await s.read_some( - capy::mutable_buffer(buf, sizeof(buf))); - } - }; - capy::run_async(ioc.get_executor())(task()); - ioc.run(); - c.close(); - s.close(); - }) .add("pingpong", bench_unix_pingpong_latency) .args({1, 64, 1024}) .add("pingpong_lockless", bench_unix_pingpong_latency_lockless) diff --git a/perf/bench/corosio/local_socket_throughput_bench.cpp b/perf/bench/corosio/local_socket_throughput_bench.cpp index 027feba22..ab9d60988 100644 --- a/perf/bench/corosio/local_socket_throughput_bench.cpp +++ b/perf/bench/corosio/local_socket_throughput_bench.cpp @@ -330,21 +330,6 @@ make_local_socket_throughput_suite() using F = bench::bench_flags; return bench::benchmark_suite("local_socket_throughput", F::none) - .set_warmup([]{ - corosio::native_io_context ioc; - auto [w, r] = corosio::make_local_stream_pair(ioc); - std::vector buf(4096, 'w'); - auto task = [&]() -> capy::task<> { - (void)co_await w.write_some( - capy::const_buffer(buf.data(), buf.size())); - (void)co_await r.read_some( - capy::mutable_buffer(buf.data(), buf.size())); - }; - capy::run_async(ioc.get_executor())(task()); - ioc.run(); - w.close(); - r.close(); - }) .add("unidirectional", bench_unix_throughput) .range(1024, 1048576, 4) .add("unidirectional_lockless", bench_unix_throughput_lockless) diff --git a/perf/bench/corosio/socket_latency_bench.cpp b/perf/bench/corosio/socket_latency_bench.cpp index 647f82f9b..4497fbf33 100644 --- a/perf/bench/corosio/socket_latency_bench.cpp +++ b/perf/bench/corosio/socket_latency_bench.cpp @@ -252,29 +252,9 @@ template bench::benchmark_suite make_socket_latency_suite() { - using F = bench::bench_flags; - using socket_type = corosio::native_tcp_socket; + using F = bench::bench_flags; return bench::benchmark_suite("socket_latency", F::needs_conntrack_drain) - .set_warmup([]{ - corosio::native_io_context ioc; - auto [c, s] = corosio::test::make_socket_pair< - socket_type, corosio::native_tcp_acceptor>(ioc); - char buf[64] = {}; - auto task = [&]() -> capy::task<> { - for (int i = 0; i < 100; ++i) - { - (void)co_await c.write_some( - capy::const_buffer(buf, sizeof(buf))); - (void)co_await s.read_some( - capy::mutable_buffer(buf, sizeof(buf))); - } - }; - capy::run_async(ioc.get_executor())(task()); - ioc.run(); - c.close(); - s.close(); - }) .add("pingpong", bench_pingpong_latency) .args({1, 64, 1024}) .add("pingpong_lockless", bench_pingpong_latency_lockless) diff --git a/perf/bench/corosio/socket_throughput_bench.cpp b/perf/bench/corosio/socket_throughput_bench.cpp index 8ae49b896..a52c2735d 100644 --- a/perf/bench/corosio/socket_throughput_bench.cpp +++ b/perf/bench/corosio/socket_throughput_bench.cpp @@ -493,26 +493,9 @@ template bench::benchmark_suite make_socket_throughput_suite() { - using F = bench::bench_flags; - using socket_type = corosio::native_tcp_socket; + using F = bench::bench_flags; return bench::benchmark_suite("socket_throughput", F::needs_conntrack_drain) - .set_warmup([]{ - corosio::native_io_context ioc; - auto [w, r] = corosio::test::make_socket_pair< - socket_type, corosio::native_tcp_acceptor>(ioc); - std::vector buf(4096, 'w'); - auto task = [&]() -> capy::task<> { - (void)co_await w.write_some( - capy::const_buffer(buf.data(), buf.size())); - (void)co_await r.read_some( - capy::mutable_buffer(buf.data(), buf.size())); - }; - capy::run_async(ioc.get_executor())(task()); - ioc.run(); - w.close(); - r.close(); - }) .add("unidirectional", bench_throughput) .range(1024, 1048576, 4) .add("unidirectional_lockless", bench_throughput_lockless) diff --git a/perf/bench/main.cpp b/perf/bench/main.cpp index 8812f3457..1f18ae199 100644 --- a/perf/bench/main.cpp +++ b/perf/bench/main.cpp @@ -40,6 +40,10 @@ print_usage(char const* program_name) "(prefix match)\n"; std::cout << " --duration Duration per benchmark in seconds " "(default: 3.0)\n"; + std::cout << " --warmup Self-warmup duration per benchmark " + "(default: 0,\n"; + std::cout + << " disabled; try 0.5 for rigor)\n"; std::cout << " --output Write JSON results to file\n"; std::cout << " --enable-microbenchmarks\n"; std::cout @@ -123,6 +127,7 @@ main(int argc, char* argv[]) char const* category_filter = nullptr; char const* bench_filter = nullptr; double duration_s = 3.0; + double warmup_duration_s = 0.0; bool enable_microbenchmark = false; bool list_mode = false; @@ -193,6 +198,23 @@ main(int argc, char* argv[]) return 1; } } + else if (std::strcmp(argv[i], "--warmup") == 0) + { + if (i + 1 < argc) + { + warmup_duration_s = std::atof(argv[++i]); + if (warmup_duration_s < 0.0) + { + std::cerr << "Error: --warmup must be non-negative\n"; + return 1; + } + } + else + { + std::cerr << "Error: --warmup requires an argument\n"; + return 1; + } + } else if (std::strcmp(argv[i], "--output") == 0) { if (i + 1 < argc) @@ -259,6 +281,7 @@ main(int argc, char* argv[]) [=]( perf::context_factory, BackendTag, char const* name) { bench::benchmark_runner runner(name, duration_s); + runner.set_warmup_duration(warmup_duration_s); if (want_corosio) add_corosio_suites(runner, BackendTag{}); @@ -283,6 +306,10 @@ main(int argc, char* argv[]) std::cout << "Backend: " << name << "\n"; std::cout << "Duration: " << duration_s << "s per benchmark\n"; + std::cout << "Warmup: " << warmup_duration_s + << "s per benchmark" + << (warmup_duration_s <= 0.0 ? " (disabled)" : "") + << "\n"; } runner.run(category_filter, bench_filter, enable_microbenchmark);