source: src/Fragmentation/Automation/FragmentScheduler.cpp@ 95b384

Action_Thermostats Add_AtomRandomPerturbation Add_FitFragmentPartialChargesAction Add_RotateAroundBondAction Add_SelectAtomByNameAction Added_ParseSaveFragmentResults AddingActions_SaveParseParticleParameters Adding_Graph_to_ChangeBondActions Adding_MD_integration_tests Adding_ParticleName_to_Atom Adding_StructOpt_integration_tests AtomFragments Automaking_mpqc_open AutomationFragmentation_failures Candidate_v1.5.4 Candidate_v1.6.0 Candidate_v1.6.1 Candidate_v1.7.0 ChangeBugEmailaddress ChangingTestPorts ChemicalSpaceEvaluator CombiningParticlePotentialParsing Combining_Subpackages Debian_Package_split Debian_package_split_molecuildergui_only Disabling_MemDebug Docu_Python_wait EmpiricalPotential_contain_HomologyGraph EmpiricalPotential_contain_HomologyGraph_documentation Enable_parallel_make_install Enhance_userguide Enhanced_StructuralOptimization Enhanced_StructuralOptimization_continued Example_ManyWaysToTranslateAtom Exclude_Hydrogens_annealWithBondGraph FitPartialCharges_GlobalError Fix_BoundInBox_CenterInBox_MoleculeActions Fix_ChargeSampling_PBC Fix_ChronosMutex Fix_FitPartialCharges Fix_FitPotential_needs_atomicnumbers Fix_ForceAnnealing Fix_IndependentFragmentGrids Fix_ParseParticles Fix_ParseParticles_split_forward_backward_Actions Fix_PopActions Fix_QtFragmentList_sorted_selection Fix_Restrictedkeyset_FragmentMolecule Fix_StatusMsg Fix_StepWorldTime_single_argument Fix_Verbose_Codepatterns Fix_fitting_potentials Fixes ForceAnnealing_goodresults ForceAnnealing_oldresults ForceAnnealing_tocheck ForceAnnealing_with_BondGraph ForceAnnealing_with_BondGraph_continued ForceAnnealing_with_BondGraph_continued_betteresults ForceAnnealing_with_BondGraph_contraction-expansion FragmentAction_writes_AtomFragments FragmentMolecule_checks_bonddegrees GeometryObjects Gui_Fixes Gui_displays_atomic_force_velocity ImplicitCharges IndependentFragmentGrids IndependentFragmentGrids_IndividualZeroInstances IndependentFragmentGrids_IntegrationTest 
IndependentFragmentGrids_Sole_NN_Calculation JobMarket_RobustOnKillsSegFaults JobMarket_StableWorkerPool JobMarket_unresolvable_hostname_fix MoreRobust_FragmentAutomation ODR_violation_mpqc_open PartialCharges_OrthogonalSummation PdbParser_setsAtomName PythonUI_with_named_parameters QtGui_reactivate_TimeChanged_changes Recreated_GuiChecks Rewrite_FitPartialCharges RotateToPrincipalAxisSystem_UndoRedo SaturateAtoms_findBestMatching SaturateAtoms_singleDegree StoppableMakroAction Subpackage_CodePatterns Subpackage_JobMarket Subpackage_LinearAlgebra Subpackage_levmar Subpackage_mpqc_open Subpackage_vmg Switchable_LogView ThirdParty_MPQC_rebuilt_buildsystem TrajectoryDependenant_MaxOrder TremoloParser_IncreasedPrecision TremoloParser_MultipleTimesteps TremoloParser_setsAtomName Ubuntu_1604_changes stable
Last change on this file since 95b384 was 95b384, checked in by Frederik Heber <heber@…>, 13 years ago

Removed duplicate code in FragmentScheduler for initiating new connections.

  • Property mode set to 100644
