//
// Copyright (c) 2025 Vinnie Falco (vinnie.falco@gmail.com)
//
// Distributed under the Boost Software License, Version 1.0. (See accompanying
// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
//
// Official repository: https://github.com/cppalliance/capy
//
9  

9  

10  
#ifndef BOOST_CAPY_TEST_STREAM_HPP
10  
#ifndef BOOST_CAPY_TEST_STREAM_HPP
11  
#define BOOST_CAPY_TEST_STREAM_HPP
11  
#define BOOST_CAPY_TEST_STREAM_HPP
12  

12  

13  
#include <boost/capy/detail/config.hpp>
13  
#include <boost/capy/detail/config.hpp>
14  
#include <boost/capy/buffers.hpp>
14  
#include <boost/capy/buffers.hpp>
15  
#include <boost/capy/buffers/buffer_copy.hpp>
15  
#include <boost/capy/buffers/buffer_copy.hpp>
16  
#include <boost/capy/buffers/make_buffer.hpp>
16  
#include <boost/capy/buffers/make_buffer.hpp>
17  
#include <boost/capy/continuation.hpp>
17  
#include <boost/capy/continuation.hpp>
18  
#include <coroutine>
18  
#include <coroutine>
19  
#include <boost/capy/ex/io_env.hpp>
19  
#include <boost/capy/ex/io_env.hpp>
20  
#include <boost/capy/io_result.hpp>
20  
#include <boost/capy/io_result.hpp>
21  
#include <boost/capy/error.hpp>
21  
#include <boost/capy/error.hpp>
22  
#include <boost/capy/read.hpp>
22  
#include <boost/capy/read.hpp>
23  
#include <boost/capy/task.hpp>
23  
#include <boost/capy/task.hpp>
24  
#include <boost/capy/test/fuse.hpp>
24  
#include <boost/capy/test/fuse.hpp>
25  
#include <boost/capy/test/run_blocking.hpp>
25  
#include <boost/capy/test/run_blocking.hpp>
26  

26  

27  
#include <memory>
27  
#include <memory>
28  
#include <stop_token>
28  
#include <stop_token>
29  
#include <string>
29  
#include <string>
30  
#include <string_view>
30  
#include <string_view>
31  
#include <utility>
31  
#include <utility>
32  

32  

