source: src/Fragmentation/Automation/FragmentScheduler.cpp@ 95b384

Action_Thermostats Add_AtomRandomPerturbation Add_FitFragmentPartialChargesAction Add_RotateAroundBondAction Add_SelectAtomByNameAction Added_ParseSaveFragmentResults AddingActions_SaveParseParticleParameters Adding_Graph_to_ChangeBondActions Adding_MD_integration_tests Adding_ParticleName_to_Atom Adding_StructOpt_integration_tests AtomFragments Automaking_mpqc_open AutomationFragmentation_failures Candidate_v1.5.4 Candidate_v1.6.0 Candidate_v1.6.1 Candidate_v1.7.0 ChangeBugEmailaddress ChangingTestPorts ChemicalSpaceEvaluator CombiningParticlePotentialParsing Combining_Subpackages Debian_Package_split Debian_package_split_molecuildergui_only Disabling_MemDebug Docu_Python_wait EmpiricalPotential_contain_HomologyGraph EmpiricalPotential_contain_HomologyGraph_documentation Enable_parallel_make_install Enhance_userguide Enhanced_StructuralOptimization Enhanced_StructuralOptimization_continued Example_ManyWaysToTranslateAtom Exclude_Hydrogens_annealWithBondGraph FitPartialCharges_GlobalError Fix_BoundInBox_CenterInBox_MoleculeActions Fix_ChargeSampling_PBC Fix_ChronosMutex Fix_FitPartialCharges Fix_FitPotential_needs_atomicnumbers Fix_ForceAnnealing Fix_IndependentFragmentGrids Fix_ParseParticles Fix_ParseParticles_split_forward_backward_Actions Fix_PopActions Fix_QtFragmentList_sorted_selection Fix_Restrictedkeyset_FragmentMolecule Fix_StatusMsg Fix_StepWorldTime_single_argument Fix_Verbose_Codepatterns Fix_fitting_potentials Fixes ForceAnnealing_goodresults ForceAnnealing_oldresults ForceAnnealing_tocheck ForceAnnealing_with_BondGraph ForceAnnealing_with_BondGraph_continued ForceAnnealing_with_BondGraph_continued_betteresults ForceAnnealing_with_BondGraph_contraction-expansion FragmentAction_writes_AtomFragments FragmentMolecule_checks_bonddegrees GeometryObjects Gui_Fixes Gui_displays_atomic_force_velocity ImplicitCharges IndependentFragmentGrids IndependentFragmentGrids_IndividualZeroInstances IndependentFragmentGrids_IntegrationTest IndependentFragmentGrids_Sole_NN_Calculation JobMarket_RobustOnKillsSegFaults JobMarket_StableWorkerPool JobMarket_unresolvable_hostname_fix MoreRobust_FragmentAutomation ODR_violation_mpqc_open PartialCharges_OrthogonalSummation PdbParser_setsAtomName PythonUI_with_named_parameters QtGui_reactivate_TimeChanged_changes Recreated_GuiChecks Rewrite_FitPartialCharges RotateToPrincipalAxisSystem_UndoRedo SaturateAtoms_findBestMatching SaturateAtoms_singleDegree StoppableMakroAction Subpackage_CodePatterns Subpackage_JobMarket Subpackage_LinearAlgebra Subpackage_levmar Subpackage_mpqc_open Subpackage_vmg Switchable_LogView ThirdParty_MPQC_rebuilt_buildsystem TrajectoryDependenant_MaxOrder TremoloParser_IncreasedPrecision TremoloParser_MultipleTimesteps TremoloParser_setsAtomName Ubuntu_1604_changes stable
Last change on this file since 95b384 was 95b384, checked in by Frederik Heber <heber@…>, 13 years ago

Removed duplicate code in FragmentScheduler with initiating new connections.

  • Property mode set to 100644
