diff --git a/.github/workflows/build.yml b/.github/workflows/build.yml index b1e22a2..c19c877 100644 --- a/.github/workflows/build.yml +++ b/.github/workflows/build.yml @@ -15,8 +15,8 @@ jobs: ld: MOLD ldflags: - name: Clang/LLVM - cc: clang-20 - cxx: clang++-20 + cc: clang-21 + cxx: clang++-21 cxxflags: -stdlib=libc++ ld: LLD ldflags: -lllvmlibc @@ -28,8 +28,8 @@ jobs: if: matrix.toolchain.name == 'Clang/LLVM' run: | sudo wget -O /etc/apt/trusted.gpg.d/apt.llvm.org.asc https://apt.llvm.org/llvm-snapshot.gpg.key - sudo add-apt-repository -y "deb http://apt.llvm.org/$(lsb_release -sc)/ llvm-toolchain-$(lsb_release -sc)-20 main" - sudo apt-get update && sudo apt-get install -y --no-install-recommends clang-20 lld-20 libllvmlibc-20-dev libc++-20-dev libc++abi-20-dev clang-tidy-20 + sudo add-apt-repository -y "deb http://apt.llvm.org/$(lsb_release -sc)/ llvm-toolchain-$(lsb_release -sc)-21 main" + sudo apt-get update && sudo apt-get install -y --no-install-recommends clang-21 lld-21 libllvmlibc-21-dev libc++-21-dev libc++abi-21-dev clang-tidy-21 - name: Check deps run: | cmake --version @@ -56,5 +56,5 @@ jobs: - name: Lint if: matrix.toolchain.name == 'Clang/LLVM' env: - CLANG_TIDY: clang-tidy-20 + CLANG_TIDY: clang-tidy-21 run: git clang-tidy diff --git a/.github/workflows/format.yml b/.github/workflows/format.yml index 7040814..84ed13f 100644 --- a/.github/workflows/format.yml +++ b/.github/workflows/format.yml @@ -19,7 +19,7 @@ jobs: - name: Fetch ClangFormat run: | sudo wget -O /etc/apt/trusted.gpg.d/apt.llvm.org.asc https://apt.llvm.org/llvm-snapshot.gpg.key - sudo add-apt-repository -y "deb http://apt.llvm.org/$(lsb_release -sc)/ llvm-toolchain-$(lsb_release -sc) main" + sudo add-apt-repository -y "deb http://apt.llvm.org/$(lsb_release -sc)/ llvm-toolchain-$(lsb_release -sc)-21 main" sudo apt-get update && sudo apt-get install -y --no-install-recommends clang-format - run: git config include.path ../.gitconfig - run: git clang-format diff --git a/CMakeLists.txt b/CMakeLists.txt index ff90f7d..443b10c 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -27,9 +27,19 @@ target_sources ( src/io/addr.hxx src/io/addr.cxx src/io/endian.hxx + src/io/polled_fd.hxx + src/io/reactor.hxx + src/io/reactor.cxx + src/io/context.hxx + src/co/scheduler.hxx + src/co/coroutine.hxx + src/co/task.hxx + src/co/received.hxx src/fastipc.cxx + src/visitor.hxx src/channel.hxx ) + if (NOT DEFINED CMAKE_CXX_CLANG_TIDY OR "${CMAKE_CXX_COMPILER_ID}" STREQUAL "Clang") target_precompile_headers (fastipc PUBLIC include/fastipc.hxx) endif () @@ -37,6 +47,7 @@ endif () target_compile_features (fastipc INTERFACE cxx_std_17) target_compile_features (fastipc PRIVATE cxx_std_23) target_compile_options (fastipc PRIVATE ${FASTIPC_COMPILE_OPTIONS} -Wno-zero-length-array) +target_include_directories (fastipc PRIVATE src) add_library (tower OBJECT) target_include_directories (tower PUBLIC src) @@ -48,7 +59,7 @@ target_link_libraries (tower PRIVATE fastipc) add_executable (fastipcd) target_sources (fastipcd PRIVATE src/main.cxx) target_compile_options (fastipcd PRIVATE ${FASTIPC_COMPILE_OPTIONS}) -target_link_libraries (fastipcd PRIVATE tower) +target_link_libraries (fastipcd PRIVATE tower fastipc) enable_testing () add_subdirectory (test) diff --git a/src/co/coroutine.hxx b/src/co/coroutine.hxx new file mode 100644 index 0000000..203a75b --- /dev/null +++ b/src/co/coroutine.hxx @@ -0,0 +1,306 @@ +/* + * coroutine.hxx + * Copyright 2025 ItJustWorksTM + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +#pragma once + +#include +#include +#include +#include +#include +#include "co/received.hxx" + +namespace fastipc::co { + +template +class Receiver { + public: + Receiver() = default; + + Receiver(Receiver&&) noexcept = default; + Receiver& operator=(Receiver&&) noexcept = default; + + Receiver(const Receiver&) noexcept = default; + Receiver& operator=(const Receiver&) noexcept = default; + + virtual ~Receiver() = default; + + virtual void set_value(T value) = 0; + virtual void set_exception(std::exception_ptr exc) = 0; +}; + +template <> +class Receiver { + public: + Receiver() = default; + + Receiver(Receiver&&) noexcept = default; + Receiver& operator=(Receiver&&) noexcept = default; + + Receiver(const Receiver&) noexcept = default; + Receiver& operator=(const Receiver&) noexcept = default; + + virtual ~Receiver() = default; + + virtual void set_value() = 0; + virtual void set_exception(std::exception_ptr exc) = 0; +}; + +template +struct AwaitedBy { + A value; +}; + +template +class PromiseState; + +template +class [[nodiscard]] Promise final { + public: + explicit Promise(std::coroutine_handle> handle) : m_handle{std::move(handle)} {} + + Promise(Promise&) noexcept = delete; + Promise& operator=(Promise&) noexcept = delete; + + Promise(Promise&& it) noexcept : m_handle{std::exchange(it.m_handle, {})} {} + Promise& operator=(Promise&& rhs) noexcept { + auto other = Promise{std::move(rhs)}; + + std::swap(m_handle, other.m_handle); + + return *this; + } + + ~Promise() noexcept { + if (m_handle) { + m_handle.destroy(); + } + } + + [[nodiscard]] std::coroutine_handle> handle() const { return m_handle; } + + private: + std::coroutine_handle> m_handle; +}; + +template +class PromiseState final { + public: + Promise get_return_object() { return Promise{std::coroutine_handle::from_promise(*this)}; } + + std::suspend_always initial_suspend() noexcept { return {}; } + + struct FinalSuspend final { + bool await_ready() noexcept { return false; } + void await_suspend(std::coroutine_handle h) noexcept { h.promise().complete(); } + void await_resume() noexcept {} + }; + + FinalSuspend final_suspend() noexcept { return {}; } + + void return_value(T value) { received.set_value(std::move(value)); } + void unhandled_exception() { received.set_exception(std::current_exception()); } + + template + AwaitedBy await_transform(A&& awaitable) { + return {std::forward(awaitable)}; + } + + Receiver* receiver{}; + + private: + void complete() { std::move(received).forward(*receiver); } + + Received received; +}; + +template <> +class PromiseState final { + public: + Promise get_return_object() { + return Promise{std::coroutine_handle::from_promise(*this)}; + } + + std::suspend_always initial_suspend() noexcept { return {}; } + + struct FinalSuspend final { + bool await_ready() noexcept { return false; } + void await_suspend(std::coroutine_handle h) noexcept { h.promise().complete(); } + void await_resume() noexcept {} + }; + + FinalSuspend final_suspend() noexcept { return {}; } + + void return_void() { received.set_value(); } + void unhandled_exception() { received.set_exception(std::current_exception()); } + + template + AwaitedBy await_transform(A&& awaitable) { + return {std::forward(awaitable)}; + } + + Receiver* receiver{}; + + private: + void complete() { std::move(received).forward(*receiver); } + + Received received; +}; + +template +class AwaiterReceiver final { + public: + AwaiterReceiver(Received& received, std::coroutine_handle

