source: src/Fragmentation/Automation/FragmentScheduler.hpp@ c4f43e

Action_Thermostats Add_AtomRandomPerturbation Add_FitFragmentPartialChargesAction Add_RotateAroundBondAction Add_SelectAtomByNameAction Added_ParseSaveFragmentResults AddingActions_SaveParseParticleParameters Adding_Graph_to_ChangeBondActions Adding_MD_integration_tests Adding_ParticleName_to_Atom Adding_StructOpt_integration_tests AtomFragments Automaking_mpqc_open AutomationFragmentation_failures Candidate_v1.5.4 Candidate_v1.6.0 Candidate_v1.6.1 Candidate_v1.7.0 ChangeBugEmailaddress ChangingTestPorts ChemicalSpaceEvaluator CombiningParticlePotentialParsing Combining_Subpackages Debian_Package_split Debian_package_split_molecuildergui_only Disabling_MemDebug Docu_Python_wait EmpiricalPotential_contain_HomologyGraph EmpiricalPotential_contain_HomologyGraph_documentation Enable_parallel_make_install Enhance_userguide Enhanced_StructuralOptimization Enhanced_StructuralOptimization_continued Example_ManyWaysToTranslateAtom Exclude_Hydrogens_annealWithBondGraph FitPartialCharges_GlobalError Fix_BoundInBox_CenterInBox_MoleculeActions Fix_ChargeSampling_PBC Fix_ChronosMutex Fix_FitPartialCharges Fix_FitPotential_needs_atomicnumbers Fix_ForceAnnealing Fix_IndependentFragmentGrids Fix_ParseParticles Fix_ParseParticles_split_forward_backward_Actions Fix_PopActions Fix_QtFragmentList_sorted_selection Fix_Restrictedkeyset_FragmentMolecule Fix_StatusMsg Fix_StepWorldTime_single_argument Fix_Verbose_Codepatterns Fix_fitting_potentials Fixes ForceAnnealing_goodresults ForceAnnealing_oldresults ForceAnnealing_tocheck ForceAnnealing_with_BondGraph ForceAnnealing_with_BondGraph_continued ForceAnnealing_with_BondGraph_continued_betteresults ForceAnnealing_with_BondGraph_contraction-expansion FragmentAction_writes_AtomFragments FragmentMolecule_checks_bonddegrees GeometryObjects Gui_Fixes Gui_displays_atomic_force_velocity ImplicitCharges IndependentFragmentGrids IndependentFragmentGrids_IndividualZeroInstances IndependentFragmentGrids_IntegrationTest 
IndependentFragmentGrids_Sole_NN_Calculation JobMarket_RobustOnKillsSegFaults JobMarket_StableWorkerPool JobMarket_unresolvable_hostname_fix MoreRobust_FragmentAutomation ODR_violation_mpqc_open PartialCharges_OrthogonalSummation PdbParser_setsAtomName PythonUI_with_named_parameters QtGui_reactivate_TimeChanged_changes Recreated_GuiChecks Rewrite_FitPartialCharges RotateToPrincipalAxisSystem_UndoRedo SaturateAtoms_findBestMatching SaturateAtoms_singleDegree StoppableMakroAction Subpackage_CodePatterns Subpackage_JobMarket Subpackage_LinearAlgebra Subpackage_levmar Subpackage_mpqc_open Subpackage_vmg Switchable_LogView ThirdParty_MPQC_rebuilt_buildsystem TrajectoryDependenant_MaxOrder TremoloParser_IncreasedPrecision TremoloParser_MultipleTimesteps TremoloParser_setsAtomName Ubuntu_1604_changes stable
Last change on this file since c4f43e was c4f43e, checked in by Frederik Heber <heber@…>, 13 years ago

GetNextJobIdOperation now may request a vector of ids.

  • otherwise we needed multiple phase-one calls, each requesting a single id.
  • now, we state how many ids we want and may eventually get one by one as desired.
  • adapted FragmentScheduler::ControllerListener_t and FragmentController accordingly as we now have to exchange the desired number of ids, too.
  • requestid() now needs an additional parameter.
  • Property mode set to 100644
