source: src/Fragmentation/Automation/Pool/WorkerPool.cpp@ 122de0

Action_Thermostats Add_AtomRandomPerturbation Add_FitFragmentPartialChargesAction Add_RotateAroundBondAction Add_SelectAtomByNameAction Added_ParseSaveFragmentResults AddingActions_SaveParseParticleParameters Adding_Graph_to_ChangeBondActions Adding_MD_integration_tests Adding_ParticleName_to_Atom Adding_StructOpt_integration_tests AtomFragments Automaking_mpqc_open AutomationFragmentation_failures Candidate_v1.5.4 Candidate_v1.6.0 Candidate_v1.6.1 Candidate_v1.7.0 ChangeBugEmailaddress ChangingTestPorts ChemicalSpaceEvaluator CombiningParticlePotentialParsing Combining_Subpackages Debian_Package_split Debian_package_split_molecuildergui_only Disabling_MemDebug Docu_Python_wait EmpiricalPotential_contain_HomologyGraph EmpiricalPotential_contain_HomologyGraph_documentation Enable_parallel_make_install Enhance_userguide Enhanced_StructuralOptimization Enhanced_StructuralOptimization_continued Example_ManyWaysToTranslateAtom Exclude_Hydrogens_annealWithBondGraph FitPartialCharges_GlobalError Fix_BoundInBox_CenterInBox_MoleculeActions Fix_ChargeSampling_PBC Fix_ChronosMutex Fix_FitPartialCharges Fix_FitPotential_needs_atomicnumbers Fix_ForceAnnealing Fix_IndependentFragmentGrids Fix_ParseParticles Fix_ParseParticles_split_forward_backward_Actions Fix_PopActions Fix_QtFragmentList_sorted_selection Fix_Restrictedkeyset_FragmentMolecule Fix_StatusMsg Fix_StepWorldTime_single_argument Fix_Verbose_Codepatterns Fix_fitting_potentials Fixes ForceAnnealing_goodresults ForceAnnealing_oldresults ForceAnnealing_tocheck ForceAnnealing_with_BondGraph ForceAnnealing_with_BondGraph_continued ForceAnnealing_with_BondGraph_continued_betteresults ForceAnnealing_with_BondGraph_contraction-expansion FragmentAction_writes_AtomFragments FragmentMolecule_checks_bonddegrees GeometryObjects Gui_Fixes Gui_displays_atomic_force_velocity ImplicitCharges IndependentFragmentGrids IndependentFragmentGrids_IndividualZeroInstances IndependentFragmentGrids_IntegrationTest IndependentFragmentGrids_Sole_NN_Calculation JobMarket_RobustOnKillsSegFaults JobMarket_StableWorkerPool JobMarket_unresolvable_hostname_fix MoreRobust_FragmentAutomation ODR_violation_mpqc_open PartialCharges_OrthogonalSummation PdbParser_setsAtomName PythonUI_with_named_parameters QtGui_reactivate_TimeChanged_changes Recreated_GuiChecks Rewrite_FitPartialCharges RotateToPrincipalAxisSystem_UndoRedo SaturateAtoms_findBestMatching SaturateAtoms_singleDegree StoppableMakroAction Subpackage_CodePatterns Subpackage_JobMarket Subpackage_LinearAlgebra Subpackage_levmar Subpackage_mpqc_open Subpackage_vmg Switchable_LogView ThirdParty_MPQC_rebuilt_buildsystem TrajectoryDependenant_MaxOrder TremoloParser_IncreasedPrecision TremoloParser_MultipleTimesteps TremoloParser_setsAtomName Ubuntu_1604_changes stable
Last change on this file since 122de0 was befcf8, checked in by Frederik Heber <heber@…>, 13 years ago

Rewrote FragmentScheduler::shutdown to make sure idle_queue is truely const.

  • WorkerPool has new function to return idle_queue addresses as Vector.
  • We first get all addresses, then shutdown each and also we iterate over this as long as OperationQueue has Ops and idle_queue has idlers.
  • Property mode set to 100644
