Line data Source code
1 : //
2 : // Copyright (c) 2025 Vinnie Falco (vinnie dot falco at gmail dot com)
3 : //
4 : // Distributed under the Boost Software License, Version 1.0. (See accompanying
5 : // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
6 : //
7 : // Official repository: https://github.com/cppalliance/corosio
8 : //
9 :
10 : #ifndef BOOST_COROSIO_IO_STREAM_HPP
11 : #define BOOST_COROSIO_IO_STREAM_HPP
12 :
13 : #include <boost/corosio/detail/config.hpp>
14 : #include <boost/corosio/io_object.hpp>
15 : #include <boost/capy/io_result.hpp>
16 : #include <boost/corosio/io_buffer_param.hpp>
17 : #include <boost/capy/ex/executor_ref.hpp>
18 : #include <system_error>
19 :
20 : #include <coroutine>
21 : #include <cstddef>
22 : #include <stop_token>
23 :
24 : namespace boost::corosio {
25 :
26 : /** Platform stream with read/write operations.
27 :
28 : This base class provides the fundamental async read and write
29 : operations for kernel-level stream I/O. Derived classes wrap
30 : OS-specific stream implementations (sockets, pipes, etc.) and
31 : satisfy @ref capy::ReadStream and @ref capy::WriteStream concepts.
32 :
33 : @par Semantics
34 : Concrete classes wrap direct platform I/O completed by the kernel.
35 : Functions taking `io_stream&` signal "platform implementation
36 : required" - use this when you need actual kernel I/O rather than
37 : a mock or test double.
38 :
39 : For generic stream algorithms that work with test mocks,
40 : use `template<capy::Stream S>` instead of `io_stream&`.
41 :
42 : @par Thread Safety
43 : Distinct objects: Safe.
44 : Shared objects: Unsafe. All calls to a single stream must be made
45 : from the same implicit or explicit serialization context.
46 :
47 : @par Example
48 : @code
49 : // Read until buffer full or EOF
50 : capy::task<> read_all( io_stream& stream, std::span<char> buf )
51 : {
52 : std::size_t total = 0;
53 : while( total < buf.size() )
54 : {
55 : auto [ec, n] = co_await stream.read_some(
56 : capy::buffer( buf.data() + total, buf.size() - total ) );
57 : if( ec == capy::cond::eof )
58 : break;
59 : if( ec )
60 : capy::detail::throw_system_error( ec );
61 : total += n;
62 : }
63 : }
64 : @endcode
65 :
66 : @see capy::Stream, capy::ReadStream, capy::WriteStream, tcp_socket
67 : */
68 : class BOOST_COROSIO_DECL io_stream : public io_object
69 : {
70 : public:
71 : /** Asynchronously read data from the stream.
72 :
73 : This operation suspends the calling coroutine and initiates a
74 : kernel-level read. The coroutine resumes when the operation
75 : completes.
76 :
77 : @li The operation completes when:
78 : @li At least one byte has been read into the buffer sequence
79 : @li The peer closes the connection (EOF)
80 : @li An error occurs
81 : @li The operation is cancelled via stop token or `cancel()`
82 :
83 : @par Concurrency
84 : At most one write operation may be in flight concurrently with
85 : this read. No other read operations may be in flight until this
86 : operation completes. Note that concurrent in-flight operations
87 : does not imply the initiating calls may be made concurrently;
88 : all calls must be serialized.
89 :
90 : @par Cancellation
91 : Supports cancellation via `std::stop_token` propagated through
92 : the IoAwaitable protocol, or via the I/O object's `cancel()`
93 : member. When cancelled, the operation completes with an error
94 : that compares equal to `capy::cond::canceled`.
95 :
96 : @par Preconditions
97 : The stream must be open and connected.
98 :
99 : @param buffers The buffer sequence to read data into. The caller
100 : retains ownership and must ensure validity until the
101 : operation completes.
102 :
103 : @return An awaitable yielding `(error_code, std::size_t)`.
104 : On success, `bytes_transferred` contains the number of bytes
105 : read. Compare error codes to conditions, not specific values:
106 : @li `capy::cond::eof` - Peer closed connection (TCP FIN)
107 : @li `capy::cond::canceled` - Operation was cancelled
108 :
109 : @par Example
110 : @code
111 : // Simple read with error handling
112 : auto [ec, n] = co_await stream.read_some( capy::buffer( buf ) );
113 : if( ec == capy::cond::eof )
114 : co_return; // Connection closed gracefully
115 : if( ec.failed() )
116 : capy::detail::throw_system_error( ec );
117 : process( buf, n );
118 : @endcode
119 :
120 : @note This operation may read fewer bytes than the buffer
121 : capacity. Use a loop or `capy::async_read` to read an
122 : exact amount.
123 :
124 : @see write_some, capy::async_read
125 : */
126 : template<capy::MutableBufferSequence MB>
127 231502 : auto read_some(MB const& buffers)
128 : {
129 231502 : return read_some_awaitable<MB>(*this, buffers);
130 : }
131 :
132 : /** Asynchronously write data to the stream.
133 :
134 : This operation suspends the calling coroutine and initiates a
135 : kernel-level write. The coroutine resumes when the operation
136 : completes.
137 :
138 : @li The operation completes when:
139 : @li At least one byte has been written from the buffer sequence
140 : @li An error occurs (including connection reset by peer)
141 : @li The operation is cancelled via stop token or `cancel()`
142 :
143 : @par Concurrency
144 : At most one read operation may be in flight concurrently with
145 : this write. No other write operations may be in flight until
146 : this operation completes. Note that concurrent in-flight
147 : operations does not imply the initiating calls may be made
148 : concurrently; all calls must be serialized.
149 :
150 : @par Cancellation
151 : Supports cancellation via `std::stop_token` propagated through
152 : the IoAwaitable protocol, or via the I/O object's `cancel()`
153 : member. When cancelled, the operation completes with an error
154 : that compares equal to `capy::cond::canceled`.
155 :
156 : @par Preconditions
157 : The stream must be open and connected.
158 :
159 : @param buffers The buffer sequence containing data to write.
160 : The caller retains ownership and must ensure validity
161 : until the operation completes.
162 :
163 : @return An awaitable yielding `(error_code, std::size_t)`.
164 : On success, `bytes_transferred` contains the number of bytes
165 : written. Compare error codes to conditions, not specific
166 : values:
167 : @li `capy::cond::canceled` - Operation was cancelled
168 : @li `std::errc::broken_pipe` - Peer closed connection
169 :
170 : @par Example
171 : @code
172 : // Write all data
173 : std::string_view data = "Hello, World!";
174 : std::size_t written = 0;
175 : while( written < data.size() )
176 : {
177 : auto [ec, n] = co_await stream.write_some(
178 : capy::buffer( data.data() + written,
179 : data.size() - written ) );
180 : if( ec.failed() )
181 : capy::detail::throw_system_error( ec );
182 : written += n;
183 : }
184 : @endcode
185 :
186 : @note This operation may write fewer bytes than the buffer
187 : contains. Use a loop or `capy::async_write` to write
188 : all data.
189 :
190 : @see read_some, capy::async_write
191 : */
192 : template<capy::ConstBufferSequence CB>
193 231257 : auto write_some(CB const& buffers)
194 : {
195 231257 : return write_some_awaitable<CB>(*this, buffers);
196 : }
197 :
198 : protected:
199 : /// Awaitable for async read operations.
200 : template<class MutableBufferSequence>
201 : struct read_some_awaitable
202 : {
203 : io_stream& ios_;
204 : MutableBufferSequence buffers_;
205 : std::stop_token token_;
206 : mutable std::error_code ec_;
207 : mutable std::size_t bytes_transferred_ = 0;
208 :
209 231502 : read_some_awaitable(
210 : io_stream& ios,
211 : MutableBufferSequence buffers) noexcept
212 231502 : : ios_(ios)
213 231502 : , buffers_(std::move(buffers))
214 : {
215 231502 : }
216 :
217 231502 : bool await_ready() const noexcept
218 : {
219 231502 : return token_.stop_requested();
220 : }
221 :
222 231502 : capy::io_result<std::size_t> await_resume() const noexcept
223 : {
224 231502 : if (token_.stop_requested())
225 198 : return {make_error_code(std::errc::operation_canceled), 0};
226 231304 : return {ec_, bytes_transferred_};
227 : }
228 :
229 231502 : auto await_suspend(
230 : std::coroutine_handle<> h,
231 : capy::executor_ref ex,
232 : std::stop_token token) -> std::coroutine_handle<>
233 : {
234 231502 : token_ = std::move(token);
235 231502 : return ios_.get().read_some(h, ex, buffers_, token_, &ec_, &bytes_transferred_);
236 : }
237 : };
238 :
239 : /// Awaitable for async write operations.
240 : template<class ConstBufferSequence>
241 : struct write_some_awaitable
242 : {
243 : io_stream& ios_;
244 : ConstBufferSequence buffers_;
245 : std::stop_token token_;
246 : mutable std::error_code ec_;
247 : mutable std::size_t bytes_transferred_ = 0;
248 :
249 231257 : write_some_awaitable(
250 : io_stream& ios,
251 : ConstBufferSequence buffers) noexcept
252 231257 : : ios_(ios)
253 231257 : , buffers_(std::move(buffers))
254 : {
255 231257 : }
256 :
257 231257 : bool await_ready() const noexcept
258 : {
259 231257 : return token_.stop_requested();
260 : }
261 :
262 231257 : capy::io_result<std::size_t> await_resume() const noexcept
263 : {
264 231257 : if (token_.stop_requested())
265 0 : return {make_error_code(std::errc::operation_canceled), 0};
266 231257 : return {ec_, bytes_transferred_};
267 : }
268 :
269 231257 : auto await_suspend(
270 : std::coroutine_handle<> h,
271 : capy::executor_ref ex,
272 : std::stop_token token) -> std::coroutine_handle<>
273 : {
274 231257 : token_ = std::move(token);
275 231257 : return ios_.get().write_some(h, ex, buffers_, token_, &ec_, &bytes_transferred_);
276 : }
277 : };
278 :
279 : public:
280 : /** Platform-specific stream implementation interface.
281 :
282 : Derived classes implement this interface to provide kernel-level
283 : read and write operations for each supported platform (IOCP,
284 : epoll, kqueue, io_uring).
285 : */
286 : struct io_stream_impl : io_object_impl
287 : {
288 : /// Initiate platform read operation.
289 : virtual std::coroutine_handle<> read_some(
290 : std::coroutine_handle<>,
291 : capy::executor_ref,
292 : io_buffer_param,
293 : std::stop_token,
294 : std::error_code*,
295 : std::size_t*) = 0;
296 :
297 : /// Initiate platform write operation.
298 : virtual std::coroutine_handle<> write_some(
299 : std::coroutine_handle<>,
300 : capy::executor_ref,
301 : io_buffer_param,
302 : std::stop_token,
303 : std::error_code*,
304 : std::size_t*) = 0;
305 : };
306 :
307 : protected:
308 : /// Construct stream bound to the given execution context.
309 : explicit
310 17957 : io_stream(
311 : capy::execution_context& ctx) noexcept
312 17957 : : io_object(ctx)
313 : {
314 17957 : }
315 :
316 : private:
317 : /// Return implementation downcasted to stream interface.
318 462759 : io_stream_impl& get() const noexcept
319 : {
320 462759 : return *static_cast<io_stream_impl*>(impl_);
321 : }
322 : };
323 :
324 : } // namespace boost::corosio
325 :
326 : #endif
|