TLA Line data Source code
1 : //
2 : // Copyright (c) 2025 Vinnie Falco (vinnie.falco@gmail.com)
3 : //
4 : // Distributed under the Boost Software License, Version 1.0. (See accompanying
5 : // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
6 : //
7 : // Official repository: https://github.com/cppalliance/capy
8 : //
9 :
10 : #ifndef BOOST_CAPY_ASYNC_EVENT_HPP
11 : #define BOOST_CAPY_ASYNC_EVENT_HPP
12 :
13 : #include <boost/capy/detail/config.hpp>
14 : #include <boost/capy/detail/intrusive.hpp>
15 : #include <boost/capy/continuation.hpp>
16 : #include <boost/capy/concept/executor.hpp>
17 : #include <boost/capy/error.hpp>
18 : #include <boost/capy/ex/io_env.hpp>
19 : #include <boost/capy/io_result.hpp>
20 :
21 : #include <stop_token>
22 :
23 : #include <atomic>
24 : #include <coroutine>
25 : #include <new>
26 : #include <utility>
27 :
28 : /* async_event implementation notes
29 : =================================
30 :
31 : Same cancellation pattern as async_mutex (see that file for the
32 : full discussion on claimed_, stop_cb lifetime, member ordering,
33 : and threading assumptions).
34 :
35 : Key difference: set() wakes ALL waiters (broadcast), not one.
36 : It pops every waiter from the list and posts the ones it
37 : claims. Waiters already claimed by a stop callback are skipped.
38 :
39 : Because set() pops all waiters, a canceled waiter may have been
40 : removed from the list by set() before its await_resume runs.
41 : This requires a separate in_list_ flag (unlike async_mutex where
42 : active_ served double duty). await_resume only calls remove()
43 : when in_list_ is true.
44 : */
45 :
46 : namespace boost {
47 : namespace capy {
48 :
49 : /** An asynchronous event for coroutines.
50 :
51 : This event provides a way to notify multiple coroutines that some
52 : condition has occurred. When a coroutine awaits an unset event, it
53 : suspends and is added to a wait queue. When the event is set, all
54 : waiting coroutines are resumed.
55 :
56 : @par Cancellation
57 :
58 : When a coroutine is suspended waiting for the event and its stop
59 : token is triggered, the waiter completes with `error::canceled`
60 : instead of waiting for `set()`.
61 :
62 : Cancellation only applies while the coroutine is suspended in the
63 : wait queue. If the event is already set when `wait()` is called,
64 : the wait completes immediately even if the stop token is already
65 : signaled.
66 :
67 : @par Zero Allocation
68 :
69 : No heap allocation occurs for wait operations.
70 :
71 : @par Thread Safety
72 :
73 : Distinct objects: Safe.@n
74 : Shared objects: Unsafe.
75 :
76 : The event operations are designed for single-threaded use on one
77 : executor. The stop callback may fire from any thread.
78 :
79 : This type is non-copyable and non-movable because suspended
80 : waiters hold intrusive pointers into the event's internal list.
81 :
82 : @par Example
83 : @code
84 : async_event event;
85 :
86 : task<> waiter() {
87 : auto [ec] = co_await event.wait();
88 : if(ec)
89 : co_return;
90 : // ... event was set ...
91 : }
92 :
93 : task<> notifier() {
94 : // ... do some work ...
95 : event.set(); // Wake all waiters
96 : }
97 : @endcode
98 : */
99 : class async_event
100 : {
101 : public:
102 : class wait_awaiter;
103 :
104 : private:
105 : bool set_ = false;
106 : detail::intrusive_list<wait_awaiter> waiters_;
107 :
108 : public:
109 : /** Awaiter returned by wait().
110 : */
111 : class wait_awaiter
112 : : public detail::intrusive_list<wait_awaiter>::node
113 : {
114 : friend class async_event;
115 :
116 : async_event* e_;
117 : continuation cont_;
118 : executor_ref ex_;
119 :
120 : // Declared before stop_cb_buf_: the callback
121 : // accesses these members, so they must still be
122 : // alive if the stop_cb_ destructor blocks.
123 : std::atomic<bool> claimed_{false};
124 : bool canceled_ = false;
125 : bool active_ = false;
126 : bool in_list_ = false;
127 :
128 : struct cancel_fn
129 : {
130 : wait_awaiter* self_;
131 :
132 HIT 4 : void operator()() const noexcept
133 : {
134 4 : if(!self_->claimed_.exchange(
135 : true, std::memory_order_acq_rel))
136 : {
137 3 : self_->canceled_ = true;
138 3 : self_->ex_.post(self_->cont_);
139 : }
140 4 : }
141 : };
142 :
143 : using stop_cb_t =
144 : std::stop_callback<cancel_fn>;
145 :
146 : // Aligned storage for stop_cb_t. Declared last:
147 : // its destructor may block while the callback
148 : // accesses the members above.
149 : BOOST_CAPY_MSVC_WARNING_PUSH
150 : BOOST_CAPY_MSVC_WARNING_DISABLE(4324) // padded due to alignas
151 : alignas(stop_cb_t)
152 : unsigned char stop_cb_buf_[sizeof(stop_cb_t)];
153 : BOOST_CAPY_MSVC_WARNING_POP
154 :
155 20 : stop_cb_t& stop_cb_() noexcept
156 : {
157 : return *reinterpret_cast<stop_cb_t*>(
158 20 : stop_cb_buf_);
159 : }
160 :
161 : public:
162 52 : ~wait_awaiter()
163 : {
164 52 : if(active_)
165 1 : stop_cb_().~stop_cb_t();
166 52 : if(in_list_)
167 1 : e_->waiters_.remove(this);
168 52 : }
169 :
170 25 : explicit wait_awaiter(async_event* e) noexcept
171 25 : : e_(e)
172 : {
173 25 : }
174 :
175 27 : wait_awaiter(wait_awaiter&& o) noexcept
176 27 : : e_(o.e_)
177 27 : , cont_(o.cont_)
178 27 : , ex_(o.ex_)
179 27 : , claimed_(o.claimed_.load(
180 : std::memory_order_relaxed))
181 27 : , canceled_(o.canceled_)
182 27 : , active_(std::exchange(o.active_, false))
183 27 : , in_list_(std::exchange(o.in_list_, false))
184 : {
185 27 : }
186 :
187 : wait_awaiter(wait_awaiter const&) = delete;
188 : wait_awaiter& operator=(wait_awaiter const&) = delete;
189 : wait_awaiter& operator=(wait_awaiter&&) = delete;
190 :
191 25 : bool await_ready() const noexcept
192 : {
193 25 : return e_->set_;
194 : }
195 :
196 : /** IoAwaitable protocol overload. */
197 : std::coroutine_handle<>
198 21 : await_suspend(
199 : std::coroutine_handle<> h,
200 : io_env const* env) noexcept
201 : {
202 21 : if(env->stop_token.stop_requested())
203 : {
204 1 : canceled_ = true;
205 1 : return h;
206 : }
207 20 : cont_.h = h;
208 20 : ex_ = env->executor;
209 20 : e_->waiters_.push_back(this);
210 20 : in_list_ = true;
211 60 : ::new(stop_cb_buf_) stop_cb_t(
212 20 : env->stop_token, cancel_fn{this});
213 20 : active_ = true;
214 20 : return std::noop_coroutine();
215 : }
216 :
217 22 : io_result<> await_resume() noexcept
218 : {
219 22 : if(active_)
220 : {
221 19 : stop_cb_().~stop_cb_t();
222 19 : active_ = false;
223 : }
224 22 : if(canceled_)
225 : {
226 4 : if(in_list_)
227 : {
228 3 : e_->waiters_.remove(this);
229 3 : in_list_ = false;
230 : }
231 : return {make_error_code(
232 4 : error::canceled)};
233 : }
234 18 : return {{}};
235 : }
236 : };
237 :
238 : /// Construct an unset event.
239 : async_event() = default;
240 :
241 : /// Copy constructor (deleted).
242 : async_event(async_event const&) = delete;
243 :
244 : /// Copy assignment (deleted).
245 : async_event& operator=(async_event const&) = delete;
246 :
247 : /// Move constructor (deleted).
248 : async_event(async_event&&) = delete;
249 :
250 : /// Move assignment (deleted).
251 : async_event& operator=(async_event&&) = delete;
252 :
253 : /** Returns an awaiter that waits until the event is set.
254 :
255 : If the event is already set, completes immediately.
256 :
257 : @return An awaitable that await-returns `(error_code)`.
258 : */
259 25 : wait_awaiter wait() noexcept
260 : {
261 25 : return wait_awaiter{this};
262 : }
263 :
264 : /** Sets the event.
265 :
266 : All waiting coroutines are resumed. Canceled waiters
267 : are skipped. Subsequent calls to wait() complete
268 : immediately until clear() is called.
269 : */
270 17 : void set()
271 : {
272 17 : set_ = true;
273 : for(;;)
274 : {
275 33 : auto* w = waiters_.pop_front();
276 33 : if(!w)
277 17 : break;
278 16 : w->in_list_ = false;
279 16 : if(!w->claimed_.exchange(
280 : true, std::memory_order_acq_rel))
281 : {
282 16 : w->ex_.post(w->cont_);
283 : }
284 16 : }
285 17 : }
286 :
287 : /** Clears the event.
288 :
289 : Subsequent calls to wait() will suspend until
290 : set() is called again.
291 : */
292 2 : void clear() noexcept
293 : {
294 2 : set_ = false;
295 2 : }
296 :
297 : /** Returns true if the event is currently set.
298 : */
299 9 : bool is_set() const noexcept
300 : {
301 9 : return set_;
302 : }
303 : };
304 :
305 : } // namespace capy
306 : } // namespace boost
307 :
308 : #endif
|