Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
35 changes: 35 additions & 0 deletions include/condy/async_operations.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -471,6 +471,17 @@ inline auto async_send(Fd sockfd, const Buffer &buf, int flags) {
return detail::maybe_flag_fixed_fd(std::move(op), sockfd);
}

/**
* @brief See io_uring_prep_send (with registered buffer)
*/
template <FdLike Fd, BufferLike Buffer>
inline auto async_send(Fd sockfd, detail::FixedBuffer<Buffer> buf, int flags) {
auto op = detail::make_op_awaiter(detail::prep_send_fixed, sockfd,
buf.value.data(), buf.value.size(),
flags, buf.buf_index);
return detail::maybe_flag_fixed_fd(std::move(op), sockfd);
}

/**
* @brief See io_uring_prep_send
*/
Expand Down Expand Up @@ -505,6 +516,19 @@ inline auto async_sendto(Fd sockfd, const Buffer &buf, int flags,
return detail::maybe_flag_fixed_fd(std::move(op), sockfd);
}

/**
* @brief See io_uring_prep_send and io_uring_prep_send_set_addr
* (with registered buffer)
*/
template <FdLike Fd, BufferLike Buffer>
inline auto async_sendto(Fd sockfd, detail::FixedBuffer<Buffer> buf, int flags,
const struct sockaddr *addr, socklen_t addrlen) {
auto op = detail::make_op_awaiter(detail::prep_sendto_fixed, sockfd,
buf.value.data(), buf.value.size(),
flags, addr, addrlen, buf.buf_index);
return detail::maybe_flag_fixed_fd(std::move(op), sockfd);
}

/**
* @brief See io_uring_prep_send and io_uring_prep_send_set_addr
*/
Expand Down Expand Up @@ -592,6 +616,17 @@ inline auto async_recv(Fd sockfd, const Buffer &buf, int flags) {
return detail::maybe_flag_fixed_fd(std::move(op), sockfd);
}

/**
* @brief See io_uring_prep_recv (with registered buffer)
*/
template <FdLike Fd, BufferLike Buffer>
inline auto async_recv(Fd sockfd, detail::FixedBuffer<Buffer> buf, int flags) {
auto op = detail::make_op_awaiter(detail::prep_recv_fixed, sockfd,
buf.value.data(), buf.value.size(),
flags, buf.buf_index);
return detail::maybe_flag_fixed_fd(std::move(op), sockfd);
}

/**
* @brief See io_uring_prep_recv
*/
Expand Down
7 changes: 7 additions & 0 deletions include/condy/detail/async_operations.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -166,6 +166,13 @@ inline void prep_send_fixed(io_uring_sqe *sqe, int sockfd, const void *buf,
sqe->buf_index = buf_index;
}

inline void prep_recv_fixed(io_uring_sqe *sqe, int sockfd, void *buf,
size_t len, int flags, int buf_index) noexcept {
io_uring_prep_recv(sqe, sockfd, buf, len, flags);
sqe->ioprio |= IORING_RECVSEND_FIXED_BUF;
sqe->buf_index = buf_index;
}

inline void prep_sendto_fixed(io_uring_sqe *sqe, int sockfd, const void *buf,
size_t len, int flags,
const struct sockaddr *addr, socklen_t addrlen,
Expand Down
87 changes: 87 additions & 0 deletions tests/test_async_operations.3.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -329,6 +329,35 @@ TEST_CASE("test async_operations - test send - zero copy fixed buffer") {
close(sv[1]);
}

TEST_CASE("test async_operations - test send - fixed buffer") {
int sv[2];
create_tcp_socketpair(sv);

auto msg = generate_data(1024);
auto func = [&]() -> condy::Coro<void> {
auto &buffer_table = condy::current_runtime().buffer_table();
buffer_table.init(1);
iovec register_iov{
.iov_base = const_cast<char *>(msg.data()),
.iov_len = msg.size(),
};
buffer_table.update(0, &register_iov, 1);

ssize_t n = co_await condy::async_send(
sv[1], condy::fixed(0, condy::buffer(msg)), 0);
REQUIRE(n == msg.size());
};
condy::sync_wait(func());

char read_buf[2048];
ssize_t r = recv(sv[0], read_buf, sizeof(read_buf), 0);
REQUIRE(r == msg.size());
REQUIRE(std::string_view(read_buf, r) == msg);

close(sv[0]);
close(sv[1]);
}

TEST_CASE("test async_operations - test sendto - basic") {
int sender_fd = socket(AF_INET, SOCK_DGRAM, 0);
REQUIRE(sender_fd >= 0);
Expand Down Expand Up @@ -584,6 +613,35 @@ TEST_CASE("test async_operations - test sendto - zero copy fixed buffer") {
close(receiver_fd);
}

TEST_CASE("test async_operations - test sendto - fixed buffer") {
int udp_sv[2];
REQUIRE(socketpair(AF_UNIX, SOCK_DGRAM, 0, udp_sv) == 0);

auto msg = generate_data(1024);
auto func = [&]() -> condy::Coro<void> {
auto &buffer_table = condy::current_runtime().buffer_table();
buffer_table.init(1);
iovec register_iov{
.iov_base = const_cast<char *>(msg.data()),
.iov_len = msg.size(),
};
buffer_table.update(0, &register_iov, 1);

ssize_t n = co_await condy::async_sendto(
udp_sv[0], condy::fixed(0, condy::buffer(msg)), 0, nullptr, 0);
REQUIRE(n == msg.size());
};
condy::sync_wait(func());

char read_buf[2048];
ssize_t r = recv(udp_sv[1], read_buf, sizeof(read_buf), 0);
REQUIRE(r == msg.size());
REQUIRE(std::string_view(read_buf, r) == msg);

close(udp_sv[0]);
close(udp_sv[1]);
}

TEST_CASE("test async_operations - test recv - basic") {
int sv[2];
create_tcp_socketpair(sv);
Expand Down Expand Up @@ -630,6 +688,35 @@ TEST_CASE("test async_operations - test recv - fixed fd") {
close(sv[1]);
}

TEST_CASE("test async_operations - test recv - fixed buffer") {
int sv[2];
create_tcp_socketpair(sv);

auto msg = generate_data(1024);
ssize_t r = send(sv[1], msg.data(), msg.size(), 0);
REQUIRE(r == msg.size());

auto func = [&]() -> condy::Coro<void> {
auto &buffer_table = condy::current_runtime().buffer_table();
buffer_table.init(1);
char buf_storage[2048];
iovec register_iov{
.iov_base = buf_storage,
.iov_len = sizeof(buf_storage),
};
buffer_table.update(0, &register_iov, 1);

ssize_t n = co_await condy::async_recv(
sv[0], condy::fixed(0, condy::buffer(buf_storage, 2048)), 0);
REQUIRE(n == msg.size());
REQUIRE(std::string_view(buf_storage, n) == msg);
};
condy::sync_wait(func());

close(sv[0]);
close(sv[1]);
}

TEST_CASE("test async_operations - test recv - provided buffer") {
int sv[2];
create_tcp_socketpair(sv);
Expand Down
Loading