| [5d8c0f] | 1 | /*
 | 
|---|
 | 2 |  * Project: MoleCuilder
 | 
|---|
 | 3 |  * Description: creates and alters molecular systems
 | 
|---|
 | 4 |  * Copyright (C)  2011 University of Bonn. All rights reserved.
 | 
|---|
 | 5 |  * Please see the LICENSE file or "Copyright notice" in builder.cpp for details.
 | 
|---|
 | 6 |  */
 | 
|---|
 | 7 | 
 | 
|---|
 | 8 | /*
 | 
|---|
 | 9 |  * WorkerPool.cpp
 | 
|---|
 | 10 |  *
 | 
|---|
 | 11 |  *  Created on: 22.02.2012
 | 
|---|
 | 12 |  *      Author: heber
 | 
|---|
 | 13 |  */
 | 
|---|
 | 14 | 
 | 
|---|
 | 15 | // include config.h
 | 
|---|
 | 16 | #ifdef HAVE_CONFIG_H
 | 
|---|
 | 17 | #include <config.h>
 | 
|---|
 | 18 | #endif
 | 
|---|
 | 19 | 
 | 
|---|
 | 20 | // boost asio needs specific operator new
 | 
|---|
 | 21 | #include <boost/asio.hpp>
 | 
|---|
 | 22 | 
 | 
|---|
 | 23 | #include "CodePatterns/MemDebug.hpp"
 | 
|---|
 | 24 | 
 | 
|---|
 | 25 | #include "WorkerPool.hpp"
 | 
|---|
 | 26 | 
 | 
|---|
 | 27 | #include "CodePatterns/Assert.hpp"
 | 
|---|
 | 28 | #include "CodePatterns/Info.hpp"
 | 
|---|
 | 29 | #include "CodePatterns/Log.hpp"
 | 
|---|
| [fb255d] | 30 | #include "CodePatterns/Observer/Channels.hpp"
 | 
|---|
| [5d8c0f] | 31 | #include "Connection.hpp"
 | 
|---|
 | 32 | 
 | 
|---|
 | 33 | WorkerPool::priority_t WorkerPool::default_priority = 0;
 | 
|---|
 | 34 | WorkerAddress WorkerPool::emptyAddress("empty", "empty");
 | 
|---|
 | 35 | 
 | 
|---|
 | 36 | /** Constructor for class WorkerPool.
 | 
|---|
 | 37 |  *
 | 
|---|
 | 38 |  */
 | 
|---|
| [fb255d] | 39 | WorkerPool::WorkerPool() :
 | 
|---|
 | 40 |     Observable("WorkerPool")
 | 
|---|
 | 41 | {
 | 
|---|
 | 42 |   // observable stuff
 | 
|---|
 | 43 |   Channels *OurChannel = new Channels;
 | 
|---|
 | 44 |   NotificationChannels.insert( std::make_pair(this, OurChannel) );
 | 
|---|
 | 45 |   // add instance for each notification type
 | 
|---|
 | 46 |   for (size_t type = 0; type < NotificationType_MAX; ++type)
 | 
|---|
 | 47 |     OurChannel->addChannel(type);
 | 
|---|
 | 48 | }
 | 
|---|
| [5d8c0f] | 49 | 
 | 
|---|
 | 50 | /** Destructor for class WorkerPool.
 | 
|---|
 | 51 |  *
 | 
|---|
 | 52 |  */
 | 
|---|
 | 53 | WorkerPool::~WorkerPool()
 | 
|---|
 | 54 | {}
 | 
|---|
 | 55 | 
 | 
|---|
 | 56 | /** Helper function to check whether an address is already in the pool.
 | 
|---|
 | 57 |  *
 | 
|---|
 | 58 |  * @param address worker address to check
 | 
|---|
 | 59 |  * @return true - address is present, false - else
 | 
|---|
 | 60 |  */
 | 
|---|
 | 61 | bool WorkerPool::presentInPool(const WorkerAddress &address) const
 | 
|---|
 | 62 | {
 | 
|---|
 | 63 |   return pool.find(address) != pool.end();
 | 
|---|
 | 64 | }
 | 
|---|
 | 65 | 
 | 
