libs/corosio/src/corosio/src/detail/select/acceptors.cpp

64.3% Lines (160/249) 88.9% Functions (16/18) 45.5% Branches (66/145)
libs/corosio/src/corosio/src/detail/select/acceptors.cpp
Line Branch Hits Source Code
1 //
2 // Copyright (c) 2026 Steve Gerbino
3 //
4 // Distributed under the Boost Software License, Version 1.0. (See accompanying
5 // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
6 //
7 // Official repository: https://github.com/cppalliance/corosio
8 //
9
10 #include <boost/corosio/detail/platform.hpp>
11
12 #if BOOST_COROSIO_HAS_SELECT
13
14 #include "src/detail/select/acceptors.hpp"
15 #include "src/detail/select/sockets.hpp"
16 #include "src/detail/endpoint_convert.hpp"
17 #include "src/detail/make_err.hpp"
18
19 #include <errno.h>
20 #include <fcntl.h>
21 #include <netinet/in.h>
22 #include <sys/socket.h>
23 #include <unistd.h>
24
25 namespace boost::corosio::detail {
26
27 void
28 select_accept_op::
29 cancel() noexcept
30 {
31 if (acceptor_impl_)
32 acceptor_impl_->cancel_single_op(*this);
33 else
34 request_cancel();
35 }
36
37 void
38 3837 select_accept_op::
39 operator()()
40 {
41 3837 stop_cb.reset();
42
43
3/4
✓ Branch 0 taken 3837 times.
✗ Branch 1 not taken.
✓ Branch 3 taken 3834 times.
✓ Branch 4 taken 3 times.
3837 bool success = (errn == 0 && !cancelled.load(std::memory_order_acquire));
44
45
1/2
✓ Branch 0 taken 3837 times.
✗ Branch 1 not taken.
3837 if (ec_out)
46 {
47
2/2
✓ Branch 1 taken 3 times.
✓ Branch 2 taken 3834 times.
3837 if (cancelled.load(std::memory_order_acquire))
48 3 *ec_out = capy::error::canceled;
49
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 3834 times.
3834 else if (errn != 0)
50 *ec_out = make_err(errn);
51 else
52 3834 *ec_out = {};
53 }
54
55
3/4
✓ Branch 0 taken 3834 times.
✓ Branch 1 taken 3 times.
✓ Branch 2 taken 3834 times.
✗ Branch 3 not taken.
3837 if (success && accepted_fd >= 0)
56 {
57
1/2
✓ Branch 0 taken 3834 times.
✗ Branch 1 not taken.
3834 if (acceptor_impl_)
58 {
59 3834 auto* socket_svc = static_cast<select_acceptor_impl*>(acceptor_impl_)
60 3834 ->service().socket_service();
61
1/2
✓ Branch 0 taken 3834 times.
✗ Branch 1 not taken.
3834 if (socket_svc)
62 {
63
1/1
✓ Branch 1 taken 3834 times.
3834 auto& impl = static_cast<select_socket_impl&>(socket_svc->create_impl());
64 3834 impl.set_socket(accepted_fd);
65
66 3834 sockaddr_in local_addr{};
67 3834 socklen_t local_len = sizeof(local_addr);
68 3834 sockaddr_in remote_addr{};
69 3834 socklen_t remote_len = sizeof(remote_addr);
70
71 3834 endpoint local_ep, remote_ep;
72
1/2
✓ Branch 1 taken 3834 times.
✗ Branch 2 not taken.
3834 if (::getsockname(accepted_fd, reinterpret_cast<sockaddr*>(&local_addr), &local_len) == 0)
73 3834 local_ep = from_sockaddr_in(local_addr);
74
1/2
✓ Branch 1 taken 3834 times.
✗ Branch 2 not taken.
3834 if (::getpeername(accepted_fd, reinterpret_cast<sockaddr*>(&remote_addr), &remote_len) == 0)
75 3834 remote_ep = from_sockaddr_in(remote_addr);
76
77 3834 impl.set_endpoints(local_ep, remote_ep);
78
79
1/2
✓ Branch 0 taken 3834 times.
✗ Branch 1 not taken.
3834 if (impl_out)
80 3834 *impl_out = &impl;
81
82 3834 accepted_fd = -1;
83 }
84 else
85 {
86 if (ec_out && !*ec_out)
87 *ec_out = make_err(ENOENT);
88 ::close(accepted_fd);
89 accepted_fd = -1;
90 if (impl_out)
91 *impl_out = nullptr;
92 }
93 }
94 else
95 {
96 ::close(accepted_fd);
97 accepted_fd = -1;
98 if (impl_out)
99 *impl_out = nullptr;
100 }
101 3834 }
102 else
103 {
104
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 3 times.
3 if (accepted_fd >= 0)
105 {
106 ::close(accepted_fd);
107 accepted_fd = -1;
108 }
109
110
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 3 times.
3 if (peer_impl)
111 {
112 peer_impl->release();
113 peer_impl = nullptr;
114 }
115
116
1/2
✓ Branch 0 taken 3 times.
✗ Branch 1 not taken.
3 if (impl_out)
117 3 *impl_out = nullptr;
118 }
119
120 // Move to stack before destroying the frame
121 3837 capy::executor_ref saved_ex( std::move( ex ) );
122 3837 capy::coro saved_h( std::move( h ) );
123 3837 impl_ptr.reset();
124
1/1
✓ Branch 1 taken 3837 times.
3837 saved_ex.dispatch( saved_h );
125 3837 }
126
127 54 select_acceptor_impl::
128 54 select_acceptor_impl(select_acceptor_service& svc) noexcept
129 54 : svc_(svc)
130 {
131 54 }
132
133 void
134 54 select_acceptor_impl::
135 release()
136 {
137 54 close_socket();
138 54 svc_.destroy_acceptor_impl(*this);
139 54 }
140
141 std::coroutine_handle<>
142 3837 select_acceptor_impl::
143 accept(
144 std::coroutine_handle<> h,
145 capy::executor_ref ex,
146 std::stop_token token,
147 std::error_code* ec,
148 io_object::io_object_impl** impl_out)
149 {
150 3837 auto& op = acc_;
151 3837 op.reset();
152 3837 op.h = h;
153 3837 op.ex = ex;
154 3837 op.ec_out = ec;
155 3837 op.impl_out = impl_out;
156 3837 op.fd = fd_;
157 3837 op.start(token, this);
158
159 3837 sockaddr_in addr{};
160 3837 socklen_t addrlen = sizeof(addr);
161
1/1
✓ Branch 1 taken 3837 times.
3837 int accepted = ::accept(fd_, reinterpret_cast<sockaddr*>(&addr), &addrlen);
162
163
2/2
✓ Branch 0 taken 2 times.
✓ Branch 1 taken 3835 times.
3837 if (accepted >= 0)
164 {
165 // Reject fds that exceed select()'s FD_SETSIZE limit.
166 // Better to fail now than during later async operations.
167
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 2 times.
2 if (accepted >= FD_SETSIZE)
168 {
169 ::close(accepted);
170 op.accepted_fd = -1;
171 op.complete(EINVAL, 0);
172 op.impl_ptr = shared_from_this();
173 svc_.post(&op);
174 // completion is always posted to scheduler queue, never inline.
175 return std::noop_coroutine();
176 }
177
178 // Set non-blocking and close-on-exec flags.
179 // A non-blocking socket is essential for the async reactor;
180 // if we can't configure it, fail rather than risk blocking.
181
1/1
✓ Branch 1 taken 2 times.
2 int flags = ::fcntl(accepted, F_GETFL, 0);
182
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 2 times.
2 if (flags == -1)
183 {
184 int err = errno;
185 ::close(accepted);
186 op.accepted_fd = -1;
187 op.complete(err, 0);
188 op.impl_ptr = shared_from_this();
189 svc_.post(&op);
190 // completion is always posted to scheduler queue, never inline.
191 return std::noop_coroutine();
192 }
193
194
2/3
✓ Branch 1 taken 2 times.
✗ Branch 3 not taken.
✓ Branch 4 taken 2 times.
2 if (::fcntl(accepted, F_SETFL, flags | O_NONBLOCK) == -1)
195 {
196 int err = errno;
197 ::close(accepted);
198 op.accepted_fd = -1;
199 op.complete(err, 0);
200 op.impl_ptr = shared_from_this();
201 svc_.post(&op);
202 // completion is always posted to scheduler queue, never inline.
203 return std::noop_coroutine();
204 }
205
206
2/3
✓ Branch 1 taken 2 times.
✗ Branch 3 not taken.
✓ Branch 4 taken 2 times.
2 if (::fcntl(accepted, F_SETFD, FD_CLOEXEC) == -1)
207 {
208 int err = errno;
209 ::close(accepted);
210 op.accepted_fd = -1;
211 op.complete(err, 0);
212 op.impl_ptr = shared_from_this();
213 svc_.post(&op);
214 // completion is always posted to scheduler queue, never inline.
215 return std::noop_coroutine();
216 }
217
218 2 op.accepted_fd = accepted;
219 2 op.complete(0, 0);
220
1/1
✓ Branch 1 taken 2 times.
2 op.impl_ptr = shared_from_this();
221
1/1
✓ Branch 1 taken 2 times.
2 svc_.post(&op);
222 // completion is always posted to scheduler queue, never inline.
223 2 return std::noop_coroutine();
224 }
225
226
1/4
✗ Branch 0 not taken.
✓ Branch 1 taken 3835 times.
✗ Branch 2 not taken.
✗ Branch 3 not taken.
3835 if (errno == EAGAIN || errno == EWOULDBLOCK)
227 {
228 3835 svc_.work_started();
229
1/1
✓ Branch 1 taken 3835 times.
3835 op.impl_ptr = shared_from_this();
230
231 // Set registering BEFORE register_fd to close the race window where
232 // reactor sees an event before we set registered.
233 3835 op.registered.store(select_registration_state::registering, std::memory_order_release);
234
1/1
✓ Branch 2 taken 3835 times.
3835 svc_.scheduler().register_fd(fd_, &op, select_scheduler::event_read);
235
236 // Transition to registered. If this fails, reactor or cancel already
237 // claimed the op (state is now unregistered), so we're done. However,
238 // we must still deregister the fd because cancel's deregister_fd may
239 // have run before our register_fd, leaving the fd orphaned.
240 3835 auto expected = select_registration_state::registering;
241
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 3835 times.
3835 if (!op.registered.compare_exchange_strong(
242 expected, select_registration_state::registered, std::memory_order_acq_rel))
243 {
244 svc_.scheduler().deregister_fd(fd_, select_scheduler::event_read);
245 // completion is always posted to scheduler queue, never inline.
246 return std::noop_coroutine();
247 }
248
249 // If cancelled was set before we registered, handle it now.
250
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 3835 times.
3835 if (op.cancelled.load(std::memory_order_acquire))
251 {
252 auto prev = op.registered.exchange(
253 select_registration_state::unregistered, std::memory_order_acq_rel);
254 if (prev != select_registration_state::unregistered)
255 {
256 svc_.scheduler().deregister_fd(fd_, select_scheduler::event_read);
257 op.impl_ptr = shared_from_this();
258 svc_.post(&op);
259 svc_.work_finished();
260 }
261 }
262 // completion is always posted to scheduler queue, never inline.
263 3835 return std::noop_coroutine();
264 }
265
266 op.complete(errno, 0);
267 op.impl_ptr = shared_from_this();
268 svc_.post(&op);
269 // completion is always posted to scheduler queue, never inline.
270 return std::noop_coroutine();
271 }
272
273 void
274 109 select_acceptor_impl::
275 cancel() noexcept
276 {
277 109 std::shared_ptr<select_acceptor_impl> self;
278 try {
279
1/1
✓ Branch 1 taken 109 times.
109 self = shared_from_this();
280 } catch (const std::bad_weak_ptr&) {
281 return;
282 }
283
284 109 auto prev = acc_.registered.exchange(
285 select_registration_state::unregistered, std::memory_order_acq_rel);
286 109 acc_.request_cancel();
287
288
2/2
✓ Branch 0 taken 3 times.
✓ Branch 1 taken 106 times.
109 if (prev != select_registration_state::unregistered)
289 {
290 3 svc_.scheduler().deregister_fd(fd_, select_scheduler::event_read);
291 3 acc_.impl_ptr = self;
292 3 svc_.post(&acc_);
293 3 svc_.work_finished();
294 }
295 109 }
296
297 void
298 select_acceptor_impl::
299 cancel_single_op(select_op& op) noexcept
300 {
301 // Called from stop_token callback to cancel a specific pending operation.
302 auto prev = op.registered.exchange(
303 select_registration_state::unregistered, std::memory_order_acq_rel);
304 op.request_cancel();
305
306 if (prev != select_registration_state::unregistered)
307 {
308 svc_.scheduler().deregister_fd(fd_, select_scheduler::event_read);
309
310 // Keep impl alive until op completes
311 try {
312 op.impl_ptr = shared_from_this();
313 } catch (const std::bad_weak_ptr&) {
314 // Impl is being destroyed, op will be orphaned but that's ok
315 }
316
317 svc_.post(&op);
318 svc_.work_finished();
319 }
320 }
321
322 void
323 108 select_acceptor_impl::
324 close_socket() noexcept
325 {
326 108 cancel();
327
328
2/2
✓ Branch 0 taken 42 times.
✓ Branch 1 taken 66 times.
108 if (fd_ >= 0)
329 {
330 // Unconditionally remove from registered_fds_ to handle edge cases
331 42 svc_.scheduler().deregister_fd(fd_, select_scheduler::event_read);
332 42 ::close(fd_);
333 42 fd_ = -1;
334 }
335
336 // Clear cached endpoint
337 108 local_endpoint_ = endpoint{};
338 108 }
339
340 120 select_acceptor_service::
341 120 select_acceptor_service(capy::execution_context& ctx)
342 120 : ctx_(ctx)
343
2/2
✓ Branch 2 taken 120 times.
✓ Branch 5 taken 120 times.
120 , state_(std::make_unique<select_acceptor_state>(ctx.use_service<select_scheduler>()))
344 {
345 120 }
346
347 240 select_acceptor_service::
348 120 ~select_acceptor_service()
349 {
350 240 }
351
352 void
353 120 select_acceptor_service::
354 shutdown()
355 {
356
1/1
✓ Branch 2 taken 120 times.
120 std::lock_guard lock(state_->mutex_);
357
358
1/2
✗ Branch 2 not taken.
✓ Branch 3 taken 120 times.
120 while (auto* impl = state_->acceptor_list_.pop_front())
359 impl->close_socket();
360
361 120 state_->acceptor_ptrs_.clear();
362 120 }
363
364 tcp_acceptor::acceptor_impl&
365 54 select_acceptor_service::
366 create_acceptor_impl()
367 {
368
1/1
✓ Branch 1 taken 54 times.
54 auto impl = std::make_shared<select_acceptor_impl>(*this);
369 54 auto* raw = impl.get();
370
371
1/1
✓ Branch 2 taken 54 times.
54 std::lock_guard lock(state_->mutex_);
372 54 state_->acceptor_list_.push_back(raw);
373
1/1
✓ Branch 3 taken 54 times.
54 state_->acceptor_ptrs_.emplace(raw, std::move(impl));
374
375 54 return *raw;
376 54 }
377
378 void
379 54 select_acceptor_service::
380 destroy_acceptor_impl(tcp_acceptor::acceptor_impl& impl)
381 {
382 54 auto* select_impl = static_cast<select_acceptor_impl*>(&impl);
383
1/1
✓ Branch 2 taken 54 times.
54 std::lock_guard lock(state_->mutex_);
384 54 state_->acceptor_list_.remove(select_impl);
385
1/1
✓ Branch 2 taken 54 times.
54 state_->acceptor_ptrs_.erase(select_impl);
386 54 }
387
388 std::error_code
389 54 select_acceptor_service::
390 open_acceptor(
391 tcp_acceptor::acceptor_impl& impl,
392 endpoint ep,
393 int backlog)
394 {
395 54 auto* select_impl = static_cast<select_acceptor_impl*>(&impl);
396 54 select_impl->close_socket();
397
398 54 int fd = ::socket(AF_INET, SOCK_STREAM, 0);
399
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 54 times.
54 if (fd < 0)
400 return make_err(errno);
401
402 // Set non-blocking and close-on-exec
403
1/1
✓ Branch 1 taken 54 times.
54 int flags = ::fcntl(fd, F_GETFL, 0);
404
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 54 times.
54 if (flags == -1)
405 {
406 int errn = errno;
407 ::close(fd);
408 return make_err(errn);
409 }
410
2/3
✓ Branch 1 taken 54 times.
✗ Branch 3 not taken.
✓ Branch 4 taken 54 times.
54 if (::fcntl(fd, F_SETFL, flags | O_NONBLOCK) == -1)
411 {
412 int errn = errno;
413 ::close(fd);
414 return make_err(errn);
415 }
416
2/3
✓ Branch 1 taken 54 times.
✗ Branch 3 not taken.
✓ Branch 4 taken 54 times.
54 if (::fcntl(fd, F_SETFD, FD_CLOEXEC) == -1)
417 {
418 int errn = errno;
419 ::close(fd);
420 return make_err(errn);
421 }
422
423 // Check fd is within select() limits
424
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 54 times.
54 if (fd >= FD_SETSIZE)
425 {
426 ::close(fd);
427 return make_err(EMFILE);
428 }
429
430 54 int reuse = 1;
431 54 ::setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &reuse, sizeof(reuse));
432
433 54 sockaddr_in addr = detail::to_sockaddr_in(ep);
434
2/2
✓ Branch 1 taken 12 times.
✓ Branch 2 taken 42 times.
54 if (::bind(fd, reinterpret_cast<sockaddr*>(&addr), sizeof(addr)) < 0)
435 {
436 12 int errn = errno;
437
1/1
✓ Branch 1 taken 12 times.
12 ::close(fd);
438 12 return make_err(errn);
439 }
440
441
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 42 times.
42 if (::listen(fd, backlog) < 0)
442 {
443 int errn = errno;
444 ::close(fd);
445 return make_err(errn);
446 }
447
448 42 select_impl->fd_ = fd;
449
450 // Cache the local endpoint (queries OS for ephemeral port if port was 0)
451 42 sockaddr_in local_addr{};
452 42 socklen_t local_len = sizeof(local_addr);
453
1/2
✓ Branch 1 taken 42 times.
✗ Branch 2 not taken.
42 if (::getsockname(fd, reinterpret_cast<sockaddr*>(&local_addr), &local_len) == 0)
454 42 select_impl->set_local_endpoint(detail::from_sockaddr_in(local_addr));
455
456 42 return {};
457 }
458
459 void
460 5 select_acceptor_service::
461 post(select_op* op)
462 {
463 5 state_->sched_.post(op);
464 5 }
465
466 void
467 3835 select_acceptor_service::
468 work_started() noexcept
469 {
470 3835 state_->sched_.work_started();
471 3835 }
472
473 void
474 3 select_acceptor_service::
475 work_finished() noexcept
476 {
477 3 state_->sched_.work_finished();
478 3 }
479
480 select_socket_service*
481 3834 select_acceptor_service::
482 socket_service() const noexcept
483 {
484 3834 auto* svc = ctx_.find_service<detail::socket_service>();
485
2/4
✓ Branch 0 taken 3834 times.
✗ Branch 1 not taken.
✓ Branch 2 taken 3834 times.
✗ Branch 3 not taken.
3834 return svc ? dynamic_cast<select_socket_service*>(svc) : nullptr;
486 }
487
488 } // namespace boost::corosio::detail
489
490 #endif // BOOST_COROSIO_HAS_SELECT
491