Changeset 9d7c6a


Ignore:
Timestamp:
Jul 2, 2012, 8:32:10 AM (13 years ago)
Author:
Frederik Heber <heber@…>
Branches:
Action_Thermostats, Add_AtomRandomPerturbation, Add_FitFragmentPartialChargesAction, Add_RotateAroundBondAction, Add_SelectAtomByNameAction, Added_ParseSaveFragmentResults, AddingActions_SaveParseParticleParameters, Adding_Graph_to_ChangeBondActions, Adding_MD_integration_tests, Adding_ParticleName_to_Atom, Adding_StructOpt_integration_tests, AtomFragments, Automaking_mpqc_open, AutomationFragmentation_failures, Candidate_v1.5.4, Candidate_v1.6.0, Candidate_v1.6.1, ChangeBugEmailaddress, ChangingTestPorts, ChemicalSpaceEvaluator, CombiningParticlePotentialParsing, Combining_Subpackages, Debian_Package_split, Debian_package_split_molecuildergui_only, Disabling_MemDebug, Docu_Python_wait, EmpiricalPotential_contain_HomologyGraph, EmpiricalPotential_contain_HomologyGraph_documentation, Enable_parallel_make_install, Enhance_userguide, Enhanced_StructuralOptimization, Enhanced_StructuralOptimization_continued, Example_ManyWaysToTranslateAtom, Exclude_Hydrogens_annealWithBondGraph, FitPartialCharges_GlobalError, Fix_BoundInBox_CenterInBox_MoleculeActions, Fix_ChargeSampling_PBC, Fix_ChronosMutex, Fix_FitPartialCharges, Fix_FitPotential_needs_atomicnumbers, Fix_ForceAnnealing, Fix_IndependentFragmentGrids, Fix_ParseParticles, Fix_ParseParticles_split_forward_backward_Actions, Fix_PopActions, Fix_QtFragmentList_sorted_selection, Fix_Restrictedkeyset_FragmentMolecule, Fix_StatusMsg, Fix_StepWorldTime_single_argument, Fix_Verbose_Codepatterns, Fix_fitting_potentials, Fixes, ForceAnnealing_goodresults, ForceAnnealing_oldresults, ForceAnnealing_tocheck, ForceAnnealing_with_BondGraph, ForceAnnealing_with_BondGraph_continued, ForceAnnealing_with_BondGraph_continued_betteresults, ForceAnnealing_with_BondGraph_contraction-expansion, FragmentAction_writes_AtomFragments, FragmentMolecule_checks_bonddegrees, GeometryObjects, Gui_Fixes, Gui_displays_atomic_force_velocity, ImplicitCharges, IndependentFragmentGrids, IndependentFragmentGrids_IndividualZeroInstances, IndependentFragmentGrids_IntegrationTest, IndependentFragmentGrids_Sole_NN_Calculation, JobMarket_RobustOnKillsSegFaults, JobMarket_StableWorkerPool, JobMarket_unresolvable_hostname_fix, MoreRobust_FragmentAutomation, ODR_violation_mpqc_open, PartialCharges_OrthogonalSummation, PdbParser_setsAtomName, PythonUI_with_named_parameters, QtGui_reactivate_TimeChanged_changes, Recreated_GuiChecks, Rewrite_FitPartialCharges, RotateToPrincipalAxisSystem_UndoRedo, SaturateAtoms_findBestMatching, SaturateAtoms_singleDegree, StoppableMakroAction, Subpackage_CodePatterns, Subpackage_JobMarket, Subpackage_LinearAlgebra, Subpackage_levmar, Subpackage_mpqc_open, Subpackage_vmg, Switchable_LogView, ThirdParty_MPQC_rebuilt_buildsystem, TrajectoryDependenant_MaxOrder, TremoloParser_IncreasedPrecision, TremoloParser_MultipleTimesteps, TremoloParser_setsAtomName, Ubuntu_1604_changes, stable
Children:
2ab0b0
Parents:
b08c7c
git-author:
Frederik Heber <heber@…> (05/13/12 17:53:34)
git-committer:
Frederik Heber <heber@…> (07/02/12 08:32:10)
Message:

PoolWorker now has an OperationQueue and ServerAddress.

Location:
src/Fragmentation/Automation/Pool
Files:
2 edited

Legend:

