source: src/Fragmentation/Automation/FragmentScheduler.cpp@ ff60cfa

Action_Thermostats Add_AtomRandomPerturbation Add_FitFragmentPartialChargesAction Add_RotateAroundBondAction Add_SelectAtomByNameAction Added_ParseSaveFragmentResults AddingActions_SaveParseParticleParameters Adding_Graph_to_ChangeBondActions Adding_MD_integration_tests Adding_ParticleName_to_Atom Adding_StructOpt_integration_tests AtomFragments Automaking_mpqc_open AutomationFragmentation_failures Candidate_v1.5.4 Candidate_v1.6.0 Candidate_v1.6.1 Candidate_v1.7.0 ChangeBugEmailaddress ChangingTestPorts ChemicalSpaceEvaluator CombiningParticlePotentialParsing Combining_Subpackages Debian_Package_split Debian_package_split_molecuildergui_only Disabling_MemDebug Docu_Python_wait EmpiricalPotential_contain_HomologyGraph EmpiricalPotential_contain_HomologyGraph_documentation Enable_parallel_make_install Enhance_userguide Enhanced_StructuralOptimization Enhanced_StructuralOptimization_continued Example_ManyWaysToTranslateAtom Exclude_Hydrogens_annealWithBondGraph FitPartialCharges_GlobalError Fix_BoundInBox_CenterInBox_MoleculeActions Fix_ChargeSampling_PBC Fix_ChronosMutex Fix_FitPartialCharges Fix_FitPotential_needs_atomicnumbers Fix_ForceAnnealing Fix_IndependentFragmentGrids Fix_ParseParticles Fix_ParseParticles_split_forward_backward_Actions Fix_PopActions Fix_QtFragmentList_sorted_selection Fix_Restrictedkeyset_FragmentMolecule Fix_StatusMsg Fix_StepWorldTime_single_argument Fix_Verbose_Codepatterns Fix_fitting_potentials Fixes ForceAnnealing_goodresults ForceAnnealing_oldresults ForceAnnealing_tocheck ForceAnnealing_with_BondGraph ForceAnnealing_with_BondGraph_continued ForceAnnealing_with_BondGraph_continued_betteresults ForceAnnealing_with_BondGraph_contraction-expansion FragmentAction_writes_AtomFragments FragmentMolecule_checks_bonddegrees GeometryObjects Gui_Fixes Gui_displays_atomic_force_velocity ImplicitCharges IndependentFragmentGrids IndependentFragmentGrids_IndividualZeroInstances IndependentFragmentGrids_IntegrationTest IndependentFragmentGrids_Sole_NN_Calculation JobMarket_RobustOnKillsSegFaults JobMarket_StableWorkerPool JobMarket_unresolvable_hostname_fix MoreRobust_FragmentAutomation ODR_violation_mpqc_open PartialCharges_OrthogonalSummation PdbParser_setsAtomName PythonUI_with_named_parameters QtGui_reactivate_TimeChanged_changes Recreated_GuiChecks Rewrite_FitPartialCharges RotateToPrincipalAxisSystem_UndoRedo SaturateAtoms_findBestMatching SaturateAtoms_singleDegree StoppableMakroAction Subpackage_CodePatterns Subpackage_JobMarket Subpackage_LinearAlgebra Subpackage_levmar Subpackage_mpqc_open Subpackage_vmg Switchable_LogView ThirdParty_MPQC_rebuilt_buildsystem TrajectoryDependenant_MaxOrder TremoloParser_IncreasedPrecision TremoloParser_MultipleTimesteps TremoloParser_setsAtomName Ubuntu_1604_changes stable
Last change on this file since ff60cfa was ff60cfa, checked in by Frederik Heber <heber@…>, 13 years ago

