libs/corosio/src/corosio/src/detail/epoll/sockets.cpp

72.9% Lines (320/439) 94.4% Functions (34/36) 54.9% Branches (124/226)
libs/corosio/src/corosio/src/detail/epoll/sockets.cpp
Line Branch Hits Source Code
1 //
2 // Copyright (c) 2026 Steve Gerbino
3 //
4 // Distributed under the Boost Software License, Version 1.0. (See accompanying
5 // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
6 //
7 // Official repository: https://github.com/cppalliance/corosio
8 //
9
10 #include <boost/corosio/detail/platform.hpp>
11
12 #if BOOST_COROSIO_HAS_EPOLL
13
14 #include "src/detail/epoll/sockets.hpp"
15 #include "src/detail/endpoint_convert.hpp"
16 #include "src/detail/make_err.hpp"
17 #include "src/detail/resume_coro.hpp"
18
19 #include <boost/corosio/detail/except.hpp>
20 #include <boost/capy/buffers.hpp>
21
22 #include <utility>
23
24 #include <errno.h>
25 #include <netinet/in.h>
26 #include <netinet/tcp.h>
27 #include <sys/epoll.h>
28 #include <sys/socket.h>
29 #include <unistd.h>
30
31 namespace boost::corosio::detail {
32
33 void
34 105 epoll_op::canceller::
35 operator()() const noexcept
36 {
37 105 op->cancel();
38 105 }
39
40 void
41 epoll_connect_op::
42 cancel() noexcept
43 {
44 if (socket_impl_)
45 socket_impl_->cancel_single_op(*this);
46 else
47 request_cancel();
48 }
49
50 void
51 99 epoll_read_op::
52 cancel() noexcept
53 {
54
1/2
✓ Branch 0 taken 99 times.
✗ Branch 1 not taken.
99 if (socket_impl_)
55 99 socket_impl_->cancel_single_op(*this);
56 else
57 request_cancel();
58 99 }
59
60 void
61 epoll_write_op::
62 cancel() noexcept
63 {
64 if (socket_impl_)
65 socket_impl_->cancel_single_op(*this);
66 else
67 request_cancel();
68 }
69
70 void
71 5018 epoll_connect_op::
72 operator()()
73 {
74 5018 stop_cb.reset();
75
76
3/4
✓ Branch 0 taken 5017 times.
✓ Branch 1 taken 1 time.
✓ Branch 3 taken 5017 times.
✗ Branch 4 not taken.
5018 bool success = (errn == 0 && !cancelled.load(std::memory_order_acquire));
77
78 // Cache endpoints on successful connect
79
3/4
✓ Branch 0 taken 5017 times.
✓ Branch 1 taken 1 time.
✓ Branch 2 taken 5017 times.
✗ Branch 3 not taken.
5018 if (success && socket_impl_)
80 {
81 // Query local endpoint via getsockname (may fail, but remote is always known)
82 5017 endpoint local_ep;
83 5017 sockaddr_in local_addr{};
84 5017 socklen_t local_len = sizeof(local_addr);
85
1/2
✓ Branch 1 taken 5017 times.
✗ Branch 2 not taken.
5017 if (::getsockname(fd, reinterpret_cast<sockaddr*>(&local_addr), &local_len) == 0)
86 5017 local_ep = from_sockaddr_in(local_addr);
87 // Always cache remote endpoint; local may be default if getsockname failed
88 5017 static_cast<epoll_socket_impl*>(socket_impl_)->set_endpoints(local_ep, target_endpoint);
89 }
90
91
1/2
✓ Branch 0 taken 5018 times.
✗ Branch 1 not taken.
5018 if (ec_out)
92 {
93
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 5018 times.
5018 if (cancelled.load(std::memory_order_acquire))
94 *ec_out = capy::error::canceled;
95
2/2
✓ Branch 0 taken 1 time.
✓ Branch 1 taken 5017 times.
5018 else if (errn != 0)
96 1 *ec_out = make_err(errn);
97 else
98 5017 *ec_out = {};
99 }
100
101
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 5018 times.
5018 if (bytes_out)
102 *bytes_out = bytes_transferred;
103
104 // Move to stack before resuming. See epoll_op::operator()() for rationale.
105 5018 capy::executor_ref saved_ex( std::move( ex ) );
106 5018 capy::coro saved_h( std::move( h ) );
107 5018 auto prevent_premature_destruction = std::move(impl_ptr);
108
1/1
✓ Branch 1 taken 5018 times.
5018 resume_coro(saved_ex, saved_h);
109 5018 }
110
111 10046 epoll_socket_impl::
112 10046 epoll_socket_impl(epoll_socket_service& svc) noexcept
113 10046 : svc_(svc)
114 {
115 10046 }
116
117 10046 epoll_socket_impl::
118 ~epoll_socket_impl() = default;
119
120 void
121 10046 epoll_socket_impl::
122 release()
123 {
124 10046 close_socket();
125 10046 svc_.destroy_impl(*this);
126 10046 }
127
128 std::coroutine_handle<>
129 5018 epoll_socket_impl::
130 connect(
131 std::coroutine_handle<> h,
132 capy::executor_ref ex,
133 endpoint ep,
134 std::stop_token token,
135 std::error_code* ec)
136 {
137 5018 auto& op = conn_;
138 5018 op.reset();
139 5018 op.h = h;
140 5018 op.ex = ex;
141 5018 op.ec_out = ec;
142 5018 op.fd = fd_;
143 5018 op.target_endpoint = ep; // Store target for endpoint caching
144 5018 op.start(token, this);
145
146 5018 sockaddr_in addr = detail::to_sockaddr_in(ep);
147
1/1
✓ Branch 1 taken 5018 times.
5018 int result = ::connect(fd_, reinterpret_cast<sockaddr*>(&addr), sizeof(addr));
148
149
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 5018 times.
5018 if (result == 0)
150 {
151 // Sync success - cache endpoints immediately
152 // Remote is always known; local may fail but we still cache remote
153 sockaddr_in local_addr{};
154 socklen_t local_len = sizeof(local_addr);
155 if (::getsockname(fd_, reinterpret_cast<sockaddr*>(&local_addr), &local_len) == 0)
156 local_endpoint_ = detail::from_sockaddr_in(local_addr);
157 remote_endpoint_ = ep;
158
159 op.complete(0, 0);
160 op.impl_ptr = shared_from_this();
161 svc_.post(&op);
162 // completion is always posted to scheduler queue, never inline.
163 return std::noop_coroutine();
164 }
165
166
1/2
✓ Branch 0 taken 5018 times.
✗ Branch 1 not taken.
5018 if (errno == EINPROGRESS)
167 {
168 5018 svc_.work_started();
169
1/1
✓ Branch 1 taken 5018 times.
5018 op.impl_ptr = shared_from_this();
170
171 5018 bool perform_now = false;
172 {
173
1/1
✓ Branch 1 taken 5018 times.
5018 std::lock_guard lock(desc_state_.mutex);
174
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 5018 times.
5018 if (desc_state_.write_ready)
175 {
176 desc_state_.write_ready = false;
177 perform_now = true;
178 }
179 else
180 {
181 5018 desc_state_.connect_op = &op;
182 }
183 5018 }
184
185
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 5018 times.
5018 if (perform_now)
186 {
187 op.perform_io();
188 if (op.errn == EAGAIN || op.errn == EWOULDBLOCK)
189 {
190 op.errn = 0;
191 std::lock_guard lock(desc_state_.mutex);
192 desc_state_.connect_op = &op;
193 }
194 else
195 {
196 svc_.post(&op);
197 svc_.work_finished();
198 }
199 return std::noop_coroutine();
200 }
201
202
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 5018 times.
5018 if (op.cancelled.load(std::memory_order_acquire))
203 {
204 epoll_op* claimed = nullptr;
205 {
206 std::lock_guard lock(desc_state_.mutex);
207 if (desc_state_.connect_op == &op)
208 claimed = std::exchange(desc_state_.connect_op, nullptr);
209 }
210 if (claimed)
211 {
212 svc_.post(claimed);
213 svc_.work_finished();
214 }
215 }
216 // completion is always posted to scheduler queue, never inline.
217 5018 return std::noop_coroutine();
218 }
219
220 op.complete(errno, 0);
221 op.impl_ptr = shared_from_this();
222 svc_.post(&op);
223 // completion is always posted to scheduler queue, never inline.
224 return std::noop_coroutine();
225 }
226
227 void
228 115179 epoll_socket_impl::
229 do_read_io()
230 {
231 115179 auto& op = rd_;
232
233 115179 ssize_t n = ::readv(fd_, op.iovecs, op.iovec_count);
234
235
2/2
✓ Branch 0 taken 115003 times.
✓ Branch 1 taken 176 times.
115179 if (n > 0)
236 {
237 {
238
1/1
✓ Branch 1 taken 115003 times.
115003 std::lock_guard lock(desc_state_.mutex);
239 115003 desc_state_.read_ready = false;
240 115003 }
241 115003 op.complete(0, static_cast<std::size_t>(n));
242 115003 svc_.post(&op);
243 115003 return;
244 }
245
246
2/2
✓ Branch 0 taken 5 times.
✓ Branch 1 taken 171 times.
176 if (n == 0)
247 {
248 {
249
1/1
✓ Branch 1 taken 5 times.
5 std::lock_guard lock(desc_state_.mutex);
250 5 desc_state_.read_ready = false;
251 5 }
252 5 op.complete(0, 0);
253 5 svc_.post(&op);
254 5 return;
255 }
256
257
1/4
✗ Branch 0 not taken.
✓ Branch 1 taken 171 times.
✗ Branch 2 not taken.
✗ Branch 3 not taken.
171 if (errno == EAGAIN || errno == EWOULDBLOCK)
258 {
259 171 svc_.work_started();
260
261 171 bool perform_now = false;
262 {
263
1/1
✓ Branch 1 taken 171 times.
171 std::lock_guard lock(desc_state_.mutex);
264
2/2
✓ Branch 0 taken 46 times.
✓ Branch 1 taken 125 times.
171 if (desc_state_.read_ready)
265 {
266 46 desc_state_.read_ready = false;
267 46 perform_now = true;
268 }
269 else
270 {
271 125 desc_state_.read_op = &op;
272 }
273 171 }
274
275
2/2
✓ Branch 0 taken 46 times.
✓ Branch 1 taken 125 times.
171 if (perform_now)
276 {
277 46 op.perform_io();
278
1/4
✗ Branch 0 not taken.
✓ Branch 1 taken 46 times.
✗ Branch 2 not taken.
✗ Branch 3 not taken.
46 if (op.errn == EAGAIN || op.errn == EWOULDBLOCK)
279 {
280 46 op.errn = 0;
281
1/1
✓ Branch 1 taken 46 times.
46 std::lock_guard lock(desc_state_.mutex);
282 46 desc_state_.read_op = &op;
283 46 }
284 else
285 {
286 svc_.post(&op);
287 svc_.work_finished();
288 }
289 46 return;
290 }
291
292
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 125 times.
125 if (op.cancelled.load(std::memory_order_acquire))
293 {
294 epoll_op* claimed = nullptr;
295 {
296 std::lock_guard lock(desc_state_.mutex);
297 if (desc_state_.read_op == &op)
298 claimed = std::exchange(desc_state_.read_op, nullptr);
299 }
300 if (claimed)
301 {
302 svc_.post(claimed);
303 svc_.work_finished();
304 }
305 }
306 125 return;
307 }
308
309 op.complete(errno, 0);
310 svc_.post(&op);
311 }
312
313 void
314 115056 epoll_socket_impl::
315 do_write_io()
316 {
317 115056 auto& op = wr_;
318
319 115056 msghdr msg{};
320 115056 msg.msg_iov = op.iovecs;
321 115056 msg.msg_iovlen = static_cast<std::size_t>(op.iovec_count);
322
323
1/1
✓ Branch 1 taken 115056 times.
115056 ssize_t n = ::sendmsg(fd_, &msg, MSG_NOSIGNAL);
324
325
2/2
✓ Branch 0 taken 115055 times.
✓ Branch 1 taken 1 time.
115056 if (n > 0)
326 {
327 {
328
1/1
✓ Branch 1 taken 115055 times.
115055 std::lock_guard lock(desc_state_.mutex);
329 115055 desc_state_.write_ready = false;
330 115055 }
331 115055 op.complete(0, static_cast<std::size_t>(n));
332
1/1
✓ Branch 1 taken 115055 times.
115055 svc_.post(&op);
333 115055 return;
334 }
335
336
2/4
✓ Branch 0 taken 1 time.
✗ Branch 1 not taken.
✗ Branch 2 not taken.
✓ Branch 3 taken 1 time.
1 if (errno == EAGAIN || errno == EWOULDBLOCK)
337 {
338 svc_.work_started();
339
340 bool perform_now = false;
341 {
342 std::lock_guard lock(desc_state_.mutex);
343 if (desc_state_.write_ready)
344 {
345 desc_state_.write_ready = false;
346 perform_now = true;
347 }
348 else
349 {
350 desc_state_.write_op = &op;
351 }
352 }
353
354 if (perform_now)
355 {
356 op.perform_io();
357 if (op.errn == EAGAIN || op.errn == EWOULDBLOCK)
358 {
359 op.errn = 0;
360 std::lock_guard lock(desc_state_.mutex);
361 desc_state_.write_op = &op;
362 }
363 else
364 {
365 svc_.post(&op);
366 svc_.work_finished();
367 }
368 return;
369 }
370
371 if (op.cancelled.load(std::memory_order_acquire))
372 {
373 epoll_op* claimed = nullptr;
374 {
375 std::lock_guard lock(desc_state_.mutex);
376 if (desc_state_.write_op == &op)
377 claimed = std::exchange(desc_state_.write_op, nullptr);
378 }
379 if (claimed)
380 {
381 svc_.post(claimed);
382 svc_.work_finished();
383 }
384 }
385 return;
386 }
387
388
1/2
✓ Branch 0 taken 1 time.
✗ Branch 1 not taken.
1 op.complete(errno ? errno : EIO, 0);
389
1/1
✓ Branch 1 taken 1 time.
1 svc_.post(&op);
390 }
391
392 std::coroutine_handle<>
393 115180 epoll_socket_impl::
394 read_some(
395 std::coroutine_handle<> h,
396 capy::executor_ref ex,
397 io_buffer_param param,
398 std::stop_token token,
399 std::error_code* ec,
400 std::size_t* bytes_out)
401 {
402 115180 auto& op = rd_;
403 115180 op.reset();
404 115180 op.h = h;
405 115180 op.ex = ex;
406 115180 op.ec_out = ec;
407 115180 op.bytes_out = bytes_out;
408 115180 op.fd = fd_;
409 115180 op.start(token, this);
410
1/1
✓ Branch 1 taken 115180 times.
115180 op.impl_ptr = shared_from_this();
411
412 // Must prepare buffers before initiator runs
413 115180 capy::mutable_buffer bufs[epoll_read_op::max_buffers];
414 115180 op.iovec_count = static_cast<int>(param.copy_to(bufs, epoll_read_op::max_buffers));
415
416
6/8
✓ Branch 0 taken 115179 times.
✓ Branch 1 taken 1 time.
✓ Branch 2 taken 115179 times.
✗ Branch 3 not taken.
✗ Branch 5 not taken.
✓ Branch 6 taken 115179 times.
✓ Branch 7 taken 1 time.
✓ Branch 8 taken 115179 times.
115180 if (op.iovec_count == 0 || (op.iovec_count == 1 && bufs[0].size() == 0))
417 {
418 1 op.empty_buffer_read = true;
419 1 op.complete(0, 0);
420
1/1
✓ Branch 1 taken 1 time.
1 svc_.post(&op);
421 1 return std::noop_coroutine();
422 }
423
424
2/2
✓ Branch 0 taken 115179 times.
✓ Branch 1 taken 115179 times.
230358 for (int i = 0; i < op.iovec_count; ++i)
425 {
426 115179 op.iovecs[i].iov_base = bufs[i].data();
427 115179 op.iovecs[i].iov_len = bufs[i].size();
428 }
429
430 // Symmetric transfer ensures caller is suspended before I/O starts
431
1/1
✓ Branch 1 taken 115179 times.
115179 return read_initiator_.start<&epoll_socket_impl::do_read_io>(this);
432 }
433
434 std::coroutine_handle<>
435 115057 epoll_socket_impl::
436 write_some(
437 std::coroutine_handle<> h,
438 capy::executor_ref ex,
439 io_buffer_param param,
440 std::stop_token token,
441 std::error_code* ec,
442 std::size_t* bytes_out)
443 {
444 115057 auto& op = wr_;
445 115057 op.reset();
446 115057 op.h = h;
447 115057 op.ex = ex;
448 115057 op.ec_out = ec;
449 115057 op.bytes_out = bytes_out;
450 115057 op.fd = fd_;
451 115057 op.start(token, this);
452
1/1
✓ Branch 1 taken 115057 times.
115057 op.impl_ptr = shared_from_this();
453
454 // Must prepare buffers before initiator runs
455 115057 capy::mutable_buffer bufs[epoll_write_op::max_buffers];
456 115057 op.iovec_count = static_cast<int>(param.copy_to(bufs, epoll_write_op::max_buffers));
457
458
6/8
✓ Branch 0 taken 115056 times.
✓ Branch 1 taken 1 time.
✓ Branch 2 taken 115056 times.
✗ Branch 3 not taken.
✗ Branch 5 not taken.
✓ Branch 6 taken 115056 times.
✓ Branch 7 taken 1 time.
✓ Branch 8 taken 115056 times.
115057 if (op.iovec_count == 0 || (op.iovec_count == 1 && bufs[0].size() == 0))
459 {
460 1 op.complete(0, 0);
461
1/1
✓ Branch 1 taken 1 time.
1 svc_.post(&op);
462 1 return std::noop_coroutine();
463 }
464
465
2/2
✓ Branch 0 taken 115056 times.
✓ Branch 1 taken 115056 times.
230112 for (int i = 0; i < op.iovec_count; ++i)
466 {
467 115056 op.iovecs[i].iov_base = bufs[i].data();
468 115056 op.iovecs[i].iov_len = bufs[i].size();
469 }
470
471 // Symmetric transfer ensures caller is suspended before I/O starts
472
1/1
✓ Branch 1 taken 115056 times.
115056 return write_initiator_.start<&epoll_socket_impl::do_write_io>(this);
473 }
474
475 std::error_code
476 3 epoll_socket_impl::
477 shutdown(tcp_socket::shutdown_type what) noexcept
478 {
479 int how;
480
3/4
✓ Branch 0 taken 1 time.
✓ Branch 1 taken 1 time.
✓ Branch 2 taken 1 time.
✗ Branch 3 not taken.
3 switch (what)
481 {
482 1 case tcp_socket::shutdown_receive: how = SHUT_RD; break;
483 1 case tcp_socket::shutdown_send: how = SHUT_WR; break;
484 1 case tcp_socket::shutdown_both: how = SHUT_RDWR; break;
485 default:
486 return make_err(EINVAL);
487 }
488
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 3 times.
3 if (::shutdown(fd_, how) != 0)
489 return make_err(errno);
490 3 return {};
491 }
492
493 std::error_code
494 5 epoll_socket_impl::
495 set_no_delay(bool value) noexcept
496 {
497
2/2
✓ Branch 0 taken 4 times.
✓ Branch 1 taken 1 time.
5 int flag = value ? 1 : 0;
498
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 5 times.
5 if (::setsockopt(fd_, IPPROTO_TCP, TCP_NODELAY, &flag, sizeof(flag)) != 0)
499 return make_err(errno);
500 5 return {};
501 }
502
503 bool
504 5 epoll_socket_impl::
505 no_delay(std::error_code& ec) const noexcept
506 {
507 5 int flag = 0;
508 5 socklen_t len = sizeof(flag);
509
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 5 times.
5 if (::getsockopt(fd_, IPPROTO_TCP, TCP_NODELAY, &flag, &len) != 0)
510 {
511 ec = make_err(errno);
512 return false;
513 }
514 5 ec = {};
515 5 return flag != 0;
516 }
517
518 std::error_code
519 4 epoll_socket_impl::
520 set_keep_alive(bool value) noexcept
521 {
522
2/2
✓ Branch 0 taken 3 times.
✓ Branch 1 taken 1 time.
4 int flag = value ? 1 : 0;
523
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 4 times.
4 if (::setsockopt(fd_, SOL_SOCKET, SO_KEEPALIVE, &flag, sizeof(flag)) != 0)
524 return make_err(errno);
525 4 return {};
526 }
527
528 bool
529 4 epoll_socket_impl::
530 keep_alive(std::error_code& ec) const noexcept
531 {
532 4 int flag = 0;
533 4 socklen_t len = sizeof(flag);
534
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 4 times.
4 if (::getsockopt(fd_, SOL_SOCKET, SO_KEEPALIVE, &flag, &len) != 0)
535 {
536 ec = make_err(errno);
537 return false;
538 }
539 4 ec = {};
540 4 return flag != 0;
541 }
542
543 std::error_code
544 1 epoll_socket_impl::
545 set_receive_buffer_size(int size) noexcept
546 {
547
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 1 time.
1 if (::setsockopt(fd_, SOL_SOCKET, SO_RCVBUF, &size, sizeof(size)) != 0)
548 return make_err(errno);
549 1 return {};
550 }
551
552 int
553 3 epoll_socket_impl::
554 receive_buffer_size(std::error_code& ec) const noexcept
555 {
556 3 int size = 0;
557 3 socklen_t len = sizeof(size);
558
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 3 times.
3 if (::getsockopt(fd_, SOL_SOCKET, SO_RCVBUF, &size, &len) != 0)
559 {
560 ec = make_err(errno);
561 return 0;
562 }
563 3 ec = {};
564 3 return size;
565 }
566
567 std::error_code
568 1 epoll_socket_impl::
569 set_send_buffer_size(int size) noexcept
570 {
571
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 1 time.
1 if (::setsockopt(fd_, SOL_SOCKET, SO_SNDBUF, &size, sizeof(size)) != 0)
572 return make_err(errno);
573 1 return {};
574 }
575
576 int
577 3 epoll_socket_impl::
578 send_buffer_size(std::error_code& ec) const noexcept
579 {
580 3 int size = 0;
581 3 socklen_t len = sizeof(size);
582
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 3 times.
3 if (::getsockopt(fd_, SOL_SOCKET, SO_SNDBUF, &size, &len) != 0)
583 {
584 ec = make_err(errno);
585 return 0;
586 }
587 3 ec = {};
588 3 return size;
589 }
590
591 std::error_code
592 4 epoll_socket_impl::
593 set_linger(bool enabled, int timeout) noexcept
594 {
595
2/2
✓ Branch 0 taken 1 time.
✓ Branch 1 taken 3 times.
4 if (timeout < 0)
596 1 return make_err(EINVAL);
597 struct ::linger lg;
598
2/2
✓ Branch 0 taken 2 times.
✓ Branch 1 taken 1 time.
3 lg.l_onoff = enabled ? 1 : 0;
599 3 lg.l_linger = timeout;
600
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 3 times.
3 if (::setsockopt(fd_, SOL_SOCKET, SO_LINGER, &lg, sizeof(lg)) != 0)
601 return make_err(errno);
602 3 return {};
603 }
604
605 tcp_socket::linger_options
606 3 epoll_socket_impl::
607 linger(std::error_code& ec) const noexcept
608 {
609 3 struct ::linger lg{};
610 3 socklen_t len = sizeof(lg);
611
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 3 times.
3 if (::getsockopt(fd_, SOL_SOCKET, SO_LINGER, &lg, &len) != 0)
612 {
613 ec = make_err(errno);
614 return {};
615 }
616 3 ec = {};
617 3 return {.enabled = lg.l_onoff != 0, .timeout = lg.l_linger};
618 }
619
620 void
621 15173 epoll_socket_impl::
622 cancel() noexcept
623 {
624 15173 std::shared_ptr<epoll_socket_impl> self;
625 try {
626
1/1
✓ Branch 1 taken 15173 times.
15173 self = shared_from_this();
627 } catch (const std::bad_weak_ptr&) {
628 return;
629 }
630
631 15173 conn_.request_cancel();
632 15173 rd_.request_cancel();
633 15173 wr_.request_cancel();
634
635 15173 epoll_op* conn_claimed = nullptr;
636 15173 epoll_op* rd_claimed = nullptr;
637 15173 epoll_op* wr_claimed = nullptr;
638 {
639 15173 std::lock_guard lock(desc_state_.mutex);
640
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 15173 times.
15173 if (desc_state_.connect_op == &conn_)
641 conn_claimed = std::exchange(desc_state_.connect_op, nullptr);
642
2/2
✓ Branch 0 taken 52 times.
✓ Branch 1 taken 15121 times.
15173 if (desc_state_.read_op == &rd_)
643 52 rd_claimed = std::exchange(desc_state_.read_op, nullptr);
644
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 15173 times.
15173 if (desc_state_.write_op == &wr_)
645 wr_claimed = std::exchange(desc_state_.write_op, nullptr);
646 15173 }
647
648
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 15173 times.
15173 if (conn_claimed)
649 {
650 conn_.impl_ptr = self;
651 svc_.post(&conn_);
652 svc_.work_finished();
653 }
654
2/2
✓ Branch 0 taken 52 times.
✓ Branch 1 taken 15121 times.
15173 if (rd_claimed)
655 {
656 52 rd_.impl_ptr = self;
657 52 svc_.post(&rd_);
658 52 svc_.work_finished();
659 }
660
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 15173 times.
15173 if (wr_claimed)
661 {
662 wr_.impl_ptr = self;
663 svc_.post(&wr_);
664 svc_.work_finished();
665 }
666 15173 }
667
668 void
669 99 epoll_socket_impl::
670 cancel_single_op(epoll_op& op) noexcept
671 {
672 99 op.request_cancel();
673
674 99 epoll_op** desc_op_ptr = nullptr;
675
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 99 times.
99 if (&op == &conn_) desc_op_ptr = &desc_state_.connect_op;
676
1/2
✓ Branch 0 taken 99 times.
✗ Branch 1 not taken.
99 else if (&op == &rd_) desc_op_ptr = &desc_state_.read_op;
677 else if (&op == &wr_) desc_op_ptr = &desc_state_.write_op;
678
679
1/2
✓ Branch 0 taken 99 times.
✗ Branch 1 not taken.
99 if (desc_op_ptr)
680 {
681 99 epoll_op* claimed = nullptr;
682 {
683 99 std::lock_guard lock(desc_state_.mutex);
684
2/2
✓ Branch 0 taken 67 times.
✓ Branch 1 taken 32 times.
99 if (*desc_op_ptr == &op)
685 67 claimed = std::exchange(*desc_op_ptr, nullptr);
686 99 }
687
2/2
✓ Branch 0 taken 67 times.
✓ Branch 1 taken 32 times.
99 if (claimed)
688 {
689 try {
690
1/1
✓ Branch 1 taken 67 times.
67 op.impl_ptr = shared_from_this();
691 } catch (const std::bad_weak_ptr&) {}
692 67 svc_.post(&op);
693 67 svc_.work_finished();
694 }
695 }
696 99 }
697
698 void
699 15075 epoll_socket_impl::
700 close_socket() noexcept
701 {
702 15075 cancel();
703
704 // Keep impl alive if descriptor_state is queued in the scheduler.
705 // Without this, destroy_impl() drops the last shared_ptr while
706 // the queued descriptor_state node would become dangling.
707
2/2
✓ Branch 1 taken 4 times.
✓ Branch 2 taken 15071 times.
15075 if (desc_state_.is_enqueued_.load(std::memory_order_acquire))
708 {
709 try {
710
1/1
✓ Branch 1 taken 4 times.
4 desc_state_.impl_ref_ = shared_from_this();
711 } catch (std::bad_weak_ptr const&) {}
712 }
713
714
2/2
✓ Branch 0 taken 10046 times.
✓ Branch 1 taken 5029 times.
15075 if (fd_ >= 0)
715 {
716
1/2
✓ Branch 0 taken 10046 times.
✗ Branch 1 not taken.
10046 if (desc_state_.registered_events != 0)
717 10046 svc_.scheduler().deregister_descriptor(fd_);
718 10046 ::close(fd_);
719 10046 fd_ = -1;
720 }
721
722 15075 desc_state_.fd = -1;
723 {
724 15075 std::lock_guard lock(desc_state_.mutex);
725 15075 desc_state_.read_op = nullptr;
726 15075 desc_state_.write_op = nullptr;
727 15075 desc_state_.connect_op = nullptr;
728 15075 desc_state_.read_ready = false;
729 15075 desc_state_.write_ready = false;
730 15075 }
731 15075 desc_state_.registered_events = 0;
732
733 15075 local_endpoint_ = endpoint{};
734 15075 remote_endpoint_ = endpoint{};
735 15075 }
736
737 189 epoll_socket_service::
738 189 epoll_socket_service(capy::execution_context& ctx)
739
2/2
✓ Branch 2 taken 189 times.
✓ Branch 5 taken 189 times.
189 : state_(std::make_unique<epoll_socket_state>(ctx.use_service<epoll_scheduler>()))
740 {
741 189 }
742
743 378 epoll_socket_service::
744 189 ~epoll_socket_service()
745 {
746 378 }
747
748 void
749 189 epoll_socket_service::
750 shutdown()
751 {
752
1/1
✓ Branch 2 taken 189 times.
189 std::lock_guard lock(state_->mutex_);
753
754
1/2
✗ Branch 2 not taken.
✓ Branch 3 taken 189 times.
189 while (auto* impl = state_->socket_list_.pop_front())
755 impl->close_socket();
756
757 189 state_->socket_ptrs_.clear();
758 189 }
759
760 tcp_socket::socket_impl&
761 10046 epoll_socket_service::
762 create_impl()
763 {
764
1/1
✓ Branch 1 taken 10046 times.
10046 auto impl = std::make_shared<epoll_socket_impl>(*this);
765 10046 auto* raw = impl.get();
766
767 {
768
1/1
✓ Branch 2 taken 10046 times.
10046 std::lock_guard lock(state_->mutex_);
769 10046 state_->socket_list_.push_back(raw);
770
1/1
✓ Branch 3 taken 10046 times.
10046 state_->socket_ptrs_.emplace(raw, std::move(impl));
771 10046 }
772
773 10046 return *raw;
774 10046 }
775
776 void
777 10046 epoll_socket_service::
778 destroy_impl(tcp_socket::socket_impl& impl)
779 {
780 10046 auto* epoll_impl = static_cast<epoll_socket_impl*>(&impl);
781
1/1
✓ Branch 2 taken 10046 times.
10046 std::lock_guard lock(state_->mutex_);
782 10046 state_->socket_list_.remove(epoll_impl);
783
1/1
✓ Branch 2 taken 10046 times.
10046 state_->socket_ptrs_.erase(epoll_impl);
784 10046 }
785
786 std::error_code
787 5029 epoll_socket_service::
788 open_socket(tcp_socket::socket_impl& impl)
789 {
790 5029 auto* epoll_impl = static_cast<epoll_socket_impl*>(&impl);
791 5029 epoll_impl->close_socket();
792
793 5029 int fd = ::socket(AF_INET, SOCK_STREAM | SOCK_NONBLOCK | SOCK_CLOEXEC, 0);
794
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 5029 times.
5029 if (fd < 0)
795 return make_err(errno);
796
797 5029 epoll_impl->fd_ = fd;
798
799 // Register fd with epoll (edge-triggered mode)
800 5029 epoll_impl->desc_state_.fd = fd;
801 {
802
1/1
✓ Branch 1 taken 5029 times.
5029 std::lock_guard lock(epoll_impl->desc_state_.mutex);
803 5029 epoll_impl->desc_state_.read_op = nullptr;
804 5029 epoll_impl->desc_state_.write_op = nullptr;
805 5029 epoll_impl->desc_state_.connect_op = nullptr;
806 5029 }
807 5029 scheduler().register_descriptor(fd, &epoll_impl->desc_state_);
808
809 5029 return {};
810 }
811
812 void
813 230185 epoll_socket_service::
814 post(epoll_op* op)
815 {
816 230185 state_->sched_.post(op);
817 230185 }
818
819 void
820 5189 epoll_socket_service::
821 work_started() noexcept
822 {
823 5189 state_->sched_.work_started();
824 5189 }
825
826 void
827 119 epoll_socket_service::
828 work_finished() noexcept
829 {
830 119 state_->sched_.work_finished();
831 119 }
832
833 } // namespace boost::corosio::detail
834
835 #endif // BOOST_COROSIO_HAS_EPOLL
836