source: src/Fragmentation/Automation/FragmentQueue.cpp@ d4885d

Action_Thermostats Add_AtomRandomPerturbation Add_FitFragmentPartialChargesAction Add_RotateAroundBondAction Add_SelectAtomByNameAction Added_ParseSaveFragmentResults AddingActions_SaveParseParticleParameters Adding_Graph_to_ChangeBondActions Adding_MD_integration_tests Adding_ParticleName_to_Atom Adding_StructOpt_integration_tests AtomFragments Automaking_mpqc_open AutomationFragmentation_failures Candidate_v1.5.4 Candidate_v1.6.0 Candidate_v1.6.1 Candidate_v1.7.0 ChangeBugEmailaddress ChangingTestPorts ChemicalSpaceEvaluator CombiningParticlePotentialParsing Combining_Subpackages Debian_Package_split Debian_package_split_molecuildergui_only Disabling_MemDebug Docu_Python_wait EmpiricalPotential_contain_HomologyGraph EmpiricalPotential_contain_HomologyGraph_documentation Enable_parallel_make_install Enhance_userguide Enhanced_StructuralOptimization Enhanced_StructuralOptimization_continued Example_ManyWaysToTranslateAtom Exclude_Hydrogens_annealWithBondGraph FitPartialCharges_GlobalError Fix_BoundInBox_CenterInBox_MoleculeActions Fix_ChargeSampling_PBC Fix_ChronosMutex Fix_FitPartialCharges Fix_FitPotential_needs_atomicnumbers Fix_ForceAnnealing Fix_IndependentFragmentGrids Fix_ParseParticles Fix_ParseParticles_split_forward_backward_Actions Fix_PopActions Fix_QtFragmentList_sorted_selection Fix_Restrictedkeyset_FragmentMolecule Fix_StatusMsg Fix_StepWorldTime_single_argument Fix_Verbose_Codepatterns Fix_fitting_potentials Fixes ForceAnnealing_goodresults ForceAnnealing_oldresults ForceAnnealing_tocheck ForceAnnealing_with_BondGraph ForceAnnealing_with_BondGraph_continued ForceAnnealing_with_BondGraph_continued_betteresults ForceAnnealing_with_BondGraph_contraction-expansion FragmentAction_writes_AtomFragments FragmentMolecule_checks_bonddegrees GeometryObjects Gui_Fixes Gui_displays_atomic_force_velocity ImplicitCharges IndependentFragmentGrids IndependentFragmentGrids_IndividualZeroInstances IndependentFragmentGrids_IntegrationTest IndependentFragmentGrids_Sole_NN_Calculation JobMarket_RobustOnKillsSegFaults JobMarket_StableWorkerPool JobMarket_unresolvable_hostname_fix MoreRobust_FragmentAutomation ODR_violation_mpqc_open PartialCharges_OrthogonalSummation PdbParser_setsAtomName PythonUI_with_named_parameters QtGui_reactivate_TimeChanged_changes Recreated_GuiChecks Rewrite_FitPartialCharges RotateToPrincipalAxisSystem_UndoRedo SaturateAtoms_findBestMatching SaturateAtoms_singleDegree StoppableMakroAction Subpackage_CodePatterns Subpackage_JobMarket Subpackage_LinearAlgebra Subpackage_levmar Subpackage_mpqc_open Subpackage_vmg Switchable_LogView ThirdParty_MPQC_rebuilt_buildsystem TrajectoryDependenant_MaxOrder TremoloParser_IncreasedPrecision TremoloParser_MultipleTimesteps TremoloParser_setsAtomName Ubuntu_1604_changes stable
Last change on this file since d4885d was 2344a3, checked in by Frederik Heber <heber@…>, 13 years ago

FragmentScheduler is now an Observer of JobsQueue and pool.

  • Property mode set to 100644