Controller has new function addjobs.

  • addjobs parses mpqc input file and creates MPQCCommandJob which is sent to send jobs
  • old sendjobs is now createjobs which creates empty SystemCommandJob.
  • added new class GlobalJobId which holds a global JobId such that FragmentJobs created by Controller each have a unique id. NOTE: This should probably be replaced by IdPool implementation when Fragmentation/Automation is integrated into rest of molecuilder.
  • dummyInit() to have MPQCCommandJob instances known to FragmentScheduler.
  • TESTFIX: Adapted regression tests Fragmentation/Automation adding-jobs, server-worker, and completerun due to command token change.
  • Property mode set to 100644
File size: 11.4 KB
RevLine 
[72eaf7f]1/*
[cd4a6e]2 * Project: MoleCuilder
3 * Description: creates and alters molecular systems
4 * Copyright (C) 2011 University of Bonn. All rights reserved.
5 * Please see the LICENSE file or "Copyright notice" in builder.cpp for details.
6 */
7
8/*
9 * \file FragmentScheduler.cpp
10 *
11 * This file strongly follows the Serialization example from the boost::asio
12 * library (see server.cpp)
[72eaf7f]13 *
[cd4a6e]14 * Created on: Oct 19, 2011
[72eaf7f]15 * Author: heber
16 */
17
[f93842]18// include config.h
19#ifdef HAVE_CONFIG_H
20#include <config.h>
21#endif
22
[c6bcd0]23// boost asio needs specific operator new
[72eaf7f]24#include <boost/asio.hpp>
[c6bcd0]25
26#include "CodePatterns/MemDebug.hpp"
27
[72eaf7f]28#include <boost/bind.hpp>
29#include <boost/lexical_cast.hpp>
30#include <iostream>
31#include <vector>
[af3aed]32#include "Connection.hpp" // Must come before boost/serialization headers.
[72eaf7f]33#include <boost/serialization/vector.hpp>
[af3aed]34#include "CodePatterns/Info.hpp"
[b0b64c]35#include "CodePatterns/Log.hpp"
[ff60cfa]36#include "Jobs/MPQCCommandJob.hpp"
[d920b9]37#include "Jobs/SystemCommandJob.hpp"
[ef2767]38#include "JobId.hpp"
[72eaf7f]39
[cd4a6e]40#include "FragmentScheduler.hpp"
[72eaf7f]41
[ff60cfa]42FragmentJob::ptr FragmentScheduler::NoJob(new SystemCommandJob(std::string("/bin/true"), std::string("dosomething"), JobId::NoJob));
43
44/** Helper function to enforce binding of FragmentWorker to possible derived
45 * FragmentJob classes.
46 */
47void dummyInit() {
48 SystemCommandJob("/bin/false", "something", JobId::IllegalJob);
49 MPQCCommandJob("nofile", JobId::IllegalJob);
50}
[c7deca]51
[db03d9]52/** Constructor of class FragmentScheduler.
53 *
54 * We setup both acceptors to accept connections from workers and Controller.
55 *
56 * \param io_service io_service of the asynchronous communications
57 * \param workerport port to listen for worker connections
58 * \param controllerport port to listen for controller connections.
59 */
60FragmentScheduler::FragmentScheduler(boost::asio::io_service& io_service, unsigned short workerport, unsigned short controllerport) :
61 worker_acceptor_(io_service,
62 boost::asio::ip::tcp::endpoint(boost::asio::ip::tcp::v4(), workerport)
63 ),
64 controller_acceptor_(io_service,
65 boost::asio::ip::tcp::endpoint(boost::asio::ip::tcp::v4(), controllerport)
[ef2767]66 ),
[35f587]67 result( new FragmentResult(JobId::NoJob) ),
[778abb]68 choice(NoOperation),
[db03d9]69 Exitflag(OkFlag)
[ed2c5b]70{
[b0b64c]71 Info info(__FUNCTION__);
[72eaf7f]72
[778abb]73 // only initiate socket if jobs are already present
74 if (JobsQueue.isJobPresent()) {
75 LOG(1, "Listening for workers on port " << workerport << ".");
76 initiateWorkerSocket();
77 }
[402bde]78
79 initiateControllerSocket();
[778abb]80 LOG(1, "Listening for controller on port " << controllerport << ".");
[ed2c5b]81}
[72eaf7f]82
[402bde]83/** Internal function to start worker connection.
84 *
85 */
86void FragmentScheduler::initiateWorkerSocket()
87{
88 // Start an accept operation for worker connections.
89 connection_ptr new_conn(new Connection(worker_acceptor_.get_io_service()));
90 worker_acceptor_.async_accept(new_conn->socket(),
91 boost::bind(&FragmentScheduler::handle_AcceptWorker, this,
92 boost::asio::placeholders::error, new_conn));
93}
94
95/** Internal function to start controller connection.
96 *
97 */
98void FragmentScheduler::initiateControllerSocket()
99{
100 // Start an accept operation for controller connection.
101 connection_ptr new_conn(new Connection(controller_acceptor_.get_io_service()));
102 controller_acceptor_.async_accept(new_conn->socket(),
103 boost::bind(&FragmentScheduler::handle_AcceptController, this,
104 boost::asio::placeholders::error, new_conn));
105}
106
107
[db03d9]108/** Handle a new worker connection.
109 *
110 * We check whether jobs are in the JobsQueue. If present, job is sent.
111 *
112 * \sa handle_SendJobtoWorker()
113 *
114 * \param e error code if something went wrong
115 * \param conn reference with the connection
116 */
117void FragmentScheduler::handle_AcceptWorker(const boost::system::error_code& e, connection_ptr conn)
[ed2c5b]118{
[cd4a6e]119 Info info(__FUNCTION__);
[ed2c5b]120 if (!e)
[72eaf7f]121 {
[b0b64c]122 // Successfully accepted a new connection.
123 // Check whether there are jobs in the queue
124 if (JobsQueue.isJobPresent()) {
125 // pop a job and send it to the client.
[78ad7d]126 FragmentJob::ptr job(JobsQueue.popJob());
[b0b64c]127 // The connection::async_write() function will automatically
128 // serialize the data structure for us.
[78ad7d]129 LOG(1, "INFO: Sending job #" << job->getId() << ".");
[ef2767]130 conn->async_write(job,
[db03d9]131 boost::bind(&FragmentScheduler::handle_SendJobtoWorker, this,
[b0b64c]132 boost::asio::placeholders::error, conn));
[0bdd51b]133
[b0b64c]134 } else {
[c7deca]135 // send the static NoJob
136 conn->async_write(NoJob,
[db03d9]137 boost::bind(&FragmentScheduler::handle_SendJobtoWorker, this,
[c7deca]138 boost::asio::placeholders::error, conn));
139
[ef2767]140 // then there must be no read necesary
141
[b0b64c]142 ELOG(2, "There is currently no job present in the queue.");
143 }
[cd4a6e]144 }
145 else
146 {
147 // An error occurred. Log it and return. Since we are not starting a new
148 // accept operation the io_service will run out of work to do and the
149 // server will exit.
[3c4a5e]150 Exitflag = WorkerErrorFlag;
[b0b64c]151 ELOG(0, e.message());
[cd4a6e]152 }
[778abb]153
154 // Start an accept operation for a new Connection only when there
155 // are still jobs present
156 if (JobsQueue.isJobPresent())
157 initiateWorkerSocket();
[ed2c5b]158}
[72eaf7f]159
[db03d9]160/** Callback function when job has been sent.
161 *
162 * After job has been sent we start async_read() for the result.
163 *
164 * \sa handle_ReceiveResultFromWorker()
165 *
166 * \param e error code if something went wrong
167 * \param conn reference with the connection
168 */
169void FragmentScheduler::handle_SendJobtoWorker(const boost::system::error_code& e, connection_ptr conn)
[ed2c5b]170{
[af3aed]171 Info info(__FUNCTION__);
[ef2767]172 LOG(1, "INFO: Job sent.");
173 // obtain result
174 LOG(1, "INFO: Receiving result for a job ...");
175 conn->async_read(result,
[db03d9]176 boost::bind(&FragmentScheduler::handle_ReceiveResultFromWorker, this,
[ef2767]177 boost::asio::placeholders::error, conn));
178}
179
[db03d9]180/** Callback function when result has been received.
181 *
182 * \param e error code if something went wrong
183 * \param conn reference with the connection
184 */
185void FragmentScheduler::handle_ReceiveResultFromWorker(const boost::system::error_code& e, connection_ptr conn)
[ef2767]186{
[db03d9]187 Info info(__FUNCTION__);
[35f587]188 LOG(1, "INFO: Received result for job #" << result->getId() << " ...");
189 // and push into queue
190 ASSERT(result->getId() != (JobId_t)JobId::NoJob,
[db03d9]191 "FragmentScheduler::handle_ReceiveResultFromWorker() - result received has NoJob id.");
[35f587]192 ASSERT(result->getId() != (JobId_t)JobId::IllegalJob,
[db03d9]193 "FragmentScheduler::handle_ReceiveResultFromWorker() - result received has IllegalJob id.");
[778abb]194 // place id into expected
[35f587]195 if ((result->getId() != (JobId_t)JobId::NoJob) && (result->getId() != (JobId_t)JobId::IllegalJob))
[db03d9]196 JobsQueue.pushResult(result);
197 // erase result
[35f587]198 result.reset();
[778abb]199 LOG(1, "INFO: JobsQueue has " << JobsQueue.getDoneJobs() << " results.");
[db03d9]200}
201
202/** Handle a new controller connection.
203 *
204 * \sa handle_ReceiveJobs()
205 * \sa handle_CheckResultState()
206 * \sa handle_SendResults()
207 *
208 * \param e error code if something went wrong
209 * \param conn reference with the connection
210 */
211void FragmentScheduler::handle_AcceptController(const boost::system::error_code& e, connection_ptr conn)
212{
213 Info info(__FUNCTION__);
214 if (!e)
215 {
[778abb]216 conn->async_read(choice,
217 boost::bind(&FragmentScheduler::handle_ReadChoice, this,
218 boost::asio::placeholders::error, conn));
219 }
220 else
221 {
222 // An error occurred. Log it and return. Since we are not starting a new
223 // accept operation the io_service will run out of work to do and the
224 // server will exit.
225 Exitflag = ControllerErrorFlag;
226 ELOG(0, e.message());
227 }
228}
229
230/** Controller callback function to read the choice for next operation.
231 *
232 * \param e error code if something went wrong
233 * \param conn reference with the connection
234 */
235void FragmentScheduler::handle_ReadChoice(const boost::system::error_code& e, connection_ptr conn)
236{
237 Info info(__FUNCTION__);
238 if (!e)
239 {
[0196c6]240 bool LaunchNewAcceptor = true;
[778abb]241 // switch over the desired choice read previously
242 switch(choice) {
243 case NoOperation:
244 {
245 ELOG(1, "FragmentScheduler::handle_ReadChoice() - called with NoOperation.");
246 break;
247 }
248 case ReceiveJobs:
249 {
250 // The connection::async_write() function will automatically
251 // serialize the data structure for us.
252 LOG(1, "INFO: Receiving bunch of jobs from a controller ...");
253 conn->async_read(jobs,
254 boost::bind(&FragmentScheduler::handle_ReceiveJobs, this,
255 boost::asio::placeholders::error, conn));
256 break;
257 }
258 case CheckState:
259 {
[3c4a5e]260 // first update number
261 doneJobs = JobsQueue.getDoneJobs();
262 // now we accept connections to check for state of calculations
[778abb]263 LOG(1, "INFO: Sending state that "+toString(doneJobs)+" jobs are done to controller ...");
[3c4a5e]264 conn->async_write(doneJobs,
265 boost::bind(&FragmentScheduler::handle_CheckResultState, this,
266 boost::asio::placeholders::error, conn));
[778abb]267 break;
268 }
269 case SendResults:
270 {
[35f587]271 const std::vector<FragmentResult::ptr> results = JobsQueue.getAllResults();
[778abb]272 // ... or we give the results
273 LOG(1, "INFO: Sending "+toString(results.size())+" results to controller ...");
274 conn->async_write(results,
275 boost::bind(&FragmentScheduler::handle_SendResults, this,
276 boost::asio::placeholders::error, conn));
[0196c6]277 break;
278 }
279 case Shutdown:
280 {
281 LaunchNewAcceptor = false;
[778abb]282 break;
[db03d9]283 }
[778abb]284 default:
285 Exitflag = ControllerErrorFlag;
286 ELOG(1, "FragmentScheduler::handle_ReadChoice() - called with no valid choice.");
287 break;
288 }
289 // restore NoOperation choice such that choice is not read twice
290 choice = NoOperation;
291
[0196c6]292 if (LaunchNewAcceptor) {
293 LOG(1, "Launching new acceptor on socket.");
294 // Start an accept operation for a new Connection.
295 connection_ptr new_conn(new Connection(controller_acceptor_.get_io_service()));
296 controller_acceptor_.async_accept(new_conn->socket(),
297 boost::bind(&FragmentScheduler::handle_AcceptController, this,
298 boost::asio::placeholders::error, new_conn));
299 }
[db03d9]300 }
301 else
302 {
303 // An error occurred. Log it and return. Since we are not starting a new
304 // accept operation the io_service will run out of work to do and the
305 // server will exit.
[3c4a5e]306 Exitflag = ControllerErrorFlag;
[db03d9]307 ELOG(0, e.message());
308 }
309}
310
311/** Controller callback function when job has been sent.
[778abb]312 *
313 * We check here whether the worker socket is accepting, if there
314 * have been no jobs we re-activate it, as it is shut down after
315 * last job.
[db03d9]316 *
317 * \param e error code if something went wrong
318 * \param conn reference with the connection
319 */
320void FragmentScheduler::handle_ReceiveJobs(const boost::system::error_code& e, connection_ptr conn)
321{
322 Info info(__FUNCTION__);
[778abb]323 bool initiateSocket = !JobsQueue.isJobPresent();
324
[db03d9]325 // jobs are received, hence place in JobsQueue
326 if (!jobs.empty()) {
327 LOG(1, "INFO: Pushing " << jobs.size() << " jobs into queue.");
328 JobsQueue.pushJobs(jobs);
[778abb]329 // initiate socket if we had no jobs before
330 if (initiateSocket)
331 initiateWorkerSocket();
[db03d9]332 }
333
334 jobs.clear();
[778abb]335
[ed2c5b]336}
[cd4a6e]337
[3c4a5e]338/** Controller callback function when checking on state of results.
339 *
340 * \param e error code if something went wrong
341 * \param conn reference with the connection
342 */
343void FragmentScheduler::handle_CheckResultState(const boost::system::error_code& e, connection_ptr conn)
344{
345 Info info(__FUNCTION__);
346 // do nothing
347 LOG(1, "INFO: Sent that " << doneJobs << " jobs are done.");
348}
[778abb]349
350/** Controller callback function when result has been received.
351 *
352 * \param e error code if something went wrong
353 * \param conn reference with the connection
354 */
355void FragmentScheduler::handle_SendResults(const boost::system::error_code& e, connection_ptr conn)
356{
357 Info info(__FUNCTION__);
358 // do nothing
359 LOG(1, "INFO: Results have been sent.");
360}
361
Note: See TracBrowser for help on using the repository browser.