source: src/Fragmentation/Automation/controller.cpp@ fe95b7

Action_Thermostats Add_AtomRandomPerturbation Add_FitFragmentPartialChargesAction Add_RotateAroundBondAction Add_SelectAtomByNameAction Added_ParseSaveFragmentResults AddingActions_SaveParseParticleParameters Adding_Graph_to_ChangeBondActions Adding_MD_integration_tests Adding_ParticleName_to_Atom Adding_StructOpt_integration_tests AtomFragments Automaking_mpqc_open AutomationFragmentation_failures Candidate_v1.5.4 Candidate_v1.6.0 Candidate_v1.6.1 ChangeBugEmailaddress ChangingTestPorts ChemicalSpaceEvaluator CombiningParticlePotentialParsing Combining_Subpackages Debian_Package_split Debian_package_split_molecuildergui_only Disabling_MemDebug Docu_Python_wait EmpiricalPotential_contain_HomologyGraph EmpiricalPotential_contain_HomologyGraph_documentation Enable_parallel_make_install Enhance_userguide Enhanced_StructuralOptimization Enhanced_StructuralOptimization_continued Example_ManyWaysToTranslateAtom Exclude_Hydrogens_annealWithBondGraph FitPartialCharges_GlobalError Fix_BoundInBox_CenterInBox_MoleculeActions Fix_ChargeSampling_PBC Fix_ChronosMutex Fix_FitPartialCharges Fix_FitPotential_needs_atomicnumbers Fix_ForceAnnealing Fix_IndependentFragmentGrids Fix_ParseParticles Fix_ParseParticles_split_forward_backward_Actions Fix_PopActions Fix_QtFragmentList_sorted_selection Fix_Restrictedkeyset_FragmentMolecule Fix_StatusMsg Fix_StepWorldTime_single_argument Fix_Verbose_Codepatterns Fix_fitting_potentials Fixes ForceAnnealing_goodresults ForceAnnealing_oldresults ForceAnnealing_tocheck ForceAnnealing_with_BondGraph ForceAnnealing_with_BondGraph_continued ForceAnnealing_with_BondGraph_continued_betteresults ForceAnnealing_with_BondGraph_contraction-expansion FragmentAction_writes_AtomFragments FragmentMolecule_checks_bonddegrees GeometryObjects Gui_Fixes Gui_displays_atomic_force_velocity ImplicitCharges IndependentFragmentGrids IndependentFragmentGrids_IndividualZeroInstances IndependentFragmentGrids_IntegrationTest IndependentFragmentGrids_Sole_NN_Calculation 
JobMarket_RobustOnKillsSegFaults JobMarket_StableWorkerPool JobMarket_unresolvable_hostname_fix MoreRobust_FragmentAutomation ODR_violation_mpqc_open PartialCharges_OrthogonalSummation PdbParser_setsAtomName PythonUI_with_named_parameters QtGui_reactivate_TimeChanged_changes Recreated_GuiChecks Rewrite_FitPartialCharges RotateToPrincipalAxisSystem_UndoRedo SaturateAtoms_findBestMatching SaturateAtoms_singleDegree StoppableMakroAction Subpackage_CodePatterns Subpackage_JobMarket Subpackage_LinearAlgebra Subpackage_levmar Subpackage_mpqc_open Subpackage_vmg Switchable_LogView ThirdParty_MPQC_rebuilt_buildsystem TrajectoryDependenant_MaxOrder TremoloParser_IncreasedPrecision TremoloParser_MultipleTimesteps TremoloParser_setsAtomName Ubuntu_1604_changes stable
Last change on this file since fe95b7 was 986885, checked in by Frederik Heber <heber@…>, 13 years ago

Added specific Command to controller to receive mpqc results and combine them.

  • new entry in CommandIndices ReceiveMPQCIndex.
  • is not done. This should be implemented using the already present methods in the rest of MoleCuilder. This should go into the function printreceivedmpqcresults().
  • Property mode set to 100644
