Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
66 changes: 66 additions & 0 deletions perf/bench/asio/callback/accept_churn_bench.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
#include <boost/asio/post.hpp>
#include <boost/asio/read.hpp>
#include <boost/asio/write.hpp>
#include <boost/asio/detail/concurrency_hint.hpp>

#include <atomic>
#include <chrono>
Expand Down Expand Up @@ -187,6 +188,35 @@ bench_sequential_churn(bench::state& state)
acc.close();
}

// Sequential accept/connect/close churn with Asio's reactor locking
// disabled, to expose the per-operation mutex cost on the I/O path.
void
bench_sequential_churn_lockless(bench::state& state)
{
    // NOTE: BOOST_ASIO_CONCURRENCY_HINT_UNSAFE_IO is used deliberately
    // instead of BOOST_ASIO_CONCURRENCY_HINT_UNSAFE.  The fully-unlocked
    // UNSAFE hint requires that *every* operation on the io_context happen
    // on one thread, but the timer thread below calls ioc.stop()
    // concurrently with ioc.run() — a data race under UNSAFE.  UNSAFE_IO
    // removes locking from the reactor I/O path (what this benchmark
    // measures) while keeping the scheduler safe for cross-thread stop().
    asio::io_context ioc(BOOST_ASIO_CONCURRENCY_HINT_UNSAFE_IO);
    auto acc = make_churn_acceptor(ioc);
    auto ep = tcp::endpoint(asio::ip::address_v4::loopback(),
                            acc.local_endpoint().port());

    // Cleared by the timer thread so the op chain stops re-arming itself.
    std::atomic<bool> running{true};

    sequential_churn_op op{ioc, acc, ep, running, state.latency(),
                           state.ops(), {}, {}, {}};

    perf::stopwatch total_sw;

    op.start();

    // Deadline thread: after the configured duration, flag the op to wind
    // down and force the event loop to return.
    std::thread timer([&]() {
        std::this_thread::sleep_for(
            std::chrono::duration<double>(state.duration()));
        running.store(false, std::memory_order_relaxed);
        ioc.stop();
    });

    ioc.run();
    timer.join();

    state.set_elapsed(total_sw.elapsed_seconds());
    acc.close();
}

// N independent accept loops on separate listeners. Reveals whether
// fd allocation or acceptor state scales linearly under callbacks.
void
Expand Down Expand Up @@ -344,6 +374,39 @@ bench_burst_churn(bench::state& state)
acc.close();
}

// Burst accept churn (burst_size concurrent connect/accept cycles at a
// time) with Asio's reactor locking disabled.
void
bench_burst_churn_lockless(bench::state& state)
{
    int burst_size = static_cast<int>(state.range(0));
    state.counters["burst_size"] = burst_size;

    // NOTE: BOOST_ASIO_CONCURRENCY_HINT_UNSAFE_IO rather than the
    // fully-unlocked UNSAFE hint: UNSAFE forbids touching the io_context
    // from more than one thread, but the stopper thread below calls
    // ioc.stop() while this thread is inside ioc.run().  UNSAFE_IO still
    // skips locking on the reactor I/O path under test, yet keeps the
    // scheduler's locking so the cross-thread stop() is well-defined.
    asio::io_context ioc(BOOST_ASIO_CONCURRENCY_HINT_UNSAFE_IO);
    auto acc = make_churn_acceptor(ioc);
    auto ep = tcp::endpoint(asio::ip::address_v4::loopback(),
                            acc.local_endpoint().port());

    // Cleared by the stopper thread so the op stops launching new bursts.
    std::atomic<bool> running{true};

    burst_churn_op op{ioc, acc, ep, running, state.latency(),
                      state.ops(), burst_size, {}, {}, {},
                      {}};

    perf::stopwatch total_sw;

    op.start();

    // Deadline thread: after the configured duration, flag the op to wind
    // down and force the event loop to return.
    std::thread stopper([&]() {
        std::this_thread::sleep_for(
            std::chrono::duration<double>(state.duration()));
        running.store(false, std::memory_order_relaxed);
        ioc.stop();
    });

    ioc.run();
    stopper.join();

    state.set_elapsed(total_sw.elapsed_seconds());
    acc.close();
}

} // anonymous namespace

bench::benchmark_suite
Expand All @@ -352,9 +415,12 @@ make_accept_churn_suite()
using F = bench::bench_flags;
return bench::benchmark_suite("accept_churn", F::needs_conntrack_drain)
.add("sequential", bench_sequential_churn)
.add("sequential_lockless", bench_sequential_churn_lockless)
.add("concurrent", bench_concurrent_churn)
.args({1, 4, 16})
.add("burst", bench_burst_churn)
.args({10, 100})
.add("burst_lockless", bench_burst_churn_lockless)
.args({10, 100});
}