Unmodified
Added
Removed
  • src/Fragmentation/Automation/Pool/PoolWorker.cpp

    rb08c7c r9d7c6a  
    3737#include "Jobs/MPQCCommandJob.hpp"
    3838#include "Jobs/SystemCommandJob.hpp"
     39#include "Operations/Workers/EnrollInPoolOperation.hpp"
     40#include "Operations/Workers/RemoveFromPoolOperation.hpp"
     41#include "Operations/Workers/SubmitResultOperation.hpp"
    3942#include "Results/FragmentResult.hpp"
    4043#include "PoolWorker.hpp"
     
    5356 *
    5457 * @param _io_service io service for creating connections
    55  * @param _host host part of address of pool to connect to
    56  * @param _service service part of address of pool to connect to
    57  * @param listenhost host part of address of this instance for listening to pool connections
    58  * @param listenservice seervice part of address of this instance for listening to pool connections
     58 * @param _host host part of MyAddress of pool to connect to
     59 * @param _service service part of MyAddress of pool to connect to
     60 * @param listenhost host part of MyAddress of this instance for listening to pool connections
     61 * @param listenservice seervice part of MyAddress of this instance for listening to pool connections
    5962 */
    6063PoolWorker::PoolWorker(
     
    6770  PoolListener(_io_service, boost::lexical_cast<unsigned short>(listenservice), *this),
    6871  MyAddress(listenhost, listenservice),
     72  ServerAddress(_host, _service),
    6973  connection_(_io_service),
    70   failed(boost::bind(&ExitflagContainer::setExitflag, this, ExitflagContainer::ErrorFlag)),
    71   closingdown(boost::bind(&PoolWorker::finish, this)),
    72   initiateme(boost::bind(&PoolListener_t::initiateSocket, boost::ref(PoolListener))),
    73   enrollOp(connection_, MyAddress, initiateme, failed),
    74   submitOp(connection_, MyAddress, AsyncOperation::NoOpCallback, failed),
    75   submitresult(boost::bind(&AsyncOperation::operator(),
    76       boost::ref(submitOp),
    77       _host, _service)),
    78   removeOp(connection_, MyAddress, closingdown, failed),
    79   removeme(boost::bind(&AsyncOperation::operator(),
    80       boost::ref(removeOp),
    81       _host, _service))
     74  failed(boost::bind(&ExitflagContainer::setExitflag, this, ExitflagContainer::ErrorFlag))
    8275{
    8376  Info info(__FUNCTION__);
    8477
    85   // always enroll
    86   enrollOp(_host,_service);
     78  // always enroll and make listenining initiation depend on its success
     79  const boost::function<void ()> initiateme =
     80      boost::bind(&PoolListener_t::initiateSocket, boost::ref(PoolListener));
     81  AsyncOperation *enrollOp = new EnrollInPoolOperation(connection_, MyAddress, initiateme, failed);
     82  LOG(2, "DEBUG: Putting enroll in pool operation into queue ...");
     83  OpQueue.push_back(enrollOp, ServerAddress);
    8784}
    8885
     
    144141  FragmentResult::ptr result = job->Work();
    145142  LOG(2, "DEBUG: Setting result " << result->getId() << ".");
    146   submitOp.setResult(result);
     143
     144  AsyncOperation *submitOp = new SubmitResultOperation(connection_, MyAddress, AsyncOperation::NoOpCallback, failed);
     145  static_cast<SubmitResultOperation *>(submitOp)->setResult(result);
    147146
    148147  // submit result
    149   LOG(2, "DEBUG: Sending result ...");
    150   submitresult();
     148  LOG(2, "DEBUG: Putting send result operation into queue ...");
     149  OpQueue.push_back(submitOp, ServerAddress);
    151150}
    152151
     
    172171{
    173172  // remove us from pool
    174   removeme();
     173  boost::function<void ()> closingdown = boost::bind(&PoolWorker::finish, this);
     174  AsyncOperation *removeOp = new RemoveFromPoolOperation(connection_, MyAddress, closingdown, failed);
     175  LOG(2, "DEBUG: Putting remove from pool operation into queue ...");
     176  OpQueue.push_back(removeOp, ServerAddress);
    175177}
    176178
  • src/Fragmentation/Automation/Pool/PoolWorker.hpp

    rb08c7c r9d7c6a  
    2323#include "ExitflagContainer.hpp"
    2424#include "Jobs/FragmentJob.hpp"
    25 #include "Operations/Workers/EnrollInPoolOperation.hpp"
    26 #include "Operations/Workers/RemoveFromPoolOperation.hpp"
    27 #include "Operations/Workers/SubmitResultOperation.hpp"
     25#include "Operations/OperationQueue.hpp"
    2826#include "Listener.hpp"
    2927#include "WorkerAddress.hpp"
     
    9896  const WorkerAddress MyAddress;
    9997
     98  //!> address of the server we work for
     99  WorkerAddress ServerAddress;
     100
    100101  //!> The Connection to the server for the stored operations
    101102  Connection connection_;
    102103
    103   //!> internally bound function that sets the Exitflag to ErrorFlag
     104  //!> bound function as callback when operation fails
    104105  boost::function<void ()> failed;
    105106
    106   //!> internally bound function that sets the Exitflag to ErrorFlag
    107   const boost::function<void ()> closingdown;
    108 
    109   //!> internally bound function initiates the PoolListener's socket
    110   const boost::function<void ()> initiateme;
    111 
    112   //!> operation that handles obtaining a job
    113   EnrollInPoolOperation enrollOp;
    114 
    115   //!> operation that handles submitting job's result
    116   SubmitResultOperation submitOp;
    117 
    118   //!> internally bound function such that host and service don't have to be stored, submits result
    119   boost::function<void ()> submitresult;
    120 
    121   //!> operation that handles removal from pool
    122   RemoveFromPoolOperation removeOp;
    123 
    124   //!> internally bound function such that host and service don't have to be stored, removes us from server
    125   boost::function<void ()> removeme;
     107  //!> internal queue for all asynchronous operations
     108  OperationQueue OpQueue;
    126109};
    127110
Note: See TracChangeset for help on using the changeset viewer.