LCOV - code coverage report
Current view: top level - src/detail/epoll - sockets.cpp (source / functions) Coverage Total Hit
Test: coverage_filtered.info Lines: 72.9 % 439 320
Test Date: 2026-02-06 05:04:16 Functions: 92.1 % 38 35

            Line data    Source code
       1              : //
       2              : // Copyright (c) 2026 Steve Gerbino
       3              : //
       4              : // Distributed under the Boost Software License, Version 1.0. (See accompanying
       5              : // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
       6              : //
       7              : // Official repository: https://github.com/cppalliance/corosio
       8              : //
       9              : 
      10              : #include <boost/corosio/detail/platform.hpp>
      11              : 
      12              : #if BOOST_COROSIO_HAS_EPOLL
      13              : 
      14              : #include "src/detail/epoll/sockets.hpp"
      15              : #include "src/detail/endpoint_convert.hpp"
      16              : #include "src/detail/make_err.hpp"
      17              : #include "src/detail/resume_coro.hpp"
      18              : 
      19              : #include <boost/corosio/detail/except.hpp>
      20              : #include <boost/capy/buffers.hpp>
      21              : 
      22              : #include <utility>
      23              : 
      24              : #include <errno.h>
      25              : #include <netinet/in.h>
      26              : #include <netinet/tcp.h>
      27              : #include <sys/epoll.h>
      28              : #include <sys/socket.h>
      29              : #include <unistd.h>
      30              : 
      31              : namespace boost::corosio::detail {
      32              : 
      33              : void
      34          105 : epoll_op::canceller::
      35              : operator()() const noexcept
      36              : {
      37          105 :     op->cancel();
      38          105 : }
      39              : 
      40              : void
      41            0 : epoll_connect_op::
      42              : cancel() noexcept
      43              : {
      44            0 :     if (socket_impl_)
      45            0 :         socket_impl_->cancel_single_op(*this);
      46              :     else
      47            0 :         request_cancel();
      48            0 : }
      49              : 
      50              : void
      51           99 : epoll_read_op::
      52              : cancel() noexcept
      53              : {
      54           99 :     if (socket_impl_)
      55           99 :         socket_impl_->cancel_single_op(*this);
      56              :     else
      57            0 :         request_cancel();
      58           99 : }
      59              : 
      60              : void
      61            0 : epoll_write_op::
      62              : cancel() noexcept
      63              : {
      64            0 :     if (socket_impl_)
      65            0 :         socket_impl_->cancel_single_op(*this);
      66              :     else
      67            0 :         request_cancel();
      68            0 : }
      69              : 
      70              : void
      71         5018 : epoll_connect_op::
      72              : operator()()
      73              : {
      74         5018 :     stop_cb.reset();
      75              : 
      76         5018 :     bool success = (errn == 0 && !cancelled.load(std::memory_order_acquire));
      77              : 
      78              :     // Cache endpoints on successful connect
      79         5018 :     if (success && socket_impl_)
      80              :     {
      81              :         // Query local endpoint via getsockname (may fail, but remote is always known)
      82         5017 :         endpoint local_ep;
      83         5017 :         sockaddr_in local_addr{};
      84         5017 :         socklen_t local_len = sizeof(local_addr);
      85         5017 :         if (::getsockname(fd, reinterpret_cast<sockaddr*>(&local_addr), &local_len) == 0)
      86         5017 :             local_ep = from_sockaddr_in(local_addr);
      87              :         // Always cache remote endpoint; local may be default if getsockname failed
      88         5017 :         static_cast<epoll_socket_impl*>(socket_impl_)->set_endpoints(local_ep, target_endpoint);
      89              :     }
      90              : 
      91         5018 :     if (ec_out)
      92              :     {
      93         5018 :         if (cancelled.load(std::memory_order_acquire))
      94            0 :             *ec_out = capy::error::canceled;
      95         5018 :         else if (errn != 0)
      96            1 :             *ec_out = make_err(errn);
      97              :         else
      98         5017 :             *ec_out = {};
      99              :     }
     100              : 
     101         5018 :     if (bytes_out)
     102            0 :         *bytes_out = bytes_transferred;
     103              : 
     104              :     // Move to stack before resuming. See epoll_op::operator()() for rationale.
     105         5018 :     capy::executor_ref saved_ex( std::move( ex ) );
     106         5018 :     capy::coro saved_h( std::move( h ) );
     107         5018 :     auto prevent_premature_destruction = std::move(impl_ptr);
     108         5018 :     resume_coro(saved_ex, saved_h);
     109         5018 : }
     110              : 
     111        10046 : epoll_socket_impl::
     112        10046 : epoll_socket_impl(epoll_socket_service& svc) noexcept
     113        10046 :     : svc_(svc)
     114              : {
     115        10046 : }
     116              : 
     117        10046 : epoll_socket_impl::
     118              : ~epoll_socket_impl() = default;
     119              : 
     120              : void
     121        10046 : epoll_socket_impl::
     122              : release()
     123              : {
     124        10046 :     close_socket();
     125        10046 :     svc_.destroy_impl(*this);
     126        10046 : }
     127              : 
     128              : std::coroutine_handle<>
     129         5018 : epoll_socket_impl::
     130              : connect(
     131              :     std::coroutine_handle<> h,
     132              :     capy::executor_ref ex,
     133              :     endpoint ep,
     134              :     std::stop_token token,
     135              :     std::error_code* ec)
     136              : {
     137         5018 :     auto& op = conn_;
     138         5018 :     op.reset();
     139         5018 :     op.h = h;
     140         5018 :     op.ex = ex;
     141         5018 :     op.ec_out = ec;
     142         5018 :     op.fd = fd_;
     143         5018 :     op.target_endpoint = ep;  // Store target for endpoint caching
     144         5018 :     op.start(token, this);
     145              : 
     146         5018 :     sockaddr_in addr = detail::to_sockaddr_in(ep);
     147         5018 :     int result = ::connect(fd_, reinterpret_cast<sockaddr*>(&addr), sizeof(addr));
     148              : 
     149         5018 :     if (result == 0)
     150              :     {
     151              :         // Sync success - cache endpoints immediately
     152              :         // Remote is always known; local may fail but we still cache remote
     153            0 :         sockaddr_in local_addr{};
     154            0 :         socklen_t local_len = sizeof(local_addr);
     155            0 :         if (::getsockname(fd_, reinterpret_cast<sockaddr*>(&local_addr), &local_len) == 0)
     156            0 :             local_endpoint_ = detail::from_sockaddr_in(local_addr);
     157            0 :         remote_endpoint_ = ep;
     158              : 
     159            0 :         op.complete(0, 0);
     160            0 :         op.impl_ptr = shared_from_this();
     161            0 :         svc_.post(&op);
     162              :         // completion is always posted to scheduler queue, never inline.
     163            0 :         return std::noop_coroutine();
     164              :     }
     165              : 
     166         5018 :     if (errno == EINPROGRESS)
     167              :     {
     168         5018 :         svc_.work_started();
     169         5018 :         op.impl_ptr = shared_from_this();
     170              : 
     171         5018 :         bool perform_now = false;
     172              :         {
     173         5018 :             std::lock_guard lock(desc_state_.mutex);
     174         5018 :             if (desc_state_.write_ready)
     175              :             {
     176            0 :                 desc_state_.write_ready = false;
     177            0 :                 perform_now = true;
     178              :             }
     179              :             else
     180              :             {
     181         5018 :                 desc_state_.connect_op = &op;
     182              :             }
     183         5018 :         }
     184              : 
     185         5018 :         if (perform_now)
     186              :         {
     187            0 :             op.perform_io();
     188            0 :             if (op.errn == EAGAIN || op.errn == EWOULDBLOCK)
     189              :             {
     190            0 :                 op.errn = 0;
     191            0 :                 std::lock_guard lock(desc_state_.mutex);
     192            0 :                 desc_state_.connect_op = &op;
     193            0 :             }
     194              :             else
     195              :             {
     196            0 :                 svc_.post(&op);
     197            0 :                 svc_.work_finished();
     198              :             }
     199            0 :             return std::noop_coroutine();
     200              :         }
     201              : 
     202         5018 :         if (op.cancelled.load(std::memory_order_acquire))
     203              :         {
     204            0 :             epoll_op* claimed = nullptr;
     205              :             {
     206            0 :                 std::lock_guard lock(desc_state_.mutex);
     207            0 :                 if (desc_state_.connect_op == &op)
     208            0 :                     claimed = std::exchange(desc_state_.connect_op, nullptr);
     209            0 :             }
     210            0 :             if (claimed)
     211              :             {
     212            0 :                 svc_.post(claimed);
     213            0 :                 svc_.work_finished();
     214              :             }
     215              :         }
     216              :         // completion is always posted to scheduler queue, never inline.
     217         5018 :         return std::noop_coroutine();
     218              :     }
     219              : 
     220            0 :     op.complete(errno, 0);
     221            0 :     op.impl_ptr = shared_from_this();
     222            0 :     svc_.post(&op);
     223              :     // completion is always posted to scheduler queue, never inline.
     224            0 :     return std::noop_coroutine();
     225              : }
     226              : 
     227              : void
     228       115179 : epoll_socket_impl::
     229              : do_read_io()
     230              : {
     231       115179 :     auto& op = rd_;
     232              : 
     233       115179 :     ssize_t n = ::readv(fd_, op.iovecs, op.iovec_count);
     234              : 
     235       115179 :     if (n > 0)
     236              :     {
     237              :         {
     238       115003 :             std::lock_guard lock(desc_state_.mutex);
     239       115003 :             desc_state_.read_ready = false;
     240       115003 :         }
     241       115003 :         op.complete(0, static_cast<std::size_t>(n));
     242       115003 :         svc_.post(&op);
     243       115003 :         return;
     244              :     }
     245              : 
     246          176 :     if (n == 0)
     247              :     {
     248              :         {
     249            5 :             std::lock_guard lock(desc_state_.mutex);
     250            5 :             desc_state_.read_ready = false;
     251            5 :         }
     252            5 :         op.complete(0, 0);
     253            5 :         svc_.post(&op);
     254            5 :         return;
     255              :     }
     256              : 
     257          171 :     if (errno == EAGAIN || errno == EWOULDBLOCK)
     258              :     {
     259          171 :         svc_.work_started();
     260              : 
     261          171 :         bool perform_now = false;
     262              :         {
     263          171 :             std::lock_guard lock(desc_state_.mutex);
     264          171 :             if (desc_state_.read_ready)
     265              :             {
     266           46 :                 desc_state_.read_ready = false;
     267           46 :                 perform_now = true;
     268              :             }
     269              :             else
     270              :             {
     271          125 :                 desc_state_.read_op = &op;
     272              :             }
     273          171 :         }
     274              : 
     275          171 :         if (perform_now)
     276              :         {
     277           46 :             op.perform_io();
     278           46 :             if (op.errn == EAGAIN || op.errn == EWOULDBLOCK)
     279              :             {
     280           46 :                 op.errn = 0;
     281           46 :                 std::lock_guard lock(desc_state_.mutex);
     282           46 :                 desc_state_.read_op = &op;
     283           46 :             }
     284              :             else
     285              :             {
     286            0 :                 svc_.post(&op);
     287            0 :                 svc_.work_finished();
     288              :             }
     289           46 :             return;
     290              :         }
     291              : 
     292          125 :         if (op.cancelled.load(std::memory_order_acquire))
     293              :         {
     294            0 :             epoll_op* claimed = nullptr;
     295              :             {
     296            0 :                 std::lock_guard lock(desc_state_.mutex);
     297            0 :                 if (desc_state_.read_op == &op)
     298            0 :                     claimed = std::exchange(desc_state_.read_op, nullptr);
     299            0 :             }
     300            0 :             if (claimed)
     301              :             {
     302            0 :                 svc_.post(claimed);
     303            0 :                 svc_.work_finished();
     304              :             }
     305              :         }
     306          125 :         return;
     307              :     }
     308              : 
     309            0 :     op.complete(errno, 0);
     310            0 :     svc_.post(&op);
     311              : }
     312              : 
     313              : void
     314       115056 : epoll_socket_impl::
     315              : do_write_io()
     316              : {
     317       115056 :     auto& op = wr_;
     318              : 
     319       115056 :     msghdr msg{};
     320       115056 :     msg.msg_iov = op.iovecs;
     321       115056 :     msg.msg_iovlen = static_cast<std::size_t>(op.iovec_count);
     322              : 
     323       115056 :     ssize_t n = ::sendmsg(fd_, &msg, MSG_NOSIGNAL);
     324              : 
     325       115056 :     if (n > 0)
     326              :     {
     327              :         {
     328       115055 :             std::lock_guard lock(desc_state_.mutex);
     329       115055 :             desc_state_.write_ready = false;
     330       115055 :         }
     331       115055 :         op.complete(0, static_cast<std::size_t>(n));
     332       115055 :         svc_.post(&op);
     333       115055 :         return;
     334              :     }
     335              : 
     336            1 :     if (errno == EAGAIN || errno == EWOULDBLOCK)
     337              :     {
     338            0 :         svc_.work_started();
     339              : 
     340            0 :         bool perform_now = false;
     341              :         {
     342            0 :             std::lock_guard lock(desc_state_.mutex);
     343            0 :             if (desc_state_.write_ready)
     344              :             {
     345            0 :                 desc_state_.write_ready = false;
     346            0 :                 perform_now = true;
     347              :             }
     348              :             else
     349              :             {
     350            0 :                 desc_state_.write_op = &op;
     351              :             }
     352            0 :         }
     353              : 
     354            0 :         if (perform_now)
     355              :         {
     356            0 :             op.perform_io();
     357            0 :             if (op.errn == EAGAIN || op.errn == EWOULDBLOCK)
     358              :             {
     359            0 :                 op.errn = 0;
     360            0 :                 std::lock_guard lock(desc_state_.mutex);
     361            0 :                 desc_state_.write_op = &op;
     362            0 :             }
     363              :             else
     364              :             {
     365            0 :                 svc_.post(&op);
     366            0 :                 svc_.work_finished();
     367              :             }
     368            0 :             return;
     369              :         }
     370              : 
     371            0 :         if (op.cancelled.load(std::memory_order_acquire))
     372              :         {
     373            0 :             epoll_op* claimed = nullptr;
     374              :             {
     375            0 :                 std::lock_guard lock(desc_state_.mutex);
     376            0 :                 if (desc_state_.write_op == &op)
     377            0 :                     claimed = std::exchange(desc_state_.write_op, nullptr);
     378            0 :             }
     379            0 :             if (claimed)
     380              :             {
     381            0 :                 svc_.post(claimed);
     382            0 :                 svc_.work_finished();
     383              :             }
     384              :         }
     385            0 :         return;
     386              :     }
     387              : 
     388            1 :     op.complete(errno ? errno : EIO, 0);
     389            1 :     svc_.post(&op);
     390              : }
     391              : 
     392              : std::coroutine_handle<>
     393       115180 : epoll_socket_impl::
     394              : read_some(
     395              :     std::coroutine_handle<> h,
     396              :     capy::executor_ref ex,
     397              :     io_buffer_param param,
     398              :     std::stop_token token,
     399              :     std::error_code* ec,
     400              :     std::size_t* bytes_out)
     401              : {
     402       115180 :     auto& op = rd_;
     403       115180 :     op.reset();
     404       115180 :     op.h = h;
     405       115180 :     op.ex = ex;
     406       115180 :     op.ec_out = ec;
     407       115180 :     op.bytes_out = bytes_out;
     408       115180 :     op.fd = fd_;
     409       115180 :     op.start(token, this);
     410       115180 :     op.impl_ptr = shared_from_this();
     411              : 
     412              :     // Must prepare buffers before initiator runs
     413       115180 :     capy::mutable_buffer bufs[epoll_read_op::max_buffers];
     414       115180 :     op.iovec_count = static_cast<int>(param.copy_to(bufs, epoll_read_op::max_buffers));
     415              : 
     416       115180 :     if (op.iovec_count == 0 || (op.iovec_count == 1 && bufs[0].size() == 0))
     417              :     {
     418            1 :         op.empty_buffer_read = true;
     419            1 :         op.complete(0, 0);
     420            1 :         svc_.post(&op);
     421            1 :         return std::noop_coroutine();
     422              :     }
     423              : 
     424       230358 :     for (int i = 0; i < op.iovec_count; ++i)
     425              :     {
     426       115179 :         op.iovecs[i].iov_base = bufs[i].data();
     427       115179 :         op.iovecs[i].iov_len = bufs[i].size();
     428              :     }
     429              : 
     430              :     // Symmetric transfer ensures caller is suspended before I/O starts
     431       115179 :     return read_initiator_.start<&epoll_socket_impl::do_read_io>(this);
     432              : }
     433              : 
     434              : std::coroutine_handle<>
     435       115057 : epoll_socket_impl::
     436              : write_some(
     437              :     std::coroutine_handle<> h,
     438              :     capy::executor_ref ex,
     439              :     io_buffer_param param,
     440              :     std::stop_token token,
     441              :     std::error_code* ec,
     442              :     std::size_t* bytes_out)
     443              : {
     444       115057 :     auto& op = wr_;
     445       115057 :     op.reset();
     446       115057 :     op.h = h;
     447       115057 :     op.ex = ex;
     448       115057 :     op.ec_out = ec;
     449       115057 :     op.bytes_out = bytes_out;
     450       115057 :     op.fd = fd_;
     451       115057 :     op.start(token, this);
     452       115057 :     op.impl_ptr = shared_from_this();
     453              : 
     454              :     // Must prepare buffers before initiator runs
     455       115057 :     capy::mutable_buffer bufs[epoll_write_op::max_buffers];
     456       115057 :     op.iovec_count = static_cast<int>(param.copy_to(bufs, epoll_write_op::max_buffers));
     457              : 
     458       115057 :     if (op.iovec_count == 0 || (op.iovec_count == 1 && bufs[0].size() == 0))
     459              :     {
     460            1 :         op.complete(0, 0);
     461            1 :         svc_.post(&op);
     462            1 :         return std::noop_coroutine();
     463              :     }
     464              : 
     465       230112 :     for (int i = 0; i < op.iovec_count; ++i)
     466              :     {
     467       115056 :         op.iovecs[i].iov_base = bufs[i].data();
     468       115056 :         op.iovecs[i].iov_len = bufs[i].size();
     469              :     }
     470              : 
     471              :     // Symmetric transfer ensures caller is suspended before I/O starts
     472       115056 :     return write_initiator_.start<&epoll_socket_impl::do_write_io>(this);
     473              : }
     474              : 
     475              : std::error_code
     476            3 : epoll_socket_impl::
     477              : shutdown(tcp_socket::shutdown_type what) noexcept
     478              : {
     479              :     int how;
     480            3 :     switch (what)
     481              :     {
     482            1 :     case tcp_socket::shutdown_receive: how = SHUT_RD;   break;
     483            1 :     case tcp_socket::shutdown_send:    how = SHUT_WR;   break;
     484            1 :     case tcp_socket::shutdown_both:    how = SHUT_RDWR; break;
     485            0 :     default:
     486            0 :         return make_err(EINVAL);
     487              :     }
     488            3 :     if (::shutdown(fd_, how) != 0)
     489            0 :         return make_err(errno);
     490            3 :     return {};
     491              : }
     492              : 
     493              : std::error_code
     494            5 : epoll_socket_impl::
     495              : set_no_delay(bool value) noexcept
     496              : {
     497            5 :     int flag = value ? 1 : 0;
     498            5 :     if (::setsockopt(fd_, IPPROTO_TCP, TCP_NODELAY, &flag, sizeof(flag)) != 0)
     499            0 :         return make_err(errno);
     500            5 :     return {};
     501              : }
     502              : 
     503              : bool
     504            5 : epoll_socket_impl::
     505              : no_delay(std::error_code& ec) const noexcept
     506              : {
     507            5 :     int flag = 0;
     508            5 :     socklen_t len = sizeof(flag);
     509            5 :     if (::getsockopt(fd_, IPPROTO_TCP, TCP_NODELAY, &flag, &len) != 0)
     510              :     {
     511            0 :         ec = make_err(errno);
     512            0 :         return false;
     513              :     }
     514            5 :     ec = {};
     515            5 :     return flag != 0;
     516              : }
     517              : 
     518              : std::error_code
     519            4 : epoll_socket_impl::
     520              : set_keep_alive(bool value) noexcept
     521              : {
     522            4 :     int flag = value ? 1 : 0;
     523            4 :     if (::setsockopt(fd_, SOL_SOCKET, SO_KEEPALIVE, &flag, sizeof(flag)) != 0)
     524            0 :         return make_err(errno);
     525            4 :     return {};
     526              : }
     527              : 
     528              : bool
     529            4 : epoll_socket_impl::
     530              : keep_alive(std::error_code& ec) const noexcept
     531              : {
     532            4 :     int flag = 0;
     533            4 :     socklen_t len = sizeof(flag);
     534            4 :     if (::getsockopt(fd_, SOL_SOCKET, SO_KEEPALIVE, &flag, &len) != 0)
     535              :     {
     536            0 :         ec = make_err(errno);
     537            0 :         return false;
     538              :     }
     539            4 :     ec = {};
     540            4 :     return flag != 0;
     541              : }
     542              : 
     543              : std::error_code
     544            1 : epoll_socket_impl::
     545              : set_receive_buffer_size(int size) noexcept
     546              : {
     547            1 :     if (::setsockopt(fd_, SOL_SOCKET, SO_RCVBUF, &size, sizeof(size)) != 0)
     548            0 :         return make_err(errno);
     549            1 :     return {};
     550              : }
     551              : 
     552              : int
     553            3 : epoll_socket_impl::
     554              : receive_buffer_size(std::error_code& ec) const noexcept
     555              : {
     556            3 :     int size = 0;
     557            3 :     socklen_t len = sizeof(size);
     558            3 :     if (::getsockopt(fd_, SOL_SOCKET, SO_RCVBUF, &size, &len) != 0)
     559              :     {
     560            0 :         ec = make_err(errno);
     561            0 :         return 0;
     562              :     }
     563            3 :     ec = {};
     564            3 :     return size;
     565              : }
     566              : 
     567              : std::error_code
     568            1 : epoll_socket_impl::
     569              : set_send_buffer_size(int size) noexcept
     570              : {
     571            1 :     if (::setsockopt(fd_, SOL_SOCKET, SO_SNDBUF, &size, sizeof(size)) != 0)
     572            0 :         return make_err(errno);
     573            1 :     return {};
     574              : }
     575              : 
     576              : int
     577            3 : epoll_socket_impl::
     578              : send_buffer_size(std::error_code& ec) const noexcept
     579              : {
     580            3 :     int size = 0;
     581            3 :     socklen_t len = sizeof(size);
     582            3 :     if (::getsockopt(fd_, SOL_SOCKET, SO_SNDBUF, &size, &len) != 0)
     583              :     {
     584            0 :         ec = make_err(errno);
     585            0 :         return 0;
     586              :     }
     587            3 :     ec = {};
     588            3 :     return size;
     589              : }
     590              : 
     591              : std::error_code
     592            4 : epoll_socket_impl::
     593              : set_linger(bool enabled, int timeout) noexcept
     594              : {
     595            4 :     if (timeout < 0)
     596            1 :         return make_err(EINVAL);
     597              :     struct ::linger lg;
     598            3 :     lg.l_onoff = enabled ? 1 : 0;
     599            3 :     lg.l_linger = timeout;
     600            3 :     if (::setsockopt(fd_, SOL_SOCKET, SO_LINGER, &lg, sizeof(lg)) != 0)
     601            0 :         return make_err(errno);
     602            3 :     return {};
     603              : }
     604              : 
     605              : tcp_socket::linger_options
     606            3 : epoll_socket_impl::
     607              : linger(std::error_code& ec) const noexcept
     608              : {
     609            3 :     struct ::linger lg{};
     610            3 :     socklen_t len = sizeof(lg);
     611            3 :     if (::getsockopt(fd_, SOL_SOCKET, SO_LINGER, &lg, &len) != 0)
     612              :     {
     613            0 :         ec = make_err(errno);
     614            0 :         return {};
     615              :     }
     616            3 :     ec = {};
     617            3 :     return {.enabled = lg.l_onoff != 0, .timeout = lg.l_linger};
     618              : }
     619              : 
     620              : void
     621        15173 : epoll_socket_impl::
     622              : cancel() noexcept
     623              : {
     624        15173 :     std::shared_ptr<epoll_socket_impl> self;
     625              :     try {
     626        15173 :         self = shared_from_this();
     627            0 :     } catch (const std::bad_weak_ptr&) {
     628            0 :         return;
     629            0 :     }
     630              : 
     631        15173 :     conn_.request_cancel();
     632        15173 :     rd_.request_cancel();
     633        15173 :     wr_.request_cancel();
     634              : 
     635        15173 :     epoll_op* conn_claimed = nullptr;
     636        15173 :     epoll_op* rd_claimed = nullptr;
     637        15173 :     epoll_op* wr_claimed = nullptr;
     638              :     {
     639        15173 :         std::lock_guard lock(desc_state_.mutex);
     640        15173 :         if (desc_state_.connect_op == &conn_)
     641            0 :             conn_claimed = std::exchange(desc_state_.connect_op, nullptr);
     642        15173 :         if (desc_state_.read_op == &rd_)
     643           52 :             rd_claimed = std::exchange(desc_state_.read_op, nullptr);
     644        15173 :         if (desc_state_.write_op == &wr_)
     645            0 :             wr_claimed = std::exchange(desc_state_.write_op, nullptr);
     646        15173 :     }
     647              : 
     648        15173 :     if (conn_claimed)
     649              :     {
     650            0 :         conn_.impl_ptr = self;
     651            0 :         svc_.post(&conn_);
     652            0 :         svc_.work_finished();
     653              :     }
     654        15173 :     if (rd_claimed)
     655              :     {
     656           52 :         rd_.impl_ptr = self;
     657           52 :         svc_.post(&rd_);
     658           52 :         svc_.work_finished();
     659              :     }
     660        15173 :     if (wr_claimed)
     661              :     {
     662            0 :         wr_.impl_ptr = self;
     663            0 :         svc_.post(&wr_);
     664            0 :         svc_.work_finished();
     665              :     }
     666        15173 : }
     667              : 
     668              : void
     669           99 : epoll_socket_impl::
     670              : cancel_single_op(epoll_op& op) noexcept
     671              : {
     672           99 :     op.request_cancel();
     673              : 
     674           99 :     epoll_op** desc_op_ptr = nullptr;
     675           99 :     if (&op == &conn_) desc_op_ptr = &desc_state_.connect_op;
     676           99 :     else if (&op == &rd_) desc_op_ptr = &desc_state_.read_op;
     677            0 :     else if (&op == &wr_) desc_op_ptr = &desc_state_.write_op;
     678              : 
     679           99 :     if (desc_op_ptr)
     680              :     {
     681           99 :         epoll_op* claimed = nullptr;
     682              :         {
     683           99 :             std::lock_guard lock(desc_state_.mutex);
     684           99 :             if (*desc_op_ptr == &op)
     685           67 :                 claimed = std::exchange(*desc_op_ptr, nullptr);
     686           99 :         }
     687           99 :         if (claimed)
     688              :         {
     689              :             try {
     690           67 :                 op.impl_ptr = shared_from_this();
     691            0 :             } catch (const std::bad_weak_ptr&) {}
     692           67 :             svc_.post(&op);
     693           67 :             svc_.work_finished();
     694              :         }
     695              :     }
     696           99 : }
     697              : 
     698              : void
     699        15075 : epoll_socket_impl::
     700              : close_socket() noexcept
     701              : {
     702        15075 :     cancel();
     703              : 
     704              :     // Keep impl alive if descriptor_state is queued in the scheduler.
     705              :     // Without this, destroy_impl() drops the last shared_ptr while
     706              :     // the queued descriptor_state node would become dangling.
     707        15075 :     if (desc_state_.is_enqueued_.load(std::memory_order_acquire))
     708              :     {
     709              :         try {
     710            4 :             desc_state_.impl_ref_ = shared_from_this();
     711            0 :         } catch (std::bad_weak_ptr const&) {}
     712              :     }
     713              : 
     714        15075 :     if (fd_ >= 0)
     715              :     {
     716        10046 :         if (desc_state_.registered_events != 0)
     717        10046 :             svc_.scheduler().deregister_descriptor(fd_);
     718        10046 :         ::close(fd_);
     719        10046 :         fd_ = -1;
     720              :     }
     721              : 
     722        15075 :     desc_state_.fd = -1;
     723              :     {
     724        15075 :         std::lock_guard lock(desc_state_.mutex);
     725        15075 :         desc_state_.read_op = nullptr;
     726        15075 :         desc_state_.write_op = nullptr;
     727        15075 :         desc_state_.connect_op = nullptr;
     728        15075 :         desc_state_.read_ready = false;
     729        15075 :         desc_state_.write_ready = false;
     730        15075 :     }
     731        15075 :     desc_state_.registered_events = 0;
     732              : 
     733        15075 :     local_endpoint_ = endpoint{};
     734        15075 :     remote_endpoint_ = endpoint{};
     735        15075 : }
     736              : 
     737          189 : epoll_socket_service::
     738          189 : epoll_socket_service(capy::execution_context& ctx)
     739          189 :     : state_(std::make_unique<epoll_socket_state>(ctx.use_service<epoll_scheduler>()))
     740              : {
     741          189 : }
     742              : 
     743          378 : epoll_socket_service::
     744          189 : ~epoll_socket_service()
     745              : {
     746          378 : }
     747              : 
     748              : void
     749          189 : epoll_socket_service::
     750              : shutdown()
     751              : {
     752          189 :     std::lock_guard lock(state_->mutex_);
     753              : 
     754          189 :     while (auto* impl = state_->socket_list_.pop_front())
     755            0 :         impl->close_socket();
     756              : 
     757          189 :     state_->socket_ptrs_.clear();
     758          189 : }
     759              : 
     760              : tcp_socket::socket_impl&
     761        10046 : epoll_socket_service::
     762              : create_impl()
     763              : {
     764        10046 :     auto impl = std::make_shared<epoll_socket_impl>(*this);
     765        10046 :     auto* raw = impl.get();
     766              : 
     767              :     {
     768        10046 :         std::lock_guard lock(state_->mutex_);
     769        10046 :         state_->socket_list_.push_back(raw);
     770        10046 :         state_->socket_ptrs_.emplace(raw, std::move(impl));
     771        10046 :     }
     772              : 
     773        10046 :     return *raw;
     774        10046 : }
     775              : 
     776              : void
     777        10046 : epoll_socket_service::
     778              : destroy_impl(tcp_socket::socket_impl& impl)
     779              : {
     780        10046 :     auto* epoll_impl = static_cast<epoll_socket_impl*>(&impl);
     781        10046 :     std::lock_guard lock(state_->mutex_);
     782        10046 :     state_->socket_list_.remove(epoll_impl);
     783        10046 :     state_->socket_ptrs_.erase(epoll_impl);
     784        10046 : }
     785              : 
     786              : std::error_code
     787         5029 : epoll_socket_service::
     788              : open_socket(tcp_socket::socket_impl& impl)
     789              : {
     790         5029 :     auto* epoll_impl = static_cast<epoll_socket_impl*>(&impl);
     791         5029 :     epoll_impl->close_socket();
     792              : 
     793         5029 :     int fd = ::socket(AF_INET, SOCK_STREAM | SOCK_NONBLOCK | SOCK_CLOEXEC, 0);
     794         5029 :     if (fd < 0)
     795            0 :         return make_err(errno);
     796              : 
     797         5029 :     epoll_impl->fd_ = fd;
     798              : 
     799              :     // Register fd with epoll (edge-triggered mode)
     800         5029 :     epoll_impl->desc_state_.fd = fd;
     801              :     {
     802         5029 :         std::lock_guard lock(epoll_impl->desc_state_.mutex);
     803         5029 :         epoll_impl->desc_state_.read_op = nullptr;
     804         5029 :         epoll_impl->desc_state_.write_op = nullptr;
     805         5029 :         epoll_impl->desc_state_.connect_op = nullptr;
     806         5029 :     }
     807         5029 :     scheduler().register_descriptor(fd, &epoll_impl->desc_state_);
     808              : 
     809         5029 :     return {};
     810              : }
     811              : 
     812              : void
     813       230185 : epoll_socket_service::
     814              : post(epoll_op* op)
     815              : {
     816       230185 :     state_->sched_.post(op);
     817       230185 : }
     818              : 
     819              : void
     820         5189 : epoll_socket_service::
     821              : work_started() noexcept
     822              : {
     823         5189 :     state_->sched_.work_started();
     824         5189 : }
     825              : 
     826              : void
     827          119 : epoll_socket_service::
     828              : work_finished() noexcept
     829              : {
     830          119 :     state_->sched_.work_finished();
     831          119 : }
     832              : 
     833              : } // namespace boost::corosio::detail
     834              : 
     835              : #endif // BOOST_COROSIO_HAS_EPOLL
        

Generated by: LCOV version 2.3