//
// Copyright (c) 2025 Vinnie Falco (vinnie.falco@gmail.com)
// Copyright (c) 2026 Michael Vandeberg
//
// Distributed under the Boost Software License, Version 1.0. (See accompanying
// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
//
// Official repository: https://github.com/boostorg/capy
//
10  

10  

#include <boost/capy/ex/thread_pool.hpp>
#include <boost/capy/continuation.hpp>
#include <boost/capy/test/thread_name.hpp>
#include <algorithm>
#include <atomic>
#include <condition_variable>
#include <coroutine>
#include <cstddef>
#include <cstdio>
#include <memory>
#include <mutex>
#include <string_view>
#include <thread>
#include <vector>
21  

21  

/*
    Thread pool implementation using a shared work queue.

    Work items are continuations linked via their intrusive next pointer,
    stored in a single queue protected by a mutex. No per-post heap
    allocation: the continuation is owned by the caller and linked
    directly. Worker threads wait on a condition_variable until work
    is available or stop is requested.

    Threads are started lazily on first post() via std::call_once to avoid
    spawning threads for pools that are constructed but never used. Each
    thread is named with a configurable prefix plus index for debugger
    visibility.

    Work tracking: on_work_started/on_work_finished maintain an atomic
    outstanding_work_ counter. join() blocks until this counter reaches
    zero, then signals workers to stop and joins threads.

    Three shutdown paths:
    - join(): waits for outstanding work to drain, then stops workers.
    - stop(): immediately signals workers to exit; queued work is abandoned.
    - Destructor: stop() then join() (abandon + wait for threads).
*/
45  

45  

