TLA Line data Source code
1 : //
2 : // Copyright (c) 2025 Vinnie Falco (vinnie.falco@gmail.com)
3 : //
4 : // Distributed under the Boost Software License, Version 1.0. (See accompanying
5 : // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
6 : //
7 : // Official repository: https://github.com/cppalliance/capy
8 : //
9 :
10 : #ifndef BOOST_CAPY_ASYNC_MUTEX_HPP
11 : #define BOOST_CAPY_ASYNC_MUTEX_HPP
12 :
13 : #include <boost/capy/detail/config.hpp>
14 : #include <boost/capy/detail/intrusive.hpp>
15 : #include <boost/capy/continuation.hpp>
16 : #include <boost/capy/concept/executor.hpp>
17 : #include <boost/capy/error.hpp>
18 : #include <boost/capy/ex/io_env.hpp>
19 : #include <boost/capy/io_result.hpp>
20 :
21 : #include <stop_token>
22 :
23 : #include <atomic>
24 : #include <coroutine>
25 : #include <new>
26 : #include <utility>
27 :
28 : /* async_mutex implementation notes
29 : ================================
30 :
31 : Waiters form a doubly-linked intrusive list (fair FIFO). lock_awaiter
32 : inherits intrusive_list<lock_awaiter>::node; the list is owned by
33 : async_mutex::waiters_.
34 :
35 : Cancellation via stop_token
36 : ---------------------------
37 : A std::stop_callback is registered in await_suspend. Two actors can
38 : race to resume the suspended coroutine: unlock() and the stop callback.
39 : An atomic bool `claimed_` resolves the race -- whoever does
40 : claimed_.exchange(true) and reads false wins. The loser does nothing.
41 :
42 : The stop callback calls ex_.post(h_). The stop_callback is
43 : destroyed later in await_resume. cancel_fn touches no members
44 : after post returns (same pattern as delete-this).
45 :
46 : unlock() pops waiters from the front. If the popped waiter was
47 : already claimed by the stop callback, unlock() skips it and tries
48 : the next. await_resume removes the (still-linked) canceled waiter
49 : via waiters_.remove(this).
50 :
51 : The stop_callback lives in a union to suppress automatic
52 : construction/destruction. Placement new in await_suspend, explicit
53 : destructor call in await_resume and ~lock_awaiter.
54 :
55 : Member ordering constraint
56 : --------------------------
57 : The union containing stop_cb_ must be declared AFTER the members
58 : the callback accesses (h_, ex_, claimed_, canceled_). If the
59 : stop_cb_ destructor blocks waiting for a concurrent callback, those
60 : members must still be alive (C++ destroys in reverse declaration
61 : order).
62 :
63 : active_ flag
64 : ------------
65 : Tracks both list membership and stop_cb_ lifetime (they are always
66 : set and cleared together). Used by the destructor to clean up if the
67 : coroutine is destroyed while suspended (e.g. execution_context
68 : shutdown).
69 :
70 : Cancellation scope
71 : ------------------
72 : Cancellation only takes effect while the coroutine is suspended in
73 : the wait queue. If the mutex is unlocked, await_ready acquires it
74 : immediately without checking the stop token. This is intentional:
75 : the fast path has no token access and no overhead.
76 :
77 : Threading assumptions
78 : ---------------------
79 : - All list mutations happen on the executor thread (await_suspend,
80 : await_resume, unlock, ~lock_awaiter).
81 : - The stop callback may fire from any thread, but only touches
82 : claimed_ (atomic) and then calls post. It never touches the
83 : list.
84 : - ~lock_awaiter must be called from the executor thread. This is
85 : guaranteed during normal shutdown but NOT if the coroutine frame
86 : is destroyed from another thread while a stop callback could
87 : fire (precondition violation, same as cppcoro/folly).
88 : */
89 :
90 : namespace boost {
91 : namespace capy {
92 :
93 : /** An asynchronous mutex for coroutines.
94 :
95 : This mutex provides mutual exclusion for coroutines without blocking.
96 : When a coroutine attempts to acquire a locked mutex, it suspends and
97 : is added to an intrusive wait queue. When the holder unlocks, the next
98 : waiter is resumed with the lock held.
99 :
100 : @par Cancellation
101 :
102 : When a coroutine is suspended waiting for the mutex and its stop
103 : token is triggered, the waiter completes with `error::canceled`
104 : instead of acquiring the lock.
105 :
106 : Cancellation only applies while the coroutine is suspended in the
107 : wait queue. If the mutex is unlocked when `lock()` is called, the
108 : lock is acquired immediately even if the stop token is already
109 : signaled.
110 :
111 : @par Zero Allocation
112 :
113 : No heap allocation occurs for lock operations.
114 :
115 : @par Thread Safety
116 :
117 : Distinct objects: Safe.@n
118 : Shared objects: Unsafe.
119 :
120 : The mutex operations are designed for single-threaded use on one
121 : executor. The stop callback may fire from any thread.
122 :
123 : This type is non-copyable and non-movable because suspended
124 : waiters hold intrusive pointers into the mutex's internal list.
125 :
126 : @par Example
127 : @code
128 : async_mutex cm;
129 :
130 : task<> protected_operation() {
131 : auto [ec] = co_await cm.lock();
132 : if(ec)
133 : co_return;
134 : // ... critical section ...
135 : cm.unlock();
136 : }
137 :
138 : // Or with RAII:
139 : task<> protected_operation() {
140 : auto [ec, guard] = co_await cm.scoped_lock();
141 : if(ec)
142 : co_return;
143 : // ... critical section ...
144 : // unlocks automatically
145 : }
146 : @endcode
147 : */
148 : class async_mutex
149 : {
150 : public:
151 : class lock_awaiter;
152 : class lock_guard;
153 : class lock_guard_awaiter;
154 :
155 : private:
156 : bool locked_ = false;
157 : detail::intrusive_list<lock_awaiter> waiters_;
158 :
159 : public:
160 : /** Awaiter returned by lock().
161 : */
162 : class lock_awaiter
163 : : public detail::intrusive_list<lock_awaiter>::node
164 : {
165 : friend class async_mutex;
166 :
167 : async_mutex* m_;
168 : continuation cont_;
169 : executor_ref ex_;
170 :
171 : // These members must be declared before stop_cb_
172 : // (see comment on the union below).
173 : std::atomic<bool> claimed_{false};
174 : bool canceled_ = false;
175 : bool active_ = false;
176 :
177 : struct cancel_fn
178 : {
179 : lock_awaiter* self_;
180 :
181 HIT 6 : void operator()() const noexcept
182 : {
183 6 : if(!self_->claimed_.exchange(
184 : true, std::memory_order_acq_rel))
185 : {
186 6 : self_->canceled_ = true;
187 6 : self_->ex_.post(self_->cont_);
188 : }
189 6 : }
190 : };
191 :
192 : using stop_cb_t =
193 : std::stop_callback<cancel_fn>;
194 :
195 : // Aligned storage for stop_cb_t. Declared last:
196 : // its destructor may block while the callback
197 : // accesses the members above.
198 : BOOST_CAPY_MSVC_WARNING_PUSH
199 : BOOST_CAPY_MSVC_WARNING_DISABLE(4324) // padded due to alignas
200 : alignas(stop_cb_t)
201 : unsigned char stop_cb_buf_[sizeof(stop_cb_t)];
202 : BOOST_CAPY_MSVC_WARNING_POP
203 :
204 17 : stop_cb_t& stop_cb_() noexcept
205 : {
206 : return *reinterpret_cast<stop_cb_t*>(
207 17 : stop_cb_buf_);
208 : }
209 :
210 : public:
211 70 : ~lock_awaiter()
212 : {
213 70 : if(active_)
214 : {
215 3 : stop_cb_().~stop_cb_t();
216 3 : m_->waiters_.remove(this);
217 : }
218 70 : }
219 :
220 35 : explicit lock_awaiter(async_mutex* m) noexcept
221 35 : : m_(m)
222 : {
223 35 : }
224 :
225 35 : lock_awaiter(lock_awaiter&& o) noexcept
226 35 : : m_(o.m_)
227 35 : , cont_(o.cont_)
228 35 : , ex_(o.ex_)
229 35 : , claimed_(o.claimed_.load(
230 : std::memory_order_relaxed))
231 35 : , canceled_(o.canceled_)
232 35 : , active_(std::exchange(o.active_, false))
233 : {
234 35 : }
235 :
236 : lock_awaiter(lock_awaiter const&) = delete;
237 : lock_awaiter& operator=(lock_awaiter const&) = delete;
238 : lock_awaiter& operator=(lock_awaiter&&) = delete;
239 :
240 35 : bool await_ready() const noexcept
241 : {
242 35 : if(!m_->locked_)
243 : {
244 16 : m_->locked_ = true;
245 16 : return true;
246 : }
247 19 : return false;
248 : }
249 :
250 : /** IoAwaitable protocol overload. */
251 : std::coroutine_handle<>
252 19 : await_suspend(
253 : std::coroutine_handle<> h,
254 : io_env const* env) noexcept
255 : {
256 19 : if(env->stop_token.stop_requested())
257 : {
258 2 : canceled_ = true;
259 2 : return h;
260 : }
261 17 : cont_.h = h;
262 17 : ex_ = env->executor;
263 17 : m_->waiters_.push_back(this);
264 51 : ::new(stop_cb_buf_) stop_cb_t(
265 17 : env->stop_token, cancel_fn{this});
266 17 : active_ = true;
267 17 : return std::noop_coroutine();
268 : }
269 :
270 32 : io_result<> await_resume() noexcept
271 : {
272 32 : if(active_)
273 : {
274 14 : stop_cb_().~stop_cb_t();
275 14 : if(canceled_)
276 : {
277 6 : m_->waiters_.remove(this);
278 6 : active_ = false;
279 : return {make_error_code(
280 6 : error::canceled)};
281 : }
282 8 : active_ = false;
283 : }
284 26 : if(canceled_)
285 : return {make_error_code(
286 2 : error::canceled)};
287 24 : return {{}};
288 : }
289 : };
290 :
291 : /** RAII lock guard for async_mutex.
292 :
293 : Automatically unlocks the mutex when destroyed.
294 : */
295 : class [[nodiscard]] lock_guard
296 : {
297 : async_mutex* m_;
298 :
299 : public:
300 9 : ~lock_guard()
301 : {
302 9 : if(m_)
303 2 : m_->unlock();
304 9 : }
305 :
306 2 : lock_guard() noexcept
307 2 : : m_(nullptr)
308 : {
309 2 : }
310 :
311 2 : explicit lock_guard(async_mutex* m) noexcept
312 2 : : m_(m)
313 : {
314 2 : }
315 :
316 5 : lock_guard(lock_guard&& o) noexcept
317 5 : : m_(std::exchange(o.m_, nullptr))
318 : {
319 5 : }
320 :
321 : lock_guard& operator=(lock_guard&& o) noexcept
322 : {
323 : if(this != &o)
324 : {
325 : if(m_)
326 : m_->unlock();
327 : m_ = std::exchange(o.m_, nullptr);
328 : }
329 : return *this;
330 : }
331 :
332 : lock_guard(lock_guard const&) = delete;
333 : lock_guard& operator=(lock_guard const&) = delete;
334 : };
335 :
336 : /** Awaiter returned by scoped_lock() that returns a lock_guard on resume.
337 : */
338 : class lock_guard_awaiter
339 : {
340 : async_mutex* m_;
341 : lock_awaiter inner_;
342 :
343 : public:
344 4 : explicit lock_guard_awaiter(async_mutex* m) noexcept
345 4 : : m_(m)
346 4 : , inner_(m)
347 : {
348 4 : }
349 :
350 4 : bool await_ready() const noexcept
351 : {
352 4 : return inner_.await_ready();
353 : }
354 :
355 : /** IoAwaitable protocol overload. */
356 : std::coroutine_handle<>
357 2 : await_suspend(
358 : std::coroutine_handle<> h,
359 : io_env const* env) noexcept
360 : {
361 2 : return inner_.await_suspend(h, env);
362 : }
363 :
364 4 : io_result<lock_guard> await_resume() noexcept
365 : {
366 4 : auto r = inner_.await_resume();
367 4 : if(r.ec)
368 2 : return {r.ec, {}};
369 2 : return {{}, lock_guard(m_)};
370 : }
371 : };
372 :
373 : /// Construct an unlocked mutex.
374 : async_mutex() = default;
375 :
376 : /// Copy constructor (deleted).
377 : async_mutex(async_mutex const&) = delete;
378 :
379 : /// Copy assignment (deleted).
380 : async_mutex& operator=(async_mutex const&) = delete;
381 :
382 : /// Move constructor (deleted).
383 : async_mutex(async_mutex&&) = delete;
384 :
385 : /// Move assignment (deleted).
386 : async_mutex& operator=(async_mutex&&) = delete;
387 :
388 : /** Returns an awaiter that acquires the mutex.
389 :
390 : @return An awaitable that await-returns `(error_code)`.
391 : */
392 31 : lock_awaiter lock() noexcept
393 : {
394 31 : return lock_awaiter{this};
395 : }
396 :
397 : /** Returns an awaiter that acquires the mutex with RAII.
398 :
399 : @return An awaitable that await-returns `(error_code,lock_guard)`.
400 : */
401 4 : lock_guard_awaiter scoped_lock() noexcept
402 : {
403 4 : return lock_guard_awaiter(this);
404 : }
405 :
406 : /** Releases the mutex.
407 :
408 : If waiters are queued, the next eligible waiter is
409 : resumed with the lock held. Canceled waiters are
410 : skipped. If no eligible waiter remains, the mutex
411 : becomes unlocked.
412 : */
413 24 : void unlock() noexcept
414 : {
415 : for(;;)
416 : {
417 24 : auto* waiter = waiters_.pop_front();
418 24 : if(!waiter)
419 : {
420 16 : locked_ = false;
421 16 : return;
422 : }
423 8 : if(!waiter->claimed_.exchange(
424 : true, std::memory_order_acq_rel))
425 : {
426 8 : waiter->ex_.post(waiter->cont_);
427 8 : return;
428 : }
429 MIS 0 : }
430 : }
431 :
432 : /** Returns true if the mutex is currently locked.
433 : */
434 HIT 26 : bool is_locked() const noexcept
435 : {
436 26 : return locked_;
437 : }
438 : };
439 :
440 : } // namespace capy
441 : } // namespace boost
442 :
443 : #endif
|