Line data Source code
1 : //
2 : // Copyright (c) 2026 Vinnie Falco (vinnie dot falco at gmail dot com)
3 : //
4 : // Distributed under the Boost Software License, Version 1.0. (See accompanying
5 : // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
6 : //
7 : // Official repository: https://github.com/cppalliance/corosio
8 : //
9 :
10 : #include <boost/corosio/tcp_server.hpp>
11 : #include <boost/corosio/detail/except.hpp>
12 : #include <condition_variable>
13 : #include <mutex>
14 : #include <utility>
15 :
16 : namespace boost::corosio {
17 :
18 : struct tcp_server::impl
19 : {
20 : std::mutex join_mutex;
21 : std::condition_variable join_cv;
22 : capy::execution_context& ctx;
23 : std::vector<tcp_acceptor> ports;
24 : std::stop_source stop;
25 :
26 9 : explicit impl(capy::execution_context& c) noexcept
27 9 : : ctx(c)
28 : {
29 9 : }
30 : };
31 :
32 : tcp_server::impl*
33 9 : tcp_server::make_impl(capy::execution_context& ctx)
34 : {
35 9 : return new impl(ctx);
36 : }
37 :
38 9 : tcp_server::~tcp_server()
39 : {
40 9 : delete impl_;
41 9 : }
42 :
43 0 : tcp_server::tcp_server(
44 0 : tcp_server&& o) noexcept
45 0 : : impl_(std::exchange(o.impl_, nullptr))
46 0 : , ex_(std::move(o.ex_))
47 0 : , waiters_(std::exchange(o.waiters_, nullptr))
48 0 : , idle_head_(std::exchange(o.idle_head_, nullptr))
49 0 : , active_head_(std::exchange(o.active_head_, nullptr))
50 0 : , active_tail_(std::exchange(o.active_tail_, nullptr))
51 0 : , active_accepts_(std::exchange(o.active_accepts_, 0))
52 0 : , storage_(std::move(o.storage_))
53 0 : , running_(std::exchange(o.running_, false))
54 : {
55 0 : }
56 :
57 : tcp_server&
58 0 : tcp_server::operator=(tcp_server&& o) noexcept
59 : {
60 0 : delete impl_;
61 0 : impl_ = std::exchange(o.impl_, nullptr);
62 0 : ex_ = std::move(o.ex_);
63 0 : waiters_ = std::exchange(o.waiters_, nullptr);
64 0 : idle_head_ = std::exchange(o.idle_head_, nullptr);
65 0 : active_head_ = std::exchange(o.active_head_, nullptr);
66 0 : active_tail_ = std::exchange(o.active_tail_, nullptr);
67 0 : active_accepts_ = std::exchange(o.active_accepts_, 0);
68 0 : storage_ = std::move(o.storage_);
69 0 : running_ = std::exchange(o.running_, false);
70 0 : return *this;
71 : }
72 :
73 : // Accept loop: wait for idle worker, accept connection, dispatch
74 : capy::task<void>
75 8 : tcp_server::do_accept(tcp_acceptor& acc)
76 : {
77 : auto st = co_await capy::this_coro::stop_token;
78 : while(! st.stop_requested())
79 : {
80 : // Wait for an idle worker before blocking on accept
81 : auto& w = co_await pop();
82 : auto [ec] = co_await acc.accept(w.socket());
83 : if(ec)
84 : {
85 : co_await push(w);
86 : continue;
87 : }
88 : w.run(launcher{*this, w});
89 : }
90 16 : }
91 :
92 : std::error_code
93 9 : tcp_server::bind(endpoint ep)
94 : {
95 9 : impl_->ports.emplace_back(impl_->ctx);
96 9 : auto ec = impl_->ports.back().listen(ep);
97 9 : if (ec)
98 1 : impl_->ports.pop_back();
99 9 : return ec;
100 : }
101 :
102 : void
103 10 : tcp_server::
104 : start()
105 : {
106 : // Idempotent - only start if not already running
107 10 : if(running_)
108 1 : return;
109 :
110 : // Previous session must be fully stopped before restart
111 9 : if(active_accepts_ != 0)
112 1 : detail::throw_logic_error(
113 : "tcp_server::start: previous session not joined");
114 :
115 8 : running_ = true;
116 :
117 8 : impl_->stop = {}; // Fresh stop source
118 8 : auto st = impl_->stop.get_token();
119 :
120 8 : active_accepts_ = impl_->ports.size();
121 :
122 : // Launch with completion handler that decrements counter
123 16 : for(auto& t : impl_->ports)
124 16 : capy::run_async(ex_, st, [this]() {
125 8 : std::lock_guard lock(impl_->join_mutex);
126 8 : if(--active_accepts_ == 0)
127 8 : impl_->join_cv.notify_all();
128 16 : })(do_accept(t));
129 8 : }
130 :
131 : void
132 10 : tcp_server::
133 : stop()
134 : {
135 : // Idempotent - only stop if running
136 10 : if(!running_)
137 2 : return;
138 8 : running_ = false;
139 :
140 : // Stop accept loops
141 8 : impl_->stop.request_stop();
142 :
143 : // Launch cancellation coroutine on server executor
144 8 : capy::run_async(ex_, std::stop_token{})(do_stop());
145 : }
146 :
147 : void
148 4 : tcp_server::
149 : join()
150 : {
151 4 : std::unique_lock lock(impl_->join_mutex);
152 8 : impl_->join_cv.wait(lock, [this] { return active_accepts_ == 0; });
153 4 : }
154 :
155 : capy::task<>
156 8 : tcp_server::do_stop()
157 : {
158 : // Running on server executor - safe to iterate active list
159 : // Just cancel, don't modify list - workers return themselves when done
160 : for(auto* w = active_head_; w; w = w->next_)
161 : w->stop_.request_stop();
162 : co_return;
163 16 : }
164 :
165 : } // namespace boost::corosio
|