source: src/Fragmentation/Automation/FragmentScheduler.hpp@ a06358c

Action_Thermostats Add_AtomRandomPerturbation Add_FitFragmentPartialChargesAction Add_RotateAroundBondAction Add_SelectAtomByNameAction Added_ParseSaveFragmentResults AddingActions_SaveParseParticleParameters Adding_Graph_to_ChangeBondActions Adding_MD_integration_tests Adding_ParticleName_to_Atom Adding_StructOpt_integration_tests AtomFragments Automaking_mpqc_open AutomationFragmentation_failures Candidate_v1.5.4 Candidate_v1.6.0 Candidate_v1.6.1 Candidate_v1.7.0 ChangeBugEmailaddress ChangingTestPorts ChemicalSpaceEvaluator CombiningParticlePotentialParsing Combining_Subpackages Debian_Package_split Debian_package_split_molecuildergui_only Disabling_MemDebug Docu_Python_wait EmpiricalPotential_contain_HomologyGraph EmpiricalPotential_contain_HomologyGraph_documentation Enable_parallel_make_install Enhance_userguide Enhanced_StructuralOptimization Enhanced_StructuralOptimization_continued Example_ManyWaysToTranslateAtom Exclude_Hydrogens_annealWithBondGraph FitPartialCharges_GlobalError Fix_BoundInBox_CenterInBox_MoleculeActions Fix_ChargeSampling_PBC Fix_ChronosMutex Fix_FitPartialCharges Fix_FitPotential_needs_atomicnumbers Fix_ForceAnnealing Fix_IndependentFragmentGrids Fix_ParseParticles Fix_ParseParticles_split_forward_backward_Actions Fix_PopActions Fix_QtFragmentList_sorted_selection Fix_Restrictedkeyset_FragmentMolecule Fix_StatusMsg Fix_StepWorldTime_single_argument Fix_Verbose_Codepatterns Fix_fitting_potentials Fixes ForceAnnealing_goodresults ForceAnnealing_oldresults ForceAnnealing_tocheck ForceAnnealing_with_BondGraph ForceAnnealing_with_BondGraph_continued ForceAnnealing_with_BondGraph_continued_betteresults ForceAnnealing_with_BondGraph_contraction-expansion FragmentAction_writes_AtomFragments FragmentMolecule_checks_bonddegrees GeometryObjects Gui_Fixes Gui_displays_atomic_force_velocity ImplicitCharges IndependentFragmentGrids IndependentFragmentGrids_IndividualZeroInstances IndependentFragmentGrids_IntegrationTest IndependentFragmentGrids_Sole_NN_Calculation JobMarket_RobustOnKillsSegFaults JobMarket_StableWorkerPool JobMarket_unresolvable_hostname_fix MoreRobust_FragmentAutomation ODR_violation_mpqc_open PartialCharges_OrthogonalSummation PdbParser_setsAtomName PythonUI_with_named_parameters QtGui_reactivate_TimeChanged_changes Recreated_GuiChecks Rewrite_FitPartialCharges RotateToPrincipalAxisSystem_UndoRedo SaturateAtoms_findBestMatching SaturateAtoms_singleDegree StoppableMakroAction Subpackage_CodePatterns Subpackage_JobMarket Subpackage_LinearAlgebra Subpackage_levmar Subpackage_mpqc_open Subpackage_vmg Switchable_LogView ThirdParty_MPQC_rebuilt_buildsystem TrajectoryDependenant_MaxOrder TremoloParser_IncreasedPrecision TremoloParser_MultipleTimesteps TremoloParser_setsAtomName Ubuntu_1604_changes stable
Last change on this file since a06358c was 9a3f84, checked in by Frederik Heber <heber@…>, 13 years ago

FragmentScheduler now uses WorkerChoices for handling connection workers.

  • Workers always first send address, then their choice and depending on this we branch into various handlers.
  • SubmitResultOperation and EnrollInPoolOperation now correctly give choice after sending address.
  • handle_enrolled uses getNextIdleWorker() to mark indirectly as busy. This is temporary.
  • we do not resubmit if Max_Attempts is 1.
  • Property mode set to 100644
