source: src/JobMarket/FragmentQueue.cpp@ 404d2b

Action_Thermostats Add_AtomRandomPerturbation Add_RotateAroundBondAction Add_SelectAtomByNameAction Adding_Graph_to_ChangeBondActions Adding_MD_integration_tests Adding_StructOpt_integration_tests Automaking_mpqc_open AutomationFragmentation_failures Candidate_v1.6.0 Candidate_v1.6.1 ChangeBugEmailaddress ChangingTestPorts ChemicalSpaceEvaluator Combining_Subpackages Debian_Package_split Debian_package_split_molecuildergui_only Disabling_MemDebug Docu_Python_wait EmpiricalPotential_contain_HomologyGraph_documentation Enable_parallel_make_install Enhance_userguide Enhanced_StructuralOptimization Enhanced_StructuralOptimization_continued Example_ManyWaysToTranslateAtom Exclude_Hydrogens_annealWithBondGraph FitPartialCharges_GlobalError Fix_ChronosMutex Fix_StatusMsg Fix_StepWorldTime_single_argument Fix_Verbose_Codepatterns ForceAnnealing_goodresults ForceAnnealing_oldresults ForceAnnealing_tocheck ForceAnnealing_with_BondGraph ForceAnnealing_with_BondGraph_continued ForceAnnealing_with_BondGraph_continued_betteresults ForceAnnealing_with_BondGraph_contraction-expansion GeometryObjects Gui_displays_atomic_force_velocity IndependentFragmentGrids_IntegrationTest JobMarket_RobustOnKillsSegFaults JobMarket_StableWorkerPool JobMarket_unresolvable_hostname_fix ODR_violation_mpqc_open PartialCharges_OrthogonalSummation PythonUI_with_named_parameters QtGui_reactivate_TimeChanged_changes Recreated_GuiChecks RotateToPrincipalAxisSystem_UndoRedo StoppableMakroAction Subpackage_JobMarket Subpackage_levmar Subpackage_mpqc_open Subpackage_vmg ThirdParty_MPQC_rebuilt_buildsystem TremoloParser_IncreasedPrecision TremoloParser_MultipleTimesteps Ubuntu_1604_changes stable
Last change on this file since 404d2b was 404d2b, checked in by Frederik Heber <heber@…>, 8 years ago

Squashed 'ThirdParty/JobMarket/' content from commit e194722

git-subtree-dir: ThirdParty/JobMarket
git-subtree-split: e19472277e62c493f6c10f1483fe21e64c1039e9

  • Property mode set to 100644
