LCOV - code coverage report
Current view: top level - src/detail/epoll - op.hpp (source / functions) Coverage Total Hit
Test: coverage_filtered.info Lines: 84.8 % 112 95
Test Date: 2026-02-06 05:04:16 Functions: 81.0 % 21 17

            Line data    Source code
       1              : //
       2              : // Copyright (c) 2026 Steve Gerbino
       3              : //
       4              : // Distributed under the Boost Software License, Version 1.0. (See accompanying
       5              : // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
       6              : //
       7              : // Official repository: https://github.com/cppalliance/corosio
       8              : //
       9              : 
      10              : #ifndef BOOST_COROSIO_DETAIL_EPOLL_OP_HPP
      11              : #define BOOST_COROSIO_DETAIL_EPOLL_OP_HPP
      12              : 
      13              : #include <boost/corosio/detail/platform.hpp>
      14              : 
      15              : #if BOOST_COROSIO_HAS_EPOLL
      16              : 
      17              : #include <boost/corosio/detail/config.hpp>
      18              : #include <boost/corosio/io_object.hpp>
      19              : #include <boost/corosio/endpoint.hpp>
      20              : #include <boost/capy/ex/executor_ref.hpp>
      21              : #include <boost/capy/coro.hpp>
      22              : #include <boost/capy/error.hpp>
      23              : #include <system_error>
      24              : 
      25              : #include "src/detail/make_err.hpp"
      26              : #include "src/detail/resume_coro.hpp"
      27              : #include "src/detail/scheduler_op.hpp"
      28              : #include "src/detail/endpoint_convert.hpp"
      29              : 
      30              : #include <unistd.h>
      31              : #include <errno.h>
      32              : 
      33              : #include <atomic>
      34              : #include <cstddef>
      35              : #include <memory>
      36              : #include <mutex>
      37              : #include <optional>
      38              : #include <stop_token>
      39              : 
      40              : #include <netinet/in.h>
      41              : #include <sys/socket.h>
      42              : #include <sys/uio.h>
      43              : 
      44              : /*
      45              :     epoll Operation State
      46              :     =====================
      47              : 
      48              :     Each async I/O operation has a corresponding epoll_op-derived struct that
      49              :     holds the operation's state while it's in flight. The socket impl owns
      50              :     fixed slots for each operation type (conn_, rd_, wr_), so only one
      51              :     operation of each type can be pending per socket at a time.
      52              : 
      53              :     Persistent Registration
      54              :     -----------------------
      55              :     File descriptors are registered with epoll once (via descriptor_state) and
      56              :     stay registered until closed. The descriptor_state tracks which operations
      57              :     are pending (read_op, write_op, connect_op). When an event arrives, the
      58              :     reactor dispatches to the appropriate pending operation.
      59              : 
      60              :     Impl Lifetime Management
      61              :     ------------------------
      62              :     When cancel() posts an op to the scheduler's ready queue, the socket impl
      63              :     might be destroyed before the scheduler processes the op. The `impl_ptr`
      64              :     member holds a shared_ptr to the impl, keeping it alive until the op
      65              :     completes. This is set by cancel(); operator() moves it into a local
      66              :     before resuming the coroutine, so it is released when operator() returns.
      67              : 
      68              :     EOF Detection
      69              :     -------------
      70              :     For reads, 0 bytes with no error means EOF. But an empty user buffer also
      71              :     returns 0 bytes. The `empty_buffer_read` flag distinguishes these cases.
      72              : 
      73              :     SIGPIPE Prevention
      74              :     ------------------
      75              :     Writes use sendmsg() with MSG_NOSIGNAL instead of writev() to prevent
      76              :     SIGPIPE when the peer has closed.
      77              : */
      78              : 
      79              : namespace boost::corosio::detail {
      80              : 
      81              : // Forward declarations: the impl types are only used through pointers in this header
      82              : class epoll_socket_impl;
      83              : class epoll_acceptor_impl;
      84              : struct epoll_op;
      85              : 
      86              : // Forward declaration: held as `epoll_scheduler const*` by descriptor_state
      87              : class epoll_scheduler;
      88              : 
      89              : /** Per-descriptor state for persistent epoll registration.
      90              : 
      91              :     Tracks pending operations for a file descriptor. The fd is registered
      92              :     once with epoll and stays registered until closed.
      93              : 
      94              :     This struct extends scheduler_op to support deferred I/O processing.
      95              :     When epoll events arrive, the reactor sets ready_events_ and queues
      96              :     this descriptor for processing. When popped from the scheduler queue,
      97              :     operator() performs the actual I/O and queues completion handlers.
      98              : 
      99              :     @par Deferred I/O Model
     100              :     The reactor does not perform I/O itself. Instead:
     101              :     1. Reactor sets ready_events_ and queues descriptor_state
     102              :     2. Scheduler pops descriptor_state and calls operator()
     103              :     3. operator() performs I/O under mutex and queues completions
     104              : 
     105              :     This eliminates per-descriptor mutex locking from the reactor hot path.
     106              : 
     107              :     @par Thread Safety
     108              :     The mutex protects operation pointers and ready flags during I/O.
     109              :     ready_events_ and is_enqueued_ are atomic for lock-free reactor access.
     110              : */
     111              : struct descriptor_state : scheduler_op
     112              : {
     113              :     std::mutex mutex;
     114              : 
     115              :     // Protected by mutex. Non-owning pointers to the pending op of each kind (nullptr = none).
     116              :     epoll_op* read_op = nullptr;
     117              :     epoll_op* write_op = nullptr;
     118              :     epoll_op* connect_op = nullptr;
     119              : 
     120              :     // Caches edge events that arrived before an op was registered
     121              :     bool read_ready = false;
     122              :     bool write_ready = false;
     123              : 
     124              :     // Set during registration only (no mutex needed)
     125              :     std::uint32_t registered_events = 0; // NOTE(review): std::uint32_t lives in <cstdint>, which this header does not include directly - confirm it arrives transitively
     126              :     int fd = -1; // -1 until a descriptor is assigned
     127              : 
     128              :     // For deferred I/O - set by reactor, read by scheduler
     129              :     std::atomic<std::uint32_t> ready_events_{0};
     130              :     std::atomic<bool> is_enqueued_{false};
     131              :     epoll_scheduler const* scheduler_ = nullptr;
     132              : 
     133              :     // Prevents impl destruction while this descriptor_state is queued.
     134              :     // Set by close_socket() when is_enqueued_ is true, cleared by operator().
     135              :     std::shared_ptr<void> impl_ref_;
     136              : 
     137              :     /// Add ready events atomically (bitwise OR of `ev` into ready_events_).
     138       125169 :     void add_ready_events(std::uint32_t ev) noexcept
     139              :     {
     140       125169 :         ready_events_.fetch_or(ev, std::memory_order_relaxed); // relaxed: only the OR itself is atomic; no ordering is requested
     141       125169 :     }
     142              : 
     143              :     /// Perform deferred I/O and queue completions (declared here, defined out of line).
     144              :     void operator()() override;
     145              : 
     146              :     /// Destroy without invoking. The body is empty, so this does nothing.
     147            0 :     void destroy() override {}
     148              : };
     149              : 
     150              : struct epoll_op : scheduler_op
     151              : {
     152              :     struct canceller // callable stored in stop_cb; its operator() is defined out of line
     153              :     {
     154              :         epoll_op* op;
     155              :         void operator()() const noexcept;
     156              :     };
     157              : 
     158              :     capy::coro h; // coroutine handle passed to resume_coro() by operator()
     159              :     capy::executor_ref ex; // executor passed to resume_coro() by operator()
     160              :     std::error_code* ec_out = nullptr; // optional: receives the result code in operator()
     161              :     std::size_t* bytes_out = nullptr; // optional: receives bytes_transferred in operator()
     162              : 
     163              :     int fd = -1; // descriptor that perform_io() operates on
     164              :     int errn = 0; // errno-style value recorded by complete(); 0 means no error
     165              :     std::size_t bytes_transferred = 0; // byte count recorded by complete()
     166              : 
     167              :     std::atomic<bool> cancelled{false}; // set by request_cancel(), cleared by reset()/start()
     168              :     std::optional<std::stop_callback<canceller>> stop_cb; // engaged only while a stop-capable token is attached
     169              : 
     170              :     // Prevents use-after-free when socket is closed with pending ops.
     171              :     // See "Impl Lifetime Management" in file header.
     172              :     std::shared_ptr<void> impl_ptr;
     173              : 
     174              :     // For stop_token cancellation - pointer to owning socket/acceptor impl.
     175              :     // When stop is requested, we call back to the impl to perform actual I/O cancellation.
     176              :     // start() sets exactly one of the two and nulls the other.
     177              :     epoll_socket_impl* socket_impl_ = nullptr;
     178              :     epoll_acceptor_impl* acceptor_impl_ = nullptr;
     179        30210 :     epoll_op() = default;
     180              :     /// Restore result/cancel/owner state to defaults; leaves h, ex, ec_out, bytes_out and stop_cb untouched.
     181       240281 :     void reset() noexcept
     182              :     {
     183       240281 :         fd = -1;
     184       240281 :         errn = 0;
     185       240281 :         bytes_transferred = 0;
     186       240281 :         cancelled.store(false, std::memory_order_relaxed);
     187       240281 :         impl_ptr.reset();
     188       240281 :         socket_impl_ = nullptr;
     189       240281 :         acceptor_impl_ = nullptr;
     190       240281 :     }
     191              :     /// Publish the result through ec_out/bytes_out (when non-null), then resume the coroutine.
     192       230237 :     void operator()() override
     193              :     {
     194       230237 :         stop_cb.reset(); // drop the stop callback before publishing the result
     195              :         // Result precedence: cancelled, then errn, then EOF, otherwise success.
     196       230237 :         if (ec_out)
     197              :         {
     198       230237 :             if (cancelled.load(std::memory_order_acquire)) // pairs with the release store in request_cancel()
     199          206 :                 *ec_out = capy::error::canceled;
     200       230031 :             else if (errn != 0)
     201            1 :                 *ec_out = make_err(errn);
     202       230030 :             else if (is_read_operation() && bytes_transferred == 0) // zero-byte result on an op that reports itself as a read
     203            5 :                 *ec_out = capy::error::eof;
     204              :             else
     205       230025 :                 *ec_out = {};
     206              :         }
     207              : 
     208       230237 :         if (bytes_out)
     209       230237 :             *bytes_out = bytes_transferred;
     210              : 
     211              :         // Move to stack before resuming coroutine. The coroutine might close
     212              :         // the socket, releasing the last wrapper ref. If impl_ptr were the
     213              :         // last ref and we destroyed it while still in operator(), we'd have
     214              :         // use-after-free. Moving to local ensures destruction happens at
     215              :         // function exit, after all member accesses are complete.
     216       230237 :         capy::executor_ref saved_ex( std::move( ex ) );
     217       230237 :         capy::coro saved_h( std::move( h ) );
     218       230237 :         auto prevent_premature_destruction = std::move(impl_ptr);
     219       230237 :         resume_coro(saved_ex, saved_h); // no members of *this are accessed after this call
     220       230237 :     }
     221              :     /// Whether a zero-byte result should be reported as EOF; the base answer is no.
     222       115052 :     virtual bool is_read_operation() const noexcept { return false; }
     223              :     virtual void cancel() noexcept = 0; // every derived op in this header declares an override
     224              :     /// Release the stop callback and the impl reference; does not write results or resume h.
     225            0 :     void destroy() override
     226              :     {
     227            0 :         stop_cb.reset();
     228            0 :         impl_ptr.reset();
     229            0 :     }
     230              :     /// Mark the op as cancelled; operator() then reports capy::error::canceled.
     231        45769 :     void request_cancel() noexcept
     232              :     {
     233        45769 :         cancelled.store(true, std::memory_order_release);
     234        45769 :     }
     235              :     /// Arm the op for a socket impl: clear cancelled, drop any old stop callback, record the owner.
     236       235255 :     void start(std::stop_token token, epoll_socket_impl* impl)
     237              :     {
     238       235255 :         cancelled.store(false, std::memory_order_release);
     239       235255 :         stop_cb.reset();
     240       235255 :         socket_impl_ = impl;
     241       235255 :         acceptor_impl_ = nullptr;
     242              : 
     243       235255 :         if (token.stop_possible()) // register a callback only if the token can ever request stop
     244          106 :             stop_cb.emplace(token, canceller{this});
     245       235255 :     }
     246              :     /// Same as the socket overload, but records an acceptor impl as the owner instead.
     247         5026 :     void start(std::stop_token token, epoll_acceptor_impl* impl)
     248              :     {
     249         5026 :         cancelled.store(false, std::memory_order_release);
     250         5026 :         stop_cb.reset();
     251         5026 :         socket_impl_ = nullptr;
     252         5026 :         acceptor_impl_ = impl;
     253              : 
     254         5026 :         if (token.stop_possible()) // register a callback only if the token can ever request stop
     255            9 :             stop_cb.emplace(token, canceller{this});
     256         5026 :     }
     257              :     /// Record the outcome of an I/O attempt: errno-style code (0 = success) and byte count.
     258       240199 :     void complete(int err, std::size_t bytes) noexcept
     259              :     {
     260       240199 :         errn = err;
     261       240199 :         bytes_transferred = bytes;
     262       240199 :     }
     263              :     /// I/O hook; the base version does nothing. Each derived op in this header overrides it.
     264            0 :     virtual void perform_io() noexcept {}
     265              : };
     266              : 
     267              : 
     268              : struct epoll_connect_op : epoll_op
     269              : {
     270              :     endpoint target_endpoint; // reset() restores this to a default-constructed endpoint
     271              :     /// Base reset plus target_endpoint. Hides (does not override) the non-virtual epoll_op::reset().
     272         5018 :     void reset() noexcept
     273              :     {
     274         5018 :         epoll_op::reset();
     275         5018 :         target_endpoint = endpoint{};
     276         5018 :     }
     277              :     /// Read the socket's SO_ERROR value and record it as the result (0 bytes).
     278         5018 :     void perform_io() noexcept override
     279              :     {
     280              :         // connect() completion status is retrieved via SO_ERROR, not return value
     281         5018 :         int err = 0;
     282         5018 :         socklen_t len = sizeof(err);
     283         5018 :         if (::getsockopt(fd, SOL_SOCKET, SO_ERROR, &err, &len) < 0)
     284            0 :             err = errno; // getsockopt itself failed: report its errno instead
     285         5018 :         complete(err, 0);
     286         5018 :     }
     287              : 
     288              :     // Defined in sockets.cpp where epoll_socket_impl is complete
     289              :     void operator()() override;
     290              :     void cancel() noexcept override;
     291              : };
     292              : 
     293              : 
     294              : struct epoll_read_op : epoll_op
     295              : {
     296              :     static constexpr std::size_t max_buffers = 16; // capacity of iovecs
     297              :     iovec iovecs[max_buffers];
     298              :     int iovec_count = 0; // number of leading iovecs entries handed to readv()
     299              :     bool empty_buffer_read = false; // true: a zero-byte result is not reported as EOF
     300              :     /// A read counts as a read for EOF purposes unless it was issued with an empty buffer.
     301       114978 :     bool is_read_operation() const noexcept override
     302              :     {
     303       114978 :         return !empty_buffer_read;
     304              :     }
     305              :     /// Base reset plus the read-specific fields. The iovecs contents are left as they are.
     306       115180 :     void reset() noexcept
     307              :     {
     308       115180 :         epoll_op::reset();
     309       115180 :         iovec_count = 0;
     310       115180 :         empty_buffer_read = false;
     311       115180 :     }
     312              :     /// One readv() attempt: record (0, n) on success or (errno, 0) on failure.
     313           98 :     void perform_io() noexcept override
     314              :     {
     315           98 :         ssize_t n = ::readv(fd, iovecs, iovec_count);
     316           98 :         if (n >= 0)
     317           52 :             complete(0, static_cast<std::size_t>(n));
     318              :         else
     319           46 :             complete(errno, 0); // NOTE(review): errno is recorded unfiltered (incl. EAGAIN/EINTR) - presumably the caller decides whether to retry
     320           98 :     }
     321              : 
     322              :     void cancel() noexcept override; // declared only; defined outside this header
     323              : };
     324              : 
     325              : 
     326              : struct epoll_write_op : epoll_op
     327              : {
     328              :     static constexpr std::size_t max_buffers = 16; // capacity of iovecs
     329              :     iovec iovecs[max_buffers];
     330              :     int iovec_count = 0; // number of leading iovecs entries handed to sendmsg()
     331              :     /// Base reset plus iovec_count. The iovecs contents are left as they are.
     332       115057 :     void reset() noexcept
     333              :     {
     334       115057 :         epoll_op::reset();
     335       115057 :         iovec_count = 0;
     336       115057 :     }
     337              :     /// One sendmsg() attempt: record (0, n) on success or (errno, 0) on failure.
     338            0 :     void perform_io() noexcept override
     339              :     {
     340            0 :         msghdr msg{}; // value-initialized, so every field other than the two set below is zero
     341            0 :         msg.msg_iov = iovecs;
     342            0 :         msg.msg_iovlen = static_cast<std::size_t>(iovec_count);
     343              :         // MSG_NOSIGNAL: report a closed peer through errno instead of raising SIGPIPE
     344            0 :         ssize_t n = ::sendmsg(fd, &msg, MSG_NOSIGNAL);
     345            0 :         if (n >= 0)
     346            0 :             complete(0, static_cast<std::size_t>(n));
     347              :         else
     348            0 :             complete(errno, 0); // NOTE(review): errno is recorded unfiltered (incl. EAGAIN/EINTR) - presumably the caller decides whether to retry
     349            0 :     }
     350              : 
     351              :     void cancel() noexcept override; // declared only; defined outside this header
     352              : };
     353              : 
     354              : 
     355              : struct epoll_accept_op : epoll_op
     356              : {
     357              :     int accepted_fd = -1; // descriptor returned by accept4(); stays -1 unless perform_io() succeeds
     358              :     io_object::io_object_impl* peer_impl = nullptr; // NOTE(review): only reset() touches this here - presumably used by operator() in acceptors.cpp
     359              :     io_object::io_object_impl** impl_out = nullptr; // NOTE(review): only reset() touches this here - presumably an out-param filled by operator()
     360              :     /// Base reset plus the accept-specific fields. Does not close accepted_fd.
     361         5026 :     void reset() noexcept
     362              :     {
     363         5026 :         epoll_op::reset();
     364         5026 :         accepted_fd = -1;
     365         5026 :         peer_impl = nullptr;
     366         5026 :         impl_out = nullptr;
     367         5026 :     }
     368              :     /// One accept4() attempt: store the new fd and record (0, 0), or record (errno, 0) on failure.
     369         5015 :     void perform_io() noexcept override
     370              :     {
     371         5015 :         sockaddr_in addr{}; // NOTE(review): sized for IPv4 only and never read after the call - confirm this is fine for IPv6 listeners
     372         5015 :         socklen_t addrlen = sizeof(addr);
     373         5015 :         int new_fd = ::accept4(fd, reinterpret_cast<sockaddr*>(&addr),
     374              :                                &addrlen, SOCK_NONBLOCK | SOCK_CLOEXEC); // new socket is created non-blocking and close-on-exec
     375              : 
     376         5015 :         if (new_fd >= 0)
     377              :         {
     378         5015 :             accepted_fd = new_fd;
     379         5015 :             complete(0, 0);
     380              :         }
     381              :         else
     382              :         {
     383            0 :             complete(errno, 0);
     384              :         }
     385         5015 :     }
     386              : 
     387              :     // Defined in acceptors.cpp where epoll_acceptor_impl is complete
     388              :     void operator()() override;
     389              :     void cancel() noexcept override;
     390              : };
     391              : 
     392              : } // namespace boost::corosio::detail
     393              : 
     394              : #endif // BOOST_COROSIO_HAS_EPOLL
     395              : 
     396              : #endif // BOOST_COROSIO_DETAIL_EPOLL_OP_HPP
        

Generated by: LCOV version 2.3