source: src/Fragmentation/Automation/FragmentScheduler.cpp@ 986885

Action_Thermostats Add_AtomRandomPerturbation Add_FitFragmentPartialChargesAction Add_RotateAroundBondAction Add_SelectAtomByNameAction Added_ParseSaveFragmentResults AddingActions_SaveParseParticleParameters Adding_Graph_to_ChangeBondActions Adding_MD_integration_tests Adding_ParticleName_to_Atom Adding_StructOpt_integration_tests AtomFragments Automaking_mpqc_open AutomationFragmentation_failures Candidate_v1.5.4 Candidate_v1.6.0 Candidate_v1.6.1 Candidate_v1.7.0 ChangeBugEmailaddress ChangingTestPorts ChemicalSpaceEvaluator CombiningParticlePotentialParsing Combining_Subpackages Debian_Package_split Debian_package_split_molecuildergui_only Disabling_MemDebug Docu_Python_wait EmpiricalPotential_contain_HomologyGraph EmpiricalPotential_contain_HomologyGraph_documentation Enable_parallel_make_install Enhance_userguide Enhanced_StructuralOptimization Enhanced_StructuralOptimization_continued Example_ManyWaysToTranslateAtom Exclude_Hydrogens_annealWithBondGraph FitPartialCharges_GlobalError Fix_BoundInBox_CenterInBox_MoleculeActions Fix_ChargeSampling_PBC Fix_ChronosMutex Fix_FitPartialCharges Fix_FitPotential_needs_atomicnumbers Fix_ForceAnnealing Fix_IndependentFragmentGrids Fix_ParseParticles Fix_ParseParticles_split_forward_backward_Actions Fix_PopActions Fix_QtFragmentList_sorted_selection Fix_Restrictedkeyset_FragmentMolecule Fix_StatusMsg Fix_StepWorldTime_single_argument Fix_Verbose_Codepatterns Fix_fitting_potentials Fixes ForceAnnealing_goodresults ForceAnnealing_oldresults ForceAnnealing_tocheck ForceAnnealing_with_BondGraph ForceAnnealing_with_BondGraph_continued ForceAnnealing_with_BondGraph_continued_betteresults ForceAnnealing_with_BondGraph_contraction-expansion FragmentAction_writes_AtomFragments FragmentMolecule_checks_bonddegrees GeometryObjects Gui_Fixes Gui_displays_atomic_force_velocity ImplicitCharges IndependentFragmentGrids IndependentFragmentGrids_IndividualZeroInstances IndependentFragmentGrids_IntegrationTest 
IndependentFragmentGrids_Sole_NN_Calculation JobMarket_RobustOnKillsSegFaults JobMarket_StableWorkerPool JobMarket_unresolvable_hostname_fix MoreRobust_FragmentAutomation ODR_violation_mpqc_open PartialCharges_OrthogonalSummation PdbParser_setsAtomName PythonUI_with_named_parameters QtGui_reactivate_TimeChanged_changes Recreated_GuiChecks Rewrite_FitPartialCharges RotateToPrincipalAxisSystem_UndoRedo SaturateAtoms_findBestMatching SaturateAtoms_singleDegree StoppableMakroAction Subpackage_CodePatterns Subpackage_JobMarket Subpackage_LinearAlgebra Subpackage_levmar Subpackage_mpqc_open Subpackage_vmg Switchable_LogView ThirdParty_MPQC_rebuilt_buildsystem TrajectoryDependenant_MaxOrder TremoloParser_IncreasedPrecision TremoloParser_MultipleTimesteps TremoloParser_setsAtomName Ubuntu_1604_changes stable
Last change on this file since 986885 was d1dbfc, checked in by Frederik Heber <heber@…>, 13 years ago

New GetNextJobIdOperation for obtaining next available JobId from server.

  • as JobId is required to create the job, we now need two, separate communication phases: gathering info (ids) and sending info (jobs).
  • new SchedulerState GetNextJobId along with handlers.
  • new GetNextJobIdOperation that requests another id which is internally stored into a list, along with a getter that extracts them one by one.
  • controller's createjobs() and parsejobs() each take a nextid parameter now.
  • Property mode set to 100644
