From 4a70e95f41dace2dca95e2a6d51e2a2f7d16fbd8 Mon Sep 17 00:00:00 2001 From: wokron Date: Tue, 16 Jun 2026 12:34:39 +0800 Subject: [PATCH 1/2] send and recv support fixed buffer --- include/condy/async_operations.hpp | 35 +++++++++++++++++++++++ include/condy/detail/async_operations.hpp | 7 +++++ 2 files changed, 42 insertions(+) diff --git a/include/condy/async_operations.hpp b/include/condy/async_operations.hpp index 469115e4..3af3a832 100644 --- a/include/condy/async_operations.hpp +++ b/include/condy/async_operations.hpp @@ -471,6 +471,17 @@ inline auto async_send(Fd sockfd, const Buffer &buf, int flags) { return detail::maybe_flag_fixed_fd(std::move(op), sockfd); } +/** + * @brief See io_uring_prep_send (with registered buffer) + */ +template +inline auto async_send(Fd sockfd, detail::FixedBuffer buf, int flags) { + auto op = detail::make_op_awaiter(detail::prep_send_fixed, sockfd, + buf.value.data(), buf.value.size(), + flags, buf.buf_index); + return detail::maybe_flag_fixed_fd(std::move(op), sockfd); +} + /** * @brief See io_uring_prep_send */ @@ -505,6 +516,19 @@ inline auto async_sendto(Fd sockfd, const Buffer &buf, int flags, return detail::maybe_flag_fixed_fd(std::move(op), sockfd); } +/** + * @brief See io_uring_prep_send and io_uring_prep_send_set_addr + * (with registered buffer) + */ +template +inline auto async_sendto(Fd sockfd, detail::FixedBuffer buf, int flags, + const struct sockaddr *addr, socklen_t addrlen) { + auto op = detail::make_op_awaiter(detail::prep_sendto_fixed, sockfd, + buf.value.data(), buf.value.size(), + flags, addr, addrlen, buf.buf_index); + return detail::maybe_flag_fixed_fd(std::move(op), sockfd); +} + /** * @brief See io_uring_prep_send and io_uring_prep_send_set_addr */ @@ -592,6 +616,17 @@ inline auto async_recv(Fd sockfd, const Buffer &buf, int flags) { return detail::maybe_flag_fixed_fd(std::move(op), sockfd); } +/** + * @brief See io_uring_prep_recv (with registered buffer) + */ +template +inline auto async_recv(Fd sockfd, detail::FixedBuffer buf, int flags) { + auto op = detail::make_op_awaiter(detail::prep_recv_fixed, sockfd, + buf.value.data(), buf.value.size(), + flags, buf.buf_index); + return detail::maybe_flag_fixed_fd(std::move(op), sockfd); +} + /** * @brief See io_uring_prep_recv */ diff --git a/include/condy/detail/async_operations.hpp b/include/condy/detail/async_operations.hpp index d2764157..fc117cd3 100644 --- a/include/condy/detail/async_operations.hpp +++ b/include/condy/detail/async_operations.hpp @@ -166,6 +166,13 @@ inline void prep_send_fixed(io_uring_sqe *sqe, int sockfd, const void *buf, sqe->buf_index = buf_index; } +inline void prep_recv_fixed(io_uring_sqe *sqe, int sockfd, void *buf, + size_t len, int flags, int buf_index) noexcept { + io_uring_prep_recv(sqe, sockfd, buf, len, flags); + sqe->ioprio |= IORING_RECVSEND_FIXED_BUF; + sqe->buf_index = buf_index; +} + inline void prep_sendto_fixed(io_uring_sqe *sqe, int sockfd, const void *buf, size_t len, int flags, const struct sockaddr *addr, socklen_t addrlen, From 621bc067fb931fa94fce01953cae46ce565bfad9 Mon Sep 17 00:00:00 2001 From: wokron Date: Tue, 16 Jun 2026 12:34:50 +0800 Subject: [PATCH 2/2] add test --- tests/test_async_operations.3.cpp | 87 +++++++++++++++++++++++++++++++ 1 file changed, 87 insertions(+) diff --git a/tests/test_async_operations.3.cpp b/tests/test_async_operations.3.cpp index f37326dd..56cad3e5 100644 --- a/tests/test_async_operations.3.cpp +++ b/tests/test_async_operations.3.cpp @@ -329,6 +329,35 @@ TEST_CASE("test async_operations - test send - zero copy fixed buffer") { close(sv[1]); } +TEST_CASE("test async_operations - test send - fixed buffer") { + int sv[2]; + create_tcp_socketpair(sv); + + auto msg = generate_data(1024); + auto func = [&]() -> condy::Coro { + auto &buffer_table = condy::current_runtime().buffer_table(); + buffer_table.init(1); + iovec register_iov{ + .iov_base = const_cast(msg.data()), + .iov_len = msg.size(), + }; + buffer_table.update(0, ®ister_iov, 1); + + ssize_t n = co_await condy::async_send( + sv[1], condy::fixed(0, condy::buffer(msg)), 0); + REQUIRE(n == msg.size()); + }; + condy::sync_wait(func()); + + char read_buf[2048]; + ssize_t r = recv(sv[0], read_buf, sizeof(read_buf), 0); + REQUIRE(r == msg.size()); + REQUIRE(std::string_view(read_buf, r) == msg); + + close(sv[0]); + close(sv[1]); +} + TEST_CASE("test async_operations - test sendto - basic") { int sender_fd = socket(AF_INET, SOCK_DGRAM, 0); REQUIRE(sender_fd >= 0); @@ -584,6 +613,35 @@ TEST_CASE("test async_operations - test sendto - zero copy fixed buffer") { close(receiver_fd); } +TEST_CASE("test async_operations - test sendto - fixed buffer") { + int udp_sv[2]; + REQUIRE(socketpair(AF_UNIX, SOCK_DGRAM, 0, udp_sv) == 0); + + auto msg = generate_data(1024); + auto func = [&]() -> condy::Coro { + auto &buffer_table = condy::current_runtime().buffer_table(); + buffer_table.init(1); + iovec register_iov{ + .iov_base = const_cast(msg.data()), + .iov_len = msg.size(), + }; + buffer_table.update(0, ®ister_iov, 1); + + ssize_t n = co_await condy::async_sendto( + udp_sv[0], condy::fixed(0, condy::buffer(msg)), 0, nullptr, 0); + REQUIRE(n == msg.size()); + }; + condy::sync_wait(func()); + + char read_buf[2048]; + ssize_t r = recv(udp_sv[1], read_buf, sizeof(read_buf), 0); + REQUIRE(r == msg.size()); + REQUIRE(std::string_view(read_buf, r) == msg); + + close(udp_sv[0]); + close(udp_sv[1]); +} + TEST_CASE("test async_operations - test recv - basic") { int sv[2]; create_tcp_socketpair(sv); @@ -630,6 +688,35 @@ TEST_CASE("test async_operations - test recv - fixed fd") { close(sv[1]); } +TEST_CASE("test async_operations - test recv - fixed buffer") { + int sv[2]; + create_tcp_socketpair(sv); + + auto msg = generate_data(1024); + ssize_t r = send(sv[1], msg.data(), msg.size(), 0); + REQUIRE(r == msg.size()); + + auto func = [&]() -> condy::Coro { + auto &buffer_table = condy::current_runtime().buffer_table(); + buffer_table.init(1); + char buf_storage[2048]; + iovec register_iov{ + .iov_base = buf_storage, + .iov_len = sizeof(buf_storage), + }; + buffer_table.update(0, ®ister_iov, 1); + + ssize_t n = co_await condy::async_recv( + sv[0], condy::fixed(0, condy::buffer(buf_storage, 2048)), 0); + REQUIRE(n == msg.size()); + REQUIRE(std::string_view(buf_storage, n) == msg); + }; + condy::sync_wait(func()); + + close(sv[0]); + close(sv[1]); +} + TEST_CASE("test async_operations - test recv - provided buffer") { int sv[2]; create_tcp_socketpair(sv);