source: src/JobMarket/Controller/FragmentController.cpp@ 404d2b

Action_Thermostats Add_AtomRandomPerturbation Add_RotateAroundBondAction Add_SelectAtomByNameAction Adding_Graph_to_ChangeBondActions Adding_MD_integration_tests Adding_StructOpt_integration_tests Automaking_mpqc_open AutomationFragmentation_failures Candidate_v1.6.0 Candidate_v1.6.1 ChangeBugEmailaddress ChangingTestPorts ChemicalSpaceEvaluator Combining_Subpackages Debian_Package_split Debian_package_split_molecuildergui_only Disabling_MemDebug Docu_Python_wait EmpiricalPotential_contain_HomologyGraph_documentation Enable_parallel_make_install Enhance_userguide Enhanced_StructuralOptimization Enhanced_StructuralOptimization_continued Example_ManyWaysToTranslateAtom Exclude_Hydrogens_annealWithBondGraph FitPartialCharges_GlobalError Fix_ChronosMutex Fix_StatusMsg Fix_StepWorldTime_single_argument Fix_Verbose_Codepatterns ForceAnnealing_goodresults ForceAnnealing_oldresults ForceAnnealing_tocheck ForceAnnealing_with_BondGraph ForceAnnealing_with_BondGraph_continued ForceAnnealing_with_BondGraph_continued_betteresults ForceAnnealing_with_BondGraph_contraction-expansion GeometryObjects Gui_displays_atomic_force_velocity IndependentFragmentGrids_IntegrationTest JobMarket_RobustOnKillsSegFaults JobMarket_StableWorkerPool JobMarket_unresolvable_hostname_fix ODR_violation_mpqc_open PartialCharges_OrthogonalSummation PythonUI_with_named_parameters QtGui_reactivate_TimeChanged_changes Recreated_GuiChecks RotateToPrincipalAxisSystem_UndoRedo StoppableMakroAction Subpackage_JobMarket Subpackage_levmar Subpackage_mpqc_open Subpackage_vmg ThirdParty_MPQC_rebuilt_buildsystem TremoloParser_IncreasedPrecision TremoloParser_MultipleTimesteps Ubuntu_1604_changes stable
Last change on this file since 404d2b was 404d2b, checked in by Frederik Heber <heber@…>, 8 years ago

Squashed 'ThirdParty/JobMarket/' content from commit e194722

git-subtree-dir: ThirdParty/JobMarket
git-subtree-split: e19472277e62c493f6c10f1483fe21e64c1039e9

  • Property mode set to 100644
