LCOV - code coverage report
Current view: top level - capy - when_any.hpp (source / functions) Coverage Total Hit Missed
Test: coverage_remapped.info Lines: 93.9 % 165 155 10
Test Date: 2026-03-21 03:20:11 Functions: 91.2 % 160 146 14

           TLA  Line data    Source code
       1                 : //
       2                 : // Copyright (c) 2026 Michael Vandeberg
       3                 : // Copyright (c) 2026 Steve Gerbino
       4                 : //
       5                 : // Distributed under the Boost Software License, Version 1.0. (See accompanying
       6                 : // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
       7                 : //
       8                 : // Official repository: https://github.com/cppalliance/capy
       9                 : //
      10                 : 
      11                 : #ifndef BOOST_CAPY_WHEN_ANY_HPP
      12                 : #define BOOST_CAPY_WHEN_ANY_HPP
      13                 : 
      14                 : #include <boost/capy/detail/config.hpp>
      15                 : #include <boost/capy/detail/io_result_combinators.hpp>
      16                 : #include <boost/capy/continuation.hpp>
      17                 : #include <boost/capy/concept/executor.hpp>
      18                 : #include <boost/capy/concept/io_awaitable.hpp>
      19                 : #include <coroutine>
      20                 : #include <boost/capy/ex/executor_ref.hpp>
      21                 : #include <boost/capy/ex/frame_allocator.hpp>
      22                 : #include <boost/capy/ex/io_env.hpp>
      23                 : #include <boost/capy/task.hpp>
      24                 : 
      25                 : #include <array>
      26                 : #include <atomic>
      27                 : #include <exception>
      28                 : #include <memory>
      29                 : #include <mutex>
      30                 : #include <optional>
      31                 : #include <ranges>
      32                 : #include <stdexcept>
      33                 : #include <stop_token>
      34                 : #include <tuple>
      35                 : #include <type_traits>
      36                 : #include <utility>
      37                 : #include <variant>
      38                 : #include <vector>
      39                 : 
      40                 : /*
      41                 :    when_any - Race multiple io_result tasks, select first success
      42                 :    =============================================================
      43                 : 
      44                 :    OVERVIEW:
      45                 :    ---------
      46                 :    when_any launches N io_result-returning tasks concurrently. A task
      47                 :    wins by returning !ec; errors and exceptions do not win. Once a
      48                 :    winner is found, stop is requested for siblings and the winner's
      49                 :    payload is returned. If no winner exists (all fail), the most recently
      50                 :    recorded failure is reported (error_code returned, or exception rethrown).
      51                 : 
      52                 :    ARCHITECTURE:
      53                 :    -------------
      54                 :    The design mirrors when_all but with inverted completion semantics:
      55                 : 
      56                 :      when_all:  complete when remaining_count reaches 0 (all done)
      57                 :      when_any:  complete when has_winner becomes true (first success)
      58                 :                 BUT still wait for remaining_count to reach 0 for cleanup
      59                 : 
      60                 :    Key components:
      61                 :      - when_any_core:    Shared state tracking winner and completion
      62                 :      - when_any_io_runner: Wrapper coroutine for each child task
      63                 :      - when_any_io_launcher/when_any_io_homogeneous_launcher:
      64                 :                           Awaitables that start all runners concurrently
      65                 : 
      66                 :    CRITICAL INVARIANTS:
      67                 :    --------------------
      68                 :    1. Only a task returning !ec can become the winner (via atomic CAS)
      69                 :    2. All tasks must complete before parent resumes (cleanup safety)
      70                 :    3. Stop is requested immediately when winner is determined
      71                 :    4. Exceptions and errors do not claim winner status
      72                 : 
      73                 :    POSITIONAL VARIANT:
      74                 :    -------------------
      75                 :    The variadic overload returns std::variant<error_code, R1, R2, ..., Rn>.
      76                 :    Index 0 is error_code (failure/no-winner). Index 1..N identifies the
      77                 :    winning child and carries its payload.
      78                 : 
      79                 :    RANGE OVERLOAD:
      80                 :    ---------------
      81                 :    The range overload returns variant<error_code, pair<size_t, T>> for
      82                 :    non-void children or variant<error_code, size_t> for void children.
      83                 : 
      84                 :    MEMORY MODEL:
      85                 :    -------------
      86                 :    Synchronization chain from winner's write to parent's read:
      87                 : 
      88                 :    1. Winner thread writes result_ (non-atomic)
      89                 :    2. Winner thread reaches the runner's final_suspend -> fetch_sub(acq_rel) on remaining_count_
      90                 :    3. Last task thread (may be winner or non-winner) reaches final_suspend
      91                 :       -> fetch_sub(acq_rel) on remaining_count_, observing count becomes 0
      92                 :    4. Last task returns caller_env_->executor.dispatch(continuation_) via symmetric transfer
      93                 :    5. Parent coroutine resumes and reads result_
      94                 : 
      95                 :    Synchronization analysis:
      96                 :    - All fetch_sub operations on remaining_count_ form a release sequence
      97                 :    - Winner's fetch_sub releases; subsequent fetch_sub operations participate
      98                 :      in the modification order of remaining_count_
      99                 :    - Last task's fetch_sub(acq_rel) synchronizes-with prior releases in the
     100                 :      modification order, establishing happens-before from winner's writes
     101                 :    - Executor dispatch() is expected to provide queue-based synchronization
     102                 :      (release-on-post, acquire-on-execute) completing the chain to parent
     103                 :    - Even inline executors work (same thread = sequenced-before)
     104                 : 
     105                 :    EXCEPTION SEMANTICS:
     106                 :    --------------------
     107                 :    Exceptions do NOT claim winner status. If a child throws, the exception
     108                 :    is recorded but the combinator keeps waiting for a success. Only when
     109                 :    all children complete without a winner does the failure record matter:
     110                 :    each new error or exception replaces the previous one (last failure wins).
     111                 : */
     112                 : 
     113                 : namespace boost {
     114                 : namespace capy {
     115                 : 
     116                 : namespace detail {
     117                 : 
     118                 : /** Core shared state for when_any operations.
     119                 : 
     120                 :     Contains all members and methods common to both heterogeneous (variadic)
     121                 :     and homogeneous (range) when_any implementations. State classes embed
     122                 :     this via composition to avoid CRTP destructor ordering issues.
     123                 : 
     124                 :     @par Thread Safety
     125                 :     Atomic operations protect winner selection and completion count.
     126                 : */
     127                 : struct when_any_core
     128                 : {
     129                 :     std::atomic<std::size_t> remaining_count_;
     130                 :     std::size_t winner_index_{0};
     131                 :     std::exception_ptr winner_exception_;
     132                 :     std::stop_source stop_source_;
     133                 : 
     134                 :     // Bridges parent's stop token to our stop_source
     135                 :     struct stop_callback_fn
     136                 :     {
     137                 :         std::stop_source* source_;
     138 HIT           2 :         void operator()() const noexcept { source_->request_stop(); }
     139                 :     };
     140                 :     using stop_callback_t = std::stop_callback<stop_callback_fn>;
     141                 :     std::optional<stop_callback_t> parent_stop_callback_;
     142                 : 
     143                 :     continuation continuation_;
     144                 :     io_env const* caller_env_ = nullptr;
     145                 : 
     146                 :     // Placed last to avoid padding (1-byte atomic followed by 8-byte aligned members)
     147                 :     std::atomic<bool> has_winner_{false};
     148                 : 
     149              31 :     explicit when_any_core(std::size_t count) noexcept
     150              31 :         : remaining_count_(count)
     151                 :     {
     152              31 :     }
     153                 : 
     154                 :     /** Atomically claim winner status; exactly one task succeeds. */
     155              52 :     bool try_win(std::size_t index) noexcept
     156                 :     {
     157              52 :         bool expected = false;
     158              52 :         if(has_winner_.compare_exchange_strong(
     159                 :             expected, true, std::memory_order_acq_rel))
     160                 :         {
     161              22 :             winner_index_ = index;
     162              22 :             stop_source_.request_stop();
     163              22 :             return true;
     164                 :         }
     165              30 :         return false;
     166                 :     }
     167                 : 
     168                 :     /** @pre try_win() returned true. */
     169 MIS           0 :     void set_winner_exception(std::exception_ptr ep) noexcept
     170                 :     {
     171               0 :         winner_exception_ = ep;
     172               0 :     }
     173                 : 
     174                 :     // Runners signal completion directly via final_suspend; no member function needed.
     175                 : };
     176                 : 
     177                 : } // namespace detail
     178                 : 
     179                 : namespace detail {
     180                 : 
     181                 : // State for io_result-aware when_any: only !ec wins.
     182                 : template<typename... Ts>
     183                 : struct when_any_io_state
     184                 : {
     185                 :     static constexpr std::size_t task_count = sizeof...(Ts);
     186                 :     using variant_type = std::variant<std::error_code, Ts...>;
     187                 : 
     188                 :     when_any_core core_;
     189                 :     std::optional<variant_type> result_;
     190                 :     std::array<continuation, task_count> runner_handles_{};
     191                 : 
     192                 :     // Last failure (error or exception) for the all-fail case.
     193                 :     // Last writer wins — no priority between errors and exceptions.
     194                 :     std::mutex failure_mu_;
     195                 :     std::error_code last_error_;
     196                 :     std::exception_ptr last_exception_;
     197                 : 
     198 HIT          16 :     when_any_io_state()
     199              16 :         : core_(task_count)
     200                 :     {
     201              16 :     }
     202                 : 
     203              12 :     void record_error(std::error_code ec)
     204                 :     {
     205              12 :         std::lock_guard lk(failure_mu_);
     206              12 :         last_error_ = ec;
     207              12 :         last_exception_ = nullptr;
     208              12 :     }
     209                 : 
     210               7 :     void record_exception(std::exception_ptr ep)
     211                 :     {
     212               7 :         std::lock_guard lk(failure_mu_);
     213               7 :         last_exception_ = ep;
     214               7 :         last_error_ = {};
     215               7 :     }
     216                 : };
     217                 : 
     218                 : // Wrapper coroutine for io_result-aware when_any children.
     219                 : // unhandled_exception records the exception but does NOT claim winner status.
     220                 : template<typename StateType>
     221                 : struct when_any_io_runner
     222                 : {
     223                 :     struct promise_type
     224                 :     {
     225                 :         StateType* state_ = nullptr;
     226                 :         std::size_t index_ = 0;
     227                 :         io_env env_;
     228                 : 
     229              82 :         when_any_io_runner get_return_object() noexcept
     230                 :         {
     231                 :             return when_any_io_runner(
     232              82 :                 std::coroutine_handle<promise_type>::from_promise(*this));
     233                 :         }
     234                 : 
     235              82 :         std::suspend_always initial_suspend() noexcept { return {}; }
     236                 : 
     237              82 :         auto final_suspend() noexcept
     238                 :         {
     239                 :             struct awaiter
     240                 :             {
     241                 :                 promise_type* p_;
     242              82 :                 bool await_ready() const noexcept { return false; }
     243              82 :                 auto await_suspend(std::coroutine_handle<> h) noexcept
     244                 :                 {
     245              82 :                     auto& core = p_->state_->core_;
     246              82 :                     auto* counter = &core.remaining_count_;
     247              82 :                     auto* caller_env = core.caller_env_;
     248              82 :                     auto& cont = core.continuation_;
     249                 : 
     250              82 :                     h.destroy();
     251                 : 
     252              82 :                     auto remaining = counter->fetch_sub(1, std::memory_order_acq_rel);
     253              82 :                     if(remaining == 1)
     254              31 :                         return detail::symmetric_transfer(caller_env->executor.dispatch(cont));
     255              51 :                     return detail::symmetric_transfer(std::noop_coroutine());
     256                 :                 }
     257 MIS           0 :                 void await_resume() const noexcept {}
     258                 :             };
     259 HIT          82 :             return awaiter{this};
     260                 :         }
     261                 : 
     262              71 :         void return_void() noexcept {}
     263                 : 
     264                 :         // Exceptions do NOT win in io_result when_any
     265              11 :         void unhandled_exception()
     266                 :         {
     267              11 :             state_->record_exception(std::current_exception());
     268              11 :         }
     269                 : 
     270                 :         template<class Awaitable>
     271                 :         struct transform_awaiter
     272                 :         {
     273                 :             std::decay_t<Awaitable> a_;
     274                 :             promise_type* p_;
     275                 : 
     276              82 :             bool await_ready() { return a_.await_ready(); }
     277              82 :             decltype(auto) await_resume() { return a_.await_resume(); }
     278                 : 
     279                 :             template<class Promise>
     280              81 :             auto await_suspend(std::coroutine_handle<Promise> h)
     281                 :             {
     282                 :                 using R = decltype(a_.await_suspend(h, &p_->env_));
     283                 :                 if constexpr (std::is_same_v<R, std::coroutine_handle<>>)
     284              81 :                     return detail::symmetric_transfer(a_.await_suspend(h, &p_->env_));
     285                 :                 else
     286                 :                     return a_.await_suspend(h, &p_->env_);
     287                 :             }
     288                 :         };
     289                 : 
     290                 :         template<class Awaitable>
     291              82 :         auto await_transform(Awaitable&& a)
     292                 :         {
     293                 :             using A = std::decay_t<Awaitable>;
     294                 :             if constexpr (IoAwaitable<A>)
     295                 :             {
     296                 :                 return transform_awaiter<Awaitable>{
     297             163 :                     std::forward<Awaitable>(a), this};
     298                 :             }
     299                 :             else
     300                 :             {
     301                 :                 static_assert(sizeof(A) == 0, "requires IoAwaitable");
     302                 :             }
     303              81 :         }
     304                 :     };
     305                 : 
     306                 :     std::coroutine_handle<promise_type> h_;
     307                 : 
     308              82 :     explicit when_any_io_runner(std::coroutine_handle<promise_type> h) noexcept
     309              82 :         : h_(h)
     310                 :     {
     311              82 :     }
     312                 : 
     313                 :     when_any_io_runner(when_any_io_runner&& other) noexcept
     314                 :         : h_(std::exchange(other.h_, nullptr))
     315                 :     {
     316                 :     }
     317                 : 
     318                 :     when_any_io_runner(when_any_io_runner const&) = delete;
     319                 :     when_any_io_runner& operator=(when_any_io_runner const&) = delete;
     320                 :     when_any_io_runner& operator=(when_any_io_runner&&) = delete;
     321                 : 
     322              82 :     auto release() noexcept
     323                 :     {
     324              82 :         return std::exchange(h_, nullptr);
     325                 :     }
     326                 : };
     327                 : 
     328                 : // Runner coroutine: only tries to win when the child returns !ec.
     329                 : template<std::size_t I, IoAwaitable Awaitable, typename StateType>
     330                 : when_any_io_runner<StateType>
     331              30 : make_when_any_io_runner(Awaitable inner, StateType* state)
     332                 : {
     333                 :     auto result = co_await std::move(inner);
     334                 : 
     335                 :     if(!result.ec)
     336                 :     {
     337                 :         // Success: try to claim winner
     338                 :         if(state->core_.try_win(I))
     339                 :         {
     340                 :             try
     341                 :             {
     342                 :                 state->result_.emplace(
     343                 :                     std::in_place_index<I + 1>,
     344                 :                     detail::extract_io_payload(std::move(result)));
     345                 :             }
     346                 :             catch(...)
     347                 :             {
     348                 :                 state->core_.set_winner_exception(std::current_exception());
     349                 :             }
     350                 :         }
     351                 :     }
     352                 :     else
     353                 :     {
     354                 :         // Error: record but don't win
     355                 :         state->record_error(result.ec);
     356                 :     }
     357              60 : }
     358                 : 
     359                 : // Launcher for io_result-aware when_any.
     360                 : template<IoAwaitable... Awaitables>
     361                 : class when_any_io_launcher
     362                 : {
     363                 :     using state_type = when_any_io_state<
     364                 :         io_result_payload_t<awaitable_result_t<Awaitables>>...>;
     365                 : 
     366                 :     std::tuple<Awaitables...>* tasks_;
     367                 :     state_type* state_;
     368                 : 
     369                 : public:
     370              16 :     when_any_io_launcher(
     371                 :         std::tuple<Awaitables...>* tasks,
     372                 :         state_type* state)
     373              16 :         : tasks_(tasks)
     374              16 :         , state_(state)
     375                 :     {
     376              16 :     }
     377                 : 
     378              16 :     bool await_ready() const noexcept
     379                 :     {
     380              16 :         return sizeof...(Awaitables) == 0;
     381                 :     }
     382                 : 
     383              16 :     std::coroutine_handle<> await_suspend(
     384                 :         std::coroutine_handle<> continuation, io_env const* caller_env)
     385                 :     {
     386              16 :         state_->core_.continuation_.h = continuation;
     387              16 :         state_->core_.caller_env_ = caller_env;
     388                 : 
     389              16 :         if(caller_env->stop_token.stop_possible())
     390                 :         {
     391               2 :             state_->core_.parent_stop_callback_.emplace(
     392               1 :                 caller_env->stop_token,
     393               1 :                 when_any_core::stop_callback_fn{&state_->core_.stop_source_});
     394                 : 
     395               1 :             if(caller_env->stop_token.stop_requested())
     396 MIS           0 :                 state_->core_.stop_source_.request_stop();
     397                 :         }
     398                 : 
     399 HIT          16 :         auto token = state_->core_.stop_source_.get_token();
     400              28 :         [&]<std::size_t... Is>(std::index_sequence<Is...>) {
     401              16 :             (..., launch_one<Is>(caller_env->executor, token));
     402              16 :         }(std::index_sequence_for<Awaitables...>{});
     403                 : 
     404              32 :         return std::noop_coroutine();
     405              16 :     }
     406                 : 
     407              16 :     void await_resume() const noexcept {}
     408                 : 
     409                 : private:
     410                 :     template<std::size_t I>
     411              30 :     void launch_one(executor_ref caller_ex, std::stop_token token)
     412                 :     {
     413              30 :         auto runner = make_when_any_io_runner<I>(
     414              30 :             std::move(std::get<I>(*tasks_)), state_);
     415                 : 
     416              30 :         auto h = runner.release();
     417              30 :         h.promise().state_ = state_;
     418              30 :         h.promise().index_ = I;
     419              30 :         h.promise().env_ = io_env{caller_ex, token,
     420              30 :             state_->core_.caller_env_->frame_allocator};
     421                 : 
     422              30 :         state_->runner_handles_[I].h = std::coroutine_handle<>{h};
     423              30 :         caller_ex.post(state_->runner_handles_[I]);
     424              60 :     }
     425                 : };
     426                 : 
     427                 : /** Shared state for homogeneous io_result-aware when_any (range overload).
     428                 : 
     429                 :     @tparam T The payload type extracted from io_result.
     430                 : */
     431                 : template<typename T>
     432                 : struct when_any_io_homogeneous_state
     433                 : {
     434                 :     when_any_core core_;
     435                 :     std::optional<T> result_;
     436                 :     std::unique_ptr<continuation[]> runner_handles_;
     437                 : 
     438                 :     std::mutex failure_mu_;
     439                 :     std::error_code last_error_;
     440                 :     std::exception_ptr last_exception_;
     441                 : 
     442              13 :     explicit when_any_io_homogeneous_state(std::size_t count)
     443              13 :         : core_(count)
     444              13 :         , runner_handles_(std::make_unique<continuation[]>(count))
     445                 :     {
     446              13 :     }
     447                 : 
     448               6 :     void record_error(std::error_code ec)
     449                 :     {
     450               6 :         std::lock_guard lk(failure_mu_);
     451               6 :         last_error_ = ec;
     452               6 :         last_exception_ = nullptr;
     453               6 :     }
     454                 : 
     455               4 :     void record_exception(std::exception_ptr ep)
     456                 :     {
     457               4 :         std::lock_guard lk(failure_mu_);
     458               4 :         last_exception_ = ep;
     459               4 :         last_error_ = {};
     460               4 :     }
     461                 : };
     462                 : 
     463                 : /** Specialization for void io_result children (no payload storage). */
     464                 : template<>
     465                 : struct when_any_io_homogeneous_state<std::tuple<>>
     466                 : {
     467                 :     when_any_core core_;
     468                 :     std::unique_ptr<continuation[]> runner_handles_;
     469                 : 
     470                 :     std::mutex failure_mu_;
     471                 :     std::error_code last_error_;
     472                 :     std::exception_ptr last_exception_;
     473                 : 
     474               2 :     explicit when_any_io_homogeneous_state(std::size_t count)
     475               2 :         : core_(count)
     476               2 :         , runner_handles_(std::make_unique<continuation[]>(count))
     477                 :     {
     478               2 :     }
     479                 : 
     480               1 :     void record_error(std::error_code ec)
     481                 :     {
     482               1 :         std::lock_guard lk(failure_mu_);
     483               1 :         last_error_ = ec;
     484               1 :         last_exception_ = nullptr;
     485               1 :     }
     486                 : 
     487 MIS           0 :     void record_exception(std::exception_ptr ep)
     488                 :     {
     489               0 :         std::lock_guard lk(failure_mu_);
     490               0 :         last_exception_ = ep;
     491               0 :         last_error_ = {};
     492               0 :     }
     493                 : };
     494                 : 
     495                 : /** Create an io_result-aware runner for homogeneous when_any (range path).
     496                 : 
     497                 :     Only tries to win when the child returns !ec.
     498                 : */
     499                 : template<IoAwaitable Awaitable, typename StateType>
     500                 : when_any_io_runner<StateType>
     501 HIT          52 : make_when_any_io_homogeneous_runner(
     502                 :     Awaitable inner, StateType* state, std::size_t index)
     503                 : {
     504                 :     auto result = co_await std::move(inner);
     505                 : 
     506                 :     if(!result.ec)
     507                 :     {
     508                 :         if(state->core_.try_win(index))
     509                 :         {
     510                 :             using PayloadT = io_result_payload_t<
     511                 :                 awaitable_result_t<Awaitable>>;
     512                 :             if constexpr (!std::is_same_v<PayloadT, std::tuple<>>)
     513                 :             {
     514                 :                 try
     515                 :                 {
     516                 :                     state->result_.emplace(
     517                 :                         extract_io_payload(std::move(result)));
     518                 :                 }
     519                 :                 catch(...)
     520                 :                 {
     521                 :                     state->core_.set_winner_exception(
     522                 :                         std::current_exception());
     523                 :                 }
     524                 :             }
     525                 :         }
     526                 :     }
     527                 :     else
     528                 :     {
     529                 :         state->record_error(result.ec);
     530                 :     }
     531             104 : }
     532                 : 
     533                 : /** Awaitable that creates one io_result-aware runner per element of a homogeneous range and posts them all to the caller's executor. */
     534                 : template<IoAwaitableRange Range>
     535                 : class when_any_io_homogeneous_launcher
     536                 : {
     537                 :     using Awaitable = std::ranges::range_value_t<Range>;
     538                 :     using PayloadT = io_result_payload_t<awaitable_result_t<Awaitable>>;
     539                 : 
     540                 :     Range* range_; // awaitables to launch; raw pointer supplied by the caller
     541                 :     when_any_io_homogeneous_state<PayloadT>* state_; // state shared with the runners; raw pointer supplied by the caller
     542                 : 
     543                 : public:
     544              15 :     when_any_io_homogeneous_launcher(
     545                 :         Range* range,
     546                 :         when_any_io_homogeneous_state<PayloadT>* state)
     547              15 :         : range_(range)
     548              15 :         , state_(state)
     549                 :     {
     550              15 :     }
     551                 : 
     552              15 :     bool await_ready() const noexcept // true for an empty range: nothing to launch, so no suspension
     553                 :     {
     554              15 :         return std::ranges::empty(*range_);
     555                 :     }
     556                 : 
     557              15 :     std::coroutine_handle<> await_suspend(
     558                 :         std::coroutine_handle<> continuation, io_env const* caller_env)
     559                 :     {
     560              15 :         state_->core_.continuation_.h = continuation; // the awaiting coroutine and its environment are recorded in the shared core
     561              15 :         state_->core_.caller_env_ = caller_env;
     562                 : 
     563              15 :         if(caller_env->stop_token.stop_possible()) // skip the callback when the caller's token reports that a stop is not possible
     564                 :         {
     565               4 :             state_->core_.parent_stop_callback_.emplace(
     566               2 :                 caller_env->stop_token,
     567               2 :                 when_any_core::stop_callback_fn{&state_->core_.stop_source_}); // presumably requests stop on stop_source_ when invoked — confirm in when_any_core
     568                 : 
     569               2 :             if(caller_env->stop_token.stop_requested()) // caller already requested stop: propagate to the internal stop source now
     570               1 :                 state_->core_.stop_source_.request_stop();
     571                 :         }
     572                 : 
     573              15 :         auto token = state_->core_.stop_source_.get_token(); // every runner's io_env below carries this token rather than the caller's
     574                 : 
     575                 :         // Phase 1: Create all runners without dispatching.
     576              15 :         std::size_t index = 0;
     577              67 :         for(auto&& a : *range_)
     578                 :         {
     579              52 :             auto runner = make_when_any_io_homogeneous_runner(
     580              52 :                 std::move(a), state_, index); // each awaitable is passed to its runner as an rvalue
     581                 : 
     582              52 :             auto h = runner.release(); // obtain the runner's coroutine handle; it is stored in runner_handles_ below
     583              52 :             h.promise().state_ = state_;
     584              52 :             h.promise().index_ = index;
     585              52 :             h.promise().env_ = io_env{caller_env->executor, token,
     586              52 :                 caller_env->frame_allocator}; // caller's executor and frame allocator, internal stop token
     587                 : 
     588              52 :             state_->runner_handles_[index].h = std::coroutine_handle<>{h};
     589              52 :             ++index;
     590                 :         }
     591                 : 
     592                 :         // Phase 2: Post all runners. Any may complete synchronously.
     593              15 :         auto* handles = state_->runner_handles_.get();
     594              15 :         std::size_t count = state_->core_.remaining_count_.load(std::memory_order_relaxed); // presumably equals the number of runners created above — confirm against the state constructor
     595              67 :         for(std::size_t i = 0; i < count; ++i)
     596              52 :             caller_env->executor.post(handles[i]);
     597                 : 
     598              30 :         return std::noop_coroutine(); // nothing is resumed inline from here; the runners were handed to the executor
     599              67 :     }
     600                 : 
     601              15 :     void await_resume() const noexcept {} // the launcher itself yields no value
     602                 : };
     603                 : 
     604                 : } // namespace detail
     605                 : 
     606                 : /** Race a range of io_result-returning awaitables (non-void payloads).
     607                 : 
     608                 :     Only a child returning !ec can win. Errors and exceptions do not
     609                 :     claim winner status. If all children fail, the last failure
     610                 :     is reported — either the last error_code at variant index 0,
     611                 :     or the last exception rethrown.
     612                 : 
     613                 :     @param awaitables Range of io_result-returning awaitables (must
     614                 :         not be empty).
     615                 : 
     616                 :     @return A task yielding variant<error_code, pair<size_t, PayloadT>>
     617                 :         where index 0 is failure and index 1 carries the winner's
     618                 :         index and payload.
     619                 : 
     620                 :     @throws std::invalid_argument if range is empty.
     621                 :     @throws Rethrows last exception when no winner and the last
     622                 :         failure was an exception.
     623                 : 
     624                 :     @par Example
     625                 :     @code
     626                 :     task<void> example()
     627                 :     {
     628                 :         std::vector<io_task<size_t>> reads;
     629                 :         for (auto& buf : buffers)
     630                 :             reads.push_back(stream.read_some(buf));
     631                 : 
     632                 :         auto result = co_await when_any(std::move(reads));
     633                 :         if (result.index() == 1)
     634                 :         {
     635                 :             auto [idx, n] = std::get<1>(result);
     636                 :         }
     637                 :     }
     638                 :     @endcode
     639                 : 
     640                 :     @see IoAwaitableRange, when_any
     641                 : */
     642                 : template<IoAwaitableRange R>
     643                 :     requires detail::is_io_result_v<
     644                 :         awaitable_result_t<std::ranges::range_value_t<R>>>
     645                 :     && (!std::is_same_v<
     646                 :             detail::io_result_payload_t<
     647                 :                 awaitable_result_t<std::ranges::range_value_t<R>>>,
     648                 :             std::tuple<>>) // this overload excludes the std::tuple<> (void) payload
     649              14 : [[nodiscard]] auto when_any(R&& awaitables)
     650                 :     -> task<std::variant<std::error_code,
     651                 :         std::pair<std::size_t,
     652                 :             detail::io_result_payload_t<
     653                 :                 awaitable_result_t<std::ranges::range_value_t<R>>>>>>
     654                 : {
     655                 :     using Awaitable = std::ranges::range_value_t<R>;
     656                 :     using PayloadT = detail::io_result_payload_t<
     657                 :         awaitable_result_t<Awaitable>>;
     658                 :     using result_type = std::variant<std::error_code,
     659                 :         std::pair<std::size_t, PayloadT>>;
     660                 :     using OwnedRange = std::remove_cvref_t<R>; // decayed range type, so the local below is an object even when R is a reference
     661                 : 
     662                 :     auto count = std::ranges::size(awaitables); // element count; also passed to the shared state below
     663                 :     if(count == 0)
     664                 :         throw std::invalid_argument("when_any requires at least one awaitable");
     665                 : 
     666                 :     OwnedRange owned_awaitables = std::forward<R>(awaitables); // copies an lvalue range, moves an rvalue one; the launcher works on this local
     667                 : 
     668                 :     detail::when_any_io_homogeneous_state<PayloadT> state(count);
     669                 : 
     670                 :     co_await detail::when_any_io_homogeneous_launcher<OwnedRange>(
     671                 :         &owned_awaitables, &state); // both pointers refer to locals of this coroutine
     672                 : 
     673                 :     // Winner found: a stored winner_exception_ is rethrown, otherwise index and payload are returned
     674                 :     if(state.core_.has_winner_.load(std::memory_order_acquire))
     675                 :     {
     676                 :         if(state.core_.winner_exception_)
     677                 :             std::rethrow_exception(state.core_.winner_exception_);
     678                 :         co_return result_type{std::in_place_index<1>,
     679                 :             std::pair{state.core_.winner_index_, std::move(*state.result_)}}; // variant index 1: winner's index and payload
     680                 :     }
     681                 : 
     682                 :     // No winner — report last failure; a stored exception is checked before the error code
     683                 :     if(state.last_exception_)
     684                 :         std::rethrow_exception(state.last_exception_);
     685                 :     co_return result_type{std::in_place_index<0>, state.last_error_}; // variant index 0: error_code
     686              28 : }
     687                 : 
     688                 : /** Race a range of void io_result-returning awaitables.
     689                 : 
     690                 :     Only a child returning !ec can win. Returns the winner's index
     691                 :     at variant index 1, or error_code at index 0 on all-fail.
     692                 : 
     693                 :     @param awaitables Range of io_result<>-returning awaitables (must
     694                 :         not be empty).
     695                 : 
     696                 :     @return A task yielding variant<error_code, size_t> where index 0
     697                 :         is failure and index 1 carries the winner's index.
     698                 : 
     699                 :     @throws std::invalid_argument if range is empty.
     700                 :     @throws Rethrows last exception when no winner and the last
     701                 :         failure was an exception.
     702                 : 
     703                 :     @par Example
     704                 :     @code
     705                 :     task<void> example()
     706                 :     {
     707                 :         std::vector<io_task<>> jobs;
     708                 :         jobs.push_back(background_work_a());
     709                 :         jobs.push_back(background_work_b());
     710                 : 
     711                 :         auto result = co_await when_any(std::move(jobs));
     712                 :         if (result.index() == 1)
     713                 :         {
     714                 :             auto winner = std::get<1>(result);
     715                 :         }
     716                 :     }
     717                 :     @endcode
     718                 : 
     719                 :     @see IoAwaitableRange, when_any
     720                 : */
     721                 : template<IoAwaitableRange R>
     722                 :     requires detail::is_io_result_v<
     723                 :         awaitable_result_t<std::ranges::range_value_t<R>>>
     724                 :     && std::is_same_v<
     725                 :             detail::io_result_payload_t<
     726                 :                 awaitable_result_t<std::ranges::range_value_t<R>>>,
     727                 :             std::tuple<>> // this overload is selected only for the std::tuple<> (void) payload
     728               2 : [[nodiscard]] auto when_any(R&& awaitables)
     729                 :     -> task<std::variant<std::error_code, std::size_t>>
     730                 : {
     731                 :     using OwnedRange = std::remove_cvref_t<R>; // decayed range type, so the local below is an object even when R is a reference
     732                 :     using result_type = std::variant<std::error_code, std::size_t>;
     733                 : 
     734                 :     auto count = std::ranges::size(awaitables); // element count; also passed to the shared state below
     735                 :     if(count == 0)
     736                 :         throw std::invalid_argument("when_any requires at least one awaitable");
     737                 : 
     738                 :     OwnedRange owned_awaitables = std::forward<R>(awaitables); // copies an lvalue range, moves an rvalue one; the launcher works on this local
     739                 : 
     740                 :     detail::when_any_io_homogeneous_state<std::tuple<>> state(count);
     741                 : 
     742                 :     co_await detail::when_any_io_homogeneous_launcher<OwnedRange>(
     743                 :         &owned_awaitables, &state); // both pointers refer to locals of this coroutine
     744                 : 
     745                 :     // Winner found: a stored winner_exception_ is rethrown, otherwise only the winner's index is returned
     746                 :     if(state.core_.has_winner_.load(std::memory_order_acquire))
     747                 :     {
     748                 :         if(state.core_.winner_exception_)
     749                 :             std::rethrow_exception(state.core_.winner_exception_);
     750                 :         co_return result_type{std::in_place_index<1>,
     751                 :             state.core_.winner_index_}; // variant index 1: winner's index (there is no payload to return)
     752                 :     }
     753                 : 
     754                 :     // No winner — report last failure; a stored exception is checked before the error code
     755                 :     if(state.last_exception_)
     756                 :         std::rethrow_exception(state.last_exception_);
     757                 :     co_return result_type{std::in_place_index<0>, state.last_error_}; // variant index 0: error_code
     758               4 : }
     759                 : 
     760                 : /** Race io_result-returning awaitables, selecting the first success.
     761                 : 
     762                 :     Overload selected when all children return io_result<Ts...>.
     763                 :     Only a child returning !ec can win. Errors and exceptions do
     764                 :     not claim winner status.
     765                 : 
     766                 :     @return A task yielding variant<error_code, R1, ..., Rn> where
     767                 :         index 0 is the failure/no-winner case and index i+1
     768                 :         identifies the winning child.
     769                 : */
     770                 : template<IoAwaitable... As>
     771                 :     requires (sizeof...(As) > 0) // at least one awaitable is required at compile time
     772                 :           && detail::all_io_result_awaitables<As...>
     773              16 : [[nodiscard]] auto when_any(As... as)
     774                 :     -> task<std::variant<
     775                 :         std::error_code,
     776                 :         detail::io_result_payload_t<awaitable_result_t<As>>...>>
     777                 : {
     778                 :     using result_type = std::variant<
     779                 :         std::error_code,
     780                 :         detail::io_result_payload_t<awaitable_result_t<As>>...>;
     781                 : 
     782                 :     detail::when_any_io_state<
     783                 :         detail::io_result_payload_t<awaitable_result_t<As>>...> state;
     784                 :     std::tuple<As...> awaitable_tuple(std::move(as)...); // by-value arguments are moved into one local tuple; the launcher is given its address
     785                 : 
     786                 :     co_await detail::when_any_io_launcher<As...>(
     787                 :         &awaitable_tuple, &state); // both pointers refer to locals of this coroutine
     788                 : 
     789                 :     // Winner found: return their result (result_ presumably already holds the full variant — confirm in when_any_io_state)
     790                 :     if(state.result_.has_value())
     791                 :         co_return std::move(*state.result_);
     792                 : 
     793                 :     // Winner claimed but payload construction failed
     794                 :     if(state.core_.winner_exception_)
     795                 :         std::rethrow_exception(state.core_.winner_exception_);
     796                 : 
     797                 :     // No winner — report last failure; a stored exception is checked before the error code
     798                 :     if(state.last_exception_)
     799                 :         std::rethrow_exception(state.last_exception_);
     800                 :     co_return result_type{std::in_place_index<0>, state.last_error_}; // variant index 0: error_code
     801              32 : }
     802                 : 
     803                 : } // namespace capy
     804                 : } // namespace boost
     805                 : 
     806                 : #endif
        

Generated by: LCOV version 2.3