File size: 6.1 KB
RevLine 
[72eaf7f]1/*
[926a49]2 * FragmentScheduler.hpp
[72eaf7f]3 *
[cd4a6e]4 * Created on: Oct 19, 2011
[72eaf7f]5 * Author: heber
6 */
7
[cd4a6e]8#ifndef FRAGMENTSCHEDULER_HPP_
9#define FRAGMENTSCHEDULER_HPP_
[72eaf7f]10
[f93842]11// include config.h
12#ifdef HAVE_CONFIG_H
13#include <config.h>
14#endif
15
[72eaf7f]16#include <vector>
[cd4a6e]17#include <boost/asio.hpp>
[8036b7]18#include <boost/function.hpp>
[cd4a6e]19
20#include "Connection.hpp"
[778abb]21#include "ControllerChoices.hpp"
[41c1b7]22#include "Controller/Commands/SendJobToWorkerOperation.hpp"
[7670865]23#include "FragmentQueue.hpp"
[d1dbfc]24#include "GlobalJobId.hpp"
[7670865]25#include "Jobs/FragmentJob.hpp"
[8036b7]26#include "Listener.hpp"
[7670865]27#include "Results/FragmentResult.hpp"
[778abb]28#include "types.hpp"
[41c1b7]29#include "Pool/WorkerPool.hpp"
30#include "WorkerAddress.hpp"
[9a3f84]31#include "WorkerChoices.hpp"
[72eaf7f]32
[8036b7]33/** FragmentScheduler serves FragmentJobs to Workers and accepts commands from
34 * a Controller.
[cd4a6e]35 *
36 */
[926a49]37class FragmentScheduler
[72eaf7f]38{
39public:
40 /// Constructor opens the acceptor and starts waiting for the first incoming
[cd4a6e]41 /// Connection.
[db03d9]42 FragmentScheduler(boost::asio::io_service& io_service, unsigned short workerport, unsigned short controllerport);
[72eaf7f]43
[8036b7]44private:
[41c1b7]45 void sendJobToWorker(const WorkerAddress &address, FragmentJob::ptr &job);
46// void shutdownWorker(const WorkerAddress &address);
47// void removeAllWorkers();
48
[8036b7]49 class WorkerListener_t : public Listener
50 {
51 public:
52 WorkerListener_t(
53 boost::asio::io_service& io_service,
54 unsigned short port,
[41c1b7]55 FragmentQueue &_JobsQueue,
56 WorkerPool &_pool,
57 boost::function<void (const WorkerAddress&, FragmentJob::ptr&)> _callback) :
[8036b7]58 Listener(io_service, port),
[41c1b7]59 address("127.0.0.1", "0"),
[8036b7]60 JobsQueue(_JobsQueue),
[41c1b7]61 pool(_pool),
62 result( new FragmentResult(JobId::NoJob) ),
[9a3f84]63 callback_sendJobToWorker(_callback),
64 choice(NoWorkerOperation)
[8036b7]65 {}
66 virtual ~WorkerListener_t() {}
67
68 protected:
69 /// Handle completion of a accept worker operation.
70 void handle_Accept(const boost::system::error_code& e, connection_ptr conn);
71
[9a3f84]72 /// Handle completion of Worker operation to read choice
73 void handle_ReadChoice(const boost::system::error_code& e, connection_ptr conn);
74
[8036b7]75 /// Worker callback function when job has been sent.
[9a3f84]76 void handle_ReadAddress(const boost::system::error_code& e, connection_ptr conn);
[41c1b7]77
78 /// Worker callback function when new worker has enrolled.
79 void handle_enrolled(const boost::system::error_code& e, connection_ptr conn);
[8036b7]80
81 /// Worker callback function when result has been received.
82 void handle_ReceiveResultFromWorker(const boost::system::error_code& e, connection_ptr conn);
[9a3f84]83
84 /// Worker callback function when invalid result has been received.
85 void handle_RejectResultFromWorker(const boost::system::error_code& e, connection_ptr conn);
[8036b7]86 private:
[9a3f84]87 //!> static entity to indicate to clients that the queue is empty.
88 static FragmentJob::ptr NoJob;
89
[41c1b7]90 //!> address of new Worker
91 WorkerAddress address;
92
93 //!> reference to Queue
94 FragmentQueue &JobsQueue;
95
96 //!> callback reference to container class
97 WorkerPool &pool;
[8036b7]98
[9a3f84]99 //!> result that is received from the client.
[8036b7]100 FragmentResult::ptr result;
101
[41c1b7]102 //!> callback function to access send job function
103 boost::function<void (const WorkerAddress&, FragmentJob::ptr&)> callback_sendJobToWorker;
104
[9a3f84]105 //!> choice
106 enum WorkerChoices choice;
[db03d9]107 };
[72eaf7f]108
[8036b7]109 class ControllerListener_t : public Listener
[db03d9]110 {
[8036b7]111 public:
112 ControllerListener_t(
113 boost::asio::io_service& io_service,
114 unsigned short port,
115 FragmentQueue &_JobsQueue,
116 boost::function<void ()> _initiateWorkerSocket) :
117 Listener(io_service, port),
118 JobsQueue(_JobsQueue),
119 jobInfo((size_t)2, 0),
[38032a]120 choice(NoControllerOperation),
[8036b7]121 globalId(0),
122 initiateWorkerSocket(_initiateWorkerSocket)
123 {}
124 virtual ~ControllerListener_t() {}
125
126 protected:
127 /// Handle completion of a accept controller operation.
128 void handle_Accept(const boost::system::error_code& e, connection_ptr conn);
129
130 /// Handle completion of controller operation to read choice
131 void handle_ReadChoice(const boost::system::error_code& e, connection_ptr conn);
132
133 /// Controller callback function when job has been sent.
134 void handle_ReceiveJobs(const boost::system::error_code& e, connection_ptr conn);
135
136 /// Controller callback function when checking on state of results.
137 void handle_CheckResultState(const boost::system::error_code& e, connection_ptr conn);
138
139 /// Controller callback function when checking on state of results.
140 void handle_GetNextJobIdState(const boost::system::error_code& e, connection_ptr conn);
141
142 /// Controller callback function when result has been received.
143 void handle_SendResults(const boost::system::error_code& e, connection_ptr conn);
144
145 private:
146 //!> reference to external FragmentQueue containing jobs to work on
147 FragmentQueue & JobsQueue;
148
[9a3f84]149 //!> bunch of jobs received from controller before placed in JobsQueue
[8036b7]150 std::vector<FragmentJob::ptr> jobs;
151
[9a3f84]152 //!> number of jobs that are waiting to be and are calculated, required for returning status
[8036b7]153 std::vector<size_t> jobInfo;
154
[9a3f84]155 //!> choice
[8036b7]156 enum ControllerChoices choice;
157
158 // TODO: replace this instance by a IdPool.
159 //!> global id to give next available job id
160 GlobalJobId globalId;
161
162 //!> callback function to tell that worker socket should be enabled
163 boost::function<void ()> initiateWorkerSocket;
164 };
[402bde]165
166private:
[9a3f84]167 //!> Queue with data to be sent to each client.
[41c1b7]168 FragmentQueue JobsQueue;
169
170 //!> Pool of Workers
171 WorkerPool pool;
172
[8036b7]173 //!> Listener instance that waits for a worker
174 WorkerListener_t WorkerListener;
[72eaf7f]175
[8036b7]176 //!> Listener instance that waits for a controller
177 ControllerListener_t ControllerListener;
[778abb]178
[8036b7]179public:
180 /** Getter for Exitflag.
181 *
182 * @return Exitflag of operations
183 */
184 size_t getExitflag() const
185 {
186 if (WorkerListener.getExitflag() != 0)
187 return WorkerListener.getExitflag();
188 if (ControllerListener.getExitflag() != 0)
189 return ControllerListener.getExitflag();
190 return 0;
191 }
[d1dbfc]192
[41c1b7]193private:
194 //!> Connection for sending jobs to workers
195 Connection connection;
196
197 //!> internal operation to send jobs to workers
198 mutable SendJobToWorkerOperation sendJobOp;
[72eaf7f]199};
200
[cd4a6e]201#endif /* FRAGMENTSCHEDULER_HPP_ */
Note: See TracBrowser for help on using the repository browser.