source: src/Fragmentation/Automation/FragmentScheduler.hpp@ befcf8

Action_Thermostats Add_AtomRandomPerturbation Add_FitFragmentPartialChargesAction Add_RotateAroundBondAction Add_SelectAtomByNameAction Added_ParseSaveFragmentResults AddingActions_SaveParseParticleParameters Adding_Graph_to_ChangeBondActions Adding_MD_integration_tests Adding_ParticleName_to_Atom Adding_StructOpt_integration_tests AtomFragments Automaking_mpqc_open AutomationFragmentation_failures Candidate_v1.5.4 Candidate_v1.6.0 Candidate_v1.6.1 Candidate_v1.7.0 ChangeBugEmailaddress ChangingTestPorts ChemicalSpaceEvaluator CombiningParticlePotentialParsing Combining_Subpackages Debian_Package_split Debian_package_split_molecuildergui_only Disabling_MemDebug Docu_Python_wait EmpiricalPotential_contain_HomologyGraph EmpiricalPotential_contain_HomologyGraph_documentation Enable_parallel_make_install Enhance_userguide Enhanced_StructuralOptimization Enhanced_StructuralOptimization_continued Example_ManyWaysToTranslateAtom Exclude_Hydrogens_annealWithBondGraph FitPartialCharges_GlobalError Fix_BoundInBox_CenterInBox_MoleculeActions Fix_ChargeSampling_PBC Fix_ChronosMutex Fix_FitPartialCharges Fix_FitPotential_needs_atomicnumbers Fix_ForceAnnealing Fix_IndependentFragmentGrids Fix_ParseParticles Fix_ParseParticles_split_forward_backward_Actions Fix_PopActions Fix_QtFragmentList_sorted_selection Fix_Restrictedkeyset_FragmentMolecule Fix_StatusMsg Fix_StepWorldTime_single_argument Fix_Verbose_Codepatterns Fix_fitting_potentials Fixes ForceAnnealing_goodresults ForceAnnealing_oldresults ForceAnnealing_tocheck ForceAnnealing_with_BondGraph ForceAnnealing_with_BondGraph_continued ForceAnnealing_with_BondGraph_continued_betteresults ForceAnnealing_with_BondGraph_contraction-expansion FragmentAction_writes_AtomFragments FragmentMolecule_checks_bonddegrees GeometryObjects Gui_Fixes Gui_displays_atomic_force_velocity ImplicitCharges IndependentFragmentGrids IndependentFragmentGrids_IndividualZeroInstances IndependentFragmentGrids_IntegrationTest IndependentFragmentGrids_Sole_NN_Calculation JobMarket_RobustOnKillsSegFaults JobMarket_StableWorkerPool JobMarket_unresolvable_hostname_fix MoreRobust_FragmentAutomation ODR_violation_mpqc_open PartialCharges_OrthogonalSummation PdbParser_setsAtomName PythonUI_with_named_parameters QtGui_reactivate_TimeChanged_changes Recreated_GuiChecks Rewrite_FitPartialCharges RotateToPrincipalAxisSystem_UndoRedo SaturateAtoms_findBestMatching SaturateAtoms_singleDegree StoppableMakroAction Subpackage_CodePatterns Subpackage_JobMarket Subpackage_LinearAlgebra Subpackage_levmar Subpackage_mpqc_open Subpackage_vmg Switchable_LogView ThirdParty_MPQC_rebuilt_buildsystem TrajectoryDependenant_MaxOrder TremoloParser_IncreasedPrecision TremoloParser_MultipleTimesteps TremoloParser_setsAtomName Ubuntu_1604_changes stable
Last change on this file since befcf8 was ba995d, checked in by Frederik Heber <heber@…>, 13 years ago

ShutdownWorkerOperation is now an asynchronous operation.

  • Property mode set to 100644
