Line data Source code
1 : //
2 : // Copyright (c) 2026 Steve Gerbino
3 : //
4 : // Distributed under the Boost Software License, Version 1.0. (See accompanying
5 : // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
6 : //
7 : // Official repository: https://github.com/cppalliance/corosio
8 : //
9 :
10 : #include <boost/corosio/detail/platform.hpp>
11 :
12 : #if BOOST_COROSIO_HAS_EPOLL
13 :
14 : #include "src/detail/epoll/sockets.hpp"
15 : #include "src/detail/endpoint_convert.hpp"
16 : #include "src/detail/make_err.hpp"
17 : #include "src/detail/resume_coro.hpp"
18 :
19 : #include <boost/corosio/detail/except.hpp>
20 : #include <boost/capy/buffers.hpp>
21 :
22 : #include <utility>
23 :
24 : #include <errno.h>
25 : #include <netinet/in.h>
26 : #include <netinet/tcp.h>
27 : #include <sys/epoll.h>
28 : #include <sys/socket.h>
29 : #include <unistd.h>
30 :
31 : namespace boost::corosio::detail {
32 :
33 : void
34 105 : epoll_op::canceller::
35 : operator()() const noexcept
36 : {
37 105 : op->cancel();
38 105 : }
39 :
40 : void
41 0 : epoll_connect_op::
42 : cancel() noexcept
43 : {
44 0 : if (socket_impl_)
45 0 : socket_impl_->cancel_single_op(*this);
46 : else
47 0 : request_cancel();
48 0 : }
49 :
50 : void
51 99 : epoll_read_op::
52 : cancel() noexcept
53 : {
54 99 : if (socket_impl_)
55 99 : socket_impl_->cancel_single_op(*this);
56 : else
57 0 : request_cancel();
58 99 : }
59 :
60 : void
61 0 : epoll_write_op::
62 : cancel() noexcept
63 : {
64 0 : if (socket_impl_)
65 0 : socket_impl_->cancel_single_op(*this);
66 : else
67 0 : request_cancel();
68 0 : }
69 :
70 : void
71 5018 : epoll_connect_op::
72 : operator()()
73 : {
74 5018 : stop_cb.reset();
75 :
76 5018 : bool success = (errn == 0 && !cancelled.load(std::memory_order_acquire));
77 :
78 : // Cache endpoints on successful connect
79 5018 : if (success && socket_impl_)
80 : {
81 : // Query local endpoint via getsockname (may fail, but remote is always known)
82 5017 : endpoint local_ep;
83 5017 : sockaddr_in local_addr{};
84 5017 : socklen_t local_len = sizeof(local_addr);
85 5017 : if (::getsockname(fd, reinterpret_cast<sockaddr*>(&local_addr), &local_len) == 0)
86 5017 : local_ep = from_sockaddr_in(local_addr);
87 : // Always cache remote endpoint; local may be default if getsockname failed
88 5017 : static_cast<epoll_socket_impl*>(socket_impl_)->set_endpoints(local_ep, target_endpoint);
89 : }
90 :
91 5018 : if (ec_out)
92 : {
93 5018 : if (cancelled.load(std::memory_order_acquire))
94 0 : *ec_out = capy::error::canceled;
95 5018 : else if (errn != 0)
96 1 : *ec_out = make_err(errn);
97 : else
98 5017 : *ec_out = {};
99 : }
100 :
101 5018 : if (bytes_out)
102 0 : *bytes_out = bytes_transferred;
103 :
104 : // Move to stack before resuming. See epoll_op::operator()() for rationale.
105 5018 : capy::executor_ref saved_ex( std::move( ex ) );
106 5018 : capy::coro saved_h( std::move( h ) );
107 5018 : auto prevent_premature_destruction = std::move(impl_ptr);
108 5018 : resume_coro(saved_ex, saved_h);
109 5018 : }
110 :
111 10046 : epoll_socket_impl::
112 10046 : epoll_socket_impl(epoll_socket_service& svc) noexcept
113 10046 : : svc_(svc)
114 : {
115 10046 : }
116 :
117 10046 : epoll_socket_impl::
118 : ~epoll_socket_impl() = default;
119 :
120 : void
121 10046 : epoll_socket_impl::
122 : release()
123 : {
124 10046 : close_socket();
125 10046 : svc_.destroy_impl(*this);
126 10046 : }
127 :
128 : std::coroutine_handle<>
129 5018 : epoll_socket_impl::
130 : connect(
131 : std::coroutine_handle<> h,
132 : capy::executor_ref ex,
133 : endpoint ep,
134 : std::stop_token token,
135 : std::error_code* ec)
136 : {
137 5018 : auto& op = conn_;
138 5018 : op.reset();
139 5018 : op.h = h;
140 5018 : op.ex = ex;
141 5018 : op.ec_out = ec;
142 5018 : op.fd = fd_;
143 5018 : op.target_endpoint = ep; // Store target for endpoint caching
144 5018 : op.start(token, this);
145 :
146 5018 : sockaddr_in addr = detail::to_sockaddr_in(ep);
147 5018 : int result = ::connect(fd_, reinterpret_cast<sockaddr*>(&addr), sizeof(addr));
148 :
149 5018 : if (result == 0)
150 : {
151 : // Sync success - cache endpoints immediately
152 : // Remote is always known; local may fail but we still cache remote
153 0 : sockaddr_in local_addr{};
154 0 : socklen_t local_len = sizeof(local_addr);
155 0 : if (::getsockname(fd_, reinterpret_cast<sockaddr*>(&local_addr), &local_len) == 0)
156 0 : local_endpoint_ = detail::from_sockaddr_in(local_addr);
157 0 : remote_endpoint_ = ep;
158 :
159 0 : op.complete(0, 0);
160 0 : op.impl_ptr = shared_from_this();
161 0 : svc_.post(&op);
162 : // completion is always posted to scheduler queue, never inline.
163 0 : return std::noop_coroutine();
164 : }
165 :
166 5018 : if (errno == EINPROGRESS)
167 : {
168 5018 : svc_.work_started();
169 5018 : op.impl_ptr = shared_from_this();
170 :
171 5018 : bool perform_now = false;
172 : {
173 5018 : std::lock_guard lock(desc_state_.mutex);
174 5018 : if (desc_state_.write_ready)
175 : {
176 0 : desc_state_.write_ready = false;
177 0 : perform_now = true;
178 : }
179 : else
180 : {
181 5018 : desc_state_.connect_op = &op;
182 : }
183 5018 : }
184 :
185 5018 : if (perform_now)
186 : {
187 0 : op.perform_io();
188 0 : if (op.errn == EAGAIN || op.errn == EWOULDBLOCK)
189 : {
190 0 : op.errn = 0;
191 0 : std::lock_guard lock(desc_state_.mutex);
192 0 : desc_state_.connect_op = &op;
193 0 : }
194 : else
195 : {
196 0 : svc_.post(&op);
197 0 : svc_.work_finished();
198 : }
199 0 : return std::noop_coroutine();
200 : }
201 :
202 5018 : if (op.cancelled.load(std::memory_order_acquire))
203 : {
204 0 : epoll_op* claimed = nullptr;
205 : {
206 0 : std::lock_guard lock(desc_state_.mutex);
207 0 : if (desc_state_.connect_op == &op)
208 0 : claimed = std::exchange(desc_state_.connect_op, nullptr);
209 0 : }
210 0 : if (claimed)
211 : {
212 0 : svc_.post(claimed);
213 0 : svc_.work_finished();
214 : }
215 : }
216 : // completion is always posted to scheduler queue, never inline.
217 5018 : return std::noop_coroutine();
218 : }
219 :
220 0 : op.complete(errno, 0);
221 0 : op.impl_ptr = shared_from_this();
222 0 : svc_.post(&op);
223 : // completion is always posted to scheduler queue, never inline.
224 0 : return std::noop_coroutine();
225 : }
226 :
227 : void
228 115179 : epoll_socket_impl::
229 : do_read_io()
230 : {
231 115179 : auto& op = rd_;
232 :
233 115179 : ssize_t n = ::readv(fd_, op.iovecs, op.iovec_count);
234 :
235 115179 : if (n > 0)
236 : {
237 : {
238 115003 : std::lock_guard lock(desc_state_.mutex);
239 115003 : desc_state_.read_ready = false;
240 115003 : }
241 115003 : op.complete(0, static_cast<std::size_t>(n));
242 115003 : svc_.post(&op);
243 115003 : return;
244 : }
245 :
246 176 : if (n == 0)
247 : {
248 : {
249 5 : std::lock_guard lock(desc_state_.mutex);
250 5 : desc_state_.read_ready = false;
251 5 : }
252 5 : op.complete(0, 0);
253 5 : svc_.post(&op);
254 5 : return;
255 : }
256 :
257 171 : if (errno == EAGAIN || errno == EWOULDBLOCK)
258 : {
259 171 : svc_.work_started();
260 :
261 171 : bool perform_now = false;
262 : {
263 171 : std::lock_guard lock(desc_state_.mutex);
264 171 : if (desc_state_.read_ready)
265 : {
266 46 : desc_state_.read_ready = false;
267 46 : perform_now = true;
268 : }
269 : else
270 : {
271 125 : desc_state_.read_op = &op;
272 : }
273 171 : }
274 :
275 171 : if (perform_now)
276 : {
277 46 : op.perform_io();
278 46 : if (op.errn == EAGAIN || op.errn == EWOULDBLOCK)
279 : {
280 46 : op.errn = 0;
281 46 : std::lock_guard lock(desc_state_.mutex);
282 46 : desc_state_.read_op = &op;
283 46 : }
284 : else
285 : {
286 0 : svc_.post(&op);
287 0 : svc_.work_finished();
288 : }
289 46 : return;
290 : }
291 :
292 125 : if (op.cancelled.load(std::memory_order_acquire))
293 : {
294 0 : epoll_op* claimed = nullptr;
295 : {
296 0 : std::lock_guard lock(desc_state_.mutex);
297 0 : if (desc_state_.read_op == &op)
298 0 : claimed = std::exchange(desc_state_.read_op, nullptr);
299 0 : }
300 0 : if (claimed)
301 : {
302 0 : svc_.post(claimed);
303 0 : svc_.work_finished();
304 : }
305 : }
306 125 : return;
307 : }
308 :
309 0 : op.complete(errno, 0);
310 0 : svc_.post(&op);
311 : }
312 :
313 : void
314 115056 : epoll_socket_impl::
315 : do_write_io()
316 : {
317 115056 : auto& op = wr_;
318 :
319 115056 : msghdr msg{};
320 115056 : msg.msg_iov = op.iovecs;
321 115056 : msg.msg_iovlen = static_cast<std::size_t>(op.iovec_count);
322 :
323 115056 : ssize_t n = ::sendmsg(fd_, &msg, MSG_NOSIGNAL);
324 :
325 115056 : if (n > 0)
326 : {
327 : {
328 115055 : std::lock_guard lock(desc_state_.mutex);
329 115055 : desc_state_.write_ready = false;
330 115055 : }
331 115055 : op.complete(0, static_cast<std::size_t>(n));
332 115055 : svc_.post(&op);
333 115055 : return;
334 : }
335 :
336 1 : if (errno == EAGAIN || errno == EWOULDBLOCK)
337 : {
338 0 : svc_.work_started();
339 :
340 0 : bool perform_now = false;
341 : {
342 0 : std::lock_guard lock(desc_state_.mutex);
343 0 : if (desc_state_.write_ready)
344 : {
345 0 : desc_state_.write_ready = false;
346 0 : perform_now = true;
347 : }
348 : else
349 : {
350 0 : desc_state_.write_op = &op;
351 : }
352 0 : }
353 :
354 0 : if (perform_now)
355 : {
356 0 : op.perform_io();
357 0 : if (op.errn == EAGAIN || op.errn == EWOULDBLOCK)
358 : {
359 0 : op.errn = 0;
360 0 : std::lock_guard lock(desc_state_.mutex);
361 0 : desc_state_.write_op = &op;
362 0 : }
363 : else
364 : {
365 0 : svc_.post(&op);
366 0 : svc_.work_finished();
367 : }
368 0 : return;
369 : }
370 :
371 0 : if (op.cancelled.load(std::memory_order_acquire))
372 : {
373 0 : epoll_op* claimed = nullptr;
374 : {
375 0 : std::lock_guard lock(desc_state_.mutex);
376 0 : if (desc_state_.write_op == &op)
377 0 : claimed = std::exchange(desc_state_.write_op, nullptr);
378 0 : }
379 0 : if (claimed)
380 : {
381 0 : svc_.post(claimed);
382 0 : svc_.work_finished();
383 : }
384 : }
385 0 : return;
386 : }
387 :
388 1 : op.complete(errno ? errno : EIO, 0);
389 1 : svc_.post(&op);
390 : }
391 :
392 : std::coroutine_handle<>
393 115180 : epoll_socket_impl::
394 : read_some(
395 : std::coroutine_handle<> h,
396 : capy::executor_ref ex,
397 : io_buffer_param param,
398 : std::stop_token token,
399 : std::error_code* ec,
400 : std::size_t* bytes_out)
401 : {
402 115180 : auto& op = rd_;
403 115180 : op.reset();
404 115180 : op.h = h;
405 115180 : op.ex = ex;
406 115180 : op.ec_out = ec;
407 115180 : op.bytes_out = bytes_out;
408 115180 : op.fd = fd_;
409 115180 : op.start(token, this);
410 115180 : op.impl_ptr = shared_from_this();
411 :
412 : // Must prepare buffers before initiator runs
413 115180 : capy::mutable_buffer bufs[epoll_read_op::max_buffers];
414 115180 : op.iovec_count = static_cast<int>(param.copy_to(bufs, epoll_read_op::max_buffers));
415 :
416 115180 : if (op.iovec_count == 0 || (op.iovec_count == 1 && bufs[0].size() == 0))
417 : {
418 1 : op.empty_buffer_read = true;
419 1 : op.complete(0, 0);
420 1 : svc_.post(&op);
421 1 : return std::noop_coroutine();
422 : }
423 :
424 230358 : for (int i = 0; i < op.iovec_count; ++i)
425 : {
426 115179 : op.iovecs[i].iov_base = bufs[i].data();
427 115179 : op.iovecs[i].iov_len = bufs[i].size();
428 : }
429 :
430 : // Symmetric transfer ensures caller is suspended before I/O starts
431 115179 : return read_initiator_.start<&epoll_socket_impl::do_read_io>(this);
432 : }
433 :
434 : std::coroutine_handle<>
435 115057 : epoll_socket_impl::
436 : write_some(
437 : std::coroutine_handle<> h,
438 : capy::executor_ref ex,
439 : io_buffer_param param,
440 : std::stop_token token,
441 : std::error_code* ec,
442 : std::size_t* bytes_out)
443 : {
444 115057 : auto& op = wr_;
445 115057 : op.reset();
446 115057 : op.h = h;
447 115057 : op.ex = ex;
448 115057 : op.ec_out = ec;
449 115057 : op.bytes_out = bytes_out;
450 115057 : op.fd = fd_;
451 115057 : op.start(token, this);
452 115057 : op.impl_ptr = shared_from_this();
453 :
454 : // Must prepare buffers before initiator runs
455 115057 : capy::mutable_buffer bufs[epoll_write_op::max_buffers];
456 115057 : op.iovec_count = static_cast<int>(param.copy_to(bufs, epoll_write_op::max_buffers));
457 :
458 115057 : if (op.iovec_count == 0 || (op.iovec_count == 1 && bufs[0].size() == 0))
459 : {
460 1 : op.complete(0, 0);
461 1 : svc_.post(&op);
462 1 : return std::noop_coroutine();
463 : }
464 :
465 230112 : for (int i = 0; i < op.iovec_count; ++i)
466 : {
467 115056 : op.iovecs[i].iov_base = bufs[i].data();
468 115056 : op.iovecs[i].iov_len = bufs[i].size();
469 : }
470 :
471 : // Symmetric transfer ensures caller is suspended before I/O starts
472 115056 : return write_initiator_.start<&epoll_socket_impl::do_write_io>(this);
473 : }
474 :
475 : std::error_code
476 3 : epoll_socket_impl::
477 : shutdown(tcp_socket::shutdown_type what) noexcept
478 : {
479 : int how;
480 3 : switch (what)
481 : {
482 1 : case tcp_socket::shutdown_receive: how = SHUT_RD; break;
483 1 : case tcp_socket::shutdown_send: how = SHUT_WR; break;
484 1 : case tcp_socket::shutdown_both: how = SHUT_RDWR; break;
485 0 : default:
486 0 : return make_err(EINVAL);
487 : }
488 3 : if (::shutdown(fd_, how) != 0)
489 0 : return make_err(errno);
490 3 : return {};
491 : }
492 :
493 : std::error_code
494 5 : epoll_socket_impl::
495 : set_no_delay(bool value) noexcept
496 : {
497 5 : int flag = value ? 1 : 0;
498 5 : if (::setsockopt(fd_, IPPROTO_TCP, TCP_NODELAY, &flag, sizeof(flag)) != 0)
499 0 : return make_err(errno);
500 5 : return {};
501 : }
502 :
503 : bool
504 5 : epoll_socket_impl::
505 : no_delay(std::error_code& ec) const noexcept
506 : {
507 5 : int flag = 0;
508 5 : socklen_t len = sizeof(flag);
509 5 : if (::getsockopt(fd_, IPPROTO_TCP, TCP_NODELAY, &flag, &len) != 0)
510 : {
511 0 : ec = make_err(errno);
512 0 : return false;
513 : }
514 5 : ec = {};
515 5 : return flag != 0;
516 : }
517 :
518 : std::error_code
519 4 : epoll_socket_impl::
520 : set_keep_alive(bool value) noexcept
521 : {
522 4 : int flag = value ? 1 : 0;
523 4 : if (::setsockopt(fd_, SOL_SOCKET, SO_KEEPALIVE, &flag, sizeof(flag)) != 0)
524 0 : return make_err(errno);
525 4 : return {};
526 : }
527 :
528 : bool
529 4 : epoll_socket_impl::
530 : keep_alive(std::error_code& ec) const noexcept
531 : {
532 4 : int flag = 0;
533 4 : socklen_t len = sizeof(flag);
534 4 : if (::getsockopt(fd_, SOL_SOCKET, SO_KEEPALIVE, &flag, &len) != 0)
535 : {
536 0 : ec = make_err(errno);
537 0 : return false;
538 : }
539 4 : ec = {};
540 4 : return flag != 0;
541 : }
542 :
543 : std::error_code
544 1 : epoll_socket_impl::
545 : set_receive_buffer_size(int size) noexcept
546 : {
547 1 : if (::setsockopt(fd_, SOL_SOCKET, SO_RCVBUF, &size, sizeof(size)) != 0)
548 0 : return make_err(errno);
549 1 : return {};
550 : }
551 :
552 : int
553 3 : epoll_socket_impl::
554 : receive_buffer_size(std::error_code& ec) const noexcept
555 : {
556 3 : int size = 0;
557 3 : socklen_t len = sizeof(size);
558 3 : if (::getsockopt(fd_, SOL_SOCKET, SO_RCVBUF, &size, &len) != 0)
559 : {
560 0 : ec = make_err(errno);
561 0 : return 0;
562 : }
563 3 : ec = {};
564 3 : return size;
565 : }
566 :
567 : std::error_code
568 1 : epoll_socket_impl::
569 : set_send_buffer_size(int size) noexcept
570 : {
571 1 : if (::setsockopt(fd_, SOL_SOCKET, SO_SNDBUF, &size, sizeof(size)) != 0)
572 0 : return make_err(errno);
573 1 : return {};
574 : }
575 :
576 : int
577 3 : epoll_socket_impl::
578 : send_buffer_size(std::error_code& ec) const noexcept
579 : {
580 3 : int size = 0;
581 3 : socklen_t len = sizeof(size);
582 3 : if (::getsockopt(fd_, SOL_SOCKET, SO_SNDBUF, &size, &len) != 0)
583 : {
584 0 : ec = make_err(errno);
585 0 : return 0;
586 : }
587 3 : ec = {};
588 3 : return size;
589 : }
590 :
591 : std::error_code
592 4 : epoll_socket_impl::
593 : set_linger(bool enabled, int timeout) noexcept
594 : {
595 4 : if (timeout < 0)
596 1 : return make_err(EINVAL);
597 : struct ::linger lg;
598 3 : lg.l_onoff = enabled ? 1 : 0;
599 3 : lg.l_linger = timeout;
600 3 : if (::setsockopt(fd_, SOL_SOCKET, SO_LINGER, &lg, sizeof(lg)) != 0)
601 0 : return make_err(errno);
602 3 : return {};
603 : }
604 :
605 : tcp_socket::linger_options
606 3 : epoll_socket_impl::
607 : linger(std::error_code& ec) const noexcept
608 : {
609 3 : struct ::linger lg{};
610 3 : socklen_t len = sizeof(lg);
611 3 : if (::getsockopt(fd_, SOL_SOCKET, SO_LINGER, &lg, &len) != 0)
612 : {
613 0 : ec = make_err(errno);
614 0 : return {};
615 : }
616 3 : ec = {};
617 3 : return {.enabled = lg.l_onoff != 0, .timeout = lg.l_linger};
618 : }
619 :
620 : void
621 15173 : epoll_socket_impl::
622 : cancel() noexcept
623 : {
624 15173 : std::shared_ptr<epoll_socket_impl> self;
625 : try {
626 15173 : self = shared_from_this();
627 0 : } catch (const std::bad_weak_ptr&) {
628 0 : return;
629 0 : }
630 :
631 15173 : conn_.request_cancel();
632 15173 : rd_.request_cancel();
633 15173 : wr_.request_cancel();
634 :
635 15173 : epoll_op* conn_claimed = nullptr;
636 15173 : epoll_op* rd_claimed = nullptr;
637 15173 : epoll_op* wr_claimed = nullptr;
638 : {
639 15173 : std::lock_guard lock(desc_state_.mutex);
640 15173 : if (desc_state_.connect_op == &conn_)
641 0 : conn_claimed = std::exchange(desc_state_.connect_op, nullptr);
642 15173 : if (desc_state_.read_op == &rd_)
643 52 : rd_claimed = std::exchange(desc_state_.read_op, nullptr);
644 15173 : if (desc_state_.write_op == &wr_)
645 0 : wr_claimed = std::exchange(desc_state_.write_op, nullptr);
646 15173 : }
647 :
648 15173 : if (conn_claimed)
649 : {
650 0 : conn_.impl_ptr = self;
651 0 : svc_.post(&conn_);
652 0 : svc_.work_finished();
653 : }
654 15173 : if (rd_claimed)
655 : {
656 52 : rd_.impl_ptr = self;
657 52 : svc_.post(&rd_);
658 52 : svc_.work_finished();
659 : }
660 15173 : if (wr_claimed)
661 : {
662 0 : wr_.impl_ptr = self;
663 0 : svc_.post(&wr_);
664 0 : svc_.work_finished();
665 : }
666 15173 : }
667 :
668 : void
669 99 : epoll_socket_impl::
670 : cancel_single_op(epoll_op& op) noexcept
671 : {
672 99 : op.request_cancel();
673 :
674 99 : epoll_op** desc_op_ptr = nullptr;
675 99 : if (&op == &conn_) desc_op_ptr = &desc_state_.connect_op;
676 99 : else if (&op == &rd_) desc_op_ptr = &desc_state_.read_op;
677 0 : else if (&op == &wr_) desc_op_ptr = &desc_state_.write_op;
678 :
679 99 : if (desc_op_ptr)
680 : {
681 99 : epoll_op* claimed = nullptr;
682 : {
683 99 : std::lock_guard lock(desc_state_.mutex);
684 99 : if (*desc_op_ptr == &op)
685 67 : claimed = std::exchange(*desc_op_ptr, nullptr);
686 99 : }
687 99 : if (claimed)
688 : {
689 : try {
690 67 : op.impl_ptr = shared_from_this();
691 0 : } catch (const std::bad_weak_ptr&) {}
692 67 : svc_.post(&op);
693 67 : svc_.work_finished();
694 : }
695 : }
696 99 : }
697 :
698 : void
699 15075 : epoll_socket_impl::
700 : close_socket() noexcept
701 : {
702 15075 : cancel();
703 :
704 : // Keep impl alive if descriptor_state is queued in the scheduler.
705 : // Without this, destroy_impl() drops the last shared_ptr while
706 : // the queued descriptor_state node would become dangling.
707 15075 : if (desc_state_.is_enqueued_.load(std::memory_order_acquire))
708 : {
709 : try {
710 4 : desc_state_.impl_ref_ = shared_from_this();
711 0 : } catch (std::bad_weak_ptr const&) {}
712 : }
713 :
714 15075 : if (fd_ >= 0)
715 : {
716 10046 : if (desc_state_.registered_events != 0)
717 10046 : svc_.scheduler().deregister_descriptor(fd_);
718 10046 : ::close(fd_);
719 10046 : fd_ = -1;
720 : }
721 :
722 15075 : desc_state_.fd = -1;
723 : {
724 15075 : std::lock_guard lock(desc_state_.mutex);
725 15075 : desc_state_.read_op = nullptr;
726 15075 : desc_state_.write_op = nullptr;
727 15075 : desc_state_.connect_op = nullptr;
728 15075 : desc_state_.read_ready = false;
729 15075 : desc_state_.write_ready = false;
730 15075 : }
731 15075 : desc_state_.registered_events = 0;
732 :
733 15075 : local_endpoint_ = endpoint{};
734 15075 : remote_endpoint_ = endpoint{};
735 15075 : }
736 :
737 189 : epoll_socket_service::
738 189 : epoll_socket_service(capy::execution_context& ctx)
739 189 : : state_(std::make_unique<epoll_socket_state>(ctx.use_service<epoll_scheduler>()))
740 : {
741 189 : }
742 :
743 378 : epoll_socket_service::
744 189 : ~epoll_socket_service()
745 : {
746 378 : }
747 :
748 : void
749 189 : epoll_socket_service::
750 : shutdown()
751 : {
752 189 : std::lock_guard lock(state_->mutex_);
753 :
754 189 : while (auto* impl = state_->socket_list_.pop_front())
755 0 : impl->close_socket();
756 :
757 189 : state_->socket_ptrs_.clear();
758 189 : }
759 :
760 : tcp_socket::socket_impl&
761 10046 : epoll_socket_service::
762 : create_impl()
763 : {
764 10046 : auto impl = std::make_shared<epoll_socket_impl>(*this);
765 10046 : auto* raw = impl.get();
766 :
767 : {
768 10046 : std::lock_guard lock(state_->mutex_);
769 10046 : state_->socket_list_.push_back(raw);
770 10046 : state_->socket_ptrs_.emplace(raw, std::move(impl));
771 10046 : }
772 :
773 10046 : return *raw;
774 10046 : }
775 :
776 : void
777 10046 : epoll_socket_service::
778 : destroy_impl(tcp_socket::socket_impl& impl)
779 : {
780 10046 : auto* epoll_impl = static_cast<epoll_socket_impl*>(&impl);
781 10046 : std::lock_guard lock(state_->mutex_);
782 10046 : state_->socket_list_.remove(epoll_impl);
783 10046 : state_->socket_ptrs_.erase(epoll_impl);
784 10046 : }
785 :
786 : std::error_code
787 5029 : epoll_socket_service::
788 : open_socket(tcp_socket::socket_impl& impl)
789 : {
790 5029 : auto* epoll_impl = static_cast<epoll_socket_impl*>(&impl);
791 5029 : epoll_impl->close_socket();
792 :
793 5029 : int fd = ::socket(AF_INET, SOCK_STREAM | SOCK_NONBLOCK | SOCK_CLOEXEC, 0);
794 5029 : if (fd < 0)
795 0 : return make_err(errno);
796 :
797 5029 : epoll_impl->fd_ = fd;
798 :
799 : // Register fd with epoll (edge-triggered mode)
800 5029 : epoll_impl->desc_state_.fd = fd;
801 : {
802 5029 : std::lock_guard lock(epoll_impl->desc_state_.mutex);
803 5029 : epoll_impl->desc_state_.read_op = nullptr;
804 5029 : epoll_impl->desc_state_.write_op = nullptr;
805 5029 : epoll_impl->desc_state_.connect_op = nullptr;
806 5029 : }
807 5029 : scheduler().register_descriptor(fd, &epoll_impl->desc_state_);
808 :
809 5029 : return {};
810 : }
811 :
812 : void
813 230185 : epoll_socket_service::
814 : post(epoll_op* op)
815 : {
816 230185 : state_->sched_.post(op);
817 230185 : }
818 :
819 : void
820 5189 : epoll_socket_service::
821 : work_started() noexcept
822 : {
823 5189 : state_->sched_.work_started();
824 5189 : }
825 :
826 : void
827 119 : epoll_socket_service::
828 : work_finished() noexcept
829 : {
830 119 : state_->sched_.work_finished();
831 119 : }
832 :
833 : } // namespace boost::corosio::detail
834 :
835 : #endif // BOOST_COROSIO_HAS_EPOLL
|