diff --git a/BBGE/Core.h b/BBGE/Core.h index e9ccede..482cb9d 100644 --- a/BBGE/Core.h +++ b/BBGE/Core.h @@ -42,6 +42,8 @@ BUILD_LINUX #include "DarkLayer.h" +#include "MT.h" + /* #ifdef BBGE_BUILD_WINDOWS #include "Joystick.h" @@ -1313,6 +1315,8 @@ public: int tgaSave(const char *filename, short int width, short int height, unsigned char pixelDepth, unsigned char *imageData); + ThreadPool threadpool; + protected: std::string fpsDebugString; diff --git a/BBGE/FmodOpenALBridge.cpp b/BBGE/FmodOpenALBridge.cpp index 1d95bd1..2897033 100644 --- a/BBGE/FmodOpenALBridge.cpp +++ b/BBGE/FmodOpenALBridge.cpp @@ -33,6 +33,7 @@ Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA. #include "Base.h" #include "Core.h" +#include "MT.h" #include "FmodOpenALBridge.h" @@ -108,7 +109,7 @@ private: int freq; #ifdef BBGE_BUILD_SDL - SDL_Thread *thread; + Runnable *thread; #else #warning Threads not supported, music may cut out on area changes! // ... because the stream runs out of decoded data while the area is @@ -282,11 +283,12 @@ bool OggDecoder::start(ALuint source, bool loop) #ifdef BBGE_BUILD_SDL stop_thread = false; - thread = SDL_CreateThread((int (*)(void *))decode_loop, this); - if (!thread) + thread = new FunctionRunnable(false, (void (*)(void *))decode_loop, this); + if (!core->threadpool.addJob(thread)) { - debugLog("Failed to create Ogg Vorbis decode thread: " - + std::string(SDL_GetError())); + delete thread; + thread = NULL; + debugLog("Failed to add Ogg Vorbis decoder to thread pool"); } #endif @@ -322,7 +324,8 @@ void OggDecoder::stop() if (thread) { stop_thread = true; - SDL_WaitThread(thread, NULL); + thread->wait(); + delete thread; thread = NULL; } #endif @@ -1313,10 +1316,9 @@ FMOD_RESULT OpenALSystem::release() for (int i = 0; i < num_channels; i++) { const ALuint sid = channels[i].getSourceName(); + channels[i].stop(); channels[i].setSourceName(0); channels[i].setSound(NULL); - alSourceStop(sid); - alSourcei(sid, AL_BUFFER, 0); alDeleteSources(1, &sid); } ALCdevice *dev = alcGetContextsDevice(ctx); diff --git a/BBGE/MT.cpp b/BBGE/MT.cpp new file mode 100644 index 0000000..2ef61f3 --- /dev/null +++ b/BBGE/MT.cpp @@ -0,0 +1,283 @@ +#include "MT.h" +#include "Base.h" + + +// If more threads are idle than this, start killing excess threads. +// Note: This has nothing to do with the amount of CPU cores. +// The thread pool will create and destroy threads on demand. +const int spareThreads = 8; + + +// --------- Lockable ---------- + +Lockable::Lockable() +: _mtx(NULL) +{ +#ifdef BBGE_BUILD_SDL + _mtx = SDL_CreateMutex(); +#endif +} + +Lockable::~Lockable() +{ +#ifdef BBGE_BUILD_SDL + SDL_DestroyMutex((SDL_mutex*)_mtx); +#endif +} + +void Lockable::lock() +{ +#ifdef BBGE_BUILD_SDL + SDL_LockMutex((SDL_mutex*)_mtx); +#endif +} + +void Lockable::unlock() +{ +#ifdef BBGE_BUILD_SDL + SDL_UnlockMutex((SDL_mutex*)_mtx); +#endif +} + +// --------- Waitable ---------- + +Waitable::Waitable() +: _cond(NULL) +{ +#ifdef BBGE_BUILD_SDL + _cond = SDL_CreateCond(); +#endif +} + +Waitable::~Waitable() +{ +#ifdef BBGE_BUILD_SDL + SDL_DestroyCond((SDL_cond*)_cond); +#endif +} + +void Waitable::wait() +{ +#ifdef BBGE_BUILD_SDL + SDL_CondWait((SDL_cond*)_cond, (SDL_mutex*)mutex()); +#endif +} + +void Waitable::signal() +{ +#ifdef BBGE_BUILD_SDL + SDL_CondSignal((SDL_cond*)_cond); +#endif +} + +void Waitable::broadcast() +{ +#ifdef BBGE_BUILD_SDL + SDL_CondBroadcast((SDL_cond*)_cond); +#endif +} + +// --------- Runnable ---------- + +void Runnable::wait() +{ + MTGuard(this); + while (!_done) + Waitable::wait(); +} + +void Runnable::run() +{ + _run(); // this is the job's entry point + + lock(); + _done = true; + // we may get deleted by another thread directly after unlock(), have to save this on the stack + volatile bool suicide = _suicide; + broadcast(); // this accesses _cond, and must be done before unlock() too, same reason + unlock(); + + if (suicide) + delete this; +} + + +// --------- ThreadPool ---------- + +static int threadpool_runner(void *ptr) +{ + ThreadPool *pool = (ThreadPool*)ptr; + pool->_runThread(); + return 0; +} + +ThreadPool::ThreadPool() +: _quit(false), _idleThreads(0) +{ + // If we fail to create a few threads right from the start, fall back to single-threaded mode + if (!ensureFreeThreads(spareThreads)) + { + debugLog("ThreadPool: Failed to spawn initial threads"); + shutdown(); + } +} + +void ThreadPool::shutdown() +{ + lock(); + _quit = true; + while (_threads.size()) + { + _jobs.push(NULL); + wait(); + } + unlock(); + + cleanupDeadThreads(); +} + +ThreadPool::~ThreadPool() +{ + shutdown(); +} + +void ThreadPool::waitForThread(void *th) +{ +#ifdef BBGE_BUILD_SDL + SDL_WaitThread((SDL_Thread*)th, NULL); +#endif +} + +void *ThreadPool::u_createThread() +{ + void *th = NULL; +#ifdef BBGE_BUILD_SDL + th = SDL_CreateThread(threadpool_runner, this); +#endif + if (!th) + return false; + u_addThread(th); + return th; +} + +bool ThreadPool::u_ensureFreeThreads(int c) +{ + if (_quit) + return false; + + for (int i = _idleThreads; i < c; ++i) + if (!u_createThread()) + return false; + + return true; +} + +bool ThreadPool::ensureFreeThreads(int c) +{ + MTGuard(this); + return u_ensureFreeThreads(c); +} + +bool ThreadPool::addJob(Runnable *job, RunnablePriority rp /* = RP_START */) +{ + if (!job) + return false; + + { + MTGuard(this); + if (_quit) + return false; + if (rp == RP_START) + { + if (!u_ensureFreeThreads(1)) + return false; // unable to start job right away, reject + } + if (_idleThreads > spareThreads) + _jobs.push(NULL); // kill one excess thread + } + + _jobs.push(job); + cleanupDeadThreads(); + return true; +} + +void ThreadPool::incrIdle() +{ + MTGuard(this); + _idleThreads++; +} + +void ThreadPool::decrIdle() +{ + MTGuard(this); + _idleThreads--; +} + +void ThreadPool::u_addThread(void *th) +{ +#ifdef BBGE_BUILD_SDL + _threads[SDL_GetThreadID((SDL_Thread*)th)] = th; +#endif +} + +// called by one thread to notify the pool that it is about to die off +void ThreadPool::removeSelf() +{ + int id = -1; +#if BBGE_BUILD_SDL + id = SDL_ThreadID(); +#endif + + MTGuard(this); + std::map::iterator it = _threads.find(id); + if (it != _threads.end()) + { + // still needs to be collected by *another* thread. + // the problem here is that only using SDL_WaitThread() frees resources, + // but this thread can't wait for itself (otherwise deadlock). + _deadThreads.push_back(it->second); + _threads.erase(it); // no longer active + } +} + +void ThreadPool::cleanupDeadThreads() +{ + lock(); + if (_deadThreads.size() > 1) + { + std::vector copy = _deadThreads; + _deadThreads.clear(); + unlock(); + + // we know these threads are dead, but to minimize potential wait time + // while a thread is still dying, operate on a copy. + for (size_t i = 0; i < copy.size(); ++i) + waitForThread(copy[i]); + } + else if (_deadThreads.size() == 1) // this case is very likely, treat this specially to keep copying down + { + void *th = _deadThreads[0]; + _deadThreads.clear(); + unlock(); + waitForThread(th); + } + else + unlock(); +} + +void ThreadPool::_runThread() +{ + while (!_quit) + { + incrIdle(); // going to wait until we get a job + Runnable *job = _jobs.pop(); // this blocks until a job is available + decrIdle(); // .. time to work. + if (job) + job->run(); + else + break; // use NULL as suicide signal + } + removeSelf(); // register thread for garbage + broadcast(); // tell the pool we are dying off +} + diff --git a/BBGE/MT.h b/BBGE/MT.h new file mode 100644 index 0000000..5a27dcc --- /dev/null +++ b/BBGE/MT.h @@ -0,0 +1,162 @@ +#ifndef BBGE_MT_H +#define BBGE_MT_H + +#include +#include + +class Lockable +{ +public: + Lockable(); + virtual ~Lockable(); + void lock(); + void unlock(); + +protected: + inline void *mutex() { return _mtx; } + +private: + void *_mtx; +}; + +class Waitable : public Lockable +{ +public: + Waitable(); + virtual ~Waitable(); + void wait(); // releases the associated lock while waiting + void signal(); // signal a single waiting thread + void broadcast(); // signal all waiting threads + +private: + void *_cond; +}; + +class MTGuard +{ +public: + MTGuard(Lockable& x) : _obj(&x) { _obj->lock(); } + MTGuard(Lockable* x) : _obj(x) { _obj->lock(); } + ~MTGuard() { _obj->unlock(); } + Lockable *_obj; +}; + +template class BlockingQueue : public Waitable +{ +public: + void push(const T& e) + { + lock(); + _q.push(e); + unlock(); + signal(); + } + T pop() // blocks if empty + { + lock(); + while(_q.empty()) + wait(); + T ret = _q.front(); + _q.pop(); + unlock(); + return ret; + } +private: + std::queue _q; +}; + +class ThreadPool; + +// To be distributed to workers in ThreadPool. +class Runnable : public Waitable +{ +public: + Runnable(bool suicide) : _done(false), _shouldquit(false), _suicide(suicide) {}; + virtual ~Runnable() {} + + void run(); + inline void quit() { _shouldquit = true; } + inline bool shouldQuit() const { return _shouldquit; } + inline bool isDone() const { return _done; } + + virtual void wait(); + +protected: + virtual void _run() = 0; // to be overloaded + +private: + volatile bool _done; + volatile bool _shouldquit; + bool _suicide; +}; + + +class FunctionRunnable : public Runnable +{ +public: + typedef void (*Func)(void*); + + FunctionRunnable(bool suicide, Func f, void *ptr = NULL) + : Runnable(suicide), _func(f), _param1(ptr) {} + + virtual ~FunctionRunnable() {} + +protected: + virtual void _run() + { + _func(_param1); + } + + Func _func; + void *_param1; +}; + +enum RunnablePriority +{ + RP_ENQUEUE = -1, // just enqueue and let a thread run it when one gets free + RP_START = 0, // execute right away, start another thread if none is free +}; + +class ThreadPool : protected Waitable +{ +public: + ThreadPool(); + ~ThreadPool(); + + // Push a job to the queue, by default starting a thread to execute it, if necessary. + // If adding the job failed for some reason, the caller can use job->run() to execute it manually. + // When a job is done, it will broadcast. + bool addJob(Runnable *job, RunnablePriority rp = RP_START); + + // Have at least c threads idle, otherwise spawn some until c are idle. + bool ensureFreeThreads(int c); + + // The ThreadPool will no longer accept any jobs after this was called, and signal threads to stop. + // All jobs still left in the queue will be executed. + // Waits for all threads to exit. + void shutdown(); + + void _runThread(); + +private: + // these lock 'this', do not call while already holding the lock + void incrIdle(); + void decrIdle(); + void waitForThread(void*); + void cleanupDeadThreads(); + void removeSelf(); + + // need to have 'this' locked when calling these + void *u_createThread(); + void u_addThread(void*); + bool u_ensureFreeThreads(int c); + + BlockingQueue _jobs; + std::map _threads; + std::vector _deadThreads; // necessary to clean up + volatile bool _quit; + volatile int _idleThreads; +}; + + +#endif diff --git a/CMakeLists.txt b/CMakeLists.txt index e2e8a14..86ed884 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -376,6 +376,7 @@ SET(BBGE_SRCS ${BBGEDIR}/LightCone.cpp ${BBGEDIR}/Light.cpp ${BBGEDIR}/Math.cpp + ${BBGEDIR}/MT.cpp ${BBGEDIR}/ParticleEffect.cpp ${BBGEDIR}/ParticleManager.cpp ${BBGEDIR}/Particles.cpp