Eclipse SUMO - Simulation of Urban MObility
WorkStealingThreadPool.h
Go to the documentation of this file.
1 /****************************************************************************/
2 // Eclipse SUMO, Simulation of Urban MObility; see https://eclipse.org/sumo
3 // Copyright (C) 2020-2020 German Aerospace Center (DLR) and others.
4 // This program and the accompanying materials are made available under the
5 // terms of the Eclipse Public License 2.0 which is available at
6 // https://www.eclipse.org/legal/epl-2.0/
7 // This Source Code may also be made available under the following Secondary
8 // Licenses when the conditions for such availability set forth in the Eclipse
9 // Public License 2.0 are satisfied: GNU General Public License, version 2
10 // or later which is available at
11 // https://www.gnu.org/licenses/old-licenses/gpl-2.0-standalone.html
12 // SPDX-License-Identifier: EPL-2.0 OR GPL-2.0-or-later
13 /****************************************************************************/
18 // Threadpool implementation,
19 // based on https://github.com/vukis/Cpp-Utilities/tree/master/ThreadPool
20 /****************************************************************************/
21 #pragma once
22 
23 #include "TaskQueue.h"
24 #include <algorithm>
25 #include <thread>
26 
27 template<typename CONTEXT = int>
29 public:
30 
31  explicit WorkStealingThreadPool(const bool workSteal, const std::vector<CONTEXT>& context)
32  : myQueues{ context.size() }, myTryoutCount(workSteal ? 1 : 0) {
33  size_t index = 0;
34  for (const CONTEXT& c : context) {
35  if (workSteal) {
36  myThreads.emplace_back([this, index, c] { workStealRun(index, c); });
37  } else {
38  myThreads.emplace_back([this, index, c] { run(index, c); });
39  }
40  index++;
41  }
42  }
43 
// NOTE(review): the signature line that opens this body is absent from this
// listing; presumably this is the pool's destructor -- confirm against the
// original header.
// Step 1: mark every queue as disabled. The worker loops (run/workStealRun)
// test isEnabled() on their queue, so this is what lets them terminate.
45  for (auto& queue : myQueues) {
46  queue.setEnabled(false);
47  }
// Step 2: block until every worker thread has finished.
48  for (auto& thread : myThreads) {
49  thread.join();
50  }
51  }
52 
53  template<typename TaskT>
54  auto executeAsync(TaskT&& task, int idx = -1) -> std::future<decltype(task(std::declval<CONTEXT>()))> {
55  const auto index = idx == -1 ? myQueueIndex++ : idx;
56  if (myTryoutCount > 0) {
57  for (size_t n = 0; n != myQueues.size() * myTryoutCount; ++n) {
58  // Here we need not to std::forward just copy task.
59  // Because if the universal reference of task has bound to an r-value reference
60  // then std::forward will have the same effect as std::move and thus task is not required to contain a valid task.
61  // Universal reference must only be std::forward'ed a exactly zero or one times.
62  bool success = false;
63  auto result = myQueues[(index + n) % myQueues.size()].tryPush(task, success);
64 
65  if (success) {
66  return std::move(result);
67  }
68  }
69  }
70  return myQueues[index % myQueues.size()].push(std::forward<TaskT>(task));
71  }
72 
73  void waitAll() {
74  std::vector<std::future<void>> results;
75  for (int n = 0; n != (int)myQueues.size(); ++n) {
76  results.push_back(executeAsync([](CONTEXT) {}, n));
77  }
78  for (auto& r : results) {
79  r.wait();
80  }
81  }
82 
83 private:
84  void run(size_t queueIndex, const CONTEXT& context) {
85  while (myQueues[queueIndex].isEnabled()) {
86  typename TaskQueue<CONTEXT>::TaskPtrType task;
87  if (myQueues[queueIndex].waitAndPop(task)) {
88  task->exec(context);
89  }
90  }
91  }
92 
93  void workStealRun(size_t queueIndex, const CONTEXT& context) {
94  while (myQueues[queueIndex].isEnabled()) {
95  typename TaskQueue<CONTEXT>::TaskPtrType task;
96  for (size_t n = 0; n != myQueues.size()*myTryoutCount; ++n) {
97  if (myQueues[(queueIndex + n) % myQueues.size()].tryPop(task)) {
98  break;
99  }
100  }
101  if (!task && !myQueues[queueIndex].waitAndPop(task)) {
102  return;
103  }
104  task->exec(context);
105  }
106  }
107 
108 private:
// Task queues; the constructor sizes this vector from context.size(), and
// each worker is started with the index of one of them.
109  std::vector<TaskQueue<CONTEXT> > myQueues;
// Counter that executeAsync() reads and post-increments when the caller
// passes idx == -1.
110  std::atomic<size_t> myQueueIndex{ 0 };
// Multiplier for the number of tryPush()/tryPop() attempts
// (myQueues.size() * myTryoutCount); the constructor sets it to 1 with work
// stealing and to 0 without.
111  const size_t myTryoutCount;
// Worker threads, one per context entry; joined by the thread.join() loop
// earlier in this class.
112  std::vector<std::thread> myThreads;
113 };
std::unique_ptr< TaskBase< C > > TaskPtrType
Definition: TaskQueue.h:51
std::vector< TaskQueue< CONTEXT > > myQueues
auto executeAsync(TaskT &&task, int idx=-1) -> std::future< decltype(task(std::declval< CONTEXT >()))>
void workStealRun(size_t queueIndex, const CONTEXT &context)
std::vector< std::thread > myThreads
WorkStealingThreadPool(const bool workSteal, const std::vector< CONTEXT > &context)
void run(size_t queueIndex, const CONTEXT &context)
std::atomic< size_t > myQueueIndex