File size: 8.7 KB
RevLine 
[b5ebb5]1/*
2 * Project: MoleCuilder
3 * Description: creates and alters molecular systems
4 * Copyright (C) 2011 University of Bonn. All rights reserved.
5 * Please see the LICENSE file or "Copyright notice" in builder.cpp for details.
6 */
7
8/*
9 * FragmentQueue.cpp
10 *
11 * Created on: Oct 19, 2011
12 * Author: heber
13 */
14
15// include config.h
16#ifdef HAVE_CONFIG_H
17#include <config.h>
18#endif
19
20#include "CodePatterns/MemDebug.hpp"
21
22#include "FragmentQueue.hpp"
23
24#include "CodePatterns/Assert.hpp"
[d6b12c]25#include "CodePatterns/Observer/Channels.hpp"
[fe95b7]26#include "CodePatterns/Log.hpp"
[b5ebb5]27
[35f587]28FragmentResult::ptr FragmentQueue::NoResult( new FragmentResult(-1) );
29FragmentResult::ptr FragmentQueue::NoResultQueued( new FragmentResult(-2) );
30FragmentResult::ptr FragmentQueue::ResultDelivered( new FragmentResult(-3) );
[2344a3]31size_t FragmentQueue::Max_Attempts = (size_t)3;
[b5ebb5]32
33/** Constructor for class FragmentQueue.
34 *
35 */
[d6b12c]36FragmentQueue::FragmentQueue() :
37 Observable("FragmentQueue")
38{
39 // observable stuff
40 Channels *OurChannel = new Channels;
41 NotificationChannels.insert( std::make_pair(this, OurChannel) );
42 // add instance for each notification type
43 for (size_t type = 0; type < NotificationType_MAX; ++type)
44 OurChannel->addChannel(type);
45}
[b5ebb5]46
47/** Destructor for class FragmentQueue.
48 *
49 */
50FragmentQueue::~FragmentQueue()
[78ad7d]51{
52 jobs.clear();
53 results.clear();
54}
[b5ebb5]55
[12d15a]56/** Checks whether there are jobs in the queue at all.
57 * \return true - jobs present, false - queue is empty
58 */
59bool FragmentQueue::isJobPresent() const
60{
61 return !jobs.empty();
62}
63
[b5ebb5]64/** Pushes a FragmentJob into the internal queue for delivery to server.
65 *
66 * \note we throw assertion when jobid has already been used.
67 *
68 * \param job job to enter into queue
69 */
[78ad7d]70void FragmentQueue::pushJob(FragmentJob::ptr job)
[b5ebb5]71{
[fb255d]72 if ((job->getId() != JobId::IllegalJob) && (!results.count(job->getId()))) {
73 OBSERVE;
74 NOTIFY(JobAdded);
75 results.insert( std::make_pair(job->getId(), NoResult));
76 jobs.push_back(job);
77 } else {
78 ELOG(1, "job to push has IllegalJob id or id "+toString(job->getId())+" has already been used.");
79 ASSERT(false,
80 "job to push has IllegalJob id or id "+toString(job->getId())+" has already been used.");
81 }
[12d15a]82}
83
[9875cc]84/** Pushes a bunch of FragmentJob's into the internal queue for delivery to server.
85 *
86 * \note we throw assertion when jobid has already been used.
87 *
88 * \sa pushJob()
89 *
90 * \param _jobs jobs to enter into queue
91 */
[78ad7d]92void FragmentQueue::pushJobs(std::vector<FragmentJob::ptr> &_jobs)
[9875cc]93{
[78ad7d]94 for (std::vector<FragmentJob::ptr>::iterator iter = _jobs.begin();
[9875cc]95 iter != _jobs.end(); ++iter)
96 pushJob(*iter);
97}
98
[12d15a]99/** Pops top-most FragmentJob from internal queue.
100 *
101 * From here on, we expect a result in FragmentQueue::results.
102 *
103 * \return job topmost job from queue
104 */
[78ad7d]105FragmentJob::ptr FragmentQueue::popJob()
[12d15a]106{
[d6b12c]107 OBSERVE;
108 NOTIFY(JobRemoved);
[12d15a]109 ASSERT(jobs.size(),
110 "FragmentQueue::popJob() - there are no jobs on the queue.");
[78ad7d]111 FragmentJob::ptr job = jobs.front();
[fe95b7]112#ifndef NDEBUG
113 std::pair< BackupMap::iterator, bool> inserter =
114#endif
115 backup.insert( std::make_pair( job->getId(), job ));
116 ASSERT (inserter.second,
117 "FragmentQueue::popJob() - job "+toString(job->getId())+
118 " is already in the backup.");
[78ad7d]119 ResultMap::iterator iter = results.find(job->getId());
[12d15a]120 ASSERT(iter != results.end(),
[78ad7d]121 "FragmentQueue::popJob() - for job "+toString(job->getId())+" no result place has been stored.");
[12d15a]122 iter->second = NoResultQueued;
123 jobs.pop_front();
124 return job;
[b5ebb5]125}
126
[b9c486]127/** Internal function to check whether result is not one of static entities.
128 *
129 * @param result result to check against
130 * @return true - result is a present, valid result, false - result is one of the statics
131 */
[35f587]132bool FragmentQueue::isPresentResult(const FragmentResult::ptr result) const
[b9c486]133{
[35f587]134 return (*result != *NoResult)
135 && (*result != *NoResultQueued)
136 && (*result != *ResultDelivered);
[b9c486]137}
138
[b5ebb5]139/** Queries whether a job has already been finished and the result is present.
140 *
141 * \param jobid id of job to query
142 * \return true - result is present, false - result is not present
143 */
144bool FragmentQueue::isResultPresent(JobId_t jobid) const
145{
146 ResultMap::const_iterator iter = results.find(jobid);
147 return ((iter != results.end())
[b9c486]148 && isPresentResult(iter->second));
[b5ebb5]149}
150
[8ee5ac]151/** Counts the number of jobs for which we have a calculated result present.
152 *
153 * \return number of calculated results
154 */
155size_t FragmentQueue::getDoneJobs() const
156{
157 size_t doneJobs = 0;
158 for (ResultMap::const_iterator iter = results.begin();
159 iter != results.end(); ++iter)
[35f587]160 if (isPresentResult(iter->second))
[8ee5ac]161 ++doneJobs;
[bf56f6]162 return doneJobs;
163}
164
165/** Counts the number of jobs for which still have to be calculated.
166 *
167 * \return number of jobs to be calculated
168 */
169size_t FragmentQueue::getPresentJobs() const
170{
171 const size_t presentJobs = jobs.size();
172 return presentJobs;
[8ee5ac]173}
174
[b5ebb5]175/** Delivers result for a finished job.
176 *
177 * \note we throw assertion if not present
178 *
179 * \param jobid id of job
180 * \return result for job of given \a jobid
181 */
[35f587]182FragmentResult::ptr FragmentQueue::getResult(JobId_t jobid)
[b5ebb5]183{
184 ResultMap::iterator iter = results.find(jobid);
185 ASSERT(iter != results.end(),
186 "FragmentQueue::pushResult() - job "+toString(jobid)+" is not known to us.");
[35f587]187 ASSERT(*iter->second != *NoResult,
[12d15a]188 "FragmentQueue::pushResult() - job "+toString(jobid)+" has not been request for calculation yet.");
[35f587]189 ASSERT(*iter->second != *NoResultQueued,
[12d15a]190 "FragmentQueue::pushResult() - job "+toString(jobid)+"'s calculation is underway but not result has arrived yet.");
[35f587]191 ASSERT(*iter->second != *ResultDelivered,
[b5ebb5]192 "FragmentQueue::pushResult() - job "+toString(jobid)+"'s result has already been delivered.");
193 /// store result
[35f587]194 FragmentResult::ptr _result = iter->second;
[b5ebb5]195 /// mark as delivered in map
196 iter->second = ResultDelivered;
197 /// and return result
[b9c486]198 return _result;
199}
200
[35f587]201std::vector<FragmentResult::ptr> FragmentQueue::getAllResults()
[b9c486]202{
[35f587]203 std::vector<FragmentResult::ptr> returnresults;
[b9c486]204 for (ResultMap::iterator iter = results.begin();
205 iter != results.end(); ++iter) {
206 if (isPresentResult(iter->second)) {
207 returnresults.push_back(getResult(iter->first));
208 iter = results.begin();
209 }
210 }
211
212 return returnresults;
[b5ebb5]213}
214
215/** Pushes a result for a finished job.
216 *
217 * \note we throw assertion if job already has result or is not known.
218 *
219 * \param result result of job to store
220 */
[35f587]221void FragmentQueue::pushResult(FragmentResult::ptr &_result)
[b5ebb5]222{
[fe95b7]223 const JobId_t id = _result->getId();
[b5ebb5]224 /// check for presence
[fe95b7]225 ResultMap::iterator iter = results.find(id);
[b5ebb5]226 ASSERT(iter != results.end(),
[fe95b7]227 "FragmentQueue::pushResult() - job "+toString(id)+" is not known to us.");
[35f587]228 ASSERT(*iter->second == *NoResultQueued,
[fe95b7]229 "FragmentQueue::pushResult() - is not waiting for the result of job "+toString(id)+".");
230 // check whether this is a resubmitted job
231 AttemptsMap::iterator attemptiter = attempts.find(id);
232 // check whether succeeded or (finally) failed
[9a3f84]233 if ((_result->exitflag == 0)
234 || ((attemptiter != attempts.end()) && (attemptiter->second >= Max_Attempts))
235 || (Max_Attempts == 1)) {
[fe95b7]236 // give notice if it is resubmitted job
237 if (attemptiter != attempts.end()) {
238 if (attemptiter->second >= Max_Attempts)
239 ELOG(1, "Job #" << id << " failed on " << Max_Attempts << "th attempt for the last time.");
240 else
241 LOG(1, "INFO: Job #" << id << " succeeded on " << attemptiter->second << "th attempt.");
242 }
243 // remove in attempts
244 if (attemptiter != attempts.end())
245 attempts.erase(attemptiter);
246 // remove in backup map
247 BackupMap::iterator backupiter = backup.find(id);
248 ASSERT( backupiter != backup.end(),
249 "FragmentQueue::pushResult() - cannot find job "+toString(id)
250 +" in backup.");
251 backup.erase(backupiter);
252 /// and overwrite NoResult in found entry
253 iter->second = _result;
254 } else {
255 LOG(1, "Job " << id << " failed, resubmitting.");
256 // increase attempts
257 if (attemptiter != attempts.end())
258 ++(attemptiter->second);
259 else
260 attempts.insert( std::make_pair(id, (size_t)1) );
261 // resubmit job
262 resubmitJob(id);
263 }
264}
265
266/** Resubmit a job which a worker failed to calculate.
267 *
268 * @param jobid id of the failed job
269 */
270void FragmentQueue::resubmitJob(const JobId_t jobid)
271{
272 BackupMap::iterator iter = backup.find(jobid);
273 ASSERT( iter != backup.end(),
274 "FragmentQueue::resubmitJob() - job id "+toString(jobid)
275 +" not stored in backup.");
276 if (iter != backup.end()) {
[9a3f84]277 // remove present result
[fe95b7]278 ResultMap::iterator resiter = results.find(jobid);
279 ASSERT( resiter != results.end(),
[9a3f84]280 "FragmentQueue::resubmitJob() - job "+toString(jobid)
281 +" to resubmit has no result present.");
[fe95b7]282 results.erase(resiter);
283 pushJob(iter->second);
284 backup.erase(iter);
285 }
[b5ebb5]286}
[b9c486]287
Note: See TracBrowser for help on using the repository browser.