1
0
Fork 0
mirror of https://github.com/AquariaOSE/Aquaria.git synced 2025-08-08 15:19:50 +00:00

Add a thread pool for background job processing, and use it for OggDecoder.

The pool adjusts to the amount of required threads.
The implementation is as simple as possible, but should be enough
for future extensions.

This brings down decoder thread creation/destruction even more.

Also fix OpenALSystem::release() to close channels properly on shutdown,
otherwise it would deadlock with the pool, because it waits until all
threads have died off, which did not necessarily happen.
This commit is contained in:
fgenesis 2012-02-09 16:08:35 +01:00
parent 9e675be651
commit 010f44d264
5 changed files with 460 additions and 8 deletions

View file

@ -42,6 +42,8 @@ BUILD_LINUX
#include "DarkLayer.h"
#include "MT.h"
/*
#ifdef BBGE_BUILD_WINDOWS
#include "Joystick.h"
@ -1313,6 +1315,8 @@ public:
int tgaSave(const char *filename, short int width, short int height, unsigned char pixelDepth, unsigned char *imageData);
ThreadPool threadpool;
protected:
std::string fpsDebugString;

View file

@ -33,6 +33,7 @@ Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
#include "Base.h"
#include "Core.h"
#include "MT.h"
#include "FmodOpenALBridge.h"
@ -108,7 +109,7 @@ private:
int freq;
#ifdef BBGE_BUILD_SDL
SDL_Thread *thread;
Runnable *thread;
#else
#warning Threads not supported, music may cut out on area changes!
// ... because the stream runs out of decoded data while the area is
@ -282,11 +283,12 @@ bool OggDecoder::start(ALuint source, bool loop)
#ifdef BBGE_BUILD_SDL
stop_thread = false;
thread = SDL_CreateThread((int (*)(void *))decode_loop, this);
if (!thread)
thread = new FunctionRunnable(false, (void (*)(void *))decode_loop, this);
if (!core->threadpool.addJob(thread))
{
debugLog("Failed to create Ogg Vorbis decode thread: "
+ std::string(SDL_GetError()));
delete thread;
thread = NULL;
debugLog("Failed to add Ogg Vorbis decoder to thread pool");
}
#endif
@ -322,7 +324,8 @@ void OggDecoder::stop()
if (thread)
{
stop_thread = true;
SDL_WaitThread(thread, NULL);
thread->wait();
delete thread;
thread = NULL;
}
#endif
@ -1313,10 +1316,9 @@ FMOD_RESULT OpenALSystem::release()
for (int i = 0; i < num_channels; i++)
{
const ALuint sid = channels[i].getSourceName();
channels[i].stop();
channels[i].setSourceName(0);
channels[i].setSound(NULL);
alSourceStop(sid);
alSourcei(sid, AL_BUFFER, 0);
alDeleteSources(1, &sid);
}
ALCdevice *dev = alcGetContextsDevice(ctx);

283
BBGE/MT.cpp Normal file
View file

@ -0,0 +1,283 @@
#include "MT.h"
#include "Base.h"
// If more threads are idle than this, start killing excess threads.
// Note: This has nothing to do with the amount of CPU cores.
// The thread pool will create and destroy threads on demand.
const int spareThreads = 8;
// --------- Lockable ----------
Lockable::Lockable()
: _mtx(NULL)
{
#ifdef BBGE_BUILD_SDL
_mtx = SDL_CreateMutex();
#endif
}
Lockable::~Lockable()
{
#ifdef BBGE_BUILD_SDL
SDL_DestroyMutex((SDL_mutex*)_mtx);
#endif
}
void Lockable::lock()
{
#ifdef BBGE_BUILD_SDL
SDL_LockMutex((SDL_mutex*)_mtx);
#endif
}
void Lockable::unlock()
{
#ifdef BBGE_BUILD_SDL
SDL_UnlockMutex((SDL_mutex*)_mtx);
#endif
}
// --------- Waitable ----------
Waitable::Waitable()
: _cond(NULL)
{
#ifdef BBGE_BUILD_SDL
_cond = SDL_CreateCond();
#endif
}
Waitable::~Waitable()
{
#ifdef BBGE_BUILD_SDL
SDL_DestroyCond((SDL_cond*)_cond);
#endif
}
void Waitable::wait()
{
#ifdef BBGE_BUILD_SDL
SDL_CondWait((SDL_cond*)_cond, (SDL_mutex*)mutex());
#endif
}
void Waitable::signal()
{
#ifdef BBGE_BUILD_SDL
SDL_CondSignal((SDL_cond*)_cond);
#endif
}
void Waitable::broadcast()
{
#ifdef BBGE_BUILD_SDL
SDL_CondBroadcast((SDL_cond*)_cond);
#endif
}
// --------- Runnable ----------
void Runnable::wait()
{
MTGuard(this);
while (!_done)
Waitable::wait();
}
void Runnable::run()
{
_run(); // this is the job's entry point
lock();
_done = true;
// we may get deleted by another thread directly after unlock(), have to save this on the stack
volatile bool suicide = _suicide;
broadcast(); // this accesses _cond, and must be done before unlock() too, same reason
unlock();
if (suicide)
delete this;
}
// --------- ThreadPool ----------
static int threadpool_runner(void *ptr)
{
ThreadPool *pool = (ThreadPool*)ptr;
pool->_runThread();
return 0;
}
ThreadPool::ThreadPool()
: _quit(false), _idleThreads(0)
{
// If we fail to create a few threads right from the start, fall back to single-threaded mode
if (!ensureFreeThreads(spareThreads))
{
debugLog("ThreadPool: Failed to spawn initial threads");
shutdown();
}
}
void ThreadPool::shutdown()
{
lock();
_quit = true;
while (_threads.size())
{
_jobs.push(NULL);
wait();
}
unlock();
cleanupDeadThreads();
}
ThreadPool::~ThreadPool()
{
shutdown();
}
void ThreadPool::waitForThread(void *th)
{
#ifdef BBGE_BUILD_SDL
SDL_WaitThread((SDL_Thread*)th, NULL);
#endif
}
void *ThreadPool::u_createThread()
{
void *th = NULL;
#ifdef BBGE_BUILD_SDL
th = SDL_CreateThread(threadpool_runner, this);
#endif
if (!th)
return false;
u_addThread(th);
return th;
}
bool ThreadPool::u_ensureFreeThreads(int c)
{
if (_quit)
return false;
for (int i = _idleThreads; i < c; ++i)
if (!u_createThread())
return false;
return true;
}
bool ThreadPool::ensureFreeThreads(int c)
{
MTGuard(this);
return u_ensureFreeThreads(c);
}
bool ThreadPool::addJob(Runnable *job, RunnablePriority rp /* = RP_START */)
{
if (!job)
return false;
{
MTGuard(this);
if (_quit)
return false;
if (rp == RP_START)
{
if (!u_ensureFreeThreads(1))
return false; // unable to start job right away, reject
}
if (_idleThreads > spareThreads)
_jobs.push(NULL); // kill one excess thread
}
_jobs.push(job);
cleanupDeadThreads();
return true;
}
void ThreadPool::incrIdle()
{
MTGuard(this);
_idleThreads++;
}
void ThreadPool::decrIdle()
{
MTGuard(this);
_idleThreads--;
}
void ThreadPool::u_addThread(void *th)
{
#ifdef BBGE_BUILD_SDL
_threads[SDL_GetThreadID((SDL_Thread*)th)] = th;
#endif
}
// called by one thread to notify the pool that it is about to die off
void ThreadPool::removeSelf()
{
int id = -1;
#if BBGE_BUILD_SDL
id = SDL_ThreadID();
#endif
MTGuard(this);
std::map<int, void*>::iterator it = _threads.find(id);
if (it != _threads.end())
{
// still needs to be collected by *another* thread.
// the problem here is that only using SDL_WaitThread() frees resources,
// but this thread can't wait for itself (otherwise deadlock).
_deadThreads.push_back(it->second);
_threads.erase(it); // no longer active
}
}
void ThreadPool::cleanupDeadThreads()
{
lock();
if (_deadThreads.size() > 1)
{
std::vector<void*> copy = _deadThreads;
_deadThreads.clear();
unlock();
// we know these threads are dead, but to minimize potential wait time
// while a thread is still dying, operate on a copy.
for (size_t i = 0; i < copy.size(); ++i)
waitForThread(copy[i]);
}
else if (_deadThreads.size() == 1) // this case is very likely, treat this specially to keep copying down
{
void *th = _deadThreads[0];
_deadThreads.clear();
unlock();
waitForThread(th);
}
else
unlock();
}
void ThreadPool::_runThread()
{
while (!_quit)
{
incrIdle(); // going to wait until we get a job
Runnable *job = _jobs.pop(); // this blocks until a job is available
decrIdle(); // .. time to work.
if (job)
job->run();
else
break; // use NULL as suicide signal
}
removeSelf(); // register thread for garbage
broadcast(); // tell the pool we are dying off
}

