libs/corosio/src/corosio/src/detail/select/sockets.cpp

73.5% Lines (274/373) 94.1% Functions (32/34) 57.1% Branches (112/196)
libs/corosio/src/corosio/src/detail/select/sockets.cpp
Line Branch Hits Source Code
1 //
2 // Copyright (c) 2026 Steve Gerbino
3 //
4 // Distributed under the Boost Software License, Version 1.0. (See accompanying
5 // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
6 //
7 // Official repository: https://github.com/cppalliance/corosio
8 //
9
10 #include <boost/corosio/detail/platform.hpp>
11
12 #if BOOST_COROSIO_HAS_SELECT
13
14 #include "src/detail/select/sockets.hpp"
15 #include "src/detail/endpoint_convert.hpp"
16 #include "src/detail/make_err.hpp"
17
18 #include <boost/capy/buffers.hpp>
19
20 #include <errno.h>
21 #include <fcntl.h>
22 #include <netinet/in.h>
23 #include <netinet/tcp.h>
24 #include <sys/socket.h>
25 #include <unistd.h>
26
27 namespace boost::corosio::detail {
28
// Stop-token callback entry point: forwards cancellation to the op.
void
select_op::canceller::
operator()() const noexcept
{
    op->cancel();
}
35
36 void
37 select_connect_op::
38 cancel() noexcept
39 {
40 if (socket_impl_)
41 socket_impl_->cancel_single_op(*this);
42 else
43 request_cancel();
44 }
45
46 void
47 99 select_read_op::
48 cancel() noexcept
49 {
50
1/2
✓ Branch 0 taken 99 times.
✗ Branch 1 not taken.
99 if (socket_impl_)
51 99 socket_impl_->cancel_single_op(*this);
52 else
53 request_cancel();
54 99 }
55
56 void
57 select_write_op::
58 cancel() noexcept
59 {
60 if (socket_impl_)
61 socket_impl_->cancel_single_op(*this);
62 else
63 request_cancel();
64 }
65
// Completion handler for an async connect. Runs on the scheduler after the
// op was posted. Publishes the error code and byte count to the awaiter's
// out-pointers, caches endpoints on success, then resumes the coroutine.
void
select_connect_op::
operator()()
{
    // Disarm the stop callback first so cancel() can no longer race us.
    stop_cb.reset();

    bool success = (errn == 0 && !cancelled.load(std::memory_order_acquire));

    // Cache endpoints on successful connect
    if (success && socket_impl_)
    {
        // Query local endpoint via getsockname (may fail, but remote is always known)
        endpoint local_ep;
        sockaddr_in local_addr{};
        socklen_t local_len = sizeof(local_addr);
        if (::getsockname(fd, reinterpret_cast<sockaddr*>(&local_addr), &local_len) == 0)
            local_ep = from_sockaddr_in(local_addr);
        // Always cache remote endpoint; local may be default if getsockname failed
        static_cast<select_socket_impl*>(socket_impl_)->set_endpoints(local_ep, target_endpoint);
    }

    // Cancellation takes precedence over any OS error when reporting.
    if (ec_out)
    {
        if (cancelled.load(std::memory_order_acquire))
            *ec_out = capy::error::canceled;
        else if (errn != 0)
            *ec_out = make_err(errn);
        else
            *ec_out = {};
    }

    if (bytes_out)
        *bytes_out = bytes_transferred;

    // Move to stack before destroying the frame
    capy::executor_ref saved_ex( std::move( ex ) );
    capy::coro saved_h( std::move( h ) );
    // Dropping impl_ptr may destroy the socket impl (and this op's storage),
    // which is why executor and handle were moved to locals above.
    impl_ptr.reset();
    saved_ex.dispatch( saved_h );
}
106
// Construct an impl bound to its owning service; the fd is opened later
// via select_socket_service::open_socket.
select_socket_impl::
select_socket_impl(select_socket_service& svc) noexcept
    : svc_(svc)
{
}
112
// Tear down the socket and unregister from the service. close_socket()
// must run first: destroy_impl() drops the service's owning shared_ptr,
// after which this object may be destroyed.
void
select_socket_impl::
release()
{
    close_socket();
    svc_.destroy_impl(*this);
}
120
// Start an async connect to `ep`. The completion (success, error, or
// cancellation) is always delivered through the scheduler queue, so this
// function always returns std::noop_coroutine() rather than resuming inline.
std::coroutine_handle<>
select_socket_impl::
connect(
    std::coroutine_handle<> h,
    capy::executor_ref ex,
    endpoint ep,
    std::stop_token token,
    std::error_code* ec)
{
    auto& op = conn_;
    op.reset();
    op.h = h;
    op.ex = ex;
    op.ec_out = ec;
    op.fd = fd_;
    op.target_endpoint = ep; // Store target for endpoint caching
    op.start(token, this);

    sockaddr_in addr = detail::to_sockaddr_in(ep);
    int result = ::connect(fd_, reinterpret_cast<sockaddr*>(&addr), sizeof(addr));

    if (result == 0)
    {
        // Sync success - cache endpoints immediately
        sockaddr_in local_addr{};
        socklen_t local_len = sizeof(local_addr);
        if (::getsockname(fd_, reinterpret_cast<sockaddr*>(&local_addr), &local_len) == 0)
            local_endpoint_ = detail::from_sockaddr_in(local_addr);
        remote_endpoint_ = ep;

        op.complete(0, 0);
        op.impl_ptr = shared_from_this();
        svc_.post(&op);
        // completion is always posted to scheduler queue, never inline.
        return std::noop_coroutine();
    }

    if (errno == EINPROGRESS)
    {
        svc_.work_started();
        op.impl_ptr = shared_from_this();

        // Set registering BEFORE register_fd to close the race window where
        // reactor sees an event before we set registered. The reactor treats
        // registering the same as registered when claiming the op.
        op.registered.store(select_registration_state::registering, std::memory_order_release);
        svc_.scheduler().register_fd(fd_, &op, select_scheduler::event_write);

        // Transition to registered. If this fails, reactor or cancel already
        // claimed the op (state is now unregistered), so we're done. However,
        // we must still deregister the fd because cancel's deregister_fd may
        // have run before our register_fd, leaving the fd orphaned.
        auto expected = select_registration_state::registering;
        if (!op.registered.compare_exchange_strong(
            expected, select_registration_state::registered, std::memory_order_acq_rel))
        {
            svc_.scheduler().deregister_fd(fd_, select_scheduler::event_write);
            // completion is always posted to scheduler queue, never inline.
            return std::noop_coroutine();
        }

        // If cancelled was set before we registered, handle it now.
        if (op.cancelled.load(std::memory_order_acquire))
        {
            // exchange() claims the op exactly once; whoever sees a
            // non-unregistered previous state owns posting the completion.
            auto prev = op.registered.exchange(
                select_registration_state::unregistered, std::memory_order_acq_rel);
            if (prev != select_registration_state::unregistered)
            {
                svc_.scheduler().deregister_fd(fd_, select_scheduler::event_write);
                op.impl_ptr = shared_from_this();
                svc_.post(&op);
                svc_.work_finished();
            }
        }
        // completion is always posted to scheduler queue, never inline.
        return std::noop_coroutine();
    }

    // Immediate failure: post the errno-based completion.
    op.complete(errno, 0);
    op.impl_ptr = shared_from_this();
    svc_.post(&op);
    // completion is always posted to scheduler queue, never inline.
    return std::noop_coroutine();
}
205
206 std::coroutine_handle<>
207 116322 select_socket_impl::
208 read_some(
209 std::coroutine_handle<> h,
210 capy::executor_ref ex,
211 io_buffer_param param,
212 std::stop_token token,
213 std::error_code* ec,
214 std::size_t* bytes_out)
215 {
216 116322 auto& op = rd_;
217 116322 op.reset();
218 116322 op.h = h;
219 116322 op.ex = ex;
220 116322 op.ec_out = ec;
221 116322 op.bytes_out = bytes_out;
222 116322 op.fd = fd_;
223 116322 op.start(token, this);
224
225 116322 capy::mutable_buffer bufs[select_read_op::max_buffers];
226 116322 op.iovec_count = static_cast<int>(param.copy_to(bufs, select_read_op::max_buffers));
227
228
6/8
✓ Branch 0 taken 116321 times.
✓ Branch 1 taken 1 time.
✓ Branch 2 taken 116321 times.
✗ Branch 3 not taken.
✗ Branch 5 not taken.
✓ Branch 6 taken 116321 times.
✓ Branch 7 taken 1 time.
✓ Branch 8 taken 116321 times.
116322 if (op.iovec_count == 0 || (op.iovec_count == 1 && bufs[0].size() == 0))
229 {
230 1 op.empty_buffer_read = true;
231 1 op.complete(0, 0);
232
1/1
✓ Branch 1 taken 1 time.
1 op.impl_ptr = shared_from_this();
233
1/1
✓ Branch 1 taken 1 time.
1 svc_.post(&op);
234 1 return std::noop_coroutine();
235 }
236
237
2/2
✓ Branch 0 taken 116321 times.
✓ Branch 1 taken 116321 times.
232642 for (int i = 0; i < op.iovec_count; ++i)
238 {
239 116321 op.iovecs[i].iov_base = bufs[i].data();
240 116321 op.iovecs[i].iov_len = bufs[i].size();
241 }
242
243
1/1
✓ Branch 1 taken 116321 times.
116321 ssize_t n = ::readv(fd_, op.iovecs, op.iovec_count);
244
245
2/2
✓ Branch 0 taken 116117 times.
✓ Branch 1 taken 204 times.
116321 if (n > 0)
246 {
247 116117 op.complete(0, static_cast<std::size_t>(n));
248
1/1
✓ Branch 1 taken 116117 times.
116117 op.impl_ptr = shared_from_this();
249
1/1
✓ Branch 1 taken 116117 times.
116117 svc_.post(&op);
250 116117 return std::noop_coroutine();
251 }
252
253
2/2
✓ Branch 0 taken 5 times.
✓ Branch 1 taken 199 times.
204 if (n == 0)
254 {
255 5 op.complete(0, 0);
256
1/1
✓ Branch 1 taken 5 times.
5 op.impl_ptr = shared_from_this();
257
1/1
✓ Branch 1 taken 5 times.
5 svc_.post(&op);
258 5 return std::noop_coroutine();
259 }
260
261
1/4
✗ Branch 0 not taken.
✓ Branch 1 taken 199 times.
✗ Branch 2 not taken.
✗ Branch 3 not taken.
199 if (errno == EAGAIN || errno == EWOULDBLOCK)
262 {
263 199 svc_.work_started();
264
1/1
✓ Branch 1 taken 199 times.
199 op.impl_ptr = shared_from_this();
265
266 // Set registering BEFORE register_fd to close the race window where
267 // reactor sees an event before we set registered.
268 199 op.registered.store(select_registration_state::registering, std::memory_order_release);
269
1/1
✓ Branch 2 taken 199 times.
199 svc_.scheduler().register_fd(fd_, &op, select_scheduler::event_read);
270
271 // Transition to registered. If this fails, reactor or cancel already
272 // claimed the op (state is now unregistered), so we're done. However,
273 // we must still deregister the fd because cancel's deregister_fd may
274 // have run before our register_fd, leaving the fd orphaned.
275 199 auto expected = select_registration_state::registering;
276
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 199 times.
199 if (!op.registered.compare_exchange_strong(
277 expected, select_registration_state::registered, std::memory_order_acq_rel))
278 {
279 svc_.scheduler().deregister_fd(fd_, select_scheduler::event_read);
280 return std::noop_coroutine();
281 }
282
283 // If cancelled was set before we registered, handle it now.
284
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 199 times.
199 if (op.cancelled.load(std::memory_order_acquire))
285 {
286 auto prev = op.registered.exchange(
287 select_registration_state::unregistered, std::memory_order_acq_rel);
288 if (prev != select_registration_state::unregistered)
289 {
290 svc_.scheduler().deregister_fd(fd_, select_scheduler::event_read);
291 op.impl_ptr = shared_from_this();
292 svc_.post(&op);
293 svc_.work_finished();
294 }
295 }
296 199 return std::noop_coroutine();
297 }
298
299 op.complete(errno, 0);
300 op.impl_ptr = shared_from_this();
301 svc_.post(&op);
302 return std::noop_coroutine();
303 }
304
// Start an async gather write. Attempts a non-blocking sendmsg() first
// (MSG_NOSIGNAL suppresses SIGPIPE); only registers with the reactor on
// EAGAIN/EWOULDBLOCK. Completion is always posted to the scheduler queue,
// so this always returns std::noop_coroutine().
std::coroutine_handle<>
select_socket_impl::
write_some(
    std::coroutine_handle<> h,
    capy::executor_ref ex,
    io_buffer_param param,
    std::stop_token token,
    std::error_code* ec,
    std::size_t* bytes_out)
{
    auto& op = wr_;
    op.reset();
    op.h = h;
    op.ex = ex;
    op.ec_out = ec;
    op.bytes_out = bytes_out;
    op.fd = fd_;
    op.start(token, this);

    capy::mutable_buffer bufs[select_write_op::max_buffers];
    op.iovec_count = static_cast<int>(param.copy_to(bufs, select_write_op::max_buffers));

    // Zero-length write: complete immediately with success.
    if (op.iovec_count == 0 || (op.iovec_count == 1 && bufs[0].size() == 0))
    {
        op.complete(0, 0);
        op.impl_ptr = shared_from_this();
        svc_.post(&op);
        return std::noop_coroutine();
    }

    for (int i = 0; i < op.iovec_count; ++i)
    {
        op.iovecs[i].iov_base = bufs[i].data();
        op.iovecs[i].iov_len = bufs[i].size();
    }

    msghdr msg{};
    msg.msg_iov = op.iovecs;
    msg.msg_iovlen = static_cast<std::size_t>(op.iovec_count);

    ssize_t n = ::sendmsg(fd_, &msg, MSG_NOSIGNAL);

    if (n > 0)
    {
        // Some bytes were written synchronously (possibly a short write).
        op.complete(0, static_cast<std::size_t>(n));
        op.impl_ptr = shared_from_this();
        svc_.post(&op);
        return std::noop_coroutine();
    }

    if (errno == EAGAIN || errno == EWOULDBLOCK)
    {
        svc_.work_started();
        op.impl_ptr = shared_from_this();

        // Set registering BEFORE register_fd to close the race window where
        // reactor sees an event before we set registered.
        op.registered.store(select_registration_state::registering, std::memory_order_release);
        svc_.scheduler().register_fd(fd_, &op, select_scheduler::event_write);

        // Transition to registered. If this fails, reactor or cancel already
        // claimed the op (state is now unregistered), so we're done. However,
        // we must still deregister the fd because cancel's deregister_fd may
        // have run before our register_fd, leaving the fd orphaned.
        auto expected = select_registration_state::registering;
        if (!op.registered.compare_exchange_strong(
            expected, select_registration_state::registered, std::memory_order_acq_rel))
        {
            svc_.scheduler().deregister_fd(fd_, select_scheduler::event_write);
            return std::noop_coroutine();
        }

        // If cancelled was set before we registered, handle it now.
        if (op.cancelled.load(std::memory_order_acquire))
        {
            auto prev = op.registered.exchange(
                select_registration_state::unregistered, std::memory_order_acq_rel);
            if (prev != select_registration_state::unregistered)
            {
                svc_.scheduler().deregister_fd(fd_, select_scheduler::event_write);
                op.impl_ptr = shared_from_this();
                svc_.post(&op);
                svc_.work_finished();
            }
        }
        return std::noop_coroutine();
    }

    // Hard error (covers n == 0 or n < 0 with another errno). Guard against
    // errno == 0 so a failure is never reported as success.
    op.complete(errno ? errno : EIO, 0);
    op.impl_ptr = shared_from_this();
    svc_.post(&op);
    return std::noop_coroutine();
}
398
399 std::error_code
400 3 select_socket_impl::
401 shutdown(tcp_socket::shutdown_type what) noexcept
402 {
403 int how;
404
3/4
✓ Branch 0 taken 1 time.
✓ Branch 1 taken 1 time.
✓ Branch 2 taken 1 time.
✗ Branch 3 not taken.
3 switch (what)
405 {
406 1 case tcp_socket::shutdown_receive: how = SHUT_RD; break;
407 1 case tcp_socket::shutdown_send: how = SHUT_WR; break;
408 1 case tcp_socket::shutdown_both: how = SHUT_RDWR; break;
409 default:
410 return make_err(EINVAL);
411 }
412
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 3 times.
3 if (::shutdown(fd_, how) != 0)
413 return make_err(errno);
414 3 return {};
415 }
416
417 std::error_code
418 5 select_socket_impl::
419 set_no_delay(bool value) noexcept
420 {
421
2/2
✓ Branch 0 taken 4 times.
✓ Branch 1 taken 1 time.
5 int flag = value ? 1 : 0;
422
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 5 times.
5 if (::setsockopt(fd_, IPPROTO_TCP, TCP_NODELAY, &flag, sizeof(flag)) != 0)
423 return make_err(errno);
424 5 return {};
425 }
426
427 bool
428 5 select_socket_impl::
429 no_delay(std::error_code& ec) const noexcept
430 {
431 5 int flag = 0;
432 5 socklen_t len = sizeof(flag);
433
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 5 times.
5 if (::getsockopt(fd_, IPPROTO_TCP, TCP_NODELAY, &flag, &len) != 0)
434 {
435 ec = make_err(errno);
436 return false;
437 }
438 5 ec = {};
439 5 return flag != 0;
440 }
441
442 std::error_code
443 4 select_socket_impl::
444 set_keep_alive(bool value) noexcept
445 {
446
2/2
✓ Branch 0 taken 3 times.
✓ Branch 1 taken 1 time.
4 int flag = value ? 1 : 0;
447
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 4 times.
4 if (::setsockopt(fd_, SOL_SOCKET, SO_KEEPALIVE, &flag, sizeof(flag)) != 0)
448 return make_err(errno);
449 4 return {};
450 }
451
452 bool
453 4 select_socket_impl::
454 keep_alive(std::error_code& ec) const noexcept
455 {
456 4 int flag = 0;
457 4 socklen_t len = sizeof(flag);
458
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 4 times.
4 if (::getsockopt(fd_, SOL_SOCKET, SO_KEEPALIVE, &flag, &len) != 0)
459 {
460 ec = make_err(errno);
461 return false;
462 }
463 4 ec = {};
464 4 return flag != 0;
465 }
466
467 std::error_code
468 1 select_socket_impl::
469 set_receive_buffer_size(int size) noexcept
470 {
471
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 1 time.
1 if (::setsockopt(fd_, SOL_SOCKET, SO_RCVBUF, &size, sizeof(size)) != 0)
472 return make_err(errno);
473 1 return {};
474 }
475
476 int
477 3 select_socket_impl::
478 receive_buffer_size(std::error_code& ec) const noexcept
479 {
480 3 int size = 0;
481 3 socklen_t len = sizeof(size);
482
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 3 times.
3 if (::getsockopt(fd_, SOL_SOCKET, SO_RCVBUF, &size, &len) != 0)
483 {
484 ec = make_err(errno);
485 return 0;
486 }
487 3 ec = {};
488 3 return size;
489 }
490
491 std::error_code
492 1 select_socket_impl::
493 set_send_buffer_size(int size) noexcept
494 {
495
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 1 time.
1 if (::setsockopt(fd_, SOL_SOCKET, SO_SNDBUF, &size, sizeof(size)) != 0)
496 return make_err(errno);
497 1 return {};
498 }
499
500 int
501 3 select_socket_impl::
502 send_buffer_size(std::error_code& ec) const noexcept
503 {
504 3 int size = 0;
505 3 socklen_t len = sizeof(size);
506
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 3 times.
3 if (::getsockopt(fd_, SOL_SOCKET, SO_SNDBUF, &size, &len) != 0)
507 {
508 ec = make_err(errno);
509 return 0;
510 }
511 3 ec = {};
512 3 return size;
513 }
514
515 std::error_code
516 4 select_socket_impl::
517 set_linger(bool enabled, int timeout) noexcept
518 {
519
2/2
✓ Branch 0 taken 1 time.
✓ Branch 1 taken 3 times.
4 if (timeout < 0)
520 1 return make_err(EINVAL);
521 struct ::linger lg;
522
2/2
✓ Branch 0 taken 2 times.
✓ Branch 1 taken 1 time.
3 lg.l_onoff = enabled ? 1 : 0;
523 3 lg.l_linger = timeout;
524
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 3 times.
3 if (::setsockopt(fd_, SOL_SOCKET, SO_LINGER, &lg, sizeof(lg)) != 0)
525 return make_err(errno);
526 3 return {};
527 }
528
529 tcp_socket::linger_options
530 3 select_socket_impl::
531 linger(std::error_code& ec) const noexcept
532 {
533 3 struct ::linger lg{};
534 3 socklen_t len = sizeof(lg);
535
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 3 times.
3 if (::getsockopt(fd_, SOL_SOCKET, SO_LINGER, &lg, &len) != 0)
536 {
537 ec = make_err(errno);
538 return {};
539 }
540 3 ec = {};
541 3 return {.enabled = lg.l_onoff != 0, .timeout = lg.l_linger};
542 }
543
// Cancel all three pending op slots (connect, read, write). Each op is
// claimed exactly once via an atomic exchange on its registration state;
// only the claimer deregisters the fd and posts the completion.
void
select_socket_impl::
cancel() noexcept
{
    // shared_from_this() throws if no shared_ptr owns us yet (e.g. during
    // teardown); in that case there is nothing to keep alive, so bail out.
    std::shared_ptr<select_socket_impl> self;
    try {
        self = shared_from_this();
    } catch (const std::bad_weak_ptr&) {
        return;
    }

    auto cancel_op = [this, &self](select_op& op, int events) {
        // Claim the op: whoever flips the state away from unregistered
        // owns deregistration and posting.
        auto prev = op.registered.exchange(
            select_registration_state::unregistered, std::memory_order_acq_rel);
        op.request_cancel();
        if (prev != select_registration_state::unregistered)
        {
            svc_.scheduler().deregister_fd(fd_, events);
            op.impl_ptr = self;
            svc_.post(&op);
            svc_.work_finished();
        }
    };

    cancel_op(conn_, select_scheduler::event_write);
    cancel_op(rd_, select_scheduler::event_read);
    cancel_op(wr_, select_scheduler::event_write);
}
572
// Cancel one specific pending operation (invoked from its stop_token
// callback). Uses the same atomic claim protocol as cancel(): only the
// caller that transitions the state away from unregistered posts the op.
void
select_socket_impl::
cancel_single_op(select_op& op) noexcept
{
    // Called from stop_token callback to cancel a specific pending operation.
    auto prev = op.registered.exchange(
        select_registration_state::unregistered, std::memory_order_acq_rel);
    op.request_cancel();

    if (prev != select_registration_state::unregistered)
    {
        // Determine which event type to deregister
        int events = 0;
        if (&op == &conn_ || &op == &wr_)
            events = select_scheduler::event_write;
        else if (&op == &rd_)
            events = select_scheduler::event_read;

        svc_.scheduler().deregister_fd(fd_, events);

        // Keep impl alive until op completes
        try {
            op.impl_ptr = shared_from_this();
        } catch (const std::bad_weak_ptr&) {
            // Impl is being destroyed, op will be orphaned but that's ok
        }

        svc_.post(&op);
        svc_.work_finished();
    }
}
604
// Cancel all pending ops, close the fd, and clear cached endpoints.
// Safe to call repeatedly; fd_ is set to -1 after the first close.
void
select_socket_impl::
close_socket() noexcept
{
    cancel();

    if (fd_ >= 0)
    {
        // Unconditionally remove from registered_fds_ to handle edge cases
        // where the fd might be registered but cancel() didn't clean it up
        // due to race conditions.
        svc_.scheduler().deregister_fd(fd_,
            select_scheduler::event_read | select_scheduler::event_write);
        ::close(fd_);
        fd_ = -1;
    }

    // Clear cached endpoints
    local_endpoint_ = endpoint{};
    remote_endpoint_ = endpoint{};
}
626
// Construct the service, binding it to the context's select_scheduler
// (created on first use) via the shared state block.
select_socket_service::
select_socket_service(capy::execution_context& ctx)
    : state_(std::make_unique<select_socket_state>(ctx.use_service<select_scheduler>()))
{
}
632
// Defaulted-out-of-line destructor; state_ (unique_ptr) cleans itself up.
select_socket_service::
~select_socket_service()
{
}
637
638 void
639 120 select_socket_service::
640 shutdown()
641 {
642
1/1
✓ Branch 2 taken 120 times.
120 std::lock_guard lock(state_->mutex_);
643
644
1/2
✗ Branch 2 not taken.
✓ Branch 3 taken 120 times.
120 while (auto* impl = state_->socket_list_.pop_front())
645 impl->close_socket();
646
647 120 state_->socket_ptrs_.clear();
648 120 }
649
650 tcp_socket::socket_impl&
651 7680 select_socket_service::
652 create_impl()
653 {
654
1/1
✓ Branch 1 taken 7680 times.
7680 auto impl = std::make_shared<select_socket_impl>(*this);
655 7680 auto* raw = impl.get();
656
657 {
658
1/1
✓ Branch 2 taken 7680 times.
7680 std::lock_guard lock(state_->mutex_);
659 7680 state_->socket_list_.push_back(raw);
660
1/1
✓ Branch 3 taken 7680 times.
7680 state_->socket_ptrs_.emplace(raw, std::move(impl));
661 7680 }
662
663 7680 return *raw;
664 7680 }
665
666 void
667 7680 select_socket_service::
668 destroy_impl(tcp_socket::socket_impl& impl)
669 {
670 7680 auto* select_impl = static_cast<select_socket_impl*>(&impl);
671
1/1
✓ Branch 2 taken 7680 times.
7680 std::lock_guard lock(state_->mutex_);
672 7680 state_->socket_list_.remove(select_impl);
673
1/1
✓ Branch 2 taken 7680 times.
7680 state_->socket_ptrs_.erase(select_impl);
674 7680 }
675
676 std::error_code
677 3846 select_socket_service::
678 open_socket(tcp_socket::socket_impl& impl)
679 {
680 3846 auto* select_impl = static_cast<select_socket_impl*>(&impl);
681 3846 select_impl->close_socket();
682
683 3846 int fd = ::socket(AF_INET, SOCK_STREAM, 0);
684
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 3846 times.
3846 if (fd < 0)
685 return make_err(errno);
686
687 // Set non-blocking and close-on-exec
688 3846 int flags = ::fcntl(fd, F_GETFL, 0);
689
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 3846 times.
3846 if (flags == -1)
690 {
691 int errn = errno;
692 ::close(fd);
693 return make_err(errn);
694 }
695
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 3846 times.
3846 if (::fcntl(fd, F_SETFL, flags | O_NONBLOCK) == -1)
696 {
697 int errn = errno;
698 ::close(fd);
699 return make_err(errn);
700 }
701
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 3846 times.
3846 if (::fcntl(fd, F_SETFD, FD_CLOEXEC) == -1)
702 {
703 int errn = errno;
704 ::close(fd);
705 return make_err(errn);
706 }
707
708 // Check fd is within select() limits
709
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 3846 times.
3846 if (fd >= FD_SETSIZE)
710 {
711 ::close(fd);
712 return make_err(EMFILE); // Too many open files
713 }
714
715 3846 select_impl->fd_ = fd;
716 3846 return {};
717 }
718
// Hand a completed/cancelled op to the scheduler queue for dispatch.
void
select_socket_service::
post(select_op* op)
{
    state_->sched_.post(op);
}
725
// Bump the scheduler's outstanding-work count (keeps the event loop alive
// while an async op is pending).
void
select_socket_service::
work_started() noexcept
{
    state_->sched_.work_started();
}
732
// Balance a prior work_started() once an async op has been posted/claimed.
void
select_socket_service::
work_finished() noexcept
{
    state_->sched_.work_finished();
}
739
740 } // namespace boost::corosio::detail
741
742 #endif // BOOST_COROSIO_HAS_SELECT
743