| [b5ebb5] | 1 | /*
 | 
|---|
 | 2 |  * Project: MoleCuilder
 | 
|---|
 | 3 |  * Description: creates and alters molecular systems
 | 
|---|
 | 4 |  * Copyright (C)  2011 University of Bonn. All rights reserved.
 | 
|---|
 | 5 |  * Please see the LICENSE file or "Copyright notice" in builder.cpp for details.
 | 
|---|
 | 6 |  */
 | 
|---|
 | 7 | 
 | 
|---|
 | 8 | /*
 | 
|---|
 | 9 |  * FragmentQueue.cpp
 | 
|---|
 | 10 |  *
 | 
|---|
 | 11 |  *  Created on: Oct 19, 2011
 | 
|---|
 | 12 |  *      Author: heber
 | 
|---|
 | 13 |  */
 | 
|---|
 | 14 | 
 | 
|---|
 | 15 | // include config.h
 | 
|---|
 | 16 | #ifdef HAVE_CONFIG_H
 | 
|---|
 | 17 | #include <config.h>
 | 
|---|
 | 18 | #endif
 | 
|---|
 | 19 | 
 | 
|---|
 | 20 | #include "CodePatterns/MemDebug.hpp"
 | 
|---|
 | 21 | 
 | 
|---|
 | 22 | #include "FragmentQueue.hpp"
 | 
|---|
 | 23 | 
 | 
|---|
 | 24 | #include "CodePatterns/Assert.hpp"
 | 
|---|
| [fe95b7] | 25 | #include "CodePatterns/Log.hpp"
 | 
|---|
| [b5ebb5] | 26 | 
 | 
|---|
| [35f587] | 27 | FragmentResult::ptr FragmentQueue::NoResult( new FragmentResult(-1) );
 | 
|---|
 | 28 | FragmentResult::ptr FragmentQueue::NoResultQueued( new FragmentResult(-2) );
 | 
|---|
 | 29 | FragmentResult::ptr FragmentQueue::ResultDelivered( new FragmentResult(-3) );
 | 
|---|
| [fe95b7] | 30 | size_t FragmentQueue::Max_Attempts = (size_t)1;
 | 
|---|
| [b5ebb5] | 31 | 
 | 
|---|
 | 32 | /** Constructor for class FragmentQueue.
 | 
|---|
 | 33 |  *
 | 
|---|
 | 34 |  */
 | 
|---|
 | 35 | FragmentQueue::FragmentQueue()
 | 
|---|
 | 36 | {}
 | 
|---|
 | 37 | 
 | 
|---|
 | 38 | /** Destructor for class FragmentQueue.
 | 
|---|
 | 39 |  *
 | 
|---|
 | 40 |  */
 | 
|---|
 | 41 | FragmentQueue::~FragmentQueue()
 | 
|---|
| [78ad7d] | 42 | {
 | 
|---|
 | 43 |   jobs.clear();
 | 
|---|
 | 44 |   results.clear();
 | 
|---|
 | 45 | }
 | 
|---|
| [b5ebb5] | 46 | 
 | 
|---|
| [12d15a] | 47 | /** Checks whether there are jobs in the queue at all.
 | 
|---|
 | 48 |  * \return true - jobs present, false - queue is empty
 | 
|---|
 | 49 |  */
 | 
|---|
 | 50 | bool FragmentQueue::isJobPresent() const
 | 
|---|
 | 51 | {
 | 
|---|
 | 52 |   return !jobs.empty();
 | 
|---|
 | 53 | }
 | 
|---|
 | 54 | 
 | 
|---|
| [b5ebb5] | 55 | /** Pushes a FragmentJob into the internal queue for delivery to server.
 | 
|---|
 | 56 |  *
 | 
|---|
 | 57 |  * \note we throw assertion when jobid has already been used.
 | 
|---|
 | 58 |  *
 | 
|---|
 | 59 |  * \param job job to enter into queue
 | 
|---|
 | 60 |  */
 | 
