source: XMLIO_V2/external/src/POCO/Foundation.save/ThreadPool.cpp @ 80

Last change on this file since 80 was 80, checked in by ymipsl, 14 years ago

ajout lib externe

  • Property svn:eol-style set to native
File size: 9.8 KB
Line 
1//
2// ThreadPool.cpp
3//
4// $Id: //poco/1.3/Foundation/src/ThreadPool.cpp#5 $
5//
6// Library: Foundation
7// Package: Threading
8// Module:  ThreadPool
9//
10// Copyright (c) 2004-2006, Applied Informatics Software Engineering GmbH.
11// and Contributors.
12//
13// Permission is hereby granted, free of charge, to any person or organization
14// obtaining a copy of the software and accompanying documentation covered by
15// this license (the "Software") to use, reproduce, display, distribute,
16// execute, and transmit the Software, and to prepare derivative works of the
17// Software, and to permit third-parties to whom the Software is furnished to
18// do so, all subject to the following:
19//
20// The copyright notices in the Software and this entire statement, including
21// the above license grant, this restriction and the following disclaimer,
22// must be included in all copies of the Software, in whole or in part, and
23// all derivative works of the Software, unless such copies or derivative
24// works are solely in the form of machine-executable object code generated by
25// a source language processor.
26//
27// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
28// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
29// FITNESS FOR A PARTICULAR PURPOSE, TITLE AND NON-INFRINGEMENT. IN NO EVENT
30// SHALL THE COPYRIGHT HOLDERS OR ANYONE DISTRIBUTING THE SOFTWARE BE LIABLE
31// FOR ANY DAMAGES OR OTHER LIABILITY, WHETHER IN CONTRACT, TORT OR OTHERWISE,
32// ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
33// DEALINGS IN THE SOFTWARE.
34//
35
36
37#include "Poco/ThreadPool.h"
38#include "Poco/Runnable.h"
39#include "Poco/Thread.h"
40#include "Poco/Event.h"
41#include "Poco/ThreadLocal.h"
42#include "Poco/ErrorHandler.h"
43#include <sstream>
44#include <ctime>
45
46
47namespace Poco {
48
49
50class PooledThread: public Runnable
51{
52public:
53        PooledThread(const std::string& name, int stackSize = POCO_THREAD_STACK_SIZE);
54        ~PooledThread();
55
56        void start();
57        void start(Thread::Priority priority, Runnable& target);
58        void start(Thread::Priority priority, Runnable& target, const std::string& name);
59        bool idle();
60        int idleTime();
61        void join();
62        void activate();
63        void release();
64        void run();
65
66private:
67        volatile bool        _idle;
68        volatile std::time_t _idleTime;
69        Runnable*            _pTarget;
70        std::string          _name;
71        Thread               _thread;
72        Event                _targetReady;
73        Event                _targetCompleted;
74        Event                _started;
75        FastMutex            _mutex;
76};
77
78
79PooledThread::PooledThread(const std::string& name, int stackSize): 
80        _idle(true), 
81        _idleTime(0), 
82        _pTarget(0), 
83        _name(name), 
84        _thread(name),
85        _targetCompleted(false)
86{
87        poco_assert_dbg (stackSize >= 0);
88        _thread.setStackSize(stackSize);
89        _idleTime = time(NULL);
90}
91
92
93PooledThread::~PooledThread()
94{
95}
96
97
98void PooledThread::start()
99{
100        _thread.start(*this);
101        _started.wait();
102}
103
104
105void PooledThread::start(Thread::Priority priority, Runnable& target)
106{
107        FastMutex::ScopedLock lock(_mutex);
108       
109        poco_assert (_pTarget == 0);
110
111        _pTarget = &target;
112        _thread.setPriority(priority);
113        _targetReady.set();
114}
115
116
117void PooledThread::start(Thread::Priority priority, Runnable& target, const std::string& name)
118{
119        FastMutex::ScopedLock lock(_mutex);
120
121        std::string fullName(name);
122        if (name.empty())
123        {
124                fullName = _name;
125        }
126        else
127        {
128                fullName.append(" (");
129                fullName.append(_name);
130                fullName.append(")");
131        }
132        _thread.setName(fullName);
133        _thread.setPriority(priority);
134       
135        poco_assert (_pTarget == 0);
136
137        _pTarget = &target;
138        _targetReady.set();
139}
140
141
142inline bool PooledThread::idle()
143{
144        return _idle;
145}
146
147
148int PooledThread::idleTime()
149{
150        FastMutex::ScopedLock lock(_mutex);
151
152        return (int) (time(NULL) - _idleTime);
153}
154
155
156void PooledThread::join()
157{
158        _mutex.lock();
159        Runnable* pTarget = _pTarget;
160        _mutex.unlock();
161        if (pTarget)
162                _targetCompleted.wait();
163}
164
165
166void PooledThread::activate()
167{
168        FastMutex::ScopedLock lock(_mutex);
169       
170        poco_assert (_idle);
171        _idle = false;
172        _targetCompleted.reset();
173}
174
175
176void PooledThread::release()
177{
178        _mutex.lock();
179        _pTarget = 0;
180        _mutex.unlock();
181        // In case of a statically allocated thread pool (such
182        // as the default thread pool), Windows may have already
183        // terminated the thread before we got here.
184        if (_thread.isRunning()) 
185                _targetReady.set();
186        else
187                delete this;
188}
189
190
191void PooledThread::run()
192{
193        _started.set();
194        for (;;)
195        {
196                _targetReady.wait();
197                _mutex.lock();
198                if (_pTarget) // a NULL target means kill yourself
199                {
200                        _mutex.unlock();
201                        try
202                        {
203                                _pTarget->run();
204                        }
205                        catch (Exception& exc)
206                        {
207                                ErrorHandler::handle(exc);
208                        }
209                        catch (std::exception& exc)
210                        {
211                                ErrorHandler::handle(exc);
212                        }
213                        catch (...)
214                        {
215                                ErrorHandler::handle();
216                        }
217                        FastMutex::ScopedLock lock(_mutex);
218                        _pTarget  = 0;
219                        _idleTime = time(NULL);
220                        _idle     = true;
221                        _targetCompleted.set();
222                        ThreadLocalStorage::clear();
223                        _thread.setName(_name);
224                        _thread.setPriority(Thread::PRIO_NORMAL);
225                }
226                else
227                {
228                        _mutex.unlock();
229                        break;
230                }
231        }
232        delete this;
233}
234
235
236ThreadPool::ThreadPool(int minCapacity,
237        int maxCapacity,
238        int idleTime,
239        int stackSize): 
240        _minCapacity(minCapacity), 
241        _maxCapacity(maxCapacity), 
242        _idleTime(idleTime),
243        _serial(0),
244        _age(0),
245        _stackSize(stackSize)
246{
247        poco_assert (minCapacity >= 1 && maxCapacity >= minCapacity && idleTime > 0);
248
249        for (int i = 0; i < _minCapacity; i++)
250        {
251                PooledThread* pThread = createThread();
252                _threads.push_back(pThread);
253                pThread->start();
254        }
255}
256
257
258ThreadPool::ThreadPool(const std::string& name,
259        int minCapacity,
260        int maxCapacity,
261        int idleTime,
262        int stackSize):
263        _name(name),
264        _minCapacity(minCapacity), 
265        _maxCapacity(maxCapacity), 
266        _idleTime(idleTime),
267        _serial(0),
268        _age(0),
269        _stackSize(stackSize)
270{
271        poco_assert (minCapacity >= 1 && maxCapacity >= minCapacity && idleTime > 0);
272
273        for (int i = 0; i < _minCapacity; i++)
274        {
275                PooledThread* pThread = createThread();
276                _threads.push_back(pThread);
277                pThread->start();
278        }
279}
280
281
282ThreadPool::~ThreadPool()
283{
284        stopAll();
285}
286
287
288void ThreadPool::addCapacity(int n)
289{
290        FastMutex::ScopedLock lock(_mutex);
291
292        poco_assert (_maxCapacity + n >= _minCapacity);
293        _maxCapacity += n;
294        housekeep();
295}
296
297
298int ThreadPool::capacity() const
299{
300        FastMutex::ScopedLock lock(_mutex);
301        return _maxCapacity;
302}
303
304
305int ThreadPool::available() const
306{
307        FastMutex::ScopedLock lock(_mutex);
308
309        int count = 0;
310        for (ThreadVec::const_iterator it = _threads.begin(); it != _threads.end(); ++it)
311        {
312                if ((*it)->idle()) ++count;
313        }
314        return (int) (count + _maxCapacity - _threads.size());
315}
316
317
318int ThreadPool::used() const
319{
320        FastMutex::ScopedLock lock(_mutex);
321
322        int count = 0;
323        for (ThreadVec::const_iterator it = _threads.begin(); it != _threads.end(); ++it)
324        {
325                if (!(*it)->idle()) ++count;
326        }
327        return count;
328}
329
330
331int ThreadPool::allocated() const
332{
333        FastMutex::ScopedLock lock(_mutex);
334
335        return int(_threads.size());
336}
337
338
339void ThreadPool::start(Runnable& target)
340{
341        getThread()->start(Thread::PRIO_NORMAL, target);
342}
343
344
345void ThreadPool::start(Runnable& target, const std::string& name)
346{
347        getThread()->start(Thread::PRIO_NORMAL, target, name);
348}
349
350
351void ThreadPool::startWithPriority(Thread::Priority priority, Runnable& target)
352{
353        getThread()->start(priority, target);
354}
355
356
357void ThreadPool::startWithPriority(Thread::Priority priority, Runnable& target, const std::string& name)
358{
359        getThread()->start(priority, target, name);
360}
361
362
363void ThreadPool::stopAll()
364{
365        FastMutex::ScopedLock lock(_mutex);
366
367        for (ThreadVec::iterator it = _threads.begin(); it != _threads.end(); ++it)
368        {
369                (*it)->release();
370        }
371        _threads.clear();
372}
373
374
375void ThreadPool::joinAll()
376{
377        FastMutex::ScopedLock lock(_mutex);
378
379        for (ThreadVec::iterator it = _threads.begin(); it != _threads.end(); ++it)
380        {
381                (*it)->join();
382        }
383        housekeep();
384}
385
386
387void ThreadPool::collect()
388{
389        FastMutex::ScopedLock lock(_mutex);
390        housekeep();
391}
392
393
394void ThreadPool::housekeep()
395{
396        _age = 0;
397        if (_threads.size() <= _minCapacity)
398                return;
399
400        ThreadVec idleThreads;
401        ThreadVec expiredThreads;
402        ThreadVec activeThreads;
403        idleThreads.reserve(_threads.size());
404        activeThreads.reserve(_threads.size());
405       
406        for (ThreadVec::iterator it = _threads.begin(); it != _threads.end(); ++it)
407        {
408                if ((*it)->idle())
409                {
410                        if ((*it)->idleTime() < _idleTime)
411                                idleThreads.push_back(*it);
412                        else 
413                                expiredThreads.push_back(*it); 
414                }
415                else activeThreads.push_back(*it);
416        }
417        int n = (int) activeThreads.size();
418        int limit = (int) idleThreads.size() + n;
419        if (limit < _minCapacity) limit = _minCapacity;
420        idleThreads.insert(idleThreads.end(), expiredThreads.begin(), expiredThreads.end());
421        _threads.clear();
422        for (ThreadVec::iterator it = idleThreads.begin(); it != idleThreads.end(); ++it)
423        {
424                if (n < limit)
425                {
426                        _threads.push_back(*it);
427                        ++n;
428                }
429                else (*it)->release();
430        }
431        _threads.insert(_threads.end(), activeThreads.begin(), activeThreads.end());
432}
433
434
435PooledThread* ThreadPool::getThread()
436{
437        FastMutex::ScopedLock lock(_mutex);
438
439        if (++_age == 32)
440                housekeep();
441
442        PooledThread* pThread = 0;
443        for (ThreadVec::iterator it = _threads.begin(); !pThread && it != _threads.end(); ++it)
444        {
445                if ((*it)->idle()) pThread = *it;
446        }
447        if (!pThread)
448        {
449                if (_threads.size() < _maxCapacity)
450                {
451                        pThread = createThread();
452                        _threads.push_back(pThread);
453                        pThread->start();
454                }
455                else throw NoThreadAvailableException();
456        }
457        pThread->activate();
458        return pThread;
459}
460
461
462PooledThread* ThreadPool::createThread()
463{
464        std::ostringstream name;
465        name << _name << "[#" << ++_serial << "]";
466        return new PooledThread(name.str(), _stackSize);
467}
468
469
470class ThreadPoolSingletonHolder
471{
472public:
473        ThreadPoolSingletonHolder()
474        {
475                _pPool = 0;
476        }
477        ~ThreadPoolSingletonHolder()
478        {
479                delete _pPool;
480        }
481        ThreadPool* pool()
482        {
483                FastMutex::ScopedLock lock(_mutex);
484               
485                if (!_pPool)
486                {
487                        _pPool = new ThreadPool("default");
488                        if (POCO_THREAD_STACK_SIZE > 0)
489                                _pPool->setStackSize(POCO_THREAD_STACK_SIZE);
490                }
491                return _pPool;
492        }
493       
494private:
495        ThreadPool* _pPool;
496        FastMutex   _mutex;
497};
498
499
500ThreadPool& ThreadPool::defaultPool()
501{
502        static ThreadPoolSingletonHolder sh;
503        return *sh.pool();
504}
505
506
507} // namespace Poco
Note: See TracBrowser for help on using the repository browser.