TLA Line data Source code
1 : //
2 : // Copyright (c) 2026 Steve Gerbino
3 : //
4 : // Distributed under the Boost Software License, Version 1.0. (See accompanying
5 : // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
6 : //
7 : // Official repository: https://github.com/cppalliance/capy
8 : //
9 :
10 : #ifndef BOOST_CAPY_WHEN_ALL_HPP
11 : #define BOOST_CAPY_WHEN_ALL_HPP
12 :
13 : #include <boost/capy/detail/config.hpp>
14 : #include <boost/capy/detail/io_result_combinators.hpp>
15 : #include <boost/capy/continuation.hpp>
16 : #include <boost/capy/concept/executor.hpp>
17 : #include <boost/capy/concept/io_awaitable.hpp>
18 : #include <coroutine>
19 : #include <boost/capy/ex/io_env.hpp>
20 : #include <boost/capy/ex/frame_allocator.hpp>
21 : #include <boost/capy/task.hpp>
22 :
23 : #include <array>
24 : #include <atomic>
25 : #include <exception>
26 : #include <memory>
27 : #include <optional>
28 : #include <ranges>
29 : #include <stdexcept>
30 : #include <stop_token>
31 : #include <tuple>
32 : #include <type_traits>
33 : #include <utility>
34 : #include <vector>
35 :
36 : namespace boost {
37 : namespace capy {
38 :
39 : namespace detail {
40 :
41 : /** Holds the result of a single task within when_all.
42 : The value lives in a std::optional that starts empty: set() fills it, get() moves it out without checking that it is engaged. */
43 : template<typename T>
44 : struct result_holder
45 : {
46 : std::optional<T> value_; // empty until set() has been called
47 :
48 HIT 81 : void set(T v) // takes the value by copy/move and move-assigns it into value_
49 : {
50 81 : value_ = std::move(v);
51 81 : }
52 :
53 69 : T get() && // rvalue-qualified: only callable on an expiring holder
54 : {
55 69 : return std::move(*value_); // unchecked dereference; the caller must ensure set() ran first
56 : }
57 : };
58 :
59 : /** Core shared state for when_all operations.
60 :
61 : Contains all members and methods common to both heterogeneous (variadic)
62 : and homogeneous (range) when_all implementations. State classes embed
63 : this via composition to avoid CRTP destructor ordering issues.
64 :
65 : @par Thread Safety
66 : Atomic operations protect exception capture and completion count.
67 : Note that first_exception_ itself is a plain member; only the has_exception_ flag that guards it is atomic. */
68 : struct when_all_core
69 : {
70 : std::atomic<std::size_t> remaining_count_; // runners still outstanding; decremented once per runner in final_suspend
71 :
72 : // Exception storage - first error wins, others discarded
73 : std::atomic<bool> has_exception_{false};
74 : std::exception_ptr first_exception_; // written only by the caller that wins the has_exception_ exchange
75 :
76 : std::stop_source stop_source_; // its token is handed to every child through the runner's io_env
77 :
78 : // Bridges parent's stop token to our stop_source
79 : struct stop_callback_fn
80 : {
81 : std::stop_source* source_;
82 1 : void operator()() const { source_->request_stop(); }
83 : };
84 : using stop_callback_t = std::stop_callback<stop_callback_fn>;
85 : std::optional<stop_callback_t> parent_stop_callback_; // emplaced by the launchers only when the caller's token can be stopped
86 :
87 : continuation continuation_; // the launchers store the awaiting coroutine's handle in continuation_.h
88 : io_env const* caller_env_ = nullptr; // set by the launchers in await_suspend
89 :
90 72 : explicit when_all_core(std::size_t count) noexcept // count = number of child runners that will be started
91 72 : : remaining_count_(count)
92 : {
93 72 : }
94 :
95 : /** Capture an exception (first one wins). @param ep The exception to store; ignored if another one was captured earlier. */
96 19 : void capture_exception(std::exception_ptr ep)
97 : {
98 19 : bool expected = false;
99 19 : if(has_exception_.compare_exchange_strong( // only the first caller flips false -> true
100 : expected, true, std::memory_order_relaxed))
101 17 : first_exception_ = ep; // NOTE(review): relaxed CAS; presumably published to the reader by the acq_rel decrement of remaining_count_ - confirm
102 19 : }
103 : };
104 :
105 : /** Shared state for heterogeneous when_all (variadic overload).
106 : Holds one result_holder per child, one continuation slot per runner, and the first error_code reported by any child.
107 : @tparam Ts The result types of the tasks.
108 : */
109 : template<typename... Ts>
110 : struct when_all_state
111 : {
112 : static constexpr std::size_t task_count = sizeof...(Ts);
113 :
114 : when_all_core core_; // completion counter, exception slot, stop source, caller continuation
115 : std::tuple<result_holder<Ts>...> results_; // slot I is filled by the runner created for child I
116 : std::array<continuation, task_count> runner_handles_{}; // slot I receives runner I's coroutine handle before it is posted
117 :
118 : std::atomic<bool> has_error_{false}; // guards first_error_ the same way has_exception_ guards first_exception_
119 : std::error_code first_error_;
120 :
121 50 : when_all_state()
122 50 : : core_(task_count) // one outstanding completion per child
123 : {
124 50 : }
125 :
126 : /** Record the first error (subsequent errors are discarded). @param ec The error code reported by a child. */
127 43 : void record_error(std::error_code ec)
128 : {
129 43 : bool expected = false;
130 43 : if(has_error_.compare_exchange_strong( // succeeds for exactly one caller
131 : expected, true, std::memory_order_relaxed))
132 29 : first_error_ = ec;
133 43 : }
134 : };
135 :
136 : /** Shared state for homogeneous when_all (range overload).
137 :
138 : Stores extracted io_result payloads in a vector indexed by task
139 : position. Tracks the first error_code for error propagation.
140 : Each slot is a std::optional that stays empty unless set_result() is called for that index.
141 : @tparam T The payload type extracted from io_result.
142 : */
143 : template<typename T>
144 : struct when_all_homogeneous_state
145 : {
146 : when_all_core core_; // completion counter, exception slot, stop source, caller continuation
147 : std::vector<std::optional<T>> results_; // sized to the child count in the constructor
148 : std::unique_ptr<continuation[]> runner_handles_; // one slot per runner, filled by the launcher
149 :
150 : std::atomic<bool> has_error_{false};
151 : std::error_code first_error_;
152 :
153 11 : explicit when_all_homogeneous_state(std::size_t count) // count = number of children in the range
154 11 : : core_(count)
155 22 : , results_(count) // count empty optionals
156 11 : , runner_handles_(std::make_unique<continuation[]>(count))
157 : {
158 11 : }
159 :
160 16 : void set_result(std::size_t index, T value) // stores value in slot index; index is not bounds-checked
161 : {
162 16 : results_[index].emplace(std::move(value));
163 16 : }
164 :
165 : /** Record the first error (subsequent errors are discarded). @param ec The error code reported by a child. */
166 7 : void record_error(std::error_code ec)
167 : {
168 7 : bool expected = false;
169 7 : if(has_error_.compare_exchange_strong( // succeeds for exactly one caller
170 : expected, true, std::memory_order_relaxed))
171 5 : first_error_ = ec;
172 7 : }
173 : };
174 :
175 : /** Specialization for void io_result children (no payload storage): it has no results_ vector and no set_result(), only error tracking. */
176 : template<>
177 : struct when_all_homogeneous_state<std::tuple<>>
178 : {
179 : when_all_core core_; // completion counter, exception slot, stop source, caller continuation
180 : std::unique_ptr<continuation[]> runner_handles_; // one slot per runner, filled by the launcher
181 :
182 : std::atomic<bool> has_error_{false};
183 : std::error_code first_error_;
184 :
185 3 : explicit when_all_homogeneous_state(std::size_t count) // count = number of children in the range
186 3 : : core_(count)
187 3 : , runner_handles_(std::make_unique<continuation[]>(count))
188 : {
189 3 : }
190 :
191 : /** Record the first error (subsequent errors are discarded). @param ec The error code reported by a child. */
192 1 : void record_error(std::error_code ec)
193 : {
194 1 : bool expected = false;
195 1 : if(has_error_.compare_exchange_strong( // succeeds for exactly one caller
196 : expected, true, std::memory_order_relaxed))
197 1 : first_error_ = ec;
198 1 : }
199 : };
200 :
201 : /** Wrapper coroutine that intercepts task completion for when_all.
202 :
203 : Parameterized on StateType to work with both heterogeneous (variadic)
204 : and homogeneous (range) state types. All state types expose their
205 : shared members through a `core_` member of type when_all_core.
206 : The coroutine starts suspended, destroys its own frame in final_suspend, and the runner that finishes last dispatches the caller's continuation.
207 : @tparam StateType The state type (when_all_state or when_all_homogeneous_state).
208 : */
209 : template<typename StateType>
210 : struct when_all_runner
211 : {
212 : struct promise_type
213 : {
214 : StateType* state_ = nullptr; // assigned by the launcher after the runner coroutine has been created
215 : std::size_t index_ = 0; // set by the homogeneous launcher; not read anywhere in this listing
216 : io_env env_; // environment handed to the awaited child: caller's executor, when_all's stop token, caller's frame allocator
217 :
218 145 : when_all_runner get_return_object() noexcept
219 : {
220 : return when_all_runner(
221 145 : std::coroutine_handle<promise_type>::from_promise(*this));
222 : }
223 :
224 145 : std::suspend_always initial_suspend() noexcept // lazy start: the launcher fills in the promise before the body runs
225 : {
226 145 : return {};
227 : }
228 :
229 145 : auto final_suspend() noexcept
230 : {
231 : struct awaiter
232 : {
233 : promise_type* p_;
234 145 : bool await_ready() const noexcept { return false; } // always suspend so await_suspend runs
235 145 : auto await_suspend(std::coroutine_handle<> h) noexcept
236 : {
237 145 : auto& core = p_->state_->core_; // read everything needed up front: p_ points into the frame destroyed below
238 145 : auto* counter = &core.remaining_count_;
239 145 : auto* caller_env = core.caller_env_;
240 145 : auto& cont = core.continuation_;
241 :
242 145 : h.destroy(); // the runner frees its own frame; p_ must not be used after this line
243 :
244 145 : auto remaining = counter->fetch_sub(1, std::memory_order_acq_rel); // fetch_sub returns the value before the decrement
245 145 : if(remaining == 1) // this runner was the last one outstanding
246 72 : return detail::symmetric_transfer(caller_env->executor.dispatch(cont));
247 73 : return detail::symmetric_transfer(std::noop_coroutine()); // siblings still running: nothing to resume
248 : }
249 MIS 0 : void await_resume() const noexcept {} // the coverage column shows this was never executed
250 : };
251 HIT 145 : return awaiter{this};
252 : }
253 :
254 126 : void return_void() noexcept {}
255 :
256 19 : void unhandled_exception() // a child threw: keep the first exception and ask the siblings to stop
257 : {
258 19 : state_->core_.capture_exception(std::current_exception());
259 19 : state_->core_.stop_source_.request_stop();
260 19 : }
261 :
262 : template<class Awaitable>
263 : struct transform_awaiter // adapter that supplies the runner's io_env as second argument to the child's await_suspend
264 : {
265 : std::decay_t<Awaitable> a_; // the wrapped awaitable, stored by value
266 : promise_type* p_;
267 :
268 145 : bool await_ready() { return a_.await_ready(); }
269 145 : decltype(auto) await_resume() { return a_.await_resume(); }
270 :
271 : template<class Promise>
272 144 : auto await_suspend(std::coroutine_handle<Promise> h)
273 : {
274 : using R = decltype(a_.await_suspend(h, &p_->env_));
275 : if constexpr (std::is_same_v<R, std::coroutine_handle<>>) // handle-returning children go through detail::symmetric_transfer
276 144 : return detail::symmetric_transfer(a_.await_suspend(h, &p_->env_));
277 : else
278 : return a_.await_suspend(h, &p_->env_); // any other return type is passed through unchanged
279 : }
280 : };
281 :
282 : template<class Awaitable>
283 145 : auto await_transform(Awaitable&& a) // only IoAwaitable types may be co_awaited inside a runner
284 : {
285 : using A = std::decay_t<Awaitable>;
286 : if constexpr (IoAwaitable<A>)
287 : {
288 : return transform_awaiter<Awaitable>{
289 290 : std::forward<Awaitable>(a), this};
290 : }
291 : else
292 : {
293 : static_assert(sizeof(A) == 0, "requires IoAwaitable"); // dependent condition: fires only when this branch is instantiated
294 : }
295 145 : }
296 : };
297 :
298 : std::coroutine_handle<promise_type> h_; // no destructor is declared here; the frame is destroyed in final_suspend instead
299 :
300 145 : explicit when_all_runner(std::coroutine_handle<promise_type> h) noexcept
301 145 : : h_(h)
302 : {
303 145 : }
304 :
305 : // Enable move for all clang versions - some versions need it
306 : when_all_runner(when_all_runner&& other) noexcept
307 : : h_(std::exchange(other.h_, nullptr))
308 : {
309 : }
310 :
311 : when_all_runner(when_all_runner const&) = delete;
312 : when_all_runner& operator=(when_all_runner const&) = delete;
313 : when_all_runner& operator=(when_all_runner&&) = delete;
314 :
315 145 : auto release() noexcept // hands the handle to the caller and leaves this wrapper holding nullptr
316 : {
317 145 : return std::exchange(h_, nullptr);
318 : }
319 : };
320 :
321 : /** Create an io_result-aware runner for a single awaitable (range path).
322 :
323 : Checks the error code, records errors and requests stop on failure,
324 : or extracts the payload on success. @param inner The child awaitable (taken by value). @param state Shared state. @param index Result slot for this child.
325 : */
326 : template<IoAwaitable Awaitable, typename StateType>
327 : when_all_runner<StateType>
328 32 : make_when_all_homogeneous_runner(Awaitable inner, StateType* state, std::size_t index)
329 : {
330 : auto result = co_await std::move(inner); // goes through promise_type::await_transform
331 :
332 : if(result.ec)
333 : {
334 : state->record_error(result.ec); // only the first recorded error is kept
335 : state->core_.stop_source_.request_stop(); // ask the siblings to stop
336 : }
337 : else
338 : {
339 : using PayloadT = io_result_payload_t<
340 : awaitable_result_t<Awaitable>>;
341 : if constexpr (!std::is_same_v<PayloadT, std::tuple<>>) // void children have no payload to store
342 : {
343 : state->set_result(index,
344 : extract_io_payload(std::move(result)));
345 : }
346 : }
347 64 : }
348 :
349 : /** Create a runner for io_result children that requests stop on ec. The whole io_result is stored in slot Index whether or not ec is set. */
350 : template<std::size_t Index, IoAwaitable Awaitable, typename... Ts>
351 : when_all_runner<when_all_state<Ts...>>
352 97 : make_when_all_io_runner(Awaitable inner, when_all_state<Ts...>* state)
353 : {
354 : auto result = co_await std::move(inner); // goes through promise_type::await_transform
355 : auto ec = result.ec; // copied out before result is moved from on the next line
356 : std::get<Index>(state->results_).set(std::move(result));
357 :
358 : if(ec)
359 : {
360 : state->record_error(ec); // only the first recorded error is kept
361 : state->core_.stop_source_.request_stop(); // ask the siblings to stop
362 : }
363 194 : }
364 :
365 : /** Launcher that uses io_result-aware runners. Awaiting it records the caller, links the caller's stop token, then creates and posts one runner per awaitable. */
366 : template<IoAwaitable... Awaitables>
367 : class when_all_io_launcher
368 : {
369 : using state_type = when_all_state<awaitable_result_t<Awaitables>...>;
370 :
371 : std::tuple<Awaitables...>* awaitables_; // non-owning; children are moved out of this tuple in launch_one
372 : state_type* state_; // non-owning shared state
373 :
374 : public:
375 50 : when_all_io_launcher(
376 : std::tuple<Awaitables...>* awaitables,
377 : state_type* state)
378 50 : : awaitables_(awaitables)
379 50 : , state_(state)
380 : {
381 50 : }
382 :
383 50 : bool await_ready() const noexcept // ready without suspending only for an empty pack
384 : {
385 50 : return sizeof...(Awaitables) == 0;
386 : }
387 :
388 50 : std::coroutine_handle<> await_suspend(
389 : std::coroutine_handle<> continuation, io_env const* caller_env)
390 : {
391 50 : state_->core_.continuation_.h = continuation; // dispatched by the runner that finishes last
392 50 : state_->core_.caller_env_ = caller_env;
393 :
394 50 : if(caller_env->stop_token.stop_possible())
395 : {
396 2 : state_->core_.parent_stop_callback_.emplace( // forward the caller's stop request to our stop_source
397 1 : caller_env->stop_token,
398 1 : when_all_core::stop_callback_fn{&state_->core_.stop_source_});
399 :
400 1 : if(caller_env->stop_token.stop_requested()) // explicit re-check after registration
401 MIS 0 : state_->core_.stop_source_.request_stop(); // the coverage column shows this was never executed
402 : }
403 :
404 HIT 50 : auto token = state_->core_.stop_source_.get_token();
405 46 : [&]<std::size_t... Is>(std::index_sequence<Is...>) {
406 50 : (..., launch_one<Is>(caller_env->executor, token)); // comma fold: children are launched in index order
407 50 : }(std::index_sequence_for<Awaitables...>{});
408 :
409 100 : return std::noop_coroutine(); // the caller stays suspended until the last runner dispatches it
410 50 : }
411 :
412 50 : void await_resume() const noexcept {} // results are read from the state by the caller, not returned here
413 :
414 : private:
415 : template<std::size_t I>
416 97 : void launch_one(executor_ref caller_ex, std::stop_token token) // creates, configures and posts the runner for child I
417 : {
418 97 : auto runner = make_when_all_io_runner<I>(
419 97 : std::move(std::get<I>(*awaitables_)), state_);
420 :
421 97 : auto h = runner.release(); // the runner starts suspended, so its promise can be filled in first
422 97 : h.promise().state_ = state_;
423 97 : h.promise().env_ = io_env{caller_ex, token,
424 97 : state_->core_.caller_env_->frame_allocator};
425 :
426 97 : state_->runner_handles_[I].h = std::coroutine_handle<>{h};
427 97 : state_->core_.caller_env_->executor.post(state_->runner_handles_[I]); // posted immediately (no separate post phase here)
428 194 : }
429 : };
430 :
431 : /** Helper to extract a single result from state.
432 : This is a separate function to work around a GCC-11 ICE that occurs
433 : when using nested immediately-invoked lambdas with pack expansion.
434 : @tparam I Index of the result slot. @return The value moved out of slot I; the slot must already hold a value. */
435 : template<std::size_t I, typename... Ts>
436 69 : auto extract_single_result(when_all_state<Ts...>& state)
437 : {
438 69 : return std::move(std::get<I>(state.results_)).get(); // get() is rvalue-qualified, hence the std::move on the holder
439 : }
440 :
441 : /** Extract all results from state as a tuple.
442 : @return A std::tuple built from every slot of state.results_, in index order; each slot is moved from. */
443 : template<typename... Ts>
444 36 : auto extract_results(when_all_state<Ts...>& state)
445 : {
446 55 : return [&]<std::size_t... Is>(std::index_sequence<Is...>) {
447 36 : return std::tuple(extract_single_result<Is>(state)...); // one extract_single_result call per slot
448 72 : }(std::index_sequence_for<Ts...>{});
449 : }
450 :
451 : /** Launches all homogeneous runners concurrently.
452 :
453 : Two-phase approach: create all runners first, then post all.
454 : This avoids lifetime issues if a task completes synchronously.
455 : @tparam Range Range type whose elements are the child awaitables; the elements are moved from during launch. */
456 : template<typename Range>
457 : class when_all_homogeneous_launcher
458 : {
459 : using Awaitable = std::ranges::range_value_t<Range>;
460 : using PayloadT = io_result_payload_t<awaitable_result_t<Awaitable>>;
461 :
462 : Range* range_; // non-owning; elements are moved out in await_suspend
463 : when_all_homogeneous_state<PayloadT>* state_; // non-owning shared state
464 :
465 : public:
466 14 : when_all_homogeneous_launcher(
467 : Range* range,
468 : when_all_homogeneous_state<PayloadT>* state)
469 14 : : range_(range)
470 14 : , state_(state)
471 : {
472 14 : }
473 :
474 14 : bool await_ready() const noexcept // ready without suspending only for an empty range
475 : {
476 14 : return std::ranges::empty(*range_);
477 : }
478 :
479 14 : std::coroutine_handle<> await_suspend(std::coroutine_handle<> continuation, io_env const* caller_env)
480 : {
481 14 : state_->core_.continuation_.h = continuation; // dispatched by the runner that finishes last
482 14 : state_->core_.caller_env_ = caller_env;
483 :
484 14 : if(caller_env->stop_token.stop_possible())
485 : {
486 2 : state_->core_.parent_stop_callback_.emplace( // forward the caller's stop request to our stop_source
487 1 : caller_env->stop_token,
488 1 : when_all_core::stop_callback_fn{&state_->core_.stop_source_});
489 :
490 1 : if(caller_env->stop_token.stop_requested()) // explicit re-check after registration
491 MIS 0 : state_->core_.stop_source_.request_stop(); // the coverage column shows this was never executed
492 : }
493 :
494 HIT 14 : auto token = state_->core_.stop_source_.get_token();
495 :
496 : // Phase 1: Create all runners without dispatching.
497 14 : std::size_t index = 0;
498 46 : for(auto&& a : *range_)
499 : {
500 32 : auto runner = make_when_all_homogeneous_runner(
501 32 : std::move(a), state_, index); // index doubles as the child's result slot
502 :
503 32 : auto h = runner.release(); // the runner starts suspended, so its promise can be filled in first
504 32 : h.promise().state_ = state_;
505 32 : h.promise().index_ = index;
506 32 : h.promise().env_ = io_env{caller_env->executor, token, caller_env->frame_allocator};
507 :
508 32 : state_->runner_handles_[index].h = std::coroutine_handle<>{h};
509 32 : ++index;
510 : }
511 :
512 : // Phase 2: Post all runners. Any may complete synchronously.
513 : // After last post, state_ and this may be destroyed.
514 14 : auto* handles = state_->runner_handles_.get(); // cached in locals so the loop does not go through state_ again
515 14 : std::size_t count = state_->core_.remaining_count_.load(std::memory_order_relaxed); // read before any runner is posted, so still the initial count
516 46 : for(std::size_t i = 0; i < count; ++i)
517 32 : caller_env->executor.post(handles[i]);
518 :
519 28 : return std::noop_coroutine(); // the caller stays suspended until the last runner dispatches it
520 46 : }
521 :
522 14 : void await_resume() const noexcept // results are read from the state by the caller, not returned here
523 : {
524 14 : }
525 : };
526 :
527 : } // namespace detail
528 :
529 : /** Execute a range of io_result-returning awaitables concurrently.
530 :
531 : Launches all awaitables simultaneously and waits for all to complete.
532 : On success, extracted payloads are collected in a vector preserving
533 : input order. The first error_code cancels siblings and is propagated
534 : in the outer io_result. Exceptions always beat error codes.
535 :
536 : @li All child awaitables run concurrently on the caller's executor
537 : @li Payloads are returned as a vector in input order
538 : @li First error_code wins and cancels siblings
539 : @li Exception always beats error_code
540 : @li Completes only after all children have finished
541 :
542 : @par Thread Safety
543 : The returned task must be awaited from a single execution context.
544 : Child awaitables execute concurrently but complete through the caller's
545 : executor.
546 :
547 : @param awaitables Range of io_result-returning awaitables to execute
548 : concurrently (must not be empty).
549 :
550 : @return A task yielding io_result<vector<PayloadT>> where PayloadT
551 : is the payload extracted from each child's io_result. On error the vector is empty.
552 :
553 : @throws std::invalid_argument if range is empty (thrown before
554 : coroutine suspends). NOTE(review): the check is a statement inside this coroutine's body, so when it runs relative to the call depends on task's initial-suspend behavior - confirm.
555 : @throws Rethrows the first child exception after all children
556 : complete (exception beats error_code).
557 :
558 : @par Example
559 : @code
560 : task<void> example()
561 : {
562 : std::vector<io_task<size_t>> reads;
563 : for (auto& buf : buffers)
564 : reads.push_back(stream.read_some(buf));
565 :
566 : auto [ec, counts] = co_await when_all(std::move(reads));
567 : if (ec) { // handle error
568 : }
569 : }
570 : @endcode
571 :
572 : @see IoAwaitableRange, when_all
573 : */
574 : template<IoAwaitableRange R>
575 : requires detail::is_io_result_v<
576 : awaitable_result_t<std::ranges::range_value_t<R>>>
577 : && (!std::is_same_v<
578 : detail::io_result_payload_t<
579 : awaitable_result_t<std::ranges::range_value_t<R>>>,
580 : std::tuple<>>)
581 12 : [[nodiscard]] auto when_all(R&& awaitables)
582 : -> task<io_result<std::vector<
583 : detail::io_result_payload_t<
584 : awaitable_result_t<std::ranges::range_value_t<R>>>>>>
585 : {
586 : using Awaitable = std::ranges::range_value_t<R>;
587 : using PayloadT = detail::io_result_payload_t<
588 : awaitable_result_t<Awaitable>>;
589 : using OwnedRange = std::remove_cvref_t<R>;
590 :
591 : auto count = std::ranges::size(awaitables); // measured before the range is forwarded into owned_awaitables below
592 : if(count == 0)
593 : throw std::invalid_argument("when_all requires at least one awaitable");
594 :
595 : OwnedRange owned_awaitables = std::forward<R>(awaitables); // local copy/move of the range; the launcher moves children out of it
596 :
597 : detail::when_all_homogeneous_state<PayloadT> state(count);
598 :
599 : co_await detail::when_all_homogeneous_launcher<OwnedRange>( // resumes once the last runner dispatches this coroutine
600 : &owned_awaitables, &state);
601 :
602 : if(state.core_.first_exception_) // exception beats error_code
603 : std::rethrow_exception(state.core_.first_exception_);
604 :
605 : if(state.has_error_.load(std::memory_order_relaxed))
606 : co_return io_result<std::vector<PayloadT>>{state.first_error_, {}}; // error path: payloads are dropped
607 :
608 : std::vector<PayloadT> results;
609 : results.reserve(count);
610 : for(auto& opt : state.results_)
611 : results.push_back(std::move(*opt)); // unchecked dereference: relies on every child having stored a payload on this path
612 :
613 : co_return io_result<std::vector<PayloadT>>{{}, std::move(results)};
614 24 : }
615 :
616 : /** Execute a range of void io_result-returning awaitables concurrently.
617 :
618 : Launches all awaitables simultaneously and waits for all to complete.
619 : Since all awaitables return io_result<>, no payload values are
620 : collected. The first error_code cancels siblings and is propagated.
621 : Exceptions always beat error codes.
622 :
623 : @param awaitables Range of io_result<>-returning awaitables to
624 : execute concurrently (must not be empty).
625 :
626 : @return A task yielding io_result<> whose ec is the first child
627 : error, or default-constructed on success.
628 :
629 : @throws std::invalid_argument if range is empty. NOTE(review): the check is a statement inside this coroutine's body, so when it runs depends on task's initial-suspend behavior - confirm.
630 : @throws Rethrows the first child exception after all children
631 : complete (exception beats error_code).
632 :
633 : @par Example
634 : @code
635 : task<void> example()
636 : {
637 : std::vector<io_task<>> jobs;
638 : for (int i = 0; i < n; ++i)
639 : jobs.push_back(process(i));
640 :
641 : auto [ec] = co_await when_all(std::move(jobs));
642 : }
643 : @endcode
644 :
645 : @see IoAwaitableRange, when_all
646 : */
647 : template<IoAwaitableRange R>
648 : requires detail::is_io_result_v<
649 : awaitable_result_t<std::ranges::range_value_t<R>>>
650 : && std::is_same_v<
651 : detail::io_result_payload_t<
652 : awaitable_result_t<std::ranges::range_value_t<R>>>,
653 : std::tuple<>>
654 4 : [[nodiscard]] auto when_all(R&& awaitables) -> task<io_result<>>
655 : {
656 : using OwnedRange = std::remove_cvref_t<R>;
657 :
658 : auto count = std::ranges::size(awaitables); // measured before the range is forwarded into owned_awaitables below
659 : if(count == 0)
660 : throw std::invalid_argument("when_all requires at least one awaitable");
661 :
662 : OwnedRange owned_awaitables = std::forward<R>(awaitables); // local copy/move of the range; the launcher moves children out of it
663 :
664 : detail::when_all_homogeneous_state<std::tuple<>> state(count); // specialization without payload storage
665 :
666 : co_await detail::when_all_homogeneous_launcher<OwnedRange>( // resumes once the last runner dispatches this coroutine
667 : &owned_awaitables, &state);
668 :
669 : if(state.core_.first_exception_) // exception beats error_code
670 : std::rethrow_exception(state.core_.first_exception_);
671 :
672 : if(state.has_error_.load(std::memory_order_relaxed))
673 : co_return io_result<>{state.first_error_};
674 :
675 : co_return io_result<>{};
676 8 : }
677 :
678 : /** Execute io_result-returning awaitables concurrently, inspecting error codes.
679 :
680 : Overload selected when all children return io_result<Ts...>.
681 : The error_code is lifted out of each child into a single outer
682 : io_result. On success all values are returned; on failure the
683 : first error_code wins.
684 :
685 : @par Exception Safety
686 : Exception always beats error_code. If any child throws, the
687 : exception is rethrown regardless of error_code results.
688 :
689 : @param awaitables One or more awaitables each returning
690 : io_result<Ts...>. They are taken by value and moved into a local tuple.
691 :
692 : @return A task yielding io_result<R1, R2, ..., Rn> where each Ri
693 : follows the payload flattening rules.
694 : */
695 : template<IoAwaitable... As>
696 : requires (sizeof...(As) > 0)
697 : && detail::all_io_result_awaitables<As...>
698 50 : [[nodiscard]] auto when_all(As... awaitables)
699 : -> task<io_result<
700 : detail::io_result_payload_t<awaitable_result_t<As>>...>>
701 : {
702 : using result_type = io_result<
703 : detail::io_result_payload_t<awaitable_result_t<As>>...>;
704 :
705 : detail::when_all_state<awaitable_result_t<As>...> state; // one result slot per child
706 : std::tuple<As...> awaitable_tuple(std::move(awaitables)...); // the launcher moves each child out of this tuple
707 :
708 : co_await detail::when_all_io_launcher<As...>(&awaitable_tuple, &state); // resumes once the last runner dispatches this coroutine
709 :
710 : // Exception always wins over error_code
711 : if(state.core_.first_exception_)
712 : std::rethrow_exception(state.core_.first_exception_);
713 :
714 : auto r = detail::build_when_all_io_result<result_type>(
715 : detail::extract_results(state)); // built from every child's io_result, on the error path as well
716 : if(state.has_error_.load(std::memory_order_relaxed))
717 : r.ec = state.first_error_; // overwrite ec with the first error that was recorded
718 : co_return r;
719 100 : }
720 :
721 : } // namespace capy
722 : } // namespace boost
723 :
724 : #endif
|