File size: 6.9 KB
RevLine 
[72eaf7f]1/*
[926a49]2 * FragmentScheduler.hpp
[72eaf7f]3 *
[cd4a6e]4 * Created on: Oct 19, 2011
[72eaf7f]5 * Author: heber
6 */
7
[cd4a6e]8#ifndef FRAGMENTSCHEDULER_HPP_
9#define FRAGMENTSCHEDULER_HPP_
[72eaf7f]10
[f93842]11// include config.h
12#ifdef HAVE_CONFIG_H
13#include <config.h>
14#endif
15
[72eaf7f]16#include <vector>
[cd4a6e]17#include <boost/asio.hpp>
[8036b7]18#include <boost/function.hpp>
[cd4a6e]19
[2344a3]20#include "CodePatterns/Observer/Observer.hpp"
[cd4a6e]21#include "Connection.hpp"
[778abb]22#include "ControllerChoices.hpp"
[50d095]23#include "Operations/Servers/SendJobToWorkerOperation.hpp"
24#include "Operations/Servers/ShutdownWorkerOperation.hpp"
[2344a3]25#include "ControllerChoices.hpp"
[7670865]26#include "FragmentQueue.hpp"
[d1dbfc]27#include "GlobalJobId.hpp"
[7670865]28#include "Jobs/FragmentJob.hpp"
[8036b7]29#include "Listener.hpp"
[7670865]30#include "Results/FragmentResult.hpp"
[778abb]31#include "types.hpp"
[41c1b7]32#include "Pool/WorkerPool.hpp"
33#include "WorkerAddress.hpp"
[9a3f84]34#include "WorkerChoices.hpp"
[72eaf7f]35
[8036b7]36/** FragmentScheduler serves FragmentJobs to Workers and accepts commands from
37 * a Controller.
[cd4a6e]38 *
39 */
[2344a3]40class FragmentScheduler : public Observer
[72eaf7f]41{
42public:
43 /// Constructor opens the acceptor and starts waiting for the first incoming
[cd4a6e]44 /// Connection.
[db03d9]45 FragmentScheduler(boost::asio::io_service& io_service, unsigned short workerport, unsigned short controllerport);
[2344a3]46 ~FragmentScheduler();
[72eaf7f]47
[8036b7]48private:
[41c1b7]49 void sendJobToWorker(const WorkerAddress &address, FragmentJob::ptr &job);
[2344a3]50 void sendAvailableJobToNextIdleWorker();
51 void shutdown();
52 void shutdownWorker(const WorkerAddress &address);
53 void removeAllWorkers();
54
55 void update(Observable *publisher);
56 void recieveNotification(Observable *publisher, Notification_ptr notification);
57 void subjectKilled(Observable *publisher);
[41c1b7]58
[8036b7]59 class WorkerListener_t : public Listener
60 {
61 public:
62 WorkerListener_t(
63 boost::asio::io_service& io_service,
64 unsigned short port,
[41c1b7]65 FragmentQueue &_JobsQueue,
66 WorkerPool &_pool,
67 boost::function<void (const WorkerAddress&, FragmentJob::ptr&)> _callback) :
[8036b7]68 Listener(io_service, port),
[41c1b7]69 address("127.0.0.1", "0"),
[8036b7]70 JobsQueue(_JobsQueue),
[41c1b7]71 pool(_pool),
72 result( new FragmentResult(JobId::NoJob) ),
[9a3f84]73 callback_sendJobToWorker(_callback),
74 choice(NoWorkerOperation)
[8036b7]75 {}
76 virtual ~WorkerListener_t() {}
77
78 protected:
79 /// Handle completion of a accept worker operation.
80 void handle_Accept(const boost::system::error_code& e, connection_ptr conn);
81
[9a3f84]82 /// Handle completion of Worker operation to read choice
83 void handle_ReadChoice(const boost::system::error_code& e, connection_ptr conn);
84
[8036b7]85 /// Worker callback function when job has been sent.
[9a3f84]86 void handle_ReadAddress(const boost::system::error_code& e, connection_ptr conn);
[41c1b7]87
88 /// Worker callback function when new worker has enrolled.
89 void handle_enrolled(const boost::system::error_code& e, connection_ptr conn);
[8036b7]90
91 /// Worker callback function when result has been received.
92 void handle_ReceiveResultFromWorker(const boost::system::error_code& e, connection_ptr conn);
[9a3f84]93
94 /// Worker callback function when invalid result has been received.
95 void handle_RejectResultFromWorker(const boost::system::error_code& e, connection_ptr conn);
[8036b7]96 private:
[41c1b7]97 //!> address of new Worker
98 WorkerAddress address;
99
100 //!> reference to Queue
101 FragmentQueue &JobsQueue;
102
103 //!> callback reference to container class
104 WorkerPool &pool;
[8036b7]105
[9a3f84]106 //!> result that is received from the client.
[8036b7]107 FragmentResult::ptr result;
108
[41c1b7]109 //!> callback function to access send job function
110 boost::function<void (const WorkerAddress&, FragmentJob::ptr&)> callback_sendJobToWorker;
111
[9a3f84]112 //!> choice
113 enum WorkerChoices choice;
[db03d9]114 };
[72eaf7f]115
[8036b7]116 class ControllerListener_t : public Listener
[db03d9]117 {
[8036b7]118 public:
119 ControllerListener_t(
120 boost::asio::io_service& io_service,
121 unsigned short port,
122 FragmentQueue &_JobsQueue,
[2344a3]123 boost::function<void ()> _shutdownAllSockets) :
[8036b7]124 Listener(io_service, port),
125 JobsQueue(_JobsQueue),
126 jobInfo((size_t)2, 0),
[38032a]127 choice(NoControllerOperation),
[2344a3]128 shutdownAllSockets(_shutdownAllSockets),
129 globalId(0)
[8036b7]130 {}
131 virtual ~ControllerListener_t() {}
132
133 protected:
134 /// Handle completion of a accept controller operation.
135 void handle_Accept(const boost::system::error_code& e, connection_ptr conn);
136
137 /// Handle completion of controller operation to read choice
138 void handle_ReadChoice(const boost::system::error_code& e, connection_ptr conn);
139
140 /// Controller callback function when job has been sent.
141 void handle_ReceiveJobs(const boost::system::error_code& e, connection_ptr conn);
142
143 /// Controller callback function when checking on state of results.
144 void handle_CheckResultState(const boost::system::error_code& e, connection_ptr conn);
145
146 /// Controller callback function when checking on state of results.
147 void handle_GetNextJobIdState(const boost::system::error_code& e, connection_ptr conn);
148
[c4f43e]149 /// Controller callback function when free job ids have been sent.
150 void handle_SendIds(const boost::system::error_code& e, connection_ptr conn);
151
[8036b7]152 /// Controller callback function when result has been received.
153 void handle_SendResults(const boost::system::error_code& e, connection_ptr conn);
154
155 private:
156 //!> reference to external FragmentQueue containing jobs to work on
157 FragmentQueue & JobsQueue;
158
[9a3f84]159 //!> bunch of jobs received from controller before placed in JobsQueue
[8036b7]160 std::vector<FragmentJob::ptr> jobs;
161
[9a3f84]162 //!> number of jobs that are waiting to be and are calculated, required for returning status
[8036b7]163 std::vector<size_t> jobInfo;
164
[c4f43e]165 //!> number of job ids request from controller;
166 size_t NumberIds;
167
[9a3f84]168 //!> choice
[8036b7]169 enum ControllerChoices choice;
170
[2344a3]171 //!> bound function to shutdown all sockets
172 boost::function<void ()> shutdownAllSockets;
173
[8036b7]174 // TODO: replace this instance by a IdPool.
175 //!> global id to give next available job id
176 GlobalJobId globalId;
177 };
[402bde]178
179private:
[2344a3]180 //!> static entity to indicate to clients that the queue is empty.
181 static FragmentJob::ptr NoJob;
182
183 //!> reference to the io_service which we use for connections
184 boost::asio::io_service& io_service;
185
[9a3f84]186 //!> Queue with data to be sent to each client.
[41c1b7]187 FragmentQueue JobsQueue;
188
189 //!> Pool of Workers
190 WorkerPool pool;
191
[8036b7]192 //!> Listener instance that waits for a worker
193 WorkerListener_t WorkerListener;
[72eaf7f]194
[8036b7]195 //!> Listener instance that waits for a controller
196 ControllerListener_t ControllerListener;
[778abb]197
[8036b7]198public:
199 /** Getter for Exitflag.
200 *
201 * @return Exitflag of operations
202 */
203 size_t getExitflag() const
204 {
205 if (WorkerListener.getExitflag() != 0)
206 return WorkerListener.getExitflag();
207 if (ControllerListener.getExitflag() != 0)
208 return ControllerListener.getExitflag();
209 return 0;
210 }
[d1dbfc]211
[41c1b7]212private:
213 //!> Connection for sending jobs to workers
214 Connection connection;
215
216 //!> internal operation to send jobs to workers
217 mutable SendJobToWorkerOperation sendJobOp;
[6ea7f4]218
219 //!> internal operation to shutdown a worker
220 ShutdownWorkerOperation shutdownWorkerOp;
[72eaf7f]221};
222
[cd4a6e]223#endif /* FRAGMENTSCHEDULER_HPP_ */
Note: See TracBrowser for help on using the repository browser.