File size: 12.1 KB
RevLine 
[72eaf7f]1/*
[cd4a6e]2 * Project: MoleCuilder
3 * Description: creates and alters molecular systems
4 * Copyright (C) 2011 University of Bonn. All rights reserved.
5 * Please see the LICENSE file or "Copyright notice" in builder.cpp for details.
6 */
7
8/*
9 * \file FragmentScheduler.cpp
10 *
11 * This file strongly follows the Serialization example from the boost::asio
12 * library (see server.cpp)
[72eaf7f]13 *
[cd4a6e]14 * Created on: Oct 19, 2011
[72eaf7f]15 * Author: heber
16 */
17
[f93842]18// include config.h
19#ifdef HAVE_CONFIG_H
20#include <config.h>
21#endif
22
[c6bcd0]23// boost asio needs specific operator new
[72eaf7f]24#include <boost/asio.hpp>
[c6bcd0]25
26#include "CodePatterns/MemDebug.hpp"
27
[72eaf7f]28#include <boost/bind.hpp>
29#include <boost/lexical_cast.hpp>
30#include <iostream>
31#include <vector>
[af3aed]32#include "Connection.hpp" // Must come before boost/serialization headers.
[72eaf7f]33#include <boost/serialization/vector.hpp>
[af3aed]34#include "CodePatterns/Info.hpp"
[b0b64c]35#include "CodePatterns/Log.hpp"
[ff60cfa]36#include "Jobs/MPQCCommandJob.hpp"
[d920b9]37#include "Jobs/SystemCommandJob.hpp"
[ef2767]38#include "JobId.hpp"
[72eaf7f]39
[cd4a6e]40#include "FragmentScheduler.hpp"
[72eaf7f]41
[ff60cfa]42FragmentJob::ptr FragmentScheduler::NoJob(new SystemCommandJob(std::string("/bin/true"), std::string("dosomething"), JobId::NoJob));
43
44/** Helper function to enforce binding of FragmentWorker to possible derived
45 * FragmentJob classes.
46 */
47void dummyInit() {
48 SystemCommandJob("/bin/false", "something", JobId::IllegalJob);
49 MPQCCommandJob("nofile", JobId::IllegalJob);
50}
[c7deca]51
[db03d9]52/** Constructor of class FragmentScheduler.
53 *
54 * We setup both acceptors to accept connections from workers and Controller.
55 *
56 * \param io_service io_service of the asynchronous communications
57 * \param workerport port to listen for worker connections
58 * \param controllerport port to listen for controller connections.
59 */
60FragmentScheduler::FragmentScheduler(boost::asio::io_service& io_service, unsigned short workerport, unsigned short controllerport) :
61 worker_acceptor_(io_service,
62 boost::asio::ip::tcp::endpoint(boost::asio::ip::tcp::v4(), workerport)
63 ),
64 controller_acceptor_(io_service,
65 boost::asio::ip::tcp::endpoint(boost::asio::ip::tcp::v4(), controllerport)
[ef2767]66 ),
[35f587]67 result( new FragmentResult(JobId::NoJob) ),
[6f2bc7]68 jobInfo((size_t)2, 0),
[778abb]69 choice(NoOperation),
[d1dbfc]70 globalId(0),
[db03d9]71 Exitflag(OkFlag)
[ed2c5b]72{
[b0b64c]73 Info info(__FUNCTION__);
[72eaf7f]74
[778abb]75 // only initiate socket if jobs are already present
76 if (JobsQueue.isJobPresent()) {
77 LOG(1, "Listening for workers on port " << workerport << ".");
78 initiateWorkerSocket();
79 }
[402bde]80
81 initiateControllerSocket();
[778abb]82 LOG(1, "Listening for controller on port " << controllerport << ".");
[ed2c5b]83}
[72eaf7f]84
[402bde]85/** Internal function to start worker connection.
86 *
87 */
88void FragmentScheduler::initiateWorkerSocket()
89{
90 // Start an accept operation for worker connections.
91 connection_ptr new_conn(new Connection(worker_acceptor_.get_io_service()));
92 worker_acceptor_.async_accept(new_conn->socket(),
93 boost::bind(&FragmentScheduler::handle_AcceptWorker, this,
94 boost::asio::placeholders::error, new_conn));
95}
96
97/** Internal function to start controller connection.
98 *
99 */
100void FragmentScheduler::initiateControllerSocket()
101{
102 // Start an accept operation for controller connection.
103 connection_ptr new_conn(new Connection(controller_acceptor_.get_io_service()));
104 controller_acceptor_.async_accept(new_conn->socket(),
105 boost::bind(&FragmentScheduler::handle_AcceptController, this,
106 boost::asio::placeholders::error, new_conn));
107}
108
109
[db03d9]110/** Handle a new worker connection.
111 *
112 * We check whether jobs are in the JobsQueue. If present, job is sent.
113 *
114 * \sa handle_SendJobtoWorker()
115 *
116 * \param e error code if something went wrong
117 * \param conn reference with the connection
118 */
119void FragmentScheduler::handle_AcceptWorker(const boost::system::error_code& e, connection_ptr conn)
[ed2c5b]120{
[cd4a6e]121 Info info(__FUNCTION__);
[ed2c5b]122 if (!e)
[72eaf7f]123 {
[b0b64c]124 // Successfully accepted a new connection.
125 // Check whether there are jobs in the queue
126 if (JobsQueue.isJobPresent()) {
127 // pop a job and send it to the client.
[78ad7d]128 FragmentJob::ptr job(JobsQueue.popJob());
[b0b64c]129 // The connection::async_write() function will automatically
130 // serialize the data structure for us.
[78ad7d]131 LOG(1, "INFO: Sending job #" << job->getId() << ".");
[ef2767]132 conn->async_write(job,
[db03d9]133 boost::bind(&FragmentScheduler::handle_SendJobtoWorker, this,
[b0b64c]134 boost::asio::placeholders::error, conn));
[0bdd51b]135
[b0b64c]136 } else {
[c7deca]137 // send the static NoJob
138 conn->async_write(NoJob,
[db03d9]139 boost::bind(&FragmentScheduler::handle_SendJobtoWorker, this,
[c7deca]140 boost::asio::placeholders::error, conn));
141
[ef2767]142 // then there must be no read necesary
143
[b0b64c]144 ELOG(2, "There is currently no job present in the queue.");
145 }
[cd4a6e]146 }
147 else
148 {
149 // An error occurred. Log it and return. Since we are not starting a new
150 // accept operation the io_service will run out of work to do and the
151 // server will exit.
[3c4a5e]152 Exitflag = WorkerErrorFlag;
[b0b64c]153 ELOG(0, e.message());
[cd4a6e]154 }
[778abb]155
[95b384]156 if (JobsQueue.isJobPresent()) {
157 // Start an accept operation for a new Connection only when there
158 // are still jobs present
[778abb]159 initiateWorkerSocket();
[95b384]160 }
[ed2c5b]161}
[72eaf7f]162
[db03d9]163/** Callback function when job has been sent.
164 *
165 * After job has been sent we start async_read() for the result.
166 *
167 * \sa handle_ReceiveResultFromWorker()
168 *
169 * \param e error code if something went wrong
170 * \param conn reference with the connection
171 */
172void FragmentScheduler::handle_SendJobtoWorker(const boost::system::error_code& e, connection_ptr conn)
[ed2c5b]173{
[af3aed]174 Info info(__FUNCTION__);
[ef2767]175 LOG(1, "INFO: Job sent.");
176 // obtain result
177 LOG(1, "INFO: Receiving result for a job ...");
178 conn->async_read(result,
[db03d9]179 boost::bind(&FragmentScheduler::handle_ReceiveResultFromWorker, this,
[ef2767]180 boost::asio::placeholders::error, conn));
181}
182
[db03d9]183/** Callback function when result has been received.
184 *
185 * \param e error code if something went wrong
186 * \param conn reference with the connection
187 */
188void FragmentScheduler::handle_ReceiveResultFromWorker(const boost::system::error_code& e, connection_ptr conn)
[ef2767]189{
[db03d9]190 Info info(__FUNCTION__);
[35f587]191 LOG(1, "INFO: Received result for job #" << result->getId() << " ...");
192 // and push into queue
193 ASSERT(result->getId() != (JobId_t)JobId::NoJob,
[db03d9]194 "FragmentScheduler::handle_ReceiveResultFromWorker() - result received has NoJob id.");
[35f587]195 ASSERT(result->getId() != (JobId_t)JobId::IllegalJob,
[db03d9]196 "FragmentScheduler::handle_ReceiveResultFromWorker() - result received has IllegalJob id.");
[778abb]197 // place id into expected
[35f587]198 if ((result->getId() != (JobId_t)JobId::NoJob) && (result->getId() != (JobId_t)JobId::IllegalJob))
[db03d9]199 JobsQueue.pushResult(result);
200 // erase result
[35f587]201 result.reset();
[778abb]202 LOG(1, "INFO: JobsQueue has " << JobsQueue.getDoneJobs() << " results.");
[db03d9]203}
204
205/** Handle a new controller connection.
206 *
207 * \sa handle_ReceiveJobs()
208 * \sa handle_CheckResultState()
209 * \sa handle_SendResults()
210 *
211 * \param e error code if something went wrong
212 * \param conn reference with the connection
213 */
214void FragmentScheduler::handle_AcceptController(const boost::system::error_code& e, connection_ptr conn)
215{
216 Info info(__FUNCTION__);
217 if (!e)
218 {
[778abb]219 conn->async_read(choice,
220 boost::bind(&FragmentScheduler::handle_ReadChoice, this,
221 boost::asio::placeholders::error, conn));
222 }
223 else
224 {
225 // An error occurred. Log it and return. Since we are not starting a new
226 // accept operation the io_service will run out of work to do and the
227 // server will exit.
228 Exitflag = ControllerErrorFlag;
229 ELOG(0, e.message());
230 }
231}
232
233/** Controller callback function to read the choice for next operation.
234 *
235 * \param e error code if something went wrong
236 * \param conn reference with the connection
237 */
238void FragmentScheduler::handle_ReadChoice(const boost::system::error_code& e, connection_ptr conn)
239{
240 Info info(__FUNCTION__);
241 if (!e)
242 {
[0196c6]243 bool LaunchNewAcceptor = true;
[d1dbfc]244 LOG(1, "INFO: Received request for operation " << choice << ".");
[778abb]245 // switch over the desired choice read previously
246 switch(choice) {
247 case NoOperation:
248 {
249 ELOG(1, "FragmentScheduler::handle_ReadChoice() - called with NoOperation.");
250 break;
251 }
[d1dbfc]252 case GetNextJobId:
253 {
254 const JobId_t nextid = globalId.getNextId();
255 LOG(1, "INFO: Sending next available job id " << nextid << " to controller ...");
256 conn->async_write(nextid,
257 boost::bind(&FragmentScheduler::handle_GetNextJobIdState, this,
258 boost::asio::placeholders::error, conn));
259 break;
260 }
[778abb]261 case ReceiveJobs:
[d1dbfc]262 {
263 // The connection::async_write() function will automatically
264 // serialize the data structure for us.
265 LOG(1, "INFO: Receiving bunch of jobs from a controller ...");
266 conn->async_read(jobs,
267 boost::bind(&FragmentScheduler::handle_ReceiveJobs, this,
268 boost::asio::placeholders::error, conn));
269 break;
270 }
[778abb]271 case CheckState:
272 {
[3c4a5e]273 // first update number
[6f2bc7]274 jobInfo[0] = JobsQueue.getPresentJobs();
275 jobInfo[1] = JobsQueue.getDoneJobs();
[3c4a5e]276 // now we accept connections to check for state of calculations
[6f2bc7]277 LOG(1, "INFO: Sending state that "+toString(jobInfo[0])+" jobs are present and "+toString(jobInfo[1])+" jobs are done to controller ...");
278 conn->async_write(jobInfo,
[3c4a5e]279 boost::bind(&FragmentScheduler::handle_CheckResultState, this,
280 boost::asio::placeholders::error, conn));
[778abb]281 break;
282 }
283 case SendResults:
284 {
[35f587]285 const std::vector<FragmentResult::ptr> results = JobsQueue.getAllResults();
[778abb]286 // ... or we give the results
287 LOG(1, "INFO: Sending "+toString(results.size())+" results to controller ...");
288 conn->async_write(results,
289 boost::bind(&FragmentScheduler::handle_SendResults, this,
290 boost::asio::placeholders::error, conn));
[0196c6]291 break;
292 }
293 case Shutdown:
294 {
295 LaunchNewAcceptor = false;
[778abb]296 break;
[db03d9]297 }
[778abb]298 default:
299 Exitflag = ControllerErrorFlag;
300 ELOG(1, "FragmentScheduler::handle_ReadChoice() - called with no valid choice.");
301 break;
302 }
303 // restore NoOperation choice such that choice is not read twice
304 choice = NoOperation;
305
[0196c6]306 if (LaunchNewAcceptor) {
307 LOG(1, "Launching new acceptor on socket.");
308 // Start an accept operation for a new Connection.
[95b384]309 initiateControllerSocket();
[0196c6]310 }
[db03d9]311 }
312 else
313 {
314 // An error occurred. Log it and return. Since we are not starting a new
315 // accept operation the io_service will run out of work to do and the
316 // server will exit.
[3c4a5e]317 Exitflag = ControllerErrorFlag;
[db03d9]318 ELOG(0, e.message());
319 }
320}
321
322/** Controller callback function when job has been sent.
[778abb]323 *
324 * We check here whether the worker socket is accepting, if there
325 * have been no jobs we re-activate it, as it is shut down after
326 * last job.
[db03d9]327 *
328 * \param e error code if something went wrong
329 * \param conn reference with the connection
330 */
331void FragmentScheduler::handle_ReceiveJobs(const boost::system::error_code& e, connection_ptr conn)
332{
333 Info info(__FUNCTION__);
[778abb]334 bool initiateSocket = !JobsQueue.isJobPresent();
335
[db03d9]336 // jobs are received, hence place in JobsQueue
337 if (!jobs.empty()) {
338 LOG(1, "INFO: Pushing " << jobs.size() << " jobs into queue.");
339 JobsQueue.pushJobs(jobs);
[778abb]340 // initiate socket if we had no jobs before
341 if (initiateSocket)
342 initiateWorkerSocket();
[db03d9]343 }
344
345 jobs.clear();
[778abb]346
[ed2c5b]347}
[cd4a6e]348
[3c4a5e]349/** Controller callback function when checking on state of results.
350 *
351 * \param e error code if something went wrong
352 * \param conn reference with the connection
353 */
354void FragmentScheduler::handle_CheckResultState(const boost::system::error_code& e, connection_ptr conn)
355{
356 Info info(__FUNCTION__);
357 // do nothing
[6f2bc7]358 LOG(1, "INFO: Sent that " << jobInfo << " jobs are (scheduled, done).");
[3c4a5e]359}
[778abb]360
[d1dbfc]361/** Controller callback function when checking on state of results.
362 *
363 * \param e error code if something went wrong
364 * \param conn reference with the connection
365 */
366void FragmentScheduler::handle_GetNextJobIdState(const boost::system::error_code& e, connection_ptr conn)
367{
368 Info info(__FUNCTION__);
369 // do nothing
370 LOG(1, "INFO: Sent next available job id.");
371}
372
[778abb]373/** Controller callback function when result has been received.
374 *
375 * \param e error code if something went wrong
376 * \param conn reference with the connection
377 */
378void FragmentScheduler::handle_SendResults(const boost::system::error_code& e, connection_ptr conn)
379{
380 Info info(__FUNCTION__);
381 // do nothing
382 LOG(1, "INFO: Results have been sent.");
383}
384
Note: See TracBrowser for help on using the repository browser.