libs/corosio/src/corosio/src/tcp_server.cpp

64.7% Lines (44/68) 84.6% Functions (11/13) 85.7% Branches (24/28)
libs/corosio/src/corosio/src/tcp_server.cpp
Line Branch Hits Source Code
1 //
2 // Copyright (c) 2026 Vinnie Falco (vinnie dot falco at gmail dot com)
3 //
4 // Distributed under the Boost Software License, Version 1.0. (See accompanying
5 // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
6 //
7 // Official repository: https://github.com/cppalliance/corosio
8 //
9
10 #include <boost/corosio/tcp_server.hpp>
11 #include <boost/corosio/detail/except.hpp>
12 #include <condition_variable>
13 #include <mutex>
14 #include <utility>
15
16 namespace boost::corosio {
17
18 struct tcp_server::impl
19 {
20 std::mutex join_mutex;
21 std::condition_variable join_cv;
22 capy::execution_context& ctx;
23 std::vector<tcp_acceptor> ports;
24 std::stop_source stop;
25
26 9 explicit impl(capy::execution_context& c) noexcept
27 9 : ctx(c)
28 {
29 9 }
30 };
31
32 tcp_server::impl*
33 9 tcp_server::make_impl(capy::execution_context& ctx)
34 {
35 9 return new impl(ctx);
36 }
37
38 9 tcp_server::~tcp_server()
39 {
40
1/2
✓ Branch 0 taken 9 times.
✗ Branch 1 not taken.
9 delete impl_;
41 9 }
42
43 tcp_server::tcp_server(
44 tcp_server&& o) noexcept
45 : impl_(std::exchange(o.impl_, nullptr))
46 , ex_(std::move(o.ex_))
47 , waiters_(std::exchange(o.waiters_, nullptr))
48 , idle_head_(std::exchange(o.idle_head_, nullptr))
49 , active_head_(std::exchange(o.active_head_, nullptr))
50 , active_tail_(std::exchange(o.active_tail_, nullptr))
51 , active_accepts_(std::exchange(o.active_accepts_, 0))
52 , storage_(std::move(o.storage_))
53 , running_(std::exchange(o.running_, false))
54 {
55 }
56
57 tcp_server&
58 tcp_server::operator=(tcp_server&& o) noexcept
59 {
60 delete impl_;
61 impl_ = std::exchange(o.impl_, nullptr);
62 ex_ = std::move(o.ex_);
63 waiters_ = std::exchange(o.waiters_, nullptr);
64 idle_head_ = std::exchange(o.idle_head_, nullptr);
65 active_head_ = std::exchange(o.active_head_, nullptr);
66 active_tail_ = std::exchange(o.active_tail_, nullptr);
67 active_accepts_ = std::exchange(o.active_accepts_, 0);
68 storage_ = std::move(o.storage_);
69 running_ = std::exchange(o.running_, false);
70 return *this;
71 }
72
73 // Accept loop: wait for idle worker, accept connection, dispatch
74 capy::task<void>
75
1/1
✓ Branch 1 taken 8 times.
8 tcp_server::do_accept(tcp_acceptor& acc)
76 {
77 auto st = co_await capy::this_coro::stop_token;
78 while(! st.stop_requested())
79 {
80 // Wait for an idle worker before blocking on accept
81 auto& w = co_await pop();
82 auto [ec] = co_await acc.accept(w.socket());
83 if(ec)
84 {
85 co_await push(w);
86 continue;
87 }
88 w.run(launcher{*this, w});
89 }
90 16 }
91
92 std::error_code
93 9 tcp_server::bind(endpoint ep)
94 {
95
1/1
✓ Branch 1 taken 9 times.
9 impl_->ports.emplace_back(impl_->ctx);
96
1/1
✓ Branch 2 taken 9 times.
9 auto ec = impl_->ports.back().listen(ep);
97
2/2
✓ Branch 1 taken 1 time.
✓ Branch 2 taken 8 times.
9 if (ec)
98 1 impl_->ports.pop_back();
99 9 return ec;
100 }
101
102 void
103 10 tcp_server::
104 start()
105 {
106 // Idempotent - only start if not already running
107
2/2
✓ Branch 0 taken 1 time.
✓ Branch 1 taken 9 times.
10 if(running_)
108 1 return;
109
110 // Previous session must be fully stopped before restart
111
2/2
✓ Branch 0 taken 1 time.
✓ Branch 1 taken 8 times.
9 if(active_accepts_ != 0)
112 1 detail::throw_logic_error(
113 "tcp_server::start: previous session not joined");
114
115 8 running_ = true;
116
117
1/1
✓ Branch 1 taken 8 times.
8 impl_->stop = {}; // Fresh stop source
118 8 auto st = impl_->stop.get_token();
119
120 8 active_accepts_ = impl_->ports.size();
121
122 // Launch with completion handler that decrements counter
123
2/2
✓ Branch 4 taken 8 times.
✓ Branch 5 taken 8 times.
16 for(auto& t : impl_->ports)
124 16 capy::run_async(ex_, st, [this]() {
125
1/1
✓ Branch 1 taken 8 times.
8 std::lock_guard lock(impl_->join_mutex);
126
1/2
✓ Branch 0 taken 8 times.
✗ Branch 1 not taken.
8 if(--active_accepts_ == 0)
127 8 impl_->join_cv.notify_all();
128
2/2
✓ Branch 1 taken 8 times.
✓ Branch 4 taken 8 times.
16 })(do_accept(t));
129 8 }
130
131 void
132 10 tcp_server::
133 stop()
134 {
135 // Idempotent - only stop if running
136
2/2
✓ Branch 0 taken 2 times.
✓ Branch 1 taken 8 times.
10 if(!running_)
137 2 return;
138 8 running_ = false;
139
140 // Stop accept loops
141 8 impl_->stop.request_stop();
142
143 // Launch cancellation coroutine on server executor
144
2/2
✓ Branch 4 taken 8 times.
✓ Branch 7 taken 8 times.
8 capy::run_async(ex_, std::stop_token{})(do_stop());
145 }
146
147 void
148 4 tcp_server::
149 join()
150 {
151
1/1
✓ Branch 1 taken 4 times.
4 std::unique_lock lock(impl_->join_mutex);
152
1/1
✓ Branch 1 taken 4 times.
8 impl_->join_cv.wait(lock, [this] { return active_accepts_ == 0; });
153 4 }
154
155 capy::task<>
156
1/1
✓ Branch 1 taken 8 times.
8 tcp_server::do_stop()
157 {
158 // Running on server executor - safe to iterate active list
159 // Just cancel, don't modify list - workers return themselves when done
160 for(auto* w = active_head_; w; w = w->next_)
161 w->stop_.request_stop();
162 co_return;
163 16 }
164
165 } // namespace boost::corosio
166