Skip to content

Commit d59d256

Browse files
committed
Fix worker_pool::wait_for and wait_until to use lock and predicate
1 parent bef8da2 commit d59d256

3 files changed

Lines changed: 20 additions & 15 deletions

File tree

include/dispatch_queue.hpp

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -140,12 +140,12 @@ class dispatch_queue {
140140
* @see std::future<T>::wait_until
141141
*/
142142
template<class Clock, class Duration>
143-
std::future_status wait_until(const std::chrono::time_point<Clock, Duration>& timeout_time) {
143+
bool wait_until(const std::chrono::time_point<Clock, Duration>& timeout_time) {
144144
if (worker_pool) {
145145
return worker_pool->wait_until(timeout_time);
146146
}
147147
else {
148-
return std::future_status::ready;
148+
return true;
149149
}
150150
}
151151

include/worker_pool.hpp

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,9 @@ namespace dispatch_queue {
1313
namespace detail {
1414

1515
class worker_pool {
16+
auto wait_predicate() const {
17+
return [this]{ return is_shutting_down || task_queue.empty(); };
18+
}
1619
public:
1720
template<typename Fn>
1821
worker_pool(pending_task_queue& task_queue, int thread_count, Fn&& worker_init)
@@ -43,12 +46,14 @@ class worker_pool {
4346

4447
template<class Rep, class Period>
4548
std::future_status wait_for(const std::chrono::duration<Rep, Period>& timeout_duration) {
46-
return all_done_condition_variable.wait_for(timeout_duration);
49+
std::unique_lock<std::mutex> lock(mutex);
50+
return all_done_condition_variable.wait_for(lock, timeout_duration, wait_predicate());
4751
}
4852

4953
template<class Clock, class Duration>
50-
std::future_status wait_until(const std::chrono::time_point<Clock, Duration>& timeout_time) {
51-
return all_done_condition_variable.wait_until(timeout_time);
54+
bool wait_until(const std::chrono::time_point<Clock, Duration>& timeout_time) {
55+
std::unique_lock<std::mutex> lock(mutex);
56+
return all_done_condition_variable.wait_until(lock, timeout_time, wait_predicate());
5257
}
5358

5459
private:

src/worker_pool.cpp

Lines changed: 10 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -13,25 +13,25 @@ int worker_pool::thread_count() const {
1313
}
1414

1515
size_t worker_pool::size() {
16-
std::lock_guard<std::mutex> lk(mutex);
16+
std::lock_guard<std::mutex> lock(mutex);
1717
return task_queue.size();
1818
}
1919

2020
void worker_pool::enqueue_task(pending_task&& task, bool run_on_main_loop) {
2121
{
22-
std::lock_guard<std::mutex> lk(mutex);
22+
std::lock_guard<std::mutex> lock(mutex);
2323
task_queue.push(std::move(task), run_on_main_loop);
2424
}
2525
task_condition_variable.notify_one();
2626
}
2727

2828
std::deque<pending_task> worker_pool::pop_main_loop_tasks() {
29-
std::lock_guard<std::mutex> lk(mutex);
29+
std::lock_guard<std::mutex> lock(mutex);
3030
return task_queue.pop_main_loop_tasks();
3131
}
3232

3333
void worker_pool::clear() {
34-
std::lock_guard<std::mutex> lk(mutex);
34+
std::lock_guard<std::mutex> lock(mutex);
3535
task_queue.clear();
3636
}
3737

@@ -41,7 +41,7 @@ void worker_pool::shutdown() {
4141
}
4242

4343
{
44-
std::lock_guard<std::mutex> lk(mutex);
44+
std::lock_guard<std::mutex> lock(mutex);
4545
is_shutting_down = true;
4646
}
4747
for (int i = 0; i < thread_count(); i++) {
@@ -57,17 +57,17 @@ void worker_pool::shutdown() {
5757
}
5858

5959
void worker_pool::wait() {
60-
std::unique_lock<std::mutex> lk(mutex);
61-
all_done_condition_variable.wait(lk, [this]{ return is_shutting_down || task_queue.empty(); });
60+
std::unique_lock<std::mutex> lock(mutex);
61+
all_done_condition_variable.wait(lock, wait_predicate());
6262
}
6363

6464
void worker_pool::run_task_loop() {
6565
while (true) {
6666
// 1. Get a valid task
6767
pending_task task;
6868
{
69-
std::unique_lock<std::mutex> lk(mutex);
70-
task_condition_variable.wait(lk, [this, &task]() { return is_shutting_down || task_queue.try_pop(task); });
69+
std::unique_lock<std::mutex> lock(mutex);
70+
task_condition_variable.wait(lock, [this, &task]() { return is_shutting_down || task_queue.try_pop(task); });
7171
if (is_shutting_down) {
7272
return;
7373
}
@@ -79,7 +79,7 @@ void worker_pool::run_task_loop() {
7979
// 3. If all is done, notify waiters
8080
bool all_done;
8181
{
82-
std::lock_guard<std::mutex> lk(mutex);
82+
std::lock_guard<std::mutex> lock(mutex);
8383
all_done = task_queue.empty();
8484
}
8585
if (all_done) {

0 commit comments

Comments
 (0)