TLA Line data Source code
1 : //
2 : // Copyright (c) 2025 Vinnie Falco (vinnie.falco@gmail.com)
3 : //
4 : // Distributed under the Boost Software License, Version 1.0. (See accompanying
5 : // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
6 : //
7 : // Official repository: https://github.com/cppalliance/capy
8 : //
9 :
10 : #ifndef BOOST_CAPY_TEST_STREAM_HPP
11 : #define BOOST_CAPY_TEST_STREAM_HPP
12 :
13 : #include <boost/capy/detail/config.hpp>
14 : #include <boost/capy/buffers.hpp>
15 : #include <boost/capy/buffers/buffer_copy.hpp>
16 : #include <boost/capy/buffers/make_buffer.hpp>
17 : #include <boost/capy/continuation.hpp>
18 : #include <coroutine>
19 : #include <boost/capy/ex/io_env.hpp>
20 : #include <boost/capy/io_result.hpp>
21 : #include <boost/capy/error.hpp>
22 : #include <boost/capy/read.hpp>
23 : #include <boost/capy/task.hpp>
24 : #include <boost/capy/test/fuse.hpp>
25 : #include <boost/capy/test/run_blocking.hpp>
26 :
27 : #include <memory>
28 : #include <stop_token>
29 : #include <string>
30 : #include <string_view>
31 : #include <utility>
32 :
33 : namespace boost {
34 : namespace capy {
35 : namespace test {
36 :
37 : /** A connected stream for testing bidirectional I/O.
38 :
39 : Streams are created in pairs via @ref make_stream_pair.
40 : Data written to one end becomes available for reading on
41 : the other. If no data is available when @ref read_some
42 : is called, the calling coroutine suspends until the peer
43 : calls @ref write_some. The shared @ref fuse enables error
44 : injection at controlled points in both directions.
45 :
46 : When the fuse injects an error or throws on one end, the
47 : other end is automatically closed: any suspended reader is
48 : resumed with `error::eof`, and subsequent operations on
49 : both ends return `error::eof`. Calling @ref close on one
50 : end signals eof to the peer's reads after draining any
51 : buffered data, while the peer may still write.
52 :
53 : @par Thread Safety
54 : Single-threaded only. Both ends of the pair must be
55 : accessed from the same thread. Concurrent access is
56 : undefined behavior.
57 :
58 : @par Example
59 : @code
60 : fuse f;
61 : auto [a, b] = make_stream_pair( f );
62 :
63 : auto r = f.armed( [&]( fuse& ) -> task<> {
64 : auto [ec, n] = co_await a.write_some(
65 : const_buffer( "hello", 5 ) );
66 : if( ec )
67 : co_return;
68 :
69 : char buf[32];
70 : auto [ec2, n2] = co_await b.read_some(
71 : mutable_buffer( buf, sizeof( buf ) ) );
72 : if( ec2 )
73 : co_return;
74 : // buf contains "hello"
75 : } );
76 : @endcode
77 :
78 : @see make_stream_pair, fuse
79 : */
80 : class stream
81 : {
82 : // Single-threaded only. No concurrent access to either
83 : // end of the pair. Both streams and all operations must
84 : // run on the same thread.
85 :
86 : struct half
87 : {
88 : std::string buf;
89 : std::size_t max_read_size = std::size_t(-1);
90 : continuation pending_cont_;
91 : executor_ref pending_ex;
92 : bool eof = false;
93 : };
94 :
95 : struct state
96 : {
97 : fuse f;
98 : bool closed = false;
99 : half sides[2];
100 :
101 HIT 280 : explicit state(fuse f_) noexcept
102 840 : : f(std::move(f_))
103 : {
104 280 : }
105 :
106 : // Set closed and resume any suspended readers
107 : // with eof on both sides.
108 208 : void close()
109 : {
110 208 : closed = true;
111 624 : for(auto& side : sides)
112 : {
113 416 : if(side.pending_cont_.h)
114 : {
115 12 : side.pending_ex.post(side.pending_cont_);
116 12 : side.pending_cont_.h = {};
117 12 : side.pending_ex = {};
118 : }
119 : }
120 208 : }
121 : };
122 :
123 : // Wraps the maybe_fail() call. If the guard is
124 : // not disarmed before destruction (fuse returned
125 : // an error, or threw an exception), closes both
126 : // ends so any suspended peer gets eof.
127 : struct close_guard
128 : {
129 : state* st;
130 : bool armed = true;
131 300 : void disarm() noexcept { armed = false; }
132 508 : ~close_guard() noexcept(false) { if(armed) st->close(); }
133 : };
134 :
135 : std::shared_ptr<state> state_;
136 : int index_;
137 :
138 560 : stream(
139 : std::shared_ptr<state> sp,
140 : int index) noexcept
141 560 : : state_(std::move(sp))
142 560 : , index_(index)
143 : {
144 560 : }
145 :
146 : friend std::pair<stream, stream>
147 : make_stream_pair(fuse);
148 :
149 : public:
150 : stream(stream const&) = delete;
151 : stream& operator=(stream const&) = delete;
152 660 : stream(stream&&) = default;
153 : stream& operator=(stream&&) = default;
154 :
155 : /** Signal end-of-stream to the peer.
156 :
157 : Marks the peer's read direction as closed.
158 : If the peer is suspended in @ref read_some,
159 : it is resumed. The peer drains any buffered
160 : data before receiving `error::eof`. Writes
161 : from the peer are unaffected.
162 : */
163 : void
164 3 : close()
165 : {
166 3 : int peer = 1 - index_;
167 3 : auto& side = state_->sides[peer];
168 3 : side.eof = true;
169 3 : if(side.pending_cont_.h)
170 : {
171 1 : side.pending_ex.post(side.pending_cont_);
172 1 : side.pending_cont_.h = {};
173 1 : side.pending_ex = {};
174 : }
175 3 : }
176 :
177 : /** Set the maximum bytes returned per read.
178 :
179 : Limits how many bytes @ref read_some returns in
180 : a single call, simulating chunked network delivery.
181 : The default is unlimited.
182 :
183 : @param n Maximum bytes per read.
184 : */
185 : void
186 54 : set_max_read_size(std::size_t n) noexcept
187 : {
188 54 : state_->sides[index_].max_read_size = n;
189 54 : }
190 :
191 : /** Asynchronously read data from the stream.
192 :
193 : Transfers up to `buffer_size(buffers)` bytes from
194 : data written by the peer. If no data is available,
195 : the calling coroutine suspends until the peer calls
196 : @ref write_some. Before every read, the attached
197 : @ref fuse is consulted to possibly inject an error.
198 : If the fuse fires, the peer is automatically closed.
199 : If the stream is closed, returns `error::eof`.
200 : The returned `std::size_t` is the number of bytes
201 : transferred.
202 :
203 : @param buffers The mutable buffer sequence to receive data.
204 :
205 : @return An awaitable that await-returns `(error_code,std::size_t)`.
206 :
207 : @see fuse, close
208 : */
209 : template<MutableBufferSequence MB>
210 : auto
211 275 : read_some(MB buffers)
212 : {
213 : struct awaitable
214 : {
215 : stream* self_;
216 : MB buffers_;
217 :
218 275 : bool await_ready() const noexcept
219 : {
220 275 : if(buffer_empty(buffers_))
221 8 : return true;
222 267 : auto* st = self_->state_.get();
223 267 : auto& side = st->sides[self_->index_];
224 532 : return st->closed || side.eof ||
225 532 : !side.buf.empty();
226 : }
227 :
228 25 : std::coroutine_handle<> await_suspend(
229 : std::coroutine_handle<> h,
230 : io_env const* env) noexcept
231 : {
232 25 : auto& side = self_->state_->sides[
233 25 : self_->index_];
234 25 : side.pending_cont_.h = h;
235 25 : side.pending_ex = env->executor;
236 25 : return std::noop_coroutine();
237 : }
238 :
239 : io_result<std::size_t>
240 275 : await_resume()
241 : {
242 275 : if(buffer_empty(buffers_))
243 8 : return {{}, 0};
244 :
245 267 : auto* st = self_->state_.get();
246 267 : auto& side = st->sides[
247 267 : self_->index_];
248 :
249 267 : if(st->closed)
250 12 : return {error::eof, 0};
251 :
252 255 : if(side.eof && side.buf.empty())
253 3 : return {error::eof, 0};
254 :
255 252 : if(!side.eof)
256 : {
257 252 : close_guard g{st};
258 252 : auto ec = st->f.maybe_fail();
259 199 : if(ec)
260 53 : return {ec, 0};
261 146 : g.disarm();
262 252 : }
263 :
264 292 : std::size_t const n = buffer_copy(
265 146 : buffers_, make_buffer(side.buf),
266 : side.max_read_size);
267 146 : side.buf.erase(0, n);
268 146 : return {{}, n};
269 : }
270 : };
271 275 : return awaitable{this, buffers};
272 : }
273 :
274 : /** Asynchronously write data to the stream.
275 :
276 : Transfers up to `buffer_size(buffers)` bytes to the
277 : peer's incoming buffer. If the peer is suspended in
278 : @ref read_some, it is resumed. Before every write,
279 : the attached @ref fuse is consulted to possibly inject
280 : an error. If the fuse fires, the peer is automatically
281 : closed. If the stream is closed, returns `error::eof`.
282 : The returned `std::size_t` is the number of bytes
283 : transferred.
284 :
285 : @param buffers The const buffer sequence containing
286 : data to write.
287 :
288 : @return An awaitable that await-returns `(error_code,std::size_t)`.
289 :
290 : @see fuse, close
291 : */
292 : template<ConstBufferSequence CB>
293 : auto
294 260 : write_some(CB buffers)
295 : {
296 : struct awaitable
297 : {
298 : stream* self_;
299 : CB buffers_;
300 :
301 260 : bool await_ready() const noexcept { return true; }
302 :
303 MIS 0 : void await_suspend(
304 : std::coroutine_handle<>,
305 : io_env const*) const noexcept
306 : {
307 0 : }
308 :
309 : io_result<std::size_t>
310 HIT 260 : await_resume()
311 : {
312 260 : std::size_t n = buffer_size(buffers_);
313 260 : if(n == 0)
314 4 : return {{}, 0};
315 :
316 256 : auto* st = self_->state_.get();
317 :
318 256 : if(st->closed)
319 MIS 0 : return {error::eof, 0};
320 :
321 HIT 256 : close_guard g{st};
322 256 : auto ec = st->f.maybe_fail();
323 205 : if(ec)
324 51 : return {ec, 0};
325 154 : g.disarm();
326 :
327 154 : int peer = 1 - self_->index_;
328 154 : auto& side = st->sides[peer];
329 :
330 154 : std::size_t const old_size = side.buf.size();
331 154 : side.buf.resize(old_size + n);
332 154 : buffer_copy(make_buffer(
333 154 : side.buf.data() + old_size, n),
334 154 : buffers_, n);
335 :
336 154 : if(side.pending_cont_.h)
337 : {
338 12 : side.pending_ex.post(side.pending_cont_);
339 12 : side.pending_cont_.h = {};
340 12 : side.pending_ex = {};
341 : }
342 :
343 154 : return {{}, n};
344 256 : }
345 : };
346 260 : return awaitable{this, buffers};
347 : }
348 :
349 : /** Inject data into this stream's peer for reading.
350 :
351 : Appends data directly to the peer's incoming buffer,
352 : bypassing the fuse. If the peer is suspended in
353 : @ref read_some, it is resumed. This is test setup,
354 : not an operation under test.
355 :
356 : @param sv The data to inject.
357 :
358 : @see make_stream_pair
359 : */
360 : void
361 87 : provide(std::string_view sv)
362 : {
363 87 : int peer = 1 - index_;
364 87 : auto& side = state_->sides[peer];
365 87 : side.buf.append(sv);
366 87 : if(side.pending_cont_.h)
367 : {
368 MIS 0 : side.pending_ex.post(side.pending_cont_);
369 0 : side.pending_cont_.h = {};
370 0 : side.pending_ex = {};
371 : }
372 HIT 87 : }
373 :
374 : /** Read from this stream and verify the content.
375 :
376 : Reads exactly `expected.size()` bytes from the stream
377 : and compares against the expected string. The read goes
378 : through the normal path including the fuse.
379 :
380 : @param expected The expected content.
381 :
382 : @return A pair of `(error_code, bool)`. The error_code
383 : is set if a read error occurs (e.g. fuse injection).
384 : The bool is true if the data matches.
385 :
386 : @see provide
387 : */
388 : std::pair<std::error_code, bool>
389 38 : expect(std::string_view expected)
390 : {
391 38 : std::error_code result;
392 38 : bool match = false;
393 141 : run_blocking()([](
394 : stream& self,
395 : std::string_view expected,
396 : std::error_code& result,
397 : bool& match) -> task<>
398 : {
399 : std::string buf(expected.size(), '\0');
400 : auto [ec, n] = co_await read(
401 : self, mutable_buffer(
402 : buf.data(), buf.size()));
403 : if(ec)
404 : {
405 : result = ec;
406 : co_return;
407 : }
408 : match = (std::string_view(
409 : buf.data(), n) == expected);
410 161 : }(*this, expected, result, match));
411 58 : return {result, match};
412 : }
413 :
414 : /** Return the stream's pending read data.
415 :
416 : Returns a view of the data waiting to be read
417 : from this stream. This is a direct peek at the
418 : internal buffer, bypassing the fuse.
419 :
420 : @return A view of the pending data.
421 :
422 : @see provide, expect
423 : */
424 : std::string_view
425 : data() const noexcept
426 : {
427 : return state_->sides[index_].buf;
428 : }
429 : };
430 :
431 : /** Create a connected pair of test streams.
432 :
433 : Data written to one stream becomes readable on the other.
434 : If a coroutine calls @ref stream::read_some when no data
435 : is available, it suspends until the peer writes. Before
436 : every read or write, the @ref fuse is consulted to
437 : possibly inject an error for testing fault scenarios.
438 : When the fuse fires, the peer is automatically closed.
439 :
440 : @param f The fuse used to inject errors during operations.
441 :
442 : @return A pair of connected streams.
443 :
444 : @see stream, fuse
445 : */
446 : inline std::pair<stream, stream>
447 280 : make_stream_pair(fuse f = {})
448 : {
449 280 : auto sp = std::make_shared<stream::state>(std::move(f));
450 560 : return {stream(sp, 0), stream(sp, 1)};
451 280 : }
452 :
453 : } // test
454 : } // capy
455 : } // boost
456 :
457 : #endif
|