Line data Source code
1 : //
2 : // Copyright (c) 2026 Steve Gerbino
3 : //
4 : // Distributed under the Boost Software License, Version 1.0. (See accompanying
5 : // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
6 : //
7 : // Official repository: https://github.com/cppalliance/corosio
8 : //
9 :
10 : #include "src/detail/timer_service.hpp"
11 :
12 : #include <boost/corosio/detail/scheduler.hpp>
13 : #include "src/detail/intrusive.hpp"
14 : #include "src/detail/scheduler_op.hpp"
15 : #include <boost/capy/error.hpp>
16 : #include <boost/capy/coro.hpp>
17 : #include <boost/capy/ex/executor_ref.hpp>
18 : #include <system_error>
19 :
20 : #include <coroutine>
21 : #include <limits>
22 : #include <mutex>
23 : #include <stdexcept>
24 : #include <stop_token>
25 : #include <vector>
26 :
27 : namespace boost::corosio::detail {
28 :
29 : class timer_service_impl;
30 :
31 : // Completion operation posted to scheduler when timer expires or is cancelled.
32 : // Runs inside work_cleanup scope so work accounting is batched correctly.
33 : struct timer_op final : scheduler_op
34 : {
35 : capy::coro h;
36 : capy::executor_ref d;
37 : std::error_code* ec_out = nullptr;
38 : std::error_code ec_value;
39 : scheduler* sched = nullptr;
40 :
41 9322 : timer_op() noexcept
42 9322 : : scheduler_op(&timer_op::do_complete)
43 : {
44 9322 : }
45 :
46 0 : static void do_complete(
47 : void* owner,
48 : scheduler_op* base,
49 : std::uint32_t,
50 : std::uint32_t)
51 : {
52 0 : auto* self = static_cast<timer_op*>(base);
53 0 : if (!owner)
54 : {
55 0 : delete self;
56 0 : return;
57 : }
58 0 : (*self)();
59 : }
60 :
61 9322 : void operator()() override
62 : {
63 9322 : if (ec_out)
64 9322 : *ec_out = ec_value;
65 :
66 : // Capture before posting (coro may destroy this op)
67 9322 : auto* service = sched;
68 9322 : sched = nullptr;
69 :
70 9322 : d.post(h);
71 :
72 : // Balance the on_work_started() from timer_impl::wait()
73 9322 : if (service)
74 9322 : service->on_work_finished();
75 :
76 9322 : delete this;
77 9322 : }
78 :
79 0 : void destroy() override
80 : {
81 0 : delete this;
82 0 : }
83 : };
84 :
85 : struct timer_impl
86 : : timer::timer_impl
87 : , intrusive_list<timer_impl>::node
88 : {
89 : using clock_type = std::chrono::steady_clock;
90 : using time_point = clock_type::time_point;
91 : using duration = clock_type::duration;
92 :
93 : timer_service_impl* svc_ = nullptr;
94 : time_point expiry_;
95 : std::size_t heap_index_ = (std::numeric_limits<std::size_t>::max)();
96 :
97 : // Wait operation state
98 : std::coroutine_handle<> h_;
99 : capy::executor_ref d_;
100 : std::error_code* ec_out_ = nullptr;
101 : std::stop_token token_;
102 : bool waiting_ = false;
103 :
104 129 : explicit timer_impl(timer_service_impl& svc) noexcept
105 129 : : svc_(&svc)
106 : {
107 129 : }
108 :
109 : void release() override;
110 :
111 : std::coroutine_handle<> wait(
112 : std::coroutine_handle<>,
113 : capy::executor_ref,
114 : std::stop_token,
115 : std::error_code*) override;
116 : };
117 :
118 : //------------------------------------------------------------------------------
119 :
120 : class timer_service_impl : public timer_service
121 : {
122 : public:
123 : using clock_type = std::chrono::steady_clock;
124 : using time_point = clock_type::time_point;
125 : using key_type = timer_service;
126 :
127 : private:
128 : struct heap_entry
129 : {
130 : time_point time_;
131 : timer_impl* timer_;
132 : };
133 :
134 : scheduler* sched_ = nullptr;
135 : mutable std::mutex mutex_;
136 : std::vector<heap_entry> heap_;
137 : intrusive_list<timer_impl> timers_;
138 : intrusive_list<timer_impl> free_list_;
139 : callback on_earliest_changed_;
140 :
141 : public:
142 309 : timer_service_impl(capy::execution_context&, scheduler& sched)
143 309 : : timer_service()
144 309 : , sched_(&sched)
145 : {
146 309 : }
147 :
148 9322 : scheduler& get_scheduler() noexcept { return *sched_; }
149 :
150 618 : ~timer_service_impl()
151 309 : {
152 618 : }
153 :
154 : timer_service_impl(timer_service_impl const&) = delete;
155 : timer_service_impl& operator=(timer_service_impl const&) = delete;
156 :
157 309 : void set_on_earliest_changed(callback cb) override
158 : {
159 309 : on_earliest_changed_ = cb;
160 309 : }
161 :
162 309 : void shutdown() override
163 : {
164 : // Cancel all waiting timers and destroy coroutine handles
165 : // This properly decrements outstanding_work_ for each waiting timer
166 309 : while (auto* impl = timers_.pop_front())
167 : {
168 0 : if (impl->waiting_)
169 : {
170 0 : impl->waiting_ = false;
171 : // Destroy the coroutine handle without resuming
172 0 : impl->h_.destroy();
173 : // Decrement work count to avoid leak
174 0 : sched_->on_work_finished();
175 : }
176 0 : delete impl;
177 0 : }
178 438 : while (auto* impl = free_list_.pop_front())
179 129 : delete impl;
180 309 : }
181 :
182 9353 : timer::timer_impl* create_impl() override
183 : {
184 9353 : std::lock_guard lock(mutex_);
185 : timer_impl* impl;
186 9353 : if (auto* p = free_list_.pop_front())
187 : {
188 9224 : impl = p;
189 9224 : impl->heap_index_ = (std::numeric_limits<std::size_t>::max)();
190 : }
191 : else
192 : {
193 129 : impl = new timer_impl(*this);
194 : }
195 9353 : timers_.push_back(impl);
196 9353 : return impl;
197 9353 : }
198 :
199 9353 : void destroy_impl(timer_impl& impl)
200 : {
201 9353 : std::lock_guard lock(mutex_);
202 9353 : remove_timer_impl(impl);
203 9353 : timers_.remove(&impl);
204 9353 : free_list_.push_back(&impl);
205 9353 : }
206 :
207 9356 : void update_timer(timer_impl& impl, time_point new_time)
208 : {
209 9356 : bool notify = false;
210 9356 : bool was_waiting = false;
211 9356 : std::coroutine_handle<> h;
212 9356 : capy::executor_ref d;
213 9356 : std::error_code* ec_out = nullptr;
214 :
215 : {
216 9356 : std::lock_guard lock(mutex_);
217 :
218 : // If currently waiting, cancel the pending wait
219 9356 : if (impl.waiting_)
220 : {
221 2 : was_waiting = true;
222 2 : impl.waiting_ = false;
223 2 : h = impl.h_;
224 2 : d = impl.d_;
225 2 : ec_out = impl.ec_out_;
226 : }
227 :
228 9356 : if (impl.heap_index_ < heap_.size())
229 : {
230 : // Already in heap, update position
231 8 : time_point old_time = heap_[impl.heap_index_].time_;
232 8 : heap_[impl.heap_index_].time_ = new_time;
233 :
234 8 : if (new_time < old_time)
235 8 : up_heap(impl.heap_index_);
236 : else
237 0 : down_heap(impl.heap_index_);
238 : }
239 : else
240 : {
241 : // Not in heap, add it
242 9348 : impl.heap_index_ = heap_.size();
243 9348 : heap_.push_back({new_time, &impl});
244 9348 : up_heap(heap_.size() - 1);
245 : }
246 :
247 : // Notify if this timer is now the earliest
248 9356 : notify = (impl.heap_index_ == 0);
249 9356 : }
250 :
251 : // Post cancelled waiter as scheduler_op (runs inside work_cleanup scope)
252 9356 : if (was_waiting)
253 : {
254 2 : auto* op = new timer_op;
255 2 : op->h = h;
256 2 : op->d = std::move(d);
257 2 : op->ec_out = ec_out;
258 2 : op->ec_value = make_error_code(capy::error::canceled);
259 2 : op->sched = sched_;
260 2 : sched_->post(op);
261 : }
262 :
263 9356 : if (notify)
264 9343 : on_earliest_changed_();
265 9356 : }
266 :
267 : void remove_timer(timer_impl& impl)
268 : {
269 : std::lock_guard lock(mutex_);
270 : remove_timer_impl(impl);
271 : }
272 :
273 14 : void cancel_timer(timer_impl& impl)
274 : {
275 14 : std::coroutine_handle<> h;
276 14 : capy::executor_ref d;
277 14 : std::error_code* ec_out = nullptr;
278 14 : bool was_waiting = false;
279 :
280 : {
281 14 : std::lock_guard lock(mutex_);
282 14 : remove_timer_impl(impl);
283 14 : if (impl.waiting_)
284 : {
285 4 : was_waiting = true;
286 4 : impl.waiting_ = false;
287 4 : h = impl.h_;
288 4 : d = std::move(impl.d_);
289 4 : ec_out = impl.ec_out_;
290 : }
291 14 : }
292 :
293 : // Post cancelled waiter as scheduler_op (runs inside work_cleanup scope)
294 14 : if (was_waiting)
295 : {
296 4 : auto* op = new timer_op;
297 4 : op->h = h;
298 4 : op->d = std::move(d);
299 4 : op->ec_out = ec_out;
300 4 : op->ec_value = make_error_code(capy::error::canceled);
301 4 : op->sched = sched_;
302 4 : sched_->post(op);
303 : }
304 14 : }
305 :
306 0 : bool empty() const noexcept override
307 : {
308 0 : std::lock_guard lock(mutex_);
309 0 : return heap_.empty();
310 0 : }
311 :
312 22276 : time_point nearest_expiry() const noexcept override
313 : {
314 22276 : std::lock_guard lock(mutex_);
315 22276 : return heap_.empty() ? time_point::max() : heap_[0].time_;
316 22276 : }
317 :
318 139122 : std::size_t process_expired() override
319 : {
320 : // Collect expired timer_ops while holding lock
321 139122 : std::vector<timer_op*> expired;
322 :
323 : {
324 139122 : std::lock_guard lock(mutex_);
325 139122 : auto now = clock_type::now();
326 :
327 287566 : while (!heap_.empty() && heap_[0].time_ <= now)
328 : {
329 9322 : timer_impl* t = heap_[0].timer_;
330 9322 : remove_timer_impl(*t);
331 :
332 9322 : if (t->waiting_)
333 : {
334 9316 : t->waiting_ = false;
335 9316 : auto* op = new timer_op;
336 9316 : op->h = t->h_;
337 9316 : op->d = std::move(t->d_);
338 9316 : op->ec_out = t->ec_out_;
339 9316 : op->ec_value = {}; // Success
340 9316 : op->sched = sched_;
341 9316 : expired.push_back(op);
342 : }
343 : // If not waiting, timer is removed but not dispatched -
344 : // wait() will handle this by checking expiry
345 : }
346 139122 : }
347 :
348 : // Post ops to scheduler (they run inside work_cleanup scope)
349 148438 : for (auto* op : expired)
350 9316 : sched_->post(op);
351 :
352 278244 : return expired.size();
353 139122 : }
354 :
355 : private:
356 18689 : void remove_timer_impl(timer_impl& impl)
357 : {
358 18689 : std::size_t index = impl.heap_index_;
359 18689 : if (index >= heap_.size())
360 9341 : return; // Not in heap
361 :
362 9348 : if (index == heap_.size() - 1)
363 : {
364 : // Last element, just pop
365 108 : impl.heap_index_ = (std::numeric_limits<std::size_t>::max)();
366 108 : heap_.pop_back();
367 : }
368 : else
369 : {
370 : // Swap with last and reheapify
371 9240 : swap_heap(index, heap_.size() - 1);
372 9240 : impl.heap_index_ = (std::numeric_limits<std::size_t>::max)();
373 9240 : heap_.pop_back();
374 :
375 9240 : if (index > 0 && heap_[index].time_ < heap_[(index - 1) / 2].time_)
376 0 : up_heap(index);
377 : else
378 9240 : down_heap(index);
379 : }
380 : }
381 :
382 9356 : void up_heap(std::size_t index)
383 : {
384 18583 : while (index > 0)
385 : {
386 9240 : std::size_t parent = (index - 1) / 2;
387 9240 : if (!(heap_[index].time_ < heap_[parent].time_))
388 13 : break;
389 9227 : swap_heap(index, parent);
390 9227 : index = parent;
391 : }
392 9356 : }
393 :
394 9240 : void down_heap(std::size_t index)
395 : {
396 9240 : std::size_t child = index * 2 + 1;
397 9240 : while (child < heap_.size())
398 : {
399 2 : std::size_t min_child = (child + 1 == heap_.size() ||
400 0 : heap_[child].time_ < heap_[child + 1].time_)
401 2 : ? child : child + 1;
402 :
403 2 : if (heap_[index].time_ < heap_[min_child].time_)
404 2 : break;
405 :
406 0 : swap_heap(index, min_child);
407 0 : index = min_child;
408 0 : child = index * 2 + 1;
409 : }
410 9240 : }
411 :
412 18467 : void swap_heap(std::size_t i1, std::size_t i2)
413 : {
414 18467 : heap_entry tmp = heap_[i1];
415 18467 : heap_[i1] = heap_[i2];
416 18467 : heap_[i2] = tmp;
417 18467 : heap_[i1].timer_->heap_index_ = i1;
418 18467 : heap_[i2].timer_->heap_index_ = i2;
419 18467 : }
420 : };
421 :
422 : //------------------------------------------------------------------------------
423 :
424 : void
425 9353 : timer_impl::
426 : release()
427 : {
428 9353 : svc_->destroy_impl(*this);
429 9353 : }
430 :
431 : std::coroutine_handle<>
432 9328 : timer_impl::
433 : wait(
434 : std::coroutine_handle<> h,
435 : capy::executor_ref d,
436 : std::stop_token token,
437 : std::error_code* ec)
438 : {
439 : // Check if timer already expired (not in heap anymore)
440 9328 : bool already_expired = (heap_index_ == (std::numeric_limits<std::size_t>::max)());
441 :
442 9328 : if (already_expired)
443 : {
444 : // Timer already expired - post for work tracking
445 6 : if (ec)
446 6 : *ec = {};
447 6 : d.post(h);
448 6 : return std::noop_coroutine();
449 : }
450 :
451 9322 : h_ = h;
452 9322 : d_ = std::move(d);
453 9322 : token_ = std::move(token);
454 9322 : ec_out_ = ec;
455 9322 : waiting_ = true;
456 9322 : svc_->get_scheduler().on_work_started();
457 : // completion is always posted to scheduler queue, never inline.
458 9322 : return std::noop_coroutine();
459 : }
460 :
461 : //------------------------------------------------------------------------------
462 : //
463 : // Extern free functions called from timer.cpp
464 : //
465 : //------------------------------------------------------------------------------
466 :
467 : timer::timer_impl*
468 9353 : timer_service_create(capy::execution_context& ctx)
469 : {
470 9353 : auto* svc = ctx.find_service<timer_service>();
471 9353 : if (!svc)
472 : {
473 : // Timer service not yet created - this happens if io_context
474 : // hasn't been constructed yet, or if the scheduler didn't
475 : // initialize the timer service
476 0 : throw std::runtime_error("timer_service not found");
477 : }
478 9353 : return svc->create_impl();
479 : }
480 :
481 : void
482 9353 : timer_service_destroy(timer::timer_impl& base) noexcept
483 : {
484 9353 : static_cast<timer_impl&>(base).release();
485 9353 : }
486 :
487 : timer::time_point
488 28 : timer_service_expiry(timer::timer_impl& base) noexcept
489 : {
490 28 : return static_cast<timer_impl&>(base).expiry_;
491 : }
492 :
493 : void
494 14 : timer_service_expires_at(timer::timer_impl& base, timer::time_point t)
495 : {
496 14 : auto& impl = static_cast<timer_impl&>(base);
497 14 : impl.expiry_ = t;
498 14 : impl.svc_->update_timer(impl, t);
499 14 : }
500 :
501 : void
502 9342 : timer_service_expires_after(timer::timer_impl& base, timer::duration d)
503 : {
504 9342 : auto& impl = static_cast<timer_impl&>(base);
505 9342 : impl.expiry_ = timer::clock_type::now() + d;
506 9342 : impl.svc_->update_timer(impl, impl.expiry_);
507 9342 : }
508 :
509 : void
510 14 : timer_service_cancel(timer::timer_impl& base) noexcept
511 : {
512 14 : auto& impl = static_cast<timer_impl&>(base);
513 14 : impl.svc_->cancel_timer(impl);
514 14 : }
515 :
516 : timer_service&
517 309 : get_timer_service(capy::execution_context& ctx, scheduler& sched)
518 : {
519 309 : return ctx.make_service<timer_service_impl>(sched);
520 : }
521 :
522 : } // namespace boost::corosio::detail
|