source: src/Fragmentation/Automation/FragmentQueue.cpp@ 701bbc

Action_Thermostats Add_AtomRandomPerturbation Add_FitFragmentPartialChargesAction Add_RotateAroundBondAction Add_SelectAtomByNameAction Added_ParseSaveFragmentResults AddingActions_SaveParseParticleParameters Adding_Graph_to_ChangeBondActions Adding_MD_integration_tests Adding_ParticleName_to_Atom Adding_StructOpt_integration_tests AtomFragments Automaking_mpqc_open AutomationFragmentation_failures Candidate_v1.5.4 Candidate_v1.6.0 Candidate_v1.6.1 Candidate_v1.7.0 ChangeBugEmailaddress ChangingTestPorts ChemicalSpaceEvaluator CombiningParticlePotentialParsing Combining_Subpackages Debian_Package_split Debian_package_split_molecuildergui_only Disabling_MemDebug Docu_Python_wait EmpiricalPotential_contain_HomologyGraph EmpiricalPotential_contain_HomologyGraph_documentation Enable_parallel_make_install Enhance_userguide Enhanced_StructuralOptimization Enhanced_StructuralOptimization_continued Example_ManyWaysToTranslateAtom Exclude_Hydrogens_annealWithBondGraph FitPartialCharges_GlobalError Fix_BoundInBox_CenterInBox_MoleculeActions Fix_ChargeSampling_PBC Fix_ChronosMutex Fix_FitPartialCharges Fix_FitPotential_needs_atomicnumbers Fix_ForceAnnealing Fix_IndependentFragmentGrids Fix_ParseParticles Fix_ParseParticles_split_forward_backward_Actions Fix_PopActions Fix_QtFragmentList_sorted_selection Fix_Restrictedkeyset_FragmentMolecule Fix_StatusMsg Fix_StepWorldTime_single_argument Fix_Verbose_Codepatterns Fix_fitting_potentials Fixes ForceAnnealing_goodresults ForceAnnealing_oldresults ForceAnnealing_tocheck ForceAnnealing_with_BondGraph ForceAnnealing_with_BondGraph_continued ForceAnnealing_with_BondGraph_continued_betteresults ForceAnnealing_with_BondGraph_contraction-expansion FragmentAction_writes_AtomFragments FragmentMolecule_checks_bonddegrees GeometryObjects Gui_Fixes Gui_displays_atomic_force_velocity ImplicitCharges IndependentFragmentGrids IndependentFragmentGrids_IndividualZeroInstances IndependentFragmentGrids_IntegrationTest 
IndependentFragmentGrids_Sole_NN_Calculation JobMarket_RobustOnKillsSegFaults JobMarket_StableWorkerPool JobMarket_unresolvable_hostname_fix MoreRobust_FragmentAutomation ODR_violation_mpqc_open PartialCharges_OrthogonalSummation PdbParser_setsAtomName PythonUI_with_named_parameters QtGui_reactivate_TimeChanged_changes Recreated_GuiChecks Rewrite_FitPartialCharges RotateToPrincipalAxisSystem_UndoRedo SaturateAtoms_findBestMatching SaturateAtoms_singleDegree StoppableMakroAction Subpackage_CodePatterns Subpackage_JobMarket Subpackage_LinearAlgebra Subpackage_levmar Subpackage_mpqc_open Subpackage_vmg Switchable_LogView ThirdParty_MPQC_rebuilt_buildsystem TrajectoryDependenant_MaxOrder TremoloParser_IncreasedPrecision TremoloParser_MultipleTimesteps TremoloParser_setsAtomName Ubuntu_1604_changes stable
Last change on this file since 701bbc was fe95b7, checked in by Frederik Heber <heber@…>, 13 years ago

FragmentQueue can now resubmit jobs.

  • we keep an internal list of jobs currently being worked on. If a worker returns with a failure, the job can be resubmitted to the queue.
  • there are Max_Attempts (currently set to 1) tries to resubmit.
  • added unit test function on this.
  • added regression test Fragmentation/Automation resubmitjobs.
  • Property mode set to 100644