|---|
 | 66 | /** Get address of next idle worker.
 | 
|---|
 | 67 |  *
 | 
|---|
 | 68 |  * Note that worker is automatically marked as busy, \sa WorkerPool::markWorkerBusy()
 | 
|---|
 | 69 |  *
 | 
|---|
 | 70 |  * @return address of idle worker
 | 
|---|
 | 71 |  */
 | 
|---|
 | 72 | WorkerAddress WorkerPool::getNextIdleWorker()
 | 
|---|
 | 73 | {
 | 
|---|
 | 74 |   // get first idle worker
 | 
|---|
 | 75 |   ASSERT( presentIdleWorkers(),
 | 
|---|
 | 76 |       "WorkerPool::getNextIdleWorker() - there is no idle worker.");
 | 
|---|
 | 77 |   if (!presentIdleWorkers())
 | 
|---|
 | 78 |     return emptyAddress;
 | 
|---|
 | 79 |   Idle_Queue_t::iterator iter = idle_queue.begin();
 | 
|---|
 | 80 |   const WorkerAddress returnaddress = iter->second;
 | 
|---|
 | 81 | 
 | 
|---|
 | 82 |   // enter in busy queue
 | 
|---|
 | 83 |   markWorkerBusy( iter );
 | 
|---|
 | 84 | 
 | 
|---|
 | 85 |   // return address
 | 
|---|
 | 86 |   return returnaddress;
 | 
|---|
 | 87 | }
 | 
|---|
 | 88 | 
 | 
|---|
| [41c1b7] | 89 | WorkerPool::Idle_Queue_t::iterator WorkerPool::getIdleWorker(const WorkerAddress &address)
 | 
|---|
 | 90 | {
 | 
|---|
 | 91 |   Idle_Queue_t::iterator idleiter = idle_queue.begin();
 | 
|---|
 | 92 |   while (idleiter != idle_queue.end()) {
 | 
|---|
 | 93 |     if (idleiter->second == address) {
 | 
|---|
 | 94 |       break;
 | 
|---|
 | 95 |     }
 | 
|---|
 | 96 |     ++idleiter;
 | 
|---|
 | 97 |   }
 | 
|---|
 | 98 |   return idleiter;
 | 
|---|
 | 99 | }
 | 
|---|
 | 100 | 
 | 
|---|
| [5d8c0f] | 101 | /** Checks whether a worker is busy or not.
 | 
|---|
 | 102 |  *
 | 
|---|
 | 103 |  * @param address address of worker to check
 | 
|---|
 | 104 |  */
 | 
|---|
 | 105 | bool WorkerPool::isWorkerBusy(const WorkerAddress &address) const
 | 
|---|
 | 106 | {
 | 
|---|
 | 107 |   Busy_Queue_t::const_iterator iter = busy_queue.find(address);
 | 
|---|
 | 108 |   if (iter != busy_queue.end())
 | 
|---|
 | 109 |     return true;
 | 
|---|
 | 110 | #ifndef NDEBUG
 | 
|---|
 | 111 |   else {
 | 
|---|
 | 112 |     Idle_Queue_t::const_iterator iter = idle_queue.begin();
 | 
|---|
 | 113 |     for(;iter != idle_queue.end(); ++iter)
 | 
|---|
 | 114 |       if (iter->second == address)
 | 
|---|
 | 115 |         break;
 | 
|---|
 | 116 |     ASSERT( iter != idle_queue.end(),
 | 
|---|
 | 117 |         "WorkerPool::isWorkerBusy() - worker "+toString(address)
 | 
|---|
 | 118 |         +" is neither busy nor idle.");
 | 
|---|
 | 119 | 
 | 
|---|
 | 120 |   }
 | 
|---|
 | 121 | #endif
 | 
|---|
 | 122 |   return false;
 | 
|---|
 | 123 | }
 | 
|---|
 | 124 | 
 | 
|---|
 | 125 | /** Adds another worker to the pool by noting down its address.
 | 
|---|
 | 126 |  *
 | 
|---|
 | 127 |  * @param address host and service address of the listening worker
 | 
|---|
 | 128 |  * @return true - added successfully, false - not added
 | 
|---|
 | 129 |  */
 | 
