include/boost/capy/ex/async_event.hpp

100.0% Lines (66/66) 100.0% List of functions (12/12)
async_event.hpp
f(x) Functions (12)
Line TLA Hits Source Code
1 //
2 // Copyright (c) 2025 Vinnie Falco (vinnie.falco@gmail.com)
3 //
4 // Distributed under the Boost Software License, Version 1.0. (See accompanying
5 // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
6 //
7 // Official repository: https://github.com/cppalliance/capy
8 //
9
10 #ifndef BOOST_CAPY_ASYNC_EVENT_HPP
11 #define BOOST_CAPY_ASYNC_EVENT_HPP
12
13 #include <boost/capy/detail/config.hpp>
14 #include <boost/capy/detail/intrusive.hpp>
15 #include <boost/capy/continuation.hpp>
16 #include <boost/capy/concept/executor.hpp>
17 #include <boost/capy/error.hpp>
18 #include <boost/capy/ex/io_env.hpp>
19 #include <boost/capy/io_result.hpp>
20
21 #include <stop_token>
22
23 #include <atomic>
24 #include <coroutine>
25 #include <new>
26 #include <utility>
27
28 /* async_event implementation notes
29 =================================
30
31 Same cancellation pattern as async_mutex (see that file for the
32 full discussion on claimed_, stop_cb lifetime, member ordering,
33 and threading assumptions).
34
35 Key difference: set() wakes ALL waiters (broadcast), not one.
36 It pops every waiter from the list and posts the ones it
37 claims. Waiters already claimed by a stop callback are skipped.
38
39 Because set() pops all waiters, a canceled waiter may have been
40 removed from the list by set() before its await_resume runs.
41 This requires a separate in_list_ flag (unlike async_mutex where
42 active_ served double duty). await_resume only calls remove()
43 when in_list_ is true.
44 */
45
46 namespace boost {
47 namespace capy {
48
49 /** An asynchronous event for coroutines.
50
51 This event provides a way to notify multiple coroutines that some
52 condition has occurred. When a coroutine awaits an unset event, it
53 suspends and is added to a wait queue. When the event is set, all
54 waiting coroutines are resumed.
55
56 @par Cancellation
57
58 When a coroutine is suspended waiting for the event and its stop
59 token is triggered, the waiter completes with `error::canceled`
60 instead of waiting for `set()`.
61
62 Cancellation only applies while the coroutine is suspended in the
63 wait queue. If the event is already set when `wait()` is called,
64 the wait completes immediately even if the stop token is already
65 signaled.
66
67 @par Zero Allocation
68
69 No heap allocation occurs for wait operations.
70
71 @par Thread Safety
72
73 Distinct objects: Safe.@n
74 Shared objects: Unsafe.
75
76 The event operations are designed for single-threaded use on one
77 executor. The stop callback may fire from any thread.
78
79 This type is non-copyable and non-movable because suspended
80 waiters hold intrusive pointers into the event's internal list.
81
82 @par Example
83 @code
84 async_event event;
85
86 task<> waiter() {
87 auto [ec] = co_await event.wait();
88 if(ec)
89 co_return;
90 // ... event was set ...
91 }
92
93 task<> notifier() {
94 // ... do some work ...
95 event.set(); // Wake all waiters
96 }
97 @endcode
98 */
99 class async_event
100 {
101 public:
102 class wait_awaiter;
103
104 private:
105 bool set_ = false;
106 detail::intrusive_list<wait_awaiter> waiters_;
107
108 public:
109 /** Awaiter returned by wait().
110 */
111 class wait_awaiter
112 : public detail::intrusive_list<wait_awaiter>::node
113 {
114 friend class async_event;
115
116 async_event* e_;
117 continuation cont_;
118 executor_ref ex_;
119
120 // Declared before stop_cb_buf_: the callback
121 // accesses these members, so they must still be
122 // alive if the stop_cb_ destructor blocks.
123 std::atomic<bool> claimed_{false};
124 bool canceled_ = false;
125 bool active_ = false;
126 bool in_list_ = false;
127
128 struct cancel_fn
129 {
130 wait_awaiter* self_;
131
132 4x void operator()() const noexcept
133 {
134 4x if(!self_->claimed_.exchange(
135 true, std::memory_order_acq_rel))
136 {
137 3x self_->canceled_ = true;
138 3x self_->ex_.post(self_->cont_);
139 }
140 4x }
141 };
142
143 using stop_cb_t =
144 std::stop_callback<cancel_fn>;
145
146 // Aligned storage for stop_cb_t. Declared last:
147 // its destructor may block while the callback
148 // accesses the members above.
149 BOOST_CAPY_MSVC_WARNING_PUSH
150 BOOST_CAPY_MSVC_WARNING_DISABLE(4324) // padded due to alignas
151 alignas(stop_cb_t)
152 unsigned char stop_cb_buf_[sizeof(stop_cb_t)];
153 BOOST_CAPY_MSVC_WARNING_POP
154
155 20x stop_cb_t& stop_cb_() noexcept
156 {
157 return *reinterpret_cast<stop_cb_t*>(
158 20x stop_cb_buf_);
159 }
160
161 public:
162 52x ~wait_awaiter()
163 {
164 52x if(active_)
165 1x stop_cb_().~stop_cb_t();
166 52x if(in_list_)
167 1x e_->waiters_.remove(this);
168 52x }
169
170 25x explicit wait_awaiter(async_event* e) noexcept
171 25x : e_(e)
172 {
173 25x }
174
175 27x wait_awaiter(wait_awaiter&& o) noexcept
176 27x : e_(o.e_)
177 27x , cont_(o.cont_)
178 27x , ex_(o.ex_)
179 27x , claimed_(o.claimed_.load(
180 std::memory_order_relaxed))
181 27x , canceled_(o.canceled_)
182 27x , active_(std::exchange(o.active_, false))
183 27x , in_list_(std::exchange(o.in_list_, false))
184 {
185 27x }
186
187 wait_awaiter(wait_awaiter const&) = delete;
188 wait_awaiter& operator=(wait_awaiter const&) = delete;
189 wait_awaiter& operator=(wait_awaiter&&) = delete;
190
191 25x bool await_ready() const noexcept
192 {
193 25x return e_->set_;
194 }
195
196 /** IoAwaitable protocol overload. */
197 std::coroutine_handle<>
198 21x await_suspend(
199 std::coroutine_handle<> h,
200 io_env const* env) noexcept
201 {
202 21x if(env->stop_token.stop_requested())
203 {
204 1x canceled_ = true;
205 1x return h;
206 }
207 20x cont_.h = h;
208 20x ex_ = env->executor;
209 20x e_->waiters_.push_back(this);
210 20x in_list_ = true;
211 60x ::new(stop_cb_buf_) stop_cb_t(
212 20x env->stop_token, cancel_fn{this});
213 20x active_ = true;
214 20x return std::noop_coroutine();
215 }
216
217 22x io_result<> await_resume() noexcept
218 {
219 22x if(active_)
220 {
221 19x stop_cb_().~stop_cb_t();
222 19x active_ = false;
223 }
224 22x if(canceled_)
225 {
226 4x if(in_list_)
227 {
228 3x e_->waiters_.remove(this);
229 3x in_list_ = false;
230 }
231 return {make_error_code(
232 4x error::canceled)};
233 }
234 18x return {{}};
235 }
236 };
237
238 /// Construct an unset event.
239 async_event() = default;
240
241 /// Copy constructor (deleted).
242 async_event(async_event const&) = delete;
243
244 /// Copy assignment (deleted).
245 async_event& operator=(async_event const&) = delete;
246
247 /// Move constructor (deleted).
248 async_event(async_event&&) = delete;
249
250 /// Move assignment (deleted).
251 async_event& operator=(async_event&&) = delete;
252
253 /** Returns an awaiter that waits until the event is set.
254
255 If the event is already set, completes immediately.
256
257 @return An awaitable that await-returns `(error_code)`.
258 */
259 25x wait_awaiter wait() noexcept
260 {
261 25x return wait_awaiter{this};
262 }
263
264 /** Sets the event.
265
266 All waiting coroutines are resumed. Canceled waiters
267 are skipped. Subsequent calls to wait() complete
268 immediately until clear() is called.
269 */
270 17x void set()
271 {
272 17x set_ = true;
273 for(;;)
274 {
275 33x auto* w = waiters_.pop_front();
276 33x if(!w)
277 17x break;
278 16x w->in_list_ = false;
279 16x if(!w->claimed_.exchange(
280 true, std::memory_order_acq_rel))
281 {
282 16x w->ex_.post(w->cont_);
283 }
284 16x }
285 17x }
286
287 /** Clears the event.
288
289 Subsequent calls to wait() will suspend until
290 set() is called again.
291 */
292 2x void clear() noexcept
293 {
294 2x set_ = false;
295 2x }
296
297 /** Returns true if the event is currently set.
298 */
299 9x bool is_set() const noexcept
300 {
301 9x return set_;
302 }
303 };
304
305 } // namespace capy
306 } // namespace boost
307
308 #endif
309