162
BBGE/MT.h Normal file
View file

@ -0,0 +1,162 @@
#ifndef BBGE_MT_H
#define BBGE_MT_H
#include <queue>
#include <map>
class Lockable
{
public:
Lockable();
virtual ~Lockable();
void lock();
void unlock();
protected:
inline void *mutex() { return _mtx; }
private:
void *_mtx;
};
class Waitable : public Lockable
{
public:
Waitable();
virtual ~Waitable();
void wait(); // releases the associated lock while waiting
void signal(); // signal a single waiting thread
void broadcast(); // signal all waiting threads
private:
void *_cond;
};
class MTGuard
{
public:
MTGuard(Lockable& x) : _obj(&x) { _obj->lock(); }
MTGuard(Lockable* x) : _obj(x) { _obj->lock(); }
~MTGuard() { _obj->unlock(); }
Lockable *_obj;
};
template <typename T> class BlockingQueue : public Waitable
{
public:
void push(const T& e)
{
lock();
_q.push(e);
unlock();
signal();
}
T pop() // blocks if empty
{
lock();
while(_q.empty())
wait();
T ret = _q.front();
_q.pop();
unlock();
return ret;
}
private:
std::queue<T> _q;
};
class ThreadPool;
// To be distributed to workers in ThreadPool.
class Runnable : public Waitable
{
public:
Runnable(bool suicide) : _done(false), _shouldquit(false), _suicide(suicide) {};
virtual ~Runnable() {}
void run();
inline void quit() { _shouldquit = true; }
inline bool shouldQuit() const { return _shouldquit; }
inline bool isDone() const { return _done; }
virtual void wait();
protected:
virtual void _run() = 0; // to be overloaded
private:
volatile bool _done;
volatile bool _shouldquit;
bool _suicide;
};
class FunctionRunnable : public Runnable
{
public:
typedef void (*Func)(void*);
FunctionRunnable(bool suicide, Func f, void *ptr = NULL)
: Runnable(suicide), _func(f), _param1(ptr) {}
virtual ~FunctionRunnable() {}
protected:
virtual void _run()
{
_func(_param1);
}
Func _func;
void *_param1;
};
enum RunnablePriority
{
RP_ENQUEUE = -1, // just enqueue and let a thread run it when one gets free
RP_START = 0, // execute right away, start another thread if none is free
};
class ThreadPool : protected Waitable
{
public:
ThreadPool();
~ThreadPool();
// Push a job to the queue, by default starting a thread to execute it, if necessary.
// If adding the job failed for some reason, the caller can use job->run() to execute it manually.
// When a job is done, it will broadcast.
bool addJob(Runnable *job, RunnablePriority rp = RP_START);
// Have at least c threads idle, otherwise spawn some until c are idle.
bool ensureFreeThreads(int c);
// The ThreadPool will no longer accept any jobs after this was called, and signal threads to stop.
// All jobs still left in the queue will be executed.
// Waits for all threads to exit.
void shutdown();
void _runThread();
private:
// these lock 'this', do not call while already holding the lock
void incrIdle();
void decrIdle();
void waitForThread(void*);
void cleanupDeadThreads();
void removeSelf();
// need to have 'this' locked when calling these
void *u_createThread();
void u_addThread(void*);
bool u_ensureFreeThreads(int c);
BlockingQueue<Runnable*> _jobs;
std::map<int, void*> _threads;
std::vector<void*> _deadThreads; // necessary to clean up
volatile bool _quit;
volatile int _idleThreads;
};
#endif

View file

@ -376,6 +376,7 @@ SET(BBGE_SRCS
${BBGEDIR}/LightCone.cpp
${BBGEDIR}/Light.cpp
${BBGEDIR}/Math.cpp
${BBGEDIR}/MT.cpp
${BBGEDIR}/ParticleEffect.cpp
${BBGEDIR}/ParticleManager.cpp
${BBGEDIR}/Particles.cpp