Changeset db03d9 for src/Fragmentation/Automation
- Timestamp:
- May 4, 2012, 2:19:07 PM (13 years ago)
- Branches:
- Action_Thermostats, Add_AtomRandomPerturbation, Add_FitFragmentPartialChargesAction, Add_RotateAroundBondAction, Add_SelectAtomByNameAction, Added_ParseSaveFragmentResults, AddingActions_SaveParseParticleParameters, Adding_Graph_to_ChangeBondActions, Adding_MD_integration_tests, Adding_ParticleName_to_Atom, Adding_StructOpt_integration_tests, AtomFragments, Automaking_mpqc_open, AutomationFragmentation_failures, Candidate_v1.5.4, Candidate_v1.6.0, Candidate_v1.6.1, ChangeBugEmailaddress, ChangingTestPorts, ChemicalSpaceEvaluator, CombiningParticlePotentialParsing, Combining_Subpackages, Debian_Package_split, Debian_package_split_molecuildergui_only, Disabling_MemDebug, Docu_Python_wait, EmpiricalPotential_contain_HomologyGraph, EmpiricalPotential_contain_HomologyGraph_documentation, Enable_parallel_make_install, Enhance_userguide, Enhanced_StructuralOptimization, Enhanced_StructuralOptimization_continued, Example_ManyWaysToTranslateAtom, Exclude_Hydrogens_annealWithBondGraph, FitPartialCharges_GlobalError, Fix_BoundInBox_CenterInBox_MoleculeActions, Fix_ChargeSampling_PBC, Fix_ChronosMutex, Fix_FitPartialCharges, Fix_FitPotential_needs_atomicnumbers, Fix_ForceAnnealing, Fix_IndependentFragmentGrids, Fix_ParseParticles, Fix_ParseParticles_split_forward_backward_Actions, Fix_PopActions, Fix_QtFragmentList_sorted_selection, Fix_Restrictedkeyset_FragmentMolecule, Fix_StatusMsg, Fix_StepWorldTime_single_argument, Fix_Verbose_Codepatterns, Fix_fitting_potentials, Fixes, ForceAnnealing_goodresults, ForceAnnealing_oldresults, ForceAnnealing_tocheck, ForceAnnealing_with_BondGraph, ForceAnnealing_with_BondGraph_continued, ForceAnnealing_with_BondGraph_continued_betteresults, ForceAnnealing_with_BondGraph_contraction-expansion, FragmentAction_writes_AtomFragments, FragmentMolecule_checks_bonddegrees, GeometryObjects, Gui_Fixes, Gui_displays_atomic_force_velocity, ImplicitCharges, IndependentFragmentGrids, IndependentFragmentGrids_IndividualZeroInstances, IndependentFragmentGrids_IntegrationTest, IndependentFragmentGrids_Sole_NN_Calculation, JobMarket_RobustOnKillsSegFaults, JobMarket_StableWorkerPool, JobMarket_unresolvable_hostname_fix, MoreRobust_FragmentAutomation, ODR_violation_mpqc_open, PartialCharges_OrthogonalSummation, PdbParser_setsAtomName, PythonUI_with_named_parameters, QtGui_reactivate_TimeChanged_changes, Recreated_GuiChecks, Rewrite_FitPartialCharges, RotateToPrincipalAxisSystem_UndoRedo, SaturateAtoms_findBestMatching, SaturateAtoms_singleDegree, StoppableMakroAction, Subpackage_CodePatterns, Subpackage_JobMarket, Subpackage_LinearAlgebra, Subpackage_levmar, Subpackage_mpqc_open, Subpackage_vmg, Switchable_LogView, ThirdParty_MPQC_rebuilt_buildsystem, TrajectoryDependenant_MaxOrder, TremoloParser_IncreasedPrecision, TremoloParser_MultipleTimesteps, TremoloParser_setsAtomName, Ubuntu_1604_changes, stable
- Children:
- 3c4a5e
- Parents:
- 8ee5ac
- git-author:
- Frederik Heber <heber@…> (11/27/11 23:20:43)
- git-committer:
- Frederik Heber <heber@…> (05/04/12 14:19:07)
- Location:
- src/Fragmentation/Automation
- Files:
-
- 3 added
- 5 edited
Legend:
- Unmodified
- Added
- Removed
-
src/Fragmentation/Automation/FragmentScheduler.cpp
r8ee5ac rdb03d9 41 41 FragmentJob FragmentScheduler::NoJob(std::string("NoJob"), JobId::NoJob); 42 42 43 FragmentScheduler::FragmentScheduler(boost::asio::io_service& io_service, unsigned short port) : 44 acceptor_(io_service, 45 boost::asio::ip::tcp::endpoint(boost::asio::ip::tcp::v4(), port) 43 /** Constructor of class FragmentScheduler. 44 * 45 * We setup both acceptors to accept connections from workers and Controller. 46 * 47 * \param io_service io_service of the asynchronous communications 48 * \param workerport port to listen for worker connections 49 * \param controllerport port to listen for controller connections. 50 */ 51 FragmentScheduler::FragmentScheduler(boost::asio::io_service& io_service, unsigned short workerport, unsigned short controllerport) : 52 worker_acceptor_(io_service, 53 boost::asio::ip::tcp::endpoint(boost::asio::ip::tcp::v4(), workerport) 46 54 ), 47 result(JobId::NoJob) 48 { 49 Info info(__FUNCTION__); 50 FragmentJob s(std::string("test"), 1); 51 JobsQueue.pushJob(s); 52 53 // Start an accept operation for a new connection. 54 connection_ptr new_conn(new Connection(acceptor_.get_io_service())); 55 acceptor_.async_accept(new_conn->socket(), 56 boost::bind(&FragmentScheduler::handle_accept, this, 57 boost::asio::placeholders::error, new_conn)); 58 } 59 60 /// Handle completion of a accept operation. 61 void FragmentScheduler::handle_accept(const boost::system::error_code& e, connection_ptr conn) 55 controller_acceptor_(io_service, 56 boost::asio::ip::tcp::endpoint(boost::asio::ip::tcp::v4(), controllerport) 57 ), 58 result(JobId::NoJob), 59 Exitflag(OkFlag) 60 { 61 Info info(__FUNCTION__); 62 { 63 // Start an accept operation for worker connections. 64 connection_ptr new_conn(new Connection(worker_acceptor_.get_io_service())); 65 worker_acceptor_.async_accept(new_conn->socket(), 66 boost::bind(&FragmentScheduler::handle_AcceptWorker, this, 67 boost::asio::placeholders::error, new_conn)); 68 } 69 70 { 71 // Start an accept operation for controller connection. 72 connection_ptr new_conn(new Connection(controller_acceptor_.get_io_service())); 73 controller_acceptor_.async_accept(new_conn->socket(), 74 boost::bind(&FragmentScheduler::handle_AcceptController, this, 75 boost::asio::placeholders::error, new_conn)); 76 } 77 } 78 79 /** Handle a new worker connection. 80 * 81 * We check whether jobs are in the JobsQueue. If present, job is sent. 82 * 83 * \sa handle_SendJobtoWorker() 84 * 85 * \param e error code if something went wrong 86 * \param conn reference with the connection 87 */ 88 void FragmentScheduler::handle_AcceptWorker(const boost::system::error_code& e, connection_ptr conn) 62 89 { 63 90 Info info(__FUNCTION__); … … 73 100 LOG(1, "INFO: Sending job #" << job.getId() << "."); 74 101 conn->async_write(job, 75 boost::bind(&FragmentScheduler::handle_SendJob , this,102 boost::bind(&FragmentScheduler::handle_SendJobtoWorker, this, 76 103 boost::asio::placeholders::error, conn)); 77 104 78 105 // Start an accept operation for a new Connection only when there 79 106 // are still jobs present otherwise we quit. 80 connection_ptr new_conn(new Connection( acceptor_.get_io_service()));81 acceptor_.async_accept(new_conn->socket(),82 boost::bind(&FragmentScheduler::handle_ accept, this,107 connection_ptr new_conn(new Connection(worker_acceptor_.get_io_service())); 108 worker_acceptor_.async_accept(new_conn->socket(), 109 boost::bind(&FragmentScheduler::handle_AcceptWorker, this, 83 110 boost::asio::placeholders::error, new_conn)); 84 111 } else { 85 112 // send the static NoJob 86 113 conn->async_write(NoJob, 87 boost::bind(&FragmentScheduler::handle_SendJob , this,114 boost::bind(&FragmentScheduler::handle_SendJobtoWorker, this, 88 115 boost::asio::placeholders::error, conn)); 89 116 … … 102 129 } 103 130 104 /// Callback function when job has been sent. 105 void FragmentScheduler::handle_SendJob(const boost::system::error_code& e, connection_ptr conn) 131 /** Callback function when job has been sent. 132 * 133 * After job has been sent we start async_read() for the result. 134 * 135 * \sa handle_ReceiveResultFromWorker() 136 * 137 * \param e error code if something went wrong 138 * \param conn reference with the connection 139 */ 140 void FragmentScheduler::handle_SendJobtoWorker(const boost::system::error_code& e, connection_ptr conn) 106 141 { 107 142 Info info(__FUNCTION__); … … 110 145 LOG(1, "INFO: Receiving result for a job ..."); 111 146 conn->async_read(result, 112 boost::bind(&FragmentScheduler::handle_ReceiveResult , this,147 boost::bind(&FragmentScheduler::handle_ReceiveResultFromWorker, this, 113 148 boost::asio::placeholders::error, conn)); 114 149 } 115 150 116 /// Callback function when result has been received. 117 void FragmentScheduler::handle_ReceiveResult(const boost::system::error_code& e, connection_ptr conn) 118 { 119 Info info(__FUNCTION__); 120 // nothing to do 121 LOG(1, "INFO: Received result for job #" << result.getId() << " ..."); 122 // and push into queue 123 ASSERT(result.getId() != JobId::NoJob, 124 "FragmentScheduler::handle_write() - result received has NoJob id."); 125 ASSERT(result.getId() != JobId::IllegalJob, 126 "FragmentScheduler::handle_write() - result received has IllegalJob id."); 127 if ((result.getId() != JobId::NoJob) && (result.getId() != JobId::IllegalJob)) 128 JobsQueue.pushResult(result); 129 // erase result 130 result = FragmentResult(JobId::NoJob); 131 } 132 151 /** Callback function when result has been received. 152 * 153 * \param e error code if something went wrong 154 * \param conn reference with the connection 155 */ 156 void FragmentScheduler::handle_ReceiveResultFromWorker(const boost::system::error_code& e, connection_ptr conn) 157 { 158 Info info(__FUNCTION__); 159 // nothing to do 160 LOG(1, "INFO: Received result for job #" << result.getId() << " ..."); 161 // and push into queue 162 ASSERT(result.getId() != (JobId_t)JobId::NoJob, 163 "FragmentScheduler::handle_ReceiveResultFromWorker() - result received has NoJob id."); 164 ASSERT(result.getId() != (JobId_t)JobId::IllegalJob, 165 "FragmentScheduler::handle_ReceiveResultFromWorker() - result received has IllegalJob id."); 166 if ((result.getId() != (JobId_t)JobId::NoJob) && (result.getId() != (JobId_t)JobId::IllegalJob)) 167 JobsQueue.pushResult(result); 168 // erase result 169 result = FragmentResult(JobId::NoJob); 170 } 171 172 /** Handle a new controller connection. 173 * 174 * \sa handle_ReceiveJobs() 175 * \sa handle_CheckResultState() 176 * \sa handle_SendResults() 177 * 178 * \param e error code if something went wrong 179 * \param conn reference with the connection 180 */ 181 void FragmentScheduler::handle_AcceptController(const boost::system::error_code& e, connection_ptr conn) 182 { 183 Info info(__FUNCTION__); 184 if (!e) 185 { 186 if (jobs.empty()) { 187 // The connection::async_write() function will automatically 188 // serialize the data structure for us. 189 LOG(1, "INFO: Receiving bunch of jobs from a controller ..."); 190 conn->async_read(jobs, 191 boost::bind(&FragmentScheduler::handle_ReceiveJobs, this, 192 boost::asio::placeholders::error, conn)); 193 } 194 } 195 else 196 { 197 // An error occurred. Log it and return. Since we are not starting a new 198 // accept operation the io_service will run out of work to do and the 199 // server will exit. 200 Exitflag = ErrorFlag; 201 ELOG(0, e.message()); 202 } 203 } 204 205 /** Controller callback function when job has been sent. 206 * 207 * \param e error code if something went wrong 208 * \param conn reference with the connection 209 */ 210 void FragmentScheduler::handle_ReceiveJobs(const boost::system::error_code& e, connection_ptr conn) 211 { 212 Info info(__FUNCTION__); 213 // jobs are received, hence place in JobsQueue 214 if (!jobs.empty()) { 215 LOG(1, "INFO: Pushing " << jobs.size() << " jobs into queue."); 216 JobsQueue.pushJobs(jobs); 217 } 218 // launch new acceptor of queue has been filled/is full 219 if (JobsQueue.isJobPresent()) { 220 // Start an accept operation for a new Connection. 221 connection_ptr new_conn(new Connection(controller_acceptor_.get_io_service())); 222 controller_acceptor_.async_accept(new_conn->socket(), 223 boost::bind(&FragmentScheduler::handle_AcceptController, this, 224 boost::asio::placeholders::error, new_conn)); 225 } else { 226 LOG(1, "INFO: Shutting down controller socket."); 227 } 228 229 jobs.clear(); 230 } 231 -
src/Fragmentation/Automation/FragmentScheduler.hpp
r8ee5ac rdb03d9 30 30 /// Constructor opens the acceptor and starts waiting for the first incoming 31 31 /// Connection. 32 FragmentScheduler(boost::asio::io_service& io_service, unsigned short port);32 FragmentScheduler(boost::asio::io_service& io_service, unsigned short workerport, unsigned short controllerport); 33 33 34 /// Handle completion of a accept operation. 35 void handle_accept(const boost::system::error_code& e, connection_ptr conn); 34 enum Exitflag_t{ 35 OkFlag = 0, 36 ErrorFlag = 255 37 }; 36 38 37 /// Callback function when job has been sent. 38 void handle_SendJob(const boost::system::error_code& e, connection_ptr conn); 39 /** Getter for Exitflag. 40 * 41 * @return Exitflag of operations 42 */ 43 size_t getExitflag() const 44 { 45 return Exitflag; 46 } 39 47 40 /// Callback function when result has been received. 41 void handle_ReceiveResult(const boost::system::error_code& e, connection_ptr conn); 48 protected: 49 /// Handle completion of a accept worker operation. 50 void handle_AcceptWorker(const boost::system::error_code& e, connection_ptr conn); 51 52 /// Handle completion of a accept controller operation. 53 void handle_AcceptController(const boost::system::error_code& e, connection_ptr conn); 54 55 /// Worker callback function when job has been sent. 56 void handle_SendJobtoWorker(const boost::system::error_code& e, connection_ptr conn); 57 58 /// Worker callback function when result has been received. 59 void handle_ReceiveResultFromWorker(const boost::system::error_code& e, connection_ptr conn); 60 61 /// Controller callback function when job has been sent. 62 void handle_ReceiveJobs(const boost::system::error_code& e, connection_ptr conn); 42 63 43 64 private: 44 /// The acceptor object used to accept incoming socket connections. 45 boost::asio::ip::tcp::acceptor acceptor_; 65 /// The acceptor object used to accept incoming worker socket connections. 66 boost::asio::ip::tcp::acceptor worker_acceptor_; 67 68 /// The acceptor object used to accept incoming controller socket connections. 69 boost::asio::ip::tcp::acceptor controller_acceptor_; 46 70 47 71 /// result that is received from the client. 48 72 FragmentResult result; 73 74 /// bunch of jobs received from controller before placed in JobsQueue 75 std::vector<FragmentJob> jobs; 49 76 50 77 /// Queue with data to be sent to each client. … … 53 80 // static entity to indicate to clients that the queue is empty. 54 81 static FragmentJob NoJob; 82 83 // Exit flag on program exit 84 enum Exitflag_t Exitflag; 55 85 }; 56 86 -
src/Fragmentation/Automation/FragmentWorker.hpp
r8ee5ac rdb03d9 29 29 const std::string& host, const std::string& service); 30 30 31 /// Handle completion of a connect operation.32 void handle_connect(const boost::system::error_code& e,33 boost::asio::ip::tcp::resolver::iterator endpoint_iterator);34 35 /// Callback function when job has been received.36 void handle_ReceiveJob(const boost::system::error_code& e);37 38 /// Callback function when result has been sent.39 void handle_SendResult(const boost::system::error_code& e);40 41 31 enum Exitflag_t { 42 32 OkFlag = 0, … … 52 42 return Exitflag; 53 43 } 44 45 protected: 46 /// Handle completion of a connect operation. 47 void handle_connect(const boost::system::error_code& e, 48 boost::asio::ip::tcp::resolver::iterator endpoint_iterator); 49 50 /// Callback function when job has been received. 51 void handle_ReceiveJob(const boost::system::error_code& e); 52 53 /// Callback function when result has been sent. 54 void handle_SendResult(const boost::system::error_code& e); 54 55 55 56 private: -
src/Fragmentation/Automation/Makefile.am
r8ee5ac rdb03d9 45 45 AM_CPPFLAGS = ${BOOST_CPPFLAGS} ${CodePatterns_CFLAGS} 46 46 47 bin_PROGRAMS += Server Worker 47 bin_PROGRAMS += JobAdder Server Worker 48 49 CONTROLLERSOURCE = \ 50 FragmentController.cpp 51 52 CONTROLLERHEADER = \ 53 Connection.hpp \ 54 FragmentController.hpp 48 55 49 56 SERVERSOURCE = \ 50 FragmentScheduler.cpp \ 51 Server.cpp 57 FragmentScheduler.cpp 52 58 53 59 SERVERHEADER = \ … … 63 69 FragmentWorker.hpp 64 70 65 Server_SOURCES = $(SERVERSOURCE) $(SERVERHEADER) 71 JobAdder_SOURCES = $(CONTROLLERSOURCE) $(CONTROLLERHEADER) JobAdder.cpp 72 JobAdder_LDFLAGS = $(AM_LDFLAGS) $(BOOST_ASIO_LDFLAGS) $(BOOST_SYSTEM_LDFLAGS) $(BOOST_THREAD_LDFLAGS) $(BOOST_SERIALIZATION_LDFLAGS) 73 JobAdder_CXXFLAGS = $(AM_CPPFLAGS) 74 JobAdder_LDADD = \ 75 libMolecuilderFragmentJobs.la \ 76 libMolecuilderFragmentationAutomation.la \ 77 $(BOOST_ASIO_LIBS) \ 78 $(BOOST_SERIALIZATION_LIBS) \ 79 $(BOOST_THREAD_LIBS) \ 80 $(BOOST_SYSTEM_LIBS) \ 81 ${CodePatterns_LIBS} 82 83 Server_SOURCES = $(SERVERSOURCE) $(SERVERHEADER) Server.cpp 66 84 Server_LDFLAGS = $(AM_LDFLAGS) $(BOOST_ASIO_LDFLAGS) $(BOOST_SYSTEM_LDFLAGS) $(BOOST_THREAD_LDFLAGS) $(BOOST_SERIALIZATION_LDFLAGS) 67 85 Server_CXXFLAGS = $(AM_CPPFLAGS) -
src/Fragmentation/Automation/Server.cpp
r8ee5ac rdb03d9 43 43 setVerbosity(3); 44 44 45 size_t Exitflag = 0; 45 46 try 46 47 { 47 48 // Check command line arguments. 48 if (argc != 2)49 if (argc != 3) 49 50 { 50 std::cerr << "Usage: " << argv[0] << " < port>" << std::endl;51 std::cerr << "Usage: " << argv[0] << " <worker port> <controller port>" << std::endl; 51 52 return 1; 52 53 } 53 unsigned short port = boost::lexical_cast<unsigned short>(argv[1]); 54 std::cout << "Listening on port " << port << "." << std::endl; 54 unsigned short workerport = boost::lexical_cast<unsigned short>(argv[1]); 55 std::cout << "Listening for workers on port " << workerport << "." << std::endl; 56 unsigned short controllerport = boost::lexical_cast<unsigned short>(argv[2]); 57 std::cout << "Listening for controller on port " << controllerport << "." << std::endl; 55 58 56 59 boost::asio::io_service io_service; 57 FragmentScheduler Server(io_service, port);60 FragmentScheduler Server(io_service, workerport, controllerport); 58 61 { 59 62 Info info("io_service"); 60 63 io_service.run(); 61 64 } 65 Exitflag = Server.getExitflag(); 62 66 } 63 67 catch (std::exception& e) … … 66 70 } 67 71 68 return 0;72 return Exitflag; 69 73 }
Note:
See TracChangeset
for help on using the changeset viewer.