TLA Line data Source code
1 : //
2 : // Copyright (c) 2026 Michael Vandeberg
3 : // Copyright (c) 2026 Steve Gerbino
4 : //
5 : // Distributed under the Boost Software License, Version 1.0. (See accompanying
6 : // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
7 : //
8 : // Official repository: https://github.com/cppalliance/capy
9 : //
10 :
11 : #ifndef BOOST_CAPY_WHEN_ANY_HPP
12 : #define BOOST_CAPY_WHEN_ANY_HPP
13 :
14 : #include <boost/capy/detail/config.hpp>
15 : #include <boost/capy/detail/io_result_combinators.hpp>
16 : #include <boost/capy/continuation.hpp>
17 : #include <boost/capy/concept/executor.hpp>
18 : #include <boost/capy/concept/io_awaitable.hpp>
19 : #include <coroutine>
20 : #include <boost/capy/ex/executor_ref.hpp>
21 : #include <boost/capy/ex/frame_allocator.hpp>
22 : #include <boost/capy/ex/io_env.hpp>
23 : #include <boost/capy/task.hpp>
24 :
25 : #include <array>
26 : #include <atomic>
27 : #include <exception>
28 : #include <memory>
29 : #include <mutex>
30 : #include <optional>
31 : #include <ranges>
32 : #include <stdexcept>
33 : #include <stop_token>
34 : #include <tuple>
35 : #include <type_traits>
36 : #include <utility>
37 : #include <variant>
38 : #include <vector>
39 :
40 : /*
41 : when_any - Race multiple io_result tasks, select first success
42 : =============================================================
43 :
44 : OVERVIEW:
45 : ---------
46 : when_any launches N io_result-returning tasks concurrently. A task
47 : wins by returning !ec; errors and exceptions do not win. Once a
48 : winner is found, stop is requested for siblings and the winner's
49 : payload is returned. If no winner exists (all fail), the last recorded
50 : failure is reported: its error_code is returned or its exception is rethrown.
51 :
52 : ARCHITECTURE:
53 : -------------
54 : The design mirrors when_all but with inverted completion semantics:
55 :
56 : when_all: complete when remaining_count reaches 0 (all done)
57 : when_any: complete when has_winner becomes true (first success)
58 : BUT still wait for remaining_count to reach 0 for cleanup
59 :
60 : Key components:
61 : - when_any_core: Shared state tracking winner and completion
62 : - when_any_io_runner: Wrapper coroutine for each child task
63 : - when_any_io_launcher/when_any_io_homogeneous_launcher:
64 : Awaitables that start all runners concurrently
65 :
66 : CRITICAL INVARIANTS:
67 : --------------------
68 : 1. Only a task returning !ec can become the winner (via atomic CAS)
69 : 2. All tasks must complete before parent resumes (cleanup safety)
70 : 3. Stop is requested immediately when winner is determined
71 : 4. Exceptions and errors do not claim winner status
72 :
73 : POSITIONAL VARIANT:
74 : -------------------
75 : The variadic overload returns std::variant<error_code, R1, R2, ..., Rn>.
76 : Index 0 is error_code (failure/no-winner). Index 1..N identifies the
77 : winning child and carries its payload.
78 :
79 : RANGE OVERLOAD:
80 : ---------------
81 : The range overload returns variant<error_code, pair<size_t, T>> for
82 : non-void children or variant<error_code, size_t> for void children.
83 :
84 : MEMORY MODEL:
85 : -------------
86 : Synchronization chain from winner's write to parent's read:
87 :
88 : 1. Winner thread writes result_ (non-atomic)
89 : 2. Winner thread calls signal_completion() -> fetch_sub(acq_rel) on remaining_count_
90 : 3. Last task thread (may be winner or non-winner) calls signal_completion()
91 : -> fetch_sub(acq_rel) on remaining_count_, observing count becomes 0
92 : 4. Last task returns caller_env_->executor.dispatch(continuation_) via symmetric transfer
93 : 5. Parent coroutine resumes and reads result_
94 :
95 : Synchronization analysis:
96 : - All fetch_sub operations on remaining_count_ form a release sequence
97 : - Winner's fetch_sub releases; subsequent fetch_sub operations participate
98 : in the modification order of remaining_count_
99 : - Last task's fetch_sub(acq_rel) synchronizes-with prior releases in the
100 : modification order, establishing happens-before from winner's writes
101 : - Executor dispatch() is expected to provide queue-based synchronization
102 : (release-on-post, acquire-on-execute) completing the chain to parent
103 : - Even inline executors work (same thread = sequenced-before)
104 :
105 : EXCEPTION SEMANTICS:
106 : --------------------
107 : Exceptions do NOT claim winner status. If a child throws, the exception
108 : is recorded but the combinator keeps waiting for a success. Only when
109 : all children complete without a winner is the last recorded failure
110 : reported: rethrown if it was an exception, returned if it was an error_code.
111 : */
112 :
113 : namespace boost {
114 : namespace capy {
115 :
116 : namespace detail {
117 :
118 : /** Core shared state for when_any operations.
119 :
120 : Contains all members and methods common to both heterogeneous (variadic)
121 : and homogeneous (range) when_any implementations. State classes embed
122 : this via composition to avoid CRTP destructor ordering issues.
123 :
124 : @par Thread Safety
125 : Atomic operations protect winner selection and completion count.
126 : */
127 : struct when_any_core
128 : {
129 : std::atomic<std::size_t> remaining_count_;
130 : std::size_t winner_index_{0};
131 : std::exception_ptr winner_exception_;
132 : std::stop_source stop_source_;
133 :
134 : // Bridges parent's stop token to our stop_source
135 : struct stop_callback_fn
136 : {
137 : std::stop_source* source_;
138 HIT 2 : void operator()() const noexcept { source_->request_stop(); }
139 : };
140 : using stop_callback_t = std::stop_callback<stop_callback_fn>;
141 : std::optional<stop_callback_t> parent_stop_callback_;
142 :
143 : continuation continuation_;
144 : io_env const* caller_env_ = nullptr;
145 :
146 : // Placed last to avoid padding (1-byte atomic followed by 8-byte aligned members)
147 : std::atomic<bool> has_winner_{false};
148 :
149 31 : explicit when_any_core(std::size_t count) noexcept
150 31 : : remaining_count_(count)
151 : {
152 31 : }
153 :
154 : /** Atomically claim winner status; exactly one task succeeds. */
155 52 : bool try_win(std::size_t index) noexcept
156 : {
157 52 : bool expected = false;
158 52 : if(has_winner_.compare_exchange_strong(
159 : expected, true, std::memory_order_acq_rel))
160 : {
161 22 : winner_index_ = index;
162 22 : stop_source_.request_stop();
163 22 : return true;
164 : }
165 30 : return false;
166 : }
167 :
168 : /** @pre try_win() returned true. */
169 MIS 0 : void set_winner_exception(std::exception_ptr ep) noexcept
170 : {
171 0 : winner_exception_ = ep;
172 0 : }
173 :
174 : // Runners signal completion directly via final_suspend; no member function needed.
175 : };
176 :
177 : } // namespace detail
178 :
179 : namespace detail {
180 :
181 : // State for io_result-aware when_any: only !ec wins.
182 : template<typename... Ts>
183 : struct when_any_io_state
184 : {
185 : static constexpr std::size_t task_count = sizeof...(Ts);
186 : using variant_type = std::variant<std::error_code, Ts...>;
187 :
188 : when_any_core core_;
189 : std::optional<variant_type> result_;
190 : std::array<continuation, task_count> runner_handles_{};
191 :
192 : // Last failure (error or exception) for the all-fail case.
193 : // Last writer wins — no priority between errors and exceptions.
194 : std::mutex failure_mu_;
195 : std::error_code last_error_;
196 : std::exception_ptr last_exception_;
197 :
198 HIT 16 : when_any_io_state()
199 16 : : core_(task_count)
200 : {
201 16 : }
202 :
203 12 : void record_error(std::error_code ec)
204 : {
205 12 : std::lock_guard lk(failure_mu_);
206 12 : last_error_ = ec;
207 12 : last_exception_ = nullptr;
208 12 : }
209 :
210 7 : void record_exception(std::exception_ptr ep)
211 : {
212 7 : std::lock_guard lk(failure_mu_);
213 7 : last_exception_ = ep;
214 7 : last_error_ = {};
215 7 : }
216 : };
217 :
218 : // Wrapper coroutine for io_result-aware when_any children.
219 : // unhandled_exception records the exception but does NOT claim winner status.
220 : template<typename StateType>
221 : struct when_any_io_runner
222 : {
223 : struct promise_type
224 : {
225 : StateType* state_ = nullptr;
226 : std::size_t index_ = 0;
227 : io_env env_;
228 :
229 82 : when_any_io_runner get_return_object() noexcept
230 : {
231 : return when_any_io_runner(
232 82 : std::coroutine_handle<promise_type>::from_promise(*this));
233 : }
234 :
235 82 : std::suspend_always initial_suspend() noexcept { return {}; }
236 :
237 82 : auto final_suspend() noexcept
238 : {
239 : struct awaiter
240 : {
241 : promise_type* p_;
242 82 : bool await_ready() const noexcept { return false; }
243 82 : auto await_suspend(std::coroutine_handle<> h) noexcept
244 : {
245 82 : auto& core = p_->state_->core_;
246 82 : auto* counter = &core.remaining_count_;
247 82 : auto* caller_env = core.caller_env_;
248 82 : auto& cont = core.continuation_;
249 :
250 82 : h.destroy();
251 :
252 82 : auto remaining = counter->fetch_sub(1, std::memory_order_acq_rel);
253 82 : if(remaining == 1)
254 31 : return detail::symmetric_transfer(caller_env->executor.dispatch(cont));
255 51 : return detail::symmetric_transfer(std::noop_coroutine());
256 : }
257 MIS 0 : void await_resume() const noexcept {}
258 : };
259 HIT 82 : return awaiter{this};
260 : }
261 :
262 71 : void return_void() noexcept {}
263 :
264 : // Exceptions do NOT win in io_result when_any
265 11 : void unhandled_exception()
266 : {
267 11 : state_->record_exception(std::current_exception());
268 11 : }
269 :
270 : template<class Awaitable>
271 : struct transform_awaiter
272 : {
273 : std::decay_t<Awaitable> a_;
274 : promise_type* p_;
275 :
276 82 : bool await_ready() { return a_.await_ready(); }
277 82 : decltype(auto) await_resume() { return a_.await_resume(); }
278 :
279 : template<class Promise>
280 81 : auto await_suspend(std::coroutine_handle<Promise> h)
281 : {
282 : using R = decltype(a_.await_suspend(h, &p_->env_));
283 : if constexpr (std::is_same_v<R, std::coroutine_handle<>>)
284 81 : return detail::symmetric_transfer(a_.await_suspend(h, &p_->env_));
285 : else
286 : return a_.await_suspend(h, &p_->env_);
287 : }
288 : };
289 :
290 : template<class Awaitable>
291 82 : auto await_transform(Awaitable&& a)
292 : {
293 : using A = std::decay_t<Awaitable>;
294 : if constexpr (IoAwaitable<A>)
295 : {
296 : return transform_awaiter<Awaitable>{
297 163 : std::forward<Awaitable>(a), this};
298 : }
299 : else
300 : {
301 : static_assert(sizeof(A) == 0, "requires IoAwaitable");
302 : }
303 81 : }
304 : };
305 :
306 : std::coroutine_handle<promise_type> h_;
307 :
308 82 : explicit when_any_io_runner(std::coroutine_handle<promise_type> h) noexcept
309 82 : : h_(h)
310 : {
311 82 : }
312 :
313 : when_any_io_runner(when_any_io_runner&& other) noexcept
314 : : h_(std::exchange(other.h_, nullptr))
315 : {
316 : }
317 :
318 : when_any_io_runner(when_any_io_runner const&) = delete;
319 : when_any_io_runner& operator=(when_any_io_runner const&) = delete;
320 : when_any_io_runner& operator=(when_any_io_runner&&) = delete;
321 :
322 82 : auto release() noexcept
323 : {
324 82 : return std::exchange(h_, nullptr);
325 : }
326 : };
327 :
328 : // Runner coroutine: only tries to win when the child returns !ec.
329 : template<std::size_t I, IoAwaitable Awaitable, typename StateType>
330 : when_any_io_runner<StateType>
331 30 : make_when_any_io_runner(Awaitable inner, StateType* state)
332 : {
333 : auto result = co_await std::move(inner);
334 :
335 : if(!result.ec)
336 : {
337 : // Success: try to claim winner
338 : if(state->core_.try_win(I))
339 : {
340 : try
341 : {
342 : state->result_.emplace(
343 : std::in_place_index<I + 1>,
344 : detail::extract_io_payload(std::move(result)));
345 : }
346 : catch(...)
347 : {
348 : state->core_.set_winner_exception(std::current_exception());
349 : }
350 : }
351 : }
352 : else
353 : {
354 : // Error: record but don't win
355 : state->record_error(result.ec);
356 : }
357 60 : }
358 :
359 : // Launcher for io_result-aware when_any.
360 : template<IoAwaitable... Awaitables>
361 : class when_any_io_launcher
362 : {
363 : using state_type = when_any_io_state<
364 : io_result_payload_t<awaitable_result_t<Awaitables>>...>;
365 :
366 : std::tuple<Awaitables...>* tasks_;
367 : state_type* state_;
368 :
369 : public:
370 16 : when_any_io_launcher(
371 : std::tuple<Awaitables...>* tasks,
372 : state_type* state)
373 16 : : tasks_(tasks)
374 16 : , state_(state)
375 : {
376 16 : }
377 :
378 16 : bool await_ready() const noexcept
379 : {
380 16 : return sizeof...(Awaitables) == 0;
381 : }
382 :
383 16 : std::coroutine_handle<> await_suspend(
384 : std::coroutine_handle<> continuation, io_env const* caller_env)
385 : {
386 16 : state_->core_.continuation_.h = continuation;
387 16 : state_->core_.caller_env_ = caller_env;
388 :
389 16 : if(caller_env->stop_token.stop_possible())
390 : {
391 2 : state_->core_.parent_stop_callback_.emplace(
392 1 : caller_env->stop_token,
393 1 : when_any_core::stop_callback_fn{&state_->core_.stop_source_});
394 :
395 1 : if(caller_env->stop_token.stop_requested())
396 MIS 0 : state_->core_.stop_source_.request_stop();
397 : }
398 :
399 HIT 16 : auto token = state_->core_.stop_source_.get_token();
400 28 : [&]<std::size_t... Is>(std::index_sequence<Is...>) {
401 16 : (..., launch_one<Is>(caller_env->executor, token));
402 16 : }(std::index_sequence_for<Awaitables...>{});
403 :
404 32 : return std::noop_coroutine();
405 16 : }
406 :
407 16 : void await_resume() const noexcept {}
408 :
409 : private:
410 : template<std::size_t I>
411 30 : void launch_one(executor_ref caller_ex, std::stop_token token)
412 : {
413 30 : auto runner = make_when_any_io_runner<I>(
414 30 : std::move(std::get<I>(*tasks_)), state_);
415 :
416 30 : auto h = runner.release();
417 30 : h.promise().state_ = state_;
418 30 : h.promise().index_ = I;
419 30 : h.promise().env_ = io_env{caller_ex, token,
420 30 : state_->core_.caller_env_->frame_allocator};
421 :
422 30 : state_->runner_handles_[I].h = std::coroutine_handle<>{h};
423 30 : caller_ex.post(state_->runner_handles_[I]);
424 60 : }
425 : };
426 :
427 : /** Shared state for homogeneous io_result-aware when_any (range overload).
428 :
429 : @tparam T The payload type extracted from io_result.
430 : */
431 : template<typename T>
432 : struct when_any_io_homogeneous_state
433 : {
434 : when_any_core core_;
435 : std::optional<T> result_;
436 : std::unique_ptr<continuation[]> runner_handles_;
437 :
438 : std::mutex failure_mu_;
439 : std::error_code last_error_;
440 : std::exception_ptr last_exception_;
441 :
442 13 : explicit when_any_io_homogeneous_state(std::size_t count)
443 13 : : core_(count)
444 13 : , runner_handles_(std::make_unique<continuation[]>(count))
445 : {
446 13 : }
447 :
448 6 : void record_error(std::error_code ec)
449 : {
450 6 : std::lock_guard lk(failure_mu_);
451 6 : last_error_ = ec;
452 6 : last_exception_ = nullptr;
453 6 : }
454 :
455 4 : void record_exception(std::exception_ptr ep)
456 : {
457 4 : std::lock_guard lk(failure_mu_);
458 4 : last_exception_ = ep;
459 4 : last_error_ = {};
460 4 : }
461 : };
462 :
463 : /** Specialization for void io_result children (no payload storage). */
464 : template<>
465 : struct when_any_io_homogeneous_state<std::tuple<>>
466 : {
467 : when_any_core core_;
468 : std::unique_ptr<continuation[]> runner_handles_;
469 :
470 : std::mutex failure_mu_;
471 : std::error_code last_error_;
472 : std::exception_ptr last_exception_;
473 :
474 2 : explicit when_any_io_homogeneous_state(std::size_t count)
475 2 : : core_(count)
476 2 : , runner_handles_(std::make_unique<continuation[]>(count))
477 : {
478 2 : }
479 :
480 1 : void record_error(std::error_code ec)
481 : {
482 1 : std::lock_guard lk(failure_mu_);
483 1 : last_error_ = ec;
484 1 : last_exception_ = nullptr;
485 1 : }
486 :
487 MIS 0 : void record_exception(std::exception_ptr ep)
488 : {
489 0 : std::lock_guard lk(failure_mu_);
490 0 : last_exception_ = ep;
491 0 : last_error_ = {};
492 0 : }
493 : };
494 :
495 : /** Create an io_result-aware runner for homogeneous when_any (range path).
496 :
497 : Only tries to win when the child returns !ec.
498 : */
499 : template<IoAwaitable Awaitable, typename StateType>
500 : when_any_io_runner<StateType>
501 HIT 52 : make_when_any_io_homogeneous_runner(
502 : Awaitable inner, StateType* state, std::size_t index)
503 : {
504 : auto result = co_await std::move(inner);
505 :
506 : if(!result.ec)
507 : {
508 : if(state->core_.try_win(index))
509 : {
510 : using PayloadT = io_result_payload_t<
511 : awaitable_result_t<Awaitable>>;
512 : if constexpr (!std::is_same_v<PayloadT, std::tuple<>>)
513 : {
514 : try
515 : {
516 : state->result_.emplace(
517 : extract_io_payload(std::move(result)));
518 : }
519 : catch(...)
520 : {
521 : state->core_.set_winner_exception(
522 : std::current_exception());
523 : }
524 : }
525 : }
526 : }
527 : else
528 : {
529 : state->record_error(result.ec);
530 : }
531 104 : }
532 :
533 : /** Launches all io_result-aware homogeneous runners concurrently. */
534 : template<IoAwaitableRange Range>
535 : class when_any_io_homogeneous_launcher
536 : {
537 : using Awaitable = std::ranges::range_value_t<Range>;
538 : using PayloadT = io_result_payload_t<awaitable_result_t<Awaitable>>;
539 :
540 : Range* range_;
541 : when_any_io_homogeneous_state<PayloadT>* state_;
542 :
543 : public:
544 15 : when_any_io_homogeneous_launcher(
545 : Range* range,
546 : when_any_io_homogeneous_state<PayloadT>* state)
547 15 : : range_(range)
548 15 : , state_(state)
549 : {
550 15 : }
551 :
552 15 : bool await_ready() const noexcept
553 : {
554 15 : return std::ranges::empty(*range_);
555 : }
556 :
557 15 : std::coroutine_handle<> await_suspend(
558 : std::coroutine_handle<> continuation, io_env const* caller_env)
559 : {
560 15 : state_->core_.continuation_.h = continuation;
561 15 : state_->core_.caller_env_ = caller_env;
562 :
563 15 : if(caller_env->stop_token.stop_possible())
564 : {
565 4 : state_->core_.parent_stop_callback_.emplace(
566 2 : caller_env->stop_token,
567 2 : when_any_core::stop_callback_fn{&state_->core_.stop_source_});
568 :
569 2 : if(caller_env->stop_token.stop_requested())
570 1 : state_->core_.stop_source_.request_stop();
571 : }
572 :
573 15 : auto token = state_->core_.stop_source_.get_token();
574 :
575 : // Phase 1: Create all runners without dispatching.
576 15 : std::size_t index = 0;
577 67 : for(auto&& a : *range_)
578 : {
579 52 : auto runner = make_when_any_io_homogeneous_runner(
580 52 : std::move(a), state_, index);
581 :
582 52 : auto h = runner.release();
583 52 : h.promise().state_ = state_;
584 52 : h.promise().index_ = index;
585 52 : h.promise().env_ = io_env{caller_env->executor, token,
586 52 : caller_env->frame_allocator};
587 :
588 52 : state_->runner_handles_[index].h = std::coroutine_handle<>{h};
589 52 : ++index;
590 : }
591 :
592 : // Phase 2: Post all runners. Any may complete synchronously.
593 15 : auto* handles = state_->runner_handles_.get();
594 15 : std::size_t count = state_->core_.remaining_count_.load(std::memory_order_relaxed);
595 67 : for(std::size_t i = 0; i < count; ++i)
596 52 : caller_env->executor.post(handles[i]);
597 :
598 30 : return std::noop_coroutine();
599 67 : }
600 :
601 15 : void await_resume() const noexcept {}
602 : };
603 :
604 : } // namespace detail
605 :
606 : /** Race a range of io_result-returning awaitables (non-void payloads).
607 :
608 : Only a child returning !ec can win. Errors and exceptions do not
609 : claim winner status. If all children fail, the last failure
610 : is reported — either the last error_code at variant index 0,
611 : or the last exception rethrown.
612 :
613 : @param awaitables Range of io_result-returning awaitables (must
614 : not be empty).
615 :
616 : @return A task yielding variant<error_code, pair<size_t, PayloadT>>
617 : where index 0 is failure and index 1 carries the winner's
618 : index and payload.
619 :
620 : @throws std::invalid_argument if range is empty.
621 : @throws Rethrows last exception when no winner and the last
622 : failure was an exception.
623 :
624 : @par Example
625 : @code
626 : task<void> example()
627 : {
628 : std::vector<io_task<size_t>> reads;
629 : for (auto& buf : buffers)
630 : reads.push_back(stream.read_some(buf));
631 :
632 : auto result = co_await when_any(std::move(reads));
633 : if (result.index() == 1)
634 : {
635 : auto [idx, n] = std::get<1>(result);
636 : }
637 : }
638 : @endcode
639 :
640 : @see IoAwaitableRange, when_any
641 : */
642 : template<IoAwaitableRange R>
643 : requires detail::is_io_result_v<
644 : awaitable_result_t<std::ranges::range_value_t<R>>>
645 : && (!std::is_same_v<
646 : detail::io_result_payload_t<
647 : awaitable_result_t<std::ranges::range_value_t<R>>>,
648 : std::tuple<>>)
649 14 : [[nodiscard]] auto when_any(R&& awaitables)
650 : -> task<std::variant<std::error_code,
651 : std::pair<std::size_t,
652 : detail::io_result_payload_t<
653 : awaitable_result_t<std::ranges::range_value_t<R>>>>>>
654 : {
655 : using Awaitable = std::ranges::range_value_t<R>;
656 : using PayloadT = detail::io_result_payload_t<
657 : awaitable_result_t<Awaitable>>;
658 : using result_type = std::variant<std::error_code,
659 : std::pair<std::size_t, PayloadT>>;
660 : using OwnedRange = std::remove_cvref_t<R>;
661 :
662 : auto count = std::ranges::size(awaitables);
663 : if(count == 0)
664 : throw std::invalid_argument("when_any requires at least one awaitable");
665 :
666 : OwnedRange owned_awaitables = std::forward<R>(awaitables);
667 :
668 : detail::when_any_io_homogeneous_state<PayloadT> state(count);
669 :
670 : co_await detail::when_any_io_homogeneous_launcher<OwnedRange>(
671 : &owned_awaitables, &state);
672 :
673 : // Winner found
674 : if(state.core_.has_winner_.load(std::memory_order_acquire))
675 : {
676 : if(state.core_.winner_exception_)
677 : std::rethrow_exception(state.core_.winner_exception_);
678 : co_return result_type{std::in_place_index<1>,
679 : std::pair{state.core_.winner_index_, std::move(*state.result_)}};
680 : }
681 :
682 : // No winner — report last failure
683 : if(state.last_exception_)
684 : std::rethrow_exception(state.last_exception_);
685 : co_return result_type{std::in_place_index<0>, state.last_error_};
686 28 : }
687 :
688 : /** Race a range of void io_result-returning awaitables.
689 :
690 : Only a child returning !ec can win. Returns the winner's index
691 : at variant index 1, or error_code at index 0 on all-fail.
692 :
693 : @param awaitables Range of io_result<>-returning awaitables (must
694 : not be empty).
695 :
696 : @return A task yielding variant<error_code, size_t> where index 0
697 : is failure and index 1 carries the winner's index.
698 :
699 : @throws std::invalid_argument if range is empty.
700 : @throws Rethrows first exception when no winner and at least one
701 : child threw.
702 :
703 : @par Example
704 : @code
705 : task<void> example()
706 : {
707 : std::vector<io_task<>> jobs;
708 : jobs.push_back(background_work_a());
709 : jobs.push_back(background_work_b());
710 :
711 : auto result = co_await when_any(std::move(jobs));
712 : if (result.index() == 1)
713 : {
714 : auto winner = std::get<1>(result);
715 : }
716 : }
717 : @endcode
718 :
719 : @see IoAwaitableRange, when_any
720 : */
721 : template<IoAwaitableRange R>
722 : requires detail::is_io_result_v<
723 : awaitable_result_t<std::ranges::range_value_t<R>>>
724 : && std::is_same_v<
725 : detail::io_result_payload_t<
726 : awaitable_result_t<std::ranges::range_value_t<R>>>,
727 : std::tuple<>>
728 2 : [[nodiscard]] auto when_any(R&& awaitables)
729 : -> task<std::variant<std::error_code, std::size_t>>
730 : {
731 : using OwnedRange = std::remove_cvref_t<R>;
732 : using result_type = std::variant<std::error_code, std::size_t>;
733 :
734 : auto count = std::ranges::size(awaitables);
735 : if(count == 0)
736 : throw std::invalid_argument("when_any requires at least one awaitable");
737 :
738 : OwnedRange owned_awaitables = std::forward<R>(awaitables);
739 :
740 : detail::when_any_io_homogeneous_state<std::tuple<>> state(count);
741 :
742 : co_await detail::when_any_io_homogeneous_launcher<OwnedRange>(
743 : &owned_awaitables, &state);
744 :
745 : // Winner found
746 : if(state.core_.has_winner_.load(std::memory_order_acquire))
747 : {
748 : if(state.core_.winner_exception_)
749 : std::rethrow_exception(state.core_.winner_exception_);
750 : co_return result_type{std::in_place_index<1>,
751 : state.core_.winner_index_};
752 : }
753 :
754 : // No winner — report last failure
755 : if(state.last_exception_)
756 : std::rethrow_exception(state.last_exception_);
757 : co_return result_type{std::in_place_index<0>, state.last_error_};
758 4 : }
759 :
760 : /** Race io_result-returning awaitables, selecting the first success.
761 :
762 : Overload selected when all children return io_result<Ts...>.
763 : Only a child returning !ec can win. Errors and exceptions do
764 : not claim winner status.
765 :
766 : @return A task yielding variant<error_code, R1, ..., Rn> where
767 : index 0 is the failure/no-winner case and index i+1
768 : identifies the winning child.
769 : */
770 : template<IoAwaitable... As>
771 : requires (sizeof...(As) > 0)
772 : && detail::all_io_result_awaitables<As...>
773 16 : [[nodiscard]] auto when_any(As... as)
774 : -> task<std::variant<
775 : std::error_code,
776 : detail::io_result_payload_t<awaitable_result_t<As>>...>>
777 : {
778 : using result_type = std::variant<
779 : std::error_code,
780 : detail::io_result_payload_t<awaitable_result_t<As>>...>;
781 :
782 : detail::when_any_io_state<
783 : detail::io_result_payload_t<awaitable_result_t<As>>...> state;
784 : std::tuple<As...> awaitable_tuple(std::move(as)...);
785 :
786 : co_await detail::when_any_io_launcher<As...>(
787 : &awaitable_tuple, &state);
788 :
789 : // Winner found: return their result
790 : if(state.result_.has_value())
791 : co_return std::move(*state.result_);
792 :
793 : // Winner claimed but payload construction failed
794 : if(state.core_.winner_exception_)
795 : std::rethrow_exception(state.core_.winner_exception_);
796 :
797 : // No winner — report last failure
798 : if(state.last_exception_)
799 : std::rethrow_exception(state.last_exception_);
800 : co_return result_type{std::in_place_index<0>, state.last_error_};
801 32 : }
802 :
803 : } // namespace capy
804 : } // namespace boost
805 :
806 : #endif
|