| 1 | /* | 
|---|
| 2 | * Project: MoleCuilder | 
|---|
| 3 | * Description: creates and alters molecular systems | 
|---|
| 4 | * Copyright (C)  2011 University of Bonn. All rights reserved. | 
|---|
| 5 | * Please see the LICENSE file or "Copyright notice" in builder.cpp for details. | 
|---|
| 6 | */ | 
|---|
| 7 |  | 
|---|
| 8 | /* | 
|---|
| 9 | * WorkerPool.cpp | 
|---|
| 10 | * | 
|---|
| 11 | *  Created on: 22.02.2012 | 
|---|
| 12 | *      Author: heber | 
|---|
| 13 | */ | 
|---|
| 14 |  | 
|---|
| 15 | // include config.h | 
|---|
| 16 | #ifdef HAVE_CONFIG_H | 
|---|
| 17 | #include <config.h> | 
|---|
| 18 | #endif | 
|---|
| 19 |  | 
|---|
| 20 | // boost asio needs specific operator new | 
|---|
| 21 | #include <boost/asio.hpp> | 
|---|
| 22 |  | 
|---|
| 23 | #include "CodePatterns/MemDebug.hpp" | 
|---|
| 24 |  | 
|---|
| 25 | #include "WorkerPool.hpp" | 
|---|
| 26 |  | 
|---|
| 27 | #include "CodePatterns/Assert.hpp" | 
|---|
| 28 | #include "CodePatterns/Info.hpp" | 
|---|
| 29 | #include "CodePatterns/Log.hpp" | 
|---|
| 30 | #include "CodePatterns/Observer/Channels.hpp" | 
|---|
| 31 | #include "Connection.hpp" | 
|---|
| 32 |  | 
|---|
| 33 | WorkerPool::priority_t WorkerPool::default_priority = 0; | 
|---|
| 34 | WorkerAddress WorkerPool::emptyAddress("empty", "empty"); | 
|---|
| 35 |  | 
|---|
| 36 | /** Constructor for class WorkerPool. | 
|---|
| 37 | * | 
|---|
| 38 | */ | 
|---|
| 39 | WorkerPool::WorkerPool() : | 
|---|
| 40 | Observable("WorkerPool") | 
|---|
| 41 | { | 
|---|
| 42 | // observable stuff | 
|---|
| 43 | Channels *OurChannel = new Channels; | 
|---|
| 44 | NotificationChannels.insert( std::make_pair(this, OurChannel) ); | 
|---|
| 45 | // add instance for each notification type | 
|---|
| 46 | for (size_t type = 0; type < NotificationType_MAX; ++type) | 
|---|
| 47 | OurChannel->addChannel(type); | 
|---|
| 48 | } | 
|---|
| 49 |  | 
|---|
| 50 | /** Destructor for class WorkerPool. | 
|---|
| 51 | * | 
|---|
| 52 | */ | 
|---|
| 53 | WorkerPool::~WorkerPool() | 
|---|
| 54 | {} | 
|---|
| 55 |  | 
|---|
| 56 | /** Helper function to check whether an address is already in the pool. | 
|---|
| 57 | * | 
|---|
| 58 | * @param address worker address to check | 
|---|
| 59 | * @return true - address is present, false - else | 
|---|
| 60 | */ | 
|---|
| 61 | bool WorkerPool::presentInPool(const WorkerAddress &address) const | 
|---|
| 62 | { | 
|---|
| 63 | return pool.find(address) != pool.end(); | 
|---|
| 64 | } | 
|---|
| 65 |  | 
|---|
| 66 | /** Get address of next idle worker. | 
|---|
| 67 | * | 
|---|
| 68 | * Note that worker is automatically marked as busy, \sa WorkerPool::markWorkerBusy() | 
|---|
| 69 | * | 
|---|
| 70 | * @return address of idle worker | 
|---|
| 71 | */ | 
|---|
| 72 | WorkerAddress WorkerPool::getNextIdleWorker() | 
|---|
| 73 | { | 
|---|
| 74 | // get first idle worker | 
|---|
| 75 | ASSERT( presentIdleWorkers(), | 
|---|
| 76 | "WorkerPool::getNextIdleWorker() - there is no idle worker."); | 
|---|
| 77 | if (!presentIdleWorkers()) | 
|---|
| 78 | return emptyAddress; | 
|---|
| 79 | Idle_Queue_t::iterator iter = idle_queue.begin(); | 
|---|
| 80 | const WorkerAddress returnaddress = iter->second; | 
|---|
| 81 |  | 
|---|
| 82 | // enter in busy queue | 
|---|
| 83 | markWorkerBusy( iter ); | 
|---|
| 84 |  | 
|---|
| 85 | // return address | 
|---|
| 86 | return returnaddress; | 
|---|
| 87 | } | 
|---|
| 88 |  | 
|---|
| 89 | WorkerPool::Idle_Queue_t::iterator WorkerPool::getIdleWorker(const WorkerAddress &address) | 
|---|
| 90 | { | 
|---|
| 91 | Idle_Queue_t::iterator idleiter = idle_queue.begin(); | 
|---|
| 92 | while (idleiter != idle_queue.end()) { | 
|---|
| 93 | if (idleiter->second == address) { | 
|---|
| 94 | break; | 
|---|
| 95 | } | 
|---|
| 96 | ++idleiter; | 
|---|
| 97 | } | 
|---|
| 98 | return idleiter; | 
|---|
| 99 | } | 
|---|
| 100 |  | 
|---|
| 101 | /** Checks whether a worker is busy or not. | 
|---|
| 102 | * | 
|---|
| 103 | * @param address address of worker to check | 
|---|
| 104 | */ | 
|---|
| 105 | bool WorkerPool::isWorkerBusy(const WorkerAddress &address) const | 
|---|
| 106 | { | 
|---|
| 107 | Busy_Queue_t::const_iterator iter = busy_queue.find(address); | 
|---|
| 108 | if (iter != busy_queue.end()) | 
|---|
| 109 | return true; | 
|---|
| 110 | #ifndef NDEBUG | 
|---|
| 111 | else { | 
|---|
| 112 | Idle_Queue_t::const_iterator iter = idle_queue.begin(); | 
|---|
| 113 | for(;iter != idle_queue.end(); ++iter) | 
|---|
| 114 | if (iter->second == address) | 
|---|
| 115 | break; | 
|---|
| 116 | ASSERT( iter != idle_queue.end(), | 
|---|
| 117 | "WorkerPool::isWorkerBusy() - worker "+toString(address) | 
|---|
| 118 | +" is neither busy nor idle."); | 
|---|
| 119 |  | 
|---|
| 120 | } | 
|---|
| 121 | #endif | 
|---|
| 122 | return false; | 
|---|
| 123 | } | 
|---|
| 124 |  | 
|---|
| 125 | /** Adds another worker to the pool by noting down its address. | 
|---|
| 126 | * | 
|---|
| 127 | * @param address host and service address of the listening worker | 
|---|
| 128 | * @return true - added successfully, false - not added | 
|---|
| 129 | */ | 
|---|
| 130 | bool WorkerPool::addWorker(const WorkerAddress& address) | 
|---|
| 131 | { | 
|---|
| 132 | OBSERVE; | 
|---|
| 133 | NOTIFY(WorkerAdded); | 
|---|
| 134 | std::pair<Pool_t::iterator, bool> inserter = | 
|---|
| 135 | pool.insert( address ); | 
|---|
| 136 | if (inserter.second) { // if new also add to queue | 
|---|
| 137 | LOG(1, "INFO: Successfully added "+toString(address)+" to pool."); | 
|---|
| 138 | idle_queue.insert( make_pair( default_priority, address ) ); | 
|---|
| 139 | NOTIFY(WorkerIdle); | 
|---|
| 140 | return true; | 
|---|
| 141 | } else { | 
|---|
| 142 | LOG(1, "INFO: "+toString(address)+" is already present pool."); | 
|---|
| 143 | return false; | 
|---|
| 144 | } | 
|---|
| 145 | } | 
|---|
| 146 |  | 
|---|
| 147 | /** Removes a worker from the pool. | 
|---|
| 148 | * | 
|---|
| 149 | * @param address host and service address of the listening worker | 
|---|
| 150 | * @return true - removed successfully, false - not removed | 
|---|
| 151 | */ | 
|---|
| 152 | bool WorkerPool::removeWorker(const WorkerAddress& address) | 
|---|
| 153 | { | 
|---|
| 154 | Pool_t::iterator iter = pool.find( address ); | 
|---|
| 155 | if (iter != pool.end()) { | 
|---|
| 156 | OBSERVE; | 
|---|
| 157 | NOTIFY(WorkerRemoved); | 
|---|
| 158 | Idle_Queue_t::iterator idleiter = getIdleWorker(address); | 
|---|
| 159 | if (idleiter != idle_queue.end()) | 
|---|
| 160 | idle_queue.erase(idleiter); | 
|---|
| 161 | Busy_Queue_t::iterator busyiter = busy_queue.find(address); | 
|---|
| 162 | if (busyiter != busy_queue.end()) | 
|---|
| 163 | busy_queue.erase(busyiter); | 
|---|
| 164 | ASSERT( idleiter != idle_queue.end() || busyiter != busy_queue.end(), | 
|---|
| 165 | "WorkerPool::removeWorker() - Worker "+toString(address) | 
|---|
| 166 | +" is in pool but neither idle nor busy!"); | 
|---|
| 167 | ASSERT( !(idleiter != idle_queue.end() && busyiter != busy_queue.end()), | 
|---|
| 168 | "WorkerPool::removeWorker() - Worker "+toString(address) | 
|---|
| 169 | +" is in pool and both idle and busy!"); | 
|---|
| 170 | pool.erase(iter); | 
|---|
| 171 | LOG(1, "INFO: Removed worker " << address << " from pool."); | 
|---|
| 172 | return true; | 
|---|
| 173 | } else { | 
|---|
| 174 | ELOG(1, "Worker "+toString(address)+" is not present pool."); | 
|---|
| 175 | return false; | 
|---|
| 176 | } | 
|---|
| 177 | } | 
|---|
| 178 |  | 
|---|
| 179 | /** Sends shutdown to all current workers in the pool. | 
|---|
| 180 | * | 
|---|
| 181 | */ | 
|---|
| 182 | void WorkerPool::removeAllWorkers() | 
|---|
| 183 | { | 
|---|
| 184 | OBSERVE; | 
|---|
| 185 | NOTIFY(WorkerRemoved); | 
|---|
| 186 | // empty pool and queue | 
|---|
| 187 | idle_queue.clear(); | 
|---|
| 188 | busy_queue.clear(); | 
|---|
| 189 | pool.clear(); | 
|---|
| 190 | } | 
|---|
| 191 |  | 
|---|
| 192 | /** Helper function to mark a worker as busy. | 
|---|
| 193 | * | 
|---|
| 194 | * Removes from idle_queue and places into busy_queue. | 
|---|
| 195 | * Sets \a iter to Idle_Queue_t::end(). | 
|---|
| 196 | * | 
|---|
| 197 | * @param iter iterator on idle worker | 
|---|
| 198 | */ | 
|---|
| 199 | void WorkerPool::markWorkerBusy(Idle_Queue_t::iterator &iter) | 
|---|
| 200 | { | 
|---|
| 201 | const WorkerAddress returnaddress = iter->second; | 
|---|
| 202 | if (isWorkerBusy(returnaddress)) | 
|---|
| 203 | return; | 
|---|
| 204 | const priority_t priority = iter->first; | 
|---|
| 205 |  | 
|---|
| 206 | // remove from idle queue | 
|---|
| 207 | idle_queue.erase(iter); | 
|---|
| 208 |  | 
|---|
| 209 | // insert into busy queue | 
|---|
| 210 | #ifndef NDEBUG | 
|---|
| 211 | std::pair< Busy_Queue_t::iterator, bool > inserter = | 
|---|
| 212 | #endif | 
|---|
| 213 | busy_queue.insert( make_pair(returnaddress, priority) ); | 
|---|
| 214 | ASSERT( inserter.second, | 
|---|
| 215 | "WorkerPool::sendJobToWorker() - Worker "+toString(inserter.first->first)+" is already busy."); | 
|---|
| 216 |  | 
|---|
| 217 | LOG(1, "INFO: Worker " << returnaddress << " is now marked busy."); | 
|---|
| 218 | } | 
|---|
| 219 |  | 
|---|
| 220 | /** Helper function to unmark a worker as busy. | 
|---|
| 221 | * | 
|---|
| 222 | * Removes worker from busy_queue and returns it to idle_queue. | 
|---|
| 223 | * | 
|---|
| 224 | * @param address address of worker | 
|---|
| 225 | */ | 
|---|
| 226 | void WorkerPool::unmarkWorkerBusy(const WorkerAddress &address) | 
|---|
| 227 | { | 
|---|
| 228 | if (isWorkerBusy(address)) { | 
|---|
| 229 | OBSERVE; | 
|---|
| 230 | NOTIFY(WorkerIdle); | 
|---|
| 231 | Busy_Queue_t::const_iterator iter = busy_queue.find(address); | 
|---|
| 232 | const priority_t priority = iter->second; | 
|---|
| 233 | busy_queue.erase(address); | 
|---|
| 234 | idle_queue.insert( make_pair( priority, address) ); | 
|---|
| 235 |  | 
|---|
| 236 | LOG(1, "INFO: Worker " << address << " is now marked idle."); | 
|---|
| 237 | } | 
|---|
| 238 | } | 
|---|