Line data Source code
1 : //
2 : // Copyright (c) 2026 Steve Gerbino
3 : //
4 : // Distributed under the Boost Software License, Version 1.0. (See accompanying
5 : // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
6 : //
7 : // Official repository: https://github.com/cppalliance/corosio
8 : //
9 :
10 : #include <boost/corosio/detail/platform.hpp>
11 :
12 : #if BOOST_COROSIO_HAS_EPOLL
13 :
14 : #include "src/detail/epoll/scheduler.hpp"
15 : #include "src/detail/epoll/op.hpp"
16 : #include "src/detail/make_err.hpp"
17 : #include "src/detail/posix/resolver_service.hpp"
18 : #include "src/detail/posix/signals.hpp"
19 :
20 : #include <boost/corosio/detail/except.hpp>
21 : #include <boost/corosio/detail/thread_local_ptr.hpp>
22 :
23 : #include <atomic>
24 : #include <chrono>
25 : #include <limits>
26 : #include <utility>
27 :
28 : #include <errno.h>
29 : #include <fcntl.h>
30 : #include <sys/epoll.h>
31 : #include <sys/eventfd.h>
32 : #include <sys/socket.h>
33 : #include <sys/timerfd.h>
34 : #include <unistd.h>
35 :
36 : /*
37 : epoll Scheduler - Single Reactor Model
38 : ======================================
39 :
40 : This scheduler coordinates its threads to run handlers in parallel while
41 : avoiding the thundering herd problem.
42 : Instead of all threads blocking in epoll_wait(), one thread becomes the
43 : "reactor" while the others wait on a condition variable for handler work.
44 :
45 : Thread Model
46 : ------------
47 : - ONE thread runs epoll_wait() at a time (the reactor thread)
48 : - OTHER threads wait on cond_ (condition variable) for handlers
49 : - When work is posted, exactly one waiting thread wakes via notify_one()
50 : - This matches Windows IOCP semantics where N posted items wake N threads
51 :
52 : Event Loop Structure (do_one)
53 : -----------------------------
54 : 1. Lock mutex, try to pop handler from queue
55 : 2. If got handler: execute it (unlocked), return
56 : 3. If queue empty and no reactor running: become reactor
57 : - Run epoll_wait (unlocked), queue I/O completions, loop back
58 : 4. If queue empty and reactor running: wait on condvar for work
59 :
60 : The task_running_ flag ensures only one thread owns epoll_wait().
61 : After the reactor queues I/O completions, it loops back to try getting
62 : a handler, giving priority to handler execution over more I/O polling.
63 :
64 : Signaling State (state_)
65 : ------------------------
66 : The state_ variable encodes two pieces of information:
67 : - Bit 0: signaled flag (1 = signaled, persists until cleared)
68 : - Upper bits: waiter count (each waiter adds 2 before blocking)
69 :
70 : This allows efficient coordination:
71 : - Signalers only call notify when waiters exist (state_ > 1)
72 : - Waiters check if already signaled before blocking (fast-path)
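
A condensed sketch of this protocol (it mirrors wait_for_signal() and
maybe_unlock_and_signal_one() below; unlocking is elided):

    // waiter
    while ((state_ & 1) == 0)
    {
        state_ += 2;            // register as a waiter
        cond_.wait(lock);
        state_ -= 2;
    }

    // signaler
    state_ |= 1;                // set the signal bit
    if (state_ > 1)             // at least one waiter is registered
        cond_.notify_one();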
73 :
74 : Wake Coordination (wake_one_thread_and_unlock)
75 : ----------------------------------------------
76 : When posting work:
77 : - If waiters exist (state_ > 1): signal and notify_one()
78 : - Else if reactor running: interrupt via eventfd write
79 : - Else: no-op (thread will find work when it checks queue)
80 :
81 : This avoids waking threads unnecessarily. With cascading wakes,
82 : each handler execution wakes at most one additional thread if
83 : more work exists in the queue.
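
For example, a thread that pops a handler in do_one() and finds the queue
still non-empty calls unlock_and_signal_one() to wake one more thread
before running its handler; that thread repeats the check, so wakes
cascade one at a time instead of all at once.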
84 :
85 : Work Counting
86 : -------------
87 : outstanding_work_ tracks pending operations. When it hits zero, run()
88 : returns. Each operation increments on start, decrements on completion.
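
The bookkeeping is settled by work_cleanup (per handler) and task_cleanup
(per reactor pass) below, which batch per-thread counts into the shared
atomic counter.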
89 :
90 : Timer Integration
91 : -----------------
92 : Timers are handled by timer_service. The reactor adjusts epoll_wait
93 : timeout to wake for the nearest timer expiry. When a new timer is
94 : scheduled earlier than current, timer_service calls interrupt_reactor()
95 : to re-evaluate the timeout.
96 : */
97 :
98 : namespace boost::corosio::detail {
99 :
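// Per-thread frame linking one scheduler to a private op queue and work
// counter, so same-thread posts can skip the scheduler mutex (fast path).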
100 : struct scheduler_context
101 : {
102 : epoll_scheduler const* key;
103 : scheduler_context* next;
104 : op_queue private_queue;
105 : long private_outstanding_work;
106 :
107 168 : scheduler_context(epoll_scheduler const* k, scheduler_context* n)
108 168 : : key(k)
109 168 : , next(n)
110 168 : , private_outstanding_work(0)
111 : {
112 168 : }
113 : };
114 :
115 : namespace {
116 :
117 : corosio::detail::thread_local_ptr<scheduler_context> context_stack;
118 :
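// RAII guard that pushes a scheduler_context frame for the current thread
// and, on exit, drains any leftover private work back to the scheduler.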
119 : struct thread_context_guard
120 : {
121 : scheduler_context frame_;
122 :
123 168 : explicit thread_context_guard(
124 : epoll_scheduler const* ctx) noexcept
125 168 : : frame_(ctx, context_stack.get())
126 : {
127 168 : context_stack.set(&frame_);
128 168 : }
129 :
130 168 : ~thread_context_guard() noexcept
131 : {
132 168 : if (!frame_.private_queue.empty())
133 0 : frame_.key->drain_thread_queue(frame_.private_queue, frame_.private_outstanding_work);
134 168 : context_stack.set(frame_.next);
135 168 : }
136 : };
137 :
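/// Find the calling thread's context frame for the given scheduler, or
/// return nullptr if this thread is not running it.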
138 : scheduler_context*
139 357461 : find_context(epoll_scheduler const* self) noexcept
140 : {
141 357461 : for (auto* c = context_stack.get(); c != nullptr; c = c->next)
142 355812 : if (c->key == self)
143 355812 : return c;
144 1649 : return nullptr;
145 : }
146 :
147 : /// Flush private work count to global counter.
148 : void
149 0 : flush_private_work(
150 : scheduler_context* ctx,
151 : std::atomic<long>& outstanding_work) noexcept
152 : {
153 0 : if (ctx && ctx->private_outstanding_work > 0)
154 : {
155 0 : outstanding_work.fetch_add(
156 : ctx->private_outstanding_work, std::memory_order_relaxed);
157 0 : ctx->private_outstanding_work = 0;
158 : }
159 0 : }
160 :
161 : /// Drain private queue to global queue, flushing work count first.
162 : ///
163 : /// @return True if any ops were drained.
164 : bool
165 4 : drain_private_queue(
166 : scheduler_context* ctx,
167 : std::atomic<long>& outstanding_work,
168 : op_queue& completed_ops) noexcept
169 : {
170 4 : if (!ctx || ctx->private_queue.empty())
171 4 : return false;
172 :
173 0 : flush_private_work(ctx, outstanding_work);
174 0 : completed_ops.splice(ctx->private_queue);
175 0 : return true;
176 : }
177 :
178 : } // namespace
179 :
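// Deferred completion for a ready descriptor, run as a scheduler op: consumes
// the ready-event mask, performs pending read/write/connect I/O, re-registers
// ops that hit EAGAIN, and executes the first completed handler inline.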
180 : void
181 125169 : descriptor_state::
182 : operator()()
183 : {
184 125169 : is_enqueued_.store(false, std::memory_order_relaxed);
185 :
186 : // Take ownership of impl ref set by close_socket() to prevent
187 : // the owning impl from being freed while we're executing
188 125169 : auto prevent_impl_destruction = std::move(impl_ref_);
189 :
190 125169 : std::uint32_t ev = ready_events_.exchange(0, std::memory_order_acquire);
191 125169 : if (ev == 0)
192 : {
193 0 : scheduler_->compensating_work_started();
194 0 : return;
195 : }
196 :
197 125169 : op_queue local_ops;
198 :
199 125169 : int err = 0;
200 125169 : if (ev & EPOLLERR)
201 : {
202 1 : socklen_t len = sizeof(err);
203 1 : if (::getsockopt(fd, SOL_SOCKET, SO_ERROR, &err, &len) < 0)
204 0 : err = errno;
205 1 : if (err == 0)
206 1 : err = EIO;
207 : }
208 :
209 125169 : epoll_op* rd = nullptr;
210 125169 : epoll_op* wr = nullptr;
211 125169 : epoll_op* cn = nullptr;
212 : {
213 125169 : std::lock_guard lock(mutex);
214 125169 : if (ev & EPOLLIN)
215 : {
216 58871 : rd = std::exchange(read_op, nullptr);
217 58871 : if (!rd)
218 53804 : read_ready = true;
219 : }
220 125169 : if (ev & EPOLLOUT)
221 : {
222 120154 : cn = std::exchange(connect_op, nullptr);
223 120154 : wr = std::exchange(write_op, nullptr);
224 120154 : if (!cn && !wr)
225 115136 : write_ready = true;
226 : }
227 125169 : if (err && !(ev & (EPOLLIN | EPOLLOUT)))
228 : {
229 0 : rd = std::exchange(read_op, nullptr);
230 0 : wr = std::exchange(write_op, nullptr);
231 0 : cn = std::exchange(connect_op, nullptr);
232 : }
233 125169 : }
234 :
235 : // Ops still non-null after the I/O below hit EAGAIN; re-registered under the lock further down
236 125169 : if (rd)
237 : {
238 5067 : if (err)
239 0 : rd->complete(err, 0);
240 : else
241 5067 : rd->perform_io();
242 :
243 5067 : if (rd->errn == EAGAIN || rd->errn == EWOULDBLOCK)
244 : {
245 0 : rd->errn = 0;
246 : }
247 : else
248 : {
249 5067 : local_ops.push(rd);
250 5067 : rd = nullptr;
251 : }
252 : }
253 :
254 125169 : if (cn)
255 : {
256 5018 : if (err)
257 0 : cn->complete(err, 0);
258 : else
259 5018 : cn->perform_io();
260 5018 : local_ops.push(cn);
261 5018 : cn = nullptr;
262 : }
263 :
264 125169 : if (wr)
265 : {
266 0 : if (err)
267 0 : wr->complete(err, 0);
268 : else
269 0 : wr->perform_io();
270 :
271 0 : if (wr->errn == EAGAIN || wr->errn == EWOULDBLOCK)
272 : {
273 0 : wr->errn = 0;
274 : }
275 : else
276 : {
277 0 : local_ops.push(wr);
278 0 : wr = nullptr;
279 : }
280 : }
281 :
282 125169 : if (rd || wr)
283 : {
284 0 : std::lock_guard lock(mutex);
285 0 : if (rd)
286 0 : read_op = rd;
287 0 : if (wr)
288 0 : write_op = wr;
289 0 : }
290 :
291 : // Execute first handler inline — the scheduler's work_cleanup
292 : // accounts for this as the "consumed" work item
293 125169 : scheduler_op* first = local_ops.pop();
294 125169 : if (first)
295 : {
296 10085 : scheduler_->post_deferred_completions(local_ops);
297 10085 : (*first)();
298 : }
299 : else
300 : {
301 115084 : scheduler_->compensating_work_started();
302 : }
303 125169 : }
304 :
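// Set up the epoll instance, the eventfd used to interrupt the reactor, and
// the timerfd driving timer expiry; register auxiliary services and push the
// reactor sentinel op.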
305 189 : epoll_scheduler::
306 : epoll_scheduler(
307 : capy::execution_context& ctx,
308 189 : int)
309 189 : : epoll_fd_(-1)
310 189 : , event_fd_(-1)
311 189 : , timer_fd_(-1)
312 189 : , outstanding_work_(0)
313 189 : , stopped_(false)
314 189 : , shutdown_(false)
315 189 : , task_running_(false)
316 189 : , task_interrupted_(false)
317 378 : , state_(0)
318 : {
319 189 : epoll_fd_ = ::epoll_create1(EPOLL_CLOEXEC);
320 189 : if (epoll_fd_ < 0)
321 0 : detail::throw_system_error(make_err(errno), "epoll_create1");
322 :
323 189 : event_fd_ = ::eventfd(0, EFD_NONBLOCK | EFD_CLOEXEC);
324 189 : if (event_fd_ < 0)
325 : {
326 0 : int errn = errno;
327 0 : ::close(epoll_fd_);
328 0 : detail::throw_system_error(make_err(errn), "eventfd");
329 : }
330 :
331 189 : timer_fd_ = ::timerfd_create(CLOCK_MONOTONIC, TFD_NONBLOCK | TFD_CLOEXEC);
332 189 : if (timer_fd_ < 0)
333 : {
334 0 : int errn = errno;
335 0 : ::close(event_fd_);
336 0 : ::close(epoll_fd_);
337 0 : detail::throw_system_error(make_err(errn), "timerfd_create");
338 : }
339 :
340 189 : epoll_event ev{};
341 189 : ev.events = EPOLLIN | EPOLLET;
342 189 : ev.data.ptr = nullptr;
343 189 : if (::epoll_ctl(epoll_fd_, EPOLL_CTL_ADD, event_fd_, &ev) < 0)
344 : {
345 0 : int errn = errno;
346 0 : ::close(timer_fd_);
347 0 : ::close(event_fd_);
348 0 : ::close(epoll_fd_);
349 0 : detail::throw_system_error(make_err(errn), "epoll_ctl");
350 : }
351 :
352 189 : epoll_event timer_ev{};
353 189 : timer_ev.events = EPOLLIN | EPOLLERR;
354 189 : timer_ev.data.ptr = &timer_fd_;
355 189 : if (::epoll_ctl(epoll_fd_, EPOLL_CTL_ADD, timer_fd_, &timer_ev) < 0)
356 : {
357 0 : int errn = errno;
358 0 : ::close(timer_fd_);
359 0 : ::close(event_fd_);
360 0 : ::close(epoll_fd_);
361 0 : detail::throw_system_error(make_err(errn), "epoll_ctl (timerfd)");
362 : }
363 :
364 189 : timer_svc_ = &get_timer_service(ctx, *this);
365 189 : timer_svc_->set_on_earliest_changed(
366 : timer_service::callback(
367 : this,
368 5261 : [](void* p) { static_cast<epoll_scheduler*>(p)->update_timerfd(); }));
369 :
370 : // Initialize resolver service
371 189 : get_resolver_service(ctx, *this);
372 :
373 : // Initialize signal service
374 189 : get_signal_service(ctx, *this);
375 :
376 : // Push task sentinel to interleave reactor runs with handler execution
377 189 : completed_ops_.push(&task_op_);
378 189 : }
379 :
380 378 : epoll_scheduler::
381 189 : ~epoll_scheduler()
382 : {
383 189 : if (timer_fd_ >= 0)
384 189 : ::close(timer_fd_);
385 189 : if (event_fd_ >= 0)
386 189 : ::close(event_fd_);
387 189 : if (epoll_fd_ >= 0)
388 189 : ::close(epoll_fd_);
389 378 : }
390 :
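// Destroy all queued handlers without running them, zero the work count, and
// wake every thread (condvar waiters and the reactor) so they can exit.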
391 : void
392 189 : epoll_scheduler::
393 : shutdown()
394 : {
395 : {
396 189 : std::unique_lock lock(mutex_);
397 189 : shutdown_ = true;
398 :
399 378 : while (auto* h = completed_ops_.pop())
400 : {
401 189 : if (h == &task_op_)
402 189 : continue;
403 0 : lock.unlock();
404 0 : h->destroy();
405 0 : lock.lock();
406 189 : }
407 :
408 189 : signal_all(lock);
409 189 : }
410 :
411 189 : outstanding_work_.store(0, std::memory_order_release);
412 :
413 189 : if (event_fd_ >= 0)
414 189 : interrupt_reactor();
415 189 : }
416 :
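// Post a coroutine for later resumption. Same-thread posts go to the private
// queue without locking; cross-thread posts take the mutex and wake a thread.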
417 : void
418 6893 : epoll_scheduler::
419 : post(capy::coro h) const
420 : {
421 : struct post_handler final
422 : : scheduler_op
423 : {
424 : capy::coro h_;
425 :
426 : explicit
427 6893 : post_handler(capy::coro h)
428 6893 : : h_(h)
429 : {
430 6893 : }
431 :
432 13786 : ~post_handler() = default;
433 :
434 6893 : void operator()() override
435 : {
436 6893 : auto h = h_;
437 6893 : delete this;
438 : std::atomic_thread_fence(std::memory_order_acquire);
439 6893 : h.resume();
440 6893 : }
441 :
442 0 : void destroy() override
443 : {
444 0 : delete this;
445 0 : }
446 : };
447 :
448 6893 : auto ph = std::make_unique<post_handler>(h);
449 :
450 : // Fast path: same thread posts to private queue
451 : // Only count locally; work_cleanup batches to global counter
452 6893 : if (auto* ctx = find_context(this))
453 : {
454 5270 : ++ctx->private_outstanding_work;
455 5270 : ctx->private_queue.push(ph.release());
456 5270 : return;
457 : }
458 :
459 : // Slow path: cross-thread post requires mutex
460 1623 : outstanding_work_.fetch_add(1, std::memory_order_relaxed);
461 :
462 1623 : std::unique_lock lock(mutex_);
463 1623 : completed_ops_.push(ph.release());
464 1623 : wake_one_thread_and_unlock(lock);
465 6893 : }
466 :
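// Post a raw scheduler_op, using the same fast/slow path split as the
// coroutine overload above.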
467 : void
468 235484 : epoll_scheduler::
469 : post(scheduler_op* h) const
470 : {
471 : // Fast path: same thread posts to private queue
472 : // Only count locally; work_cleanup batches to global counter
473 235484 : if (auto* ctx = find_context(this))
474 : {
475 235458 : ++ctx->private_outstanding_work;
476 235458 : ctx->private_queue.push(h);
477 235458 : return;
478 : }
479 :
480 : // Slow path: cross-thread post requires mutex
481 26 : outstanding_work_.fetch_add(1, std::memory_order_relaxed);
482 :
483 26 : std::unique_lock lock(mutex_);
484 26 : completed_ops_.push(h);
485 26 : wake_one_thread_and_unlock(lock);
486 26 : }
487 :
488 : void
489 5285 : epoll_scheduler::
490 : on_work_started() noexcept
491 : {
492 5285 : outstanding_work_.fetch_add(1, std::memory_order_relaxed);
493 5285 : }
494 :
495 : void
496 5253 : epoll_scheduler::
497 : on_work_finished() noexcept
498 : {
499 10506 : if (outstanding_work_.fetch_sub(1, std::memory_order_acq_rel) == 1)
500 0 : stop();
501 5253 : }
502 :
503 : bool
504 5500 : epoll_scheduler::
505 : running_in_this_thread() const noexcept
506 : {
507 5500 : for (auto* c = context_stack.get(); c != nullptr; c = c->next)
508 5290 : if (c->key == this)
509 5290 : return true;
510 210 : return false;
511 : }
512 :
513 : void
514 30 : epoll_scheduler::
515 : stop()
516 : {
517 30 : std::unique_lock lock(mutex_);
518 30 : if (!stopped_)
519 : {
520 20 : stopped_ = true;
521 20 : signal_all(lock);
522 20 : interrupt_reactor();
523 : }
524 30 : }
525 :
526 : bool
527 16 : epoll_scheduler::
528 : stopped() const noexcept
529 : {
530 16 : std::unique_lock lock(mutex_);
531 32 : return stopped_;
532 16 : }
533 :
534 : void
535 49 : epoll_scheduler::
536 : restart()
537 : {
538 49 : std::unique_lock lock(mutex_);
539 49 : stopped_ = false;
540 49 : }
541 :
542 : std::size_t
543 175 : epoll_scheduler::
544 : run()
545 : {
546 350 : if (outstanding_work_.load(std::memory_order_acquire) == 0)
547 : {
548 21 : stop();
549 21 : return 0;
550 : }
551 :
552 154 : thread_context_guard ctx(this);
553 154 : std::unique_lock lock(mutex_);
554 :
555 154 : std::size_t n = 0;
556 : for (;;)
557 : {
558 367685 : if (!do_one(lock, -1, &ctx.frame_))
559 154 : break;
560 367531 : if (n != (std::numeric_limits<std::size_t>::max)())
561 367531 : ++n;
562 367531 : if (!lock.owns_lock())
563 132102 : lock.lock();
564 : }
565 154 : return n;
566 154 : }
567 :
568 : std::size_t
569 2 : epoll_scheduler::
570 : run_one()
571 : {
572 4 : if (outstanding_work_.load(std::memory_order_acquire) == 0)
573 : {
574 0 : stop();
575 0 : return 0;
576 : }
577 :
578 2 : thread_context_guard ctx(this);
579 2 : std::unique_lock lock(mutex_);
580 2 : return do_one(lock, -1, &ctx.frame_);
581 2 : }
582 :
583 : std::size_t
584 14 : epoll_scheduler::
585 : wait_one(long usec)
586 : {
587 28 : if (outstanding_work_.load(std::memory_order_acquire) == 0)
588 : {
589 5 : stop();
590 5 : return 0;
591 : }
592 :
593 9 : thread_context_guard ctx(this);
594 9 : std::unique_lock lock(mutex_);
595 9 : return do_one(lock, usec, &ctx.frame_);
596 9 : }
597 :
598 : std::size_t
599 2 : epoll_scheduler::
600 : poll()
601 : {
602 4 : if (outstanding_work_.load(std::memory_order_acquire) == 0)
603 : {
604 1 : stop();
605 1 : return 0;
606 : }
607 :
608 1 : thread_context_guard ctx(this);
609 1 : std::unique_lock lock(mutex_);
610 :
611 1 : std::size_t n = 0;
612 : for (;;)
613 : {
614 3 : if (!do_one(lock, 0, &ctx.frame_))
615 1 : break;
616 2 : if (n != (std::numeric_limits<std::size_t>::max)())
617 2 : ++n;
618 2 : if (!lock.owns_lock())
619 2 : lock.lock();
620 : }
621 1 : return n;
622 1 : }
623 :
624 : std::size_t
625 4 : epoll_scheduler::
626 : poll_one()
627 : {
628 8 : if (outstanding_work_.load(std::memory_order_acquire) == 0)
629 : {
630 2 : stop();
631 2 : return 0;
632 : }
633 :
634 2 : thread_context_guard ctx(this);
635 2 : std::unique_lock lock(mutex_);
636 2 : return do_one(lock, 0, &ctx.frame_);
637 2 : }
638 :
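// Add the descriptor to the epoll set in edge-triggered mode with read,
// write, error, and hangup events armed, and reset its readiness flags.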
639 : void
640 10108 : epoll_scheduler::
641 : register_descriptor(int fd, descriptor_state* desc) const
642 : {
643 10108 : epoll_event ev{};
644 10108 : ev.events = EPOLLIN | EPOLLOUT | EPOLLET | EPOLLERR | EPOLLHUP;
645 10108 : ev.data.ptr = desc;
646 :
647 10108 : if (::epoll_ctl(epoll_fd_, EPOLL_CTL_ADD, fd, &ev) < 0)
648 0 : detail::throw_system_error(make_err(errno), "epoll_ctl (register)");
649 :
650 10108 : desc->registered_events = ev.events;
651 10108 : desc->fd = fd;
652 10108 : desc->scheduler_ = this;
653 :
654 10108 : std::lock_guard lock(desc->mutex);
655 10108 : desc->read_ready = false;
656 10108 : desc->write_ready = false;
657 10108 : }
658 :
659 : void
660 10108 : epoll_scheduler::
661 : deregister_descriptor(int fd) const
662 : {
663 10108 : ::epoll_ctl(epoll_fd_, EPOLL_CTL_DEL, fd, nullptr);
664 10108 : }
665 :
666 : void
667 10213 : epoll_scheduler::
668 : work_started() const noexcept
669 : {
670 10213 : outstanding_work_.fetch_add(1, std::memory_order_relaxed);
671 10213 : }
672 :
673 : void
674 17190 : epoll_scheduler::
675 : work_finished() const noexcept
676 : {
677 34380 : if (outstanding_work_.fetch_sub(1, std::memory_order_acq_rel) == 1)
678 : {
679 : // Last work item completed - wake all threads so they can exit.
680 : // signal_all() wakes threads waiting on the condvar.
681 : // interrupt_reactor() wakes the reactor thread blocked in epoll_wait().
682 : // Both are needed because they target different blocking mechanisms.
683 148 : std::unique_lock lock(mutex_);
684 148 : signal_all(lock);
685 148 : if (task_running_ && !task_interrupted_)
686 : {
687 3 : task_interrupted_ = true;
688 3 : lock.unlock();
689 3 : interrupt_reactor();
690 : }
691 148 : }
692 17190 : }
693 :
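// Record a work item that produced no handler to run, so work_cleanup's
// per-handler decrement nets out to zero instead of draining the count.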
694 : void
695 115084 : epoll_scheduler::
696 : compensating_work_started() const noexcept
697 : {
698 115084 : auto* ctx = find_context(this);
699 115084 : if (ctx)
700 115084 : ++ctx->private_outstanding_work;
701 115084 : }
702 :
703 : void
704 0 : epoll_scheduler::
705 : drain_thread_queue(op_queue& queue, long count) const
706 : {
707 : // Note: outstanding_work_ was already incremented when posting
708 0 : std::unique_lock lock(mutex_);
709 0 : completed_ops_.splice(queue);
710 0 : if (count > 0)
711 0 : maybe_unlock_and_signal_one(lock);
712 0 : }
713 :
714 : void
715 10085 : epoll_scheduler::
716 : post_deferred_completions(op_queue& ops) const
717 : {
718 10085 : if (ops.empty())
719 10085 : return;
720 :
721 : // Fast path: if on scheduler thread, use private queue
722 0 : if (auto* ctx = find_context(this))
723 : {
724 0 : ctx->private_queue.splice(ops);
725 0 : return;
726 : }
727 :
728 : // Slow path: add to global queue and wake a thread
729 0 : std::unique_lock lock(mutex_);
730 0 : completed_ops_.splice(ops);
731 0 : wake_one_thread_and_unlock(lock);
732 0 : }
733 :
734 : void
735 238 : epoll_scheduler::
736 : interrupt_reactor() const
737 : {
738 : // Only write if not already armed to avoid redundant writes
739 238 : bool expected = false;
740 238 : if (eventfd_armed_.compare_exchange_strong(expected, true,
741 : std::memory_order_release, std::memory_order_relaxed))
742 : {
743 224 : std::uint64_t val = 1;
744 224 : [[maybe_unused]] auto r = ::write(event_fd_, &val, sizeof(val));
745 : }
746 238 : }
747 :
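// Set the signal bit and wake every condvar waiter (used by stop(),
// shutdown(), and when the last work item finishes).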
748 : void
749 357 : epoll_scheduler::
750 : signal_all(std::unique_lock<std::mutex>&) const
751 : {
752 357 : state_ |= 1;
753 357 : cond_.notify_all();
754 357 : }
755 :
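// Set the signal bit; if a waiter is registered (state_ > 1), unlock and
// notify one. Returns true if the lock was released.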
756 : bool
757 81138 : epoll_scheduler::
758 : maybe_unlock_and_signal_one(std::unique_lock<std::mutex>& lock) const
759 : {
760 81138 : state_ |= 1;
761 81138 : if (state_ > 1)
762 : {
763 0 : lock.unlock();
764 0 : cond_.notify_one();
765 0 : return true;
766 : }
767 81138 : return false;
768 : }
769 :
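// Set the signal bit, always unlock, then notify one waiter if any are
// registered.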
770 : void
771 496003 : epoll_scheduler::
772 : unlock_and_signal_one(std::unique_lock<std::mutex>& lock) const
773 : {
774 496003 : state_ |= 1;
775 496003 : bool have_waiters = state_ > 1;
776 496003 : lock.unlock();
777 496003 : if (have_waiters)
778 0 : cond_.notify_one();
779 496003 : }
780 :
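// Clear the signal bit so the next wait blocks until a fresh signal arrives.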
781 : void
782 0 : epoll_scheduler::
783 : clear_signal() const
784 : {
785 0 : state_ &= ~std::size_t(1);
786 0 : }
787 :
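// Block until the signal bit is set, counting this thread as a waiter
// (state_ += 2) around each wait so signalers know to notify.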
788 : void
789 0 : epoll_scheduler::
790 : wait_for_signal(std::unique_lock<std::mutex>& lock) const
791 : {
792 0 : while ((state_ & 1) == 0)
793 : {
794 0 : state_ += 2;
795 0 : cond_.wait(lock);
796 0 : state_ -= 2;
797 : }
798 0 : }
799 :
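// Timed variant of wait_for_signal(): a single wait of at most timeout_us
// microseconds if the signal bit is not already set.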
800 : void
801 0 : epoll_scheduler::
802 : wait_for_signal_for(
803 : std::unique_lock<std::mutex>& lock,
804 : long timeout_us) const
805 : {
806 0 : if ((state_ & 1) == 0)
807 : {
808 0 : state_ += 2;
809 0 : cond_.wait_for(lock, std::chrono::microseconds(timeout_us));
810 0 : state_ -= 2;
811 : }
812 0 : }
813 :
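// Wake exactly one consumer for newly queued work: prefer a condvar waiter,
// otherwise interrupt the running reactor, otherwise do nothing.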
814 : void
815 1649 : epoll_scheduler::
816 : wake_one_thread_and_unlock(std::unique_lock<std::mutex>& lock) const
817 : {
818 1649 : if (maybe_unlock_and_signal_one(lock))
819 0 : return;
820 :
821 1649 : if (task_running_ && !task_interrupted_)
822 : {
823 26 : task_interrupted_ = true;
824 26 : lock.unlock();
825 26 : interrupt_reactor();
826 : }
827 : else
828 : {
829 1623 : lock.unlock();
830 : }
831 : }
832 :
833 : /** RAII guard for handler execution work accounting.
834 :
835 : Handler consumes 1 work item, may produce N new items via fast-path posts.
836 : Net change = N - 1:
837 : - If N > 1: add (N-1) to global (more work produced than consumed)
838 : - If N == 1: net zero, do nothing
839 : - If N < 1: call work_finished() (work consumed, may trigger stop)
840 :
841 : Also drains private queue to global for other threads to process.
842 : */
843 : struct work_cleanup
844 : {
845 : epoll_scheduler const* scheduler;
846 : std::unique_lock<std::mutex>* lock;
847 : scheduler_context* ctx;
848 :
849 367546 : ~work_cleanup()
850 : {
851 367546 : if (ctx)
852 : {
853 367546 : long produced = ctx->private_outstanding_work;
854 367546 : if (produced > 1)
855 48 : scheduler->outstanding_work_.fetch_add(produced - 1, std::memory_order_relaxed);
856 367498 : else if (produced < 1)
857 17030 : scheduler->work_finished();
858 : // produced == 1: net zero, handler consumed what it produced
859 367546 : ctx->private_outstanding_work = 0;
860 :
861 367546 : if (!ctx->private_queue.empty())
862 : {
863 235432 : lock->lock();
864 235432 : scheduler->completed_ops_.splice(ctx->private_queue);
865 : }
866 : }
867 : else
868 : {
869 : // No thread context - slow-path op was already counted globally
870 0 : scheduler->work_finished();
871 : }
872 367546 : }
873 : };
874 :
875 : /** RAII guard for reactor work accounting.
876 :
877 : Reactor only produces work via timer/signal callbacks posting handlers.
878 : Unlike handler execution which consumes 1, the reactor consumes nothing.
879 : All produced work must be flushed to global counter.
880 : */
881 : struct task_cleanup
882 : {
883 : epoll_scheduler const* scheduler;
884 : scheduler_context* ctx;
885 :
886 138746 : ~task_cleanup()
887 : {
888 138746 : if (ctx && ctx->private_outstanding_work > 0)
889 : {
890 5246 : scheduler->outstanding_work_.fetch_add(
891 5246 : ctx->private_outstanding_work, std::memory_order_relaxed);
892 5246 : ctx->private_outstanding_work = 0;
893 : }
894 138746 : }
895 : };
896 :
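// Re-arm the timerfd for the nearest timer expiry, or disarm it when no
// timers are pending, so the reactor's epoll_wait wakes when the next
// timer is due.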
897 : void
898 10510 : epoll_scheduler::
899 : update_timerfd() const
900 : {
901 10510 : auto nearest = timer_svc_->nearest_expiry();
902 :
903 10510 : itimerspec ts{};
904 10510 : int flags = 0;
905 :
906 10510 : if (nearest == timer_service::time_point::max())
907 : {
908 : // No timers - disarm by setting to 0 (relative)
909 : }
910 : else
911 : {
912 10467 : auto now = std::chrono::steady_clock::now();
913 10467 : if (nearest <= now)
914 : {
915 : // Use 1ns instead of 0 - zero disarms the timerfd
916 38 : ts.it_value.tv_nsec = 1;
917 : }
918 : else
919 : {
920 10429 : auto nsec = std::chrono::duration_cast<std::chrono::nanoseconds>(
921 20858 : nearest - now).count();
922 10429 : ts.it_value.tv_sec = nsec / 1000000000;
923 10429 : ts.it_value.tv_nsec = nsec % 1000000000;
924 : // Ensure non-zero to avoid disarming if duration rounds to 0
925 10429 : if (ts.it_value.tv_sec == 0 && ts.it_value.tv_nsec == 0)
926 0 : ts.it_value.tv_nsec = 1;
927 : }
928 : }
929 :
930 10510 : if (::timerfd_settime(timer_fd_, flags, &ts, nullptr) < 0)
931 0 : detail::throw_system_error(make_err(errno), "timerfd_settime");
932 10510 : }
933 :
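// Reactor body: run one epoll_wait pass without the mutex, turn readiness
// into queued descriptor completions, process expired timers, then re-take
// the lock to publish the queued work and signal a waiter.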
934 : void
935 138746 : epoll_scheduler::
936 : run_task(std::unique_lock<std::mutex>& lock, scheduler_context* ctx)
937 : {
938 138746 : int timeout_ms = task_interrupted_ ? 0 : -1;
939 :
940 138746 : if (lock.owns_lock())
941 10289 : lock.unlock();
942 :
943 : // Flush private work count when reactor completes
944 138746 : task_cleanup on_exit{this, ctx};
945 : (void)on_exit;
946 :
947 : // Event loop runs without mutex held
948 :
949 : epoll_event events[128];
950 138746 : int nfds = ::epoll_wait(epoll_fd_, events, 128, timeout_ms);
951 138746 : int saved_errno = errno;
952 :
953 138746 : if (nfds < 0 && saved_errno != EINTR)
954 0 : detail::throw_system_error(make_err(saved_errno), "epoll_wait");
955 :
956 138746 : bool check_timers = false;
957 138746 : op_queue local_ops;
958 138746 : int completions_queued = 0;
959 :
960 : // Process events without holding the mutex
961 269199 : for (int i = 0; i < nfds; ++i)
962 : {
963 130453 : if (events[i].data.ptr == nullptr)
964 : {
965 : std::uint64_t val;
966 35 : [[maybe_unused]] auto r = ::read(event_fd_, &val, sizeof(val));
967 35 : eventfd_armed_.store(false, std::memory_order_relaxed);
968 35 : continue;
969 35 : }
970 :
971 130418 : if (events[i].data.ptr == &timer_fd_)
972 : {
973 : std::uint64_t expirations;
974 5249 : [[maybe_unused]] auto r = ::read(timer_fd_, &expirations, sizeof(expirations));
975 5249 : check_timers = true;
976 5249 : continue;
977 5249 : }
978 :
979 : // Deferred I/O: just set ready events and enqueue descriptor
980 : // No per-descriptor mutex locking in reactor hot path!
981 125169 : auto* desc = static_cast<descriptor_state*>(events[i].data.ptr);
982 125169 : desc->add_ready_events(events[i].events);
983 :
984 : // Only enqueue if not already enqueued
985 125169 : bool expected = false;
986 125169 : if (desc->is_enqueued_.compare_exchange_strong(expected, true,
987 : std::memory_order_release, std::memory_order_relaxed))
988 : {
989 125169 : local_ops.push(desc);
990 125169 : ++completions_queued;
991 : }
992 : }
993 :
994 : // Process timers only when timerfd fires
995 138746 : if (check_timers)
996 : {
997 5249 : timer_svc_->process_expired();
998 5249 : update_timerfd();
999 : }
1000 :
1001 : // --- Acquire mutex only for queue operations ---
1002 138746 : lock.lock();
1003 :
1004 138746 : if (!local_ops.empty())
1005 74245 : completed_ops_.splice(local_ops);
1006 :
1007 : // Drain private queue to global (work count handled by task_cleanup)
1008 138746 : if (ctx && !ctx->private_queue.empty())
1009 : {
1010 5246 : completions_queued += ctx->private_outstanding_work;
1011 5246 : completed_ops_.splice(ctx->private_queue);
1012 : }
1013 :
1014 : // Signal and wake one waiter if work is queued
1015 138746 : if (completions_queued > 0)
1016 : {
1017 79489 : if (maybe_unlock_and_signal_one(lock))
1018 0 : lock.lock();
1019 : }
1020 138746 : }
1021 :
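// One step of the dispatch loop described in the file comment: run a handler,
// become the reactor when the task sentinel is popped, or wait to be signaled.
// Returns 1 if a handler was executed, 0 otherwise.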
1022 : std::size_t
1023 367701 : epoll_scheduler::
1024 : do_one(std::unique_lock<std::mutex>& lock, long timeout_us, scheduler_context* ctx)
1025 : {
1026 : for (;;)
1027 : {
1028 506447 : if (stopped_)
1029 2 : return 0;
1030 :
1031 506445 : scheduler_op* op = completed_ops_.pop();
1032 :
1033 : // Handle reactor sentinel - time to poll for I/O
1034 506445 : if (op == &task_op_)
1035 : {
1036 149333 : bool more_handlers = !completed_ops_.empty() ||
1037 10438 : (ctx && !ctx->private_queue.empty());
1038 :
1039 : // Nothing to run the reactor for: no pending work to wait on,
1040 : // or caller requested a non-blocking poll
1041 149333 : if (!more_handlers &&
1042 20876 : (outstanding_work_.load(std::memory_order_acquire) == 0 ||
1043 : timeout_us == 0))
1044 : {
1045 149 : completed_ops_.push(&task_op_);
1046 149 : return 0;
1047 : }
1048 :
1049 138746 : task_interrupted_ = more_handlers || timeout_us == 0;
1050 138746 : task_running_ = true;
1051 :
1052 138746 : if (more_handlers)
1053 128457 : unlock_and_signal_one(lock);
1054 :
1055 138746 : run_task(lock, ctx);
1056 :
1057 138746 : task_running_ = false;
1058 138746 : completed_ops_.push(&task_op_);
1059 138746 : continue;
1060 138746 : }
1061 :
1062 : // Handle operation
1063 367550 : if (op != nullptr)
1064 : {
1065 367546 : if (!completed_ops_.empty())
1066 367546 : unlock_and_signal_one(lock);
1067 : else
1068 0 : lock.unlock();
1069 :
1070 367546 : work_cleanup on_exit{this, &lock, ctx};
1071 : (void)on_exit;
1072 :
1073 367546 : (*op)();
1074 367546 : return 1;
1075 367546 : }
1076 :
1077 : // No work from global queue - try private queue before blocking
1078 4 : if (drain_private_queue(ctx, outstanding_work_, completed_ops_))
1079 0 : continue;
1080 :
1081 : // No pending work to wait on, or caller requested non-blocking poll
1082 8 : if (outstanding_work_.load(std::memory_order_acquire) == 0 ||
1083 : timeout_us == 0)
1084 4 : return 0;
1085 :
1086 0 : clear_signal();
1087 0 : if (timeout_us < 0)
1088 0 : wait_for_signal(lock);
1089 : else
1090 0 : wait_for_signal_for(lock, timeout_us);
1091 138746 : }
1092 : }
1093 :
1094 : } // namespace boost::corosio::detail
1095 :
1096 : #endif