LCOV - code coverage report
Current view: top level - src/detail/select - op.hpp (source / functions) Coverage Total Hit
Test: coverage_filtered.info Lines: 74.2 % 128 95
Test Date: 2026-02-06 05:04:16 Functions: 84.2 % 19 16

            Line data    Source code
       1              : //
       2              : // Copyright (c) 2026 Steve Gerbino
       3              : //
       4              : // Distributed under the Boost Software License, Version 1.0. (See accompanying
       5              : // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
       6              : //
       7              : // Official repository: https://github.com/cppalliance/corosio
       8              : //
       9              : 
      10              : #ifndef BOOST_COROSIO_DETAIL_SELECT_OP_HPP
      11              : #define BOOST_COROSIO_DETAIL_SELECT_OP_HPP
      12              : 
      13              : #include <boost/corosio/detail/platform.hpp>
      14              : 
      15              : #if BOOST_COROSIO_HAS_SELECT
      16              : 
      17              : #include <boost/corosio/detail/config.hpp>
      18              : #include <boost/corosio/io_object.hpp>
      19              : #include <boost/corosio/endpoint.hpp>
      20              : #include <boost/capy/ex/executor_ref.hpp>
      21              : #include <boost/capy/coro.hpp>
      22              : #include <boost/capy/error.hpp>
      23              : #include <system_error>
      24              : 
      25              : #include "src/detail/make_err.hpp"
      26              : #include "src/detail/scheduler_op.hpp"
      27              : #include "src/detail/endpoint_convert.hpp"
      28              : 
      29              : #include <unistd.h>
      30              : #include <errno.h>
      31              : #include <fcntl.h>
      32              : 
      33              : #include <atomic>
      34              : #include <cstddef>
      35              : #include <memory>
      36              : #include <optional>
      37              : #include <stop_token>
      38              : 
      39              : #include <netinet/in.h>
      40              : #include <sys/select.h>
      41              : #include <sys/socket.h>
      42              : #include <sys/uio.h>
      43              : 
      44              : /*
      45              :     select Operation State
      46              :     ======================
      47              : 
      48              :     Each async I/O operation has a corresponding select_op-derived struct that
      49              :     holds the operation's state while it's in flight. The socket impl owns
      50              :     fixed slots for each operation type (conn_, rd_, wr_), so only one
      51              :     operation of each type can be pending per socket at a time.
      52              : 
      53              :     This mirrors the epoll_op design for consistency across backends.
      54              : 
      55              :     Completion vs Cancellation Race
      56              :     -------------------------------
      57              :     The `registered` atomic uses a tri-state (unregistered, registering,
      58              :     registered) to handle two races: (1) between register_fd() and the
      59              :     reactor seeing an event, and (2) between reactor completion and cancel().
      60              : 
      61              :     The registering state closes the window where an event could arrive
      62              :     after register_fd() but before the state was updated. The reactor and
      63              :     cancel() both treat registering the same as registered when claiming.
      64              : 
      65              :     Whoever atomically exchanges to unregistered "claims" the operation
      66              :     and is responsible for completing it. The loser sees unregistered and
      67              :     does nothing. The initiating thread uses compare_exchange to transition
      68              :     from registering to registered; if this fails, the reactor or cancel
      69              :     already claimed the op.
      70              : 
      71              :     Impl Lifetime Management
      72              :     ------------------------
      73              :     When cancel() posts an op to the scheduler's ready queue, the socket impl
      74              :     might be destroyed before the scheduler processes the op. The `impl_ptr`
      75              :     member holds a shared_ptr to the impl, keeping it alive until the op
      76              :     completes.
      77              : 
      78              :     EOF Detection
      79              :     -------------
      80              :     For reads, 0 bytes with no error means EOF. But an empty user buffer also
      81              :     returns 0 bytes. The `empty_buffer_read` flag distinguishes these cases.
      82              : 
      83              :     SIGPIPE Prevention
      84              :     ------------------
      85              :     Writes use sendmsg() with MSG_NOSIGNAL instead of writev() to prevent
      86              :     SIGPIPE when the peer has closed.
      87              : */
      88              : 
      89              : namespace boost::corosio::detail {
      90              : 
      91              : // Forward declarations for cancellation support
      92              : class select_socket_impl;
      93              : class select_acceptor_impl;
      94              : 
      95              : /** Registration state for async operations.
      96              : 
      97              :     Tri-state enum to handle the race between register_fd() and
      98              :     run_reactor() seeing an event. Setting `registering` before
      99              :     calling register_fd() ensures events delivered during the
     100              :     registration window are not dropped.
                      : 
                      :     NOTE(review): the underlying type is std::uint8_t, but <cstdint>
                      :     is not among the headers this file includes directly; presumably
                      :     it arrives transitively - confirm.
     101              : */
     102              : enum class select_registration_state : std::uint8_t
     103              : {
     104              :     unregistered,  ///< Not registered with reactor
     105              :     registering,   ///< register_fd() called, not yet confirmed
     106              :     registered     ///< Fully registered, ready for events
     107              : };
     108              : 
     109              : struct select_op : scheduler_op
     110              : {
                            // Shared state for one in-flight select-backend operation: the
                            // coroutine handle and executor to dispatch, the caller's output
                            // slots, the raw result stored by complete(), and the cancellation
                            // and registration flags. Derived ops must supply cancel() and
                            // normally override perform_io().
                            //
                            // Callable handed to std::stop_callback by start(); its operator()
                            // is only declared here, the definition is not in this header.
     111              :     struct canceller
     112              :     {
     113              :         select_op* op;
     114              :         void operator()() const noexcept;
     115              :     };
     116              : 
     117              :     capy::coro h;                          // handle dispatched by operator()
     118              :     capy::executor_ref ex;                 // executor used for that dispatch
     119              :     std::error_code* ec_out = nullptr;     // optional result slot, written by operator()
     120              :     std::size_t* bytes_out = nullptr;      // optional byte-count slot, written by operator()
     121              : 
     122              :     int fd = -1;                           // descriptor the perform_io() overrides act on
     123              :     int errn = 0;                          // errno-style code recorded by complete(); 0 = none
     124              :     std::size_t bytes_transferred = 0;     // byte count recorded by complete()
     125              : 
     126              :     std::atomic<bool> cancelled{false};    // set by request_cancel(), read by operator()
     127              :     std::atomic<select_registration_state> registered{select_registration_state::unregistered};
     128              :     std::optional<std::stop_callback<canceller>> stop_cb;  // engaged only while a stoppable token is attached
     129              : 
     130              :     // Prevents use-after-free when socket is closed with pending ops.
     131              :     std::shared_ptr<void> impl_ptr;
     132              : 
     133              :     // For stop_token cancellation - pointer to owning socket/acceptor impl.
                            // The start() overloads set at most one of the two; the other is null.
     134              :     select_socket_impl* socket_impl_ = nullptr;
     135              :     select_acceptor_impl* acceptor_impl_ = nullptr;
     136              : 
     137        23094 :     select_op() = default;
     138              : 
                            // Restores the per-run fields to their defaults. Leaves h, ex, ec_out,
                            // bytes_out and stop_cb untouched. Non-virtual: each derived op
                            // declares its own reset() that calls this one first.
     139       240194 :     void reset() noexcept
     140              :     {
     141       240194 :         fd = -1;
     142       240194 :         errn = 0;
     143       240194 :         bytes_transferred = 0;
     144       240194 :         cancelled.store(false, std::memory_order_relaxed);
     145       240194 :         registered.store(select_registration_state::unregistered, std::memory_order_relaxed);
     146       240194 :         impl_ptr.reset();
     147       240194 :         socket_impl_ = nullptr;
     148       240194 :         acceptor_impl_ = nullptr;
     149       240194 :     }
     150              : 
                            // Completion entry point: publishes the recorded result through
                            // ec_out / bytes_out (when non-null) and dispatches h on ex.
     151       232522 :     void operator()() override
     152              :     {
                                // Drop the stop callback first so it is gone before the result
                                // is published.
     153       232522 :         stop_cb.reset();
     154              : 
                                // Error precedence: cancellation, then errn, then EOF (a read
                                // operation that transferred 0 bytes), otherwise success.
     155       232522 :         if (ec_out)
     156              :         {
     157       232522 :             if (cancelled.load(std::memory_order_acquire))
     158          204 :                 *ec_out = capy::error::canceled;
     159       232318 :             else if (errn != 0)
     160            1 :                 *ec_out = make_err(errn);
     161       232317 :             else if (is_read_operation() && bytes_transferred == 0)
     162            5 :                 *ec_out = capy::error::eof;
     163              :             else
     164       232312 :                 *ec_out = {};
     165              :         }
     166              : 
     167       232522 :         if (bytes_out)
     168       232522 :             *bytes_out = bytes_transferred;
     169              : 
     170              :         // Move to stack before destroying the frame
                                // (ex and h are moved into locals and impl_ptr is released
                                // before the dispatch, so nothing below touches members.)
     171       232522 :         capy::executor_ref saved_ex( std::move( ex ) );
     172       232522 :         capy::coro saved_h( std::move( h ) );
     173       232522 :         impl_ptr.reset();
     174       232522 :         saved_ex.dispatch( saved_h );
     175       232522 :     }
     176              : 
                            // Whether a successful 0-byte result should be reported as EOF by
                            // operator(). The base answer is false.
     177       116195 :     virtual bool is_read_operation() const noexcept { return false; }
                            // Cancellation hook; every derived op declares an override whose
                            // definition lives outside this header.
     178              :     virtual void cancel() noexcept = 0;
     179              : 
                            // Releases the stop callback and the impl reference without
                            // writing any result or dispatching the handle.
     180            0 :     void destroy() override
     181              :     {
     182            0 :         stop_cb.reset();
     183            0 :         impl_ptr.reset();
     184            0 :     }
     185              : 
                            // Flags the op as cancelled; operator() then reports
                            // capy::error::canceled regardless of errn or byte count.
     186        35074 :     void request_cancel() noexcept
     187              :     {
     188        35074 :         cancelled.store(true, std::memory_order_release);
     189        35074 :     }
     190              : 
                            // Arms the op for a new run with no owning impl recorded: clears the
                            // cancelled flag, drops any previous stop callback, and installs a
                            // new one only if the token can actually be stopped.
     191              :     void start(std::stop_token token)
     192              :     {
     193              :         cancelled.store(false, std::memory_order_release);
     194              :         stop_cb.reset();
     195              :         socket_impl_ = nullptr;
     196              :         acceptor_impl_ = nullptr;
     197              : 
     198              :         if (token.stop_possible())
     199              :             stop_cb.emplace(token, canceller{this});
     200              :     }
     201              : 
                            // Same as start(token), but records `impl` as the owning socket impl.
     202       236357 :     void start(std::stop_token token, select_socket_impl* impl)
     203              :     {
     204       236357 :         cancelled.store(false, std::memory_order_release);
     205       236357 :         stop_cb.reset();
     206       236357 :         socket_impl_ = impl;
     207       236357 :         acceptor_impl_ = nullptr;
     208              : 
     209       236357 :         if (token.stop_possible())
     210          100 :             stop_cb.emplace(token, canceller{this});
     211       236357 :     }
     212              : 
                            // Same as start(token), but records `impl` as the owning acceptor impl.
     213         3837 :     void start(std::stop_token token, select_acceptor_impl* impl)
     214              :     {
     215         3837 :         cancelled.store(false, std::memory_order_release);
     216         3837 :         stop_cb.reset();
     217         3837 :         socket_impl_ = nullptr;
     218         3837 :         acceptor_impl_ = impl;
     219              : 
     220         3837 :         if (token.stop_possible())
     221            0 :             stop_cb.emplace(token, canceller{this});
     222         3837 :     }
     223              : 
                            // Records the raw outcome (errno-style code and byte count) that
                            // operator() later publishes. Does not dispatch anything itself.
     224       240073 :     void complete(int err, std::size_t bytes) noexcept
     225              :     {
     226       240073 :         errn = err;
     227       240073 :         bytes_transferred = bytes;
     228       240073 :     }
     229              : 
                            // Hook for the actual system call; the base implementation is a no-op.
     230            0 :     virtual void perform_io() noexcept {}
     231              : };
     232              : 
     233              : 
     234              : struct select_connect_op : select_op
     235              : {
                            // Operation state for an asynchronous connect. perform_io() fetches
                            // the socket's pending error and records it as the result.
     236              :     endpoint target_endpoint;              // cleared to a default endpoint by reset()
     237              : 
                            // Resets the base state, then clears the stored endpoint.
     238         3835 :     void reset() noexcept
     239              :     {
     240         3835 :         select_op::reset();
     241         3835 :         target_endpoint = endpoint{};
     242         3835 :     }
     243              : 
                            // Reads SO_ERROR from fd and records it via complete(); if the
                            // getsockopt() call itself fails, errno is recorded instead. The
                            // byte count is always 0.
     244         3835 :     void perform_io() noexcept override
     245              :     {
     246              :         // connect() completion status is retrieved via SO_ERROR, not return value
     247         3835 :         int err = 0;
     248         3835 :         socklen_t len = sizeof(err);
     249         3835 :         if (::getsockopt(fd, SOL_SOCKET, SO_ERROR, &err, &len) < 0)
     250            0 :             err = errno;
     251         3835 :         complete(err, 0);
     252         3835 :     }
     253              : 
     254              :     // Defined in sockets.cpp where select_socket_impl is complete
     255              :     void operator()() override;
     256              :     void cancel() noexcept override;
     257              : };
     258              : 
     259              : 
     260              : struct select_read_op : select_op
     261              : {
                            // Operation state for an asynchronous scatter read via readv().
     262              :     static constexpr std::size_t max_buffers = 16;  // capacity of the iovecs array
     263              :     iovec iovecs[max_buffers];
     264              :     int iovec_count = 0;                   // number of iovecs entries passed to readv()
     265              :     bool empty_buffer_read = false;        // true => a 0-byte result is not treated as EOF
     266              : 
                            // Reports this as a read operation (so select_op::operator() maps a
                            // 0-byte success to EOF) unless the empty-buffer flag is set.
     267       116122 :     bool is_read_operation() const noexcept override
     268              :     {
     269       116122 :         return !empty_buffer_read;
     270              :     }
     271              : 
                            // Resets the base state, the iovec count and the empty-buffer flag.
                            // The iovecs array contents are left as they are.
     272       116322 :     void reset() noexcept
     273              :     {
     274       116322 :         select_op::reset();
     275       116322 :         iovec_count = 0;
     276       116322 :         empty_buffer_read = false;
     277       116322 :     }
     278              : 
                            // Issues a single readv() on fd and records either the byte count
                            // (on success, including 0) or errno (on failure) via complete().
     279           81 :     void perform_io() noexcept override
     280              :     {
     281           81 :         ssize_t n = ::readv(fd, iovecs, iovec_count);
     282           81 :         if (n >= 0)
     283           81 :             complete(0, static_cast<std::size_t>(n));
     284              :         else
     285            0 :             complete(errno, 0);
     286           81 :     }
     287              : 
                            // Declared only; the definition is not in this header.
     288              :     void cancel() noexcept override;
     289              : };
     290              : 
     291              : 
     292              : struct select_write_op : select_op
     293              : {
                            // Operation state for an asynchronous gather write via sendmsg().
     294              :     static constexpr std::size_t max_buffers = 16;  // capacity of the iovecs array
     295              :     iovec iovecs[max_buffers];
     296              :     int iovec_count = 0;                   // number of iovecs entries handed to sendmsg()
     297              : 
                            // Resets the base state and the iovec count. The iovecs array
                            // contents are left as they are.
     298       116200 :     void reset() noexcept
     299              :     {
     300       116200 :         select_op::reset();
     301       116200 :         iovec_count = 0;
     302       116200 :     }
     303              : 
                            // Issues a single sendmsg() on fd with MSG_NOSIGNAL and records
                            // either the byte count or errno via complete(). Only the iovec
                            // fields of the msghdr are filled in; the rest stay zeroed.
     304            0 :     void perform_io() noexcept override
     305              :     {
     306            0 :         msghdr msg{};
     307            0 :         msg.msg_iov = iovecs;
     308            0 :         msg.msg_iovlen = static_cast<std::size_t>(iovec_count);
     309              : 
     310            0 :         ssize_t n = ::sendmsg(fd, &msg, MSG_NOSIGNAL);
     311            0 :         if (n >= 0)
     312            0 :             complete(0, static_cast<std::size_t>(n));
     313              :         else
     314            0 :             complete(errno, 0);
     315            0 :     }
     316              : 
                            // Declared only; the definition is not in this header.
     317              :     void cancel() noexcept override;
     318              : };
     319              : 
     320              : 
     321              : struct select_accept_op : select_op
     322              : {
                            // Operation state for an asynchronous accept. On success
                            // perform_io() leaves the new descriptor in accepted_fd.
     323              :     int accepted_fd = -1;                  // set by perform_io() on success; -1 otherwise
     324              :     io_object::io_object_impl* peer_impl = nullptr;   // not used in this header
     325              :     io_object::io_object_impl** impl_out = nullptr;   // not used in this header
     326              : 
                            // Resets the base state and clears the accept-specific fields.
     327         3837 :     void reset() noexcept
     328              :     {
     329         3837 :         select_op::reset();
     330         3837 :         accepted_fd = -1;
     331         3837 :         peer_impl = nullptr;
     332         3837 :         impl_out = nullptr;
     333         3837 :     }
     334              : 
                            // Accepts one connection on fd and configures the new descriptor.
                            // Every failure path closes the new descriptor (if one was obtained)
                            // and records an error via complete(); accepted_fd is assigned only
                            // on the fully successful path.
     335         3832 :     void perform_io() noexcept override
     336              :     {
                                // NOTE(review): addr is never read after accept() in this
                                // function, so the peer address is effectively discarded here.
     337         3832 :         sockaddr_in addr{};
     338         3832 :         socklen_t addrlen = sizeof(addr);
     339              : 
     340              :         // Note: select backend uses accept() + fcntl instead of accept4()
     341              :         // for broader POSIX compatibility
     342         3832 :         int new_fd = ::accept(fd, reinterpret_cast<sockaddr*>(&addr), &addrlen);
     343              : 
     344         3832 :         if (new_fd >= 0)
     345              :         {
     346              :             // Reject fds that exceed select()'s FD_SETSIZE limit.
     347              :             // Better to fail now than during later async operations.
                                    // The failure is reported as EINVAL.
     348         3832 :             if (new_fd >= FD_SETSIZE)
     349              :             {
     350            0 :                 ::close(new_fd);
     351            0 :                 complete(EINVAL, 0);
     352            0 :                 return;
     353              :             }
     354              : 
     355              :             // Set non-blocking and close-on-exec flags.
     356              :             // A non-blocking socket is essential for the async reactor;
     357              :             // if we can't configure it, fail rather than risk blocking.
                                    // errno is captured before close() in each failure branch so
                                    // the reported error is the fcntl() one.
     358         3832 :             int flags = ::fcntl(new_fd, F_GETFL, 0);
     359         3832 :             if (flags == -1)
     360              :             {
     361            0 :                 int err = errno;
     362            0 :                 ::close(new_fd);
     363            0 :                 complete(err, 0);
     364            0 :                 return;
     365              :             }
     366              : 
     367         3832 :             if (::fcntl(new_fd, F_SETFL, flags | O_NONBLOCK) == -1)
     368              :             {
     369            0 :                 int err = errno;
     370            0 :                 ::close(new_fd);
     371            0 :                 complete(err, 0);
     372            0 :                 return;
     373              :             }
     374              : 
                                    // NOTE(review): descriptor flags are set to FD_CLOEXEC
                                    // outright, without an F_GETFD read first.
     375         3832 :             if (::fcntl(new_fd, F_SETFD, FD_CLOEXEC) == -1)
     376              :             {
     377            0 :                 int err = errno;
     378            0 :                 ::close(new_fd);
     379            0 :                 complete(err, 0);
     380            0 :                 return;
     381              :             }
     382              : 
     383         3832 :             accepted_fd = new_fd;
     384         3832 :             complete(0, 0);
     385              :         }
     386              :         else
     387              :         {
                                    // accept() itself failed: report its errno.
     388            0 :             complete(errno, 0);
     389              :         }
     390              :     }
     391              : 
     392              :     // Defined in acceptors.cpp where select_acceptor_impl is complete
     393              :     void operator()() override;
     394              :     void cancel() noexcept override;
     395              : };
     396              : 
     397              : } // namespace boost::corosio::detail
     398              : 
     399              : #endif // BOOST_COROSIO_HAS_SELECT
     400              : 
     401              : #endif // BOOST_COROSIO_DETAIL_SELECT_OP_HPP
        

Generated by: LCOV version 2.3