Expand Down
155 changes: 155 additions & 0 deletions perf/bench/asio/callback/fan_out_bench.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
#include <boost/asio/read.hpp>
#include <boost/asio/steady_timer.hpp>
#include <boost/asio/write.hpp>
#include <boost/asio/detail/concurrency_hint.hpp>

#include <atomic>
#include <chrono>
Expand Down Expand Up @@ -466,6 +467,154 @@ bench_concurrent_parents(bench::state& state)
state.set_elapsed(sw.elapsed_seconds());
}

// Fork/join fan-out on an io_context created with the UNSAFE concurrency
// hint, i.e. with Asio's internal locking disabled.
void
bench_fork_join_lockless(bench::state& state)
{
    const int width = static_cast<int>(state.range(0));
    state.counters["fan_out"] = width;

    asio::io_context ioc(BOOST_ASIO_CONCURRENCY_HINT_UNSAFE);

    std::vector<tcp_socket> clients;
    std::vector<tcp_socket> servers;
    clients.reserve(width);
    servers.reserve(width);

    for (int i = 0; i < width; ++i)
    {
        auto [c, s] = asio_bench::make_socket_pair(ioc);
        clients.push_back(std::move(c));
        servers.push_back(std::move(s));
    }

    // One echo loop per server-side socket.
    for (auto& srv : servers)
        std::make_shared<echo_server_op>(srv)->start();

    fork_join_op op{ioc, clients, servers, width, state, {}, {}};

    op.start();

    // Background deadline: stop the benchmark once the duration elapses.
    const auto run_for = std::chrono::duration<double>(state.duration());
    std::thread stopper([&state, run_for] {
        std::this_thread::sleep_for(run_for);
        state.stop();
    });

    perf::stopwatch sw;
    ioc.run();
    stopper.join();

    state.set_elapsed(sw.elapsed_seconds());
}

// Nested fan-out on a lock-free io_context (UNSAFE concurrency hint):
// `groups` parent operations, each fanning out to four sub-operations.
void
bench_nested_lockless(bench::state& state)
{
    const int group_count = static_cast<int>(state.range(0));
    constexpr int subs_each = 4;
    const int total = group_count * subs_each;

    state.counters["groups"] = group_count;
    state.counters["subs_per_group"] = subs_each;

    asio::io_context ioc(BOOST_ASIO_CONCURRENCY_HINT_UNSAFE);

    std::vector<tcp_socket> clients;
    std::vector<tcp_socket> servers;
    clients.reserve(total);
    servers.reserve(total);

    for (int i = 0; i < total; ++i)
    {
        auto [c, s] = asio_bench::make_socket_pair(ioc);
        clients.push_back(std::move(c));
        servers.push_back(std::move(s));
    }

    // One echo loop per server-side socket.
    for (auto& srv : servers)
        std::make_shared<echo_server_op>(srv)->start();

    nested_op op{ioc, clients, servers, group_count, subs_each,
                 state, {}, {}, {}};

    op.start();

    // Background deadline: stop the benchmark once the duration elapses.
    const auto run_for = std::chrono::duration<double>(state.duration());
    std::thread stopper([&state, run_for] {
        std::this_thread::sleep_for(run_for);
        state.stop();
    });

    perf::stopwatch sw;
    ioc.run();
    stopper.join();

    state.set_elapsed(sw.elapsed_seconds());
}

// `num_parents` independent fork/join parents (16 sub-ops each) sharing a
// single io_context created with the UNSAFE (no internal locking) hint.
void
bench_concurrent_parents_lockless(bench::state& state)
{
    const int parent_count = static_cast<int>(state.range(0));
    constexpr int fan_width = 16;
    const int total = parent_count * fan_width;

    state.counters["num_parents"] = parent_count;
    state.counters["fan_out"] = fan_width;

    asio::io_context ioc(BOOST_ASIO_CONCURRENCY_HINT_UNSAFE);

    std::vector<tcp_socket> clients;
    std::vector<tcp_socket> servers;
    clients.reserve(total);
    servers.reserve(total);

    for (int i = 0; i < total; ++i)
    {
        auto [c, s] = asio_bench::make_socket_pair(ioc);
        clients.push_back(std::move(c));
        servers.push_back(std::move(s));
    }

    // One echo loop per server-side socket.
    for (auto& srv : servers)
        std::make_shared<echo_server_op>(srv)->start();

    // Shared counter handed to every parent op.
    std::atomic<int> parents_done{0};

    std::vector<std::unique_ptr<parent_fork_join_op>> parent_ops;
    parent_ops.reserve(parent_count);

    // Each parent owns a disjoint slice of fan_width socket pairs.
    for (int p = 0; p < parent_count; ++p)
    {
        auto& parent = parent_ops.emplace_back(
            std::make_unique<parent_fork_join_op>(
                ioc, clients, servers, p * fan_width, fan_width,
                parent_count, state, parents_done));
        parent->start();
    }

    // Background deadline: stop the benchmark once the duration elapses.
    const auto run_for = std::chrono::duration<double>(state.duration());
    std::thread stopper([&state, run_for] {
        std::this_thread::sleep_for(run_for);
        state.stop();
    });

    perf::stopwatch sw;
    ioc.run();
    stopper.join();

    state.set_elapsed(sw.elapsed_seconds());
}

} // anonymous namespace

