source: src/Fragmentation/Automation/FragmentQueue.cpp@ 270364

Action_Thermostats Add_AtomRandomPerturbation Add_FitFragmentPartialChargesAction Add_RotateAroundBondAction Add_SelectAtomByNameAction Added_ParseSaveFragmentResults AddingActions_SaveParseParticleParameters Adding_Graph_to_ChangeBondActions Adding_MD_integration_tests Adding_ParticleName_to_Atom Adding_StructOpt_integration_tests AtomFragments Automaking_mpqc_open AutomationFragmentation_failures Candidate_v1.5.4 Candidate_v1.6.0 Candidate_v1.6.1 Candidate_v1.7.0 ChangeBugEmailaddress ChangingTestPorts ChemicalSpaceEvaluator CombiningParticlePotentialParsing Combining_Subpackages Debian_Package_split Debian_package_split_molecuildergui_only Disabling_MemDebug Docu_Python_wait EmpiricalPotential_contain_HomologyGraph EmpiricalPotential_contain_HomologyGraph_documentation Enable_parallel_make_install Enhance_userguide Enhanced_StructuralOptimization Enhanced_StructuralOptimization_continued Example_ManyWaysToTranslateAtom Exclude_Hydrogens_annealWithBondGraph FitPartialCharges_GlobalError Fix_BoundInBox_CenterInBox_MoleculeActions Fix_ChargeSampling_PBC Fix_ChronosMutex Fix_FitPartialCharges Fix_FitPotential_needs_atomicnumbers Fix_ForceAnnealing Fix_IndependentFragmentGrids Fix_ParseParticles Fix_ParseParticles_split_forward_backward_Actions Fix_PopActions Fix_QtFragmentList_sorted_selection Fix_Restrictedkeyset_FragmentMolecule Fix_StatusMsg Fix_StepWorldTime_single_argument Fix_Verbose_Codepatterns Fix_fitting_potentials Fixes ForceAnnealing_goodresults ForceAnnealing_oldresults ForceAnnealing_tocheck ForceAnnealing_with_BondGraph ForceAnnealing_with_BondGraph_continued ForceAnnealing_with_BondGraph_continued_betteresults ForceAnnealing_with_BondGraph_contraction-expansion FragmentAction_writes_AtomFragments FragmentMolecule_checks_bonddegrees GeometryObjects Gui_Fixes Gui_displays_atomic_force_velocity ImplicitCharges IndependentFragmentGrids IndependentFragmentGrids_IndividualZeroInstances IndependentFragmentGrids_IntegrationTest 
IndependentFragmentGrids_Sole_NN_Calculation JobMarket_RobustOnKillsSegFaults JobMarket_StableWorkerPool JobMarket_unresolvable_hostname_fix MoreRobust_FragmentAutomation ODR_violation_mpqc_open PartialCharges_OrthogonalSummation PdbParser_setsAtomName PythonUI_with_named_parameters QtGui_reactivate_TimeChanged_changes Recreated_GuiChecks Rewrite_FitPartialCharges RotateToPrincipalAxisSystem_UndoRedo SaturateAtoms_findBestMatching SaturateAtoms_singleDegree StoppableMakroAction Subpackage_CodePatterns Subpackage_JobMarket Subpackage_LinearAlgebra Subpackage_levmar Subpackage_mpqc_open Subpackage_vmg Switchable_LogView ThirdParty_MPQC_rebuilt_buildsystem TrajectoryDependenant_MaxOrder TremoloParser_IncreasedPrecision TremoloParser_MultipleTimesteps TremoloParser_setsAtomName Ubuntu_1604_changes stable
Last change on this file since 270364 was d76161, checked in by Frederik Heber <heber@…>, 13 years ago

FIX: Removed channel JobRemoved from FragmentQueue as it may cause cyclic updates in FragmentScheduler.

  • An update is triggered when a job is popped from the queue. If a new job has just been pushed and a worker is free to pick it up, the callback function is used to do this. If popping a job triggers another update (even one on channel JobRemoved rather than JobAdded), all notifications will again call notifyAll(), and the same notification will be triggered twice.
  • Property mode set to 100644