File size: 12.1 KB
Line 
1/*
2 * Project: MoleCuilder
3 * Description: creates and alters molecular systems
4 * Copyright (C) 2011 University of Bonn. All rights reserved.
5 * Please see the LICENSE file or "Copyright notice" in builder.cpp for details.
6 */
7
8/*
9 * \file FragmentScheduler.cpp
10 *
11 * This file strongly follows the Serialization example from the boost::asio
12 * library (see server.cpp)
13 *
14 * Created on: Oct 19, 2011
15 * Author: heber
16 */
17
18// include config.h
19#ifdef HAVE_CONFIG_H
20#include <config.h>
21#endif
22
23// boost asio needs specific operator new
24#include <boost/asio.hpp>
25
26#include "CodePatterns/MemDebug.hpp"
27
28#include <boost/bind.hpp>
29#include <boost/lexical_cast.hpp>
30#include <iostream>
31#include <vector>
32#include "Connection.hpp" // Must come before boost/serialization headers.
33#include <boost/serialization/vector.hpp>
34#include "CodePatterns/Info.hpp"
35#include "CodePatterns/Log.hpp"
36#include "Jobs/MPQCCommandJob.hpp"
37#include "Jobs/SystemCommandJob.hpp"
38#include "JobId.hpp"
39
40#include "FragmentScheduler.hpp"
41
42FragmentJob::ptr FragmentScheduler::NoJob(new SystemCommandJob(std::string("/bin/true"), std::string("dosomething"), JobId::NoJob));
43
44/** Helper function to enforce binding of FragmentWorker to possible derived
45 * FragmentJob classes.
46 */
47void dummyInit() {
48 SystemCommandJob("/bin/false", "something", JobId::IllegalJob);
49 MPQCCommandJob("nofile", JobId::IllegalJob);
50}
51
52/** Constructor of class FragmentScheduler.
53 *
54 * We setup both acceptors to accept connections from workers and Controller.
55 *
56 * \param io_service io_service of the asynchronous communications
57 * \param workerport port to listen for worker connections
58 * \param controllerport port to listen for controller connections.
59 */
60FragmentScheduler::FragmentScheduler(boost::asio::io_service& io_service, unsigned short workerport, unsigned short controllerport) :
61 worker_acceptor_(io_service,
62 boost::asio::ip::tcp::endpoint(boost::asio::ip::tcp::v4(), workerport)
63 ),
64 controller_acceptor_(io_service,
65 boost::asio::ip::tcp::endpoint(boost::asio::ip::tcp::v4(), controllerport)
66 ),
67 result( new FragmentResult(JobId::NoJob) ),
68 jobInfo((size_t)2, 0),
69 choice(NoOperation),
70 globalId(0),
71 Exitflag(OkFlag)
72{
73 Info info(__FUNCTION__);
74
75 // only initiate socket if jobs are already present
76 if (JobsQueue.isJobPresent()) {
77 LOG(1, "Listening for workers on port " << workerport << ".");
78 initiateWorkerSocket();
79 }
80
81 initiateControllerSocket();
82 LOG(1, "Listening for controller on port " << controllerport << ".");
83}
84
85/** Internal function to start worker connection.
86 *
87 */
88void FragmentScheduler::initiateWorkerSocket()
89{
90 // Start an accept operation for worker connections.
91 connection_ptr new_conn(new Connection(worker_acceptor_.get_io_service()));
92 worker_acceptor_.async_accept(new_conn->socket(),
93 boost::bind(&FragmentScheduler::handle_AcceptWorker, this,
94 boost::asio::placeholders::error, new_conn));
95}
96
97/** Internal function to start controller connection.
98 *
99 */
100void FragmentScheduler::initiateControllerSocket()
101{
102 // Start an accept operation for controller connection.
103 connection_ptr new_conn(new Connection(controller_acceptor_.get_io_service()));
104 controller_acceptor_.async_accept(new_conn->socket(),
105 boost::bind(&FragmentScheduler::handle_AcceptController, this,
106 boost::asio::placeholders::error, new_conn));
107}
108
109
110/** Handle a new worker connection.
111 *
112 * We check whether jobs are in the JobsQueue. If present, job is sent.
113 *
114 * \sa handle_SendJobtoWorker()
115 *
116 * \param e error code if something went wrong
117 * \param conn reference with the connection
118 */
119void FragmentScheduler::handle_AcceptWorker(const boost::system::error_code& e, connection_ptr conn)
120{
121 Info info(__FUNCTION__);
122 if (!e)
123 {
124 // Successfully accepted a new connection.
125 // Check whether there are jobs in the queue
126 if (JobsQueue.isJobPresent()) {
127 // pop a job and send it to the client.
128 FragmentJob::ptr job(JobsQueue.popJob());
129 // The connection::async_write() function will automatically
130 // serialize the data structure for us.
131 LOG(1, "INFO: Sending job #" << job->getId() << ".");
132 conn->async_write(job,
133 boost::bind(&FragmentScheduler::handle_SendJobtoWorker, this,
134 boost::asio::placeholders::error, conn));
135
136 } else {
137 // send the static NoJob
138 conn->async_write(NoJob,
139 boost::bind(&FragmentScheduler::handle_SendJobtoWorker, this,
140 boost::asio::placeholders::error, conn));
141
142 // then there must be no read necesary
143
144 ELOG(2, "There is currently no job present in the queue.");
145 }
146 }
147 else
148 {
149 // An error occurred. Log it and return. Since we are not starting a new
150 // accept operation the io_service will run out of work to do and the
151 // server will exit.
152 Exitflag = WorkerErrorFlag;
153 ELOG(0, e.message());
154 }
155
156 if (JobsQueue.isJobPresent()) {
157 // Start an accept operation for a new Connection only when there
158 // are still jobs present
159 initiateWorkerSocket();
160 }
161}
162
163/** Callback function when job has been sent.
164 *
165 * After job has been sent we start async_read() for the result.
166 *
167 * \sa handle_ReceiveResultFromWorker()
168 *
169 * \param e error code if something went wrong
170 * \param conn reference with the connection
171 */
172void FragmentScheduler::handle_SendJobtoWorker(const boost::system::error_code& e, connection_ptr conn)
173{
174 Info info(__FUNCTION__);
175 LOG(1, "INFO: Job sent.");
176 // obtain result
177 LOG(1, "INFO: Receiving result for a job ...");
178 conn->async_read(result,
179 boost::bind(&FragmentScheduler::handle_ReceiveResultFromWorker, this,
180 boost::asio::placeholders::error, conn));
181}
182
183/** Callback function when result has been received.
184 *
185 * \param e error code if something went wrong
186 * \param conn reference with the connection
187 */
188void FragmentScheduler::handle_ReceiveResultFromWorker(const boost::system::error_code& e, connection_ptr conn)
189{
190 Info info(__FUNCTION__);
191 LOG(1, "INFO: Received result for job #" << result->getId() << " ...");
192 // and push into queue
193 ASSERT(result->getId() != (JobId_t)JobId::NoJob,
194 "FragmentScheduler::handle_ReceiveResultFromWorker() - result received has NoJob id.");
195 ASSERT(result->getId() != (JobId_t)JobId::IllegalJob,
196 "FragmentScheduler::handle_ReceiveResultFromWorker() - result received has IllegalJob id.");
197 // place id into expected
198 if ((result->getId() != (JobId_t)JobId::NoJob) && (result->getId() != (JobId_t)JobId::IllegalJob))
199 JobsQueue.pushResult(result);
200 // erase result
201 result.reset();
202 LOG(1, "INFO: JobsQueue has " << JobsQueue.getDoneJobs() << " results.");
203}
204
205/** Handle a new controller connection.
206 *
207 * \sa handle_ReceiveJobs()
208 * \sa handle_CheckResultState()
209 * \sa handle_SendResults()
210 *
211 * \param e error code if something went wrong
212 * \param conn reference with the connection
213 */
214void FragmentScheduler::handle_AcceptController(const boost::system::error_code& e, connection_ptr conn)
215{
216 Info info(__FUNCTION__);
217 if (!e)
218 {
219 conn->async_read(choice,
220 boost::bind(&FragmentScheduler::handle_ReadChoice, this,
221 boost::asio::placeholders::error, conn));
222 }
223 else
224 {
225 // An error occurred. Log it and return. Since we are not starting a new
226 // accept operation the io_service will run out of work to do and the
227 // server will exit.
228 Exitflag = ControllerErrorFlag;
229 ELOG(0, e.message());
230 }
231}
232
233/** Controller callback function to read the choice for next operation.
234 *
235 * \param e error code if something went wrong
236 * \param conn reference with the connection
237 */
238void FragmentScheduler::handle_ReadChoice(const boost::system::error_code& e, connection_ptr conn)
239{
240 Info info(__FUNCTION__);
241 if (!e)
242 {
243 bool LaunchNewAcceptor = true;
244 LOG(1, "INFO: Received request for operation " << choice << ".");
245 // switch over the desired choice read previously
246 switch(choice) {
247 case NoOperation:
248 {
249 ELOG(1, "FragmentScheduler::handle_ReadChoice() - called with NoOperation.");
250 break;
251 }
252 case GetNextJobId:
253 {
254 const JobId_t nextid = globalId.getNextId();
255 LOG(1, "INFO: Sending next available job id " << nextid << " to controller ...");
256 conn->async_write(nextid,
257 boost::bind(&FragmentScheduler::handle_GetNextJobIdState, this,
258 boost::asio::placeholders::error, conn));
259 break;
260 }
261 case ReceiveJobs:
262 {
263 // The connection::async_write() function will automatically
264 // serialize the data structure for us.
265 LOG(1, "INFO: Receiving bunch of jobs from a controller ...");
266 conn->async_read(jobs,
267 boost::bind(&FragmentScheduler::handle_ReceiveJobs, this,
268 boost::asio::placeholders::error, conn));
269 break;
270 }
271 case CheckState:
272 {
273 // first update number
274 jobInfo[0] = JobsQueue.getPresentJobs();
275 jobInfo[1] = JobsQueue.getDoneJobs();
276 // now we accept connections to check for state of calculations
277 LOG(1, "INFO: Sending state that "+toString(jobInfo[0])+" jobs are present and "+toString(jobInfo[1])+" jobs are done to controller ...");
278 conn->async_write(jobInfo,
279 boost::bind(&FragmentScheduler::handle_CheckResultState, this,
280 boost::asio::placeholders::error, conn));
281 break;
282 }
283 case SendResults:
284 {
285 const std::vector<FragmentResult::ptr> results = JobsQueue.getAllResults();
286 // ... or we give the results
287 LOG(1, "INFO: Sending "+toString(results.size())+" results to controller ...");
288 conn->async_write(results,
289 boost::bind(&FragmentScheduler::handle_SendResults, this,
290 boost::asio::placeholders::error, conn));
291 break;
292 }
293 case Shutdown:
294 {
295 LaunchNewAcceptor = false;
296 break;
297 }
298 default:
299 Exitflag = ControllerErrorFlag;
300 ELOG(1, "FragmentScheduler::handle_ReadChoice() - called with no valid choice.");
301 break;
302 }
303 // restore NoOperation choice such that choice is not read twice
304 choice = NoOperation;
305
306 if (LaunchNewAcceptor) {
307 LOG(1, "Launching new acceptor on socket.");
308 // Start an accept operation for a new Connection.
309 initiateControllerSocket();
310 }
311 }
312 else
313 {
314 // An error occurred. Log it and return. Since we are not starting a new
315 // accept operation the io_service will run out of work to do and the
316 // server will exit.
317 Exitflag = ControllerErrorFlag;
318 ELOG(0, e.message());
319 }
320}
321
322/** Controller callback function when job has been sent.
323 *
324 * We check here whether the worker socket is accepting, if there
325 * have been no jobs we re-activate it, as it is shut down after
326 * last job.
327 *
328 * \param e error code if something went wrong
329 * \param conn reference with the connection
330 */
331void FragmentScheduler::handle_ReceiveJobs(const boost::system::error_code& e, connection_ptr conn)
332{
333 Info info(__FUNCTION__);
334 bool initiateSocket = !JobsQueue.isJobPresent();
335
336 // jobs are received, hence place in JobsQueue
337 if (!jobs.empty()) {
338 LOG(1, "INFO: Pushing " << jobs.size() << " jobs into queue.");
339 JobsQueue.pushJobs(jobs);
340 // initiate socket if we had no jobs before
341 if (initiateSocket)
342 initiateWorkerSocket();
343 }
344
345 jobs.clear();
346
347}
348
349/** Controller callback function when checking on state of results.
350 *
351 * \param e error code if something went wrong
352 * \param conn reference with the connection
353 */
354void FragmentScheduler::handle_CheckResultState(const boost::system::error_code& e, connection_ptr conn)
355{
356 Info info(__FUNCTION__);
357 // do nothing
358 LOG(1, "INFO: Sent that " << jobInfo << " jobs are (scheduled, done).");
359}
360
361/** Controller callback function when checking on state of results.
362 *
363 * \param e error code if something went wrong
364 * \param conn reference with the connection
365 */
366void FragmentScheduler::handle_GetNextJobIdState(const boost::system::error_code& e, connection_ptr conn)
367{
368 Info info(__FUNCTION__);
369 // do nothing
370 LOG(1, "INFO: Sent next available job id.");
371}
372
373/** Controller callback function when result has been received.
374 *
375 * \param e error code if something went wrong
376 * \param conn reference with the connection
377 */
378void FragmentScheduler::handle_SendResults(const boost::system::error_code& e, connection_ptr conn)
379{
380 Info info(__FUNCTION__);
381 // do nothing
382 LOG(1, "INFO: Results have been sent.");
383}
384
Note: See TracBrowser for help on using the repository browser.