source: src/Fragmentation/Automation/FragmentScheduler.cpp@ 6f2bc7

Action_Thermostats Add_AtomRandomPerturbation Add_FitFragmentPartialChargesAction Add_RotateAroundBondAction Add_SelectAtomByNameAction Added_ParseSaveFragmentResults AddingActions_SaveParseParticleParameters Adding_Graph_to_ChangeBondActions Adding_MD_integration_tests Adding_ParticleName_to_Atom Adding_StructOpt_integration_tests AtomFragments Automaking_mpqc_open AutomationFragmentation_failures Candidate_v1.5.4 Candidate_v1.6.0 Candidate_v1.6.1 Candidate_v1.7.0 ChangeBugEmailaddress ChangingTestPorts ChemicalSpaceEvaluator CombiningParticlePotentialParsing Combining_Subpackages Debian_Package_split Debian_package_split_molecuildergui_only Disabling_MemDebug Docu_Python_wait EmpiricalPotential_contain_HomologyGraph EmpiricalPotential_contain_HomologyGraph_documentation Enable_parallel_make_install Enhance_userguide Enhanced_StructuralOptimization Enhanced_StructuralOptimization_continued Example_ManyWaysToTranslateAtom Exclude_Hydrogens_annealWithBondGraph FitPartialCharges_GlobalError Fix_BoundInBox_CenterInBox_MoleculeActions Fix_ChargeSampling_PBC Fix_ChronosMutex Fix_FitPartialCharges Fix_FitPotential_needs_atomicnumbers Fix_ForceAnnealing Fix_IndependentFragmentGrids Fix_ParseParticles Fix_ParseParticles_split_forward_backward_Actions Fix_PopActions Fix_QtFragmentList_sorted_selection Fix_Restrictedkeyset_FragmentMolecule Fix_StatusMsg Fix_StepWorldTime_single_argument Fix_Verbose_Codepatterns Fix_fitting_potentials Fixes ForceAnnealing_goodresults ForceAnnealing_oldresults ForceAnnealing_tocheck ForceAnnealing_with_BondGraph ForceAnnealing_with_BondGraph_continued ForceAnnealing_with_BondGraph_continued_betteresults ForceAnnealing_with_BondGraph_contraction-expansion FragmentAction_writes_AtomFragments FragmentMolecule_checks_bonddegrees GeometryObjects Gui_Fixes Gui_displays_atomic_force_velocity ImplicitCharges IndependentFragmentGrids IndependentFragmentGrids_IndividualZeroInstances IndependentFragmentGrids_IntegrationTest IndependentFragmentGrids_Sole_NN_Calculation JobMarket_RobustOnKillsSegFaults JobMarket_StableWorkerPool JobMarket_unresolvable_hostname_fix MoreRobust_FragmentAutomation ODR_violation_mpqc_open PartialCharges_OrthogonalSummation PdbParser_setsAtomName PythonUI_with_named_parameters QtGui_reactivate_TimeChanged_changes Recreated_GuiChecks Rewrite_FitPartialCharges RotateToPrincipalAxisSystem_UndoRedo SaturateAtoms_findBestMatching SaturateAtoms_singleDegree StoppableMakroAction Subpackage_CodePatterns Subpackage_JobMarket Subpackage_LinearAlgebra Subpackage_levmar Subpackage_mpqc_open Subpackage_vmg Switchable_LogView ThirdParty_MPQC_rebuilt_buildsystem TrajectoryDependenant_MaxOrder TremoloParser_IncreasedPrecision TremoloParser_MultipleTimesteps TremoloParser_setsAtomName Ubuntu_1604_changes stable
Last change on this file since 6f2bc7 was 6f2bc7, checked in by Frederik Heber <heber@…>, 13 years ago

CheckResultsOperation now receives both scheduled and done jobs.

  • Property mode set to 100644