|---|
| [78ad7d] | 61 | void FragmentQueue::pushJob(FragmentJob::ptr job)
 | 
|---|
| [b5ebb5] | 62 | {
 | 
|---|
| [78ad7d] | 63 |   ASSERT(job->getId() != JobId::IllegalJob,
 | 
|---|
| [9875cc] | 64 |       "FragmentQueue::pushJob() - job to push has IllegalJob id.");
 | 
|---|
| [78ad7d] | 65 |   ASSERT(!results.count(job->getId()),
 | 
|---|
 | 66 |       "FragmentQueue::pushJob() - job id "+toString(job->getId())+" has already been used.");
 | 
|---|
 | 67 |   results.insert( std::make_pair(job->getId(), NoResult));
 | 
|---|
| [12d15a] | 68 |   jobs.push_back(job);
 | 
|---|
 | 69 | }
 | 
|---|
 | 70 | 
 | 
|---|
| [9875cc] | 71 | /** Pushes a bunch of FragmentJob's into the internal queue for delivery to server.
 | 
|---|
 | 72 |  *
 | 
|---|
 | 73 |  * \note we throw assertion when jobid has already been used.
 | 
|---|
 | 74 |  *
 | 
|---|
 | 75 |  * \sa pushJob()
 | 
|---|
 | 76 |  *
 | 
|---|
 | 77 |  * \param _jobs jobs to enter into queue
 | 
|---|
 | 78 |  */
 | 
|---|
| [78ad7d] | 79 | void FragmentQueue::pushJobs(std::vector<FragmentJob::ptr> &_jobs)
 | 
|---|
| [9875cc] | 80 | {
 | 
|---|
| [78ad7d] | 81 |   for (std::vector<FragmentJob::ptr>::iterator iter = _jobs.begin();
 | 
|---|
| [9875cc] | 82 |       iter != _jobs.end(); ++iter)
 | 
|---|
 | 83 |     pushJob(*iter);
 | 
|---|
 | 84 | }
 | 
|---|
 | 85 | 
 | 
|---|
| [12d15a] | 86 | /** Pops top-most FragmentJob from internal queue.
 | 
|---|
 | 87 |  *
 | 
|---|
 | 88 |  * From here on, we expect a result in FragmentQueue::results.
 | 
|---|
 | 89 |  *
 | 
|---|
 | 90 |  * \return job topmost job from queue
 | 
|---|
 | 91 |  */
 | 
|---|
| [78ad7d] | 92 | FragmentJob::ptr FragmentQueue::popJob()
 | 
|---|
| [12d15a] | 93 | {
 | 
|---|
 | 94 |   ASSERT(jobs.size(),
 | 
|---|
 | 95 |       "FragmentQueue::popJob() - there are no jobs on the queue.");
 | 
|---|
| [78ad7d] | 96 |   FragmentJob::ptr job = jobs.front();
 | 
|---|
| [fe95b7] | 97 | #ifndef NDEBUG
 | 
|---|
 | 98 |   std::pair< BackupMap::iterator, bool> inserter =
 | 
|---|
 | 99 | #endif
 | 
|---|
 | 100 |   backup.insert( std::make_pair( job->getId(), job ));
 | 
|---|
 | 101 |   ASSERT (inserter.second,
 | 
|---|
 | 102 |       "FragmentQueue::popJob() - job "+toString(job->getId())+
 | 
|---|
 | 103 |       " is already in the backup.");
 | 
|---|
| [78ad7d] | 104 |   ResultMap::iterator iter = results.find(job->getId());
 | 
|---|
| [12d15a] | 105 |   ASSERT(iter != results.end(),
 | 
|---|
| [78ad7d] | 106 |       "FragmentQueue::popJob() - for job "+toString(job->getId())+" no result place has been stored.");
 | 
|---|
| [12d15a] | 107 |   iter->second = NoResultQueued;
 | 
|---|
 | 108 |   jobs.pop_front();
 | 
|---|
 | 109 |   return job;
 | 
|---|
| [b5ebb5] | 110 | }
 | 