File size: 11.6 KB
RevLine 
[404d2b]1/*
2 * Project: JobMarket
3 * Description: asynchronous Server/Controller/Client-approach to parallel computing, based on boost::asio
4 * Copyright (C) 2011 Frederik Heber. All rights reserved.
5 *
6 */
7
8/*
9 * FragmentQueue.cpp
10 *
11 * Created on: Oct 19, 2011
12 * Author: heber
13 */
14
15// include config.h
16#ifdef HAVE_CONFIG_H
17#include <config.h>
18#endif
19
20#include "CodePatterns/MemDebug.hpp"
21
22#include "JobMarket/FragmentQueue.hpp"
23
24#include "CodePatterns/Assert.hpp"
25#include "CodePatterns/Observer/Channels.hpp"
26#include "CodePatterns/Log.hpp"
27
28// functions wrapping some static instances
29const FragmentResult::ptr FragmentQueue::getNoResult() const {
30 static const FragmentResult::ptr NoResult( new FragmentResult(-1) );
31 return NoResult;
32}
33
34const FragmentResult::ptr FragmentQueue::getNoResultQueued() const {
35 static const FragmentResult::ptr NoResultQueued( new FragmentResult(-2) );
36 return NoResultQueued;
37}
38
39const FragmentResult::ptr FragmentQueue::getResultDelivered() const {
40 static const FragmentResult::ptr ResultDelivered( new FragmentResult(-3) );
41 return ResultDelivered;
42}
43
44size_t FragmentQueue::Max_Attempts = (size_t)3;
45
46/** Constructor for class FragmentQueue.
47 *
48 */
49FragmentQueue::FragmentQueue() :
50 Observable("FragmentQueue"),
51 doneJobs(0)
52{
53 // observable stuff
54 Channels *OurChannel = new Channels;
55 Observable::insertNotificationChannel( std::make_pair(static_cast<Observable *>(this), OurChannel) );
56 // add instance for each notification type
57 for (size_t type = 0; type < NotificationType_MAX; ++type)
58 OurChannel->addChannel(type);
59}
60
61/** Destructor for class FragmentQueue.
62 *
63 */
64FragmentQueue::~FragmentQueue()
65{
66 while (!jobs.empty())
67 jobs.pop();
68 backup.clear();
69 results.clear();
70 attempts.clear();
71}
72
73/** Checks whether there are jobs in the queue at all.
74 * \return true - jobs present, false - queue is empty
75 */
76bool FragmentQueue::isJobPresent() const
77{
78 return !jobs.empty();
79}
80
81/** Pushes a FragmentJob into the internal queue for delivery to server.
82 *
83 * \note we throw assertion when jobid has already been used.
84 *
85 * \param job job to enter into queue
86 */
87void FragmentQueue::pushJob(FragmentJob::ptr job)
88{
89 if ((job->getId() != JobId::IllegalJob) && (!results.count(job->getId()))) {
90 OBSERVE;
91 NOTIFY(JobAdded);
92 results.insert( std::make_pair(job->getId(), getNoResult()));
93 jobs.push(job);
94 } else {
95 ELOG(1, "job to push has IllegalJob id or id "+toString(job->getId())+" has already been used.");
96 ASSERT(false,
97 "job to push has IllegalJob id or id "+toString(job->getId())+" has already been used.");
98 }
99}
100
101/** Pushes a bunch of FragmentJob's into the internal queue for delivery to server.
102 *
103 * \note we throw assertion when jobid has already been used.
104 *
105 * \sa pushJob()
106 *
107 * \param _jobs jobs to enter into queue
108 * \return true - all job ids ok, false - at least one id is taken or illegal
109 */
110bool FragmentQueue::pushJobs(std::vector<FragmentJob::ptr> &_jobs)
111{
112 // check all presented jobs
113 bool status = true;
114 for (std::vector<FragmentJob::ptr>::const_iterator iter = _jobs.begin();
115 iter != _jobs.end(); ++iter) {
116 const FragmentJob::ptr &job = *iter;
117 if ((job->getId() == JobId::IllegalJob) || (results.count(job->getId()))) {
118 ELOG(1, "job to push has IllegalJob id or id "+toString(job->getId())+" has already been used.");
119 ASSERT(false,
120 "job to push has IllegalJob id or id "+toString(job->getId())+" has already been used.");
121 status = false;
122 }
123 }
124 // if ok, add jobs to queue
125 if (status) {
126 for (std::vector<FragmentJob::ptr>::iterator iter = _jobs.begin();
127 iter != _jobs.end(); ++iter) {
128 FragmentJob::ptr &job = *iter;
129 results.insert( std::make_pair(job->getId(), getNoResult()));
130 jobs.push(job);
131 }
132 // then give signal for each job (we don't know how many workers there are)
133 for (std::vector<FragmentJob::ptr>::const_iterator iter = _jobs.begin();
134 iter != _jobs.end(); ++iter) {
135 OBSERVE;
136 NOTIFY(JobAdded);
137 }
138 }
139 return status;
140}
141
142/** Pops top-most FragmentJob from internal queue.
143 *
144 * From here on, we expect a result in FragmentQueue::results.
145 *
146 * \return job topmost job from queue
147 */
148FragmentJob::ptr FragmentQueue::popJob()
149{
150 ASSERT(jobs.size(),
151 "FragmentQueue::popJob() - there are no jobs on the queue.");
152 FragmentJob::ptr job = jobs.top();
153#ifndef NDEBUG
154 std::pair< BackupMap::iterator, bool> inserter =
155#endif
156 backup.insert( std::make_pair( job->getId(), job ));
157 ASSERT (inserter.second,
158 "FragmentQueue::popJob() - job "+toString(job->getId())+
159 " is already in the backup.");
160 ResultMap::iterator iter = results.find(job->getId());
161 ASSERT(iter != results.end(),
162 "FragmentQueue::popJob() - for job "+toString(job->getId())+" no result place has been stored.");
163 iter->second = getNoResultQueued();
164 jobs.pop();
165 return job;
166}
167
168/** Internal function to check whether result is not one of static entities.
169 *
170 * @param result result to check against
171 * @return true - result is a present, valid result, false - result is one of the statics
172 */
173bool FragmentQueue::isPresentResult(const FragmentResult::ptr result) const
174{
175 return (*result != *getNoResult())
176 && (*result != *getNoResultQueued())
177 && (*result != *getResultDelivered());
178}
179
180/** Queries whether a job has already been finished and the result is present.
181 *
182 * \param jobid id of job to query
183 * \return true - result is present, false - result is not present
184 */
185bool FragmentQueue::isResultPresent(JobId_t jobid) const
186{
187 ResultMap::const_iterator iter = results.find(jobid);
188 return ((iter != results.end())
189 && isPresentResult(iter->second));
190}
191
192/** Counts the number of jobs for a given set of ids for which we have a calculated result present.
193 *
194 * \param ids vector of job ids to check
195 * \return number of calculated results
196 */
197size_t FragmentQueue::getDoneJobs(const std::set<JobId_t> &ids) const
198{
199 size_t doneJobs = 0;
200 for (std::set<JobId_t>::const_iterator iter = ids.begin();
201 iter != ids.end(); ++iter) {
202 ResultMap::const_iterator resiter = results.find(*iter);
203 if (resiter != results.end())
204 if (isPresentResult(resiter->second))
205 ++doneJobs;
206 }
207 return doneJobs;
208}
209
210/** Counts the number of jobs for which still have to be calculated.
211 *
212 * \return number of jobs to be calculated
213 */
214size_t FragmentQueue::getPresentJobs() const
215{
216 const size_t presentJobs = jobs.size();
217 return presentJobs;
218}
219
220/** Delivers result for a finished job.
221 *
222 * \note we throw assertion if not present
223 *
224 * \param jobid id of job
225 * \return result for job of given \a jobid
226 */
227FragmentResult::ptr FragmentQueue::getResult(JobId_t jobid)
228{
229 ResultMap::iterator iter = results.find(jobid);
230 ASSERT(iter != results.end(),
231 "FragmentQueue::pushResult() - job "+toString(jobid)+" is not known to us.");
232 ASSERT(*iter->second != *getNoResult(),
233 "FragmentQueue::pushResult() - job "+toString(jobid)+" has not been request for calculation yet.");
234 ASSERT(*iter->second != *getNoResultQueued(),
235 "FragmentQueue::pushResult() - job "+toString(jobid)+"'s calculation is underway but not result has arrived yet.");
236 ASSERT(*iter->second != *getResultDelivered(),
237 "FragmentQueue::pushResult() - job "+toString(jobid)+"'s result has already been delivered.");
238 /// store result
239 FragmentResult::ptr _result = iter->second;
240 /// mark as delivered in map
241 if (iter->second != getResultDelivered()) {
242 iter->second = getResultDelivered();
243 --doneJobs;
244 }
245 /// and return result
246 return _result;
247}
248
249std::vector<FragmentResult::ptr> FragmentQueue::getAllResults()
250{
251 std::vector<FragmentResult::ptr> returnresults;
252 for (ResultMap::iterator iter = results.begin();
253 iter != results.end(); ++iter) {
254 if (isPresentResult(iter->second)) {
255 returnresults.push_back(getResult(iter->first));
256 iter = results.begin();
257 }
258 }
259
260 return returnresults;
261}
262
263std::vector<FragmentResult::ptr> FragmentQueue::getAllResults(const std::set<JobId_t> &ids)
264{
265 std::vector<FragmentResult::ptr> returnresults;
266 for (std::set<JobId_t>::const_iterator iter = ids.begin();
267 iter != ids.end(); ++iter) {
268 ResultMap::const_iterator resiter = results.find(*iter);
269 if (resiter != results.end())
270 if (isPresentResult(resiter->second))
271 returnresults.push_back(getResult(*iter));
272 }
273
274 return returnresults;
275}
276
277
278/** Pushes a result for a finished job.
279 *
280 * \note we throw assertion if job already has result or is not known.
281 *
282 * \param result result of job to store
283 */
284void FragmentQueue::pushResult(FragmentResult::ptr &_result)
285{
286 const JobId_t id = _result->getId();
287 /// check for presence
288 ResultMap::iterator iter = results.find(id);
289 ASSERT(iter != results.end(),
290 "FragmentQueue::pushResult() - job "+toString(id)+" is not known to us.");
291 ASSERT(*iter->second == *getNoResultQueued(),
292 "FragmentQueue::pushResult() - is not waiting for the result of job "+toString(id)+".");
293 // check whether this is a resubmitted job
294 AttemptsMap::iterator attemptiter = attempts.find(id);
295 // check whether succeeded or (finally) failed
296 if ((_result->exitflag == 0)
297 || ((attemptiter != attempts.end()) && (attemptiter->second >= Max_Attempts))
298 || (Max_Attempts == 1)) {
299 // give notice if it is resubmitted job
300 if (attemptiter != attempts.end()) {
301 if (attemptiter->second >= Max_Attempts)
302 ELOG(1, "Job #" << id << " failed on " << Max_Attempts << "th attempt for the last time.");
303 else
304 LOG(1, "INFO: Job #" << id << " succeeded on " << attemptiter->second << "th attempt.");
305 }
306 // remove in attempts
307 if (attemptiter != attempts.end())
308 attempts.erase(attemptiter);
309 // remove in backup map
310 BackupMap::iterator backupiter = backup.find(id);
311 ASSERT( backupiter != backup.end(),
312 "FragmentQueue::pushResult() - cannot find job "+toString(id)
313 +" in backup.");
314 backup.erase(backupiter);
315 /// and overwrite NoResult in found entry
316 iter->second = _result;
317 // increase done jobs
318 ++doneJobs;
319 } else {
320 LOG(1, "Job " << id << " failed, resubmitting.");
321 // increase attempts
322 if (attemptiter != attempts.end())
323 ++(attemptiter->second);
324 else
325 attempts.insert( std::make_pair(id, (size_t)1) );
326 // resubmit job
327 resubmitJob(id);
328 }
329}
330
331/** Resubmit a job which a worker failed to calculate.
332 *
333 * @param jobid id of the failed job
334 */
335void FragmentQueue::resubmitJob(const JobId_t jobid)
336{
337 BackupMap::iterator iter = backup.find(jobid);
338 ASSERT( iter != backup.end(),
339 "FragmentQueue::resubmitJob() - job id "+toString(jobid)
340 +" not stored in backup.");
341 if (iter != backup.end()) {
342 // remove present result
343 ResultMap::iterator resiter = results.find(jobid);
344 ASSERT( resiter != results.end(),
345 "FragmentQueue::resubmitJob() - job "+toString(jobid)
346 +" to resubmit has no result present.");
347 results.erase(resiter);
348 const FragmentJob::ptr job = iter->second;
349 backup.erase(iter);
350 pushJob(job);
351 }
352}
353
354/** Removes all done results currently in the queue.
355 *
356 */
357void FragmentQueue::removeWaitingResults()
358{
359 for(ResultMap::iterator iter = results.begin();
360 iter != results.end();) {
361 if (isPresentResult(iter->second)) {
362 results.erase(iter++);
363 --doneJobs;
364 } else
365 iter++;
366 }
367}
368
369/** Removes all waiting jobs currently in the queue.
370 *
371 */
372void FragmentQueue::removeWaitingJobs()
373{
374 // remove placeholder results
375 for(ResultMap::iterator iter = results.begin();
376 iter != results.end();) {
377 if (*(iter->second) == *getNoResult()) {
378 results.erase(iter++);
379 } else
380 iter++;
381 }
382 // remove all jobs in the queue
383 while (!jobs.empty())
384 jobs.pop();
385}
Note: See TracBrowser for help on using the repository browser.