SLikeNet  0.1.3
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Groups Pages
ThreadPool.h
Go to the documentation of this file.
1 /*
2  * Original work: Copyright (c) 2014, Oculus VR, Inc.
3  * All rights reserved.
4  *
5  * This source code is licensed under the BSD-style license found in the
6  * RakNet License.txt file in the licenses directory of this source tree. An additional grant
7  * of patent rights can be found in the RakNet Patents.txt file in the same directory.
8  *
9  *
10  * Modified work: Copyright (c) 2016-2017, SLikeSoft UG (haftungsbeschränkt)
11  *
12  * This source code was modified by SLikeSoft. Modifications are licensed under the MIT-style
13  * license found in the license.txt file in the root directory of this source tree.
14  */
15 
16 #ifndef __THREAD_POOL_H
17 #define __THREAD_POOL_H
18 
19 #include "memoryoverride.h"
20 #include "DS_Queue.h"
21 #include "SimpleMutex.h"
22 #include "Export.h"
23 #include "thread.h"
24 #include "SignaledEvent.h"
25 
27 {
28 public:
30  virtual ~ThreadDataInterface() {}
31 
32  virtual void* PerThreadFactory(void *context)=0;
33  virtual void PerThreadDestructor(void* factoryResult, void *context)=0;
34 };
39 template <class InputType, class OutputType>
41 {
42  ThreadPool();
43  ~ThreadPool();
44 
51  bool StartThreads(int numThreads, int stackSize, void* (*_perThreadInit)()=0, void (*_perThreadDeinit)(void*)=0);
52 
53  // Alternate form of _perThreadDataFactory, _perThreadDataDestructor
54  void SetThreadDataInterface(ThreadDataInterface *tdi, void *context);
55 
57  void StopThreads(void);
58 
67  void AddInput(OutputType (*workerThreadCallback)(InputType, bool *returnOutput, void* perThreadData), InputType inputData);
68 
72  void AddOutput(OutputType outputData);
73 
76  bool HasOutput(void);
77 
80  bool HasOutputFast(void);
81 
84  bool HasInput(void);
85 
88  bool HasInputFast(void);
89 
93  OutputType GetOutput(void);
94 
96  void Clear(void);
97 
100  void LockInput(void);
101 
103  void UnlockInput(void);
104 
106  unsigned InputSize(void);
107 
109  InputType GetInputAtIndex(unsigned index);
110 
112  void RemoveInputAtIndex(unsigned index);
113 
116  void LockOutput(void);
117 
119  void UnlockOutput(void);
120 
122  unsigned OutputSize(void);
123 
125  OutputType GetOutputAtIndex(unsigned index);
126 
128  void RemoveOutputAtIndex(unsigned index);
129 
131  void ClearInput(void);
132 
134  void ClearOutput(void);
135 
137  bool IsWorking(void);
138 
140  int NumThreadsWorking(void);
141 
143  bool WasStarted(void);
144 
145  // Block until all threads are stopped.
146  bool Pause(void);
147 
148  // Continue running
149  void Resume(void);
150 
151 protected:
152  // It is valid to cancel input before it is processed. To do so, lock the inputQueue with inputQueueMutex,
153  // Scan the list, and remove the item you don't want.
154  SLNet::SimpleMutex inputQueueMutex, outputQueueMutex, workingThreadCountMutex, runThreadsMutex;
155 
156  void* (*perThreadDataFactory)();
157  void (*perThreadDataDestructor)(void*);
158 
159  // inputFunctionQueue & inputQueue are paired arrays so if you delete from one at a particular index you must delete from the other
160  // at the same index
164 
166  void *tdiContext;
167 
168 
169  template <class ThreadInputType, class ThreadOutputType>
170  friend RAK_THREAD_DECLARATION(WorkerThread);
171 
172  /*
173 #ifdef _WIN32
174  friend unsigned __stdcall WorkerThread( LPVOID arguments );
175 #else
176  friend void* WorkerThread( void* arguments );
177 #endif
178  */
179 
188 
190 
191 // #if defined(SN_TARGET_PSP2)
192 // SLNet::RakThread::UltUlThreadRuntime *runtime;
193 // #endif
194 };
195 
196 #include "ThreadPool.h"
197 #include "sleep.h"
198 #ifdef _WIN32
199 
200 #else
201 #include <unistd.h>
202 #endif
203 
204 template <class ThreadInputType, class ThreadOutputType>
205 RAK_THREAD_DECLARATION(WorkerThread)
206 /*
207 #ifdef _WIN32
208 unsigned __stdcall WorkerThread( LPVOID arguments )
209 #else
210 void* WorkerThread( void* arguments )
211 #endif
212 */
213 {
214 
215 
216 
218 
219 
221  ThreadOutputType (*userCallback)(ThreadInputType, bool *, void*);
222  ThreadInputType inputData;
223  ThreadOutputType callbackOutput;
224 
225  userCallback=0;
226 
228  if (threadPool->perThreadDataFactory)
229  perThreadData=threadPool->perThreadDataFactory();
230  else if (threadPool->threadDataInterface)
231  perThreadData=threadPool->threadDataInterface->PerThreadFactory(threadPool->tdiContext);
232  else
233  perThreadData=0;
234 
235  // Increase numThreadsRunning
236  threadPool->numThreadsRunningMutex.Lock();
237  ++threadPool->numThreadsRunning;
238  threadPool->numThreadsRunningMutex.Unlock();
239 
240  for(;;)
241  {
242 //#ifdef _WIN32
243  if (userCallback==0)
244  {
245  threadPool->quitAndIncomingDataEvents.WaitOnEvent(1000);
246  }
247 // #else
248 // if (userCallback==0)
249 // RakSleep(30);
250 // #endif
251 
252  threadPool->runThreadsMutex.Lock();
253  if (threadPool->runThreads==false)
254  {
255  threadPool->runThreadsMutex.Unlock();
256  break;
257  }
258  threadPool->runThreadsMutex.Unlock();
259 
260  threadPool->workingThreadCountMutex.Lock();
261  ++threadPool->numThreadsWorking;
262  threadPool->workingThreadCountMutex.Unlock();
263 
264  // Read input data
265  userCallback=0;
266  threadPool->inputQueueMutex.Lock();
267  if (threadPool->inputFunctionQueue.Size())
268  {
269  userCallback=threadPool->inputFunctionQueue.Pop();
270  inputData=threadPool->inputQueue.Pop();
271  }
272  threadPool->inputQueueMutex.Unlock();
273 
274  if (userCallback)
275  {
276  callbackOutput=userCallback(inputData, &returnOutput,perThreadData);
277  if (returnOutput)
278  {
279  threadPool->outputQueueMutex.Lock();
280  threadPool->outputQueue.Push(callbackOutput, _FILE_AND_LINE_ );
281  threadPool->outputQueueMutex.Unlock();
282  }
283  }
284 
285  threadPool->workingThreadCountMutex.Lock();
286  --threadPool->numThreadsWorking;
287  threadPool->workingThreadCountMutex.Unlock();
288  }
289 
290  // Decrease numThreadsRunning
291  threadPool->numThreadsRunningMutex.Lock();
292  --threadPool->numThreadsRunning;
293  threadPool->numThreadsRunningMutex.Unlock();
294 
295  if (threadPool->perThreadDataDestructor)
296  threadPool->perThreadDataDestructor(perThreadData);
297  else if (threadPool->threadDataInterface)
298  threadPool->threadDataInterface->PerThreadDestructor(perThreadData, threadPool->tdiContext);
299 
300 
301 
302 
303  return 0;
304 
305 }
306 template <class InputType, class OutputType>
308 {
309  runThreads=false;
311  threadDataInterface=0;
312  tdiContext=0;
313  numThreadsWorking=0;
314 
315 }
316 template <class InputType, class OutputType>
318 {
319  StopThreads();
320  Clear();
321 }
322 template <class InputType, class OutputType>
323 bool ThreadPool<InputType, OutputType>::StartThreads(int numThreads, int stackSize, void* (*_perThreadDataFactory)(), void (*_perThreadDataDestructor)(void *))
324 {
325  (void) stackSize;
326 
327 // #if defined(SN_TARGET_PSP2)
328 // runtime = SLNet::RakThread::AllocRuntime(numThreads);
329 // #endif
330 
331  runThreadsMutex.Lock();
332  if (runThreads==true)
333  {
334  // Already running
335  runThreadsMutex.Unlock();
336  return false;
337  }
338  runThreadsMutex.Unlock();
339 
340  quitAndIncomingDataEvents.InitEvent();
341 
342  perThreadDataFactory=_perThreadDataFactory;
343  perThreadDataDestructor=_perThreadDataDestructor;
344 
345  runThreadsMutex.Lock();
346  runThreads=true;
347  runThreadsMutex.Unlock();
348 
349  numThreadsWorking=0;
350  unsigned threadId = 0;
351  (void) threadId;
352  int i;
353  for (i=0; i < numThreads; i++)
354  {
355  int errorCode;
356 
357 
358 
359 
360  errorCode = SLNet::RakThread::Create(WorkerThread<InputType, OutputType>, this);
361 
362  if (errorCode!=0)
363  {
364  StopThreads();
365  return false;
366  }
367  }
368  // Wait for number of threads running to increase to numThreads
369  bool done=false;
370  while (done==false)
371  {
372  RakSleep(50);
373  numThreadsRunningMutex.Lock();
374  if (numThreadsRunning==numThreads)
375  done=true;
376  numThreadsRunningMutex.Unlock();
377  }
378 
379  return true;
380 }
381 template <class InputType, class OutputType>
383 {
384  threadDataInterface=tdi;
385  tdiContext=context;
386 }
387 template <class InputType, class OutputType>
389 {
390  runThreadsMutex.Lock();
391  if (runThreads==false)
392  {
393  runThreadsMutex.Unlock();
394  return;
395  }
396 
397  runThreads=false;
398  runThreadsMutex.Unlock();
399 
400  // Wait for number of threads running to decrease to 0
401  bool done=false;
402  while (done==false)
403  {
404  quitAndIncomingDataEvents.SetEvent();
405 
406  RakSleep(50);
407  numThreadsRunningMutex.Lock();
408  if (numThreadsRunning==0)
409  done=true;
410  numThreadsRunningMutex.Unlock();
411  }
412 
413  quitAndIncomingDataEvents.CloseEvent();
414 
415 // #if defined(SN_TARGET_PSP2)
416 // SLNet::RakThread::DeallocRuntime(runtime);
417 // runtime=0;
418 // #endif
419 
420 }
421 template <class InputType, class OutputType>
422 void ThreadPool<InputType, OutputType>::AddInput(OutputType (*workerThreadCallback)(InputType, bool *returnOutput, void* perThreadData), InputType inputData)
423 {
424  inputQueueMutex.Lock();
425  inputQueue.Push(inputData, _FILE_AND_LINE_ );
426  inputFunctionQueue.Push(workerThreadCallback, _FILE_AND_LINE_ );
427  inputQueueMutex.Unlock();
428 
429  quitAndIncomingDataEvents.SetEvent();
430 }
431 template <class InputType, class OutputType>
433 {
434  outputQueueMutex.Lock();
435  outputQueue.Push(outputData, _FILE_AND_LINE_ );
436  outputQueueMutex.Unlock();
437 }
438 template <class InputType, class OutputType>
440 {
441  return outputQueue.IsEmpty()==false;
442 }
443 template <class InputType, class OutputType>
445 {
446  bool res;
447  outputQueueMutex.Lock();
448  res=outputQueue.IsEmpty()==false;
449  outputQueueMutex.Unlock();
450  return res;
451 }
452 template <class InputType, class OutputType>
454 {
455  return inputQueue.IsEmpty()==false;
456 }
457 template <class InputType, class OutputType>
459 {
460  bool res;
461  inputQueueMutex.Lock();
462  res=inputQueue.IsEmpty()==false;
463  inputQueueMutex.Unlock();
464  return res;
465 }
466 template <class InputType, class OutputType>
468 {
469  // Real output check
470  OutputType output;
471  outputQueueMutex.Lock();
472  output=outputQueue.Pop();
473  outputQueueMutex.Unlock();
474  return output;
475 }
476 template <class InputType, class OutputType>
478 {
479  runThreadsMutex.Lock();
480  if (runThreads)
481  {
482  runThreadsMutex.Unlock();
483  inputQueueMutex.Lock();
484  inputFunctionQueue.Clear(_FILE_AND_LINE_);
485  inputQueue.Clear(_FILE_AND_LINE_);
486  inputQueueMutex.Unlock();
487 
488  outputQueueMutex.Lock();
489  outputQueue.Clear(_FILE_AND_LINE_);
490  outputQueueMutex.Unlock();
491  }
492  else
493  {
494  inputFunctionQueue.Clear(_FILE_AND_LINE_);
495  inputQueue.Clear(_FILE_AND_LINE_);
496  outputQueue.Clear(_FILE_AND_LINE_);
497  }
498 }
499 template <class InputType, class OutputType>
501 {
502  inputQueueMutex.Lock();
503 }
504 template <class InputType, class OutputType>
506 {
507  inputQueueMutex.Unlock();
508 }
509 template <class InputType, class OutputType>
511 {
512  return inputQueue.Size();
513 }
514 template <class InputType, class OutputType>
516 {
517  return inputQueue[index];
518 }
519 template <class InputType, class OutputType>
521 {
522  inputQueue.RemoveAtIndex(index);
523  inputFunctionQueue.RemoveAtIndex(index);
524 }
525 template <class InputType, class OutputType>
527 {
528  outputQueueMutex.Lock();
529 }
530 template <class InputType, class OutputType>
532 {
533  outputQueueMutex.Unlock();
534 }
535 template <class InputType, class OutputType>
537 {
538  return outputQueue.Size();
539 }
540 template <class InputType, class OutputType>
542 {
543  return outputQueue[index];
544 }
545 template <class InputType, class OutputType>
547 {
548  outputQueue.RemoveAtIndex(index);
549 }
550 template <class InputType, class OutputType>
552 {
553  inputQueue.Clear(_FILE_AND_LINE_);
554  inputFunctionQueue.Clear(_FILE_AND_LINE_);
555 }
556 
557 template <class InputType, class OutputType>
559 {
560  outputQueue.Clear(_FILE_AND_LINE_);
561 }
562 template <class InputType, class OutputType>
564 {
565  bool isWorking;
566 // workingThreadCountMutex.Lock();
567 // isWorking=numThreadsWorking!=0;
568 // workingThreadCountMutex.Unlock();
569 
570 // if (isWorking)
571 // return true;
572 
573  // Bug fix: Originally the order of these two was reversed.
574  // It's possible with the thread timing that working could have been false, then it picks up the data in the other thread, then it checks
575  // here and sees there is no data. So it thinks the thread is not working when it was.
576  if (HasOutputFast() && HasOutput())
577  return true;
578 
579  if (HasInputFast() && HasInput())
580  return true;
581 
582  // Need to check is working again, in case the thread was between the first and second checks
583  workingThreadCountMutex.Lock();
584  isWorking=numThreadsWorking!=0;
585  workingThreadCountMutex.Unlock();
586 
587  return isWorking;
588 }
589 
590 template <class InputType, class OutputType>
592 {
593  return numThreadsWorking;
594 }
595 
596 template <class InputType, class OutputType>
598 {
599  bool b;
600  runThreadsMutex.Lock();
601  b = runThreads;
602  runThreadsMutex.Unlock();
603  return b;
604 }
605 template <class InputType, class OutputType>
607 {
608  if (WasStarted()==false)
609  return false;
610 
611  workingThreadCountMutex.Lock();
612  while (numThreadsWorking>0)
613  {
614  RakSleep(30);
615  }
616  return true;
617 }
618 template <class InputType, class OutputType>
620 {
621  workingThreadCountMutex.Unlock();
622 }
623 
624 #endif
625