//
// Copyright (c) 2026 Steve Gerbino
//
// Distributed under the Boost Software License, Version 1.0. (See accompanying
// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
//
// Official repository: https://github.com/cppalliance/capy
//

10  
#ifndef BOOST_CAPY_WHEN_ALL_HPP
10  
#ifndef BOOST_CAPY_WHEN_ALL_HPP
11  
#define BOOST_CAPY_WHEN_ALL_HPP
11  
#define BOOST_CAPY_WHEN_ALL_HPP
12  

12  

13  
#include <boost/capy/detail/config.hpp>
13  
#include <boost/capy/detail/config.hpp>
14  
#include <boost/capy/detail/io_result_combinators.hpp>
14  
#include <boost/capy/detail/io_result_combinators.hpp>
15  
#include <boost/capy/continuation.hpp>
15  
#include <boost/capy/continuation.hpp>
16  
#include <boost/capy/concept/executor.hpp>
16  
#include <boost/capy/concept/executor.hpp>
17  
#include <boost/capy/concept/io_awaitable.hpp>
17  
#include <boost/capy/concept/io_awaitable.hpp>
18  
#include <coroutine>
18  
#include <coroutine>
19  
#include <boost/capy/ex/io_env.hpp>
19  
#include <boost/capy/ex/io_env.hpp>
20  
#include <boost/capy/ex/frame_allocator.hpp>
20  
#include <boost/capy/ex/frame_allocator.hpp>
21  
#include <boost/capy/task.hpp>
21  
#include <boost/capy/task.hpp>
22  

22  

23  
#include <array>
23  
#include <array>
24  
#include <atomic>
24  
#include <atomic>
25  
#include <exception>
25  
#include <exception>
26  
#include <memory>
26  
#include <memory>
27  
#include <optional>
27  
#include <optional>
28  
#include <ranges>
28  
#include <ranges>
29  
#include <stdexcept>
29  
#include <stdexcept>
30  
#include <stop_token>
30  
#include <stop_token>
31  
#include <tuple>
31  
#include <tuple>
32  
#include <type_traits>
32  
#include <type_traits>
33  
#include <utility>
33  
#include <utility>
34  
#include <vector>
34  
#include <vector>
35  

35  

36  
namespace boost {
36  
namespace boost {
37  
namespace capy {
37  
namespace capy {
38  

38  

39  
namespace detail {
39  
namespace detail {
40  

40  

/** Holds the result of a single task within when_all.

    The slot starts empty, is filled by `set`, and is drained by `get`.

    @tparam T The stored result type. Must be move-constructible.
*/
template<typename T>
struct result_holder
{
    // Empty until set() has been called.
    std::optional<T> value_;

    /** Store the result.

        The value is constructed in place instead of being assigned, so
        `T` only has to be move-constructible (not move-assignable).

        @param v The result to store.
    */
    void set(T v)
    {
        value_.emplace(std::move(v));
    }

