LCOV - code coverage report
Current view: top level - src/detail/epoll - scheduler.hpp (source / functions) Coverage Total Hit
Test: coverage_filtered.info Lines: 0.0 % 2 0
Test Date: 2026-02-06 05:04:16 Functions: 0.0 % 2 0

            Line data    Source code
       1              : //
       2              : // Copyright (c) 2026 Steve Gerbino
       3              : //
       4              : // Distributed under the Boost Software License, Version 1.0. (See accompanying
       5              : // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
       6              : //
       7              : // Official repository: https://github.com/cppalliance/corosio
       8              : //
       9              : 
      10              : #ifndef BOOST_COROSIO_DETAIL_EPOLL_SCHEDULER_HPP
      11              : #define BOOST_COROSIO_DETAIL_EPOLL_SCHEDULER_HPP
      12              : 
      13              : #include <boost/corosio/detail/platform.hpp>
      14              : 
      15              : #if BOOST_COROSIO_HAS_EPOLL
      16              : 
      17              : #include <boost/corosio/detail/config.hpp>
      18              : #include <boost/corosio/detail/scheduler.hpp>
      19              : #include <boost/capy/ex/execution_context.hpp>
      20              : 
      21              : #include "src/detail/scheduler_op.hpp"
      22              : #include "src/detail/timer_service.hpp"
      23              : 
      24              : #include <atomic>
      25              : #include <condition_variable>
      26              : #include <cstddef>
      27              : #include <cstdint>
      28              : #include <mutex>
      29              : 
      30              : namespace boost::corosio::detail {
      31              : 
      32              : struct epoll_op;
      33              : struct descriptor_state;
      34              : struct scheduler_context;
      35              : 
      36              : /** Linux scheduler using epoll for I/O multiplexing.
      37              : 
      38              :     This scheduler implements the scheduler interface using Linux epoll
      39              :     for efficient I/O event notification. It uses a single reactor model
      40              :     where one thread runs epoll_wait while other threads
      41              :     wait on a condition variable for handler work. This design provides:
      42              : 
      43              :     - Handler parallelism: N posted handlers can execute on N threads
      44              :     - No thundering herd: condition_variable wakes exactly one thread
      45              :     - IOCP parity: Behavior matches Windows I/O completion port semantics
      46              : 
      47              :     When threads call run(), they first try to execute queued handlers.
      48              :     If the queue is empty and no reactor is running, one thread becomes
      49              :     the reactor and runs epoll_wait. Other threads wait on a condition
      50              :     variable until handlers are available.
      51              : 
      52              :     @par Thread Safety
      53              :     All public member functions are thread-safe.
      54              : */
      55              : class epoll_scheduler
      56              :     : public scheduler
      57              :     , public capy::execution_context::service
      58              : {
      59              : public:
      60              :     using key_type = scheduler;
      61              : 
      62              :     /** Construct the scheduler.
      63              : 
      64              :         Creates an epoll instance, eventfd for reactor interruption,
      65              :         and timerfd for kernel-managed timer expiry.
      66              : 
      67              :         @param ctx Reference to the owning execution_context.
      68              :         @param concurrency_hint Hint for expected thread count (unused).
      69              :     */
      70              :     epoll_scheduler(
      71              :         capy::execution_context& ctx,
      72              :         int concurrency_hint = -1);
      73              : 
      74              :     /// Destroy the scheduler.
      75              :     ~epoll_scheduler();
      76              : 
      77              :     epoll_scheduler(epoll_scheduler const&) = delete;
      78              :     epoll_scheduler& operator=(epoll_scheduler const&) = delete;
      79              : 
      80              :     void shutdown() override;
      81              :     void post(capy::coro h) const override;
      82              :     void post(scheduler_op* h) const override;
      83              :     void on_work_started() noexcept override;
      84              :     void on_work_finished() noexcept override;
      85              :     bool running_in_this_thread() const noexcept override;
      86              :     void stop() override;
      87              :     bool stopped() const noexcept override;
      88              :     void restart() override;
      89              :     std::size_t run() override;
      90              :     std::size_t run_one() override;
      91              :     std::size_t wait_one(long usec) override;
      92              :     std::size_t poll() override;
      93              :     std::size_t poll_one() override;
      94              : 
      95              :     /** Return the epoll file descriptor.
      96              : 
      97              :         Used by socket services to register file descriptors
      98              :         for I/O event notification.
      99              : 
     100              :         @return The epoll file descriptor.
     101              :     */
     102              :     int epoll_fd() const noexcept { return epoll_fd_; }
     103              : 
     104              :     /** Register a descriptor for persistent monitoring.
     105              : 
     106              :         The fd is registered once and stays registered until explicitly
     107              :         deregistered. Events are dispatched via descriptor_state which
     108              :         tracks pending read/write/connect operations.
     109              : 
     110              :         @param fd The file descriptor to register.
     111              :         @param desc Pointer to descriptor data (stored in epoll_event.data.ptr).
     112              :     */
     113              :     void register_descriptor(int fd, descriptor_state* desc) const;
     114              : 
     115              :     /** Deregister a persistently registered descriptor.
     116              : 
     117              :         @param fd The file descriptor to deregister.
     118              :     */
     119              :     void deregister_descriptor(int fd) const;
     120              : 
     121              :     /** For use by I/O operations to track pending work. */
     122              :     void work_started() const noexcept override;
     123              : 
     124              :     /** For use by I/O operations to track completed work. */
     125              :     void work_finished() const noexcept override;
     126              : 
     127              :     /** Offset a forthcoming work_finished from work_cleanup.
     128              : 
     129              :         Called by descriptor_state when all I/O returned EAGAIN and no
     130              :         handler will be executed. Must be called from a scheduler thread.
     131              :     */
     132              :     void compensating_work_started() const noexcept;
     133              : 
     134              :     /** Drain work from thread context's private queue to global queue.
     135              : 
     136              :         Called by thread_context_guard destructor when a thread exits run().
     137              :         Transfers pending work to the global queue under mutex protection.
     138              : 
     139              :         @param queue The private queue to drain.
     140              :         @param count Item count for wakeup decisions (wakes other threads if positive).
     141              :     */
     142              :     void drain_thread_queue(op_queue& queue, long count) const;
     143              : 
     144              :     /** Post completed operations for deferred invocation.
     145              : 
     146              :         If called from a thread running this scheduler, operations go to
     147              :         the thread's private queue (fast path). Otherwise, operations are
     148              :         added to the global queue under mutex and a waiter is signaled.
     149              : 
     150              :         @par Preconditions
     151              :         work_started() must have been called for each operation.
     152              : 
     153              :         @param ops Queue of operations to post.
     154              :     */
     155              :     void post_deferred_completions(op_queue& ops) const;
     156              : 
     157              : private:
     158              :     friend struct work_cleanup;
     159              :     friend struct task_cleanup;
     160              : 
     161              :     std::size_t do_one(std::unique_lock<std::mutex>& lock, long timeout_us, scheduler_context* ctx);
     162              :     void run_task(std::unique_lock<std::mutex>& lock, scheduler_context* ctx);
     163              :     void wake_one_thread_and_unlock(std::unique_lock<std::mutex>& lock) const;
     164              :     void interrupt_reactor() const;
     165              :     void update_timerfd() const;
     166              : 
     167              :     /** Set the signaled state and wake all waiting threads.
     168              : 
     169              :         @par Preconditions
     170              :         Mutex must be held.
     171              : 
     172              :         @param lock The held mutex lock.
     173              :     */
     174              :     void signal_all(std::unique_lock<std::mutex>& lock) const;
     175              : 
     176              :     /** Set the signaled state and wake one waiter if any exist.
     177              : 
     178              :         Only unlocks and signals if at least one thread is waiting.
     179              :         Use this when the caller needs to perform a fallback action
     180              :         (such as interrupting the reactor) when no waiters exist.
     181              : 
     182              :         @par Preconditions
     183              :         Mutex must be held.
     184              : 
     185              :         @param lock The held mutex lock.
     186              : 
     187              :         @return `true` if unlocked and signaled, `false` if lock still held.
     188              :     */
     189              :     bool maybe_unlock_and_signal_one(std::unique_lock<std::mutex>& lock) const;
     190              : 
     191              :     /** Set the signaled state, unlock, and wake one waiter if any exist.
     192              : 
     193              :         Always unlocks the mutex. Use this when the caller will release
     194              :         the lock regardless of whether a waiter exists.
     195              : 
     196              :         @par Preconditions
     197              :         Mutex must be held.
     198              : 
     199              :         @param lock The held mutex lock.
     200              :     */
     201              :     void unlock_and_signal_one(std::unique_lock<std::mutex>& lock) const;
     202              : 
     203              :     /** Clear the signaled state before waiting.
     204              : 
     205              :         @par Preconditions
     206              :         Mutex must be held.
     207              :     */
     208              :     void clear_signal() const;
     209              : 
     210              :     /** Block until the signaled state is set.
     211              : 
     212              :         Returns immediately if already signaled (fast-path). Otherwise
     213              :         increments the waiter count, waits on the condition variable,
     214              :         and decrements the waiter count upon waking.
     215              : 
     216              :         @par Preconditions
     217              :         Mutex must be held.
     218              : 
     219              :         @param lock The held mutex lock.
     220              :     */
     221              :     void wait_for_signal(std::unique_lock<std::mutex>& lock) const;
     222              : 
     223              :     /** Block until signaled or timeout expires.
     224              : 
     225              :         @par Preconditions
     226              :         Mutex must be held.
     227              : 
     228              :         @param lock The held mutex lock.
     229              :         @param timeout_us Maximum time to wait in microseconds.
     230              :     */
     231              :     void wait_for_signal_for(
     232              :         std::unique_lock<std::mutex>& lock,
     233              :         long timeout_us) const;
     234              : 
     235              :     int epoll_fd_;
     236              :     int event_fd_;                              // for interrupting reactor
     237              :     int timer_fd_;                              // timerfd for kernel-managed timer expiry
     238              :     mutable std::mutex mutex_;
     239              :     mutable std::condition_variable cond_;
     240              :     mutable op_queue completed_ops_;
     241              :     mutable std::atomic<long> outstanding_work_;
     242              :     bool stopped_;
     243              :     bool shutdown_;
     244              :     timer_service* timer_svc_ = nullptr;
     245              : 
     246              :     // True while a thread is blocked in epoll_wait. Used by
     247              :     // wake_one_thread_and_unlock and work_finished to know when
     248              :     // an eventfd interrupt is needed instead of a condvar signal.
     249              :     mutable bool task_running_ = false;
     250              : 
     251              :     // True when the reactor has been told to do a non-blocking poll
     252              :     // (more handlers queued or poll mode). Prevents redundant eventfd
     253              :     // writes and controls the epoll_wait timeout.
     254              :     mutable bool task_interrupted_ = false;
     255              : 
     256              :     // Signaling state: bit 0 = signaled, upper bits = waiter count (incremented by 2)
     257              :     mutable std::size_t state_ = 0;
     258              : 
     259              :     // Edge-triggered eventfd state
     260              :     mutable std::atomic<bool> eventfd_armed_{false};
     261              : 
     262              :     // Sentinel operation for interleaving reactor runs with handler execution.
     263              :     // Ensures the reactor runs periodically even when handlers are continuously
     264              :     // posted, preventing starvation of I/O events, timers, and signals.
     265              :     struct task_op final : scheduler_op
     266              :     {
     267            0 :         void operator()() override {}
     268            0 :         void destroy() override {}
     269              :     };
     270              :     task_op task_op_;
     271              : };
     272              : 
     273              : } // namespace boost::corosio::detail
     274              : 
     275              : #endif // BOOST_COROSIO_HAS_EPOLL
     276              : 
     277              : #endif // BOOST_COROSIO_DETAIL_EPOLL_SCHEDULER_HPP
        

Generated by: LCOV version 2.3