LCOV - code coverage report
Current view: top level - src/detail/select - scheduler.hpp (source / functions) Coverage Total Hit
Test: coverage_filtered.info Lines: 0.0 % 2 0
Test Date: 2026-02-06 05:04:16 Functions: 0.0 % 2 0

            Line data    Source code
       1              : //
       2              : // Copyright (c) 2026 Steve Gerbino
       3              : //
       4              : // Distributed under the Boost Software License, Version 1.0. (See accompanying
       5              : // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
       6              : //
       7              : // Official repository: https://github.com/cppalliance/corosio
       8              : //
       9              : 
      10              : #ifndef BOOST_COROSIO_DETAIL_SELECT_SCHEDULER_HPP
      11              : #define BOOST_COROSIO_DETAIL_SELECT_SCHEDULER_HPP
      12              : 
      13              : #include <boost/corosio/detail/platform.hpp>
      14              : 
      15              : #if BOOST_COROSIO_HAS_SELECT
      16              : 
      17              : #include <boost/corosio/detail/config.hpp>
      18              : #include <boost/corosio/detail/scheduler.hpp>
      19              : #include <boost/capy/ex/execution_context.hpp>
      20              : 
      21              : #include "src/detail/scheduler_op.hpp"
      22              : #include "src/detail/timer_service.hpp"
      23              : 
      24              : #include <sys/select.h>
      25              : 
      26              : #include <atomic>
      27              : #include <condition_variable>
      28              : #include <cstddef>
      29              : #include <mutex>
      30              : #include <unordered_map>
      31              : 
      32              : namespace boost::corosio::detail {
      33              : 
      34              : struct select_op;
      35              : 
      36              : /** POSIX scheduler using select() for I/O multiplexing.
      37              : 
      38              :     This scheduler implements the scheduler interface using the POSIX select()
      39              :     call for I/O event notification. It uses a single reactor model
      40              :     where one thread runs select() while other threads wait on a condition
      41              :     variable for handler work. This design provides:
      42              : 
      43              :     - Handler parallelism: N posted handlers can execute on N threads
      44              :     - No thundering herd: condition_variable wakes exactly one thread
      45              :     - Portability: Works on all POSIX systems
      46              : 
      47              :     The design mirrors epoll_scheduler for behavioral consistency:
      48              :     - Same single-reactor thread coordination model
      49              :     - Same work counting semantics
      50              :     - Same timer integration pattern
      51              : 
      52              :     Known Limitations:
      53              :     - FD_SETSIZE (~1024) bounds the highest usable fd value, which limits concurrent connections
      54              :     - O(n) scanning: rebuilds fd_sets each iteration
      55              :     - Level-triggered only (no edge-triggered mode)
      56              : 
      57              :     @par Thread Safety
      58              :     All public member functions are thread-safe.
      59              : */
      60              : class select_scheduler
      61              :     : public scheduler
      62              :     , public capy::execution_context::service
      63              : {
      64              : public:
      65              :     using key_type = scheduler;  // NOTE(review): presumably the service lookup key used by execution_context - confirm
      66              : 
      67              :     /** Construct the scheduler.
      68              : 
      69              :         Creates a self-pipe for reactor interruption.
      70              : 
      71              :         @param ctx Reference to the owning execution_context.
      72              :         @param concurrency_hint Hint for expected thread count (unused).
      73              :     */
      74              :     select_scheduler(
      75              :         capy::execution_context& ctx,
      76              :         int concurrency_hint = -1);
      77              : 
      78              :     ~select_scheduler();  // NOTE(review): presumably closes the self-pipe fds - confirm in the .cpp
      79              : 
      80              :     select_scheduler(select_scheduler const&) = delete;  // not copy-constructible
      81              :     select_scheduler& operator=(select_scheduler const&) = delete;  // not copy-assignable
      82              : 
      83              :     void shutdown() override;
      84              :     void post(capy::coro h) const override;
      85              :     void post(scheduler_op* h) const override;
      86              :     void on_work_started() noexcept override;
      87              :     void on_work_finished() noexcept override;
      88              :     bool running_in_this_thread() const noexcept override;
      89              :     void stop() override;
      90              :     bool stopped() const noexcept override;
      91              :     void restart() override;
      92              :     std::size_t run() override;
      93              :     std::size_t run_one() override;
      94              :     std::size_t wait_one(long usec) override;  // usec: timeout, microseconds per the parameter name - TODO confirm
      95              :     std::size_t poll() override;
      96              :     std::size_t poll_one() override;
      97              : 
      98              :     /** Return the maximum file descriptor value supported.
      99              : 
     100              :         Returns FD_SETSIZE - 1, the maximum fd value that can be
     101              :         monitored by select(). Operations with fd >= FD_SETSIZE
     102              :         will fail with EINVAL.
     103              : 
     104              :         @return The maximum supported file descriptor value.
     105              :     */
     106              :     static constexpr int max_fd() noexcept { return FD_SETSIZE - 1; }
     107              : 
     108              :     /** Register a file descriptor for monitoring.
     109              : 
     110              :         @param fd The file descriptor to register.
     111              :         @param op The operation associated with this fd.
     112              :         @param events Event mask: event_read (1), event_write (2), or both OR'd together (3).
     113              :     */
     114              :     void register_fd(int fd, select_op* op, int events) const;
     115              : 
     116              :     /** Unregister a file descriptor from monitoring.
     117              : 
     118              :         @param fd The file descriptor to unregister.
     119              :         @param events Event mask to remove: event_read (1), event_write (2), or both OR'd together (3).
     120              :     */
     121              :     void deregister_fd(int fd, int events) const;
     122              : 
     123              :     /** For use by I/O operations to track pending work. */
     124              :     void work_started() const noexcept override;
     125              : 
     126              :     /** For use by I/O operations to track completed work. */
     127              :     void work_finished() const noexcept override;
     128              : 
     129              :     // Event flags for register_fd/deregister_fd; the two bits combine to 3 for "both".
     130              :     static constexpr int event_read  = 1;
     131              :     static constexpr int event_write = 2;
     132              : 
     133              : private:
     134              :     std::size_t do_one(long timeout_us);  // timeout in microseconds, per the parameter name
     135              :     void run_reactor(std::unique_lock<std::mutex>& lock);
     136              :     void wake_one_thread_and_unlock(std::unique_lock<std::mutex>& lock) const;
     137              :     void interrupt_reactor() const;  // NOTE(review): presumably writes to the self-pipe below - confirm in the .cpp
     138              :     long calculate_timeout(long requested_timeout_us) const;
     139              : 
     140              :     // Self-pipe for interrupting select(). No default initializer here; confirm the constructor sets both fds.
     141              :     int pipe_fds_[2];  // [0]=read, [1]=write
     142              : 
     143              :     mutable std::mutex mutex_;  // `mutable` members here stay modifiable inside the const member functions
     144              :     mutable std::condition_variable wakeup_event_;
     145              :     mutable op_queue completed_ops_;
     146              :     mutable std::atomic<long> outstanding_work_;  // no default initializer - NOTE(review): confirm the constructor sets it
     147              :     std::atomic<bool> stopped_;  // no default initializer - NOTE(review): confirm the constructor sets it
     148              :     bool shutdown_;  // plain bool with no default initializer - NOTE(review): confirm the constructor sets it
     149              :     timer_service* timer_svc_ = nullptr;
     150              : 
     151              :     // Per-fd state for tracking registered operations: one slot per direction, null when unset.
     152              :     struct fd_state
     153              :     {
     154              :         select_op* read_op = nullptr;
     155              :         select_op* write_op = nullptr;
     156              :     };
     157              :     mutable std::unordered_map<int, fd_state> registered_fds_;  // keyed by int; presumably the fd - confirm
     158              :     mutable int max_fd_ = -1;  // starts at -1; NOTE(review): presumably the highest registered fd - confirm
     159              : 
     160              :     // Single reactor thread coordination
     161              :     mutable bool reactor_running_ = false;
     162              :     mutable bool reactor_interrupted_ = false;
     163              :     mutable int idle_thread_count_ = 0;
     164              : 
     165              :     // Sentinel operation for interleaving reactor runs with handler execution.
     166              :     // Ensures the reactor runs periodically even when handlers are continuously
     167              :     // posted, preventing timer starvation.
     168              :     struct task_op final : scheduler_op
     169              :     {
     170            0 :         void operator()() override {}  // intentionally empty: the sentinel carries no work of its own
     171            0 :         void destroy() override {}  // intentionally empty: nothing to release
     172              :     };
     173              :     task_op task_op_;
     174              : };
     175              : 
     176              : } // namespace boost::corosio::detail
     177              : 
     178              : #endif // BOOST_COROSIO_HAS_SELECT
     179              : 
     180              : #endif // BOOST_COROSIO_DETAIL_SELECT_SCHEDULER_HPP
        

Generated by: LCOV version 2.3