//
// Copyright (c) 2025 Vinnie Falco (vinnie.falco@gmail.com)
// Copyright (c) 2026 Michael Vandeberg
//
// Distributed under the Boost Software License, Version 1.0. (See accompanying
// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
//
// Official repository: https://github.com/cppalliance/capy
//

#ifndef BOOST_CAPY_EX_THREAD_POOL_HPP
#define BOOST_CAPY_EX_THREAD_POOL_HPP

#include <boost/capy/detail/config.hpp>
#include <boost/capy/continuation.hpp>
#include <coroutine>
#include <boost/capy/ex/execution_context.hpp>
#include <cstddef>
#include <string_view>

namespace boost {
namespace capy {

/** A pool of threads for executing work concurrently.

    Use this when you need to run coroutines on multiple threads
    without the overhead of creating and destroying threads for
    each task. Work items are distributed across the pool using
    a shared queue.

    @par Thread Safety
    Distinct objects: Safe.
    Shared objects: Unsafe.

    @par Example
    @code
    thread_pool pool(4);  // 4 worker threads
    auto ex = pool.get_executor();
    ex.post(some_coroutine);
    // The destructor stops and joins the worker threads; pending
    // work items are destroyed. Call join() beforehand to wait
    // for outstanding work to complete.
    @endcode
*/
class BOOST_CAPY_DECL
    thread_pool
    : public execution_context
{
    // Implementation type; only forward-declared in this header.
    class impl;
    impl* impl_;

public:
    class executor_type;

    /** Destroy the thread pool.

        Signals all worker threads to stop, waits for them to
        finish, and destroys any pending work items.
    */
    ~thread_pool();

    /** Construct a thread pool.

        Creates a pool with the specified number of worker threads.
        If `num_threads` is zero, the number of threads is set to
        the hardware concurrency, or one if that cannot be determined.

        @param num_threads The number of worker threads, or zero
            for automatic selection.

        @param thread_name_prefix The prefix for worker thread names.
            Thread names appear as "{prefix}0", "{prefix}1", etc.
            The prefix is truncated to 12 characters. Defaults to
            "capy-pool-".
    */
    explicit
    thread_pool(
        std::size_t num_threads = 0,
        std::string_view thread_name_prefix = "capy-pool-");

    /// Thread pools are not copy constructible.
    thread_pool(thread_pool const&) = delete;

    /// Thread pools are not copy assignable.
    thread_pool& operator=(thread_pool const&) = delete;

    /** Wait for all outstanding work to complete.

        Releases the internal work guard, then blocks the calling
        thread until all outstanding work tracked by
        @ref executor_type::on_work_started and
        @ref executor_type::on_work_finished completes. After all
        work finishes, joins the worker threads.

        If @ref stop is called while `join()` is blocking, the
        pool stops without waiting for remaining work to
        complete. Worker threads finish their current item and
        exit; `join()` still waits for all threads to be joined
        before returning.

        This function is idempotent. The first call performs the
        join; subsequent calls return immediately.

        @par Preconditions
        Must not be called from a thread in this pool (undefined
        behavior).

        @par Postconditions
        All worker threads have been joined. The pool cannot be
        reused.

        @par Thread Safety
        May be called from any thread not in this pool.
    */
    void
    join() noexcept;

    /** Request all worker threads to stop.

        Signals all threads to exit after finishing their current
        work item. Queued work that has not started is abandoned.
        Does not wait for threads to exit.

        If @ref join is blocking on another thread, calling
        `stop()` causes it to stop waiting for outstanding
        work. The `join()` call still waits for worker threads
        to finish their current item and exit before returning.
    */
    void
    stop() noexcept;

    /** Return an executor for this thread pool.

        @return An executor associated with this thread pool.
    */
    executor_type
    get_executor() const noexcept;
};

/** An executor that submits work to a thread_pool.

    Executors are lightweight handles that can be copied and stored.
    All copies refer to the same underlying thread pool.

    @par Thread Safety
    Distinct objects: Safe.
    Shared objects: Safe.
*/
class thread_pool::executor_type
{
    friend class thread_pool;

    // Non-owning pointer to the associated pool; null when the
    // executor was default constructed.
    thread_pool* pool_ = nullptr;

    // Bind the executor to `pool`. Private: reachable only through
    // the befriended thread_pool.
    explicit
    executor_type(thread_pool& pool) noexcept
        : pool_(&pool)
    {
    }

public:
    /** Construct a default null executor.

        The resulting executor is not associated with any pool.
        `context()`, `dispatch()`, and `post()` require the
        executor to be associated with a pool before use.
    */
    executor_type() = default;

    /** Return the underlying thread pool.

        @pre The executor is associated with a pool, i.e. it was
        not default constructed. The stored pointer is
        dereferenced unconditionally.

        @return A reference to the associated thread pool.
    */
    thread_pool&
    context() const noexcept
    {
        return *pool_;
    }

    /** Notify that work has started.

        Increments the outstanding work count. Must be paired
        with a subsequent call to @ref on_work_finished.

        @see on_work_finished, work_guard
    */
    BOOST_CAPY_DECL
    void
    on_work_started() const noexcept;

    /** Notify that work has finished.

        Decrements the outstanding work count. When the count
        reaches zero after @ref thread_pool::join has been called,
        the pool's worker threads are signaled to stop.

        @pre A preceding call to @ref on_work_started was made.

        @see on_work_started, work_guard
    */
    BOOST_CAPY_DECL
    void
    on_work_finished() const noexcept;

    /** Dispatch a continuation for execution.

        Posts the continuation to the thread pool for execution on a
        worker thread and returns `std::noop_coroutine()`. Thread
        pools never execute inline because no single thread "owns"
        the pool.

        @param c The continuation to execute. Must remain at a
                 stable address until dequeued and resumed.

        @return `std::noop_coroutine()` always.
    */
    std::coroutine_handle<>
    dispatch(continuation& c) const
    {
        // Always defer to post(); the caller is handed a no-op
        // handle so that nothing is resumed inline.
        post(c);
        return std::noop_coroutine();
    }

    /** Post a continuation to the thread pool.

        The continuation will be resumed on one of the pool's
        worker threads. The continuation must remain at a stable
        address until it is dequeued and resumed.

        @param c The continuation to execute.
    */
    BOOST_CAPY_DECL
    void
    post(continuation& c) const;

    /** Return true if two executors refer to the same thread pool.

        Compares the stored pool pointers, so two default
        constructed (null) executors compare equal.
    */
    bool
    operator==(executor_type const& other) const noexcept
    {
        return pool_ == other.pool_;
    }
};

} // capy
} // boost

#endif