Commit 424568c3 authored by Victor Lopez's avatar Victor Lopez

Merge branch 'feature/threads_per_core' into 'master'

Implement --smp-threads-per-core option

See merge request !47
parents f17ae4a5 adfcc58d
......@@ -55,16 +55,15 @@ AS_IF([test "x$with_hwloc" != xyes dnl
-o "x$with_hwloc" != xno dnl
-o "x$with_hwloc" != x dnl
],[
hwlocinc="-I $with_hwloc/include"
hwloc_h="$with_hwloc/include/hwloc.h"
AS_IF([test -d $with_hwloc/lib64],
[hwloclib="-L$with_hwloc/lib64 -Wl,-rpath,$with_hwloc/lib64"],
[hwloclib="-L$with_hwloc/lib -Wl,-rpath,$with_hwloc/lib"])dnl
])dnl
AS_IF([test "x$with_hwloc_include" != x],[
hwlocinc="-isystem $with_hwloc_include"
hwloc_h="$with_hwloc_include/hwloc.h"
AS_IF([test -d $with_hwloc/include], [
hwlocinc="-I $with_hwloc/include"
hwloc_h="$with_hwloc/include/hwloc.h"
])
AS_IF([test -d $with_hwloc/lib64], [
hwloclib="-L$with_hwloc/lib64 -Wl,-rpath,$with_hwloc/lib64"
], [test -d $with_hwloc/lib], [
hwloclib="-L$with_hwloc/lib -Wl,-rpath,$with_hwloc/lib"
])dnl
])dnl
AS_IF([test "x$with_hwloc_lib" != x],[
......@@ -73,7 +72,7 @@ AS_IF([test "x$with_hwloc_lib" != x],[
# This condition is satisfied even if $with_hwloc="yes"
# This happens when user leaves --with-value alone
AS_IF([test "x$with_hwloc$with_hwloc_include$with_hwloc_lib" != x],[
AS_IF([test "x$with_hwloc$with_hwloc_lib" != x],[
AC_LANG_PUSH([C++])
#tests if provided headers and libraries are usable and correct
......
......@@ -59,6 +59,7 @@ nanos::PE * smpProcessorFactory ( int id, int uid )
, _smpHostCpus( 0 )
, _smpPrivateMemorySize( 256 * 1024 * 1024 ) // 256 Mb
, _workersCreated( false )
, _threadsPerCore( 0 )
, _cpuSystemMask()
, _cpuProcessMask()
, _cpuActiveMask()
......@@ -150,6 +151,10 @@ nanos::PE * smpProcessorFactory ( int id, int uid )
"SMP sync transfers." );
cfg.registerArgOption( "smp-sync-transfers", "smp-sync-transfers" );
cfg.registerEnvOption( "smp-sync-transfers", "NX_SMP_SYNC_TRANSFERS" );
cfg.registerConfigOption( "smp-threads-per-core", NEW Config::PositiveVar( _threadsPerCore ),
"Limit the number of threads per core on SMT processors." );
cfg.registerArgOption( "smp-threads-per-core", "smp-threads-per-core" );
}
void SMPPlugin::init()
......@@ -192,22 +197,68 @@ nanos::PE * smpProcessorFactory ( int id, int uid )
}
verbose0("requested cpus: " << _requestedCPUs << " available: " << _availableCPUs << " to be used: " << _currentCPUs);
//! \note Fill _bindings vector with the active CPUs first, then the not active
_bindings.reserve( _availableCPUs );
for ( int i = 0; i < _availableCPUs; i++ ) {
if ( _cpuProcessMask.isSet(i) ) {
_bindings.push_back(i);
std::map<int, CpuSet> bindings_list;
// If --smp-threads-per-core option is used, adjust CPUs used
if ( _threadsPerCore > 0 ) {
fatal_cond0( !sys._hwloc.isHwlocAvailable(),
"Option --smp-threads-per-core requires HWLOC" );
std::list<CpuSet> core_cpusets = sys._hwloc.getCoreCpusetsOf( _cpuProcessMask );
fatal_cond0( core_cpusets.size() == 0, "Process mask [" << _cpuProcessMask <<
"] does not contain a full core cpuset. Cannot run with --smp-threads-per-core" );
{
// Append not owned CPUs to core list
CpuSet cpus_not_owned = _cpuSystemMask - _cpuProcessMask;
std::list<CpuSet> core_cpusets_not_owned =
sys._hwloc.getCoreCpusetsOf( cpus_not_owned );
core_cpusets.splice( core_cpusets.end(), core_cpusets_not_owned );
}
}
for ( int i = 0; i < _availableCPUs; i++ ) {
if ( !_cpuProcessMask.isSet(i) ) {
_bindings.push_back(i);
for ( std::list<CpuSet>::iterator it = core_cpusets.begin();
it != core_cpusets.end(); ++it) {
CpuSet& core = *it;
unsigned int nthreads = std::min<unsigned int>( _threadsPerCore, core.size() );
unsigned int offset = core.size() / nthreads;
unsigned int steps_left = 0;
unsigned int inserted = 0;
int last_binding = -1;
// Iterate core cpuset up to nthreads and add them to the _bindings list
for ( CpuSet::const_iterator cit = core.begin(); cit != core.end(); ++cit ) {
int cpuid = *cit;
// We try to evenly distribute CPUs inside Core
if ( steps_left == 0 && inserted < nthreads) {
_bindings.push_back(cpuid);
last_binding = cpuid;
bindings_list.insert( std::pair<int,CpuSet>(last_binding, CpuSet(cpuid)) );
steps_left += offset - 1;
++inserted;
} else {
--steps_left;
bindings_list[last_binding].set(cpuid);
}
}
}
_availableCPUs = _bindings.size();
} else {
//! \note Fill _bindings vector with the active CPUs first, then the not active
_bindings.reserve( _availableCPUs );
for ( int i = 0; i < _availableCPUs; i++ ) {
if ( _cpuProcessMask.isSet(i) ) {
_bindings.push_back(i);
bindings_list.insert( std::pair<int,CpuSet>(i, CpuSet(i)) );
}
}
for ( int i = 0; i < _availableCPUs; i++ ) {
if ( !_cpuProcessMask.isSet(i) ) {
_bindings.push_back(i);
bindings_list.insert( std::pair<int,CpuSet>(i, CpuSet(i)) );
}
}
}
//! \note Load & check NUMA config (_cpus vectors must be created before)
_cpus = NEW std::vector<SMPProcessor *>( _availableCPUs, (SMPProcessor *) NULL );
_cpusByCpuId = NEW std::vector<SMPProcessor *>( _availableCPUs, (SMPProcessor *) NULL );
_cpusByCpuId = NEW std::map<int, SMPProcessor *>();
loadNUMAInfo();
......@@ -234,37 +285,42 @@ nanos::PE * smpProcessorFactory ( int id, int uid )
//! \note Create the SMPProcessors in _cpus array
int count = 0;
for ( std::vector<int>::iterator it = _bindings.begin(); it != _bindings.end(); it++ ) {
int cpuid = *it;
SMPProcessor *cpu;
bool active = ( (count < _currentCPUs) && _cpuProcessMask.isSet(*it) );
bool active = (count < _currentCPUs) && _cpuProcessMask.isSet(cpuid);
unsigned numaNode;
// If this PE can't be seen by hwloc (weird case in Altix 2, for instance)
if ( !sys._hwloc.isCpuAvailable( *it ) ) {
if ( !sys._hwloc.isCpuAvailable( cpuid ) ) {
/* There's a problem: we can't query its NUMA
node. Let's give it 0 (ticket #1090), consider throwing a warning */
numaNode = 0;
}
else
numaNode = getNodeOfPE( *it );
numaNode = getNodeOfPE( cpuid );
unsigned socket = numaNode; /* FIXME: socket */
memory_space_id_t id;
if ( _smpPrivateMemory && count >= _smpHostCpus && !_memkindSupport ) {
OSAllocator a;
memory_space_id_t id = sys.addSeparateMemoryAddressSpace( ext::getSMPDevice(), _smpAllocWide, sys.getRegionCacheSlabSize() );
id = sys.addSeparateMemoryAddressSpace( ext::getSMPDevice(),
_smpAllocWide, sys.getRegionCacheSlabSize() );
SeparateMemoryAddressSpace &numaMem = sys.getSeparateMemory( id );
numaMem.setSpecificData( NEW SimpleAllocator( ( uintptr_t ) a.allocate(_smpPrivateMemorySize), _smpPrivateMemorySize ) );
numaMem.setAcceleratorNumber( sys.getNewAcceleratorId() );
cpu = NEW SMPProcessor( *it, id, active, numaNode, socket );
} else {
cpu = NEW SMPProcessor( *it, mem_id, active, numaNode, socket );
id = mem_id;
}
// Create SMPProcessor object
cpu = NEW SMPProcessor( cpuid, bindings_list[cpuid], id, active, numaNode, socket );
if ( active ) {
_cpuActiveMask.set( cpu->getBindingId() );
}
//cpu->setNUMANode( getNodeOfPE( cpu->getId() ) );
(*_cpus)[count] = cpu;
(*_cpusByCpuId)[ *it ] = cpu;
(*_cpusByCpuId)[cpuid] = cpu;
count += 1;
}
......@@ -785,7 +841,7 @@ nanos::PE * smpProcessorFactory ( int id, int uid )
void SMPPlugin::updateCpuStatus( int cpuid )
{
SMPProcessor *cpu = _cpusByCpuId->at(cpuid);
SMPProcessor *cpu = (*_cpusByCpuId)[cpuid];
if ( cpu->getRunningThreads() > 0 ) {
_cpuActiveMask.set( cpuid );
} else {
......@@ -951,31 +1007,36 @@ nanos::PE * smpProcessorFactory ( int id, int uid )
// Initialize rank limits with the first cpu in the list
int a0 = -1, aN = -1, i0 = -1, iN = -1;
SMPProcessor *first_cpu = _cpusByCpuId->front();
SMPProcessor *first_cpu = _cpusByCpuId->begin()->second;
first_cpu->isActive() ? a0 = first_cpu->getBindingId() : i0 = first_cpu->getBindingId();
// Iterate through begin+1..end
for ( std::vector<SMPProcessor *>::iterator curr = _cpusByCpuId->begin()+1; curr != _cpusByCpuId->end(); curr++ ) {
std::map<int,SMPProcessor*>::iterator prev_it = _cpusByCpuId->begin(); /* begin() */
std::map<int,SMPProcessor*>::iterator curr_it = prev_it; ++curr_it; /* begin() + 1 */
while ( curr_it != _cpusByCpuId->end() ) {
/* Detect whether there is a state change (a->i/i->a). If so,
* close the rank and start a new one. If it's the last iteration
* close it anyway.
*/
std::vector<SMPProcessor *>::iterator prev = curr-1;
if ( (*curr)->isActive() && !(*prev)->isActive() ) {
SMPProcessor *prev = prev_it->second;
SMPProcessor *curr = curr_it->second;
if ( curr->isActive() && !prev->isActive() ) {
// change, i->a
iN = (*prev)->getBindingId();
a0 = (*curr)->getBindingId();
iN = prev->getBindingId();
a0 = curr->getBindingId();
( i0 != iN ) ? i << i0 << "-" << iN << ", " : i << i0 << ", ";
} else if ( !(*curr)->isActive() && (*prev)->isActive() ) {
} else if ( !curr->isActive() && prev->isActive() ) {
// change, a->i
aN = (*prev)->getBindingId();
i0 = (*curr)->getBindingId();
aN = prev->getBindingId();
i0 = curr->getBindingId();
( a0 != aN ) ? a << a0 << "-" << aN << ", " : a << a0 << ", ";
}
++prev_it;
++curr_it;
}
// close ranks and append strings according to the last cpu
SMPProcessor *last_cpu = _cpusByCpuId->back();
SMPProcessor *last_cpu = prev_it->second;
if ( last_cpu->isActive() ) {
aN = last_cpu->getBindingId();
( a0 != aN ) ? a << a0 << "-" << aN << ", " : a << a0 << ", ";
......
......@@ -54,7 +54,7 @@ class SMPPlugin : public SMPBasePlugin
int _currentCPUs;
int _requestedWorkers;
std::vector<SMPProcessor *> *_cpus;
std::vector<SMPProcessor *> *_cpusByCpuId;
std::map<int,SMPProcessor*> *_cpusByCpuId;
std::vector<SMPThread *> _workers;
int _bindingStart;
int _bindingStride;
......@@ -64,6 +64,7 @@ class SMPPlugin : public SMPBasePlugin
int _smpHostCpus;
std::size_t _smpPrivateMemorySize;
bool _workersCreated;
int _threadsPerCore;
// Nanos++ scheduling domain
CpuSet _cpuSystemMask; /*!< \brief system's default cpu_set */
......
......@@ -35,9 +35,11 @@ size_t SMPProcessor::_threadsStackSize = 0;
System::CachePolicyType SMPProcessor::_cachePolicy = System::DEFAULT;
size_t SMPProcessor::_cacheDefaultSize = 1048580;
SMPProcessor::SMPProcessor( int bindingId, memory_space_id_t memId, bool active, unsigned int numaNode, unsigned int socket ) :
SMPProcessor::SMPProcessor( int bindingId, const CpuSet& bindingList,
memory_space_id_t memId, bool active, unsigned int numaNode, unsigned int socket ) :
PE( &getSMPDevice(), memId, 0 /* always local node */, numaNode, true, socket, true ),
_bindingId( bindingId ), _reserved( false ), _active( active ), _futureThreads( 0 ) {}
_bindingId( bindingId ), _bindingList( bindingList ),
_reserved( false ), _active( active ), _futureThreads( 0 ) {}
void SMPProcessor::prepareConfig ( Config &config )
{
......@@ -134,4 +136,3 @@ void SMPProcessor::setNumFutureThreads( unsigned int nthreads ) {
/*! \brief Returns the number of threads this processor is expected to host in the future.
 *
 *  Plain accessor for the counter set via setNumFutureThreads(); performs no
 *  computation or synchronization.
 */
unsigned int SMPProcessor::getNumFutureThreads() const {
return _futureThreads;
}
......@@ -43,6 +43,7 @@ namespace ext {
static size_t _cacheDefaultSize;
static System::CachePolicyType _cachePolicy;
unsigned int _bindingId;
CpuSet _bindingList;
bool _reserved;
bool _active;
unsigned int _futureThreads;
......@@ -54,9 +55,11 @@ namespace ext {
public:
// constructors
SMPProcessor( int bindingId, memory_space_id_t numMemId, bool active, unsigned int numaNode, unsigned int socket );
SMPProcessor( int bindingId, const CpuSet& bindingList, memory_space_id_t numMemId,
bool active, unsigned int numaNode, unsigned int socket );
unsigned int getBindingId() const { return _bindingId; }
const CpuSet& getBindingList() const { return _bindingList; }
virtual ~SMPProcessor() {}
......
......@@ -67,6 +67,8 @@ class CpuSet
// Default constructor
CpuSet(): _mask() {}
CpuSet( int cpuid ): _mask() { set(cpuid); }
// Destructor
~CpuSet() {}
......@@ -105,6 +107,7 @@ class CpuSet
friend CpuSet operator&( const CpuSet& lhs, const CpuSet& rhs );
friend CpuSet operator+( const CpuSet& lhs, const CpuSet& rhs );
friend CpuSet operator*( const CpuSet& lhs, const CpuSet& rhs );
friend CpuSet operator-( const CpuSet& lhs, const CpuSet& rhs );
friend bool operator==( const CpuSet& lhs, const CpuSet& rhs );
friend bool operator!=( const CpuSet& lhs, const CpuSet& rhs );
......@@ -235,6 +238,14 @@ inline CpuSet operator*( const CpuSet& lhs, const CpuSet& rhs )
return lhs & rhs;
}
/*! \brief Set difference: returns the CPUs present in \a lhs but not in \a rhs. */
inline CpuSet operator-( const CpuSet& lhs, const CpuSet& rhs )
{
   // First isolate the CPUs shared by both operands, then XOR them out of
   // lhs: lhs ^ (lhs & rhs) == lhs & ~rhs, i.e. the set difference.
   CpuSet common;
   CPU_AND( &common._mask, &lhs._mask, &rhs._mask );
   CpuSet difference;
   CPU_XOR( &difference._mask, &lhs._mask, &common._mask );
   return difference;
}
inline bool operator==( const CpuSet& lhs, const CpuSet& rhs )
{
return CPU_EQUAL( &lhs._mask, &rhs._mask );
......
......@@ -113,12 +113,9 @@ void PThread::join ()
void PThread::bind()
{
int cpu_id = _core->getBindingId();
cpu_set_t cpu_set;
CPU_ZERO( &cpu_set );
CPU_SET( cpu_id, &cpu_set );
verbose( "Binding thread " << getMyThreadSafe()->getId() << " to cpu " << cpu_id );
pthread_setaffinity_np( _pth, sizeof(cpu_set_t), &cpu_set );
const CpuSet& binding = _core->getBindingList();
verbose( "Binding thread " << getMyThreadSafe()->getId() << " to cpus [" << binding << "]" );
pthread_setaffinity_np( _pth, sizeof(cpu_set_t), binding.get_cpu_set_pointer() );
NANOS_INSTRUMENT ( static InstrumentationDictionary *ID = sys.getInstrumentation()->getInstrumentationDictionary(); )
NANOS_INSTRUMENT ( static nanos_event_key_t cpuid_key = ID->getEventKey("cpuid"); )
......@@ -129,7 +126,7 @@ void PThread::bind()
NANOS_INSTRUMENT ( keys[1] = numa_key )
NANOS_INSTRUMENT ( nanos_event_value_t values[2]; )
NANOS_INSTRUMENT ( values[0] = (nanos_event_value_t) cpu_id + 1; )
NANOS_INSTRUMENT ( values[0] = (nanos_event_value_t) _core->getBindingId() + 1; )
NANOS_INSTRUMENT ( values[1] = (nanos_event_value_t) _core->getNumaNode() + 1; )
NANOS_INSTRUMENT ( sys.getInstrumentation()->raisePointEvents(2, keys, values); )
}
......
......@@ -21,6 +21,7 @@
#include "debug.hpp"
#ifdef HWLOC
#include <hwloc/glibc-sched.h>
#ifdef GPU_DEV
#include <hwloc/cudart.h>
#endif
......@@ -70,8 +71,10 @@ void Hwloc::loadHwloc ()
fatal_cond0( res != 0, "Could not load hwloc topology xml file." );
}
#if (HWLOC_API_VERSION >> 16) == 1
// Enable GPU detection
hwloc_topology_set_flags( _hwlocTopology, HWLOC_TOPOLOGY_FLAG_IO_DEVICES );
#endif
// Perform the topology detection.
hwloc_topology_load( _hwlocTopology );
......@@ -174,4 +177,42 @@ bool Hwloc::isCpuAvailable( unsigned int cpu ) const
return hwloc_get_pu_obj_by_os_index( _hwlocTopology, cpu ) != NULL;
#endif
}
/*! \brief Returns the cpuset of the whole core that contains CPU \a cpu.
 *
 *  \param cpu OS index of a processing unit (PU).
 *  \return CpuSet covering every PU of the core owning \a cpu; an empty
 *          CpuSet when hwloc support is compiled out, when the PU is not
 *          visible to hwloc, or when no core ancestor exists for it.
 */
CpuSet Hwloc::getCoreCpusetOf( unsigned int cpu )
{
   CpuSet core_cpuset;
#ifdef HWLOC
   // Some PUs may not be visible to hwloc (see isCpuAvailable, and the
   // Altix 2 note elsewhere in this codebase): guard against NULL lookups
   // instead of dereferencing core->cpuset unconditionally.
   hwloc_obj_t pu = hwloc_get_pu_obj_by_os_index( _hwlocTopology, cpu );
   if ( pu != NULL ) {
      hwloc_obj_t core = hwloc_get_ancestor_obj_by_type( _hwlocTopology, HWLOC_OBJ_CORE, pu );
      if ( core != NULL ) {
         hwloc_cpuset_to_glibc_sched_affinity( _hwlocTopology, core->cpuset,
               core_cpuset.get_cpu_set_pointer(), sizeof(cpu_set_t) );
      }
   }
#endif
   return core_cpuset;
}
/*! \brief Returns one CpuSet per full core contained in \a parent.
 *
 *  \param parent cpuset restricting the topology walk.
 *  \return list with the cpuset of each core found strictly inside
 *          \a parent; empty when hwloc support is compiled out, or when
 *          \a parent does not fully contain any core.
 */
std::list<CpuSet> Hwloc::getCoreCpusetsOf( const CpuSet& parent )
{
std::list<CpuSet> core_cpusets;
#ifdef HWLOC
// Convert parent glibc cpuset into an hwloc bitmap for the topology query
hwloc_cpuset_t hwlocset = hwloc_bitmap_alloc();
hwloc_cpuset_from_glibc_sched_affinity( _hwlocTopology, hwlocset,
parent.get_cpu_set_pointer(), sizeof(cpu_set_t));
// Iterate cores inside parent cpuset; passing the previous object back in
// advances the hwloc iterator, NULL return ends the walk
hwloc_obj_t core = NULL;
while ( (core = hwloc_get_next_obj_inside_cpuset_by_type(
_hwlocTopology, hwlocset, HWLOC_OBJ_CORE, core)) != NULL ) {
CpuSet core_cpuset;
hwloc_cpuset_to_glibc_sched_affinity( _hwlocTopology, core->cpuset,
core_cpuset.get_cpu_set_pointer(), sizeof(cpu_set_t));
// Append core cpuset (copied by value) to the result list
core_cpusets.push_back( core_cpuset );
}
hwloc_bitmap_free(hwlocset);
#endif
return core_cpusets;
}
}
......@@ -23,6 +23,7 @@
#include <config.hpp>
#include <string>
#include "cpuset.hpp"
#ifdef HWLOC
#include <hwloc.h>
......@@ -63,6 +64,8 @@ class Hwloc {
*/
bool isCpuAvailable( unsigned int cpu ) const;
CpuSet getCoreCpusetOf( unsigned int cpu );
std::list<CpuSet> getCoreCpusetsOf( const CpuSet& parent );
};
} // namespace nanos
......
......@@ -57,6 +57,7 @@ int main(int argc, char *argv[])
assert((set1 + set2).size()==4); /* 1111 */
assert((set1 & set2).size()==1); /* 0001 */
assert((set1 * set2).size()==1); /* 0001 */
assert((set1 - set2).size()==2); /* 0110 */
// Compound assignment operators
set2 = set1; /* 0111 */
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment.