NeoPZ
TPZThreadPool.cpp
Go to the documentation of this file.
1 #include "pzerror.h"
2 
3 #include "pzlog.h"
4 #ifdef LOG4CXX
5 static LoggerPtr logger(Logger::getLogger("TPZThreadPool"));
6 #endif
7 
8 #include "TPZThreadPool.h"
9 
10 #include <exception>
11 #include <iostream>
12 #include "TPZTask.h"
13 #include <mutex>
14 
15 
16 #include "TPZReschedulableTask.h"
17 
// Fragment: the function header (listing line 18) is missing from this extract;
// presumably this is the body of TPZThreadPool::updatePriorities() -- TODO confirm.
// Refreshes the cached priority bounds from the current state of the task queue.
19  if (mTasksQueue.size() != 0) {
// Non-empty queue: the task at the top of the queue supplies the maximum priority.
// NOTE(review): mMinPriority is left untouched on this path.
20  mMaxPriority = mTasksQueue.top()->priority();
21  } else {
// Empty queue: reset both bounds to their "nothing seen yet" sentinels.
22  mMaxPriority = std::numeric_limits<int>::min();
23  mMinPriority = std::numeric_limits<int>::max();
24  }
25 }
26 
// Fragment: the function header and the declaration of `task` (listing lines 27 and 29)
// are missing from this extract; presumably this is the body of
// TPZThreadPool::threadsLoop(), the function each worker thread runs -- TODO confirm.
// Each iteration waits for work and then does one of three things: shuts down,
// retires the current thread, or starts the next queued task.
28  while (true) {
// Stays empty unless this iteration decides that the current thread must be retired.
30  std::function<void(void) > thread_join_task;
31  {
// Everything in this scope runs with the task-queue mutex held.
32  std::unique_lock<std::mutex> lock(mTasksQueue.mMutex);
// Sleep until a stop was requested, a thread deletion is pending, or a task is queued.
33  mTaskAvailableCond.wait(lock, [this] {
34  return mStop || mThreadsToDelete != 0 || mTasksQueue.size() != 0;
35  });
36  if (mStop) {
// Shutdown: cancel every task still in the queue, then leave the thread function.
37  while (mTasksQueue.size() != 0) {
38  task = mTasksQueue.popTop();
39  task->Cancel();
40  }
41  return;
42  }
// Retirement is only considered when the queue is empty or its top entry is not a
// system task; a system task at the top is popped and started below instead.
43  if (mTasksQueue.size() == 0 || !mTasksQueue.top()->mSystemTask) {
44  {
45  if ((mThreadsToDelete != 0)) { // It may seem odd to check this so soon, but mind the cost of locking a mutex
// mThreadsMutex is acquired while the queue mutex above is still held; the counter
// is then checked a second time under that lock.
46  std::unique_lock<std::mutex> lock(mThreadsMutex);
47  if ((mThreadsToDelete != 0)) {
48  typename std::thread::id thread_id = std::this_thread::get_id();
// Closure that retires this thread: under mThreadsMutex it looks up the std::thread
// whose id matches, joins it, moves it to the last slot and pops it from mThreads.
// It is only built here; it is handed to runSystemTask further down.
// Presumably it executes on a different worker, since a thread cannot join itself
// -- TODO confirm against runSystemTask.
49  thread_join_task = [this, thread_id] {
50  std::unique_lock<std::mutex> lock(mThreadsMutex);
51  unsigned int num_threads = ActualThreadCount();
52  for (unsigned int i = 0; i < num_threads; ++i) {
53  if (mThreads[i].get_id() == thread_id) {
54  mThreads[i].join();
// Swap with the last element so that pop_back() removes exactly the joined thread.
55  if (i != num_threads - 1) {
56  mThreads[i].swap(mThreads[num_threads - 1]);
57  }
58  mThreads.pop_back();
// NOTE(review): listing line 59 is missing from this extract.
60  //std::cout << "Deleted thread " << thread_id << " totalizing " << mThreads.size()-mZombieThreads << " active and " << mThreads.size() << " total." << std::endl;
61  return;
62  }
63  }
64  };
// NOTE(review): listing lines 65-66 are missing from this extract; no visible
// statement decrements mThreadsToDelete, so that bookkeeping presumably lives there.
67  //std::cout << "Scheduling deletion of thread " << thread_id << " totalizing " << mThreads.size()-mZombieThreads << " active and " << mThreads.size() << " total." << std::endl;
68  }
69  }
70  }
71  }
// A thread that is about to be retired does not take another task from the queue.
72  if (!thread_join_task) {
73  task = mTasksQueue.popTop();
// NOTE(review): listing line 74 is missing from this extract.
75  }
76  }
// From here on the queue mutex is released.
77  if (thread_join_task) {
// Hand the join closure to runSystemTask with the highest possible priority and end
// this thread's function.
78  runSystemTask(std::numeric_limits<int>::max(), thread_join_task);
79  return ;
80  }
// Start the popped task outside the queue lock.
81  if (task) {
82  task->start();
83  }
84  }
85 }
86 
87 void TPZThreadPool::SetNumThreads(const unsigned numThreads) {
88  std::unique_lock<std::mutex> lock(mThreadsMutex);
89  unsigned int num_threads_before = threadCount();
90  int threads_to_create = numThreads - num_threads_before;
91  if (threads_to_create < 0) {
92  mThreadsToDelete -= threads_to_create;
93  for (unsigned int i = 0; i < -threads_to_create; ++i) {
94  mTaskAvailableCond.notify_one();
95  }
96  }
97  for (int i = 0; i < threads_to_create; ++i) {
98  //std::cout << "Creating thread " << i + 1 << " of " << threads_to_create << " totalizing " << threadCount() + 1 << " active and " << mThreads.size() +1 << " total." << std::endl;
99  mThreads.emplace_back([this] {
100  threadsLoop();
101  });
102  }
103 }
104 
106 mMinPriority(std::numeric_limits<int>::max()),
107 mMaxPriority(std::numeric_limits<int>::min()){
108  //SetNumThreads(std::thread::hardware_concurrency());
109 }
110 
111 std::shared_future<void> TPZThreadPool::run(const int priority, TPZAutoPointer<std::packaged_task<void(void)> >& task, TPZTaskGroup *taskGroup) {
112  std::shared_future<void> fut = task->get_future().share();
113  if (threadCount() != 0) {
114  appendTaskToQueue(priority, task, false, taskGroup);
115  } else {
116  (*task)();
117  }
118  return fut;
119 }
120 
// Fragment: the function header (listing line 121) is missing from this extract;
// presumably this is the body of TPZThreadPool::reschedule(priority, task) -- TODO confirm.
// Takes the task out of the queue, gives it the new priority and queues it again.
122  TPZAutoPointer<TPZTask> autoPointerTask(TPZAutoPointerDynamicCast<TPZTask>(task));
123  {
// The queue mutex is held only for the removal and released before re-queuing.
// NOTE(review): the result of remove() is ignored, so the task is appended below
// whether or not it was found in the queue.
124  std::unique_lock<std::mutex> lock(mTasksQueue.mMutex);
125  mTasksQueue.remove(autoPointerTask);
126  }
127  task->mPriority = priority;
128  appendTaskToQueue(autoPointerTask);
129 }
130 
// Fragment: the function header (listing line 131) is missing from this extract;
// presumably this is the body of the TPZThreadPool destructor -- TODO confirm.
// Stops the pool: raises the stop flag, wakes all waiting workers and joins them.
132  {
// mStop is set under the queue mutex, the same mutex the workers hold while
// evaluating their wait predicate.
133  std::unique_lock<std::mutex> lock(mTasksQueue.mMutex);
134  mStop = true;
135  }
// Wake every worker blocked on the condition variable so it can observe mStop.
136  mTaskAvailableCond.notify_all();
137  {
// Join every thread that is still joinable, holding mThreadsMutex for the whole loop.
138  std::unique_lock<std::mutex> lock(mThreadsMutex);
139  for (auto &thread : mThreads) {
140  if (thread.joinable()) {
141  thread.join();
142  }
143  }
144  }
145 }
146 
148  return mMaxPriority;
149 }
150 
152  return mMinPriority;
153 }
154 
157 }
158 
160  return mThreads.size();
161 }
162 
164  std::unique_lock<std::mutex> lock(mTasksQueue.mMutex);
165  mTasksQueue.addItem(task);
166  mTaskAvailableCond.notify_one();
167 }
168 
169 TPZAutoPointer<TPZTask> TPZThreadPool::appendTaskToQueue(const int priority, TPZAutoPointer<std::packaged_task<void (void)>> &task, bool system_task, TPZTaskGroup *taskGroup) {
170  TPZAutoPointer<TPZTask> newTask(new TPZTask(priority, task, taskGroup));
171  newTask->mSystemTask = system_task;
172  appendTaskToQueue(newTask);
173  return newTask;
174 }
175 
177  if (priority > mMaxPriority) {
179  }
180  if (priority < mMinPriority) {
182  }
183 }
184 
// Fragment: the function header (listing line 185) is missing from this extract;
// presumably this is the body of TPZThreadPool::runNow(task) -- TODO confirm.
// Pushes the task to the highest priority and blocks on the task's condition
// variable; returns the task's shared future.
// NOTE(review): every mCondition.wait(lock) below is called without a predicate, so
// a spurious wakeup lets this function return before the task has been processed.
186  std::unique_lock<std::mutex> lock(task->mStateMutex);
187  switch (task->mState) {
188  case TPZReschedulableTask::EProcessingState::CREATED:
// Never scheduled before: create the future, then either queue the task at maximum
// priority and wait, or run it inline when the pool reports no threads.
189  task->mFuture = task->mTask->get_future().share();
190  if (threadCount() != 0) {
191  TPZAutoPointer<TPZTask> tpztask = TPZAutoPointerDynamicCast<TPZTask>(task);
192  tpztask->mPriority = std::numeric_limits<int>::max();
193  appendTaskToQueue(tpztask);
194  task->mState = TPZReschedulableTask::EProcessingState::SCHEDULED;
195  task->mCondition.wait(lock);
196  } else {
197  task->startInternal();
198  task->mCondition.notify_all();
199  }
200  break;
// NOTE(review): the case label on listing line 201 is missing from this extract
// (presumably SCHEDULED): re-queue the task at maximum priority, then wait.
202  TPZThreadPool::globalInstance().reschedule(std::numeric_limits<int>::max(), task);
203  task->mCondition.wait(lock);
204  break;
// NOTE(review): the case label on listing line 205 is missing from this extract
// (presumably STARTED): only wait on the condition variable.
206  task->mCondition.wait(lock);
207  break;
// NOTE(review): the case label on listing line 208 is missing from this extract
// (presumably FINISHED): nothing to do.
209  break;
210  }
211  return task->mFuture;
212 }
213 
// Fragment: the function header (listing line 214) is missing from this extract;
// presumably this is the body of a TPZThreadPool::run overload taking a
// TPZReschedulableTask -- TODO confirm.
// Schedules the task without blocking; only a task still in the CREATED state is acted on.
215  std::unique_lock<std::mutex> lock(task->mStateMutex);
216  switch (task->mState) {
217  case TPZReschedulableTask::EProcessingState::CREATED:
218  {
// Create the future first, then queue the task when the pool reports threads, or
// run it inline on the calling thread otherwise.
219  task->mFuture = task->mTask->get_future().share();
220  if (threadCount() != 0) {
221  TPZAutoPointer<TPZTask> tpztask = TPZAutoPointerDynamicCast<TPZTask>(task);
222  appendTaskToQueue(tpztask);
223  task->mState = TPZReschedulableTask::EProcessingState::SCHEDULED;
224  } else {
225  task->startInternal();
226  task->mCondition.notify_all();
227  }
228  break;
229  }
// Already scheduled, started or finished: nothing to do.
230  case TPZReschedulableTask::EProcessingState::SCHEDULED:
231  case TPZReschedulableTask::EProcessingState::STARTED:
232  case TPZReschedulableTask::EProcessingState::FINISHED:
233  break;
234  }
235 }
236 
237 
239  static TPZThreadPool globalIntstance;
240  return globalIntstance;
241 }
void addItem(const T &item)
Container::size_type size() const
clarg::argInt num_threads("-ntdec", "Number of threads to decompose in TPZParFrontStructMatrix.", 6)
unsigned int mThreadsToDelete
Definition: TPZThreadPool.h:86
Contains the definitions of LOGPZ_DEBUG, LOGPZ_INFO, LOGPZ_WARN, LOGPZ_ERROR and LOGPZ_FATAL, and the implementation of the inline InitializePZLOG(string) function, whether or not the log4cxx library is used. It must be called outside the "#ifdef LOG4CXX" scope.
TPZTask(const int priority, TPZAutoPointer< std::packaged_task< void(void)>> &task, TPZTaskGroup *taskGroup=NULL)
Definition: TPZTask.cpp:12
int ActualThreadCount() const
std::condition_variable mTaskAvailableCond
Definition: TPZThreadPool.h:90
EProcessingState mState
Definition: TPZTask.h:39
Administers tasks that will be executed asynchronously.
Definition: TPZThreadPool.h:23
std::mutex mThreadsMutex
one mutex to synchronize access to the data structures
Definition: TPZThreadPool.h:85
int threadCount() const
Defines PZError.
std::shared_future< void > runSystemTask(const int priority, std::function< void(Args...) > func, Args... args)
Submits and processes a "maximum priority" system task.
Definition: TPZThreadPool.h:74
void appendTaskToQueue(TPZAutoPointer< TPZTask > &task)
static TPZThreadPool & globalInstance()
TPZPriorityQueue< TPZAutoPointer< TPZTask >, std::vector< TPZAutoPointer< TPZTask > >, TPZTaskOrdering > mTasksQueue
Definition: TPZThreadPool.h:89
int maxPriority() const
void updatePriorities()
void reschedule(const int priority, TPZAutoPointer< TPZReschedulableTask > &task)
std::shared_future< void > run(const int priority, TPZAutoPointer< std::packaged_task< void(void) > > &task, TPZTaskGroup *taskGroup=NULL)
submits a task to be executed by TPZThreadPool
bool remove(T &value)
void checkForMaxAndMinPriority(const int priority)
virtual void start()
Definition: TPZTask.cpp:27
TPZAutoPointer< std::packaged_task< void(void)> > mTask
Definition: TPZTask.h:38
virtual void Cancel()
Definition: TPZTask.cpp:36
int mPriority
Definition: TPZTask.h:44
std::shared_future< void > mFuture
bool mSystemTask
Definition: TPZTask.h:43
unsigned int mZombieThreads
Definition: TPZThreadPool.h:87
void SetNumThreads(const unsigned numThreads)
sets the number of threads to be executed simultaneously
std::condition_variable mCondition
int priority() const
Definition: TPZTask.cpp:23
int minPriority() const
std::vector< std::thread > mThreads
vector of thread objects
Definition: TPZThreadPool.h:83
std::shared_future< void > runNow(TPZAutoPointer< TPZReschedulableTask > &task)
This class implements a reference counter mechanism to administer a dynamically allocated object...