Eclipse SUMO - Simulation of Urban MObility
TaskQueue.h
Go to the documentation of this file.
1 /****************************************************************************/
2 // Eclipse SUMO, Simulation of Urban MObility; see https://eclipse.org/sumo
3 // Copyright (C) 2020-2020 German Aerospace Center (DLR) and others.
4 // This program and the accompanying materials are made available under the
5 // terms of the Eclipse Public License 2.0 which is available at
6 // https://www.eclipse.org/legal/epl-2.0/
7 // This Source Code may also be made available under the following Secondary
8 // Licenses when the conditions for such availability set forth in the Eclipse
9 // Public License 2.0 are satisfied: GNU General Public License, version 2
10 // or later which is available at
11 // https://www.gnu.org/licenses/old-licenses/gpl-2.0-standalone.html
12 // SPDX-License-Identifier: EPL-2.0 OR GPL-2.0-or-later
13 /****************************************************************************/
18 // Threadpool implementation,
19 // based on https://github.com/vukis/Cpp-Utilities/tree/master/ThreadPool
20 /****************************************************************************/
21 #pragma once
22 
#include <condition_variable>
#include <functional>
#include <future>
#include <memory>
#include <mutex>
#include <queue>
#include <utility>
27 
/// @brief Abstract interface of a unit of work that is executed with a context of type C.
/// @tparam C type of the context object handed to the task on execution
template <typename C>
class TaskBase {
public:
    /// @brief Virtual destructor, so that tasks can be destroyed through a TaskBase pointer
    virtual ~TaskBase() = default;

    /// @brief Runs the task.
    /// @param[in] context the context object the task is run with
    virtual void exec(const C& context) = 0;
};
34 
35 template <typename T, typename C>
36 class Task : public TaskBase<C> {
37 public:
38  Task(T&& t) : task(std::move(t)) {}
39  void exec(const C& context) override {
40  task(context);
41  }
42 
43  T task;
44 };
45 
46 template <typename C>
47 class TaskQueue {
48  using LockType = std::unique_lock<std::mutex>;
49 
50 public:
51  using TaskPtrType = std::unique_ptr<TaskBase<C> >;
52  TaskQueue() = default;
53  ~TaskQueue() = default;
54 
55  void setEnabled(bool enabled) {
56  {
57  LockType lock{ myMutex };
58  myEnabled = enabled;
59  }
60  if (!enabled) {
61  myReady.notify_all();
62  }
63  }
64 
65  bool isEnabled() const {
66  LockType lock{ myMutex };
67  return myEnabled;
68  }
69 
70  bool waitAndPop(TaskPtrType& task) {
71  LockType lock{ myMutex };
72  myReady.wait(lock, [this] { return !myEnabled || !myQueue.empty(); });
73  if (myEnabled && !myQueue.empty()) {
74  task = std::move(myQueue.front());
75  myQueue.pop();
76  return true;
77  }
78  return false;
79  }
80 
81  template <typename TaskT>
82  auto push(TaskT&& task) -> std::future<decltype(task(std::declval<C>()))> {
83  using PkgTask = std::packaged_task<decltype(task(std::declval<C>()))(C)>;
84  auto job = std::unique_ptr<Task<PkgTask, C>>(new Task<PkgTask, C>(PkgTask(std::forward<TaskT>(task))));
85  auto future = job->task.get_future();
86  {
87  LockType lock{ myMutex };
88  myQueue.emplace(std::move(job));
89  }
90 
91  myReady.notify_one();
92  return future;
93  }
94 
95  bool tryPop(TaskPtrType& task) {
96  LockType lock{ myMutex, std::try_to_lock };
97  if (!lock || !myEnabled || myQueue.empty()) {
98  return false;
99  }
100  task = std::move(myQueue.front());
101  myQueue.pop();
102  return true;
103  }
104 
105  template <typename TaskT>
106  auto tryPush(TaskT&& task, bool& success) -> std::future<decltype(task(std::declval<C>()))> {
107  std::future<decltype(task(std::declval<C>()))> future;
108  success = false;
109  {
110  LockType lock{ myMutex, std::try_to_lock };
111  if (!lock) {
112  return future;
113  }
114  using PkgTask = std::packaged_task<decltype(task(std::declval<C>()))(C)>;
115  auto job = std::unique_ptr<Task<PkgTask, C>>(new Task<PkgTask, C>(PkgTask(std::forward<TaskT>(task))));
116  future = job->task.get_future();
117  success = true;
118  myQueue.emplace(std::move(job));
119  }
120 
121  myReady.notify_one();
122  return future;
123  }
124 
125 private:
126  TaskQueue(const TaskQueue&) = delete;
127  TaskQueue& operator=(const TaskQueue&) = delete;
128 
129  std::queue<TaskPtrType> myQueue;
130  bool myEnabled = true;
131  mutable std::mutex myMutex;
132  std::condition_variable myReady;
133 };
virtual ~TaskBase()=default
virtual void exec(const C &context)=0
Definition: TaskQueue.h:36
T task
Definition: TaskQueue.h:43
Task(T &&t)
Definition: TaskQueue.h:38
void exec(const C &context) override
Definition: TaskQueue.h:39
auto tryPush(TaskT &&task, bool &success) -> std::future< decltype(task(std::declval< C >()))>
Definition: TaskQueue.h:106
bool waitAndPop(TaskPtrType &task)
Definition: TaskQueue.h:70
bool tryPop(TaskPtrType &task)
Definition: TaskQueue.h:95
std::condition_variable myReady
Definition: TaskQueue.h:132
TaskQueue()=default
TaskQueue & operator=(const TaskQueue &)=delete
std::queue< TaskPtrType > myQueue
Definition: TaskQueue.h:129
bool myEnabled
Definition: TaskQueue.h:130
bool isEnabled() const
Definition: TaskQueue.h:65
TaskQueue(const TaskQueue &)=delete
std::unique_lock< std::mutex > LockType
Definition: TaskQueue.h:48
auto push(TaskT &&task) -> std::future< decltype(task(std::declval< C >()))>
Definition: TaskQueue.h:82
std::unique_ptr< TaskBase< C > > TaskPtrType
Definition: TaskQueue.h:51
~TaskQueue()=default
std::mutex myMutex
Definition: TaskQueue.h:131
void setEnabled(bool enabled)
Definition: TaskQueue.h:55