libs/corosio/src/corosio/src/detail/timer_service.cpp

88.8% Lines (206/232) 90.0% Functions (27/30) 72.8% Branches (75/103)
libs/corosio/src/corosio/src/detail/timer_service.cpp
Line Branch Hits Source Code
1 //
2 // Copyright (c) 2026 Steve Gerbino
3 //
4 // Distributed under the Boost Software License, Version 1.0. (See accompanying
5 // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
6 //
7 // Official repository: https://github.com/cppalliance/corosio
8 //
9
10 #include "src/detail/timer_service.hpp"
11
12 #include <boost/corosio/detail/scheduler.hpp>
13 #include "src/detail/intrusive.hpp"
14 #include "src/detail/scheduler_op.hpp"
15 #include <boost/capy/error.hpp>
16 #include <boost/capy/coro.hpp>
17 #include <boost/capy/ex/executor_ref.hpp>
18 #include <system_error>
19
20 #include <coroutine>
21 #include <limits>
22 #include <mutex>
23 #include <stdexcept>
24 #include <stop_token>
25 #include <vector>
26
27 namespace boost::corosio::detail {
28
29 class timer_service_impl;
30
31 // Completion operation posted to scheduler when timer expires or is cancelled.
32 // Runs inside work_cleanup scope so work accounting is batched correctly.
33 struct timer_op final : scheduler_op
34 {
35 capy::coro h;
36 capy::executor_ref d;
37 std::error_code* ec_out = nullptr;
38 std::error_code ec_value;
39 scheduler* sched = nullptr;
40
41 9322 timer_op() noexcept
42 9322 : scheduler_op(&timer_op::do_complete)
43 {
44 9322 }
45
46 static void do_complete(
47 void* owner,
48 scheduler_op* base,
49 std::uint32_t,
50 std::uint32_t)
51 {
52 auto* self = static_cast<timer_op*>(base);
53 if (!owner)
54 {
55 delete self;
56 return;
57 }
58 (*self)();
59 }
60
61 9322 void operator()() override
62 {
63
1/2
✓ Branch 0 taken 9322 times.
✗ Branch 1 not taken.
9322 if (ec_out)
64 9322 *ec_out = ec_value;
65
66 // Capture before posting (coro may destroy this op)
67 9322 auto* service = sched;
68 9322 sched = nullptr;
69
70 9322 d.post(h);
71
72 // Balance the on_work_started() from timer_impl::wait()
73
1/2
✓ Branch 0 taken 9322 times.
✗ Branch 1 not taken.
9322 if (service)
74 9322 service->on_work_finished();
75
76
1/2
✓ Branch 0 taken 9322 times.
✗ Branch 1 not taken.
9322 delete this;
77 9322 }
78
79 void destroy() override
80 {
81 delete this;
82 }
83 };
84
85 struct timer_impl
86 : timer::timer_impl
87 , intrusive_list<timer_impl>::node
88 {
89 using clock_type = std::chrono::steady_clock;
90 using time_point = clock_type::time_point;
91 using duration = clock_type::duration;
92
93 timer_service_impl* svc_ = nullptr;
94 time_point expiry_;
95 std::size_t heap_index_ = (std::numeric_limits<std::size_t>::max)();
96
97 // Wait operation state
98 std::coroutine_handle<> h_;
99 capy::executor_ref d_;
100 std::error_code* ec_out_ = nullptr;
101 std::stop_token token_;
102 bool waiting_ = false;
103
104 129 explicit timer_impl(timer_service_impl& svc) noexcept
105 129 : svc_(&svc)
106 {
107 129 }
108
109 void release() override;
110
111 std::coroutine_handle<> wait(
112 std::coroutine_handle<>,
113 capy::executor_ref,
114 std::stop_token,
115 std::error_code*) override;
116 };
117
118 //------------------------------------------------------------------------------
119
120 class timer_service_impl : public timer_service
121 {
122 public:
123 using clock_type = std::chrono::steady_clock;
124 using time_point = clock_type::time_point;
125 using key_type = timer_service;
126
127 private:
128 struct heap_entry
129 {
130 time_point time_;
131 timer_impl* timer_;
132 };
133
134 scheduler* sched_ = nullptr;
135 mutable std::mutex mutex_;
136 std::vector<heap_entry> heap_;
137 intrusive_list<timer_impl> timers_;
138 intrusive_list<timer_impl> free_list_;
139 callback on_earliest_changed_;
140
141 public:
142 309 timer_service_impl(capy::execution_context&, scheduler& sched)
143 309 : timer_service()
144 309 , sched_(&sched)
145 {
146 309 }
147
148 9322 scheduler& get_scheduler() noexcept { return *sched_; }
149
150 618 ~timer_service_impl()
151 309 {
152 618 }
153
154 timer_service_impl(timer_service_impl const&) = delete;
155 timer_service_impl& operator=(timer_service_impl const&) = delete;
156
157 309 void set_on_earliest_changed(callback cb) override
158 {
159 309 on_earliest_changed_ = cb;
160 309 }
161
162 309 void shutdown() override
163 {
164 // Cancel all waiting timers and destroy coroutine handles
165 // This properly decrements outstanding_work_ for each waiting timer
166
1/2
✓ Branch 1 taken 309 times.
✗ Branch 2 not taken.
309 while (auto* impl = timers_.pop_front())
167 {
168 if (impl->waiting_)
169 {
170 impl->waiting_ = false;
171 // Destroy the coroutine handle without resuming
172 impl->h_.destroy();
173 // Decrement work count to avoid leak
174 sched_->on_work_finished();
175 }
176 delete impl;
177 }
178
2/2
✓ Branch 1 taken 129 times.
✓ Branch 2 taken 309 times.
438 while (auto* impl = free_list_.pop_front())
179
1/2
✓ Branch 0 taken 129 times.
✗ Branch 1 not taken.
129 delete impl;
180 309 }
181
182 9353 timer::timer_impl* create_impl() override
183 {
184
1/1
✓ Branch 1 taken 9353 times.
9353 std::lock_guard lock(mutex_);
185 timer_impl* impl;
186
2/2
✓ Branch 1 taken 9224 times.
✓ Branch 2 taken 129 times.
9353 if (auto* p = free_list_.pop_front())
187 {
188 9224 impl = p;
189 9224 impl->heap_index_ = (std::numeric_limits<std::size_t>::max)();
190 }
191 else
192 {
193
1/1
✓ Branch 1 taken 129 times.
129 impl = new timer_impl(*this);
194 }
195 9353 timers_.push_back(impl);
196 9353 return impl;
197 9353 }
198
199 9353 void destroy_impl(timer_impl& impl)
200 {
201
1/1
✓ Branch 1 taken 9353 times.
9353 std::lock_guard lock(mutex_);
202
1/1
✓ Branch 1 taken 9353 times.
9353 remove_timer_impl(impl);
203 9353 timers_.remove(&impl);
204 9353 free_list_.push_back(&impl);
205 9353 }
206
207 9356 void update_timer(timer_impl& impl, time_point new_time)
208 {
209 9356 bool notify = false;
210 9356 bool was_waiting = false;
211 9356 std::coroutine_handle<> h;
212 9356 capy::executor_ref d;
213 9356 std::error_code* ec_out = nullptr;
214
215 {
216
1/1
✓ Branch 1 taken 9356 times.
9356 std::lock_guard lock(mutex_);
217
218 // If currently waiting, cancel the pending wait
219
2/2
✓ Branch 0 taken 2 times.
✓ Branch 1 taken 9354 times.
9356 if (impl.waiting_)
220 {
221 2 was_waiting = true;
222 2 impl.waiting_ = false;
223 2 h = impl.h_;
224 2 d = impl.d_;
225 2 ec_out = impl.ec_out_;
226 }
227
228
2/2
✓ Branch 1 taken 8 times.
✓ Branch 2 taken 9348 times.
9356 if (impl.heap_index_ < heap_.size())
229 {
230 // Already in heap, update position
231 8 time_point old_time = heap_[impl.heap_index_].time_;
232 8 heap_[impl.heap_index_].time_ = new_time;
233
234
2/3
✓ Branch 1 taken 8 times.
✓ Branch 4 taken 8 times.
✗ Branch 5 not taken.
8 if (new_time < old_time)
235
1/1
✓ Branch 1 taken 8 times.
8 up_heap(impl.heap_index_);
236 else
237 down_heap(impl.heap_index_);
238 }
239 else
240 {
241 // Not in heap, add it
242 9348 impl.heap_index_ = heap_.size();
243
1/1
✓ Branch 1 taken 9348 times.
9348 heap_.push_back({new_time, &impl});
244
1/1
✓ Branch 2 taken 9348 times.
9348 up_heap(heap_.size() - 1);
245 }
246
247 // Notify if this timer is now the earliest
248 9356 notify = (impl.heap_index_ == 0);
249 9356 }
250
251 // Post cancelled waiter as scheduler_op (runs inside work_cleanup scope)
252
2/2
✓ Branch 0 taken 2 times.
✓ Branch 1 taken 9354 times.
9356 if (was_waiting)
253 {
254
1/1
✓ Branch 1 taken 2 times.
2 auto* op = new timer_op;
255 2 op->h = h;
256 2 op->d = std::move(d);
257 2 op->ec_out = ec_out;
258 2 op->ec_value = make_error_code(capy::error::canceled);
259 2 op->sched = sched_;
260
1/1
✓ Branch 1 taken 2 times.
2 sched_->post(op);
261 }
262
263
2/2
✓ Branch 0 taken 9343 times.
✓ Branch 1 taken 13 times.
9356 if (notify)
264
1/1
✓ Branch 1 taken 9343 times.
9343 on_earliest_changed_();
265 9356 }
266
267 void remove_timer(timer_impl& impl)
268 {
269 std::lock_guard lock(mutex_);
270 remove_timer_impl(impl);
271 }
272
273 14 void cancel_timer(timer_impl& impl)
274 {
275 14 std::coroutine_handle<> h;
276 14 capy::executor_ref d;
277 14 std::error_code* ec_out = nullptr;
278 14 bool was_waiting = false;
279
280 {
281
1/1
✓ Branch 1 taken 14 times.
14 std::lock_guard lock(mutex_);
282
1/1
✓ Branch 1 taken 14 times.
14 remove_timer_impl(impl);
283
2/2
✓ Branch 0 taken 4 times.
✓ Branch 1 taken 10 times.
14 if (impl.waiting_)
284 {
285 4 was_waiting = true;
286 4 impl.waiting_ = false;
287 4 h = impl.h_;
288 4 d = std::move(impl.d_);
289 4 ec_out = impl.ec_out_;
290 }
291 14 }
292
293 // Post cancelled waiter as scheduler_op (runs inside work_cleanup scope)
294
2/2
✓ Branch 0 taken 4 times.
✓ Branch 1 taken 10 times.
14 if (was_waiting)
295 {
296
1/1
✓ Branch 1 taken 4 times.
4 auto* op = new timer_op;
297 4 op->h = h;
298 4 op->d = std::move(d);
299 4 op->ec_out = ec_out;
300 4 op->ec_value = make_error_code(capy::error::canceled);
301 4 op->sched = sched_;
302
1/1
✓ Branch 1 taken 4 times.
4 sched_->post(op);
303 }
304 14 }
305
306 bool empty() const noexcept override
307 {
308 std::lock_guard lock(mutex_);
309 return heap_.empty();
310 }
311
312 22276 time_point nearest_expiry() const noexcept override
313 {
314 22276 std::lock_guard lock(mutex_);
315
2/2
✓ Branch 1 taken 78 times.
✓ Branch 2 taken 22198 times.
22276 return heap_.empty() ? time_point::max() : heap_[0].time_;
316 22276 }
317
318 139122 std::size_t process_expired() override
319 {
320 // Collect expired timer_ops while holding lock
321 139122 std::vector<timer_op*> expired;
322
323 {
324
1/1
✓ Branch 1 taken 139122 times.
139122 std::lock_guard lock(mutex_);
325 139122 auto now = clock_type::now();
326
327
7/7
✓ Branch 1 taken 148120 times.
✓ Branch 2 taken 324 times.
✓ Branch 5 taken 148120 times.
✓ Branch 8 taken 9322 times.
✓ Branch 9 taken 138798 times.
✓ Branch 10 taken 9322 times.
✓ Branch 11 taken 139122 times.
287566 while (!heap_.empty() && heap_[0].time_ <= now)
328 {
329 9322 timer_impl* t = heap_[0].timer_;
330
1/1
✓ Branch 1 taken 9322 times.
9322 remove_timer_impl(*t);
331
332
2/2
✓ Branch 0 taken 6 times.
✓ Branch 1 taken 9316 times.
9322 if (t->waiting_)
333 {
334 9316 t->waiting_ = false;
335
1/1
✓ Branch 1 taken 9316 times.
9316 auto* op = new timer_op;
336 9316 op->h = t->h_;
337 9316 op->d = std::move(t->d_);
338 9316 op->ec_out = t->ec_out_;
339 9316 op->ec_value = {}; // Success
340 9316 op->sched = sched_;
341
1/1
✓ Branch 1 taken 9316 times.
9316 expired.push_back(op);
342 }
343 // If not waiting, timer is removed but not dispatched -
344 // wait() will handle this by checking expiry
345 }
346 139122 }
347
348 // Post ops to scheduler (they run inside work_cleanup scope)
349
2/2
✓ Branch 5 taken 9316 times.
✓ Branch 6 taken 139122 times.
148438 for (auto* op : expired)
350
1/1
✓ Branch 1 taken 9316 times.
9316 sched_->post(op);
351
352 278244 return expired.size();
353 139122 }
354
355 private:
356 18689 void remove_timer_impl(timer_impl& impl)
357 {
358 18689 std::size_t index = impl.heap_index_;
359
2/2
✓ Branch 1 taken 9341 times.
✓ Branch 2 taken 9348 times.
18689 if (index >= heap_.size())
360 9341 return; // Not in heap
361
362
2/2
✓ Branch 1 taken 108 times.
✓ Branch 2 taken 9240 times.
9348 if (index == heap_.size() - 1)
363 {
364 // Last element, just pop
365 108 impl.heap_index_ = (std::numeric_limits<std::size_t>::max)();
366 108 heap_.pop_back();
367 }
368 else
369 {
370 // Swap with last and reheapify
371 9240 swap_heap(index, heap_.size() - 1);
372 9240 impl.heap_index_ = (std::numeric_limits<std::size_t>::max)();
373 9240 heap_.pop_back();
374
375
2/6
✗ Branch 0 not taken.
✓ Branch 1 taken 9240 times.
✗ Branch 6 not taken.
✗ Branch 7 not taken.
✗ Branch 8 not taken.
✓ Branch 9 taken 9240 times.
9240 if (index > 0 && heap_[index].time_ < heap_[(index - 1) / 2].time_)
376 up_heap(index);
377 else
378 9240 down_heap(index);
379 }
380 }
381
382 9356 void up_heap(std::size_t index)
383 {
384
2/2
✓ Branch 0 taken 9240 times.
✓ Branch 1 taken 9343 times.
18583 while (index > 0)
385 {
386 9240 std::size_t parent = (index - 1) / 2;
387
2/2
✓ Branch 4 taken 13 times.
✓ Branch 5 taken 9227 times.
9240 if (!(heap_[index].time_ < heap_[parent].time_))
388 13 break;
389 9227 swap_heap(index, parent);
390 9227 index = parent;
391 }
392 9356 }
393
394 9240 void down_heap(std::size_t index)
395 {
396 9240 std::size_t child = index * 2 + 1;
397
2/2
✓ Branch 1 taken 2 times.
✓ Branch 2 taken 9238 times.
9240 while (child < heap_.size())
398 {
399 2 std::size_t min_child = (child + 1 == heap_.size() ||
400 heap_[child].time_ < heap_[child + 1].time_)
401
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 2 times.
2 ? child : child + 1;
402
403
1/2
✓ Branch 4 taken 2 times.
✗ Branch 5 not taken.
2 if (heap_[index].time_ < heap_[min_child].time_)
404 2 break;
405
406 swap_heap(index, min_child);
407 index = min_child;
408 child = index * 2 + 1;
409 }
410 9240 }
411
412 18467 void swap_heap(std::size_t i1, std::size_t i2)
413 {
414 18467 heap_entry tmp = heap_[i1];
415 18467 heap_[i1] = heap_[i2];
416 18467 heap_[i2] = tmp;
417 18467 heap_[i1].timer_->heap_index_ = i1;
418 18467 heap_[i2].timer_->heap_index_ = i2;
419 18467 }
420 };
421
422 //------------------------------------------------------------------------------
423
424 void
425 9353 timer_impl::
426 release()
427 {
428 9353 svc_->destroy_impl(*this);
429 9353 }
430
431 std::coroutine_handle<>
432 9328 timer_impl::
433 wait(
434 std::coroutine_handle<> h,
435 capy::executor_ref d,
436 std::stop_token token,
437 std::error_code* ec)
438 {
439 // Check if timer already expired (not in heap anymore)
440 9328 bool already_expired = (heap_index_ == (std::numeric_limits<std::size_t>::max)());
441
442
2/2
✓ Branch 0 taken 6 times.
✓ Branch 1 taken 9322 times.
9328 if (already_expired)
443 {
444 // Timer already expired - post for work tracking
445
1/2
✓ Branch 0 taken 6 times.
✗ Branch 1 not taken.
6 if (ec)
446 6 *ec = {};
447 6 d.post(h);
448 6 return std::noop_coroutine();
449 }
450
451 9322 h_ = h;
452 9322 d_ = std::move(d);
453 9322 token_ = std::move(token);
454 9322 ec_out_ = ec;
455 9322 waiting_ = true;
456 9322 svc_->get_scheduler().on_work_started();
457 // completion is always posted to scheduler queue, never inline.
458 9322 return std::noop_coroutine();
459 }
460
461 //------------------------------------------------------------------------------
462 //
463 // Extern free functions called from timer.cpp
464 //
465 //------------------------------------------------------------------------------
466
467 timer::timer_impl*
468 9353 timer_service_create(capy::execution_context& ctx)
469 {
470 9353 auto* svc = ctx.find_service<timer_service>();
471
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 9353 times.
9353 if (!svc)
472 {
473 // Timer service not yet created - this happens if io_context
474 // hasn't been constructed yet, or if the scheduler didn't
475 // initialize the timer service
476 throw std::runtime_error("timer_service not found");
477 }
478 9353 return svc->create_impl();
479 }
480
481 void
482 9353 timer_service_destroy(timer::timer_impl& base) noexcept
483 {
484 9353 static_cast<timer_impl&>(base).release();
485 9353 }
486
487 timer::time_point
488 28 timer_service_expiry(timer::timer_impl& base) noexcept
489 {
490 28 return static_cast<timer_impl&>(base).expiry_;
491 }
492
493 void
494 14 timer_service_expires_at(timer::timer_impl& base, timer::time_point t)
495 {
496 14 auto& impl = static_cast<timer_impl&>(base);
497 14 impl.expiry_ = t;
498 14 impl.svc_->update_timer(impl, t);
499 14 }
500
501 void
502 9342 timer_service_expires_after(timer::timer_impl& base, timer::duration d)
503 {
504 9342 auto& impl = static_cast<timer_impl&>(base);
505
1/1
✓ Branch 2 taken 9342 times.
9342 impl.expiry_ = timer::clock_type::now() + d;
506 9342 impl.svc_->update_timer(impl, impl.expiry_);
507 9342 }
508
509 void
510 14 timer_service_cancel(timer::timer_impl& base) noexcept
511 {
512 14 auto& impl = static_cast<timer_impl&>(base);
513 14 impl.svc_->cancel_timer(impl);
514 14 }
515
516 timer_service&
517 309 get_timer_service(capy::execution_context& ctx, scheduler& sched)
518 {
519 309 return ctx.make_service<timer_service_impl>(sched);
520 }
521
522 } // namespace boost::corosio::detail
523