File size: 12.3 KB
RevLine 
[72eaf7f]1/*
[cd4a6e]2 * Project: MoleCuilder
3 * Description: creates and alters molecular systems
4 * Copyright (C) 2011 University of Bonn. All rights reserved.
5 * Please see the LICENSE file or "Copyright notice" in builder.cpp for details.
6 */
7
8/*
9 * \file FragmentScheduler.cpp
10 *
11 * This file strongly follows the Serialization example from the boost::asio
12 * library (see server.cpp)
[72eaf7f]13 *
[cd4a6e]14 * Created on: Oct 19, 2011
[72eaf7f]15 * Author: heber
16 */
17
[f93842]18// include config.h
19#ifdef HAVE_CONFIG_H
20#include <config.h>
21#endif
22
[c6bcd0]23// boost asio needs specific operator new
[72eaf7f]24#include <boost/asio.hpp>
[c6bcd0]25
26#include "CodePatterns/MemDebug.hpp"
27
[72eaf7f]28#include <boost/bind.hpp>
29#include <boost/lexical_cast.hpp>
30#include <iostream>
31#include <vector>
[af3aed]32#include "Connection.hpp" // Must come before boost/serialization headers.
[72eaf7f]33#include <boost/serialization/vector.hpp>
[af3aed]34#include "CodePatterns/Info.hpp"
[b0b64c]35#include "CodePatterns/Log.hpp"
[ff60cfa]36#include "Jobs/MPQCCommandJob.hpp"
[d920b9]37#include "Jobs/SystemCommandJob.hpp"
[ef2767]38#include "JobId.hpp"
[72eaf7f]39
[cd4a6e]40#include "FragmentScheduler.hpp"
[72eaf7f]41
[ff60cfa]42FragmentJob::ptr FragmentScheduler::NoJob(new SystemCommandJob(std::string("/bin/true"), std::string("dosomething"), JobId::NoJob));
43
44/** Helper function to enforce binding of FragmentWorker to possible derived
45 * FragmentJob classes.
46 */
47void dummyInit() {
48 SystemCommandJob("/bin/false", "something", JobId::IllegalJob);
49 MPQCCommandJob("nofile", JobId::IllegalJob);
50}
[c7deca]51
[db03d9]52/** Constructor of class FragmentScheduler.
53 *
54 * We setup both acceptors to accept connections from workers and Controller.
55 *
56 * \param io_service io_service of the asynchronous communications
57 * \param workerport port to listen for worker connections
58 * \param controllerport port to listen for controller connections.
59 */
60FragmentScheduler::FragmentScheduler(boost::asio::io_service& io_service, unsigned short workerport, unsigned short controllerport) :
61 worker_acceptor_(io_service,
62 boost::asio::ip::tcp::endpoint(boost::asio::ip::tcp::v4(), workerport)
63 ),
64 controller_acceptor_(io_service,
65 boost::asio::ip::tcp::endpoint(boost::asio::ip::tcp::v4(), controllerport)
[ef2767]66 ),
[35f587]67 result( new FragmentResult(JobId::NoJob) ),
[6f2bc7]68 jobInfo((size_t)2, 0),
[778abb]69 choice(NoOperation),
[d1dbfc]70 globalId(0),
[db03d9]71 Exitflag(OkFlag)
[ed2c5b]72{
[b0b64c]73 Info info(__FUNCTION__);
[72eaf7f]74
[778abb]75 // only initiate socket if jobs are already present
76 if (JobsQueue.isJobPresent()) {
77 LOG(1, "Listening for workers on port " << workerport << ".");
78 initiateWorkerSocket();
79 }
[402bde]80
81 initiateControllerSocket();
[778abb]82 LOG(1, "Listening for controller on port " << controllerport << ".");
[ed2c5b]83}
[72eaf7f]84
[402bde]85/** Internal function to start worker connection.
86 *
87 */
88void FragmentScheduler::initiateWorkerSocket()
89{
90 // Start an accept operation for worker connections.
91 connection_ptr new_conn(new Connection(worker_acceptor_.get_io_service()));
92 worker_acceptor_.async_accept(new_conn->socket(),
93 boost::bind(&FragmentScheduler::handle_AcceptWorker, this,
94 boost::asio::placeholders::error, new_conn));
95}
96
97/** Internal function to start controller connection.
98 *
99 */
100void FragmentScheduler::initiateControllerSocket()
101{
102 // Start an accept operation for controller connection.
103 connection_ptr new_conn(new Connection(controller_acceptor_.get_io_service()));
104 controller_acceptor_.async_accept(new_conn->socket(),
105 boost::bind(&FragmentScheduler::handle_AcceptController, this,
106 boost::asio::placeholders::error, new_conn));
107}
108
109
[db03d9]110/** Handle a new worker connection.
111 *
112 * We check whether jobs are in the JobsQueue. If present, job is sent.
113 *
114 * \sa handle_SendJobtoWorker()
115 *
116 * \param e error code if something went wrong
117 * \param conn reference with the connection
118 */
119void FragmentScheduler::handle_AcceptWorker(const boost::system::error_code& e, connection_ptr conn)
[ed2c5b]120{
[cd4a6e]121 Info info(__FUNCTION__);
[ed2c5b]122 if (!e)
[72eaf7f]123 {
[b0b64c]124 // Successfully accepted a new connection.
125 // Check whether there are jobs in the queue
126 if (JobsQueue.isJobPresent()) {
127 // pop a job and send it to the client.
[78ad7d]128 FragmentJob::ptr job(JobsQueue.popJob());
[b0b64c]129 // The connection::async_write() function will automatically
130 // serialize the data structure for us.
[78ad7d]131 LOG(1, "INFO: Sending job #" << job->getId() << ".");
[ef2767]132 conn->async_write(job,
[db03d9]133 boost::bind(&FragmentScheduler::handle_SendJobtoWorker, this,
[b0b64c]134 boost::asio::placeholders::error, conn));
[0bdd51b]135
[b0b64c]136 } else {
[c7deca]137 // send the static NoJob
138 conn->async_write(NoJob,
[db03d9]139 boost::bind(&FragmentScheduler::handle_SendJobtoWorker, this,
[c7deca]140 boost::asio::placeholders::error, conn));
141
[ef2767]142 // then there must be no read necesary
143
[b0b64c]144 ELOG(2, "There is currently no job present in the queue.");
145 }
[cd4a6e]146 }
147 else
148 {
149 // An error occurred. Log it and return. Since we are not starting a new
150 // accept operation the io_service will run out of work to do and the
151 // server will exit.
[3c4a5e]152 Exitflag = WorkerErrorFlag;
[b0b64c]153 ELOG(0, e.message());
[cd4a6e]154 }
[778abb]155
156 // Start an accept operation for a new Connection only when there
157 // are still jobs present
158 if (JobsQueue.isJobPresent())
159 initiateWorkerSocket();
[ed2c5b]160}
[72eaf7f]161
[db03d9]162/** Callback function when job has been sent.
163 *
164 * After job has been sent we start async_read() for the result.
165 *
166 * \sa handle_ReceiveResultFromWorker()
167 *
168 * \param e error code if something went wrong
169 * \param conn reference with the connection
170 */
171void FragmentScheduler::handle_SendJobtoWorker(const boost::system::error_code& e, connection_ptr conn)
[ed2c5b]172{
[af3aed]173 Info info(__FUNCTION__);
[ef2767]174 LOG(1, "INFO: Job sent.");
175 // obtain result
176 LOG(1, "INFO: Receiving result for a job ...");
177 conn->async_read(result,
[db03d9]178 boost::bind(&FragmentScheduler::handle_ReceiveResultFromWorker, this,
[ef2767]179 boost::asio::placeholders::error, conn));
180}
181
[db03d9]182/** Callback function when result has been received.
183 *
184 * \param e error code if something went wrong
185 * \param conn reference with the connection
186 */
187void FragmentScheduler::handle_ReceiveResultFromWorker(const boost::system::error_code& e, connection_ptr conn)
[ef2767]188{
[db03d9]189 Info info(__FUNCTION__);
[35f587]190 LOG(1, "INFO: Received result for job #" << result->getId() << " ...");
191 // and push into queue
192 ASSERT(result->getId() != (JobId_t)JobId::NoJob,
[db03d9]193 "FragmentScheduler::handle_ReceiveResultFromWorker() - result received has NoJob id.");
[35f587]194 ASSERT(result->getId() != (JobId_t)JobId::IllegalJob,
[db03d9]195 "FragmentScheduler::handle_ReceiveResultFromWorker() - result received has IllegalJob id.");
[778abb]196 // place id into expected
[35f587]197 if ((result->getId() != (JobId_t)JobId::NoJob) && (result->getId() != (JobId_t)JobId::IllegalJob))
[db03d9]198 JobsQueue.pushResult(result);
199 // erase result
[35f587]200 result.reset();
[778abb]201 LOG(1, "INFO: JobsQueue has " << JobsQueue.getDoneJobs() << " results.");
[db03d9]202}
203
204/** Handle a new controller connection.
205 *
206 * \sa handle_ReceiveJobs()
207 * \sa handle_CheckResultState()
208 * \sa handle_SendResults()
209 *
210 * \param e error code if something went wrong
211 * \param conn reference with the connection
212 */
213void FragmentScheduler::handle_AcceptController(const boost::system::error_code& e, connection_ptr conn)
214{
215 Info info(__FUNCTION__);
216 if (!e)
217 {
[778abb]218 conn->async_read(choice,
219 boost::bind(&FragmentScheduler::handle_ReadChoice, this,
220 boost::asio::placeholders::error, conn));
221 }
222 else
223 {
224 // An error occurred. Log it and return. Since we are not starting a new
225 // accept operation the io_service will run out of work to do and the
226 // server will exit.
227 Exitflag = ControllerErrorFlag;
228 ELOG(0, e.message());
229 }
230}
231
232/** Controller callback function to read the choice for next operation.
233 *
234 * \param e error code if something went wrong
235 * \param conn reference with the connection
236 */
237void FragmentScheduler::handle_ReadChoice(const boost::system::error_code& e, connection_ptr conn)
238{
239 Info info(__FUNCTION__);
240 if (!e)
241 {
[0196c6]242 bool LaunchNewAcceptor = true;
[d1dbfc]243 LOG(1, "INFO: Received request for operation " << choice << ".");
[778abb]244 // switch over the desired choice read previously
245 switch(choice) {
246 case NoOperation:
247 {
248 ELOG(1, "FragmentScheduler::handle_ReadChoice() - called with NoOperation.");
249 break;
250 }
[d1dbfc]251 case GetNextJobId:
252 {
253 const JobId_t nextid = globalId.getNextId();
254 LOG(1, "INFO: Sending next available job id " << nextid << " to controller ...");
255 conn->async_write(nextid,
256 boost::bind(&FragmentScheduler::handle_GetNextJobIdState, this,
257 boost::asio::placeholders::error, conn));
258 break;
259 }
[778abb]260 case ReceiveJobs:
[d1dbfc]261 {
262 // The connection::async_write() function will automatically
263 // serialize the data structure for us.
264 LOG(1, "INFO: Receiving bunch of jobs from a controller ...");
265 conn->async_read(jobs,
266 boost::bind(&FragmentScheduler::handle_ReceiveJobs, this,
267 boost::asio::placeholders::error, conn));
268 break;
269 }
[778abb]270 case CheckState:
271 {
[3c4a5e]272 // first update number
[6f2bc7]273 jobInfo[0] = JobsQueue.getPresentJobs();
274 jobInfo[1] = JobsQueue.getDoneJobs();
[3c4a5e]275 // now we accept connections to check for state of calculations
[6f2bc7]276 LOG(1, "INFO: Sending state that "+toString(jobInfo[0])+" jobs are present and "+toString(jobInfo[1])+" jobs are done to controller ...");
277 conn->async_write(jobInfo,
[3c4a5e]278 boost::bind(&FragmentScheduler::handle_CheckResultState, this,
279 boost::asio::placeholders::error, conn));
[778abb]280 break;
281 }
282 case SendResults:
283 {
[35f587]284 const std::vector<FragmentResult::ptr> results = JobsQueue.getAllResults();
[778abb]285 // ... or we give the results
286 LOG(1, "INFO: Sending "+toString(results.size())+" results to controller ...");
287 conn->async_write(results,
288 boost::bind(&FragmentScheduler::handle_SendResults, this,
289 boost::asio::placeholders::error, conn));
[0196c6]290 break;
291 }
292 case Shutdown:
293 {
294 LaunchNewAcceptor = false;
[778abb]295 break;
[db03d9]296 }
[778abb]297 default:
298 Exitflag = ControllerErrorFlag;
299 ELOG(1, "FragmentScheduler::handle_ReadChoice() - called with no valid choice.");
300 break;
301 }
302 // restore NoOperation choice such that choice is not read twice
303 choice = NoOperation;
304
[0196c6]305 if (LaunchNewAcceptor) {
306 LOG(1, "Launching new acceptor on socket.");
307 // Start an accept operation for a new Connection.
308 connection_ptr new_conn(new Connection(controller_acceptor_.get_io_service()));
309 controller_acceptor_.async_accept(new_conn->socket(),
310 boost::bind(&FragmentScheduler::handle_AcceptController, this,
311 boost::asio::placeholders::error, new_conn));
312 }
[db03d9]313 }
314 else
315 {
316 // An error occurred. Log it and return. Since we are not starting a new
317 // accept operation the io_service will run out of work to do and the
318 // server will exit.
[3c4a5e]319 Exitflag = ControllerErrorFlag;
[db03d9]320 ELOG(0, e.message());
321 }
322}
323
324/** Controller callback function when job has been sent.
[778abb]325 *
326 * We check here whether the worker socket is accepting, if there
327 * have been no jobs we re-activate it, as it is shut down after
328 * last job.
[db03d9]329 *
330 * \param e error code if something went wrong
331 * \param conn reference with the connection
332 */
333void FragmentScheduler::handle_ReceiveJobs(const boost::system::error_code& e, connection_ptr conn)
334{
335 Info info(__FUNCTION__);
[778abb]336 bool initiateSocket = !JobsQueue.isJobPresent();
337
[db03d9]338 // jobs are received, hence place in JobsQueue
339 if (!jobs.empty()) {
340 LOG(1, "INFO: Pushing " << jobs.size() << " jobs into queue.");
341 JobsQueue.pushJobs(jobs);
[778abb]342 // initiate socket if we had no jobs before
343 if (initiateSocket)
344 initiateWorkerSocket();
[db03d9]345 }
346
347 jobs.clear();
[778abb]348
[ed2c5b]349}
[cd4a6e]350
[3c4a5e]351/** Controller callback function when checking on state of results.
352 *
353 * \param e error code if something went wrong
354 * \param conn reference with the connection
355 */
356void FragmentScheduler::handle_CheckResultState(const boost::system::error_code& e, connection_ptr conn)
357{
358 Info info(__FUNCTION__);
359 // do nothing
[6f2bc7]360 LOG(1, "INFO: Sent that " << jobInfo << " jobs are (scheduled, done).");
[3c4a5e]361}
[778abb]362
[d1dbfc]363/** Controller callback function when checking on state of results.
364 *
365 * \param e error code if something went wrong
366 * \param conn reference with the connection
367 */
368void FragmentScheduler::handle_GetNextJobIdState(const boost::system::error_code& e, connection_ptr conn)
369{
370 Info info(__FUNCTION__);
371 // do nothing
372 LOG(1, "INFO: Sent next available job id.");
373}
374
[778abb]375/** Controller callback function when result has been received.
376 *
377 * \param e error code if something went wrong
378 * \param conn reference with the connection
379 */
380void FragmentScheduler::handle_SendResults(const boost::system::error_code& e, connection_ptr conn)
381{
382 Info info(__FUNCTION__);
383 // do nothing
384 LOG(1, "INFO: Results have been sent.");
385}
386
Note: See TracBrowser for help on using the repository browser.