|---|
 | 130 | bool WorkerPool::addWorker(const WorkerAddress& address)
 | 
|---|
 | 131 | {
 | 
|---|
| [fb255d] | 132 |   OBSERVE;
 | 
|---|
 | 133 |   NOTIFY(WorkerAdded);
 | 
|---|
| [5d8c0f] | 134 |   std::pair<Pool_t::iterator, bool> inserter =
 | 
|---|
 | 135 |       pool.insert( address );
 | 
|---|
 | 136 |   if (inserter.second) { // if new also add to queue
 | 
|---|
 | 137 |     LOG(1, "INFO: Successfully added "+toString(address)+" to pool.");
 | 
|---|
 | 138 |     idle_queue.insert( make_pair( default_priority, address ) );
 | 
|---|
| [fb255d] | 139 |     NOTIFY(WorkerIdle);
 | 
|---|
| [5d8c0f] | 140 |     return true;
 | 
|---|
 | 141 |   } else {
 | 
|---|
 | 142 |     LOG(1, "INFO: "+toString(address)+" is already present pool.");
 | 
|---|
 | 143 |     return false;
 | 
|---|
 | 144 |   }
 | 
|---|
 | 145 | }
 | 
|---|
 | 146 | 
 | 
|---|
 | 147 | /** Removes a worker from the pool.
 | 
|---|
 | 148 |  *
 | 
|---|
 | 149 |  * @param address host and service address of the listening worker
 | 
|---|
 | 150 |  * @return true - removed successfully, false - not removed
 | 
|---|
 | 151 |  */
 | 
|---|
 | 152 | bool WorkerPool::removeWorker(const WorkerAddress& address)
 | 
|---|
 | 153 | {
 | 
|---|
 | 154 |   Pool_t::iterator iter = pool.find( address );
 | 
|---|
 | 155 |   if (iter != pool.end()) {
 | 
|---|
| [41c1b7] | 156 |     Idle_Queue_t::iterator idleiter = getIdleWorker(address);
 | 
|---|
 | 157 |     if (idleiter != idle_queue.end())
 | 
|---|
 | 158 |       idle_queue.erase(idleiter);
 | 
|---|
| [5d8c0f] | 159 |     Busy_Queue_t::iterator busyiter = busy_queue.find(address);
 | 
|---|
 | 160 |     if (busyiter != busy_queue.end())
 | 
|---|
 | 161 |       busy_queue.erase(busyiter);
 | 
|---|
 | 162 |     ASSERT( idleiter != idle_queue.end() || busyiter != busy_queue.end(),
 | 
|---|
 | 163 |         "WorkerPool::removeWorker() - Worker "+toString(address)
 | 
|---|
 | 164 |         +" is in pool but neither idle nor busy!");
 | 
|---|
 | 165 |     ASSERT( !(idleiter != idle_queue.end() && busyiter != busy_queue.end()),
 | 
|---|
 | 166 |         "WorkerPool::removeWorker() - Worker "+toString(address)
 | 
|---|
 | 167 |         +" is in pool and both idle and busy!");
 | 
|---|
 | 168 |     pool.erase(iter);
 | 
|---|
| [41c1b7] | 169 |     LOG(1, "INFO: Removed worker " << address << " from pool.");
 | 
|---|
| [5d8c0f] | 170 |     return true;
 | 
|---|
 | 171 |   } else {
 | 
|---|
 | 172 |     ELOG(1, "Worker "+toString(address)+" is not present pool.");
 | 
|---|
 | 173 |     return false;
 | 
|---|
 | 174 |   }
 | 
|---|
 | 175 | }
 | 
|---|
 | 176 | 
 | 
|---|
 | 177 | /** Sends shutdown to all current workers in the pool.
 | 
|---|
 | 178 |  *
 | 
|---|
 | 179 |  */
 | 
|---|
 | 180 | void WorkerPool::removeAllWorkers()
 | 
|---|
 | 181 | {
 | 
|---|
 | 182 |   // empty pool and queue
 | 
|---|
 | 183 |   idle_queue.clear();
 | 
|---|
 | 184 |   busy_queue.clear();
 | 
|---|
 | 185 |   pool.clear();
 | 
|---|
 | 186 | }
 | 
