source: src/Fragmentation/Automation/Pool/WorkerPool.cpp@ 41c1b7

Action_Thermostats Add_AtomRandomPerturbation Add_FitFragmentPartialChargesAction Add_RotateAroundBondAction Add_SelectAtomByNameAction Added_ParseSaveFragmentResults AddingActions_SaveParseParticleParameters Adding_Graph_to_ChangeBondActions Adding_MD_integration_tests Adding_ParticleName_to_Atom Adding_StructOpt_integration_tests AtomFragments Automaking_mpqc_open AutomationFragmentation_failures Candidate_v1.5.4 Candidate_v1.6.0 Candidate_v1.6.1 Candidate_v1.7.0 ChangeBugEmailaddress ChangingTestPorts ChemicalSpaceEvaluator CombiningParticlePotentialParsing Combining_Subpackages Debian_Package_split Debian_package_split_molecuildergui_only Disabling_MemDebug Docu_Python_wait EmpiricalPotential_contain_HomologyGraph EmpiricalPotential_contain_HomologyGraph_documentation Enable_parallel_make_install Enhance_userguide Enhanced_StructuralOptimization Enhanced_StructuralOptimization_continued Example_ManyWaysToTranslateAtom Exclude_Hydrogens_annealWithBondGraph FitPartialCharges_GlobalError Fix_BoundInBox_CenterInBox_MoleculeActions Fix_ChargeSampling_PBC Fix_ChronosMutex Fix_FitPartialCharges Fix_FitPotential_needs_atomicnumbers Fix_ForceAnnealing Fix_IndependentFragmentGrids Fix_ParseParticles Fix_ParseParticles_split_forward_backward_Actions Fix_PopActions Fix_QtFragmentList_sorted_selection Fix_Restrictedkeyset_FragmentMolecule Fix_StatusMsg Fix_StepWorldTime_single_argument Fix_Verbose_Codepatterns Fix_fitting_potentials Fixes ForceAnnealing_goodresults ForceAnnealing_oldresults ForceAnnealing_tocheck ForceAnnealing_with_BondGraph ForceAnnealing_with_BondGraph_continued ForceAnnealing_with_BondGraph_continued_betteresults ForceAnnealing_with_BondGraph_contraction-expansion FragmentAction_writes_AtomFragments FragmentMolecule_checks_bonddegrees GeometryObjects Gui_Fixes Gui_displays_atomic_force_velocity ImplicitCharges IndependentFragmentGrids IndependentFragmentGrids_IndividualZeroInstances IndependentFragmentGrids_IntegrationTest IndependentFragmentGrids_Sole_NN_Calculation JobMarket_RobustOnKillsSegFaults JobMarket_StableWorkerPool JobMarket_unresolvable_hostname_fix MoreRobust_FragmentAutomation ODR_violation_mpqc_open PartialCharges_OrthogonalSummation PdbParser_setsAtomName PythonUI_with_named_parameters QtGui_reactivate_TimeChanged_changes Recreated_GuiChecks Rewrite_FitPartialCharges RotateToPrincipalAxisSystem_UndoRedo SaturateAtoms_findBestMatching SaturateAtoms_singleDegree StoppableMakroAction Subpackage_CodePatterns Subpackage_JobMarket Subpackage_LinearAlgebra Subpackage_levmar Subpackage_mpqc_open Subpackage_vmg Switchable_LogView ThirdParty_MPQC_rebuilt_buildsystem TrajectoryDependenant_MaxOrder TremoloParser_IncreasedPrecision TremoloParser_MultipleTimesteps TremoloParser_setsAtomName Ubuntu_1604_changes stable
Last change on this file since 41c1b7 was 41c1b7, checked in by Frederik Heber <heber@…>, 13 years ago

HUGE: Added PoolWorker, removed (Fragment)Worker, and rewrote parts of FragmentScheduler.