File size: 13.9 KB
Line 
1/*
2 * Project: MoleCuilder
3 * Description: creates and alters molecular systems
4 * Copyright (C) 2011 University of Bonn. All rights reserved.
5 * Please see the LICENSE file or "Copyright notice" in builder.cpp for details.
6 */
7
8/*
9 * \file controller.cpp
10 *
11 * This file strongly follows the Serialization example from the boost::asio
12 * library (see client.cpp)
13 *
14 * Created on: Nov 27, 2011
15 * Author: heber
16 */
17
18
19// include config.h
20#ifdef HAVE_CONFIG_H
21#include <config.h>
22#endif
23
24// boost asio needs specific operator new
25#include <boost/asio.hpp>
26
27#include "CodePatterns/MemDebug.hpp"
28
29#include <boost/archive/text_oarchive.hpp>
30#include <boost/archive/text_iarchive.hpp>
31#include <fstream>
32#include <iostream>
33#include <map>
34#include <sstream>
35#include <streambuf>
36#include <vector>
37
38#include "atexit.hpp"
39#include "CodePatterns/Info.hpp"
40#include "CodePatterns/Log.hpp"
41#include "Controller/FragmentController.hpp"
42#include "Controller/Commands/CheckResultsOperation.hpp"
43#include "Controller/Commands/GetNextJobIdOperation.hpp"
44#include "Controller/Commands/ReceiveJobsOperation.hpp"
45#include "Controller/Commands/SendResultsOperation.hpp"
46#include "Controller/Commands/ShutdownOperation.hpp"
47#include "Jobs/MPQCCommandJob.hpp"
48#include "Jobs/MPQCCommandJob_MPQCData.hpp"
49#include "Jobs/SystemCommandJob.hpp"
50#include "Results/FragmentResult.hpp"
51
/** Indices of the commands understood by the controller.
 *
 * Each value is used as a switch label in main(). UnknownCommandIndex is
 * the value getCommandIndex() returns for a command string that is not
 * contained in the command map.
 */
