source: src/Fragmentation/Automation/Pool/WorkerPool.cpp@ e032b4

Action_Thermostats Add_AtomRandomPerturbation Add_FitFragmentPartialChargesAction Add_RotateAroundBondAction Add_SelectAtomByNameAction Added_ParseSaveFragmentResults AddingActions_SaveParseParticleParameters Adding_Graph_to_ChangeBondActions Adding_MD_integration_tests Adding_ParticleName_to_Atom Adding_StructOpt_integration_tests AtomFragments Automaking_mpqc_open AutomationFragmentation_failures Candidate_v1.5.4 Candidate_v1.6.0 Candidate_v1.6.1 Candidate_v1.7.0 ChangeBugEmailaddress ChangingTestPorts ChemicalSpaceEvaluator CombiningParticlePotentialParsing Combining_Subpackages Debian_Package_split Debian_package_split_molecuildergui_only Disabling_MemDebug Docu_Python_wait EmpiricalPotential_contain_HomologyGraph EmpiricalPotential_contain_HomologyGraph_documentation Enable_parallel_make_install Enhance_userguide Enhanced_StructuralOptimization Enhanced_StructuralOptimization_continued Example_ManyWaysToTranslateAtom Exclude_Hydrogens_annealWithBondGraph FitPartialCharges_GlobalError Fix_BoundInBox_CenterInBox_MoleculeActions Fix_ChargeSampling_PBC Fix_ChronosMutex Fix_FitPartialCharges Fix_FitPotential_needs_atomicnumbers Fix_ForceAnnealing Fix_IndependentFragmentGrids Fix_ParseParticles Fix_ParseParticles_split_forward_backward_Actions Fix_PopActions Fix_QtFragmentList_sorted_selection Fix_Restrictedkeyset_FragmentMolecule Fix_StatusMsg Fix_StepWorldTime_single_argument Fix_Verbose_Codepatterns Fix_fitting_potentials Fixes ForceAnnealing_goodresults ForceAnnealing_oldresults ForceAnnealing_tocheck ForceAnnealing_with_BondGraph ForceAnnealing_with_BondGraph_continued ForceAnnealing_with_BondGraph_continued_betteresults ForceAnnealing_with_BondGraph_contraction-expansion FragmentAction_writes_AtomFragments FragmentMolecule_checks_bonddegrees GeometryObjects Gui_Fixes Gui_displays_atomic_force_velocity ImplicitCharges IndependentFragmentGrids IndependentFragmentGrids_IndividualZeroInstances IndependentFragmentGrids_IntegrationTest IndependentFragmentGrids_Sole_NN_Calculation JobMarket_RobustOnKillsSegFaults JobMarket_StableWorkerPool JobMarket_unresolvable_hostname_fix MoreRobust_FragmentAutomation ODR_violation_mpqc_open PartialCharges_OrthogonalSummation PdbParser_setsAtomName PythonUI_with_named_parameters QtGui_reactivate_TimeChanged_changes Recreated_GuiChecks Rewrite_FitPartialCharges RotateToPrincipalAxisSystem_UndoRedo SaturateAtoms_findBestMatching SaturateAtoms_singleDegree StoppableMakroAction Subpackage_CodePatterns Subpackage_JobMarket Subpackage_LinearAlgebra Subpackage_levmar Subpackage_mpqc_open Subpackage_vmg Switchable_LogView ThirdParty_MPQC_rebuilt_buildsystem TrajectoryDependenant_MaxOrder TremoloParser_IncreasedPrecision TremoloParser_MultipleTimesteps TremoloParser_setsAtomName Ubuntu_1604_changes stable
Last change on this file since e032b4 was fb255d, checked in by Frederik Heber <heber@…>, 13 years ago

WorkerPool is now observable.

  • Property mode set to 100644
