1
0
Fork 0
mirror of https://github.com/AquariaOSE/Aquaria.git synced 2024-12-24 21:55:42 +00:00

Revert "Add a thread pool for background job processing, and use it for OggDecoder."

This reverts commit 010f44d264.

Conflicts:

	BBGE/MT.cpp
	BBGE/MT.h
This commit is contained in:
fgenesis 2012-02-12 03:27:54 +00:00
parent c06e66d3b7
commit 0cd0971671
5 changed files with 8 additions and 461 deletions

View file

@ -42,8 +42,6 @@ BUILD_LINUX
#include "DarkLayer.h"
#include "MT.h"
/*
#ifdef BBGE_BUILD_WINDOWS
#include "Joystick.h"
@ -1315,8 +1313,6 @@ public:
int tgaSave(const char *filename, short int width, short int height, unsigned char pixelDepth, unsigned char *imageData);
ThreadPool threadpool;
protected:
std::string fpsDebugString;

View file

@ -33,7 +33,6 @@ Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
#include "Base.h"
#include "Core.h"
#include "MT.h"
#include "FmodOpenALBridge.h"
@ -109,7 +108,7 @@ private:
int freq;
#ifdef BBGE_BUILD_SDL
Runnable *thread;
SDL_Thread *thread;
#else
#warning Threads not supported, music may cut out on area changes!
// ... because the stream runs out of decoded data while the area is
@ -283,12 +282,11 @@ bool OggDecoder::start(ALuint source, bool loop)
#ifdef BBGE_BUILD_SDL
stop_thread = false;
thread = new FunctionRunnable(false, (void (*)(void *))decode_loop, this);
if (!core->threadpool.addJob(thread))
thread = SDL_CreateThread((int (*)(void *))decode_loop, this);
if (!thread)
{
delete thread;
thread = NULL;
debugLog("Failed to add Ogg Vorbis decoder to thread pool");
debugLog("Failed to create Ogg Vorbis decode thread: "
+ std::string(SDL_GetError()));
}
#endif
@ -324,8 +322,7 @@ void OggDecoder::stop()
if (thread)
{
stop_thread = true;
thread->wait();
delete thread;
SDL_WaitThread(thread, NULL);
thread = NULL;
}
#endif
@ -1310,9 +1307,10 @@ FMOD_RESULT OpenALSystem::release()
for (int i = 0; i < num_channels; i++)
{
const ALuint sid = channels[i].getSourceName();
channels[i].stop();
channels[i].setSourceName(0);
channels[i].setSound(NULL);
alSourceStop(sid);
alSourcei(sid, AL_BUFFER, 0);
alDeleteSources(1, &sid);
}
ALCdevice *dev = alcGetContextsDevice(ctx);

View file

@ -1,283 +0,0 @@
#include "MT.h"
#include "Base.h"
// If more threads are idle than this, start killing excess threads.
// Note: This has nothing to do with the amount of CPU cores.
// The thread pool will create and destroy threads on demand.
const int spareThreads = 8;
// --------- Lockable ----------
Lockable::Lockable()
: _mtx(NULL)
{
#ifdef BBGE_BUILD_SDL
_mtx = SDL_CreateMutex();
#endif
}
Lockable::~Lockable()
{
#ifdef BBGE_BUILD_SDL
SDL_DestroyMutex((SDL_mutex*)_mtx);
#endif
}
void Lockable::lock()
{
#ifdef BBGE_BUILD_SDL
SDL_LockMutex((SDL_mutex*)_mtx);
#endif
}
void Lockable::unlock()
{
#ifdef BBGE_BUILD_SDL
SDL_UnlockMutex((SDL_mutex*)_mtx);
#endif
}
// --------- Waitable ----------
Waitable::Waitable()
: _cond(NULL)
{
#ifdef BBGE_BUILD_SDL
_cond = SDL_CreateCond();
#endif
}
Waitable::~Waitable()
{
#ifdef BBGE_BUILD_SDL
SDL_DestroyCond((SDL_cond*)_cond);
#endif
}
void Waitable::wait()
{
#ifdef BBGE_BUILD_SDL
SDL_CondWait((SDL_cond*)_cond, (SDL_mutex*)mutex());
#endif
}
void Waitable::signal()
{
#ifdef BBGE_BUILD_SDL
SDL_CondSignal((SDL_cond*)_cond);
#endif
}
void Waitable::broadcast()
{
#ifdef BBGE_BUILD_SDL
SDL_CondBroadcast((SDL_cond*)_cond);
#endif
}
// --------- Runnable ----------
void Runnable::wait()
{
MTGuard(this);
while (!_done)
Waitable::wait();
}
void Runnable::run()
{
_run(); // this is the job's entry point
lock();
_done = true;
// we may get deleted by another thread directly after unlock(), have to save this on the stack
volatile bool suicide = _suicide;
broadcast(); // this accesses _cond, and must be done before unlock() too, same reason
unlock();
if (suicide)
delete this;
}
// --------- ThreadPool ----------
static int threadpool_runner(void *ptr)
{
ThreadPool *pool = (ThreadPool*)ptr;
pool->_runThread();
return 0;
}
ThreadPool::ThreadPool()
: _quit(false), _idleThreads(0)
{
// If we fail to create a few threads right from the start, fall back to single-threaded mode
if (!ensureFreeThreads(spareThreads))
{
debugLog("ThreadPool: Failed to spawn initial threads");
shutdown();
}
}
void ThreadPool::shutdown()
{
lock();
_quit = true;
while (_threads.size())
{
_jobs.push(NULL);
wait();
}
unlock();
cleanupDeadThreads();
}
ThreadPool::~ThreadPool()
{
shutdown();
}
void ThreadPool::waitForThread(void *th)
{
#ifdef BBGE_BUILD_SDL
SDL_WaitThread((SDL_Thread*)th, NULL);
#endif
}
void *ThreadPool::u_createThread()
{
void *th = NULL;
#ifdef BBGE_BUILD_SDL
th = SDL_CreateThread(threadpool_runner, this);
#endif
if (!th)
return NULL;
u_addThread(th);
return th;
}
bool ThreadPool::u_ensureFreeThreads(int c)
{
if (_quit)
return false;
for (int i = _idleThreads; i < c; ++i)
if (!u_createThread())
return false;
return true;
}
bool ThreadPool::ensureFreeThreads(int c)
{
MTGuard(this);
return u_ensureFreeThreads(c);
}
bool ThreadPool::addJob(Runnable *job, RunnablePriority rp /* = RP_START */)
{
if (!job)
return false;
{
MTGuard(this);
if (_quit)
return false;
if (rp == RP_START)
{
if (!u_ensureFreeThreads(1))
return false; // unable to start job right away, reject
}
if (_idleThreads > spareThreads)
_jobs.push(NULL); // kill one excess thread
}
_jobs.push(job);
cleanupDeadThreads();
return true;
}
void ThreadPool::incrIdle()
{
MTGuard(this);
_idleThreads++;
}
void ThreadPool::decrIdle()
{
MTGuard(this);
_idleThreads--;
}
void ThreadPool::u_addThread(void *th)
{
#ifdef BBGE_BUILD_SDL
_threads[SDL_GetThreadID((SDL_Thread*)th)] = th;
#endif
}
// called by one thread to notify the pool that it is about to die off
void ThreadPool::removeSelf()
{
int id = -1;
#if BBGE_BUILD_SDL
id = SDL_ThreadID();
#endif
MTGuard(this);
std::map<int, void*>::iterator it = _threads.find(id);
if (it != _threads.end())
{
// still needs to be collected by *another* thread.
// the problem here is that only using SDL_WaitThread() frees resources,
// but this thread can't wait for itself (otherwise deadlock).
_deadThreads.push_back(it->second);
_threads.erase(it); // no longer active
}
}
void ThreadPool::cleanupDeadThreads()
{
lock();
if (_deadThreads.size() > 1)
{
std::vector<void*> copy = _deadThreads;
_deadThreads.clear();
unlock();
// we know these threads are dead, but to minimize potential wait time
// while a thread is still dying, operate on a copy.
for (size_t i = 0; i < copy.size(); ++i)
waitForThread(copy[i]);
}
else if (_deadThreads.size() == 1) // this case is very likely, treat this specially to keep copying down
{
void *th = _deadThreads[0];
_deadThreads.clear();
unlock();
waitForThread(th);
}
else
unlock();
}
void ThreadPool::_runThread()
{
while (!_quit)
{
incrIdle(); // going to wait until we get a job
Runnable *job = _jobs.pop(); // this blocks until a job is available
decrIdle(); // .. time to work.
if (job)
job->run();
else
break; // use NULL as suicide signal
}
removeSelf(); // register thread for garbage
broadcast(); // tell the pool we are dying off
}