|---|
 | 187 | 
 | 
|---|
 | 188 | /** Helper function to mark a worker as busy.
 | 
|---|
 | 189 |  *
 | 
|---|
 | 190 |  * Removes from idle_queue and places into busy_queue.
 | 
|---|
 | 191 |  * Sets \a iter to Idle_Queue_t::end().
 | 
|---|
 | 192 |  *
 | 
|---|
 | 193 |  * @param iter iterator on idle worker
 | 
|---|
 | 194 |  */
 | 
|---|
 | 195 | void WorkerPool::markWorkerBusy(Idle_Queue_t::iterator &iter)
 | 
|---|
 | 196 | {
 | 
|---|
 | 197 |   const WorkerAddress returnaddress = iter->second;
 | 
|---|
| [41c1b7] | 198 |   if (isWorkerBusy(returnaddress))
 | 
|---|
 | 199 |     return;
 | 
|---|
| [5d8c0f] | 200 |   const priority_t priority = iter->first;
 | 
|---|
 | 201 | 
 | 
|---|
 | 202 |   // remove from idle queue
 | 
|---|
 | 203 |   idle_queue.erase(iter);
 | 
|---|
 | 204 | 
 | 
|---|
 | 205 |   // insert into busy queue
 | 
|---|
 | 206 | #ifndef NDEBUG
 | 
|---|
 | 207 |   std::pair< Busy_Queue_t::iterator, bool > inserter =
 | 
|---|
 | 208 | #endif
 | 
|---|
 | 209 |   busy_queue.insert( make_pair(returnaddress, priority) );
 | 
|---|
 | 210 |   ASSERT( inserter.second,
 | 
|---|
 | 211 |       "WorkerPool::sendJobToWorker() - Worker "+toString(inserter.first->first)+" is already busy.");
 | 
|---|
 | 212 | 
 | 
|---|
 | 213 |   LOG(1, "INFO: Worker " << returnaddress << " is now marked busy.");
 | 
|---|
 | 214 | }
 | 
|---|
 | 215 | 
 | 
|---|
 | 216 | /** Helper function to unmark a worker as busy.
 | 
|---|
 | 217 |  *
 | 
|---|
 | 218 |  * Removes worker from busy_queue and returns it to idle_queue.
 | 
|---|
 | 219 |  *
 | 
|---|
 | 220 |  * @param address address of worker
 | 
|---|
 | 221 |  */
 | 
|---|
 | 222 | void WorkerPool::unmarkWorkerBusy(const WorkerAddress &address)
 | 
|---|
 | 223 | {
 | 
|---|
 | 224 |   if (isWorkerBusy(address)) {
 | 
|---|
| [fb255d] | 225 |     OBSERVE;
 | 
|---|
 | 226 |     NOTIFY(WorkerIdle);
 | 
|---|
| [5d8c0f] | 227 |     Busy_Queue_t::const_iterator iter = busy_queue.find(address);
 | 
|---|
 | 228 |     const priority_t priority = iter->second;
 | 
|---|
 | 229 |     busy_queue.erase(address);
 | 
|---|
 | 230 |     idle_queue.insert( make_pair( priority, address) );
 | 
|---|
| [41c1b7] | 231 | 
 | 
|---|
 | 232 |     LOG(1, "INFO: Worker " << address << " is now marked idle.");
 | 
|---|
| [5d8c0f] | 233 |   }
 | 
|---|
 | 234 | }
 | 
|---|
| [befcf8] | 235 | 
 | 
|---|
 | 236 | WorkerPool::WorkerList_t WorkerPool::getListOfIdleWorkers() const
 | 
|---|
 | 237 | {
 | 
|---|
 | 238 |   WorkerList_t WorkerList;
 | 
|---|
 | 239 |   for (Idle_Queue_t::const_iterator iter = idle_queue.begin(); iter != idle_queue.end(); ++iter)
 | 
|---|
 | 240 |     WorkerList.push_back( make_pair(iter->second.host, iter->second.service) );
 | 
|---|
 | 241 |   return WorkerList;
 | 
|---|
 | 242 | }
 | 
|---|