LCOV - code coverage report
Current view: top level - capy/test - stream.hpp (source / functions) Coverage Total Hit Missed
Test: coverage_remapped.info Lines: 94.8 % 115 109 6
Test Date: 2026-03-21 03:20:11 Functions: 86.1 % 36 31 5

           TLA  Line data    Source code
       1                 : //
       2                 : // Copyright (c) 2025 Vinnie Falco (vinnie.falco@gmail.com)
       3                 : //
       4                 : // Distributed under the Boost Software License, Version 1.0. (See accompanying
       5                 : // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
       6                 : //
       7                 : // Official repository: https://github.com/cppalliance/capy
       8                 : //
       9                 : 
      10                 : #ifndef BOOST_CAPY_TEST_STREAM_HPP
      11                 : #define BOOST_CAPY_TEST_STREAM_HPP
      12                 : 
      13                 : #include <boost/capy/detail/config.hpp>
      14                 : #include <boost/capy/buffers.hpp>
      15                 : #include <boost/capy/buffers/buffer_copy.hpp>
      16                 : #include <boost/capy/buffers/make_buffer.hpp>
      17                 : #include <boost/capy/continuation.hpp>
      18                 : #include <coroutine>
      19                 : #include <boost/capy/ex/io_env.hpp>
      20                 : #include <boost/capy/io_result.hpp>
      21                 : #include <boost/capy/error.hpp>
      22                 : #include <boost/capy/read.hpp>
      23                 : #include <boost/capy/task.hpp>
      24                 : #include <boost/capy/test/fuse.hpp>
      25                 : #include <boost/capy/test/run_blocking.hpp>
      26                 : 
      27                 : #include <memory>
      28                 : #include <stop_token>
      29                 : #include <string>
      30                 : #include <string_view>
      31                 : #include <utility>
      32                 : 
      33                 : namespace boost {
      34                 : namespace capy {
      35                 : namespace test {
      36                 : 
      37                 : /** A connected stream for testing bidirectional I/O.
      38                 : 
      39                 :     Streams are created in pairs via @ref make_stream_pair.
      40                 :     Data written to one end becomes available for reading on
      41                 :     the other. If no data is available when @ref read_some
      42                 :     is called, the calling coroutine suspends until the peer
      43                 :     calls @ref write_some. The shared @ref fuse enables error
      44                 :     injection at controlled points in both directions.
      45                 : 
      46                 :     When the fuse injects an error or throws on one end, the
      47                 :     other end is automatically closed: any suspended reader is
      48                 :     resumed with `error::eof`, and subsequent operations on
      49                 :     both ends return `error::eof`. Calling @ref close on one
      50                 :     end signals eof to the peer's reads after draining any
      51                 :     buffered data, while the peer may still write.
      52                 : 
      53                 :     @par Thread Safety
      54                 :     Single-threaded only. Both ends of the pair must be
      55                 :     accessed from the same thread. Concurrent access is
      56                 :     undefined behavior.
      57                 : 
      58                 :     @par Example
      59                 :     @code
      60                 :     fuse f;
      61                 :     auto [a, b] = make_stream_pair( f );
      62                 : 
      63                 :     auto r = f.armed( [&]( fuse& ) -> task<> {
      64                 :         auto [ec, n] = co_await a.write_some(
      65                 :             const_buffer( "hello", 5 ) );
      66                 :         if( ec )
      67                 :             co_return;
      68                 : 
      69                 :         char buf[32];
      70                 :         auto [ec2, n2] = co_await b.read_some(
      71                 :             mutable_buffer( buf, sizeof( buf ) ) );
      72                 :         if( ec2 )
      73                 :             co_return;
      74                 :         // buf contains "hello"
      75                 :     } );
      76                 :     @endcode
      77                 : 
      78                 :     @see make_stream_pair, fuse
      79                 : */
      80                 : class stream
      81                 : {
      82                 :     // Single-threaded only. No concurrent access to either
      83                 :     // end of the pair. Both streams and all operations must
      84                 :     // run on the same thread.
      85                 : 
      86                 :     struct half // state for one end's incoming (read) direction
      87                 :     {
      88                 :         std::string buf; // bytes written or provided by the peer, not yet read
      89                 :         std::size_t max_read_size = std::size_t(-1); // per-call cap handed to buffer_copy in read_some
      90                 :         continuation pending_cont_; // reader suspended in read_some, if any
      91                 :         executor_ref pending_ex; // executor that suspended reader is posted to
      92                 :         bool eof = false; // set by the peer's close(); read_some reports eof once buf is empty
      93                 :     };
      94                 : 
      95                 :     struct state // shared by both ends of a pair
      96                 :     {
      97                 :         fuse f; // consulted through maybe_fail() by read_some and write_some
      98                 :         bool closed = false; // set by close(); read_some and write_some then return error::eof
      99                 :         half sides[2]; // indexed by stream::index_
     100                 : 
     101 HIT         280 :         explicit state(fuse f_) noexcept
     102             840 :             : f(std::move(f_))
     103                 :         {
     104             280 :         }
     105                 : 
     106                 :         // Set closed and resume any suspended readers
     107                 :         // with eof on both sides.
     108             208 :         void close()
     109                 :         {
     110             208 :             closed = true;
     111             624 :             for(auto& side : sides)
     112                 :             {
     113             416 :                 if(side.pending_cont_.h)
     114                 :                 {
     115              12 :                     side.pending_ex.post(side.pending_cont_); // hand the reader to its executor
     116              12 :                     side.pending_cont_.h = {}; // clear so it is not posted again
     117              12 :                     side.pending_ex = {};
     118                 :                 }
     119                 :             }
     120             208 :         }
     121                 :     };
     122                 : 
     123                 :     // Wraps the maybe_fail() call. If the guard is
     124                 :     // not disarmed before destruction (fuse returned
     125                 :     // an error, or threw an exception), closes both
     126                 :     // ends so any suspended peer gets eof.
     127                 :     struct close_guard
     128                 :     {
     129                 :         state* st;
     130                 :         bool armed = true;
     131             300 :         void disarm() noexcept { armed = false; }
     132             508 :         ~close_guard() noexcept(false) { if(armed) st->close(); } // noexcept(false): state::close() is not declared noexcept
     133                 :     };
     134                 : 
     135                 :     std::shared_ptr<state> state_; // shared with the peer end
     136                 :     int index_; // 0 or 1; this end reads from state_->sides[index_]
     137                 : 
     138             560 :     stream(
     139                 :         std::shared_ptr<state> sp,
     140                 :         int index) noexcept
     141             560 :         : state_(std::move(sp))
     142             560 :         , index_(index)
     143                 :     {
     144             560 :     }
     145                 : 
     146                 :     friend std::pair<stream, stream>
     147                 :     make_stream_pair(fuse); // the factory uses the private constructor and state
     148                 : 
     149                 : public:
     150                 :     stream(stream const&) = delete;
     151                 :     stream& operator=(stream const&) = delete;
     152             660 :     stream(stream&&) = default;
     153                 :     stream& operator=(stream&&) = default;
     154                 : 
     155                 :     /** Signal end-of-stream to the peer.
     156                 : 
     157                 :         Marks the peer's read direction as closed.
     158                 :         If the peer is suspended in @ref read_some,
     159                 :         it is resumed. The peer drains any buffered
     160                 :         data before receiving `error::eof`. Writes
     161                 :         from the peer are unaffected.
     162                 :     */
     163                 :     void
     164               3 :     close()
     165                 :     {
     166               3 :         int peer = 1 - index_; // index of the other end
     167               3 :         auto& side = state_->sides[peer];
     168               3 :         side.eof = true; // only the peer's read side is marked; state::closed stays untouched
     169               3 :         if(side.pending_cont_.h)
     170                 :         {
     171               1 :             side.pending_ex.post(side.pending_cont_); // wake the peer's suspended reader
     172               1 :             side.pending_cont_.h = {}; // clear so it is not posted again
     173               1 :             side.pending_ex = {};
     174                 :         }
     175               3 :     }
     176                 : 
     177                 :     /** Set the maximum bytes returned per read.
     178                 : 
     179                 :         Limits how many bytes @ref read_some returns in
     180                 :         a single call, simulating chunked network delivery.
     181                 :         The default is unlimited.
     182                 : 
     183                 :         @param n Maximum bytes per read.
     184                 :     */
     185                 :     void
     186              54 :     set_max_read_size(std::size_t n) noexcept
     187                 :     {
     188              54 :         state_->sides[index_].max_read_size = n; // applies to reads performed on this end
     189              54 :     }
     190                 : 
     191                 :     /** Asynchronously read data from the stream.
     192                 : 
     193                 :         Transfers up to `buffer_size(buffers)` bytes from
     194                 :         data written by the peer. If no data is available,
     195                 :         the calling coroutine suspends until the peer calls
     196                 :         @ref write_some. Before every read, the attached
     197                 :         @ref fuse is consulted to possibly inject an error.
     198                 :         If the fuse fires, the peer is automatically closed.
     199                 :         If the stream is closed, returns `error::eof`.
     200                 :         The returned `std::size_t` is the number of bytes
     201                 :         transferred.
     202                 : 
     203                 :         @param buffers The mutable buffer sequence to receive data.
     204                 : 
     205                 :         @return An awaitable that await-returns `(error_code,std::size_t)`.
     206                 : 
     207                 :         @see fuse, close
     208                 :     */
     209                 :     template<MutableBufferSequence MB>
     210                 :     auto
     211             275 :     read_some(MB buffers)
     212                 :     {
     213                 :         struct awaitable
     214                 :         {
     215                 :             stream* self_;
     216                 :             MB buffers_;
     217                 : 
     218             275 :             bool await_ready() const noexcept
     219                 :             {
     220             275 :                 if(buffer_empty(buffers_)) // nothing to fill: complete without suspending
     221               8 :                     return true;
     222             267 :                 auto* st = self_->state_.get();
     223             267 :                 auto& side = st->sides[self_->index_];
     224             532 :                 return st->closed || side.eof ||
     225             532 :                     !side.buf.empty(); // ready on close, eof, or buffered data
     226                 :             }
     227                 : 
     228              25 :             std::coroutine_handle<> await_suspend(
     229                 :                 std::coroutine_handle<> h,
     230                 :                 io_env const* env) noexcept
     231                 :             {
     232              25 :                 auto& side = self_->state_->sides[
     233              25 :                     self_->index_];
     234              25 :                 side.pending_cont_.h = h; // park this reader until write_some, provide, or close posts it
     235              25 :                 side.pending_ex = env->executor;
     236              25 :                 return std::noop_coroutine(); // suspend without transferring to another coroutine
     237                 :             }
     238                 : 
     239                 :             io_result<std::size_t>
     240             275 :             await_resume()
     241                 :             {
     242             275 :                 if(buffer_empty(buffers_))
     243               8 :                     return {{}, 0}; // empty destination: success with zero bytes, fuse not consulted
     244                 : 
     245             267 :                 auto* st = self_->state_.get();
     246             267 :                 auto& side = st->sides[
     247             267 :                     self_->index_];
     248                 : 
     249             267 :                 if(st->closed)
     250              12 :                     return {error::eof, 0};
     251                 : 
     252             255 :                 if(side.eof && side.buf.empty()) // peer closed and nothing left to drain
     253               3 :                     return {error::eof, 0};
     254                 : 
     255             252 :                 if(!side.eof) // after the peer's close(), leftover data drains without consulting the fuse
     256                 :                 {
     257             252 :                     close_guard g{st}; // closes both ends if maybe_fail() returns an error or throws
     258             252 :                     auto ec = st->f.maybe_fail();
     259             199 :                     if(ec)
     260              53 :                         return {ec, 0};
     261             146 :                     g.disarm(); // success path: leave the pair open
     262             252 :                 }
     263                 : 
     264             292 :                 std::size_t const n = buffer_copy(
     265             146 :                     buffers_, make_buffer(side.buf),
     266                 :                     side.max_read_size);
     267             146 :                 side.buf.erase(0, n); // drop the bytes just delivered
     268             146 :                 return {{}, n};
     269                 :             }
     270                 :         };
     271             275 :         return awaitable{this, buffers};
     272                 :     }
     273                 : 
     274                 :     /** Asynchronously write data to the stream.
     275                 : 
     276                 :         Transfers all `buffer_size(buffers)` bytes to the
     277                 :         peer's incoming buffer. If the peer is suspended in
     278                 :         @ref read_some, it is resumed. Before every write,
     279                 :         the attached @ref fuse is consulted to possibly inject
     280                 :         an error. If the fuse fires, the peer is automatically
     281                 :         closed. If the stream is closed, returns `error::eof`.
     282                 :         The returned `std::size_t` is the number of bytes
     283                 :         transferred.
     284                 : 
     285                 :         @param buffers The const buffer sequence containing
     286                 :             data to write.
     287                 : 
     288                 :         @return An awaitable that await-returns `(error_code,std::size_t)`.
     289                 : 
     290                 :         @see fuse, close
     291                 :     */
     292                 :     template<ConstBufferSequence CB>
     293                 :     auto
     294             260 :     write_some(CB buffers)
     295                 :     {
     296                 :         struct awaitable
     297                 :         {
     298                 :             stream* self_;
     299                 :             CB buffers_;
     300                 : 
     301             260 :             bool await_ready() const noexcept { return true; } // never suspends; the write happens in await_resume
     302                 : 
     303 MIS           0 :             void await_suspend(
     304                 :                 std::coroutine_handle<>,
     305                 :                 io_env const*) const noexcept // not reached: await_ready() always returns true
     306                 :             {
     307               0 :             }
     308                 : 
     309                 :             io_result<std::size_t>
     310 HIT         260 :             await_resume()
     311                 :             {
     312             260 :                 std::size_t n = buffer_size(buffers_);
     313             260 :                 if(n == 0)
     314               4 :                     return {{}, 0}; // zero-length write: no closed check, no fuse, no wake-up
     315                 : 
     316             256 :                 auto* st = self_->state_.get();
     317                 : 
     318             256 :                 if(st->closed)
     319 MIS           0 :                     return {error::eof, 0};
     320                 : 
     321 HIT         256 :                 close_guard g{st}; // closes both ends if maybe_fail() returns an error or throws
     322             256 :                 auto ec = st->f.maybe_fail();
     323             205 :                 if(ec)
     324              51 :                     return {ec, 0};
     325             154 :                 g.disarm(); // success path: leave the pair open
     326                 : 
     327             154 :                 int peer = 1 - self_->index_; // data lands in the other end's incoming buffer
     328             154 :                 auto& side = st->sides[peer];
     329                 : 
     330             154 :                 std::size_t const old_size = side.buf.size();
     331             154 :                 side.buf.resize(old_size + n); // grow by n, then copy into the new tail
     332             154 :                 buffer_copy(make_buffer(
     333             154 :                     side.buf.data() + old_size, n),
     334             154 :                     buffers_, n);
     335                 : 
     336             154 :                 if(side.pending_cont_.h) // a reader is waiting on the peer side
     337                 :                 {
     338              12 :                     side.pending_ex.post(side.pending_cont_);
     339              12 :                     side.pending_cont_.h = {}; // clear so it is not posted again
     340              12 :                     side.pending_ex = {};
     341                 :                 }
     342                 : 
     343             154 :                 return {{}, n};
     344             256 :             }
     345                 :         };
     346             260 :         return awaitable{this, buffers};
     347                 :     }
     348                 : 
     349                 :     /** Inject data into this stream's peer for reading.
     350                 : 
     351                 :         Appends data directly to the peer's incoming buffer,
     352                 :         bypassing the fuse. If the peer is suspended in
     353                 :         @ref read_some, it is resumed. This is test setup,
     354                 :         not an operation under test.
     355                 : 
     356                 :         @param sv The data to inject.
     357                 : 
     358                 :         @see make_stream_pair
     359                 :     */
     360                 :     void
     361              87 :     provide(std::string_view sv)
     362                 :     {
     363              87 :         int peer = 1 - index_; // index of the other end
     364              87 :         auto& side = state_->sides[peer];
     365              87 :         side.buf.append(sv); // no closed or eof check and no fuse call on this path
     366              87 :         if(side.pending_cont_.h)
     367                 :         {
     368 MIS           0 :             side.pending_ex.post(side.pending_cont_); // wake the peer's suspended reader
     369               0 :             side.pending_cont_.h = {};
     370               0 :             side.pending_ex = {};
     371                 :         }
     372 HIT          87 :     }
     373                 : 
     374                 :     /** Read from this stream and verify the content.
     375                 : 
     376                 :         Reads exactly `expected.size()` bytes from the stream
     377                 :         and compares against the expected string. The read goes
     378                 :         through the normal path including the fuse.
     379                 : 
     380                 :         @param expected The expected content.
     381                 : 
     382                 :         @return A pair of `(error_code, bool)`. The error_code
     383                 :             is set if a read error occurs (e.g. fuse injection).
     384                 :             The bool is true if the data matches.
     385                 : 
     386                 :         @see provide
     387                 :     */
     388                 :     std::pair<std::error_code, bool>
     389              38 :     expect(std::string_view expected)
     390                 :     {
     391              38 :         std::error_code result;
     392              38 :         bool match = false; // stays false when the read reports an error
     393             141 :         run_blocking()([]( // captureless lambda: inputs are parameters, which are copied into the coroutine frame
     394                 :             stream& self,
     395                 :             std::string_view expected,
     396                 :             std::error_code& result,
     397                 :             bool& match) -> task<>
     398                 :         {
     399                 :             std::string buf(expected.size(), '\0');
     400                 :             auto [ec, n] = co_await read(
     401                 :                 self, mutable_buffer(
     402                 :                     buf.data(), buf.size()));
     403                 :             if(ec)
     404                 :             {
     405                 :                 result = ec;
     406                 :                 co_return;
     407                 :             }
     408                 :             match = (std::string_view(
     409                 :                 buf.data(), n) == expected); // compare only the n bytes actually read
     410             161 :         }(*this, expected, result, match));
     411              58 :         return {result, match};
     412                 :     }
     413                 : 
     414                 :     /** Return the stream's pending read data.
     415                 : 
     416                 :         Returns a view of the data waiting to be read
     417                 :         from this stream. This is a direct peek at the
     418                 :         internal buffer, bypassing the fuse.
     419                 : 
     420                 :         @return A view of the pending data.
     421                 : 
     422                 :         @see provide, expect
     423                 :     */
     424                 :     std::string_view
     425                 :     data() const noexcept
     426                 :     {
     427                 :         return state_->sides[index_].buf; // non-owning view of this end's incoming buffer
     428                 :     }
     429                 : };
     430                 : 
     431                 : /** Create a connected pair of test streams.
     432                 : 
     433                 :     Data written to one stream becomes readable on the other.
     434                 :     If a coroutine calls @ref stream::read_some when no data
     435                 :     is available, it suspends until the peer writes. Before
     436                 :     every read or write, the @ref fuse is consulted to
     437                 :     possibly inject an error for testing fault scenarios.
     438                 :     When the fuse fires, the peer is automatically closed.
     439                 : 
     440                 :     @param f The fuse used to inject errors during operations.
     441                 : 
     442                 :     @return A pair of connected streams.
     443                 : 
     444                 :     @see stream, fuse
     445                 : */
     446                 : inline std::pair<stream, stream>
     447             280 : make_stream_pair(fuse f = {})
     448                 : {
     449             280 :     auto sp = std::make_shared<stream::state>(std::move(f)); // one state object shared by both ends
     450             560 :     return {stream(sp, 0), stream(sp, 1)}; // indices 0 and 1 select each end's element of state::sides
     451             280 : }
     452                 : 
     453                 : } // test
     454                 : } // capy
     455                 : } // boost
     456                 : 
     457                 : #endif
        

Generated by: LCOV version 2.3