libs/corosio/src/corosio/src/detail/epoll/scheduler.cpp

Coverage: 79.4% of lines (396/499), 87.5% of functions (42/48), 65.8% of branches (210/319)
//
// Copyright (c) 2026 Steve Gerbino
//
// Distributed under the Boost Software License, Version 1.0. (See accompanying
// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
//
// Official repository: https://github.com/cppalliance/corosio
//

#include <boost/corosio/detail/platform.hpp>

#if BOOST_COROSIO_HAS_EPOLL

#include "src/detail/epoll/scheduler.hpp"
#include "src/detail/epoll/op.hpp"
#include "src/detail/make_err.hpp"
#include "src/detail/posix/resolver_service.hpp"
#include "src/detail/posix/signals.hpp"

#include <boost/corosio/detail/except.hpp>
#include <boost/corosio/detail/thread_local_ptr.hpp>

#include <atomic>
#include <chrono>
#include <limits>
#include <utility>

#include <errno.h>
#include <fcntl.h>
#include <sys/epoll.h>
#include <sys/eventfd.h>
#include <sys/socket.h>
#include <sys/timerfd.h>
#include <unistd.h>

/*
epoll Scheduler - Single Reactor Model
======================================

This scheduler coordinates threads to provide handler parallelism while
avoiding the thundering-herd problem. Instead of all threads blocking in
epoll_wait(), one thread becomes the "reactor" while the others wait on
a condition variable for handler work.

Thread Model
------------
- ONE thread runs epoll_wait() at a time (the reactor thread)
- OTHER threads wait on cond_ (condition variable) for handlers
- When work is posted, exactly one waiting thread wakes via notify_one()
- This matches Windows IOCP semantics, where N posted items wake N threads

Event Loop Structure (do_one)
-----------------------------
1. Lock mutex, try to pop a handler from the queue
2. If a handler was popped: execute it (unlocked), return
3. If the queue is empty and no reactor is running: become the reactor
   - Run epoll_wait (unlocked), queue I/O completions, loop back
4. If the queue is empty and a reactor is running: wait on the condvar

The task_running_ flag ensures only one thread owns epoll_wait().
After the reactor queues I/O completions, it loops back to try getting
a handler, giving priority to handler execution over more I/O polling.

Signaling State (state_)
------------------------
The state_ variable encodes two pieces of information:
- Bit 0: signaled flag (1 = signaled, persists until cleared)
- Upper bits: waiter count (each waiter adds 2 before blocking)

This allows efficient coordination (see the sketch after this comment):
- Signalers only call notify when waiters exist (state_ > 1)
- Waiters check whether already signaled before blocking (fast path)

Wake Coordination (wake_one_thread_and_unlock)
----------------------------------------------
When posting work:
- If waiters exist (state_ > 1): signal and notify_one()
- Else if the reactor is running: interrupt it via an eventfd write
- Else: no-op (the thread will find the work when it checks the queue)

This avoids waking threads unnecessarily. With cascading wakes, each
handler execution wakes at most one additional thread if more work
exists in the queue.

Work Counting
-------------
outstanding_work_ tracks pending operations. When it hits zero, run()
returns. Each operation increments it on start and decrements it on
completion.

Timer Integration
-----------------
Timers are handled by timer_service. The reactor adjusts the epoll_wait
timeout to wake for the nearest timer expiry. When a new timer is
scheduled to expire earlier than the current earliest, timer_service
calls interrupt_reactor() so the timeout is re-evaluated.
*/
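
// Illustrative sketch (not part of this file; hypothetical names): the
// state_ protocol described above, reduced to a standalone pair of
// functions. Bit 0 is the sticky "signaled" flag; each blocked waiter
// adds 2, so state > 1 means at least one waiter is blocked and
// notify_one() is worth the syscall.
//
//     std::mutex m;
//     std::condition_variable cv;
//     std::size_t state = 0;
//
//     void signal_one(std::unique_lock<std::mutex>& lk)
//     {
//         state |= 1;                  // set the sticky signaled flag
//         bool have_waiters = state > 1;
//         lk.unlock();
//         if (have_waiters)
//             cv.notify_one();         // skip the notify when nobody waits
//     }
//
//     void wait(std::unique_lock<std::mutex>& lk)
//     {
//         while ((state & 1) == 0)     // fast path: already signaled
//         {
//             state += 2;              // register as a waiter
//             cv.wait(lk);
//             state -= 2;
//         }
//     }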

namespace boost::corosio::detail {

struct scheduler_context
{
    epoll_scheduler const* key;
    scheduler_context* next;
    op_queue private_queue;
    long private_outstanding_work;

    scheduler_context(epoll_scheduler const* k, scheduler_context* n)
        : key(k)
        , next(n)
        , private_outstanding_work(0)
    {
    }
};

namespace {

corosio::detail::thread_local_ptr<scheduler_context> context_stack;

struct thread_context_guard
{
    scheduler_context frame_;

    explicit thread_context_guard(
        epoll_scheduler const* ctx) noexcept
        : frame_(ctx, context_stack.get())
    {
        context_stack.set(&frame_);
    }

    ~thread_context_guard() noexcept
    {
        if (!frame_.private_queue.empty())
            frame_.key->drain_thread_queue(frame_.private_queue, frame_.private_outstanding_work);
        context_stack.set(frame_.next);
    }
};

scheduler_context*
find_context(epoll_scheduler const* self) noexcept
{
    for (auto* c = context_stack.get(); c != nullptr; c = c->next)
        if (c->key == self)
            return c;
    return nullptr;
}

/// Flush private work count to global counter.
void
flush_private_work(
    scheduler_context* ctx,
    std::atomic<long>& outstanding_work) noexcept
{
    if (ctx && ctx->private_outstanding_work > 0)
    {
        outstanding_work.fetch_add(
            ctx->private_outstanding_work, std::memory_order_relaxed);
        ctx->private_outstanding_work = 0;
    }
}

/// Drain private queue to global queue, flushing work count first.
///
/// @return True if any ops were drained.
bool
drain_private_queue(
    scheduler_context* ctx,
    std::atomic<long>& outstanding_work,
    op_queue& completed_ops) noexcept
{
    if (!ctx || ctx->private_queue.empty())
        return false;

    flush_private_work(ctx, outstanding_work);
    completed_ops.splice(ctx->private_queue);
    return true;
}

} // namespace
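
// Illustrative usage (hypothetical schedulers s1, s2): each guard pushes a
// frame onto the thread-local stack, so nested runs on different schedulers
// both stay discoverable via find_context():
//
//     void worker(epoll_scheduler& s1, epoll_scheduler& s2)
//     {
//         thread_context_guard g1(&s1);     // stack: s1
//         {
//             thread_context_guard g2(&s2); // stack: s2 -> s1
//             // find_context(&s1) and find_context(&s2) both succeed here
//         }
//         // g2 drained its private queue on destruction; stack: s1
//     }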

void
descriptor_state::
operator()()
{
    is_enqueued_.store(false, std::memory_order_relaxed);

    // Take ownership of impl ref set by close_socket() to prevent
    // the owning impl from being freed while we're executing
    auto prevent_impl_destruction = std::move(impl_ref_);

    std::uint32_t ev = ready_events_.exchange(0, std::memory_order_acquire);
    if (ev == 0)
    {
        scheduler_->compensating_work_started();
        return;
    }

    op_queue local_ops;

    int err = 0;
    if (ev & EPOLLERR)
    {
        socklen_t len = sizeof(err);
        if (::getsockopt(fd, SOL_SOCKET, SO_ERROR, &err, &len) < 0)
            err = errno;
        if (err == 0)
            err = EIO;
    }

    epoll_op* rd = nullptr;
    epoll_op* wr = nullptr;
    epoll_op* cn = nullptr;
    {
        std::lock_guard lock(mutex);
        if (ev & EPOLLIN)
        {
            rd = std::exchange(read_op, nullptr);
            if (!rd)
                read_ready = true;
        }
        if (ev & EPOLLOUT)
        {
            cn = std::exchange(connect_op, nullptr);
            wr = std::exchange(write_op, nullptr);
            if (!cn && !wr)
                write_ready = true;
        }
        if (err && !(ev & (EPOLLIN | EPOLLOUT)))
        {
            rd = std::exchange(read_op, nullptr);
            wr = std::exchange(write_op, nullptr);
            cn = std::exchange(connect_op, nullptr);
        }
    }

    // Non-null after I/O means EAGAIN; re-register under lock below
    if (rd)
    {
        if (err)
            rd->complete(err, 0);
        else
            rd->perform_io();

        if (rd->errn == EAGAIN || rd->errn == EWOULDBLOCK)
        {
            rd->errn = 0;
        }
        else
        {
            local_ops.push(rd);
            rd = nullptr;
        }
    }

    if (cn)
    {
        if (err)
            cn->complete(err, 0);
        else
            cn->perform_io();
        local_ops.push(cn);
        cn = nullptr;
    }

    if (wr)
    {
        if (err)
            wr->complete(err, 0);
        else
            wr->perform_io();

        if (wr->errn == EAGAIN || wr->errn == EWOULDBLOCK)
        {
            wr->errn = 0;
        }
        else
        {
            local_ops.push(wr);
            wr = nullptr;
        }
    }

    if (rd || wr)
    {
        std::lock_guard lock(mutex);
        if (rd)
            read_op = rd;
        if (wr)
            write_op = wr;
    }

    // Execute first handler inline; the scheduler's work_cleanup
    // accounts for this as the "consumed" work item
    scheduler_op* first = local_ops.pop();
    if (first)
    {
        scheduler_->post_deferred_completions(local_ops);
        (*first)();
    }
    else
    {
        scheduler_->compensating_work_started();
    }
}
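
// Minimal sketch (an assumption about epoll_op, not the actual header) of
// the perform_io() contract relied on above: attempt the syscall once and
// record errno, so the caller can distinguish "completed" from "spurious
// readiness" (EAGAIN) and re-park the op in the latter case:
//
//     void hypothetical_read_op::perform_io()
//     {
//         ssize_t n = ::recv(fd_, buf_, len_, 0);
//         if (n >= 0) { errn = 0; bytes_transferred = std::size_t(n); }
//         else        { errn = errno; }   // EAGAIN => re-registered above
//     }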

epoll_scheduler::
epoll_scheduler(
    capy::execution_context& ctx,
    int)
    : epoll_fd_(-1)
    , event_fd_(-1)
    , timer_fd_(-1)
    , outstanding_work_(0)
    , stopped_(false)
    , shutdown_(false)
    , task_running_(false)
    , task_interrupted_(false)
    , state_(0)
{
    epoll_fd_ = ::epoll_create1(EPOLL_CLOEXEC);
    if (epoll_fd_ < 0)
        detail::throw_system_error(make_err(errno), "epoll_create1");

    event_fd_ = ::eventfd(0, EFD_NONBLOCK | EFD_CLOEXEC);
    if (event_fd_ < 0)
    {
        int errn = errno;
        ::close(epoll_fd_);
        detail::throw_system_error(make_err(errn), "eventfd");
    }

    timer_fd_ = ::timerfd_create(CLOCK_MONOTONIC, TFD_NONBLOCK | TFD_CLOEXEC);
    if (timer_fd_ < 0)
    {
        int errn = errno;
        ::close(event_fd_);
        ::close(epoll_fd_);
        detail::throw_system_error(make_err(errn), "timerfd_create");
    }

    epoll_event ev{};
    ev.events = EPOLLIN | EPOLLET;
    ev.data.ptr = nullptr;
    if (::epoll_ctl(epoll_fd_, EPOLL_CTL_ADD, event_fd_, &ev) < 0)
    {
        int errn = errno;
        ::close(timer_fd_);
        ::close(event_fd_);
        ::close(epoll_fd_);
        detail::throw_system_error(make_err(errn), "epoll_ctl");
    }

    epoll_event timer_ev{};
    timer_ev.events = EPOLLIN | EPOLLERR;
    timer_ev.data.ptr = &timer_fd_;
    if (::epoll_ctl(epoll_fd_, EPOLL_CTL_ADD, timer_fd_, &timer_ev) < 0)
    {
        int errn = errno;
        ::close(timer_fd_);
        ::close(event_fd_);
        ::close(epoll_fd_);
        detail::throw_system_error(make_err(errn), "epoll_ctl (timerfd)");
    }

    timer_svc_ = &get_timer_service(ctx, *this);
    timer_svc_->set_on_earliest_changed(
        timer_service::callback(
            this,
            [](void* p) { static_cast<epoll_scheduler*>(p)->update_timerfd(); }));

    // Initialize resolver service
    get_resolver_service(ctx, *this);

    // Initialize signal service
    get_signal_service(ctx, *this);

    // Push task sentinel to interleave reactor runs with handler execution
    completed_ops_.push(&task_op_);
}

epoll_scheduler::
~epoll_scheduler()
{
    if (timer_fd_ >= 0)
        ::close(timer_fd_);
    if (event_fd_ >= 0)
        ::close(event_fd_);
    if (epoll_fd_ >= 0)
        ::close(epoll_fd_);
}

void
epoll_scheduler::
shutdown()
{
    {
        std::unique_lock lock(mutex_);
        shutdown_ = true;

        while (auto* h = completed_ops_.pop())
        {
            if (h == &task_op_)
                continue;
            lock.unlock();
            h->destroy();
            lock.lock();
        }

        signal_all(lock);
    }

    outstanding_work_.store(0, std::memory_order_release);

    if (event_fd_ >= 0)
        interrupt_reactor();
}

void
epoll_scheduler::
post(capy::coro h) const
{
    struct post_handler final
        : scheduler_op
    {
        capy::coro h_;

        explicit
        post_handler(capy::coro h)
            : h_(h)
        {
        }

        ~post_handler() = default;

        void operator()() override
        {
            auto h = h_;
            delete this;
            std::atomic_thread_fence(std::memory_order_acquire);
            h.resume();
        }

        void destroy() override
        {
            delete this;
        }
    };

    auto ph = std::make_unique<post_handler>(h);

    // Fast path: same thread posts to private queue
    // Only count locally; work_cleanup batches to global counter
    if (auto* ctx = find_context(this))
    {
        ++ctx->private_outstanding_work;
        ctx->private_queue.push(ph.release());
        return;
    }

    // Slow path: cross-thread post requires mutex
    outstanding_work_.fetch_add(1, std::memory_order_relaxed);

    std::unique_lock lock(mutex_);
    completed_ops_.push(ph.release());
    wake_one_thread_and_unlock(lock);
}
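
// Illustrative usage (hypothetical awaitable; assumes capy::coro can be
// constructed from a std::coroutine_handle<>): hopping a coroutine onto
// the scheduler. When awaited on a thread already running this scheduler,
// post() takes the private-queue fast path above.
//
//     struct reschedule
//     {
//         epoll_scheduler const& sched;
//         bool await_ready() const noexcept { return false; }
//         void await_suspend(std::coroutine_handle<> h) const
//         {
//             sched.post(capy::coro(h));  // assumed conversion
//         }
//         void await_resume() const noexcept {}
//     };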

void
epoll_scheduler::
post(scheduler_op* h) const
{
    // Fast path: same thread posts to private queue
    // Only count locally; work_cleanup batches to global counter
    if (auto* ctx = find_context(this))
    {
        ++ctx->private_outstanding_work;
        ctx->private_queue.push(h);
        return;
    }

    // Slow path: cross-thread post requires mutex
    outstanding_work_.fetch_add(1, std::memory_order_relaxed);

    std::unique_lock lock(mutex_);
    completed_ops_.push(h);
    wake_one_thread_and_unlock(lock);
}

void
epoll_scheduler::
on_work_started() noexcept
{
    outstanding_work_.fetch_add(1, std::memory_order_relaxed);
}

void
epoll_scheduler::
on_work_finished() noexcept
{
    if (outstanding_work_.fetch_sub(1, std::memory_order_acq_rel) == 1)
        stop();
}

bool
epoll_scheduler::
running_in_this_thread() const noexcept
{
    for (auto* c = context_stack.get(); c != nullptr; c = c->next)
        if (c->key == this)
            return true;
    return false;
}

void
epoll_scheduler::
stop()
{
    std::unique_lock lock(mutex_);
    if (!stopped_)
    {
        stopped_ = true;
        signal_all(lock);
        interrupt_reactor();
    }
}

bool
epoll_scheduler::
stopped() const noexcept
{
    std::unique_lock lock(mutex_);
    return stopped_;
}

void
epoll_scheduler::
restart()
{
    std::unique_lock lock(mutex_);
    stopped_ = false;
}

std::size_t
epoll_scheduler::
run()
{
    if (outstanding_work_.load(std::memory_order_acquire) == 0)
    {
        stop();
        return 0;
    }

    thread_context_guard ctx(this);
    std::unique_lock lock(mutex_);

    std::size_t n = 0;
    for (;;)
    {
        if (!do_one(lock, -1, &ctx.frame_))
            break;
        if (n != (std::numeric_limits<std::size_t>::max)())
            ++n;
        if (!lock.owns_lock())
            lock.lock();
    }
    return n;
}

std::size_t
epoll_scheduler::
run_one()
{
    if (outstanding_work_.load(std::memory_order_acquire) == 0)
    {
        stop();
        return 0;
    }

    thread_context_guard ctx(this);
    std::unique_lock lock(mutex_);
    return do_one(lock, -1, &ctx.frame_);
}

std::size_t
epoll_scheduler::
wait_one(long usec)
{
    if (outstanding_work_.load(std::memory_order_acquire) == 0)
    {
        stop();
        return 0;
    }

    thread_context_guard ctx(this);
    std::unique_lock lock(mutex_);
    return do_one(lock, usec, &ctx.frame_);
}

std::size_t
epoll_scheduler::
poll()
{
    if (outstanding_work_.load(std::memory_order_acquire) == 0)
    {
        stop();
        return 0;
    }

    thread_context_guard ctx(this);
    std::unique_lock lock(mutex_);

    std::size_t n = 0;
    for (;;)
    {
        if (!do_one(lock, 0, &ctx.frame_))
            break;
        if (n != (std::numeric_limits<std::size_t>::max)())
            ++n;
        if (!lock.owns_lock())
            lock.lock();
    }
    return n;
}

std::size_t
epoll_scheduler::
poll_one()
{
    if (outstanding_work_.load(std::memory_order_acquire) == 0)
    {
        stop();
        return 0;
    }

    thread_context_guard ctx(this);
    std::unique_lock lock(mutex_);
    return do_one(lock, 0, &ctx.frame_);
}

void
epoll_scheduler::
register_descriptor(int fd, descriptor_state* desc) const
{
    epoll_event ev{};
    ev.events = EPOLLIN | EPOLLOUT | EPOLLET | EPOLLERR | EPOLLHUP;
    ev.data.ptr = desc;

    if (::epoll_ctl(epoll_fd_, EPOLL_CTL_ADD, fd, &ev) < 0)
        detail::throw_system_error(make_err(errno), "epoll_ctl (register)");

    desc->registered_events = ev.events;
    desc->fd = fd;
    desc->scheduler_ = this;

    std::lock_guard lock(desc->mutex);
    desc->read_ready = false;
    desc->write_ready = false;
}

void
epoll_scheduler::
deregister_descriptor(int fd) const
{
    ::epoll_ctl(epoll_fd_, EPOLL_CTL_DEL, fd, nullptr);
}

void
epoll_scheduler::
work_started() const noexcept
{
    outstanding_work_.fetch_add(1, std::memory_order_relaxed);
}

void
epoll_scheduler::
work_finished() const noexcept
{
    if (outstanding_work_.fetch_sub(1, std::memory_order_acq_rel) == 1)
    {
        // Last work item completed - wake all threads so they can exit.
        // signal_all() wakes threads waiting on the condvar.
        // interrupt_reactor() wakes the reactor thread blocked in epoll_wait().
        // Both are needed because they target different blocking mechanisms.
        std::unique_lock lock(mutex_);
        signal_all(lock);
        if (task_running_ && !task_interrupted_)
        {
            task_interrupted_ = true;
            lock.unlock();
            interrupt_reactor();
        }
    }
}

void
epoll_scheduler::
compensating_work_started() const noexcept
{
    auto* ctx = find_context(this);
    if (ctx)
        ++ctx->private_outstanding_work;
}

void
epoll_scheduler::
drain_thread_queue(op_queue& queue, long count) const
{
    // Note: outstanding_work_ was already incremented when posting
    std::unique_lock lock(mutex_);
    completed_ops_.splice(queue);
    if (count > 0)
        maybe_unlock_and_signal_one(lock);
}

void
epoll_scheduler::
post_deferred_completions(op_queue& ops) const
{
    if (ops.empty())
        return;

    // Fast path: if on scheduler thread, use private queue
    if (auto* ctx = find_context(this))
    {
        ctx->private_queue.splice(ops);
        return;
    }

    // Slow path: add to global queue and wake a thread
    std::unique_lock lock(mutex_);
    completed_ops_.splice(ops);
    wake_one_thread_and_unlock(lock);
}

void
epoll_scheduler::
interrupt_reactor() const
{
    // Only write if not already armed to avoid redundant writes
    bool expected = false;
    if (eventfd_armed_.compare_exchange_strong(expected, true,
        std::memory_order_release, std::memory_order_relaxed))
    {
        std::uint64_t val = 1;
        [[maybe_unused]] auto r = ::write(event_fd_, &val, sizeof(val));
    }
}

void
epoll_scheduler::
signal_all(std::unique_lock<std::mutex>&) const
{
    state_ |= 1;
    cond_.notify_all();
}

bool
epoll_scheduler::
maybe_unlock_and_signal_one(std::unique_lock<std::mutex>& lock) const
{
    state_ |= 1;
    if (state_ > 1)
    {
        lock.unlock();
        cond_.notify_one();
        return true;
    }
    return false;
}

void
epoll_scheduler::
unlock_and_signal_one(std::unique_lock<std::mutex>& lock) const
{
    state_ |= 1;
    bool have_waiters = state_ > 1;
    lock.unlock();
    if (have_waiters)
        cond_.notify_one();
}

void
epoll_scheduler::
clear_signal() const
{
    state_ &= ~std::size_t(1);
}

void
epoll_scheduler::
wait_for_signal(std::unique_lock<std::mutex>& lock) const
{
    while ((state_ & 1) == 0)
    {
        state_ += 2;
        cond_.wait(lock);
        state_ -= 2;
    }
}

void
epoll_scheduler::
wait_for_signal_for(
    std::unique_lock<std::mutex>& lock,
    long timeout_us) const
{
    if ((state_ & 1) == 0)
    {
        state_ += 2;
        cond_.wait_for(lock, std::chrono::microseconds(timeout_us));
        state_ -= 2;
    }
}

void
epoll_scheduler::
wake_one_thread_and_unlock(std::unique_lock<std::mutex>& lock) const
{
    if (maybe_unlock_and_signal_one(lock))
        return;

    if (task_running_ && !task_interrupted_)
    {
        task_interrupted_ = true;
        lock.unlock();
        interrupt_reactor();
    }
    else
    {
        lock.unlock();
    }
}

/** RAII guard for handler execution work accounting.

    Handler consumes 1 work item, may produce N new items via fast-path posts.
    Net change = N - 1:
    - If N > 1: add (N-1) to global (more work produced than consumed)
    - If N == 1: net zero, do nothing
    - If N < 1: call work_finished() (work consumed, may trigger stop)

    Also drains private queue to global for other threads to process.
*/
struct work_cleanup
{
    epoll_scheduler const* scheduler;
    std::unique_lock<std::mutex>* lock;
    scheduler_context* ctx;

    ~work_cleanup()
    {
        if (ctx)
        {
            long produced = ctx->private_outstanding_work;
            if (produced > 1)
                scheduler->outstanding_work_.fetch_add(produced - 1, std::memory_order_relaxed);
            else if (produced < 1)
                scheduler->work_finished();
            // produced == 1: net zero, handler consumed what it produced
            ctx->private_outstanding_work = 0;

            if (!ctx->private_queue.empty())
            {
                lock->lock();
                scheduler->completed_ops_.splice(ctx->private_queue);
            }
        }
        else
        {
            // No thread context - slow-path op was already counted globally
            scheduler->work_finished();
        }
    }
};
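
// Worked example (illustrative): a handler consumes one work item and, via
// fast-path post(), produces N = 2 new ones, so private_outstanding_work
// is 2 when the guard runs. The net change is N - 1 = +1, applied to
// outstanding_work_ in a single fetch_add. Had the handler posted nothing
// (N = 0), the guard would call work_finished() instead, which may drop
// outstanding_work_ to zero and stop the scheduler.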

/** RAII guard for reactor work accounting.

    Reactor only produces work via timer/signal callbacks posting handlers.
    Unlike handler execution which consumes 1, the reactor consumes nothing.
    All produced work must be flushed to global counter.
*/
struct task_cleanup
{
    epoll_scheduler const* scheduler;
    scheduler_context* ctx;

    ~task_cleanup()
    {
        if (ctx && ctx->private_outstanding_work > 0)
        {
            scheduler->outstanding_work_.fetch_add(
                ctx->private_outstanding_work, std::memory_order_relaxed);
            ctx->private_outstanding_work = 0;
        }
    }
};

void
epoll_scheduler::
update_timerfd() const
{
    auto nearest = timer_svc_->nearest_expiry();

    itimerspec ts{};
    int flags = 0;

    if (nearest == timer_service::time_point::max())
    {
        // No timers - disarm by setting to 0 (relative)
    }
    else
    {
        auto now = std::chrono::steady_clock::now();
        if (nearest <= now)
        {
            // Use 1ns instead of 0 - zero disarms the timerfd
            ts.it_value.tv_nsec = 1;
        }
        else
        {
            auto nsec = std::chrono::duration_cast<std::chrono::nanoseconds>(
                nearest - now).count();
            ts.it_value.tv_sec = nsec / 1000000000;
            ts.it_value.tv_nsec = nsec % 1000000000;
            // Ensure non-zero to avoid disarming if duration rounds to 0
            if (ts.it_value.tv_sec == 0 && ts.it_value.tv_nsec == 0)
                ts.it_value.tv_nsec = 1;
        }
    }

    if (::timerfd_settime(timer_fd_, flags, &ts, nullptr) < 0)
        detail::throw_system_error(make_err(errno), "timerfd_settime");
}

void
epoll_scheduler::
run_task(std::unique_lock<std::mutex>& lock, scheduler_context* ctx)
{
    int timeout_ms = task_interrupted_ ? 0 : -1;

    if (lock.owns_lock())
        lock.unlock();

    // Flush private work count when reactor completes
    task_cleanup on_exit{this, ctx};
    (void)on_exit;

    // Event loop runs without mutex held
    epoll_event events[128];
    int nfds = ::epoll_wait(epoll_fd_, events, 128, timeout_ms);
    int saved_errno = errno;

    if (nfds < 0 && saved_errno != EINTR)
        detail::throw_system_error(make_err(saved_errno), "epoll_wait");

    bool check_timers = false;
    op_queue local_ops;
    int completions_queued = 0;

    // Process events without holding the mutex
    for (int i = 0; i < nfds; ++i)
    {
        if (events[i].data.ptr == nullptr)
        {
            std::uint64_t val;
            [[maybe_unused]] auto r = ::read(event_fd_, &val, sizeof(val));
            eventfd_armed_.store(false, std::memory_order_relaxed);
            continue;
        }

        if (events[i].data.ptr == &timer_fd_)
        {
            std::uint64_t expirations;
            [[maybe_unused]] auto r = ::read(timer_fd_, &expirations, sizeof(expirations));
            check_timers = true;
            continue;
        }

        // Deferred I/O: just set ready events and enqueue descriptor
        // No per-descriptor mutex locking in reactor hot path!
        auto* desc = static_cast<descriptor_state*>(events[i].data.ptr);
        desc->add_ready_events(events[i].events);

        // Only enqueue if not already enqueued
        bool expected = false;
        if (desc->is_enqueued_.compare_exchange_strong(expected, true,
            std::memory_order_release, std::memory_order_relaxed))
        {
            local_ops.push(desc);
            ++completions_queued;
        }
    }

    // Process timers only when timerfd fires
    if (check_timers)
    {
        timer_svc_->process_expired();
        update_timerfd();
    }

    // --- Acquire mutex only for queue operations ---
    lock.lock();

    if (!local_ops.empty())
        completed_ops_.splice(local_ops);

    // Drain private queue to global (work count handled by task_cleanup)
    if (ctx && !ctx->private_queue.empty())
    {
        completions_queued += ctx->private_outstanding_work;
        completed_ops_.splice(ctx->private_queue);
    }

    // Signal and wake one waiter if work is queued
    if (completions_queued > 0)
    {
        if (maybe_unlock_and_signal_one(lock))
            lock.lock();
    }
}
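
// Minimal sketch (an assumption about descriptor_state, not the actual
// header) of the lock-free hand-off used above: the reactor ORs bits in
// with release ordering, and descriptor_state::operator()() consumes them
// with one acquire exchange, so events merged across epoll_wait() batches
// are observed together by a single queued execution:
//
//     void descriptor_state::add_ready_events(std::uint32_t ev) noexcept
//     {
//         ready_events_.fetch_or(ev, std::memory_order_release);
//     }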

std::size_t
epoll_scheduler::
do_one(std::unique_lock<std::mutex>& lock, long timeout_us, scheduler_context* ctx)
{
    for (;;)
    {
        if (stopped_)
            return 0;

        scheduler_op* op = completed_ops_.pop();

        // Handle reactor sentinel - time to poll for I/O
        if (op == &task_op_)
        {
            bool more_handlers = !completed_ops_.empty() ||
                (ctx && !ctx->private_queue.empty());

            // Nothing to run the reactor for: no pending work to wait on,
            // or caller requested a non-blocking poll
            if (!more_handlers &&
                (outstanding_work_.load(std::memory_order_acquire) == 0 ||
                 timeout_us == 0))
            {
                completed_ops_.push(&task_op_);
                return 0;
            }

            task_interrupted_ = more_handlers || timeout_us == 0;
            task_running_ = true;

            if (more_handlers)
                unlock_and_signal_one(lock);

            run_task(lock, ctx);

            task_running_ = false;
            completed_ops_.push(&task_op_);
            continue;
        }

        // Handle operation
        if (op != nullptr)
        {
            if (!completed_ops_.empty())
                unlock_and_signal_one(lock);
            else
                lock.unlock();

            work_cleanup on_exit{this, &lock, ctx};
            (void)on_exit;

            (*op)();
            return 1;
        }

        // No work from global queue - try private queue before blocking
        if (drain_private_queue(ctx, outstanding_work_, completed_ops_))
            continue;

        // No pending work to wait on, or caller requested non-blocking poll
        if (outstanding_work_.load(std::memory_order_acquire) == 0 ||
            timeout_us == 0)
            return 0;

        clear_signal();
        if (timeout_us < 0)
            wait_for_signal(lock);
        else
            wait_for_signal_for(lock, timeout_us);
    }
}
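
// Illustrative trace of the task sentinel, for one thread and two queued
// handlers H1, H2 (queue shown front to back):
//
//     [H1, H2, task]  pop H1, run it            -> do_one returns 1
//     [H2, task]      pop H2, run it            -> do_one returns 1
//     [task]          become reactor: epoll_wait(), splice completions
//                     C1..Cn, push task back    -> loop continues
//     [C1..Cn, task]  handlers again take priority over more polling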

} // namespace boost::corosio::detail

#endif