File size: 11.6 KB
Line 
1/*
2 * Project: MoleCuilder
3 * Description: creates and alters molecular systems
4 * Copyright (C) 2011 University of Bonn. All rights reserved.
5 * Please see the LICENSE file or "Copyright notice" in builder.cpp for details.
6 */
7
8/*
9 * \file FragmentScheduler.cpp
10 *
11 * This file strongly follows the Serialization example from the boost::asio
12 * library (see server.cpp)
13 *
14 * Created on: Oct 19, 2011
15 * Author: heber
16 */
17
18// include config.h
19#ifdef HAVE_CONFIG_H
20#include <config.h>
21#endif
22
23// boost asio needs specific operator new
24#include <boost/asio.hpp>
25
26#include "CodePatterns/MemDebug.hpp"
27
28#include <boost/bind.hpp>
29#include <boost/lexical_cast.hpp>
30#include <iostream>
31#include <vector>
32#include "Connection.hpp" // Must come before boost/serialization headers.
33#include <boost/serialization/vector.hpp>
34#include "CodePatterns/Info.hpp"
35#include "CodePatterns/Log.hpp"
36#include "Jobs/MPQCCommandJob.hpp"
37#include "Jobs/SystemCommandJob.hpp"
38#include "JobId.hpp"
39
40#include "FragmentScheduler.hpp"
41
42FragmentJob::ptr FragmentScheduler::NoJob(new SystemCommandJob(std::string("/bin/true"), std::string("dosomething"), JobId::NoJob));
43
44/** Helper function to enforce binding of FragmentWorker to possible derived
45 * FragmentJob classes.
46 */
47void dummyInit() {
48 SystemCommandJob("/bin/false", "something", JobId::IllegalJob);
49 MPQCCommandJob("nofile", JobId::IllegalJob);
50}
51
52/** Constructor of class FragmentScheduler.
53 *
54 * We setup both acceptors to accept connections from workers and Controller.
55 *
56 * \param io_service io_service of the asynchronous communications
57 * \param workerport port to listen for worker connections
58 * \param controllerport port to listen for controller connections.
59 */
60FragmentScheduler::FragmentScheduler(boost::asio::io_service& io_service, unsigned short workerport, unsigned short controllerport) :
61 worker_acceptor_(io_service,
62 boost::asio::ip::tcp::endpoint(boost::asio::ip::tcp::v4(), workerport)
63 ),
64 controller_acceptor_(io_service,
65 boost::asio::ip::tcp::endpoint(boost::asio::ip::tcp::v4(), controllerport)
66 ),
67 result( new FragmentResult(JobId::NoJob) ),
68 jobInfo((size_t)2, 0),
69 choice(NoOperation),
70 Exitflag(OkFlag)
71{
72 Info info(__FUNCTION__);
73
74 // only initiate socket if jobs are already present
75 if (JobsQueue.isJobPresent()) {
76 LOG(1, "Listening for workers on port " << workerport << ".");
77 initiateWorkerSocket();
78 }
79
80 initiateControllerSocket();
81 LOG(1, "Listening for controller on port " << controllerport << ".");
82}
83
84/** Internal function to start worker connection.
85 *
86 */
87void FragmentScheduler::initiateWorkerSocket()
88{
89 // Start an accept operation for worker connections.
90 connection_ptr new_conn(new Connection(worker_acceptor_.get_io_service()));
91 worker_acceptor_.async_accept(new_conn->socket(),
92 boost::bind(&FragmentScheduler::handle_AcceptWorker, this,
93 boost::asio::placeholders::error, new_conn));
94}
95
96/** Internal function to start controller connection.
97 *
98 */
99void FragmentScheduler::initiateControllerSocket()
100{
101 // Start an accept operation for controller connection.
102 connection_ptr new_conn(new Connection(controller_acceptor_.get_io_service()));
103 controller_acceptor_.async_accept(new_conn->socket(),
104 boost::bind(&FragmentScheduler::handle_AcceptController, this,
105 boost::asio::placeholders::error, new_conn));
106}
107
108
109/** Handle a new worker connection.
110 *
111 * We check whether jobs are in the JobsQueue. If present, job is sent.
112 *
113 * \sa handle_SendJobtoWorker()
114 *
115 * \param e error code if something went wrong
116 * \param conn reference with the connection
117 */
118void FragmentScheduler::handle_AcceptWorker(const boost::system::error_code& e, connection_ptr conn)
119{
120 Info info(__FUNCTION__);
121 if (!e)
122 {
123 // Successfully accepted a new connection.
124 // Check whether there are jobs in the queue
125 if (JobsQueue.isJobPresent()) {
126 // pop a job and send it to the client.
127 FragmentJob::ptr job(JobsQueue.popJob());
128 // The connection::async_write() function will automatically
129 // serialize the data structure for us.
130 LOG(1, "INFO: Sending job #" << job->getId() << ".");
131 conn->async_write(job,
132 boost::bind(&FragmentScheduler::handle_SendJobtoWorker, this,
133 boost::asio::placeholders::error, conn));
134
135 } else {
136 // send the static NoJob
137 conn->async_write(NoJob,
138 boost::bind(&FragmentScheduler::handle_SendJobtoWorker, this,
139 boost::asio::placeholders::error, conn));
140
141 // then there must be no read necesary
142
143 ELOG(2, "There is currently no job present in the queue.");
144 }
145 }
146 else
147 {
148 // An error occurred. Log it and return. Since we are not starting a new
149 // accept operation the io_service will run out of work to do and the
150 // server will exit.
151 Exitflag = WorkerErrorFlag;
152 ELOG(0, e.message());
153 }
154
155 // Start an accept operation for a new Connection only when there
156 // are still jobs present
157 if (JobsQueue.isJobPresent())
158 initiateWorkerSocket();
159}
160
161/** Callback function when job has been sent.
162 *
163 * After job has been sent we start async_read() for the result.
164 *
165 * \sa handle_ReceiveResultFromWorker()
166 *
167 * \param e error code if something went wrong
168 * \param conn reference with the connection
169 */
170void FragmentScheduler::handle_SendJobtoWorker(const boost::system::error_code& e, connection_ptr conn)
171{
172 Info info(__FUNCTION__);
173 LOG(1, "INFO: Job sent.");
174 // obtain result
175 LOG(1, "INFO: Receiving result for a job ...");
176 conn->async_read(result,
177 boost::bind(&FragmentScheduler::handle_ReceiveResultFromWorker, this,
178 boost::asio::placeholders::error, conn));
179}
180
181/** Callback function when result has been received.
182 *
183 * \param e error code if something went wrong
184 * \param conn reference with the connection
185 */
186void FragmentScheduler::handle_ReceiveResultFromWorker(const boost::system::error_code& e, connection_ptr conn)
187{
188 Info info(__FUNCTION__);
189 LOG(1, "INFO: Received result for job #" << result->getId() << " ...");
190 // and push into queue
191 ASSERT(result->getId() != (JobId_t)JobId::NoJob,
192 "FragmentScheduler::handle_ReceiveResultFromWorker() - result received has NoJob id.");
193 ASSERT(result->getId() != (JobId_t)JobId::IllegalJob,
194 "FragmentScheduler::handle_ReceiveResultFromWorker() - result received has IllegalJob id.");
195 // place id into expected
196 if ((result->getId() != (JobId_t)JobId::NoJob) && (result->getId() != (JobId_t)JobId::IllegalJob))
197 JobsQueue.pushResult(result);
198 // erase result
199 result.reset();
200 LOG(1, "INFO: JobsQueue has " << JobsQueue.getDoneJobs() << " results.");
201}
202
203/** Handle a new controller connection.
204 *
205 * \sa handle_ReceiveJobs()
206 * \sa handle_CheckResultState()
207 * \sa handle_SendResults()
208 *
209 * \param e error code if something went wrong
210 * \param conn reference with the connection
211 */
212void FragmentScheduler::handle_AcceptController(const boost::system::error_code& e, connection_ptr conn)
213{
214 Info info(__FUNCTION__);
215 if (!e)
216 {
217 conn->async_read(choice,
218 boost::bind(&FragmentScheduler::handle_ReadChoice, this,
219 boost::asio::placeholders::error, conn));
220 }
221 else
222 {
223 // An error occurred. Log it and return. Since we are not starting a new
224 // accept operation the io_service will run out of work to do and the
225 // server will exit.
226 Exitflag = ControllerErrorFlag;
227 ELOG(0, e.message());
228 }
229}
230
231/** Controller callback function to read the choice for next operation.
232 *
233 * \param e error code if something went wrong
234 * \param conn reference with the connection
235 */
236void FragmentScheduler::handle_ReadChoice(const boost::system::error_code& e, connection_ptr conn)
237{
238 Info info(__FUNCTION__);
239 if (!e)
240 {
241 bool LaunchNewAcceptor = true;
242 // switch over the desired choice read previously
243 switch(choice) {
244 case NoOperation:
245 {
246 ELOG(1, "FragmentScheduler::handle_ReadChoice() - called with NoOperation.");
247 break;
248 }
249 case ReceiveJobs:
250 {
251 // The connection::async_write() function will automatically
252 // serialize the data structure for us.
253 LOG(1, "INFO: Receiving bunch of jobs from a controller ...");
254 conn->async_read(jobs,
255 boost::bind(&FragmentScheduler::handle_ReceiveJobs, this,
256 boost::asio::placeholders::error, conn));
257 break;
258 }
259 case CheckState:
260 {
261 // first update number
262 jobInfo[0] = JobsQueue.getPresentJobs();
263 jobInfo[1] = JobsQueue.getDoneJobs();
264 // now we accept connections to check for state of calculations
265 LOG(1, "INFO: Sending state that "+toString(jobInfo[0])+" jobs are present and "+toString(jobInfo[1])+" jobs are done to controller ...");
266 conn->async_write(jobInfo,
267 boost::bind(&FragmentScheduler::handle_CheckResultState, this,
268 boost::asio::placeholders::error, conn));
269 break;
270 }
271 case SendResults:
272 {
273 const std::vector<FragmentResult::ptr> results = JobsQueue.getAllResults();
274 // ... or we give the results
275 LOG(1, "INFO: Sending "+toString(results.size())+" results to controller ...");
276 conn->async_write(results,
277 boost::bind(&FragmentScheduler::handle_SendResults, this,
278 boost::asio::placeholders::error, conn));
279 break;
280 }
281 case Shutdown:
282 {
283 LaunchNewAcceptor = false;
284 break;
285 }
286 default:
287 Exitflag = ControllerErrorFlag;
288 ELOG(1, "FragmentScheduler::handle_ReadChoice() - called with no valid choice.");
289 break;
290 }
291 // restore NoOperation choice such that choice is not read twice
292 choice = NoOperation;
293
294 if (LaunchNewAcceptor) {
295 LOG(1, "Launching new acceptor on socket.");
296 // Start an accept operation for a new Connection.
297 connection_ptr new_conn(new Connection(controller_acceptor_.get_io_service()));
298 controller_acceptor_.async_accept(new_conn->socket(),
299 boost::bind(&FragmentScheduler::handle_AcceptController, this,
300 boost::asio::placeholders::error, new_conn));
301 }
302 }
303 else
304 {
305 // An error occurred. Log it and return. Since we are not starting a new
306 // accept operation the io_service will run out of work to do and the
307 // server will exit.
308 Exitflag = ControllerErrorFlag;
309 ELOG(0, e.message());
310 }
311}
312
313/** Controller callback function when job has been sent.
314 *
315 * We check here whether the worker socket is accepting, if there
316 * have been no jobs we re-activate it, as it is shut down after
317 * last job.
318 *
319 * \param e error code if something went wrong
320 * \param conn reference with the connection
321 */
322void FragmentScheduler::handle_ReceiveJobs(const boost::system::error_code& e, connection_ptr conn)
323{
324 Info info(__FUNCTION__);
325 bool initiateSocket = !JobsQueue.isJobPresent();
326
327 // jobs are received, hence place in JobsQueue
328 if (!jobs.empty()) {
329 LOG(1, "INFO: Pushing " << jobs.size() << " jobs into queue.");
330 JobsQueue.pushJobs(jobs);
331 // initiate socket if we had no jobs before
332 if (initiateSocket)
333 initiateWorkerSocket();
334 }
335
336 jobs.clear();
337
338}
339
340/** Controller callback function when checking on state of results.
341 *
342 * \param e error code if something went wrong
343 * \param conn reference with the connection
344 */
345void FragmentScheduler::handle_CheckResultState(const boost::system::error_code& e, connection_ptr conn)
346{
347 Info info(__FUNCTION__);
348 // do nothing
349 LOG(1, "INFO: Sent that " << jobInfo << " jobs are (scheduled, done).");
350}
351
352/** Controller callback function when result has been received.
353 *
354 * \param e error code if something went wrong
355 * \param conn reference with the connection
356 */
357void FragmentScheduler::handle_SendResults(const boost::system::error_code& e, connection_ptr conn)
358{
359 Info info(__FUNCTION__);
360 // do nothing
361 LOG(1, "INFO: Results have been sent.");
362}
363
Note: See TracBrowser for help on using the repository browser.