Line data Source code
1 : //
2 : // Copyright (c) 2026 Steve Gerbino
3 : //
4 : // Distributed under the Boost Software License, Version 1.0. (See accompanying
5 : // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
6 : //
7 : // Official repository: https://github.com/cppalliance/corosio
8 : //
9 :
10 : #ifndef BOOST_COROSIO_DETAIL_EPOLL_OP_HPP
11 : #define BOOST_COROSIO_DETAIL_EPOLL_OP_HPP
12 :
13 : #include <boost/corosio/detail/platform.hpp>
14 :
15 : #if BOOST_COROSIO_HAS_EPOLL
16 :
17 : #include <boost/corosio/detail/config.hpp>
18 : #include <boost/corosio/io_object.hpp>
19 : #include <boost/corosio/endpoint.hpp>
20 : #include <boost/capy/ex/executor_ref.hpp>
21 : #include <boost/capy/coro.hpp>
22 : #include <boost/capy/error.hpp>
23 : #include <system_error>
24 :
25 : #include "src/detail/make_err.hpp"
26 : #include "src/detail/resume_coro.hpp"
27 : #include "src/detail/scheduler_op.hpp"
28 : #include "src/detail/endpoint_convert.hpp"
29 :
30 : #include <unistd.h>
31 : #include <errno.h>
32 :
33 : #include <atomic>
34 : #include <cstddef>
35 : #include <memory>
36 : #include <mutex>
37 : #include <optional>
38 : #include <stop_token>
39 :
40 : #include <netinet/in.h>
41 : #include <sys/socket.h>
42 : #include <sys/uio.h>
43 :
44 : /*
45 : epoll Operation State
46 : =====================
47 :
48 : Each async I/O operation has a corresponding epoll_op-derived struct that
49 : holds the operation's state while it's in flight. The socket impl owns
50 : fixed slots for each operation type (conn_, rd_, wr_), so only one
51 : operation of each type can be pending per socket at a time.
52 :
53 : Persistent Registration
54 : -----------------------
55 : File descriptors are registered with epoll once (via descriptor_state) and
56 : stay registered until closed. The descriptor_state tracks which operations
57 : are pending (read_op, write_op, connect_op). When an event arrives, the
58 : reactor dispatches to the appropriate pending operation.
59 :
60 : Impl Lifetime Management
61 : ------------------------
62 : When cancel() posts an op to the scheduler's ready queue, the socket impl
63 : might be destroyed before the scheduler processes the op. The `impl_ptr`
64 : member holds a shared_ptr to the impl, keeping it alive until the op
65 : completes. This is set by cancel() and cleared in operator() after the
66 : coroutine is resumed.
67 :
68 : EOF Detection
69 : -------------
70 : For reads, 0 bytes with no error means EOF. But an empty user buffer also
71 : returns 0 bytes. The `empty_buffer_read` flag distinguishes these cases.
72 :
73 : SIGPIPE Prevention
74 : ------------------
75 : Writes use sendmsg() with MSG_NOSIGNAL instead of writev() to prevent
76 : SIGPIPE when the peer has closed.
77 : */
78 :
79 : namespace boost::corosio::detail {
80 :
// Forward declarations: within this header these types are only used
// through pointers, so their full definitions are not needed here.
class epoll_scheduler;
class epoll_socket_impl;
class epoll_acceptor_impl;
struct epoll_op;
88 :
89 : /** Per-descriptor state for persistent epoll registration.
90 :
91 : Tracks pending operations for a file descriptor. The fd is registered
92 : once with epoll and stays registered until closed.
93 :
94 : This struct extends scheduler_op to support deferred I/O processing.
95 : When epoll events arrive, the reactor sets ready_events and queues
96 : this descriptor for processing. When popped from the scheduler queue,
97 : operator() performs the actual I/O and queues completion handlers.
98 :
99 : @par Deferred I/O Model
100 : The reactor no longer performs I/O directly. Instead:
101 : 1. Reactor sets ready_events and queues descriptor_state
102 : 2. Scheduler pops descriptor_state and calls operator()
103 : 3. operator() performs I/O under mutex and queues completions
104 :
105 : This eliminates per-descriptor mutex locking from the reactor hot path.
106 :
107 : @par Thread Safety
108 : The mutex protects operation pointers and ready flags during I/O.
109 : ready_events_ and is_enqueued_ are atomic for lock-free reactor access.
110 : */
111 : struct descriptor_state : scheduler_op
112 : {
113 : std::mutex mutex;
114 :
115 : // Protected by mutex
116 : epoll_op* read_op = nullptr;
117 : epoll_op* write_op = nullptr;
118 : epoll_op* connect_op = nullptr;
119 :
120 : // Caches edge events that arrived before an op was registered
121 : bool read_ready = false;
122 : bool write_ready = false;
123 :
124 : // Set during registration only (no mutex needed)
125 : std::uint32_t registered_events = 0;
126 : int fd = -1;
127 :
128 : // For deferred I/O - set by reactor, read by scheduler
129 : std::atomic<std::uint32_t> ready_events_{0};
130 : std::atomic<bool> is_enqueued_{false};
131 : epoll_scheduler const* scheduler_ = nullptr;
132 :
133 : // Prevents impl destruction while this descriptor_state is queued.
134 : // Set by close_socket() when is_enqueued_ is true, cleared by operator().
135 : std::shared_ptr<void> impl_ref_;
136 :
137 : /// Add ready events atomically.
138 125169 : void add_ready_events(std::uint32_t ev) noexcept
139 : {
140 125169 : ready_events_.fetch_or(ev, std::memory_order_relaxed);
141 125169 : }
142 :
143 : /// Perform deferred I/O and queue completions.
144 : void operator()() override;
145 :
146 : /// Destroy without invoking.
147 0 : void destroy() override {}
148 : };
149 :
150 : struct epoll_op : scheduler_op
151 : {
152 : struct canceller
153 : {
154 : epoll_op* op;
155 : void operator()() const noexcept;
156 : };
157 :
158 : capy::coro h;
159 : capy::executor_ref ex;
160 : std::error_code* ec_out = nullptr;
161 : std::size_t* bytes_out = nullptr;
162 :
163 : int fd = -1;
164 : int errn = 0;
165 : std::size_t bytes_transferred = 0;
166 :
167 : std::atomic<bool> cancelled{false};
168 : std::optional<std::stop_callback<canceller>> stop_cb;
169 :
170 : // Prevents use-after-free when socket is closed with pending ops.
171 : // See "Impl Lifetime Management" in file header.
172 : std::shared_ptr<void> impl_ptr;
173 :
174 : // For stop_token cancellation - pointer to owning socket/acceptor impl.
175 : // When stop is requested, we call back to the impl to perform actual I/O cancellation.
176 : epoll_socket_impl* socket_impl_ = nullptr;
177 : epoll_acceptor_impl* acceptor_impl_ = nullptr;
178 :
179 30210 : epoll_op() = default;
180 :
181 240281 : void reset() noexcept
182 : {
183 240281 : fd = -1;
184 240281 : errn = 0;
185 240281 : bytes_transferred = 0;
186 240281 : cancelled.store(false, std::memory_order_relaxed);
187 240281 : impl_ptr.reset();
188 240281 : socket_impl_ = nullptr;
189 240281 : acceptor_impl_ = nullptr;
190 240281 : }
191 :
192 230237 : void operator()() override
193 : {
194 230237 : stop_cb.reset();
195 :
196 230237 : if (ec_out)
197 : {
198 230237 : if (cancelled.load(std::memory_order_acquire))
199 206 : *ec_out = capy::error::canceled;
200 230031 : else if (errn != 0)
201 1 : *ec_out = make_err(errn);
202 230030 : else if (is_read_operation() && bytes_transferred == 0)
203 5 : *ec_out = capy::error::eof;
204 : else
205 230025 : *ec_out = {};
206 : }
207 :
208 230237 : if (bytes_out)
209 230237 : *bytes_out = bytes_transferred;
210 :
211 : // Move to stack before resuming coroutine. The coroutine might close
212 : // the socket, releasing the last wrapper ref. If impl_ptr were the
213 : // last ref and we destroyed it while still in operator(), we'd have
214 : // use-after-free. Moving to local ensures destruction happens at
215 : // function exit, after all member accesses are complete.
216 230237 : capy::executor_ref saved_ex( std::move( ex ) );
217 230237 : capy::coro saved_h( std::move( h ) );
218 230237 : auto prevent_premature_destruction = std::move(impl_ptr);
219 230237 : resume_coro(saved_ex, saved_h);
220 230237 : }
221 :
222 115052 : virtual bool is_read_operation() const noexcept { return false; }
223 : virtual void cancel() noexcept = 0;
224 :
225 0 : void destroy() override
226 : {
227 0 : stop_cb.reset();
228 0 : impl_ptr.reset();
229 0 : }
230 :
231 45769 : void request_cancel() noexcept
232 : {
233 45769 : cancelled.store(true, std::memory_order_release);
234 45769 : }
235 :
236 235255 : void start(std::stop_token token, epoll_socket_impl* impl)
237 : {
238 235255 : cancelled.store(false, std::memory_order_release);
239 235255 : stop_cb.reset();
240 235255 : socket_impl_ = impl;
241 235255 : acceptor_impl_ = nullptr;
242 :
243 235255 : if (token.stop_possible())
244 106 : stop_cb.emplace(token, canceller{this});
245 235255 : }
246 :
247 5026 : void start(std::stop_token token, epoll_acceptor_impl* impl)
248 : {
249 5026 : cancelled.store(false, std::memory_order_release);
250 5026 : stop_cb.reset();
251 5026 : socket_impl_ = nullptr;
252 5026 : acceptor_impl_ = impl;
253 :
254 5026 : if (token.stop_possible())
255 9 : stop_cb.emplace(token, canceller{this});
256 5026 : }
257 :
258 240199 : void complete(int err, std::size_t bytes) noexcept
259 : {
260 240199 : errn = err;
261 240199 : bytes_transferred = bytes;
262 240199 : }
263 :
264 0 : virtual void perform_io() noexcept {}
265 : };
266 :
267 :
268 : struct epoll_connect_op : epoll_op
269 : {
270 : endpoint target_endpoint;
271 :
272 5018 : void reset() noexcept
273 : {
274 5018 : epoll_op::reset();
275 5018 : target_endpoint = endpoint{};
276 5018 : }
277 :
278 5018 : void perform_io() noexcept override
279 : {
280 : // connect() completion status is retrieved via SO_ERROR, not return value
281 5018 : int err = 0;
282 5018 : socklen_t len = sizeof(err);
283 5018 : if (::getsockopt(fd, SOL_SOCKET, SO_ERROR, &err, &len) < 0)
284 0 : err = errno;
285 5018 : complete(err, 0);
286 5018 : }
287 :
288 : // Defined in sockets.cpp where epoll_socket_impl is complete
289 : void operator()() override;
290 : void cancel() noexcept override;
291 : };
292 :
293 :
294 : struct epoll_read_op : epoll_op
295 : {
296 : static constexpr std::size_t max_buffers = 16;
297 : iovec iovecs[max_buffers];
298 : int iovec_count = 0;
299 : bool empty_buffer_read = false;
300 :
301 114978 : bool is_read_operation() const noexcept override
302 : {
303 114978 : return !empty_buffer_read;
304 : }
305 :
306 115180 : void reset() noexcept
307 : {
308 115180 : epoll_op::reset();
309 115180 : iovec_count = 0;
310 115180 : empty_buffer_read = false;
311 115180 : }
312 :
313 98 : void perform_io() noexcept override
314 : {
315 98 : ssize_t n = ::readv(fd, iovecs, iovec_count);
316 98 : if (n >= 0)
317 52 : complete(0, static_cast<std::size_t>(n));
318 : else
319 46 : complete(errno, 0);
320 98 : }
321 :
322 : void cancel() noexcept override;
323 : };
324 :
325 :
326 : struct epoll_write_op : epoll_op
327 : {
328 : static constexpr std::size_t max_buffers = 16;
329 : iovec iovecs[max_buffers];
330 : int iovec_count = 0;
331 :
332 115057 : void reset() noexcept
333 : {
334 115057 : epoll_op::reset();
335 115057 : iovec_count = 0;
336 115057 : }
337 :
338 0 : void perform_io() noexcept override
339 : {
340 0 : msghdr msg{};
341 0 : msg.msg_iov = iovecs;
342 0 : msg.msg_iovlen = static_cast<std::size_t>(iovec_count);
343 :
344 0 : ssize_t n = ::sendmsg(fd, &msg, MSG_NOSIGNAL);
345 0 : if (n >= 0)
346 0 : complete(0, static_cast<std::size_t>(n));
347 : else
348 0 : complete(errno, 0);
349 0 : }
350 :
351 : void cancel() noexcept override;
352 : };
353 :
354 :
355 : struct epoll_accept_op : epoll_op
356 : {
357 : int accepted_fd = -1;
358 : io_object::io_object_impl* peer_impl = nullptr;
359 : io_object::io_object_impl** impl_out = nullptr;
360 :
361 5026 : void reset() noexcept
362 : {
363 5026 : epoll_op::reset();
364 5026 : accepted_fd = -1;
365 5026 : peer_impl = nullptr;
366 5026 : impl_out = nullptr;
367 5026 : }
368 :
369 5015 : void perform_io() noexcept override
370 : {
371 5015 : sockaddr_in addr{};
372 5015 : socklen_t addrlen = sizeof(addr);
373 5015 : int new_fd = ::accept4(fd, reinterpret_cast<sockaddr*>(&addr),
374 : &addrlen, SOCK_NONBLOCK | SOCK_CLOEXEC);
375 :
376 5015 : if (new_fd >= 0)
377 : {
378 5015 : accepted_fd = new_fd;
379 5015 : complete(0, 0);
380 : }
381 : else
382 : {
383 0 : complete(errno, 0);
384 : }
385 5015 : }
386 :
387 : // Defined in acceptors.cpp where epoll_acceptor_impl is complete
388 : void operator()() override;
389 : void cancel() noexcept override;
390 : };
391 :
392 : } // namespace boost::corosio::detail
393 :
394 : #endif // BOOST_COROSIO_HAS_EPOLL
395 :
396 : #endif // BOOST_COROSIO_DETAIL_EPOLL_OP_HPP
|