Grok  7.6.2
ThreadPool.hpp
Go to the documentation of this file.
1 /*
2  * Copyright (C) 2016-2020 Grok Image Compression Inc.
3  *
4  * This source code is free software: you can redistribute it and/or modify
5  * it under the terms of the GNU Affero General Public License, version 3,
6  * as published by the Free Software Foundation.
7  *
8  * This source code is distributed in the hope that it will be useful,
9  * but WITHOUT ANY WARRANTY; without even the implied warranty of
10  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11  * GNU Affero General Public License for more details.
12  *
13  * You should have received a copy of the GNU Affero General Public License
14  * along with this program. If not, see <http://www.gnu.org/licenses/>.
15  *
16  */
17 
18 
19 #pragma once
20 
21 #include <vector>
22 #include <queue>
23 #include <memory>
24 #include <thread>
25 #include <mutex>
26 #include <condition_variable>
27 #include <future>
28 #include <functional>
29 #include <stdexcept>
30 #include <map>
31 #ifdef __linux__
32 #include "pthread.h"
33 #endif
34 #include <type_traits>
35 
36 
37 class ThreadPool {
38 public:
39  ThreadPool(size_t);
40  template<class F, class... Args>
41  auto enqueue(F&& f, Args&&... args)
42  -> std::future<typename std::invoke_result<F,Args...>::type>;
43  ~ThreadPool();
44  int thread_number(std::thread::id id){
45  if (id_map.find(id) != id_map.end())
46  return id_map[id];
47  return -1;
48  }
49  size_t num_threads(){return m_num_threads;}
50 
51  static ThreadPool* get(){
52  return instance(0);
53  }
54  static ThreadPool* instance(uint32_t numthreads){
55  std::unique_lock<std::mutex> lock(singleton_mutex);
56  if (!singleton)
57  singleton = new ThreadPool(numthreads ? numthreads : hardware_concurrency());
58  return singleton;
59  }
60  static void release(){
61  std::unique_lock<std::mutex> lock(singleton_mutex);
62  delete singleton;
63  singleton = nullptr;
64  }
65  static uint32_t hardware_concurrency() {
66  uint32_t ret = 0;
67 
68  #if _MSC_VER >= 1200 && MSC_VER <= 1910
69  SYSTEM_INFO sysinfo;
70  GetSystemInfo(&sysinfo);
71  ret = sysinfo.dwNumberOfProcessors;
72 
73  #else
74  ret = std::thread::hardware_concurrency();
75  #endif
76  return ret;
77  }
78 private:
79  // need to keep track of threads so we can join them
80  std::vector< std::thread > workers;
81  // the task queue
82  std::queue< std::function<void()> > tasks;
83 
84  // synchronization
85  std::mutex queue_mutex;
86  std::condition_variable condition;
87  bool stop;
88 
89  std::map<std::thread::id, int> id_map;
90  size_t m_num_threads;
91 
93  static std::mutex singleton_mutex;
94 
95 
96 };
97 
98 // the constructor just launches some amount of workers
99 inline ThreadPool::ThreadPool(size_t threads)
100  : stop(false), m_num_threads(threads)
101 {
102  if (threads == 1)
103  return;
104 
105  for(size_t i = 0;i<threads;++i)
106  workers.emplace_back(
107  [this]
108  {
109  for(;;)
110  {
111  std::function<void()> task;
112 
113  {
114  std::unique_lock<std::mutex> lock(this->queue_mutex);
115  this->condition.wait(lock,
116  [this]{ return this->stop || !this->tasks.empty(); });
117  if(this->stop && this->tasks.empty())
118  return;
119  task = std::move(this->tasks.front());
120  this->tasks.pop();
121  }
122 
123  task();
124  }
125  }
126  );
127  int thread_count = 0;
128  for(std::thread &worker: workers){
129  id_map[worker.get_id()] = thread_count;
130 #ifdef __linux__
131  // Create a cpu_set_t object representing a set of CPUs. Clear it and mark
132  // only CPU i as set.
133  // Note: we assume that the second half of the logical cores
134  // are hyper-threaded siblings to the first half
135  cpu_set_t cpuset;
136  CPU_ZERO(&cpuset);
137  CPU_SET(thread_count, &cpuset);
138  int rc = pthread_setaffinity_np(worker.native_handle(),
139  sizeof(cpu_set_t), &cpuset);
140  if (rc != 0) {
141  std::cerr << "Error calling pthread_setaffinity_np: " << rc << "\n";
142  }
143 #endif
144  thread_count++;
145  }
146 
147 }
148 
149 // add new work item to the pool
150 template<class F, class... Args>
151 auto ThreadPool::enqueue(F&& f, Args&&... args)
152  -> std::future<typename std::invoke_result<F,Args...>::type>
153 {
154  assert(m_num_threads > 1);
155  using return_type = typename std::invoke_result<F,Args...>::type;
156 
157  auto task = std::make_shared< std::packaged_task<return_type()> >(
158  std::bind(std::forward<F>(f), std::forward<Args>(args)...)
159  );
160 
161  std::future<return_type> res = task->get_future();
162  {
163  std::unique_lock<std::mutex> lock(queue_mutex);
164 
165  // don't allow enqueueing after stopping the pool
166  if(stop)
167  throw std::runtime_error("enqueue on stopped ThreadPool");
168 
169  tasks.emplace([task](){ (*task)(); });
170  }
171  condition.notify_one();
172  return res;
173 }
174 
175 // the destructor joins all threads
177 {
178  {
179  std::unique_lock<std::mutex> lock(queue_mutex);
180  stop = true;
181  }
182  condition.notify_all();
183  for(std::thread &worker: workers)
184  worker.join();
185 }
ThreadPool
Definition: ThreadPool.hpp:37
grk::ISparseBuffer
Definition: SparseBuffer.h:72
ThreadPool::get
static ThreadPool * get()
Definition: ThreadPool.hpp:51
ThreadPool::singleton
static ThreadPool * singleton
Definition: ThreadPool.hpp:92
main
int main()
Definition: TestSparseBuffer.cpp:57
grk::SparseBuffer
Definition: SparseBuffer.h:159
ThreadPool::queue_mutex
std::mutex queue_mutex
Definition: ThreadPool.hpp:85
ThreadPool::num_threads
size_t num_threads()
Definition: ThreadPool.hpp:49
ThreadPool::stop
bool stop
Definition: ThreadPool.hpp:87
ThreadPool::instance
static ThreadPool * instance(uint32_t numthreads)
Definition: ThreadPool.hpp:54
ThreadPool::enqueue
auto enqueue(F &&f, Args &&... args) -> std::future< typename std::invoke_result< F, Args... >::type >
Definition: ThreadPool.hpp:151
ThreadPool::ThreadPool
ThreadPool(size_t)
Definition: ThreadPool.hpp:99
grk
Copyright (C) 2016-2020 Grok Image Compression Inc.
Definition: BitIO.cpp:23
ThreadPool::singleton_mutex
static std::mutex singleton_mutex
Definition: ThreadPool.hpp:93
grk_includes.h
grk::grk_rect_u32
grk_rectangle< uint32_t > grk_rect_u32
Definition: util.h:48
ThreadPool::hardware_concurrency
static uint32_t hardware_concurrency()
Definition: ThreadPool.hpp:65
ThreadPool::condition
std::condition_variable condition
Definition: ThreadPool.hpp:86
ThreadPool::release
static void release()
Definition: ThreadPool.hpp:60
ThreadPool::workers
std::vector< std::thread > workers
Definition: ThreadPool.hpp:80
ThreadPool::m_num_threads
size_t m_num_threads
Definition: ThreadPool.hpp:90
ThreadPool::~ThreadPool
~ThreadPool()
Definition: ThreadPool.hpp:176
ThreadPool::id_map
std::map< std::thread::id, int > id_map
Definition: ThreadPool.hpp:89
ThreadPool::thread_number
int thread_number(std::thread::id id)
Definition: ThreadPool.hpp:44
ThreadPool::tasks
std::queue< std::function< void()> > tasks
Definition: ThreadPool.hpp:82