File size: 6.6 KB
Line 
1/*
2 * Project: MoleCuilder
3 * Description: creates and alters molecular systems
4 * Copyright (C) 2011 University of Bonn. All rights reserved.
5 * Please see the LICENSE file or "Copyright notice" in builder.cpp for details.
6 */
7
8/*
9 * WorkerPool.cpp
10 *
11 * Created on: 22.02.2012
12 * Author: heber
13 */
14
15// include config.h
16#ifdef HAVE_CONFIG_H
17#include <config.h>
18#endif
19
20// boost asio needs specific operator new
21#include <boost/asio.hpp>
22
23#include "CodePatterns/MemDebug.hpp"
24
25#include "WorkerPool.hpp"
26
27#include "CodePatterns/Assert.hpp"
28#include "CodePatterns/Info.hpp"
29#include "CodePatterns/Log.hpp"
30#include "CodePatterns/Observer/Channels.hpp"
31#include "Connection.hpp"
32
33WorkerPool::priority_t WorkerPool::default_priority = 0;
34WorkerAddress WorkerPool::emptyAddress("empty", "empty");
35
36/** Constructor for class WorkerPool.
37 *
38 */
39WorkerPool::WorkerPool() :
40 Observable("WorkerPool")
41{
42 // observable stuff
43 Channels *OurChannel = new Channels;
44 NotificationChannels.insert( std::make_pair(this, OurChannel) );
45 // add instance for each notification type
46 for (size_t type = 0; type < NotificationType_MAX; ++type)
47 OurChannel->addChannel(type);
48}
49
50/** Destructor for class WorkerPool.
51 *
52 */
53WorkerPool::~WorkerPool()
54{}
55
56/** Helper function to check whether an address is already in the pool.
57 *
58 * @param address worker address to check
59 * @return true - address is present, false - else
60 */
61bool WorkerPool::presentInPool(const WorkerAddress &address) const
62{
63 return pool.find(address) != pool.end();
64}
65
66/** Get address of next idle worker.
67 *
68 * Note that worker is automatically marked as busy, \sa WorkerPool::markWorkerBusy()
69 *
70 * @return address of idle worker
71 */
72WorkerAddress WorkerPool::getNextIdleWorker()
73{
74 // get first idle worker
75 ASSERT( presentIdleWorkers(),
76 "WorkerPool::getNextIdleWorker() - there is no idle worker.");
77 if (!presentIdleWorkers())
78 return emptyAddress;
79 Idle_Queue_t::iterator iter = idle_queue.begin();
80 const WorkerAddress returnaddress = iter->second;
81
82 // enter in busy queue
83 markWorkerBusy( iter );
84
85 // return address
86 return returnaddress;
87}
88
89WorkerPool::Idle_Queue_t::iterator WorkerPool::getIdleWorker(const WorkerAddress &address)
90{
91 Idle_Queue_t::iterator idleiter = idle_queue.begin();
92 while (idleiter != idle_queue.end()) {
93 if (idleiter->second == address) {
94 break;
95 }
96 ++idleiter;
97 }
98 return idleiter;
99}
100
101/** Checks whether a worker is busy or not.
102 *
103 * @param address address of worker to check
104 */
105bool WorkerPool::isWorkerBusy(const WorkerAddress &address) const
106{
107 Busy_Queue_t::const_iterator iter = busy_queue.find(address);
108 if (iter != busy_queue.end())
109 return true;
110#ifndef NDEBUG
111 else {
112 Idle_Queue_t::const_iterator iter = idle_queue.begin();
113 for(;iter != idle_queue.end(); ++iter)
114 if (iter->second == address)
115 break;
116 ASSERT( iter != idle_queue.end(),
117 "WorkerPool::isWorkerBusy() - worker "+toString(address)
118 +" is neither busy nor idle.");
119
120 }
121#endif
122 return false;
123}
124
125/** Adds another worker to the pool by noting down its address.
126 *
127 * @param address host and service address of the listening worker
128 * @return true - added successfully, false - not added
129 */
130bool WorkerPool::addWorker(const WorkerAddress& address)
131{
132 OBSERVE;
133 NOTIFY(WorkerAdded);
134 std::pair<Pool_t::iterator, bool> inserter =
135 pool.insert( address );
136 if (inserter.second) { // if new also add to queue
137 LOG(1, "INFO: Successfully added "+toString(address)+" to pool.");
138 idle_queue.insert( make_pair( default_priority, address ) );
139 NOTIFY(WorkerIdle);
140 return true;
141 } else {
142 LOG(1, "INFO: "+toString(address)+" is already present pool.");
143 return false;
144 }
145}
146
147/** Removes a worker from the pool.
148 *
149 * @param address host and service address of the listening worker
150 * @return true - removed successfully, false - not removed
151 */
152bool WorkerPool::removeWorker(const WorkerAddress& address)
153{
154 Pool_t::iterator iter = pool.find( address );
155 if (iter != pool.end()) {
156 Idle_Queue_t::iterator idleiter = getIdleWorker(address);
157 if (idleiter != idle_queue.end())
158 idle_queue.erase(idleiter);
159 Busy_Queue_t::iterator busyiter = busy_queue.find(address);
160 if (busyiter != busy_queue.end())
161 busy_queue.erase(busyiter);
162 ASSERT( idleiter != idle_queue.end() || busyiter != busy_queue.end(),
163 "WorkerPool::removeWorker() - Worker "+toString(address)
164 +" is in pool but neither idle nor busy!");
165 ASSERT( !(idleiter != idle_queue.end() && busyiter != busy_queue.end()),
166 "WorkerPool::removeWorker() - Worker "+toString(address)
167 +" is in pool and both idle and busy!");
168 pool.erase(iter);
169 LOG(1, "INFO: Removed worker " << address << " from pool.");
170 return true;
171 } else {
172 ELOG(1, "Worker "+toString(address)+" is not present pool.");
173 return false;
174 }
175}
176
177/** Sends shutdown to all current workers in the pool.
178 *
179 */
180void WorkerPool::removeAllWorkers()
181{
182 // empty pool and queue
183 idle_queue.clear();
184 busy_queue.clear();
185 pool.clear();
186}
187
188/** Helper function to mark a worker as busy.
189 *
190 * Removes from idle_queue and places into busy_queue.
191 * Sets \a iter to Idle_Queue_t::end().
192 *
193 * @param iter iterator on idle worker
194 */
195void WorkerPool::markWorkerBusy(Idle_Queue_t::iterator &iter)
196{
197 const WorkerAddress returnaddress = iter->second;
198 if (isWorkerBusy(returnaddress))
199 return;
200 const priority_t priority = iter->first;
201
202 // remove from idle queue
203 idle_queue.erase(iter);
204
205 // insert into busy queue
206#ifndef NDEBUG
207 std::pair< Busy_Queue_t::iterator, bool > inserter =
208#endif
209 busy_queue.insert( make_pair(returnaddress, priority) );
210 ASSERT( inserter.second,
211 "WorkerPool::sendJobToWorker() - Worker "+toString(inserter.first->first)+" is already busy.");
212
213 LOG(1, "INFO: Worker " << returnaddress << " is now marked busy.");
214}
215
216/** Helper function to unmark a worker as busy.
217 *
218 * Removes worker from busy_queue and returns it to idle_queue.
219 *
220 * @param address address of worker
221 */
222void WorkerPool::unmarkWorkerBusy(const WorkerAddress &address)
223{
224 if (isWorkerBusy(address)) {
225 OBSERVE;
226 NOTIFY(WorkerIdle);
227 Busy_Queue_t::const_iterator iter = busy_queue.find(address);
228 const priority_t priority = iter->second;
229 busy_queue.erase(address);
230 idle_queue.insert( make_pair( priority, address) );
231
232 LOG(1, "INFO: Worker " << address << " is now marked idle.");
233 }
234}
235
236WorkerPool::WorkerList_t WorkerPool::getListOfIdleWorkers() const
237{
238 WorkerList_t WorkerList;
239 for (Idle_Queue_t::const_iterator iter = idle_queue.begin(); iter != idle_queue.end(); ++iter)
240 WorkerList.push_back( make_pair(iter->second.host, iter->second.service) );
241 return WorkerList;
242}
Note: See TracBrowser for help on using the repository browser.