File size: 8.2 KB
RevLine 
[b5ebb5]1/*
2 * Project: MoleCuilder
3 * Description: creates and alters molecular systems
4 * Copyright (C) 2011 University of Bonn. All rights reserved.
5 * Please see the LICENSE file or "Copyright notice" in builder.cpp for details.
6 */
7
8/*
9 * FragmentQueue.cpp
10 *
11 * Created on: Oct 19, 2011
12 * Author: heber
13 */
14
15// include config.h
16#ifdef HAVE_CONFIG_H
17#include <config.h>
18#endif
19
20#include "CodePatterns/MemDebug.hpp"
21
22#include "FragmentQueue.hpp"
23
24#include "CodePatterns/Assert.hpp"
[fe95b7]25#include "CodePatterns/Log.hpp"
[b5ebb5]26
[35f587]27FragmentResult::ptr FragmentQueue::NoResult( new FragmentResult(-1) );
28FragmentResult::ptr FragmentQueue::NoResultQueued( new FragmentResult(-2) );
29FragmentResult::ptr FragmentQueue::ResultDelivered( new FragmentResult(-3) );
[fe95b7]30size_t FragmentQueue::Max_Attempts = (size_t)1;
[b5ebb5]31
32/** Constructor for class FragmentQueue.
33 *
34 */
35FragmentQueue::FragmentQueue()
36{}
37
38/** Destructor for class FragmentQueue.
39 *
40 */
41FragmentQueue::~FragmentQueue()
[78ad7d]42{
43 jobs.clear();
44 results.clear();
45}
[b5ebb5]46
[12d15a]47/** Checks whether there are jobs in the queue at all.
48 * \return true - jobs present, false - queue is empty
49 */
50bool FragmentQueue::isJobPresent() const
51{
52 return !jobs.empty();
53}
54
[b5ebb5]55/** Pushes a FragmentJob into the internal queue for delivery to server.
56 *
57 * \note we throw assertion when jobid has already been used.
58 *
59 * \param job job to enter into queue
60 */
[78ad7d]61void FragmentQueue::pushJob(FragmentJob::ptr job)
[b5ebb5]62{
[78ad7d]63 ASSERT(job->getId() != JobId::IllegalJob,
[9875cc]64 "FragmentQueue::pushJob() - job to push has IllegalJob id.");
[78ad7d]65 ASSERT(!results.count(job->getId()),
66 "FragmentQueue::pushJob() - job id "+toString(job->getId())+" has already been used.");
67 results.insert( std::make_pair(job->getId(), NoResult));
[12d15a]68 jobs.push_back(job);
69}
70
[9875cc]71/** Pushes a bunch of FragmentJob's into the internal queue for delivery to server.
72 *
73 * \note we throw assertion when jobid has already been used.
74 *
75 * \sa pushJob()
76 *
77 * \param _jobs jobs to enter into queue
78 */
[78ad7d]79void FragmentQueue::pushJobs(std::vector<FragmentJob::ptr> &_jobs)
[9875cc]80{
[78ad7d]81 for (std::vector<FragmentJob::ptr>::iterator iter = _jobs.begin();
[9875cc]82 iter != _jobs.end(); ++iter)
83 pushJob(*iter);
84}
85
[12d15a]86/** Pops top-most FragmentJob from internal queue.
87 *
88 * From here on, we expect a result in FragmentQueue::results.
89 *
90 * \return job topmost job from queue
91 */
[78ad7d]92FragmentJob::ptr FragmentQueue::popJob()
[12d15a]93{
94 ASSERT(jobs.size(),
95 "FragmentQueue::popJob() - there are no jobs on the queue.");
[78ad7d]96 FragmentJob::ptr job = jobs.front();
[fe95b7]97#ifndef NDEBUG
98 std::pair< BackupMap::iterator, bool> inserter =
99#endif
100 backup.insert( std::make_pair( job->getId(), job ));
101 ASSERT (inserter.second,
102 "FragmentQueue::popJob() - job "+toString(job->getId())+
103 " is already in the backup.");
[78ad7d]104 ResultMap::iterator iter = results.find(job->getId());
[12d15a]105 ASSERT(iter != results.end(),
[78ad7d]106 "FragmentQueue::popJob() - for job "+toString(job->getId())+" no result place has been stored.");
[12d15a]107 iter->second = NoResultQueued;
108 jobs.pop_front();
109 return job;
[b5ebb5]110}
111
[b9c486]112/** Internal function to check whether result is not one of static entities.
113 *
114 * @param result result to check against
115 * @return true - result is a present, valid result, false - result is one of the statics
116 */
[35f587]117bool FragmentQueue::isPresentResult(const FragmentResult::ptr result) const
[b9c486]118{
[35f587]119 return (*result != *NoResult)
120 && (*result != *NoResultQueued)
121 && (*result != *ResultDelivered);
[b9c486]122}
123
[b5ebb5]124/** Queries whether a job has already been finished and the result is present.
125 *
126 * \param jobid id of job to query
127 * \return true - result is present, false - result is not present
128 */
129bool FragmentQueue::isResultPresent(JobId_t jobid) const
130{
131 ResultMap::const_iterator iter = results.find(jobid);
132 return ((iter != results.end())
[b9c486]133 && isPresentResult(iter->second));
[b5ebb5]134}
135
[8ee5ac]136/** Counts the number of jobs for which we have a calculated result present.
137 *
138 * \return number of calculated results
139 */
140size_t FragmentQueue::getDoneJobs() const
141{
142 size_t doneJobs = 0;
143 for (ResultMap::const_iterator iter = results.begin();
144 iter != results.end(); ++iter)
[35f587]145 if (isPresentResult(iter->second))
[8ee5ac]146 ++doneJobs;
[bf56f6]147 return doneJobs;
148}
149
150/** Counts the number of jobs for which still have to be calculated.
151 *
152 * \return number of jobs to be calculated
153 */
154size_t FragmentQueue::getPresentJobs() const
155{
156 const size_t presentJobs = jobs.size();
157 return presentJobs;
[8ee5ac]158}
159
[b5ebb5]160/** Delivers result for a finished job.
161 *
162 * \note we throw assertion if not present
163 *
164 * \param jobid id of job
165 * \return result for job of given \a jobid
166 */
[35f587]167FragmentResult::ptr FragmentQueue::getResult(JobId_t jobid)
[b5ebb5]168{
169 ResultMap::iterator iter = results.find(jobid);
170 ASSERT(iter != results.end(),
171 "FragmentQueue::pushResult() - job "+toString(jobid)+" is not known to us.");
[35f587]172 ASSERT(*iter->second != *NoResult,
[12d15a]173 "FragmentQueue::pushResult() - job "+toString(jobid)+" has not been request for calculation yet.");
[35f587]174 ASSERT(*iter->second != *NoResultQueued,
[12d15a]175 "FragmentQueue::pushResult() - job "+toString(jobid)+"'s calculation is underway but not result has arrived yet.");
[35f587]176 ASSERT(*iter->second != *ResultDelivered,
[b5ebb5]177 "FragmentQueue::pushResult() - job "+toString(jobid)+"'s result has already been delivered.");
178 /// store result
[35f587]179 FragmentResult::ptr _result = iter->second;
[b5ebb5]180 /// mark as delivered in map
181 iter->second = ResultDelivered;
182 /// and return result
[b9c486]183 return _result;
184}
185
[35f587]186std::vector<FragmentResult::ptr> FragmentQueue::getAllResults()
[b9c486]187{
[35f587]188 std::vector<FragmentResult::ptr> returnresults;
[b9c486]189 for (ResultMap::iterator iter = results.begin();
190 iter != results.end(); ++iter) {
191 if (isPresentResult(iter->second)) {
192 returnresults.push_back(getResult(iter->first));
193 iter = results.begin();
194 }
195 }
196
197 return returnresults;
[b5ebb5]198}
199
200/** Pushes a result for a finished job.
201 *
202 * \note we throw assertion if job already has result or is not known.
203 *
204 * \param result result of job to store
205 */
[35f587]206void FragmentQueue::pushResult(FragmentResult::ptr &_result)
[b5ebb5]207{
[fe95b7]208 const JobId_t id = _result->getId();
[b5ebb5]209 /// check for presence
[fe95b7]210 ResultMap::iterator iter = results.find(id);
[b5ebb5]211 ASSERT(iter != results.end(),
[fe95b7]212 "FragmentQueue::pushResult() - job "+toString(id)+" is not known to us.");
[35f587]213 ASSERT(*iter->second == *NoResultQueued,
[fe95b7]214 "FragmentQueue::pushResult() - is not waiting for the result of job "+toString(id)+".");
215 // check whether this is a resubmitted job
216 AttemptsMap::iterator attemptiter = attempts.find(id);
217 // check whether succeeded or (finally) failed
218 if ((_result->exitflag == 0) || ((attemptiter != attempts.end()) && (attemptiter->second >= Max_Attempts))) {
219 // give notice if it is resubmitted job
220 if (attemptiter != attempts.end()) {
221 if (attemptiter->second >= Max_Attempts)
222 ELOG(1, "Job #" << id << " failed on " << Max_Attempts << "th attempt for the last time.");
223 else
224 LOG(1, "INFO: Job #" << id << " succeeded on " << attemptiter->second << "th attempt.");
225 }
226 // remove in attempts
227 if (attemptiter != attempts.end())
228 attempts.erase(attemptiter);
229 // remove in backup map
230 BackupMap::iterator backupiter = backup.find(id);
231 ASSERT( backupiter != backup.end(),
232 "FragmentQueue::pushResult() - cannot find job "+toString(id)
233 +" in backup.");
234 backup.erase(backupiter);
235 /// and overwrite NoResult in found entry
236 iter->second = _result;
237 } else {
238 LOG(1, "Job " << id << " failed, resubmitting.");
239 // increase attempts
240 if (attemptiter != attempts.end())
241 ++(attemptiter->second);
242 else
243 attempts.insert( std::make_pair(id, (size_t)1) );
244 // resubmit job
245 resubmitJob(id);
246 }
247}
248
249/** Resubmit a job which a worker failed to calculate.
250 *
251 * @param jobid id of the failed job
252 */
253void FragmentQueue::resubmitJob(const JobId_t jobid)
254{
255 BackupMap::iterator iter = backup.find(jobid);
256 ASSERT( iter != backup.end(),
257 "FragmentQueue::resubmitJob() - job id "+toString(jobid)
258 +" not stored in backup.");
259 if (iter != backup.end()) {
260 // remove result
261 ResultMap::iterator resiter = results.find(jobid);
262 ASSERT( resiter != results.end(),
263 "FragmentQueue::resubmitJob() - resubmitting job "+toString(jobid)
264 +" for which no result is present.");
265 results.erase(resiter);
266 pushJob(iter->second);
267 backup.erase(iter);
268 }
[b5ebb5]269}
[b9c486]270
Note: See TracBrowser for help on using the repository browser.