LCOV - code coverage report
Current view: top level - src/detail/epoll - scheduler.cpp (source / functions) Coverage Total Hit
Test: coverage_filtered.info Lines: 79.4 % 499 396
Test Date: 2026-02-06 05:04:16 Functions: 88.0 % 50 44

            Line data    Source code
       1              : //
       2              : // Copyright (c) 2026 Steve Gerbino
       3              : //
       4              : // Distributed under the Boost Software License, Version 1.0. (See accompanying
       5              : // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
       6              : //
       7              : // Official repository: https://github.com/cppalliance/corosio
       8              : //
       9              : 
      10              : #include <boost/corosio/detail/platform.hpp>
      11              : 
      12              : #if BOOST_COROSIO_HAS_EPOLL
      13              : 
      14              : #include "src/detail/epoll/scheduler.hpp"
      15              : #include "src/detail/epoll/op.hpp"
      16              : #include "src/detail/make_err.hpp"
      17              : #include "src/detail/posix/resolver_service.hpp"
      18              : #include "src/detail/posix/signals.hpp"
      19              : 
      20              : #include <boost/corosio/detail/except.hpp>
      21              : #include <boost/corosio/detail/thread_local_ptr.hpp>
      22              : 
      23              : #include <atomic>
      24              : #include <chrono>
      25              : #include <limits>
      26              : #include <utility>
      27              : 
      28              : #include <errno.h>
      29              : #include <fcntl.h>
      30              : #include <sys/epoll.h>
      31              : #include <sys/eventfd.h>
      32              : #include <sys/socket.h>
      33              : #include <sys/timerfd.h>
      34              : #include <unistd.h>
      35              : 
      36              : /*
      37              :     epoll Scheduler - Single Reactor Model
      38              :     ======================================
      39              : 
      40              :     This scheduler uses a thread coordination strategy to provide handler
      41              :     parallelism and avoid the thundering herd problem.
      42              :     Instead of all threads blocking on epoll_wait(), one thread becomes the
      43              :     "reactor" while others wait on a condition variable for handler work.
      44              : 
      45              :     Thread Model
      46              :     ------------
      47              :     - ONE thread runs epoll_wait() at a time (the reactor thread)
      48              :     - OTHER threads wait on cond_ (condition variable) for handlers
      49              :     - When work is posted, exactly one waiting thread wakes via notify_one()
      50              :     - This matches Windows IOCP semantics where N posted items wake N threads
      51              : 
      52              :     Event Loop Structure (do_one)
      53              :     -----------------------------
      54              :     1. Lock mutex, try to pop handler from queue
      55              :     2. If got handler: execute it (unlocked), return
      56              :     3. If queue empty and no reactor running: become reactor
      57              :        - Run epoll_wait (unlocked), queue I/O completions, loop back
      58              :     4. If queue empty and reactor running: wait on condvar for work
      59              : 
      60              :     The task_running_ flag ensures only one thread owns epoll_wait().
      61              :     After the reactor queues I/O completions, it loops back to try getting
      62              :     a handler, giving priority to handler execution over more I/O polling.
      63              : 
      64              :     Signaling State (state_)
      65              :     ------------------------
      66              :     The state_ variable encodes two pieces of information:
      67              :     - Bit 0: signaled flag (1 = signaled, persists until cleared)
      68              :     - Upper bits: waiter count (each waiter adds 2 before blocking)
      69              : 
      70              :     This allows efficient coordination:
      71              :     - Signalers only call notify when waiters exist (state_ > 1)
      72              :     - Waiters check if already signaled before blocking (fast-path)
      73              : 
      74              :     Wake Coordination (wake_one_thread_and_unlock)
      75              :     ----------------------------------------------
      76              :     When posting work:
      77              :     - If waiters exist (state_ > 1): signal and notify_one()
      78              :     - Else if reactor running: interrupt via eventfd write
      79              :     - Else: no-op (thread will find work when it checks queue)
      80              : 
      81              :     This avoids waking threads unnecessarily. With cascading wakes,
      82              :     each handler execution wakes at most one additional thread if
      83              :     more work exists in the queue.
      84              : 
      85              :     Work Counting
      86              :     -------------
      87              :     outstanding_work_ tracks pending operations. When it hits zero, run()
      88              :     returns. Each operation increments on start, decrements on completion.
      89              : 
      90              :     Timer Integration
      91              :     -----------------
      92              :     Timers are handled by timer_service. The reactor adjusts epoll_wait
      93              :     timeout to wake for the nearest timer expiry. When a new timer is
      94              :     scheduled earlier than current, timer_service calls interrupt_reactor()
      95              :     to re-evaluate the timeout.
      96              : */
      97              : 
      98              : namespace boost::corosio::detail {
      99              : 
     100              : struct scheduler_context
     101              : {
     102              :     epoll_scheduler const* key;
     103              :     scheduler_context* next;
     104              :     op_queue private_queue;
     105              :     long private_outstanding_work;
     106              : 
     107          168 :     scheduler_context(epoll_scheduler const* k, scheduler_context* n)
     108          168 :         : key(k)
     109          168 :         , next(n)
     110          168 :         , private_outstanding_work(0)
     111              :     {
     112          168 :     }
     113              : };
     114              : 
     115              : namespace {
     116              : 
     117              : corosio::detail::thread_local_ptr<scheduler_context> context_stack;
     118              : 
     119              : struct thread_context_guard
     120              : {
     121              :     scheduler_context frame_;
     122              : 
     123          168 :     explicit thread_context_guard(
     124              :         epoll_scheduler const* ctx) noexcept
     125          168 :         : frame_(ctx, context_stack.get())
     126              :     {
     127          168 :         context_stack.set(&frame_);
     128          168 :     }
     129              : 
     130          168 :     ~thread_context_guard() noexcept
     131              :     {
     132          168 :         if (!frame_.private_queue.empty())
     133            0 :             frame_.key->drain_thread_queue(frame_.private_queue, frame_.private_outstanding_work);
     134          168 :         context_stack.set(frame_.next);
     135          168 :     }
     136              : };
     137              : 
     138              : scheduler_context*
     139       357461 : find_context(epoll_scheduler const* self) noexcept
     140              : {
     141       357461 :     for (auto* c = context_stack.get(); c != nullptr; c = c->next)
     142       355812 :         if (c->key == self)
     143       355812 :             return c;
     144         1649 :     return nullptr;
     145              : }
     146              : 
     147              : /// Flush private work count to global counter.
     148              : void
     149            0 : flush_private_work(
     150              :     scheduler_context* ctx,
     151              :     std::atomic<long>& outstanding_work) noexcept
     152              : {
     153            0 :     if (ctx && ctx->private_outstanding_work > 0)
     154              :     {
     155            0 :         outstanding_work.fetch_add(
     156              :             ctx->private_outstanding_work, std::memory_order_relaxed);
     157            0 :         ctx->private_outstanding_work = 0;
     158              :     }
     159            0 : }
     160              : 
     161              : /// Drain private queue to global queue, flushing work count first.
     162              : ///
     163              : /// @return True if any ops were drained.
     164              : bool
     165            4 : drain_private_queue(
     166              :     scheduler_context* ctx,
     167              :     std::atomic<long>& outstanding_work,
     168              :     op_queue& completed_ops) noexcept
     169              : {
     170            4 :     if (!ctx || ctx->private_queue.empty())
     171            4 :         return false;
     172              : 
     173            0 :     flush_private_work(ctx, outstanding_work);
     174            0 :     completed_ops.splice(ctx->private_queue);
     175            0 :     return true;
     176              : }
     177              : 
     178              : } // namespace
     179              : 
     180              : void
     181       125169 : descriptor_state::
     182              : operator()()
     183              : {
     184       125169 :     is_enqueued_.store(false, std::memory_order_relaxed);
     185              : 
     186              :     // Take ownership of impl ref set by close_socket() to prevent
     187              :     // the owning impl from being freed while we're executing
     188       125169 :     auto prevent_impl_destruction = std::move(impl_ref_);
     189              : 
     190       125169 :     std::uint32_t ev = ready_events_.exchange(0, std::memory_order_acquire);
     191       125169 :     if (ev == 0)
     192              :     {
     193            0 :         scheduler_->compensating_work_started();
     194            0 :         return;
     195              :     }
     196              : 
     197       125169 :     op_queue local_ops;
     198              : 
     199       125169 :     int err = 0;
     200       125169 :     if (ev & EPOLLERR)
     201              :     {
     202            1 :         socklen_t len = sizeof(err);
     203            1 :         if (::getsockopt(fd, SOL_SOCKET, SO_ERROR, &err, &len) < 0)
     204            0 :             err = errno;
     205            1 :         if (err == 0)
     206            1 :             err = EIO;
     207              :     }
     208              : 
     209       125169 :     epoll_op* rd = nullptr;
     210       125169 :     epoll_op* wr = nullptr;
     211       125169 :     epoll_op* cn = nullptr;
     212              :     {
     213       125169 :         std::lock_guard lock(mutex);
     214       125169 :         if (ev & EPOLLIN)
     215              :         {
     216        58871 :             rd = std::exchange(read_op, nullptr);
     217        58871 :             if (!rd)
     218        53804 :                 read_ready = true;
     219              :         }
     220       125169 :         if (ev & EPOLLOUT)
     221              :         {
     222       120154 :             cn = std::exchange(connect_op, nullptr);
     223       120154 :             wr = std::exchange(write_op, nullptr);
     224       120154 :             if (!cn && !wr)
     225       115136 :                 write_ready = true;
     226              :         }
     227       125169 :         if (err && !(ev & (EPOLLIN | EPOLLOUT)))
     228              :         {
     229            0 :             rd = std::exchange(read_op, nullptr);
     230            0 :             wr = std::exchange(write_op, nullptr);
     231            0 :             cn = std::exchange(connect_op, nullptr);
     232              :         }
     233       125169 :     }
     234              : 
     235              :     // Non-null after I/O means EAGAIN; re-register under lock below
     236       125169 :     if (rd)
     237              :     {
     238         5067 :         if (err)
     239            0 :             rd->complete(err, 0);
     240              :         else
     241         5067 :             rd->perform_io();
     242              : 
     243         5067 :         if (rd->errn == EAGAIN || rd->errn == EWOULDBLOCK)
     244              :         {
     245            0 :             rd->errn = 0;
     246              :         }
     247              :         else
     248              :         {
     249         5067 :             local_ops.push(rd);
     250         5067 :             rd = nullptr;
     251              :         }
     252              :     }
     253              : 
     254       125169 :     if (cn)
     255              :     {
     256         5018 :         if (err)
     257            0 :             cn->complete(err, 0);
     258              :         else
     259         5018 :             cn->perform_io();
     260         5018 :         local_ops.push(cn);
     261         5018 :         cn = nullptr;
     262              :     }
     263              : 
     264       125169 :     if (wr)
     265              :     {
     266            0 :         if (err)
     267            0 :             wr->complete(err, 0);
     268              :         else
     269            0 :             wr->perform_io();
     270              : 
     271            0 :         if (wr->errn == EAGAIN || wr->errn == EWOULDBLOCK)
     272              :         {
     273            0 :             wr->errn = 0;
     274              :         }
     275              :         else
     276              :         {
     277            0 :             local_ops.push(wr);
     278            0 :             wr = nullptr;
     279              :         }
     280              :     }
     281              : 
     282       125169 :     if (rd || wr)
     283              :     {
     284            0 :         std::lock_guard lock(mutex);
     285            0 :         if (rd)
     286            0 :             read_op = rd;
     287            0 :         if (wr)
     288            0 :             write_op = wr;
     289            0 :     }
     290              : 
     291              :     // Execute first handler inline — the scheduler's work_cleanup
     292              :     // accounts for this as the "consumed" work item
     293       125169 :     scheduler_op* first = local_ops.pop();
     294       125169 :     if (first)
     295              :     {
     296        10085 :         scheduler_->post_deferred_completions(local_ops);
     297        10085 :         (*first)();
     298              :     }
     299              :     else
     300              :     {
     301       115084 :         scheduler_->compensating_work_started();
     302              :     }
     303       125169 : }
     304              : 
     305          189 : epoll_scheduler::
     306              : epoll_scheduler(
     307              :     capy::execution_context& ctx,
     308          189 :     int)
     309          189 :     : epoll_fd_(-1)
     310          189 :     , event_fd_(-1)
     311          189 :     , timer_fd_(-1)
     312          189 :     , outstanding_work_(0)
     313          189 :     , stopped_(false)
     314          189 :     , shutdown_(false)
     315          189 :     , task_running_(false)
     316          189 :     , task_interrupted_(false)
     317          378 :     , state_(0)
     318              : {
     319          189 :     epoll_fd_ = ::epoll_create1(EPOLL_CLOEXEC);
     320          189 :     if (epoll_fd_ < 0)
     321            0 :         detail::throw_system_error(make_err(errno), "epoll_create1");
     322              : 
     323          189 :     event_fd_ = ::eventfd(0, EFD_NONBLOCK | EFD_CLOEXEC);
     324          189 :     if (event_fd_ < 0)
     325              :     {
     326            0 :         int errn = errno;
     327            0 :         ::close(epoll_fd_);
     328            0 :         detail::throw_system_error(make_err(errn), "eventfd");
     329              :     }
     330              : 
     331          189 :     timer_fd_ = ::timerfd_create(CLOCK_MONOTONIC, TFD_NONBLOCK | TFD_CLOEXEC);
     332          189 :     if (timer_fd_ < 0)
     333              :     {
     334            0 :         int errn = errno;
     335            0 :         ::close(event_fd_);
     336            0 :         ::close(epoll_fd_);
     337            0 :         detail::throw_system_error(make_err(errn), "timerfd_create");
     338              :     }
     339              : 
     340          189 :     epoll_event ev{};
     341          189 :     ev.events = EPOLLIN | EPOLLET;
     342          189 :     ev.data.ptr = nullptr;
     343          189 :     if (::epoll_ctl(epoll_fd_, EPOLL_CTL_ADD, event_fd_, &ev) < 0)
     344              :     {
     345            0 :         int errn = errno;
     346            0 :         ::close(timer_fd_);
     347            0 :         ::close(event_fd_);
     348            0 :         ::close(epoll_fd_);
     349            0 :         detail::throw_system_error(make_err(errn), "epoll_ctl");
     350              :     }
     351              : 
     352          189 :     epoll_event timer_ev{};
     353          189 :     timer_ev.events = EPOLLIN | EPOLLERR;
     354          189 :     timer_ev.data.ptr = &timer_fd_;
     355          189 :     if (::epoll_ctl(epoll_fd_, EPOLL_CTL_ADD, timer_fd_, &timer_ev) < 0)
     356              :     {
     357            0 :         int errn = errno;
     358            0 :         ::close(timer_fd_);
     359            0 :         ::close(event_fd_);
     360            0 :         ::close(epoll_fd_);
     361            0 :         detail::throw_system_error(make_err(errn), "epoll_ctl (timerfd)");
     362              :     }
     363              : 
     364          189 :     timer_svc_ = &get_timer_service(ctx, *this);
     365          189 :     timer_svc_->set_on_earliest_changed(
     366              :         timer_service::callback(
     367              :             this,
     368         5261 :             [](void* p) { static_cast<epoll_scheduler*>(p)->update_timerfd(); }));
     369              : 
     370              :     // Initialize resolver service
     371          189 :     get_resolver_service(ctx, *this);
     372              : 
     373              :     // Initialize signal service
     374          189 :     get_signal_service(ctx, *this);
     375              : 
     376              :     // Push task sentinel to interleave reactor runs with handler execution
     377          189 :     completed_ops_.push(&task_op_);
     378          189 : }
     379              : 
     380          378 : epoll_scheduler::
     381          189 : ~epoll_scheduler()
     382              : {
     383          189 :     if (timer_fd_ >= 0)
     384          189 :         ::close(timer_fd_);
     385          189 :     if (event_fd_ >= 0)
     386          189 :         ::close(event_fd_);
     387          189 :     if (epoll_fd_ >= 0)
     388          189 :         ::close(epoll_fd_);
     389          378 : }
     390              : 
     391              : void
     392          189 : epoll_scheduler::
     393              : shutdown()
     394              : {
     395              :     {
     396          189 :         std::unique_lock lock(mutex_);
     397          189 :         shutdown_ = true;
     398              : 
     399          378 :         while (auto* h = completed_ops_.pop())
     400              :         {
     401          189 :             if (h == &task_op_)
     402          189 :                 continue;
     403            0 :             lock.unlock();
     404            0 :             h->destroy();
     405            0 :             lock.lock();
     406          189 :         }
     407              : 
     408          189 :         signal_all(lock);
     409          189 :     }
     410              : 
     411          189 :     outstanding_work_.store(0, std::memory_order_release);
     412              : 
     413          189 :     if (event_fd_ >= 0)
     414          189 :         interrupt_reactor();
     415          189 : }
     416              : 
     417              : void
     418         6893 : epoll_scheduler::
     419              : post(capy::coro h) const
     420              : {
     421              :     struct post_handler final
     422              :         : scheduler_op
     423              :     {
     424              :         capy::coro h_;
     425              : 
     426              :         explicit
     427         6893 :         post_handler(capy::coro h)
     428         6893 :             : h_(h)
     429              :         {
     430         6893 :         }
     431              : 
     432        13786 :         ~post_handler() = default;
     433              : 
     434         6893 :         void operator()() override
     435              :         {
     436         6893 :             auto h = h_;
     437         6893 :             delete this;
     438              :             std::atomic_thread_fence(std::memory_order_acquire);
     439         6893 :             h.resume();
     440         6893 :         }
     441              : 
     442            0 :         void destroy() override
     443              :         {
     444            0 :             delete this;
     445            0 :         }
     446              :     };
     447              : 
     448         6893 :     auto ph = std::make_unique<post_handler>(h);
     449              : 
     450              :     // Fast path: same thread posts to private queue
     451              :     // Only count locally; work_cleanup batches to global counter
     452         6893 :     if (auto* ctx = find_context(this))
     453              :     {
     454         5270 :         ++ctx->private_outstanding_work;
     455         5270 :         ctx->private_queue.push(ph.release());
     456         5270 :         return;
     457              :     }
     458              : 
     459              :     // Slow path: cross-thread post requires mutex
     460         1623 :     outstanding_work_.fetch_add(1, std::memory_order_relaxed);
     461              : 
     462         1623 :     std::unique_lock lock(mutex_);
     463         1623 :     completed_ops_.push(ph.release());
     464         1623 :     wake_one_thread_and_unlock(lock);
     465         6893 : }
     466              : 
     467              : void
     468       235484 : epoll_scheduler::
     469              : post(scheduler_op* h) const
     470              : {
     471              :     // Fast path: same thread posts to private queue
     472              :     // Only count locally; work_cleanup batches to global counter
     473       235484 :     if (auto* ctx = find_context(this))
     474              :     {
     475       235458 :         ++ctx->private_outstanding_work;
     476       235458 :         ctx->private_queue.push(h);
     477       235458 :         return;
     478              :     }
     479              : 
     480              :     // Slow path: cross-thread post requires mutex
     481           26 :     outstanding_work_.fetch_add(1, std::memory_order_relaxed);
     482              : 
     483           26 :     std::unique_lock lock(mutex_);
     484           26 :     completed_ops_.push(h);
     485           26 :     wake_one_thread_and_unlock(lock);
     486           26 : }
     487              : 
     488              : void
     489         5285 : epoll_scheduler::
     490              : on_work_started() noexcept
     491              : {
     492         5285 :     outstanding_work_.fetch_add(1, std::memory_order_relaxed);
     493         5285 : }
     494              : 
     495              : void
     496         5253 : epoll_scheduler::
     497              : on_work_finished() noexcept
     498              : {
     499        10506 :     if (outstanding_work_.fetch_sub(1, std::memory_order_acq_rel) == 1)
     500            0 :         stop();
     501         5253 : }
     502              : 
     503              : bool
     504         5500 : epoll_scheduler::
     505              : running_in_this_thread() const noexcept
     506              : {
     507         5500 :     for (auto* c = context_stack.get(); c != nullptr; c = c->next)
     508         5290 :         if (c->key == this)
     509         5290 :             return true;
     510          210 :     return false;
     511              : }
     512              : 
     513              : void
     514           30 : epoll_scheduler::
     515              : stop()
     516              : {
     517           30 :     std::unique_lock lock(mutex_);
     518           30 :     if (!stopped_)
     519              :     {
     520           20 :         stopped_ = true;
     521           20 :         signal_all(lock);
     522           20 :         interrupt_reactor();
     523              :     }
     524           30 : }
     525              : 
     526              : bool
     527           16 : epoll_scheduler::
     528              : stopped() const noexcept
     529              : {
     530           16 :     std::unique_lock lock(mutex_);
     531           32 :     return stopped_;
     532           16 : }
     533              : 
     534              : void
     535           49 : epoll_scheduler::
     536              : restart()
     537              : {
     538           49 :     std::unique_lock lock(mutex_);
     539           49 :     stopped_ = false;
     540           49 : }
     541              : 
     542              : std::size_t
     543          175 : epoll_scheduler::
     544              : run()
     545              : {
     546          350 :     if (outstanding_work_.load(std::memory_order_acquire) == 0)
     547              :     {
     548           21 :         stop();
     549           21 :         return 0;
     550              :     }
     551              : 
     552          154 :     thread_context_guard ctx(this);
     553          154 :     std::unique_lock lock(mutex_);
     554              : 
     555          154 :     std::size_t n = 0;
     556              :     for (;;)
     557              :     {
     558       367685 :         if (!do_one(lock, -1, &ctx.frame_))
     559          154 :             break;
     560       367531 :         if (n != (std::numeric_limits<std::size_t>::max)())
     561       367531 :             ++n;
     562       367531 :         if (!lock.owns_lock())
     563       132102 :             lock.lock();
     564              :     }
     565          154 :     return n;
     566          154 : }
     567              : 
     568              : std::size_t
     569            2 : epoll_scheduler::
     570              : run_one()
     571              : {
     572            4 :     if (outstanding_work_.load(std::memory_order_acquire) == 0)
     573              :     {
     574            0 :         stop();
     575            0 :         return 0;
     576              :     }
     577              : 
     578            2 :     thread_context_guard ctx(this);
     579            2 :     std::unique_lock lock(mutex_);
     580            2 :     return do_one(lock, -1, &ctx.frame_);
     581            2 : }
     582              : 
     583              : std::size_t
     584           14 : epoll_scheduler::
     585              : wait_one(long usec)
     586              : {
     587           28 :     if (outstanding_work_.load(std::memory_order_acquire) == 0)
     588              :     {
     589            5 :         stop();
     590            5 :         return 0;
     591              :     }
     592              : 
     593            9 :     thread_context_guard ctx(this);
     594            9 :     std::unique_lock lock(mutex_);
     595            9 :     return do_one(lock, usec, &ctx.frame_);
     596            9 : }
     597              : 
     598              : std::size_t
     599            2 : epoll_scheduler::
     600              : poll()
     601              : {
     602            4 :     if (outstanding_work_.load(std::memory_order_acquire) == 0)
     603              :     {
     604            1 :         stop();
     605            1 :         return 0;
     606              :     }
     607              : 
     608            1 :     thread_context_guard ctx(this);
     609            1 :     std::unique_lock lock(mutex_);
     610              : 
     611            1 :     std::size_t n = 0;
     612              :     for (;;)
     613              :     {
     614            3 :         if (!do_one(lock, 0, &ctx.frame_))
     615            1 :             break;
     616            2 :         if (n != (std::numeric_limits<std::size_t>::max)())
     617            2 :             ++n;
     618            2 :         if (!lock.owns_lock())
     619            2 :             lock.lock();
     620              :     }
     621            1 :     return n;
     622            1 : }
     623              : 
     624              : std::size_t
     625            4 : epoll_scheduler::
     626              : poll_one()
     627              : {
     628            8 :     if (outstanding_work_.load(std::memory_order_acquire) == 0)
     629              :     {
     630            2 :         stop();
     631            2 :         return 0;
     632              :     }
     633              : 
     634            2 :     thread_context_guard ctx(this);
     635            2 :     std::unique_lock lock(mutex_);
     636            2 :     return do_one(lock, 0, &ctx.frame_);
     637            2 : }
     638              : 
     639              : void
     640        10108 : epoll_scheduler::
     641              : register_descriptor(int fd, descriptor_state* desc) const
     642              : {
     643        10108 :     epoll_event ev{};
     644        10108 :     ev.events = EPOLLIN | EPOLLOUT | EPOLLET | EPOLLERR | EPOLLHUP;
     645        10108 :     ev.data.ptr = desc;
     646              : 
     647        10108 :     if (::epoll_ctl(epoll_fd_, EPOLL_CTL_ADD, fd, &ev) < 0)
     648            0 :         detail::throw_system_error(make_err(errno), "epoll_ctl (register)");
     649              : 
     650        10108 :     desc->registered_events = ev.events;
     651        10108 :     desc->fd = fd;
     652        10108 :     desc->scheduler_ = this;
     653              : 
     654        10108 :     std::lock_guard lock(desc->mutex);
     655        10108 :     desc->read_ready = false;
     656        10108 :     desc->write_ready = false;
     657        10108 : }
     658              : 
     659              : void
     660        10108 : epoll_scheduler::
     661              : deregister_descriptor(int fd) const
     662              : {
     663        10108 :     ::epoll_ctl(epoll_fd_, EPOLL_CTL_DEL, fd, nullptr);
     664        10108 : }
     665              : 
     666              : void
     667        10213 : epoll_scheduler::
     668              : work_started() const noexcept
     669              : {
     670        10213 :     outstanding_work_.fetch_add(1, std::memory_order_relaxed);
     671        10213 : }
     672              : 
     673              : void
     674        17190 : epoll_scheduler::
     675              : work_finished() const noexcept
     676              : {
     677        34380 :     if (outstanding_work_.fetch_sub(1, std::memory_order_acq_rel) == 1)
     678              :     {
     679              :         // Last work item completed - wake all threads so they can exit.
     680              :         // signal_all() wakes threads waiting on the condvar.
     681              :         // interrupt_reactor() wakes the reactor thread blocked in epoll_wait().
     682              :         // Both are needed because they target different blocking mechanisms.
     683          148 :         std::unique_lock lock(mutex_);
     684          148 :         signal_all(lock);
     685          148 :         if (task_running_ && !task_interrupted_)
     686              :         {
     687            3 :             task_interrupted_ = true;
     688            3 :             lock.unlock();
     689            3 :             interrupt_reactor();
     690              :         }
     691          148 :     }
     692        17190 : }
     693              : 
     694              : void
     695       115084 : epoll_scheduler::
     696              : compensating_work_started() const noexcept
     697              : {
     698       115084 :     auto* ctx = find_context(this);
     699       115084 :     if (ctx)
     700       115084 :         ++ctx->private_outstanding_work;
     701       115084 : }
     702              : 
     703              : void
     704            0 : epoll_scheduler::
     705              : drain_thread_queue(op_queue& queue, long count) const
     706              : {
     707              :     // Note: outstanding_work_ was already incremented when posting
     708            0 :     std::unique_lock lock(mutex_);
     709            0 :     completed_ops_.splice(queue);
     710            0 :     if (count > 0)
     711            0 :         maybe_unlock_and_signal_one(lock);
     712            0 : }
     713              : 
     714              : void
     715        10085 : epoll_scheduler::
     716              : post_deferred_completions(op_queue& ops) const
     717              : {
     718        10085 :     if (ops.empty())
     719        10085 :         return;
     720              : 
     721              :     // Fast path: if on scheduler thread, use private queue
     722            0 :     if (auto* ctx = find_context(this))
     723              :     {
     724            0 :         ctx->private_queue.splice(ops);
     725            0 :         return;
     726              :     }
     727              : 
     728              :     // Slow path: add to global queue and wake a thread
     729            0 :     std::unique_lock lock(mutex_);
     730            0 :     completed_ops_.splice(ops);
     731            0 :     wake_one_thread_and_unlock(lock);
     732            0 : }
     733              : 
     734              : void
     735          238 : epoll_scheduler::
     736              : interrupt_reactor() const
     737              : {
     738              :     // Only write if not already armed to avoid redundant writes
     739          238 :     bool expected = false;
     740          238 :     if (eventfd_armed_.compare_exchange_strong(expected, true,
     741              :             std::memory_order_release, std::memory_order_relaxed))
     742              :     {
     743          224 :         std::uint64_t val = 1;
     744          224 :         [[maybe_unused]] auto r = ::write(event_fd_, &val, sizeof(val));
     745              :     }
     746          238 : }
     747              : 
     748              : void
     749          357 : epoll_scheduler::
     750              : signal_all(std::unique_lock<std::mutex>&) const
     751              : {
     752          357 :     state_ |= 1;
     753          357 :     cond_.notify_all();
     754          357 : }
     755              : 
     756              : bool
     757        81138 : epoll_scheduler::
     758              : maybe_unlock_and_signal_one(std::unique_lock<std::mutex>& lock) const
     759              : {
     760        81138 :     state_ |= 1;
     761        81138 :     if (state_ > 1)
     762              :     {
     763            0 :         lock.unlock();
     764            0 :         cond_.notify_one();
     765            0 :         return true;
     766              :     }
     767        81138 :     return false;
     768              : }
     769              : 
     770              : void
     771       496003 : epoll_scheduler::
     772              : unlock_and_signal_one(std::unique_lock<std::mutex>& lock) const
     773              : {
     774       496003 :     state_ |= 1;
     775       496003 :     bool have_waiters = state_ > 1;
     776       496003 :     lock.unlock();
     777       496003 :     if (have_waiters)
     778            0 :         cond_.notify_one();
     779       496003 : }
     780              : 
     781              : void
     782            0 : epoll_scheduler::
     783              : clear_signal() const
     784              : {
     785            0 :     state_ &= ~std::size_t(1);
     786            0 : }
     787              : 
     788              : void
     789            0 : epoll_scheduler::
     790              : wait_for_signal(std::unique_lock<std::mutex>& lock) const
     791              : {
     792            0 :     while ((state_ & 1) == 0)
     793              :     {
     794            0 :         state_ += 2;
     795            0 :         cond_.wait(lock);
     796            0 :         state_ -= 2;
     797              :     }
     798            0 : }
     799              : 
     800              : void
     801            0 : epoll_scheduler::
     802              : wait_for_signal_for(
     803              :     std::unique_lock<std::mutex>& lock,
     804              :     long timeout_us) const
     805              : {
     806            0 :     if ((state_ & 1) == 0)
     807              :     {
     808            0 :         state_ += 2;
     809            0 :         cond_.wait_for(lock, std::chrono::microseconds(timeout_us));
     810            0 :         state_ -= 2;
     811              :     }
     812            0 : }
     813              : 
     814              : void
     815         1649 : epoll_scheduler::
     816              : wake_one_thread_and_unlock(std::unique_lock<std::mutex>& lock) const
     817              : {
     818         1649 :     if (maybe_unlock_and_signal_one(lock))
     819            0 :         return;
     820              : 
     821         1649 :     if (task_running_ && !task_interrupted_)
     822              :     {
     823           26 :         task_interrupted_ = true;
     824           26 :         lock.unlock();
     825           26 :         interrupt_reactor();
     826              :     }
     827              :     else
     828              :     {
     829         1623 :         lock.unlock();
     830              :     }
     831              : }
     832              : 
     833              : /** RAII guard for handler execution work accounting.
     834              : 
     835              :     Handler consumes 1 work item, may produce N new items via fast-path posts.
     836              :     Net change = N - 1:
     837              :     - If N > 1: add (N-1) to global (more work produced than consumed)
     838              :     - If N == 1: net zero, do nothing
     839              :     - If N < 1: call work_finished() (work consumed, may trigger stop)
     840              : 
     841              :     Also drains private queue to global for other threads to process.
     842              : */
     843              : struct work_cleanup
     844              : {
     845              :     epoll_scheduler const* scheduler;
     846              :     std::unique_lock<std::mutex>* lock;
     847              :     scheduler_context* ctx;
     848              : 
     849       367546 :     ~work_cleanup()
     850              :     {
     851       367546 :         if (ctx)
     852              :         {
     853       367546 :             long produced = ctx->private_outstanding_work;
     854       367546 :             if (produced > 1)
     855           48 :                 scheduler->outstanding_work_.fetch_add(produced - 1, std::memory_order_relaxed);
     856       367498 :             else if (produced < 1)
     857        17030 :                 scheduler->work_finished();
     858              :             // produced == 1: net zero, handler consumed what it produced
     859       367546 :             ctx->private_outstanding_work = 0;
     860              : 
     861       367546 :             if (!ctx->private_queue.empty())
     862              :             {
     863       235432 :                 lock->lock();
     864       235432 :                 scheduler->completed_ops_.splice(ctx->private_queue);
     865              :             }
     866              :         }
     867              :         else
     868              :         {
     869              :             // No thread context - slow-path op was already counted globally
     870            0 :             scheduler->work_finished();
     871              :         }
     872       367546 :     }
     873              : };
     874              : 
     875              : /** RAII guard for reactor work accounting.
     876              : 
     877              :     Reactor only produces work via timer/signal callbacks posting handlers.
     878              :     Unlike handler execution which consumes 1, the reactor consumes nothing.
     879              :     All produced work must be flushed to global counter.
     880              : */
     881              : struct task_cleanup
     882              : {
     883              :     epoll_scheduler const* scheduler;
     884              :     scheduler_context* ctx;
     885              : 
     886       138746 :     ~task_cleanup()
     887              :     {
     888       138746 :         if (ctx && ctx->private_outstanding_work > 0)
     889              :         {
     890         5246 :             scheduler->outstanding_work_.fetch_add(
     891         5246 :                 ctx->private_outstanding_work, std::memory_order_relaxed);
     892         5246 :             ctx->private_outstanding_work = 0;
     893              :         }
     894       138746 :     }
     895              : };
     896              : 
     897              : void
     898        10510 : epoll_scheduler::
     899              : update_timerfd() const
     900              : {
     901        10510 :     auto nearest = timer_svc_->nearest_expiry();
     902              : 
     903        10510 :     itimerspec ts{};
     904        10510 :     int flags = 0;
     905              : 
     906        10510 :     if (nearest == timer_service::time_point::max())
     907              :     {
     908              :         // No timers - disarm by setting to 0 (relative)
     909              :     }
     910              :     else
     911              :     {
     912        10467 :         auto now = std::chrono::steady_clock::now();
     913        10467 :         if (nearest <= now)
     914              :         {
     915              :             // Use 1ns instead of 0 - zero disarms the timerfd
     916           38 :             ts.it_value.tv_nsec = 1;
     917              :         }
     918              :         else
     919              :         {
     920        10429 :             auto nsec = std::chrono::duration_cast<std::chrono::nanoseconds>(
     921        20858 :                 nearest - now).count();
     922        10429 :             ts.it_value.tv_sec = nsec / 1000000000;
     923        10429 :             ts.it_value.tv_nsec = nsec % 1000000000;
     924              :             // Ensure non-zero to avoid disarming if duration rounds to 0
     925        10429 :             if (ts.it_value.tv_sec == 0 && ts.it_value.tv_nsec == 0)
     926            0 :                 ts.it_value.tv_nsec = 1;
     927              :         }
     928              :     }
     929              : 
     930        10510 :     if (::timerfd_settime(timer_fd_, flags, &ts, nullptr) < 0)
     931            0 :         detail::throw_system_error(make_err(errno), "timerfd_settime");
     932        10510 : }
     933              : 
     934              : void
     935       138746 : epoll_scheduler::
     936              : run_task(std::unique_lock<std::mutex>& lock, scheduler_context* ctx)
     937              : {
     938       138746 :     int timeout_ms = task_interrupted_ ? 0 : -1;
     939              : 
     940       138746 :     if (lock.owns_lock())
     941        10289 :         lock.unlock();
     942              : 
     943              :     // Flush private work count when reactor completes
     944       138746 :     task_cleanup on_exit{this, ctx};
     945              :     (void)on_exit;
     946              : 
     947              :     // Event loop runs without mutex held
     948              : 
     949              :     epoll_event events[128];
     950       138746 :     int nfds = ::epoll_wait(epoll_fd_, events, 128, timeout_ms);
     951       138746 :     int saved_errno = errno;
     952              : 
     953       138746 :     if (nfds < 0 && saved_errno != EINTR)
     954            0 :         detail::throw_system_error(make_err(saved_errno), "epoll_wait");
     955              : 
     956       138746 :     bool check_timers = false;
     957       138746 :     op_queue local_ops;
     958       138746 :     int completions_queued = 0;
     959              : 
     960              :     // Process events without holding the mutex
     961       269199 :     for (int i = 0; i < nfds; ++i)
     962              :     {
     963       130453 :         if (events[i].data.ptr == nullptr)
     964              :         {
     965              :             std::uint64_t val;
     966           35 :             [[maybe_unused]] auto r = ::read(event_fd_, &val, sizeof(val));
     967           35 :             eventfd_armed_.store(false, std::memory_order_relaxed);
     968           35 :             continue;
     969           35 :         }
     970              : 
     971       130418 :         if (events[i].data.ptr == &timer_fd_)
     972              :         {
     973              :             std::uint64_t expirations;
     974         5249 :             [[maybe_unused]] auto r = ::read(timer_fd_, &expirations, sizeof(expirations));
     975         5249 :             check_timers = true;
     976         5249 :             continue;
     977         5249 :         }
     978              : 
     979              :         // Deferred I/O: just set ready events and enqueue descriptor
     980              :         // No per-descriptor mutex locking in reactor hot path!
     981       125169 :         auto* desc = static_cast<descriptor_state*>(events[i].data.ptr);
     982       125169 :         desc->add_ready_events(events[i].events);
     983              : 
     984              :         // Only enqueue if not already enqueued
     985       125169 :         bool expected = false;
     986       125169 :         if (desc->is_enqueued_.compare_exchange_strong(expected, true,
     987              :                 std::memory_order_release, std::memory_order_relaxed))
     988              :         {
     989       125169 :             local_ops.push(desc);
     990       125169 :             ++completions_queued;
     991              :         }
     992              :     }
     993              : 
     994              :     // Process timers only when timerfd fires
     995       138746 :     if (check_timers)
     996              :     {
     997         5249 :         timer_svc_->process_expired();
     998         5249 :         update_timerfd();
     999              :     }
    1000              : 
    1001              :     // --- Acquire mutex only for queue operations ---
    1002       138746 :     lock.lock();
    1003              : 
    1004       138746 :     if (!local_ops.empty())
    1005        74245 :         completed_ops_.splice(local_ops);
    1006              : 
    1007              :     // Drain private queue to global (work count handled by task_cleanup)
    1008       138746 :     if (ctx && !ctx->private_queue.empty())
    1009              :     {
    1010         5246 :         completions_queued += ctx->private_outstanding_work;
    1011         5246 :         completed_ops_.splice(ctx->private_queue);
    1012              :     }
    1013              : 
    1014              :     // Signal and wake one waiter if work is queued
    1015       138746 :     if (completions_queued > 0)
    1016              :     {
    1017        79489 :         if (maybe_unlock_and_signal_one(lock))
    1018            0 :             lock.lock();
    1019              :     }
    1020       138746 : }
    1021              : 
    1022              : std::size_t
    1023       367701 : epoll_scheduler::
    1024              : do_one(std::unique_lock<std::mutex>& lock, long timeout_us, scheduler_context* ctx)
    1025              : {
    1026              :     for (;;)
    1027              :     {
    1028       506447 :         if (stopped_)
    1029            2 :             return 0;
    1030              : 
    1031       506445 :         scheduler_op* op = completed_ops_.pop();
    1032              : 
    1033              :         // Handle reactor sentinel - time to poll for I/O
    1034       506445 :         if (op == &task_op_)
    1035              :         {
    1036       149333 :             bool more_handlers = !completed_ops_.empty() ||
    1037        10438 :                 (ctx && !ctx->private_queue.empty());
    1038              : 
    1039              :             // Nothing to run the reactor for: no pending work to wait on,
    1040              :             // or caller requested a non-blocking poll
    1041       149333 :             if (!more_handlers &&
    1042        20876 :                 (outstanding_work_.load(std::memory_order_acquire) == 0 ||
    1043              :                     timeout_us == 0))
    1044              :             {
    1045          149 :                 completed_ops_.push(&task_op_);
    1046          149 :                 return 0;
    1047              :             }
    1048              : 
    1049       138746 :             task_interrupted_ = more_handlers || timeout_us == 0;
    1050       138746 :             task_running_ = true;
    1051              : 
    1052       138746 :             if (more_handlers)
    1053       128457 :                 unlock_and_signal_one(lock);
    1054              : 
    1055       138746 :             run_task(lock, ctx);
    1056              : 
    1057       138746 :             task_running_ = false;
    1058       138746 :             completed_ops_.push(&task_op_);
    1059       138746 :             continue;
    1060       138746 :         }
    1061              : 
    1062              :         // Handle operation
    1063       367550 :         if (op != nullptr)
    1064              :         {
    1065       367546 :             if (!completed_ops_.empty())
    1066       367546 :                 unlock_and_signal_one(lock);
    1067              :             else
    1068            0 :                 lock.unlock();
    1069              : 
    1070       367546 :             work_cleanup on_exit{this, &lock, ctx};
    1071              :             (void)on_exit;
    1072              : 
    1073       367546 :             (*op)();
    1074       367546 :             return 1;
    1075       367546 :         }
    1076              : 
    1077              :         // No work from global queue - try private queue before blocking
    1078            4 :         if (drain_private_queue(ctx, outstanding_work_, completed_ops_))
    1079            0 :             continue;
    1080              : 
    1081              :         // No pending work to wait on, or caller requested non-blocking poll
    1082            8 :         if (outstanding_work_.load(std::memory_order_acquire) == 0 ||
    1083              :             timeout_us == 0)
    1084            4 :             return 0;
    1085              : 
    1086            0 :         clear_signal();
    1087            0 :         if (timeout_us < 0)
    1088            0 :             wait_for_signal(lock);
    1089              :         else
    1090            0 :             wait_for_signal_for(lock, timeout_us);
    1091       138746 :     }
    1092              : }
    1093              : 
    1094              : } // namespace boost::corosio::detail
    1095              : 
    1096              : #endif
        

Generated by: LCOV version 2.3