File size: 8.6 KB
RevLine 
[b5ebb5]1/*
2 * Project: MoleCuilder
3 * Description: creates and alters molecular systems
4 * Copyright (C) 2011 University of Bonn. All rights reserved.
5 * Please see the LICENSE file or "Copyright notice" in builder.cpp for details.
6 */
7
8/*
9 * FragmentQueue.cpp
10 *
11 * Created on: Oct 19, 2011
12 * Author: heber
13 */
14
15// include config.h
16#ifdef HAVE_CONFIG_H
17#include <config.h>
18#endif
19
20#include "CodePatterns/MemDebug.hpp"
21
22#include "FragmentQueue.hpp"
23
24#include "CodePatterns/Assert.hpp"
[d6b12c]25#include "CodePatterns/Observer/Channels.hpp"
[fe95b7]26#include "CodePatterns/Log.hpp"
[b5ebb5]27
[35f587]28FragmentResult::ptr FragmentQueue::NoResult( new FragmentResult(-1) );
29FragmentResult::ptr FragmentQueue::NoResultQueued( new FragmentResult(-2) );
30FragmentResult::ptr FragmentQueue::ResultDelivered( new FragmentResult(-3) );
[2344a3]31size_t FragmentQueue::Max_Attempts = (size_t)3;
[b5ebb5]32
33/** Constructor for class FragmentQueue.
34 *
35 */
[d6b12c]36FragmentQueue::FragmentQueue() :
37 Observable("FragmentQueue")
38{
39 // observable stuff
40 Channels *OurChannel = new Channels;
41 NotificationChannels.insert( std::make_pair(this, OurChannel) );
42 // add instance for each notification type
43 for (size_t type = 0; type < NotificationType_MAX; ++type)
44 OurChannel->addChannel(type);
45}
[b5ebb5]46
47/** Destructor for class FragmentQueue.
48 *
49 */
50FragmentQueue::~FragmentQueue()
[78ad7d]51{
52 jobs.clear();
53 results.clear();
54}
[b5ebb5]55
[12d15a]56/** Checks whether there are jobs in the queue at all.
57 * \return true - jobs present, false - queue is empty
58 */
59bool FragmentQueue::isJobPresent() const
60{
61 return !jobs.empty();
62}
63
[b5ebb5]64/** Pushes a FragmentJob into the internal queue for delivery to server.
65 *
66 * \note we throw assertion when jobid has already been used.
67 *
68 * \param job job to enter into queue
69 */
[78ad7d]70void FragmentQueue::pushJob(FragmentJob::ptr job)
[b5ebb5]71{
[fb255d]72 if ((job->getId() != JobId::IllegalJob) && (!results.count(job->getId()))) {
73 OBSERVE;
74 NOTIFY(JobAdded);
75 results.insert( std::make_pair(job->getId(), NoResult));
76 jobs.push_back(job);
77 } else {
78 ELOG(1, "job to push has IllegalJob id or id "+toString(job->getId())+" has already been used.");
79 ASSERT(false,
80 "job to push has IllegalJob id or id "+toString(job->getId())+" has already been used.");
81 }
[12d15a]82}
83
[9875cc]84/** Pushes a bunch of FragmentJob's into the internal queue for delivery to server.
85 *
86 * \note we throw assertion when jobid has already been used.
87 *
88 * \sa pushJob()
89 *
90 * \param _jobs jobs to enter into queue
91 */
[78ad7d]92void FragmentQueue::pushJobs(std::vector<FragmentJob::ptr> &_jobs)
[9875cc]93{
[78ad7d]94 for (std::vector<FragmentJob::ptr>::iterator iter = _jobs.begin();
[9875cc]95 iter != _jobs.end(); ++iter)
96 pushJob(*iter);
97}
98
[12d15a]99/** Pops top-most FragmentJob from internal queue.
100 *
101 * From here on, we expect a result in FragmentQueue::results.
102 *
103 * \return job topmost job from queue
104 */
[78ad7d]105FragmentJob::ptr FragmentQueue::popJob()
[12d15a]106{
107 ASSERT(jobs.size(),
108 "FragmentQueue::popJob() - there are no jobs on the queue.");
[78ad7d]109 FragmentJob::ptr job = jobs.front();
[fe95b7]110#ifndef NDEBUG
111 std::pair< BackupMap::iterator, bool> inserter =
112#endif
113 backup.insert( std::make_pair( job->getId(), job ));
114 ASSERT (inserter.second,
115 "FragmentQueue::popJob() - job "+toString(job->getId())+
116 " is already in the backup.");
[78ad7d]117 ResultMap::iterator iter = results.find(job->getId());
[12d15a]118 ASSERT(iter != results.end(),
[78ad7d]119 "FragmentQueue::popJob() - for job "+toString(job->getId())+" no result place has been stored.");
[12d15a]120 iter->second = NoResultQueued;
121 jobs.pop_front();
122 return job;
[b5ebb5]123}
124
[b9c486]125/** Internal function to check whether result is not one of static entities.
126 *
127 * @param result result to check against
128 * @return true - result is a present, valid result, false - result is one of the statics
129 */
[35f587]130bool FragmentQueue::isPresentResult(const FragmentResult::ptr result) const
[b9c486]131{
[35f587]132 return (*result != *NoResult)
133 && (*result != *NoResultQueued)
134 && (*result != *ResultDelivered);
[b9c486]135}
136
[b5ebb5]137/** Queries whether a job has already been finished and the result is present.
138 *
139 * \param jobid id of job to query
140 * \return true - result is present, false - result is not present
141 */
142bool FragmentQueue::isResultPresent(JobId_t jobid) const
143{
144 ResultMap::const_iterator iter = results.find(jobid);
145 return ((iter != results.end())
[b9c486]146 && isPresentResult(iter->second));
[b5ebb5]147}
148
[8ee5ac]149/** Counts the number of jobs for which we have a calculated result present.
150 *
151 * \return number of calculated results
152 */
153size_t FragmentQueue::getDoneJobs() const
154{
155 size_t doneJobs = 0;
156 for (ResultMap::const_iterator iter = results.begin();
157 iter != results.end(); ++iter)
[35f587]158 if (isPresentResult(iter->second))
[8ee5ac]159 ++doneJobs;
[bf56f6]160 return doneJobs;
161}
162
163/** Counts the number of jobs for which still have to be calculated.
164 *
165 * \return number of jobs to be calculated
166 */
167size_t FragmentQueue::getPresentJobs() const
168{
169 const size_t presentJobs = jobs.size();
170 return presentJobs;
[8ee5ac]171}
172
[b5ebb5]173/** Delivers result for a finished job.
174 *
175 * \note we throw assertion if not present
176 *
177 * \param jobid id of job
178 * \return result for job of given \a jobid
179 */
[35f587]180FragmentResult::ptr FragmentQueue::getResult(JobId_t jobid)
[b5ebb5]181{
182 ResultMap::iterator iter = results.find(jobid);
183 ASSERT(iter != results.end(),
184 "FragmentQueue::pushResult() - job "+toString(jobid)+" is not known to us.");
[35f587]185 ASSERT(*iter->second != *NoResult,
[12d15a]186 "FragmentQueue::pushResult() - job "+toString(jobid)+" has not been request for calculation yet.");
[35f587]187 ASSERT(*iter->second != *NoResultQueued,
[12d15a]188 "FragmentQueue::pushResult() - job "+toString(jobid)+"'s calculation is underway but not result has arrived yet.");
[35f587]189 ASSERT(*iter->second != *ResultDelivered,
[b5ebb5]190 "FragmentQueue::pushResult() - job "+toString(jobid)+"'s result has already been delivered.");
191 /// store result
[35f587]192 FragmentResult::ptr _result = iter->second;
[b5ebb5]193 /// mark as delivered in map
194 iter->second = ResultDelivered;
195 /// and return result
[b9c486]196 return _result;
197}
198
[35f587]199std::vector<FragmentResult::ptr> FragmentQueue::getAllResults()
[b9c486]200{
[35f587]201 std::vector<FragmentResult::ptr> returnresults;
[b9c486]202 for (ResultMap::iterator iter = results.begin();
203 iter != results.end(); ++iter) {
204 if (isPresentResult(iter->second)) {
205 returnresults.push_back(getResult(iter->first));
206 iter = results.begin();
207 }
208 }
209
210 return returnresults;
[b5ebb5]211}
212
213/** Pushes a result for a finished job.
214 *
215 * \note we throw assertion if job already has result or is not known.
216 *
217 * \param result result of job to store
218 */
[35f587]219void FragmentQueue::pushResult(FragmentResult::ptr &_result)
[b5ebb5]220{
[fe95b7]221 const JobId_t id = _result->getId();
[b5ebb5]222 /// check for presence
[fe95b7]223 ResultMap::iterator iter = results.find(id);
[b5ebb5]224 ASSERT(iter != results.end(),
[fe95b7]225 "FragmentQueue::pushResult() - job "+toString(id)+" is not known to us.");
[35f587]226 ASSERT(*iter->second == *NoResultQueued,
[fe95b7]227 "FragmentQueue::pushResult() - is not waiting for the result of job "+toString(id)+".");
228 // check whether this is a resubmitted job
229 AttemptsMap::iterator attemptiter = attempts.find(id);
230 // check whether succeeded or (finally) failed
[9a3f84]231 if ((_result->exitflag == 0)
232 || ((attemptiter != attempts.end()) && (attemptiter->second >= Max_Attempts))
233 || (Max_Attempts == 1)) {
[fe95b7]234 // give notice if it is resubmitted job
235 if (attemptiter != attempts.end()) {
236 if (attemptiter->second >= Max_Attempts)
237 ELOG(1, "Job #" << id << " failed on " << Max_Attempts << "th attempt for the last time.");
238 else
239 LOG(1, "INFO: Job #" << id << " succeeded on " << attemptiter->second << "th attempt.");
240 }
241 // remove in attempts
242 if (attemptiter != attempts.end())
243 attempts.erase(attemptiter);
244 // remove in backup map
245 BackupMap::iterator backupiter = backup.find(id);
246 ASSERT( backupiter != backup.end(),
247 "FragmentQueue::pushResult() - cannot find job "+toString(id)
248 +" in backup.");
249 backup.erase(backupiter);
250 /// and overwrite NoResult in found entry
251 iter->second = _result;
252 } else {
253 LOG(1, "Job " << id << " failed, resubmitting.");
254 // increase attempts
255 if (attemptiter != attempts.end())
256 ++(attemptiter->second);
257 else
258 attempts.insert( std::make_pair(id, (size_t)1) );
259 // resubmit job
260 resubmitJob(id);
261 }
262}
263
264/** Resubmit a job which a worker failed to calculate.
265 *
266 * @param jobid id of the failed job
267 */
268void FragmentQueue::resubmitJob(const JobId_t jobid)
269{
270 BackupMap::iterator iter = backup.find(jobid);
271 ASSERT( iter != backup.end(),
272 "FragmentQueue::resubmitJob() - job id "+toString(jobid)
273 +" not stored in backup.");
274 if (iter != backup.end()) {
[9a3f84]275 // remove present result
[fe95b7]276 ResultMap::iterator resiter = results.find(jobid);
277 ASSERT( resiter != results.end(),
[9a3f84]278 "FragmentQueue::resubmitJob() - job "+toString(jobid)
279 +" to resubmit has no result present.");
[fe95b7]280 results.erase(resiter);
281 pushJob(iter->second);
282 backup.erase(iter);
283 }
[b5ebb5]284}
[b9c486]285
Note: See TracBrowser for help on using the repository browser.