ToDo:

  • FragmentQueue needs a callback function when new jobs have been added.
  • WorkerPool needs a callback function when a worker is idle.
  • WorkerListener_t then does not need a callback anymore, just access to the pool who has then the callback.
  • so far PoolWorker quits after first job (and is also removed by Server).
  • Property mode set to 100644
File size: 5.9 KB
RevLine 
[5d8c0f]1/*
2 * Project: MoleCuilder
3 * Description: creates and alters molecular systems
4 * Copyright (C) 2011 University of Bonn. All rights reserved.
5 * Please see the LICENSE file or "Copyright notice" in builder.cpp for details.
6 */
7
8/*
9 * WorkerPool.cpp
10 *
11 * Created on: 22.02.2012
12 * Author: heber
13 */
14
15// include config.h
16#ifdef HAVE_CONFIG_H
17#include <config.h>
18#endif
19
20// boost asio needs specific operator new
21#include <boost/asio.hpp>
22
23#include "CodePatterns/MemDebug.hpp"
24
25#include "WorkerPool.hpp"
26
27#include "CodePatterns/Assert.hpp"
28#include "CodePatterns/Info.hpp"
29#include "CodePatterns/Log.hpp"
30#include "Connection.hpp"
31
32WorkerPool::priority_t WorkerPool::default_priority = 0;
33WorkerAddress WorkerPool::emptyAddress("empty", "empty");
34
35/** Constructor for class WorkerPool.
36 *
37 */
38WorkerPool::WorkerPool()
39{}
40
41/** Destructor for class WorkerPool.
42 *
43 */
44WorkerPool::~WorkerPool()
45{}
46
47/** Helper function to check whether an address is already in the pool.
48 *
49 * @param address worker address to check
50 * @return true - address is present, false - else
51 */
52bool WorkerPool::presentInPool(const WorkerAddress &address) const
53{
54 return pool.find(address) != pool.end();
55}
56
57/** Get address of next idle worker.
58 *
59 * Note that worker is automatically marked as busy, \sa WorkerPool::markWorkerBusy()
60 *
61 * @return address of idle worker
62 */
63WorkerAddress WorkerPool::getNextIdleWorker()
64{
65 // get first idle worker
66 ASSERT( presentIdleWorkers(),
67 "WorkerPool::getNextIdleWorker() - there is no idle worker.");
68 if (!presentIdleWorkers())
69 return emptyAddress;
70 Idle_Queue_t::iterator iter = idle_queue.begin();
71 const WorkerAddress returnaddress = iter->second;
72
73 // enter in busy queue
74 markWorkerBusy( iter );
75
76 // return address
77 return returnaddress;
78}
79
[41c1b7]80WorkerPool::Idle_Queue_t::iterator WorkerPool::getIdleWorker(const WorkerAddress &address)
81{
82 Idle_Queue_t::iterator idleiter = idle_queue.begin();
83 while (idleiter != idle_queue.end()) {
84 if (idleiter->second == address) {
85 break;
86 }
87 ++idleiter;
88 }
89 return idleiter;
90}
91
[5d8c0f]92/** Checks whether a worker is busy or not.
93 *
94 * @param address address of worker to check
95 */
96bool WorkerPool::isWorkerBusy(const WorkerAddress &address) const
97{
98 Busy_Queue_t::const_iterator iter = busy_queue.find(address);
99 if (iter != busy_queue.end())
100 return true;
101#ifndef NDEBUG
102 else {
103 Idle_Queue_t::const_iterator iter = idle_queue.begin();
104 for(;iter != idle_queue.end(); ++iter)
105 if (iter->second == address)
106 break;
107 ASSERT( iter != idle_queue.end(),
108 "WorkerPool::isWorkerBusy() - worker "+toString(address)
109 +" is neither busy nor idle.");
110
111 }
112#endif
113 return false;
114}
115
116/** Adds another worker to the pool by noting down its address.
117 *
118 * @param address host and service address of the listening worker
119 * @return true - added successfully, false - not added
120 */
121bool WorkerPool::addWorker(const WorkerAddress& address)
122{
123 std::pair<Pool_t::iterator, bool> inserter =
124 pool.insert( address );
125 if (inserter.second) { // if new also add to queue
126 LOG(1, "INFO: Successfully added "+toString(address)+" to pool.");
127 idle_queue.insert( make_pair( default_priority, address ) );
128 return true;
129 } else {
130 LOG(1, "INFO: "+toString(address)+" is already present pool.");
131 return false;
132 }
133}
134
135/** Removes a worker from the pool.
136 *
137 * @param address host and service address of the listening worker
138 * @return true - removed successfully, false - not removed
139 */
140bool WorkerPool::removeWorker(const WorkerAddress& address)
141{
142 Pool_t::iterator iter = pool.find( address );
143 if (iter != pool.end()) {
[41c1b7]144 Idle_Queue_t::iterator idleiter = getIdleWorker(address);
145 if (idleiter != idle_queue.end())
146 idle_queue.erase(idleiter);
[5d8c0f]147 Busy_Queue_t::iterator busyiter = busy_queue.find(address);
148 if (busyiter != busy_queue.end())
149 busy_queue.erase(busyiter);
150 ASSERT( idleiter != idle_queue.end() || busyiter != busy_queue.end(),
151 "WorkerPool::removeWorker() - Worker "+toString(address)
152 +" is in pool but neither idle nor busy!");
153 ASSERT( !(idleiter != idle_queue.end() && busyiter != busy_queue.end()),
154 "WorkerPool::removeWorker() - Worker "+toString(address)
155 +" is in pool and both idle and busy!");
156 pool.erase(iter);
[41c1b7]157 LOG(1, "INFO: Removed worker " << address << " from pool.");
[5d8c0f]158 return true;
159 } else {
160 ELOG(1, "Worker "+toString(address)+" is not present pool.");
161 return false;
162 }
163}
164
165/** Sends shutdown to all current workers in the pool.
166 *
167 */
168void WorkerPool::removeAllWorkers()
169{
170 // empty pool and queue
171 idle_queue.clear();
172 busy_queue.clear();
173 pool.clear();
174}
175
176/** Helper function to mark a worker as busy.
177 *
178 * Removes from idle_queue and places into busy_queue.
179 * Sets \a iter to Idle_Queue_t::end().
180 *
181 * @param iter iterator on idle worker
182 */
183void WorkerPool::markWorkerBusy(Idle_Queue_t::iterator &iter)
184{
185 const WorkerAddress returnaddress = iter->second;
[41c1b7]186 if (isWorkerBusy(returnaddress))
187 return;
[5d8c0f]188 const priority_t priority = iter->first;
189
190 // remove from idle queue
191 idle_queue.erase(iter);
192
193 // insert into busy queue
194#ifndef NDEBUG
195 std::pair< Busy_Queue_t::iterator, bool > inserter =
196#endif
197 busy_queue.insert( make_pair(returnaddress, priority) );
198 ASSERT( inserter.second,
199 "WorkerPool::sendJobToWorker() - Worker "+toString(inserter.first->first)+" is already busy.");
200
201 LOG(1, "INFO: Worker " << returnaddress << " is now marked busy.");
202}
203
204/** Helper function to unmark a worker as busy.
205 *
206 * Removes worker from busy_queue and returns it to idle_queue.
207 *
208 * @param address address of worker
209 */
210void WorkerPool::unmarkWorkerBusy(const WorkerAddress &address)
211{
212 if (isWorkerBusy(address)) {
213 Busy_Queue_t::const_iterator iter = busy_queue.find(address);
214 const priority_t priority = iter->second;
215 busy_queue.erase(address);
216 idle_queue.insert( make_pair( priority, address) );
[41c1b7]217
218 LOG(1, "INFO: Worker " << address << " is now marked idle.");
[5d8c0f]219 }
220}
Note: See TracBrowser for help on using the repository browser.