Line data Source code
1 : //
2 : // Copyright (c) 2026 Steve Gerbino
3 : //
4 : // Distributed under the Boost Software License, Version 1.0. (See accompanying
5 : // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
6 : //
7 : // Official repository: https://github.com/cppalliance/corosio
8 : //
9 :
10 : #include <boost/corosio/detail/platform.hpp>
11 :
12 : #if BOOST_COROSIO_HAS_EPOLL
13 :
14 : #include "src/detail/epoll/acceptors.hpp"
15 : #include "src/detail/epoll/sockets.hpp"
16 : #include "src/detail/endpoint_convert.hpp"
17 : #include "src/detail/make_err.hpp"
18 :
19 : #include <utility>
20 :
21 : #include <errno.h>
22 : #include <netinet/in.h>
23 : #include <sys/epoll.h>
24 : #include <sys/socket.h>
25 : #include <unistd.h>
26 :
27 : namespace boost::corosio::detail {
28 :
29 : void
30 6 : epoll_accept_op::
31 : cancel() noexcept
32 : {
33 6 : if (acceptor_impl_)
34 6 : acceptor_impl_->cancel_single_op(*this);
35 : else
36 0 : request_cancel();
37 6 : }
38 :
39 : void
40 5026 : epoll_accept_op::
41 : operator()()
42 : {
43 5026 : stop_cb.reset();
44 :
45 5026 : bool success = (errn == 0 && !cancelled.load(std::memory_order_acquire));
46 :
47 5026 : if (ec_out)
48 : {
49 5026 : if (cancelled.load(std::memory_order_acquire))
50 9 : *ec_out = capy::error::canceled;
51 5017 : else if (errn != 0)
52 0 : *ec_out = make_err(errn);
53 : else
54 5017 : *ec_out = {};
55 : }
56 :
57 5026 : if (success && accepted_fd >= 0)
58 : {
59 5017 : if (acceptor_impl_)
60 : {
61 5017 : auto* socket_svc = static_cast<epoll_acceptor_impl*>(acceptor_impl_)
62 5017 : ->service().socket_service();
63 5017 : if (socket_svc)
64 : {
65 5017 : auto& impl = static_cast<epoll_socket_impl&>(socket_svc->create_impl());
66 5017 : impl.set_socket(accepted_fd);
67 :
68 : // Register accepted socket with epoll (edge-triggered mode)
69 5017 : impl.desc_state_.fd = accepted_fd;
70 : {
71 5017 : std::lock_guard lock(impl.desc_state_.mutex);
72 5017 : impl.desc_state_.read_op = nullptr;
73 5017 : impl.desc_state_.write_op = nullptr;
74 5017 : impl.desc_state_.connect_op = nullptr;
75 5017 : }
76 5017 : socket_svc->scheduler().register_descriptor(accepted_fd, &impl.desc_state_);
77 :
78 5017 : sockaddr_in local_addr{};
79 5017 : socklen_t local_len = sizeof(local_addr);
80 5017 : sockaddr_in remote_addr{};
81 5017 : socklen_t remote_len = sizeof(remote_addr);
82 :
83 5017 : endpoint local_ep, remote_ep;
84 5017 : if (::getsockname(accepted_fd, reinterpret_cast<sockaddr*>(&local_addr), &local_len) == 0)
85 5017 : local_ep = from_sockaddr_in(local_addr);
86 5017 : if (::getpeername(accepted_fd, reinterpret_cast<sockaddr*>(&remote_addr), &remote_len) == 0)
87 5017 : remote_ep = from_sockaddr_in(remote_addr);
88 :
89 5017 : impl.set_endpoints(local_ep, remote_ep);
90 :
91 5017 : if (impl_out)
92 5017 : *impl_out = &impl;
93 :
94 5017 : accepted_fd = -1;
95 : }
96 : else
97 : {
98 0 : if (ec_out && !*ec_out)
99 0 : *ec_out = make_err(ENOENT);
100 0 : ::close(accepted_fd);
101 0 : accepted_fd = -1;
102 0 : if (impl_out)
103 0 : *impl_out = nullptr;
104 : }
105 : }
106 : else
107 : {
108 0 : ::close(accepted_fd);
109 0 : accepted_fd = -1;
110 0 : if (impl_out)
111 0 : *impl_out = nullptr;
112 : }
113 5017 : }
114 : else
115 : {
116 9 : if (accepted_fd >= 0)
117 : {
118 0 : ::close(accepted_fd);
119 0 : accepted_fd = -1;
120 : }
121 :
122 9 : if (peer_impl)
123 : {
124 0 : peer_impl->release();
125 0 : peer_impl = nullptr;
126 : }
127 :
128 9 : if (impl_out)
129 9 : *impl_out = nullptr;
130 : }
131 :
132 : // Move to stack before resuming. See epoll_op::operator()() for rationale.
133 5026 : capy::executor_ref saved_ex( std::move( ex ) );
134 5026 : capy::coro saved_h( std::move( h ) );
135 5026 : auto prevent_premature_destruction = std::move(impl_ptr);
136 5026 : saved_ex.dispatch( saved_h );
137 5026 : }
138 :
139 72 : epoll_acceptor_impl::
140 72 : epoll_acceptor_impl(epoll_acceptor_service& svc) noexcept
141 72 : : svc_(svc)
142 : {
143 72 : }
144 :
145 : void
146 72 : epoll_acceptor_impl::
147 : release()
148 : {
149 72 : close_socket();
150 72 : svc_.destroy_acceptor_impl(*this);
151 72 : }
152 :
153 : std::coroutine_handle<>
154 5026 : epoll_acceptor_impl::
155 : accept(
156 : std::coroutine_handle<> h,
157 : capy::executor_ref ex,
158 : std::stop_token token,
159 : std::error_code* ec,
160 : io_object::io_object_impl** impl_out)
161 : {
162 5026 : auto& op = acc_;
163 5026 : op.reset();
164 5026 : op.h = h;
165 5026 : op.ex = ex;
166 5026 : op.ec_out = ec;
167 5026 : op.impl_out = impl_out;
168 5026 : op.fd = fd_;
169 5026 : op.start(token, this);
170 :
171 5026 : sockaddr_in addr{};
172 5026 : socklen_t addrlen = sizeof(addr);
173 5026 : int accepted = ::accept4(fd_, reinterpret_cast<sockaddr*>(&addr),
174 : &addrlen, SOCK_NONBLOCK | SOCK_CLOEXEC);
175 :
176 5026 : if (accepted >= 0)
177 : {
178 : {
179 2 : std::lock_guard lock(desc_state_.mutex);
180 2 : desc_state_.read_ready = false;
181 2 : }
182 2 : op.accepted_fd = accepted;
183 2 : op.complete(0, 0);
184 2 : op.impl_ptr = shared_from_this();
185 2 : svc_.post(&op);
186 : // completion is always posted to scheduler queue, never inline.
187 2 : return std::noop_coroutine();
188 : }
189 :
190 5024 : if (errno == EAGAIN || errno == EWOULDBLOCK)
191 : {
192 5024 : svc_.work_started();
193 5024 : op.impl_ptr = shared_from_this();
194 :
195 5024 : bool perform_now = false;
196 : {
197 5024 : std::lock_guard lock(desc_state_.mutex);
198 5024 : if (desc_state_.read_ready)
199 : {
200 0 : desc_state_.read_ready = false;
201 0 : perform_now = true;
202 : }
203 : else
204 : {
205 5024 : desc_state_.read_op = &op;
206 : }
207 5024 : }
208 :
209 5024 : if (perform_now)
210 : {
211 0 : op.perform_io();
212 0 : if (op.errn == EAGAIN || op.errn == EWOULDBLOCK)
213 : {
214 0 : op.errn = 0;
215 0 : std::lock_guard lock(desc_state_.mutex);
216 0 : desc_state_.read_op = &op;
217 0 : }
218 : else
219 : {
220 0 : svc_.post(&op);
221 0 : svc_.work_finished();
222 : }
223 0 : return std::noop_coroutine();
224 : }
225 :
226 5024 : if (op.cancelled.load(std::memory_order_acquire))
227 : {
228 0 : epoll_op* claimed = nullptr;
229 : {
230 0 : std::lock_guard lock(desc_state_.mutex);
231 0 : if (desc_state_.read_op == &op)
232 0 : claimed = std::exchange(desc_state_.read_op, nullptr);
233 0 : }
234 0 : if (claimed)
235 : {
236 0 : svc_.post(claimed);
237 0 : svc_.work_finished();
238 : }
239 : }
240 : // completion is always posted to scheduler queue, never inline.
241 5024 : return std::noop_coroutine();
242 : }
243 :
244 0 : op.complete(errno, 0);
245 0 : op.impl_ptr = shared_from_this();
246 0 : svc_.post(&op);
247 : // completion is always posted to scheduler queue, never inline.
248 0 : return std::noop_coroutine();
249 : }
250 :
251 : void
252 145 : epoll_acceptor_impl::
253 : cancel() noexcept
254 : {
255 145 : std::shared_ptr<epoll_acceptor_impl> self;
256 : try {
257 145 : self = shared_from_this();
258 0 : } catch (const std::bad_weak_ptr&) {
259 0 : return;
260 0 : }
261 :
262 145 : acc_.request_cancel();
263 :
264 145 : epoll_op* claimed = nullptr;
265 : {
266 145 : std::lock_guard lock(desc_state_.mutex);
267 145 : if (desc_state_.read_op == &acc_)
268 3 : claimed = std::exchange(desc_state_.read_op, nullptr);
269 145 : }
270 145 : if (claimed)
271 : {
272 3 : acc_.impl_ptr = self;
273 3 : svc_.post(&acc_);
274 3 : svc_.work_finished();
275 : }
276 145 : }
277 :
278 : void
279 6 : epoll_acceptor_impl::
280 : cancel_single_op(epoll_op& op) noexcept
281 : {
282 6 : op.request_cancel();
283 :
284 6 : epoll_op* claimed = nullptr;
285 : {
286 6 : std::lock_guard lock(desc_state_.mutex);
287 6 : if (desc_state_.read_op == &op)
288 6 : claimed = std::exchange(desc_state_.read_op, nullptr);
289 6 : }
290 6 : if (claimed)
291 : {
292 : try {
293 6 : op.impl_ptr = shared_from_this();
294 0 : } catch (const std::bad_weak_ptr&) {}
295 6 : svc_.post(&op);
296 6 : svc_.work_finished();
297 : }
298 6 : }
299 :
300 : void
301 144 : epoll_acceptor_impl::
302 : close_socket() noexcept
303 : {
304 144 : cancel();
305 :
306 144 : if (desc_state_.is_enqueued_.load(std::memory_order_acquire))
307 : {
308 : try {
309 0 : desc_state_.impl_ref_ = shared_from_this();
310 0 : } catch (std::bad_weak_ptr const&) {}
311 : }
312 :
313 144 : if (fd_ >= 0)
314 : {
315 62 : if (desc_state_.registered_events != 0)
316 62 : svc_.scheduler().deregister_descriptor(fd_);
317 62 : ::close(fd_);
318 62 : fd_ = -1;
319 : }
320 :
321 144 : desc_state_.fd = -1;
322 : {
323 144 : std::lock_guard lock(desc_state_.mutex);
324 144 : desc_state_.read_op = nullptr;
325 144 : desc_state_.read_ready = false;
326 144 : desc_state_.write_ready = false;
327 144 : }
328 144 : desc_state_.registered_events = 0;
329 :
330 : // Clear cached endpoint
331 144 : local_endpoint_ = endpoint{};
332 144 : }
333 :
334 189 : epoll_acceptor_service::
335 189 : epoll_acceptor_service(capy::execution_context& ctx)
336 189 : : ctx_(ctx)
337 189 : , state_(std::make_unique<epoll_acceptor_state>(ctx.use_service<epoll_scheduler>()))
338 : {
339 189 : }
340 :
341 378 : epoll_acceptor_service::
342 189 : ~epoll_acceptor_service()
343 : {
344 378 : }
345 :
346 : void
347 189 : epoll_acceptor_service::
348 : shutdown()
349 : {
350 189 : std::lock_guard lock(state_->mutex_);
351 :
352 189 : while (auto* impl = state_->acceptor_list_.pop_front())
353 0 : impl->close_socket();
354 :
355 189 : state_->acceptor_ptrs_.clear();
356 189 : }
357 :
358 : tcp_acceptor::acceptor_impl&
359 72 : epoll_acceptor_service::
360 : create_acceptor_impl()
361 : {
362 72 : auto impl = std::make_shared<epoll_acceptor_impl>(*this);
363 72 : auto* raw = impl.get();
364 :
365 72 : std::lock_guard lock(state_->mutex_);
366 72 : state_->acceptor_list_.push_back(raw);
367 72 : state_->acceptor_ptrs_.emplace(raw, std::move(impl));
368 :
369 72 : return *raw;
370 72 : }
371 :
372 : void
373 72 : epoll_acceptor_service::
374 : destroy_acceptor_impl(tcp_acceptor::acceptor_impl& impl)
375 : {
376 72 : auto* epoll_impl = static_cast<epoll_acceptor_impl*>(&impl);
377 72 : std::lock_guard lock(state_->mutex_);
378 72 : state_->acceptor_list_.remove(epoll_impl);
379 72 : state_->acceptor_ptrs_.erase(epoll_impl);
380 72 : }
381 :
382 : std::error_code
383 72 : epoll_acceptor_service::
384 : open_acceptor(
385 : tcp_acceptor::acceptor_impl& impl,
386 : endpoint ep,
387 : int backlog)
388 : {
389 72 : auto* epoll_impl = static_cast<epoll_acceptor_impl*>(&impl);
390 72 : epoll_impl->close_socket();
391 :
392 72 : int fd = ::socket(AF_INET, SOCK_STREAM | SOCK_NONBLOCK | SOCK_CLOEXEC, 0);
393 72 : if (fd < 0)
394 0 : return make_err(errno);
395 :
396 72 : int reuse = 1;
397 72 : ::setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &reuse, sizeof(reuse));
398 :
399 72 : sockaddr_in addr = detail::to_sockaddr_in(ep);
400 72 : if (::bind(fd, reinterpret_cast<sockaddr*>(&addr), sizeof(addr)) < 0)
401 : {
402 10 : int errn = errno;
403 10 : ::close(fd);
404 10 : return make_err(errn);
405 : }
406 :
407 62 : if (::listen(fd, backlog) < 0)
408 : {
409 0 : int errn = errno;
410 0 : ::close(fd);
411 0 : return make_err(errn);
412 : }
413 :
414 62 : epoll_impl->fd_ = fd;
415 :
416 : // Register fd with epoll (edge-triggered mode)
417 62 : epoll_impl->desc_state_.fd = fd;
418 : {
419 62 : std::lock_guard lock(epoll_impl->desc_state_.mutex);
420 62 : epoll_impl->desc_state_.read_op = nullptr;
421 62 : }
422 62 : scheduler().register_descriptor(fd, &epoll_impl->desc_state_);
423 :
424 : // Cache the local endpoint (queries OS for ephemeral port if port was 0)
425 62 : sockaddr_in local_addr{};
426 62 : socklen_t local_len = sizeof(local_addr);
427 62 : if (::getsockname(fd, reinterpret_cast<sockaddr*>(&local_addr), &local_len) == 0)
428 62 : epoll_impl->set_local_endpoint(detail::from_sockaddr_in(local_addr));
429 :
430 62 : return {};
431 : }
432 :
433 : void
434 11 : epoll_acceptor_service::
435 : post(epoll_op* op)
436 : {
437 11 : state_->sched_.post(op);
438 11 : }
439 :
440 : void
441 5024 : epoll_acceptor_service::
442 : work_started() noexcept
443 : {
444 5024 : state_->sched_.work_started();
445 5024 : }
446 :
447 : void
448 9 : epoll_acceptor_service::
449 : work_finished() noexcept
450 : {
451 9 : state_->sched_.work_finished();
452 9 : }
453 :
454 : epoll_socket_service*
455 5017 : epoll_acceptor_service::
456 : socket_service() const noexcept
457 : {
458 5017 : auto* svc = ctx_.find_service<detail::socket_service>();
459 5017 : return svc ? dynamic_cast<epoll_socket_service*>(svc) : nullptr;
460 : }
461 :
462 : } // namespace boost::corosio::detail
463 :
464 : #endif // BOOST_COROSIO_HAS_EPOLL
|