source: src/Fragmentation/Automation/FragmentScheduler.cpp@ 9a3f84

Action_Thermostats Add_AtomRandomPerturbation Add_FitFragmentPartialChargesAction Add_RotateAroundBondAction Add_SelectAtomByNameAction Added_ParseSaveFragmentResults AddingActions_SaveParseParticleParameters Adding_Graph_to_ChangeBondActions Adding_MD_integration_tests Adding_ParticleName_to_Atom Adding_StructOpt_integration_tests AtomFragments Automaking_mpqc_open AutomationFragmentation_failures Candidate_v1.5.4 Candidate_v1.6.0 Candidate_v1.6.1 Candidate_v1.7.0 ChangeBugEmailaddress ChangingTestPorts ChemicalSpaceEvaluator CombiningParticlePotentialParsing Combining_Subpackages Debian_Package_split Debian_package_split_molecuildergui_only Disabling_MemDebug Docu_Python_wait EmpiricalPotential_contain_HomologyGraph EmpiricalPotential_contain_HomologyGraph_documentation Enable_parallel_make_install Enhance_userguide Enhanced_StructuralOptimization Enhanced_StructuralOptimization_continued Example_ManyWaysToTranslateAtom Exclude_Hydrogens_annealWithBondGraph FitPartialCharges_GlobalError Fix_BoundInBox_CenterInBox_MoleculeActions Fix_ChargeSampling_PBC Fix_ChronosMutex Fix_FitPartialCharges Fix_FitPotential_needs_atomicnumbers Fix_ForceAnnealing Fix_IndependentFragmentGrids Fix_ParseParticles Fix_ParseParticles_split_forward_backward_Actions Fix_PopActions Fix_QtFragmentList_sorted_selection Fix_Restrictedkeyset_FragmentMolecule Fix_StatusMsg Fix_StepWorldTime_single_argument Fix_Verbose_Codepatterns Fix_fitting_potentials Fixes ForceAnnealing_goodresults ForceAnnealing_oldresults ForceAnnealing_tocheck ForceAnnealing_with_BondGraph ForceAnnealing_with_BondGraph_continued ForceAnnealing_with_BondGraph_continued_betteresults ForceAnnealing_with_BondGraph_contraction-expansion FragmentAction_writes_AtomFragments FragmentMolecule_checks_bonddegrees GeometryObjects Gui_Fixes Gui_displays_atomic_force_velocity ImplicitCharges IndependentFragmentGrids IndependentFragmentGrids_IndividualZeroInstances IndependentFragmentGrids_IntegrationTest IndependentFragmentGrids_Sole_NN_Calculation JobMarket_RobustOnKillsSegFaults JobMarket_StableWorkerPool JobMarket_unresolvable_hostname_fix MoreRobust_FragmentAutomation ODR_violation_mpqc_open PartialCharges_OrthogonalSummation PdbParser_setsAtomName PythonUI_with_named_parameters QtGui_reactivate_TimeChanged_changes Recreated_GuiChecks Rewrite_FitPartialCharges RotateToPrincipalAxisSystem_UndoRedo SaturateAtoms_findBestMatching SaturateAtoms_singleDegree StoppableMakroAction Subpackage_CodePatterns Subpackage_JobMarket Subpackage_LinearAlgebra Subpackage_levmar Subpackage_mpqc_open Subpackage_vmg Switchable_LogView ThirdParty_MPQC_rebuilt_buildsystem TrajectoryDependenant_MaxOrder TremoloParser_IncreasedPrecision TremoloParser_MultipleTimesteps TremoloParser_setsAtomName Ubuntu_1604_changes stable
Last change on this file since 9a3f84 was 9a3f84, checked in by Frederik Heber <heber@…>, 13 years ago

FragmentScheduler now uses WorkerChoices for handling connection workers.

  • Workers always first send address, then their choice and depending on this we branch into various handlers.
  • SubmitResultOperation and EnrollInPoolOperation now correctly give choice after sending address.
  • handle_enrolled uses getNextIdleWorker() to mark indirectly as busy. This is temporary.
  • we do not resubmit if Max_Attempts is 1.
  • Property mode set to 100644
