LCOV - code coverage report
Current view: top level - capy - when_all.hpp (source / functions) Coverage Total Hit Missed
Test: coverage_remapped.info Lines: 98.1 % 161 158 3
Test Date: 2026-03-21 03:20:11 Functions: 92.9 % 436 405 31

           TLA  Line data    Source code
       1                 : //
       2                 : // Copyright (c) 2026 Steve Gerbino
       3                 : //
       4                 : // Distributed under the Boost Software License, Version 1.0. (See accompanying
       5                 : // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
       6                 : //
       7                 : // Official repository: https://github.com/cppalliance/capy
       8                 : //
       9                 : 
      10                 : #ifndef BOOST_CAPY_WHEN_ALL_HPP
      11                 : #define BOOST_CAPY_WHEN_ALL_HPP
      12                 : 
      13                 : #include <boost/capy/detail/config.hpp>
      14                 : #include <boost/capy/detail/io_result_combinators.hpp>
      15                 : #include <boost/capy/continuation.hpp>
      16                 : #include <boost/capy/concept/executor.hpp>
      17                 : #include <boost/capy/concept/io_awaitable.hpp>
      18                 : #include <coroutine>
      19                 : #include <boost/capy/ex/io_env.hpp>
      20                 : #include <boost/capy/ex/frame_allocator.hpp>
      21                 : #include <boost/capy/task.hpp>
      22                 : 
      23                 : #include <array>
      24                 : #include <atomic>
      25                 : #include <exception>
      26                 : #include <memory>
      27                 : #include <optional>
      28                 : #include <ranges>
      29                 : #include <stdexcept>
      30                 : #include <stop_token>
      31                 : #include <tuple>
      32                 : #include <type_traits>
      33                 : #include <utility>
      34                 : #include <vector>
      35                 : 
      36                 : namespace boost {
      37                 : namespace capy {
      38                 : 
      39                 : namespace detail {
      40                 : 
      41                 : /** Holds the result of a single task within when_all.
      42                 :     The value is kept in a std::optional that stays empty until set() is called. */
      43                 : template<typename T>
      44                 : struct result_holder
      45                 : {
      46                 :     std::optional<T> value_;
      47                 :     // set(): takes the value by value and moves it into the optional.
      48 HIT          81 :     void set(T v)
      49                 :     {
      50              81 :         value_ = std::move(v);
      51              81 :     }
      52                 :     // get(): rvalue-only; moves the value out and dereferences value_ without checking that it is engaged.
      53              69 :     T get() &&
      54                 :     {
      55              69 :         return std::move(*value_);
      56                 :     }
      57                 : };
      58                 : 
      59                 : /** Core shared state for when_all operations.
      60                 : 
      61                 :     Contains all members and methods common to both heterogeneous (variadic)
      62                 :     and homogeneous (range) when_all implementations. State classes embed
      63                 :     this via composition to avoid CRTP destructor ordering issues.
      64                 : 
      65                 :     @par Thread Safety
      66                 :     Atomic operations protect exception capture and completion count.
      67                 : */
      68                 : struct when_all_core
      69                 : {
      70                 :     std::atomic<std::size_t> remaining_count_; // set from the constructor argument; each runner's final_suspend subtracts one
      71                 : 
      72                 :     // Exception storage - first error wins, others discarded
      73                 :     std::atomic<bool> has_exception_{false};
      74                 :     std::exception_ptr first_exception_;
      75                 :     // Stop source whose token the launchers place in every runner's io_env.
      76                 :     std::stop_source stop_source_;
      77                 : 
      78                 :     // Bridges parent's stop token to our stop_source
      79                 :     struct stop_callback_fn
      80                 :     {
      81                 :         std::stop_source* source_;
      82               1 :         void operator()() const { source_->request_stop(); }
      83                 :     };
      84                 :     using stop_callback_t = std::stop_callback<stop_callback_fn>;
      85                 :     std::optional<stop_callback_t> parent_stop_callback_; // emplaced by a launcher only when the parent token reports stop_possible()
      86                 :     // The awaiting coroutine and its environment; both are filled in by a launcher's await_suspend.
      87                 :     continuation continuation_;
      88                 :     io_env const* caller_env_ = nullptr;
      89                 : 
      90              72 :     explicit when_all_core(std::size_t count) noexcept
      91              72 :         : remaining_count_(count)
      92                 :     {
      93              72 :     }
      94                 : 
      95                 :     /** Capture an exception (first one wins). */
      96              19 :     void capture_exception(std::exception_ptr ep)
      97                 :     {
      98              19 :         bool expected = false; // the exchange below only succeeds while has_exception_ is still false
      99              19 :         if(has_exception_.compare_exchange_strong(
     100                 :             expected, true, std::memory_order_relaxed))
     101              17 :             first_exception_ = ep;
     102              19 :     }
     103                 : };
     104                 : 
     105                 : /** Shared state for heterogeneous when_all (variadic overload).
     106                 : 
     107                 :     @tparam Ts The result types of the tasks.
     108                 : */
     109                 : template<typename... Ts>
     110                 : struct when_all_state
     111                 : {
     112                 :     static constexpr std::size_t task_count = sizeof...(Ts);
     113                 :     // One result_holder and one continuation slot per entry in Ts; core_ carries the shared counter and stop source.
     114                 :     when_all_core core_;
     115                 :     std::tuple<result_holder<Ts>...> results_;
     116                 :     std::array<continuation, task_count> runner_handles_{};
     117                 :     // First error_code seen by any runner; written only through record_error().
     118                 :     std::atomic<bool> has_error_{false};
     119                 :     std::error_code first_error_;
     120                 : 
     121              50 :     when_all_state()
     122              50 :         : core_(task_count)
     123                 :     {
     124              50 :     }
     125                 : 
     126                 :     /** Record the first error (subsequent errors are discarded). */
     127              43 :     void record_error(std::error_code ec)
     128                 :     {
     129              43 :         bool expected = false; // only the caller that flips has_error_ from false to true stores its code
     130              43 :         if(has_error_.compare_exchange_strong(
     131                 :             expected, true, std::memory_order_relaxed))
     132              29 :             first_error_ = ec;
     133              43 :     }
     134                 : };
     135                 : 
     136                 : /** Shared state for homogeneous when_all (range overload).
     137                 : 
     138                 :     Stores extracted io_result payloads in a vector indexed by task
     139                 :     position. Tracks the first error_code for error propagation.
     140                 : 
     141                 :     @tparam T The payload type extracted from io_result.
     142                 : */
     143                 : template<typename T>
     144                 : struct when_all_homogeneous_state
     145                 : {
     146                 :     when_all_core core_;
     147                 :     std::vector<std::optional<T>> results_; // one slot per task; a slot stays empty unless set_result() is called for its index
     148                 :     std::unique_ptr<continuation[]> runner_handles_; // heap array of count continuations, filled in by the launcher
     149                 :     // First error_code seen by any runner; written only through record_error().
     150                 :     std::atomic<bool> has_error_{false};
     151                 :     std::error_code first_error_;
     152                 :     // All three containers are sized from the same count.
     153              11 :     explicit when_all_homogeneous_state(std::size_t count)
     154              11 :         : core_(count)
     155              22 :         , results_(count)
     156              11 :         , runner_handles_(std::make_unique<continuation[]>(count))
     157                 :     {
     158              11 :     }
     159                 :     // Store a payload at the given position; index is not range-checked (operator[]).
     160              16 :     void set_result(std::size_t index, T value)
     161                 :     {
     162              16 :         results_[index].emplace(std::move(value));
     163              16 :     }
     164                 : 
     165                 :     /** Record the first error (subsequent errors are discarded). */
     166               7 :     void record_error(std::error_code ec)
     167                 :     {
     168               7 :         bool expected = false; // only the caller that flips has_error_ from false to true stores its code
     169               7 :         if(has_error_.compare_exchange_strong(
     170                 :             expected, true, std::memory_order_relaxed))
     171               5 :             first_error_ = ec;
     172               7 :     }
     173                 : };
     174                 : 
     175                 : /** Specialization for void io_result children (no payload storage). */
     176                 : template<>
     177                 : struct when_all_homogeneous_state<std::tuple<>>
     178                 : {
     179                 :     when_all_core core_;
     180                 :     std::unique_ptr<continuation[]> runner_handles_; // heap array of count continuations; there is no results_ member in this specialization
     181                 :     // First error_code seen by any runner; written only through record_error().
     182                 :     std::atomic<bool> has_error_{false};
     183                 :     std::error_code first_error_;
     184                 : 
     185               3 :     explicit when_all_homogeneous_state(std::size_t count)
     186               3 :         : core_(count)
     187               3 :         , runner_handles_(std::make_unique<continuation[]>(count))
     188                 :     {
     189               3 :     }
     190                 : 
     191                 :     /** Record the first error (subsequent errors are discarded). */
     192               1 :     void record_error(std::error_code ec)
     193                 :     {
     194               1 :         bool expected = false; // only the caller that flips has_error_ from false to true stores its code
     195               1 :         if(has_error_.compare_exchange_strong(
     196                 :             expected, true, std::memory_order_relaxed))
     197               1 :             first_error_ = ec;
     198               1 :     }
     199                 : };
     200                 : 
     201                 : /** Wrapper coroutine that intercepts task completion for when_all.
     202                 : 
     203                 :     Parameterized on StateType to work with both heterogeneous (variadic)
     204                 :     and homogeneous (range) state types. All state types expose their
     205                 :     shared members through a `core_` member of type when_all_core.
     206                 : 
     207                 :     @tparam StateType The state type (when_all_state or when_all_homogeneous_state).
     208                 : */
     209                 : template<typename StateType>
     210                 : struct when_all_runner
     211                 : {
     212                 :     struct promise_type
     213                 :     {
     214                 :         StateType* state_ = nullptr; // assigned by the launcher after the coroutine has been created
     215                 :         std::size_t index_ = 0;
     216                 :         io_env env_; // passed by pointer to the awaited child in transform_awaiter::await_suspend
     217                 : 
     218             145 :         when_all_runner get_return_object() noexcept
     219                 :         {
     220                 :             return when_all_runner(
     221             145 :                 std::coroutine_handle<promise_type>::from_promise(*this));
     222                 :         }
     223                 :         // The runner starts suspended, so the launcher can fill in the promise before it runs.
     224             145 :         std::suspend_always initial_suspend() noexcept
     225                 :         {
     226             145 :             return {};
     227                 :         }
     228                 :         // At completion: destroy this runner's own frame, then count it as finished; the last runner dispatches the parent continuation.
     229             145 :         auto final_suspend() noexcept
     230                 :         {
     231                 :             struct awaiter
     232                 :             {
     233                 :                 promise_type* p_;
     234             145 :                 bool await_ready() const noexcept { return false; }
     235             145 :                 auto await_suspend(std::coroutine_handle<> h) noexcept
     236                 :                 {
     237             145 :                     auto& core = p_->state_->core_;
     238             145 :                     auto* counter = &core.remaining_count_;
     239             145 :                     auto* caller_env = core.caller_env_;
     240             145 :                     auto& cont = core.continuation_;
     241                 :                     // Everything needed below was read above: destroying the frame also destroys the promise p_ points to.
     242             145 :                     h.destroy();
     243                 :                     // fetch_sub returns the previous value, so 1 means this was the last runner to finish.
     244             145 :                     auto remaining = counter->fetch_sub(1, std::memory_order_acq_rel);
     245             145 :                     if(remaining == 1)
     246              72 :                         return detail::symmetric_transfer(caller_env->executor.dispatch(cont));
     247              73 :                     return detail::symmetric_transfer(std::noop_coroutine());
     248                 :                 }
     249 MIS           0 :                 void await_resume() const noexcept {}
     250                 :             };
     251 HIT         145 :             return awaiter{this};
     252                 :         }
     253                 : 
     254             126 :         void return_void() noexcept {}
     255                 :         // An exception escaping the runner body is stored (first one wins) and the shared stop source is signalled.
     256              19 :         void unhandled_exception()
     257                 :         {
     258              19 :             state_->core_.capture_exception(std::current_exception());
     259              19 :             state_->core_.stop_source_.request_stop();
     260              19 :         }
     261                 :         // Adapter that owns a decayed copy of the awaitable and calls its two-argument await_suspend with &p_->env_.
     262                 :         template<class Awaitable>
     263                 :         struct transform_awaiter
     264                 :         {
     265                 :             std::decay_t<Awaitable> a_;
     266                 :             promise_type* p_;
     267                 : 
     268             145 :             bool await_ready() { return a_.await_ready(); }
     269             145 :             decltype(auto) await_resume() { return a_.await_resume(); }
     270                 : 
     271                 :             template<class Promise>
     272             144 :             auto await_suspend(std::coroutine_handle<Promise> h)
     273                 :             {
     274                 :                 using R = decltype(a_.await_suspend(h, &p_->env_));
     275                 :                 if constexpr (std::is_same_v<R, std::coroutine_handle<>>) // a returned handle is routed through detail::symmetric_transfer
     276             144 :                     return detail::symmetric_transfer(a_.await_suspend(h, &p_->env_));
     277                 :                 else
     278                 :                     return a_.await_suspend(h, &p_->env_);
     279                 :             }
     280                 :         };
     281                 :         // Only IoAwaitable types may be co_awaited inside a runner; anything else trips the static_assert.
     282                 :         template<class Awaitable>
     283             145 :         auto await_transform(Awaitable&& a)
     284                 :         {
     285                 :             using A = std::decay_t<Awaitable>;
     286                 :             if constexpr (IoAwaitable<A>)
     287                 :             {
     288                 :                 return transform_awaiter<Awaitable>{
     289             290 :                     std::forward<Awaitable>(a), this};
     290                 :             }
     291                 :             else
     292                 :             {
     293                 :                 static_assert(sizeof(A) == 0, "requires IoAwaitable"); // condition depends on A, so it only fires when this branch is instantiated
     294                 :             }
     295             145 :         }
     296                 :     };
     297                 : 
     298                 :     std::coroutine_handle<promise_type> h_; // this struct has no destructor; callers take the handle with release()
     299                 : 
     300             145 :     explicit when_all_runner(std::coroutine_handle<promise_type> h) noexcept
     301             145 :         : h_(h)
     302                 :     {
     303             145 :     }
     304                 : 
     305                 :     // Enable move for all clang versions - some versions need it
     306                 :     when_all_runner(when_all_runner&& other) noexcept
     307                 :         : h_(std::exchange(other.h_, nullptr))
     308                 :     {
     309                 :     }
     310                 : 
     311                 :     when_all_runner(when_all_runner const&) = delete;
     312                 :     when_all_runner& operator=(when_all_runner const&) = delete;
     313                 :     when_all_runner& operator=(when_all_runner&&) = delete;
     314                 :     // Return the coroutine handle and leave h_ null.
     315             145 :     auto release() noexcept
     316                 :     {
     317             145 :         return std::exchange(h_, nullptr);
     318                 :     }
     319                 : };
     320                 : 
     321                 : /** Create an io_result-aware runner for a single awaitable (range path).
     322                 : 
     323                 :     Checks the error code, records errors and requests stop on failure,
     324                 :     or extracts the payload on success.
     325                 : */
     326                 : template<IoAwaitable Awaitable, typename StateType>
     327                 : when_all_runner<StateType>
     328              32 : make_when_all_homogeneous_runner(Awaitable inner, StateType* state, std::size_t index)
     329                 : {
     330                 :     auto result = co_await std::move(inner);
     331                 :     // On a set ec nothing is stored for this index; only the error is recorded and siblings are asked to stop.
     332                 :     if(result.ec)
     333                 :     {
     334                 :         state->record_error(result.ec);
     335                 :         state->core_.stop_source_.request_stop();
     336                 :     }
     337                 :     else
     338                 :     {
     339                 :         using PayloadT = io_result_payload_t<
     340                 :             awaitable_result_t<Awaitable>>;
     341                 :         if constexpr (!std::is_same_v<PayloadT, std::tuple<>>) // an empty-tuple payload means there is nothing to store
     342                 :         {
     343                 :             state->set_result(index,
     344                 :                 extract_io_payload(std::move(result)));
     345                 :         }
     346                 :     }
     347              64 : }
     348                 : 
     349                 : /** Create a runner for io_result children that requests stop on ec. The whole result is stored in slot Index whether or not ec is set. */
     350                 : template<std::size_t Index, IoAwaitable Awaitable, typename... Ts>
     351                 : when_all_runner<when_all_state<Ts...>>
     352              97 : make_when_all_io_runner(Awaitable inner, when_all_state<Ts...>* state)
     353                 : {
     354                 :     auto result = co_await std::move(inner);
     355                 :     auto ec = result.ec; // copied out first because result is moved from on the next line
     356                 :     std::get<Index>(state->results_).set(std::move(result));
     357                 :     // A set ec is recorded (first one wins) and siblings are asked to stop.
     358                 :     if(ec)
     359                 :     {
     360                 :         state->record_error(ec);
     361                 :         state->core_.stop_source_.request_stop();
     362                 :     }
     363             194 : }
     364                 : 
     365                 : /** Launcher that uses io_result-aware runners. Awaiting it creates one runner per awaitable in the tuple and posts each to the caller's executor. */
     366                 : template<IoAwaitable... Awaitables>
     367                 : class when_all_io_launcher
     368                 : {
     369                 :     using state_type = when_all_state<awaitable_result_t<Awaitables>...>;
     370                 :     // Both pointers are non-owning; the pointees are supplied by whoever constructs the launcher.
     371                 :     std::tuple<Awaitables...>* awaitables_;
     372                 :     state_type* state_;
     373                 : 
     374                 : public:
     375              50 :     when_all_io_launcher(
     376                 :         std::tuple<Awaitables...>* awaitables,
     377                 :         state_type* state)
     378              50 :         : awaitables_(awaitables)
     379              50 :         , state_(state)
     380                 :     {
     381              50 :     }
     382                 :     // Ready (no suspension) only when the pack is empty.
     383              50 :     bool await_ready() const noexcept
     384                 :     {
     385              50 :         return sizeof...(Awaitables) == 0;
     386                 :     }
     387                 :     // Records the awaiting coroutine and its environment, links the parent's stop token, then launches every runner.
     388              50 :     std::coroutine_handle<> await_suspend(
     389                 :         std::coroutine_handle<> continuation, io_env const* caller_env)
     390                 :     {
     391              50 :         state_->core_.continuation_.h = continuation;
     392              50 :         state_->core_.caller_env_ = caller_env;
     393                 :         // Forward stop requests from the parent token to our own stop source.
     394              50 :         if(caller_env->stop_token.stop_possible())
     395                 :         {
     396               2 :             state_->core_.parent_stop_callback_.emplace(
     397               1 :                 caller_env->stop_token,
     398               1 :                 when_all_core::stop_callback_fn{&state_->core_.stop_source_});
     399                 :             // NOTE(review): std::stop_callback already runs the callback during construction if stop was requested, so this check looks redundant - confirm.
     400               1 :             if(caller_env->stop_token.stop_requested())
     401 MIS           0 :                 state_->core_.stop_source_.request_stop();
     402                 :         }
     403                 :         // Every runner receives a token from the same stop source.
     404 HIT          50 :         auto token = state_->core_.stop_source_.get_token();
     405              46 :         [&]<std::size_t... Is>(std::index_sequence<Is...>) {
     406              50 :             (..., launch_one<Is>(caller_env->executor, token));
     407              50 :         }(std::index_sequence_for<Awaitables...>{});
     408                 :         // Runners were posted rather than resumed here, so no coroutine is resumed from this call.
     409             100 :         return std::noop_coroutine();
     410              50 :     }
     411                 : 
     412              50 :     void await_resume() const noexcept {}
     413                 : 
     414                 : private:
     415                 :     template<std::size_t I>
     416              97 :     void launch_one(executor_ref caller_ex, std::stop_token token)
     417                 :     {
     418              97 :         auto runner = make_when_all_io_runner<I>(
     419              97 :             std::move(std::get<I>(*awaitables_)), state_);
     420                 :         // Take the handle out of the wrapper and fill in the promise before the runner is scheduled.
     421              97 :         auto h = runner.release();
     422              97 :         h.promise().state_ = state_;
     423              97 :         h.promise().env_ = io_env{caller_ex, token,
     424              97 :             state_->core_.caller_env_->frame_allocator};
     425                 :         // Store the handle in slot I of the state and post that slot to the caller's executor.
     426              97 :         state_->runner_handles_[I].h = std::coroutine_handle<>{h};
     427              97 :         state_->core_.caller_env_->executor.post(state_->runner_handles_[I]);
     428             194 :     }
     429                 : };
     430                 : 
     431                 : /** Helper to extract a single result from state.
     432                 :     This is a separate function to work around a GCC-11 ICE that occurs
     433                 :     when using nested immediately-invoked lambdas with pack expansion.
     434                 : */
     435                 : template<std::size_t I, typename... Ts>
     436              69 : auto extract_single_result(when_all_state<Ts...>& state)
     437                 : {
     438              69 :     return std::move(std::get<I>(state.results_)).get(); // std::move makes the holder an rvalue so the &&-qualified get() can be called
     439                 : }
     440                 : 
     441                 : /** Extract all results from state as a tuple.
     442                 :     Each element is moved out of its holder via extract_single_result; element I of the tuple comes from holder I. */
     443                 : template<typename... Ts>
     444              36 : auto extract_results(when_all_state<Ts...>& state)
     445                 : {
     446              55 :     return [&]<std::size_t... Is>(std::index_sequence<Is...>) {
     447              36 :         return std::tuple(extract_single_result<Is>(state)...);
     448              72 :     }(std::index_sequence_for<Ts...>{});
     449                 : }
     450                 : 
     451                 : /** Launches all homogeneous runners concurrently.
     452                 : 
     453                 :     Two-phase approach: create all runners first, then post all.
     454                 :     This avoids lifetime issues if a task completes synchronously.
     455                 : */
     456                 : template<typename Range>
     457                 : class when_all_homogeneous_launcher
     458                 : {
     459                 :     using Awaitable = std::ranges::range_value_t<Range>;
     460                 :     using PayloadT = io_result_payload_t<awaitable_result_t<Awaitable>>;
     461                 :     // Both pointers are non-owning; the pointees are supplied by whoever constructs the launcher.
     462                 :     Range* range_;
     463                 :     when_all_homogeneous_state<PayloadT>* state_;
     464                 : 
     465                 : public:
     466              14 :     when_all_homogeneous_launcher(
     467                 :         Range* range,
     468                 :         when_all_homogeneous_state<PayloadT>* state)
     469              14 :         : range_(range)
     470              14 :         , state_(state)
     471                 :     {
     472              14 :     }
     473                 :     // Ready (no suspension) only when the range is empty.
     474              14 :     bool await_ready() const noexcept
     475                 :     {
     476              14 :         return std::ranges::empty(*range_);
     477                 :     }
     478                 :     // Records the awaiting coroutine and its environment, links the parent's stop token, then creates and posts the runners.
     479              14 :     std::coroutine_handle<> await_suspend(std::coroutine_handle<> continuation, io_env const* caller_env)
     480                 :     {
     481              14 :         state_->core_.continuation_.h = continuation;
     482              14 :         state_->core_.caller_env_ = caller_env;
     483                 :         // Forward stop requests from the parent token to our own stop source.
     484              14 :         if(caller_env->stop_token.stop_possible())
     485                 :         {
     486               2 :             state_->core_.parent_stop_callback_.emplace(
     487               1 :                 caller_env->stop_token,
     488               1 :                 when_all_core::stop_callback_fn{&state_->core_.stop_source_});
     489                 :             // NOTE(review): std::stop_callback already runs the callback during construction if stop was requested, so this check looks redundant - confirm.
     490               1 :             if(caller_env->stop_token.stop_requested())
     491 MIS           0 :                 state_->core_.stop_source_.request_stop();
     492                 :         }
     493                 : 
     494 HIT          14 :         auto token = state_->core_.stop_source_.get_token(); // every runner receives a token from the same stop source
     495                 : 
     496                 :         // Phase 1: Create all runners without dispatching.
     497              14 :         std::size_t index = 0;
     498              46 :         for(auto&& a : *range_)
     499                 :         {
     500              32 :             auto runner = make_when_all_homogeneous_runner(
     501              32 :                 std::move(a), state_, index);
     502                 :             // Take the handle out of the wrapper and fill in the promise; nothing has been posted yet.
     503              32 :             auto h = runner.release();
     504              32 :             h.promise().state_ = state_;
     505              32 :             h.promise().index_ = index;
     506              32 :             h.promise().env_ = io_env{caller_env->executor, token, caller_env->frame_allocator};
     507                 :             // Remember the handle in the slot for this position.
     508              32 :             state_->runner_handles_[index].h = std::coroutine_handle<>{h};
     509              32 :             ++index;
     510                 :         }
     511                 : 
     512                 :         // Phase 2: Post all runners. Any may complete synchronously.
     513                 :         // After last post, state_ and this may be destroyed.
     514              14 :         auto* handles = state_->runner_handles_.get(); // read into locals before the first post, so the loop itself never touches state_
     515              14 :         std::size_t count = state_->core_.remaining_count_.load(std::memory_order_relaxed); // presumably equals the number of runners created above (the state is built outside this view) - confirm
     516              46 :         for(std::size_t i = 0; i < count; ++i)
     517              32 :             caller_env->executor.post(handles[i]);
     518                 : 
     519              28 :         return std::noop_coroutine();
     520              46 :     }
     521                 : 
     522              14 :     void await_resume() const noexcept
     523                 :     {
     524              14 :     }
     525                 : };
     526                 : 
     527                 : } // namespace detail
     528                 : 
     529                 : /** Execute a range of io_result-returning awaitables concurrently.
     530                 : 
     531                 :     Launches all awaitables simultaneously and waits for all to complete.
     532                 :     On success, extracted payloads are collected in a vector preserving
     533                 :     input order. The first error_code cancels siblings and is propagated
     534                 :     in the outer io_result (with an empty vector). Exceptions always beat error codes.
     535                 : 
     536                 :     @li All child awaitables run concurrently on the caller's executor
     537                 :     @li Payloads are returned as a vector in input order
     538                 :     @li First error_code wins and cancels siblings
     539                 :     @li Exception always beats error_code
     540                 :     @li Completes only after all children have finished
     541                 : 
     542                 :     @par Thread Safety
     543                 :     The returned task must be awaited from a single execution context.
     544                 :     Child awaitables execute concurrently but complete through the caller's
     545                 :     executor.
     546                 : 
     547                 :     @param awaitables Non-empty range of io_result-returning awaitables to
     548                 :         run concurrently; moved from if an rvalue, copied if an lvalue.
     549                 : 
     550                 :     @return A task yielding io_result<vector<PayloadT>> where PayloadT
     551                 :         is the payload extracted from each child's io_result.
     552                 : 
     553                 :     @throws std::invalid_argument if range is empty (raised in the
     554                 :         coroutine body, ahead of its first co_await).
     555                 :     @throws Rethrows the first child exception after all children
     556                 :         complete (exception beats error_code).
     557                 : 
     558                 :     @par Example
     559                 :     @code
     560                 :     task<void> example()
     561                 :     {
     562                 :         std::vector<io_task<size_t>> reads;
     563                 :         for (auto& buf : buffers)
     564                 :             reads.push_back(stream.read_some(buf));
     565                 : 
     566                 :         auto [ec, counts] = co_await when_all(std::move(reads));
     567                 :         if (ec) { // handle error
     568                 :         }
     569                 :     }
     570                 :     @endcode
     571                 : 
     572                 :     @see IoAwaitableRange, when_all
     573                 : */
     574                 : template<IoAwaitableRange R>
     575                 :     requires detail::is_io_result_v<
     576                 :         awaitable_result_t<std::ranges::range_value_t<R>>>
     577                 :     && (!std::is_same_v<
     578                 :             detail::io_result_payload_t<
     579                 :                 awaitable_result_t<std::ranges::range_value_t<R>>>,
     580                 :             std::tuple<>>)
     581              12 : [[nodiscard]] auto when_all(R&& awaitables) // NOTE(review): forwarding reference; if task starts lazily the caller's range must outlive the first resume - confirm
     582                 :     -> task<io_result<std::vector<
     583                 :         detail::io_result_payload_t<
     584                 :             awaitable_result_t<std::ranges::range_value_t<R>>>>>>
     585                 : {
     586                 :     using Awaitable = std::ranges::range_value_t<R>;
     587                 :     using PayloadT = detail::io_result_payload_t<
     588                 :         awaitable_result_t<Awaitable>>;
     589                 :     using OwnedRange = std::remove_cvref_t<R>; // value type of the range, references and cv stripped
     590                 : 
     591                 :     auto count = std::ranges::size(awaitables); // read before the range is moved from below
     592                 :     if(count == 0)
     593                 :         throw std::invalid_argument("when_all requires at least one awaitable");
     594                 : 
     595                 :     OwnedRange owned_awaitables = std::forward<R>(awaitables); // move (rvalue) or copy (lvalue) into a local of this coroutine
     596                 : 
     597                 :     detail::when_all_homogeneous_state<PayloadT> state(count); // presumably one result slot per child - see when_all_homogeneous_state
     598                 : 
     599                 :     co_await detail::when_all_homogeneous_launcher<OwnedRange>(
     600                 :         &owned_awaitables, &state); // launcher gets non-owning pointers to the two locals above
     601                 : 
     602                 :     if(state.core_.first_exception_) // checked first: an exception outranks any error_code
     603                 :         std::rethrow_exception(state.core_.first_exception_);
     604                 : 
     605                 :     if(state.has_error_.load(std::memory_order_relaxed)) // NOTE(review): relaxed presumes the launcher's completion already orders this read - confirm
     606                 :         co_return io_result<std::vector<PayloadT>>{state.first_error_, {}}; // error path: payload vector is left empty
     607                 : 
     608                 :     std::vector<PayloadT> results;
     609                 :     results.reserve(count); // capacity reserved up front
     610                 :     for(auto& opt : state.results_)
     611                 :         results.push_back(std::move(*opt)); // assumes every slot is engaged when no error was recorded - TODO confirm
     612                 : 
     613                 :     co_return io_result<std::vector<PayloadT>>{{}, std::move(results)}; // success: default-constructed error_code
     614              24 : }
     615                 : 
     616                 : /** Execute a range of void io_result-returning awaitables concurrently.
     617                 : 
     618                 :     Launches all awaitables simultaneously and waits for all to complete.
     619                 :     Since all awaitables return io_result<>, no payload values are
     620                 :     collected. The first error_code cancels siblings and is propagated.
     621                 :     Exceptions always beat error codes.
     622                 : 
     623                 :     @param awaitables Non-empty range of io_result<>-returning awaitables to
     624                 :         run concurrently; moved from if an rvalue, copied if an lvalue.
     625                 : 
     626                 :     @return A task yielding io_result<> whose ec is the first child
     627                 :         error, or default-constructed on success.
     628                 : 
     629                 :     @throws std::invalid_argument if range is empty.
     630                 :     @throws Rethrows the first child exception after all children
     631                 :         complete (exception beats error_code).
     632                 : 
     633                 :     @par Example
     634                 :     @code
     635                 :     task<void> example()
     636                 :     {
     637                 :         std::vector<io_task<>> jobs;
     638                 :         for (int i = 0; i < n; ++i)
     639                 :             jobs.push_back(process(i));
     640                 : 
     641                 :         auto [ec] = co_await when_all(std::move(jobs));
     642                 :     }
     643                 :     @endcode
     644                 : 
     645                 :     @see IoAwaitableRange, when_all
     646                 : */
     647                 : template<IoAwaitableRange R>
     648                 :     requires detail::is_io_result_v<
     649                 :         awaitable_result_t<std::ranges::range_value_t<R>>>
     650                 :     && std::is_same_v<
     651                 :             detail::io_result_payload_t<
     652                 :                 awaitable_result_t<std::ranges::range_value_t<R>>>,
     653                 :             std::tuple<>>
     654               4 : [[nodiscard]] auto when_all(R&& awaitables) -> task<io_result<>> // NOTE(review): forwarding reference; if task starts lazily the caller's range must outlive the first resume - confirm
     655                 : {
     656                 :     using OwnedRange = std::remove_cvref_t<R>; // value type of the range, references and cv stripped
     657                 : 
     658                 :     auto count = std::ranges::size(awaitables); // read before the range is moved from below
     659                 :     if(count == 0)
     660                 :         throw std::invalid_argument("when_all requires at least one awaitable");
     661                 : 
     662                 :     OwnedRange owned_awaitables = std::forward<R>(awaitables); // move (rvalue) or copy (lvalue) into a local of this coroutine
     663                 : 
     664                 :     detail::when_all_homogeneous_state<std::tuple<>> state(count); // std::tuple<> payload: this overload collects no values
     665                 : 
     666                 :     co_await detail::when_all_homogeneous_launcher<OwnedRange>(
     667                 :         &owned_awaitables, &state); // launcher gets non-owning pointers to the two locals above
     668                 : 
     669                 :     if(state.core_.first_exception_) // checked first: an exception outranks any error_code
     670                 :         std::rethrow_exception(state.core_.first_exception_);
     671                 : 
     672                 :     if(state.has_error_.load(std::memory_order_relaxed)) // NOTE(review): relaxed presumes the launcher's completion already orders this read - confirm
     673                 :         co_return io_result<>{state.first_error_};
     674                 : 
     675                 :     co_return io_result<>{}; // success: default-constructed result
     676               8 : }
     677                 : 
     678                 : /** Execute io_result-returning awaitables concurrently, inspecting error codes.
     679                 : 
     680                 :     Overload selected when all children return io_result<Ts...>.
     681                 :     The error_code is lifted out of each child into a single outer
     682                 :     io_result. On success all values are returned; on failure the
     683                 :     first error_code wins.
     684                 : 
     685                 :     @par Exception Safety
     686                 :     Exception always beats error_code. If any child throws, the
     687                 :     exception is rethrown regardless of error_code results.
     688                 : 
     689                 :     @param awaitables One or more awaitables each returning
     690                 :         io_result<Ts...>; taken by value and moved into internal storage.
     691                 : 
     692                 :     @return A task yielding io_result<R1, R2, ..., Rn> where each Ri
     693                 :         follows the payload flattening rules.
     694                 : */
     695                 : template<IoAwaitable... As>
     696                 :     requires (sizeof...(As) > 0)
     697                 :           && detail::all_io_result_awaitables<As...>
     698              50 : [[nodiscard]] auto when_all(As... awaitables) // by-value parameters: the coroutine holds its own copies
     699                 :     -> task<io_result<
     700                 :         detail::io_result_payload_t<awaitable_result_t<As>>...>>
     701                 : {
     702                 :     using result_type = io_result<
     703                 :         detail::io_result_payload_t<awaitable_result_t<As>>...>;
     704                 : 
     705                 :     detail::when_all_state<awaitable_result_t<As>...> state;
     706                 :     std::tuple<As...> awaitable_tuple(std::move(awaitables)...); // gather the parameters into one object the launcher can address
     707                 : 
     708                 :     co_await detail::when_all_io_launcher<As...>(&awaitable_tuple, &state); // launcher gets non-owning pointers to the two locals above
     709                 : 
     710                 :     // Exception always wins over error_code
     711                 :     if(state.core_.first_exception_)
     712                 :         std::rethrow_exception(state.core_.first_exception_);
     713                 : 
     714                 :     auto r = detail::build_when_all_io_result<result_type>(
     715                 :         detail::extract_results(state)); // result is assembled before the error check; ec is patched in below
     716                 :     if(state.has_error_.load(std::memory_order_relaxed)) // NOTE(review): relaxed presumes the launcher's completion already orders this read - confirm
     717                 :         r.ec = state.first_error_;
     718                 :     co_return r;
     719             100 : }
     720                 : 
     721                 : } // namespace capy
     722                 : } // namespace boost
     723                 : 
     724                 : #endif
        

Generated by: LCOV version 2.3