Ignore:
Timestamp:
May 4, 2012, 2:19:07 PM (13 years ago)
Author:
Frederik Heber <heber@…>
Branches:
Action_Thermostats, Add_AtomRandomPerturbation, Add_FitFragmentPartialChargesAction, Add_RotateAroundBondAction, Add_SelectAtomByNameAction, Added_ParseSaveFragmentResults, AddingActions_SaveParseParticleParameters, Adding_Graph_to_ChangeBondActions, Adding_MD_integration_tests, Adding_ParticleName_to_Atom, Adding_StructOpt_integration_tests, AtomFragments, Automaking_mpqc_open, AutomationFragmentation_failures, Candidate_v1.5.4, Candidate_v1.6.0, Candidate_v1.6.1, ChangeBugEmailaddress, ChangingTestPorts, ChemicalSpaceEvaluator, CombiningParticlePotentialParsing, Combining_Subpackages, Debian_Package_split, Debian_package_split_molecuildergui_only, Disabling_MemDebug, Docu_Python_wait, EmpiricalPotential_contain_HomologyGraph, EmpiricalPotential_contain_HomologyGraph_documentation, Enable_parallel_make_install, Enhance_userguide, Enhanced_StructuralOptimization, Enhanced_StructuralOptimization_continued, Example_ManyWaysToTranslateAtom, Exclude_Hydrogens_annealWithBondGraph, FitPartialCharges_GlobalError, Fix_BoundInBox_CenterInBox_MoleculeActions, Fix_ChargeSampling_PBC, Fix_ChronosMutex, Fix_FitPartialCharges, Fix_FitPotential_needs_atomicnumbers, Fix_ForceAnnealing, Fix_IndependentFragmentGrids, Fix_ParseParticles, Fix_ParseParticles_split_forward_backward_Actions, Fix_PopActions, Fix_QtFragmentList_sorted_selection, Fix_Restrictedkeyset_FragmentMolecule, Fix_StatusMsg, Fix_StepWorldTime_single_argument, Fix_Verbose_Codepatterns, Fix_fitting_potentials, Fixes, ForceAnnealing_goodresults, ForceAnnealing_oldresults, ForceAnnealing_tocheck, ForceAnnealing_with_BondGraph, ForceAnnealing_with_BondGraph_continued, ForceAnnealing_with_BondGraph_continued_betteresults, ForceAnnealing_with_BondGraph_contraction-expansion, FragmentAction_writes_AtomFragments, FragmentMolecule_checks_bonddegrees, GeometryObjects, Gui_Fixes, Gui_displays_atomic_force_velocity, ImplicitCharges, IndependentFragmentGrids, IndependentFragmentGrids_IndividualZeroInstances, IndependentFragmentGrids_IntegrationTest, IndependentFragmentGrids_Sole_NN_Calculation, JobMarket_RobustOnKillsSegFaults, JobMarket_StableWorkerPool, JobMarket_unresolvable_hostname_fix, MoreRobust_FragmentAutomation, ODR_violation_mpqc_open, PartialCharges_OrthogonalSummation, PdbParser_setsAtomName, PythonUI_with_named_parameters, QtGui_reactivate_TimeChanged_changes, Recreated_GuiChecks, Rewrite_FitPartialCharges, RotateToPrincipalAxisSystem_UndoRedo, SaturateAtoms_findBestMatching, SaturateAtoms_singleDegree, StoppableMakroAction, Subpackage_CodePatterns, Subpackage_JobMarket, Subpackage_LinearAlgebra, Subpackage_levmar, Subpackage_mpqc_open, Subpackage_vmg, Switchable_LogView, ThirdParty_MPQC_rebuilt_buildsystem, TrajectoryDependenant_MaxOrder, TremoloParser_IncreasedPrecision, TremoloParser_MultipleTimesteps, TremoloParser_setsAtomName, Ubuntu_1604_changes, stable
Children:
3c4a5e
Parents:
8ee5ac
git-author:
Frederik Heber <heber@…> (11/27/11 23:20:43)
git-committer:
Frederik Heber <heber@…> (05/04/12 14:19:07)
Message:

Added possibility to add jobs via a FragmentController.

  • Server listens on another port for a controller to connect, also now has an Exitflag.
  • Controller can add a bunch of jobs.
  • JobAdder can successfully add a number jobs to Server, if 0 are sent, the socket shuts down.
  • TESTFIX: extended regression Fragmentation/Automation test to use Jobadder as by default Server starts with no jobs in the queue.
