source: src/Fragmentation/Automation/FragmentScheduler.cpp@ 509014

Action_Thermostats Add_AtomRandomPerturbation Add_FitFragmentPartialChargesAction Add_RotateAroundBondAction Add_SelectAtomByNameAction Added_ParseSaveFragmentResults AddingActions_SaveParseParticleParameters Adding_Graph_to_ChangeBondActions Adding_MD_integration_tests Adding_ParticleName_to_Atom Adding_StructOpt_integration_tests AtomFragments Automaking_mpqc_open AutomationFragmentation_failures Candidate_v1.5.4 Candidate_v1.6.0 Candidate_v1.6.1 ChangeBugEmailaddress ChangingTestPorts ChemicalSpaceEvaluator CombiningParticlePotentialParsing Combining_Subpackages Debian_Package_split Debian_package_split_molecuildergui_only Disabling_MemDebug Docu_Python_wait EmpiricalPotential_contain_HomologyGraph EmpiricalPotential_contain_HomologyGraph_documentation Enable_parallel_make_install Enhance_userguide Enhanced_StructuralOptimization Enhanced_StructuralOptimization_continued Example_ManyWaysToTranslateAtom Exclude_Hydrogens_annealWithBondGraph FitPartialCharges_GlobalError Fix_BoundInBox_CenterInBox_MoleculeActions Fix_ChargeSampling_PBC Fix_ChronosMutex Fix_FitPartialCharges Fix_FitPotential_needs_atomicnumbers Fix_ForceAnnealing Fix_IndependentFragmentGrids Fix_ParseParticles Fix_ParseParticles_split_forward_backward_Actions Fix_PopActions Fix_QtFragmentList_sorted_selection Fix_Restrictedkeyset_FragmentMolecule Fix_StatusMsg Fix_StepWorldTime_single_argument Fix_Verbose_Codepatterns Fix_fitting_potentials Fixes ForceAnnealing_goodresults ForceAnnealing_oldresults ForceAnnealing_tocheck ForceAnnealing_with_BondGraph ForceAnnealing_with_BondGraph_continued ForceAnnealing_with_BondGraph_continued_betteresults ForceAnnealing_with_BondGraph_contraction-expansion FragmentAction_writes_AtomFragments FragmentMolecule_checks_bonddegrees GeometryObjects Gui_Fixes Gui_displays_atomic_force_velocity ImplicitCharges IndependentFragmentGrids IndependentFragmentGrids_IndividualZeroInstances IndependentFragmentGrids_IntegrationTest IndependentFragmentGrids_Sole_NN_Calculation 
JobMarket_RobustOnKillsSegFaults JobMarket_StableWorkerPool JobMarket_unresolvable_hostname_fix MoreRobust_FragmentAutomation ODR_violation_mpqc_open PartialCharges_OrthogonalSummation PdbParser_setsAtomName PythonUI_with_named_parameters QtGui_reactivate_TimeChanged_changes Recreated_GuiChecks Rewrite_FitPartialCharges RotateToPrincipalAxisSystem_UndoRedo SaturateAtoms_findBestMatching SaturateAtoms_singleDegree StoppableMakroAction Subpackage_CodePatterns Subpackage_JobMarket Subpackage_LinearAlgebra Subpackage_levmar Subpackage_mpqc_open Subpackage_vmg Switchable_LogView ThirdParty_MPQC_rebuilt_buildsystem TrajectoryDependenant_MaxOrder TremoloParser_IncreasedPrecision TremoloParser_MultipleTimesteps TremoloParser_setsAtomName Ubuntu_1604_changes stable
Last change on this file since 509014 was d1dbfc, checked in by Frederik Heber <heber@…>, 13 years ago

New GetNextJobIdOperation for obtaining next available JobId from server.

  • as JobId is required to create the job, we now need two, separate communication phases: gathering info (ids) and sending info (jobs).
  • new SchedulderState GetNextJobId along with handlers.
  • new GetNextJobIdOperation that requests another id which is internally stored into a list, along with a getter that extracts them one by one.
  • controller's createjobs() and parsejobs() each take a nextid parameter now.
  • Property mode set to 100644