File size: 8.7 KB
Line 
1/*
2 * Project: JobMarket
3 * Description: asynchronous Server/Controller/Client-approach to parallel computing, based on boost::asio
4 * Copyright (C) 2011 Frederik Heber. All rights reserved.
5 *
6 */
7
8/*
9 * FragmentController.cpp
10 *
11 * Created on: Nov 27, 2011
12 * Author: heber
13 */
14
15// include config.h
16#ifdef HAVE_CONFIG_H
17#include <config.h>
18#endif
19
20// boost asio needs specific operator new
21#include <boost/asio.hpp>
22
23#include "CodePatterns/MemDebug.hpp"
24
25#include "JobMarket/Controller/FragmentController.hpp"
26
27#include "JobMarket/Connection.hpp" // Must come before boost/serialization headers.
28#include "CodePatterns/Info.hpp"
29#include "CodePatterns/Log.hpp"
30
31#include "Operations/Controllers/CheckResultsOperation.hpp"
32#include "Operations/Controllers/GetNextJobIdOperation.hpp"
33#include "Operations/Controllers/ReceiveResultsOperation.hpp"
34#include "Operations/Controllers/RemoveAllJobsOperation.hpp"
35#include "Operations/Controllers/RemoveAllResultsOperation.hpp"
36#include "Operations/Controllers/RemoveAllWorkerOperation.hpp"
37#include "Operations/Controllers/SendJobsOperation.hpp"
38#include "Operations/Controllers/ShutdownOperation.hpp"
39
40#include "JobMarket/JobId.hpp"
41
42/** Constructor of class FragmentController.
43 *
44 * \param io_service io_service for the asynchronous operations
45 * \param _host hostname of server that accepts jobs
46 * \param _service of server
47 */
48FragmentController::FragmentController(
49 boost::asio::io_service& io_service) :
50 connection_(io_service),
51 failed(boost::bind(&ExitflagContainer::setExitflag, this, ExitflagContainer::ErrorFlag))
52{
53 DEBUG_FUNCTION_ENTRYEXIT
54
55 // insert commands into registry
56 Commands.registerInstance(new CheckResultsOperation(connection_, AsyncOperation::NoOpCallback, failed));
57 Commands.registerInstance(new GetNextJobIdOperation(connection_, AsyncOperation::NoOpCallback, failed));
58 Commands.registerInstance(new ReceiveResultsOperation(connection_, AsyncOperation::NoOpCallback, failed));
59 Commands.registerInstance(new RemoveAllJobsOperation(connection_));
60 Commands.registerInstance(new RemoveAllResultsOperation(connection_));
61 Commands.registerInstance(new RemoveAllWorkerOperation(connection_));
62 Commands.registerInstance(new SendJobsOperation(connection_, AsyncOperation::NoOpCallback, failed));
63 Commands.registerInstance(new ShutdownOperation(connection_));
64}
65
66/** Destructor of class FragmentController.
67 *
68 */
69FragmentController::~FragmentController()
70{
71 Commands.cleanup();
72}
73
74/** Requests an available id from server
75 *
76 * @param host address of server
77 * @param service port/service of server
78 * @param NumberIds number of desired ids to request with this connect
79 */
80void FragmentController::requestIds(
81 const std::string &host,
82 const std::string &service,
83 const size_t NumberIds)
84{
85 GetNextJobIdOperation *getnextid = static_cast<GetNextJobIdOperation *>(
86 Commands.getByName("getnextjobid"));
87 (*getnextid).setDesiredIds(NumberIds);
88 (*getnextid)(host,service);
89}
90
91/** Returns another available id from a finished GetNextJobIdOperation.
92 *
93 * @return next available id
94 */
95JobId_t FragmentController::getAvailableId()
96{
97 GetNextJobIdOperation *getnextid = static_cast<GetNextJobIdOperation *>(
98 Commands.getByName("getnextjobid"));
99 if( !getnextid->isNextIdAvailable()) {
100 ELOG(1, "FragmentController::getAvailableId() - there are no more requested ids.");
101 return JobId::IllegalJob;
102 }
103 const JobId_t nextid = getnextid->getNextId();
104 LOG(1, "INFO: Next available id is " << nextid << ".");
105 return nextid;
106}
107
108/** Adds a vector of jobs to the send operation.
109 *
110 * @param jobs jobs to add
111 */
112void FragmentController::addJobs(std::vector<FragmentJob::ptr> &jobs)
113{
114 SendJobsOperation *sendjobs = static_cast<SendJobsOperation *>(
115 Commands.getByName("sendjobs"));
116 // place all ids in internal list
117 for (std::vector<FragmentJob::ptr>::const_iterator iter = jobs.begin();
118 iter != jobs.end(); ++iter) {
119#ifndef NDEBUG
120 std::pair< std::set<JobId_t>::iterator,bool > inserter =
121#endif
122 jobids.insert((*iter)->getId());
123 ASSERT( inserter.second,
124 "FragmentController::addJobs() - id "+toString((*iter)->getId())
125 +" is presently marked as sent to the server.");
126 }
127 // mark as to be sent to server
128 sendjobs->addJobs(jobs);
129 const size_t presentJobs = sendjobs->getPresentJobs();
130 LOG(1, "INFO: #" << presentJobs << " jobs are now waiting to be transmitted.");
131}
132
133/** Sends contained jobs in operation to server
134 *
135 * @param host address of server
136 * @param service port/service of server
137 */
138void FragmentController::sendJobs(
139 const std::string &host,
140 const std::string &service)
141{
142 SendJobsOperation *sendjobs = static_cast<SendJobsOperation *>(
143 Commands.getByName("sendjobs"));
144 const size_t presentJobs = sendjobs->getPresentJobs();
145 LOG(1, "INFO: #" << presentJobs << " jobs are being sent to the server.");
146 (*sendjobs)(host, service);
147}
148
149/** Obtains scheduled and done jobs from server
150 *
151 * @param host address of server
152 * @param service port/service of server
153 */
154void FragmentController::checkResults(
155 const std::string &host,
156 const std::string &service)
157{
158 CheckResultsOperation *checkres = static_cast<CheckResultsOperation *>(
159 Commands.getByName("checkresults"));
160 checkres->setJobIds(jobids);
161 (*checkres)(host, service);
162}
163
164/** Return scheduled and done jobs.
165 *
166 * @return pair of number of still scheduled and already done jobs
167 */
168std::pair<size_t, size_t> FragmentController::getJobStatus() const
169{
170 const CheckResultsOperation * const checkres = static_cast<const CheckResultsOperation *>(
171 Commands.getByName("checkresults"));
172 const size_t doneJobs = checkres->getDoneJobs();
173 const size_t presentJobs = checkres->getPresentJobs();
174 return make_pair(presentJobs, doneJobs);
175}
176
177/** Requests removal of all pending results from server.
178 *
179 * @param host address of server
180 * @param service port/service of server
181 */
182void FragmentController::removeWaitingResults(
183 const std::string &host,
184 const std::string &service)
185{
186 RemoveAllResultsOperation *removeall = static_cast<RemoveAllResultsOperation *>(
187 Commands.getByName("removeallresults"));
188 (*removeall)(host, service);
189}
190
191/** Requests removal of all pending jobs from server.
192 *
193 * @param host address of server
194 * @param service port/service of server
195 */
196void FragmentController::removeWaitingJobs(
197 const std::string &host,
198 const std::string &service)
199{
200 RemoveAllJobsOperation *removeall = static_cast<RemoveAllJobsOperation *>(
201 Commands.getByName("removealljobs"));
202 (*removeall)(host, service);
203}
204
205/** Requests removal of all idle workers from server.
206 *
207 * @param host address of server
208 * @param service port/service of server
209 */
210void FragmentController::removeall(
211 const std::string &host,
212 const std::string &service)
213{
214 RemoveAllWorkerOperation *removeall = static_cast<RemoveAllWorkerOperation *>(
215 Commands.getByName("removeallworker"));
216 (*removeall)(host, service);
217}
218
219/** Obtains results from done jobs from server.
220 *
221 * @param host address of server
222 * @param service port/service of server
223 */
224void FragmentController::receiveResults(
225 const std::string &host,
226 const std::string &service)
227{
228 ReceiveResultsOperation *receiveres = static_cast<ReceiveResultsOperation *>(
229 Commands.getByName("receiveresults"));
230 receiveres->setJobIds(jobids);
231 (*receiveres)(host, service);
232}
233
234/** Getter for received results.
235 *
236 * @return vector with all received results
237 */
238std::vector<FragmentResult::ptr> FragmentController::getReceivedResults()
239{
240 ReceiveResultsOperation *receiveres = static_cast<ReceiveResultsOperation *>(
241 Commands.getByName("receiveresults"));
242 const std::vector<FragmentResult::ptr> &results = receiveres->getResults();
243 for (std::vector<FragmentResult::ptr>::const_iterator iter = results.begin();
244 iter != results.end(); ++iter) {
245// ASSERT( jobids.count((*iter)->getId()),
246// "FragmentController::getReceivedResults() - id "
247// +toString((*iter)->getId())+" not present in jobids.");
248 if (jobids.count((*iter)->getId()))
249 jobids.erase((*iter)->getId());
250 }
251 return results;
252}
253
254/** Sends shutdown signal to server
255 *
256 * @param host address of server
257 * @param service port/service of server
258 */
259void FragmentController::shutdown(
260 const std::string &host,
261 const std::string &service)
262{
263 ShutdownOperation *shutdown = static_cast<ShutdownOperation *>(
264 Commands.getByName("shutdown"));
265 (*shutdown)(host, service);
266}
267
268/** Helper function to allow all derived controllers to change a job's id.
269 *
270 * As the controller is the one that obtains a unique id, it must also have
271 * the authority to set/change a FragmentJob's id.
272 *
273 * \param _newid job is set to this id
274 */
275void FragmentController::changeJobId(FragmentJob::ptr &job, const JobId_t _newid) const
276{
277 job->setId(_newid);
278}
Note: See TracBrowser for help on using the repository browser.