source: src/JobMarket/FragmentScheduler.cpp@404d2b

Last change on this file was 404d2b, checked in by Frederik Heber <heber@…>, 8 years ago

Squashed 'ThirdParty/JobMarket/' content from commit e194722

git-subtree-dir: ThirdParty/JobMarket
git-subtree-split: e19472277e62c493f6c10f1483fe21e64c1039e9

  • Property mode set to 100644
File size: 9.9 KB
/*
 * Project: JobMarket
 * Description: asynchronous Server/Controller/Client-approach to parallel computing, based on boost::asio
 * Copyright (C) 2011 Frederik Heber. All rights reserved.
 *
 */

/*
 * \file FragmentScheduler.cpp
 *
 * This file closely follows the Serialization example from the boost::asio
 * library (see server.cpp).
 *
 * Created on: Oct 19, 2011
 *     Author: heber
 */

// include config.h
#ifdef HAVE_CONFIG_H
#include <config.h>
#endif

// boost asio needs specific operator new
#include <boost/asio.hpp>

#include "CodePatterns/MemDebug.hpp"

#include <algorithm>
#include <boost/bind.hpp>
#include <vector>
#include "JobMarket/Connection.hpp" // Must come before boost/serialization headers.
#include <boost/serialization/set.hpp>
#include <boost/serialization/vector.hpp>
#include "CodePatterns/Info.hpp"
#include "CodePatterns/Log.hpp"
#include "CodePatterns/Observer/Notification.hpp"
#include "Operations/Servers/SendJobToWorkerOperation.hpp"
#include "Operations/Servers/ShutdownWorkerOperation.hpp"
#include "Operations/Workers/EnrollInPoolOperation.hpp"
#include "JobMarket/JobId.hpp"

#include "JobMarket/FragmentScheduler.hpp"

/** Constructor of class FragmentScheduler.
 *
 * We set up both acceptors to accept connections from workers and the Controller.
 *
 * \param io_service io_service for the asynchronous communications
 * \param workerport port to listen on for worker connections
 * \param controllerport port to listen on for controller connections
 * \param timeout interval in seconds between checks on the workers' status
 */
FragmentScheduler::FragmentScheduler(
    boost::asio::io_service& _io_service,
    unsigned short workerport,
    unsigned short controllerport,
    const size_t timeout) :
  Observer("FragmentScheduler"),
  io_service(_io_service),
  WorkerListener(_io_service, workerport, JobsQueue, pool,
    boost::bind(&FragmentScheduler::sendJobToWorker, boost::ref(*this), _1, _2),
    boost::bind(&FragmentScheduler::unmarkWorkerBusy, boost::ref(*this), _1)),
  ControllerListener(_io_service, controllerport, JobsQueue,
    boost::bind(&FragmentScheduler::removeAllWorkers, boost::ref(*this)),
    boost::bind(&FragmentScheduler::shutdown, boost::ref(*this))),
  connection(_io_service),
  guard(_io_service, timeout, connection,
    boost::bind(&FragmentScheduler::removeWorker, boost::ref(*this), _1),
    boost::bind(&FragmentQueue::resubmitJob, boost::ref(JobsQueue), _1),
    OpQueue)
{
  DEBUG_FUNCTION_ENTRYEXIT

  if ((WorkerListener.getExitflag() == Listener::OkFlag)
      && (ControllerListener.getExitflag() == Listener::OkFlag)) {
    // sign on to idle workers and present jobs
    pool.signOn(this, WorkerPool::WorkerIdle);
    JobsQueue.signOn(this, FragmentQueue::JobAdded);

    // listen for controller
    ControllerListener.initiateSocket();

    // listen for workers
    WorkerListener.initiateSocket();
  } else {
    ELOG(0, "Not starting: exiting due to a failed listen bind.");
  }
}
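
/* Usage sketch (illustrative, not part of this file): a minimal server that
 * owns the io_service and hands control to boost::asio. The port numbers and
 * timeout below are arbitrary example values, not defaults of this project.
 *
 *   #include <boost/asio.hpp>
 *   #include "JobMarket/FragmentScheduler.hpp"
 *
 *   int main()
 *   {
 *     boost::asio::io_service io_service;
 *     // listen for workers on port 11026, controllers on port 11027,
 *     // and check on busy workers every 60 seconds
 *     FragmentScheduler scheduler(io_service, 11026, 11027, 60);
 *     io_service.run(); // blocks until shutdown() calls io_service.stop()
 *     return 0;
 *   }
 */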

FragmentScheduler::~FragmentScheduler()
{
  // sign off
  pool.signOff(this, WorkerPool::WorkerIdle);
  JobsQueue.signOff(this, FragmentQueue::JobAdded);
}

FragmentScheduler::WorkerListener_t::HandlerData::HandlerData() :
  address("127.0.0.1", "0"),
  result( new FragmentResult(JobId::NoJob) ),
  choice(NoWorkerOperation)
{}

/** Helper function to send a job to a worker.
 *
 * Note that we do not mark the worker as busy here; we simply send it the job.
 *
 * @param address address of the worker
 * @param job job to send
 */
void FragmentScheduler::sendJobToWorker(const WorkerAddress &address, FragmentJob::ptr &job)
{
  ASSERT( pool.isWorkerBusy(address),
      "FragmentScheduler::sendJobToWorker() - Worker "+toString(address)+" is not marked as busy.");
  LOG(1, "INFO: Sending job " << job->getId() << " to worker " << address << ".");

  // create op, callbacks for success and failure, and hand over to queue
  boost::function<void ()> AcceptsJob = boost::bind(
      &FragmentScheduler::workerAcceptsJob, boost::ref(*this), address, job->getId());
  boost::function<void ()> RejectedJob = boost::bind(
      &FragmentScheduler::workerRejectsJob, boost::ref(*this), address, job->getId());
  AsyncOperation *sendJobOp =
      new SendJobToWorkerOperation(connection, job, AcceptsJob, RejectedJob);
  OpQueue.push_back(sendJobOp, address);
}

/** Helper function that is called asynchronously after a worker accepts a new
 * job; we then need to add a check-alive guard for it.
 *
 * \param _address address of the worker to guard
 * \param _id id of the job the worker is working on
 */
void FragmentScheduler::workerAcceptsJob(const WorkerAddress _address, const JobId_t _id)
{
  // inform the guard and, if it is not already running, launch it
  guard.addBusyWorker(_address, _id);
  if (!guard.isRunning())
    guard.start();
}

/** Helper function that is called asynchronously after a worker declines a new
 * job.
 *
 * \param _address address of the worker
 * \param _id id of the job the worker rejected
 */
void FragmentScheduler::workerRejectsJob(const WorkerAddress _address, const JobId_t _id)
{
  unmarkWorkerBusy(_address);
  JobsQueue.resubmitJob(_id);
}

/** Helper function to shut down a single worker.
 *
 * We send NoJob to indicate shutdown.
 *
 * @param address address of the worker to shut down
 */
void FragmentScheduler::shutdownWorker(const WorkerAddress &address)
{
  ASSERT( !pool.isWorkerBusy(address),
      "FragmentScheduler::shutdownWorker() - Worker "+toString(address)+" is still busy.");
  LOG(2, "INFO: Shutting down worker " << address << "...");
  AsyncOperation *shutdownWorkerOp = new ShutdownWorkerOperation(connection);
  OpQueue.push_back(shutdownWorkerOp, address);
}

/** Sends shutdown to all current workers in the pool.
 */
void FragmentScheduler::removeAllWorkers()
{
  // first, sign off such that no new jobs are handed to workers
  pool.signOff(this, WorkerPool::WorkerIdle);

  LOG(2, "DEBUG: Waiting for busy workers to finish ...");
  // busy-wait (spin) until all busy workers have returned their results
  while (pool.hasBusyWorkers())
    ;

  LOG(2, "INFO: Shutting down workers ...");
  // get the list of all idle workers
  typedef std::vector<std::pair<std::string, std::string> > WorkerList_t;
  WorkerList_t WorkerList = pool.getListOfIdleWorkers();

  // send all workers the shutdown signal
  for (WorkerList_t::const_iterator iter = WorkerList.begin(); iter != WorkerList.end(); ++iter)
    shutdownWorker(WorkerAddress(iter->first, iter->second));
}

/** Function to shut down the server properly, e.g. for use as a signal handler.
 *
 * @param sig signal number
 */
void FragmentScheduler::shutdown(int sig)
{
  LOG(0, "STATUS: Shutting down due to signal " << sig << ".");

  if (!pool.presentIdleWorkers() && !pool.hasBusyWorkers()) {
    shutdown();
  } else {
    removeAllWorkers();
  }
}
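
/* Sketch (illustrative): one way to install shutdown(int) as a signal handler
 * using boost::asio::signal_set; how the project actually wires signals is not
 * shown in this file. The static_cast disambiguates the two shutdown()
 * overloads for boost::bind, and _2 forwards the signal number from the
 * handler's (error_code, signal_number) signature.
 *
 *   boost::asio::signal_set signals(io_service, SIGINT, SIGTERM);
 *   signals.async_wait(boost::bind(
 *       static_cast<void (FragmentScheduler::*)(int)>(&FragmentScheduler::shutdown),
 *       boost::ref(scheduler), _2));
 */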

/** Helper function to shut down the server properly.
 *
 * \todo one should idle here until all workers have returned from their
 * calculations (or workers need to keep listening while they are
 * calculating, which is probably the better solution).
 *
 * \note We only shut down when there are no workers left.
 *
 * @return true - shutting down, false - precondition not met, not shutting down
 */
bool FragmentScheduler::shutdown()
{
  if (!pool.presentIdleWorkers() && !pool.hasBusyWorkers()) {
    LOG(1, "INFO: Shutting everything down ...");

    // close the guard's watch
    guard.stop();

    // close the worker listener's socket
    WorkerListener.closeSocket();

    // close the controller listener's socket
    ControllerListener.closeSocket();

    // finally, stop the io_service
    io_service.stop();
    return true;
  } else {
    ELOG(2, "There are still idle or busy workers present.");
    return false;
  }
}

/** Internal helper to send the next available job to the next idle worker.
 */
void FragmentScheduler::sendAvailableJobToNextIdleWorker()
{
  const WorkerAddress address = pool.getNextIdleWorker();
  FragmentJob::ptr job = JobsQueue.popJob();
  sendJobToWorker(address, job);
}

void FragmentScheduler::update(Observable *publisher)
{
  ASSERT(0, "FragmentScheduler::update() - we are not signed on for global updates.");
}

void FragmentScheduler::recieveNotification(Observable *publisher, Notification_ptr notification)
{
  if ((publisher == &pool) && (notification->getChannelNo() == WorkerPool::WorkerIdle)) {
    // we have an idle worker
    LOG(1, "INFO: We are notified of an idle worker.");
    // are jobs available?
    if (JobsQueue.isJobPresent()) {
      sendAvailableJobToNextIdleWorker();
    } else {
      // if it was the last busy worker, stop the guard for the moment
      if (!pool.presentBusyWorkers())
        guard.stop();
    }
  } else if ((publisher == &JobsQueue) && (notification->getChannelNo() == FragmentQueue::JobAdded)) {
    // we have new jobs
    LOG(1, "INFO: We are notified of a new job.");
    // check for idle workers
    if (pool.presentIdleWorkers()) {
      sendAvailableJobToNextIdleWorker();
    }
  } else {
    ASSERT(0, "FragmentScheduler::recieveNotification() - we are not signed on for updates in channel "
        +toString(notification->getChannelNo())+".");
  }
}
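
/* Illustrative event flow through the two channels handled above (the
 * controller-side call that enqueues a job is an assumption, not shown here):
 *
 *   job added: JobsQueue fires FragmentQueue::JobAdded
 *     -> recieveNotification() finds an idle worker present
 *     -> sendAvailableJobToNextIdleWorker() pops the job and sends it
 *
 *   worker idle: pool fires WorkerPool::WorkerIdle
 *     -> recieveNotification() sends the next queued job, or stops the
 *        guard if no jobs remain and no workers are still busy
 */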

void FragmentScheduler::subjectKilled(Observable *publisher)
{}

/** Removes the worker at \a address and stops the guard if it was the last busy one.
 *
 * This is specifically meant to remove a busy worker, as in this case we
 * might also have to call PoolGuard::stop() if it has been the last busy
 * worker.
 *
 * @param address address of the worker to remove
 */
void FragmentScheduler::removeWorker(const WorkerAddress address)
{
  pool.removeWorker(address);
  // stop the guard if it has been the last busy worker
  if (!pool.presentBusyWorkers())
    guard.stop();
}

/** Unmarks a worker as busy in the WorkerPool.
 *
 * This is required to catch when a worker has finished working on a job and
 * to inform the PoolGuard.
 *
 * @param address address of the once busy worker
 */
void FragmentScheduler::unmarkWorkerBusy(const WorkerAddress address)
{
  // inform the guard
  guard.removeBusyWorker(address);
  // unmark in the pool
  pool.unmarkWorkerBusy(address);
}