source: src/JobMarket/Pool/PoolGuard.cpp@ 404d2b

Action_Thermostats Add_AtomRandomPerturbation Add_RotateAroundBondAction Add_SelectAtomByNameAction Adding_Graph_to_ChangeBondActions Adding_MD_integration_tests Adding_StructOpt_integration_tests Automaking_mpqc_open AutomationFragmentation_failures Candidate_v1.6.0 Candidate_v1.6.1 ChangeBugEmailaddress ChangingTestPorts ChemicalSpaceEvaluator Combining_Subpackages Debian_Package_split Debian_package_split_molecuildergui_only Disabling_MemDebug Docu_Python_wait EmpiricalPotential_contain_HomologyGraph_documentation Enable_parallel_make_install Enhance_userguide Enhanced_StructuralOptimization Enhanced_StructuralOptimization_continued Example_ManyWaysToTranslateAtom Exclude_Hydrogens_annealWithBondGraph FitPartialCharges_GlobalError Fix_ChronosMutex Fix_StatusMsg Fix_StepWorldTime_single_argument Fix_Verbose_Codepatterns ForceAnnealing_goodresults ForceAnnealing_oldresults ForceAnnealing_tocheck ForceAnnealing_with_BondGraph ForceAnnealing_with_BondGraph_continued ForceAnnealing_with_BondGraph_continued_betteresults ForceAnnealing_with_BondGraph_contraction-expansion GeometryObjects Gui_displays_atomic_force_velocity IndependentFragmentGrids_IntegrationTest JobMarket_RobustOnKillsSegFaults JobMarket_StableWorkerPool JobMarket_unresolvable_hostname_fix ODR_violation_mpqc_open PartialCharges_OrthogonalSummation PythonUI_with_named_parameters QtGui_reactivate_TimeChanged_changes Recreated_GuiChecks RotateToPrincipalAxisSystem_UndoRedo StoppableMakroAction Subpackage_JobMarket Subpackage_levmar Subpackage_mpqc_open Subpackage_vmg ThirdParty_MPQC_rebuilt_buildsystem TremoloParser_IncreasedPrecision TremoloParser_MultipleTimesteps Ubuntu_1604_changes stable
Last change on this file since 404d2b was 404d2b, checked in by Frederik Heber <heber@…>, 8 years ago

Squashed 'ThirdParty/JobMarket/' content from commit e194722

git-subtree-dir: ThirdParty/JobMarket
git-subtree-split: e19472277e62c493f6c10f1483fe21e64c1039e9

  • Property mode set to 100644
