Line data Source code
1 : //
2 : // Copyright (c) 2026 Steve Gerbino
3 : //
4 : // Distributed under the Boost Software License, Version 1.0. (See accompanying
5 : // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
6 : //
7 : // Official repository: https://github.com/cppalliance/corosio
8 : //
9 :
10 : #ifndef BOOST_COROSIO_DETAIL_SELECT_OP_HPP
11 : #define BOOST_COROSIO_DETAIL_SELECT_OP_HPP
12 :
13 : #include <boost/corosio/detail/platform.hpp>
14 :
15 : #if BOOST_COROSIO_HAS_SELECT
16 :
17 : #include <boost/corosio/detail/config.hpp>
18 : #include <boost/corosio/io_object.hpp>
19 : #include <boost/corosio/endpoint.hpp>
20 : #include <boost/capy/ex/executor_ref.hpp>
21 : #include <boost/capy/coro.hpp>
22 : #include <boost/capy/error.hpp>
23 : #include <system_error>
24 :
25 : #include "src/detail/make_err.hpp"
26 : #include "src/detail/scheduler_op.hpp"
27 : #include "src/detail/endpoint_convert.hpp"
28 :
29 : #include <unistd.h>
30 : #include <errno.h>
31 : #include <fcntl.h>
32 :
#include <atomic>
#include <cstddef>
#include <cstdint>
#include <memory>
#include <optional>
#include <stop_token>
38 :
39 : #include <netinet/in.h>
40 : #include <sys/select.h>
41 : #include <sys/socket.h>
42 : #include <sys/uio.h>
43 :
44 : /*
45 : select Operation State
46 : ======================
47 :
48 : Each async I/O operation has a corresponding select_op-derived struct that
49 : holds the operation's state while it's in flight. The socket impl owns
50 : fixed slots for each operation type (conn_, rd_, wr_), so only one
51 : operation of each type can be pending per socket at a time.
52 :
53 : This mirrors the epoll_op design for consistency across backends.
54 :
55 : Completion vs Cancellation Race
56 : -------------------------------
57 : The `registered` atomic uses a tri-state (unregistered, registering,
58 : registered) to handle two races: (1) between register_fd() and the
59 : reactor seeing an event, and (2) between reactor completion and cancel().
60 :
61 : The registering state closes the window where an event could arrive
62 : after register_fd() but before the boolean was set. The reactor and
63 : cancel() both treat registering the same as registered when claiming.
64 :
65 : Whoever atomically exchanges to unregistered "claims" the operation
66 : and is responsible for completing it. The loser sees unregistered and
67 : does nothing. The initiating thread uses compare_exchange to transition
68 : from registering to registered; if this fails, the reactor or cancel
69 : already claimed the op.
70 :
71 : Impl Lifetime Management
72 : ------------------------
73 : When cancel() posts an op to the scheduler's ready queue, the socket impl
74 : might be destroyed before the scheduler processes the op. The `impl_ptr`
75 : member holds a shared_ptr to the impl, keeping it alive until the op
76 : completes.
77 :
78 : EOF Detection
79 : -------------
80 : For reads, 0 bytes with no error means EOF. But an empty user buffer also
81 : returns 0 bytes. The `empty_buffer_read` flag distinguishes these cases.
82 :
83 : SIGPIPE Prevention
84 : ------------------
85 : Writes use sendmsg() with MSG_NOSIGNAL instead of writev() to prevent
86 : SIGPIPE when the peer has closed.
87 : */
88 :
89 : namespace boost::corosio::detail {
90 :
91 : // Forward declarations for cancellation support
92 : class select_socket_impl;
93 : class select_acceptor_impl;
94 :
/** Registration state for async operations.

    Tri-state enum to handle the race between register_fd() and
    run_reactor() seeing an event. Storing `registering` before
    calling register_fd() ensures events delivered during the
    registration window are not dropped.

    `unregistered` is the zero value, so a value-initialized state
    means "not registered".
*/
enum class select_registration_state : std::uint8_t
{
    unregistered = 0, ///< Not registered with reactor
    registering  = 1, ///< register_fd() called, not yet confirmed
    registered   = 2  ///< Fully registered, ready for events
};
108 :
109 : struct select_op : scheduler_op
110 : {
111 : struct canceller
112 : {
113 : select_op* op;
114 : void operator()() const noexcept;
115 : };
116 :
117 : capy::coro h;
118 : capy::executor_ref ex;
119 : std::error_code* ec_out = nullptr;
120 : std::size_t* bytes_out = nullptr;
121 :
122 : int fd = -1;
123 : int errn = 0;
124 : std::size_t bytes_transferred = 0;
125 :
126 : std::atomic<bool> cancelled{false};
127 : std::atomic<select_registration_state> registered{select_registration_state::unregistered};
128 : std::optional<std::stop_callback<canceller>> stop_cb;
129 :
130 : // Prevents use-after-free when socket is closed with pending ops.
131 : std::shared_ptr<void> impl_ptr;
132 :
133 : // For stop_token cancellation - pointer to owning socket/acceptor impl.
134 : select_socket_impl* socket_impl_ = nullptr;
135 : select_acceptor_impl* acceptor_impl_ = nullptr;
136 :
137 23094 : select_op() = default;
138 :
139 240194 : void reset() noexcept
140 : {
141 240194 : fd = -1;
142 240194 : errn = 0;
143 240194 : bytes_transferred = 0;
144 240194 : cancelled.store(false, std::memory_order_relaxed);
145 240194 : registered.store(select_registration_state::unregistered, std::memory_order_relaxed);
146 240194 : impl_ptr.reset();
147 240194 : socket_impl_ = nullptr;
148 240194 : acceptor_impl_ = nullptr;
149 240194 : }
150 :
151 232522 : void operator()() override
152 : {
153 232522 : stop_cb.reset();
154 :
155 232522 : if (ec_out)
156 : {
157 232522 : if (cancelled.load(std::memory_order_acquire))
158 204 : *ec_out = capy::error::canceled;
159 232318 : else if (errn != 0)
160 1 : *ec_out = make_err(errn);
161 232317 : else if (is_read_operation() && bytes_transferred == 0)
162 5 : *ec_out = capy::error::eof;
163 : else
164 232312 : *ec_out = {};
165 : }
166 :
167 232522 : if (bytes_out)
168 232522 : *bytes_out = bytes_transferred;
169 :
170 : // Move to stack before destroying the frame
171 232522 : capy::executor_ref saved_ex( std::move( ex ) );
172 232522 : capy::coro saved_h( std::move( h ) );
173 232522 : impl_ptr.reset();
174 232522 : saved_ex.dispatch( saved_h );
175 232522 : }
176 :
177 116195 : virtual bool is_read_operation() const noexcept { return false; }
178 : virtual void cancel() noexcept = 0;
179 :
180 0 : void destroy() override
181 : {
182 0 : stop_cb.reset();
183 0 : impl_ptr.reset();
184 0 : }
185 :
186 35074 : void request_cancel() noexcept
187 : {
188 35074 : cancelled.store(true, std::memory_order_release);
189 35074 : }
190 :
191 : void start(std::stop_token token)
192 : {
193 : cancelled.store(false, std::memory_order_release);
194 : stop_cb.reset();
195 : socket_impl_ = nullptr;
196 : acceptor_impl_ = nullptr;
197 :
198 : if (token.stop_possible())
199 : stop_cb.emplace(token, canceller{this});
200 : }
201 :
202 236357 : void start(std::stop_token token, select_socket_impl* impl)
203 : {
204 236357 : cancelled.store(false, std::memory_order_release);
205 236357 : stop_cb.reset();
206 236357 : socket_impl_ = impl;
207 236357 : acceptor_impl_ = nullptr;
208 :
209 236357 : if (token.stop_possible())
210 100 : stop_cb.emplace(token, canceller{this});
211 236357 : }
212 :
213 3837 : void start(std::stop_token token, select_acceptor_impl* impl)
214 : {
215 3837 : cancelled.store(false, std::memory_order_release);
216 3837 : stop_cb.reset();
217 3837 : socket_impl_ = nullptr;
218 3837 : acceptor_impl_ = impl;
219 :
220 3837 : if (token.stop_possible())
221 0 : stop_cb.emplace(token, canceller{this});
222 3837 : }
223 :
224 240073 : void complete(int err, std::size_t bytes) noexcept
225 : {
226 240073 : errn = err;
227 240073 : bytes_transferred = bytes;
228 240073 : }
229 :
230 0 : virtual void perform_io() noexcept {}
231 : };
232 :
233 :
234 : struct select_connect_op : select_op
235 : {
236 : endpoint target_endpoint;
237 :
238 3835 : void reset() noexcept
239 : {
240 3835 : select_op::reset();
241 3835 : target_endpoint = endpoint{};
242 3835 : }
243 :
244 3835 : void perform_io() noexcept override
245 : {
246 : // connect() completion status is retrieved via SO_ERROR, not return value
247 3835 : int err = 0;
248 3835 : socklen_t len = sizeof(err);
249 3835 : if (::getsockopt(fd, SOL_SOCKET, SO_ERROR, &err, &len) < 0)
250 0 : err = errno;
251 3835 : complete(err, 0);
252 3835 : }
253 :
254 : // Defined in sockets.cpp where select_socket_impl is complete
255 : void operator()() override;
256 : void cancel() noexcept override;
257 : };
258 :
259 :
260 : struct select_read_op : select_op
261 : {
262 : static constexpr std::size_t max_buffers = 16;
263 : iovec iovecs[max_buffers];
264 : int iovec_count = 0;
265 : bool empty_buffer_read = false;
266 :
267 116122 : bool is_read_operation() const noexcept override
268 : {
269 116122 : return !empty_buffer_read;
270 : }
271 :
272 116322 : void reset() noexcept
273 : {
274 116322 : select_op::reset();
275 116322 : iovec_count = 0;
276 116322 : empty_buffer_read = false;
277 116322 : }
278 :
279 81 : void perform_io() noexcept override
280 : {
281 81 : ssize_t n = ::readv(fd, iovecs, iovec_count);
282 81 : if (n >= 0)
283 81 : complete(0, static_cast<std::size_t>(n));
284 : else
285 0 : complete(errno, 0);
286 81 : }
287 :
288 : void cancel() noexcept override;
289 : };
290 :
291 :
292 : struct select_write_op : select_op
293 : {
294 : static constexpr std::size_t max_buffers = 16;
295 : iovec iovecs[max_buffers];
296 : int iovec_count = 0;
297 :
298 116200 : void reset() noexcept
299 : {
300 116200 : select_op::reset();
301 116200 : iovec_count = 0;
302 116200 : }
303 :
304 0 : void perform_io() noexcept override
305 : {
306 0 : msghdr msg{};
307 0 : msg.msg_iov = iovecs;
308 0 : msg.msg_iovlen = static_cast<std::size_t>(iovec_count);
309 :
310 0 : ssize_t n = ::sendmsg(fd, &msg, MSG_NOSIGNAL);
311 0 : if (n >= 0)
312 0 : complete(0, static_cast<std::size_t>(n));
313 : else
314 0 : complete(errno, 0);
315 0 : }
316 :
317 : void cancel() noexcept override;
318 : };
319 :
320 :
321 : struct select_accept_op : select_op
322 : {
323 : int accepted_fd = -1;
324 : io_object::io_object_impl* peer_impl = nullptr;
325 : io_object::io_object_impl** impl_out = nullptr;
326 :
327 3837 : void reset() noexcept
328 : {
329 3837 : select_op::reset();
330 3837 : accepted_fd = -1;
331 3837 : peer_impl = nullptr;
332 3837 : impl_out = nullptr;
333 3837 : }
334 :
335 3832 : void perform_io() noexcept override
336 : {
337 3832 : sockaddr_in addr{};
338 3832 : socklen_t addrlen = sizeof(addr);
339 :
340 : // Note: select backend uses accept() + fcntl instead of accept4()
341 : // for broader POSIX compatibility
342 3832 : int new_fd = ::accept(fd, reinterpret_cast<sockaddr*>(&addr), &addrlen);
343 :
344 3832 : if (new_fd >= 0)
345 : {
346 : // Reject fds that exceed select()'s FD_SETSIZE limit.
347 : // Better to fail now than during later async operations.
348 3832 : if (new_fd >= FD_SETSIZE)
349 : {
350 0 : ::close(new_fd);
351 0 : complete(EINVAL, 0);
352 0 : return;
353 : }
354 :
355 : // Set non-blocking and close-on-exec flags.
356 : // A non-blocking socket is essential for the async reactor;
357 : // if we can't configure it, fail rather than risk blocking.
358 3832 : int flags = ::fcntl(new_fd, F_GETFL, 0);
359 3832 : if (flags == -1)
360 : {
361 0 : int err = errno;
362 0 : ::close(new_fd);
363 0 : complete(err, 0);
364 0 : return;
365 : }
366 :
367 3832 : if (::fcntl(new_fd, F_SETFL, flags | O_NONBLOCK) == -1)
368 : {
369 0 : int err = errno;
370 0 : ::close(new_fd);
371 0 : complete(err, 0);
372 0 : return;
373 : }
374 :
375 3832 : if (::fcntl(new_fd, F_SETFD, FD_CLOEXEC) == -1)
376 : {
377 0 : int err = errno;
378 0 : ::close(new_fd);
379 0 : complete(err, 0);
380 0 : return;
381 : }
382 :
383 3832 : accepted_fd = new_fd;
384 3832 : complete(0, 0);
385 : }
386 : else
387 : {
388 0 : complete(errno, 0);
389 : }
390 : }
391 :
392 : // Defined in acceptors.cpp where select_acceptor_impl is complete
393 : void operator()() override;
394 : void cancel() noexcept override;
395 : };
396 :
397 : } // namespace boost::corosio::detail
398 :
399 : #endif // BOOST_COROSIO_HAS_SELECT
400 :
401 : #endif // BOOST_COROSIO_DETAIL_SELECT_OP_HPP
|