    /** Move the stored result out of the holder.

        The optional is dereferenced without a check, so `set` must
        have been called first.

        @return The stored value, moved out.
    */
    T get() &&
    {
        return std::move(*value_);
    }
};

59  
/** Core shared state for when_all operations.
59  
/** Core shared state for when_all operations.
60  

60  

61  
    Contains all members and methods common to both heterogeneous (variadic)
61  
    Contains all members and methods common to both heterogeneous (variadic)
62  
    and homogeneous (range) when_all implementations. State classes embed
62  
    and homogeneous (range) when_all implementations. State classes embed
63  
    this via composition to avoid CRTP destructor ordering issues.
63  
    this via composition to avoid CRTP destructor ordering issues.
64  

64  

65  
    @par Thread Safety
65  
    @par Thread Safety
66  
    Atomic operations protect exception capture and completion count.
66  
    Atomic operations protect exception capture and completion count.
67  
*/
67  
*/
68  
struct when_all_core
68  
struct when_all_core
69  
{
69  
{
70  
    std::atomic<std::size_t> remaining_count_;
70  
    std::atomic<std::size_t> remaining_count_;
71  

71  

72  
    // Exception storage - first error wins, others discarded
72  
    // Exception storage - first error wins, others discarded
73  
    std::atomic<bool> has_exception_{false};
73  
    std::atomic<bool> has_exception_{false};
74  
    std::exception_ptr first_exception_;
74  
    std::exception_ptr first_exception_;
75  

75  

76  
    std::stop_source stop_source_;
76  
    std::stop_source stop_source_;
77  

77  

78  
    // Bridges parent's stop token to our stop_source
78  
    // Bridges parent's stop token to our stop_source
79  
    struct stop_callback_fn
79  
    struct stop_callback_fn
80  
    {
80  
    {
81  
        std::stop_source* source_;
81  
        std::stop_source* source_;
82  
        void operator()() const { source_->request_stop(); }
82  
        void operator()() const { source_->request_stop(); }
83  
    };
83  
    };
84  
    using stop_callback_t = std::stop_callback<stop_callback_fn>;
84  
    using stop_callback_t = std::stop_callback<stop_callback_fn>;
85  
    std::optional<stop_callback_t> parent_stop_callback_;
85  
    std::optional<stop_callback_t> parent_stop_callback_;
86  

86  

87  
    continuation continuation_;
87  
    continuation continuation_;
88  
    io_env const* caller_env_ = nullptr;
88  
    io_env const* caller_env_ = nullptr;
89  

89  

90  
    explicit when_all_core(std::size_t count) noexcept
90  
    explicit when_all_core(std::size_t count) noexcept
91  
        : remaining_count_(count)
91  
        : remaining_count_(count)
92  
    {
92  
    {
93  
    }
93  
    }
94  

94  

95  
    /** Capture an exception (first one wins). */
95  
    /** Capture an exception (first one wins). */
96  
    void capture_exception(std::exception_ptr ep)
96  
    void capture_exception(std::exception_ptr ep)
97  
    {
97  
    {
98  
        bool expected = false;
98  
        bool expected = false;
99  
        if(has_exception_.compare_exchange_strong(
99  
        if(has_exception_.compare_exchange_strong(
100  
            expected, true, std::memory_order_relaxed))
100  
            expected, true, std::memory_order_relaxed))
101  
            first_exception_ = ep;
101  
            first_exception_ = ep;
102  
    }
102  
    }
103  
};
103  
};
104  

104  

105  
/** Shared state for heterogeneous when_all (variadic overload).
105  
/** Shared state for heterogeneous when_all (variadic overload).
106  

106  

107  
    @tparam Ts The result types of the tasks.
107  
    @tparam Ts The result types of the tasks.
108  
*/
108  
*/
109  
template<typename... Ts>
109  
template<typename... Ts>
110  
struct when_all_state
110  
struct when_all_state
111  
{
111  
{
112  
    static constexpr std::size_t task_count = sizeof...(Ts);
112  
    static constexpr std::size_t task_count = sizeof...(Ts);
113  

113  

114  
    when_all_core core_;
114  
    when_all_core core_;
115  
    std::tuple<result_holder<Ts>...> results_;
115  
    std::tuple<result_holder<Ts>...> results_;
116  
    std::array<continuation, task_count> runner_handles_{};
116  
    std::array<continuation, task_count> runner_handles_{};
117  

117  

118  
    std::atomic<bool> has_error_{false};
118  
    std::atomic<bool> has_error_{false};
119  
    std::error_code first_error_;
119  
    std::error_code first_error_;
120  

120  

121  
    when_all_state()
121  
    when_all_state()
122  
        : core_(task_count)
122  
        : core_(task_count)
123  
    {
123  
    {
124  
    }
124  
    }
125  

125  

126  
    /** Record the first error (subsequent errors are discarded). */
126  
    /** Record the first error (subsequent errors are discarded). */
127  
    void record_error(std::error_code ec)
127  
    void record_error(std::error_code ec)
128  
    {
128  
    {
129  
        bool expected = false;
129  
        bool expected = false;
130  
        if(has_error_.compare_exchange_strong(
130  
        if(has_error_.compare_exchange_strong(
131  
            expected, true, std::memory_order_relaxed))
131  
            expected, true, std::memory_order_relaxed))
132  
            first_error_ = ec;
132  
            first_error_ = ec;
133  
    }
133  
    }
134  
};
134  
};
135  

135  

136  
/** Shared state for homogeneous when_all (range overload).
136  
/** Shared state for homogeneous when_all (range overload).
137  

137  

138  
    Stores extracted io_result payloads in a vector indexed by task
138  
    Stores extracted io_result payloads in a vector indexed by task
139  
    position. Tracks the first error_code for error propagation.
139  
    position. Tracks the first error_code for error propagation.
140  

140  

141  
    @tparam T The payload type extracted from io_result.
141  
    @tparam T The payload type extracted from io_result.
142  
*/
142  
*/
143  
template<typename T>
143  
template<typename T>
144  
struct when_all_homogeneous_state
144  
struct when_all_homogeneous_state
145  
{
145  
{
146  
    when_all_core core_;
146  
    when_all_core core_;
147  
    std::vector<std::optional<T>> results_;
147  
    std::vector<std::optional<T>> results_;
148  
    std::unique_ptr<continuation[]> runner_handles_;
148  
    std::unique_ptr<continuation[]> runner_handles_;
149  

149  

150  
    std::atomic<bool> has_error_{false};
150  
    std::atomic<bool> has_error_{false};
151  
    std::error_code first_error_;
151  
    std::error_code first_error_;
152  

152  

153  
    explicit when_all_homogeneous_state(std::size_t count)
153  
    explicit when_all_homogeneous_state(std::size_t count)
154  
        : core_(count)
154  
        : core_(count)
155  
        , results_(count)
155  
        , results_(count)
156  
        , runner_handles_(std::make_unique<continuation[]>(count))
156  
        , runner_handles_(std::make_unique<continuation[]>(count))
157  
    {
157  
    {
158  
    }
158  
    }
159  

159  

160  
    void set_result(std::size_t index, T value)
160  
    void set_result(std::size_t index, T value)
161  
    {
161  
    {
162  
        results_[index].emplace(std::move(value));
162  
        results_[index].emplace(std::move(value));
163  
    }
163  
    }
164  

164  

165  
    /** Record the first error (subsequent errors are discarded). */
165  
    /** Record the first error (subsequent errors are discarded). */
166  
    void record_error(std::error_code ec)
166  
    void record_error(std::error_code ec)
167  
    {
167  
    {
168  
        bool expected = false;
168  
        bool expected = false;
169  
        if(has_error_.compare_exchange_strong(
169  
        if(has_error_.compare_exchange_strong(
170  
            expected, true, std::memory_order_relaxed))
170  
            expected, true, std::memory_order_relaxed))
171  
            first_error_ = ec;
171  
            first_error_ = ec;
172  
    }
172  
    }
173  
};
173  
};
174  

174  

175  
/** Specialization for void io_result children (no payload storage). */
175  
/** Specialization for void io_result children (no payload storage). */
176  
template<>
176  
template<>
177  
struct when_all_homogeneous_state<std::tuple<>>
177  
struct when_all_homogeneous_state<std::tuple<>>
178  
{
178  
{
179  
    when_all_core core_;
179  
    when_all_core core_;
180  
    std::unique_ptr<continuation[]> runner_handles_;
180  
    std::unique_ptr<continuation[]> runner_handles_;
181  

181  

182  
    std::atomic<bool> has_error_{false};
182  
    std::atomic<bool> has_error_{false};
183  
    std::error_code first_error_;
183  
    std::error_code first_error_;
184  

184  

185  
    explicit when_all_homogeneous_state(std::size_t count)
185  
    explicit when_all_homogeneous_state(std::size_t count)
186  
        : core_(count)
186  
        : core_(count)
187  
        , runner_handles_(std::make_unique<continuation[]>(count))
187  
        , runner_handles_(std::make_unique<continuation[]>(count))
188  
    {
188  
    {
189  
    }
189  
    }
190  

190  

191  
    /** Record the first error (subsequent errors are discarded). */
191  
    /** Record the first error (subsequent errors are discarded). */
192  
    void record_error(std::error_code ec)
192  
    void record_error(std::error_code ec)
193  
    {
193  
    {
194  
        bool expected = false;
194  
        bool expected = false;
195  
        if(has_error_.compare_exchange_strong(
195  
        if(has_error_.compare_exchange_strong(
196  
            expected, true, std::memory_order_relaxed))
196  
            expected, true, std::memory_order_relaxed))
197  
            first_error_ = ec;
197  
            first_error_ = ec;
198  
    }
198  
    }
199  
};
199  
};
200  

200  

201  
/** Wrapper coroutine that intercepts task completion for when_all.
201  
/** Wrapper coroutine that intercepts task completion for when_all.
202  

202  

203  
    Parameterized on StateType to work with both heterogeneous (variadic)
203  
    Parameterized on StateType to work with both heterogeneous (variadic)
204  
    and homogeneous (range) state types. All state types expose their
204  
    and homogeneous (range) state types. All state types expose their
205  
    shared members through a `core_` member of type when_all_core.
205  
    shared members through a `core_` member of type when_all_core.
206  

206  

207  
    @tparam StateType The state type (when_all_state or when_all_homogeneous_state).
207  
    @tparam StateType The state type (when_all_state or when_all_homogeneous_state).
208  
*/
208  
*/
209  
template<typename StateType>
209  
template<typename StateType>
210  
struct when_all_runner
210  
struct when_all_runner
211  
{
211  
{
212  
    struct promise_type
212  
    struct promise_type
213  
    {
213  
    {
214  
        StateType* state_ = nullptr;
214  
        StateType* state_ = nullptr;
215  
        std::size_t index_ = 0;
215  
        std::size_t index_ = 0;
216  
        io_env env_;
216  
        io_env env_;
217  

217  

218  
        when_all_runner get_return_object() noexcept
218  
        when_all_runner get_return_object() noexcept
219  
        {
219  
        {
220  
            return when_all_runner(
220  
            return when_all_runner(
221  
                std::coroutine_handle<promise_type>::from_promise(*this));
221  
                std::coroutine_handle<promise_type>::from_promise(*this));
222  
        }
222  
        }
223  

223  

224  
        std::suspend_always initial_suspend() noexcept
224  
        std::suspend_always initial_suspend() noexcept
225  
        {
225  
        {
226  
            return {};
226  
            return {};
227  
        }
227  
        }
228  

228  

229  
        auto final_suspend() noexcept
229  
        auto final_suspend() noexcept
230  
        {
230  
        {
231  
            struct awaiter
231  
            struct awaiter
232  
            {
232  
            {
233  
                promise_type* p_;
233  
                promise_type* p_;
234  
                bool await_ready() const noexcept { return false; }
234  
                bool await_ready() const noexcept { return false; }
235  
                auto await_suspend(std::coroutine_handle<> h) noexcept
235  
                auto await_suspend(std::coroutine_handle<> h) noexcept
236  
                {
236  
                {
237  
                    auto& core = p_->state_->core_;
237  
                    auto& core = p_->state_->core_;
238  
                    auto* counter = &core.remaining_count_;
238  
                    auto* counter = &core.remaining_count_;
239  
                    auto* caller_env = core.caller_env_;
239  
                    auto* caller_env = core.caller_env_;
240  
                    auto& cont = core.continuation_;
240  
                    auto& cont = core.continuation_;
241  

241  

242  
                    h.destroy();
242  
                    h.destroy();
243  

243  

244  
                    auto remaining = counter->fetch_sub(1, std::memory_order_acq_rel);
244  
                    auto remaining = counter->fetch_sub(1, std::memory_order_acq_rel);
245  
                    if(remaining == 1)
245  
                    if(remaining == 1)
246  
                        return detail::symmetric_transfer(caller_env->executor.dispatch(cont));
246  
                        return detail::symmetric_transfer(caller_env->executor.dispatch(cont));
247  
                    return detail::symmetric_transfer(std::noop_coroutine());
247  
                    return detail::symmetric_transfer(std::noop_coroutine());
248  
                }
248  
                }
249  
                void await_resume() const noexcept {}
249  
                void await_resume() const noexcept {}
250  
            };
250  
            };
251  
            return awaiter{this};
251  
            return awaiter{this};
252  
        }
252  
        }
253  

253  

254  
        void return_void() noexcept {}
254  
        void return_void() noexcept {}
255  

255  

256  
        void unhandled_exception()
256  
        void unhandled_exception()
257  
        {
257  
        {
258  
            state_->core_.capture_exception(std::current_exception());
258  
            state_->core_.capture_exception(std::current_exception());
259  
            state_->core_.stop_source_.request_stop();
259  
            state_->core_.stop_source_.request_stop();
260  
        }
260  
        }
261  

261  

262  
        template<class Awaitable>
262  
        template<class Awaitable>
263  
        struct transform_awaiter
263  
        struct transform_awaiter
264  
        {
264  
        {
265  
            std::decay_t<Awaitable> a_;
265  
            std::decay_t<Awaitable> a_;
266  
            promise_type* p_;
266  
            promise_type* p_;
267  

267  

268  
            bool await_ready() { return a_.await_ready(); }
268  
            bool await_ready() { return a_.await_ready(); }
269  
            decltype(auto) await_resume() { return a_.await_resume(); }
269  
            decltype(auto) await_resume() { return a_.await_resume(); }
270  

270  

271  
            template<class Promise>
271  
            template<class Promise>
272  
            auto await_suspend(std::coroutine_handle<Promise> h)
272  
            auto await_suspend(std::coroutine_handle<Promise> h)
273  
            {
273  
            {
274  
                using R = decltype(a_.await_suspend(h, &p_->env_));
274  
                using R = decltype(a_.await_suspend(h, &p_->env_));
275  
                if constexpr (std::is_same_v<R, std::coroutine_handle<>>)
275  
                if constexpr (std::is_same_v<R, std::coroutine_handle<>>)
276  
                    return detail::symmetric_transfer(a_.await_suspend(h, &p_->env_));
276  
                    return detail::symmetric_transfer(a_.await_suspend(h, &p_->env_));
277  
                else
277  
                else
278  
                    return a_.await_suspend(h, &p_->env_);
278  
                    return a_.await_suspend(h, &p_->env_);
279  
            }
279  
            }
280  
        };
280  
        };
281  

281  

282  
        template<class Awaitable>
282  
        template<class Awaitable>
283  
        auto await_transform(Awaitable&& a)
283  
        auto await_transform(Awaitable&& a)
284  
        {
284  
        {
285  
            using A = std::decay_t<Awaitable>;
285  
            using A = std::decay_t<Awaitable>;
286  
            if constexpr (IoAwaitable<A>)
286  
            if constexpr (IoAwaitable<A>)
287  
            {
287  
            {
288  
                return transform_awaiter<Awaitable>{
288  
                return transform_awaiter<Awaitable>{
289  
                    std::forward<Awaitable>(a), this};
289  
                    std::forward<Awaitable>(a), this};
290  
            }
290  
            }
291  
            else
291  
            else
292  
            {
292  
            {
293  
                static_assert(sizeof(A) == 0, "requires IoAwaitable");
293  
                static_assert(sizeof(A) == 0, "requires IoAwaitable");
294  
            }
294  
            }
295  
        }
295  
        }
296  
    };
296  
    };
297  

297  

298  
    std::coroutine_handle<promise_type> h_;
298  
    std::coroutine_handle<promise_type> h_;
299  

299  

300  
    explicit when_all_runner(std::coroutine_handle<promise_type> h) noexcept
300  
    explicit when_all_runner(std::coroutine_handle<promise_type> h) noexcept
301  
        : h_(h)
301  
        : h_(h)
302  
    {
302  
    {
303  
    }
303  
    }
304  

304  

305  
    // Enable move for all clang versions - some versions need it
305  
    // Enable move for all clang versions - some versions need it
306  
    when_all_runner(when_all_runner&& other) noexcept
306  
    when_all_runner(when_all_runner&& other) noexcept
307  
        : h_(std::exchange(other.h_, nullptr))
307  
        : h_(std::exchange(other.h_, nullptr))
308  
    {
308  
    {
309  
    }
309  
    }
310  

310  

311  
    when_all_runner(when_all_runner const&) = delete;
311  
    when_all_runner(when_all_runner const&) = delete;
312  
    when_all_runner& operator=(when_all_runner const&) = delete;
312  
    when_all_runner& operator=(when_all_runner const&) = delete;
313  
    when_all_runner& operator=(when_all_runner&&) = delete;
313  
    when_all_runner& operator=(when_all_runner&&) = delete;
314  

314  

315  
    auto release() noexcept
315  
    auto release() noexcept
316  
    {
316  
    {
317  
        return std::exchange(h_, nullptr);
317  
        return std::exchange(h_, nullptr);
318  
    }
318  
    }
319  
};
319  
};
320  

320  

321  
/** Create an io_result-aware runner for a single awaitable (range path).
321  
/** Create an io_result-aware runner for a single awaitable (range path).
322  

322  

323  
    Checks the error code, records errors and requests stop on failure,
323  
    Checks the error code, records errors and requests stop on failure,
324  
    or extracts the payload on success.
324  
    or extracts the payload on success.
325  
*/
325  
*/
326  
template<IoAwaitable Awaitable, typename StateType>
326  
template<IoAwaitable Awaitable, typename StateType>
327  
when_all_runner<StateType>
327  
when_all_runner<StateType>
328  
make_when_all_homogeneous_runner(Awaitable inner, StateType* state, std::size_t index)
328  
make_when_all_homogeneous_runner(Awaitable inner, StateType* state, std::size_t index)
329  
{
329  
{
330  
    auto result = co_await std::move(inner);
330  
    auto result = co_await std::move(inner);
331  

331  

332  
    if(result.ec)
332  
    if(result.ec)
333  
    {
333  
    {
334  
        state->record_error(result.ec);
334  
        state->record_error(result.ec);
335  
        state->core_.stop_source_.request_stop();
335  
        state->core_.stop_source_.request_stop();
336  
    }
336  
    }
337  
    else
337  
    else
338  
    {
338  
    {
339  
        using PayloadT = io_result_payload_t<
339  
        using PayloadT = io_result_payload_t<
340  
            awaitable_result_t<Awaitable>>;
340  
            awaitable_result_t<Awaitable>>;
341  
        if constexpr (!std::is_same_v<PayloadT, std::tuple<>>)
341  
        if constexpr (!std::is_same_v<PayloadT, std::tuple<>>)
342  
        {
342  
        {
343  
            state->set_result(index,
343  
            state->set_result(index,
344  
                extract_io_payload(std::move(result)));
344  
                extract_io_payload(std::move(result)));
345  
        }
345  
        }
346  
    }
346  
    }
347  
}
347  
}
348  

348  

349  
/** Create a runner for io_result children that requests stop on ec. */
349  
/** Create a runner for io_result children that requests stop on ec. */
350  
template<std::size_t Index, IoAwaitable Awaitable, typename... Ts>
350  
template<std::size_t Index, IoAwaitable Awaitable, typename... Ts>
351  
when_all_runner<when_all_state<Ts...>>
351  
when_all_runner<when_all_state<Ts...>>
352  
make_when_all_io_runner(Awaitable inner, when_all_state<Ts...>* state)
352  
make_when_all_io_runner(Awaitable inner, when_all_state<Ts...>* state)
353  
{
353  
{
354  
    auto result = co_await std::move(inner);
354  
    auto result = co_await std::move(inner);
355  
    auto ec = result.ec;
355  
    auto ec = result.ec;
356  
    std::get<Index>(state->results_).set(std::move(result));
356  
    std::get<Index>(state->results_).set(std::move(result));
357  

357  

358  
    if(ec)
358  
    if(ec)
359  
    {
359  
    {
360  
        state->record_error(ec);
360  
        state->record_error(ec);
361  
        state->core_.stop_source_.request_stop();
361  
        state->core_.stop_source_.request_stop();
362  
    }
362  
    }
363  
}
363  
}
364  

364  

365  
/** Launcher that uses io_result-aware runners. */
365  
/** Launcher that uses io_result-aware runners. */
366  
template<IoAwaitable... Awaitables>
366  
template<IoAwaitable... Awaitables>
367  
class when_all_io_launcher
367  
class when_all_io_launcher
368  
{
368  
{
369  
    using state_type = when_all_state<awaitable_result_t<Awaitables>...>;
369  
    using state_type = when_all_state<awaitable_result_t<Awaitables>...>;
370  

370  

371  
    std::tuple<Awaitables...>* awaitables_;
371  
    std::tuple<Awaitables...>* awaitables_;
372  
    state_type* state_;
372  
    state_type* state_;
373  

373  

374  
public:
374  
public:
375  
    when_all_io_launcher(
375  
    when_all_io_launcher(
376  
        std::tuple<Awaitables...>* awaitables,
376  
        std::tuple<Awaitables...>* awaitables,
377  
        state_type* state)
377  
        state_type* state)
378  
        : awaitables_(awaitables)
378  
        : awaitables_(awaitables)
379  
        , state_(state)
379  
        , state_(state)
380  
    {
380  
    {
381  
    }
381  
    }
382  

382  

383  
    bool await_ready() const noexcept
383  
    bool await_ready() const noexcept
384  
    {
384  
    {
385  
        return sizeof...(Awaitables) == 0;
385  
        return sizeof...(Awaitables) == 0;
386  
    }
386  
    }
387  

387  

388  
    std::coroutine_handle<> await_suspend(
388  
    std::coroutine_handle<> await_suspend(
389  
        std::coroutine_handle<> continuation, io_env const* caller_env)
389  
        std::coroutine_handle<> continuation, io_env const* caller_env)
390  
    {
390  
    {
391  
        state_->core_.continuation_.h = continuation;
391  
        state_->core_.continuation_.h = continuation;
392  
        state_->core_.caller_env_ = caller_env;
392  
        state_->core_.caller_env_ = caller_env;
393  

393  

394  
        if(caller_env->stop_token.stop_possible())
394  
        if(caller_env->stop_token.stop_possible())
395  
        {
395  
        {
396  
            state_->core_.parent_stop_callback_.emplace(
396  
            state_->core_.parent_stop_callback_.emplace(
397  
                caller_env->stop_token,
397  
                caller_env->stop_token,
398  
                when_all_core::stop_callback_fn{&state_->core_.stop_source_});
398  
                when_all_core::stop_callback_fn{&state_->core_.stop_source_});
399  

399  

400  
            if(caller_env->stop_token.stop_requested())
400  
            if(caller_env->stop_token.stop_requested())
401  
                state_->core_.stop_source_.request_stop();
401  
                state_->core_.stop_source_.request_stop();
402  
        }
402  
        }
403  

403  

404  
        auto token = state_->core_.stop_source_.get_token();
404  
        auto token = state_->core_.stop_source_.get_token();
405  
        [&]<std::size_t... Is>(std::index_sequence<Is...>) {
405  
        [&]<std::size_t... Is>(std::index_sequence<Is...>) {
406  
            (..., launch_one<Is>(caller_env->executor, token));
406  
            (..., launch_one<Is>(caller_env->executor, token));
407  
        }(std::index_sequence_for<Awaitables...>{});
407  
        }(std::index_sequence_for<Awaitables...>{});
408  

408  

409  
        return std::noop_coroutine();
409  
        return std::noop_coroutine();
410  
    }
410  
    }
411  

411  

412  
    void await_resume() const noexcept {}
412  
    void await_resume() const noexcept {}
413  

413  

414  
private:
414  
private:
415  
    template<std::size_t I>
415  
    template<std::size_t I>
416  
    void launch_one(executor_ref caller_ex, std::stop_token token)
416  
    void launch_one(executor_ref caller_ex, std::stop_token token)
417  
    {
417  
    {
418  
        auto runner = make_when_all_io_runner<I>(
418  
        auto runner = make_when_all_io_runner<I>(
419  
            std::move(std::get<I>(*awaitables_)), state_);
419  
            std::move(std::get<I>(*awaitables_)), state_);
420  

420  

421  
        auto h = runner.release();
421  
        auto h = runner.release();
422  
        h.promise().state_ = state_;
422  
        h.promise().state_ = state_;
423  
        h.promise().env_ = io_env{caller_ex, token,
423  
        h.promise().env_ = io_env{caller_ex, token,
424  
            state_->core_.caller_env_->frame_allocator};
424  
            state_->core_.caller_env_->frame_allocator};
425  

425  

426  
        state_->runner_handles_[I].h = std::coroutine_handle<>{h};
426  
        state_->runner_handles_[I].h = std::coroutine_handle<>{h};
427  
        state_->core_.caller_env_->executor.post(state_->runner_handles_[I]);
427  
        state_->core_.caller_env_->executor.post(state_->runner_handles_[I]);
428  
    }
428  
    }
429  
};
429  
};
430  

430  

431  
/** Helper to extract a single result from state.
431  
/** Helper to extract a single result from state.
432  
    This is a separate function to work around a GCC-11 ICE that occurs
432  
    This is a separate function to work around a GCC-11 ICE that occurs
433  
    when using nested immediately-invoked lambdas with pack expansion.
433  
    when using nested immediately-invoked lambdas with pack expansion.
434  
*/
434  
*/
435  
template<std::size_t I, typename... Ts>
435  
template<std::size_t I, typename... Ts>
436  
auto extract_single_result(when_all_state<Ts...>& state)
436  
auto extract_single_result(when_all_state<Ts...>& state)
437  
{
437  
{
438  
    return std::move(std::get<I>(state.results_)).get();
438  
    return std::move(std::get<I>(state.results_)).get();
439  
}
439  
}
440  

440  

441  
/** Extract all results from state as a tuple.
441  
/** Extract all results from state as a tuple.
442  
*/
442  
*/
443  
template<typename... Ts>
443  
template<typename... Ts>
444  
auto extract_results(when_all_state<Ts...>& state)
444  
auto extract_results(when_all_state<Ts...>& state)
445  
{
445  
{
446  
    return [&]<std::size_t... Is>(std::index_sequence<Is...>) {
446  
    return [&]<std::size_t... Is>(std::index_sequence<Is...>) {
447  
        return std::tuple(extract_single_result<Is>(state)...);
447  
        return std::tuple(extract_single_result<Is>(state)...);
448  
    }(std::index_sequence_for<Ts...>{});
448  
    }(std::index_sequence_for<Ts...>{});
449  
}
449  
}
450  

450  

451  
/** Launches all homogeneous runners concurrently.
451  
/** Launches all homogeneous runners concurrently.
452  

452  

453  
    Two-phase approach: create all runners first, then post all.
453  
    Two-phase approach: create all runners first, then post all.
454  
    This avoids lifetime issues if a task completes synchronously.
454  
    This avoids lifetime issues if a task completes synchronously.
455  
*/
455  
*/
456  
template<typename Range>
456  
template<typename Range>
457  
class when_all_homogeneous_launcher
457  
class when_all_homogeneous_launcher
458  
{
458  
{
459  
    using Awaitable = std::ranges::range_value_t<Range>;
459  
    using Awaitable = std::ranges::range_value_t<Range>;
460  
    using PayloadT = io_result_payload_t<awaitable_result_t<Awaitable>>;
460  
    using PayloadT = io_result_payload_t<awaitable_result_t<Awaitable>>;
461  

461  

462  
    Range* range_;
462  
    Range* range_;
463  
    when_all_homogeneous_state<PayloadT>* state_;
463  
    when_all_homogeneous_state<PayloadT>* state_;
464  

464  

465  
public:
465  
public:
466  
    when_all_homogeneous_launcher(
466  
    when_all_homogeneous_launcher(
467  
        Range* range,
467  
        Range* range,
468  
        when_all_homogeneous_state<PayloadT>* state)
468  
        when_all_homogeneous_state<PayloadT>* state)
469  
        : range_(range)
469  
        : range_(range)
470  
        , state_(state)
470  
        , state_(state)
471  
    {
471  
    {
472  
    }
472  
    }
473  

473  

474  
    bool await_ready() const noexcept
474  
    bool await_ready() const noexcept
475  
    {
475  
    {
476  
        return std::ranges::empty(*range_);
476  
        return std::ranges::empty(*range_);
477  
    }
477  
    }
478  

478  

479  
    std::coroutine_handle<> await_suspend(std::coroutine_handle<> continuation, io_env const* caller_env)
479  
    std::coroutine_handle<> await_suspend(std::coroutine_handle<> continuation, io_env const* caller_env)
480  
    {
480  
    {
481  
        state_->core_.continuation_.h = continuation;
481  
        state_->core_.continuation_.h = continuation;
482  
        state_->core_.caller_env_ = caller_env;
482  
        state_->core_.caller_env_ = caller_env;
483  

483  

484  
        if(caller_env->stop_token.stop_possible())
484  
        if(caller_env->stop_token.stop_possible())
485  
        {
485  
        {
486  
            state_->core_.parent_stop_callback_.emplace(
486  
            state_->core_.parent_stop_callback_.emplace(
487  
                caller_env->stop_token,
487  
                caller_env->stop_token,
488  
                when_all_core::stop_callback_fn{&state_->core_.stop_source_});
488  
                when_all_core::stop_callback_fn{&state_->core_.stop_source_});
489  

489  

490  
            if(caller_env->stop_token.stop_requested())
490  
            if(caller_env->stop_token.stop_requested())
491  
                state_->core_.stop_source_.request_stop();
491  
                state_->core_.stop_source_.request_stop();
492  
        }
492  
        }
493  

493  

494  
        auto token = state_->core_.stop_source_.get_token();
494  
        auto token = state_->core_.stop_source_.get_token();
495  

495  

496  
        // Phase 1: Create all runners without dispatching.
496  
        // Phase 1: Create all runners without dispatching.
497  
        std::size_t index = 0;
497  
        std::size_t index = 0;
498  
        for(auto&& a : *range_)
498  
        for(auto&& a : *range_)
499  
        {
499  
        {
500  
            auto runner = make_when_all_homogeneous_runner(
500  
            auto runner = make_when_all_homogeneous_runner(
501  
                std::move(a), state_, index);
501  
                std::move(a), state_, index);
502  

502  

503  
            auto h = runner.release();
503  
            auto h = runner.release();
504  
            h.promise().state_ = state_;
504  
            h.promise().state_ = state_;
505  
            h.promise().index_ = index;
505  
            h.promise().index_ = index;
506  
            h.promise().env_ = io_env{caller_env->executor, token, caller_env->frame_allocator};
506  
            h.promise().env_ = io_env{caller_env->executor, token, caller_env->frame_allocator};
507  

507  

508  
            state_->runner_handles_[index].h = std::coroutine_handle<>{h};
508  
            state_->runner_handles_[index].h = std::coroutine_handle<>{h};
509  
            ++index;
509  
            ++index;
510  
        }
510  
        }
511  

511  

512  
        // Phase 2: Post all runners. Any may complete synchronously.
512  
        // Phase 2: Post all runners. Any may complete synchronously.
513  
        // After last post, state_ and this may be destroyed.
513  
        // After last post, state_ and this may be destroyed.
514  
        auto* handles = state_->runner_handles_.get();
514  
        auto* handles = state_->runner_handles_.get();
515  
        std::size_t count = state_->core_.remaining_count_.load(std::memory_order_relaxed);
515  
        std::size_t count = state_->core_.remaining_count_.load(std::memory_order_relaxed);
516  
        for(std::size_t i = 0; i < count; ++i)
516  
        for(std::size_t i = 0; i < count; ++i)
517  
            caller_env->executor.post(handles[i]);
517  
            caller_env->executor.post(handles[i]);
518  

518  

519  
        return std::noop_coroutine();
519  
        return std::noop_coroutine();
520  
    }
520  
    }
521  

521  

522  
    void await_resume() const noexcept
522  
    void await_resume() const noexcept
523  
    {
523  
    {
524  
    }
524  
    }
525  
};
525  
};
526  

526  

527  
} // namespace detail
527  
} // namespace detail
528  

528  

529  
/** Execute a range of io_result-returning awaitables concurrently.
529  
/** Execute a range of io_result-returning awaitables concurrently.
530  

530  

531  
    Launches all awaitables simultaneously and waits for all to complete.
531  
    Launches all awaitables simultaneously and waits for all to complete.
532  
    On success, extracted payloads are collected in a vector preserving
532  
    On success, extracted payloads are collected in a vector preserving
533  
    input order. The first error_code cancels siblings and is propagated
533  
    input order. The first error_code cancels siblings and is propagated
534  
    in the outer io_result. Exceptions always beat error codes.
534  
    in the outer io_result. Exceptions always beat error codes.
535  

535  

536  
    @li All child awaitables run concurrently on the caller's executor
536  
    @li All child awaitables run concurrently on the caller's executor
537  
    @li Payloads are returned as a vector in input order
537  
    @li Payloads are returned as a vector in input order
538  
    @li First error_code wins and cancels siblings
538  
    @li First error_code wins and cancels siblings
539  
    @li Exception always beats error_code
539  
    @li Exception always beats error_code
540  
    @li Completes only after all children have finished
540  
    @li Completes only after all children have finished
541  

541  

542  
    @par Thread Safety
542  
    @par Thread Safety
543  
    The returned task must be awaited from a single execution context.
543  
    The returned task must be awaited from a single execution context.
544  
    Child awaitables execute concurrently but complete through the caller's
544  
    Child awaitables execute concurrently but complete through the caller's
545  
    executor.
545  
    executor.
546  

546  

547  
    @param awaitables Range of io_result-returning awaitables to execute
547  
    @param awaitables Range of io_result-returning awaitables to execute
548  
        concurrently (must not be empty).
548  
        concurrently (must not be empty).
549  

549  

550  
    @return A task yielding io_result<vector<PayloadT>> where PayloadT
550  
    @return A task yielding io_result<vector<PayloadT>> where PayloadT
551  
        is the payload extracted from each child's io_result.
551  
        is the payload extracted from each child's io_result.
552  

552  

553  
    @throws std::invalid_argument if range is empty (thrown before
553  
    @throws std::invalid_argument if range is empty (thrown before
554  
        coroutine suspends).
554  
        coroutine suspends).
555  
    @throws Rethrows the first child exception after all children
555  
    @throws Rethrows the first child exception after all children
556  
        complete (exception beats error_code).
556  
        complete (exception beats error_code).
557  

557  

558  
    @par Example
558  
    @par Example
559  
    @code
559  
    @code
560  
    task<void> example()
560  
    task<void> example()
561  
    {
561  
    {
562  
        std::vector<io_task<size_t>> reads;
562  
        std::vector<io_task<size_t>> reads;
563  
        for (auto& buf : buffers)
563  
        for (auto& buf : buffers)
564  
            reads.push_back(stream.read_some(buf));
564  
            reads.push_back(stream.read_some(buf));
565  

565  

566  
        auto [ec, counts] = co_await when_all(std::move(reads));
566  
        auto [ec, counts] = co_await when_all(std::move(reads));
567  
        if (ec) { // handle error
567  
        if (ec) { // handle error
568  
        }
568  
        }
569  
    }
569  
    }
570  
    @endcode
570  
    @endcode
571  

571  

572  
    @see IoAwaitableRange, when_all
572  
    @see IoAwaitableRange, when_all
573  
*/
573  
*/
574  
template<IoAwaitableRange R>
574  
template<IoAwaitableRange R>
575  
    requires detail::is_io_result_v<
575  
    requires detail::is_io_result_v<
576  
        awaitable_result_t<std::ranges::range_value_t<R>>>
576  
        awaitable_result_t<std::ranges::range_value_t<R>>>
577  
    && (!std::is_same_v<
577  
    && (!std::is_same_v<
578  
            detail::io_result_payload_t<
578  
            detail::io_result_payload_t<
579  
                awaitable_result_t<std::ranges::range_value_t<R>>>,
579  
                awaitable_result_t<std::ranges::range_value_t<R>>>,
580  
            std::tuple<>>)
580  
            std::tuple<>>)
581  
[[nodiscard]] auto when_all(R&& awaitables)
581  
[[nodiscard]] auto when_all(R&& awaitables)
582  
    -> task<io_result<std::vector<
582  
    -> task<io_result<std::vector<
583  
        detail::io_result_payload_t<
583  
        detail::io_result_payload_t<
584  
            awaitable_result_t<std::ranges::range_value_t<R>>>>>>
584  
            awaitable_result_t<std::ranges::range_value_t<R>>>>>>
585  
{
585  
{
586  
    using Awaitable = std::ranges::range_value_t<R>;
586  
    using Awaitable = std::ranges::range_value_t<R>;
587  
    using PayloadT = detail::io_result_payload_t<
587  
    using PayloadT = detail::io_result_payload_t<
588  
        awaitable_result_t<Awaitable>>;
588  
        awaitable_result_t<Awaitable>>;
589  
    using OwnedRange = std::remove_cvref_t<R>;
589  
    using OwnedRange = std::remove_cvref_t<R>;
590  

590  

591  
    auto count = std::ranges::size(awaitables);
591  
    auto count = std::ranges::size(awaitables);
592  
    if(count == 0)
592  
    if(count == 0)
593  
        throw std::invalid_argument("when_all requires at least one awaitable");
593  
        throw std::invalid_argument("when_all requires at least one awaitable");
594  

594  

595  
    OwnedRange owned_awaitables = std::forward<R>(awaitables);
595  
    OwnedRange owned_awaitables = std::forward<R>(awaitables);
596  

596  

597  
    detail::when_all_homogeneous_state<PayloadT> state(count);
597  
    detail::when_all_homogeneous_state<PayloadT> state(count);
598  

598  

599  
    co_await detail::when_all_homogeneous_launcher<OwnedRange>(
599  
    co_await detail::when_all_homogeneous_launcher<OwnedRange>(
600  
        &owned_awaitables, &state);
600  
        &owned_awaitables, &state);
601  

601  

602  
    if(state.core_.first_exception_)
602  
    if(state.core_.first_exception_)
603  
        std::rethrow_exception(state.core_.first_exception_);
603  
        std::rethrow_exception(state.core_.first_exception_);
604  

604  

605  
    if(state.has_error_.load(std::memory_order_relaxed))
605  
    if(state.has_error_.load(std::memory_order_relaxed))
606  
        co_return io_result<std::vector<PayloadT>>{state.first_error_, {}};
606  
        co_return io_result<std::vector<PayloadT>>{state.first_error_, {}};
607  

607  

608  
    std::vector<PayloadT> results;
608  
    std::vector<PayloadT> results;
609  
    results.reserve(count);
609  
    results.reserve(count);
610  
    for(auto& opt : state.results_)
610  
    for(auto& opt : state.results_)
611  
        results.push_back(std::move(*opt));
611  
        results.push_back(std::move(*opt));
612  

612  

613  
    co_return io_result<std::vector<PayloadT>>{{}, std::move(results)};
613  
    co_return io_result<std::vector<PayloadT>>{{}, std::move(results)};
614  
}
614  
}
615  

615  

616  
/** Execute a range of void io_result-returning awaitables concurrently.
616  
/** Execute a range of void io_result-returning awaitables concurrently.
617  

617  

618  
    Launches all awaitables simultaneously and waits for all to complete.
618  
    Launches all awaitables simultaneously and waits for all to complete.
619  
    Since all awaitables return io_result<>, no payload values are
619  
    Since all awaitables return io_result<>, no payload values are
620  
    collected. The first error_code cancels siblings and is propagated.
620  
    collected. The first error_code cancels siblings and is propagated.
621  
    Exceptions always beat error codes.
621  
    Exceptions always beat error codes.
622  

622  

623  
    @param awaitables Range of io_result<>-returning awaitables to
623  
    @param awaitables Range of io_result<>-returning awaitables to
624  
        execute concurrently (must not be empty).
624  
        execute concurrently (must not be empty).
625  

625  

626  
    @return A task yielding io_result<> whose ec is the first child
626  
    @return A task yielding io_result<> whose ec is the first child
627  
        error, or default-constructed on success.
627  
        error, or default-constructed on success.
628  

628  

629  
    @throws std::invalid_argument if range is empty.
629  
    @throws std::invalid_argument if range is empty.
630  
    @throws Rethrows the first child exception after all children
630  
    @throws Rethrows the first child exception after all children
631  
        complete (exception beats error_code).
631  
        complete (exception beats error_code).
632  

632  

633  
    @par Example
633  
    @par Example
634  
    @code
634  
    @code
635  
    task<void> example()
635  
    task<void> example()
636  
    {
636  
    {
637  
        std::vector<io_task<>> jobs;
637  
        std::vector<io_task<>> jobs;
638  
        for (int i = 0; i < n; ++i)
638  
        for (int i = 0; i < n; ++i)
639  
            jobs.push_back(process(i));
639  
            jobs.push_back(process(i));
640  

640  

641  
        auto [ec] = co_await when_all(std::move(jobs));
641  
        auto [ec] = co_await when_all(std::move(jobs));
642  
    }
642  
    }
643  
    @endcode
643  
    @endcode
644  

644  

645  
    @see IoAwaitableRange, when_all
645  
    @see IoAwaitableRange, when_all
646  
*/
646  
*/
647  
template<IoAwaitableRange R>
647  
template<IoAwaitableRange R>
648  
    requires detail::is_io_result_v<
648  
    requires detail::is_io_result_v<
649  
        awaitable_result_t<std::ranges::range_value_t<R>>>
649  
        awaitable_result_t<std::ranges::range_value_t<R>>>
650  
    && std::is_same_v<
650  
    && std::is_same_v<
651  
            detail::io_result_payload_t<
651  
            detail::io_result_payload_t<
652  
                awaitable_result_t<std::ranges::range_value_t<R>>>,
652  
                awaitable_result_t<std::ranges::range_value_t<R>>>,
653  
            std::tuple<>>
653  
            std::tuple<>>
654  
[[nodiscard]] auto when_all(R&& awaitables) -> task<io_result<>>
654  
[[nodiscard]] auto when_all(R&& awaitables) -> task<io_result<>>
655  
{
655  
{
656  
    using OwnedRange = std::remove_cvref_t<R>;
656  
    using OwnedRange = std::remove_cvref_t<R>;
657  

657  

658  
    auto count = std::ranges::size(awaitables);
658  
    auto count = std::ranges::size(awaitables);
659  
    if(count == 0)
659  
    if(count == 0)
660  
        throw std::invalid_argument("when_all requires at least one awaitable");
660  
        throw std::invalid_argument("when_all requires at least one awaitable");
661  

661  

662  
    OwnedRange owned_awaitables = std::forward<R>(awaitables);
662  
    OwnedRange owned_awaitables = std::forward<R>(awaitables);
663  

663  

664  
    detail::when_all_homogeneous_state<std::tuple<>> state(count);
664  
    detail::when_all_homogeneous_state<std::tuple<>> state(count);
665  

665  

666  
    co_await detail::when_all_homogeneous_launcher<OwnedRange>(
666  
    co_await detail::when_all_homogeneous_launcher<OwnedRange>(
667  
        &owned_awaitables, &state);
667  
        &owned_awaitables, &state);
668  

668  

669  
    if(state.core_.first_exception_)
669  
    if(state.core_.first_exception_)
670  
        std::rethrow_exception(state.core_.first_exception_);
670  
        std::rethrow_exception(state.core_.first_exception_);
671  

671  

672  
    if(state.has_error_.load(std::memory_order_relaxed))
672  
    if(state.has_error_.load(std::memory_order_relaxed))
673  
        co_return io_result<>{state.first_error_};
673  
        co_return io_result<>{state.first_error_};
674  

674  

675  
    co_return io_result<>{};
675  
    co_return io_result<>{};
676  
}
676  
}
677  

677  

678  
/** Execute io_result-returning awaitables concurrently, inspecting error codes.
678  
/** Execute io_result-returning awaitables concurrently, inspecting error codes.
679  

679  

680  
    Overload selected when all children return io_result<Ts...>.
680  
    Overload selected when all children return io_result<Ts...>.
681  
    The error_code is lifted out of each child into a single outer
681  
    The error_code is lifted out of each child into a single outer
682  
    io_result. On success all values are returned; on failure the
682  
    io_result. On success all values are returned; on failure the
683  
    first error_code wins.
683  
    first error_code wins.
684  

684  

685  
    @par Exception Safety
685  
    @par Exception Safety
686  
    Exception always beats error_code. If any child throws, the
686  
    Exception always beats error_code. If any child throws, the
687  
    exception is rethrown regardless of error_code results.
687  
    exception is rethrown regardless of error_code results.
688  

688  

689  
    @param awaitables One or more awaitables each returning
689  
    @param awaitables One or more awaitables each returning
690  
        io_result<Ts...>.
690  
        io_result<Ts...>.
691  

691  

692  
    @return A task yielding io_result<R1, R2, ..., Rn> where each Ri
692  
    @return A task yielding io_result<R1, R2, ..., Rn> where each Ri
693  
        follows the payload flattening rules.
693  
        follows the payload flattening rules.
694  
*/
694  
*/
695  
template<IoAwaitable... As>
695  
template<IoAwaitable... As>
696  
    requires (sizeof...(As) > 0)
696  
    requires (sizeof...(As) > 0)
697  
          && detail::all_io_result_awaitables<As...>
697  
          && detail::all_io_result_awaitables<As...>
698  
[[nodiscard]] auto when_all(As... awaitables)
698  
[[nodiscard]] auto when_all(As... awaitables)
699  
    -> task<io_result<
699  
    -> task<io_result<
700  
        detail::io_result_payload_t<awaitable_result_t<As>>...>>
700  
        detail::io_result_payload_t<awaitable_result_t<As>>...>>
701  
{
701  
{
702  
    using result_type = io_result<
702  
    using result_type = io_result<
703  
        detail::io_result_payload_t<awaitable_result_t<As>>...>;
703  
        detail::io_result_payload_t<awaitable_result_t<As>>...>;
704  

704  

705  
    detail::when_all_state<awaitable_result_t<As>...> state;
705  
    detail::when_all_state<awaitable_result_t<As>...> state;
706  
    std::tuple<As...> awaitable_tuple(std::move(awaitables)...);
706  
    std::tuple<As...> awaitable_tuple(std::move(awaitables)...);
707  

707  

708  
    co_await detail::when_all_io_launcher<As...>(&awaitable_tuple, &state);
708  
    co_await detail::when_all_io_launcher<As...>(&awaitable_tuple, &state);
709  

709  

710  
    // Exception always wins over error_code
710  
    // Exception always wins over error_code
711  
    if(state.core_.first_exception_)
711  
    if(state.core_.first_exception_)
712  
        std::rethrow_exception(state.core_.first_exception_);
712  
        std::rethrow_exception(state.core_.first_exception_);
713  

713  

714  
    auto r = detail::build_when_all_io_result<result_type>(
714  
    auto r = detail::build_when_all_io_result<result_type>(
715  
        detail::extract_results(state));
715  
        detail::extract_results(state));
716  
    if(state.has_error_.load(std::memory_order_relaxed))
716  
    if(state.has_error_.load(std::memory_order_relaxed))
717  
        r.ec = state.first_error_;
717  
        r.ec = state.first_error_;
718  
    co_return r;
718  
    co_return r;
719  
}
719  
}
720  

720  

721  
} // namespace capy
721  
} // namespace capy
722  
} // namespace boost
722  
} // namespace boost
723  

723  

724  
#endif
724  
#endif