source: src/Fragmentation/Automation/FragmentScheduler.cpp@ 778abb

Action_Thermostats Add_AtomRandomPerturbation Add_FitFragmentPartialChargesAction Add_RotateAroundBondAction Add_SelectAtomByNameAction Added_ParseSaveFragmentResults AddingActions_SaveParseParticleParameters Adding_Graph_to_ChangeBondActions Adding_MD_integration_tests Adding_ParticleName_to_Atom Adding_StructOpt_integration_tests AtomFragments Automaking_mpqc_open AutomationFragmentation_failures Candidate_v1.5.4 Candidate_v1.6.0 Candidate_v1.6.1 Candidate_v1.7.0 ChangeBugEmailaddress ChangingTestPorts ChemicalSpaceEvaluator CombiningParticlePotentialParsing Combining_Subpackages Debian_Package_split Debian_package_split_molecuildergui_only Disabling_MemDebug Docu_Python_wait EmpiricalPotential_contain_HomologyGraph EmpiricalPotential_contain_HomologyGraph_documentation Enable_parallel_make_install Enhance_userguide Enhanced_StructuralOptimization Enhanced_StructuralOptimization_continued Example_ManyWaysToTranslateAtom Exclude_Hydrogens_annealWithBondGraph FitPartialCharges_GlobalError Fix_BoundInBox_CenterInBox_MoleculeActions Fix_ChargeSampling_PBC Fix_ChronosMutex Fix_FitPartialCharges Fix_FitPotential_needs_atomicnumbers Fix_ForceAnnealing Fix_IndependentFragmentGrids Fix_ParseParticles Fix_ParseParticles_split_forward_backward_Actions Fix_PopActions Fix_QtFragmentList_sorted_selection Fix_Restrictedkeyset_FragmentMolecule Fix_StatusMsg Fix_StepWorldTime_single_argument Fix_Verbose_Codepatterns Fix_fitting_potentials Fixes ForceAnnealing_goodresults ForceAnnealing_oldresults ForceAnnealing_tocheck ForceAnnealing_with_BondGraph ForceAnnealing_with_BondGraph_continued ForceAnnealing_with_BondGraph_continued_betteresults ForceAnnealing_with_BondGraph_contraction-expansion FragmentAction_writes_AtomFragments FragmentMolecule_checks_bonddegrees GeometryObjects Gui_Fixes Gui_displays_atomic_force_velocity ImplicitCharges IndependentFragmentGrids IndependentFragmentGrids_IndividualZeroInstances IndependentFragmentGrids_IntegrationTest 
IndependentFragmentGrids_Sole_NN_Calculation JobMarket_RobustOnKillsSegFaults JobMarket_StableWorkerPool JobMarket_unresolvable_hostname_fix MoreRobust_FragmentAutomation ODR_violation_mpqc_open PartialCharges_OrthogonalSummation PdbParser_setsAtomName PythonUI_with_named_parameters QtGui_reactivate_TimeChanged_changes Recreated_GuiChecks Rewrite_FitPartialCharges RotateToPrincipalAxisSystem_UndoRedo SaturateAtoms_findBestMatching SaturateAtoms_singleDegree StoppableMakroAction Subpackage_CodePatterns Subpackage_JobMarket Subpackage_LinearAlgebra Subpackage_levmar Subpackage_mpqc_open Subpackage_vmg Switchable_LogView ThirdParty_MPQC_rebuilt_buildsystem TrajectoryDependenant_MaxOrder TremoloParser_IncreasedPrecision TremoloParser_MultipleTimesteps TremoloParser_setsAtomName Ubuntu_1604_changes stable
Last change on this file since 778abb was 778abb, checked in by Frederik Heber <heber@…>, 13 years ago

Added ResultGetter and capabilities to receive calculated results to FragmentController.

  • Added enum (and file) ControllerChoices that defines the state of FragmentScheduler.
  • depending on what is desired the Scheduler switches between these states and either receives or sends information. Requires new member variable choice because receival is of course asynchronous (see note in previous commit).
  • FragmentController has additional functions connect_get() and handle_connect_get() to receive results.
  • connect_calc() and connect_check() now just set the choice, whereas the actual sending and receiving is done in the handle_... functions.
  • handle_FinishOperation() is the common final callback function for all three of these functions.
  • FragmentScheduler contains an internal list of delivered results.
  • FragmentScheduler only initiates worker socket when jobs are present.
  • FIX: FragmentScheduler does only send results that are done and only once.
  • TESTFIX: Removed the third Worker that received NoJob, as the socket is powered down beforehand because the queue has run empty and we haven't added new jobs.
  • Property mode set to 100644
