/*
 * Project: JobMarket
 * Description: asynchronous Server/Controller/Client-approach to parallel computing, based on boost::asio
 * Copyright (C) 2012 Frederik Heber. All rights reserved.
 *
 */
|
---|
7 |
|
---|
/*
 * PoolGuard.cpp
 *
 *  Created on: Sep 5, 2012
 *      Author: heber
 */
|
---|
14 |
|
---|
15 |
|
---|
16 | // include config.h
|
---|
17 | #ifdef HAVE_CONFIG_H
|
---|
18 | #include <config.h>
|
---|
19 | #endif
|
---|
20 |
|
---|
21 | // boost asio needs specific operator new
|
---|
22 | #include <boost/asio.hpp>
|
---|
23 |
|
---|
24 | #include "CodePatterns/MemDebug.hpp"
|
---|
25 |
|
---|
26 | #include "Pool/PoolGuard.hpp"
|
---|
27 |
|
---|
28 | #include <boost/bind.hpp>
|
---|
29 |
|
---|
30 | #include "CodePatterns/Assert.hpp"
|
---|
31 | #include "CodePatterns/Log.hpp"
|
---|
32 |
|
---|
33 | #include "FragmentQueue.hpp"
|
---|
34 | #include "Operations/Servers/CheckAliveWorkerOperation.hpp"
|
---|
35 | #include "Operations/OperationQueue.hpp"
|
---|
36 | #include "Pool/WorkerPool.hpp"
|
---|
37 |
|
---|
38 | PoolGuard::PoolGuard(
|
---|
39 | boost::asio::io_service& io_service,
|
---|
40 | const size_t _timeout,
|
---|
41 | Connection &_connection,
|
---|
42 | const boost::function<void (const WorkerAddress)> _removeWorkerfunction,
|
---|
43 | const boost::function<void (const JobId_t)> _resubmitJobfunction,
|
---|
44 | OperationQueue &_OpQueue) :
|
---|
45 | CheckAtNextInterval(false),
|
---|
46 | timeout(_timeout),
|
---|
47 | timer(io_service),
|
---|
48 | removeWorkerfunction(_removeWorkerfunction),
|
---|
49 | resubmitJobfunction(_resubmitJobfunction),
|
---|
50 | OpQueue(_OpQueue),
|
---|
51 | connection(_connection),
|
---|
52 | WaitingOps(0)
|
---|
53 | {
|
---|
54 | // set timer if we start by default
|
---|
55 | if (CheckAtNextInterval) {
|
---|
56 | timer.expires_from_now(boost::posix_time::seconds(timeout));
|
---|
57 | timer.async_wait(boost::bind(&PoolGuard::checkWorkers, this));
|
---|
58 | }
|
---|
59 | }
|
---|
60 |
|
---|
61 | void PoolGuard::checkWorkers()
|
---|
62 | {
|
---|
63 | if (WaitingOps == 0) {
|
---|
64 | LOG(1, "INFO: There are " << LastWorkerList.size() << " busy workers from last time, "
|
---|
65 | << CurrentWorkerList.size() << " are currently busy.");
|
---|
66 | // create a vector of workers to check
|
---|
67 | typedef std::set<WorkerAddress> CheckList_t;
|
---|
68 | CheckList_t currentworkers;
|
---|
69 | for (WorkerList_t::const_iterator iter = LastWorkerList.begin();
|
---|
70 | iter != LastWorkerList.end();
|
---|
71 | ++iter) {
|
---|
72 | const WorkerAddress &address = iter->first;
|
---|
73 | LOG(2, "DEBUG: Checking whether worker " << address << " is still busy ...");
|
---|
74 | WorkerList_t::const_iterator currentiter = CurrentWorkerList.find(address);
|
---|
75 | // check if worker was busy last time and on same job
|
---|
76 | if (currentiter != CurrentWorkerList.end()) {
|
---|
77 | LOG(2, "DEBUG: Worker " << address << " was busy last time on job " << iter->second << ".");
|
---|
78 | if (currentiter->second == iter->second) {
|
---|
79 | LOG(1, "INFO: Worker " << address << " is working on same job "
|
---|
80 | << iter->second << " as last time, scheduling for checkalive.");
|
---|
81 | currentworkers.insert(address);
|
---|
82 | } else {
|
---|
83 | LOG(1, "INFO: Worker " << address << " is working on different job "
|
---|
84 | << currentiter->second << " than last time "
|
---|
85 | << iter->second << ", scheduling for checkalive.");
|
---|
86 | }
|
---|
87 | }
|
---|
88 | }
|
---|
89 | // go through candidates to check
|
---|
90 | LOG(1, "INFO: Checking on " << currentworkers.size() << " possible dead workers.");
|
---|
91 | for(CheckList_t::const_iterator iter = currentworkers.begin();
|
---|
92 | iter != currentworkers.end(); ++iter) {
|
---|
93 | const WorkerAddress &address = *iter;
|
---|
94 | LOG(1, "INFO: Checking whether " << address << " is alive.");
|
---|
95 | AsyncOperation *checkaliveWorkerOp = new CheckAliveWorkerOperation(connection,
|
---|
96 | boost::bind(&PoolGuard::checkAddress, this, address, _1),
|
---|
97 | boost::bind(&PoolGuard::printWorkerIsAlive, this, address),
|
---|
98 | boost::bind(&PoolGuard::removeFromPool, this, address));
|
---|
99 | OpQueue.push_back(checkaliveWorkerOp, address);
|
---|
100 | ++WaitingOps;
|
---|
101 | }
|
---|
102 | } else {
|
---|
103 | ELOG(2, "We are lacking behind on CheckAliveOps, skipping this check interval.");
|
---|
104 | }
|
---|
105 | // set old list to new list
|
---|
106 | LastWorkerList = CurrentWorkerList;
|
---|
107 |
|
---|
108 | // set next check interval
|
---|
109 | if (CheckAtNextInterval) {
|
---|
110 | timer.expires_from_now(boost::posix_time::seconds(timeout));
|
---|
111 | timer.async_wait(boost::bind(&PoolGuard::checkWorkers, this));
|
---|
112 | }
|
---|
113 | }
|
---|
114 |
|
---|
115 | void PoolGuard::checkAddress(
|
---|
116 | const WorkerAddress trueaddress,
|
---|
117 | const WorkerAddress returnedaddress)
|
---|
118 | {
|
---|
119 | if (trueaddress != returnedaddress) {
|
---|
120 | ELOG(1, "Worker at " << trueaddress << " returned itself wrongly as "
|
---|
121 | << returnedaddress << ", removing.");
|
---|
122 | removeFromPool(trueaddress);
|
---|
123 | }
|
---|
124 | }
|
---|
125 |
|
---|
126 | void PoolGuard::removeFromPool(const WorkerAddress address)
|
---|
127 | {
|
---|
128 | LOG(1, "INFO: Worker " << address << " does not react to CheckAlive, removing.");
|
---|
129 | // erase from our lists
|
---|
130 | LastWorkerList.erase(address);
|
---|
131 | WorkerList_t::iterator iter = CurrentWorkerList.find(address);
|
---|
132 | if ( iter != CurrentWorkerList.end()) {
|
---|
133 | const JobId_t jobid = iter->second;
|
---|
134 | CurrentWorkerList.erase(iter);
|
---|
135 | // erase from pool
|
---|
136 | removeWorkerfunction(address);
|
---|
137 | // resubmit job
|
---|
138 | resubmitJobfunction(jobid);
|
---|
139 | }
|
---|
140 | }
|
---|
141 |
|
---|
142 | void PoolGuard::addBusyWorker(const WorkerAddress address, const JobId_t id)
|
---|
143 | {
|
---|
144 | LOG(1, "INFO: Adding worker " << address << " with job " << id << " as busy.");
|
---|
145 | #ifndef NDEBUG
|
---|
146 | std::pair<WorkerList_t::iterator, bool> inserter =
|
---|
147 | #endif
|
---|
148 | CurrentWorkerList.insert( std::make_pair(address, id) );
|
---|
149 | ASSERT( inserter.second,
|
---|
150 | "PoolGuard::addBusyWorker() - worker "+toString(address)+" "
|
---|
151 | "is already busy to our knowledge on job "+toString(inserter.second)+".");
|
---|
152 | }
|
---|
153 |
|
---|
154 | void PoolGuard::removeBusyWorker(const WorkerAddress address)
|
---|
155 | {
|
---|
156 | LOG(1, "INFO: Removing worker " << address << " as busy.");
|
---|
157 | WorkerList_t::iterator iter = CurrentWorkerList.find(address);
|
---|
158 | if (iter != CurrentWorkerList.end()) {
|
---|
159 | CurrentWorkerList.erase(address);
|
---|
160 | LastWorkerList.erase(address);
|
---|
161 | }
|
---|
162 | }
|
---|
163 |
|
---|
164 | void PoolGuard::printWorkerIsAlive(const WorkerAddress address)
|
---|
165 | {
|
---|
166 | LOG(2, "DEBUG: Worker " << address << " checked and is still alive.");
|
---|
167 | --WaitingOps;
|
---|
168 | }
|
---|
169 |
|
---|
170 | void PoolGuard::stop()
|
---|
171 | {
|
---|
172 | // set flag to false and cancel timer
|
---|
173 | CheckAtNextInterval = false;
|
---|
174 | timer.cancel();
|
---|
175 | // clear internal list such that we may correctly be started again
|
---|
176 | LastWorkerList.clear();
|
---|
177 | CurrentWorkerList.clear();
|
---|
178 | }
|
---|
179 |
|
---|
180 | void PoolGuard::start()
|
---|
181 | {
|
---|
182 | // set flag to true
|
---|
183 | CheckAtNextInterval = true;
|
---|
184 | // and check right away to fill LastWorkerList
|
---|
185 | checkWorkers();
|
---|
186 | }
|
---|