LCOV - code coverage report
Current view: top level - src/detail - timer_service.cpp (source / functions) Coverage Total Hit
Test: coverage_filtered.info Lines: 88.8 % 232 206
Test Date: 2026-02-06 05:04:16 Functions: 90.3 % 31 28

            Line data    Source code
       1              : //
       2              : // Copyright (c) 2026 Steve Gerbino
       3              : //
       4              : // Distributed under the Boost Software License, Version 1.0. (See accompanying
       5              : // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
       6              : //
       7              : // Official repository: https://github.com/cppalliance/corosio
       8              : //
       9              : 
      10              : #include "src/detail/timer_service.hpp"
      11              : 
      12              : #include <boost/corosio/detail/scheduler.hpp>
      13              : #include "src/detail/intrusive.hpp"
      14              : #include "src/detail/scheduler_op.hpp"
      15              : #include <boost/capy/error.hpp>
      16              : #include <boost/capy/coro.hpp>
      17              : #include <boost/capy/ex/executor_ref.hpp>
      18              : #include <system_error>
      19              : 
      20              : #include <coroutine>
      21              : #include <limits>
      22              : #include <mutex>
      23              : #include <stdexcept>
      24              : #include <stop_token>
      25              : #include <vector>
      26              : 
      27              : namespace boost::corosio::detail {
      28              : 
      29              : class timer_service_impl;
      30              : 
      31              : // Completion operation posted to scheduler when timer expires or is cancelled.
      32              : // Runs inside work_cleanup scope so work accounting is batched correctly.
      33              : struct timer_op final : scheduler_op
      34              : {
      35              :     capy::coro h;
      36              :     capy::executor_ref d;
      37              :     std::error_code* ec_out = nullptr;
      38              :     std::error_code ec_value;
      39              :     scheduler* sched = nullptr;
      40              : 
      41         9322 :     timer_op() noexcept
      42         9322 :         : scheduler_op(&timer_op::do_complete)
      43              :     {
      44         9322 :     }
      45              : 
      46            0 :     static void do_complete(
      47              :         void* owner,
      48              :         scheduler_op* base,
      49              :         std::uint32_t,
      50              :         std::uint32_t)
      51              :     {
      52            0 :         auto* self = static_cast<timer_op*>(base);
      53            0 :         if (!owner)
      54              :         {
      55            0 :             delete self;
      56            0 :             return;
      57              :         }
      58            0 :         (*self)();
      59              :     }
      60              : 
      61         9322 :     void operator()() override
      62              :     {
      63         9322 :         if (ec_out)
      64         9322 :             *ec_out = ec_value;
      65              : 
      66              :         // Capture before posting (coro may destroy this op)
      67         9322 :         auto* service = sched;
      68         9322 :         sched = nullptr;
      69              : 
      70         9322 :         d.post(h);
      71              : 
      72              :         // Balance the on_work_started() from timer_impl::wait()
      73         9322 :         if (service)
      74         9322 :             service->on_work_finished();
      75              : 
      76         9322 :         delete this;
      77         9322 :     }
      78              : 
      79            0 :     void destroy() override
      80              :     {
      81            0 :         delete this;
      82            0 :     }
      83              : };
      84              : 
      85              : struct timer_impl
      86              :     : timer::timer_impl
      87              :     , intrusive_list<timer_impl>::node
      88              : {
      89              :     using clock_type = std::chrono::steady_clock;
      90              :     using time_point = clock_type::time_point;
      91              :     using duration = clock_type::duration;
      92              : 
      93              :     timer_service_impl* svc_ = nullptr;
      94              :     time_point expiry_;
      95              :     std::size_t heap_index_ = (std::numeric_limits<std::size_t>::max)();
      96              : 
      97              :     // Wait operation state
      98              :     std::coroutine_handle<> h_;
      99              :     capy::executor_ref d_;
     100              :     std::error_code* ec_out_ = nullptr;
     101              :     std::stop_token token_;
     102              :     bool waiting_ = false;
     103              : 
     104          129 :     explicit timer_impl(timer_service_impl& svc) noexcept
     105          129 :         : svc_(&svc)
     106              :     {
     107          129 :     }
     108              : 
     109              :     void release() override;
     110              : 
     111              :     std::coroutine_handle<> wait(
     112              :         std::coroutine_handle<>,
     113              :         capy::executor_ref,
     114              :         std::stop_token,
     115              :         std::error_code*) override;
     116              : };
     117              : 
     118              : //------------------------------------------------------------------------------
     119              : 
     120              : class timer_service_impl : public timer_service
     121              : {
     122              : public:
     123              :     using clock_type = std::chrono::steady_clock;
     124              :     using time_point = clock_type::time_point;
     125              :     using key_type = timer_service;
     126              : 
     127              : private:
     128              :     struct heap_entry
     129              :     {
     130              :         time_point time_;
     131              :         timer_impl* timer_;
     132              :     };
     133              : 
     134              :     scheduler* sched_ = nullptr;
     135              :     mutable std::mutex mutex_;
     136              :     std::vector<heap_entry> heap_;
     137              :     intrusive_list<timer_impl> timers_;
     138              :     intrusive_list<timer_impl> free_list_;
     139              :     callback on_earliest_changed_;
     140              : 
     141              : public:
     142          309 :     timer_service_impl(capy::execution_context&, scheduler& sched)
     143          309 :         : timer_service()
     144          309 :         , sched_(&sched)
     145              :     {
     146          309 :     }
     147              : 
     148         9322 :     scheduler& get_scheduler() noexcept { return *sched_; }
     149              : 
     150          618 :     ~timer_service_impl()
     151          309 :     {
     152          618 :     }
     153              : 
     154              :     timer_service_impl(timer_service_impl const&) = delete;
     155              :     timer_service_impl& operator=(timer_service_impl const&) = delete;
     156              : 
     157          309 :     void set_on_earliest_changed(callback cb) override
     158              :     {
     159          309 :         on_earliest_changed_ = cb;
     160          309 :     }
     161              : 
     162          309 :     void shutdown() override
     163              :     {
     164              :         // Cancel all waiting timers and destroy coroutine handles
     165              :         // This properly decrements outstanding_work_ for each waiting timer
     166          309 :         while (auto* impl = timers_.pop_front())
     167              :         {
     168            0 :             if (impl->waiting_)
     169              :             {
     170            0 :                 impl->waiting_ = false;
     171              :                 // Destroy the coroutine handle without resuming
     172            0 :                 impl->h_.destroy();
     173              :                 // Decrement work count to avoid leak
     174            0 :                 sched_->on_work_finished();
     175              :             }
     176            0 :             delete impl;
     177            0 :         }
     178          438 :         while (auto* impl = free_list_.pop_front())
     179          129 :             delete impl;
     180          309 :     }
     181              : 
     182         9353 :     timer::timer_impl* create_impl() override
     183              :     {
     184         9353 :         std::lock_guard lock(mutex_);
     185              :         timer_impl* impl;
     186         9353 :         if (auto* p = free_list_.pop_front())
     187              :         {
     188         9224 :             impl = p;
     189         9224 :             impl->heap_index_ = (std::numeric_limits<std::size_t>::max)();
     190              :         }
     191              :         else
     192              :         {
     193          129 :             impl = new timer_impl(*this);
     194              :         }
     195         9353 :         timers_.push_back(impl);
     196         9353 :         return impl;
     197         9353 :     }
     198              : 
     199         9353 :     void destroy_impl(timer_impl& impl)
     200              :     {
     201         9353 :         std::lock_guard lock(mutex_);
     202         9353 :         remove_timer_impl(impl);
     203         9353 :         timers_.remove(&impl);
     204         9353 :         free_list_.push_back(&impl);
     205         9353 :     }
     206              : 
     207         9356 :     void update_timer(timer_impl& impl, time_point new_time)
     208              :     {
     209         9356 :         bool notify = false;
     210         9356 :         bool was_waiting = false;
     211         9356 :         std::coroutine_handle<> h;
     212         9356 :         capy::executor_ref d;
     213         9356 :         std::error_code* ec_out = nullptr;
     214              : 
     215              :         {
     216         9356 :             std::lock_guard lock(mutex_);
     217              : 
     218              :             // If currently waiting, cancel the pending wait
     219         9356 :             if (impl.waiting_)
     220              :             {
     221            2 :                 was_waiting = true;
     222            2 :                 impl.waiting_ = false;
     223            2 :                 h = impl.h_;
     224            2 :                 d = impl.d_;
     225            2 :                 ec_out = impl.ec_out_;
     226              :             }
     227              : 
     228         9356 :             if (impl.heap_index_ < heap_.size())
     229              :             {
     230              :                 // Already in heap, update position
     231            8 :                 time_point old_time = heap_[impl.heap_index_].time_;
     232            8 :                 heap_[impl.heap_index_].time_ = new_time;
     233              : 
     234            8 :                 if (new_time < old_time)
     235            8 :                     up_heap(impl.heap_index_);
     236              :                 else
     237            0 :                     down_heap(impl.heap_index_);
     238              :             }
     239              :             else
     240              :             {
     241              :                 // Not in heap, add it
     242         9348 :                 impl.heap_index_ = heap_.size();
     243         9348 :                 heap_.push_back({new_time, &impl});
     244         9348 :                 up_heap(heap_.size() - 1);
     245              :             }
     246              : 
     247              :             // Notify if this timer is now the earliest
     248         9356 :             notify = (impl.heap_index_ == 0);
     249         9356 :         }
     250              : 
     251              :         // Post cancelled waiter as scheduler_op (runs inside work_cleanup scope)
     252         9356 :         if (was_waiting)
     253              :         {
     254            2 :             auto* op = new timer_op;
     255            2 :             op->h = h;
     256            2 :             op->d = std::move(d);
     257            2 :             op->ec_out = ec_out;
     258            2 :             op->ec_value = make_error_code(capy::error::canceled);
     259            2 :             op->sched = sched_;
     260            2 :             sched_->post(op);
     261              :         }
     262              : 
     263         9356 :         if (notify)
     264         9343 :             on_earliest_changed_();
     265         9356 :     }
     266              : 
     267              :     void remove_timer(timer_impl& impl)
     268              :     {
     269              :         std::lock_guard lock(mutex_);
     270              :         remove_timer_impl(impl);
     271              :     }
     272              : 
     273           14 :     void cancel_timer(timer_impl& impl)
     274              :     {
     275           14 :         std::coroutine_handle<> h;
     276           14 :         capy::executor_ref d;
     277           14 :         std::error_code* ec_out = nullptr;
     278           14 :         bool was_waiting = false;
     279              : 
     280              :         {
     281           14 :             std::lock_guard lock(mutex_);
     282           14 :             remove_timer_impl(impl);
     283           14 :             if (impl.waiting_)
     284              :             {
     285            4 :                 was_waiting = true;
     286            4 :                 impl.waiting_ = false;
     287            4 :                 h = impl.h_;
     288            4 :                 d = std::move(impl.d_);
     289            4 :                 ec_out = impl.ec_out_;
     290              :             }
     291           14 :         }
     292              : 
     293              :         // Post cancelled waiter as scheduler_op (runs inside work_cleanup scope)
     294           14 :         if (was_waiting)
     295              :         {
     296            4 :             auto* op = new timer_op;
     297            4 :             op->h = h;
     298            4 :             op->d = std::move(d);
     299            4 :             op->ec_out = ec_out;
     300            4 :             op->ec_value = make_error_code(capy::error::canceled);
     301            4 :             op->sched = sched_;
     302            4 :             sched_->post(op);
     303              :         }
     304           14 :     }
     305              : 
     306            0 :     bool empty() const noexcept override
     307              :     {
     308            0 :         std::lock_guard lock(mutex_);
     309            0 :         return heap_.empty();
     310            0 :     }
     311              : 
     312        22276 :     time_point nearest_expiry() const noexcept override
     313              :     {
     314        22276 :         std::lock_guard lock(mutex_);
     315        22276 :         return heap_.empty() ? time_point::max() : heap_[0].time_;
     316        22276 :     }
     317              : 
     318       139122 :     std::size_t process_expired() override
     319              :     {
     320              :         // Collect expired timer_ops while holding lock
     321       139122 :         std::vector<timer_op*> expired;
     322              : 
     323              :         {
     324       139122 :             std::lock_guard lock(mutex_);
     325       139122 :             auto now = clock_type::now();
     326              : 
     327       287566 :             while (!heap_.empty() && heap_[0].time_ <= now)
     328              :             {
     329         9322 :                 timer_impl* t = heap_[0].timer_;
     330         9322 :                 remove_timer_impl(*t);
     331              : 
     332         9322 :                 if (t->waiting_)
     333              :                 {
     334         9316 :                     t->waiting_ = false;
     335         9316 :                     auto* op = new timer_op;
     336         9316 :                     op->h = t->h_;
     337         9316 :                     op->d = std::move(t->d_);
     338         9316 :                     op->ec_out = t->ec_out_;
     339         9316 :                     op->ec_value = {};  // Success
     340         9316 :                     op->sched = sched_;
     341         9316 :                     expired.push_back(op);
     342              :                 }
     343              :                 // If not waiting, timer is removed but not dispatched -
     344              :                 // wait() will handle this by checking expiry
     345              :             }
     346       139122 :         }
     347              : 
     348              :         // Post ops to scheduler (they run inside work_cleanup scope)
     349       148438 :         for (auto* op : expired)
     350         9316 :             sched_->post(op);
     351              : 
     352       278244 :         return expired.size();
     353       139122 :     }
     354              : 
     355              : private:
     356        18689 :     void remove_timer_impl(timer_impl& impl)
     357              :     {
     358        18689 :         std::size_t index = impl.heap_index_;
     359        18689 :         if (index >= heap_.size())
     360         9341 :             return; // Not in heap
     361              : 
     362         9348 :         if (index == heap_.size() - 1)
     363              :         {
     364              :             // Last element, just pop
     365          108 :             impl.heap_index_ = (std::numeric_limits<std::size_t>::max)();
     366          108 :             heap_.pop_back();
     367              :         }
     368              :         else
     369              :         {
     370              :             // Swap with last and reheapify
     371         9240 :             swap_heap(index, heap_.size() - 1);
     372         9240 :             impl.heap_index_ = (std::numeric_limits<std::size_t>::max)();
     373         9240 :             heap_.pop_back();
     374              : 
     375         9240 :             if (index > 0 && heap_[index].time_ < heap_[(index - 1) / 2].time_)
     376            0 :                 up_heap(index);
     377              :             else
     378         9240 :                 down_heap(index);
     379              :         }
     380              :     }
     381              : 
     382         9356 :     void up_heap(std::size_t index)
     383              :     {
     384        18583 :         while (index > 0)
     385              :         {
     386         9240 :             std::size_t parent = (index - 1) / 2;
     387         9240 :             if (!(heap_[index].time_ < heap_[parent].time_))
     388           13 :                 break;
     389         9227 :             swap_heap(index, parent);
     390         9227 :             index = parent;
     391              :         }
     392         9356 :     }
     393              : 
     394         9240 :     void down_heap(std::size_t index)
     395              :     {
     396         9240 :         std::size_t child = index * 2 + 1;
     397         9240 :         while (child < heap_.size())
     398              :         {
     399            2 :             std::size_t min_child = (child + 1 == heap_.size() ||
     400            0 :                 heap_[child].time_ < heap_[child + 1].time_)
     401            2 :                 ? child : child + 1;
     402              : 
     403            2 :             if (heap_[index].time_ < heap_[min_child].time_)
     404            2 :                 break;
     405              : 
     406            0 :             swap_heap(index, min_child);
     407            0 :             index = min_child;
     408            0 :             child = index * 2 + 1;
     409              :         }
     410         9240 :     }
     411              : 
     412        18467 :     void swap_heap(std::size_t i1, std::size_t i2)
     413              :     {
     414        18467 :         heap_entry tmp = heap_[i1];
     415        18467 :         heap_[i1] = heap_[i2];
     416        18467 :         heap_[i2] = tmp;
     417        18467 :         heap_[i1].timer_->heap_index_ = i1;
     418        18467 :         heap_[i2].timer_->heap_index_ = i2;
     419        18467 :     }
     420              : };
     421              : 
     422              : //------------------------------------------------------------------------------
     423              : 
     424              : void
     425         9353 : timer_impl::
     426              : release()
     427              : {
     428         9353 :     svc_->destroy_impl(*this);
     429         9353 : }
     430              : 
     431              : std::coroutine_handle<>
     432         9328 : timer_impl::
     433              : wait(
     434              :     std::coroutine_handle<> h,
     435              :     capy::executor_ref d,
     436              :     std::stop_token token,
     437              :     std::error_code* ec)
     438              : {
     439              :     // Check if timer already expired (not in heap anymore)
     440         9328 :     bool already_expired = (heap_index_ == (std::numeric_limits<std::size_t>::max)());
     441              : 
     442         9328 :     if (already_expired)
     443              :     {
     444              :         // Timer already expired - post for work tracking
     445            6 :         if (ec)
     446            6 :             *ec = {};
     447            6 :         d.post(h);
     448            6 :         return std::noop_coroutine();
     449              :     }
     450              : 
     451         9322 :     h_ = h;
     452         9322 :     d_ = std::move(d);
     453         9322 :     token_ = std::move(token);
     454         9322 :     ec_out_ = ec;
     455         9322 :     waiting_ = true;
     456         9322 :     svc_->get_scheduler().on_work_started();
     457              :     // completion is always posted to scheduler queue, never inline.
     458         9322 :     return std::noop_coroutine();
     459              : }
     460              : 
     461              : //------------------------------------------------------------------------------
     462              : //
     463              : // Extern free functions called from timer.cpp
     464              : //
     465              : //------------------------------------------------------------------------------
     466              : 
     467              : timer::timer_impl*
     468         9353 : timer_service_create(capy::execution_context& ctx)
     469              : {
     470         9353 :     auto* svc = ctx.find_service<timer_service>();
     471         9353 :     if (!svc)
     472              :     {
     473              :         // Timer service not yet created - this happens if io_context
     474              :         // hasn't been constructed yet, or if the scheduler didn't
     475              :         // initialize the timer service
     476            0 :         throw std::runtime_error("timer_service not found");
     477              :     }
     478         9353 :     return svc->create_impl();
     479              : }
     480              : 
     481              : void
     482         9353 : timer_service_destroy(timer::timer_impl& base) noexcept
     483              : {
     484         9353 :     static_cast<timer_impl&>(base).release();
     485         9353 : }
     486              : 
     487              : timer::time_point
     488           28 : timer_service_expiry(timer::timer_impl& base) noexcept
     489              : {
     490           28 :     return static_cast<timer_impl&>(base).expiry_;
     491              : }
     492              : 
     493              : void
     494           14 : timer_service_expires_at(timer::timer_impl& base, timer::time_point t)
     495              : {
     496           14 :     auto& impl = static_cast<timer_impl&>(base);
     497           14 :     impl.expiry_ = t;
     498           14 :     impl.svc_->update_timer(impl, t);
     499           14 : }
     500              : 
     501              : void
     502         9342 : timer_service_expires_after(timer::timer_impl& base, timer::duration d)
     503              : {
     504         9342 :     auto& impl = static_cast<timer_impl&>(base);
     505         9342 :     impl.expiry_ = timer::clock_type::now() + d;
     506         9342 :     impl.svc_->update_timer(impl, impl.expiry_);
     507         9342 : }
     508              : 
     509              : void
     510           14 : timer_service_cancel(timer::timer_impl& base) noexcept
     511              : {
     512           14 :     auto& impl = static_cast<timer_impl&>(base);
     513           14 :     impl.svc_->cancel_timer(impl);
     514           14 : }
     515              : 
     516              : timer_service&
     517          309 : get_timer_service(capy::execution_context& ctx, scheduler& sched)
     518              : {
     519          309 :     return ctx.make_service<timer_service_impl>(sched);
     520              : }
     521              : 
     522              : } // namespace boost::corosio::detail
        

Generated by: LCOV version 2.3