SUMO - Simulation of Urban MObility
FXWorkerThread.h
Go to the documentation of this file.
1 /****************************************************************************/
2 // Eclipse SUMO, Simulation of Urban MObility; see https://eclipse.org/sumo
3 // Copyright (C) 2004-2017 German Aerospace Center (DLR) and others.
4 /****************************************************************************/
5 //
6 // This program and the accompanying materials
7 // are made available under the terms of the Eclipse Public License v2.0
8 // which accompanies this distribution, and is available at
9 // http://www.eclipse.org/legal/epl-v20.html
10 //
11 /****************************************************************************/
17 // A thread class together with a pool and a task for parallelized computation
18 /****************************************************************************/
19 
20 #ifndef FXWorkerThread_h
21 #define FXWorkerThread_h
22 
23 
24 // ===========================================================================
25 // included modules
26 // ===========================================================================
27 #ifdef _MSC_VER
28 #include <windows_config.h>
29 #else
30 #include <config.h>
31 #endif
32 
33 #include <list>
34 #include <vector>
35 #include <fx.h>
36 #include <FXThread.h>
37 
38 
39 // ===========================================================================
40 // class definitions
41 // ===========================================================================
46 class FXWorkerThread : public FXThread {
47 
48 public:
53  class Task {
54  public:
56  virtual ~Task() {};
57 
66  virtual void run(FXWorkerThread* context) = 0;
67 
74  void setIndex(const int newIndex) {
75  myIndex = newIndex;
76  }
77  private:
79  int myIndex;
80  };
81 
86  class Pool {
87  public:
94  Pool(int numThreads = 0) : myPoolMutex(true), myRunningIndex(0), myNumFinished(0) {
95  while (numThreads > 0) {
96  new FXWorkerThread(*this);
97  numThreads--;
98  }
99  }
100 
105  virtual ~Pool() {
106  clear();
107  }
108 
111  void clear() {
112  for (std::vector<FXWorkerThread*>::iterator it = myWorkers.begin(); it != myWorkers.end(); ++it) {
113  delete *it;
114  }
115  myWorkers.clear();
116  }
117 
122  void addWorker(FXWorkerThread* const w) {
123 // if (myWorkers.empty()) std::cout << "created pool at " << SysUtils::getCurrentMillis() << std::endl;
124  myWorkers.push_back(w);
125  }
126 
133  void add(Task* const t, int index = -1) {
134  t->setIndex(myRunningIndex++);
135  if (index < 0) {
136  index = myRunningIndex % myWorkers.size();
137  }
138  myWorkers[index]->add(t);
139  }
140 
147  void addFinished(Task* const t) {
148  myMutex.lock();
149  myNumFinished++;
150  myFinishedTasks.push_back(t);
151  myCondition.signal();
152  myMutex.unlock();
153  }
154 
156  void waitAll(const bool deleteFinished = true) {
157  myMutex.lock();
158  while (myNumFinished < myRunningIndex) {
159  myCondition.wait(myMutex);
160  }
161 // if (myRunningIndex > 0) std::cout << "finished waiting for " << myRunningIndex << " tasks at " << SysUtils::getCurrentMillis() << std::endl;
162  if (deleteFinished) {
163  for (Task* task : myFinishedTasks) {
164  delete task;
165  }
166  }
167  myFinishedTasks.clear();
168  myRunningIndex = 0;
169  myNumFinished = 0;
170  myMutex.unlock();
171  }
172 
180  bool isFull() const {
181  return myRunningIndex - myNumFinished >= size();
182  }
183 
188  int size() const {
189  return (int)myWorkers.size();
190  }
191 
193  void lock() {
194  myPoolMutex.lock();
195  }
196 
198  void unlock() {
199  myPoolMutex.unlock();
200  }
201 
202  private:
204  std::vector<FXWorkerThread*> myWorkers;
206  FXMutex myMutex;
208  FXMutex myPoolMutex;
210  FXCondition myCondition;
212  std::list<Task*> myFinishedTasks;
217  };
218 
219 public:
226  FXWorkerThread(Pool& pool): FXThread(), myPool(pool), myStopped(false), myCounter(0) {
227  pool.addWorker(this);
228  start();
229  }
230 
235  virtual ~FXWorkerThread() {
236  stop();
237  }
238 
243  void add(Task* t) {
244  myMutex.lock();
245  myTasks.push_back(t);
246  myCondition.signal();
247  myMutex.unlock();
248  }
249 
256  FXint run() {
257  while (!myStopped) {
258  myMutex.lock();
259  while (!myStopped && myTasks.empty()) {
260  myCondition.wait(myMutex);
261  }
262  if (myStopped) {
263  myMutex.unlock();
264  break;
265  }
266  Task* t = myTasks.front();
267  myTasks.pop_front();
268  myMutex.unlock();
269  t->run(this);
270  myCounter++;
271 // if (myCounter % 1000 == 0) std::cout << (int)this << " ran " << myCounter << " tasks " << std::endl;
272  myPool.addFinished(t);
273  }
274 // std::cout << "ran " << myCounter << " tasks " << std::endl;
275  return 0;
276  }
277 
282  void stop() {
283  myMutex.lock();
284  myStopped = true;
285  myCondition.signal();
286  myMutex.unlock();
287  join();
288  }
289 
290 private:
294  FXMutex myMutex;
296  FXCondition myCondition;
298  std::list<Task*> myTasks;
300  bool myStopped;
303 };
304 
305 
306 #endif
std::vector< FXWorkerThread * > myWorkers
the current worker threads
int myNumFinished
the number of finished tasks (is reset when the pool runs empty)
void waitAll(const bool deleteFinished=true)
waits for all tasks to be finished
virtual ~FXWorkerThread()
Destructor.
FXCondition myCondition
the semaphore when waiting for new tasks
std::list< Task * > myTasks
the list of pending tasks
int myRunningIndex
the running index for the next task
FXWorkerThread(Pool &pool)
Constructor.
virtual ~Pool()
Destructor.
void addFinished(Task *const t)
Adds the given task to the list of finished tasks and assigns it to a randomly chosen worker...
void add(Task *t)
Adds the given task to this thread to be calculated.
FXMutex myMutex
the internal mutex for the task list
FXMutex myPoolMutex
the pool mutex for external sync
FXint run()
Main execution method of this thread.
void lock()
locks the pool mutex
bool myStopped
whether we are still running
int myIndex
the index of the task, valid only after the task has been added to the pool
int myCounter
counting completed tasks for debugging / profiling
std::list< Task * > myFinishedTasks
list of finished tasks
Pool(int numThreads=0)
Constructor.
int size() const
Returns the number of threads in the pool.
FXMutex myMutex
the mutex for the task list
void clear()
Stops and deletes all worker threads.
virtual ~Task()
Desctructor.
void add(Task *const t, int index=-1)
Gives a number to the given task and assigns it to the worker with the given index. If the index is negative, assign to the next (round robin) one.
A pool of worker threads which distributes the tasks and collects the results.
bool isFull() const
Checks whether there are currently more pending tasks than threads.
void addWorker(FXWorkerThread *const w)
Adds the given thread to the pool.
Pool & myPool
the pool for this thread
virtual void run(FXWorkerThread *context)=0
Abstract method which in subclasses should contain the computations to be performed.
void unlock()
unlocks the pool mutex
void stop()
Stops the thread.
Abstract superclass of a task to be run with an index to keep track of pending tasks.
A thread repeatingly calculating incoming tasks.
FXCondition myCondition
the semaphore to wait on for finishing all tasks
void setIndex(const int newIndex)
Sets the running index of this task.