File size: 6.0 KB
Line 
1/*
2 * Project: JobMarket
3 * Description: asynchronous Server/Controller/Client-approach to parallel computing, based on boost::asio
4 * Copyright (C) 2012 Frederik Heber. All rights reserved.
5 *
6 */
7
8/*
9 * PoolGuard.cpp
10 *
11 * Created on: Sep 5, 2012
12 * Author: heber
13 */
14
15
16// include config.h
17#ifdef HAVE_CONFIG_H
18#include <config.h>
19#endif
20
21// boost asio needs specific operator new
22#include <boost/asio.hpp>
23
24#include "CodePatterns/MemDebug.hpp"
25
26#include "Pool/PoolGuard.hpp"
27
28#include <boost/bind.hpp>
29
30#include "CodePatterns/Assert.hpp"
31#include "CodePatterns/Log.hpp"
32
33#include "FragmentQueue.hpp"
34#include "Operations/Servers/CheckAliveWorkerOperation.hpp"
35#include "Operations/OperationQueue.hpp"
36#include "Pool/WorkerPool.hpp"
37
38PoolGuard::PoolGuard(
39 boost::asio::io_service& io_service,
40 const size_t _timeout,
41 Connection &_connection,
42 const boost::function<void (const WorkerAddress)> _removeWorkerfunction,
43 const boost::function<void (const JobId_t)> _resubmitJobfunction,
44 OperationQueue &_OpQueue) :
45 CheckAtNextInterval(false),
46 timeout(_timeout),
47 timer(io_service),
48 removeWorkerfunction(_removeWorkerfunction),
49 resubmitJobfunction(_resubmitJobfunction),
50 OpQueue(_OpQueue),
51 connection(_connection),
52 WaitingOps(0)
53{
54 // set timer if we start by default
55 if (CheckAtNextInterval) {
56 timer.expires_from_now(boost::posix_time::seconds(timeout));
57 timer.async_wait(boost::bind(&PoolGuard::checkWorkers, this));
58 }
59}
60
61void PoolGuard::checkWorkers()
62{
63 if (WaitingOps == 0) {
64 LOG(1, "INFO: There are " << LastWorkerList.size() << " busy workers from last time, "
65 << CurrentWorkerList.size() << " are currently busy.");
66 // create a vector of workers to check
67 typedef std::set<WorkerAddress> CheckList_t;
68 CheckList_t currentworkers;
69 for (WorkerList_t::const_iterator iter = LastWorkerList.begin();
70 iter != LastWorkerList.end();
71 ++iter) {
72 const WorkerAddress &address = iter->first;
73 LOG(2, "DEBUG: Checking whether worker " << address << " is still busy ...");
74 WorkerList_t::const_iterator currentiter = CurrentWorkerList.find(address);
75 // check if worker was busy last time and on same job
76 if (currentiter != CurrentWorkerList.end()) {
77 LOG(2, "DEBUG: Worker " << address << " was busy last time on job " << iter->second << ".");
78 if (currentiter->second == iter->second) {
79 LOG(1, "INFO: Worker " << address << " is working on same job "
80 << iter->second << " as last time, scheduling for checkalive.");
81 currentworkers.insert(address);
82 } else {
83 LOG(1, "INFO: Worker " << address << " is working on different job "
84 << currentiter->second << " than last time "
85 << iter->second << ", scheduling for checkalive.");
86 }
87 }
88 }
89 // go through candidates to check
90 LOG(1, "INFO: Checking on " << currentworkers.size() << " possible dead workers.");
91 for(CheckList_t::const_iterator iter = currentworkers.begin();
92 iter != currentworkers.end(); ++iter) {
93 const WorkerAddress &address = *iter;
94 LOG(1, "INFO: Checking whether " << address << " is alive.");
95 AsyncOperation *checkaliveWorkerOp = new CheckAliveWorkerOperation(connection,
96 boost::bind(&PoolGuard::checkAddress, this, address, _1),
97 boost::bind(&PoolGuard::printWorkerIsAlive, this, address),
98 boost::bind(&PoolGuard::removeFromPool, this, address));
99 OpQueue.push_back(checkaliveWorkerOp, address);
100 ++WaitingOps;
101 }
102 } else {
103 ELOG(2, "We are lacking behind on CheckAliveOps, skipping this check interval.");
104 }
105 // set old list to new list
106 LastWorkerList = CurrentWorkerList;
107
108 // set next check interval
109 if (CheckAtNextInterval) {
110 timer.expires_from_now(boost::posix_time::seconds(timeout));
111 timer.async_wait(boost::bind(&PoolGuard::checkWorkers, this));
112 }
113}
114
115void PoolGuard::checkAddress(
116 const WorkerAddress trueaddress,
117 const WorkerAddress returnedaddress)
118{
119 if (trueaddress != returnedaddress) {
120 ELOG(1, "Worker at " << trueaddress << " returned itself wrongly as "
121 << returnedaddress << ", removing.");
122 removeFromPool(trueaddress);
123 }
124}
125
126void PoolGuard::removeFromPool(const WorkerAddress address)
127{
128 LOG(1, "INFO: Worker " << address << " does not react to CheckAlive, removing.");
129 // erase from our lists
130 LastWorkerList.erase(address);
131 WorkerList_t::iterator iter = CurrentWorkerList.find(address);
132 if ( iter != CurrentWorkerList.end()) {
133 const JobId_t jobid = iter->second;
134 CurrentWorkerList.erase(iter);
135 // erase from pool
136 removeWorkerfunction(address);
137 // resubmit job
138 resubmitJobfunction(jobid);
139 }
140}
141
142void PoolGuard::addBusyWorker(const WorkerAddress address, const JobId_t id)
143{
144 LOG(1, "INFO: Adding worker " << address << " with job " << id << " as busy.");
145#ifndef NDEBUG
146 std::pair<WorkerList_t::iterator, bool> inserter =
147#endif
148 CurrentWorkerList.insert( std::make_pair(address, id) );
149 ASSERT( inserter.second,
150 "PoolGuard::addBusyWorker() - worker "+toString(address)+" "
151 "is already busy to our knowledge on job "+toString(inserter.second)+".");
152}
153
154void PoolGuard::removeBusyWorker(const WorkerAddress address)
155{
156 LOG(1, "INFO: Removing worker " << address << " as busy.");
157 WorkerList_t::iterator iter = CurrentWorkerList.find(address);
158 if (iter != CurrentWorkerList.end()) {
159 CurrentWorkerList.erase(address);
160 LastWorkerList.erase(address);
161 }
162}
163
164void PoolGuard::printWorkerIsAlive(const WorkerAddress address)
165{
166 LOG(2, "DEBUG: Worker " << address << " checked and is still alive.");
167 --WaitingOps;
168}
169
170void PoolGuard::stop()
171{
172 // set flag to false and cancel timer
173 CheckAtNextInterval = false;
174 timer.cancel();
175 // clear internal list such that we may correctly be started again
176 LastWorkerList.clear();
177 CurrentWorkerList.clear();
178}
179
180void PoolGuard::start()
181{
182 // set flag to true
183 CheckAtNextInterval = true;
184 // and check right away to fill LastWorkerList
185 checkWorkers();
186}
Note: See TracBrowser for help on using the repository browser.