source: src/Fragmentation/Automation/Pool/WorkerPool.cpp@ 5d8c0f

Action_Thermostats Add_AtomRandomPerturbation Add_FitFragmentPartialChargesAction Add_RotateAroundBondAction Add_SelectAtomByNameAction Added_ParseSaveFragmentResults AddingActions_SaveParseParticleParameters Adding_Graph_to_ChangeBondActions Adding_MD_integration_tests Adding_ParticleName_to_Atom Adding_StructOpt_integration_tests AtomFragments Automaking_mpqc_open AutomationFragmentation_failures Candidate_v1.5.4 Candidate_v1.6.0 Candidate_v1.6.1 ChangeBugEmailaddress ChangingTestPorts ChemicalSpaceEvaluator CombiningParticlePotentialParsing Combining_Subpackages Debian_Package_split Debian_package_split_molecuildergui_only Disabling_MemDebug Docu_Python_wait EmpiricalPotential_contain_HomologyGraph EmpiricalPotential_contain_HomologyGraph_documentation Enable_parallel_make_install Enhance_userguide Enhanced_StructuralOptimization Enhanced_StructuralOptimization_continued Example_ManyWaysToTranslateAtom Exclude_Hydrogens_annealWithBondGraph FitPartialCharges_GlobalError Fix_BoundInBox_CenterInBox_MoleculeActions Fix_ChargeSampling_PBC Fix_ChronosMutex Fix_FitPartialCharges Fix_FitPotential_needs_atomicnumbers Fix_ForceAnnealing Fix_IndependentFragmentGrids Fix_ParseParticles Fix_ParseParticles_split_forward_backward_Actions Fix_PopActions Fix_QtFragmentList_sorted_selection Fix_Restrictedkeyset_FragmentMolecule Fix_StatusMsg Fix_StepWorldTime_single_argument Fix_Verbose_Codepatterns Fix_fitting_potentials Fixes ForceAnnealing_goodresults ForceAnnealing_oldresults ForceAnnealing_tocheck ForceAnnealing_with_BondGraph ForceAnnealing_with_BondGraph_continued ForceAnnealing_with_BondGraph_continued_betteresults ForceAnnealing_with_BondGraph_contraction-expansion FragmentAction_writes_AtomFragments FragmentMolecule_checks_bonddegrees GeometryObjects Gui_Fixes Gui_displays_atomic_force_velocity ImplicitCharges IndependentFragmentGrids IndependentFragmentGrids_IndividualZeroInstances IndependentFragmentGrids_IntegrationTest IndependentFragmentGrids_Sole_NN_Calculation JobMarket_RobustOnKillsSegFaults JobMarket_StableWorkerPool JobMarket_unresolvable_hostname_fix MoreRobust_FragmentAutomation ODR_violation_mpqc_open PartialCharges_OrthogonalSummation PdbParser_setsAtomName PythonUI_with_named_parameters QtGui_reactivate_TimeChanged_changes Recreated_GuiChecks Rewrite_FitPartialCharges RotateToPrincipalAxisSystem_UndoRedo SaturateAtoms_findBestMatching SaturateAtoms_singleDegree StoppableMakroAction Subpackage_CodePatterns Subpackage_JobMarket Subpackage_LinearAlgebra Subpackage_levmar Subpackage_mpqc_open Subpackage_vmg Switchable_LogView ThirdParty_MPQC_rebuilt_buildsystem TrajectoryDependenant_MaxOrder TremoloParser_IncreasedPrecision TremoloParser_MultipleTimesteps TremoloParser_setsAtomName Ubuntu_1604_changes stable
Last change on this file since 5d8c0f was 5d8c0f, checked in by Frederik Heber <heber@…>, 13 years ago

Added WorkerPool to handle a pool of Workers.

  • also added unit test.
  • Property mode set to 100644