bench::benchmark_suite
Expand All @@ -475,9 +624,15 @@ make_fan_out_suite()
return bench::benchmark_suite("fan_out", F::needs_conntrack_drain)
.add("fork_join", bench_fork_join)
.args({1, 4, 16, 64})
.add("fork_join_lockless", bench_fork_join_lockless)
.args({1, 4, 16, 64})
.add("nested", bench_nested)
.args({4, 16})
.add("nested_lockless", bench_nested_lockless)
.args({4, 16})
.add("concurrent_parents", bench_concurrent_parents)
.args({1, 4, 16})
.add("concurrent_parents_lockless", bench_concurrent_parents_lockless)
.args({1, 4, 16});
}

Expand Down
53 changes: 29 additions & 24 deletions perf/bench/asio/callback/http_server_bench.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
#include <boost/asio/read.hpp>
#include <boost/asio/read_until.hpp>
#include <boost/asio/write.hpp>
#include <boost/asio/detail/concurrency_hint.hpp>

#include <atomic>
#include <chrono>
Expand Down Expand Up @@ -181,6 +182,33 @@ bench_single_connection(bench::state& state)
server.close();
}

// Single client/server connection ping-pong, identical to
// bench_single_connection except the io_context opts out of Asio's
// internal locking via the UNSAFE concurrency hint.
void
bench_single_connection_lockless(bench::state& state)
{
    asio::io_context ioc(BOOST_ASIO_CONCURRENCY_HINT_UNSAFE);
    auto [client, server] = asio_bench::make_socket_pair(ioc);

    server_op sop{server, {}};
    client_op cop{client, state, {}, {}};

    sop.start();
    cop.start();

    // Background deadline: stop the benchmark once the duration elapses.
    const auto run_for = std::chrono::duration<double>(state.duration());
    std::thread deadline([&state, run_for] {
        std::this_thread::sleep_for(run_for);
        state.stop();
    });

    perf::stopwatch sw;
    ioc.run();
    deadline.join();

    state.set_elapsed(sw.elapsed_seconds());
    client.close();
    server.close();
}

void
bench_concurrent_connections(bench::state& state)
{
Expand Down Expand Up @@ -310,31 +338,8 @@ make_http_server_suite()
{
using F = bench::bench_flags;
return bench::benchmark_suite("http_server", F::needs_conntrack_drain)
.set_warmup([] {
asio::io_context ioc;
auto [c, s] = asio_bench::make_socket_pair(ioc);
char buf[256] = {};
for (int i = 0; i < 10; ++i)
{
asio::write(
c,
asio::buffer(
bench::http::small_request,
bench::http::small_request_size));
asio::read(
s, asio::buffer(buf, bench::http::small_request_size));
asio::write(
s,
asio::buffer(
bench::http::small_response,
bench::http::small_response_size));
asio::read(
c, asio::buffer(buf, bench::http::small_response_size));
}
c.close();
s.close();
})
.add("single_conn", bench_single_connection)
.add("single_conn_lockless", bench_single_connection_lockless)
.add("concurrent", bench_concurrent_connections)
.args({1, 4, 16, 32})
.add("multithread", bench_multithread)
Expand Down
7 changes: 0 additions & 7 deletions perf/bench/asio/callback/io_context_bench.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -236,13 +236,6 @@ make_io_context_suite()
{
using F = bench::bench_flags;
return bench::benchmark_suite("io_context", F::is_microbenchmark)
.set_warmup([] {
asio::io_context ioc;
int64_t counter = 0;
for (int i = 0; i < 1000; ++i)
asio::post(ioc, [&counter] { ++counter; });
ioc.run();
})
.add("single_threaded", bench_single_threaded_post)
.add("multithreaded", bench_multithreaded_scaling)
.args({8})
Expand Down
Loading
Loading