LCOV - code coverage report
Current view: top level - src/detail/epoll - acceptors.cpp (source / functions) Coverage Total Hit
Test: coverage_filtered.info Lines: 79.6 % 245 195
Test Date: 2026-02-06 05:04:16 Functions: 100.0 % 19 19

            Line data    Source code
       1              : //
       2              : // Copyright (c) 2026 Steve Gerbino
       3              : //
       4              : // Distributed under the Boost Software License, Version 1.0. (See accompanying
       5              : // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
       6              : //
       7              : // Official repository: https://github.com/cppalliance/corosio
       8              : //
       9              : 
      10              : #include <boost/corosio/detail/platform.hpp>
      11              : 
      12              : #if BOOST_COROSIO_HAS_EPOLL
      13              : 
      14              : #include "src/detail/epoll/acceptors.hpp"
      15              : #include "src/detail/epoll/sockets.hpp"
      16              : #include "src/detail/endpoint_convert.hpp"
      17              : #include "src/detail/make_err.hpp"
      18              : 
      19              : #include <utility>
      20              : 
      21              : #include <errno.h>
      22              : #include <netinet/in.h>
      23              : #include <sys/epoll.h>
      24              : #include <sys/socket.h>
      25              : #include <unistd.h>
      26              : 
      27              : namespace boost::corosio::detail {
      28              : 
      29              : void
      30            6 : epoll_accept_op::
      31              : cancel() noexcept
      32              : {
      33            6 :     if (acceptor_impl_)
      34            6 :         acceptor_impl_->cancel_single_op(*this);
      35              :     else
      36            0 :         request_cancel();
      37            6 : }
      38              : 
      39              : void
      40         5026 : epoll_accept_op::
      41              : operator()()
      42              : {
      43         5026 :     stop_cb.reset();
      44              : 
      45         5026 :     bool success = (errn == 0 && !cancelled.load(std::memory_order_acquire));
      46              : 
      47         5026 :     if (ec_out)
      48              :     {
      49         5026 :         if (cancelled.load(std::memory_order_acquire))
      50            9 :             *ec_out = capy::error::canceled;
      51         5017 :         else if (errn != 0)
      52            0 :             *ec_out = make_err(errn);
      53              :         else
      54         5017 :             *ec_out = {};
      55              :     }
      56              : 
      57         5026 :     if (success && accepted_fd >= 0)
      58              :     {
      59         5017 :         if (acceptor_impl_)
      60              :         {
      61         5017 :             auto* socket_svc = static_cast<epoll_acceptor_impl*>(acceptor_impl_)
      62         5017 :                 ->service().socket_service();
      63         5017 :             if (socket_svc)
      64              :             {
      65         5017 :                 auto& impl = static_cast<epoll_socket_impl&>(socket_svc->create_impl());
      66         5017 :                 impl.set_socket(accepted_fd);
      67              : 
      68              :                 // Register accepted socket with epoll (edge-triggered mode)
      69         5017 :                 impl.desc_state_.fd = accepted_fd;
      70              :                 {
      71         5017 :                     std::lock_guard lock(impl.desc_state_.mutex);
      72         5017 :                     impl.desc_state_.read_op = nullptr;
      73         5017 :                     impl.desc_state_.write_op = nullptr;
      74         5017 :                     impl.desc_state_.connect_op = nullptr;
      75         5017 :                 }
      76         5017 :                 socket_svc->scheduler().register_descriptor(accepted_fd, &impl.desc_state_);
      77              : 
      78         5017 :                 sockaddr_in local_addr{};
      79         5017 :                 socklen_t local_len = sizeof(local_addr);
      80         5017 :                 sockaddr_in remote_addr{};
      81         5017 :                 socklen_t remote_len = sizeof(remote_addr);
      82              : 
      83         5017 :                 endpoint local_ep, remote_ep;
      84         5017 :                 if (::getsockname(accepted_fd, reinterpret_cast<sockaddr*>(&local_addr), &local_len) == 0)
      85         5017 :                     local_ep = from_sockaddr_in(local_addr);
      86         5017 :                 if (::getpeername(accepted_fd, reinterpret_cast<sockaddr*>(&remote_addr), &remote_len) == 0)
      87         5017 :                     remote_ep = from_sockaddr_in(remote_addr);
      88              : 
      89         5017 :                 impl.set_endpoints(local_ep, remote_ep);
      90              : 
      91         5017 :                 if (impl_out)
      92         5017 :                     *impl_out = &impl;
      93              : 
      94         5017 :                 accepted_fd = -1;
      95              :             }
      96              :             else
      97              :             {
      98            0 :                 if (ec_out && !*ec_out)
      99            0 :                     *ec_out = make_err(ENOENT);
     100            0 :                 ::close(accepted_fd);
     101            0 :                 accepted_fd = -1;
     102            0 :                 if (impl_out)
     103            0 :                     *impl_out = nullptr;
     104              :             }
     105              :         }
     106              :         else
     107              :         {
     108            0 :             ::close(accepted_fd);
     109            0 :             accepted_fd = -1;
     110            0 :             if (impl_out)
     111            0 :                 *impl_out = nullptr;
     112              :         }
     113         5017 :     }
     114              :     else
     115              :     {
     116            9 :         if (accepted_fd >= 0)
     117              :         {
     118            0 :             ::close(accepted_fd);
     119            0 :             accepted_fd = -1;
     120              :         }
     121              : 
     122            9 :         if (peer_impl)
     123              :         {
     124            0 :             peer_impl->release();
     125            0 :             peer_impl = nullptr;
     126              :         }
     127              : 
     128            9 :         if (impl_out)
     129            9 :             *impl_out = nullptr;
     130              :     }
     131              : 
     132              :     // Move to stack before resuming. See epoll_op::operator()() for rationale.
     133         5026 :     capy::executor_ref saved_ex( std::move( ex ) );
     134         5026 :     capy::coro saved_h( std::move( h ) );
     135         5026 :     auto prevent_premature_destruction = std::move(impl_ptr);
     136         5026 :     saved_ex.dispatch( saved_h );
     137         5026 : }
     138              : 
     139           72 : epoll_acceptor_impl::
     140           72 : epoll_acceptor_impl(epoll_acceptor_service& svc) noexcept
     141           72 :     : svc_(svc)
     142              : {
     143           72 : }
     144              : 
     145              : void
     146           72 : epoll_acceptor_impl::
     147              : release()
     148              : {
     149           72 :     close_socket();
     150           72 :     svc_.destroy_acceptor_impl(*this);
     151           72 : }
     152              : 
     153              : std::coroutine_handle<>
     154         5026 : epoll_acceptor_impl::
     155              : accept(
     156              :     std::coroutine_handle<> h,
     157              :     capy::executor_ref ex,
     158              :     std::stop_token token,
     159              :     std::error_code* ec,
     160              :     io_object::io_object_impl** impl_out)
     161              : {
     162         5026 :     auto& op = acc_;
     163         5026 :     op.reset();
     164         5026 :     op.h = h;
     165         5026 :     op.ex = ex;
     166         5026 :     op.ec_out = ec;
     167         5026 :     op.impl_out = impl_out;
     168         5026 :     op.fd = fd_;
     169         5026 :     op.start(token, this);
     170              : 
     171         5026 :     sockaddr_in addr{};
     172         5026 :     socklen_t addrlen = sizeof(addr);
     173         5026 :     int accepted = ::accept4(fd_, reinterpret_cast<sockaddr*>(&addr),
     174              :                              &addrlen, SOCK_NONBLOCK | SOCK_CLOEXEC);
     175              : 
     176         5026 :     if (accepted >= 0)
     177              :     {
     178              :         {
     179            2 :             std::lock_guard lock(desc_state_.mutex);
     180            2 :             desc_state_.read_ready = false;
     181            2 :         }
     182            2 :         op.accepted_fd = accepted;
     183            2 :         op.complete(0, 0);
     184            2 :         op.impl_ptr = shared_from_this();
     185            2 :         svc_.post(&op);
     186              :         // completion is always posted to scheduler queue, never inline.
     187            2 :         return std::noop_coroutine();
     188              :     }
     189              : 
     190         5024 :     if (errno == EAGAIN || errno == EWOULDBLOCK)
     191              :     {
     192         5024 :         svc_.work_started();
     193         5024 :         op.impl_ptr = shared_from_this();
     194              : 
     195         5024 :         bool perform_now = false;
     196              :         {
     197         5024 :             std::lock_guard lock(desc_state_.mutex);
     198         5024 :             if (desc_state_.read_ready)
     199              :             {
     200            0 :                 desc_state_.read_ready = false;
     201            0 :                 perform_now = true;
     202              :             }
     203              :             else
     204              :             {
     205         5024 :                 desc_state_.read_op = &op;
     206              :             }
     207         5024 :         }
     208              : 
     209         5024 :         if (perform_now)
     210              :         {
     211            0 :             op.perform_io();
     212            0 :             if (op.errn == EAGAIN || op.errn == EWOULDBLOCK)
     213              :             {
     214            0 :                 op.errn = 0;
     215            0 :                 std::lock_guard lock(desc_state_.mutex);
     216            0 :                 desc_state_.read_op = &op;
     217            0 :             }
     218              :             else
     219              :             {
     220            0 :                 svc_.post(&op);
     221            0 :                 svc_.work_finished();
     222              :             }
     223            0 :             return std::noop_coroutine();
     224              :         }
     225              : 
     226         5024 :         if (op.cancelled.load(std::memory_order_acquire))
     227              :         {
     228            0 :             epoll_op* claimed = nullptr;
     229              :             {
     230            0 :                 std::lock_guard lock(desc_state_.mutex);
     231            0 :                 if (desc_state_.read_op == &op)
     232            0 :                     claimed = std::exchange(desc_state_.read_op, nullptr);
     233            0 :             }
     234            0 :             if (claimed)
     235              :             {
     236            0 :                 svc_.post(claimed);
     237            0 :                 svc_.work_finished();
     238              :             }
     239              :         }
     240              :         // completion is always posted to scheduler queue, never inline.
     241         5024 :         return std::noop_coroutine();
     242              :     }
     243              : 
     244            0 :     op.complete(errno, 0);
     245            0 :     op.impl_ptr = shared_from_this();
     246            0 :     svc_.post(&op);
     247              :     // completion is always posted to scheduler queue, never inline.
     248            0 :     return std::noop_coroutine();
     249              : }
     250              : 
     251              : void
     252          145 : epoll_acceptor_impl::
     253              : cancel() noexcept
     254              : {
     255          145 :     std::shared_ptr<epoll_acceptor_impl> self;
     256              :     try {
     257          145 :         self = shared_from_this();
     258            0 :     } catch (const std::bad_weak_ptr&) {
     259            0 :         return;
     260            0 :     }
     261              : 
     262          145 :     acc_.request_cancel();
     263              : 
     264          145 :     epoll_op* claimed = nullptr;
     265              :     {
     266          145 :         std::lock_guard lock(desc_state_.mutex);
     267          145 :         if (desc_state_.read_op == &acc_)
     268            3 :             claimed = std::exchange(desc_state_.read_op, nullptr);
     269          145 :     }
     270          145 :     if (claimed)
     271              :     {
     272            3 :         acc_.impl_ptr = self;
     273            3 :         svc_.post(&acc_);
     274            3 :         svc_.work_finished();
     275              :     }
     276          145 : }
     277              : 
     278              : void
     279            6 : epoll_acceptor_impl::
     280              : cancel_single_op(epoll_op& op) noexcept
     281              : {
     282            6 :     op.request_cancel();
     283              : 
     284            6 :     epoll_op* claimed = nullptr;
     285              :     {
     286            6 :         std::lock_guard lock(desc_state_.mutex);
     287            6 :         if (desc_state_.read_op == &op)
     288            6 :             claimed = std::exchange(desc_state_.read_op, nullptr);
     289            6 :     }
     290            6 :     if (claimed)
     291              :     {
     292              :         try {
     293            6 :             op.impl_ptr = shared_from_this();
     294            0 :         } catch (const std::bad_weak_ptr&) {}
     295            6 :         svc_.post(&op);
     296            6 :         svc_.work_finished();
     297              :     }
     298            6 : }
     299              : 
     300              : void
     301          144 : epoll_acceptor_impl::
     302              : close_socket() noexcept
     303              : {
     304          144 :     cancel();
     305              : 
     306          144 :     if (desc_state_.is_enqueued_.load(std::memory_order_acquire))
     307              :     {
     308              :         try {
     309            0 :             desc_state_.impl_ref_ = shared_from_this();
     310            0 :         } catch (std::bad_weak_ptr const&) {}
     311              :     }
     312              : 
     313          144 :     if (fd_ >= 0)
     314              :     {
     315           62 :         if (desc_state_.registered_events != 0)
     316           62 :             svc_.scheduler().deregister_descriptor(fd_);
     317           62 :         ::close(fd_);
     318           62 :         fd_ = -1;
     319              :     }
     320              : 
     321          144 :     desc_state_.fd = -1;
     322              :     {
     323          144 :         std::lock_guard lock(desc_state_.mutex);
     324          144 :         desc_state_.read_op = nullptr;
     325          144 :         desc_state_.read_ready = false;
     326          144 :         desc_state_.write_ready = false;
     327          144 :     }
     328          144 :     desc_state_.registered_events = 0;
     329              : 
     330              :     // Clear cached endpoint
     331          144 :     local_endpoint_ = endpoint{};
     332          144 : }
     333              : 
     334          189 : epoll_acceptor_service::
     335          189 : epoll_acceptor_service(capy::execution_context& ctx)
     336          189 :     : ctx_(ctx)
     337          189 :     , state_(std::make_unique<epoll_acceptor_state>(ctx.use_service<epoll_scheduler>()))
     338              : {
     339          189 : }
     340              : 
     341          378 : epoll_acceptor_service::
     342          189 : ~epoll_acceptor_service()
     343              : {
     344          378 : }
     345              : 
     346              : void
     347          189 : epoll_acceptor_service::
     348              : shutdown()
     349              : {
     350          189 :     std::lock_guard lock(state_->mutex_);
     351              : 
     352          189 :     while (auto* impl = state_->acceptor_list_.pop_front())
     353            0 :         impl->close_socket();
     354              : 
     355          189 :     state_->acceptor_ptrs_.clear();
     356          189 : }
     357              : 
     358              : tcp_acceptor::acceptor_impl&
     359           72 : epoll_acceptor_service::
     360              : create_acceptor_impl()
     361              : {
     362           72 :     auto impl = std::make_shared<epoll_acceptor_impl>(*this);
     363           72 :     auto* raw = impl.get();
     364              : 
     365           72 :     std::lock_guard lock(state_->mutex_);
     366           72 :     state_->acceptor_list_.push_back(raw);
     367           72 :     state_->acceptor_ptrs_.emplace(raw, std::move(impl));
     368              : 
     369           72 :     return *raw;
     370           72 : }
     371              : 
     372              : void
     373           72 : epoll_acceptor_service::
     374              : destroy_acceptor_impl(tcp_acceptor::acceptor_impl& impl)
     375              : {
     376           72 :     auto* epoll_impl = static_cast<epoll_acceptor_impl*>(&impl);
     377           72 :     std::lock_guard lock(state_->mutex_);
     378           72 :     state_->acceptor_list_.remove(epoll_impl);
     379           72 :     state_->acceptor_ptrs_.erase(epoll_impl);
     380           72 : }
     381              : 
     382              : std::error_code
     383           72 : epoll_acceptor_service::
     384              : open_acceptor(
     385              :     tcp_acceptor::acceptor_impl& impl,
     386              :     endpoint ep,
     387              :     int backlog)
     388              : {
     389           72 :     auto* epoll_impl = static_cast<epoll_acceptor_impl*>(&impl);
     390           72 :     epoll_impl->close_socket();
     391              : 
     392           72 :     int fd = ::socket(AF_INET, SOCK_STREAM | SOCK_NONBLOCK | SOCK_CLOEXEC, 0);
     393           72 :     if (fd < 0)
     394            0 :         return make_err(errno);
     395              : 
     396           72 :     int reuse = 1;
     397           72 :     ::setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &reuse, sizeof(reuse));
     398              : 
     399           72 :     sockaddr_in addr = detail::to_sockaddr_in(ep);
     400           72 :     if (::bind(fd, reinterpret_cast<sockaddr*>(&addr), sizeof(addr)) < 0)
     401              :     {
     402           10 :         int errn = errno;
     403           10 :         ::close(fd);
     404           10 :         return make_err(errn);
     405              :     }
     406              : 
     407           62 :     if (::listen(fd, backlog) < 0)
     408              :     {
     409            0 :         int errn = errno;
     410            0 :         ::close(fd);
     411            0 :         return make_err(errn);
     412              :     }
     413              : 
     414           62 :     epoll_impl->fd_ = fd;
     415              : 
     416              :     // Register fd with epoll (edge-triggered mode)
     417           62 :     epoll_impl->desc_state_.fd = fd;
     418              :     {
     419           62 :         std::lock_guard lock(epoll_impl->desc_state_.mutex);
     420           62 :         epoll_impl->desc_state_.read_op = nullptr;
     421           62 :     }
     422           62 :     scheduler().register_descriptor(fd, &epoll_impl->desc_state_);
     423              : 
     424              :     // Cache the local endpoint (queries OS for ephemeral port if port was 0)
     425           62 :     sockaddr_in local_addr{};
     426           62 :     socklen_t local_len = sizeof(local_addr);
     427           62 :     if (::getsockname(fd, reinterpret_cast<sockaddr*>(&local_addr), &local_len) == 0)
     428           62 :         epoll_impl->set_local_endpoint(detail::from_sockaddr_in(local_addr));
     429              : 
     430           62 :     return {};
     431              : }
     432              : 
     433              : void
     434           11 : epoll_acceptor_service::
     435              : post(epoll_op* op)
     436              : {
     437           11 :     state_->sched_.post(op);
     438           11 : }
     439              : 
     440              : void
     441         5024 : epoll_acceptor_service::
     442              : work_started() noexcept
     443              : {
     444         5024 :     state_->sched_.work_started();
     445         5024 : }
     446              : 
     447              : void
     448            9 : epoll_acceptor_service::
     449              : work_finished() noexcept
     450              : {
     451            9 :     state_->sched_.work_finished();
     452            9 : }
     453              : 
     454              : epoll_socket_service*
     455         5017 : epoll_acceptor_service::
     456              : socket_service() const noexcept
     457              : {
     458         5017 :     auto* svc = ctx_.find_service<detail::socket_service>();
     459         5017 :     return svc ? dynamic_cast<epoll_socket_service*>(svc) : nullptr;
     460              : }
     461              : 
     462              : } // namespace boost::corosio::detail
     463              : 
     464              : #endif // BOOST_COROSIO_HAS_EPOLL
        

Generated by: LCOV version 2.3