/* * Project: JobMarket * Description: asynchronous Server/Controller/Client-approach to parallel computing, based on boost::asio * Copyright (C) 2012 Frederik Heber. All rights reserved. * */ /* * PoolGuard.cpp * * Created on: Sep 5, 2012 * Author: heber */ // include config.h #ifdef HAVE_CONFIG_H #include #endif // boost asio needs specific operator new #include #include "CodePatterns/MemDebug.hpp" #include "Pool/PoolGuard.hpp" #include #include "CodePatterns/Assert.hpp" #include "CodePatterns/Log.hpp" #include "FragmentQueue.hpp" #include "Operations/Servers/CheckAliveWorkerOperation.hpp" #include "Operations/OperationQueue.hpp" #include "Pool/WorkerPool.hpp" PoolGuard::PoolGuard( boost::asio::io_service& io_service, const size_t _timeout, Connection &_connection, const boost::function _removeWorkerfunction, const boost::function _resubmitJobfunction, OperationQueue &_OpQueue) : CheckAtNextInterval(false), timeout(_timeout), timer(io_service), removeWorkerfunction(_removeWorkerfunction), resubmitJobfunction(_resubmitJobfunction), OpQueue(_OpQueue), connection(_connection), WaitingOps(0) { // set timer if we start by default if (CheckAtNextInterval) { timer.expires_from_now(boost::posix_time::seconds(timeout)); timer.async_wait(boost::bind(&PoolGuard::checkWorkers, this)); } } void PoolGuard::checkWorkers() { if (WaitingOps == 0) { LOG(1, "INFO: There are " << LastWorkerList.size() << " busy workers from last time, " << CurrentWorkerList.size() << " are currently busy."); // create a vector of workers to check typedef std::set CheckList_t; CheckList_t currentworkers; for (WorkerList_t::const_iterator iter = LastWorkerList.begin(); iter != LastWorkerList.end(); ++iter) { const WorkerAddress &address = iter->first; LOG(2, "DEBUG: Checking whether worker " << address << " is still busy ..."); WorkerList_t::const_iterator currentiter = CurrentWorkerList.find(address); // check if worker was busy last time and on same job if (currentiter != CurrentWorkerList.end()) { LOG(2, "DEBUG: Worker " << address << " was busy last time on job " << iter->second << "."); if (currentiter->second == iter->second) { LOG(1, "INFO: Worker " << address << " is working on same job " << iter->second << " as last time, scheduling for checkalive."); currentworkers.insert(address); } else { LOG(1, "INFO: Worker " << address << " is working on different job " << currentiter->second << " than last time " << iter->second << ", scheduling for checkalive."); } } } // go through candidates to check LOG(1, "INFO: Checking on " << currentworkers.size() << " possible dead workers."); for(CheckList_t::const_iterator iter = currentworkers.begin(); iter != currentworkers.end(); ++iter) { const WorkerAddress &address = *iter; LOG(1, "INFO: Checking whether " << address << " is alive."); AsyncOperation *checkaliveWorkerOp = new CheckAliveWorkerOperation(connection, boost::bind(&PoolGuard::checkAddress, this, address, _1), boost::bind(&PoolGuard::printWorkerIsAlive, this, address), boost::bind(&PoolGuard::removeFromPool, this, address)); OpQueue.push_back(checkaliveWorkerOp, address); ++WaitingOps; } } else { ELOG(2, "We are lacking behind on CheckAliveOps, skipping this check interval."); } // set old list to new list LastWorkerList = CurrentWorkerList; // set next check interval if (CheckAtNextInterval) { timer.expires_from_now(boost::posix_time::seconds(timeout)); timer.async_wait(boost::bind(&PoolGuard::checkWorkers, this)); } } void PoolGuard::checkAddress( const WorkerAddress trueaddress, const WorkerAddress returnedaddress) { if (trueaddress != returnedaddress) { ELOG(1, "Worker at " << trueaddress << " returned itself wrongly as " << returnedaddress << ", removing."); removeFromPool(trueaddress); } } void PoolGuard::removeFromPool(const WorkerAddress address) { LOG(1, "INFO: Worker " << address << " does not react to CheckAlive, removing."); // erase from our lists LastWorkerList.erase(address); WorkerList_t::iterator iter = CurrentWorkerList.find(address); if ( iter != CurrentWorkerList.end()) { const JobId_t jobid = iter->second; CurrentWorkerList.erase(iter); // erase from pool removeWorkerfunction(address); // resubmit job resubmitJobfunction(jobid); } } void PoolGuard::addBusyWorker(const WorkerAddress address, const JobId_t id) { LOG(1, "INFO: Adding worker " << address << " with job " << id << " as busy."); #ifndef NDEBUG std::pair inserter = #endif CurrentWorkerList.insert( std::make_pair(address, id) ); ASSERT( inserter.second, "PoolGuard::addBusyWorker() - worker "+toString(address)+" " "is already busy to our knowledge on job "+toString(inserter.second)+"."); } void PoolGuard::removeBusyWorker(const WorkerAddress address) { LOG(1, "INFO: Removing worker " << address << " as busy."); WorkerList_t::iterator iter = CurrentWorkerList.find(address); if (iter != CurrentWorkerList.end()) { CurrentWorkerList.erase(address); LastWorkerList.erase(address); } } void PoolGuard::printWorkerIsAlive(const WorkerAddress address) { LOG(2, "DEBUG: Worker " << address << " checked and is still alive."); --WaitingOps; } void PoolGuard::stop() { // set flag to false and cancel timer CheckAtNextInterval = false; timer.cancel(); // clear internal list such that we may correctly be started again LastWorkerList.clear(); CurrentWorkerList.clear(); } void PoolGuard::start() { // set flag to true CheckAtNextInterval = true; // and check right away to fill LastWorkerList checkWorkers(); }