|---|
 | 111 | 
 | 
|---|
| [b9c486] | 112 | /** Internal function to check whether result is not one of static entities.
 | 
|---|
 | 113 |  *
 | 
|---|
 | 114 |  * @param result result to check against
 | 
|---|
 | 115 |  * @return true - result is a present, valid result, false - result is one of the statics
 | 
|---|
 | 116 |  */
 | 
|---|
| [35f587] | 117 | bool FragmentQueue::isPresentResult(const FragmentResult::ptr result) const
 | 
|---|
| [b9c486] | 118 | {
 | 
|---|
| [35f587] | 119 |   return (*result != *NoResult)
 | 
|---|
 | 120 |       && (*result != *NoResultQueued)
 | 
|---|
 | 121 |       && (*result != *ResultDelivered);
 | 
|---|
| [b9c486] | 122 | }
 | 
|---|
 | 123 | 
 | 
|---|
| [b5ebb5] | 124 | /** Queries whether a job has already been finished and the result is present.
 | 
|---|
 | 125 |  *
 | 
|---|
 | 126 |  * \param jobid id of job to query
 | 
|---|
 | 127 |  * \return true - result is present, false - result is not present
 | 
|---|
 | 128 |  */
 | 
|---|
 | 129 | bool FragmentQueue::isResultPresent(JobId_t jobid) const
 | 
|---|
 | 130 | {
 | 
|---|
 | 131 |   ResultMap::const_iterator iter = results.find(jobid);
 | 
|---|
 | 132 |   return ((iter != results.end())
 | 
|---|
| [b9c486] | 133 |       && isPresentResult(iter->second));
 | 
|---|
| [b5ebb5] | 134 | }
 | 
|---|
 | 135 | 
 | 
|---|
| [8ee5ac] | 136 | /** Counts the number of jobs for which we have a calculated result present.
 | 
|---|
 | 137 |  *
 | 
|---|
 | 138 |  * \return number of calculated results
 | 
|---|
 | 139 |  */
 | 
|---|
 | 140 | size_t FragmentQueue::getDoneJobs() const
 | 
|---|
 | 141 | {
 | 
|---|
 | 142 |   size_t doneJobs = 0;
 | 
|---|
 | 143 |   for (ResultMap::const_iterator iter = results.begin();
 | 
|---|
 | 144 |       iter != results.end(); ++iter)
 | 
|---|
| [35f587] | 145 |     if (isPresentResult(iter->second))
 | 
|---|
| [8ee5ac] | 146 |       ++doneJobs;
 | 
|---|
| [bf56f6] | 147 |   return doneJobs;
 | 
|---|
 | 148 | }
 | 
|---|
 | 149 | 
 | 
|---|
 | 150 | /** Counts the number of jobs for which still have to be calculated.
 | 
|---|
 | 151 |  *
 | 
|---|
 | 152 |  * \return number of jobs to be calculated
 | 
|---|
 | 153 |  */
 | 
|---|
 | 154 | size_t FragmentQueue::getPresentJobs() const
 | 
|---|
 | 155 | {
 | 
|---|
 | 156 |   const size_t presentJobs = jobs.size();
 | 
|---|
 | 157 |   return presentJobs;
 | 
|---|
| [8ee5ac] | 158 | }
 | 
|---|
 | 159 | 
 | 
|---|
| [b5ebb5] | 160 | /** Delivers result for a finished job.
 | 
|---|
 | 161 |  *
 | 
|---|
 | 162 |  * \note we throw assertion if not present
 | 
|---|
 | 163 |  *
 | 
|---|
 | 164 |  * \param jobid id of job
 | 
|---|
 | 165 |  * \return result for job of given \a jobid
 | 
|---|
 | 166 |  */
 | 
|---|
| [35f587] | 167 | FragmentResult::ptr FragmentQueue::getResult(JobId_t jobid)
 | 
