/* * Project: MoleCuilder * Description: creates and alters molecular systems * Copyright (C) 2011 University of Bonn. All rights reserved. * Please see the LICENSE file or "Copyright notice" in builder.cpp for details. */ /* * WorkerPool.cpp * * Created on: 22.02.2012 * Author: heber */ // include config.h #ifdef HAVE_CONFIG_H #include #endif // boost asio needs specific operator new #include #include "CodePatterns/MemDebug.hpp" #include "WorkerPool.hpp" #include "CodePatterns/Assert.hpp" #include "CodePatterns/Info.hpp" #include "CodePatterns/Log.hpp" #include "Connection.hpp" WorkerPool::priority_t WorkerPool::default_priority = 0; WorkerAddress WorkerPool::emptyAddress("empty", "empty"); /** Constructor for class WorkerPool. * */ WorkerPool::WorkerPool() {} /** Destructor for class WorkerPool. * */ WorkerPool::~WorkerPool() {} /** Helper function to check whether an address is already in the pool. * * @param address worker address to check * @return true - address is present, false - else */ bool WorkerPool::presentInPool(const WorkerAddress &address) const { return pool.find(address) != pool.end(); } /** Get address of next idle worker. * * Note that worker is automatically marked as busy, \sa WorkerPool::markWorkerBusy() * * @return address of idle worker */ WorkerAddress WorkerPool::getNextIdleWorker() { // get first idle worker ASSERT( presentIdleWorkers(), "WorkerPool::getNextIdleWorker() - there is no idle worker."); if (!presentIdleWorkers()) return emptyAddress; Idle_Queue_t::iterator iter = idle_queue.begin(); const WorkerAddress returnaddress = iter->second; // enter in busy queue markWorkerBusy( iter ); // return address return returnaddress; } WorkerPool::Idle_Queue_t::iterator WorkerPool::getIdleWorker(const WorkerAddress &address) { Idle_Queue_t::iterator idleiter = idle_queue.begin(); while (idleiter != idle_queue.end()) { if (idleiter->second == address) { break; } ++idleiter; } return idleiter; } /** Checks whether a worker is busy or not. * * @param address address of worker to check */ bool WorkerPool::isWorkerBusy(const WorkerAddress &address) const { Busy_Queue_t::const_iterator iter = busy_queue.find(address); if (iter != busy_queue.end()) return true; #ifndef NDEBUG else { Idle_Queue_t::const_iterator iter = idle_queue.begin(); for(;iter != idle_queue.end(); ++iter) if (iter->second == address) break; ASSERT( iter != idle_queue.end(), "WorkerPool::isWorkerBusy() - worker "+toString(address) +" is neither busy nor idle."); } #endif return false; } /** Adds another worker to the pool by noting down its address. * * @param address host and service address of the listening worker * @return true - added successfully, false - not added */ bool WorkerPool::addWorker(const WorkerAddress& address) { std::pair inserter = pool.insert( address ); if (inserter.second) { // if new also add to queue LOG(1, "INFO: Successfully added "+toString(address)+" to pool."); idle_queue.insert( make_pair( default_priority, address ) ); return true; } else { LOG(1, "INFO: "+toString(address)+" is already present pool."); return false; } } /** Removes a worker from the pool. * * @param address host and service address of the listening worker * @return true - removed successfully, false - not removed */ bool WorkerPool::removeWorker(const WorkerAddress& address) { Pool_t::iterator iter = pool.find( address ); if (iter != pool.end()) { Idle_Queue_t::iterator idleiter = getIdleWorker(address); if (idleiter != idle_queue.end()) idle_queue.erase(idleiter); Busy_Queue_t::iterator busyiter = busy_queue.find(address); if (busyiter != busy_queue.end()) busy_queue.erase(busyiter); ASSERT( idleiter != idle_queue.end() || busyiter != busy_queue.end(), "WorkerPool::removeWorker() - Worker "+toString(address) +" is in pool but neither idle nor busy!"); ASSERT( !(idleiter != idle_queue.end() && busyiter != busy_queue.end()), "WorkerPool::removeWorker() - Worker "+toString(address) +" is in pool and both idle and busy!"); pool.erase(iter); LOG(1, "INFO: Removed worker " << address << " from pool."); return true; } else { ELOG(1, "Worker "+toString(address)+" is not present pool."); return false; } } /** Sends shutdown to all current workers in the pool. * */ void WorkerPool::removeAllWorkers() { // empty pool and queue idle_queue.clear(); busy_queue.clear(); pool.clear(); } /** Helper function to mark a worker as busy. * * Removes from idle_queue and places into busy_queue. * Sets \a iter to Idle_Queue_t::end(). * * @param iter iterator on idle worker */ void WorkerPool::markWorkerBusy(Idle_Queue_t::iterator &iter) { const WorkerAddress returnaddress = iter->second; if (isWorkerBusy(returnaddress)) return; const priority_t priority = iter->first; // remove from idle queue idle_queue.erase(iter); // insert into busy queue #ifndef NDEBUG std::pair< Busy_Queue_t::iterator, bool > inserter = #endif busy_queue.insert( make_pair(returnaddress, priority) ); ASSERT( inserter.second, "WorkerPool::sendJobToWorker() - Worker "+toString(inserter.first->first)+" is already busy."); LOG(1, "INFO: Worker " << returnaddress << " is now marked busy."); } /** Helper function to unmark a worker as busy. * * Removes worker from busy_queue and returns it to idle_queue. * * @param address address of worker */ void WorkerPool::unmarkWorkerBusy(const WorkerAddress &address) { if (isWorkerBusy(address)) { Busy_Queue_t::const_iterator iter = busy_queue.find(address); const priority_t priority = iter->second; busy_queue.erase(address); idle_queue.insert( make_pair( priority, address) ); LOG(1, "INFO: Worker " << address << " is now marked idle."); } }