LCOV - code coverage report
Current view: top level - src - tcp_server.cpp (source / functions) Coverage Total Hit
Test: coverage_filtered.info Lines: 64.7 % 68 44
Test Date: 2026-02-06 05:04:16 Functions: 84.6 % 13 11

            Line data    Source code
       1              : //
       2              : // Copyright (c) 2026 Vinnie Falco (vinnie dot falco at gmail dot com)
       3              : //
       4              : // Distributed under the Boost Software License, Version 1.0. (See accompanying
       5              : // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
       6              : //
       7              : // Official repository: https://github.com/cppalliance/corosio
       8              : //
       9              : 
      10              : #include <boost/corosio/tcp_server.hpp>
      11              : #include <boost/corosio/detail/except.hpp>
      12              : #include <condition_variable>
      13              : #include <mutex>
      14              : #include <utility>
      15              : 
      16              : namespace boost::corosio {
      17              : 
      18              : struct tcp_server::impl
      19              : {
      20              :     std::mutex join_mutex;
      21              :     std::condition_variable join_cv;
      22              :     capy::execution_context& ctx;
      23              :     std::vector<tcp_acceptor> ports;
      24              :     std::stop_source stop;
      25              : 
      26            9 :     explicit impl(capy::execution_context& c) noexcept
      27            9 :         : ctx(c)
      28              :     {
      29            9 :     }
      30              : };
      31              : 
      32              : tcp_server::impl*
      33            9 : tcp_server::make_impl(capy::execution_context& ctx)
      34              : {
      35            9 :     return new impl(ctx);
      36              : }
      37              : 
      38            9 : tcp_server::~tcp_server()
      39              : {
      40            9 :     delete impl_;
      41            9 : }
      42              : 
      43            0 : tcp_server::tcp_server(
      44            0 :     tcp_server&& o) noexcept
      45            0 :     : impl_(std::exchange(o.impl_, nullptr))
      46            0 :     , ex_(std::move(o.ex_))
      47            0 :     , waiters_(std::exchange(o.waiters_, nullptr))
      48            0 :     , idle_head_(std::exchange(o.idle_head_, nullptr))
      49            0 :     , active_head_(std::exchange(o.active_head_, nullptr))
      50            0 :     , active_tail_(std::exchange(o.active_tail_, nullptr))
      51            0 :     , active_accepts_(std::exchange(o.active_accepts_, 0))
      52            0 :     , storage_(std::move(o.storage_))
      53            0 :     , running_(std::exchange(o.running_, false))
      54              : {
      55            0 : }
      56              : 
      57              : tcp_server&
      58            0 : tcp_server::operator=(tcp_server&& o) noexcept
      59              : {
      60            0 :     delete impl_;
      61            0 :     impl_ = std::exchange(o.impl_, nullptr);
      62            0 :     ex_ = std::move(o.ex_);
      63            0 :     waiters_ = std::exchange(o.waiters_, nullptr);
      64            0 :     idle_head_ = std::exchange(o.idle_head_, nullptr);
      65            0 :     active_head_ = std::exchange(o.active_head_, nullptr);
      66            0 :     active_tail_ = std::exchange(o.active_tail_, nullptr);
      67            0 :     active_accepts_ = std::exchange(o.active_accepts_, 0);
      68            0 :     storage_ = std::move(o.storage_);
      69            0 :     running_ = std::exchange(o.running_, false);
      70            0 :     return *this;
      71              : }
      72              : 
      73              : // Accept loop: wait for idle worker, accept connection, dispatch
      74              : capy::task<void>
      75            8 : tcp_server::do_accept(tcp_acceptor& acc)
      76              : {
      77              :     auto st = co_await capy::this_coro::stop_token;
      78              :     while(! st.stop_requested())
      79              :     {
      80              :         // Wait for an idle worker before blocking on accept
      81              :         auto& w = co_await pop();
      82              :         auto [ec] = co_await acc.accept(w.socket());
      83              :         if(ec)
      84              :         {
      85              :             co_await push(w);
      86              :             continue;
      87              :         }
      88              :         w.run(launcher{*this, w});
      89              :     }
      90           16 : }
      91              : 
      92              : std::error_code
      93            9 : tcp_server::bind(endpoint ep)
      94              : {
      95            9 :     impl_->ports.emplace_back(impl_->ctx);
      96            9 :     auto ec = impl_->ports.back().listen(ep);
      97            9 :     if (ec)
      98            1 :         impl_->ports.pop_back();
      99            9 :     return ec;
     100              : }
     101              : 
     102              : void
     103           10 : tcp_server::
     104              : start()
     105              : {
     106              :     // Idempotent - only start if not already running
     107           10 :     if(running_)
     108            1 :         return;
     109              :     
     110              :     // Previous session must be fully stopped before restart
     111            9 :     if(active_accepts_ != 0)
     112            1 :         detail::throw_logic_error(
     113              :             "tcp_server::start: previous session not joined");
     114              :     
     115            8 :     running_ = true;
     116              :     
     117            8 :     impl_->stop = {};  // Fresh stop source
     118            8 :     auto st = impl_->stop.get_token();
     119              :     
     120            8 :     active_accepts_ = impl_->ports.size();
     121              :     
     122              :     // Launch with completion handler that decrements counter
     123           16 :     for(auto& t : impl_->ports)
     124           16 :         capy::run_async(ex_, st, [this]() {
     125            8 :             std::lock_guard lock(impl_->join_mutex);
     126            8 :             if(--active_accepts_ == 0)
     127            8 :                 impl_->join_cv.notify_all();
     128           16 :         })(do_accept(t));
     129            8 : }
     130              : 
     131              : void
     132           10 : tcp_server::
     133              : stop()
     134              : {
     135              :     // Idempotent - only stop if running
     136           10 :     if(!running_)
     137            2 :         return;
     138            8 :     running_ = false;
     139              :     
     140              :     // Stop accept loops
     141            8 :     impl_->stop.request_stop();
     142              :     
     143              :     // Launch cancellation coroutine on server executor
     144            8 :     capy::run_async(ex_, std::stop_token{})(do_stop());
     145              : }
     146              : 
     147              : void
     148            4 : tcp_server::
     149              : join()
     150              : {
     151            4 :     std::unique_lock lock(impl_->join_mutex);
     152            8 :     impl_->join_cv.wait(lock, [this] { return active_accepts_ == 0; });
     153            4 : }
     154              : 
     155              : capy::task<>
     156            8 : tcp_server::do_stop()
     157              : {
     158              :     // Running on server executor - safe to iterate active list
     159              :     // Just cancel, don't modify list - workers return themselves when done
     160              :     for(auto* w = active_head_; w; w = w->next_)
     161              :         w->stop_.request_stop();
     162              :     co_return;
     163           16 : }
     164              : 
     165              : } // namespace boost::corosio
        

Generated by: LCOV version 2.3