File size: 10.8 KB
Line 
1/*
2 * Project: MoleCuilder
3 * Description: creates and alters molecular systems
4 * Copyright (C) 2011 University of Bonn. All rights reserved.
5 * Please see the LICENSE file or "Copyright notice" in builder.cpp for details.
6 */
7
8/*
9 * \file FragmentScheduler.cpp
10 *
11 * This file strongly follows the Serialization example from the boost::asio
12 * library (see server.cpp)
13 *
14 * Created on: Oct 19, 2011
15 * Author: heber
16 */
17
18// include config.h
19#ifdef HAVE_CONFIG_H
20#include <config.h>
21#endif
22
23// boost asio needs specific operator new
24#include <boost/asio.hpp>
25
26#include "CodePatterns/MemDebug.hpp"
27
28#include <boost/bind.hpp>
29#include <boost/lexical_cast.hpp>
30#include <iostream>
31#include <vector>
32#include "Connection.hpp" // Must come before boost/serialization headers.
33#include <boost/serialization/vector.hpp>
34#include "CodePatterns/Info.hpp"
35#include "CodePatterns/Log.hpp"
36#include "FragmentJob.hpp"
37#include "JobId.hpp"
38
39#include "FragmentScheduler.hpp"
40
41FragmentJob FragmentScheduler::NoJob(std::string("NoJob"), JobId::NoJob);
42
43/** Constructor of class FragmentScheduler.
44 *
45 * We setup both acceptors to accept connections from workers and Controller.
46 *
47 * \param io_service io_service of the asynchronous communications
48 * \param workerport port to listen for worker connections
49 * \param controllerport port to listen for controller connections.
50 */
51FragmentScheduler::FragmentScheduler(boost::asio::io_service& io_service, unsigned short workerport, unsigned short controllerport) :
52 worker_acceptor_(io_service,
53 boost::asio::ip::tcp::endpoint(boost::asio::ip::tcp::v4(), workerport)
54 ),
55 controller_acceptor_(io_service,
56 boost::asio::ip::tcp::endpoint(boost::asio::ip::tcp::v4(), controllerport)
57 ),
58 result(JobId::NoJob),
59 choice(NoOperation),
60 Exitflag(OkFlag)
61{
62 Info info(__FUNCTION__);
63
64 // only initiate socket if jobs are already present
65 if (JobsQueue.isJobPresent()) {
66 LOG(1, "Listening for workers on port " << workerport << ".");
67 initiateWorkerSocket();
68 }
69
70 initiateControllerSocket();
71 LOG(1, "Listening for controller on port " << controllerport << ".");
72}
73
74/** Internal function to start worker connection.
75 *
76 */
77void FragmentScheduler::initiateWorkerSocket()
78{
79 // Start an accept operation for worker connections.
80 connection_ptr new_conn(new Connection(worker_acceptor_.get_io_service()));
81 worker_acceptor_.async_accept(new_conn->socket(),
82 boost::bind(&FragmentScheduler::handle_AcceptWorker, this,
83 boost::asio::placeholders::error, new_conn));
84}
85
86/** Internal function to start controller connection.
87 *
88 */
89void FragmentScheduler::initiateControllerSocket()
90{
91 // Start an accept operation for controller connection.
92 connection_ptr new_conn(new Connection(controller_acceptor_.get_io_service()));
93 controller_acceptor_.async_accept(new_conn->socket(),
94 boost::bind(&FragmentScheduler::handle_AcceptController, this,
95 boost::asio::placeholders::error, new_conn));
96}
97
98
99/** Handle a new worker connection.
100 *
101 * We check whether jobs are in the JobsQueue. If present, job is sent.
102 *
103 * \sa handle_SendJobtoWorker()
104 *
105 * \param e error code if something went wrong
106 * \param conn reference with the connection
107 */
108void FragmentScheduler::handle_AcceptWorker(const boost::system::error_code& e, connection_ptr conn)
109{
110 Info info(__FUNCTION__);
111 if (!e)
112 {
113 // Successfully accepted a new connection.
114 // Check whether there are jobs in the queue
115 if (JobsQueue.isJobPresent()) {
116 // pop a job and send it to the client.
117 FragmentJob job(JobsQueue.popJob());
118 // The connection::async_write() function will automatically
119 // serialize the data structure for us.
120 LOG(1, "INFO: Sending job #" << job.getId() << ".");
121 conn->async_write(job,
122 boost::bind(&FragmentScheduler::handle_SendJobtoWorker, this,
123 boost::asio::placeholders::error, conn));
124
125 } else {
126 // send the static NoJob
127 conn->async_write(NoJob,
128 boost::bind(&FragmentScheduler::handle_SendJobtoWorker, this,
129 boost::asio::placeholders::error, conn));
130
131 // then there must be no read necesary
132
133 ELOG(2, "There is currently no job present in the queue.");
134 }
135 }
136 else
137 {
138 // An error occurred. Log it and return. Since we are not starting a new
139 // accept operation the io_service will run out of work to do and the
140 // server will exit.
141 Exitflag = WorkerErrorFlag;
142 ELOG(0, e.message());
143 }
144
145 // Start an accept operation for a new Connection only when there
146 // are still jobs present
147 if (JobsQueue.isJobPresent())
148 initiateWorkerSocket();
149}
150
151/** Callback function when job has been sent.
152 *
153 * After job has been sent we start async_read() for the result.
154 *
155 * \sa handle_ReceiveResultFromWorker()
156 *
157 * \param e error code if something went wrong
158 * \param conn reference with the connection
159 */
160void FragmentScheduler::handle_SendJobtoWorker(const boost::system::error_code& e, connection_ptr conn)
161{
162 Info info(__FUNCTION__);
163 LOG(1, "INFO: Job sent.");
164 // obtain result
165 LOG(1, "INFO: Receiving result for a job ...");
166 conn->async_read(result,
167 boost::bind(&FragmentScheduler::handle_ReceiveResultFromWorker, this,
168 boost::asio::placeholders::error, conn));
169}
170
171/** Callback function when result has been received.
172 *
173 * \param e error code if something went wrong
174 * \param conn reference with the connection
175 */
176void FragmentScheduler::handle_ReceiveResultFromWorker(const boost::system::error_code& e, connection_ptr conn)
177{
178 Info info(__FUNCTION__);
179 LOG(1, "INFO: Received result for job #" << result.getId() << " ...");
180 ASSERT(result.getId() != (JobId_t)JobId::NoJob,
181 "FragmentScheduler::handle_ReceiveResultFromWorker() - result received has NoJob id.");
182 ASSERT(result.getId() != (JobId_t)JobId::IllegalJob,
183 "FragmentScheduler::handle_ReceiveResultFromWorker() - result received has IllegalJob id.");
184 // place id into expected
185 if ((result.getId() != (JobId_t)JobId::NoJob) && (result.getId() != (JobId_t)JobId::IllegalJob))
186 JobsQueue.pushResult(result);
187 // erase result
188 result = FragmentResult(JobId::NoJob);
189 LOG(1, "INFO: JobsQueue has " << JobsQueue.getDoneJobs() << " results.");
190}
191
192/** Handle a new controller connection.
193 *
194 * \sa handle_ReceiveJobs()
195 * \sa handle_CheckResultState()
196 * \sa handle_SendResults()
197 *
198 * \param e error code if something went wrong
199 * \param conn reference with the connection
200 */
201void FragmentScheduler::handle_AcceptController(const boost::system::error_code& e, connection_ptr conn)
202{
203 Info info(__FUNCTION__);
204 if (!e)
205 {
206 conn->async_read(choice,
207 boost::bind(&FragmentScheduler::handle_ReadChoice, this,
208 boost::asio::placeholders::error, conn));
209 }
210 else
211 {
212 // An error occurred. Log it and return. Since we are not starting a new
213 // accept operation the io_service will run out of work to do and the
214 // server will exit.
215 Exitflag = ControllerErrorFlag;
216 ELOG(0, e.message());
217 }
218}
219
220/** Controller callback function to read the choice for next operation.
221 *
222 * \param e error code if something went wrong
223 * \param conn reference with the connection
224 */
225void FragmentScheduler::handle_ReadChoice(const boost::system::error_code& e, connection_ptr conn)
226{
227 Info info(__FUNCTION__);
228 if (!e)
229 {
230 // switch over the desired choice read previously
231 switch(choice) {
232 case NoOperation:
233 {
234 ELOG(1, "FragmentScheduler::handle_ReadChoice() - called with NoOperation.");
235 break;
236 }
237 case ReceiveJobs:
238 {
239 // The connection::async_write() function will automatically
240 // serialize the data structure for us.
241 LOG(1, "INFO: Receiving bunch of jobs from a controller ...");
242 conn->async_read(jobs,
243 boost::bind(&FragmentScheduler::handle_ReceiveJobs, this,
244 boost::asio::placeholders::error, conn));
245 break;
246 }
247 case CheckState:
248 {
249 // first update number
250 doneJobs = JobsQueue.getDoneJobs();
251 // now we accept connections to check for state of calculations
252 LOG(1, "INFO: Sending state that "+toString(doneJobs)+" jobs are done to controller ...");
253 conn->async_write(doneJobs,
254 boost::bind(&FragmentScheduler::handle_CheckResultState, this,
255 boost::asio::placeholders::error, conn));
256
257 initiateControllerSocket();
258 break;
259 }
260 case SendResults:
261 {
262 const std::vector<FragmentResult> results = JobsQueue.getAllResults();
263 // ... or we give the results
264 LOG(1, "INFO: Sending "+toString(results.size())+" results to controller ...");
265 conn->async_write(results,
266 boost::bind(&FragmentScheduler::handle_SendResults, this,
267 boost::asio::placeholders::error, conn));
268
269 initiateControllerSocket();
270 break;
271 }
272 default:
273 Exitflag = ControllerErrorFlag;
274 ELOG(1, "FragmentScheduler::handle_ReadChoice() - called with no valid choice.");
275 break;
276 }
277 // restore NoOperation choice such that choice is not read twice
278 choice = NoOperation;
279
280 }
281 else
282 {
283 // An error occurred. Log it and return. Since we are not starting a new
284 // accept operation the io_service will run out of work to do and the
285 // server will exit.
286 Exitflag = ControllerErrorFlag;
287 ELOG(0, e.message());
288 }
289}
290
291/** Controller callback function when job has been sent.
292 *
293 * We check here whether the worker socket is accepting, if there
294 * have been no jobs we re-activate it, as it is shut down after
295 * last job.
296 *
297 * \param e error code if something went wrong
298 * \param conn reference with the connection
299 */
300void FragmentScheduler::handle_ReceiveJobs(const boost::system::error_code& e, connection_ptr conn)
301{
302 Info info(__FUNCTION__);
303 bool initiateSocket = !JobsQueue.isJobPresent();
304
305 // jobs are received, hence place in JobsQueue
306 if (!jobs.empty()) {
307 LOG(1, "INFO: Pushing " << jobs.size() << " jobs into queue.");
308 JobsQueue.pushJobs(jobs);
309 // initiate socket if we had no jobs before
310 if (initiateSocket)
311 initiateWorkerSocket();
312 // launch new acceptor of queue has been filled/is full
313 initiateControllerSocket();
314 } else {
315 LOG(1, "INFO: Shutting down controller socket.");
316 }
317
318 jobs.clear();
319
320}
321
322/** Controller callback function when checking on state of results.
323 *
324 * \param e error code if something went wrong
325 * \param conn reference with the connection
326 */
327void FragmentScheduler::handle_CheckResultState(const boost::system::error_code& e, connection_ptr conn)
328{
329 Info info(__FUNCTION__);
330 // do nothing
331 LOG(1, "INFO: Sent that " << doneJobs << " jobs are done.");
332}
333
334/** Controller callback function when result has been received.
335 *
336 * \param e error code if something went wrong
337 * \param conn reference with the connection
338 */
339void FragmentScheduler::handle_SendResults(const boost::system::error_code& e, connection_ptr conn)
340{
341 Info info(__FUNCTION__);
342 // do nothing
343 LOG(1, "INFO: Results have been sent.");
344}
345
Note: See TracBrowser for help on using the repository browser.