libbpg/x265/source/common/threadpool.h
2015-10-27 11:46:00 +01:00

175 lines
5.9 KiB
C++

/*****************************************************************************
* Copyright (C) 2013 x265 project
*
* Authors: Steve Borho <steve@borho.org>
*
* This program is free software; you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation; either version 2 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02111, USA.
*
* This program is also available under a commercial proprietary license.
* For more information, contact us at license @ x265.com
*****************************************************************************/
#ifndef X265_THREADPOOL_H
#define X265_THREADPOOL_H
#include "common.h"
#include "threading.h"
namespace X265_NS {
// x265 private namespace
class ThreadPool;
class WorkerThread;
class BondedTaskGroup;
#if X86_64
typedef uint64_t sleepbitmap_t;
#else
typedef uint32_t sleepbitmap_t;
#endif
static const sleepbitmap_t ALL_POOL_THREADS = (sleepbitmap_t)-1;
enum { MAX_POOL_THREADS = sizeof(sleepbitmap_t) * 8 };
enum { INVALID_SLICE_PRIORITY = 10 }; // a value larger than any X265_TYPE_* macro
// Frame level job providers. FrameEncoder and Lookahead derive from
// this class and implement findJob()
class JobProvider
{
public:
ThreadPool* m_pool;
sleepbitmap_t m_ownerBitmap;
int m_jpId;
int m_sliceType;
bool m_helpWanted;
bool m_isFrameEncoder; /* rather ugly hack, but nothing better presents itself */
JobProvider()
: m_pool(NULL)
, m_ownerBitmap(0)
, m_jpId(-1)
, m_sliceType(INVALID_SLICE_PRIORITY)
, m_helpWanted(false)
, m_isFrameEncoder(false)
{}
virtual ~JobProvider() {}
// Worker threads will call this method to perform work
virtual void findJob(int workerThreadId) = 0;
// Will awaken one idle thread, preferring a thread which most recently
// performed work for this provider.
void tryWakeOne();
};
class ThreadPool
{
public:
sleepbitmap_t m_sleepBitmap;
int m_numProviders;
int m_numWorkers;
void* m_numaMask; // node mask in linux, cpu mask in windows
#if defined(_WIN32_WINNT) && _WIN32_WINNT >= _WIN32_WINNT_WIN7
DWORD m_winCpuMask;
#endif
bool m_isActive;
JobProvider** m_jpTable;
WorkerThread* m_workers;
ThreadPool();
~ThreadPool();
bool create(int numThreads, int maxProviders, uint32_t nodeMask);
bool start();
void stopWorkers();
void setCurrentThreadAffinity();
int tryAcquireSleepingThread(sleepbitmap_t firstTryBitmap, sleepbitmap_t secondTryBitmap);
int tryBondPeers(int maxPeers, sleepbitmap_t peerBitmap, BondedTaskGroup& master);
static ThreadPool* allocThreadPools(x265_param* p, int& numPools);
static int getCpuCount();
static int getNumaNodeCount();
static void setThreadNodeAffinity(void *numaMask);
};
/* Any worker thread may enlist the help of idle worker threads from the same
* job provider. They must derive from this class and implement the
* processTasks() method. To use, an instance must be instantiated by a worker
* thread (referred to as the master thread) and then tryBondPeers() must be
* called. If it returns non-zero then some number of slave worker threads are
* already in the process of calling your processTasks() function. The master
* thread should participate and call processTasks() itself. When
* waitForExit() returns, all bonded peer threads are quarunteed to have
* exitied processTasks(). Since the thread count is small, it uses explicit
* locking instead of atomic counters and bitmasks */
class BondedTaskGroup
{
public:
Lock m_lock;
ThreadSafeInteger m_exitedPeerCount;
int m_bondedPeerCount;
int m_jobTotal;
int m_jobAcquired;
BondedTaskGroup() { m_bondedPeerCount = m_jobTotal = m_jobAcquired = 0; }
/* Do not allow the instance to be destroyed before all bonded peers have
* exited processTasks() */
~BondedTaskGroup() { waitForExit(); }
/* Try to enlist the help of idle worker threads on most recently associated
* with the given job provider and "bond" them to work on your tasks. Up to
* maxPeers worker threads will call your processTasks() method. */
int tryBondPeers(JobProvider& jp, int maxPeers)
{
int count = jp.m_pool->tryBondPeers(maxPeers, jp.m_ownerBitmap, *this);
m_bondedPeerCount += count;
return count;
}
/* Try to enlist the help of any idle worker threads and "bond" them to work
* on your tasks. Up to maxPeers worker threads will call your
* processTasks() method. */
int tryBondPeers(ThreadPool& pool, int maxPeers)
{
int count = pool.tryBondPeers(maxPeers, ALL_POOL_THREADS, *this);
m_bondedPeerCount += count;
return count;
}
/* Returns when all bonded peers have exited processTasks(). It does *NOT*
* ensure all tasks are completed (but this is generally implied). */
void waitForExit()
{
int exited = m_exitedPeerCount.get();
while (m_bondedPeerCount != exited)
exited = m_exitedPeerCount.waitForChange(exited);
}
/* Derived classes must define this method. The worker thread ID may be
* used to index into thread local data, or ignored. The ID will be between
* 0 and jp.m_numWorkers - 1 */
virtual void processTasks(int workerThreadId) = 0;
};
} // end namespace X265_NS
#endif // ifndef X265_THREADPOOL_H