33  
namespace boost {
33  
namespace boost {
34  
namespace capy {
34  
namespace capy {
35  
namespace test {
35  
namespace test {
36  

36  

37  
/** A connected stream for testing bidirectional I/O.
37  
/** A connected stream for testing bidirectional I/O.
38  

38  

39  
    Streams are created in pairs via @ref make_stream_pair.
39  
    Streams are created in pairs via @ref make_stream_pair.
40  
    Data written to one end becomes available for reading on
40  
    Data written to one end becomes available for reading on
41  
    the other. If no data is available when @ref read_some
41  
    the other. If no data is available when @ref read_some
42  
    is called, the calling coroutine suspends until the peer
42  
    is called, the calling coroutine suspends until the peer
43  
    calls @ref write_some. The shared @ref fuse enables error
43  
    calls @ref write_some. The shared @ref fuse enables error
44  
    injection at controlled points in both directions.
44  
    injection at controlled points in both directions.
45  

45  

46  
    When the fuse injects an error or throws on one end, the
46  
    When the fuse injects an error or throws on one end, the
47  
    other end is automatically closed: any suspended reader is
47  
    other end is automatically closed: any suspended reader is
48  
    resumed with `error::eof`, and subsequent operations on
48  
    resumed with `error::eof`, and subsequent operations on
49  
    both ends return `error::eof`. Calling @ref close on one
49  
    both ends return `error::eof`. Calling @ref close on one
50  
    end signals eof to the peer's reads after draining any
50  
    end signals eof to the peer's reads after draining any
51  
    buffered data, while the peer may still write.
51  
    buffered data, while the peer may still write.
52  

52  

53  
    @par Thread Safety
53  
    @par Thread Safety
54  
    Single-threaded only. Both ends of the pair must be
54  
    Single-threaded only. Both ends of the pair must be
55  
    accessed from the same thread. Concurrent access is
55  
    accessed from the same thread. Concurrent access is
56  
    undefined behavior.
56  
    undefined behavior.
57  

57  

58  
    @par Example
58  
    @par Example
59  
    @code
59  
    @code
60  
    fuse f;
60  
    fuse f;
61  
    auto [a, b] = make_stream_pair( f );
61  
    auto [a, b] = make_stream_pair( f );
62  

62  

63  
    auto r = f.armed( [&]( fuse& ) -> task<> {
63  
    auto r = f.armed( [&]( fuse& ) -> task<> {
64  
        auto [ec, n] = co_await a.write_some(
64  
        auto [ec, n] = co_await a.write_some(
65  
            const_buffer( "hello", 5 ) );
65  
            const_buffer( "hello", 5 ) );
66  
        if( ec )
66  
        if( ec )
67  
            co_return;
67  
            co_return;
68  

68  

69  
        char buf[32];
69  
        char buf[32];
70  
        auto [ec2, n2] = co_await b.read_some(
70  
        auto [ec2, n2] = co_await b.read_some(
71  
            mutable_buffer( buf, sizeof( buf ) ) );
71  
            mutable_buffer( buf, sizeof( buf ) ) );
72  
        if( ec2 )
72  
        if( ec2 )
73  
            co_return;
73  
            co_return;
74  
        // buf contains "hello"
74  
        // buf contains "hello"
75  
    } );
75  
    } );
76  
    @endcode
76  
    @endcode
77  

77  

78  
    @see make_stream_pair, fuse
78  
    @see make_stream_pair, fuse
79  
*/
79  
*/
80  
class stream
80  
class stream
81  
{
81  
{
82  
    // Single-threaded only. No concurrent access to either
82  
    // Single-threaded only. No concurrent access to either
83  
    // end of the pair. Both streams and all operations must
83  
    // end of the pair. Both streams and all operations must
84  
    // run on the same thread.
84  
    // run on the same thread.
85  

85  

86  
    struct half
86  
    struct half
87  
    {
87  
    {
88  
        std::string buf;
88  
        std::string buf;
89  
        std::size_t max_read_size = std::size_t(-1);
89  
        std::size_t max_read_size = std::size_t(-1);
90  
        continuation pending_cont_;
90  
        continuation pending_cont_;
91  
        executor_ref pending_ex;
91  
        executor_ref pending_ex;
92  
        bool eof = false;
92  
        bool eof = false;
93  
    };
93  
    };
94  

94  

95  
    struct state
95  
    struct state
96  
    {
96  
    {
97  
        fuse f;
97  
        fuse f;
98  
        bool closed = false;
98  
        bool closed = false;
99  
        half sides[2];
99  
        half sides[2];
100  

100  

101  
        explicit state(fuse f_) noexcept
101  
        explicit state(fuse f_) noexcept
102  
            : f(std::move(f_))
102  
            : f(std::move(f_))
103  
        {
103  
        {
104  
        }
104  
        }
105  

105  

106  
        // Set closed and resume any suspended readers
106  
        // Set closed and resume any suspended readers
107  
        // with eof on both sides.
107  
        // with eof on both sides.
108  
        void close()
108  
        void close()
109  
        {
109  
        {
110  
            closed = true;
110  
            closed = true;
111  
            for(auto& side : sides)
111  
            for(auto& side : sides)
112  
            {
112  
            {
113  
                if(side.pending_cont_.h)
113  
                if(side.pending_cont_.h)
114  
                {
114  
                {
115  
                    side.pending_ex.post(side.pending_cont_);
115  
                    side.pending_ex.post(side.pending_cont_);
116  
                    side.pending_cont_.h = {};
116  
                    side.pending_cont_.h = {};
117  
                    side.pending_ex = {};
117  
                    side.pending_ex = {};
118  
                }
118  
                }
119  
            }
119  
            }
120  
        }
120  
        }
121  
    };
121  
    };
122  

122  

123  
    // Wraps the maybe_fail() call. If the guard is
123  
    // Wraps the maybe_fail() call. If the guard is
124  
    // not disarmed before destruction (fuse returned
124  
    // not disarmed before destruction (fuse returned
125  
    // an error, or threw an exception), closes both
125  
    // an error, or threw an exception), closes both
126  
    // ends so any suspended peer gets eof.
126  
    // ends so any suspended peer gets eof.
127  
    struct close_guard
127  
    struct close_guard
128  
    {
128  
    {
129  
        state* st;
129  
        state* st;
130  
        bool armed = true;
130  
        bool armed = true;
131  
        void disarm() noexcept { armed = false; }
131  
        void disarm() noexcept { armed = false; }
132  
        ~close_guard() noexcept(false) { if(armed) st->close(); }
132  
        ~close_guard() noexcept(false) { if(armed) st->close(); }
133  
    };
133  
    };
134  

134  

135  
    std::shared_ptr<state> state_;
135  
    std::shared_ptr<state> state_;
136  
    int index_;
136  
    int index_;
137  

137  

138  
    stream(
138  
    stream(
139  
        std::shared_ptr<state> sp,
139  
        std::shared_ptr<state> sp,
140  
        int index) noexcept
140  
        int index) noexcept
141  
        : state_(std::move(sp))
141  
        : state_(std::move(sp))
142  
        , index_(index)
142  
        , index_(index)
143  
    {
143  
    {
144  
    }
144  
    }
145  

145  

146  
    friend std::pair<stream, stream>
146  
    friend std::pair<stream, stream>
147  
    make_stream_pair(fuse);
147  
    make_stream_pair(fuse);
148  

148  

149  
public:
149  
public:
150  
    stream(stream const&) = delete;
150  
    stream(stream const&) = delete;
151  
    stream& operator=(stream const&) = delete;
151  
    stream& operator=(stream const&) = delete;
152  
    stream(stream&&) = default;
152  
    stream(stream&&) = default;
153  
    stream& operator=(stream&&) = default;
153  
    stream& operator=(stream&&) = default;
154  

154  

155  
    /** Signal end-of-stream to the peer.
155  
    /** Signal end-of-stream to the peer.
156  

156  

157  
        Marks the peer's read direction as closed.
157  
        Marks the peer's read direction as closed.
158  
        If the peer is suspended in @ref read_some,
158  
        If the peer is suspended in @ref read_some,
159  
        it is resumed. The peer drains any buffered
159  
        it is resumed. The peer drains any buffered
160  
        data before receiving `error::eof`. Writes
160  
        data before receiving `error::eof`. Writes
161  
        from the peer are unaffected.
161  
        from the peer are unaffected.
162  
    */
162  
    */
163  
    void
163  
    void
164  
    close()
164  
    close()
165  
    {
165  
    {
166  
        int peer = 1 - index_;
166  
        int peer = 1 - index_;
167  
        auto& side = state_->sides[peer];
167  
        auto& side = state_->sides[peer];
168  
        side.eof = true;
168  
        side.eof = true;
169  
        if(side.pending_cont_.h)
169  
        if(side.pending_cont_.h)
170  
        {
170  
        {
171  
            side.pending_ex.post(side.pending_cont_);
171  
            side.pending_ex.post(side.pending_cont_);
172  
            side.pending_cont_.h = {};
172  
            side.pending_cont_.h = {};
173  
            side.pending_ex = {};
173  
            side.pending_ex = {};
174  
        }
174  
        }
175  
    }
175  
    }
176  

176  

177  
    /** Set the maximum bytes returned per read.
177  
    /** Set the maximum bytes returned per read.
178  

178  

179  
        Limits how many bytes @ref read_some returns in
179  
        Limits how many bytes @ref read_some returns in
180  
        a single call, simulating chunked network delivery.
180  
        a single call, simulating chunked network delivery.
181  
        The default is unlimited.
181  
        The default is unlimited.
182  

182  

183  
        @param n Maximum bytes per read.
183  
        @param n Maximum bytes per read.
184  
    */
184  
    */
185  
    void
185  
    void
186  
    set_max_read_size(std::size_t n) noexcept
186  
    set_max_read_size(std::size_t n) noexcept
187  
    {
187  
    {
188  
        state_->sides[index_].max_read_size = n;
188  
        state_->sides[index_].max_read_size = n;
189  
    }
189  
    }
190  

190  

191  
    /** Asynchronously read data from the stream.
191  
    /** Asynchronously read data from the stream.
192  

192  

193  
        Transfers up to `buffer_size(buffers)` bytes from
193  
        Transfers up to `buffer_size(buffers)` bytes from
194  
        data written by the peer. If no data is available,
194  
        data written by the peer. If no data is available,
195  
        the calling coroutine suspends until the peer calls
195  
        the calling coroutine suspends until the peer calls
196  
        @ref write_some. Before every read, the attached
196  
        @ref write_some. Before every read, the attached
197  
        @ref fuse is consulted to possibly inject an error.
197  
        @ref fuse is consulted to possibly inject an error.
198  
        If the fuse fires, the peer is automatically closed.
198  
        If the fuse fires, the peer is automatically closed.
199  
        If the stream is closed, returns `error::eof`.
199  
        If the stream is closed, returns `error::eof`.
200  
        The returned `std::size_t` is the number of bytes
200  
        The returned `std::size_t` is the number of bytes
201  
        transferred.
201  
        transferred.
202  

202  

203  
        @param buffers The mutable buffer sequence to receive data.
203  
        @param buffers The mutable buffer sequence to receive data.
204  

204  

205  
        @return An awaitable that await-returns `(error_code,std::size_t)`.
205  
        @return An awaitable that await-returns `(error_code,std::size_t)`.
206  

206  

207  
        @see fuse, close
207  
        @see fuse, close
208  
    */
208  
    */
209  
    template<MutableBufferSequence MB>
209  
    template<MutableBufferSequence MB>
210  
    auto
210  
    auto
211  
    read_some(MB buffers)
211  
    read_some(MB buffers)
212  
    {
212  
    {
213  
        struct awaitable
213  
        struct awaitable
214  
        {
214  
        {
215  
            stream* self_;
215  
            stream* self_;
216  
            MB buffers_;
216  
            MB buffers_;
217  

217  

218  
            bool await_ready() const noexcept
218  
            bool await_ready() const noexcept
219  
            {
219  
            {
220  
                if(buffer_empty(buffers_))
220  
                if(buffer_empty(buffers_))
221  
                    return true;
221  
                    return true;
222  
                auto* st = self_->state_.get();
222  
                auto* st = self_->state_.get();
223  
                auto& side = st->sides[self_->index_];
223  
                auto& side = st->sides[self_->index_];
224  
                return st->closed || side.eof ||
224  
                return st->closed || side.eof ||
225  
                    !side.buf.empty();
225  
                    !side.buf.empty();
226  
            }
226  
            }
227  

227  

228  
            std::coroutine_handle<> await_suspend(
228  
            std::coroutine_handle<> await_suspend(
229  
                std::coroutine_handle<> h,
229  
                std::coroutine_handle<> h,
230  
                io_env const* env) noexcept
230  
                io_env const* env) noexcept
231  
            {
231  
            {
232  
                auto& side = self_->state_->sides[
232  
                auto& side = self_->state_->sides[
233  
                    self_->index_];
233  
                    self_->index_];
234  
                side.pending_cont_.h = h;
234  
                side.pending_cont_.h = h;
235  
                side.pending_ex = env->executor;
235  
                side.pending_ex = env->executor;
236  
                return std::noop_coroutine();
236  
                return std::noop_coroutine();
237  
            }
237  
            }
238  

238  

239  
            io_result<std::size_t>
239  
            io_result<std::size_t>
240  
            await_resume()
240  
            await_resume()
241  
            {
241  
            {
242  
                if(buffer_empty(buffers_))
242  
                if(buffer_empty(buffers_))
243  
                    return {{}, 0};
243  
                    return {{}, 0};
244  

244  

245  
                auto* st = self_->state_.get();
245  
                auto* st = self_->state_.get();
246  
                auto& side = st->sides[
246  
                auto& side = st->sides[
247  
                    self_->index_];
247  
                    self_->index_];
248  

248  

249  
                if(st->closed)
249  
                if(st->closed)
250  
                    return {error::eof, 0};
250  
                    return {error::eof, 0};
251  

251  

252  
                if(side.eof && side.buf.empty())
252  
                if(side.eof && side.buf.empty())
253  
                    return {error::eof, 0};
253  
                    return {error::eof, 0};
254  

254  

255  
                if(!side.eof)
255  
                if(!side.eof)
256  
                {
256  
                {
257  
                    close_guard g{st};
257  
                    close_guard g{st};
258  
                    auto ec = st->f.maybe_fail();
258  
                    auto ec = st->f.maybe_fail();
259  
                    if(ec)
259  
                    if(ec)
260  
                        return {ec, 0};
260  
                        return {ec, 0};
261  
                    g.disarm();
261  
                    g.disarm();
262  
                }
262  
                }
263  

263  

264  
                std::size_t const n = buffer_copy(
264  
                std::size_t const n = buffer_copy(
265  
                    buffers_, make_buffer(side.buf),
265  
                    buffers_, make_buffer(side.buf),
266  
                    side.max_read_size);
266  
                    side.max_read_size);
267  
                side.buf.erase(0, n);
267  
                side.buf.erase(0, n);
268  
                return {{}, n};
268  
                return {{}, n};
269  
            }
269  
            }
270  
        };
270  
        };
271  
        return awaitable{this, buffers};
271  
        return awaitable{this, buffers};
272  
    }
272  
    }
273  

273  

274  
    /** Asynchronously write data to the stream.
274  
    /** Asynchronously write data to the stream.
275  

275  

276  
        Transfers up to `buffer_size(buffers)` bytes to the
276  
        Transfers up to `buffer_size(buffers)` bytes to the
277  
        peer's incoming buffer. If the peer is suspended in
277  
        peer's incoming buffer. If the peer is suspended in
278  
        @ref read_some, it is resumed. Before every write,
278  
        @ref read_some, it is resumed. Before every write,
279  
        the attached @ref fuse is consulted to possibly inject
279  
        the attached @ref fuse is consulted to possibly inject
280  
        an error. If the fuse fires, the peer is automatically
280  
        an error. If the fuse fires, the peer is automatically
281  
        closed. If the stream is closed, returns `error::eof`.
281  
        closed. If the stream is closed, returns `error::eof`.
282  
        The returned `std::size_t` is the number of bytes
282  
        The returned `std::size_t` is the number of bytes
283  
        transferred.
283  
        transferred.
284  

284  

285  
        @param buffers The const buffer sequence containing
285  
        @param buffers The const buffer sequence containing
286  
            data to write.
286  
            data to write.
287  

287  

288  
        @return An awaitable that await-returns `(error_code,std::size_t)`.
288  
        @return An awaitable that await-returns `(error_code,std::size_t)`.
289  

289  

290  
        @see fuse, close
290  
        @see fuse, close
291  
    */
291  
    */
292  
    template<ConstBufferSequence CB>
292  
    template<ConstBufferSequence CB>
293  
    auto
293  
    auto
294  
    write_some(CB buffers)
294  
    write_some(CB buffers)
295  
    {
295  
    {
296  
        struct awaitable
296  
        struct awaitable
297  
        {
297  
        {
298  
            stream* self_;
298  
            stream* self_;
299  
            CB buffers_;
299  
            CB buffers_;
300  

300  

301  
            bool await_ready() const noexcept { return true; }
301  
            bool await_ready() const noexcept { return true; }
302  

302  

303  
            void await_suspend(
303  
            void await_suspend(
304  
                std::coroutine_handle<>,
304  
                std::coroutine_handle<>,
305  
                io_env const*) const noexcept
305  
                io_env const*) const noexcept
306  
            {
306  
            {
307  
            }
307  
            }
308  

308  

309  
            io_result<std::size_t>
309  
            io_result<std::size_t>
310  
            await_resume()
310  
            await_resume()
311  
            {
311  
            {
312  
                std::size_t n = buffer_size(buffers_);
312  
                std::size_t n = buffer_size(buffers_);
313  
                if(n == 0)
313  
                if(n == 0)
314  
                    return {{}, 0};
314  
                    return {{}, 0};
315  

315  

316  
                auto* st = self_->state_.get();
316  
                auto* st = self_->state_.get();
317  

317  

318  
                if(st->closed)
318  
                if(st->closed)
319  
                    return {error::eof, 0};
319  
                    return {error::eof, 0};
320  

320  

321  
                close_guard g{st};
321  
                close_guard g{st};
322  
                auto ec = st->f.maybe_fail();
322  
                auto ec = st->f.maybe_fail();
323  
                if(ec)
323  
                if(ec)
324  
                    return {ec, 0};
324  
                    return {ec, 0};
325  
                g.disarm();
325  
                g.disarm();
326  

326  

327  
                int peer = 1 - self_->index_;
327  
                int peer = 1 - self_->index_;
328  
                auto& side = st->sides[peer];
328  
                auto& side = st->sides[peer];
329  

329  

330  
                std::size_t const old_size = side.buf.size();
330  
                std::size_t const old_size = side.buf.size();
331  
                side.buf.resize(old_size + n);
331  
                side.buf.resize(old_size + n);
332  
                buffer_copy(make_buffer(
332  
                buffer_copy(make_buffer(
333  
                    side.buf.data() + old_size, n),
333  
                    side.buf.data() + old_size, n),
334  
                    buffers_, n);
334  
                    buffers_, n);
335  

335  

336  
                if(side.pending_cont_.h)
336  
                if(side.pending_cont_.h)
337  
                {
337  
                {
338  
                    side.pending_ex.post(side.pending_cont_);
338  
                    side.pending_ex.post(side.pending_cont_);
339  
                    side.pending_cont_.h = {};
339  
                    side.pending_cont_.h = {};
340  
                    side.pending_ex = {};
340  
                    side.pending_ex = {};
341  
                }
341  
                }
342  

342  

343  
                return {{}, n};
343  
                return {{}, n};
344  
            }
344  
            }
345  
        };
345  
        };
346  
        return awaitable{this, buffers};
346  
        return awaitable{this, buffers};
347  
    }
347  
    }
348  

348  

349  
    /** Inject data into this stream's peer for reading.
349  
    /** Inject data into this stream's peer for reading.
350  

350  

351  
        Appends data directly to the peer's incoming buffer,
351  
        Appends data directly to the peer's incoming buffer,
352  
        bypassing the fuse. If the peer is suspended in
352  
        bypassing the fuse. If the peer is suspended in
353  
        @ref read_some, it is resumed. This is test setup,
353  
        @ref read_some, it is resumed. This is test setup,
354  
        not an operation under test.
354  
        not an operation under test.
355  

355  

356  
        @param sv The data to inject.
356  
        @param sv The data to inject.
357  

357  

358  
        @see make_stream_pair
358  
        @see make_stream_pair
359  
    */
359  
    */
360  
    void
360  
    void
361  
    provide(std::string_view sv)
361  
    provide(std::string_view sv)
362  
    {
362  
    {
363  
        int peer = 1 - index_;
363  
        int peer = 1 - index_;
364  
        auto& side = state_->sides[peer];
364  
        auto& side = state_->sides[peer];
365  
        side.buf.append(sv);
365  
        side.buf.append(sv);
366  
        if(side.pending_cont_.h)
366  
        if(side.pending_cont_.h)
367  
        {
367  
        {
368  
            side.pending_ex.post(side.pending_cont_);
368  
            side.pending_ex.post(side.pending_cont_);
369  
            side.pending_cont_.h = {};
369  
            side.pending_cont_.h = {};
370  
            side.pending_ex = {};
370  
            side.pending_ex = {};
371  
        }
371  
        }
372  
    }
372  
    }
373  

373  

374  
    /** Read from this stream and verify the content.
374  
    /** Read from this stream and verify the content.
375  

375  

376  
        Reads exactly `expected.size()` bytes from the stream
376  
        Reads exactly `expected.size()` bytes from the stream
377  
        and compares against the expected string. The read goes
377  
        and compares against the expected string. The read goes
378  
        through the normal path including the fuse.
378  
        through the normal path including the fuse.
379  

379  

380  
        @param expected The expected content.
380  
        @param expected The expected content.
381  

381  

382  
        @return A pair of `(error_code, bool)`. The error_code
382  
        @return A pair of `(error_code, bool)`. The error_code
383  
            is set if a read error occurs (e.g. fuse injection).
383  
            is set if a read error occurs (e.g. fuse injection).
384  
            The bool is true if the data matches.
384  
            The bool is true if the data matches.
385  

385  

386  
        @see provide
386  
        @see provide
387  
    */
387  
    */
388  
    std::pair<std::error_code, bool>
388  
    std::pair<std::error_code, bool>
389  
    expect(std::string_view expected)
389  
    expect(std::string_view expected)
390  
    {
390  
    {
391  
        std::error_code result;
391  
        std::error_code result;
392  
        bool match = false;
392  
        bool match = false;
393  
        run_blocking()([](
393  
        run_blocking()([](
394  
            stream& self,
394  
            stream& self,
395  
            std::string_view expected,
395  
            std::string_view expected,
396  
            std::error_code& result,
396  
            std::error_code& result,
397  
            bool& match) -> task<>
397  
            bool& match) -> task<>
398  
        {
398  
        {
399  
            std::string buf(expected.size(), '\0');
399  
            std::string buf(expected.size(), '\0');
400  
            auto [ec, n] = co_await read(
400  
            auto [ec, n] = co_await read(
401  
                self, mutable_buffer(
401  
                self, mutable_buffer(
402  
                    buf.data(), buf.size()));
402  
                    buf.data(), buf.size()));
403  
            if(ec)
403  
            if(ec)
404  
            {
404  
            {
405  
                result = ec;
405  
                result = ec;
406  
                co_return;
406  
                co_return;
407  
            }
407  
            }
408  
            match = (std::string_view(
408  
            match = (std::string_view(
409  
                buf.data(), n) == expected);
409  
                buf.data(), n) == expected);
410  
        }(*this, expected, result, match));
410  
        }(*this, expected, result, match));
411  
        return {result, match};
411  
        return {result, match};
412  
    }
412  
    }
413  

413  

414  
    /** Return the stream's pending read data.
414  
    /** Return the stream's pending read data.
415  

415  

416  
        Returns a view of the data waiting to be read
416  
        Returns a view of the data waiting to be read
417  
        from this stream. This is a direct peek at the
417  
        from this stream. This is a direct peek at the
418  
        internal buffer, bypassing the fuse.
418  
        internal buffer, bypassing the fuse.
419  

419  

420  
        @return A view of the pending data.
420  
        @return A view of the pending data.
421  

421  

422  
        @see provide, expect
422  
        @see provide, expect
423  
    */
423  
    */
424  
    std::string_view
424  
    std::string_view
425  
    data() const noexcept
425  
    data() const noexcept
426  
    {
426  
    {
427  
        return state_->sides[index_].buf;
427  
        return state_->sides[index_].buf;
428  
    }
428  
    }
429  
};
429  
};
430  

430  

431  
/** Create a connected pair of test streams.
431  
/** Create a connected pair of test streams.
432  

432  

433  
    Data written to one stream becomes readable on the other.
433  
    Data written to one stream becomes readable on the other.
434  
    If a coroutine calls @ref stream::read_some when no data
434  
    If a coroutine calls @ref stream::read_some when no data
435  
    is available, it suspends until the peer writes. Before
435  
    is available, it suspends until the peer writes. Before
436  
    every read or write, the @ref fuse is consulted to
436  
    every read or write, the @ref fuse is consulted to
437  
    possibly inject an error for testing fault scenarios.
437  
    possibly inject an error for testing fault scenarios.
438  
    When the fuse fires, the peer is automatically closed.
438  
    When the fuse fires, the peer is automatically closed.
439  

439  

440  
    @param f The fuse used to inject errors during operations.
440  
    @param f The fuse used to inject errors during operations.
441  

441  

442  
    @return A pair of connected streams.
442  
    @return A pair of connected streams.
443  

443  

444  
    @see stream, fuse
444  
    @see stream, fuse
445  
*/
445  
*/
446  
inline std::pair<stream, stream>
446  
inline std::pair<stream, stream>
447  
make_stream_pair(fuse f = {})
447  
make_stream_pair(fuse f = {})
448  
{
448  
{
449  
    auto sp = std::make_shared<stream::state>(std::move(f));
449  
    auto sp = std::make_shared<stream::state>(std::move(f));
450  
    return {stream(sp, 0), stream(sp, 1)};
450  
    return {stream(sp, 0), stream(sp, 1)};
451  
}
451  
}
452  

452  

453  
} // test
453  
} // test
454  
} // capy
454  
} // capy
455  
} // boost
455  
} // boost
456  

456  

457  
#endif
457  
#endif