LCOV - code coverage report
Current view: top level - src/detail/select - sockets.cpp (source / functions) Coverage Total Hit
Test: coverage_filtered.info Lines: 73.5 % 373 274
Test Date: 2026-02-06 05:04:16 Functions: 94.3 % 35 33

            Line data    Source code
       1              : //
       2              : // Copyright (c) 2026 Steve Gerbino
       3              : //
       4              : // Distributed under the Boost Software License, Version 1.0. (See accompanying
       5              : // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
       6              : //
       7              : // Official repository: https://github.com/cppalliance/corosio
       8              : //
       9              : 
      10              : #include <boost/corosio/detail/platform.hpp>
      11              : 
      12              : #if BOOST_COROSIO_HAS_SELECT
      13              : 
      14              : #include "src/detail/select/sockets.hpp"
      15              : #include "src/detail/endpoint_convert.hpp"
      16              : #include "src/detail/make_err.hpp"
      17              : 
      18              : #include <boost/capy/buffers.hpp>
      19              : 
      20              : #include <errno.h>
      21              : #include <fcntl.h>
      22              : #include <netinet/in.h>
      23              : #include <netinet/tcp.h>
      24              : #include <sys/socket.h>
      25              : #include <unistd.h>
      26              : 
      27              : namespace boost::corosio::detail {
      28              : 
      29              : void
      30           99 : select_op::canceller::
      31              : operator()() const noexcept
      32              : {
      33           99 :     op->cancel();
      34           99 : }
      35              : 
      36              : void
      37            0 : select_connect_op::
      38              : cancel() noexcept
      39              : {
      40            0 :     if (socket_impl_)
      41            0 :         socket_impl_->cancel_single_op(*this);
      42              :     else
      43            0 :         request_cancel();
      44            0 : }
      45              : 
      46              : void
      47           99 : select_read_op::
      48              : cancel() noexcept
      49              : {
      50           99 :     if (socket_impl_)
      51           99 :         socket_impl_->cancel_single_op(*this);
      52              :     else
      53            0 :         request_cancel();
      54           99 : }
      55              : 
      56              : void
      57            0 : select_write_op::
      58              : cancel() noexcept
      59              : {
      60            0 :     if (socket_impl_)
      61            0 :         socket_impl_->cancel_single_op(*this);
      62              :     else
      63            0 :         request_cancel();
      64            0 : }
      65              : 
      66              : void
      67         3835 : select_connect_op::
      68              : operator()()
      69              : {
      70         3835 :     stop_cb.reset();
      71              : 
      72         3835 :     bool success = (errn == 0 && !cancelled.load(std::memory_order_acquire));
      73              : 
      74              :     // Cache endpoints on successful connect
      75         3835 :     if (success && socket_impl_)
      76              :     {
      77              :         // Query local endpoint via getsockname (may fail, but remote is always known)
      78         3834 :         endpoint local_ep;
      79         3834 :         sockaddr_in local_addr{};
      80         3834 :         socklen_t local_len = sizeof(local_addr);
      81         3834 :         if (::getsockname(fd, reinterpret_cast<sockaddr*>(&local_addr), &local_len) == 0)
      82         3834 :             local_ep = from_sockaddr_in(local_addr);
      83              :         // Always cache remote endpoint; local may be default if getsockname failed
      84         3834 :         static_cast<select_socket_impl*>(socket_impl_)->set_endpoints(local_ep, target_endpoint);
      85              :     }
      86              : 
      87         3835 :     if (ec_out)
      88              :     {
      89         3835 :         if (cancelled.load(std::memory_order_acquire))
      90            0 :             *ec_out = capy::error::canceled;
      91         3835 :         else if (errn != 0)
      92            1 :             *ec_out = make_err(errn);
      93              :         else
      94         3834 :             *ec_out = {};
      95              :     }
      96              : 
      97         3835 :     if (bytes_out)
      98            0 :         *bytes_out = bytes_transferred;
      99              : 
     100              :     // Move to stack before destroying the frame
     101         3835 :     capy::executor_ref saved_ex( std::move( ex ) );
     102         3835 :     capy::coro saved_h( std::move( h ) );
     103         3835 :     impl_ptr.reset();
     104         3835 :     saved_ex.dispatch( saved_h );
     105         3835 : }
     106              : 
     107         7680 : select_socket_impl::
     108         7680 : select_socket_impl(select_socket_service& svc) noexcept
     109         7680 :     : svc_(svc)
     110              : {
     111         7680 : }
     112              : 
     113              : void
     114         7680 : select_socket_impl::
     115              : release()
     116              : {
     117         7680 :     close_socket();
     118         7680 :     svc_.destroy_impl(*this);
     119         7680 : }
     120              : 
     121              : std::coroutine_handle<>
     122         3835 : select_socket_impl::
     123              : connect(
     124              :     std::coroutine_handle<> h,
     125              :     capy::executor_ref ex,
     126              :     endpoint ep,
     127              :     std::stop_token token,
     128              :     std::error_code* ec)
     129              : {
     130         3835 :     auto& op = conn_;
     131         3835 :     op.reset();
     132         3835 :     op.h = h;
     133         3835 :     op.ex = ex;
     134         3835 :     op.ec_out = ec;
     135         3835 :     op.fd = fd_;
     136         3835 :     op.target_endpoint = ep;  // Store target for endpoint caching
     137         3835 :     op.start(token, this);
     138              : 
     139         3835 :     sockaddr_in addr = detail::to_sockaddr_in(ep);
     140         3835 :     int result = ::connect(fd_, reinterpret_cast<sockaddr*>(&addr), sizeof(addr));
     141              : 
     142         3835 :     if (result == 0)
     143              :     {
     144              :         // Sync success - cache endpoints immediately
     145            0 :         sockaddr_in local_addr{};
     146            0 :         socklen_t local_len = sizeof(local_addr);
     147            0 :         if (::getsockname(fd_, reinterpret_cast<sockaddr*>(&local_addr), &local_len) == 0)
     148            0 :             local_endpoint_ = detail::from_sockaddr_in(local_addr);
     149            0 :         remote_endpoint_ = ep;
     150              : 
     151            0 :         op.complete(0, 0);
     152            0 :         op.impl_ptr = shared_from_this();
     153            0 :         svc_.post(&op);
     154              :         // completion is always posted to scheduler queue, never inline.
     155            0 :         return std::noop_coroutine();
     156              :     }
     157              : 
     158         3835 :     if (errno == EINPROGRESS)
     159              :     {
     160         3835 :         svc_.work_started();
     161         3835 :         op.impl_ptr = shared_from_this();
     162              : 
     163              :         // Set registering BEFORE register_fd to close the race window where
     164              :         // reactor sees an event before we set registered. The reactor treats
     165              :         // registering the same as registered when claiming the op.
     166         3835 :         op.registered.store(select_registration_state::registering, std::memory_order_release);
     167         3835 :         svc_.scheduler().register_fd(fd_, &op, select_scheduler::event_write);
     168              : 
     169              :         // Transition to registered. If this fails, reactor or cancel already
     170              :         // claimed the op (state is now unregistered), so we're done. However,
     171              :         // we must still deregister the fd because cancel's deregister_fd may
     172              :         // have run before our register_fd, leaving the fd orphaned.
     173         3835 :         auto expected = select_registration_state::registering;
     174         3835 :         if (!op.registered.compare_exchange_strong(
     175              :                 expected, select_registration_state::registered, std::memory_order_acq_rel))
     176              :         {
     177            0 :             svc_.scheduler().deregister_fd(fd_, select_scheduler::event_write);
     178              :             // completion is always posted to scheduler queue, never inline.
     179            0 :             return std::noop_coroutine();
     180              :         }
     181              : 
     182              :         // If cancelled was set before we registered, handle it now.
     183         3835 :         if (op.cancelled.load(std::memory_order_acquire))
     184              :         {
     185            0 :             auto prev = op.registered.exchange(
     186              :                 select_registration_state::unregistered, std::memory_order_acq_rel);
     187            0 :             if (prev != select_registration_state::unregistered)
     188              :             {
     189            0 :                 svc_.scheduler().deregister_fd(fd_, select_scheduler::event_write);
     190            0 :                 op.impl_ptr = shared_from_this();
     191            0 :                 svc_.post(&op);
     192            0 :                 svc_.work_finished();
     193              :             }
     194              :         }
     195              :         // completion is always posted to scheduler queue, never inline.
     196         3835 :         return std::noop_coroutine();
     197              :     }
     198              : 
     199            0 :     op.complete(errno, 0);
     200            0 :     op.impl_ptr = shared_from_this();
     201            0 :     svc_.post(&op);
     202              :     // completion is always posted to scheduler queue, never inline.
     203            0 :     return std::noop_coroutine();
     204              : }
     205              : 
     206              : std::coroutine_handle<>
     207       116322 : select_socket_impl::
     208              : read_some(
     209              :     std::coroutine_handle<> h,
     210              :     capy::executor_ref ex,
     211              :     io_buffer_param param,
     212              :     std::stop_token token,
     213              :     std::error_code* ec,
     214              :     std::size_t* bytes_out)
     215              : {
     216       116322 :     auto& op = rd_;
     217       116322 :     op.reset();
     218       116322 :     op.h = h;
     219       116322 :     op.ex = ex;
     220       116322 :     op.ec_out = ec;
     221       116322 :     op.bytes_out = bytes_out;
     222       116322 :     op.fd = fd_;
     223       116322 :     op.start(token, this);
     224              : 
     225       116322 :     capy::mutable_buffer bufs[select_read_op::max_buffers];
     226       116322 :     op.iovec_count = static_cast<int>(param.copy_to(bufs, select_read_op::max_buffers));
     227              : 
     228       116322 :     if (op.iovec_count == 0 || (op.iovec_count == 1 && bufs[0].size() == 0))
     229              :     {
     230            1 :         op.empty_buffer_read = true;
     231            1 :         op.complete(0, 0);
     232            1 :         op.impl_ptr = shared_from_this();
     233            1 :         svc_.post(&op);
     234            1 :         return std::noop_coroutine();
     235              :     }
     236              : 
     237       232642 :     for (int i = 0; i < op.iovec_count; ++i)
     238              :     {
     239       116321 :         op.iovecs[i].iov_base = bufs[i].data();
     240       116321 :         op.iovecs[i].iov_len = bufs[i].size();
     241              :     }
     242              : 
     243       116321 :     ssize_t n = ::readv(fd_, op.iovecs, op.iovec_count);
     244              : 
     245       116321 :     if (n > 0)
     246              :     {
     247       116117 :         op.complete(0, static_cast<std::size_t>(n));
     248       116117 :         op.impl_ptr = shared_from_this();
     249       116117 :         svc_.post(&op);
     250       116117 :         return std::noop_coroutine();
     251              :     }
     252              : 
     253          204 :     if (n == 0)
     254              :     {
     255            5 :         op.complete(0, 0);
     256            5 :         op.impl_ptr = shared_from_this();
     257            5 :         svc_.post(&op);
     258            5 :         return std::noop_coroutine();
     259              :     }
     260              : 
     261          199 :     if (errno == EAGAIN || errno == EWOULDBLOCK)
     262              :     {
     263          199 :         svc_.work_started();
     264          199 :         op.impl_ptr = shared_from_this();
     265              : 
     266              :         // Set registering BEFORE register_fd to close the race window where
     267              :         // reactor sees an event before we set registered.
     268          199 :         op.registered.store(select_registration_state::registering, std::memory_order_release);
     269          199 :         svc_.scheduler().register_fd(fd_, &op, select_scheduler::event_read);
     270              : 
     271              :         // Transition to registered. If this fails, reactor or cancel already
     272              :         // claimed the op (state is now unregistered), so we're done. However,
     273              :         // we must still deregister the fd because cancel's deregister_fd may
     274              :         // have run before our register_fd, leaving the fd orphaned.
     275          199 :         auto expected = select_registration_state::registering;
     276          199 :         if (!op.registered.compare_exchange_strong(
     277              :                 expected, select_registration_state::registered, std::memory_order_acq_rel))
     278              :         {
     279            0 :             svc_.scheduler().deregister_fd(fd_, select_scheduler::event_read);
     280            0 :             return std::noop_coroutine();
     281              :         }
     282              : 
     283              :         // If cancelled was set before we registered, handle it now.
     284          199 :         if (op.cancelled.load(std::memory_order_acquire))
     285              :         {
     286            0 :             auto prev = op.registered.exchange(
     287              :                 select_registration_state::unregistered, std::memory_order_acq_rel);
     288            0 :             if (prev != select_registration_state::unregistered)
     289              :             {
     290            0 :                 svc_.scheduler().deregister_fd(fd_, select_scheduler::event_read);
     291            0 :                 op.impl_ptr = shared_from_this();
     292            0 :                 svc_.post(&op);
     293            0 :                 svc_.work_finished();
     294              :             }
     295              :         }
     296          199 :         return std::noop_coroutine();
     297              :     }
     298              : 
     299            0 :     op.complete(errno, 0);
     300            0 :     op.impl_ptr = shared_from_this();
     301            0 :     svc_.post(&op);
     302            0 :     return std::noop_coroutine();
     303              : }
     304              : 
     305              : std::coroutine_handle<>
     306       116200 : select_socket_impl::
     307              : write_some(
     308              :     std::coroutine_handle<> h,
     309              :     capy::executor_ref ex,
     310              :     io_buffer_param param,
     311              :     std::stop_token token,
     312              :     std::error_code* ec,
     313              :     std::size_t* bytes_out)
     314              : {
     315       116200 :     auto& op = wr_;
     316       116200 :     op.reset();
     317       116200 :     op.h = h;
     318       116200 :     op.ex = ex;
     319       116200 :     op.ec_out = ec;
     320       116200 :     op.bytes_out = bytes_out;
     321       116200 :     op.fd = fd_;
     322       116200 :     op.start(token, this);
     323              : 
     324       116200 :     capy::mutable_buffer bufs[select_write_op::max_buffers];
     325       116200 :     op.iovec_count = static_cast<int>(param.copy_to(bufs, select_write_op::max_buffers));
     326              : 
     327       116200 :     if (op.iovec_count == 0 || (op.iovec_count == 1 && bufs[0].size() == 0))
     328              :     {
     329            1 :         op.complete(0, 0);
     330            1 :         op.impl_ptr = shared_from_this();
     331            1 :         svc_.post(&op);
     332            1 :         return std::noop_coroutine();
     333              :     }
     334              : 
     335       232398 :     for (int i = 0; i < op.iovec_count; ++i)
     336              :     {
     337       116199 :         op.iovecs[i].iov_base = bufs[i].data();
     338       116199 :         op.iovecs[i].iov_len = bufs[i].size();
     339              :     }
     340              : 
     341       116199 :     msghdr msg{};
     342       116199 :     msg.msg_iov = op.iovecs;
     343       116199 :     msg.msg_iovlen = static_cast<std::size_t>(op.iovec_count);
     344              : 
     345       116199 :     ssize_t n = ::sendmsg(fd_, &msg, MSG_NOSIGNAL);
     346              : 
     347       116199 :     if (n > 0)
     348              :     {
     349       116198 :         op.complete(0, static_cast<std::size_t>(n));
     350       116198 :         op.impl_ptr = shared_from_this();
     351       116198 :         svc_.post(&op);
     352       116198 :         return std::noop_coroutine();
     353              :     }
     354              : 
     355            1 :     if (errno == EAGAIN || errno == EWOULDBLOCK)
     356              :     {
     357            0 :         svc_.work_started();
     358            0 :         op.impl_ptr = shared_from_this();
     359              : 
     360              :         // Set registering BEFORE register_fd to close the race window where
     361              :         // reactor sees an event before we set registered.
     362            0 :         op.registered.store(select_registration_state::registering, std::memory_order_release);
     363            0 :         svc_.scheduler().register_fd(fd_, &op, select_scheduler::event_write);
     364              : 
     365              :         // Transition to registered. If this fails, reactor or cancel already
     366              :         // claimed the op (state is now unregistered), so we're done. However,
     367              :         // we must still deregister the fd because cancel's deregister_fd may
     368              :         // have run before our register_fd, leaving the fd orphaned.
     369            0 :         auto expected = select_registration_state::registering;
     370            0 :         if (!op.registered.compare_exchange_strong(
     371              :                 expected, select_registration_state::registered, std::memory_order_acq_rel))
     372              :         {
     373            0 :             svc_.scheduler().deregister_fd(fd_, select_scheduler::event_write);
     374            0 :             return std::noop_coroutine();
     375              :         }
     376              : 
     377              :         // If cancelled was set before we registered, handle it now.
     378            0 :         if (op.cancelled.load(std::memory_order_acquire))
     379              :         {
     380            0 :             auto prev = op.registered.exchange(
     381              :                 select_registration_state::unregistered, std::memory_order_acq_rel);
     382            0 :             if (prev != select_registration_state::unregistered)
     383              :             {
     384            0 :                 svc_.scheduler().deregister_fd(fd_, select_scheduler::event_write);
     385            0 :                 op.impl_ptr = shared_from_this();
     386            0 :                 svc_.post(&op);
     387            0 :                 svc_.work_finished();
     388              :             }
     389              :         }
     390            0 :         return std::noop_coroutine();
     391              :     }
     392              : 
     393            1 :     op.complete(errno ? errno : EIO, 0);
     394            1 :     op.impl_ptr = shared_from_this();
     395            1 :     svc_.post(&op);
     396            1 :     return std::noop_coroutine();
     397              : }
     398              : 
     399              : std::error_code
     400            3 : select_socket_impl::
     401              : shutdown(tcp_socket::shutdown_type what) noexcept
     402              : {
     403              :     int how;
     404            3 :     switch (what)
     405              :     {
     406            1 :     case tcp_socket::shutdown_receive: how = SHUT_RD;   break;
     407            1 :     case tcp_socket::shutdown_send:    how = SHUT_WR;   break;
     408            1 :     case tcp_socket::shutdown_both:    how = SHUT_RDWR; break;
     409            0 :     default:
     410            0 :         return make_err(EINVAL);
     411              :     }
     412            3 :     if (::shutdown(fd_, how) != 0)
     413            0 :         return make_err(errno);
     414            3 :     return {};
     415              : }
     416              : 
     417              : std::error_code
     418            5 : select_socket_impl::
     419              : set_no_delay(bool value) noexcept
     420              : {
     421            5 :     int flag = value ? 1 : 0;
     422            5 :     if (::setsockopt(fd_, IPPROTO_TCP, TCP_NODELAY, &flag, sizeof(flag)) != 0)
     423            0 :         return make_err(errno);
     424            5 :     return {};
     425              : }
     426              : 
     427              : bool
     428            5 : select_socket_impl::
     429              : no_delay(std::error_code& ec) const noexcept
     430              : {
     431            5 :     int flag = 0;
     432            5 :     socklen_t len = sizeof(flag);
     433            5 :     if (::getsockopt(fd_, IPPROTO_TCP, TCP_NODELAY, &flag, &len) != 0)
     434              :     {
     435            0 :         ec = make_err(errno);
     436            0 :         return false;
     437              :     }
     438            5 :     ec = {};
     439            5 :     return flag != 0;
     440              : }
     441              : 
     442              : std::error_code
     443            4 : select_socket_impl::
     444              : set_keep_alive(bool value) noexcept
     445              : {
     446            4 :     int flag = value ? 1 : 0;
     447            4 :     if (::setsockopt(fd_, SOL_SOCKET, SO_KEEPALIVE, &flag, sizeof(flag)) != 0)
     448            0 :         return make_err(errno);
     449            4 :     return {};
     450              : }
     451              : 
     452              : bool
     453            4 : select_socket_impl::
     454              : keep_alive(std::error_code& ec) const noexcept
     455              : {
     456            4 :     int flag = 0;
     457            4 :     socklen_t len = sizeof(flag);
     458            4 :     if (::getsockopt(fd_, SOL_SOCKET, SO_KEEPALIVE, &flag, &len) != 0)
     459              :     {
     460            0 :         ec = make_err(errno);
     461            0 :         return false;
     462              :     }
     463            4 :     ec = {};
     464            4 :     return flag != 0;
     465              : }
     466              : 
     467              : std::error_code
     468            1 : select_socket_impl::
     469              : set_receive_buffer_size(int size) noexcept
     470              : {
     471            1 :     if (::setsockopt(fd_, SOL_SOCKET, SO_RCVBUF, &size, sizeof(size)) != 0)
     472            0 :         return make_err(errno);
     473            1 :     return {};
     474              : }
     475              : 
     476              : int
     477            3 : select_socket_impl::
     478              : receive_buffer_size(std::error_code& ec) const noexcept
     479              : {
     480            3 :     int size = 0;
     481            3 :     socklen_t len = sizeof(size);
     482            3 :     if (::getsockopt(fd_, SOL_SOCKET, SO_RCVBUF, &size, &len) != 0)
     483              :     {
     484            0 :         ec = make_err(errno);
     485            0 :         return 0;
     486              :     }
     487            3 :     ec = {};
     488            3 :     return size;
     489              : }
     490              : 
     491              : std::error_code
     492            1 : select_socket_impl::
     493              : set_send_buffer_size(int size) noexcept
     494              : {
     495            1 :     if (::setsockopt(fd_, SOL_SOCKET, SO_SNDBUF, &size, sizeof(size)) != 0)
     496            0 :         return make_err(errno);
     497            1 :     return {};
     498              : }
     499              : 
     500              : int
     501            3 : select_socket_impl::
     502              : send_buffer_size(std::error_code& ec) const noexcept
     503              : {
     504            3 :     int size = 0;
     505            3 :     socklen_t len = sizeof(size);
     506            3 :     if (::getsockopt(fd_, SOL_SOCKET, SO_SNDBUF, &size, &len) != 0)
     507              :     {
     508            0 :         ec = make_err(errno);
     509            0 :         return 0;
     510              :     }
     511            3 :     ec = {};
     512            3 :     return size;
     513              : }
     514              : 
     515              : std::error_code
     516            4 : select_socket_impl::
     517              : set_linger(bool enabled, int timeout) noexcept
     518              : {
     519            4 :     if (timeout < 0)
     520            1 :         return make_err(EINVAL);
     521              :     struct ::linger lg;
     522            3 :     lg.l_onoff = enabled ? 1 : 0;
     523            3 :     lg.l_linger = timeout;
     524            3 :     if (::setsockopt(fd_, SOL_SOCKET, SO_LINGER, &lg, sizeof(lg)) != 0)
     525            0 :         return make_err(errno);
     526            3 :     return {};
     527              : }
     528              : 
     529              : tcp_socket::linger_options
     530            3 : select_socket_impl::
     531              : linger(std::error_code& ec) const noexcept
     532              : {
     533            3 :     struct ::linger lg{};
     534            3 :     socklen_t len = sizeof(lg);
     535            3 :     if (::getsockopt(fd_, SOL_SOCKET, SO_LINGER, &lg, &len) != 0)
     536              :     {
     537            0 :         ec = make_err(errno);
     538            0 :         return {};
     539              :     }
     540            3 :     ec = {};
     541            3 :     return {.enabled = lg.l_onoff != 0, .timeout = lg.l_linger};
     542              : }
     543              : 
     544              : void
     545        11622 : select_socket_impl::
     546              : cancel() noexcept
     547              : {
     548        11622 :     std::shared_ptr<select_socket_impl> self;
     549              :     try {
     550        11622 :         self = shared_from_this();
     551            0 :     } catch (const std::bad_weak_ptr&) {
     552            0 :         return;
     553            0 :     }
     554              : 
     555        34866 :     auto cancel_op = [this, &self](select_op& op, int events) {
     556        34866 :         auto prev = op.registered.exchange(
     557              :             select_registration_state::unregistered, std::memory_order_acq_rel);
     558        34866 :         op.request_cancel();
     559        34866 :         if (prev != select_registration_state::unregistered)
     560              :         {
     561           51 :             svc_.scheduler().deregister_fd(fd_, events);
     562           51 :             op.impl_ptr = self;
     563           51 :             svc_.post(&op);
     564           51 :             svc_.work_finished();
     565              :         }
     566        46488 :     };
     567              : 
     568        11622 :     cancel_op(conn_, select_scheduler::event_write);
     569        11622 :     cancel_op(rd_, select_scheduler::event_read);
     570        11622 :     cancel_op(wr_, select_scheduler::event_write);
     571        11622 : }
     572              : 
     573              : void
     574           99 : select_socket_impl::
     575              : cancel_single_op(select_op& op) noexcept
     576              : {
     577              :     // Called from stop_token callback to cancel a specific pending operation.
     578           99 :     auto prev = op.registered.exchange(
     579              :         select_registration_state::unregistered, std::memory_order_acq_rel);
     580           99 :     op.request_cancel();
     581              : 
     582           99 :     if (prev != select_registration_state::unregistered)
     583              :     {
     584              :         // Determine which event type to deregister
     585           67 :         int events = 0;
     586           67 :         if (&op == &conn_ || &op == &wr_)
     587            0 :             events = select_scheduler::event_write;
     588           67 :         else if (&op == &rd_)
     589           67 :             events = select_scheduler::event_read;
     590              : 
     591           67 :         svc_.scheduler().deregister_fd(fd_, events);
     592              : 
     593              :         // Keep impl alive until op completes
     594              :         try {
     595           67 :             op.impl_ptr = shared_from_this();
     596            0 :         } catch (const std::bad_weak_ptr&) {
     597              :             // Impl is being destroyed, op will be orphaned but that's ok
     598            0 :         }
     599              : 
     600           67 :         svc_.post(&op);
     601           67 :         svc_.work_finished();
     602              :     }
     603           99 : }
     604              : 
     605              : void
     606        11526 : select_socket_impl::
     607              : close_socket() noexcept
     608              : {
     609        11526 :     cancel();
     610              : 
     611        11526 :     if (fd_ >= 0)
     612              :     {
     613              :         // Unconditionally remove from registered_fds_ to handle edge cases
     614              :         // where the fd might be registered but cancel() didn't clean it up
     615              :         // due to race conditions.
     616         7680 :         svc_.scheduler().deregister_fd(fd_,
     617              :             select_scheduler::event_read | select_scheduler::event_write);
     618         7680 :         ::close(fd_);
     619         7680 :         fd_ = -1;
     620              :     }
     621              : 
     622              :     // Clear cached endpoints
     623        11526 :     local_endpoint_ = endpoint{};
     624        11526 :     remote_endpoint_ = endpoint{};
     625        11526 : }
     626              : 
     627          120 : select_socket_service::
     628          120 : select_socket_service(capy::execution_context& ctx)
     629          120 :     : state_(std::make_unique<select_socket_state>(ctx.use_service<select_scheduler>()))
     630              : {
     631          120 : }
     632              : 
     633          240 : select_socket_service::
     634          120 : ~select_socket_service()
     635              : {
     636          240 : }
     637              : 
     638              : void
     639          120 : select_socket_service::
     640              : shutdown()
     641              : {
     642          120 :     std::lock_guard lock(state_->mutex_);
     643              : 
     644          120 :     while (auto* impl = state_->socket_list_.pop_front())
     645            0 :         impl->close_socket();
     646              : 
     647          120 :     state_->socket_ptrs_.clear();
     648          120 : }
     649              : 
     650              : tcp_socket::socket_impl&
     651         7680 : select_socket_service::
     652              : create_impl()
     653              : {
     654         7680 :     auto impl = std::make_shared<select_socket_impl>(*this);
     655         7680 :     auto* raw = impl.get();
     656              : 
     657              :     {
     658         7680 :         std::lock_guard lock(state_->mutex_);
     659         7680 :         state_->socket_list_.push_back(raw);
     660         7680 :         state_->socket_ptrs_.emplace(raw, std::move(impl));
     661         7680 :     }
     662              : 
     663         7680 :     return *raw;
     664         7680 : }
     665              : 
     666              : void
     667         7680 : select_socket_service::
     668              : destroy_impl(tcp_socket::socket_impl& impl)
     669              : {
     670         7680 :     auto* select_impl = static_cast<select_socket_impl*>(&impl);
     671         7680 :     std::lock_guard lock(state_->mutex_);
     672         7680 :     state_->socket_list_.remove(select_impl);
     673         7680 :     state_->socket_ptrs_.erase(select_impl);
     674         7680 : }
     675              : 
     676              : std::error_code
     677         3846 : select_socket_service::
     678              : open_socket(tcp_socket::socket_impl& impl)
     679              : {
     680         3846 :     auto* select_impl = static_cast<select_socket_impl*>(&impl);
     681         3846 :     select_impl->close_socket();
     682              : 
     683         3846 :     int fd = ::socket(AF_INET, SOCK_STREAM, 0);
     684         3846 :     if (fd < 0)
     685            0 :         return make_err(errno);
     686              : 
     687              :     // Set non-blocking and close-on-exec
     688         3846 :     int flags = ::fcntl(fd, F_GETFL, 0);
     689         3846 :     if (flags == -1)
     690              :     {
     691            0 :         int errn = errno;
     692            0 :         ::close(fd);
     693            0 :         return make_err(errn);
     694              :     }
     695         3846 :     if (::fcntl(fd, F_SETFL, flags | O_NONBLOCK) == -1)
     696              :     {
     697            0 :         int errn = errno;
     698            0 :         ::close(fd);
     699            0 :         return make_err(errn);
     700              :     }
     701         3846 :     if (::fcntl(fd, F_SETFD, FD_CLOEXEC) == -1)
     702              :     {
     703            0 :         int errn = errno;
     704            0 :         ::close(fd);
     705            0 :         return make_err(errn);
     706              :     }
     707              : 
     708              :     // Check fd is within select() limits
     709         3846 :     if (fd >= FD_SETSIZE)
     710              :     {
     711            0 :         ::close(fd);
     712            0 :         return make_err(EMFILE);  // Too many open files
     713              :     }
     714              : 
     715         3846 :     select_impl->fd_ = fd;
     716         3846 :     return {};
     717              : }
     718              : 
     719              : void
     720       232441 : select_socket_service::
     721              : post(select_op* op)
     722              : {
     723       232441 :     state_->sched_.post(op);
     724       232441 : }
     725              : 
     726              : void
     727         4034 : select_socket_service::
     728              : work_started() noexcept
     729              : {
     730         4034 :     state_->sched_.work_started();
     731         4034 : }
     732              : 
     733              : void
     734          118 : select_socket_service::
     735              : work_finished() noexcept
     736              : {
     737          118 :     state_->sched_.work_finished();
     738          118 : }
     739              : 
     740              : } // namespace boost::corosio::detail
     741              : 
     742              : #endif // BOOST_COROSIO_HAS_SELECT
        

Generated by: LCOV version 2.3