Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
56 changes: 31 additions & 25 deletions perf/bench/asio/callback/accept_churn_bench.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -77,8 +77,8 @@ struct sequential_churn_op
std::atomic<bool>& running;
perf::statistics& latency_stats;
std::atomic<int64_t>& ops;
std::unique_ptr<tcp_socket> client;
std::unique_ptr<tcp_socket> server;
tcp_socket client;
tcp_socket server;
perf::stopwatch sw;
char byte = 'X';
char recv_byte = 0;
Expand All @@ -93,27 +93,27 @@ struct sequential_churn_op
sw.reset();
connect_done = false;
accept_done = false;
client = std::make_unique<tcp_socket>( ioc.get_executor() );
server = std::make_unique<tcp_socket>( ioc.get_executor() );
client = tcp_socket( ioc.get_executor() );
server = tcp_socket( ioc.get_executor() );

boost::system::error_code ec;
ec = client->open( tcp::v4(), ec );
ec = client.open( tcp::v4(), ec );
if( ec )
{
asio::post( ioc, [this]() { start(); } );
return;
}
configure_churn_socket( *client );
configure_churn_socket( client );

client->async_connect(ep, [this](boost::system::error_code ec) {
client.async_connect(ep, [this](boost::system::error_code ec) {
if (ec)
return;
connect_done = true;
if (accept_done)
do_write();
});

acc.async_accept(*server, [this](boost::system::error_code ec) {
acc.async_accept(server, [this](boost::system::error_code ec) {
if (ec)
return;
accept_done = true;
Expand All @@ -126,7 +126,7 @@ struct sequential_churn_op
{
byte = 'X';
asio::async_write(
*client, asio::buffer(&byte, 1),
client, asio::buffer(&byte, 1),
[this](boost::system::error_code ec, std::size_t) {
if (ec)
return;
Expand All @@ -138,7 +138,7 @@ struct sequential_churn_op
{
recv_byte = 0;
asio::async_read(
*server, asio::buffer(&recv_byte, 1),
server, asio::buffer(&recv_byte, 1),
[this](boost::system::error_code ec, std::size_t) {
if (ec)
return;
Expand All @@ -148,8 +148,8 @@ struct sequential_churn_op

void finish()
{
client->close();
server->close();
client.close();
server.close();

latency_stats.add(sw.elapsed_ns());
ops.fetch_add(1, std::memory_order_relaxed);
Expand All @@ -169,7 +169,9 @@ bench_sequential_churn(bench::state& state)
std::atomic<bool> running{true};

sequential_churn_op op{ioc, acc, ep, running, state.latency(),
state.ops(), {}, {}, {}};
state.ops(),
tcp_socket(ioc.get_executor()),
tcp_socket(ioc.get_executor()), {}};

perf::stopwatch total_sw;

Expand Down Expand Up @@ -198,7 +200,9 @@ bench_sequential_churn_lockless(bench::state& state)
std::atomic<bool> running{true};

sequential_churn_op op{ioc, acc, ep, running, state.latency(),
state.ops(), {}, {}, {}};
state.ops(),
tcp_socket(ioc.get_executor()),
tcp_socket(ioc.get_executor()), {}};

perf::stopwatch total_sw;

Expand Down Expand Up @@ -244,7 +248,9 @@ bench_concurrent_churn(bench::state& state)
asio::ip::address_v4::loopback(), acceptors[i].local_endpoint().port() );
ops.push_back( std::make_unique<sequential_churn_op>(
sequential_churn_op{ ioc, acceptors[i], ep, running,
state.latency(), state.ops(), {}, {}, {} } ) );
state.latency(), state.ops(),
tcp_socket(ioc.get_executor()),
tcp_socket(ioc.get_executor()), {} } ) );
ops.back()->start();
}

Expand Down Expand Up @@ -273,8 +279,8 @@ struct burst_churn_op
std::atomic<int64_t>& ops;
int burst_size;

std::vector<std::unique_ptr<tcp_socket>> clients;
std::vector<std::unique_ptr<tcp_socket>> servers;
std::vector<tcp_socket> clients;
std::vector<tcp_socket> servers;
int accepted_count = 0;
perf::stopwatch sw;

Expand All @@ -295,27 +301,27 @@ struct burst_churn_op
// partial failure doesn't leave dangling async_accept operations.
for( int i = 0; i < burst_size; ++i )
{
clients.push_back( std::make_unique<tcp_socket>( ioc.get_executor() ) );
clients.emplace_back( ioc.get_executor() );
boost::system::error_code ec;
ec = clients.back()->open( tcp::v4(), ec );
ec = clients.back().open( tcp::v4(), ec );
if( ec )
{
clients.clear();
asio::post( ioc, [this]() { start(); } );
return;
}
configure_churn_socket( *clients.back() );
configure_churn_socket( clients.back() );
}

// Initiate all connects and accepts
for( int i = 0; i < burst_size; ++i )
{
clients[i]->async_connect( ep,
clients[i].async_connect( ep,
[](boost::system::error_code) {} );

servers.push_back(std::make_unique<tcp_socket>(ioc.get_executor()));
servers.emplace_back( ioc.get_executor() );
acc.async_accept(
*servers.back(), [this](boost::system::error_code ec) {
servers.back(), [this](boost::system::error_code ec) {
if (ec)
return;
++accepted_count;
Expand All @@ -328,9 +334,9 @@ struct burst_churn_op
void close_all()
{
for (auto& c : clients)
c->close();
c.close();
for (auto& s : servers)
s->close();
s.close();

burst_stats.add(sw.elapsed_ns());
ops.fetch_add(1, std::memory_order_relaxed);
Expand Down
Loading
Loading