File size: 12.3 KB
Line 
1/*
2 * Project: MoleCuilder
3 * Description: creates and alters molecular systems
4 * Copyright (C) 2011 University of Bonn. All rights reserved.
5 * Please see the LICENSE file or "Copyright notice" in builder.cpp for details.
6 */
7
8/*
9 * \file FragmentScheduler.cpp
10 *
11 * This file strongly follows the Serialization example from the boost::asio
12 * library (see server.cpp)
13 *
14 * Created on: Oct 19, 2011
15 * Author: heber
16 */
17
18// include config.h
19#ifdef HAVE_CONFIG_H
20#include <config.h>
21#endif
22
23// boost asio needs specific operator new
24#include <boost/asio.hpp>
25
26#include "CodePatterns/MemDebug.hpp"
27
28#include <boost/bind.hpp>
29#include <boost/lexical_cast.hpp>
30#include <iostream>
31#include <vector>
32#include "Connection.hpp" // Must come before boost/serialization headers.
33#include <boost/serialization/vector.hpp>
34#include "CodePatterns/Info.hpp"
35#include "CodePatterns/Log.hpp"
36#include "Jobs/MPQCCommandJob.hpp"
37#include "Jobs/SystemCommandJob.hpp"
38#include "JobId.hpp"
39
40#include "FragmentScheduler.hpp"
41
42FragmentJob::ptr FragmentScheduler::NoJob(new SystemCommandJob(std::string("/bin/true"), std::string("dosomething"), JobId::NoJob));
43
44/** Helper function to enforce binding of FragmentWorker to possible derived
45 * FragmentJob classes.
46 */
47void dummyInit() {
48 SystemCommandJob("/bin/false", "something", JobId::IllegalJob);
49 MPQCCommandJob("nofile", JobId::IllegalJob);
50}
51
52/** Constructor of class FragmentScheduler.
53 *
54 * We setup both acceptors to accept connections from workers and Controller.
55 *
56 * \param io_service io_service of the asynchronous communications
57 * \param workerport port to listen for worker connections
58 * \param controllerport port to listen for controller connections.
59 */
60FragmentScheduler::FragmentScheduler(boost::asio::io_service& io_service, unsigned short workerport, unsigned short controllerport) :
61 worker_acceptor_(io_service,
62 boost::asio::ip::tcp::endpoint(boost::asio::ip::tcp::v4(), workerport)
63 ),
64 controller_acceptor_(io_service,
65 boost::asio::ip::tcp::endpoint(boost::asio::ip::tcp::v4(), controllerport)
66 ),
67 result( new FragmentResult(JobId::NoJob) ),
68 jobInfo((size_t)2, 0),
69 choice(NoOperation),
70 globalId(0),
71 Exitflag(OkFlag)
72{
73 Info info(__FUNCTION__);
74
75 // only initiate socket if jobs are already present
76 if (JobsQueue.isJobPresent()) {
77 LOG(1, "Listening for workers on port " << workerport << ".");
78 initiateWorkerSocket();
79 }
80
81 initiateControllerSocket();
82 LOG(1, "Listening for controller on port " << controllerport << ".");
83}
84
85/** Internal function to start worker connection.
86 *
87 */
88void FragmentScheduler::initiateWorkerSocket()
89{
90 // Start an accept operation for worker connections.
91 connection_ptr new_conn(new Connection(worker_acceptor_.get_io_service()));
92 worker_acceptor_.async_accept(new_conn->socket(),
93 boost::bind(&FragmentScheduler::handle_AcceptWorker, this,
94 boost::asio::placeholders::error, new_conn));
95}
96
97/** Internal function to start controller connection.
98 *
99 */
100void FragmentScheduler::initiateControllerSocket()
101{
102 // Start an accept operation for controller connection.
103 connection_ptr new_conn(new Connection(controller_acceptor_.get_io_service()));
104 controller_acceptor_.async_accept(new_conn->socket(),
105 boost::bind(&FragmentScheduler::handle_AcceptController, this,
106 boost::asio::placeholders::error, new_conn));
107}
108
109
110/** Handle a new worker connection.
111 *
112 * We check whether jobs are in the JobsQueue. If present, job is sent.
113 *
114 * \sa handle_SendJobtoWorker()
115 *
116 * \param e error code if something went wrong
117 * \param conn reference with the connection
118 */
119void FragmentScheduler::handle_AcceptWorker(const boost::system::error_code& e, connection_ptr conn)
120{
121 Info info(__FUNCTION__);
122 if (!e)
123 {
124 // Successfully accepted a new connection.
125 // Check whether there are jobs in the queue
126 if (JobsQueue.isJobPresent()) {
127 // pop a job and send it to the client.
128 FragmentJob::ptr job(JobsQueue.popJob());
129 // The connection::async_write() function will automatically
130 // serialize the data structure for us.
131 LOG(1, "INFO: Sending job #" << job->getId() << ".");
132 conn->async_write(job,
133 boost::bind(&FragmentScheduler::handle_SendJobtoWorker, this,
134 boost::asio::placeholders::error, conn));
135
136 } else {
137 // send the static NoJob
138 conn->async_write(NoJob,
139 boost::bind(&FragmentScheduler::handle_SendJobtoWorker, this,
140 boost::asio::placeholders::error, conn));
141
142 // then there must be no read necesary
143
144 ELOG(2, "There is currently no job present in the queue.");
145 }
146 }
147 else
148 {
149 // An error occurred. Log it and return. Since we are not starting a new
150 // accept operation the io_service will run out of work to do and the
151 // server will exit.
152 Exitflag = WorkerErrorFlag;
153 ELOG(0, e.message());
154 }
155
156 // Start an accept operation for a new Connection only when there
157 // are still jobs present
158 if (JobsQueue.isJobPresent())
159 initiateWorkerSocket();
160}
161
162/** Callback function when job has been sent.
163 *
164 * After job has been sent we start async_read() for the result.
165 *
166 * \sa handle_ReceiveResultFromWorker()
167 *
168 * \param e error code if something went wrong
169 * \param conn reference with the connection
170 */
171void FragmentScheduler::handle_SendJobtoWorker(const boost::system::error_code& e, connection_ptr conn)
172{
173 Info info(__FUNCTION__);
174 LOG(1, "INFO: Job sent.");
175 // obtain result
176 LOG(1, "INFO: Receiving result for a job ...");
177 conn->async_read(result,
178 boost::bind(&FragmentScheduler::handle_ReceiveResultFromWorker, this,
179 boost::asio::placeholders::error, conn));
180}
181
182/** Callback function when result has been received.
183 *
184 * \param e error code if something went wrong
185 * \param conn reference with the connection
186 */
187void FragmentScheduler::handle_ReceiveResultFromWorker(const boost::system::error_code& e, connection_ptr conn)
188{
189 Info info(__FUNCTION__);
190 LOG(1, "INFO: Received result for job #" << result->getId() << " ...");
191 // and push into queue
192 ASSERT(result->getId() != (JobId_t)JobId::NoJob,
193 "FragmentScheduler::handle_ReceiveResultFromWorker() - result received has NoJob id.");
194 ASSERT(result->getId() != (JobId_t)JobId::IllegalJob,
195 "FragmentScheduler::handle_ReceiveResultFromWorker() - result received has IllegalJob id.");
196 // place id into expected
197 if ((result->getId() != (JobId_t)JobId::NoJob) && (result->getId() != (JobId_t)JobId::IllegalJob))
198 JobsQueue.pushResult(result);
199 // erase result
200 result.reset();
201 LOG(1, "INFO: JobsQueue has " << JobsQueue.getDoneJobs() << " results.");
202}
203
204/** Handle a new controller connection.
205 *
206 * \sa handle_ReceiveJobs()
207 * \sa handle_CheckResultState()
208 * \sa handle_SendResults()
209 *
210 * \param e error code if something went wrong
211 * \param conn reference with the connection
212 */
213void FragmentScheduler::handle_AcceptController(const boost::system::error_code& e, connection_ptr conn)
214{
215 Info info(__FUNCTION__);
216 if (!e)
217 {
218 conn->async_read(choice,
219 boost::bind(&FragmentScheduler::handle_ReadChoice, this,
220 boost::asio::placeholders::error, conn));
221 }
222 else
223 {
224 // An error occurred. Log it and return. Since we are not starting a new
225 // accept operation the io_service will run out of work to do and the
226 // server will exit.
227 Exitflag = ControllerErrorFlag;
228 ELOG(0, e.message());
229 }
230}
231
232/** Controller callback function to read the choice for next operation.
233 *
234 * \param e error code if something went wrong
235 * \param conn reference with the connection
236 */
237void FragmentScheduler::handle_ReadChoice(const boost::system::error_code& e, connection_ptr conn)
238{
239 Info info(__FUNCTION__);
240 if (!e)
241 {
242 bool LaunchNewAcceptor = true;
243 LOG(1, "INFO: Received request for operation " << choice << ".");
244 // switch over the desired choice read previously
245 switch(choice) {
246 case NoOperation:
247 {
248 ELOG(1, "FragmentScheduler::handle_ReadChoice() - called with NoOperation.");
249 break;
250 }
251 case GetNextJobId:
252 {
253 const JobId_t nextid = globalId.getNextId();
254 LOG(1, "INFO: Sending next available job id " << nextid << " to controller ...");
255 conn->async_write(nextid,
256 boost::bind(&FragmentScheduler::handle_GetNextJobIdState, this,
257 boost::asio::placeholders::error, conn));
258 break;
259 }
260 case ReceiveJobs:
261 {
262 // The connection::async_write() function will automatically
263 // serialize the data structure for us.
264 LOG(1, "INFO: Receiving bunch of jobs from a controller ...");
265 conn->async_read(jobs,
266 boost::bind(&FragmentScheduler::handle_ReceiveJobs, this,
267 boost::asio::placeholders::error, conn));
268 break;
269 }
270 case CheckState:
271 {
272 // first update number
273 jobInfo[0] = JobsQueue.getPresentJobs();
274 jobInfo[1] = JobsQueue.getDoneJobs();
275 // now we accept connections to check for state of calculations
276 LOG(1, "INFO: Sending state that "+toString(jobInfo[0])+" jobs are present and "+toString(jobInfo[1])+" jobs are done to controller ...");
277 conn->async_write(jobInfo,
278 boost::bind(&FragmentScheduler::handle_CheckResultState, this,
279 boost::asio::placeholders::error, conn));
280 break;
281 }
282 case SendResults:
283 {
284 const std::vector<FragmentResult::ptr> results = JobsQueue.getAllResults();
285 // ... or we give the results
286 LOG(1, "INFO: Sending "+toString(results.size())+" results to controller ...");
287 conn->async_write(results,
288 boost::bind(&FragmentScheduler::handle_SendResults, this,
289 boost::asio::placeholders::error, conn));
290 break;
291 }
292 case Shutdown:
293 {
294 LaunchNewAcceptor = false;
295 break;
296 }
297 default:
298 Exitflag = ControllerErrorFlag;
299 ELOG(1, "FragmentScheduler::handle_ReadChoice() - called with no valid choice.");
300 break;
301 }
302 // restore NoOperation choice such that choice is not read twice
303 choice = NoOperation;
304
305 if (LaunchNewAcceptor) {
306 LOG(1, "Launching new acceptor on socket.");
307 // Start an accept operation for a new Connection.
308 connection_ptr new_conn(new Connection(controller_acceptor_.get_io_service()));
309 controller_acceptor_.async_accept(new_conn->socket(),
310 boost::bind(&FragmentScheduler::handle_AcceptController, this,
311 boost::asio::placeholders::error, new_conn));
312 }
313 }
314 else
315 {
316 // An error occurred. Log it and return. Since we are not starting a new
317 // accept operation the io_service will run out of work to do and the
318 // server will exit.
319 Exitflag = ControllerErrorFlag;
320 ELOG(0, e.message());
321 }
322}
323
324/** Controller callback function when job has been sent.
325 *
326 * We check here whether the worker socket is accepting, if there
327 * have been no jobs we re-activate it, as it is shut down after
328 * last job.
329 *
330 * \param e error code if something went wrong
331 * \param conn reference with the connection
332 */
333void FragmentScheduler::handle_ReceiveJobs(const boost::system::error_code& e, connection_ptr conn)
334{
335 Info info(__FUNCTION__);
336 bool initiateSocket = !JobsQueue.isJobPresent();
337
338 // jobs are received, hence place in JobsQueue
339 if (!jobs.empty()) {
340 LOG(1, "INFO: Pushing " << jobs.size() << " jobs into queue.");
341 JobsQueue.pushJobs(jobs);
342 // initiate socket if we had no jobs before
343 if (initiateSocket)
344 initiateWorkerSocket();
345 }
346
347 jobs.clear();
348
349}
350
351/** Controller callback function when checking on state of results.
352 *
353 * \param e error code if something went wrong
354 * \param conn reference with the connection
355 */
356void FragmentScheduler::handle_CheckResultState(const boost::system::error_code& e, connection_ptr conn)
357{
358 Info info(__FUNCTION__);
359 // do nothing
360 LOG(1, "INFO: Sent that " << jobInfo << " jobs are (scheduled, done).");
361}
362
363/** Controller callback function when checking on state of results.
364 *
365 * \param e error code if something went wrong
366 * \param conn reference with the connection
367 */
368void FragmentScheduler::handle_GetNextJobIdState(const boost::system::error_code& e, connection_ptr conn)
369{
370 Info info(__FUNCTION__);
371 // do nothing
372 LOG(1, "INFO: Sent next available job id.");
373}
374
375/** Controller callback function when result has been received.
376 *
377 * \param e error code if something went wrong
378 * \param conn reference with the connection
379 */
380void FragmentScheduler::handle_SendResults(const boost::system::error_code& e, connection_ptr conn)
381{
382 Info info(__FUNCTION__);
383 // do nothing
384 LOG(1, "INFO: Results have been sent.");
385}
386
Note: See TracBrowser for help on using the repository browser.