|---|
| [b5ebb5] | 168 | {
 | 
|---|
 | 169 |   ResultMap::iterator iter = results.find(jobid);
 | 
|---|
 | 170 |   ASSERT(iter != results.end(),
 | 
|---|
 | 171 |       "FragmentQueue::pushResult() - job "+toString(jobid)+" is not known to us.");
 | 
|---|
| [35f587] | 172 |   ASSERT(*iter->second != *NoResult,
 | 
|---|
| [12d15a] | 173 |       "FragmentQueue::pushResult() - job "+toString(jobid)+" has not been request for calculation yet.");
 | 
|---|
| [35f587] | 174 |   ASSERT(*iter->second != *NoResultQueued,
 | 
|---|
| [12d15a] | 175 |       "FragmentQueue::pushResult() - job "+toString(jobid)+"'s calculation is underway but not result has arrived yet.");
 | 
|---|
| [35f587] | 176 |   ASSERT(*iter->second != *ResultDelivered,
 | 
|---|
| [b5ebb5] | 177 |       "FragmentQueue::pushResult() - job "+toString(jobid)+"'s result has already been delivered.");
 | 
|---|
 | 178 |   /// store result
 | 
|---|
| [35f587] | 179 |   FragmentResult::ptr _result = iter->second;
 | 
|---|
| [b5ebb5] | 180 |   /// mark as delivered in map
 | 
|---|
 | 181 |   iter->second = ResultDelivered;
 | 
|---|
 | 182 |   /// and return result
 | 
|---|
| [b9c486] | 183 |   return _result;
 | 
|---|
 | 184 | }
 | 
|---|
 | 185 | 
 | 
|---|
| [35f587] | 186 | std::vector<FragmentResult::ptr> FragmentQueue::getAllResults()
 | 
|---|
| [b9c486] | 187 | {
 | 
|---|
| [35f587] | 188 |   std::vector<FragmentResult::ptr> returnresults;
 | 
|---|
| [b9c486] | 189 |   for (ResultMap::iterator iter = results.begin();
 | 
|---|
 | 190 |       iter != results.end(); ++iter) {
 | 
|---|
 | 191 |     if (isPresentResult(iter->second)) {
 | 
|---|
 | 192 |       returnresults.push_back(getResult(iter->first));
 | 
|---|
 | 193 |       iter = results.begin();
 | 
|---|
 | 194 |     }
 | 
|---|
 | 195 |   }
 | 
|---|
 | 196 | 
 | 
|---|
 | 197 |   return returnresults;
 | 
|---|
| [b5ebb5] | 198 | }
 | 
|---|
 | 199 | 
 | 
|---|
 | 200 | /** Pushes a result for a finished job.
 | 
|---|
 | 201 |  *
 | 
|---|
 | 202 |  * \note we throw assertion if job already has result or is not known.
 | 
|---|
 | 203 |  *
 | 
|---|
 | 204 |  * \param result result of job to store
 | 
|---|
 | 205 |  */
 | 
|---|
| [35f587] | 206 | void FragmentQueue::pushResult(FragmentResult::ptr &_result)
 | 