File size: 17.0 KB
RevLine 
[72eaf7f]1/*
[cd4a6e]2 * Project: MoleCuilder
3 * Description: creates and alters molecular systems
4 * Copyright (C) 2011 University of Bonn. All rights reserved.
5 * Please see the LICENSE file or "Copyright notice" in builder.cpp for details.
6 */
7
8/*
9 * \file FragmentScheduler.cpp
10 *
11 * This file strongly follows the Serialization example from the boost::asio
12 * library (see server.cpp)
[72eaf7f]13 *
[cd4a6e]14 * Created on: Oct 19, 2011
[72eaf7f]15 * Author: heber
16 */
17
[f93842]18// include config.h
19#ifdef HAVE_CONFIG_H
20#include <config.h>
21#endif
22
[c6bcd0]23// boost asio needs specific operator new
[72eaf7f]24#include <boost/asio.hpp>
[c6bcd0]25
26#include "CodePatterns/MemDebug.hpp"
27
[72eaf7f]28#include <boost/bind.hpp>
29#include <boost/lexical_cast.hpp>
30#include <iostream>
31#include <vector>
[af3aed]32#include "Connection.hpp" // Must come before boost/serialization headers.
[72eaf7f]33#include <boost/serialization/vector.hpp>
[af3aed]34#include "CodePatterns/Info.hpp"
[b0b64c]35#include "CodePatterns/Log.hpp"
[41c1b7]36#include "Controller/Commands/EnrollInPoolOperation.hpp"
[ff60cfa]37#include "Jobs/MPQCCommandJob.hpp"
[d920b9]38#include "Jobs/SystemCommandJob.hpp"
[ef2767]39#include "JobId.hpp"
[72eaf7f]40
[cd4a6e]41#include "FragmentScheduler.hpp"
[72eaf7f]42
[8036b7]43FragmentJob::ptr FragmentScheduler::WorkerListener_t::NoJob(new SystemCommandJob(std::string("/bin/true"), std::string("dosomething"), JobId::NoJob));
[ff60cfa]44
45/** Helper function to enforce binding of FragmentWorker to possible derived
46 * FragmentJob classes.
47 */
48void dummyInit() {
49 SystemCommandJob("/bin/false", "something", JobId::IllegalJob);
50 MPQCCommandJob("nofile", JobId::IllegalJob);
51}
[c7deca]52
[db03d9]53/** Constructor of class FragmentScheduler.
54 *
55 * We setup both acceptors to accept connections from workers and Controller.
56 *
57 * \param io_service io_service of the asynchronous communications
58 * \param workerport port to listen for worker connections
59 * \param controllerport port to listen for controller connections.
60 */
61FragmentScheduler::FragmentScheduler(boost::asio::io_service& io_service, unsigned short workerport, unsigned short controllerport) :
[41c1b7]62 WorkerListener(io_service, workerport, JobsQueue, pool,
63 boost::bind(&FragmentScheduler::sendJobToWorker, boost::ref(*this), _1, _2)),
[8036b7]64 ControllerListener(io_service, controllerport, JobsQueue,
[41c1b7]65 boost::bind(&Listener::initiateSocket, boost::ref(WorkerListener))),
66 connection(io_service),
67 sendJobOp(connection)
[ed2c5b]68{
[b0b64c]69 Info info(__FUNCTION__);
[72eaf7f]70
[41c1b7]71 // listen for controller
72 ControllerListener.initiateSocket();
73
[778abb]74 // only initiate socket if jobs are already present
75 if (JobsQueue.isJobPresent()) {
[8036b7]76 WorkerListener.initiateSocket();
[778abb]77 }
[402bde]78}
79
[db03d9]80/** Handle a new worker connection.
81 *
[41c1b7]82 * We store the given address in the pool.
[db03d9]83 *
84 * \param e error code if something went wrong
85 * \param conn reference with the connection
86 */
[8036b7]87void FragmentScheduler::WorkerListener_t::handle_Accept(const boost::system::error_code& e, connection_ptr conn)
[ed2c5b]88{
[cd4a6e]89 Info info(__FUNCTION__);
[ed2c5b]90 if (!e)
[72eaf7f]91 {
[b0b64c]92 // Successfully accepted a new connection.
[41c1b7]93 // read address
94 conn->async_read(address,
[9a3f84]95 boost::bind(&FragmentScheduler::WorkerListener_t::handle_ReadAddress, this,
[41c1b7]96 boost::asio::placeholders::error, conn));
[9a3f84]97 }
98 else
99 {
[41c1b7]100 // An error occurred. Log it and return. Since we are not starting a new
101 // accept operation the io_service will run out of work to do and the
102 // server will exit.
103 Exitflag = ErrorFlag;
104 ELOG(0, e.message());
105 }
106}
[0bdd51b]107
[9a3f84]108/** Handle having received Worker's address
[41c1b7]109 *
110 * \param e error code if something went wrong
111 * \param conn reference with the connection
112 */
[9a3f84]113void FragmentScheduler::WorkerListener_t::handle_ReadAddress(const boost::system::error_code& e, connection_ptr conn)
[41c1b7]114{
115 Info info(__FUNCTION__);
116 if (!e)
117 {
[9a3f84]118 // Successfully accepted a new connection.
119 // read address
120 conn->async_read(choice,
121 boost::bind(&FragmentScheduler::WorkerListener_t::handle_ReadChoice, this,
122 boost::asio::placeholders::error, conn));
123 }
124 else
125 {
126 // An error occurred. Log it and return. Since we are not starting a new
127 // accept operation the io_service will run out of work to do and the
128 // server will exit.
129 Exitflag = ErrorFlag;
130 ELOG(0, e.message());
131 }
132}
133
134/** Controller callback function to read the choice for next operation.
135 *
136 * \param e error code if something went wrong
137 * \param conn reference with the connection
138 */
139void FragmentScheduler::WorkerListener_t::handle_ReadChoice(const boost::system::error_code& e, connection_ptr conn)
140{
141 Info info(__FUNCTION__);
142 if (!e)
143 {
144 LOG(1, "INFO: Received request for operation " << choice << ".");
145 // switch over the desired choice read previously
146 switch(choice) {
147 case NoWorkerOperation:
148 {
149 ELOG(1, "WorkerListener_t::handle_ReadChoice() - called with NoOperation.");
150 break;
151 }
152 case EnrollInPool:
153 {
154 if (pool.presentInPool(address)) {
155 ELOG(1, "INFO: worker "+toString(address)+" is already contained in pool.");
156 enum EnrollInPoolOperation::EnrollFlag flag = EnrollInPoolOperation::Fail;
157 conn->async_write(flag,
158 boost::bind(&FragmentScheduler::WorkerListener_t::handle_enrolled, this,
159 boost::asio::placeholders::error, conn));
160 } else {
161 // insert as its new worker
162 LOG(1, "INFO: Adding " << address << " to pool ...");
163 pool.addWorker(address);
164 enum EnrollInPoolOperation::EnrollFlag flag = EnrollInPoolOperation::Success;
165 conn->async_write(flag,
166 boost::bind(&FragmentScheduler::WorkerListener_t::handle_enrolled, this,
167 boost::asio::placeholders::error, conn));
168 break;
169 }
170 case SendResult:
171 {
172 if (pool.presentInPool(address)) {
173 // check whether its priority is busy_priority
174 if (pool.isWorkerBusy(address)) {
175 conn->async_read(result,
176 boost::bind(&FragmentScheduler::WorkerListener_t::handle_ReceiveResultFromWorker, this,
177 boost::asio::placeholders::error, conn));
178 } else {
179 ELOG(1, "Worker " << address << " trying to send result who is not marked as busy.");
180 conn->async_read(result,
181 boost::bind(&FragmentScheduler::WorkerListener_t::handle_RejectResultFromWorker, this,
182 boost::asio::placeholders::error, conn));
183 }
184 } else {
185 ELOG(1, "Worker " << address << " trying to send result who is not in pool.");
186 conn->async_read(result,
187 boost::bind(&FragmentScheduler::WorkerListener_t::handle_RejectResultFromWorker, this,
188 boost::asio::placeholders::error, conn));
189 }
190 break;
191 }
192 case RemoveFromPool:
193 {
194 if (pool.presentInPool(address)) {
195 // removing present worker
196 pool.removeWorker(address);
197 } else {
198 ELOG(1, "Shutting down Worker " << address << " not contained in pool.");
199 }
200 break;
201 }
202 default:
203 Exitflag = ErrorFlag;
204 ELOG(1, "WorkerListener_t::handle_ReadChoice() - called with no valid choice.");
205 break;
[41c1b7]206 }
[b0b64c]207 }
[9a3f84]208 // restore NoOperation choice such that choice is not read twice
209 choice = NoWorkerOperation;
[cd4a6e]210 }
211 else
212 {
213 // An error occurred. Log it and return. Since we are not starting a new
214 // accept operation the io_service will run out of work to do and the
215 // server will exit.
[8036b7]216 Exitflag = ErrorFlag;
[b0b64c]217 ELOG(0, e.message());
[cd4a6e]218 }
[778abb]219
[95b384]220 if (JobsQueue.isJobPresent()) {
221 // Start an accept operation for a new Connection only when there
222 // are still jobs present
[8036b7]223 initiateSocket();
[95b384]224 }
[ed2c5b]225}
[72eaf7f]226
[9a3f84]227
[41c1b7]228/** Callback function when new worker has enrolled.
[db03d9]229 *
230 * \param e error code if something went wrong
231 * \param conn reference with the connection
232 */
[41c1b7]233void FragmentScheduler::WorkerListener_t::handle_enrolled(const boost::system::error_code& e, connection_ptr conn)
[ed2c5b]234{
[41c1b7]235 Info info(__FUNCTION__);
236 if (!e)
237 {
238 FragmentJob::ptr job;
239 if (JobsQueue.isJobPresent()) {
240 job = JobsQueue.popJob();
241 } else {
242 job = NoJob;
243 }
[9a3f84]244 callback_sendJobToWorker(pool.getNextIdleWorker(), job);
[41c1b7]245 }
246 else
247 {
248 // An error occurred. Log it and return. Since we are not starting a new
249 // accept operation the io_service will run out of work to do and the
250 // server will exit.
251 Exitflag = ErrorFlag;
252 ELOG(0, e.message());
253 }
[ef2767]254}
255
[db03d9]256/** Callback function when result has been received.
257 *
258 * \param e error code if something went wrong
259 * \param conn reference with the connection
260 */
[8036b7]261void FragmentScheduler::WorkerListener_t::handle_ReceiveResultFromWorker(const boost::system::error_code& e, connection_ptr conn)
[ef2767]262{
[db03d9]263 Info info(__FUNCTION__);
[35f587]264 LOG(1, "INFO: Received result for job #" << result->getId() << " ...");
[41c1b7]265
[35f587]266 // and push into queue
267 ASSERT(result->getId() != (JobId_t)JobId::NoJob,
[41c1b7]268 "WorkerListener_t::handle_ReceiveResultFromWorker() - result received has NoJob id.");
[35f587]269 ASSERT(result->getId() != (JobId_t)JobId::IllegalJob,
[41c1b7]270 "WorkerListener_t::handle_ReceiveResultFromWorker() - result received has IllegalJob id.");
[778abb]271 // place id into expected
[35f587]272 if ((result->getId() != (JobId_t)JobId::NoJob) && (result->getId() != (JobId_t)JobId::IllegalJob))
[db03d9]273 JobsQueue.pushResult(result);
[41c1b7]274
275 // mark as idle
276 pool.unmarkWorkerBusy(address);
277 // for now remove worker again from pool such that other may connect
278 pool.removeWorker(address);
279
[db03d9]280 // erase result
[35f587]281 result.reset();
[778abb]282 LOG(1, "INFO: JobsQueue has " << JobsQueue.getDoneJobs() << " results.");
[db03d9]283}
284
[9a3f84]285/** Callback function when result has been received.
286 *
287 * \param e error code if something went wrong
288 * \param conn reference with the connection
289 */
290void FragmentScheduler::WorkerListener_t::handle_RejectResultFromWorker(const boost::system::error_code& e, connection_ptr conn)
291{
292 Info info(__FUNCTION__);
293 // nothing to do
294 LOG(1, "INFO: Rejecting result for job #" << result->getId() << ", placing back into queue.");
295
296 JobsQueue.resubmitJob(result->getId());
297
298 LOG(1, "INFO: JobsQueue has " << JobsQueue.getDoneJobs() << " results.");
299}
300
[41c1b7]301
[db03d9]302/** Handle a new controller connection.
303 *
304 * \sa handle_ReceiveJobs()
305 * \sa handle_CheckResultState()
306 * \sa handle_SendResults()
307 *
308 * \param e error code if something went wrong
309 * \param conn reference with the connection
310 */
[8036b7]311void FragmentScheduler::ControllerListener_t::handle_Accept(const boost::system::error_code& e, connection_ptr conn)
[db03d9]312{
313 Info info(__FUNCTION__);
314 if (!e)
315 {
[778abb]316 conn->async_read(choice,
[8036b7]317 boost::bind(&FragmentScheduler::ControllerListener_t::handle_ReadChoice, this,
[778abb]318 boost::asio::placeholders::error, conn));
319 }
320 else
321 {
322 // An error occurred. Log it and return. Since we are not starting a new
323 // accept operation the io_service will run out of work to do and the
324 // server will exit.
[8036b7]325 Exitflag = ErrorFlag;
[778abb]326 ELOG(0, e.message());
327 }
328}
329
330/** Controller callback function to read the choice for next operation.
331 *
332 * \param e error code if something went wrong
333 * \param conn reference with the connection
334 */
[8036b7]335void FragmentScheduler::ControllerListener_t::handle_ReadChoice(const boost::system::error_code& e, connection_ptr conn)
[778abb]336{
337 Info info(__FUNCTION__);
338 if (!e)
339 {
[0196c6]340 bool LaunchNewAcceptor = true;
[d1dbfc]341 LOG(1, "INFO: Received request for operation " << choice << ".");
[778abb]342 // switch over the desired choice read previously
343 switch(choice) {
[38032a]344 case NoControllerOperation:
[778abb]345 {
[9a3f84]346 ELOG(1, "ControllerListener_t::handle_ReadChoice() - called with NoOperation.");
[778abb]347 break;
348 }
[d1dbfc]349 case GetNextJobId:
350 {
351 const JobId_t nextid = globalId.getNextId();
352 LOG(1, "INFO: Sending next available job id " << nextid << " to controller ...");
353 conn->async_write(nextid,
[8036b7]354 boost::bind(&FragmentScheduler::ControllerListener_t::handle_GetNextJobIdState, this,
[d1dbfc]355 boost::asio::placeholders::error, conn));
356 break;
357 }
[778abb]358 case ReceiveJobs:
[d1dbfc]359 {
360 // The connection::async_write() function will automatically
361 // serialize the data structure for us.
362 LOG(1, "INFO: Receiving bunch of jobs from a controller ...");
363 conn->async_read(jobs,
[8036b7]364 boost::bind(&FragmentScheduler::ControllerListener_t::handle_ReceiveJobs, this,
[d1dbfc]365 boost::asio::placeholders::error, conn));
366 break;
367 }
[778abb]368 case CheckState:
369 {
[3c4a5e]370 // first update number
[6f2bc7]371 jobInfo[0] = JobsQueue.getPresentJobs();
372 jobInfo[1] = JobsQueue.getDoneJobs();
[3c4a5e]373 // now we accept connections to check for state of calculations
[6f2bc7]374 LOG(1, "INFO: Sending state that "+toString(jobInfo[0])+" jobs are present and "+toString(jobInfo[1])+" jobs are done to controller ...");
375 conn->async_write(jobInfo,
[8036b7]376 boost::bind(&FragmentScheduler::ControllerListener_t::handle_CheckResultState, this,
[3c4a5e]377 boost::asio::placeholders::error, conn));
[778abb]378 break;
379 }
380 case SendResults:
381 {
[35f587]382 const std::vector<FragmentResult::ptr> results = JobsQueue.getAllResults();
[778abb]383 // ... or we give the results
384 LOG(1, "INFO: Sending "+toString(results.size())+" results to controller ...");
385 conn->async_write(results,
[8036b7]386 boost::bind(&FragmentScheduler::ControllerListener_t::handle_SendResults, this,
[778abb]387 boost::asio::placeholders::error, conn));
[0196c6]388 break;
389 }
[38032a]390 case ShutdownControllerSocket:
[0196c6]391 {
[9a3f84]392 LOG(1, "INFO: Received shutdown from controller ...");
393 // only allow for shutdown when there are no more jobs in the queue
394 if (!JobsQueue.isJobPresent()) {
395 LaunchNewAcceptor = false;
396 } else {
397 ELOG(2, "There are still jobs waiting in the queue.");
398 }
[778abb]399 break;
[db03d9]400 }
[778abb]401 default:
[8036b7]402 Exitflag = ErrorFlag;
[9a3f84]403 ELOG(1, "ControllerListener_t::handle_ReadChoice() - called with no valid choice.");
[778abb]404 break;
405 }
[38032a]406 // restore NoControllerOperation choice such that choice is not read twice
407 choice = NoControllerOperation;
[778abb]408
[0196c6]409 if (LaunchNewAcceptor) {
410 LOG(1, "Launching new acceptor on socket.");
411 // Start an accept operation for a new Connection.
[8036b7]412 initiateSocket();
[0196c6]413 }
[db03d9]414 }
415 else
416 {
417 // An error occurred. Log it and return. Since we are not starting a new
418 // accept operation the io_service will run out of work to do and the
419 // server will exit.
[8036b7]420 Exitflag = ErrorFlag;
[db03d9]421 ELOG(0, e.message());
422 }
423}
424
425/** Controller callback function when job has been sent.
[778abb]426 *
427 * We check here whether the worker socket is accepting, if there
428 * have been no jobs we re-activate it, as it is shut down after
429 * last job.
[db03d9]430 *
431 * \param e error code if something went wrong
432 * \param conn reference with the connection
433 */
[8036b7]434void FragmentScheduler::ControllerListener_t::handle_ReceiveJobs(const boost::system::error_code& e, connection_ptr conn)
[db03d9]435{
436 Info info(__FUNCTION__);
[8036b7]437 bool need_initiateSocket = !JobsQueue.isJobPresent();
[778abb]438
[db03d9]439 // jobs are received, hence place in JobsQueue
440 if (!jobs.empty()) {
441 LOG(1, "INFO: Pushing " << jobs.size() << " jobs into queue.");
442 JobsQueue.pushJobs(jobs);
443 }
444
445 jobs.clear();
[778abb]446
[8036b7]447 // initiate socket if we had no jobs before
448 if (need_initiateSocket)
449 initiateWorkerSocket();
[ed2c5b]450}
[cd4a6e]451
[3c4a5e]452/** Controller callback function when checking on state of results.
453 *
454 * \param e error code if something went wrong
455 * \param conn reference with the connection
456 */
[8036b7]457void FragmentScheduler::ControllerListener_t::handle_CheckResultState(const boost::system::error_code& e, connection_ptr conn)
[3c4a5e]458{
459 Info info(__FUNCTION__);
460 // do nothing
[6f2bc7]461 LOG(1, "INFO: Sent that " << jobInfo << " jobs are (scheduled, done).");
[3c4a5e]462}
[778abb]463
[d1dbfc]464/** Controller callback function when checking on state of results.
465 *
466 * \param e error code if something went wrong
467 * \param conn reference with the connection
468 */
[8036b7]469void FragmentScheduler::ControllerListener_t::handle_GetNextJobIdState(const boost::system::error_code& e, connection_ptr conn)
[d1dbfc]470{
471 Info info(__FUNCTION__);
472 // do nothing
473 LOG(1, "INFO: Sent next available job id.");
474}
475
[778abb]476/** Controller callback function when result has been received.
477 *
478 * \param e error code if something went wrong
479 * \param conn reference with the connection
480 */
[8036b7]481void FragmentScheduler::ControllerListener_t::handle_SendResults(const boost::system::error_code& e, connection_ptr conn)
[778abb]482{
483 Info info(__FUNCTION__);
484 // do nothing
485 LOG(1, "INFO: Results have been sent.");
486}
487
[41c1b7]488
489/** Helper function to send a job to worker.
[9a3f84]490 *
491 * Note that we do not set the worker as busy. We simply send it the job.
[41c1b7]492 *
493 * @param address address of worker
494 * @param job job to send
495 */
496void FragmentScheduler::sendJobToWorker(const WorkerAddress &address, FragmentJob::ptr &job)
497{
[9a3f84]498 ASSERT( pool.isWorkerBusy(address),
499 "FragmentScheduler::sendJobToWorker() - Worker "+toString(address)+" is not marked as busy.");
[41c1b7]500 LOG(1, "INFO: Sending job " << job->getId() << " to worker " << address << ".");
501 sendJobOp.setJob(job);
502 sendJobOp(address.host, address.service);
503}
504
505///** Helper function to shutdown a single worker.
506// *
507// * We send NoJob to indicate shutdown
508// *
509// * @param address of worker to shutdown
510// */
511//void FragmentScheduler::shutdownWorker(const WorkerAddress &address)
512//{
513// sendJobToWorker(address, NoJob);
514//}
515//
516///** Sends shutdown to all current workers in the pool.
517// *
518// */
519//void FragmentScheduler::removeAllWorkers()
520//{
521// // give all workers shutdown signal
522// while (pool.presentIdleWorkers()) {
523// const WorkerAddress address = pool.getNextIdleWorker();
524// shutdownWorker(address);
525// }
526//}
Note: See TracBrowser for help on using the repository browser.