File size: 6.9 KB
RevLine 
[72eaf7f]1/*
[926a49]2 * FragmentScheduler.hpp
[72eaf7f]3 *
[cd4a6e]4 * Created on: Oct 19, 2011
[72eaf7f]5 * Author: heber
6 */
7
[cd4a6e]8#ifndef FRAGMENTSCHEDULER_HPP_
9#define FRAGMENTSCHEDULER_HPP_
[72eaf7f]10
[f93842]11// include config.h
12#ifdef HAVE_CONFIG_H
13#include <config.h>
14#endif
15
[cd4a6e]16#include <boost/asio.hpp>
[8036b7]17#include <boost/function.hpp>
[9a6b895]18#include <boost/shared_ptr.hpp>
19#include <deque>
20#include <vector>
[cd4a6e]21
[2344a3]22#include "CodePatterns/Observer/Observer.hpp"
[cd4a6e]23#include "Connection.hpp"
[778abb]24#include "ControllerChoices.hpp"
[9a6b895]25#include "Operations/AsyncOperation.hpp"
26#include "Operations/OperationQueue.hpp"
[50d095]27#include "Operations/Servers/ShutdownWorkerOperation.hpp"
[2344a3]28#include "ControllerChoices.hpp"
[7670865]29#include "FragmentQueue.hpp"
[d1dbfc]30#include "GlobalJobId.hpp"
[7670865]31#include "Jobs/FragmentJob.hpp"
[8036b7]32#include "Listener.hpp"
[7670865]33#include "Results/FragmentResult.hpp"
[778abb]34#include "types.hpp"
[41c1b7]35#include "Pool/WorkerPool.hpp"
36#include "WorkerAddress.hpp"
[9a3f84]37#include "WorkerChoices.hpp"
[72eaf7f]38
[8036b7]39/** FragmentScheduler serves FragmentJobs to Workers and accepts commands from
40 * a Controller.
[cd4a6e]41 *
42 */
[2344a3]43class FragmentScheduler : public Observer
[72eaf7f]44{
45public:
46 /// Constructor opens the acceptor and starts waiting for the first incoming
[cd4a6e]47 /// Connection.
[db03d9]48 FragmentScheduler(boost::asio::io_service& io_service, unsigned short workerport, unsigned short controllerport);
[2344a3]49 ~FragmentScheduler();
[72eaf7f]50
[8036b7]51private:
[41c1b7]52 void sendJobToWorker(const WorkerAddress &address, FragmentJob::ptr &job);
[2344a3]53 void sendAvailableJobToNextIdleWorker();
54 void shutdown();
55 void shutdownWorker(const WorkerAddress &address);
56 void removeAllWorkers();
[9a6b895]57 void cleanupOperationQueue(AsyncOperation *op);
[2344a3]58
59 void update(Observable *publisher);
60 void recieveNotification(Observable *publisher, Notification_ptr notification);
61 void subjectKilled(Observable *publisher);
[41c1b7]62
[9a6b895]63
[8036b7]64 class WorkerListener_t : public Listener
65 {
66 public:
67 WorkerListener_t(
68 boost::asio::io_service& io_service,
69 unsigned short port,
[41c1b7]70 FragmentQueue &_JobsQueue,
71 WorkerPool &_pool,
72 boost::function<void (const WorkerAddress&, FragmentJob::ptr&)> _callback) :
[8036b7]73 Listener(io_service, port),
[41c1b7]74 address("127.0.0.1", "0"),
[8036b7]75 JobsQueue(_JobsQueue),
[41c1b7]76 pool(_pool),
77 result( new FragmentResult(JobId::NoJob) ),
[9a3f84]78 callback_sendJobToWorker(_callback),
79 choice(NoWorkerOperation)
[8036b7]80 {}
81 virtual ~WorkerListener_t() {}
82
83 protected:
84 /// Handle completion of a accept worker operation.
85 void handle_Accept(const boost::system::error_code& e, connection_ptr conn);
86
[9a3f84]87 /// Handle completion of Worker operation to read choice
88 void handle_ReadChoice(const boost::system::error_code& e, connection_ptr conn);
89
[8036b7]90 /// Worker callback function when job has been sent.
[9a3f84]91 void handle_ReadAddress(const boost::system::error_code& e, connection_ptr conn);
[41c1b7]92
93 /// Worker callback function when new worker has enrolled.
94 void handle_enrolled(const boost::system::error_code& e, connection_ptr conn);
[8036b7]95
96 /// Worker callback function when result has been received.
97 void handle_ReceiveResultFromWorker(const boost::system::error_code& e, connection_ptr conn);
[9a3f84]98
99 /// Worker callback function when invalid result has been received.
100 void handle_RejectResultFromWorker(const boost::system::error_code& e, connection_ptr conn);
[8036b7]101 private:
[41c1b7]102 //!> address of new Worker
103 WorkerAddress address;
104
105 //!> reference to Queue
106 FragmentQueue &JobsQueue;
107
108 //!> callback reference to container class
109 WorkerPool &pool;
[8036b7]110
[9a3f84]111 //!> result that is received from the client.
[8036b7]112 FragmentResult::ptr result;
113
[41c1b7]114 //!> callback function to access send job function
115 boost::function<void (const WorkerAddress&, FragmentJob::ptr&)> callback_sendJobToWorker;
116
[9a3f84]117 //!> choice
118 enum WorkerChoices choice;
[db03d9]119 };
[72eaf7f]120
[8036b7]121 class ControllerListener_t : public Listener
[db03d9]122 {
[8036b7]123 public:
124 ControllerListener_t(
125 boost::asio::io_service& io_service,
126 unsigned short port,
127 FragmentQueue &_JobsQueue,
[2344a3]128 boost::function<void ()> _shutdownAllSockets) :
[8036b7]129 Listener(io_service, port),
130 JobsQueue(_JobsQueue),
131 jobInfo((size_t)2, 0),
[38032a]132 choice(NoControllerOperation),
[2344a3]133 shutdownAllSockets(_shutdownAllSockets),
134 globalId(0)
[8036b7]135 {}
136 virtual ~ControllerListener_t() {}
137
138 protected:
139 /// Handle completion of a accept controller operation.
140 void handle_Accept(const boost::system::error_code& e, connection_ptr conn);
141
142 /// Handle completion of controller operation to read choice
143 void handle_ReadChoice(const boost::system::error_code& e, connection_ptr conn);
144
145 /// Controller callback function when job has been sent.
146 void handle_ReceiveJobs(const boost::system::error_code& e, connection_ptr conn);
147
148 /// Controller callback function when checking on state of results.
149 void handle_CheckResultState(const boost::system::error_code& e, connection_ptr conn);
150
151 /// Controller callback function when checking on state of results.
152 void handle_GetNextJobIdState(const boost::system::error_code& e, connection_ptr conn);
153
[c4f43e]154 /// Controller callback function when free job ids have been sent.
155 void handle_SendIds(const boost::system::error_code& e, connection_ptr conn);
156
[8036b7]157 /// Controller callback function when result has been received.
158 void handle_SendResults(const boost::system::error_code& e, connection_ptr conn);
159
160 private:
161 //!> reference to external FragmentQueue containing jobs to work on
162 FragmentQueue & JobsQueue;
163
[9a3f84]164 //!> bunch of jobs received from controller before placed in JobsQueue
[8036b7]165 std::vector<FragmentJob::ptr> jobs;
166
[9a3f84]167 //!> number of jobs that are waiting to be and are calculated, required for returning status
[8036b7]168 std::vector<size_t> jobInfo;
169
[c4f43e]170 //!> number of job ids request from controller;
171 size_t NumberIds;
172
[9a3f84]173 //!> choice
[8036b7]174 enum ControllerChoices choice;
175
[2344a3]176 //!> bound function to shutdown all sockets
177 boost::function<void ()> shutdownAllSockets;
178
[8036b7]179 // TODO: replace this instance by a IdPool.
180 //!> global id to give next available job id
181 GlobalJobId globalId;
182 };
[402bde]183
184private:
[2344a3]185 //!> static entity to indicate to clients that the queue is empty.
186 static FragmentJob::ptr NoJob;
187
188 //!> reference to the io_service which we use for connections
189 boost::asio::io_service& io_service;
190
[9a3f84]191 //!> Queue with data to be sent to each client.
[41c1b7]192 FragmentQueue JobsQueue;
193
194 //!> Pool of Workers
195 WorkerPool pool;
196
[8036b7]197 //!> Listener instance that waits for a worker
198 WorkerListener_t WorkerListener;
[72eaf7f]199
[8036b7]200 //!> Listener instance that waits for a controller
201 ControllerListener_t ControllerListener;
[778abb]202
[8036b7]203public:
204 /** Getter for Exitflag.
205 *
206 * @return Exitflag of operations
207 */
208 size_t getExitflag() const
209 {
210 if (WorkerListener.getExitflag() != 0)
211 return WorkerListener.getExitflag();
212 if (ControllerListener.getExitflag() != 0)
213 return ControllerListener.getExitflag();
214 return 0;
215 }
[d1dbfc]216
[41c1b7]217private:
218 //!> Connection for sending jobs to workers
219 Connection connection;
220
[9a6b895]221 //!> internal queue for all asynchronous operations
222 OperationQueue OpQueue;
[72eaf7f]223};
224
[cd4a6e]225#endif /* FRAGMENTSCHEDULER_HPP_ */
Note: See TracBrowser for help on using the repository browser.