source: src/Fragmentation/Automation/FragmentScheduler.cpp@ e032b4

Action_Thermostats Add_AtomRandomPerturbation Add_FitFragmentPartialChargesAction Add_RotateAroundBondAction Add_SelectAtomByNameAction Added_ParseSaveFragmentResults AddingActions_SaveParseParticleParameters Adding_Graph_to_ChangeBondActions Adding_MD_integration_tests Adding_ParticleName_to_Atom Adding_StructOpt_integration_tests AtomFragments Automaking_mpqc_open AutomationFragmentation_failures Candidate_v1.5.4 Candidate_v1.6.0 Candidate_v1.6.1 Candidate_v1.7.0 ChangeBugEmailaddress ChangingTestPorts ChemicalSpaceEvaluator CombiningParticlePotentialParsing Combining_Subpackages Debian_Package_split Debian_package_split_molecuildergui_only Disabling_MemDebug Docu_Python_wait EmpiricalPotential_contain_HomologyGraph EmpiricalPotential_contain_HomologyGraph_documentation Enable_parallel_make_install Enhance_userguide Enhanced_StructuralOptimization Enhanced_StructuralOptimization_continued Example_ManyWaysToTranslateAtom Exclude_Hydrogens_annealWithBondGraph FitPartialCharges_GlobalError Fix_BoundInBox_CenterInBox_MoleculeActions Fix_ChargeSampling_PBC Fix_ChronosMutex Fix_FitPartialCharges Fix_FitPotential_needs_atomicnumbers Fix_ForceAnnealing Fix_IndependentFragmentGrids Fix_ParseParticles Fix_ParseParticles_split_forward_backward_Actions Fix_PopActions Fix_QtFragmentList_sorted_selection Fix_Restrictedkeyset_FragmentMolecule Fix_StatusMsg Fix_StepWorldTime_single_argument Fix_Verbose_Codepatterns Fix_fitting_potentials Fixes ForceAnnealing_goodresults ForceAnnealing_oldresults ForceAnnealing_tocheck ForceAnnealing_with_BondGraph ForceAnnealing_with_BondGraph_continued ForceAnnealing_with_BondGraph_continued_betteresults ForceAnnealing_with_BondGraph_contraction-expansion FragmentAction_writes_AtomFragments FragmentMolecule_checks_bonddegrees GeometryObjects Gui_Fixes Gui_displays_atomic_force_velocity ImplicitCharges IndependentFragmentGrids IndependentFragmentGrids_IndividualZeroInstances IndependentFragmentGrids_IntegrationTest 
IndependentFragmentGrids_Sole_NN_Calculation JobMarket_RobustOnKillsSegFaults JobMarket_StableWorkerPool JobMarket_unresolvable_hostname_fix MoreRobust_FragmentAutomation ODR_violation_mpqc_open PartialCharges_OrthogonalSummation PdbParser_setsAtomName PythonUI_with_named_parameters QtGui_reactivate_TimeChanged_changes Recreated_GuiChecks Rewrite_FitPartialCharges RotateToPrincipalAxisSystem_UndoRedo SaturateAtoms_findBestMatching SaturateAtoms_singleDegree StoppableMakroAction Subpackage_CodePatterns Subpackage_JobMarket Subpackage_LinearAlgebra Subpackage_levmar Subpackage_mpqc_open Subpackage_vmg Switchable_LogView ThirdParty_MPQC_rebuilt_buildsystem TrajectoryDependenant_MaxOrder TremoloParser_IncreasedPrecision TremoloParser_MultipleTimesteps TremoloParser_setsAtomName Ubuntu_1604_changes stable
Last change on this file since e032b4 was e032b4, checked in by Frederik Heber <heber@…>, 13 years ago

FragmentScheduler::recieveNotification() now asserts that only desired notifications are received.

  • Property mode set to 100644
