Line data Source code
1 : //
2 : // Copyright (c) 2026 Steve Gerbino
3 : //
4 : // Distributed under the Boost Software License, Version 1.0. (See accompanying
5 : // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
6 : //
7 : // Official repository: https://github.com/cppalliance/corosio
8 : //
9 :
10 : #include <boost/corosio/detail/platform.hpp>
11 :
12 : #if BOOST_COROSIO_HAS_SELECT
13 :
14 : #include "src/detail/select/sockets.hpp"
15 : #include "src/detail/endpoint_convert.hpp"
16 : #include "src/detail/make_err.hpp"
17 :
18 : #include <boost/capy/buffers.hpp>
19 :
20 : #include <errno.h>
21 : #include <fcntl.h>
22 : #include <netinet/in.h>
23 : #include <netinet/tcp.h>
24 : #include <sys/socket.h>
25 : #include <unistd.h>
26 :
27 : namespace boost::corosio::detail {
28 :
// Stop-token callback entry point: forwards the stop request to the owning
// operation's cancel() hook, which performs the fd/registration bookkeeping.
29 : void
30 99 : select_op::canceller::
31 : operator()() const noexcept
32 : {
33 99 : op->cancel();
34 99 : }
35 :
36 : void
37 0 : select_connect_op::
38 : cancel() noexcept
39 : {
40 0 : if (socket_impl_)
41 0 : socket_impl_->cancel_single_op(*this);
42 : else
43 0 : request_cancel();
44 0 : }
45 :
46 : void
47 99 : select_read_op::
48 : cancel() noexcept
49 : {
50 99 : if (socket_impl_)
51 99 : socket_impl_->cancel_single_op(*this);
52 : else
53 0 : request_cancel();
54 99 : }
55 :
56 : void
57 0 : select_write_op::
58 : cancel() noexcept
59 : {
60 0 : if (socket_impl_)
61 0 : socket_impl_->cancel_single_op(*this);
62 : else
63 0 : request_cancel();
64 0 : }
65 :
// Completion handler for a connect op, run on the scheduler queue.
// Order matters throughout: the stop callback is torn down first so no
// cancellation can race the completion; cancellation takes precedence over
// an errno when reporting; and the executor/handle are moved to the stack
// before impl_ptr.reset() so resuming is safe even if that reset releases
// the last reference to the socket impl.
66 : void
67 3835 : select_connect_op::
68 : operator()()
69 : {
70 3835 : stop_cb.reset();
71 :
72 3835 : bool success = (errn == 0 && !cancelled.load(std::memory_order_acquire));
73 :
74 : // Cache endpoints on successful connect
75 3835 : if (success && socket_impl_)
76 : {
77 : // Query local endpoint via getsockname (may fail, but remote is always known)
78 3834 : endpoint local_ep;
79 3834 : sockaddr_in local_addr{};
80 3834 : socklen_t local_len = sizeof(local_addr);
81 3834 : if (::getsockname(fd, reinterpret_cast<sockaddr*>(&local_addr), &local_len) == 0)
82 3834 : local_ep = from_sockaddr_in(local_addr);
83 : // Always cache remote endpoint; local may be default if getsockname failed
84 3834 : static_cast<select_socket_impl*>(socket_impl_)->set_endpoints(local_ep, target_endpoint);
85 : }
86 :
// Report order: cancellation wins over any saved errno.
87 3835 : if (ec_out)
88 : {
89 3835 : if (cancelled.load(std::memory_order_acquire))
90 0 : *ec_out = capy::error::canceled;
91 3835 : else if (errn != 0)
92 1 : *ec_out = make_err(errn);
93 : else
94 3834 : *ec_out = {};
95 : }
96 :
// NOTE(review): connect never transfers bytes, so bytes_out is normally
// null here — kept for uniformity with the read/write ops.
97 3835 : if (bytes_out)
98 0 : *bytes_out = bytes_transferred;
99 :
100 : // Move to stack before destroying the frame
101 3835 : capy::executor_ref saved_ex( std::move( ex ) );
102 3835 : capy::coro saved_h( std::move( h ) );
103 3835 : impl_ptr.reset();
104 3835 : saved_ex.dispatch( saved_h );
105 3835 : }
106 :
// Binds the impl to its owning service; the fd stays -1 until open_socket().
107 7680 : select_socket_impl::
108 7680 : select_socket_impl(select_socket_service& svc) noexcept
109 7680 : : svc_(svc)
110 : {
111 7680 : }
112 :
// Final teardown: close (and cancel) first, then hand the impl back to the
// service. destroy_impl() drops the owning shared_ptr, so *this may be
// destroyed by that call — nothing may run after it.
113 : void
114 7680 : select_socket_impl::
115 : release()
116 : {
117 7680 : close_socket();
118 7680 : svc_.destroy_impl(*this);
119 7680 : }
120 :
// Initiates an asynchronous connect to `ep`.
// Fast paths: immediate ::connect success or a non-EINPROGRESS failure post
// the completion straight to the scheduler queue. Slow path (EINPROGRESS):
// the fd is registered for writability and a registering -> registered CAS
// protocol closes the races against the reactor and against cancellation.
// The completion is never invoked inline; callers always get
// std::noop_coroutine() back.
// NOTE(review): sockaddr_in is used throughout, so this path is IPv4-only —
// confirm IPv6 is handled elsewhere.
121 : std::coroutine_handle<>
122 3835 : select_socket_impl::
123 : connect(
124 : std::coroutine_handle<> h,
125 : capy::executor_ref ex,
126 : endpoint ep,
127 : std::stop_token token,
128 : std::error_code* ec)
129 : {
130 3835 : auto& op = conn_;
131 3835 : op.reset();
132 3835 : op.h = h;
133 3835 : op.ex = ex;
134 3835 : op.ec_out = ec;
135 3835 : op.fd = fd_;
136 3835 : op.target_endpoint = ep; // Store target for endpoint caching
137 3835 : op.start(token, this);
138 :
139 3835 : sockaddr_in addr = detail::to_sockaddr_in(ep);
140 3835 : int result = ::connect(fd_, reinterpret_cast<sockaddr*>(&addr), sizeof(addr));
141 :
142 3835 : if (result == 0)
143 : {
144 : // Sync success - cache endpoints immediately
145 0 : sockaddr_in local_addr{};
146 0 : socklen_t local_len = sizeof(local_addr);
147 0 : if (::getsockname(fd_, reinterpret_cast<sockaddr*>(&local_addr), &local_len) == 0)
148 0 : local_endpoint_ = detail::from_sockaddr_in(local_addr);
149 0 : remote_endpoint_ = ep;
150 :
151 0 : op.complete(0, 0);
152 0 : op.impl_ptr = shared_from_this();
153 0 : svc_.post(&op);
154 : // completion is always posted to scheduler queue, never inline.
155 0 : return std::noop_coroutine();
156 : }
157 :
158 3835 : if (errno == EINPROGRESS)
159 : {
// Pair work_started() with either reactor completion or the cancellation
// branch below, both of which call work_finished().
160 3835 : svc_.work_started();
161 3835 : op.impl_ptr = shared_from_this();
162 :
163 : // Set registering BEFORE register_fd to close the race window where
164 : // reactor sees an event before we set registered. The reactor treats
165 : // registering the same as registered when claiming the op.
166 3835 : op.registered.store(select_registration_state::registering, std::memory_order_release);
167 3835 : svc_.scheduler().register_fd(fd_, &op, select_scheduler::event_write);
168 :
169 : // Transition to registered. If this fails, reactor or cancel already
170 : // claimed the op (state is now unregistered), so we're done. However,
171 : // we must still deregister the fd because cancel's deregister_fd may
172 : // have run before our register_fd, leaving the fd orphaned.
173 3835 : auto expected = select_registration_state::registering;
174 3835 : if (!op.registered.compare_exchange_strong(
175 : expected, select_registration_state::registered, std::memory_order_acq_rel))
176 : {
177 0 : svc_.scheduler().deregister_fd(fd_, select_scheduler::event_write);
178 : // completion is always posted to scheduler queue, never inline.
179 0 : return std::noop_coroutine();
180 : }
181 :
182 : // If cancelled was set before we registered, handle it now.
// The exchange claims the op: only the claimer posts the completion,
// so the op is posted exactly once.
183 3835 : if (op.cancelled.load(std::memory_order_acquire))
184 : {
185 0 : auto prev = op.registered.exchange(
186 : select_registration_state::unregistered, std::memory_order_acq_rel);
187 0 : if (prev != select_registration_state::unregistered)
188 : {
189 0 : svc_.scheduler().deregister_fd(fd_, select_scheduler::event_write);
190 0 : op.impl_ptr = shared_from_this();
191 0 : svc_.post(&op);
192 0 : svc_.work_finished();
193 : }
194 : }
195 : // completion is always posted to scheduler queue, never inline.
196 3835 : return std::noop_coroutine();
197 : }
198 :
// Immediate failure (errno other than EINPROGRESS): report it via the
// posted completion.
199 0 : op.complete(errno, 0);
200 0 : op.impl_ptr = shared_from_this();
201 0 : svc_.post(&op);
202 : // completion is always posted to scheduler queue, never inline.
203 0 : return std::noop_coroutine();
204 : }
205 :
// Starts a single scatter read into the caller-supplied buffers.
// A speculative ::readv is attempted first; only on EAGAIN/EWOULDBLOCK is
// the fd registered with the reactor, using the same registering ->
// registered CAS protocol as connect(). A readv return of 0 (peer EOF)
// completes with errn 0 and zero bytes; a zero-length buffer request is
// flagged via empty_buffer_read and completes immediately. Completions are
// always posted to the scheduler queue, never invoked inline.
206 : std::coroutine_handle<>
207 116322 : select_socket_impl::
208 : read_some(
209 : std::coroutine_handle<> h,
210 : capy::executor_ref ex,
211 : io_buffer_param param,
212 : std::stop_token token,
213 : std::error_code* ec,
214 : std::size_t* bytes_out)
215 : {
216 116322 : auto& op = rd_;
217 116322 : op.reset();
218 116322 : op.h = h;
219 116322 : op.ex = ex;
220 116322 : op.ec_out = ec;
221 116322 : op.bytes_out = bytes_out;
222 116322 : op.fd = fd_;
223 116322 : op.start(token, this);
224 :
225 116322 : capy::mutable_buffer bufs[select_read_op::max_buffers];
226 116322 : op.iovec_count = static_cast<int>(param.copy_to(bufs, select_read_op::max_buffers));
227 :
// Zero-length read: complete immediately with 0 bytes, marked so the
// completion can distinguish it from EOF.
228 116322 : if (op.iovec_count == 0 || (op.iovec_count == 1 && bufs[0].size() == 0))
229 : {
230 1 : op.empty_buffer_read = true;
231 1 : op.complete(0, 0);
232 1 : op.impl_ptr = shared_from_this();
233 1 : svc_.post(&op);
234 1 : return std::noop_coroutine();
235 : }
236 :
237 232642 : for (int i = 0; i < op.iovec_count; ++i)
238 : {
239 116321 : op.iovecs[i].iov_base = bufs[i].data();
240 116321 : op.iovecs[i].iov_len = bufs[i].size();
241 : }
242 :
243 116321 : ssize_t n = ::readv(fd_, op.iovecs, op.iovec_count);
244 :
245 116321 : if (n > 0)
246 : {
247 116117 : op.complete(0, static_cast<std::size_t>(n));
248 116117 : op.impl_ptr = shared_from_this();
249 116117 : svc_.post(&op);
250 116117 : return std::noop_coroutine();
251 : }
252 :
// n == 0 means the peer closed the connection (EOF).
253 204 : if (n == 0)
254 : {
255 5 : op.complete(0, 0);
256 5 : op.impl_ptr = shared_from_this();
257 5 : svc_.post(&op);
258 5 : return std::noop_coroutine();
259 : }
260 :
261 199 : if (errno == EAGAIN || errno == EWOULDBLOCK)
262 : {
263 199 : svc_.work_started();
264 199 : op.impl_ptr = shared_from_this();
265 :
266 : // Set registering BEFORE register_fd to close the race window where
267 : // reactor sees an event before we set registered.
268 199 : op.registered.store(select_registration_state::registering, std::memory_order_release);
269 199 : svc_.scheduler().register_fd(fd_, &op, select_scheduler::event_read);
270 :
271 : // Transition to registered. If this fails, reactor or cancel already
272 : // claimed the op (state is now unregistered), so we're done. However,
273 : // we must still deregister the fd because cancel's deregister_fd may
274 : // have run before our register_fd, leaving the fd orphaned.
275 199 : auto expected = select_registration_state::registering;
276 199 : if (!op.registered.compare_exchange_strong(
277 : expected, select_registration_state::registered, std::memory_order_acq_rel))
278 : {
279 0 : svc_.scheduler().deregister_fd(fd_, select_scheduler::event_read);
280 0 : return std::noop_coroutine();
281 : }
282 :
283 : // If cancelled was set before we registered, handle it now.
284 199 : if (op.cancelled.load(std::memory_order_acquire))
285 : {
286 0 : auto prev = op.registered.exchange(
287 : select_registration_state::unregistered, std::memory_order_acq_rel);
288 0 : if (prev != select_registration_state::unregistered)
289 : {
290 0 : svc_.scheduler().deregister_fd(fd_, select_scheduler::event_read);
291 0 : op.impl_ptr = shared_from_this();
292 0 : svc_.post(&op);
293 0 : svc_.work_finished();
294 : }
295 : }
296 199 : return std::noop_coroutine();
297 : }
298 :
// Hard read error: report the errno via the posted completion.
299 0 : op.complete(errno, 0);
300 0 : op.impl_ptr = shared_from_this();
301 0 : svc_.post(&op);
302 0 : return std::noop_coroutine();
303 : }
304 :
// Starts a single gather write from the caller-supplied buffers.
// A speculative ::sendmsg (with MSG_NOSIGNAL so a dead peer yields EPIPE
// instead of SIGPIPE) is attempted first; only on EAGAIN/EWOULDBLOCK is the
// fd registered for writability, using the same registering -> registered
// CAS protocol as connect()/read_some(). Completions are always posted to
// the scheduler queue, never invoked inline.
305 : std::coroutine_handle<>
306 116200 : select_socket_impl::
307 : write_some(
308 : std::coroutine_handle<> h,
309 : capy::executor_ref ex,
310 : io_buffer_param param,
311 : std::stop_token token,
312 : std::error_code* ec,
313 : std::size_t* bytes_out)
314 : {
315 116200 : auto& op = wr_;
316 116200 : op.reset();
317 116200 : op.h = h;
318 116200 : op.ex = ex;
319 116200 : op.ec_out = ec;
320 116200 : op.bytes_out = bytes_out;
321 116200 : op.fd = fd_;
322 116200 : op.start(token, this);
323 :
324 116200 : capy::mutable_buffer bufs[select_write_op::max_buffers];
325 116200 : op.iovec_count = static_cast<int>(param.copy_to(bufs, select_write_op::max_buffers));
326 :
// Zero-length write: complete immediately with 0 bytes.
327 116200 : if (op.iovec_count == 0 || (op.iovec_count == 1 && bufs[0].size() == 0))
328 : {
329 1 : op.complete(0, 0);
330 1 : op.impl_ptr = shared_from_this();
331 1 : svc_.post(&op);
332 1 : return std::noop_coroutine();
333 : }
334 :
335 232398 : for (int i = 0; i < op.iovec_count; ++i)
336 : {
337 116199 : op.iovecs[i].iov_base = bufs[i].data();
338 116199 : op.iovecs[i].iov_len = bufs[i].size();
339 : }
340 :
341 116199 : msghdr msg{};
342 116199 : msg.msg_iov = op.iovecs;
343 116199 : msg.msg_iovlen = static_cast<std::size_t>(op.iovec_count);
344 :
345 116199 : ssize_t n = ::sendmsg(fd_, &msg, MSG_NOSIGNAL);
346 :
347 116199 : if (n > 0)
348 : {
349 116198 : op.complete(0, static_cast<std::size_t>(n));
350 116198 : op.impl_ptr = shared_from_this();
351 116198 : svc_.post(&op);
352 116198 : return std::noop_coroutine();
353 : }
354 :
355 1 : if (errno == EAGAIN || errno == EWOULDBLOCK)
356 : {
357 0 : svc_.work_started();
358 0 : op.impl_ptr = shared_from_this();
359 :
360 : // Set registering BEFORE register_fd to close the race window where
361 : // reactor sees an event before we set registered.
362 0 : op.registered.store(select_registration_state::registering, std::memory_order_release);
363 0 : svc_.scheduler().register_fd(fd_, &op, select_scheduler::event_write);
364 :
365 : // Transition to registered. If this fails, reactor or cancel already
366 : // claimed the op (state is now unregistered), so we're done. However,
367 : // we must still deregister the fd because cancel's deregister_fd may
368 : // have run before our register_fd, leaving the fd orphaned.
369 0 : auto expected = select_registration_state::registering;
370 0 : if (!op.registered.compare_exchange_strong(
371 : expected, select_registration_state::registered, std::memory_order_acq_rel))
372 : {
373 0 : svc_.scheduler().deregister_fd(fd_, select_scheduler::event_write);
374 0 : return std::noop_coroutine();
375 : }
376 :
377 : // If cancelled was set before we registered, handle it now.
378 0 : if (op.cancelled.load(std::memory_order_acquire))
379 : {
380 0 : auto prev = op.registered.exchange(
381 : select_registration_state::unregistered, std::memory_order_acq_rel);
382 0 : if (prev != select_registration_state::unregistered)
383 : {
384 0 : svc_.scheduler().deregister_fd(fd_, select_scheduler::event_write);
385 0 : op.impl_ptr = shared_from_this();
386 0 : svc_.post(&op);
387 0 : svc_.work_finished();
388 : }
389 : }
390 0 : return std::noop_coroutine();
391 : }
392 :
// Error path. sendmsg can return 0 or fail with errno unset in odd cases
// (n <= 0 but not EAGAIN), so fall back to EIO to guarantee a nonzero
// error reaches the completion. NOTE(review): the read path posts plain
// errno here — consider unifying.
393 1 : op.complete(errno ? errno : EIO, 0);
394 1 : op.impl_ptr = shared_from_this();
395 1 : svc_.post(&op);
396 1 : return std::noop_coroutine();
397 : }
398 :
399 : std::error_code
400 3 : select_socket_impl::
401 : shutdown(tcp_socket::shutdown_type what) noexcept
402 : {
403 : int how;
404 3 : switch (what)
405 : {
406 1 : case tcp_socket::shutdown_receive: how = SHUT_RD; break;
407 1 : case tcp_socket::shutdown_send: how = SHUT_WR; break;
408 1 : case tcp_socket::shutdown_both: how = SHUT_RDWR; break;
409 0 : default:
410 0 : return make_err(EINVAL);
411 : }
412 3 : if (::shutdown(fd_, how) != 0)
413 0 : return make_err(errno);
414 3 : return {};
415 : }
416 :
417 : std::error_code
418 5 : select_socket_impl::
419 : set_no_delay(bool value) noexcept
420 : {
421 5 : int flag = value ? 1 : 0;
422 5 : if (::setsockopt(fd_, IPPROTO_TCP, TCP_NODELAY, &flag, sizeof(flag)) != 0)
423 0 : return make_err(errno);
424 5 : return {};
425 : }
426 :
427 : bool
428 5 : select_socket_impl::
429 : no_delay(std::error_code& ec) const noexcept
430 : {
431 5 : int flag = 0;
432 5 : socklen_t len = sizeof(flag);
433 5 : if (::getsockopt(fd_, IPPROTO_TCP, TCP_NODELAY, &flag, &len) != 0)
434 : {
435 0 : ec = make_err(errno);
436 0 : return false;
437 : }
438 5 : ec = {};
439 5 : return flag != 0;
440 : }
441 :
442 : std::error_code
443 4 : select_socket_impl::
444 : set_keep_alive(bool value) noexcept
445 : {
446 4 : int flag = value ? 1 : 0;
447 4 : if (::setsockopt(fd_, SOL_SOCKET, SO_KEEPALIVE, &flag, sizeof(flag)) != 0)
448 0 : return make_err(errno);
449 4 : return {};
450 : }
451 :
452 : bool
453 4 : select_socket_impl::
454 : keep_alive(std::error_code& ec) const noexcept
455 : {
456 4 : int flag = 0;
457 4 : socklen_t len = sizeof(flag);
458 4 : if (::getsockopt(fd_, SOL_SOCKET, SO_KEEPALIVE, &flag, &len) != 0)
459 : {
460 0 : ec = make_err(errno);
461 0 : return false;
462 : }
463 4 : ec = {};
464 4 : return flag != 0;
465 : }
466 :
467 : std::error_code
468 1 : select_socket_impl::
469 : set_receive_buffer_size(int size) noexcept
470 : {
471 1 : if (::setsockopt(fd_, SOL_SOCKET, SO_RCVBUF, &size, sizeof(size)) != 0)
472 0 : return make_err(errno);
473 1 : return {};
474 : }
475 :
476 : int
477 3 : select_socket_impl::
478 : receive_buffer_size(std::error_code& ec) const noexcept
479 : {
480 3 : int size = 0;
481 3 : socklen_t len = sizeof(size);
482 3 : if (::getsockopt(fd_, SOL_SOCKET, SO_RCVBUF, &size, &len) != 0)
483 : {
484 0 : ec = make_err(errno);
485 0 : return 0;
486 : }
487 3 : ec = {};
488 3 : return size;
489 : }
490 :
491 : std::error_code
492 1 : select_socket_impl::
493 : set_send_buffer_size(int size) noexcept
494 : {
495 1 : if (::setsockopt(fd_, SOL_SOCKET, SO_SNDBUF, &size, sizeof(size)) != 0)
496 0 : return make_err(errno);
497 1 : return {};
498 : }
499 :
500 : int
501 3 : select_socket_impl::
502 : send_buffer_size(std::error_code& ec) const noexcept
503 : {
504 3 : int size = 0;
505 3 : socklen_t len = sizeof(size);
506 3 : if (::getsockopt(fd_, SOL_SOCKET, SO_SNDBUF, &size, &len) != 0)
507 : {
508 0 : ec = make_err(errno);
509 0 : return 0;
510 : }
511 3 : ec = {};
512 3 : return size;
513 : }
514 :
515 : std::error_code
516 4 : select_socket_impl::
517 : set_linger(bool enabled, int timeout) noexcept
518 : {
519 4 : if (timeout < 0)
520 1 : return make_err(EINVAL);
521 : struct ::linger lg;
522 3 : lg.l_onoff = enabled ? 1 : 0;
523 3 : lg.l_linger = timeout;
524 3 : if (::setsockopt(fd_, SOL_SOCKET, SO_LINGER, &lg, sizeof(lg)) != 0)
525 0 : return make_err(errno);
526 3 : return {};
527 : }
528 :
529 : tcp_socket::linger_options
530 3 : select_socket_impl::
531 : linger(std::error_code& ec) const noexcept
532 : {
533 3 : struct ::linger lg{};
534 3 : socklen_t len = sizeof(lg);
535 3 : if (::getsockopt(fd_, SOL_SOCKET, SO_LINGER, &lg, &len) != 0)
536 : {
537 0 : ec = make_err(errno);
538 0 : return {};
539 : }
540 3 : ec = {};
541 3 : return {.enabled = lg.l_onoff != 0, .timeout = lg.l_linger};
542 : }
543 :
// Cancels all three potentially pending ops (connect/read/write).
// For each op, the exchange to `unregistered` atomically claims it: only
// the claimer (here, the reactor, or cancel_single_op) may deregister the
// fd and post the completion, so every op is posted exactly once.
// shared_from_this() can throw bad_weak_ptr while the impl is being
// destroyed; in that case there is nothing left to keep alive and we bail.
544 : void
545 11622 : select_socket_impl::
546 : cancel() noexcept
547 : {
548 11622 : std::shared_ptr<select_socket_impl> self;
549 : try {
550 11622 : self = shared_from_this();
551 0 : } catch (const std::bad_weak_ptr&) {
552 0 : return;
553 0 : }
554 :
555 34866 : auto cancel_op = [this, &self](select_op& op, int events) {
556 34866 : auto prev = op.registered.exchange(
557 : select_registration_state::unregistered, std::memory_order_acq_rel);
558 34866 : op.request_cancel();
559 34866 : if (prev != select_registration_state::unregistered)
560 : {
// We claimed the op: balance the work_started() from the submit path.
561 51 : svc_.scheduler().deregister_fd(fd_, events);
562 51 : op.impl_ptr = self;
563 51 : svc_.post(&op);
564 51 : svc_.work_finished();
565 : }
566 46488 : };
567 :
// connect and write both wait on writability; read waits on readability.
568 11622 : cancel_op(conn_, select_scheduler::event_write);
569 11622 : cancel_op(rd_, select_scheduler::event_read);
570 11622 : cancel_op(wr_, select_scheduler::event_write);
571 11622 : }
572 :
// Cancels one specific pending operation (invoked from the op's
// stop_token callback). The exchange to `unregistered` claims the op;
// only the claimer deregisters the fd and posts the completion, which
// guarantees exactly-once posting against the reactor and cancel().
573 : void
574 99 : select_socket_impl::
575 : cancel_single_op(select_op& op) noexcept
576 : {
577 : // Called from stop_token callback to cancel a specific pending operation.
578 99 : auto prev = op.registered.exchange(
579 : select_registration_state::unregistered, std::memory_order_acq_rel);
580 99 : op.request_cancel();
581 :
582 99 : if (prev != select_registration_state::unregistered)
583 : {
584 : // Determine which event type to deregister
// connect/write wait on writability, read on readability.
585 67 : int events = 0;
586 67 : if (&op == &conn_ || &op == &wr_)
587 0 : events = select_scheduler::event_write;
588 67 : else if (&op == &rd_)
589 67 : events = select_scheduler::event_read;
590 :
591 67 : svc_.scheduler().deregister_fd(fd_, events);
592 :
593 : // Keep impl alive until op completes
594 : try {
595 67 : op.impl_ptr = shared_from_this();
596 0 : } catch (const std::bad_weak_ptr&) {
597 : // Impl is being destroyed, op will be orphaned but that's ok
598 0 : }
599 :
// Balance the work_started() from the submit path.
600 67 : svc_.post(&op);
601 67 : svc_.work_finished();
602 : }
603 99 : }
604 :
// Closes the socket: cancels any pending ops first, then removes the fd
// from the reactor and closes it. Safe to call repeatedly (fd_ is reset to
// -1) and on sockets that were never opened.
605 : void
606 11526 : select_socket_impl::
607 : close_socket() noexcept
608 : {
609 11526 : cancel();
610 :
611 11526 : if (fd_ >= 0)
612 : {
613 : // Unconditionally remove from registered_fds_ to handle edge cases
614 : // where the fd might be registered but cancel() didn't clean it up
615 : // due to race conditions.
616 7680 : svc_.scheduler().deregister_fd(fd_,
617 : select_scheduler::event_read | select_scheduler::event_write);
618 7680 : ::close(fd_);
619 7680 : fd_ = -1;
620 : }
621 :
622 : // Clear cached endpoints
623 11526 : local_endpoint_ = endpoint{};
624 11526 : remote_endpoint_ = endpoint{};
625 11526 : }
626 :
// Builds the service state, pulling the shared select_scheduler out of the
// execution context (creating it on first use).
627 120 : select_socket_service::
628 120 : select_socket_service(capy::execution_context& ctx)
629 120 : : state_(std::make_unique<select_socket_state>(ctx.use_service<select_scheduler>()))
630 : {
631 120 : }
632 :
// Out-of-line destructor; per-socket cleanup happens in shutdown(), which
// the execution context runs before services are destroyed.
633 240 : select_socket_service::
634 120 : ~select_socket_service()
635 : {
636 240 : }
637 :
638 : void
639 120 : select_socket_service::
640 : shutdown()
641 : {
642 120 : std::lock_guard lock(state_->mutex_);
643 :
644 120 : while (auto* impl = state_->socket_list_.pop_front())
645 0 : impl->close_socket();
646 :
647 120 : state_->socket_ptrs_.clear();
648 120 : }
649 :
650 : tcp_socket::socket_impl&
651 7680 : select_socket_service::
652 : create_impl()
653 : {
654 7680 : auto impl = std::make_shared<select_socket_impl>(*this);
655 7680 : auto* raw = impl.get();
656 :
657 : {
658 7680 : std::lock_guard lock(state_->mutex_);
659 7680 : state_->socket_list_.push_back(raw);
660 7680 : state_->socket_ptrs_.emplace(raw, std::move(impl));
661 7680 : }
662 :
663 7680 : return *raw;
664 7680 : }
665 :
666 : void
667 7680 : select_socket_service::
668 : destroy_impl(tcp_socket::socket_impl& impl)
669 : {
670 7680 : auto* select_impl = static_cast<select_socket_impl*>(&impl);
671 7680 : std::lock_guard lock(state_->mutex_);
672 7680 : state_->socket_list_.remove(select_impl);
673 7680 : state_->socket_ptrs_.erase(select_impl);
674 7680 : }
675 :
676 : std::error_code
677 3846 : select_socket_service::
678 : open_socket(tcp_socket::socket_impl& impl)
679 : {
680 3846 : auto* select_impl = static_cast<select_socket_impl*>(&impl);
681 3846 : select_impl->close_socket();
682 :
683 3846 : int fd = ::socket(AF_INET, SOCK_STREAM, 0);
684 3846 : if (fd < 0)
685 0 : return make_err(errno);
686 :
687 : // Set non-blocking and close-on-exec
688 3846 : int flags = ::fcntl(fd, F_GETFL, 0);
689 3846 : if (flags == -1)
690 : {
691 0 : int errn = errno;
692 0 : ::close(fd);
693 0 : return make_err(errn);
694 : }
695 3846 : if (::fcntl(fd, F_SETFL, flags | O_NONBLOCK) == -1)
696 : {
697 0 : int errn = errno;
698 0 : ::close(fd);
699 0 : return make_err(errn);
700 : }
701 3846 : if (::fcntl(fd, F_SETFD, FD_CLOEXEC) == -1)
702 : {
703 0 : int errn = errno;
704 0 : ::close(fd);
705 0 : return make_err(errn);
706 : }
707 :
708 : // Check fd is within select() limits
709 3846 : if (fd >= FD_SETSIZE)
710 : {
711 0 : ::close(fd);
712 0 : return make_err(EMFILE); // Too many open files
713 : }
714 :
715 3846 : select_impl->fd_ = fd;
716 3846 : return {};
717 : }
718 :
// Queues a completed op on the scheduler's run queue for later invocation.
719 : void
720 232441 : select_socket_service::
721 : post(select_op* op)
722 : {
723 232441 : state_->sched_.post(op);
724 232441 : }
725 :
// Informs the scheduler that an async operation is outstanding; keeps the
// event loop running until the matching work_finished().
726 : void
727 4034 : select_socket_service::
728 : work_started() noexcept
729 : {
730 4034 : state_->sched_.work_started();
731 4034 : }
732 :
// Balances a prior work_started(); the event loop may stop once all
// outstanding work is finished.
733 : void
734 118 : select_socket_service::
735 : work_finished() noexcept
736 : {
737 118 : state_->sched_.work_finished();
738 118 : }
739 :
740 : } // namespace boost::corosio::detail
741 :
742 : #endif // BOOST_COROSIO_HAS_SELECT
|