enum CommandIndices {
  UnknownCommandIndex = 0, //!< command string is not registered
  AddJobsIndex        = 1, //!< "addjobs"
  CreateJobsIndex     = 2, //!< "createjobs"
  CheckResultsIndex   = 3, //!< "checkresults"
  ReceiveResultsIndex = 4, //!< "receiveresults"
  ReceiveMPQCIndex    = 5, //!< "receivempqc"
  ShutdownIndex       = 6  //!< "shutdown"
};
61
62/** Requests an available id from server
63 *
64 * @param controller FragmentController with CommandRegistry
65 * @param host address of server
66 * @param service port/service of server
67 */
68void requestid(
69 FragmentController &controller,
70 const std::string &host,
71 const std::string &service)
72{
73 GetNextJobIdOperation *getnextid = static_cast<GetNextJobIdOperation *>(
74 controller.Commands.getByName("getnextjobid"));
75 (*getnextid)(host,service);
76}
77
78/** Returns another available id from a finished GetNextJobIdOperation.
79 *
80 * @param controller FragmentController with CommandRegistry
81 * @return next available id
82 */
83JobId_t getavailableid(FragmentController &controller)
84{
85 GetNextJobIdOperation *getnextid = static_cast<GetNextJobIdOperation *>(
86 controller.Commands.getByName("getnextjobid"));
87 const JobId_t nextid = getnextid->getNextId();
88 LOG(1, "INFO: Next available id is " << nextid << ".");
89 return nextid;
90}
91
92void createjobs(
93 std::vector<FragmentJob::ptr> &jobs,
94 const std::string &command,
95 const std::string &argument,
96 const JobId_t nextid)
97{
98 FragmentJob::ptr testJob( new SystemCommandJob(command, argument, nextid) );
99 jobs.push_back(testJob);
100 LOG(1, "INFO: Added one SystemCommandJob.");
101}
102
103/** Creates a MPQCCommandJob with argument \a filename.
104 *
105 * @param jobs created job is added to this vector
106 * @param filename filename being argument to job
107 * @param nextid id for this job
108 */
109void parsejob(
110 std::vector<FragmentJob::ptr> &jobs,
111 const std::string &filename,
112 const JobId_t nextid)
113{
114 std::ifstream file;
115 file.open(filename.c_str());
116 ASSERT( file.good(), "parsejob() - file "+filename+" does not exist.");
117 std::string output((std::istreambuf_iterator<char>(file)),
118 std::istreambuf_iterator<char>());
119 FragmentJob::ptr testJob( new MPQCCommandJob(output, nextid) );
120 jobs.push_back(testJob);
121 file.close();
122 LOG(1, "INFO: Added MPQCCommandJob from file "+filename+".");
123}
124
125/** Adds a vector of jobs to the send operation.
126 *
127 * @param controller FragmentController with CommandRegistry
128 * @param jobs jobs to add
129 */
130void addjobs(
131 FragmentController &controller,
132 std::vector<FragmentJob::ptr> &jobs)
133{
134 ReceiveJobsOperation *recjobs = static_cast<ReceiveJobsOperation *>(
135 controller.Commands.getByName("receivejobs"));
136 recjobs->addJobs(jobs);
137}
138
139/** Sends contained jobs in operation to server
140 *
141 * @param controller FragmentController with CommandRegistry
142 * @param host address of server
143 * @param service port/service of server
144 */
145void sendjobs(
146 FragmentController &controller,
147 const std::string &host,
148 const std::string &service)
149{
150 ReceiveJobsOperation *recjobs = static_cast<ReceiveJobsOperation *>(
151 controller.Commands.getByName("receivejobs"));
152 (*recjobs)(host, service);
153}
154
155/** Obtains scheduled and done jobs from server
156 *
157 * @param controller FragmentController with CommandRegistry
158 * @param host address of server
159 * @param service port/service of server
160 */
161void checkresults(
162 FragmentController &controller,
163 const std::string &host,
164 const std::string &service)
165{
166 CheckResultsOperation *checkres = static_cast<CheckResultsOperation *>(
167 controller.Commands.getByName("checkresults"));
168 (*checkres)(host, service);
169}
170
171/** Prints scheduled and done jobs.
172 *
173 * @param controller FragmentController with CommandRegistry
174 */
175void printdonejobs(FragmentController &controller)
176{
177 CheckResultsOperation *checkres = static_cast<CheckResultsOperation *>(
178 controller.Commands.getByName("checkresults"));
179 const size_t doneJobs = checkres->getDoneJobs();
180 const size_t presentJobs = checkres->getPresentJobs();
181 LOG(1, "INFO: #" << presentJobs << " are waiting in the queue and #" << doneJobs << " jobs are calculated so far.");
182}
183
184/** Obtains results from done jobs from server.
185 *
186 * @param controller FragmentController with CommandRegistry
187 * @param host address of server
188 * @param service port/service of server
189 */
190void receiveresults(
191 FragmentController &controller,
192 const std::string &host,
193 const std::string &service)
194{
195 SendResultsOperation *sendres = static_cast<SendResultsOperation *>(
196 controller.Commands.getByName("sendresults"));
197 (*sendres)(host, service);
198}
199
200/** Print received results.
201 *
202 * @param controller FragmentController with CommandRegistry
203 */
204void printreceivedresults(FragmentController &controller)
205{
206 SendResultsOperation *sendres = static_cast<SendResultsOperation *>(
207 controller.Commands.getByName("sendresults"));
208 std::vector<FragmentResult::ptr> results = sendres->getResults();
209 for (std::vector<FragmentResult::ptr>::const_iterator iter = results.begin();
210 iter != results.end(); ++iter)
211 LOG(1, "RESULT: job #"+toString((*iter)->getId())+": "+toString((*iter)->result));
212}
213
214/** Print received results.
215 *
216 * @param controller FragmentController with CommandRegistry
217 */
218void printreceivedmpqcresults(FragmentController &controller, const std::string &KeySetFilename)
219{
220 SendResultsOperation *sendres = static_cast<SendResultsOperation *>(
221 controller.Commands.getByName("sendresults"));
222 std::vector<FragmentResult::ptr> results = sendres->getResults();
223
224 // parse in KeySetfile
225// const size_t MAXBUFFER = 256;
226 std::ifstream inputfile;
227 inputfile.open(KeySetFilename.c_str());
228// while (inputfile.getline(buffer, MAXBUFFER)) {
229//
230// }
231 inputfile.close();
232
233 // combine all found data
234 std::vector<MPQCData> fragmentData(results.size());
235 MPQCData combinedData;
236
237 LOG(2, "DEBUG: Parsing now through " << results.size() << " results.");
238 for (std::vector<FragmentResult::ptr>::const_iterator iter = results.begin();
239 iter != results.end(); ++iter) {
240 LOG(1, "RESULT: job #"+toString((*iter)->getId())+": "+toString((*iter)->result));
241 MPQCData extractedData;
242 std::stringstream inputstream((*iter)->result);
243 boost::archive::text_iarchive ia(inputstream);
244 ia >> extractedData;
245 LOG(1, "INFO: extracted data is " << extractedData << ".");
246 }
247}
248
249/** Sends shutdown signal to server
250 *
251 * @param controller FragmentController with CommandRegistry
252 * @param host address of server
253 * @param service port/service of server
254 */
255void shutdown(
256 FragmentController &controller,
257 const std::string &host,
258 const std::string &service)
259{
260 ShutdownOperation *shutdown = static_cast<ShutdownOperation *>(
261 controller.Commands.getByName("shutdown"));
262 (*shutdown)(host, service);
263}
264
265/** Returns a unique index for every command to allow switching over it.
266 *
267 * \param &commandmap map with command strings
268 * \param &cmd command string
269 * \return index from CommandIndices: UnkownCommandIndex - unknown command, else - command index
270 */
271CommandIndices getCommandIndex(std::map<std::string, CommandIndices> &commandmap, const std::string &cmd)
272{
273 std::map<std::string, CommandIndices>::const_iterator iter = commandmap.find(cmd);
274 if (iter != commandmap.end())
275 return iter->second;
276 else
277 return UnknownCommandIndex;
278}
279
280
281int main(int argc, char* argv[])
282{
283 // from this moment on, we need to be sure to deeinitialize in the correct order
284 // this is handled by the cleanup function
285 atexit(cleanUp);
286
287 setVerbosity(3);
288
289 size_t Exitflag = 0;
290 typedef std::map<std::string, CommandIndices> CommandsMap_t;
291 CommandsMap_t CommandsMap;
292 CommandsMap.insert( std::make_pair("addjobs", AddJobsIndex) );
293 CommandsMap.insert( std::make_pair("createjobs", CreateJobsIndex) );
294 CommandsMap.insert( std::make_pair("checkresults", CheckResultsIndex) );
295 CommandsMap.insert( std::make_pair("receiveresults", ReceiveResultsIndex) );
296 CommandsMap.insert( std::make_pair("receivempqc", ReceiveMPQCIndex) );
297 CommandsMap.insert( std::make_pair("shutdown", ShutdownIndex) );
298 try
299 {
300 // Check command line arguments.
301 if (argc < 4)
302 {
303 std::cerr << "Usage: " << argv[0] << " <host> <port> <command> [options to command]" << std::endl;
304 std::cerr << "List of available commands:" << std::endl;
305 for(CommandsMap_t::const_iterator iter = CommandsMap.begin();
306 iter != CommandsMap.end(); ++iter) {
307 std::cerr << "\t" << iter->first << std::endl;
308 }
309 return 1;
310 }
311
312 boost::asio::io_service io_service;
313 FragmentController controller(io_service);
314
315 // Initial phase: information gathering from server
316
317 switch(getCommandIndex(CommandsMap, argv[3])) {
318 case AddJobsIndex:
319 {
320 if (argc < 5) {
321 ELOG(1, "Please add a filename for the MPQCCommandJob.");
322 } else {
323 // get an id for every filename
324 for (int argcount = 4; argcount < argc; ++argcount) {
325 requestid(controller, argv[1], argv[2]);
326 }
327 }
328 break;
329 }
330 case CreateJobsIndex:
331 {
332 std::vector<FragmentJob::ptr> jobs;
333 if (argc < 6) {
334 ELOG(1, "'createjobs' requires two options: [command] [argument].");
335 } else {
336 requestid(controller, argv[1], argv[2]);
337 }
338 break;
339 }
340 case CheckResultsIndex:
341 break;
342 case ReceiveResultsIndex:
343 break;
344 case ReceiveMPQCIndex:
345 break;
346 case ShutdownIndex:
347 break;
348 case UnknownCommandIndex:
349 default:
350 ELOG(1, "Unrecognized command '"+toString(argv[3])+"'.");
351 break;
352 }
353
354 {
355 io_service.reset();
356 Info info("io_service: Phase One");
357 io_service.run();
358 }
359
360 // Second phase: Building jobs and sending information to server
361
362 switch(getCommandIndex(CommandsMap, argv[3])) {
363 case AddJobsIndex:
364 {
365 std::vector<FragmentJob::ptr> jobs;
366 if (argc < 5) {
367 ELOG(1, "Please add a filename for the MPQCCommandJob.");
368 } else {
369 for (int argcount = 4; argcount < argc; ++argcount) {
370 const JobId_t next_id = getavailableid(controller);
371 const std::string filename(argv[argcount]);
372 LOG(1, "INFO: Creating MPQCCommandJob with filename'"
373 +filename+"', and id "+toString(next_id)+".");
374 parsejob(jobs, filename, next_id);
375 }
376 addjobs(controller, jobs);
377 sendjobs(controller, argv[1], argv[2]);
378 }
379 break;
380 }
381 case CreateJobsIndex:
382 {
383 std::vector<FragmentJob::ptr> jobs;
384 if (argc < 6) {
385 ELOG(1, "'createjobs' requires two options: [command] [argument].");
386 } else {
387 const JobId_t next_id = getavailableid(controller);
388 const std::string command(argv[4]);
389 const std::string argument(argv[5]);
390 LOG(1, "INFO: Creating SystemCommandJob with command '"
391 +command+"', argument '"+argument+"', and id "+toString(next_id)+".");
392 createjobs(jobs, command, argument, next_id);
393 }
394 addjobs(controller, jobs);
395 sendjobs(controller, argv[1], argv[2]);
396 break;
397 }
398 case CheckResultsIndex:
399 {
400 checkresults(controller, argv[1], argv[2]);
401 break;
402 }
403 case ReceiveResultsIndex:
404 {
405 receiveresults(controller, argv[1], argv[2]);
406 break;
407 }
408 case ReceiveMPQCIndex:
409 {
410 receiveresults(controller, argv[1], argv[2]);
411 break;
412 }
413 case ShutdownIndex:
414 {
415 shutdown(controller, argv[1], argv[2]);
416 break;
417 }
418 case UnknownCommandIndex:
419 default:
420 ELOG(0, "Unrecognized command '"+toString(argv[3])+"'.");
421 break;
422 }
423
424 {
425 io_service.reset();
426 Info info("io_service: Phase Two");
427 io_service.run();
428 }
429
430 // Final phase: Print result of command
431
432 switch(getCommandIndex(CommandsMap, argv[3])) {
433 case AddJobsIndex:
434 case CreateJobsIndex:
435 break;
436 case CheckResultsIndex:
437 {
438 printdonejobs(controller);
439 break;
440 }
441 case ReceiveResultsIndex:
442 {
443 printreceivedresults(controller);
444 break;
445 }
446 case ReceiveMPQCIndex:
447 {
448 if (argc == 4) {
449 ELOG(1, "'receivempqc' requires one option: [KeySetFilename].");
450 } else {
451 const std::string KeySetFilename = argv[4];
452 LOG(1, "INFO: Parsing id associations from " << KeySetFilename << ".");
453 printreceivedmpqcresults(controller, KeySetFilename);
454 }
455 break;
456 }
457 case ShutdownIndex:
458 break;
459 case UnknownCommandIndex:
460 default:
461 ELOG(0, "Unrecognized command '"+toString(argv[3])+"'.");
462 break;
463 }
464 Exitflag = controller.getExitflag();
465 }
466 catch (std::exception& e)
467 {
468 std::cerr << e.what() << std::endl;
469 }
470
471 return Exitflag;
472}
Note: See TracBrowser for help on using the repository browser.