Location:
src/Fragmentation/Automation
Files:
3 added
5 edited

Legend:

Unmodified
Added
Removed
  • src/Fragmentation/Automation/FragmentScheduler.cpp

    r8ee5ac rdb03d9  
    4141FragmentJob FragmentScheduler::NoJob(std::string("NoJob"), JobId::NoJob);
    4242
    43 FragmentScheduler::FragmentScheduler(boost::asio::io_service& io_service, unsigned short port) :
    44   acceptor_(io_service,
    45       boost::asio::ip::tcp::endpoint(boost::asio::ip::tcp::v4(), port)
     43/** Constructor of class FragmentScheduler.
     44 *
     45 * We setup both acceptors to accept connections from workers and Controller.
     46 *
     47 * \param io_service io_service of the asynchronous communications
     48 * \param workerport port to listen for worker connections
     49 * \param controllerport port to listen for controller connections.
     50 */
     51FragmentScheduler::FragmentScheduler(boost::asio::io_service& io_service, unsigned short workerport, unsigned short controllerport) :
     52  worker_acceptor_(io_service,
     53      boost::asio::ip::tcp::endpoint(boost::asio::ip::tcp::v4(), workerport)
    4654  ),
    47   result(JobId::NoJob)
    48 {
    49   Info info(__FUNCTION__);
    50   FragmentJob s(std::string("test"), 1);
    51   JobsQueue.pushJob(s);
    52 
    53   // Start an accept operation for a new connection.
    54   connection_ptr new_conn(new Connection(acceptor_.get_io_service()));
    55   acceptor_.async_accept(new_conn->socket(),
    56     boost::bind(&FragmentScheduler::handle_accept, this,
    57       boost::asio::placeholders::error, new_conn));
    58 }
    59 
    60 /// Handle completion of a accept operation.
    61 void FragmentScheduler::handle_accept(const boost::system::error_code& e, connection_ptr conn)
     55  controller_acceptor_(io_service,
     56      boost::asio::ip::tcp::endpoint(boost::asio::ip::tcp::v4(), controllerport)
     57  ),
     58  result(JobId::NoJob),
     59  Exitflag(OkFlag)
     60{
     61  Info info(__FUNCTION__);
     62  {
     63    // Start an accept operation for worker connections.
     64    connection_ptr new_conn(new Connection(worker_acceptor_.get_io_service()));
     65    worker_acceptor_.async_accept(new_conn->socket(),
     66      boost::bind(&FragmentScheduler::handle_AcceptWorker, this,
     67        boost::asio::placeholders::error, new_conn));
     68  }
     69
     70  {
     71    // Start an accept operation for controller connection.
     72    connection_ptr new_conn(new Connection(controller_acceptor_.get_io_service()));
     73    controller_acceptor_.async_accept(new_conn->socket(),
     74      boost::bind(&FragmentScheduler::handle_AcceptController, this,
     75        boost::asio::placeholders::error, new_conn));
     76  }
     77}
     78
     79/** Handle a new worker connection.
     80 *
     81 * We check whether jobs are in the JobsQueue. If present, job is sent.
     82 *
     83 * \sa handle_SendJobtoWorker()
     84 *
     85 * \param e error code if something went wrong
     86 * \param conn reference with the connection
     87 */
     88void FragmentScheduler::handle_AcceptWorker(const boost::system::error_code& e, connection_ptr conn)
    6289{
    6390  Info info(__FUNCTION__);
     
    73100      LOG(1, "INFO: Sending job #" << job.getId() << ".");
    74101      conn->async_write(job,
    75         boost::bind(&FragmentScheduler::handle_SendJob, this,
     102        boost::bind(&FragmentScheduler::handle_SendJobtoWorker, this,
    76103        boost::asio::placeholders::error, conn));
    77104
    78105      // Start an accept operation for a new Connection only when there
    79106      // are still jobs present otherwise we quit.
    80       connection_ptr new_conn(new Connection(acceptor_.get_io_service()));
    81       acceptor_.async_accept(new_conn->socket(),
    82         boost::bind(&FragmentScheduler::handle_accept, this,
     107      connection_ptr new_conn(new Connection(worker_acceptor_.get_io_service()));
     108      worker_acceptor_.async_accept(new_conn->socket(),
     109        boost::bind(&FragmentScheduler::handle_AcceptWorker, this,
    83110        boost::asio::placeholders::error, new_conn));
    84111    } else {
    85112      // send the static NoJob
    86113      conn->async_write(NoJob,
    87         boost::bind(&FragmentScheduler::handle_SendJob, this,
     114        boost::bind(&FragmentScheduler::handle_SendJobtoWorker, this,
    88115        boost::asio::placeholders::error, conn));
    89116
     
    102129}
    103130
    104 /// Callback function when job has been sent.
    105 void FragmentScheduler::handle_SendJob(const boost::system::error_code& e, connection_ptr conn)
     131/** Callback function when job has been sent.
     132 *
     133 * After job has been sent we start async_read() for the result.
     134 *
     135 * \sa handle_ReceiveResultFromWorker()
     136 *
     137 * \param e error code if something went wrong
     138 * \param conn reference with the connection
     139 */
     140void FragmentScheduler::handle_SendJobtoWorker(const boost::system::error_code& e, connection_ptr conn)
    106141{
    107142    Info info(__FUNCTION__);
     
    110145    LOG(1, "INFO: Receiving result for a job ...");
    111146    conn->async_read(result,
    112       boost::bind(&FragmentScheduler::handle_ReceiveResult, this,
     147      boost::bind(&FragmentScheduler::handle_ReceiveResultFromWorker, this,
    113148      boost::asio::placeholders::error, conn));
    114149}
    115150
    116 /// Callback function when result has been received.
    117 void FragmentScheduler::handle_ReceiveResult(const boost::system::error_code& e, connection_ptr conn)
    118 {
    119     Info info(__FUNCTION__);
    120     // nothing to do
    121     LOG(1, "INFO: Received result for job #" << result.getId() << " ...");
    122     // and push into queue
    123     ASSERT(result.getId() != JobId::NoJob,
    124         "FragmentScheduler::handle_write() - result received has NoJob id.");
    125     ASSERT(result.getId() != JobId::IllegalJob,
    126         "FragmentScheduler::handle_write() - result received has IllegalJob id.");
    127     if ((result.getId() != JobId::NoJob) && (result.getId() != JobId::IllegalJob))
    128       JobsQueue.pushResult(result);
    129     // erase result
    130     result = FragmentResult(JobId::NoJob);
    131 }
    132 
     151/** Callback function when result has been received.
     152 *
     153 * \param e error code if something went wrong
     154 * \param conn reference with the connection
     155 */
     156void FragmentScheduler::handle_ReceiveResultFromWorker(const boost::system::error_code& e, connection_ptr conn)
     157{
     158  Info info(__FUNCTION__);
     159  // nothing to do
     160  LOG(1, "INFO: Received result for job #" << result.getId() << " ...");
     161  // and push into queue
     162  ASSERT(result.getId() != (JobId_t)JobId::NoJob,
     163      "FragmentScheduler::handle_ReceiveResultFromWorker() - result received has NoJob id.");
     164  ASSERT(result.getId() != (JobId_t)JobId::IllegalJob,
     165      "FragmentScheduler::handle_ReceiveResultFromWorker() - result received has IllegalJob id.");
     166  if ((result.getId() != (JobId_t)JobId::NoJob) && (result.getId() != (JobId_t)JobId::IllegalJob))
     167    JobsQueue.pushResult(result);
     168  // erase result
     169  result = FragmentResult(JobId::NoJob);
     170}
     171
     172/** Handle a new controller connection.
     173 *
     174 * \sa handle_ReceiveJobs()
     175 * \sa handle_CheckResultState()
     176 * \sa handle_SendResults()
     177 *
     178 * \param e error code if something went wrong
     179 * \param conn reference with the connection
     180 */
     181void FragmentScheduler::handle_AcceptController(const boost::system::error_code& e, connection_ptr conn)
     182{
     183  Info info(__FUNCTION__);
     184  if (!e)
     185  {
     186    if (jobs.empty()) {
     187      // The connection::async_write() function will automatically
     188      // serialize the data structure for us.
     189      LOG(1, "INFO: Receiving bunch of jobs from a controller ...");
     190      conn->async_read(jobs,
     191        boost::bind(&FragmentScheduler::handle_ReceiveJobs, this,
     192        boost::asio::placeholders::error, conn));
     193    }
     194  }
     195  else
     196  {
     197    // An error occurred. Log it and return. Since we are not starting a new
     198    // accept operation the io_service will run out of work to do and the
     199    // server will exit.
     200    Exitflag = ErrorFlag;
     201    ELOG(0, e.message());
     202  }
     203}
     204
     205/** Controller callback function when job has been sent.
     206 *
     207 * \param e error code if something went wrong
     208 * \param conn reference with the connection
     209 */
     210void FragmentScheduler::handle_ReceiveJobs(const boost::system::error_code& e, connection_ptr conn)
     211{
     212  Info info(__FUNCTION__);
     213  // jobs are received, hence place in JobsQueue
     214  if (!jobs.empty()) {
     215    LOG(1, "INFO: Pushing " << jobs.size() << " jobs into queue.");
     216    JobsQueue.pushJobs(jobs);
     217  }
     218  // launch new acceptor of queue has been filled/is full
     219  if (JobsQueue.isJobPresent()) {
     220    // Start an accept operation for a new Connection.
     221    connection_ptr new_conn(new Connection(controller_acceptor_.get_io_service()));
     222    controller_acceptor_.async_accept(new_conn->socket(),
     223      boost::bind(&FragmentScheduler::handle_AcceptController, this,
     224      boost::asio::placeholders::error, new_conn));
     225  } else {
     226    LOG(1, "INFO: Shutting down controller socket.");
     227  }
     228
     229  jobs.clear();
     230}
     231
  • src/Fragmentation/Automation/FragmentScheduler.hpp

    r8ee5ac rdb03d9  
    3030  /// Constructor opens the acceptor and starts waiting for the first incoming
    3131  /// Connection.
    32   FragmentScheduler(boost::asio::io_service& io_service, unsigned short port);
     32  FragmentScheduler(boost::asio::io_service& io_service, unsigned short workerport, unsigned short controllerport);
    3333
    34   /// Handle completion of a accept operation.
    35   void handle_accept(const boost::system::error_code& e, connection_ptr conn);
     34  enum Exitflag_t{
     35    OkFlag = 0,
     36    ErrorFlag = 255
     37  };
    3638
    37   /// Callback function when job has been sent.
    38   void handle_SendJob(const boost::system::error_code& e, connection_ptr conn);
     39  /** Getter for Exitflag.
     40   *
     41   * @return Exitflag of operations
     42   */
     43  size_t getExitflag() const
     44  {
     45    return Exitflag;
     46  }
    3947
    40   /// Callback function when result has been received.
    41   void handle_ReceiveResult(const boost::system::error_code& e, connection_ptr conn);
     48protected:
     49  /// Handle completion of a accept worker operation.
     50  void handle_AcceptWorker(const boost::system::error_code& e, connection_ptr conn);
     51
     52  /// Handle completion of a accept controller operation.
     53  void handle_AcceptController(const boost::system::error_code& e, connection_ptr conn);
     54
     55  /// Worker callback function when job has been sent.
     56  void handle_SendJobtoWorker(const boost::system::error_code& e, connection_ptr conn);
     57
     58  /// Worker callback function when result has been received.
     59  void handle_ReceiveResultFromWorker(const boost::system::error_code& e, connection_ptr conn);
     60
     61  /// Controller callback function when job has been sent.
     62  void handle_ReceiveJobs(const boost::system::error_code& e, connection_ptr conn);
    4263
    4364private:
    44   /// The acceptor object used to accept incoming socket connections.
    45   boost::asio::ip::tcp::acceptor acceptor_;
     65  /// The acceptor object used to accept incoming worker socket connections.
     66  boost::asio::ip::tcp::acceptor worker_acceptor_;
     67
     68  /// The acceptor object used to accept incoming controller socket connections.
     69  boost::asio::ip::tcp::acceptor controller_acceptor_;
    4670
    4771  /// result that is received from the client.
    4872  FragmentResult result;
     73
     74  /// bunch of jobs received from controller before placed in JobsQueue
     75  std::vector<FragmentJob> jobs;
    4976
    5077  /// Queue with data to be sent to each client.
     
    5380  // static entity to indicate to clients that the queue is empty.
    5481  static FragmentJob NoJob;
     82
     83  // Exit flag on program exit
     84  enum Exitflag_t Exitflag;
    5585};
    5686
  • src/Fragmentation/Automation/FragmentWorker.hpp

    r8ee5ac rdb03d9  
    2929      const std::string& host, const std::string& service);
    3030
    31   /// Handle completion of a connect operation.
    32   void handle_connect(const boost::system::error_code& e,
    33       boost::asio::ip::tcp::resolver::iterator endpoint_iterator);
    34 
    35   /// Callback function when job has been received.
    36   void handle_ReceiveJob(const boost::system::error_code& e);
    37 
    38   /// Callback function when result has been sent.
    39   void handle_SendResult(const boost::system::error_code& e);
    40 
    4131  enum Exitflag_t {
    4232    OkFlag = 0,
     
    5242    return Exitflag;
    5343  }
     44
     45protected:
     46  /// Handle completion of a connect operation.
     47  void handle_connect(const boost::system::error_code& e,
     48      boost::asio::ip::tcp::resolver::iterator endpoint_iterator);
     49
     50  /// Callback function when job has been received.
     51  void handle_ReceiveJob(const boost::system::error_code& e);
     52
     53  /// Callback function when result has been sent.
     54  void handle_SendResult(const boost::system::error_code& e);
    5455
    5556private:
  • src/Fragmentation/Automation/Makefile.am

    r8ee5ac rdb03d9  
    4545AM_CPPFLAGS = ${BOOST_CPPFLAGS} ${CodePatterns_CFLAGS}
    4646
    47 bin_PROGRAMS += Server Worker
     47bin_PROGRAMS += JobAdder Server Worker
     48
     49CONTROLLERSOURCE = \
     50  FragmentController.cpp
     51
     52CONTROLLERHEADER = \
     53  Connection.hpp \
     54  FragmentController.hpp
    4855
    4956SERVERSOURCE = \
    50   FragmentScheduler.cpp \
    51   Server.cpp
     57  FragmentScheduler.cpp
    5258
    5359SERVERHEADER = \
     
    6369  FragmentWorker.hpp
    6470
    65 Server_SOURCES = $(SERVERSOURCE) $(SERVERHEADER)
     71JobAdder_SOURCES = $(CONTROLLERSOURCE) $(CONTROLLERHEADER) JobAdder.cpp
     72JobAdder_LDFLAGS = $(AM_LDFLAGS) $(BOOST_ASIO_LDFLAGS) $(BOOST_SYSTEM_LDFLAGS) $(BOOST_THREAD_LDFLAGS) $(BOOST_SERIALIZATION_LDFLAGS)
     73JobAdder_CXXFLAGS = $(AM_CPPFLAGS)
     74JobAdder_LDADD = \
     75  libMolecuilderFragmentJobs.la \
     76  libMolecuilderFragmentationAutomation.la \
     77  $(BOOST_ASIO_LIBS) \
     78  $(BOOST_SERIALIZATION_LIBS) \
     79  $(BOOST_THREAD_LIBS) \
     80  $(BOOST_SYSTEM_LIBS) \
     81  ${CodePatterns_LIBS}
     82
     83Server_SOURCES = $(SERVERSOURCE) $(SERVERHEADER) Server.cpp
    6684Server_LDFLAGS = $(AM_LDFLAGS) $(BOOST_ASIO_LDFLAGS) $(BOOST_SYSTEM_LDFLAGS) $(BOOST_THREAD_LDFLAGS) $(BOOST_SERIALIZATION_LDFLAGS)
    6785Server_CXXFLAGS = $(AM_CPPFLAGS)
  • src/Fragmentation/Automation/Server.cpp

    r8ee5ac rdb03d9  
    4343  setVerbosity(3);
    4444
     45  size_t Exitflag = 0;
    4546  try
    4647  {
    4748    // Check command line arguments.
    48     if (argc != 2)
     49    if (argc != 3)
    4950    {
    50       std::cerr << "Usage: " << argv[0] << " <port>" << std::endl;
     51      std::cerr << "Usage: " << argv[0] << " <worker port> <controller port>" << std::endl;
    5152      return 1;
    5253    }
    53     unsigned short port = boost::lexical_cast<unsigned short>(argv[1]);
    54     std::cout << "Listening on port " << port << "." << std::endl;
     54    unsigned short workerport = boost::lexical_cast<unsigned short>(argv[1]);
     55    std::cout << "Listening for workers on port " << workerport << "." << std::endl;
     56    unsigned short controllerport = boost::lexical_cast<unsigned short>(argv[2]);
     57    std::cout << "Listening for controller on port " << controllerport << "." << std::endl;
    5558
    5659    boost::asio::io_service io_service;
    57     FragmentScheduler Server(io_service, port);
     60    FragmentScheduler Server(io_service, workerport, controllerport);
    5861    {
    5962      Info info("io_service");
    6063      io_service.run();
    6164    }
     65    Exitflag = Server.getExitflag();
    6266  }
    6367  catch (std::exception& e)
     
    6670  }
    6771
    68   return 0;
     72  return Exitflag;
    6973}
Note: See TracChangeset for help on using the changeset viewer.