LCOV - code coverage report
Current view: top level - src/detail/select - acceptors.cpp (source / functions) Coverage Total Hit
Test: coverage_filtered.info Lines: 64.3 % 249 160
Test Date: 2026-02-06 05:04:16 Functions: 89.5 % 19 17

            Line data    Source code
       1              : //
       2              : // Copyright (c) 2026 Steve Gerbino
       3              : //
       4              : // Distributed under the Boost Software License, Version 1.0. (See accompanying
       5              : // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
       6              : //
       7              : // Official repository: https://github.com/cppalliance/corosio
       8              : //
       9              : 
      10              : #include <boost/corosio/detail/platform.hpp>
      11              : 
      12              : #if BOOST_COROSIO_HAS_SELECT
      13              : 
      14              : #include "src/detail/select/acceptors.hpp"
      15              : #include "src/detail/select/sockets.hpp"
      16              : #include "src/detail/endpoint_convert.hpp"
      17              : #include "src/detail/make_err.hpp"
      18              : 
      19              : #include <errno.h>
      20              : #include <fcntl.h>
      21              : #include <netinet/in.h>
      22              : #include <sys/socket.h>
      23              : #include <unistd.h>
      24              : 
      25              : namespace boost::corosio::detail {
      26              : 
      27              : void
      28            0 : select_accept_op::
      29              : cancel() noexcept
      30              : {
      31            0 :     if (acceptor_impl_)
      32            0 :         acceptor_impl_->cancel_single_op(*this);
      33              :     else
      34            0 :         request_cancel();
      35            0 : }
      36              : 
      37              : void
      38         3837 : select_accept_op::
      39              : operator()()
      40              : {
      41         3837 :     stop_cb.reset();
      42              : 
      43         3837 :     bool success = (errn == 0 && !cancelled.load(std::memory_order_acquire));
      44              : 
      45         3837 :     if (ec_out)
      46              :     {
      47         3837 :         if (cancelled.load(std::memory_order_acquire))
      48            3 :             *ec_out = capy::error::canceled;
      49         3834 :         else if (errn != 0)
      50            0 :             *ec_out = make_err(errn);
      51              :         else
      52         3834 :             *ec_out = {};
      53              :     }
      54              : 
      55         3837 :     if (success && accepted_fd >= 0)
      56              :     {
      57         3834 :         if (acceptor_impl_)
      58              :         {
      59         3834 :             auto* socket_svc = static_cast<select_acceptor_impl*>(acceptor_impl_)
      60         3834 :                 ->service().socket_service();
      61         3834 :             if (socket_svc)
      62              :             {
      63         3834 :                 auto& impl = static_cast<select_socket_impl&>(socket_svc->create_impl());
      64         3834 :                 impl.set_socket(accepted_fd);
      65              : 
      66         3834 :                 sockaddr_in local_addr{};
      67         3834 :                 socklen_t local_len = sizeof(local_addr);
      68         3834 :                 sockaddr_in remote_addr{};
      69         3834 :                 socklen_t remote_len = sizeof(remote_addr);
      70              : 
      71         3834 :                 endpoint local_ep, remote_ep;
      72         3834 :                 if (::getsockname(accepted_fd, reinterpret_cast<sockaddr*>(&local_addr), &local_len) == 0)
      73         3834 :                     local_ep = from_sockaddr_in(local_addr);
      74         3834 :                 if (::getpeername(accepted_fd, reinterpret_cast<sockaddr*>(&remote_addr), &remote_len) == 0)
      75         3834 :                     remote_ep = from_sockaddr_in(remote_addr);
      76              : 
      77         3834 :                 impl.set_endpoints(local_ep, remote_ep);
      78              : 
      79         3834 :                 if (impl_out)
      80         3834 :                     *impl_out = &impl;
      81              : 
      82         3834 :                 accepted_fd = -1;
      83              :             }
      84              :             else
      85              :             {
      86            0 :                 if (ec_out && !*ec_out)
      87            0 :                     *ec_out = make_err(ENOENT);
      88            0 :                 ::close(accepted_fd);
      89            0 :                 accepted_fd = -1;
      90            0 :                 if (impl_out)
      91            0 :                     *impl_out = nullptr;
      92              :             }
      93              :         }
      94              :         else
      95              :         {
      96            0 :             ::close(accepted_fd);
      97            0 :             accepted_fd = -1;
      98            0 :             if (impl_out)
      99            0 :                 *impl_out = nullptr;
     100              :         }
     101         3834 :     }
     102              :     else
     103              :     {
     104            3 :         if (accepted_fd >= 0)
     105              :         {
     106            0 :             ::close(accepted_fd);
     107            0 :             accepted_fd = -1;
     108              :         }
     109              : 
     110            3 :         if (peer_impl)
     111              :         {
     112            0 :             peer_impl->release();
     113            0 :             peer_impl = nullptr;
     114              :         }
     115              : 
     116            3 :         if (impl_out)
     117            3 :             *impl_out = nullptr;
     118              :     }
     119              : 
     120              :     // Move to stack before destroying the frame
     121         3837 :     capy::executor_ref saved_ex( std::move( ex ) );
     122         3837 :     capy::coro saved_h( std::move( h ) );
     123         3837 :     impl_ptr.reset();
     124         3837 :     saved_ex.dispatch( saved_h );
     125         3837 : }
     126              : 
     127           54 : select_acceptor_impl::
     128           54 : select_acceptor_impl(select_acceptor_service& svc) noexcept
     129           54 :     : svc_(svc)
     130              : {
     131           54 : }
     132              : 
     133              : void
     134           54 : select_acceptor_impl::
     135              : release()
     136              : {
     137           54 :     close_socket();
     138           54 :     svc_.destroy_acceptor_impl(*this);
     139           54 : }
     140              : 
     141              : std::coroutine_handle<>
     142         3837 : select_acceptor_impl::
     143              : accept(
     144              :     std::coroutine_handle<> h,
     145              :     capy::executor_ref ex,
     146              :     std::stop_token token,
     147              :     std::error_code* ec,
     148              :     io_object::io_object_impl** impl_out)
     149              : {
     150         3837 :     auto& op = acc_;
     151         3837 :     op.reset();
     152         3837 :     op.h = h;
     153         3837 :     op.ex = ex;
     154         3837 :     op.ec_out = ec;
     155         3837 :     op.impl_out = impl_out;
     156         3837 :     op.fd = fd_;
     157         3837 :     op.start(token, this);
     158              : 
     159         3837 :     sockaddr_in addr{};
     160         3837 :     socklen_t addrlen = sizeof(addr);
     161         3837 :     int accepted = ::accept(fd_, reinterpret_cast<sockaddr*>(&addr), &addrlen);
     162              : 
     163         3837 :     if (accepted >= 0)
     164              :     {
     165              :         // Reject fds that exceed select()'s FD_SETSIZE limit.
     166              :         // Better to fail now than during later async operations.
     167            2 :         if (accepted >= FD_SETSIZE)
     168              :         {
     169            0 :             ::close(accepted);
     170            0 :             op.accepted_fd = -1;
     171            0 :             op.complete(EINVAL, 0);
     172            0 :             op.impl_ptr = shared_from_this();
     173            0 :             svc_.post(&op);
     174              :             // completion is always posted to scheduler queue, never inline.
     175            0 :             return std::noop_coroutine();
     176              :         }
     177              : 
     178              :         // Set non-blocking and close-on-exec flags.
     179              :         // A non-blocking socket is essential for the async reactor;
     180              :         // if we can't configure it, fail rather than risk blocking.
     181            2 :         int flags = ::fcntl(accepted, F_GETFL, 0);
     182            2 :         if (flags == -1)
     183              :         {
     184            0 :             int err = errno;
     185            0 :             ::close(accepted);
     186            0 :             op.accepted_fd = -1;
     187            0 :             op.complete(err, 0);
     188            0 :             op.impl_ptr = shared_from_this();
     189            0 :             svc_.post(&op);
     190              :             // completion is always posted to scheduler queue, never inline.
     191            0 :             return std::noop_coroutine();
     192              :         }
     193              : 
     194            2 :         if (::fcntl(accepted, F_SETFL, flags | O_NONBLOCK) == -1)
     195              :         {
     196            0 :             int err = errno;
     197            0 :             ::close(accepted);
     198            0 :             op.accepted_fd = -1;
     199            0 :             op.complete(err, 0);
     200            0 :             op.impl_ptr = shared_from_this();
     201            0 :             svc_.post(&op);
     202              :             // completion is always posted to scheduler queue, never inline.
     203            0 :             return std::noop_coroutine();
     204              :         }
     205              : 
     206            2 :         if (::fcntl(accepted, F_SETFD, FD_CLOEXEC) == -1)
     207              :         {
     208            0 :             int err = errno;
     209            0 :             ::close(accepted);
     210            0 :             op.accepted_fd = -1;
     211            0 :             op.complete(err, 0);
     212            0 :             op.impl_ptr = shared_from_this();
     213            0 :             svc_.post(&op);
     214              :             // completion is always posted to scheduler queue, never inline.
     215            0 :             return std::noop_coroutine();
     216              :         }
     217              : 
     218            2 :         op.accepted_fd = accepted;
     219            2 :         op.complete(0, 0);
     220            2 :         op.impl_ptr = shared_from_this();
     221            2 :         svc_.post(&op);
     222              :         // completion is always posted to scheduler queue, never inline.
     223            2 :         return std::noop_coroutine();
     224              :     }
     225              : 
     226         3835 :     if (errno == EAGAIN || errno == EWOULDBLOCK)
     227              :     {
     228         3835 :         svc_.work_started();
     229         3835 :         op.impl_ptr = shared_from_this();
     230              : 
     231              :         // Set registering BEFORE register_fd to close the race window where
     232              :         // reactor sees an event before we set registered.
     233         3835 :         op.registered.store(select_registration_state::registering, std::memory_order_release);
     234         3835 :         svc_.scheduler().register_fd(fd_, &op, select_scheduler::event_read);
     235              : 
     236              :         // Transition to registered. If this fails, reactor or cancel already
     237              :         // claimed the op (state is now unregistered), so we're done. However,
     238              :         // we must still deregister the fd because cancel's deregister_fd may
     239              :         // have run before our register_fd, leaving the fd orphaned.
     240         3835 :         auto expected = select_registration_state::registering;
     241         3835 :         if (!op.registered.compare_exchange_strong(
     242              :                 expected, select_registration_state::registered, std::memory_order_acq_rel))
     243              :         {
     244            0 :             svc_.scheduler().deregister_fd(fd_, select_scheduler::event_read);
     245              :             // completion is always posted to scheduler queue, never inline.
     246            0 :             return std::noop_coroutine();
     247              :         }
     248              : 
     249              :         // If cancelled was set before we registered, handle it now.
     250         3835 :         if (op.cancelled.load(std::memory_order_acquire))
     251              :         {
     252            0 :             auto prev = op.registered.exchange(
     253              :                 select_registration_state::unregistered, std::memory_order_acq_rel);
     254            0 :             if (prev != select_registration_state::unregistered)
     255              :             {
     256            0 :                 svc_.scheduler().deregister_fd(fd_, select_scheduler::event_read);
     257            0 :                 op.impl_ptr = shared_from_this();
     258            0 :                 svc_.post(&op);
     259            0 :                 svc_.work_finished();
     260              :             }
     261              :         }
     262              :         // completion is always posted to scheduler queue, never inline.
     263         3835 :         return std::noop_coroutine();
     264              :     }
     265              : 
     266            0 :     op.complete(errno, 0);
     267            0 :     op.impl_ptr = shared_from_this();
     268            0 :     svc_.post(&op);
     269              :     // completion is always posted to scheduler queue, never inline.
     270            0 :     return std::noop_coroutine();
     271              : }
     272              : 
     273              : void
     274          109 : select_acceptor_impl::
     275              : cancel() noexcept
     276              : {
     277          109 :     std::shared_ptr<select_acceptor_impl> self;
     278              :     try {
     279          109 :         self = shared_from_this();
     280            0 :     } catch (const std::bad_weak_ptr&) {
     281            0 :         return;
     282            0 :     }
     283              : 
     284          109 :     auto prev = acc_.registered.exchange(
     285              :         select_registration_state::unregistered, std::memory_order_acq_rel);
     286          109 :     acc_.request_cancel();
     287              : 
     288          109 :     if (prev != select_registration_state::unregistered)
     289              :     {
     290            3 :         svc_.scheduler().deregister_fd(fd_, select_scheduler::event_read);
     291            3 :         acc_.impl_ptr = self;
     292            3 :         svc_.post(&acc_);
     293            3 :         svc_.work_finished();
     294              :     }
     295          109 : }
     296              : 
     297              : void
     298            0 : select_acceptor_impl::
     299              : cancel_single_op(select_op& op) noexcept
     300              : {
     301              :     // Called from stop_token callback to cancel a specific pending operation.
     302            0 :     auto prev = op.registered.exchange(
     303              :         select_registration_state::unregistered, std::memory_order_acq_rel);
     304            0 :     op.request_cancel();
     305              : 
     306            0 :     if (prev != select_registration_state::unregistered)
     307              :     {
     308            0 :         svc_.scheduler().deregister_fd(fd_, select_scheduler::event_read);
     309              : 
     310              :         // Keep impl alive until op completes
     311              :         try {
     312            0 :             op.impl_ptr = shared_from_this();
     313            0 :         } catch (const std::bad_weak_ptr&) {
     314              :             // Impl is being destroyed, op will be orphaned but that's ok
     315            0 :         }
     316              : 
     317            0 :         svc_.post(&op);
     318            0 :         svc_.work_finished();
     319              :     }
     320            0 : }
     321              : 
     322              : void
     323          108 : select_acceptor_impl::
     324              : close_socket() noexcept
     325              : {
     326          108 :     cancel();
     327              : 
     328          108 :     if (fd_ >= 0)
     329              :     {
     330              :         // Unconditionally remove from registered_fds_ to handle edge cases
     331           42 :         svc_.scheduler().deregister_fd(fd_, select_scheduler::event_read);
     332           42 :         ::close(fd_);
     333           42 :         fd_ = -1;
     334              :     }
     335              : 
     336              :     // Clear cached endpoint
     337          108 :     local_endpoint_ = endpoint{};
     338          108 : }
     339              : 
     340          120 : select_acceptor_service::
     341          120 : select_acceptor_service(capy::execution_context& ctx)
     342          120 :     : ctx_(ctx)
     343          120 :     , state_(std::make_unique<select_acceptor_state>(ctx.use_service<select_scheduler>()))
     344              : {
     345          120 : }
     346              : 
     347          240 : select_acceptor_service::
     348          120 : ~select_acceptor_service()
     349              : {
     350          240 : }
     351              : 
     352              : void
     353          120 : select_acceptor_service::
     354              : shutdown()
     355              : {
     356          120 :     std::lock_guard lock(state_->mutex_);
     357              : 
     358          120 :     while (auto* impl = state_->acceptor_list_.pop_front())
     359            0 :         impl->close_socket();
     360              : 
     361          120 :     state_->acceptor_ptrs_.clear();
     362          120 : }
     363              : 
     364              : tcp_acceptor::acceptor_impl&
     365           54 : select_acceptor_service::
     366              : create_acceptor_impl()
     367              : {
     368           54 :     auto impl = std::make_shared<select_acceptor_impl>(*this);
     369           54 :     auto* raw = impl.get();
     370              : 
     371           54 :     std::lock_guard lock(state_->mutex_);
     372           54 :     state_->acceptor_list_.push_back(raw);
     373           54 :     state_->acceptor_ptrs_.emplace(raw, std::move(impl));
     374              : 
     375           54 :     return *raw;
     376           54 : }
     377              : 
     378              : void
     379           54 : select_acceptor_service::
     380              : destroy_acceptor_impl(tcp_acceptor::acceptor_impl& impl)
     381              : {
     382           54 :     auto* select_impl = static_cast<select_acceptor_impl*>(&impl);
     383           54 :     std::lock_guard lock(state_->mutex_);
     384           54 :     state_->acceptor_list_.remove(select_impl);
     385           54 :     state_->acceptor_ptrs_.erase(select_impl);
     386           54 : }
     387              : 
     388              : std::error_code
     389           54 : select_acceptor_service::
     390              : open_acceptor(
     391              :     tcp_acceptor::acceptor_impl& impl,
     392              :     endpoint ep,
     393              :     int backlog)
     394              : {
     395           54 :     auto* select_impl = static_cast<select_acceptor_impl*>(&impl);
     396           54 :     select_impl->close_socket();
     397              : 
     398           54 :     int fd = ::socket(AF_INET, SOCK_STREAM, 0);
     399           54 :     if (fd < 0)
     400            0 :         return make_err(errno);
     401              : 
     402              :     // Set non-blocking and close-on-exec
     403           54 :     int flags = ::fcntl(fd, F_GETFL, 0);
     404           54 :     if (flags == -1)
     405              :     {
     406            0 :         int errn = errno;
     407            0 :         ::close(fd);
     408            0 :         return make_err(errn);
     409              :     }
     410           54 :     if (::fcntl(fd, F_SETFL, flags | O_NONBLOCK) == -1)
     411              :     {
     412            0 :         int errn = errno;
     413            0 :         ::close(fd);
     414            0 :         return make_err(errn);
     415              :     }
     416           54 :     if (::fcntl(fd, F_SETFD, FD_CLOEXEC) == -1)
     417              :     {
     418            0 :         int errn = errno;
     419            0 :         ::close(fd);
     420            0 :         return make_err(errn);
     421              :     }
     422              : 
     423              :     // Check fd is within select() limits
     424           54 :     if (fd >= FD_SETSIZE)
     425              :     {
     426            0 :         ::close(fd);
     427            0 :         return make_err(EMFILE);
     428              :     }
     429              : 
     430           54 :     int reuse = 1;
     431           54 :     ::setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &reuse, sizeof(reuse));
     432              : 
     433           54 :     sockaddr_in addr = detail::to_sockaddr_in(ep);
     434           54 :     if (::bind(fd, reinterpret_cast<sockaddr*>(&addr), sizeof(addr)) < 0)
     435              :     {
     436           12 :         int errn = errno;
     437           12 :         ::close(fd);
     438           12 :         return make_err(errn);
     439              :     }
     440              : 
     441           42 :     if (::listen(fd, backlog) < 0)
     442              :     {
     443            0 :         int errn = errno;
     444            0 :         ::close(fd);
     445            0 :         return make_err(errn);
     446              :     }
     447              : 
     448           42 :     select_impl->fd_ = fd;
     449              : 
     450              :     // Cache the local endpoint (queries OS for ephemeral port if port was 0)
     451           42 :     sockaddr_in local_addr{};
     452           42 :     socklen_t local_len = sizeof(local_addr);
     453           42 :     if (::getsockname(fd, reinterpret_cast<sockaddr*>(&local_addr), &local_len) == 0)
     454           42 :         select_impl->set_local_endpoint(detail::from_sockaddr_in(local_addr));
     455              : 
     456           42 :     return {};
     457              : }
     458              : 
     459              : void
     460            5 : select_acceptor_service::
     461              : post(select_op* op)
     462              : {
     463            5 :     state_->sched_.post(op);
     464            5 : }
     465              : 
     466              : void
     467         3835 : select_acceptor_service::
     468              : work_started() noexcept
     469              : {
     470         3835 :     state_->sched_.work_started();
     471         3835 : }
     472              : 
     473              : void
     474            3 : select_acceptor_service::
     475              : work_finished() noexcept
     476              : {
     477            3 :     state_->sched_.work_finished();
     478            3 : }
     479              : 
     480              : select_socket_service*
     481         3834 : select_acceptor_service::
     482              : socket_service() const noexcept
     483              : {
     484         3834 :     auto* svc = ctx_.find_service<detail::socket_service>();
     485         3834 :     return svc ? dynamic_cast<select_socket_service*>(svc) : nullptr;
     486              : }
     487              : 
     488              : } // namespace boost::corosio::detail
     489              : 
     490              : #endif // BOOST_COROSIO_HAS_SELECT
        

Generated by: LCOV version 2.3