Line data Source code
1 : //
2 : // Copyright (c) 2026 Steve Gerbino
3 : //
4 : // Distributed under the Boost Software License, Version 1.0. (See accompanying
5 : // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
6 : //
7 : // Official repository: https://github.com/cppalliance/corosio
8 : //
9 :
10 : #include <boost/corosio/detail/platform.hpp>
11 :
12 : #if BOOST_COROSIO_HAS_SELECT
13 :
14 : #include "src/detail/select/acceptors.hpp"
15 : #include "src/detail/select/sockets.hpp"
16 : #include "src/detail/endpoint_convert.hpp"
17 : #include "src/detail/make_err.hpp"
18 :
19 : #include <errno.h>
20 : #include <fcntl.h>
21 : #include <netinet/in.h>
22 : #include <sys/socket.h>
23 : #include <unistd.h>
24 :
25 : namespace boost::corosio::detail {
26 :
27 : void
28 0 : select_accept_op::
29 : cancel() noexcept
30 : {
31 0 : if (acceptor_impl_)
32 0 : acceptor_impl_->cancel_single_op(*this);
33 : else
34 0 : request_cancel();
35 0 : }
36 :
37 : void
38 3837 : select_accept_op::
39 : operator()()
40 : {
41 3837 : stop_cb.reset();
42 :
43 3837 : bool success = (errn == 0 && !cancelled.load(std::memory_order_acquire));
44 :
45 3837 : if (ec_out)
46 : {
47 3837 : if (cancelled.load(std::memory_order_acquire))
48 3 : *ec_out = capy::error::canceled;
49 3834 : else if (errn != 0)
50 0 : *ec_out = make_err(errn);
51 : else
52 3834 : *ec_out = {};
53 : }
54 :
55 3837 : if (success && accepted_fd >= 0)
56 : {
57 3834 : if (acceptor_impl_)
58 : {
59 3834 : auto* socket_svc = static_cast<select_acceptor_impl*>(acceptor_impl_)
60 3834 : ->service().socket_service();
61 3834 : if (socket_svc)
62 : {
63 3834 : auto& impl = static_cast<select_socket_impl&>(socket_svc->create_impl());
64 3834 : impl.set_socket(accepted_fd);
65 :
66 3834 : sockaddr_in local_addr{};
67 3834 : socklen_t local_len = sizeof(local_addr);
68 3834 : sockaddr_in remote_addr{};
69 3834 : socklen_t remote_len = sizeof(remote_addr);
70 :
71 3834 : endpoint local_ep, remote_ep;
72 3834 : if (::getsockname(accepted_fd, reinterpret_cast<sockaddr*>(&local_addr), &local_len) == 0)
73 3834 : local_ep = from_sockaddr_in(local_addr);
74 3834 : if (::getpeername(accepted_fd, reinterpret_cast<sockaddr*>(&remote_addr), &remote_len) == 0)
75 3834 : remote_ep = from_sockaddr_in(remote_addr);
76 :
77 3834 : impl.set_endpoints(local_ep, remote_ep);
78 :
79 3834 : if (impl_out)
80 3834 : *impl_out = &impl;
81 :
82 3834 : accepted_fd = -1;
83 : }
84 : else
85 : {
86 0 : if (ec_out && !*ec_out)
87 0 : *ec_out = make_err(ENOENT);
88 0 : ::close(accepted_fd);
89 0 : accepted_fd = -1;
90 0 : if (impl_out)
91 0 : *impl_out = nullptr;
92 : }
93 : }
94 : else
95 : {
96 0 : ::close(accepted_fd);
97 0 : accepted_fd = -1;
98 0 : if (impl_out)
99 0 : *impl_out = nullptr;
100 : }
101 3834 : }
102 : else
103 : {
104 3 : if (accepted_fd >= 0)
105 : {
106 0 : ::close(accepted_fd);
107 0 : accepted_fd = -1;
108 : }
109 :
110 3 : if (peer_impl)
111 : {
112 0 : peer_impl->release();
113 0 : peer_impl = nullptr;
114 : }
115 :
116 3 : if (impl_out)
117 3 : *impl_out = nullptr;
118 : }
119 :
120 : // Move to stack before destroying the frame
121 3837 : capy::executor_ref saved_ex( std::move( ex ) );
122 3837 : capy::coro saved_h( std::move( h ) );
123 3837 : impl_ptr.reset();
124 3837 : saved_ex.dispatch( saved_h );
125 3837 : }
126 :
127 54 : select_acceptor_impl::
128 54 : select_acceptor_impl(select_acceptor_service& svc) noexcept
129 54 : : svc_(svc)
130 : {
131 54 : }
132 :
133 : void
134 54 : select_acceptor_impl::
135 : release()
136 : {
137 54 : close_socket();
138 54 : svc_.destroy_acceptor_impl(*this);
139 54 : }
140 :
141 : std::coroutine_handle<>
142 3837 : select_acceptor_impl::
143 : accept(
144 : std::coroutine_handle<> h,
145 : capy::executor_ref ex,
146 : std::stop_token token,
147 : std::error_code* ec,
148 : io_object::io_object_impl** impl_out)
149 : {
150 3837 : auto& op = acc_;
151 3837 : op.reset();
152 3837 : op.h = h;
153 3837 : op.ex = ex;
154 3837 : op.ec_out = ec;
155 3837 : op.impl_out = impl_out;
156 3837 : op.fd = fd_;
157 3837 : op.start(token, this);
158 :
159 3837 : sockaddr_in addr{};
160 3837 : socklen_t addrlen = sizeof(addr);
161 3837 : int accepted = ::accept(fd_, reinterpret_cast<sockaddr*>(&addr), &addrlen);
162 :
163 3837 : if (accepted >= 0)
164 : {
165 : // Reject fds that exceed select()'s FD_SETSIZE limit.
166 : // Better to fail now than during later async operations.
167 2 : if (accepted >= FD_SETSIZE)
168 : {
169 0 : ::close(accepted);
170 0 : op.accepted_fd = -1;
171 0 : op.complete(EINVAL, 0);
172 0 : op.impl_ptr = shared_from_this();
173 0 : svc_.post(&op);
174 : // completion is always posted to scheduler queue, never inline.
175 0 : return std::noop_coroutine();
176 : }
177 :
178 : // Set non-blocking and close-on-exec flags.
179 : // A non-blocking socket is essential for the async reactor;
180 : // if we can't configure it, fail rather than risk blocking.
181 2 : int flags = ::fcntl(accepted, F_GETFL, 0);
182 2 : if (flags == -1)
183 : {
184 0 : int err = errno;
185 0 : ::close(accepted);
186 0 : op.accepted_fd = -1;
187 0 : op.complete(err, 0);
188 0 : op.impl_ptr = shared_from_this();
189 0 : svc_.post(&op);
190 : // completion is always posted to scheduler queue, never inline.
191 0 : return std::noop_coroutine();
192 : }
193 :
194 2 : if (::fcntl(accepted, F_SETFL, flags | O_NONBLOCK) == -1)
195 : {
196 0 : int err = errno;
197 0 : ::close(accepted);
198 0 : op.accepted_fd = -1;
199 0 : op.complete(err, 0);
200 0 : op.impl_ptr = shared_from_this();
201 0 : svc_.post(&op);
202 : // completion is always posted to scheduler queue, never inline.
203 0 : return std::noop_coroutine();
204 : }
205 :
206 2 : if (::fcntl(accepted, F_SETFD, FD_CLOEXEC) == -1)
207 : {
208 0 : int err = errno;
209 0 : ::close(accepted);
210 0 : op.accepted_fd = -1;
211 0 : op.complete(err, 0);
212 0 : op.impl_ptr = shared_from_this();
213 0 : svc_.post(&op);
214 : // completion is always posted to scheduler queue, never inline.
215 0 : return std::noop_coroutine();
216 : }
217 :
218 2 : op.accepted_fd = accepted;
219 2 : op.complete(0, 0);
220 2 : op.impl_ptr = shared_from_this();
221 2 : svc_.post(&op);
222 : // completion is always posted to scheduler queue, never inline.
223 2 : return std::noop_coroutine();
224 : }
225 :
226 3835 : if (errno == EAGAIN || errno == EWOULDBLOCK)
227 : {
228 3835 : svc_.work_started();
229 3835 : op.impl_ptr = shared_from_this();
230 :
231 : // Set registering BEFORE register_fd to close the race window where
232 : // reactor sees an event before we set registered.
233 3835 : op.registered.store(select_registration_state::registering, std::memory_order_release);
234 3835 : svc_.scheduler().register_fd(fd_, &op, select_scheduler::event_read);
235 :
236 : // Transition to registered. If this fails, reactor or cancel already
237 : // claimed the op (state is now unregistered), so we're done. However,
238 : // we must still deregister the fd because cancel's deregister_fd may
239 : // have run before our register_fd, leaving the fd orphaned.
240 3835 : auto expected = select_registration_state::registering;
241 3835 : if (!op.registered.compare_exchange_strong(
242 : expected, select_registration_state::registered, std::memory_order_acq_rel))
243 : {
244 0 : svc_.scheduler().deregister_fd(fd_, select_scheduler::event_read);
245 : // completion is always posted to scheduler queue, never inline.
246 0 : return std::noop_coroutine();
247 : }
248 :
249 : // If cancelled was set before we registered, handle it now.
250 3835 : if (op.cancelled.load(std::memory_order_acquire))
251 : {
252 0 : auto prev = op.registered.exchange(
253 : select_registration_state::unregistered, std::memory_order_acq_rel);
254 0 : if (prev != select_registration_state::unregistered)
255 : {
256 0 : svc_.scheduler().deregister_fd(fd_, select_scheduler::event_read);
257 0 : op.impl_ptr = shared_from_this();
258 0 : svc_.post(&op);
259 0 : svc_.work_finished();
260 : }
261 : }
262 : // completion is always posted to scheduler queue, never inline.
263 3835 : return std::noop_coroutine();
264 : }
265 :
266 0 : op.complete(errno, 0);
267 0 : op.impl_ptr = shared_from_this();
268 0 : svc_.post(&op);
269 : // completion is always posted to scheduler queue, never inline.
270 0 : return std::noop_coroutine();
271 : }
272 :
273 : void
274 109 : select_acceptor_impl::
275 : cancel() noexcept
276 : {
277 109 : std::shared_ptr<select_acceptor_impl> self;
278 : try {
279 109 : self = shared_from_this();
280 0 : } catch (const std::bad_weak_ptr&) {
281 0 : return;
282 0 : }
283 :
284 109 : auto prev = acc_.registered.exchange(
285 : select_registration_state::unregistered, std::memory_order_acq_rel);
286 109 : acc_.request_cancel();
287 :
288 109 : if (prev != select_registration_state::unregistered)
289 : {
290 3 : svc_.scheduler().deregister_fd(fd_, select_scheduler::event_read);
291 3 : acc_.impl_ptr = self;
292 3 : svc_.post(&acc_);
293 3 : svc_.work_finished();
294 : }
295 109 : }
296 :
297 : void
298 0 : select_acceptor_impl::
299 : cancel_single_op(select_op& op) noexcept
300 : {
301 : // Called from stop_token callback to cancel a specific pending operation.
302 0 : auto prev = op.registered.exchange(
303 : select_registration_state::unregistered, std::memory_order_acq_rel);
304 0 : op.request_cancel();
305 :
306 0 : if (prev != select_registration_state::unregistered)
307 : {
308 0 : svc_.scheduler().deregister_fd(fd_, select_scheduler::event_read);
309 :
310 : // Keep impl alive until op completes
311 : try {
312 0 : op.impl_ptr = shared_from_this();
313 0 : } catch (const std::bad_weak_ptr&) {
314 : // Impl is being destroyed, op will be orphaned but that's ok
315 0 : }
316 :
317 0 : svc_.post(&op);
318 0 : svc_.work_finished();
319 : }
320 0 : }
321 :
322 : void
323 108 : select_acceptor_impl::
324 : close_socket() noexcept
325 : {
326 108 : cancel();
327 :
328 108 : if (fd_ >= 0)
329 : {
330 : // Unconditionally remove from registered_fds_ to handle edge cases
331 42 : svc_.scheduler().deregister_fd(fd_, select_scheduler::event_read);
332 42 : ::close(fd_);
333 42 : fd_ = -1;
334 : }
335 :
336 : // Clear cached endpoint
337 108 : local_endpoint_ = endpoint{};
338 108 : }
339 :
340 120 : select_acceptor_service::
341 120 : select_acceptor_service(capy::execution_context& ctx)
342 120 : : ctx_(ctx)
343 120 : , state_(std::make_unique<select_acceptor_state>(ctx.use_service<select_scheduler>()))
344 : {
345 120 : }
346 :
347 240 : select_acceptor_service::
348 120 : ~select_acceptor_service()
349 : {
350 240 : }
351 :
352 : void
353 120 : select_acceptor_service::
354 : shutdown()
355 : {
356 120 : std::lock_guard lock(state_->mutex_);
357 :
358 120 : while (auto* impl = state_->acceptor_list_.pop_front())
359 0 : impl->close_socket();
360 :
361 120 : state_->acceptor_ptrs_.clear();
362 120 : }
363 :
364 : tcp_acceptor::acceptor_impl&
365 54 : select_acceptor_service::
366 : create_acceptor_impl()
367 : {
368 54 : auto impl = std::make_shared<select_acceptor_impl>(*this);
369 54 : auto* raw = impl.get();
370 :
371 54 : std::lock_guard lock(state_->mutex_);
372 54 : state_->acceptor_list_.push_back(raw);
373 54 : state_->acceptor_ptrs_.emplace(raw, std::move(impl));
374 :
375 54 : return *raw;
376 54 : }
377 :
378 : void
379 54 : select_acceptor_service::
380 : destroy_acceptor_impl(tcp_acceptor::acceptor_impl& impl)
381 : {
382 54 : auto* select_impl = static_cast<select_acceptor_impl*>(&impl);
383 54 : std::lock_guard lock(state_->mutex_);
384 54 : state_->acceptor_list_.remove(select_impl);
385 54 : state_->acceptor_ptrs_.erase(select_impl);
386 54 : }
387 :
388 : std::error_code
389 54 : select_acceptor_service::
390 : open_acceptor(
391 : tcp_acceptor::acceptor_impl& impl,
392 : endpoint ep,
393 : int backlog)
394 : {
395 54 : auto* select_impl = static_cast<select_acceptor_impl*>(&impl);
396 54 : select_impl->close_socket();
397 :
398 54 : int fd = ::socket(AF_INET, SOCK_STREAM, 0);
399 54 : if (fd < 0)
400 0 : return make_err(errno);
401 :
402 : // Set non-blocking and close-on-exec
403 54 : int flags = ::fcntl(fd, F_GETFL, 0);
404 54 : if (flags == -1)
405 : {
406 0 : int errn = errno;
407 0 : ::close(fd);
408 0 : return make_err(errn);
409 : }
410 54 : if (::fcntl(fd, F_SETFL, flags | O_NONBLOCK) == -1)
411 : {
412 0 : int errn = errno;
413 0 : ::close(fd);
414 0 : return make_err(errn);
415 : }
416 54 : if (::fcntl(fd, F_SETFD, FD_CLOEXEC) == -1)
417 : {
418 0 : int errn = errno;
419 0 : ::close(fd);
420 0 : return make_err(errn);
421 : }
422 :
423 : // Check fd is within select() limits
424 54 : if (fd >= FD_SETSIZE)
425 : {
426 0 : ::close(fd);
427 0 : return make_err(EMFILE);
428 : }
429 :
430 54 : int reuse = 1;
431 54 : ::setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &reuse, sizeof(reuse));
432 :
433 54 : sockaddr_in addr = detail::to_sockaddr_in(ep);
434 54 : if (::bind(fd, reinterpret_cast<sockaddr*>(&addr), sizeof(addr)) < 0)
435 : {
436 12 : int errn = errno;
437 12 : ::close(fd);
438 12 : return make_err(errn);
439 : }
440 :
441 42 : if (::listen(fd, backlog) < 0)
442 : {
443 0 : int errn = errno;
444 0 : ::close(fd);
445 0 : return make_err(errn);
446 : }
447 :
448 42 : select_impl->fd_ = fd;
449 :
450 : // Cache the local endpoint (queries OS for ephemeral port if port was 0)
451 42 : sockaddr_in local_addr{};
452 42 : socklen_t local_len = sizeof(local_addr);
453 42 : if (::getsockname(fd, reinterpret_cast<sockaddr*>(&local_addr), &local_len) == 0)
454 42 : select_impl->set_local_endpoint(detail::from_sockaddr_in(local_addr));
455 :
456 42 : return {};
457 : }
458 :
459 : void
460 5 : select_acceptor_service::
461 : post(select_op* op)
462 : {
463 5 : state_->sched_.post(op);
464 5 : }
465 :
466 : void
467 3835 : select_acceptor_service::
468 : work_started() noexcept
469 : {
470 3835 : state_->sched_.work_started();
471 3835 : }
472 :
473 : void
474 3 : select_acceptor_service::
475 : work_finished() noexcept
476 : {
477 3 : state_->sched_.work_finished();
478 3 : }
479 :
480 : select_socket_service*
481 3834 : select_acceptor_service::
482 : socket_service() const noexcept
483 : {
484 3834 : auto* svc = ctx_.find_service<detail::socket_service>();
485 3834 : return svc ? dynamic_cast<select_socket_service*>(svc) : nullptr;
486 : }
487 :
488 : } // namespace boost::corosio::detail
489 :
490 : #endif // BOOST_COROSIO_HAS_SELECT
|