LCOV - code coverage report
Current view: top level - /jenkins/workspace/boost-root/boost/corosio - io_stream.hpp (source / functions) Coverage Total Hit
Test: coverage_filtered.info Lines: 97.1 % 35 34
Test Date: 2026-02-06 05:04:16 Functions: 100.0 % 27 27

            Line data    Source code
       1              : //
       2              : // Copyright (c) 2025 Vinnie Falco (vinnie dot falco at gmail dot com)
       3              : //
       4              : // Distributed under the Boost Software License, Version 1.0. (See accompanying
       5              : // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
       6              : //
       7              : // Official repository: https://github.com/cppalliance/corosio
       8              : //
       9              : 
      10              : #ifndef BOOST_COROSIO_IO_STREAM_HPP
      11              : #define BOOST_COROSIO_IO_STREAM_HPP
      12              : 
      13              : #include <boost/corosio/detail/config.hpp>
      14              : #include <boost/corosio/io_object.hpp>
      15              : #include <boost/capy/io_result.hpp>
      16              : #include <boost/corosio/io_buffer_param.hpp>
      17              : #include <boost/capy/ex/executor_ref.hpp>
      18              : #include <system_error>
      19              : 
      20              : #include <coroutine>
      21              : #include <cstddef>
      22              : #include <stop_token>
      23              : 
      24              : namespace boost::corosio {
      25              : 
      26              : /** Platform stream with read/write operations.
      27              : 
      28              :     This base class provides the fundamental async read and write
      29              :     operations for kernel-level stream I/O. Derived classes wrap
      30              :     OS-specific stream implementations (sockets, pipes, etc.) and
      31              :     satisfy @ref capy::ReadStream and @ref capy::WriteStream concepts.
      32              : 
      33              :     @par Semantics
      34              :     Concrete classes wrap direct platform I/O completed by the kernel.
      35              :     Functions taking `io_stream&` signal "platform implementation
      36              :     required" - use this when you need actual kernel I/O rather than
      37              :     a mock or test double.
      38              : 
      39              :     For generic stream algorithms that work with test mocks,
      40              :     use `template<capy::Stream S>` instead of `io_stream&`.
      41              : 
      42              :     @par Thread Safety
      43              :     Distinct objects: Safe.
      44              :     Shared objects: Unsafe. All calls to a single stream must be made
      45              :     from the same implicit or explicit serialization context.
      46              : 
      47              :     @par Example
      48              :     @code
      49              :     // Read until buffer full or EOF
      50              :     capy::task<> read_all( io_stream& stream, std::span<char> buf )
      51              :     {
      52              :         std::size_t total = 0;
      53              :         while( total < buf.size() )
      54              :         {
      55              :             auto [ec, n] = co_await stream.read_some(
      56              :                 capy::buffer( buf.data() + total, buf.size() - total ) );
      57              :             if( ec == capy::cond::eof )
      58              :                 break;
      59              :             if( ec.failed() )
      60              :                 capy::detail::throw_system_error( ec );
      61              :             total += n;
      62              :         }
      63              :     }
      64              :     @endcode
      65              : 
      66              :     @see capy::Stream, capy::ReadStream, capy::WriteStream, tcp_socket
      67              : */
      68              : class BOOST_COROSIO_DECL io_stream : public io_object
      69              : {
      70              : public:
      71              :     /** Asynchronously read data from the stream.
      72              : 
      73              :         This operation suspends the calling coroutine and initiates a
      74              :         kernel-level read. The coroutine resumes when the operation
      75              :         completes.
      76              : 
      77              :         The operation completes when:
      78              :         @li At least one byte has been read into the buffer sequence
      79              :         @li The peer closes the connection (EOF)
      80              :         @li An error occurs
      81              :         @li The operation is cancelled via stop token or `cancel()`
      82              : 
      83              :         @par Concurrency
      84              :         At most one write operation may be in flight concurrently with
      85              :         this read. No other read operations may be in flight until this
      86              :         operation completes. Note that concurrent in-flight operations
      87              :         do not imply the initiating calls may be made concurrently;
      88              :         all calls must be serialized.
      89              : 
      90              :         @par Cancellation
      91              :         Supports cancellation via `std::stop_token` propagated through
      92              :         the IoAwaitable protocol, or via the I/O object's `cancel()`
      93              :         member. When cancelled, the operation completes with an error
      94              :         that compares equal to `capy::cond::canceled`.
      95              : 
      96              :         @par Preconditions
      97              :         The stream must be open and connected.
      98              : 
      99              :         @param buffers The buffer sequence to read data into. The caller
     100              :             retains ownership and must ensure validity until the
     101              :             operation completes.
     102              : 
     103              :         @return An awaitable yielding `(error_code, std::size_t)`.
     104              :             On success, `bytes_transferred` contains the number of bytes
     105              :             read. Compare error codes to conditions, not specific values:
     106              :             @li `capy::cond::eof` - Peer closed connection (TCP FIN)
     107              :             @li `capy::cond::canceled` - Operation was cancelled
     108              : 
     109              :         @par Example
     110              :         @code
     111              :         // Simple read with error handling
     112              :         auto [ec, n] = co_await stream.read_some( capy::buffer( buf ) );
     113              :         if( ec == capy::cond::eof )
     114              :             co_return;  // Connection closed gracefully
     115              :         if( ec.failed() )
     116              :             capy::detail::throw_system_error( ec );
     117              :         process( buf, n );
     118              :         @endcode
     119              : 
     120              :         @note This operation may read fewer bytes than the buffer
     121              :             capacity. Use a loop or `capy::async_read` to read an
     122              :             exact amount.
     123              : 
     124              :         @see write_some, capy::async_read
     125              :     */
     126              :     template<capy::MutableBufferSequence MB>
     127       231502 :     auto read_some(MB const& buffers) // returns a read_some_awaitable<MB>
     128              :     { // only builds the awaitable; the implementation's read_some() is called later, from the awaitable's await_suspend()
     129       231502 :         return read_some_awaitable<MB>(*this, buffers); // the awaitable's constructor takes the sequence by value, so it stores its own copy of the `buffers` object
     130              :     }
     131              : 
     132              :     /** Asynchronously write data to the stream.
     133              : 
     134              :         This operation suspends the calling coroutine and initiates a
     135              :         kernel-level write. The coroutine resumes when the operation
     136              :         completes.
     137              : 
     138              :         The operation completes when:
     139              :         @li At least one byte has been written from the buffer sequence
     140              :         @li An error occurs (including connection reset by peer)
     141              :         @li The operation is cancelled via stop token or `cancel()`
     142              : 
     143              :         @par Concurrency
     144              :         At most one read operation may be in flight concurrently with
     145              :         this write. No other write operations may be in flight until
     146              :         this operation completes. Note that concurrent in-flight
     147              :         operations do not imply the initiating calls may be made
     148              :         concurrently; all calls must be serialized.
     149              : 
     150              :         @par Cancellation
     151              :         Supports cancellation via `std::stop_token` propagated through
     152              :         the IoAwaitable protocol, or via the I/O object's `cancel()`
     153              :         member. When cancelled, the operation completes with an error
     154              :         that compares equal to `capy::cond::canceled`.
     155              : 
     156              :         @par Preconditions
     157              :         The stream must be open and connected.
     158              : 
     159              :         @param buffers The buffer sequence containing data to write.
     160              :             The caller retains ownership and must ensure validity
     161              :             until the operation completes.
     162              : 
     163              :         @return An awaitable yielding `(error_code, std::size_t)`.
     164              :             On success, `bytes_transferred` contains the number of bytes
     165              :             written. Compare error codes to conditions, not specific
     166              :             values:
     167              :             @li `capy::cond::canceled` - Operation was cancelled
     168              :             @li `std::errc::broken_pipe` - Peer closed connection
     169              : 
     170              :         @par Example
     171              :         @code
     172              :         // Write all data
     173              :         std::string_view data = "Hello, World!";
     174              :         std::size_t written = 0;
     175              :         while( written < data.size() )
     176              :         {
     177              :             auto [ec, n] = co_await stream.write_some(
     178              :                 capy::buffer( data.data() + written,
     179              :                               data.size() - written ) );
     180              :             if( ec.failed() )
     181              :                 capy::detail::throw_system_error( ec );
     182              :             written += n;
     183              :         }
     184              :         @endcode
     185              : 
     186              :         @note This operation may write fewer bytes than the buffer
     187              :             contains. Use a loop or `capy::async_write` to write
     188              :             all data.
     189              : 
     190              :         @see read_some, capy::async_write
     191              :     */
     192              :     template<capy::ConstBufferSequence CB>
     193       231257 :     auto write_some(CB const& buffers) // returns a write_some_awaitable<CB>
     194              :     { // only builds the awaitable; the implementation's write_some() is called later, from the awaitable's await_suspend()
     195       231257 :         return write_some_awaitable<CB>(*this, buffers); // the awaitable's constructor takes the sequence by value, so it stores its own copy of the `buffers` object
     196              :     }
     197              : 
     198              : protected:
     199              :     /// Awaitable returned by read_some(): await_suspend() hands the read to the stream implementation, await_resume() returns a capy::io_result<std::size_t>.
     200              :     template<class MutableBufferSequence>
     201              :     struct read_some_awaitable
     202              :     {
     203              :         io_stream& ios_; // stream whose implementation is asked to perform the read
     204              :         MutableBufferSequence buffers_; // this awaitable's own copy of the buffer sequence object given to the constructor
     205              :         std::stop_token token_; // default-constructed by the constructor; replaced in await_suspend()
     206              :         mutable std::error_code ec_; // its address is handed to the implementation in await_suspend() - presumably the completion error is written here
     207              :         mutable std::size_t bytes_transferred_ = 0; // its address is handed to the implementation in await_suspend() - presumably the byte count is written here
     208              : 
     209       231502 :         read_some_awaitable(
     210              :             io_stream& ios,
     211              :             MutableBufferSequence buffers) noexcept
     212       231502 :             : ios_(ios)
     213       231502 :             , buffers_(std::move(buffers))
     214              :         { // token_ and ec_ are left default-constructed; bytes_transferred_ starts at 0
     215       231502 :         }
     216              : 
     217       231502 :         bool await_ready() const noexcept
     218              :         {
     219       231502 :             return token_.stop_requested(); // NOTE(review): token_ is only assigned in await_suspend(), so on a first await this tests a default-constructed token - confirm that is intended
     220              :         }
     221              : 
     222       231502 :         capy::io_result<std::size_t> await_resume() const noexcept
     223              :         {
     224       231502 :             if (token_.stop_requested()) // a stop request on the stored token overrides whatever ec_ and bytes_transferred_ hold
     225          198 :                 return {make_error_code(std::errc::operation_canceled), 0}; // NOTE(review): the byte count is reported as 0 here regardless of bytes_transferred_
     226       231304 :             return {ec_, bytes_transferred_};
     227              :         }
     228              : 
     229       231502 :         auto await_suspend(
     230              :             std::coroutine_handle<> h,
     231              :             capy::executor_ref ex,
     232              :             std::stop_token token) -> std::coroutine_handle<>
     233              :         {
     234       231502 :             token_ = std::move(token); // kept so await_resume() can test for a stop request
     235       231502 :             return ios_.get().read_some(h, ex, buffers_, token_, &ec_, &bytes_transferred_); // the handle returned by the implementation is returned unchanged
     236              :         }
     237              :     };
     238              : 
     239              :     /// Awaitable returned by write_some(): await_suspend() hands the write to the stream implementation, await_resume() returns a capy::io_result<std::size_t>.
     240              :     template<class ConstBufferSequence>
     241              :     struct write_some_awaitable
     242              :     {
     243              :         io_stream& ios_; // stream whose implementation is asked to perform the write
     244              :         ConstBufferSequence buffers_; // this awaitable's own copy of the buffer sequence object given to the constructor
     245              :         std::stop_token token_; // default-constructed by the constructor; replaced in await_suspend()
     246              :         mutable std::error_code ec_; // its address is handed to the implementation in await_suspend() - presumably the completion error is written here
     247              :         mutable std::size_t bytes_transferred_ = 0; // its address is handed to the implementation in await_suspend() - presumably the byte count is written here
     248              : 
     249       231257 :         write_some_awaitable(
     250              :             io_stream& ios,
     251              :             ConstBufferSequence buffers) noexcept
     252       231257 :             : ios_(ios)
     253       231257 :             , buffers_(std::move(buffers))
     254              :         { // token_ and ec_ are left default-constructed; bytes_transferred_ starts at 0
     255       231257 :         }
     256              : 
     257       231257 :         bool await_ready() const noexcept
     258              :         {
     259       231257 :             return token_.stop_requested(); // NOTE(review): token_ is only assigned in await_suspend(), so on a first await this tests a default-constructed token - confirm that is intended
     260              :         }
     261              : 
     262       231257 :         capy::io_result<std::size_t> await_resume() const noexcept
     263              :         {
     264       231257 :             if (token_.stop_requested()) // a stop request on the stored token overrides whatever ec_ and bytes_transferred_ hold
     265            0 :                 return {make_error_code(std::errc::operation_canceled), 0}; // NOTE(review): the byte count is reported as 0 here regardless of bytes_transferred_
     266       231257 :             return {ec_, bytes_transferred_};
     267              :         }
     268              : 
     269       231257 :         auto await_suspend(
     270              :             std::coroutine_handle<> h,
     271              :             capy::executor_ref ex,
     272              :             std::stop_token token) -> std::coroutine_handle<>
     273              :         {
     274       231257 :             token_ = std::move(token); // kept so await_resume() can test for a stop request
     275       231257 :             return ios_.get().write_some(h, ex, buffers_, token_, &ec_, &bytes_transferred_); // the handle returned by the implementation is returned unchanged
     276              :         }
     277              :     };
     278              : 
     279              : public:
     280              :     /** Platform-specific stream implementation interface.
     281              : 
     282              :         Derived classes implement this interface to provide kernel-level
     283              :         read and write operations for each supported platform (IOCP,
     284              :         epoll, kqueue, io_uring).
     285              :     */
     286              :     struct io_stream_impl : io_object_impl
     287              :     {
     288              :         /// Initiate platform read operation. Parameters are unnamed; the notes on them follow what read_some_awaitable::await_suspend() passes.
     289              :         virtual std::coroutine_handle<> read_some(
     290              :             std::coroutine_handle<>, // coroutine handle received by await_suspend()
     291              :             capy::executor_ref, // executor received by await_suspend()
     292              :             io_buffer_param, // built from the awaitable's stored buffer sequence
     293              :             std::stop_token, // the stop token the awaitable stored
     294              :             std::error_code*, // address of the awaitable's ec_
     295              :             std::size_t*) = 0; // address of the awaitable's bytes_transferred_; the returned handle becomes await_suspend()'s return value
     296              : 
     297              :         /// Initiate platform write operation. Parameters are unnamed; the notes on them follow what write_some_awaitable::await_suspend() passes.
     298              :         virtual std::coroutine_handle<> write_some(
     299              :             std::coroutine_handle<>, // coroutine handle received by await_suspend()
     300              :             capy::executor_ref, // executor received by await_suspend()
     301              :             io_buffer_param, // built from the awaitable's stored buffer sequence
     302              :             std::stop_token, // the stop token the awaitable stored
     303              :             std::error_code*, // address of the awaitable's ec_
     304              :             std::size_t*) = 0; // address of the awaitable's bytes_transferred_; the returned handle becomes await_suspend()'s return value
     305              :     };
     306              : 
     307              : protected:
     308              :     /// Construct stream bound to the given execution context. Protected, so only derived classes can call it.
     309              :     explicit
     310        17957 :     io_stream(
     311              :         capy::execution_context& ctx) noexcept
     312        17957 :         : io_object(ctx) // ctx is passed straight to the io_object base; this class declares no data members of its own
     313              :     {
     314        17957 :     }
     315              : 
     316              : private:
     317              :     /// Return the implementation downcast to the stream interface.
     318       462759 :     io_stream_impl& get() const noexcept
     319              :     {
     320       462759 :         return *static_cast<io_stream_impl*>(impl_); // unchecked cast and dereference; impl_ is not declared in this class (presumably inherited from io_object) - NOTE(review): confirm it is never null and always an io_stream_impl when this runs
     321              :     }
     322              : };
     323              : 
     324              : } // namespace boost::corosio
     325              : 
     326              : #endif
        

Generated by: LCOV version 2.3