libs/corosio/src/corosio/src/detail/epoll/scheduler.hpp

//
// Copyright (c) 2026 Steve Gerbino
//
// Distributed under the Boost Software License, Version 1.0. (See accompanying
// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
//
// Official repository: https://github.com/cppalliance/corosio
//

#ifndef BOOST_COROSIO_DETAIL_EPOLL_SCHEDULER_HPP
#define BOOST_COROSIO_DETAIL_EPOLL_SCHEDULER_HPP

#include <boost/corosio/detail/platform.hpp>

#if BOOST_COROSIO_HAS_EPOLL

#include <boost/corosio/detail/config.hpp>
#include <boost/corosio/detail/scheduler.hpp>
#include <boost/capy/ex/execution_context.hpp>

#include "src/detail/scheduler_op.hpp"
#include "src/detail/timer_service.hpp"

#include <atomic>
#include <condition_variable>
#include <cstddef>
#include <cstdint>
#include <mutex>

namespace boost::corosio::detail {

struct epoll_op;
struct descriptor_state;
struct scheduler_context;

/** Linux scheduler using epoll for I/O multiplexing.

    This scheduler implements the scheduler interface using Linux epoll
    for efficient I/O event notification. It uses a single reactor model
    where one thread runs epoll_wait while other threads wait on a
    condition variable for handler work. This design provides:

    - Handler parallelism: N posted handlers can execute on N threads
    - No thundering herd: condition_variable wakes exactly one thread
    - IOCP parity: behavior matches Windows I/O completion port semantics

    When threads call run(), they first try to execute queued handlers.
    If the queue is empty and no reactor is running, one thread becomes
    the reactor and runs epoll_wait. Other threads wait on a condition
    variable until handlers are available.

    @par Thread Safety
    All public member functions are thread-safe.
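
    @par Example
    A minimal sketch of driving the scheduler from several threads,
    assuming the context and scheduler can be constructed directly
    (in practice the scheduler is usually obtained through its owning
    execution_context); only members declared below are used:

    @code
    capy::execution_context ctx;
    epoll_scheduler sched(ctx);

    std::vector<std::thread> pool;
    for (int i = 0; i < 4; ++i)
        pool.emplace_back([&] {
            sched.run(); // one thread becomes the reactor;
                         // the rest wait for handler work
        });

    // ... post handlers, start I/O ...

    sched.stop(); // wake all threads and make run() return
    for (auto& t : pool)
        t.join();
    @endcode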
*/
class epoll_scheduler
    : public scheduler
    , public capy::execution_context::service
{
public:
    using key_type = scheduler;

    /** Construct the scheduler.

        Creates an epoll instance, an eventfd for reactor interruption,
        and a timerfd for kernel-managed timer expiry.

        @param ctx Reference to the owning execution_context.
        @param concurrency_hint Hint for expected thread count (unused).
    */
    epoll_scheduler(
        capy::execution_context& ctx,
        int concurrency_hint = -1);

    /// Destroy the scheduler.
    ~epoll_scheduler();

    epoll_scheduler(epoll_scheduler const&) = delete;
    epoll_scheduler& operator=(epoll_scheduler const&) = delete;

    void shutdown() override;
    void post(capy::coro h) const override;
    void post(scheduler_op* h) const override;
    void on_work_started() noexcept override;
    void on_work_finished() noexcept override;
    bool running_in_this_thread() const noexcept override;
    void stop() override;
    bool stopped() const noexcept override;
    void restart() override;
    std::size_t run() override;
    std::size_t run_one() override;
    std::size_t wait_one(long usec) override;
    std::size_t poll() override;
    std::size_t poll_one() override;

    /** Return the epoll file descriptor.

        Used by socket services to register file descriptors
        for I/O event notification.

        @return The epoll file descriptor.
    */
    int epoll_fd() const noexcept { return epoll_fd_; }

    /** Register a descriptor for persistent monitoring.

        The fd is registered once and stays registered until explicitly
        deregistered. Events are dispatched via descriptor_state, which
        tracks pending read/write/connect operations.

        @param fd The file descriptor to register.
        @param desc Pointer to descriptor data (stored in epoll_event.data.ptr).
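
        @par Example
        A hypothetical sketch from a socket service; the helper
        make_descriptor_state() is illustrative only and not part
        of this interface:

        @code
        int fd = ::socket(AF_INET, SOCK_STREAM,
            SOCK_NONBLOCK | SOCK_CLOEXEC, 0);
        descriptor_state* state = make_descriptor_state(fd); // illustrative
        sched.register_descriptor(fd, state);
        // ... async operations record themselves in *state;
        // events arrive through the reactor's epoll_wait ...
        sched.deregister_descriptor(fd);
        @endcode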
    */
    void register_descriptor(int fd, descriptor_state* desc) const;

    /** Deregister a persistently registered descriptor.

        @param fd The file descriptor to deregister.
    */
    void deregister_descriptor(int fd) const;

    /** For use by I/O operations to track pending work. */
    void work_started() const noexcept override;

    /** For use by I/O operations to track completed work. */
    void work_finished() const noexcept override;

    /** Offset a forthcoming work_finished from work_cleanup.

        Called by descriptor_state when all I/O returned EAGAIN and no
        handler will be executed. Must be called from a scheduler thread.
    */
    void compensating_work_started() const noexcept;

    /** Drain work from a thread context's private queue to the global queue.

        Called by the thread_context_guard destructor when a thread exits run().
        Transfers pending work to the global queue under mutex protection.

        @param queue The private queue to drain.
        @param count Item count for wakeup decisions (wakes other threads if positive).
    */
    void drain_thread_queue(op_queue& queue, long count) const;

    /** Post completed operations for deferred invocation.

        If called from a thread running this scheduler, operations go to
        the thread's private queue (fast path). Otherwise, operations are
        added to the global queue under the mutex and a waiter is signaled.

        @par Preconditions
        work_started() must have been called for each operation.

        @param ops Queue of operations to post.
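
        @par Example
        A hedged sketch of handing a batch of completed operations to
        the scheduler; `op` and op_queue's push() are assumptions, since
        the queue's API is not shown in this header:

        @code
        op_queue ready;
        ready.push(op); // `op`: a completed scheduler_op*, already
                        // counted via work_started(); push() assumed
        sched.post_deferred_completions(ready);
        @endcode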
    */
    void post_deferred_completions(op_queue& ops) const;

private:
    friend struct work_cleanup;
    friend struct task_cleanup;

    std::size_t do_one(std::unique_lock<std::mutex>& lock, long timeout_us, scheduler_context* ctx);
    void run_task(std::unique_lock<std::mutex>& lock, scheduler_context* ctx);
    void wake_one_thread_and_unlock(std::unique_lock<std::mutex>& lock) const;
    void interrupt_reactor() const;
    void update_timerfd() const;

    /** Set the signaled state and wake all waiting threads.

        @par Preconditions
        Mutex must be held.

        @param lock The held mutex lock.
    */
    void signal_all(std::unique_lock<std::mutex>& lock) const;

    /** Set the signaled state and wake one waiter if any exist.

        Only unlocks and signals if at least one thread is waiting.
        Use this when the caller needs to perform a fallback action
        (such as interrupting the reactor) when no waiters exist.

        @par Preconditions
        Mutex must be held.

        @param lock The held mutex lock.

        @return `true` if unlocked and signaled, `false` if lock still held.
    */
    bool maybe_unlock_and_signal_one(std::unique_lock<std::mutex>& lock) const;

    /** Set the signaled state, unlock, and wake one waiter if any exist.

        Always unlocks the mutex. Use this when the caller will release
        the lock regardless of whether a waiter exists.

        @par Preconditions
        Mutex must be held.

        @param lock The held mutex lock.
    */
    void unlock_and_signal_one(std::unique_lock<std::mutex>& lock) const;

    /** Clear the signaled state before waiting.

        @par Preconditions
        Mutex must be held.
    */
    void clear_signal() const;

    /** Block until the signaled state is set.

        Returns immediately if already signaled (fast path). Otherwise
        increments the waiter count, waits on the condition variable,
        and decrements the waiter count upon waking.

        @par Preconditions
        Mutex must be held.

        @param lock The held mutex lock.
    */
    void wait_for_signal(std::unique_lock<std::mutex>& lock) const;

    /** Block until signaled or timeout expires.

        @par Preconditions
        Mutex must be held.

        @param lock The held mutex lock.
        @param timeout_us Maximum time to wait in microseconds.
    */
    void wait_for_signal_for(
        std::unique_lock<std::mutex>& lock,
        long timeout_us) const;

    int epoll_fd_;
    int event_fd_; // for interrupting reactor
    int timer_fd_; // timerfd for kernel-managed timer expiry
    mutable std::mutex mutex_;
    mutable std::condition_variable cond_;
    mutable op_queue completed_ops_;
    mutable std::atomic<long> outstanding_work_;
    bool stopped_;
    bool shutdown_;
    timer_service* timer_svc_ = nullptr;

    // True while a thread is blocked in epoll_wait. Used by
    // wake_one_thread_and_unlock and work_finished to know when
    // an eventfd interrupt is needed instead of a condvar signal.
    mutable bool task_running_ = false;

    // True when the reactor has been told to do a non-blocking poll
    // (more handlers queued or poll mode). Prevents redundant eventfd
    // writes and controls the epoll_wait timeout.
    mutable bool task_interrupted_ = false;

    // Signaling state: bit 0 = signaled, upper bits = waiter count (incremented by 2)
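    // An illustrative reading of that encoding:
    //   state_ == 0 : not signaled, no waiters
    //   state_ == 1 : signaled, no waiters
    //   state_ == 4 : not signaled, two waiters
    //   state_ == 5 : signaled, two waiters
    // Per wait_for_signal above, each waiter adds 2 before blocking and
    // subtracts 2 on waking; the signal helpers set or clear bit 0.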
    mutable std::size_t state_ = 0;

    // Edge-triggered eventfd state
    mutable std::atomic<bool> eventfd_armed_{false};

    // Sentinel operation for interleaving reactor runs with handler execution.
    // Ensures the reactor runs periodically even when handlers are continuously
    // posted, preventing starvation of I/O events, timers, and signals.
    struct task_op final : scheduler_op
    {
        void operator()() override {}
        void destroy() override {}
    };
    task_op task_op_;
};

} // namespace boost::corosio::detail

#endif // BOOST_COROSIO_HAS_EPOLL

#endif // BOOST_COROSIO_DETAIL_EPOLL_SCHEDULER_HPP