1  
//
// Copyright (c) 2025 Vinnie Falco (vinnie dot falco at gmail dot com)
//
// Distributed under the Boost Software License, Version 1.0. (See accompanying
// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
//
// Official repository: https://github.com/cppalliance/corosio
//
9  

9  

10  
#ifndef BOOST_COROSIO_IO_STREAM_HPP
10  
#ifndef BOOST_COROSIO_IO_STREAM_HPP
11  
#define BOOST_COROSIO_IO_STREAM_HPP
11  
#define BOOST_COROSIO_IO_STREAM_HPP
12  

12  

13  
#include <boost/corosio/detail/config.hpp>
#include <boost/corosio/io_object.hpp>
#include <boost/capy/io_result.hpp>
#include <boost/corosio/io_buffer_param.hpp>
#include <boost/capy/ex/executor_ref.hpp>

#include <coroutine>
#include <cstddef>
#include <stop_token>
#include <system_error>
#include <type_traits>
#include <utility>
23  

23  

24  
namespace boost::corosio {
24  
namespace boost::corosio {
25  

25  

26  
/** Platform stream with read/write operations.
26  
/** Platform stream with read/write operations.
27  

27  

28  
    This base class provides the fundamental async read and write
28  
    This base class provides the fundamental async read and write
29  
    operations for kernel-level stream I/O. Derived classes wrap
29  
    operations for kernel-level stream I/O. Derived classes wrap
30  
    OS-specific stream implementations (sockets, pipes, etc.) and
30  
    OS-specific stream implementations (sockets, pipes, etc.) and
31  
    satisfy @ref capy::ReadStream and @ref capy::WriteStream concepts.
31  
    satisfy @ref capy::ReadStream and @ref capy::WriteStream concepts.
32  

32  

33  
    @par Semantics
33  
    @par Semantics
34  
    Concrete classes wrap direct platform I/O completed by the kernel.
34  
    Concrete classes wrap direct platform I/O completed by the kernel.
35  
    Functions taking `io_stream&` signal "platform implementation
35  
    Functions taking `io_stream&` signal "platform implementation
36  
    required" - use this when you need actual kernel I/O rather than
36  
    required" - use this when you need actual kernel I/O rather than
37  
    a mock or test double.
37  
    a mock or test double.
38  

38  

39  
    For generic stream algorithms that work with test mocks,
39  
    For generic stream algorithms that work with test mocks,
40  
    use `template<capy::Stream S>` instead of `io_stream&`.
40  
    use `template<capy::Stream S>` instead of `io_stream&`.
41  

41  

42  
    @par Thread Safety
42  
    @par Thread Safety
43  
    Distinct objects: Safe.
43  
    Distinct objects: Safe.
44  
    Shared objects: Unsafe. All calls to a single stream must be made
44  
    Shared objects: Unsafe. All calls to a single stream must be made
45  
    from the same implicit or explicit serialization context.
45  
    from the same implicit or explicit serialization context.
46  

46  

47  
    @par Example
47  
    @par Example
48  
    @code
48  
    @code
49  
    // Read until buffer full or EOF
49  
    // Read until buffer full or EOF
50  
    capy::task<> read_all( io_stream& stream, std::span<char> buf )
50  
    capy::task<> read_all( io_stream& stream, std::span<char> buf )
51  
    {
51  
    {
52  
        std::size_t total = 0;
52  
        std::size_t total = 0;
53  
        while( total < buf.size() )
53  
        while( total < buf.size() )
54  
        {
54  
        {
55  
            auto [ec, n] = co_await stream.read_some(
55  
            auto [ec, n] = co_await stream.read_some(
56  
                capy::buffer( buf.data() + total, buf.size() - total ) );
56  
                capy::buffer( buf.data() + total, buf.size() - total ) );
57  
            if( ec == capy::cond::eof )
57  
            if( ec == capy::cond::eof )
58  
                break;
58  
                break;
59  
            if( ec.failed() )
59  
            if( ec.failed() )
60  
                capy::detail::throw_system_error( ec );
60  
                capy::detail::throw_system_error( ec );
61  
            total += n;
61  
            total += n;
62  
        }
62  
        }
63  
    }
63  
    }
64  
    @endcode
64  
    @endcode
65  

65  

66  
    @see capy::Stream, capy::ReadStream, capy::WriteStream, tcp_socket
66  
    @see capy::Stream, capy::ReadStream, capy::WriteStream, tcp_socket
67  
*/
67  
*/
68  
class BOOST_COROSIO_DECL io_stream : public io_object
68  
class BOOST_COROSIO_DECL io_stream : public io_object
69  
{
69  
{
70  
public:
70  
public:
71  
    /** Asynchronously read data from the stream.
71  
    /** Asynchronously read data from the stream.
72  

72  

73  
        This operation suspends the calling coroutine and initiates a
73  
        This operation suspends the calling coroutine and initiates a
74  
        kernel-level read. The coroutine resumes when the operation
74  
        kernel-level read. The coroutine resumes when the operation
75  
        completes.
75  
        completes.
76  

76  

77  
        @li The operation completes when:
77  
        @li The operation completes when:
78  
        @li At least one byte has been read into the buffer sequence
78  
        @li At least one byte has been read into the buffer sequence
79  
        @li The peer closes the connection (EOF)
79  
        @li The peer closes the connection (EOF)
80  
        @li An error occurs
80  
        @li An error occurs
81  
        @li The operation is cancelled via stop token or `cancel()`
81  
        @li The operation is cancelled via stop token or `cancel()`
82  

82  

83  
        @par Concurrency
83  
        @par Concurrency
84  
        At most one write operation may be in flight concurrently with
84  
        At most one write operation may be in flight concurrently with
85  
        this read. No other read operations may be in flight until this
85  
        this read. No other read operations may be in flight until this
86  
        operation completes. Note that concurrent in-flight operations
86  
        operation completes. Note that concurrent in-flight operations
87  
        does not imply the initiating calls may be made concurrently;
87  
        does not imply the initiating calls may be made concurrently;
88  
        all calls must be serialized.
88  
        all calls must be serialized.
89  

89  

90  
        @par Cancellation
90  
        @par Cancellation
91  
        Supports cancellation via `std::stop_token` propagated through
91  
        Supports cancellation via `std::stop_token` propagated through
92  
        the IoAwaitable protocol, or via the I/O object's `cancel()`
92  
        the IoAwaitable protocol, or via the I/O object's `cancel()`
93  
        member. When cancelled, the operation completes with an error
93  
        member. When cancelled, the operation completes with an error
94  
        that compares equal to `capy::cond::canceled`.
94  
        that compares equal to `capy::cond::canceled`.
95  

95  

96  
        @par Preconditions
96  
        @par Preconditions
97  
        The stream must be open and connected.
97  
        The stream must be open and connected.
98  

98  

99  
        @param buffers The buffer sequence to read data into. The caller
99  
        @param buffers The buffer sequence to read data into. The caller
100  
            retains ownership and must ensure validity until the
100  
            retains ownership and must ensure validity until the
101  
            operation completes.
101  
            operation completes.
102  

102  

103  
        @return An awaitable yielding `(error_code, std::size_t)`.
103  
        @return An awaitable yielding `(error_code, std::size_t)`.
104  
            On success, `bytes_transferred` contains the number of bytes
104  
            On success, `bytes_transferred` contains the number of bytes
105  
            read. Compare error codes to conditions, not specific values:
105  
            read. Compare error codes to conditions, not specific values:
106  
            @li `capy::cond::eof` - Peer closed connection (TCP FIN)
106  
            @li `capy::cond::eof` - Peer closed connection (TCP FIN)
107  
            @li `capy::cond::canceled` - Operation was cancelled
107  
            @li `capy::cond::canceled` - Operation was cancelled
108  

108  

109  
        @par Example
109  
        @par Example
110  
        @code
110  
        @code
111  
        // Simple read with error handling
111  
        // Simple read with error handling
112  
        auto [ec, n] = co_await stream.read_some( capy::buffer( buf ) );
112  
        auto [ec, n] = co_await stream.read_some( capy::buffer( buf ) );
113  
        if( ec == capy::cond::eof )
113  
        if( ec == capy::cond::eof )
114  
            co_return;  // Connection closed gracefully
114  
            co_return;  // Connection closed gracefully
115  
        if( ec.failed() )
115  
        if( ec.failed() )
116  
            capy::detail::throw_system_error( ec );
116  
            capy::detail::throw_system_error( ec );
117  
        process( buf, n );
117  
        process( buf, n );
118  
        @endcode
118  
        @endcode
119  

119  

120  
        @note This operation may read fewer bytes than the buffer
120  
        @note This operation may read fewer bytes than the buffer
121  
            capacity. Use a loop or `capy::async_read` to read an
121  
            capacity. Use a loop or `capy::async_read` to read an
122  
            exact amount.
122  
            exact amount.
123  

123  

124  
        @see write_some, capy::async_read
124  
        @see write_some, capy::async_read
125  
    */
125  
    */
126  
    template<capy::MutableBufferSequence MB>
126  
    template<capy::MutableBufferSequence MB>
127  
    auto read_some(MB const& buffers)
127  
    auto read_some(MB const& buffers)
128  
    {
128  
    {
129  
        return read_some_awaitable<MB>(*this, buffers);
129  
        return read_some_awaitable<MB>(*this, buffers);
130  
    }
130  
    }
131  

131  

132  
    /** Asynchronously write data to the stream.
132  
    /** Asynchronously write data to the stream.
133  

133  

134  
        This operation suspends the calling coroutine and initiates a
134  
        This operation suspends the calling coroutine and initiates a
135  
        kernel-level write. The coroutine resumes when the operation
135  
        kernel-level write. The coroutine resumes when the operation
136  
        completes.
136  
        completes.
137  

137  

138  
        @li The operation completes when:
138  
        @li The operation completes when:
139  
        @li At least one byte has been written from the buffer sequence
139  
        @li At least one byte has been written from the buffer sequence
140  
        @li An error occurs (including connection reset by peer)
140  
        @li An error occurs (including connection reset by peer)
141  
        @li The operation is cancelled via stop token or `cancel()`
141  
        @li The operation is cancelled via stop token or `cancel()`
142  

142  

143  
        @par Concurrency
143  
        @par Concurrency
144  
        At most one read operation may be in flight concurrently with
144  
        At most one read operation may be in flight concurrently with
145  
        this write. No other write operations may be in flight until
145  
        this write. No other write operations may be in flight until
146  
        this operation completes. Note that concurrent in-flight
146  
        this operation completes. Note that concurrent in-flight
147  
        operations does not imply the initiating calls may be made
147  
        operations does not imply the initiating calls may be made
148  
        concurrently; all calls must be serialized.
148  
        concurrently; all calls must be serialized.
149  

149  

150  
        @par Cancellation
150  
        @par Cancellation
151  
        Supports cancellation via `std::stop_token` propagated through
151  
        Supports cancellation via `std::stop_token` propagated through
152  
        the IoAwaitable protocol, or via the I/O object's `cancel()`
152  
        the IoAwaitable protocol, or via the I/O object's `cancel()`
153  
        member. When cancelled, the operation completes with an error
153  
        member. When cancelled, the operation completes with an error
154  
        that compares equal to `capy::cond::canceled`.
154  
        that compares equal to `capy::cond::canceled`.
155  

155  

156  
        @par Preconditions
156  
        @par Preconditions
157  
        The stream must be open and connected.
157  
        The stream must be open and connected.
158  

158  

159  
        @param buffers The buffer sequence containing data to write.
159  
        @param buffers The buffer sequence containing data to write.
160  
            The caller retains ownership and must ensure validity
160  
            The caller retains ownership and must ensure validity
161  
            until the operation completes.
161  
            until the operation completes.
162  

162  

163  
        @return An awaitable yielding `(error_code, std::size_t)`.
163  
        @return An awaitable yielding `(error_code, std::size_t)`.
164  
            On success, `bytes_transferred` contains the number of bytes
164  
            On success, `bytes_transferred` contains the number of bytes
165  
            written. Compare error codes to conditions, not specific
165  
            written. Compare error codes to conditions, not specific
166  
            values:
166  
            values:
167  
            @li `capy::cond::canceled` - Operation was cancelled
167  
            @li `capy::cond::canceled` - Operation was cancelled
168  
            @li `std::errc::broken_pipe` - Peer closed connection
168  
            @li `std::errc::broken_pipe` - Peer closed connection
169  

169  

170  
        @par Example
170  
        @par Example
171  
        @code
171  
        @code
172  
        // Write all data
172  
        // Write all data
173  
        std::string_view data = "Hello, World!";
173  
        std::string_view data = "Hello, World!";
174  
        std::size_t written = 0;
174  
        std::size_t written = 0;
175  
        while( written < data.size() )
175  
        while( written < data.size() )
176  
        {
176  
        {
177  
            auto [ec, n] = co_await stream.write_some(
177  
            auto [ec, n] = co_await stream.write_some(
178  
                capy::buffer( data.data() + written,
178  
                capy::buffer( data.data() + written,
179  
                              data.size() - written ) );
179  
                              data.size() - written ) );
180  
            if( ec.failed() )
180  
            if( ec.failed() )
181  
                capy::detail::throw_system_error( ec );
181  
                capy::detail::throw_system_error( ec );
182  
            written += n;
182  
            written += n;
183  
        }
183  
        }
184  
        @endcode
184  
        @endcode
185  

185  

186  
        @note This operation may write fewer bytes than the buffer
186  
        @note This operation may write fewer bytes than the buffer
187  
            contains. Use a loop or `capy::async_write` to write
187  
            contains. Use a loop or `capy::async_write` to write
188  
            all data.
188  
            all data.
189  

189  

190  
        @see read_some, capy::async_write
190  
        @see read_some, capy::async_write
191  
    */
191  
    */
192  
    template<capy::ConstBufferSequence CB>
192  
    template<capy::ConstBufferSequence CB>
193  
    auto write_some(CB const& buffers)
193  
    auto write_some(CB const& buffers)
194  
    {
194  
    {
195  
        return write_some_awaitable<CB>(*this, buffers);
195  
        return write_some_awaitable<CB>(*this, buffers);
196  
    }
196  
    }
197  

197  

198  
protected:
198  
protected:
199  
    /// Awaitable for async read operations.
199  
    /// Awaitable for async read operations.
200  
    template<class MutableBufferSequence>
200  
    template<class MutableBufferSequence>
201  
    struct read_some_awaitable
201  
    struct read_some_awaitable
202  
    {
202  
    {
203  
        io_stream& ios_;
203  
        io_stream& ios_;
204  
        MutableBufferSequence buffers_;
204  
        MutableBufferSequence buffers_;
205  
        std::stop_token token_;
205  
        std::stop_token token_;
206  
        mutable std::error_code ec_;
206  
        mutable std::error_code ec_;
207  
        mutable std::size_t bytes_transferred_ = 0;
207  
        mutable std::size_t bytes_transferred_ = 0;
208  

208  

209  
        read_some_awaitable(
209  
        read_some_awaitable(
210  
            io_stream& ios,
210  
            io_stream& ios,
211  
            MutableBufferSequence buffers) noexcept
211  
            MutableBufferSequence buffers) noexcept
212  
            : ios_(ios)
212  
            : ios_(ios)
213  
            , buffers_(std::move(buffers))
213  
            , buffers_(std::move(buffers))
214  
        {
214  
        {
215  
        }
215  
        }
216  

216  

217  
        bool await_ready() const noexcept
217  
        bool await_ready() const noexcept
218  
        {
218  
        {
219  
            return token_.stop_requested();
219  
            return token_.stop_requested();
220  
        }
220  
        }
221  

221  

222  
        capy::io_result<std::size_t> await_resume() const noexcept
222  
        capy::io_result<std::size_t> await_resume() const noexcept
223  
        {
223  
        {
224  
            if (token_.stop_requested())
224  
            if (token_.stop_requested())
225  
                return {make_error_code(std::errc::operation_canceled), 0};
225  
                return {make_error_code(std::errc::operation_canceled), 0};
226  
            return {ec_, bytes_transferred_};
226  
            return {ec_, bytes_transferred_};
227  
        }
227  
        }
228  

228  

229  
        auto await_suspend(
229  
        auto await_suspend(
230  
            std::coroutine_handle<> h,
230  
            std::coroutine_handle<> h,
231  
            capy::executor_ref ex,
231  
            capy::executor_ref ex,
232  
            std::stop_token token) -> std::coroutine_handle<>
232  
            std::stop_token token) -> std::coroutine_handle<>
233  
        {
233  
        {
234  
            token_ = std::move(token);
234  
            token_ = std::move(token);
235  
            return ios_.get().read_some(h, ex, buffers_, token_, &ec_, &bytes_transferred_);
235  
            return ios_.get().read_some(h, ex, buffers_, token_, &ec_, &bytes_transferred_);
236  
        }
236  
        }
237  
    };
237  
    };
238  

238  

239  
    /// Awaitable for async write operations.
239  
    /// Awaitable for async write operations.
240  
    template<class ConstBufferSequence>
240  
    template<class ConstBufferSequence>
241  
    struct write_some_awaitable
241  
    struct write_some_awaitable
242  
    {
242  
    {
243  
        io_stream& ios_;
243  
        io_stream& ios_;
244  
        ConstBufferSequence buffers_;
244  
        ConstBufferSequence buffers_;
245  
        std::stop_token token_;
245  
        std::stop_token token_;
246  
        mutable std::error_code ec_;
246  
        mutable std::error_code ec_;
247  
        mutable std::size_t bytes_transferred_ = 0;
247  
        mutable std::size_t bytes_transferred_ = 0;
248  

248  

249  
        write_some_awaitable(
249  
        write_some_awaitable(
250  
            io_stream& ios,
250  
            io_stream& ios,
251  
            ConstBufferSequence buffers) noexcept
251  
            ConstBufferSequence buffers) noexcept
252  
            : ios_(ios)
252  
            : ios_(ios)
253  
            , buffers_(std::move(buffers))
253  
            , buffers_(std::move(buffers))
254  
        {
254  
        {
255  
        }
255  
        }
256  

256  

257  
        bool await_ready() const noexcept
257  
        bool await_ready() const noexcept
258  
        {
258  
        {
259  
            return token_.stop_requested();
259  
            return token_.stop_requested();
260  
        }
260  
        }
261  

261  

262  
        capy::io_result<std::size_t> await_resume() const noexcept
262  
        capy::io_result<std::size_t> await_resume() const noexcept
263  
        {
263  
        {
264  
            if (token_.stop_requested())
264  
            if (token_.stop_requested())
265  
                return {make_error_code(std::errc::operation_canceled), 0};
265  
                return {make_error_code(std::errc::operation_canceled), 0};
266  
            return {ec_, bytes_transferred_};
266  
            return {ec_, bytes_transferred_};
267  
        }
267  
        }
268  

268  

269  
        auto await_suspend(
269  
        auto await_suspend(
270  
            std::coroutine_handle<> h,
270  
            std::coroutine_handle<> h,
271  
            capy::executor_ref ex,
271  
            capy::executor_ref ex,
272  
            std::stop_token token) -> std::coroutine_handle<>
272  
            std::stop_token token) -> std::coroutine_handle<>
273  
        {
273  
        {
274  
            token_ = std::move(token);
274  
            token_ = std::move(token);
275  
            return ios_.get().write_some(h, ex, buffers_, token_, &ec_, &bytes_transferred_);
275  
            return ios_.get().write_some(h, ex, buffers_, token_, &ec_, &bytes_transferred_);
276  
        }
276  
        }
277  
    };
277  
    };
278  

278  

279  
public:
279  
public:
280  
    /** Platform-specific stream implementation interface.
280  
    /** Platform-specific stream implementation interface.
281  

281  

282  
        Derived classes implement this interface to provide kernel-level
282  
        Derived classes implement this interface to provide kernel-level
283  
        read and write operations for each supported platform (IOCP,
283  
        read and write operations for each supported platform (IOCP,
284  
        epoll, kqueue, io_uring).
284  
        epoll, kqueue, io_uring).
285  
    */
285  
    */
286  
    struct io_stream_impl : io_object_impl
286  
    struct io_stream_impl : io_object_impl
287  
    {
287  
    {
288  
        /// Initiate platform read operation.
288  
        /// Initiate platform read operation.
289  
        virtual std::coroutine_handle<> read_some(
289  
        virtual std::coroutine_handle<> read_some(
290  
            std::coroutine_handle<>,
290  
            std::coroutine_handle<>,
291  
            capy::executor_ref,
291  
            capy::executor_ref,
292  
            io_buffer_param,
292  
            io_buffer_param,
293  
            std::stop_token,
293  
            std::stop_token,
294  
            std::error_code*,
294  
            std::error_code*,
295  
            std::size_t*) = 0;
295  
            std::size_t*) = 0;
296  

296  

297  
        /// Initiate platform write operation.
297  
        /// Initiate platform write operation.
298  
        virtual std::coroutine_handle<> write_some(
298  
        virtual std::coroutine_handle<> write_some(
299  
            std::coroutine_handle<>,
299  
            std::coroutine_handle<>,
300  
            capy::executor_ref,
300  
            capy::executor_ref,
301  
            io_buffer_param,
301  
            io_buffer_param,
302  
            std::stop_token,
302  
            std::stop_token,
303  
            std::error_code*,
303  
            std::error_code*,
304  
            std::size_t*) = 0;
304  
            std::size_t*) = 0;
305  
    };
305  
    };
306  

306  

307  
protected:
307  
protected:
308  
    /// Construct stream bound to the given execution context.
308  
    /// Construct stream bound to the given execution context.
309  
    explicit
309  
    explicit
310  
    io_stream(
310  
    io_stream(
311  
        capy::execution_context& ctx) noexcept
311  
        capy::execution_context& ctx) noexcept
312  
        : io_object(ctx)
312  
        : io_object(ctx)
313  
    {
313  
    {
314  
    }
314  
    }
315  

315  

316  
private:
316  
private:
317  
    /// Return implementation downcasted to stream interface.
317  
    /// Return implementation downcasted to stream interface.
318  
    io_stream_impl& get() const noexcept
318  
    io_stream_impl& get() const noexcept
319  
    {
319  
    {
320  
        return *static_cast<io_stream_impl*>(impl_);
320  
        return *static_cast<io_stream_impl*>(impl_);
321  
    }
321  
    }
322  
};
322  
};
323  

323  

324  
} // namespace boost::corosio
324  
} // namespace boost::corosio
325  

325  

326  
#endif
326  
#endif