libs/corosio/src/corosio/src/detail/epoll/acceptors.cpp

79.6% Lines (195/245) 100.0% Functions (18/18) 50.4% Branches (66/131)
libs/corosio/src/corosio/src/detail/epoll/acceptors.cpp
Line Branch Hits Source Code
1 //
2 // Copyright (c) 2026 Steve Gerbino
3 //
4 // Distributed under the Boost Software License, Version 1.0. (See accompanying
5 // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
6 //
7 // Official repository: https://github.com/cppalliance/corosio
8 //
9
10 #include <boost/corosio/detail/platform.hpp>
11
12 #if BOOST_COROSIO_HAS_EPOLL
13
14 #include "src/detail/epoll/acceptors.hpp"
15 #include "src/detail/epoll/sockets.hpp"
16 #include "src/detail/endpoint_convert.hpp"
17 #include "src/detail/make_err.hpp"
18
19 #include <utility>
20
21 #include <errno.h>
22 #include <netinet/in.h>
23 #include <sys/epoll.h>
24 #include <sys/socket.h>
25 #include <unistd.h>
26
27 namespace boost::corosio::detail {
28
29 void
30 6 epoll_accept_op::
31 cancel() noexcept
32 {
33
1/2
✓ Branch 0 taken 6 times.
✗ Branch 1 not taken.
6 if (acceptor_impl_)
34 6 acceptor_impl_->cancel_single_op(*this);
35 else
36 request_cancel();
37 6 }
38
39 void
40 5026 epoll_accept_op::
41 operator()()
42 {
43 5026 stop_cb.reset();
44
45
3/4
✓ Branch 0 taken 5026 times.
✗ Branch 1 not taken.
✓ Branch 3 taken 5017 times.
✓ Branch 4 taken 9 times.
5026 bool success = (errn == 0 && !cancelled.load(std::memory_order_acquire));
46
47
1/2
✓ Branch 0 taken 5026 times.
✗ Branch 1 not taken.
5026 if (ec_out)
48 {
49
2/2
✓ Branch 1 taken 9 times.
✓ Branch 2 taken 5017 times.
5026 if (cancelled.load(std::memory_order_acquire))
50 9 *ec_out = capy::error::canceled;
51
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 5017 times.
5017 else if (errn != 0)
52 *ec_out = make_err(errn);
53 else
54 5017 *ec_out = {};
55 }
56
57
3/4
✓ Branch 0 taken 5017 times.
✓ Branch 1 taken 9 times.
✓ Branch 2 taken 5017 times.
✗ Branch 3 not taken.
5026 if (success && accepted_fd >= 0)
58 {
59
1/2
✓ Branch 0 taken 5017 times.
✗ Branch 1 not taken.
5017 if (acceptor_impl_)
60 {
61 5017 auto* socket_svc = static_cast<epoll_acceptor_impl*>(acceptor_impl_)
62 5017 ->service().socket_service();
63
1/2
✓ Branch 0 taken 5017 times.
✗ Branch 1 not taken.
5017 if (socket_svc)
64 {
65
1/1
✓ Branch 1 taken 5017 times.
5017 auto& impl = static_cast<epoll_socket_impl&>(socket_svc->create_impl());
66 5017 impl.set_socket(accepted_fd);
67
68 // Register accepted socket with epoll (edge-triggered mode)
69 5017 impl.desc_state_.fd = accepted_fd;
70 {
71
1/1
✓ Branch 1 taken 5017 times.
5017 std::lock_guard lock(impl.desc_state_.mutex);
72 5017 impl.desc_state_.read_op = nullptr;
73 5017 impl.desc_state_.write_op = nullptr;
74 5017 impl.desc_state_.connect_op = nullptr;
75 5017 }
76
1/1
✓ Branch 2 taken 5017 times.
5017 socket_svc->scheduler().register_descriptor(accepted_fd, &impl.desc_state_);
77
78 5017 sockaddr_in local_addr{};
79 5017 socklen_t local_len = sizeof(local_addr);
80 5017 sockaddr_in remote_addr{};
81 5017 socklen_t remote_len = sizeof(remote_addr);
82
83 5017 endpoint local_ep, remote_ep;
84
1/2
✓ Branch 1 taken 5017 times.
✗ Branch 2 not taken.
5017 if (::getsockname(accepted_fd, reinterpret_cast<sockaddr*>(&local_addr), &local_len) == 0)
85 5017 local_ep = from_sockaddr_in(local_addr);
86
1/2
✓ Branch 1 taken 5017 times.
✗ Branch 2 not taken.
5017 if (::getpeername(accepted_fd, reinterpret_cast<sockaddr*>(&remote_addr), &remote_len) == 0)
87 5017 remote_ep = from_sockaddr_in(remote_addr);
88
89 5017 impl.set_endpoints(local_ep, remote_ep);
90
91
1/2
✓ Branch 0 taken 5017 times.
✗ Branch 1 not taken.
5017 if (impl_out)
92 5017 *impl_out = &impl;
93
94 5017 accepted_fd = -1;
95 }
96 else
97 {
98 if (ec_out && !*ec_out)
99 *ec_out = make_err(ENOENT);
100 ::close(accepted_fd);
101 accepted_fd = -1;
102 if (impl_out)
103 *impl_out = nullptr;
104 }
105 }
106 else
107 {
108 ::close(accepted_fd);
109 accepted_fd = -1;
110 if (impl_out)
111 *impl_out = nullptr;
112 }
113 5017 }
114 else
115 {
116
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 9 times.
9 if (accepted_fd >= 0)
117 {
118 ::close(accepted_fd);
119 accepted_fd = -1;
120 }
121
122
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 9 times.
9 if (peer_impl)
123 {
124 peer_impl->release();
125 peer_impl = nullptr;
126 }
127
128
1/2
✓ Branch 0 taken 9 times.
✗ Branch 1 not taken.
9 if (impl_out)
129 9 *impl_out = nullptr;
130 }
131
132 // Move to stack before resuming. See epoll_op::operator()() for rationale.
133 5026 capy::executor_ref saved_ex( std::move( ex ) );
134 5026 capy::coro saved_h( std::move( h ) );
135 5026 auto prevent_premature_destruction = std::move(impl_ptr);
136
1/1
✓ Branch 1 taken 5026 times.
5026 saved_ex.dispatch( saved_h );
137 5026 }
138
139 72 epoll_acceptor_impl::
140 72 epoll_acceptor_impl(epoll_acceptor_service& svc) noexcept
141 72 : svc_(svc)
142 {
143 72 }
144
145 void
146 72 epoll_acceptor_impl::
147 release()
148 {
149 72 close_socket();
150 72 svc_.destroy_acceptor_impl(*this);
151 72 }
152
153 std::coroutine_handle<>
154 5026 epoll_acceptor_impl::
155 accept(
156 std::coroutine_handle<> h,
157 capy::executor_ref ex,
158 std::stop_token token,
159 std::error_code* ec,
160 io_object::io_object_impl** impl_out)
161 {
162 5026 auto& op = acc_;
163 5026 op.reset();
164 5026 op.h = h;
165 5026 op.ex = ex;
166 5026 op.ec_out = ec;
167 5026 op.impl_out = impl_out;
168 5026 op.fd = fd_;
169 5026 op.start(token, this);
170
171 5026 sockaddr_in addr{};
172 5026 socklen_t addrlen = sizeof(addr);
173
1/1
✓ Branch 1 taken 5026 times.
5026 int accepted = ::accept4(fd_, reinterpret_cast<sockaddr*>(&addr),
174 &addrlen, SOCK_NONBLOCK | SOCK_CLOEXEC);
175
176
2/2
✓ Branch 0 taken 2 times.
✓ Branch 1 taken 5024 times.
5026 if (accepted >= 0)
177 {
178 {
179
1/1
✓ Branch 1 taken 2 times.
2 std::lock_guard lock(desc_state_.mutex);
180 2 desc_state_.read_ready = false;
181 2 }
182 2 op.accepted_fd = accepted;
183 2 op.complete(0, 0);
184
1/1
✓ Branch 1 taken 2 times.
2 op.impl_ptr = shared_from_this();
185
1/1
✓ Branch 1 taken 2 times.
2 svc_.post(&op);
186 // completion is always posted to scheduler queue, never inline.
187 2 return std::noop_coroutine();
188 }
189
190
1/4
✗ Branch 0 not taken.
✓ Branch 1 taken 5024 times.
✗ Branch 2 not taken.
✗ Branch 3 not taken.
5024 if (errno == EAGAIN || errno == EWOULDBLOCK)
191 {
192 5024 svc_.work_started();
193
1/1
✓ Branch 1 taken 5024 times.
5024 op.impl_ptr = shared_from_this();
194
195 5024 bool perform_now = false;
196 {
197
1/1
✓ Branch 1 taken 5024 times.
5024 std::lock_guard lock(desc_state_.mutex);
198
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 5024 times.
5024 if (desc_state_.read_ready)
199 {
200 desc_state_.read_ready = false;
201 perform_now = true;
202 }
203 else
204 {
205 5024 desc_state_.read_op = &op;
206 }
207 5024 }
208
209
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 5024 times.
5024 if (perform_now)
210 {
211 op.perform_io();
212 if (op.errn == EAGAIN || op.errn == EWOULDBLOCK)
213 {
214 op.errn = 0;
215 std::lock_guard lock(desc_state_.mutex);
216 desc_state_.read_op = &op;
217 }
218 else
219 {
220 svc_.post(&op);
221 svc_.work_finished();
222 }
223 return std::noop_coroutine();
224 }
225
226
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 5024 times.
5024 if (op.cancelled.load(std::memory_order_acquire))
227 {
228 epoll_op* claimed = nullptr;
229 {
230 std::lock_guard lock(desc_state_.mutex);
231 if (desc_state_.read_op == &op)
232 claimed = std::exchange(desc_state_.read_op, nullptr);
233 }
234 if (claimed)
235 {
236 svc_.post(claimed);
237 svc_.work_finished();
238 }
239 }
240 // completion is always posted to scheduler queue, never inline.
241 5024 return std::noop_coroutine();
242 }
243
244 op.complete(errno, 0);
245 op.impl_ptr = shared_from_this();
246 svc_.post(&op);
247 // completion is always posted to scheduler queue, never inline.
248 return std::noop_coroutine();
249 }
250
251 void
252 145 epoll_acceptor_impl::
253 cancel() noexcept
254 {
255 145 std::shared_ptr<epoll_acceptor_impl> self;
256 try {
257
1/1
✓ Branch 1 taken 145 times.
145 self = shared_from_this();
258 } catch (const std::bad_weak_ptr&) {
259 return;
260 }
261
262 145 acc_.request_cancel();
263
264 145 epoll_op* claimed = nullptr;
265 {
266 145 std::lock_guard lock(desc_state_.mutex);
267
2/2
✓ Branch 0 taken 3 times.
✓ Branch 1 taken 142 times.
145 if (desc_state_.read_op == &acc_)
268 3 claimed = std::exchange(desc_state_.read_op, nullptr);
269 145 }
270
2/2
✓ Branch 0 taken 3 times.
✓ Branch 1 taken 142 times.
145 if (claimed)
271 {
272 3 acc_.impl_ptr = self;
273 3 svc_.post(&acc_);
274 3 svc_.work_finished();
275 }
276 145 }
277
278 void
279 6 epoll_acceptor_impl::
280 cancel_single_op(epoll_op& op) noexcept
281 {
282 6 op.request_cancel();
283
284 6 epoll_op* claimed = nullptr;
285 {
286 6 std::lock_guard lock(desc_state_.mutex);
287
1/2
✓ Branch 0 taken 6 times.
✗ Branch 1 not taken.
6 if (desc_state_.read_op == &op)
288 6 claimed = std::exchange(desc_state_.read_op, nullptr);
289 6 }
290
1/2
✓ Branch 0 taken 6 times.
✗ Branch 1 not taken.
6 if (claimed)
291 {
292 try {
293
1/1
✓ Branch 1 taken 6 times.
6 op.impl_ptr = shared_from_this();
294 } catch (const std::bad_weak_ptr&) {}
295 6 svc_.post(&op);
296 6 svc_.work_finished();
297 }
298 6 }
299
300 void
301 144 epoll_acceptor_impl::
302 close_socket() noexcept
303 {
304 144 cancel();
305
306
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 144 times.
144 if (desc_state_.is_enqueued_.load(std::memory_order_acquire))
307 {
308 try {
309 desc_state_.impl_ref_ = shared_from_this();
310 } catch (std::bad_weak_ptr const&) {}
311 }
312
313
2/2
✓ Branch 0 taken 62 times.
✓ Branch 1 taken 82 times.
144 if (fd_ >= 0)
314 {
315
1/2
✓ Branch 0 taken 62 times.
✗ Branch 1 not taken.
62 if (desc_state_.registered_events != 0)
316 62 svc_.scheduler().deregister_descriptor(fd_);
317 62 ::close(fd_);
318 62 fd_ = -1;
319 }
320
321 144 desc_state_.fd = -1;
322 {
323 144 std::lock_guard lock(desc_state_.mutex);
324 144 desc_state_.read_op = nullptr;
325 144 desc_state_.read_ready = false;
326 144 desc_state_.write_ready = false;
327 144 }
328 144 desc_state_.registered_events = 0;
329
330 // Clear cached endpoint
331 144 local_endpoint_ = endpoint{};
332 144 }
333
334 189 epoll_acceptor_service::
335 189 epoll_acceptor_service(capy::execution_context& ctx)
336 189 : ctx_(ctx)
337
2/2
✓ Branch 2 taken 189 times.
✓ Branch 5 taken 189 times.
189 , state_(std::make_unique<epoll_acceptor_state>(ctx.use_service<epoll_scheduler>()))
338 {
339 189 }
340
341 378 epoll_acceptor_service::
342 189 ~epoll_acceptor_service()
343 {
344 378 }
345
346 void
347 189 epoll_acceptor_service::
348 shutdown()
349 {
350
1/1
✓ Branch 2 taken 189 times.
189 std::lock_guard lock(state_->mutex_);
351
352
1/2
✗ Branch 2 not taken.
✓ Branch 3 taken 189 times.
189 while (auto* impl = state_->acceptor_list_.pop_front())
353 impl->close_socket();
354
355 189 state_->acceptor_ptrs_.clear();
356 189 }
357
358 tcp_acceptor::acceptor_impl&
359 72 epoll_acceptor_service::
360 create_acceptor_impl()
361 {
362
1/1
✓ Branch 1 taken 72 times.
72 auto impl = std::make_shared<epoll_acceptor_impl>(*this);
363 72 auto* raw = impl.get();
364
365
1/1
✓ Branch 2 taken 72 times.
72 std::lock_guard lock(state_->mutex_);
366 72 state_->acceptor_list_.push_back(raw);
367
1/1
✓ Branch 3 taken 72 times.
72 state_->acceptor_ptrs_.emplace(raw, std::move(impl));
368
369 72 return *raw;
370 72 }
371
372 void
373 72 epoll_acceptor_service::
374 destroy_acceptor_impl(tcp_acceptor::acceptor_impl& impl)
375 {
376 72 auto* epoll_impl = static_cast<epoll_acceptor_impl*>(&impl);
377
1/1
✓ Branch 2 taken 72 times.
72 std::lock_guard lock(state_->mutex_);
378 72 state_->acceptor_list_.remove(epoll_impl);
379
1/1
✓ Branch 2 taken 72 times.
72 state_->acceptor_ptrs_.erase(epoll_impl);
380 72 }
381
382 std::error_code
383 72 epoll_acceptor_service::
384 open_acceptor(
385 tcp_acceptor::acceptor_impl& impl,
386 endpoint ep,
387 int backlog)
388 {
389 72 auto* epoll_impl = static_cast<epoll_acceptor_impl*>(&impl);
390 72 epoll_impl->close_socket();
391
392 72 int fd = ::socket(AF_INET, SOCK_STREAM | SOCK_NONBLOCK | SOCK_CLOEXEC, 0);
393
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 72 times.
72 if (fd < 0)
394 return make_err(errno);
395
396 72 int reuse = 1;
397 72 ::setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &reuse, sizeof(reuse));
398
399 72 sockaddr_in addr = detail::to_sockaddr_in(ep);
400
2/2
✓ Branch 1 taken 10 times.
✓ Branch 2 taken 62 times.
72 if (::bind(fd, reinterpret_cast<sockaddr*>(&addr), sizeof(addr)) < 0)
401 {
402 10 int errn = errno;
403
1/1
✓ Branch 1 taken 10 times.
10 ::close(fd);
404 10 return make_err(errn);
405 }
406
407
1/2
✗ Branch 1 not taken.
✓ Branch 2 taken 62 times.
62 if (::listen(fd, backlog) < 0)
408 {
409 int errn = errno;
410 ::close(fd);
411 return make_err(errn);
412 }
413
414 62 epoll_impl->fd_ = fd;
415
416 // Register fd with epoll (edge-triggered mode)
417 62 epoll_impl->desc_state_.fd = fd;
418 {
419
1/1
✓ Branch 1 taken 62 times.
62 std::lock_guard lock(epoll_impl->desc_state_.mutex);
420 62 epoll_impl->desc_state_.read_op = nullptr;
421 62 }
422
1/1
✓ Branch 2 taken 62 times.
62 scheduler().register_descriptor(fd, &epoll_impl->desc_state_);
423
424 // Cache the local endpoint (queries OS for ephemeral port if port was 0)
425 62 sockaddr_in local_addr{};
426 62 socklen_t local_len = sizeof(local_addr);
427
1/2
✓ Branch 1 taken 62 times.
✗ Branch 2 not taken.
62 if (::getsockname(fd, reinterpret_cast<sockaddr*>(&local_addr), &local_len) == 0)
428 62 epoll_impl->set_local_endpoint(detail::from_sockaddr_in(local_addr));
429
430 62 return {};
431 }
432
433 void
434 11 epoll_acceptor_service::
435 post(epoll_op* op)
436 {
437 11 state_->sched_.post(op);
438 11 }
439
440 void
441 5024 epoll_acceptor_service::
442 work_started() noexcept
443 {
444 5024 state_->sched_.work_started();
445 5024 }
446
447 void
448 9 epoll_acceptor_service::
449 work_finished() noexcept
450 {
451 9 state_->sched_.work_finished();
452 9 }
453
454 epoll_socket_service*
455 5017 epoll_acceptor_service::
456 socket_service() const noexcept
457 {
458 5017 auto* svc = ctx_.find_service<detail::socket_service>();
459
2/4
✓ Branch 0 taken 5017 times.
✗ Branch 1 not taken.
✓ Branch 2 taken 5017 times.
✗ Branch 3 not taken.
5017 return svc ? dynamic_cast<epoll_socket_service*>(svc) : nullptr;
460 }
461
462 } // namespace boost::corosio::detail
463
464 #endif // BOOST_COROSIO_HAS_EPOLL
465