Skip to content
Commits on Source (3)
......@@ -325,19 +325,12 @@ unsigned int ClusterMPIPlugin::getMaxPEs() const {
}
unsigned int ClusterMPIPlugin::getNumWorkers() const {
if ( _nodes ) {
return _nodes->size() - 1;
} else {
return 0;
}
return _clusterThread ? 1 : 0;
}
unsigned int ClusterMPIPlugin::getMaxWorkers() const {
if ( _nodes ) {
return _nodes->size() - 1;
} else {
return 0;
}
//NOTE: At most, the plugin will create 1 worker thread with several sub-threads
return 1;
}
bool ClusterMPIPlugin::unalignedNodeMemory() const {
......
......@@ -262,16 +262,7 @@ void ClusterPlugin::startSupportThreads() {
}
void ClusterPlugin::startWorkerThreads( std::map<unsigned int, BaseThread *> &workers ) {
if ( _gasnetApi->getNodeNum() == 0 )
{
if ( _clusterThread ) {
for ( unsigned int thdIndex = 0; thdIndex < _clusterThread->getNumThreads(); thdIndex += 1 )
{
BaseThread *thd = _clusterThread->getThreadVector()[ thdIndex ];
workers.insert( std::make_pair( thd->getId(), thd ) );
}
}
} else {
if ( _clusterThread ) {
workers.insert( std::make_pair( _clusterThread->getId(), _clusterThread ) );
}
}
......@@ -329,19 +320,12 @@ unsigned int ClusterPlugin::getMaxPEs() const {
}
unsigned int ClusterPlugin::getNumWorkers() const {
if ( _remoteNodes ) {
return _remoteNodes->size();
} else {
return 0;
}
return _clusterThread ? 1 : 0;
}
unsigned int ClusterPlugin::getMaxWorkers() const {
if ( _remoteNodes ) {
return _remoteNodes->size();
} else {
return 0;
}
//NOTE: At most, the plugin will create 1 worker thread with several sub-threads
return 1;
}
bool ClusterPlugin::unalignedNodeMemory() const {
......
......@@ -81,7 +81,7 @@ void ClusterThread::RunningWDQueue::completeWD( void *remoteWdAddr ) {
}
ClusterThread::ClusterThread( WD &w, PE *pe, SMPMultiThread *parent, int device )
: BaseThread( (unsigned int) -1, w, pe, parent ), _clusterNode( device ), _lock() {
: BaseThread( parent->getOsId(), w, pe, parent ), _clusterNode( device ), _lock() {
setCurrentWD( w );
}
......@@ -532,7 +532,6 @@ void ClusterThread::workerClusterLoop ()
SMPMultiThread *parentM = ( SMPMultiThread * ) parent;
for ( unsigned int i = 0; i < parentM->getNumThreads(); i += 1 ) {
myThread = parentM->getThreadVector()[ i ];
myThread->leaveTeam();
myThread->joined();
}
myThread = parent;
......
......@@ -267,3 +267,39 @@ void SMPMultiThread::initializeDependent( void )
}
myThread = tmpMyThread;
}
void SMPMultiThread::enterTeam( TeamData *data ) {
//Enter parent thread into the team
BaseThread::enterTeam( data );
//Enter all sub-threads into the team
for ( unsigned int i = 0; i < _threads.size(); i++ ) {
_threads[ i ]->enterTeam( data );
}
}
void SMPMultiThread::leaveTeam( void ) {
//Remove all sub-threads from the team without deleting the teamData as it is shared
BaseThread *tmpMyThread = myThread;
for ( unsigned int i = 0; i < _threads.size(); i++ ) {
//Change myThread so calls to myThread->... or getMythreadSafe()->...
// work as expected
myThread = _threads[ i ];
myThread->leaveTeamNoDeleteTeamData();
}
myThread = tmpMyThread;
//Remove parent thread from the team and delete the teamData
BaseThread::leaveTeam();
}
void SMPMultiThread::setLeaveTeam( bool leave ) {
//Do it for each sub-thread
for ( unsigned int i = 0; i < _threads.size(); i++ ) {
_threads[ i ]->setLeaveTeam( leave );
}
//Do it for parent thread
BaseThread::setLeaveTeam( leave );
}
......@@ -146,6 +146,11 @@ namespace ext {
void addThreadsFromPEs(unsigned int representingPEsCount, PE **representingPEs);
virtual bool canBlock() { return false;}
virtual void initializeDependent( void );
//NOTE: In a MultiThread all sub-threads enter and leave the team with its parent
virtual void enterTeam( TeamData *data );
virtual void leaveTeam();
virtual void setLeaveTeam( bool leave );
};
} // namespace ext
} // namespace nanos
......
......@@ -138,20 +138,35 @@ int BaseThread::getCpuId() const {
return _parent->getCpuId();
}
void BaseThread::enterTeam( TeamData *data )
{
if ( data != NULL ) _teamData = data;
else _teamData = _nextTeamData;
_status.has_team = true;
}
void BaseThread::leaveTeam()
{
// It's allowed to make another thread leave the team as long as the target thread is blocked
ensure( this == myThread || _status.is_waiting || _status.has_joined,
"thread is not leaving team by itself" );
TeamData *td = _teamData;
leaveTeamNoDeleteTeamData();
if ( td ) {
//Remove thread's teamData
delete td;
}
}
void BaseThread::leaveTeamNoDeleteTeamData()
{
if ( _teamData )
{
TeamData *td = _teamData;
debug( "removing thread " << this << " with id " << toString<int>(getTeamId()) << " from " << _teamData->getTeam() );
td->getTeam()->removeThread( getTeamId() );
_teamData->getTeam()->removeThread( getTeamId() );
_teamData = _teamData->getParentTeamData();
delete td;
}
_status.must_leave_team = false;
_status.has_team = _teamData != NULL;
......
......@@ -195,13 +195,6 @@ namespace nanos {
// team related methods
inline void BaseThread::reserve() { _status.has_team = true; }
inline void BaseThread::enterTeam( TeamData *data )
{
if ( data != NULL ) _teamData = data;
else _teamData = _nextTeamData;
_status.has_team = true;
}
inline bool BaseThread::hasTeam() const { return _status.has_team; }
inline ThreadTeam * BaseThread::getTeam() const { return _teamData ? _teamData->getTeam() : NULL; }
......
......@@ -302,9 +302,12 @@ namespace nanos {
// team related methods
void reserve();
void enterTeam( TeamData *data = NULL );
virtual void enterTeam( TeamData *data = NULL );
bool hasTeam() const;
void leaveTeam();
virtual void leaveTeam();
void leaveTeamNoDeleteTeamData ();
bool isLeavingTeam () const;
virtual void setLeaveTeam ( bool leave );
ThreadTeam * getTeam() const;
TeamData * getTeamData() const;
......@@ -340,10 +343,6 @@ namespace nanos {
void disableGettingWork ();
bool isLeavingTeam () const;
void setLeaveTeam ( bool leave );
ProcessingElement * runningOn() const;
void setRunningOn(ProcessingElement* element);
......
......@@ -222,7 +222,7 @@ extern "C" {
ompt_thread_id_t ompt_nanos_get_thread_id( void );
ompt_thread_id_t ompt_nanos_get_thread_id( void )
{
return (ompt_thread_id_t) nanos::myThread->getId();
return (ompt_thread_id_t) nanos::myThread->getOsId();
}
......@@ -370,7 +370,7 @@ namespace nanos
void finalize( void )
{
if (ompt_nanos_event_thread_end) {
ompt_nanos_event_thread_end((ompt_thread_type_t) ompt_thread_initial, (ompt_thread_id_t) nanos::myThread->getId());
ompt_nanos_event_thread_end((ompt_thread_type_t) ompt_thread_initial, (ompt_thread_id_t) nanos::myThread->getOsId());
}
if ( ompt_nanos_event_shutdown ) ompt_nanos_event_shutdown();
if ( _previousTask ) free ( _previousTask );
......@@ -402,7 +402,7 @@ namespace nanos
Event &e = events[i];
// XXX: debug information
#if 0
int thid = nanos::myThread? nanos::myThread->getId():0;
int thid = nanos::myThread? nanos::myThread->getOsId():0;
fprintf(stderr,"NANOS++ [%d]: (%d/%d) event %ld value %lu\n",
thid,
(int)i+1,
......@@ -576,7 +576,7 @@ namespace nanos
ompt_task_id_t post = (ompt_task_id_t) w.getId();
int thid = (int) nanos::myThread->getId();
int thid = (int) nanos::myThread->getOsId();
if (!_threadActive[thid]) return;
ompt_task_id_t pre = (ompt_task_id_t) _previousTask[thid];
......@@ -585,7 +585,7 @@ namespace nanos
}
void addSuspendTask( WorkDescriptor &w, bool last )
{
int thid = (int) nanos::myThread->getId();
int thid = (int) nanos::myThread->getOsId();
if (last) _previousTask[thid] = (ompt_task_id_t) 0;
else _previousTask[thid] = (ompt_task_id_t) w.getId();
......@@ -599,7 +599,7 @@ namespace nanos
thread.setSteps (1);
thread.setCallBack ( breakPointCallBack );
int thid = nanos::myThread->getId();
int thid = nanos::myThread->getOsId();
if (ompt_nanos_event_thread_begin) {
ompt_nanos_event_thread_begin( (ompt_thread_type_t) ompt_thread_worker, (ompt_thread_id_t) thid );
......@@ -609,7 +609,7 @@ namespace nanos
void threadFinish ( BaseThread &thread )
{
if (ompt_nanos_event_thread_end) {
ompt_nanos_event_thread_end((ompt_thread_type_t) ompt_thread_worker, (ompt_thread_id_t) nanos::myThread->getId());
ompt_nanos_event_thread_end((ompt_thread_type_t) ompt_thread_worker, (ompt_thread_id_t) nanos::myThread->getOsId());
}
}
void incrementMaxThreads( void ) {}
......