//
// Copyright (c) 2026 Steve Gerbino
//
// Distributed under the Boost Software License, Version 1.0. (See accompanying
// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
//
// Official repository: https://github.com/cppalliance/corosio
//

#ifndef BOOST_COROSIO_DETAIL_EPOLL_SCHEDULER_HPP
#define BOOST_COROSIO_DETAIL_EPOLL_SCHEDULER_HPP

#include <boost/corosio/detail/platform.hpp>

#if BOOST_COROSIO_HAS_EPOLL

#include <boost/corosio/detail/config.hpp>
#include <boost/corosio/detail/scheduler.hpp>
#include <boost/capy/ex/execution_context.hpp>

#include "src/detail/scheduler_op.hpp"
#include "src/detail/timer_service.hpp"

#include <atomic>
#include <condition_variable>
#include <cstddef>
#include <cstdint>
#include <mutex>

namespace boost::corosio::detail {

struct epoll_op;
struct descriptor_state;
struct scheduler_context;

/** Linux scheduler using epoll for I/O multiplexing.

    This scheduler implements the scheduler interface using Linux epoll
    for efficient I/O event notification. It uses a single-reactor model
    where one thread runs epoll_wait while other threads wait on a
    condition variable for handler work. This design provides:

    - Handler parallelism: N posted handlers can execute on N threads
    - No thundering herd: the condition variable wakes exactly one thread
    - IOCP parity: behavior matches Windows I/O completion port semantics

    When threads call run(), they first try to execute queued handlers.
    If the queue is empty and no reactor is running, one thread becomes
    the reactor and runs epoll_wait. Other threads wait on a condition
    variable until handlers are available.

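    @par Example
    A sketch of the intended multi-threaded run loop. How the scheduler
    is obtained from the owning execution_context is elided; `sched` is
    illustrative only:

    @code
    // Four threads call run(). One becomes the reactor and blocks in
    // epoll_wait; the others sleep on the condition variable until
    // posted handlers become available.
    std::vector<std::thread> pool;
    for (int i = 0; i < 4; ++i)
        pool.emplace_back([&] { sched.run(); });
    for (auto& t : pool)
        t.join();
    @endcode
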
    @par Thread Safety
    All public member functions are thread-safe.
*/
class epoll_scheduler
    : public scheduler
    , public capy::execution_context::service
{
public:
    using key_type = scheduler;

    /** Construct the scheduler.

        Creates an epoll instance, an eventfd for reactor interruption,
        and a timerfd for kernel-managed timer expiry.

        @param ctx Reference to the owning execution_context.
        @param concurrency_hint Hint for the expected thread count (unused).
    */
    epoll_scheduler(
        capy::execution_context& ctx,
        int concurrency_hint = -1);

    /// Destroy the scheduler.
    ~epoll_scheduler();

    epoll_scheduler(epoll_scheduler const&) = delete;
    epoll_scheduler& operator=(epoll_scheduler const&) = delete;

    void shutdown() override;
    void post(capy::coro h) const override;
    void post(scheduler_op* h) const override;
    void on_work_started() noexcept override;
    void on_work_finished() noexcept override;
    bool running_in_this_thread() const noexcept override;
    void stop() override;
    bool stopped() const noexcept override;
    void restart() override;
    std::size_t run() override;
    std::size_t run_one() override;
    std::size_t wait_one(long usec) override;
    std::size_t poll() override;
    std::size_t poll_one() override;

    /** Return the epoll file descriptor.

        Used by socket services to register file descriptors
        for I/O event notification.

        @return The epoll file descriptor.
    */
    int epoll_fd() const noexcept { return epoll_fd_; }

    /** Register a descriptor for persistent monitoring.

        The fd is registered once and stays registered until explicitly
        deregistered. Events are dispatched via descriptor_state, which
        tracks pending read/write/connect operations.

        @param fd The file descriptor to register.
        @param desc Pointer to descriptor data (stored in epoll_event.data.ptr).
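
        @par Example
        @code
        // Illustrative only: ownership and lifetime of descriptor_state
        // are managed by the calling socket service, and the factory
        // shown here is hypothetical.
        descriptor_state* state = make_descriptor_state();
        sched.register_descriptor(sock_fd, state);
        // ... perform I/O; events are delivered through `state` ...
        sched.deregister_descriptor(sock_fd);
        @endcode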
    */
    void register_descriptor(int fd, descriptor_state* desc) const;

    /** Deregister a persistently registered descriptor.

        @param fd The file descriptor to deregister.
    */
    void deregister_descriptor(int fd) const;

    /** For use by I/O operations to track pending work. */
    void work_started() const noexcept override;

    /** For use by I/O operations to track completed work. */
    void work_finished() const noexcept override;

    /** Offset a forthcoming work_finished() from work_cleanup.

        Called by descriptor_state when all I/O operations returned
        EAGAIN and no handler will be executed. Must be called from a
        scheduler thread.
    */
    void compensating_work_started() const noexcept;

    /** Drain work from a thread context's private queue to the global queue.

        Called by the thread_context_guard destructor when a thread exits
        run(). Transfers pending work to the global queue under mutex
        protection.

        @param queue The private queue to drain.
        @param count Item count for wakeup decisions (wakes other threads
        if positive).
    */
    void drain_thread_queue(op_queue& queue, long count) const;

    /** Post completed operations for deferred invocation.

        If called from a thread running this scheduler, operations go to
        the thread's private queue (fast path). Otherwise, operations are
        added to the global queue under mutex and a waiter is signaled.

        @par Preconditions
        work_started() must have been called for each operation.

        @param ops Queue of operations to post.
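
        @par Example
        @code
        // Illustrative caller; how completed operations are gathered is
        // outside this interface. Each op in `ready` must already have
        // a matching work_started() call.
        op_queue ready;
        // ... move completed operations into `ready` ...
        sched.post_deferred_completions(ready);
        @endcode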
    */
    void post_deferred_completions(op_queue& ops) const;

private:
    friend struct work_cleanup;
    friend struct task_cleanup;

    std::size_t do_one(
        std::unique_lock<std::mutex>& lock,
        long timeout_us,
        scheduler_context* ctx);
    void run_task(std::unique_lock<std::mutex>& lock, scheduler_context* ctx);
    void wake_one_thread_and_unlock(std::unique_lock<std::mutex>& lock) const;
    void interrupt_reactor() const;
    void update_timerfd() const;

    /** Set the signaled state and wake all waiting threads.

        @par Preconditions
        The mutex must be held.

        @param lock The held mutex lock.
    */
    void signal_all(std::unique_lock<std::mutex>& lock) const;

    /** Set the signaled state and wake one waiter if any exist.

        Only unlocks and signals if at least one thread is waiting.
        Use this when the caller needs to perform a fallback action
        (such as interrupting the reactor) when no waiters exist.

        @par Preconditions
        The mutex must be held.

        @param lock The held mutex lock.

        @return `true` if unlocked and signaled, `false` if the lock
        is still held.
    */
    bool maybe_unlock_and_signal_one(std::unique_lock<std::mutex>& lock) const;

    /** Set the signaled state, unlock, and wake one waiter if any exist.

        Always unlocks the mutex. Use this when the caller will release
        the lock regardless of whether a waiter exists.

        @par Preconditions
        The mutex must be held.

        @param lock The held mutex lock.
    */
    void unlock_and_signal_one(std::unique_lock<std::mutex>& lock) const;

    /** Clear the signaled state before waiting.

        @par Preconditions
        The mutex must be held.
    */
    void clear_signal() const;

    /** Block until the signaled state is set.

        Returns immediately if already signaled (fast path). Otherwise
        increments the waiter count, waits on the condition variable,
        and decrements the waiter count upon waking.

        @par Preconditions
        The mutex must be held.

        @param lock The held mutex lock.
    */
    void wait_for_signal(std::unique_lock<std::mutex>& lock) const;

    /** Block until signaled or the timeout expires.

        @par Preconditions
        The mutex must be held.

        @param lock The held mutex lock.
        @param timeout_us Maximum time to wait, in microseconds.
    */
    void wait_for_signal_for(
        std::unique_lock<std::mutex>& lock,
        long timeout_us) const;
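
    // Typical pairing of the helpers above (illustrative, derived from
    // their contracts): a consumer calls clear_signal() followed by
    // wait_for_signal(lock); a producer enqueues work and calls
    // maybe_unlock_and_signal_one(lock), falling back to
    // interrupt_reactor() when no waiter exists.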

    int epoll_fd_;
    int event_fd_; // for interrupting the reactor
    int timer_fd_; // timerfd for kernel-managed timer expiry
    mutable std::mutex mutex_;
    mutable std::condition_variable cond_;
    mutable op_queue completed_ops_;
    mutable std::atomic<long> outstanding_work_;
    bool stopped_;
    bool shutdown_;
    timer_service* timer_svc_ = nullptr;

    // True while a thread is blocked in epoll_wait. Used by
    // wake_one_thread_and_unlock and work_finished to know when
    // an eventfd interrupt is needed instead of a condvar signal.
    mutable bool task_running_ = false;

    // True when the reactor has been told to do a non-blocking poll
    // (more handlers queued or poll mode). Prevents redundant eventfd
    // writes and controls the epoll_wait timeout.
    mutable bool task_interrupted_ = false;

    // Signaling state: bit 0 = signaled; upper bits = waiter count
    // (each waiter adds 2).
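    // For instance (illustrative): 0 = idle; 1 = signaled, no waiters;
    // 4 = two waiters, not signaled; 5 = two waiters, signaled.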
    mutable std::size_t state_ = 0;

    // Edge-triggered eventfd state.
    mutable std::atomic<bool> eventfd_armed_{false};

    // Sentinel operation for interleaving reactor runs with handler
    // execution. Ensures the reactor runs periodically even when handlers
    // are continuously posted, preventing starvation of I/O events,
    // timers, and signals.
    struct task_op final : scheduler_op
    {
        void operator()() override {}
        void destroy() override {}
    };
    task_op task_op_;
};

} // namespace boost::corosio::detail

#endif // BOOST_COROSIO_HAS_EPOLL

#endif // BOOST_COROSIO_DETAIL_EPOLL_SCHEDULER_HPP