|---|
| [b5ebb5] | 207 | {
 | 
|---|
| [fe95b7] | 208 |   const JobId_t id = _result->getId();
 | 
|---|
| [b5ebb5] | 209 |   /// check for presence
 | 
|---|
| [fe95b7] | 210 |   ResultMap::iterator iter = results.find(id);
 | 
|---|
| [b5ebb5] | 211 |   ASSERT(iter != results.end(),
 | 
|---|
| [fe95b7] | 212 |       "FragmentQueue::pushResult() - job "+toString(id)+" is not known to us.");
 | 
|---|
| [35f587] | 213 |   ASSERT(*iter->second == *NoResultQueued,
 | 
|---|
| [fe95b7] | 214 |       "FragmentQueue::pushResult() - is not waiting for the result of job "+toString(id)+".");
 | 
|---|
 | 215 |   // check whether this is a resubmitted job
 | 
|---|
 | 216 |   AttemptsMap::iterator attemptiter = attempts.find(id);
 | 
|---|
 | 217 |   // check whether succeeded or (finally) failed
 | 
|---|
 | 218 |   if ((_result->exitflag == 0) || ((attemptiter != attempts.end()) && (attemptiter->second >= Max_Attempts))) {
 | 
|---|
 | 219 |     // give notice if it is resubmitted job
 | 
|---|
 | 220 |     if (attemptiter != attempts.end()) {
 | 
|---|
 | 221 |       if (attemptiter->second >= Max_Attempts)
 | 
|---|
 | 222 |         ELOG(1, "Job #" << id << " failed on " << Max_Attempts << "th attempt for the last time.");
 | 
|---|
 | 223 |       else
 | 
|---|
 | 224 |         LOG(1, "INFO: Job #" << id << " succeeded on " << attemptiter->second << "th attempt.");
 | 
|---|
 | 225 |     }
 | 
|---|
 | 226 |     // remove in attempts
 | 
|---|
 | 227 |     if (attemptiter != attempts.end())
 | 
|---|
 | 228 |       attempts.erase(attemptiter);
 | 
|---|
 | 229 |     // remove in backup map
 | 
|---|
 | 230 |     BackupMap::iterator backupiter = backup.find(id);
 | 
|---|
 | 231 |     ASSERT( backupiter != backup.end(),
 | 
|---|
 | 232 |         "FragmentQueue::pushResult() - cannot find job "+toString(id)
 | 
|---|
 | 233 |         +" in backup.");
 | 
|---|
 | 234 |     backup.erase(backupiter);
 | 
|---|
 | 235 |     /// and overwrite NoResult in found entry
 | 
|---|
 | 236 |     iter->second = _result;
 | 
|---|
 | 237 |   } else {
 | 
|---|
 | 238 |     LOG(1, "Job " << id << " failed, resubmitting.");
 | 
|---|
 | 239 |     // increase attempts
 | 
|---|
 | 240 |     if (attemptiter != attempts.end())
 | 
|---|
 | 241 |       ++(attemptiter->second);
 | 
|---|
 | 242 |     else
 | 
|---|
 | 243 |       attempts.insert( std::make_pair(id, (size_t)1) );
 | 
|---|
 | 244 |     // resubmit job
 | 
|---|
 | 245 |     resubmitJob(id);
 | 
|---|
 | 246 |   }
 | 
|---|
 | 247 | }
 | 
|---|
 | 248 | 
 | 
|---|
 | 249 | /** Resubmit a job which a worker failed to calculate.
 | 
|---|
 | 250 |  *
 | 
|---|
 | 251 |  * @param jobid id of the failed job
 | 
|---|
 | 252 |  */
 | 
|---|
 | 253 | void FragmentQueue::resubmitJob(const JobId_t jobid)
 | 
|---|
 | 254 | {
 | 
|---|
 | 255 |   BackupMap::iterator iter = backup.find(jobid);
 | 
|---|
 | 256 |   ASSERT( iter != backup.end(),
 | 
|---|
 | 257 |       "FragmentQueue::resubmitJob() - job id "+toString(jobid)
 | 
|---|
 | 258 |       +" not stored in backup.");
 | 
|---|
 | 259 |   if (iter != backup.end()) {
 | 
|---|
 | 260 |     // remove result
 | 
|---|
 | 261 |     ResultMap::iterator resiter = results.find(jobid);
 | 
|---|
 | 262 |     ASSERT( resiter != results.end(),
 | 
|---|
 | 263 |         "FragmentQueue::resubmitJob() - resubmitting job "+toString(jobid)
 | 
|---|
 | 264 |         +" for which no result is present.");
 | 
|---|
 | 265 |     results.erase(resiter);
 | 
|---|
 | 266 |     pushJob(iter->second);
 | 
|---|
 | 267 |     backup.erase(iter);
 | 
|---|
 | 268 |   }
 | 
|---|
| [b5ebb5] | 269 | }
 | 
|---|
| [b9c486] | 270 | 
 | 
|---|