source: XIOS3/trunk/src/manager/thread_manager.hpp @ 2547

Last change on this file since 2547 was 2547, checked in by ymipsl, 10 months ago

Major update :

  • New method to lock and unlock one-sided windows (window_dynamic) to avoid network overhead
  • Introducing multithreading on server sided to manage more efficiently dead-lock occuring (similar to co-routine which will be available and implemented in futur c++ standard), based on c++ threads
  • Suprression of old "attached mode" which is replaced by online writer and reder filters

YM

  • Property svn:executable set to *
File size: 4.9 KB
Line 
1#ifndef __THREAD_MANAGER_HPP__
2#define __THREAD_MANAGER_HPP__
3
4#include <thread>             // std::thread
5#include <mutex>              // std::mutex, std::unique_lock
6#include <condition_variable> // std::condition_variable
7#include <map>
8
9
10namespace xios
11{
12  class CThreadManager
13  {
14    private:
15     
16      struct SThreadInfo
17      {
18        std::thread* thread;
19        std::thread::id previous;
20        std::thread::id next;
21        bool isNotified ;
22        std::condition_variable* cvs ;
23        std::unique_lock<std::mutex>* lck;
24        bool finished ;
25        bool isMainThread ;
26      } ;
27
28   
29      static std::mutex* mtx_;
30      static std::map<std::thread::id, SThreadInfo>* threads_;
31      static std::thread::id masterThreadId_ ;
32      static bool usingThreads_ ;
33   
34    public:
35     
36      static int getNumThreads(void) { return threads_->size()-1 ;}
37     
38      template <class Fn, class... Args> 
39      static void spawnThread(Fn&& fn, Args&&... args)
40      {
41        std::thread::id myId=std::this_thread::get_id() ;
42
43        SThreadInfo myThreadInfo ;
44        myThreadInfo.thread=nullptr ;
45        myThreadInfo.previous = myId ;
46        myThreadInfo.next = (*threads_)[myId].next ;
47        myThreadInfo.cvs = new std::condition_variable() ;
48        myThreadInfo.thread = new std::thread(fn, args...) ;
49        myThreadInfo.finished = false ;
50        myThreadInfo.isNotified = false ;
51        (*threads_)[myThreadInfo.thread->get_id()] =  myThreadInfo ;
52        (*threads_)[(*threads_)[myId].next].previous = myThreadInfo.thread->get_id() ;
53        (*threads_)[myId].next = myThreadInfo.thread->get_id() ; 
54
55        do (*threads_)[myId].cvs->wait(*((*threads_)[myId].lck)) ;
56        while(!(*threads_)[myId].isNotified) ;
57
58        (*threads_)[myId].isNotified=false ;
59      }
60
61      static void yield(void)
62      {
63        if (isMasterThread) checkJoin() ;
64        std::thread::id id = std::this_thread::get_id() ;
65        if (getNumThreads()>0)
66        {
67          (*threads_)[(*threads_)[id].next].isNotified=true ;
68          (*threads_)[(*threads_)[id].next].cvs->notify_one() ;
69          do (*threads_)[id].cvs->wait(*((*threads_)[id].lck)) ;
70          while(!(*threads_)[id].isNotified) ;
71
72          (*threads_)[id].isNotified=false ;
73
74        }
75
76      }
77   
78      static void checkJoin(void)
79      {
80        std::thread::id myId = std::this_thread::get_id() ;
81        for(auto it=(*threads_).begin(); it!=(*threads_).end(); )
82        { 
83          if (it->first!=myId)
84          {
85            if (it->second.finished) 
86            {
87              it->second.thread->join() ;
88              it=(*threads_).erase(it) ;
89            }
90            else 
91            {
92              it++ ;
93            }
94          }
95            else it++ ;
96        }
97      }
98
99      static bool isMasterThread(void) { return std::this_thread::get_id()==masterThreadId_; }
100      static bool isUsingThreads(void) { return usingThreads_;}
101      static void threadInitialize(void)
102      {
103        std::unique_lock<std::mutex>* lck = new std::unique_lock<std::mutex>(*mtx_);
104        std::thread::id id=std::this_thread::get_id() ;
105
106        (*threads_)[id].lck = lck ;
107        (*threads_)[(*threads_)[id].previous].isNotified = true ;
108        (*threads_)[(*threads_)[id].previous].cvs->notify_one() ;
109        do (*threads_)[id].cvs->wait(*lck) ; while(!(*threads_)[id].isNotified) ;
110        (*threads_)[id].isNotified=false ;
111      }
112   
113      static void threadFinalize(void)
114      {
115        std::thread::id myId = std::this_thread::get_id() ;
116        (*threads_)[(*threads_)[myId].previous].next = (*threads_)[myId].next;
117        (*threads_)[(*threads_)[myId].next].previous = (*threads_)[myId].previous ;
118        (*threads_)[myId].finished = true ;
119
120        (*threads_)[(*threads_)[myId].next].isNotified=true ;
121        (*threads_)[(*threads_)[myId].next].cvs->notify_one() ;
122     
123        delete (*threads_)[myId].lck ;
124      }
125   
126      static void initialize(bool usingThreads)
127      {
128        mtx_ = new std::mutex ;
129        threads_ = new std::map<std::thread::id, SThreadInfo> ;
130        usingThreads_ = usingThreads ;
131
132        std::thread::id myId= std::this_thread::get_id() ;
133        SThreadInfo myThreadInfo ;
134        myThreadInfo.thread=nullptr ;
135        myThreadInfo.previous = myId ;
136        myThreadInfo.next = myId ; 
137        myThreadInfo.cvs = new std::condition_variable() ;
138        myThreadInfo.lck = new std::unique_lock<std::mutex>(*mtx_) ;
139        myThreadInfo.finished = false ;
140        myThreadInfo.isNotified = false ;
141        (*threads_)[myId] = myThreadInfo ;
142      }
143     
144      static void finalize(void)
145      {
146       
147        std::thread::id myId = std::this_thread::get_id() ;
148        delete (*threads_)[myId].lck ;
149        delete threads_ ;
150        delete mtx_ ;
151      }
152  };
153
154}
155
156
157#endif
Note: See TracBrowser for help on using the repository browser.