File size: 19.1 KB
RevLine 
[72eaf7f]1/*
[cd4a6e]2 * Project: MoleCuilder
3 * Description: creates and alters molecular systems
4 * Copyright (C) 2011 University of Bonn. All rights reserved.
5 * Please see the LICENSE file or "Copyright notice" in builder.cpp for details.
6 */
7
8/*
9 * \file FragmentScheduler.cpp
10 *
11 * This file strongly follows the Serialization example from the boost::asio
12 * library (see server.cpp)
[72eaf7f]13 *
[cd4a6e]14 * Created on: Oct 19, 2011
[72eaf7f]15 * Author: heber
16 */
17
[f93842]18// include config.h
19#ifdef HAVE_CONFIG_H
20#include <config.h>
21#endif
22
[c6bcd0]23// boost asio needs specific operator new
[72eaf7f]24#include <boost/asio.hpp>
[c6bcd0]25
26#include "CodePatterns/MemDebug.hpp"
27
[72eaf7f]28#include <boost/bind.hpp>
29#include <boost/lexical_cast.hpp>
30#include <iostream>
31#include <vector>
[af3aed]32#include "Connection.hpp" // Must come before boost/serialization headers.
[72eaf7f]33#include <boost/serialization/vector.hpp>
[af3aed]34#include "CodePatterns/Info.hpp"
[b0b64c]35#include "CodePatterns/Log.hpp"
[2344a3]36#include "CodePatterns/Observer/Notification.hpp"
37#include "ControllerChoices.hpp"
[50d095]38#include "Operations/Workers/EnrollInPoolOperation.hpp"
[ff60cfa]39#include "Jobs/MPQCCommandJob.hpp"
[d920b9]40#include "Jobs/SystemCommandJob.hpp"
[ef2767]41#include "JobId.hpp"
[72eaf7f]42
[cd4a6e]43#include "FragmentScheduler.hpp"
[72eaf7f]44
[ff60cfa]45/** Helper function to enforce binding of FragmentWorker to possible derived
46 * FragmentJob classes.
47 */
48void dummyInit() {
49 SystemCommandJob("/bin/false", "something", JobId::IllegalJob);
50 MPQCCommandJob("nofile", JobId::IllegalJob);
51}
[c7deca]52
[db03d9]53/** Constructor of class FragmentScheduler.
54 *
55 * We setup both acceptors to accept connections from workers and Controller.
56 *
57 * \param io_service io_service of the asynchronous communications
58 * \param workerport port to listen for worker connections
59 * \param controllerport port to listen for controller connections.
60 */
[2344a3]61FragmentScheduler::FragmentScheduler(boost::asio::io_service& _io_service, unsigned short workerport, unsigned short controllerport) :
62 Observer("FragmentScheduler"),
63 io_service(_io_service),
64 WorkerListener(_io_service, workerport, JobsQueue, pool,
[41c1b7]65 boost::bind(&FragmentScheduler::sendJobToWorker, boost::ref(*this), _1, _2)),
[2344a3]66 ControllerListener(_io_service, controllerport, JobsQueue,
67 boost::bind(&FragmentScheduler::shutdown, boost::ref(*this))),
68 connection(_io_service),
[6ea7f4]69 sendJobOp(connection),
70 shutdownWorkerOp(connection)
[ed2c5b]71{
[b0b64c]72 Info info(__FUNCTION__);
[72eaf7f]73
[2344a3]74 // sign on to idle workers and present jobs
75 pool.signOn(this, WorkerPool::WorkerIdle);
76 JobsQueue.signOn(this, FragmentQueue::JobAdded);
77
[41c1b7]78 // listen for controller
79 ControllerListener.initiateSocket();
80
[2344a3]81 // listen for workers
82 WorkerListener.initiateSocket();
83}
84
85FragmentScheduler::~FragmentScheduler()
86{
87 // sign off
88 pool.signOff(this, WorkerPool::WorkerIdle);
89 JobsQueue.signOff(this, FragmentQueue::JobAdded);
[402bde]90}
91
[db03d9]92/** Handle a new worker connection.
93 *
[41c1b7]94 * We store the given address in the pool.
[db03d9]95 *
96 * \param e error code if something went wrong
97 * \param conn reference with the connection
98 */
[8036b7]99void FragmentScheduler::WorkerListener_t::handle_Accept(const boost::system::error_code& e, connection_ptr conn)
[ed2c5b]100{
[cd4a6e]101 Info info(__FUNCTION__);
[ed2c5b]102 if (!e)
[72eaf7f]103 {
[b0b64c]104 // Successfully accepted a new connection.
[41c1b7]105 // read address
106 conn->async_read(address,
[9a3f84]107 boost::bind(&FragmentScheduler::WorkerListener_t::handle_ReadAddress, this,
[41c1b7]108 boost::asio::placeholders::error, conn));
[9a3f84]109 }
110 else
111 {
[41c1b7]112 // An error occurred. Log it and return. Since we are not starting a new
113 // accept operation the io_service will run out of work to do and the
114 // server will exit.
115 Exitflag = ErrorFlag;
116 ELOG(0, e.message());
117 }
118}
[0bdd51b]119
[9a3f84]120/** Handle having received Worker's address
[41c1b7]121 *
122 * \param e error code if something went wrong
123 * \param conn reference with the connection
124 */
[9a3f84]125void FragmentScheduler::WorkerListener_t::handle_ReadAddress(const boost::system::error_code& e, connection_ptr conn)
[41c1b7]126{
127 Info info(__FUNCTION__);
128 if (!e)
129 {
[9a3f84]130 // Successfully accepted a new connection.
131 // read address
132 conn->async_read(choice,
133 boost::bind(&FragmentScheduler::WorkerListener_t::handle_ReadChoice, this,
134 boost::asio::placeholders::error, conn));
135 }
136 else
137 {
138 // An error occurred. Log it and return. Since we are not starting a new
139 // accept operation the io_service will run out of work to do and the
140 // server will exit.
141 Exitflag = ErrorFlag;
142 ELOG(0, e.message());
143 }
144}
145
146/** Controller callback function to read the choice for next operation.
147 *
148 * \param e error code if something went wrong
149 * \param conn reference with the connection
150 */
151void FragmentScheduler::WorkerListener_t::handle_ReadChoice(const boost::system::error_code& e, connection_ptr conn)
152{
153 Info info(__FUNCTION__);
154 if (!e)
155 {
156 LOG(1, "INFO: Received request for operation " << choice << ".");
157 // switch over the desired choice read previously
158 switch(choice) {
159 case NoWorkerOperation:
160 {
161 ELOG(1, "WorkerListener_t::handle_ReadChoice() - called with NoOperation.");
162 break;
163 }
164 case EnrollInPool:
165 {
166 if (pool.presentInPool(address)) {
167 ELOG(1, "INFO: worker "+toString(address)+" is already contained in pool.");
168 enum EnrollInPoolOperation::EnrollFlag flag = EnrollInPoolOperation::Fail;
169 conn->async_write(flag,
170 boost::bind(&FragmentScheduler::WorkerListener_t::handle_enrolled, this,
171 boost::asio::placeholders::error, conn));
172 } else {
173 // insert as its new worker
174 LOG(1, "INFO: Adding " << address << " to pool ...");
175 pool.addWorker(address);
176 enum EnrollInPoolOperation::EnrollFlag flag = EnrollInPoolOperation::Success;
177 conn->async_write(flag,
178 boost::bind(&FragmentScheduler::WorkerListener_t::handle_enrolled, this,
179 boost::asio::placeholders::error, conn));
180 break;
181 }
182 case SendResult:
183 {
184 if (pool.presentInPool(address)) {
185 // check whether its priority is busy_priority
186 if (pool.isWorkerBusy(address)) {
187 conn->async_read(result,
188 boost::bind(&FragmentScheduler::WorkerListener_t::handle_ReceiveResultFromWorker, this,
189 boost::asio::placeholders::error, conn));
190 } else {
191 ELOG(1, "Worker " << address << " trying to send result who is not marked as busy.");
192 conn->async_read(result,
193 boost::bind(&FragmentScheduler::WorkerListener_t::handle_RejectResultFromWorker, this,
194 boost::asio::placeholders::error, conn));
195 }
196 } else {
197 ELOG(1, "Worker " << address << " trying to send result who is not in pool.");
198 conn->async_read(result,
199 boost::bind(&FragmentScheduler::WorkerListener_t::handle_RejectResultFromWorker, this,
200 boost::asio::placeholders::error, conn));
201 }
202 break;
203 }
204 case RemoveFromPool:
205 {
206 if (pool.presentInPool(address)) {
207 // removing present worker
208 pool.removeWorker(address);
209 } else {
210 ELOG(1, "Shutting down Worker " << address << " not contained in pool.");
211 }
212 break;
213 }
214 default:
215 Exitflag = ErrorFlag;
216 ELOG(1, "WorkerListener_t::handle_ReadChoice() - called with no valid choice.");
217 break;
[41c1b7]218 }
[b0b64c]219 }
[9a3f84]220 // restore NoOperation choice such that choice is not read twice
221 choice = NoWorkerOperation;
[2344a3]222
223 initiateSocket();
[cd4a6e]224 }
225 else
226 {
227 // An error occurred. Log it and return. Since we are not starting a new
228 // accept operation the io_service will run out of work to do and the
229 // server will exit.
[8036b7]230 Exitflag = ErrorFlag;
[b0b64c]231 ELOG(0, e.message());
[cd4a6e]232 }
[ed2c5b]233}
[72eaf7f]234
[9a3f84]235
[41c1b7]236/** Callback function when new worker has enrolled.
[db03d9]237 *
238 * \param e error code if something went wrong
239 * \param conn reference with the connection
240 */
[41c1b7]241void FragmentScheduler::WorkerListener_t::handle_enrolled(const boost::system::error_code& e, connection_ptr conn)
[ed2c5b]242{
[41c1b7]243 Info info(__FUNCTION__);
[2344a3]244 if (e)
[41c1b7]245 {
246 // An error occurred. Log it and return. Since we are not starting a new
247 // accept operation the io_service will run out of work to do and the
248 // server will exit.
249 Exitflag = ErrorFlag;
250 ELOG(0, e.message());
251 }
[ef2767]252}
253
[db03d9]254/** Callback function when result has been received.
255 *
256 * \param e error code if something went wrong
257 * \param conn reference with the connection
258 */
[8036b7]259void FragmentScheduler::WorkerListener_t::handle_ReceiveResultFromWorker(const boost::system::error_code& e, connection_ptr conn)
[ef2767]260{
[db03d9]261 Info info(__FUNCTION__);
[35f587]262 LOG(1, "INFO: Received result for job #" << result->getId() << " ...");
[41c1b7]263
[35f587]264 // and push into queue
265 ASSERT(result->getId() != (JobId_t)JobId::NoJob,
[41c1b7]266 "WorkerListener_t::handle_ReceiveResultFromWorker() - result received has NoJob id.");
[35f587]267 ASSERT(result->getId() != (JobId_t)JobId::IllegalJob,
[41c1b7]268 "WorkerListener_t::handle_ReceiveResultFromWorker() - result received has IllegalJob id.");
[778abb]269 // place id into expected
[35f587]270 if ((result->getId() != (JobId_t)JobId::NoJob) && (result->getId() != (JobId_t)JobId::IllegalJob))
[db03d9]271 JobsQueue.pushResult(result);
[41c1b7]272
273 // mark as idle
274 pool.unmarkWorkerBusy(address);
275
[db03d9]276 // erase result
[35f587]277 result.reset();
[778abb]278 LOG(1, "INFO: JobsQueue has " << JobsQueue.getDoneJobs() << " results.");
[db03d9]279}
280
[9a3f84]281/** Callback function when result has been received.
282 *
283 * \param e error code if something went wrong
284 * \param conn reference with the connection
285 */
286void FragmentScheduler::WorkerListener_t::handle_RejectResultFromWorker(const boost::system::error_code& e, connection_ptr conn)
287{
288 Info info(__FUNCTION__);
289 // nothing to do
290 LOG(1, "INFO: Rejecting result for job #" << result->getId() << ", placing back into queue.");
291
292 JobsQueue.resubmitJob(result->getId());
293
294 LOG(1, "INFO: JobsQueue has " << JobsQueue.getDoneJobs() << " results.");
295}
296
[41c1b7]297
[db03d9]298/** Handle a new controller connection.
299 *
300 * \sa handle_ReceiveJobs()
301 * \sa handle_CheckResultState()
302 * \sa handle_SendResults()
303 *
304 * \param e error code if something went wrong
305 * \param conn reference with the connection
306 */
[8036b7]307void FragmentScheduler::ControllerListener_t::handle_Accept(const boost::system::error_code& e, connection_ptr conn)
[db03d9]308{
309 Info info(__FUNCTION__);
310 if (!e)
311 {
[778abb]312 conn->async_read(choice,
[8036b7]313 boost::bind(&FragmentScheduler::ControllerListener_t::handle_ReadChoice, this,
[778abb]314 boost::asio::placeholders::error, conn));
315 }
316 else
317 {
318 // An error occurred. Log it and return. Since we are not starting a new
319 // accept operation the io_service will run out of work to do and the
320 // server will exit.
[8036b7]321 Exitflag = ErrorFlag;
[778abb]322 ELOG(0, e.message());
323 }
324}
325
326/** Controller callback function to read the choice for next operation.
327 *
328 * \param e error code if something went wrong
329 * \param conn reference with the connection
330 */
[8036b7]331void FragmentScheduler::ControllerListener_t::handle_ReadChoice(const boost::system::error_code& e, connection_ptr conn)
[778abb]332{
333 Info info(__FUNCTION__);
334 if (!e)
335 {
[0196c6]336 bool LaunchNewAcceptor = true;
[d1dbfc]337 LOG(1, "INFO: Received request for operation " << choice << ".");
[778abb]338 // switch over the desired choice read previously
339 switch(choice) {
[38032a]340 case NoControllerOperation:
[778abb]341 {
[9a3f84]342 ELOG(1, "ControllerListener_t::handle_ReadChoice() - called with NoOperation.");
[778abb]343 break;
344 }
[d1dbfc]345 case GetNextJobId:
346 {
347 const JobId_t nextid = globalId.getNextId();
348 LOG(1, "INFO: Sending next available job id " << nextid << " to controller ...");
349 conn->async_write(nextid,
[8036b7]350 boost::bind(&FragmentScheduler::ControllerListener_t::handle_GetNextJobIdState, this,
[d1dbfc]351 boost::asio::placeholders::error, conn));
352 break;
353 }
[425fc6]354 case SendJobs:
[d1dbfc]355 {
356 // The connection::async_write() function will automatically
357 // serialize the data structure for us.
358 LOG(1, "INFO: Receiving bunch of jobs from a controller ...");
359 conn->async_read(jobs,
[8036b7]360 boost::bind(&FragmentScheduler::ControllerListener_t::handle_ReceiveJobs, this,
[d1dbfc]361 boost::asio::placeholders::error, conn));
362 break;
363 }
[778abb]364 case CheckState:
365 {
[3c4a5e]366 // first update number
[6f2bc7]367 jobInfo[0] = JobsQueue.getPresentJobs();
368 jobInfo[1] = JobsQueue.getDoneJobs();
[3c4a5e]369 // now we accept connections to check for state of calculations
[6f2bc7]370 LOG(1, "INFO: Sending state that "+toString(jobInfo[0])+" jobs are present and "+toString(jobInfo[1])+" jobs are done to controller ...");
371 conn->async_write(jobInfo,
[8036b7]372 boost::bind(&FragmentScheduler::ControllerListener_t::handle_CheckResultState, this,
[3c4a5e]373 boost::asio::placeholders::error, conn));
[778abb]374 break;
375 }
[9d14c3]376 case ReceiveResults:
[778abb]377 {
[35f587]378 const std::vector<FragmentResult::ptr> results = JobsQueue.getAllResults();
[778abb]379 // ... or we give the results
380 LOG(1, "INFO: Sending "+toString(results.size())+" results to controller ...");
381 conn->async_write(results,
[8036b7]382 boost::bind(&FragmentScheduler::ControllerListener_t::handle_SendResults, this,
[778abb]383 boost::asio::placeholders::error, conn));
[0196c6]384 break;
385 }
[38032a]386 case ShutdownControllerSocket:
[0196c6]387 {
[9a3f84]388 LOG(1, "INFO: Received shutdown from controller ...");
389 // only allow for shutdown when there are no more jobs in the queue
390 if (!JobsQueue.isJobPresent()) {
391 LaunchNewAcceptor = false;
392 } else {
393 ELOG(2, "There are still jobs waiting in the queue.");
394 }
[778abb]395 break;
[db03d9]396 }
[778abb]397 default:
[8036b7]398 Exitflag = ErrorFlag;
[9a3f84]399 ELOG(1, "ControllerListener_t::handle_ReadChoice() - called with no valid choice.");
[778abb]400 break;
401 }
[38032a]402 // restore NoControllerOperation choice such that choice is not read twice
403 choice = NoControllerOperation;
[778abb]404
[0196c6]405 if (LaunchNewAcceptor) {
406 LOG(1, "Launching new acceptor on socket.");
407 // Start an accept operation for a new Connection.
[8036b7]408 initiateSocket();
[2344a3]409 } else {
410 // we shutdown? Hence, also shutdown controller
411 shutdownAllSockets();
[0196c6]412 }
[db03d9]413 }
414 else
415 {
416 // An error occurred. Log it and return. Since we are not starting a new
417 // accept operation the io_service will run out of work to do and the
418 // server will exit.
[8036b7]419 Exitflag = ErrorFlag;
[db03d9]420 ELOG(0, e.message());
421 }
422}
423
424/** Controller callback function when job has been sent.
[778abb]425 *
426 * We check here whether the worker socket is accepting, if there
427 * have been no jobs we re-activate it, as it is shut down after
428 * last job.
[db03d9]429 *
430 * \param e error code if something went wrong
431 * \param conn reference with the connection
432 */
[8036b7]433void FragmentScheduler::ControllerListener_t::handle_ReceiveJobs(const boost::system::error_code& e, connection_ptr conn)
[db03d9]434{
435 Info info(__FUNCTION__);
436 // jobs are received, hence place in JobsQueue
437 if (!jobs.empty()) {
438 LOG(1, "INFO: Pushing " << jobs.size() << " jobs into queue.");
439 JobsQueue.pushJobs(jobs);
440 }
441 jobs.clear();
[ed2c5b]442}
[cd4a6e]443
[3c4a5e]444/** Controller callback function when checking on state of results.
445 *
446 * \param e error code if something went wrong
447 * \param conn reference with the connection
448 */
[8036b7]449void FragmentScheduler::ControllerListener_t::handle_CheckResultState(const boost::system::error_code& e, connection_ptr conn)
[3c4a5e]450{
451 Info info(__FUNCTION__);
452 // do nothing
[6f2bc7]453 LOG(1, "INFO: Sent that " << jobInfo << " jobs are (scheduled, done).");
[3c4a5e]454}
[778abb]455
[d1dbfc]456/** Controller callback function when checking on state of results.
457 *
458 * \param e error code if something went wrong
459 * \param conn reference with the connection
460 */
[8036b7]461void FragmentScheduler::ControllerListener_t::handle_GetNextJobIdState(const boost::system::error_code& e, connection_ptr conn)
[d1dbfc]462{
463 Info info(__FUNCTION__);
464 // do nothing
465 LOG(1, "INFO: Sent next available job id.");
466}
467
[778abb]468/** Controller callback function when result has been received.
469 *
470 * \param e error code if something went wrong
471 * \param conn reference with the connection
472 */
[8036b7]473void FragmentScheduler::ControllerListener_t::handle_SendResults(const boost::system::error_code& e, connection_ptr conn)
[778abb]474{
475 Info info(__FUNCTION__);
476 // do nothing
477 LOG(1, "INFO: Results have been sent.");
478}
479
[41c1b7]480
481/** Helper function to send a job to worker.
[9a3f84]482 *
483 * Note that we do not set the worker as busy. We simply send it the job.
[41c1b7]484 *
485 * @param address address of worker
486 * @param job job to send
487 */
488void FragmentScheduler::sendJobToWorker(const WorkerAddress &address, FragmentJob::ptr &job)
489{
[9a3f84]490 ASSERT( pool.isWorkerBusy(address),
491 "FragmentScheduler::sendJobToWorker() - Worker "+toString(address)+" is not marked as busy.");
[41c1b7]492 LOG(1, "INFO: Sending job " << job->getId() << " to worker " << address << ".");
493 sendJobOp.setJob(job);
494 sendJobOp(address.host, address.service);
495}
496
[2344a3]497/** Helper function to shutdown a single worker.
498 *
499 * We send NoJob to indicate shutdown
500 *
501 * @param address of worker to shutdown
502 */
503void FragmentScheduler::shutdownWorker(const WorkerAddress &address)
504{
[6ea7f4]505 ASSERT( !pool.isWorkerBusy(address),
506 "FragmentScheduler::sendJobToWorker() - Worker "+toString(address)+" is already busy.");
507 LOG(2, "INFO: Shutting down worker " << address << "...");
508 shutdownWorkerOp(address.host, address.service);
[2344a3]509}
510
511/** Sends shutdown to all current workers in the pool.
512 *
513 */
514void FragmentScheduler::removeAllWorkers()
515{
516 LOG(2, "INFO: Shutting down workers ...");
517
[6b3a37]518 // \todo We have to wait here until all workers are done
519 // first, sign off such that no new jobs are given to workers
520 pool.signOff(this, WorkerPool::WorkerIdle);
521 while (pool.hasBusyWorkers())
522 ;
523
[2344a3]524 // give all workers shutdown signal
[6b3a37]525 for (WorkerPool::Idle_Queue_t::const_iterator iter = pool.begin_idle(); iter != pool.end_idle(); ++iter) {
526 const WorkerAddress address = iter->second;
[2344a3]527 shutdownWorker(address);
528 }
[6b3a37]529 pool.removeAllWorkers();
[2344a3]530}
531
532/** Helper function to shutdown the server properly.
533 *
534 * \todo one should idle here until all workers have returned from
535 * calculating stuff (or workers need to still listen while the are
536 * calculating which is probably better).
537 *
538 */
539void FragmentScheduler::shutdown()
540{
541 LOG(1, "INFO: Shutting all down ...");
542
543 /// Remove all workers
544 removeAllWorkers();
545
546 /// close the worker listener's socket
547 WorkerListener.closeSocket();
548
549 /// close the controller listener's socket
550 ControllerListener.closeSocket();
551
552 /// finally, stop the io_service
553 io_service.stop();
554}
555
556/** Internal helper to send the next available job to the next idle worker.
557 *
558 */
559void FragmentScheduler::sendAvailableJobToNextIdleWorker()
560{
561 const WorkerAddress address = pool.getNextIdleWorker();
562 FragmentJob::ptr job = JobsQueue.popJob();
563 sendJobToWorker(address, job);
564}
565
566void FragmentScheduler::update(Observable *publisher)
567{
568 ASSERT(0, "FragmentScheduler::update() - we are not signed on for global updates.");
569}
570
571void FragmentScheduler::recieveNotification(Observable *publisher, Notification_ptr notification)
572{
573 if ((publisher == &pool) && (notification->getChannelNo() == WorkerPool::WorkerIdle)) {
[e032b4]574 // we have an idle worker
[2344a3]575 LOG(1, "INFO: We are notified of an idle worker.");
576 // are jobs available?
577 if (JobsQueue.isJobPresent()) {
578 sendAvailableJobToNextIdleWorker();
579 }
[e032b4]580 } else if ((publisher == &JobsQueue) && (notification->getChannelNo() == FragmentQueue::JobAdded)) {
581 // we have new jobs
[2344a3]582 LOG(1, "INFO: We are notified of a new job.");
583 // check for idle workers
584 if (pool.presentIdleWorkers()) {
585 sendAvailableJobToNextIdleWorker();
586 }
[e032b4]587 } else {
588 ASSERT(0, "FragmentScheduler::recieveNotification() - we are not signed on for updates in channel "
589 +toString(notification->getChannelNo())+".");
[2344a3]590 }
591}
592
593void FragmentScheduler::subjectKilled(Observable *publisher)
594{}
Note: See TracBrowser for help on using the repository browser.