Changeset 2c3ae5


Ignore:
Timestamp:
Aug 11, 2025, 5:45:18 PM (3 months ago)
Author:
Frederik Heber <frederik.heber@…>
Branches:
Candidate_v1.7.0, stable
Children:
e2f31d3
Parents:
74d798
git-author:
Frederik Heber <frederik.heber@…> (07/13/25 21:35:10)
git-committer:
Frederik Heber <frederik.heber@…> (08/11/25 17:45:18)
Message:

JobMarket: Extended PoolGuard to also check idle workers.

Location:
ThirdParty/JobMarket/src/JobMarket/Pool
Files:
3 edited

Legend:

Unmodified
Added
Removed
  • ThirdParty/JobMarket/src/JobMarket/Pool/PoolGuard.cpp

    r74d798 r2c3ae5  
    6565{
    6666  if (WaitingOps == 0) {
    67     LOG(1, "INFO: There are " << LastWorkerList.size() << " busy workers from last time, "
    68          << CurrentWorkerList.size() << " are currently busy.");
     67    LOG(1, "INFO: There are " << LastBusyWorkerList.size() << " busy workers from last time, "
     68         << CurrentBusyWorkerList.size() << " are currently busy.");
    6969    // create a vector of workers to check
    7070    typedef std::set<WorkerAddress> CheckList_t;
    7171    CheckList_t currentworkers;
    72     for (WorkerList_t::const_iterator iter = LastWorkerList.begin();
    73         iter != LastWorkerList.end();
     72    for (WorkerList_t::const_iterator iter = LastBusyWorkerList.begin();
     73        iter != LastBusyWorkerList.end();
    7474        ++iter) {
    7575      const WorkerAddress &address = iter->first;
    7676      LOG(2, "DEBUG: Checking whether worker " << address << " is still busy ...");
    77       WorkerList_t::const_iterator currentiter = CurrentWorkerList.find(address);
     77      WorkerList_t::const_iterator currentiter = CurrentBusyWorkerList.find(address);
    7878      // check if worker was busy last time and on same job
    79       if (currentiter != CurrentWorkerList.end()) {
     79      if (currentiter != CurrentBusyWorkerList.end()) {
    8080        LOG(2, "DEBUG: Worker " << address << " was busy last time on job " << iter->second << ".");
    8181        if (currentiter->second == iter->second) {
     
    101101    for(CheckList_t::const_iterator iter = currentworkers.begin();
    102102        iter != currentworkers.end(); ++iter) {
    103       const WorkerAddress &address = *iter;
    104       LOG(1, "INFO: Checking whether " << address << " is alive.");
    105       AsyncOperation *checkaliveWorkerOp = new CheckAliveWorkerOperation(connection,
    106           boost::bind(&PoolGuard::checkAddress, this, address, _1),
    107           boost::bind(&PoolGuard::printWorkerIsAlive, this, address),
    108           boost::bind(&PoolGuard::removeFromPool, this, address));
    109       OpQueue.push_back(checkaliveWorkerOp, address);
    110       ++WaitingOps;
     103      CheckWorker(*iter);
     104    }
     105    LOG(1, "INFO: Checking on " << CurrentIdleWorkerList.size() << " idle workers.");
     106    for(std::set<WorkerAddress>::const_iterator iter = CurrentIdleWorkerList.begin();
     107        iter != CurrentIdleWorkerList.end(); ++iter) {
     108      CheckWorker(*iter);
    111109    }
    112110  } else {
     
    114112  }
    115113  // set old list to new list
    116   LastWorkerList = CurrentWorkerList;
     114  LastBusyWorkerList = CurrentBusyWorkerList;
    117115
    118116  // set next check interval
     
    126124}
    127125
     126void PoolGuard::CheckWorker(const WorkerAddress &address)
     127{
     128  LOG(1, "INFO: Checking whether " << address << " is alive.");
     129  AsyncOperation *checkaliveWorkerOp = new CheckAliveWorkerOperation(connection,
     130      boost::bind(&PoolGuard::checkAddress, this, address, _1),
     131      boost::bind(&PoolGuard::printWorkerIsAlive, this, address),
     132      boost::bind(&PoolGuard::removeFromPool, this, address));
     133  OpQueue.push_back(checkaliveWorkerOp, address);
     134  ++WaitingOps;
     135}
     136
    128137void PoolGuard::checkAddress(
    129138    const WorkerAddress trueaddress,
     
    142151  --WaitingOps;
    143152  // erase from our lists
    144   LastWorkerList.erase(address);
    145   WorkerList_t::iterator iter = CurrentWorkerList.find(address);
    146   if ( iter != CurrentWorkerList.end()) {
     153  LastBusyWorkerList.erase(address);
     154  WorkerList_t::iterator iter = CurrentBusyWorkerList.find(address);
     155  if ( iter != CurrentBusyWorkerList.end()) {
    147156    const JobId_t jobid = iter->second;
    148     CurrentWorkerList.erase(iter);
     157    CurrentBusyWorkerList.erase(iter);
    149158    // erase from pool
    150159    removeWorkerfunction(address);
     
    160169  std::pair<WorkerList_t::iterator, bool> inserter =
    161170#endif
    162   CurrentWorkerList.insert( std::make_pair(address, id) );
     171  CurrentBusyWorkerList.insert( std::make_pair(address, id) );
    163172  ASSERT( inserter.second,
    164173      "PoolGuard::addBusyWorker() - worker "+toString(address)+" "
     
    169178{
    170179  LOG(1, "INFO: Removing worker " << address << " as busy.");
    171   WorkerList_t::iterator iter = CurrentWorkerList.find(address);
    172   if (iter != CurrentWorkerList.end()) {
    173     CurrentWorkerList.erase(address);
    174     LastWorkerList.erase(address);
     180  WorkerList_t::iterator iter = CurrentBusyWorkerList.find(address);
     181  if (iter != CurrentBusyWorkerList.end()) {
     182    CurrentBusyWorkerList.erase(address);
     183    LastBusyWorkerList.erase(address);
    175184  }
    176185}
     
    188197  timer.cancel();
    189198  // clear internal list such that we may correctly be started again
    190   LastWorkerList.clear();
    191   CurrentWorkerList.clear();
     199  LastBusyWorkerList.clear();
     200  CurrentBusyWorkerList.clear();
    192201}
    193202
     
    196205  // set flag to true
    197206  CheckAtNextInterval = true;
    198   // and check right away to fill LastWorkerList
     207  // and check right away to fill LastBusyWorkerList
    199208  checkWorkers();
    200209}
  • ThirdParty/JobMarket/src/JobMarket/Pool/PoolGuard.hpp

    r74d798 r2c3ae5  
    6767  void checkWorkers();
    6868
     69  /** Schedules a checkalive operation for a single worker at \a address.
     70   *
     71   * \param address the address to check
     72   */
     73  void CheckWorker(const WorkerAddress &address);
     74 
    6975  /** Checks busy workers in pool whether react to CheckAlive.
    7076   *
     
    142148
    143149  //!> last set of working workers
    144   WorkerList_t LastWorkerList;
     150  WorkerList_t LastBusyWorkerList;
    145151
    146152  //!> last set of working workers
    147   WorkerList_t CurrentWorkerList;
     153  WorkerList_t CurrentBusyWorkerList;
     154
     155  //!> set of idle working workers
     156  std::set<WorkerAddress> CurrentIdleWorkerList;
    148157
    149158  //!> reference to OperationQueue for pushing CheckAliveWorkerOperation
  • ThirdParty/JobMarket/src/JobMarket/Pool/WorkerPool.hpp

    r74d798 r2c3ae5  
    6969  typedef std::vector<std::pair<std::string, std::string> > WorkerList_t;
    7070  WorkerList_t getListOfIdleWorkers() const;
    71   const Pool_t& getPoolOfWorkers() const
    72   {
    73     return pool;
    74   }
    7571
    7672  /** Return the number of busy workers.
Note: See TracChangeset for help on using the changeset viewer.