163
BBGE/MT.h
View file

@ -1,163 +0,0 @@
#ifndef BBGE_MT_H
#define BBGE_MT_H
#include <cstddef>
#include <queue>
#include <map>
class Lockable
{
public:
Lockable();
virtual ~Lockable();
void lock();
void unlock();
protected:
inline void *mutex() { return _mtx; }
private:
void *_mtx;
};
class Waitable : public Lockable
{
public:
Waitable();
virtual ~Waitable();
void wait(); // releases the associated lock while waiting
void signal(); // signal a single waiting thread
void broadcast(); // signal all waiting threads
private:
void *_cond;
};
class MTGuard
{
public:
MTGuard(Lockable& x) : _obj(&x) { _obj->lock(); }
MTGuard(Lockable* x) : _obj(x) { _obj->lock(); }
~MTGuard() { _obj->unlock(); }
Lockable *_obj;
};
template <typename T> class BlockingQueue : public Waitable
{
public:
void push(const T& e)
{
lock();
_q.push(e);
unlock();
signal();
}
T pop() // blocks if empty
{
lock();
while(_q.empty())
wait();
T ret = _q.front();
_q.pop();
unlock();
return ret;
}
private:
std::queue<T> _q;
};
class ThreadPool;
// To be distributed to workers in ThreadPool.
class Runnable : public Waitable
{
public:
Runnable(bool suicide) : _done(false), _shouldquit(false), _suicide(suicide) {};
virtual ~Runnable() {}
void run();
inline void quit() { _shouldquit = true; }
inline bool shouldQuit() const { return _shouldquit; }
inline bool isDone() const { return _done; }
virtual void wait();
protected:
virtual void _run() = 0; // to be overloaded
private:
volatile bool _done;
volatile bool _shouldquit;
bool _suicide;
};
class FunctionRunnable : public Runnable
{
public:
typedef void (*Func)(void*);
FunctionRunnable(bool suicide, Func f, void *ptr = NULL)
: Runnable(suicide), _func(f), _param1(ptr) {}
virtual ~FunctionRunnable() {}
protected:
virtual void _run()
{
_func(_param1);
}
Func _func;
void *_param1;
};
enum RunnablePriority
{
RP_ENQUEUE = -1, // just enqueue and let a thread run it when one gets free
RP_START = 0, // execute right away, start another thread if none is free
};
class ThreadPool : protected Waitable
{
public:
ThreadPool();
~ThreadPool();
// Push a job to the queue, by default starting a thread to execute it, if necessary.
// If adding the job failed for some reason, the caller can use job->run() to execute it manually.
// When a job is done, it will broadcast.
bool addJob(Runnable *job, RunnablePriority rp = RP_START);
// Have at least c threads idle, otherwise spawn some until c are idle.
bool ensureFreeThreads(int c);
// The ThreadPool will no longer accept any jobs after this was called, and signal threads to stop.
// All jobs still left in the queue will be executed.
// Waits for all threads to exit.
void shutdown();
void _runThread();
private:
// these lock 'this', do not call while already holding the lock
void incrIdle();
void decrIdle();
void waitForThread(void*);
void cleanupDeadThreads();
void removeSelf();
// need to have 'this' locked when calling these
void *u_createThread();
void u_addThread(void*);
bool u_ensureFreeThreads(int c);
BlockingQueue<Runnable*> _jobs;
std::map<int, void*> _threads;
std::vector<void*> _deadThreads; // necessary to clean up
volatile bool _quit;
volatile int _idleThreads;
};
#endif

View file

@ -387,7 +387,6 @@ SET(BBGE_SRCS
${BBGEDIR}/LightCone.cpp
${BBGEDIR}/Light.cpp
${BBGEDIR}/Math.cpp
${BBGEDIR}/MT.cpp
${BBGEDIR}/ParticleEffect.cpp
${BBGEDIR}/ParticleManager.cpp
${BBGEDIR}/Particles.cpp