46  
namespace boost {
46  
namespace boost {
47  
namespace capy {
47  
namespace capy {
48  

48  

49  
//------------------------------------------------------------------------------
49  
//------------------------------------------------------------------------------
50  

50  

51  
class thread_pool::impl
51  
class thread_pool::impl
52  
{
52  
{
53  
    // Intrusive queue of continuations via continuation::next.
53  
    // Intrusive queue of continuations via continuation::next.
54  
    // No per-post allocation: the continuation is owned by the caller.
54  
    // No per-post allocation: the continuation is owned by the caller.
55  
    continuation* head_ = nullptr;
55  
    continuation* head_ = nullptr;
56  
    continuation* tail_ = nullptr;
56  
    continuation* tail_ = nullptr;
57  

57  

58  
    void push(continuation* c) noexcept
58  
    void push(continuation* c) noexcept
59  
    {
59  
    {
60  
        c->next = nullptr;
60  
        c->next = nullptr;
61  
        if(tail_)
61  
        if(tail_)
62  
            tail_->next = c;
62  
            tail_->next = c;
63  
        else
63  
        else
64  
            head_ = c;
64  
            head_ = c;
65  
        tail_ = c;
65  
        tail_ = c;
66  
    }
66  
    }
67  

67  

68  
    continuation* pop() noexcept
68  
    continuation* pop() noexcept
69  
    {
69  
    {
70  
        if(!head_)
70  
        if(!head_)
71  
            return nullptr;
71  
            return nullptr;
72  
        continuation* c = head_;
72  
        continuation* c = head_;
73  
        head_ = head_->next;
73  
        head_ = head_->next;
74  
        if(!head_)
74  
        if(!head_)
75  
            tail_ = nullptr;
75  
            tail_ = nullptr;
76  
        return c;
76  
        return c;
77  
    }
77  
    }
78  

78  

79  
    bool empty() const noexcept
79  
    bool empty() const noexcept
80  
    {
80  
    {
81  
        return head_ == nullptr;
81  
        return head_ == nullptr;
82  
    }
82  
    }
83  

83  

84  
    std::mutex mutex_;
84  
    std::mutex mutex_;
85  
    std::condition_variable cv_;
85  
    std::condition_variable cv_;
86  
    std::vector<std::thread> threads_;
86  
    std::vector<std::thread> threads_;
87  
    std::atomic<std::size_t> outstanding_work_{0};
87  
    std::atomic<std::size_t> outstanding_work_{0};
88  
    bool stop_{false};
88  
    bool stop_{false};
89  
    bool joined_{false};
89  
    bool joined_{false};
90  
    std::size_t num_threads_;
90  
    std::size_t num_threads_;
91  
    char thread_name_prefix_[13]{};  // 12 chars max + null terminator
91  
    char thread_name_prefix_[13]{};  // 12 chars max + null terminator
92  
    std::once_flag start_flag_;
92  
    std::once_flag start_flag_;
93  

93  

94  
public:
94  
public:
95  
    ~impl() = default;
95  
    ~impl() = default;
96  

96  

97  
    // Destroy abandoned coroutine frames. Must be called
97  
    // Destroy abandoned coroutine frames. Must be called
98  
    // before execution_context::shutdown()/destroy() so
98  
    // before execution_context::shutdown()/destroy() so
99  
    // that suspended-frame destructors (e.g. delay_awaitable
99  
    // that suspended-frame destructors (e.g. delay_awaitable
100  
    // calling timer_service::cancel()) run while services
100  
    // calling timer_service::cancel()) run while services
101  
    // are still valid.
101  
    // are still valid.
102  
    void
102  
    void
103  
    drain_abandoned() noexcept
103  
    drain_abandoned() noexcept
104  
    {
104  
    {
105  
        while(auto* c = pop())
105  
        while(auto* c = pop())
106  
        {
106  
        {
107  
            auto h = c->h;
107  
            auto h = c->h;
108  
            if(h && h != std::noop_coroutine())
108  
            if(h && h != std::noop_coroutine())
109  
                h.destroy();
109  
                h.destroy();
110  
        }
110  
        }
111  
    }
111  
    }
112  

112  

113  
    impl(std::size_t num_threads, std::string_view thread_name_prefix)
113  
    impl(std::size_t num_threads, std::string_view thread_name_prefix)
114  
        : num_threads_(num_threads)
114  
        : num_threads_(num_threads)
115  
    {
115  
    {
116  
        if(num_threads_ == 0)
116  
        if(num_threads_ == 0)
117  
            num_threads_ = std::max(
117  
            num_threads_ = std::max(
118  
                std::thread::hardware_concurrency(), 1u);
118  
                std::thread::hardware_concurrency(), 1u);
119  

119  

120  
        // Truncate prefix to 12 chars, leaving room for up to 3-digit index.
120  
        // Truncate prefix to 12 chars, leaving room for up to 3-digit index.
121  
        auto n = thread_name_prefix.copy(thread_name_prefix_, 12);
121  
        auto n = thread_name_prefix.copy(thread_name_prefix_, 12);
122  
        thread_name_prefix_[n] = '\0';
122  
        thread_name_prefix_[n] = '\0';
123  
    }
123  
    }
124  

124  

125  
    void
125  
    void
126  
    post(continuation& c)
126  
    post(continuation& c)
127  
    {
127  
    {
128  
        ensure_started();
128  
        ensure_started();
129  
        {
129  
        {
130  
            std::lock_guard<std::mutex> lock(mutex_);
130  
            std::lock_guard<std::mutex> lock(mutex_);
131  
            push(&c);
131  
            push(&c);
132  
        }
132  
        }
133  
        cv_.notify_one();
133  
        cv_.notify_one();
134  
    }
134  
    }
135  

135  

136  
    void
136  
    void
137  
    on_work_started() noexcept
137  
    on_work_started() noexcept
138  
    {
138  
    {
139  
        outstanding_work_.fetch_add(1, std::memory_order_acq_rel);
139  
        outstanding_work_.fetch_add(1, std::memory_order_acq_rel);
140  
    }
140  
    }
141  

141  

142  
    void
142  
    void
143  
    on_work_finished() noexcept
143  
    on_work_finished() noexcept
144  
    {
144  
    {
145  
        if(outstanding_work_.fetch_sub(
145  
        if(outstanding_work_.fetch_sub(
146  
            1, std::memory_order_acq_rel) == 1)
146  
            1, std::memory_order_acq_rel) == 1)
147  
        {
147  
        {
148  
            std::lock_guard<std::mutex> lock(mutex_);
148  
            std::lock_guard<std::mutex> lock(mutex_);
149  
            if(joined_ && !stop_)
149  
            if(joined_ && !stop_)
150  
                stop_ = true;
150  
                stop_ = true;
151  
            cv_.notify_all();
151  
            cv_.notify_all();
152  
        }
152  
        }
153  
    }
153  
    }
154  

154  

155  
    void
155  
    void
156  
    join() noexcept
156  
    join() noexcept
157  
    {
157  
    {
158  
        {
158  
        {
159  
            std::unique_lock<std::mutex> lock(mutex_);
159  
            std::unique_lock<std::mutex> lock(mutex_);
160  
            if(joined_)
160  
            if(joined_)
161  
                return;
161  
                return;
162  
            joined_ = true;
162  
            joined_ = true;
163  

163  

164  
            if(outstanding_work_.load(
164  
            if(outstanding_work_.load(
165  
                std::memory_order_acquire) == 0)
165  
                std::memory_order_acquire) == 0)
166  
            {
166  
            {
167  
                stop_ = true;
167  
                stop_ = true;
168  
                cv_.notify_all();
168  
                cv_.notify_all();
169  
            }
169  
            }
170  
            else
170  
            else
171  
            {
171  
            {
172  
                cv_.wait(lock, [this]{
172  
                cv_.wait(lock, [this]{
173  
                    return stop_;
173  
                    return stop_;
174  
                });
174  
                });
175  
            }
175  
            }
176  
        }
176  
        }
177  

177  

178  
        for(auto& t : threads_)
178  
        for(auto& t : threads_)
179  
            if(t.joinable())
179  
            if(t.joinable())
180  
                t.join();
180  
                t.join();
181  
    }
181  
    }
182  

182  

183  
    void
183  
    void
184  
    stop() noexcept
184  
    stop() noexcept
185  
    {
185  
    {
186  
        {
186  
        {
187  
            std::lock_guard<std::mutex> lock(mutex_);
187  
            std::lock_guard<std::mutex> lock(mutex_);
188  
            stop_ = true;
188  
            stop_ = true;
189  
        }
189  
        }
190  
        cv_.notify_all();
190  
        cv_.notify_all();
191  
    }
191  
    }
192  

192  

193  
private:
193  
private:
194  
    void
194  
    void
195  
    ensure_started()
195  
    ensure_started()
196  
    {
196  
    {
197  
        std::call_once(start_flag_, [this]{
197  
        std::call_once(start_flag_, [this]{
198  
            threads_.reserve(num_threads_);
198  
            threads_.reserve(num_threads_);
199  
            for(std::size_t i = 0; i < num_threads_; ++i)
199  
            for(std::size_t i = 0; i < num_threads_; ++i)
200  
                threads_.emplace_back([this, i]{ run(i); });
200  
                threads_.emplace_back([this, i]{ run(i); });
201  
        });
201  
        });
202  
    }
202  
    }
203  

203  

204  
    void
204  
    void
205  
    run(std::size_t index)
205  
    run(std::size_t index)
206  
    {
206  
    {
207  
        // Build name; set_current_thread_name truncates to platform limits.
207  
        // Build name; set_current_thread_name truncates to platform limits.
208  
        char name[16];
208  
        char name[16];
209  
        std::snprintf(name, sizeof(name), "%s%zu", thread_name_prefix_, index);
209  
        std::snprintf(name, sizeof(name), "%s%zu", thread_name_prefix_, index);
210  
        set_current_thread_name(name);
210  
        set_current_thread_name(name);
211  

211  

212  
        for(;;)
212  
        for(;;)
213  
        {
213  
        {
214  
            continuation* c = nullptr;
214  
            continuation* c = nullptr;
215  
            {
215  
            {
216  
                std::unique_lock<std::mutex> lock(mutex_);
216  
                std::unique_lock<std::mutex> lock(mutex_);
217  
                cv_.wait(lock, [this]{
217  
                cv_.wait(lock, [this]{
218  
                    return !empty() ||
218  
                    return !empty() ||
219  
                        stop_;
219  
                        stop_;
220  
                });
220  
                });
221  
                if(stop_)
221  
                if(stop_)
222  
                    return;
222  
                    return;
223  
                c = pop();
223  
                c = pop();
224  
            }
224  
            }
225  
            if(c)
225  
            if(c)
226  
                c->h.resume();
226  
                c->h.resume();
227  
        }
227  
        }
228  
    }
228  
    }
229  
};
229  
};
230  

230  

231  
//------------------------------------------------------------------------------
231  
//------------------------------------------------------------------------------
232  

232  

233  
thread_pool::
233  
thread_pool::
234  
~thread_pool()
234  
~thread_pool()
235  
{
235  
{
236  
    impl_->stop();
236  
    impl_->stop();
237  
    impl_->join();
237  
    impl_->join();
238  
    impl_->drain_abandoned();
238  
    impl_->drain_abandoned();
239  
    shutdown();
239  
    shutdown();
240  
    destroy();
240  
    destroy();
241  
    delete impl_;
241  
    delete impl_;
242  
}
242  
}
243  

243  

244  
thread_pool::
244  
thread_pool::
245  
thread_pool(std::size_t num_threads, std::string_view thread_name_prefix)
245  
thread_pool(std::size_t num_threads, std::string_view thread_name_prefix)
246  
    : impl_(new impl(num_threads, thread_name_prefix))
246  
    : impl_(new impl(num_threads, thread_name_prefix))
247  
{
247  
{
248  
    this->set_frame_allocator(std::allocator<void>{});
248  
    this->set_frame_allocator(std::allocator<void>{});
249  
}
249  
}
250  

250  

251  
void
251  
void
252  
thread_pool::
252  
thread_pool::
253  
join() noexcept
253  
join() noexcept
254  
{
254  
{
255  
    impl_->join();
255  
    impl_->join();
256  
}
256  
}
257  

257  

258  
void
258  
void
259  
thread_pool::
259  
thread_pool::
260  
stop() noexcept
260  
stop() noexcept
261  
{
261  
{
262  
    impl_->stop();
262  
    impl_->stop();
263  
}
263  
}
264  

264  

265  
//------------------------------------------------------------------------------
265  
//------------------------------------------------------------------------------
266  

266  

267  
thread_pool::executor_type
267  
thread_pool::executor_type
268  
thread_pool::
268  
thread_pool::
269  
get_executor() const noexcept
269  
get_executor() const noexcept
270  
{
270  
{
271  
    return executor_type(
271  
    return executor_type(
272  
        const_cast<thread_pool&>(*this));
272  
        const_cast<thread_pool&>(*this));
273  
}
273  
}
274  

274  

275  
void
275  
void
276  
thread_pool::executor_type::
276  
thread_pool::executor_type::
277  
on_work_started() const noexcept
277  
on_work_started() const noexcept
278  
{
278  
{
279  
    pool_->impl_->on_work_started();
279  
    pool_->impl_->on_work_started();
280  
}
280  
}
281  

281  

282  
void
282  
void
283  
thread_pool::executor_type::
283  
thread_pool::executor_type::
284  
on_work_finished() const noexcept
284  
on_work_finished() const noexcept
285  
{
285  
{
286  
    pool_->impl_->on_work_finished();
286  
    pool_->impl_->on_work_finished();
287  
}
287  
}
288  

288  

289  
void
289  
void
290  
thread_pool::executor_type::
290  
thread_pool::executor_type::
291  
post(continuation& c) const
291  
post(continuation& c) const
292  
{
292  
{
293  
    pool_->impl_->post(c);
293  
    pool_->impl_->post(c);
294  
}
294  
}
295  

295  

296  
} // capy
296  
} // capy
297  
} // boost
297  
} // boost