libs/corosio/src/corosio/src/detail/epoll/sockets.hpp

91.7% Lines (11/12) 85.7% Functions (6/7) -% Branches (0/0)
libs/corosio/src/corosio/src/detail/epoll/sockets.hpp
Line Hits Source Code
1 //
2 // Copyright (c) 2026 Steve Gerbino
3 //
4 // Distributed under the Boost Software License, Version 1.0. (See accompanying
5 // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
6 //
7 // Official repository: https://github.com/cppalliance/corosio
8 //
9
10 #ifndef BOOST_COROSIO_DETAIL_EPOLL_SOCKETS_HPP
11 #define BOOST_COROSIO_DETAIL_EPOLL_SOCKETS_HPP
12
13 #include <boost/corosio/detail/platform.hpp>
14
15 #if BOOST_COROSIO_HAS_EPOLL
16
17 #include <boost/corosio/detail/config.hpp>
18 #include <boost/corosio/tcp_socket.hpp>
19 #include <boost/capy/ex/executor_ref.hpp>
20 #include <boost/capy/ex/execution_context.hpp>
21 #include "src/detail/intrusive.hpp"
22 #include "src/detail/socket_service.hpp"
23
24 #include "src/detail/cached_initiator.hpp"
25 #include "src/detail/epoll/op.hpp"
26 #include "src/detail/epoll/scheduler.hpp"
27
28 #include <coroutine>
29 #include <memory>
30 #include <mutex>
31 #include <unordered_map>
32
33 /*
34 epoll Socket Implementation
35 ===========================
36
37 Each I/O operation follows the same pattern:
38 1. Try the syscall immediately (non-blocking socket)
39 2. If it succeeds or fails with a real error, post to completion queue
40 3. If EAGAIN/EWOULDBLOCK, register with epoll and wait
41
42 This "try first" approach avoids unnecessary epoll round-trips for
43 operations that can complete immediately (common for small reads/writes
44 on fast local connections).
45
46 One-Shot Registration
47 ---------------------
48 We use one-shot epoll registration: each operation registers, waits for
49 one event, then unregisters. This simplifies the state machine since we
50 don't need to track whether an fd is currently registered or handle
51 re-arming. The tradeoff is slightly more epoll_ctl calls, but the
52 simplicity is worth it.
53
54 Cancellation
55 ------------
56 See op.hpp for the completion/cancellation race handling via the
57 `registered` atomic. cancel() must complete pending operations (post
58 them with cancelled flag) so coroutines waiting on them can resume.
59 close_socket() calls cancel() first to ensure this.
60
61 Impl Lifetime with shared_ptr
62 -----------------------------
63 Socket impls use enable_shared_from_this. The service owns impls via
64 shared_ptr maps (socket_ptrs_) keyed by raw pointer for O(1) lookup and
65 removal. When a user calls close(), we call cancel() which posts pending
66 ops to the scheduler.
67
68 CRITICAL: The posted ops must keep the impl alive until they complete.
69 Otherwise the scheduler would process a freed op (use-after-free). The
70 cancel() method captures shared_from_this() into op.impl_ptr before
71 posting. When the op completes, impl_ptr is cleared, allowing the impl
72 to be destroyed if no other references exist.
73
74 Service Ownership
75 -----------------
76 epoll_socket_service owns all socket impls. destroy_impl() removes the
77 shared_ptr from the map, but the impl may survive if ops still hold
78 impl_ptr refs. shutdown() closes all sockets and clears the map; any
79 in-flight ops will complete and release their refs.
80 */
81
82 namespace boost::corosio::detail {
83
84 class epoll_socket_service;
85 class epoll_socket_impl;
86
87 /// Socket implementation for epoll backend.
88 class epoll_socket_impl
89 : public tcp_socket::socket_impl
90 , public std::enable_shared_from_this<epoll_socket_impl>
91 , public intrusive_list<epoll_socket_impl>::node
92 {
93 friend class epoll_socket_service;
94
95 public:
96 explicit epoll_socket_impl(epoll_socket_service& svc) noexcept;
97 ~epoll_socket_impl();
98
99 void release() override;
100
101 std::coroutine_handle<> connect(
102 std::coroutine_handle<>,
103 capy::executor_ref,
104 endpoint,
105 std::stop_token,
106 std::error_code*) override;
107
108 std::coroutine_handle<> read_some(
109 std::coroutine_handle<>,
110 capy::executor_ref,
111 io_buffer_param,
112 std::stop_token,
113 std::error_code*,
114 std::size_t*) override;
115
116 std::coroutine_handle<> write_some(
117 std::coroutine_handle<>,
118 capy::executor_ref,
119 io_buffer_param,
120 std::stop_token,
121 std::error_code*,
122 std::size_t*) override;
123
124 std::error_code shutdown(tcp_socket::shutdown_type what) noexcept override;
125
126 native_handle_type native_handle() const noexcept override { return fd_; }
127
128 // Socket options
129 std::error_code set_no_delay(bool value) noexcept override;
130 bool no_delay(std::error_code& ec) const noexcept override;
131
132 std::error_code set_keep_alive(bool value) noexcept override;
133 bool keep_alive(std::error_code& ec) const noexcept override;
134
135 std::error_code set_receive_buffer_size(int size) noexcept override;
136 int receive_buffer_size(std::error_code& ec) const noexcept override;
137
138 std::error_code set_send_buffer_size(int size) noexcept override;
139 int send_buffer_size(std::error_code& ec) const noexcept override;
140
141 std::error_code set_linger(bool enabled, int timeout) noexcept override;
142 tcp_socket::linger_options linger(std::error_code& ec) const noexcept override;
143
144 16 endpoint local_endpoint() const noexcept override { return local_endpoint_; }
145 16 endpoint remote_endpoint() const noexcept override { return remote_endpoint_; }
146 bool is_open() const noexcept { return fd_ >= 0; }
147 void cancel() noexcept override;
148 void cancel_single_op(epoll_op& op) noexcept;
149 void close_socket() noexcept;
150 5017 void set_socket(int fd) noexcept { fd_ = fd; }
151 10034 void set_endpoints(endpoint local, endpoint remote) noexcept
152 {
153 10034 local_endpoint_ = local;
154 10034 remote_endpoint_ = remote;
155 10034 }
156
157 epoll_connect_op conn_;
158 epoll_read_op rd_;
159 epoll_write_op wr_;
160
161 /// Per-descriptor state for persistent epoll registration
162 descriptor_state desc_state_;
163
164 cached_initiator read_initiator_;
165 cached_initiator write_initiator_;
166
167 /// Execute the read I/O operation (called by initiator coroutine).
168 void do_read_io();
169
170 /// Execute the write I/O operation (called by initiator coroutine).
171 void do_write_io();
172
173 private:
174 epoll_socket_service& svc_;
175 int fd_ = -1;
176 endpoint local_endpoint_;
177 endpoint remote_endpoint_;
178 };
179
180 /** State for epoll socket service. */
181 class epoll_socket_state
182 {
183 public:
184 189 explicit epoll_socket_state(epoll_scheduler& sched) noexcept
185 189 : sched_(sched)
186 {
187 189 }
188
189 epoll_scheduler& sched_;
190 std::mutex mutex_;
191 intrusive_list<epoll_socket_impl> socket_list_;
192 std::unordered_map<epoll_socket_impl*, std::shared_ptr<epoll_socket_impl>> socket_ptrs_;
193 };
194
195 /** epoll socket service implementation.
196
197 Inherits from socket_service to enable runtime polymorphism.
198 Uses key_type = socket_service for service lookup.
199 */
200 class epoll_socket_service : public socket_service
201 {
202 public:
203 explicit epoll_socket_service(capy::execution_context& ctx);
204 ~epoll_socket_service();
205
206 epoll_socket_service(epoll_socket_service const&) = delete;
207 epoll_socket_service& operator=(epoll_socket_service const&) = delete;
208
209 void shutdown() override;
210
211 tcp_socket::socket_impl& create_impl() override;
212 void destroy_impl(tcp_socket::socket_impl& impl) override;
213 std::error_code open_socket(tcp_socket::socket_impl& impl) override;
214
215 20092 epoll_scheduler& scheduler() const noexcept { return state_->sched_; }
216 void post(epoll_op* op);
217 void work_started() noexcept;
218 void work_finished() noexcept;
219
220 private:
221 std::unique_ptr<epoll_socket_state> state_;
222 };
223
224 } // namespace boost::corosio::detail
225
226 #endif // BOOST_COROSIO_HAS_EPOLL
227
228 #endif // BOOST_COROSIO_DETAIL_EPOLL_SOCKETS_HPP
229