File size: 5.6 KB
Line 
1/*
2 * Project: MoleCuilder
3 * Description: creates and alters molecular systems
4 * Copyright (C) 2011 University of Bonn. All rights reserved.
5 * Please see the LICENSE file or "Copyright notice" in builder.cpp for details.
6 */
7
8/*
9 * WorkerPool.cpp
10 *
11 * Created on: 22.02.2012
12 * Author: heber
13 */
14
15// include config.h
16#ifdef HAVE_CONFIG_H
17#include <config.h>
18#endif
19
20// boost asio needs specific operator new
21#include <boost/asio.hpp>
22
23#include "CodePatterns/MemDebug.hpp"
24
25#include "WorkerPool.hpp"
26
27#include "CodePatterns/Assert.hpp"
28#include "CodePatterns/Info.hpp"
29#include "CodePatterns/Log.hpp"
30#include "Connection.hpp"
31
32WorkerPool::priority_t WorkerPool::default_priority = 0;
33WorkerAddress WorkerPool::emptyAddress("empty", "empty");
34
35/** Constructor for class WorkerPool.
36 *
37 */
38WorkerPool::WorkerPool()
39{}
40
41/** Destructor for class WorkerPool.
42 *
43 */
44WorkerPool::~WorkerPool()
45{}
46
47/** Helper function to check whether an address is already in the pool.
48 *
49 * @param address worker address to check
50 * @return true - address is present, false - else
51 */
52bool WorkerPool::presentInPool(const WorkerAddress &address) const
53{
54 return pool.find(address) != pool.end();
55}
56
57/** Get address of next idle worker.
58 *
59 * Note that worker is automatically marked as busy, \sa WorkerPool::markWorkerBusy()
60 *
61 * @return address of idle worker
62 */
63WorkerAddress WorkerPool::getNextIdleWorker()
64{
65 // get first idle worker
66 ASSERT( presentIdleWorkers(),
67 "WorkerPool::getNextIdleWorker() - there is no idle worker.");
68 if (!presentIdleWorkers())
69 return emptyAddress;
70 Idle_Queue_t::iterator iter = idle_queue.begin();
71 const WorkerAddress returnaddress = iter->second;
72
73 // enter in busy queue
74 markWorkerBusy( iter );
75
76 // return address
77 return returnaddress;
78}
79
80/** Checks whether a worker is busy or not.
81 *
82 * @param address address of worker to check
83 */
84bool WorkerPool::isWorkerBusy(const WorkerAddress &address) const
85{
86 Busy_Queue_t::const_iterator iter = busy_queue.find(address);
87 if (iter != busy_queue.end())
88 return true;
89#ifndef NDEBUG
90 else {
91 Idle_Queue_t::const_iterator iter = idle_queue.begin();
92 for(;iter != idle_queue.end(); ++iter)
93 if (iter->second == address)
94 break;
95 ASSERT( iter != idle_queue.end(),
96 "WorkerPool::isWorkerBusy() - worker "+toString(address)
97 +" is neither busy nor idle.");
98
99 }
100#endif
101 return false;
102}
103
104/** Adds another worker to the pool by noting down its address.
105 *
106 * @param address host and service address of the listening worker
107 * @return true - added successfully, false - not added
108 */
109bool WorkerPool::addWorker(const WorkerAddress& address)
110{
111 std::pair<Pool_t::iterator, bool> inserter =
112 pool.insert( address );
113 if (inserter.second) { // if new also add to queue
114 LOG(1, "INFO: Successfully added "+toString(address)+" to pool.");
115 idle_queue.insert( make_pair( default_priority, address ) );
116 return true;
117 } else {
118 LOG(1, "INFO: "+toString(address)+" is already present pool.");
119 return false;
120 }
121}
122
123/** Removes a worker from the pool.
124 *
125 * @param address host and service address of the listening worker
126 * @return true - removed successfully, false - not removed
127 */
128bool WorkerPool::removeWorker(const WorkerAddress& address)
129{
130 Pool_t::iterator iter = pool.find( address );
131 if (iter != pool.end()) {
132 Idle_Queue_t::iterator idleiter = idle_queue.begin();
133 while (idleiter != idle_queue.end()) {
134 if (idleiter->second == address) {
135 idle_queue.erase(idleiter);
136 break;
137 }
138 }
139 Busy_Queue_t::iterator busyiter = busy_queue.find(address);
140 if (busyiter != busy_queue.end())
141 busy_queue.erase(busyiter);
142 ASSERT( idleiter != idle_queue.end() || busyiter != busy_queue.end(),
143 "WorkerPool::removeWorker() - Worker "+toString(address)
144 +" is in pool but neither idle nor busy!");
145 ASSERT( !(idleiter != idle_queue.end() && busyiter != busy_queue.end()),
146 "WorkerPool::removeWorker() - Worker "+toString(address)
147 +" is in pool and both idle and busy!");
148 pool.erase(iter);
149 return true;
150 } else {
151 ELOG(1, "Worker "+toString(address)+" is not present pool.");
152 return false;
153 }
154}
155
156/** Sends shutdown to all current workers in the pool.
157 *
158 */
159void WorkerPool::removeAllWorkers()
160{
161 // empty pool and queue
162 idle_queue.clear();
163 busy_queue.clear();
164 pool.clear();
165}
166
167/** Helper function to mark a worker as busy.
168 *
169 * Removes from idle_queue and places into busy_queue.
170 * Sets \a iter to Idle_Queue_t::end().
171 *
172 * @param iter iterator on idle worker
173 */
174void WorkerPool::markWorkerBusy(Idle_Queue_t::iterator &iter)
175{
176 if (isWorkerBusy(iter->second))
177 return;
178 const WorkerAddress returnaddress = iter->second;
179 const priority_t priority = iter->first;
180
181 // remove from idle queue
182 idle_queue.erase(iter);
183
184 // insert into busy queue
185#ifndef NDEBUG
186 std::pair< Busy_Queue_t::iterator, bool > inserter =
187#endif
188 busy_queue.insert( make_pair(returnaddress, priority) );
189 ASSERT( inserter.second,
190 "WorkerPool::sendJobToWorker() - Worker "+toString(inserter.first->first)+" is already busy.");
191
192 LOG(1, "INFO: Worker " << returnaddress << " is now marked busy.");
193}
194
195/** Helper function to unmark a worker as busy.
196 *
197 * Removes worker from busy_queue and returns it to idle_queue.
198 *
199 * @param address address of worker
200 */
201void WorkerPool::unmarkWorkerBusy(const WorkerAddress &address)
202{
203 if (isWorkerBusy(address)) {
204 Busy_Queue_t::const_iterator iter = busy_queue.find(address);
205 const priority_t priority = iter->second;
206 busy_queue.erase(address);
207 idle_queue.insert( make_pair( priority, address) );
208 }
209}
Note: See TracBrowser for help on using the repository browser.