File size: 6.4 KB
RevLine 
[5d8c0f]1/*
2 * Project: MoleCuilder
3 * Description: creates and alters molecular systems
4 * Copyright (C) 2011 University of Bonn. All rights reserved.
5 * Please see the LICENSE file or "Copyright notice" in builder.cpp for details.
6 */
7
8/*
9 * WorkerPool.cpp
10 *
11 * Created on: 22.02.2012
12 * Author: heber
13 */
14
15// include config.h
16#ifdef HAVE_CONFIG_H
17#include <config.h>
18#endif
19
20// boost asio needs specific operator new
21#include <boost/asio.hpp>
22
23#include "CodePatterns/MemDebug.hpp"
24
25#include "WorkerPool.hpp"
26
27#include "CodePatterns/Assert.hpp"
28#include "CodePatterns/Info.hpp"
29#include "CodePatterns/Log.hpp"
[fb255d]30#include "CodePatterns/Observer/Channels.hpp"
[5d8c0f]31#include "Connection.hpp"
32
33WorkerPool::priority_t WorkerPool::default_priority = 0;
34WorkerAddress WorkerPool::emptyAddress("empty", "empty");
35
36/** Constructor for class WorkerPool.
37 *
38 */
[fb255d]39WorkerPool::WorkerPool() :
40 Observable("WorkerPool")
41{
42 // observable stuff
43 Channels *OurChannel = new Channels;
44 NotificationChannels.insert( std::make_pair(this, OurChannel) );
45 // add instance for each notification type
46 for (size_t type = 0; type < NotificationType_MAX; ++type)
47 OurChannel->addChannel(type);
48}
[5d8c0f]49
50/** Destructor for class WorkerPool.
51 *
52 */
53WorkerPool::~WorkerPool()
54{}
55
56/** Helper function to check whether an address is already in the pool.
57 *
58 * @param address worker address to check
59 * @return true - address is present, false - else
60 */
61bool WorkerPool::presentInPool(const WorkerAddress &address) const
62{
63 return pool.find(address) != pool.end();
64}
65
66/** Get address of next idle worker.
67 *
68 * Note that worker is automatically marked as busy, \sa WorkerPool::markWorkerBusy()
69 *
70 * @return address of idle worker
71 */
72WorkerAddress WorkerPool::getNextIdleWorker()
73{
74 // get first idle worker
75 ASSERT( presentIdleWorkers(),
76 "WorkerPool::getNextIdleWorker() - there is no idle worker.");
77 if (!presentIdleWorkers())
78 return emptyAddress;
79 Idle_Queue_t::iterator iter = idle_queue.begin();
80 const WorkerAddress returnaddress = iter->second;
81
82 // enter in busy queue
83 markWorkerBusy( iter );
84
85 // return address
86 return returnaddress;
87}
88
[41c1b7]89WorkerPool::Idle_Queue_t::iterator WorkerPool::getIdleWorker(const WorkerAddress &address)
90{
91 Idle_Queue_t::iterator idleiter = idle_queue.begin();
92 while (idleiter != idle_queue.end()) {
93 if (idleiter->second == address) {
94 break;
95 }
96 ++idleiter;
97 }
98 return idleiter;
99}
100
[5d8c0f]101/** Checks whether a worker is busy or not.
102 *
103 * @param address address of worker to check
104 */
105bool WorkerPool::isWorkerBusy(const WorkerAddress &address) const
106{
107 Busy_Queue_t::const_iterator iter = busy_queue.find(address);
108 if (iter != busy_queue.end())
109 return true;
110#ifndef NDEBUG
111 else {
112 Idle_Queue_t::const_iterator iter = idle_queue.begin();
113 for(;iter != idle_queue.end(); ++iter)
114 if (iter->second == address)
115 break;
116 ASSERT( iter != idle_queue.end(),
117 "WorkerPool::isWorkerBusy() - worker "+toString(address)
118 +" is neither busy nor idle.");
119
120 }
121#endif
122 return false;
123}
124
125/** Adds another worker to the pool by noting down its address.
126 *
127 * @param address host and service address of the listening worker
128 * @return true - added successfully, false - not added
129 */
130bool WorkerPool::addWorker(const WorkerAddress& address)
131{
[fb255d]132 OBSERVE;
133 NOTIFY(WorkerAdded);
[5d8c0f]134 std::pair<Pool_t::iterator, bool> inserter =
135 pool.insert( address );
136 if (inserter.second) { // if new also add to queue
137 LOG(1, "INFO: Successfully added "+toString(address)+" to pool.");
138 idle_queue.insert( make_pair( default_priority, address ) );
[fb255d]139 NOTIFY(WorkerIdle);
[5d8c0f]140 return true;
141 } else {
142 LOG(1, "INFO: "+toString(address)+" is already present pool.");
143 return false;
144 }
145}
146
147/** Removes a worker from the pool.
148 *
149 * @param address host and service address of the listening worker
150 * @return true - removed successfully, false - not removed
151 */
152bool WorkerPool::removeWorker(const WorkerAddress& address)
153{
154 Pool_t::iterator iter = pool.find( address );
155 if (iter != pool.end()) {
[fb255d]156 OBSERVE;
157 NOTIFY(WorkerRemoved);
[41c1b7]158 Idle_Queue_t::iterator idleiter = getIdleWorker(address);
159 if (idleiter != idle_queue.end())
160 idle_queue.erase(idleiter);
[5d8c0f]161 Busy_Queue_t::iterator busyiter = busy_queue.find(address);
162 if (busyiter != busy_queue.end())
163 busy_queue.erase(busyiter);
164 ASSERT( idleiter != idle_queue.end() || busyiter != busy_queue.end(),
165 "WorkerPool::removeWorker() - Worker "+toString(address)
166 +" is in pool but neither idle nor busy!");
167 ASSERT( !(idleiter != idle_queue.end() && busyiter != busy_queue.end()),
168 "WorkerPool::removeWorker() - Worker "+toString(address)
169 +" is in pool and both idle and busy!");
170 pool.erase(iter);
[41c1b7]171 LOG(1, "INFO: Removed worker " << address << " from pool.");
[5d8c0f]172 return true;
173 } else {
174 ELOG(1, "Worker "+toString(address)+" is not present pool.");
175 return false;
176 }
177}
178
179/** Sends shutdown to all current workers in the pool.
180 *
181 */
182void WorkerPool::removeAllWorkers()
183{
[fb255d]184 OBSERVE;
185 NOTIFY(WorkerRemoved);
[5d8c0f]186 // empty pool and queue
187 idle_queue.clear();
188 busy_queue.clear();
189 pool.clear();
190}
191
192/** Helper function to mark a worker as busy.
193 *
194 * Removes from idle_queue and places into busy_queue.
195 * Sets \a iter to Idle_Queue_t::end().
196 *
197 * @param iter iterator on idle worker
198 */
199void WorkerPool::markWorkerBusy(Idle_Queue_t::iterator &iter)
200{
201 const WorkerAddress returnaddress = iter->second;
[41c1b7]202 if (isWorkerBusy(returnaddress))
203 return;
[5d8c0f]204 const priority_t priority = iter->first;
205
206 // remove from idle queue
207 idle_queue.erase(iter);
208
209 // insert into busy queue
210#ifndef NDEBUG
211 std::pair< Busy_Queue_t::iterator, bool > inserter =
212#endif
213 busy_queue.insert( make_pair(returnaddress, priority) );
214 ASSERT( inserter.second,
215 "WorkerPool::sendJobToWorker() - Worker "+toString(inserter.first->first)+" is already busy.");
216
217 LOG(1, "INFO: Worker " << returnaddress << " is now marked busy.");
218}
219
220/** Helper function to unmark a worker as busy.
221 *
222 * Removes worker from busy_queue and returns it to idle_queue.
223 *
224 * @param address address of worker
225 */
226void WorkerPool::unmarkWorkerBusy(const WorkerAddress &address)
227{
228 if (isWorkerBusy(address)) {
[fb255d]229 OBSERVE;
230 NOTIFY(WorkerIdle);
[5d8c0f]231 Busy_Queue_t::const_iterator iter = busy_queue.find(address);
232 const priority_t priority = iter->second;
233 busy_queue.erase(address);
234 idle_queue.insert( make_pair( priority, address) );
[41c1b7]235
236 LOG(1, "INFO: Worker " << address << " is now marked idle.");
[5d8c0f]237 }
238}
Note: See TracBrowser for help on using the repository browser.