awaiter) + : m_received{&received}, m_awaiter{awaiter} {} + + void set_value(T value) { + m_received->set_value(std::move(value)); + + m_awaiter.resume(); + } + + void set_exception(std::exception_ptr ptr) { + m_received->set_exception(std::move(ptr)); + + m_awaiter.resume(); + } + + private: + Received* m_received; + std::coroutine_handle

m_awaiter; +}; + +template +class AwaiterReceiver final { + public: + AwaiterReceiver(Received& received, std::coroutine_handle

awaiter) + : m_received{&received}, m_awaiter{awaiter} {} + + void set_value() { + m_received->set_value(); + + m_awaiter.resume(); + } + + void set_exception(std::exception_ptr ptr) { + m_received->set_exception(std::move(ptr)); + + m_awaiter.resume(); + } + + private: + Received* m_received; + std::coroutine_handle

m_awaiter; +}; + +template +class SenderAwaiter final { + + public: + using sender_type = S; + using value_type = sender_type::value_type; + + explicit SenderAwaiter(sender_type sender) : state{std::move(sender)} {} + + bool await_ready() noexcept { return false; } + + std::coroutine_handle<> await_suspend(std::coroutine_handle

cont) { + auto& operation_state = state.template emplace( + std::get(std::move(state)).connect(AwaiterReceiver{this->received, cont})); + + operation_state.start(); + + return std::noop_coroutine(); + } + + [[nodiscard]] value_type await_resume() { return std::move(received).consume(); } + + private: + using operation_state_type = decltype(std::declval().connect(std::declval>())); + + std::variant state; + Received received; +}; + +// todo sender concept +template +SenderAwaiter operator co_await(AwaitedBy&& awaited_by) { + return SenderAwaiter{std::move(awaited_by).value}; +} + +template +class PromiseReceiver final : public Receiver { + + public: + explicit PromiseReceiver(R receiver) : Receiver{}, m_receiver{std::move(receiver)} {} + + void set_value(T value) override { m_receiver.set_value(std::move(value)); } + void set_exception(std::exception_ptr exc) override { m_receiver.set_exception(exc); } + + private: + R m_receiver; +}; + +template +class PromiseReceiver final : public Receiver { + + public: + explicit PromiseReceiver(R receiver) : Receiver{}, m_receiver{std::move(receiver)} {} + + void set_value() override { m_receiver.set_value(); } + void set_exception(std::exception_ptr exc) override { m_receiver.set_exception(exc); } + + private: + R m_receiver; +}; + +template +class [[nodiscard]] Co final { + + public: + using promise_type = PromiseState; + using value_type = T; + + explicit(false) Co(Promise promise) : m_promise{std::move(promise)} {} + + template + auto connect(R&& receiver) && { + class OperationState { + + public: + OperationState(Promise promise, R receiver) + : m_receiver{std::move(receiver)}, m_promise{std::move(promise)} {} + + void start() { + m_promise.handle().promise().receiver = &m_receiver; + m_promise.handle().resume(); + } + + private: + PromiseReceiver m_receiver; + Promise m_promise; + }; + + return OperationState{std::move(*this).m_promise, std::forward(receiver)}; + } + + private: + Promise m_promise; +}; + +} // namespace fastipc::co diff --git a/src/co/received.hxx b/src/co/received.hxx new file mode 100644 index 0000000..2785e6b --- /dev/null +++ b/src/co/received.hxx @@ -0,0 +1,95 @@ +/* + * scheduler.hxx + * Copyright 2025 ItJustWorksTM + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +#pragma once + +#include +#include +#include +#include +#include "visitor.hxx" + +namespace fastipc::co { + +struct has_value_tag final {}; + +template +struct Received final { + + void set_value(T value) { m_value = std::move(value); } + void set_exception(std::exception_ptr exc) { m_value = std::move(exc); } + + [[nodiscard]] bool has_value() const { return !std::holds_alternative(m_value); } + + [[nodiscard]] T consume() && { + return match( + std::move(m_value), [](std::exception_ptr exc) -> T { std::rethrow_exception(std::move(exc)); }, + [](T&& value) { return std::move(value); }, + [](auto) -> T { + assert(false); + std::unreachable(); + }); + } + + template + void forward(R& receiver) && { + match( + std::move(m_value), [&receiver](std::exception_ptr exc) { receiver.set_exception(std::move(exc)); }, + [&receiver](T&& value) { receiver.set_value(std::move(value)); }, + [](std::monostate) { + assert(false); + std::unreachable(); + }); + } + + std::variant m_value; +}; + +template <> +struct Received final { + + void set_value() { m_value = has_value_tag{}; } + void set_exception(std::exception_ptr exc) { m_value = std::move(exc); } + + [[nodiscard]] bool has_value() const { return !std::holds_alternative(m_value); } + + void consume() && { + match( + std::move(m_value), [](std::exception_ptr exc) -> void { std::rethrow_exception(std::move(exc)); }, + [](has_value_tag) {}, + [](std::monostate) { + assert(false); + std::unreachable(); + }); + } + + template + void forward(R& receiver) && { + return match( + std::move(m_value), [&](std::exception_ptr exc) { receiver.set_exception(std::move(exc)); }, + [&](has_value_tag) { receiver.set_value(); }, + [](std::monostate) { + assert(false); + std::unreachable(); + }); + } + + std::variant m_value; +}; + +} // namespace fastipc::co diff --git a/src/co/scheduler.hxx b/src/co/scheduler.hxx new file mode 100644 index 0000000..e55d15e --- /dev/null +++ b/src/co/scheduler.hxx @@ -0,0 +1,69 @@ +/* + * scheduler.hxx + * Copyright 2025 ItJustWorksTM + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +#pragma once + +#include +#include +#include +#include +#include "io/reactor.hxx" + +namespace fastipc::co { + +class Scheduler final { + public: + explicit Scheduler(io::Reactor* reactor = nullptr) : m_reactor(reactor) {} + + // optimize by making a custom callable interface + void schedule(std::function fn) { + auto lock = std::scoped_lock{m_child_lock}; + + m_queue.push(std::move(fn)); + + // make this smart.. + if (m_reactor != nullptr) { + expect(m_reactor->interrupt(), "failed to interrupt reactor"); + } + } + + [[nodiscard]] bool can_run() const noexcept { + auto lock = std::scoped_lock{m_child_lock}; + + return !m_queue.empty(); + } + + void run() noexcept { + auto lock = std::scoped_lock{m_child_lock}; + + const auto queued = m_queue.size(); + + for (std::size_t i = 0; i < queued; ++i) { + std::move(m_queue.front())(); + m_queue.pop(); + } + } + + private: + mutable std::recursive_mutex m_child_lock; + std::queue> m_queue; + + io::Reactor* m_reactor; +}; + +} // namespace fastipc::co diff --git a/src/co/task.hxx b/src/co/task.hxx new file mode 100644 index 0000000..734ebcb --- /dev/null +++ b/src/co/task.hxx @@ -0,0 +1,175 @@ +/* + * task.hxx + * Copyright 2025 ItJustWorksTM + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +#pragma once + +#include +#include +#include +#include +#include +#include "co/received.hxx" + +namespace fastipc::co { + +struct Unit final {}; + +class Listener { + public: + Listener() = default; + + Listener(Listener&&) noexcept = default; + Listener& operator=(Listener&&) noexcept = default; + + Listener(const Listener&) noexcept = default; + Listener& operator=(const Listener&) noexcept = default; + + virtual ~Listener() = default; + + virtual void notify() = 0; +}; + +template +struct State { + State() = default; + State(const State&) = delete; + State& operator=(const State&) = delete; + State(State&&) = delete; + State& operator=(State&&) = delete; + + virtual ~State() = default; + + Received received{}; + Listener* listener{}; +}; + +template +struct JoinHandle final { + [[nodiscard]] bool completed() const noexcept { return state->received.has_value(); } + [[nodiscard]] T get() { return std::move(state->received).consume(); } + + using value_type = T; + + template + auto connect(R&& receiver) && { + struct OperationState final : public Listener { + R receiver; + std::shared_ptr> state; + + OperationState(R&& receiver, std::shared_ptr> state) + : receiver{std::move(receiver)}, state{std::move(state)} {} + + OperationState(OperationState&&) noexcept = default; + OperationState& operator=(OperationState&&) noexcept = default; + + OperationState(const OperationState&) noexcept = default; + OperationState& operator=(const OperationState&) noexcept = default; + + ~OperationState() override = default; + + void notify() override { + if (state->received.has_value()) { + state->listener = nullptr; + std::move(state->received).forward(receiver); + } + } + + void start() { + if (state->received.has_value()) { + std::move(state->received).forward(receiver); + } else { + state->listener = this; + } + } + }; + + return OperationState{std::forward(receiver), std::move(state)}; + } + + std::shared_ptr> state; +}; + +template +struct StateReceiver final { + std::shared_ptr> state; + + void set_value(T value) { + state->received.set_value(std::move(value)); + + if (state->listener) { + state->listener->notify(); + } + } + + void set_exception(std::exception_ptr exc) { + state->received.set_exception(std::move(exc)); + + if (state->listener) { + state->listener->notify(); + } + } +}; + +template <> +struct StateReceiver final { + std::shared_ptr> state; + + void set_value() { + state->received.set_value(); + + if (state->listener) { + state->listener->notify(); + } + } + + void set_exception(std::exception_ptr exc) { + state->received.set_exception(std::move(exc)); + + if (state->listener) { + state->listener->notify(); + } + } +}; + +template +struct StateImpl final : State { + explicit StateImpl() : State{} {} + + StateImpl(const StateImpl&) = delete; + StateImpl& operator=(const StateImpl&) = delete; + StateImpl(StateImpl&&) = delete; + StateImpl& operator=(StateImpl&&) = delete; + + ~StateImpl() override = default; + + using operation_state_type = + decltype(std::declval().connect(std::declval>())); + + std::optional operation_state{}; +}; + +template +JoinHandle spawn(S&& sender) { + auto state = std::make_shared>(); + state->operation_state.emplace(std::forward(sender).connect(StateReceiver{state})) + .start(); + + return JoinHandle{std::move(state)}; +} + +} // namespace fastipc::co diff --git a/src/fastipc.cxx b/src/fastipc.cxx index 07ebf2f..59d145f 100644 --- a/src/fastipc.cxx +++ b/src/fastipc.cxx @@ -37,7 +37,6 @@ #include #include #include -#include #include "io/cursor.hxx" #include "io/fd.hxx" @@ -81,7 +80,7 @@ void writeClientRequest(std::span& buf, const ClientRequest& request) writeClientRequest(sndbuf, request); const auto bytes_written = - expect(io::sysVal(::write(sockfd.fd(), buf.data(), buf.size() - sndbuf.size())), "failed to write to tower"); + expect(io::write(sockfd, std::span{buf}.first(buf.size() - sndbuf.size())), "failed to write to tower"); static_cast(bytes_written); // seq packet std::size_t total_size{0U}; diff --git a/src/io/context.hxx b/src/io/context.hxx new file mode 100644 index 0000000..84ee00c --- /dev/null +++ b/src/io/context.hxx @@ -0,0 +1,133 @@ +/* + * context.hxx + * Copyright 2025 ItJustWorksTM + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +#pragma once + +#include +#include +#include +#include +#include "co/received.hxx" +#include "co/scheduler.hxx" +#include "io/result.hxx" +#include "reactor.hxx" + +namespace fastipc::io { + +class Runtime final { + explicit Runtime(Reactor reactor) + : m_reactor{std::make_unique(std::move(reactor))}, + m_scheduler{std::make_unique(m_reactor.get())} {} + + public: + static expected create() noexcept { + auto reactor_res = Reactor::create(); + + if (!reactor_res.has_value()) { + return unexpected{std::move(reactor_res).error()}; + } + + return expected{Runtime{std::move(reactor_res).value()}}; + } + + static Runtime& singleton() noexcept { return *active_singleton(); } + + template + auto block_on(F func) noexcept { + active_singleton() = this; + + using S = std::remove_cvref_t; + + co::Received received{}; + + struct Receiver { + co::Received* received; + Reactor* reactor; + + void set_value(typename S::value_type value) { + received->set_value(std::move(value)); + expect(reactor->interrupt()); + } + void set_exception(std::exception_ptr exc) { + received->set_exception(std::move(exc)); + expect(reactor->interrupt()); + } + }; + + auto op = func().connect(Receiver{&received, m_reactor.get()}); + op.start(); + + for (;;) { + while (m_scheduler->can_run()) { + m_scheduler->run(); + expect(m_reactor->react(std::chrono::milliseconds{0}), "failed to react to io events"); + + if (received.has_value()) { + return std::move(received).consume(); + } + } + + if (received.has_value()) { + return std::move(received).consume(); + } + + expect(m_reactor->react({}), "failed to react to io events"); + } + } + + Reactor& reactor() noexcept { return *m_reactor; } + co::Scheduler& scheduler() noexcept { return *m_scheduler; } + + private: + static Runtime*& active_singleton() { + static Runtime* runtime{}; + + return runtime; + } + + std::unique_ptr m_reactor; + std::unique_ptr m_scheduler; +}; + +class YieldSender final { + public: + using value_type = void; + + explicit YieldSender(co::Scheduler& scheduler) : m_scheduler{&scheduler} {} + + template + auto connect(R&& receiver) { + struct OperationState final { + R receiver; + co::Scheduler* scheduler; + + void start() { + scheduler->schedule([this]() { receiver.set_value(); }); + } + }; + + return OperationState{std::forward(receiver), m_scheduler}; + } + + private: + co::Scheduler* m_scheduler; +}; + +inline YieldSender yield() { return YieldSender{Runtime::singleton().scheduler()}; } + +} // namespace fastipc::io diff --git a/src/io/fd.hxx b/src/io/fd.hxx index 4cbea53..f376511 100644 --- a/src/io/fd.hxx +++ b/src/io/fd.hxx @@ -18,16 +18,24 @@ #pragma once -#include +#include #include #include - +#include +#include #include +#include +#include #include "result.hxx" namespace fastipc::io { +template +concept AsFd = requires(const T& t) { + { t.fd() } -> std::same_as; +}; + class Fd final { public: constexpr explicit Fd() noexcept = default; @@ -57,8 +65,39 @@ class Fd final { int m_fd{-1}; }; -[[nodiscard]] constexpr io::expected adoptSysFd(int fd) noexcept { +[[nodiscard]] constexpr expected adoptSysFd(int fd) noexcept { return sysVal(fd).transform([](int fd) { return Fd{fd}; }); } +[[nodiscard]] inline expected write(const AsFd auto& fd, std::span buf) noexcept { + return sysVal(::write(fd.fd(), buf.data(), buf.size())).transform([](int written) { + return static_cast(written); + }); +} + +[[nodiscard]] inline expected read(const AsFd auto& fd, std::span buf) noexcept { + return sysVal(::read(fd.fd(), buf.data(), buf.size())).transform([](int read) { + return static_cast(read); + }); +} + +inline expected setBlocking(const AsFd auto& fd, bool blocking) noexcept { + // NOLINTNEXTLINE(cppcoreguidelines-pro-type-vararg) + return sysVal(::fcntl(fd.fd(), F_GETFL, 0)).and_then([&](auto flags) { + // NOLINTNEXTLINE(cppcoreguidelines-pro-type-vararg) + return sysCheck(::fcntl(fd.fd(), F_SETFL, blocking ? flags & ~O_NONBLOCK : flags | O_NONBLOCK)); + }); +} + +[[nodiscard]] inline expected> makePipe() { + std::array raw_fds{}; + + return sysCheck(::pipe2(raw_fds.data(), SOCK_CLOEXEC)).transform([&]() { + auto read_fd = io::Fd{raw_fds[0]}; + auto write_fd = io::Fd{raw_fds[1]}; + + return std::pair{std::move(read_fd), std::move(write_fd)}; + }); +} + } // namespace fastipc::io diff --git a/src/io/polled_fd.hxx b/src/io/polled_fd.hxx new file mode 100644 index 0000000..1ef0d12 --- /dev/null +++ b/src/io/polled_fd.hxx @@ -0,0 +1,228 @@ +/* + * polled_fd.hxx + * Copyright 2025 ItJustWorksTM + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +#pragma once + +#include +#include +#include +#include +#include +#include "co/coroutine.hxx" +#include "io/context.hxx" +#include "fd.hxx" +#include "reactor.hxx" +#include "result.hxx" + +namespace fastipc::io { + +class StoppedException final : public std::exception { + public: + [[nodiscard]] const char* what() const noexcept override { return "operation stopped"; } +}; + +constexpr bool isErrorBlocking(std::error_code error) noexcept { + return error == std::errc::operation_would_block || error == std::errc::resource_unavailable_try_again; +} + +class PolledFd final { + public: + PolledFd(const PolledFd&) noexcept = delete; + PolledFd& operator=(const PolledFd&) noexcept = delete; + + PolledFd(PolledFd&& it) noexcept = default; + PolledFd& operator=(PolledFd&& rhs) noexcept = default; + + ~PolledFd() noexcept { + if (m_fd.fd() != -1) { + const auto res = expected(m_reactor->unregister(m_registration)); + + static_cast(res); + } + } + + static expected create(Fd fd) noexcept { return create(std::move(fd), Runtime::singleton().reactor()); } + + static expected create(Fd fd, Reactor& reactor) noexcept { + return setBlocking(fd, false) + .and_then([&]() { return reactor.registerFd(fd); }) + .transform([&](auto* registration) { return PolledFd{std::move(fd), registration, reactor}; }); + } + + [[nodiscard]] constexpr const int& fd() const noexcept { return m_fd.fd(); } + [[nodiscard]] Reactor& reactor() const noexcept { return *m_reactor; } + + private: + template + friend class TryIoSender; + + PolledFd(Fd fd, Reactor::Registration* registration, Reactor& reactor) noexcept + : m_fd{std::move(fd)}, m_registration{registration}, m_reactor{&reactor} {} + + Fd m_fd; + Reactor::Registration* m_registration; + Reactor* m_reactor; +}; + +template +class TryIoSender final { + + public: + using result_type = std::invoke_result_t; + + using value_type = result_type; + + explicit TryIoSender(const io::PolledFd& fd, io::Direction direction, std::stop_token stop_token, F io) + : m_fd{&fd}, m_direction{direction}, m_stop_token{std::move(stop_token)}, m_io{std::move(io)} {} + + template + class OperationState { + public: + OperationState(R&& receiver, const io::PolledFd& fd, io::Direction direction, std::stop_token stop_token, F io) + : m_fd{&fd}, m_direction{direction}, m_stop_token{std::move(stop_token)}, m_io{std::move(io)}, + m_receiver{std::move(receiver)} {} + + OperationState(const OperationState&) noexcept = delete; + OperationState& operator=(const OperationState&) noexcept = delete; + + OperationState(OperationState&& it) noexcept = default; + OperationState& operator=(OperationState&& rhs) noexcept = default; + + ~OperationState() = default; + + void start() { + if (m_stop_token.stop_requested()) { + set_stopped(); + return; + } + + m_stop_fn = std::make_unique>(m_stop_token, StopFn{this}); + poll(); + } + + private: + void poll() { + if (m_state != State::Blocked) { + return; + } + + if (m_stop_token.stop_requested()) { + set_stopped(); + return; + } + + for (;;) { + auto res = m_io(); + + if (res.has_value()) { + set_value(std::move(res)); + return; + } + + if (res.error() == std::errc::interrupted) { + continue; + } + + if (isErrorBlocking(res.error())) { + m_state = State::Blocked; + m_fd->m_registration->callback(m_direction, [this]() { poll(); }); + return; + } + + set_value(std::move(res)); + return; + } + } + + void set_value(result_type value) { + m_stop_fn.reset(); + + m_state = State::Done; + m_receiver.set_value(std::move(value)); + } + + void set_stopped() { + m_stop_fn.reset(); + + m_state = State::Stopped; + m_receiver.set_exception(std::make_exception_ptr(StoppedException{})); + } + + const io::PolledFd* m_fd; + io::Direction m_direction; + std::stop_token m_stop_token; + F m_io; + + R m_receiver; + + enum class State : std::uint8_t { + Blocked, + Done, + Stopped, + }; + + State m_state = State::Blocked; + + struct StopFn final { + OperationState* self; + + void operator()() noexcept { + self->m_fd->m_registration->callback(self->m_direction, {}); + self->poll(); + } + }; + + // cb is not moveable.. + std::unique_ptr> m_stop_fn{}; + }; + + template + OperationState connect(R&& receiver) && { + return OperationState{std::forward(receiver), *m_fd, m_direction, std::move(m_stop_token), std::move(m_io)}; + } + + private: + const io::PolledFd* m_fd; + io::Direction m_direction; + std::stop_token m_stop_token; + F m_io; +}; + +inline co::Co> accept(PolledFd& fd, std::stop_token stop_token = {}) { + auto accepted_fd_res = co_await io::TryIoSender{fd, io::Direction::Read, std::move(stop_token), + [&]() { return adoptSysFd(::accept(fd.fd(), nullptr, nullptr)); }}; + + if (!accepted_fd_res.has_value()) { + co_return unexpected{accepted_fd_res.error()}; + } + + co_return PolledFd::create(std::move(accepted_fd_res).value(), fd.reactor()); +} + +[[nodiscard]] inline co::Co> aread(PolledFd& fd, std::span buf, + std::stop_token stop_token = {}) { + co_return co_await io::TryIoSender{fd, io::Direction::Read, std::move(stop_token), [&]() { return read(fd, buf); }}; +} + +[[nodiscard]] inline co::Co> asendmsg(PolledFd& fd, ::msghdr& buf, int flags, + std::stop_token stop_token = {}) { + co_return co_await io::TryIoSender{fd, io::Direction::Read, std::move(stop_token), + [&]() { return sysVal(::sendmsg(fd.fd(), &buf, flags)); }}; +} + +} // namespace fastipc::io diff --git a/src/io/reactor.cxx b/src/io/reactor.cxx new file mode 100644 index 0000000..f98473e --- /dev/null +++ b/src/io/reactor.cxx @@ -0,0 +1,144 @@ +/* + * reactor.cxx + * Copyright 2025 ItJustWorksTM + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include "fd.hxx" +#include "reactor.hxx" +#include "result.hxx" + +namespace fastipc::io { + +Reactor::Reactor(Fd event_fd, Fd epoll_fd, std::size_t max_events) + : m_event_fd_{std::move(event_fd)}, m_epoll_fd_{std::move(epoll_fd)} { + m_events_buf_.resize(max_events); +} + +expected Reactor::create(std::size_t max_events) noexcept { + return adoptSysFd(::eventfd(0, EFD_CLOEXEC)) + .and_then([](Fd event_fd) { + return adoptSysFd(::epoll_create1(0)) + .and_then([&](Fd epoll_fd) { + ::epoll_event event{.events = EPOLLIN | EPOLLET, .data{.u64 = kEventFdData}}; + + return sysCheck(::epoll_ctl(epoll_fd.fd(), EPOLL_CTL_ADD, event_fd.fd(), &event)).transform([&]() { + return std::move(epoll_fd); + }); + }) + .transform([&](Fd epoll_fd) { return std::pair{std::move(event_fd), std::move(epoll_fd)}; }); + }) + .transform([max_events](std::pair fds) { + return Reactor{std::move(fds.first), std::move(fds.second), max_events}; + }); +} + +expected Reactor::react(std::optional timeout) noexcept { + return wait(timeout).transform([this](auto events) { process(events); }); +} + +expected> Reactor::wait(std::optional timeout) noexcept { + if (m_registered.empty()) { + return {}; + } + + const auto timeout_ms = timeout.transform([](auto ms) { return static_cast(ms.count()); }).value_or(-1); + + const auto wait_res = sysVal( + ::epoll_wait(m_epoll_fd_.fd(), m_events_buf_.data(), static_cast(m_events_buf_.size()), timeout_ms)); + + if (!wait_res.has_value()) { + if (wait_res.error() == std::errc::interrupted) { + return {}; + } + } + + return wait_res.transform([this](int n) { return std::span{m_events_buf_}.first(static_cast(n)); }); +} + +void Reactor::process(std::span<::epoll_event> events) noexcept { + for (const auto& event : events) { + if (event.data.u64 == kEventFdData) { + std::uint64_t value{}; + + const auto res = + read(m_event_fd_, std::span{reinterpret_cast(&value), sizeof(value)}); + + static_cast(res); + + continue; + } + + auto& registered_io = *reinterpret_cast(event.data.ptr); + + const auto readable = event.events & (EPOLLIN | EPOLLRDHUP | EPOLLHUP | EPOLLERR); + if (readable && registered_io.read_cb) { + auto cb = std::move(registered_io.read_cb); + + cb(); + } + + const auto writable = event.events & (EPOLLOUT | EPOLLHUP | EPOLLERR); + if (writable && registered_io.write_cb) { + auto cb = std::move(registered_io.write_cb); + + cb(); + } + } +} + +expected Reactor::interrupt() noexcept { + std::uint64_t value{1}; + + return write(m_event_fd_, std::span{reinterpret_cast(&value), sizeof(value)}) + .transform([](std::size_t) {}); +} + +expected Reactor::registerFd(const Fd& fd) noexcept { + auto* registered_io = + &m_registered.emplace(fd.fd(), Registration{.fd = fd.fd(), .read_cb = {}, .write_cb = {}}).first->second; + + // TODO: make this configurable + const auto interests = EPOLLIN | EPOLLOUT; + + ::epoll_event event{.events = interests | EPOLLRDHUP | EPOLLET, .data = {.ptr = registered_io}}; + + return sysCheck(::epoll_ctl(m_epoll_fd_.fd(), EPOLL_CTL_ADD, fd.fd(), &event)).transform([&]() { + return registered_io; + }); +} + +expected Reactor::unregister(Registration* registration) noexcept { + // TODO: suboptimal + const auto res = sysCheck(::epoll_ctl(m_epoll_fd_.fd(), EPOLL_CTL_DEL, registration->fd, nullptr)); + + const auto it = m_registered.find(registration->fd); + assert(it != m_registered.end()); + m_registered.erase(it); + + return res; +} + +} // namespace fastipc::io diff --git a/src/io/reactor.hxx b/src/io/reactor.hxx new file mode 100644 index 0000000..4ffdfb8 --- /dev/null +++ b/src/io/reactor.hxx @@ -0,0 +1,80 @@ +/* + * reactor.hxx + * Copyright 2025 ItJustWorksTM + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +#pragma once + +#include +#include +#include "fd.hxx" +#include "result.hxx" + +#include +#include +#include +#include +#include +#include +#include + +namespace fastipc::io { + +enum class Direction : std::uint8_t { + Read, + Write, +}; + +class Reactor final { + public: + struct Registration { + int fd; + std::function read_cb; + std::function write_cb; + + void callback(io::Direction direction, std::function cb) noexcept { + auto& rw_cb = direction == io::Direction::Read ? read_cb : write_cb; + rw_cb = std::move(cb); + } + }; + + static constexpr auto kDefaultMaxEvents = 512uz; + + static expected create(std::size_t max_events = kDefaultMaxEvents) noexcept; + expected react(std::optional timeout) noexcept; + + expected interrupt() noexcept; + + expected registerFd(const Fd& fd) noexcept; + + expected unregister(Registration* registration) noexcept; + + private: + explicit Reactor(Fd event_fd, Fd epoll_fd, std::size_t max_events); + + expected> wait(std::optional timeout) noexcept; + void process(std::span<::epoll_event> events) noexcept; + + static constexpr std::uint64_t kEventFdData = 1; + Fd m_event_fd_; + + Fd m_epoll_fd_; + std::vector<::epoll_event> m_events_buf_; + + std::unordered_map m_registered; +}; + +} // namespace fastipc::io diff --git a/src/io/result.hxx b/src/io/result.hxx index 2964cef..77bed9f 100644 --- a/src/io/result.hxx +++ b/src/io/result.hxx @@ -20,7 +20,7 @@ #include #include -#include +#include #include namespace fastipc { @@ -68,7 +68,7 @@ template return std::move(expected.value()); } - std::cerr << message << ": " << expected.error().message() << "\n" << std::flush; + std::println(stderr, "{}: {}", message, expected.error().message()); std::abort(); } @@ -77,7 +77,17 @@ inline void expect(std::expected expected, std::string_vi return; } - std::cerr << message << ": " << expected.error().message() << "\n" << std::flush; + std::println(stderr, "{}: {}", message, expected.error().message()); + std::abort(); +} + +template +[[nodiscard]] T expect(std::optional expected, std::string_view message = "unexpected") noexcept { + if (expected.has_value()) { + return std::move(expected.value()); + } + + std::println(stderr, "{}", message); std::abort(); } diff --git a/src/main.cxx b/src/main.cxx index e4070e0..80b1c4a 100644 --- a/src/main.cxx +++ b/src/main.cxx @@ -16,9 +16,28 @@ * */ +#include +#include "co/coroutine.hxx" +#include "io/context.hxx" +#include "io/result.hxx" #include "tower.hxx" +namespace fastipc { +namespace { + +co::Co main() { + auto tower = co_await fastipc::Tower::create("fastipcd"); + const std::stop_source stop_source{}; + static_cast(co_await tower.run(stop_source.get_token())); + + co_return 0; +} + +} // namespace +} // namespace fastipc + int main() { - auto tower = fastipc::Tower::create("fastipcd"); - tower.run(); + auto runtime = fastipc::expect(fastipc::io::Runtime::create()); + + return runtime.block_on(fastipc::main); } diff --git a/src/tower.cxx b/src/tower.cxx index b12c262..09e0e74 100644 --- a/src/tower.cxx +++ b/src/tower.cxx @@ -21,12 +21,12 @@ #include #include #include -#include #include #include #include #include #include +#include #include #include #include @@ -39,9 +39,13 @@ #include #include #include +#include "co/task.hxx" +#include +#include "co/coroutine.hxx" #include "io/cursor.hxx" #include "io/fd.hxx" +#include "io/polled_fd.hxx" #include "io/result.hxx" #include "channel.hxx" #include "local_proto.hxx" @@ -49,22 +53,43 @@ namespace fastipc { namespace { -[[nodiscard]] ClientRequest readClientRequest(std::span& buf) noexcept { +[[nodiscard]] io::expected> readClientRequest(std::span& obuf) { + constexpr static auto kMinSize = 10u; + + auto buf = obuf; + + if (kMinSize > buf.size()) { + return {}; + } + const auto requester_type = io::getBuf>(buf); + + if (requester_type >= 2) { + return io::unexpected{std::make_error_code(std::errc::protocol_error)}; + } + const auto max_payload_size = io::getBuf(buf); - const auto topic_name_buf = io::takeBuf(buf, io::getBuf(buf)); + const auto topic_name_size = io::getBuf(buf); + + if (topic_name_size > buf.size()) { + return {}; + } - assert(requester_type < 2); + const auto topic_name_buf = io::takeBuf(buf, topic_name_size); - return { + obuf = buf; + + return ClientRequest{ .type = static_cast(requester_type), .max_payload_size = max_payload_size, .topic_name = {reinterpret_cast(topic_name_buf.data()), topic_name_buf.size()}, }; } + } // namespace -[[nodiscard]] Tower Tower::create(std::string_view path) { +[[nodiscard]] co::Co Tower::create(std::string_view path) { + auto sockfd = expect(io::adoptSysFd(::socket(AF_UNIX, SOCK_SEQPACKET | SOCK_CLOEXEC, 0)), "failed to create tower socket"); @@ -77,6 +102,7 @@ namespace { auto unlink_res = io::sysCheck(::unlink(addr.sun_path)); static_cast(unlink_res); + // TODO: This technically has to be async as well // NOLINTNEXTLINE(cppcoreguidelines-pro-type-reinterpret-cast) expect(io::sysCheck(::bind(sockfd.fd(), reinterpret_cast(&addr), sizeof(addr))), "failed to bind tower socket"); @@ -84,34 +110,46 @@ namespace { constexpr int kListenQueueSize{128}; expect(io::sysCheck(::listen(sockfd.fd(), kListenQueueSize)), "failed to listen to tower socket"); - return Tower{std::move(sockfd)}; + co_return Tower{expect(io::PolledFd::create(std::move(sockfd)), "failed to created polled fd")}; } -void Tower::run() { +co::Co Tower::run(std::stop_token stop_token) { + // NOLINTNEXTLINE(altera-unroll-loops) Service loops should not be unrolled - for (;;) { - auto expected_clientfd = io::adoptSysFd(::accept(m_sockfd.fd(), nullptr, nullptr)); - if (!expected_clientfd.has_value()) { - if (expected_clientfd.error() == std::errc::invalid_argument) - break; - if (expected_clientfd.error() == std::errc::connection_aborted) - continue; - } + for (; !stop_token.stop_requested();) { + try { + auto expected_clientfd = co_await accept(m_sockfd, stop_token); + + if (!expected_clientfd.has_value()) { + if (expected_clientfd.error() == std::errc::bad_file_descriptor) + break; + if (expected_clientfd.error() == std::errc::connection_aborted) + continue; + } + + auto clientfd = expect(std::move(expected_clientfd), "failed to accept incoming connection"); - auto clientfd = expect(std::move(expected_clientfd), "failed to accept incoming connection"); - serve(std::move(clientfd)); + static_cast(co::spawn(serve(std::move(clientfd), stop_token))); + + } catch (const fastipc::io::StoppedException&) { + break; + } } + + co_return; } -void Tower::shutdown() { expect(io::sysCheck(::shutdown(m_sockfd.fd(), SHUT_RD)), "Failed to shutdown tower socket"); } +void Tower::shutdown() { + // expect(io::sysCheck(::shutdown(m_sockfd.fd(), SHUT_RD)), "Failed to shutdown tower socket"); +} -void Tower::serve(io::Fd clientfd) { +co::Co Tower::serve(io::PolledFd clientfd, std::stop_token stop_token) { std::array buf{}; // NOLINT(*-magic-numbers) const auto bytes_read = - expect(io::sysVal(::read(clientfd.fd(), buf.data(), buf.size())), "failed to read from client"); + expect(co_await io::aread(clientfd, std::span{buf}, stop_token), "failed to read from client"); - auto recvbuf = std::span{buf.data(), static_cast(bytes_read)}; - const auto request = readClientRequest(recvbuf); + auto recvbuf = std::span{buf}.first(bytes_read); + const auto request = expect(expect(readClientRequest(recvbuf), "invalid request"), "incomplete message"); std::println("{} request for topic '{}' with max payload size of {} bytes.", (request.type == RequesterType::Reader ? "reader" : "writer"), request.topic_name, @@ -129,6 +167,7 @@ void Tower::serve(io::Fd clientfd) { // NOLINTNEXTLINE(*-narrowing-conversions) expect(io::sysCheck(::ftruncate(channel.memfd.fd(), channel.total_size)), "failed to truncate channel memory"); + // NOLINTNEXTLINE(misc-const-correctness) void* ptr = expect( io::sysVal(::mmap(nullptr, channel.total_size, PROT_READ | PROT_WRITE, MAP_SHARED, channel.memfd.fd(), 0)), "failed to mmap channel memory"); @@ -163,7 +202,10 @@ void Tower::serve(io::Fd clientfd) { std::memcpy(CMSG_DATA(cmsg), &channel.memfd.fd(), sizeof(channel.memfd)); msg.msg_controllen = cmsg->cmsg_len; - static_cast(expect(io::sysVal(::sendmsg(clientfd.fd(), &msg, 0)), "failed to send reply to client")); + auto const send_n = expect(co_await io::asendmsg(clientfd, msg, 0), "failed to send reply to client"); + static_cast(send_n); + + co_return; } } // namespace fastipc diff --git a/src/tower.hxx b/src/tower.hxx index bbc485b..30acefc 100644 --- a/src/tower.hxx +++ b/src/tower.hxx @@ -23,16 +23,16 @@ #include #include "io/fd.hxx" +#include "io/polled_fd.hxx" #include "channel.hxx" namespace fastipc { class Tower final { public: - [[nodiscard]] static Tower create(std::string_view path); - - void run(); + [[nodiscard]] static co::Co create(std::string_view path); + co::Co run(std::stop_token stop_token); void shutdown(); private: @@ -43,11 +43,11 @@ class Tower final { impl::ChannelPage* page{nullptr}; }; - explicit Tower(io::Fd sockfd) noexcept : m_sockfd{std::move(sockfd)} {} + explicit Tower(io::PolledFd sockfd) noexcept : m_sockfd{std::move(sockfd)} {} - void serve(io::Fd clientfd); + co::Co serve(io::PolledFd clientfd, std::stop_token stop_token); - io::Fd m_sockfd; + io::PolledFd m_sockfd; std::unordered_map m_channels; }; diff --git a/src/visitor.hxx b/src/visitor.hxx new file mode 100644 index 0000000..4ef8b81 --- /dev/null +++ b/src/visitor.hxx @@ -0,0 +1,36 @@ +/* + * visitor.hxx + * Copyright 2025 ItJustWorksTM + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +#pragma once + +#include +#include + +namespace fastipc { + +template +struct Visitor final : Ts... { + using Ts::operator()...; +}; + +template +decltype(auto) match(V&& variant, Ts&&... arms) { + return std::visit(Visitor{std::forward(arms)...}, std::forward(variant)); +} + +} // namespace fastipc diff --git a/test/intraprocess.cxx b/test/intraprocess.cxx index a8aa09b..1cdab05 100644 --- a/test/intraprocess.cxx +++ b/test/intraprocess.cxx @@ -18,41 +18,75 @@ #include #include +#include #include #include "fastipc.hxx" #include "tower.hxx" -int main() { +#include +#include +#include "co/coroutine.hxx" +#include "co/task.hxx" +#include "io/context.hxx" +#include "io/result.hxx" + +namespace { + +fastipc::co::Co co_main() { + auto tower = co_await fastipc::Tower::create("fastipcd"); + + std::stop_source stop_source{}; + auto handle = fastipc::co::spawn(tower.run(stop_source.get_token())); + + auto test = std::jthread{[&] { + std::println("starting test in thead"); + constexpr std::string_view channel_name{"Hallowed are the Ori"}; + constexpr std::size_t max_payload_size{sizeof(int)}; + + fastipc::Writer writer{channel_name, max_payload_size}; + fastipc::Reader reader{channel_name, max_payload_size}; - auto tower = fastipc::Tower::create("fastipcd"); - const std::jthread tower_thread{[&] { tower.run(); }}; + { + std::println("reading sample"); + auto sample = reader.acquire(); + assert(sample.getSequenceId() == 0); + reader.release(sample); + } - constexpr std::string_view channel_name{"Hallowed are the Ori"}; - constexpr std::size_t max_payload_size{sizeof(int)}; + { + std::println("writing sample"); + auto sample = writer.prepare(); + assert(sample.getSequenceId() == 1); + *static_cast(sample.getPayload()) = 5; // NOLINT(*-magic-numbers) + writer.submit(sample); + } - fastipc::Writer writer{channel_name, max_payload_size}; - fastipc::Reader reader{channel_name, max_payload_size}; + { + std::println("reading sample"); + auto sample = reader.acquire(); + assert(sample.getSequenceId() == 1); + assert(*static_cast(sample.getPayload()) == 5); + reader.release(sample); + } - { - auto sample = reader.acquire(); - assert(sample.getSequenceId() == 0); - reader.release(sample); - } + // tower.shutdown(); + std::println("test done. stopping handle"); - { - auto sample = writer.prepare(); - assert(sample.getSequenceId() == 1); - *static_cast(sample.getPayload()) = 5; // NOLINT(*-magic-numbers) - writer.submit(sample); - } + fastipc::io::Runtime::singleton().scheduler().schedule([&]() { stop_source.request_stop(); }); + }}; - { - auto sample = reader.acquire(); - assert(sample.getSequenceId() == 1); - assert(*static_cast(sample.getPayload()) == 5); - reader.release(sample); - } + static_cast(co_await std::move(handle)); + + std::println("run done!"); + + co_return 0; +} + +} // namespace + +int main() { + auto runtime = fastipc::expect(fastipc::io::Runtime::create()); - tower.shutdown(); + return runtime.block_on(co_main); }