Changeset ab2e834
- Timestamp:
- Aug 11, 2025, 5:45:37 PM (3 months ago)
- Branches:
- Candidate_v1.7.0, stable
- Children:
- ae9ad6
- Parents:
- aeec58
- git-author:
- Frederik Heber <frederik.heber@…> (07/16/25 19:05:29)
- git-committer:
- Frederik Heber <frederik.heber@…> (08/11/25 17:45:37)
- Location:
- ThirdParty/JobMarket
- Files:
-
- 24 deleted
- 27 edited
-
src/JobMarket/Controller/FragmentController.cpp (modified) (5 diffs)
-
src/JobMarket/Controller/FragmentController.hpp (modified) (1 diff)
-
src/JobMarket/Controller/controller_AddOn.hpp (modified) (1 diff)
-
src/JobMarket/Controller/controller_main.cpp (modified) (3 diffs)
-
src/JobMarket/ControllerChoices.hpp (modified) (1 diff)
-
src/JobMarket/FragmentScheduler.cpp (modified) (13 diffs)
-
src/JobMarket/FragmentScheduler.hpp (modified) (15 diffs)
-
src/JobMarket/FragmentScheduler_ControllerListener.cpp (modified) (2 diffs)
-
src/JobMarket/FragmentScheduler_WorkerListener.cpp (modified) (3 diffs)
-
src/JobMarket/Makefile.am (modified) (3 diffs)
-
src/JobMarket/Operations/Controllers/GetNumberWorkersOperation.cpp (deleted)
-
src/JobMarket/Operations/Controllers/GetNumberWorkersOperation.hpp (deleted)
-
src/JobMarket/Operations/Controllers/RemoveAllWorkerOperation.cpp (deleted)
-
src/JobMarket/Operations/Controllers/RemoveAllWorkerOperation.hpp (deleted)
-
src/JobMarket/Operations/Servers/CheckAliveWorkerOperation.cpp (deleted)
-
src/JobMarket/Operations/Servers/CheckAliveWorkerOperation.hpp (deleted)
-
src/JobMarket/Operations/Servers/ShutdownWorkerOperation.cpp (deleted)
-
src/JobMarket/Operations/Servers/ShutdownWorkerOperation.hpp (deleted)
-
src/JobMarket/Operations/Workers/EnrollInPoolOperation.cpp (deleted)
-
src/JobMarket/Operations/Workers/EnrollInPoolOperation.hpp (deleted)
-
src/JobMarket/Operations/Workers/RemoveFromPoolOperation.cpp (deleted)
-
src/JobMarket/Operations/Workers/RemoveFromPoolOperation.hpp (deleted)
-
src/JobMarket/Pool/PoolGuard.cpp (deleted)
-
src/JobMarket/Pool/PoolGuard.hpp (deleted)
-
src/JobMarket/Pool/PoolWorker.cpp (modified) (6 diffs)
-
src/JobMarket/Pool/PoolWorker.hpp (modified) (2 diffs)
-
src/JobMarket/Pool/ServerPing.cpp (deleted)
-
src/JobMarket/Pool/ServerPing.hpp (deleted)
-
src/JobMarket/Pool/WorkerPool.cpp (deleted)
-
src/JobMarket/Pool/WorkerPool.hpp (deleted)
-
src/JobMarket/ServerChoices.hpp (modified) (1 diff)
-
src/JobMarket/ServerOptions.cpp (modified) (1 diff)
-
src/JobMarket/ServerOptions.hpp (modified) (2 diffs)
-
src/JobMarket/WorkerChoices.hpp (modified) (1 diff)
-
src/JobMarket/server_main.cpp (modified) (3 diffs)
-
src/unittests/Makefile.am (modified) (3 diffs)
-
src/unittests/WorkerPoolUnitTest.cpp (deleted)
-
src/unittests/WorkerPoolUnitTest.hpp (deleted)
-
tests/regression/Makefile.am (modified) (1 diff)
-
tests/regression/testsuite-addingjobs.at (modified) (2 diffs)
-
tests/regression/testsuite-checkalive.at (deleted)
-
tests/regression/testsuite-checkstate.at (modified) (1 diff)
-
tests/regression/testsuite-completerun.at (modified) (1 diff)
-
tests/regression/testsuite-enrollinpool.at (deleted)
-
tests/regression/testsuite-falsehost.at (deleted)
-
tests/regression/testsuite-getresults.at (modified) (1 diff)
-
tests/regression/testsuite-numberworkers.at (deleted)
-
tests/regression/testsuite-resubmitjobs.at (modified) (1 diff)
-
tests/regression/testsuite-server-shutdown.at (modified) (1 diff)
-
tests/regression/testsuite-server-worker.at (modified) (1 diff)
-
tests/regression/testsuite.at (modified) (2 diffs)
Legend:
- Unmodified
- Added
- Removed
-
ThirdParty/JobMarket/src/JobMarket/Controller/FragmentController.cpp
raeec58 rab2e834 31 31 #include "Operations/Controllers/CheckResultsOperation.hpp" 32 32 #include "Operations/Controllers/GetNextJobIdOperation.hpp" 33 #include "Operations/Controllers/GetNumberWorkersOperation.hpp"34 33 #include "Operations/Controllers/ReceiveResultsOperation.hpp" 35 34 #include "Operations/Controllers/RemoveAllJobsOperation.hpp" 36 35 #include "Operations/Controllers/RemoveAllResultsOperation.hpp" 37 #include "Operations/Controllers/RemoveAllWorkerOperation.hpp"38 36 #include "Operations/Controllers/SendJobsOperation.hpp" 39 37 #include "Operations/Controllers/ShutdownOperation.hpp" … … 57 55 Commands.registerInstance(new CheckResultsOperation(connection_, AsyncOperation::NoOpCallback, failed)); 58 56 Commands.registerInstance(new GetNextJobIdOperation(connection_, AsyncOperation::NoOpCallback, failed)); 59 Commands.registerInstance(new GetNumberWorkersOperation(connection_, AsyncOperation::NoOpCallback, failed));60 57 Commands.registerInstance(new ReceiveResultsOperation(connection_, AsyncOperation::NoOpCallback, failed)); 61 58 Commands.registerInstance(new RemoveAllJobsOperation(connection_)); 62 59 Commands.registerInstance(new RemoveAllResultsOperation(connection_)); 63 Commands.registerInstance(new RemoveAllWorkerOperation(connection_));64 60 Commands.registerInstance(new SendJobsOperation(connection_, AsyncOperation::NoOpCallback, failed)); 65 61 Commands.registerInstance(new ShutdownOperation(connection_)); … … 164 160 } 165 161 166 /** Obtains number of workers enrolled at server167 *168 * @param host address of server169 * @param service port/service of server170 */171 void FragmentController::checkEnrolledWorkers(172 const std::string &host,173 const std::string &service)174 {175 GetNumberWorkersOperation *numberworkers = static_cast<GetNumberWorkersOperation *>(176 Commands.getByName("numberworkers"));177 (*numberworkers)(host, service);178 }179 180 162 /** Return scheduled and done jobs. 
181 163 * … … 191 173 } 192 174 193 /** Return scheduled and done jobs.194 *195 * @return number of workers in certain states enrolled in server196 */197 std::vector<size_t> FragmentController::getNumberOfWorkers()198 {199 const GetNumberWorkersOperation * const numberworkers = static_cast<const GetNumberWorkersOperation *>(200 Commands.getByName("numberworkers"));201 return numberworkers->get();202 }203 204 175 /** Requests removal of all pending results from server. 205 176 * … … 228 199 RemoveAllJobsOperation *removeall = static_cast<RemoveAllJobsOperation *>( 229 200 Commands.getByName("removealljobs")); 230 (*removeall)(host, service);231 setExitflagFromStatus(*removeall);232 }233 234 /** Requests removal of all idle workers from server.235 *236 * @param host address of server237 * @param service port/service of server238 */239 void FragmentController::removeall(240 const std::string &host,241 const std::string &service)242 {243 RemoveAllWorkerOperation *removeall = static_cast<RemoveAllWorkerOperation *>(244 Commands.getByName("removeallworker"));245 201 (*removeall)(host, service); 246 202 setExitflagFromStatus(*removeall); -
ThirdParty/JobMarket/src/JobMarket/Controller/FragmentController.hpp
raeec58 rab2e834 49 49 void checkResults(const std::string &host, const std::string &service); 50 50 std::pair<size_t, size_t> getJobStatus() const; 51 void checkEnrolledWorkers(const std::string &host, const std::string &service);52 51 std::vector<size_t> getNumberOfWorkers(); 53 52 void removeWaitingResults(const std::string &host, const std::string &service); 54 53 void removeWaitingJobs(const std::string &host, const std::string &service); 55 void removeall(const std::string &host, const std::string &service);56 54 void receiveResults(const std::string &host, const std::string &service); 57 55 std::vector<FragmentResult::ptr> getReceivedResults(); -
ThirdParty/JobMarket/src/JobMarket/Controller/controller_AddOn.hpp
raeec58 rab2e834 54 54 */ 55 55 struct controller_AddOn { 56 virtual ~controller_AddOn() {} 57 56 58 /** Function to allocate a derived variant of ControllerOptions which might 57 59 * be extended by own member variables and parsing functions. -
ThirdParty/JobMarket/src/JobMarket/Controller/controller_main.cpp
raeec58 rab2e834 57 57 } 58 58 59 /** Print the status of workers60 *61 * @param number_workers vector of workers in certain states (busy, idle, ...)62 */63 void printNumberWorkers(const std::vector<size_t> &number_workers)64 {65 ASSERT(number_workers.size() == 2,66 "printNumberWorkers() - vector of worker numbers is not of size 2.");67 LOG(1, "INFO: " << number_workers[0] << " are busy and "68 << number_workers[1] << " are idle.");69 }70 71 59 inline std::vector<std::string> getListOfCommands(const ControllerCommandRegistry &ControllerCommands) 72 60 { … … 108 96 (boost::bind(&printJobStatus, 109 97 boost::bind(&FragmentController::getJobStatus, boost::ref(controller)))) 110 ));111 registrator(new ControllerCommand("numberworkers",112 boost::assign::list_of< ControllerCommand::commands_t >113 (boost::bind(&FragmentController::checkEnrolledWorkers,114 boost::ref(controller), boost::cref(ControllerInfo->server), boost::cref(ControllerInfo->serverport)))115 (boost::bind(&printNumberWorkers,116 boost::bind(&FragmentController::getNumberOfWorkers, boost::ref(controller))))117 ));118 registrator(new ControllerCommand("removeall",119 boost::assign::list_of< ControllerCommand::commands_t >120 (boost::bind(&FragmentController::removeall,121 boost::ref(controller), boost::cref(ControllerInfo->server), boost::cref(ControllerInfo->serverport)))122 98 )); 123 99 registrator(new ControllerCommand("removealljobs", … … 176 152 { 177 153 // execute each command in the queue synchronously 178 size_t phase = 1;154 //size_t phase = 1; 179 155 for (ControllerCommand::const_iterator iter = commands->begin(); 180 156 iter != commands->end(); ++iter) { -
ThirdParty/JobMarket/src/JobMarket/ControllerChoices.hpp
raeec58 rab2e834 23 23 EmptyResultQueue, 24 24 EmptyJobQueue, 25 RemoveAll,26 25 ShutdownControllerSocket, 27 NumberOfWorkers28 26 }; 29 27 -
ThirdParty/JobMarket/src/JobMarket/FragmentScheduler.cpp
raeec58 rab2e834 35 35 #include "CodePatterns/Log.hpp" 36 36 #include "CodePatterns/Observer/Notification.hpp" 37 #include "Operations/Servers/SendJobToWorkerOperation.hpp"38 #include "Operations/Servers/ShutdownWorkerOperation.hpp"39 #include "Operations/Workers/EnrollInPoolOperation.hpp"40 37 #include "JobMarket/JobId.hpp" 38 #include "JobMarket/Operations/Servers/SendJobToWorkerOperation.hpp" 39 41 40 42 41 #include "JobMarket/FragmentScheduler.hpp" … … 55 54 unsigned short workerport, 56 55 unsigned short controllerport, 57 const size_t timeout,58 bool guardFromStart) :56 const WorkerAddress _workeraddress, 57 const size_t timeout) : 59 58 Observer("FragmentScheduler"), 60 59 io_service(_io_service), 61 WorkerListener(_io_service, workerport, JobsQueue, pool, 62 boost::bind(&FragmentScheduler::sendJobToWorker, boost::ref(*this), _1, _2), 63 boost::bind(&FragmentScheduler::unmarkWorkerBusy, boost::ref(*this), _1)), 64 ControllerListener(_io_service, controllerport, JobsQueue, pool, 65 boost::bind(&FragmentScheduler::removeAllWorkers, boost::ref(*this)), 60 WorkerListener(_io_service, workerport, JobsQueue, 61 boost::bind(&FragmentScheduler::sendJobToWorker, boost::ref(*this), _1, _2)), 62 ControllerListener(_io_service, controllerport, JobsQueue, 66 63 boost::bind(&FragmentScheduler::shutdown, boost::ref(*this))), 64 workeraddress(_workeraddress), 67 65 connection(_io_service), 68 OpQueue(boost::bind(&FragmentScheduler::removeWorker, boost::ref(*this), _1)), 69 guard(_io_service, timeout, connection, guardFromStart, 70 boost::bind(&FragmentScheduler::removeWorker, boost::ref(*this), _1), 71 boost::bind(&FragmentQueue::resubmitJob, boost::ref(JobsQueue), _1), 72 boost::bind(&FragmentQueue::isResultPresent, boost::cref(JobsQueue), _1), 73 pool.getPoolOfWorkers(), 74 OpQueue), 75 shutdown_thread(NULL) 66 OpQueue() 76 67 { 77 68 DEBUG_FUNCTION_ENTRYEXIT … … 80 71 && (ControllerListener.getExitflag() == Listener::OkFlag)) { 81 72 // sign on to idle workers and 
present jobs 82 pool.signOn(this, WorkerPool::WorkerIdle);83 73 JobsQueue.signOn(this, FragmentQueue::JobAdded); 84 74 … … 95 85 FragmentScheduler::~FragmentScheduler() 96 86 { 97 if (shutdown_thread != NULL) {98 // notify shutdown thread that we wish to terminate99 shutdown_thread->interrupt();100 // wait till it ends101 shutdown_thread->join();102 delete shutdown_thread;103 }104 105 87 // sign off 106 pool.signOff(this, WorkerPool::WorkerIdle);107 88 JobsQueue.signOff(this, FragmentQueue::JobAdded); 108 89 } … … 123 104 void FragmentScheduler::sendJobToWorker(const WorkerAddress &address, FragmentJob::ptr &job) 124 105 { 125 ASSERT( pool.isWorkerBusy(address),126 "FragmentScheduler::sendJobToWorker() - Worker "+toString(address)+" is not marked as busy.");127 106 LOG(1, "INFO: Sending job " << job->getId() << " to worker " << address << "."); 128 107 … … 145 124 void FragmentScheduler::workerAcceptsJob(const WorkerAddress _address, const JobId_t _id) 146 125 { 147 // inform guard and if not already running, also launch guard148 guard.addBusyWorker(_address, _id);149 if (!guard.isRunning())150 guard.start();151 152 126 } 153 127 … … 160 134 void FragmentScheduler::workerRejectsJob(const WorkerAddress _address, const JobId_t _id) 161 135 { 162 unmarkWorkerBusy(_address);163 136 JobsQueue.resubmitJob(_id); 164 }165 166 /** Helper function to shutdown a single worker.167 *168 * We send NoJob to indicate shutdown169 *170 * @param address of worker to shutdown171 */172 void FragmentScheduler::shutdownWorker(const WorkerAddress &address)173 {174 ASSERT( !pool.isWorkerBusy(address),175 "FragmentScheduler::sendJobToWorker() - Worker "+toString(address)+" is already busy.");176 LOG(2, "INFO: Shutting down worker " << address << "...");177 AsyncOperation *shutdownWorkerOp = new ShutdownWorkerOperation(connection);178 OpQueue.push_back(shutdownWorkerOp, address);179 }180 181 /** Sends shutdown to all current workers in the pool.182 *183 * This only works if called in an 
extra thread that does not block the main184 * thread. Otherwise, the \a OperationQueue will not be able to send out185 * the shutdown operations to the workers and we loop forever.186 */187 void FragmentScheduler::removeAllWorkers()188 {189 /**190 * Some workers might still be busy. Let's remove all idle workers already.191 */192 removeAllIdleWorkers();193 194 // wait for busy workers to finish if there are any195 if (pool.hasBusyWorkers()) {196 while (pool.hasBusyWorkers()) {197 LOG(2, "DEBUG: Waiting for " << pool.getNoBusyWorkers() << " busy workers to finish ...");198 sleep(1);199 }200 201 // Finally, remove also the remaining then busy, now idle workers202 removeAllIdleWorkers();203 }204 205 // wait for remaining workers to finish206 while (pool.presentIdleWorkers()) {207 LOG(2, "DEBUG: Waiting for " << pool.getNoIdleWorkers() << " idle workers to finish ...");208 sleep(1);209 }210 }211 212 /** Sends shutdown to all currently idle workers in the pool.213 *214 */215 void FragmentScheduler::removeAllIdleWorkers()216 {217 LOG(2, "INFO: Shutting down " << pool.getNoIdleWorkers() << " idle workers ...");218 // iterate until there are no more idle workers219 // get list of all idle workers220 typedef std::vector<std::pair<std::string, std::string> > WorkerList_t;221 const WorkerList_t WorkerList = pool.getListOfIdleWorkers();222 223 // give all workers shutdown signal224 for (WorkerList_t::const_iterator iter = WorkerList.begin(); iter != WorkerList.end(); ++iter)225 shutdownWorker(WorkerAddress(iter->first, iter->second));226 137 } 227 138 … … 239 150 */ 240 151 LOG(0, "STATUS: Shutting down due to signal " << sig << "."); 241 242 // sign off from notifications such that no new jobs are given to workers243 pool.signOff(this, WorkerPool::WorkerIdle);244 245 if (shutdown_thread == NULL) {246 shutdown_thread = new boost::thread(boost::bind(&FragmentScheduler::shutdownAfterSignal, this));247 }248 152 } 249 153 … … 251 155 */ 252 156 void 
FragmentScheduler::shutdownAfterSignal() { 253 removeAllWorkers();254 157 shutdown(); 255 158 } … … 263 166 bool FragmentScheduler::shutdown() 264 167 { 265 if (!pool.presentIdleWorkers() && !pool.hasBusyWorkers()) { 266 LOG(1, "INFO: Shutting all down ..."); 267 268 // close the guard's watch 269 guard.stop(); 270 271 /// close the worker listener's socket (until here need to accept results still) 272 WorkerListener.closeSocket(); 273 274 /// close the controller listener's socket 275 ControllerListener.closeSocket(); 276 277 /// finally, stop the io_service 278 sleep(1); 279 io_service.stop(); 280 281 return true; 282 } else { 283 ELOG(2, "There are still idle or busy workers present."); 284 return false; 285 } 168 LOG(1, "INFO: Shutting all down ..."); 169 170 /// close the worker listener's socket (until here need to accept results still) 171 WorkerListener.closeSocket(); 172 173 /// close the controller listener's socket 174 ControllerListener.closeSocket(); 175 176 /// finally, stop the io_service 177 sleep(1); 178 io_service.stop(); 179 180 return true; 286 181 } 287 182 … … 291 186 void FragmentScheduler::sendAvailableJobToNextIdleWorker() 292 187 { 293 const WorkerAddress address = pool.getNextIdleWorker();294 188 FragmentJob::ptr job = JobsQueue.popJob(); 295 sendJobToWorker( address, job);189 sendJobToWorker(workeraddress, job); 296 190 } 297 191 … … 303 197 void FragmentScheduler::recieveNotification(Observable *publisher, Notification_ptr notification) 304 198 { 305 if ((publisher == &pool) && (notification->getChannelNo() == WorkerPool::WorkerIdle)) { 306 // we have an idle worker 307 LOG(1, "INFO: We are notified of an idle worker."); 308 // are jobs available? 
309 if (JobsQueue.isJobPresent()) { 310 sendAvailableJobToNextIdleWorker(); 311 } 312 } else if ((publisher == &JobsQueue) && (notification->getChannelNo() == FragmentQueue::JobAdded)) { 199 if ((publisher == &JobsQueue) && (notification->getChannelNo() == FragmentQueue::JobAdded)) { 313 200 // we have new jobs 314 201 LOG(1, "INFO: We are notified of a new job."); 315 // check for idle workers 316 if (pool.presentIdleWorkers()) { 317 sendAvailableJobToNextIdleWorker(); 318 } 202 sendAvailableJobToNextIdleWorker(); 319 203 } else { 320 204 ASSERT(0, "FragmentScheduler::recieveNotification() - we are not signed on for updates in channel " … … 325 209 void FragmentScheduler::subjectKilled(Observable *publisher) 326 210 {} 327 328 /** Removes worker at \a address and stops guard in case of last busy one.329 *330 * This is is specifically meant to remove a busy worker as in this case we331 * might also have to call PoolGuard::stop() if it has been the last busy332 * worker.333 *334 * @param address address of worker to remove.335 */336 void FragmentScheduler::removeWorker(const WorkerAddress address)337 {338 pool.removeWorker(address);339 }340 341 /** Unmarks worker from being busy in WorkerPool.342 *343 * This is required to catch when a worker has finished working on a job and to344 * inform the PoolGuard345 *346 * @param address address of once busy worker347 */348 void FragmentScheduler::unmarkWorkerBusy(const WorkerAddress address)349 {350 // inform guard351 guard.removeBusyWorker(address);352 // unmark in pool353 pool.unmarkWorkerBusy(address);354 } -
ThirdParty/JobMarket/src/JobMarket/FragmentScheduler.hpp
raeec58 rab2e834 17 17 #include <boost/function.hpp> 18 18 #include <boost/shared_ptr.hpp> 19 #include <boost/thread.hpp>20 19 #include <deque> 21 20 #include <set> … … 27 26 #include "JobMarket/Operations/AsyncOperation.hpp" 28 27 #include "JobMarket/Operations/OperationQueue.hpp" 29 #include "Operations/Servers/ShutdownWorkerOperation.hpp"30 28 #include "JobMarket/ControllerChoices.hpp" 31 29 #include "JobMarket/FragmentQueue.hpp" … … 35 33 #include "JobMarket/Results/FragmentResult.hpp" 36 34 #include "JobMarket/types.hpp" 37 #include "JobMarket/Pool/PoolGuard.hpp"38 #include "JobMarket/Pool/WorkerPool.hpp"39 35 #include "JobMarket/WorkerAddress.hpp" 40 36 #include "JobMarket/WorkerChoices.hpp" … … 53 49 unsigned short workerport, 54 50 unsigned short controllerport, 55 const size_t timeout,56 bool guardFromStart51 const WorkerAddress _workeraddress, 52 const size_t timeout 57 53 ); 58 54 ~FragmentScheduler(); … … 63 59 void sendAvailableJobToNextIdleWorker(); 64 60 bool shutdown(); 65 void shutdownWorker(const WorkerAddress &address);66 61 void shutdownAfterSignal(); 67 void removeAllWorkers();68 void removeAllIdleWorkers();69 void removeWorker(const WorkerAddress address);70 void unmarkWorkerBusy(const WorkerAddress address);71 62 void cleanupOperationQueue(AsyncOperation *op); 72 63 void workerAcceptsJob(const WorkerAddress _address, const JobId_t _id); … … 84 75 unsigned short port, 85 76 FragmentQueue &_JobsQueue, 86 WorkerPool &_pool, 87 boost::function<void (const WorkerAddress&, FragmentJob::ptr&)> _callback, 88 boost::function<void (const WorkerAddress)> _unmarkBusyWorkerFunction) : 77 boost::function<void (const WorkerAddress&, FragmentJob::ptr&)> _callback) : 89 78 Listener(io_service, port), 90 79 JobsQueue(_JobsQueue), 91 pool(_pool), 92 callback_sendJobToWorker(_callback), 93 unmarkBusyWorkerFunction(_unmarkBusyWorkerFunction) 80 callback_sendJobToWorker(_callback) 94 81 {} 95 82 virtual ~WorkerListener_t() {} … … 136 123 void 
handle_ReadAddress(const boost::system::error_code& e, connection_ptr conn, HandlerData::ptr &data); 137 124 138 /// Worker callback function when new worker has enrolled.139 void handle_enrolled(const boost::system::error_code& e, connection_ptr conn, HandlerData::ptr &data);140 141 /// Worker callback function when worker has been informed of successful removal.142 void handle_removed(const boost::system::error_code& e, connection_ptr conn, HandlerData::ptr &data);143 144 125 /// Worker callback function when result has been received. 145 126 void handle_ReceiveResultFromWorker(const boost::system::error_code& e, connection_ptr conn, HandlerData::ptr &data); … … 152 133 FragmentQueue &JobsQueue; 153 134 154 //!> callback reference to container class155 WorkerPool &pool;156 157 135 //!> callback function to access send job function 158 136 boost::function<void (const WorkerAddress&, FragmentJob::ptr&)> callback_sendJobToWorker; 159 160 //!> function to call when a busy worker has finished and sent its result161 boost::function<void (const WorkerAddress)> unmarkBusyWorkerFunction;162 137 }; 163 138 … … 169 144 unsigned short port, 170 145 FragmentQueue &_JobsQueue, 171 WorkerPool &_pool,172 boost::function<void ()> _removeallWorkers,173 146 boost::function<bool ()> _shutdownAllSockets) : 174 147 Listener(io_service, port), 175 148 JobsQueue(_JobsQueue), 176 pool(_pool),177 149 jobInfo((size_t)2, 0), 178 150 NumberIds(0), 179 151 choice(NoControllerOperation), 180 removeallWorkers(_removeallWorkers),181 152 shutdownAllSockets(_shutdownAllSockets), 182 153 globalId(0) … … 209 180 void handle_SendResults(const boost::system::error_code& e, connection_ptr conn); 210 181 211 /// Controller callback function when result are to be sent.212 void handle_SendNumberWorkers(const boost::system::error_code& e, connection_ptr conn);213 214 182 /// Controller callback function when results has been sent. 
215 183 void handle_finishSendResults(const boost::system::error_code& e, connection_ptr conn); … … 219 187 FragmentQueue & JobsQueue; 220 188 221 //!> const reference to external pool of Workers222 const WorkerPool & pool;223 224 189 //!> bunch of jobs received from controller before placed in JobsQueue 225 190 std::vector<FragmentJob::ptr> jobs; … … 236 201 //!> choice 237 202 enum ControllerChoices choice; 238 239 //!> bound function to remove all workers240 boost::function<void ()> removeallWorkers;241 203 242 204 //!> bound function to shutdown all sockets … … 258 220 FragmentQueue JobsQueue; 259 221 260 //!> Pool of Workers261 WorkerPool pool;262 263 222 //!> Listener instance that waits for a worker 264 223 WorkerListener_t WorkerListener; … … 266 225 //!> Listener instance that waits for a controller 267 226 ControllerListener_t ControllerListener; 227 228 //!> Ingress address to reach the next worker 229 const WorkerAddress workeraddress; 268 230 269 231 public: … … 287 249 //!> internal queue for all asynchronous operations 288 250 OperationQueue OpQueue; 289 290 private:291 //!> Guard that checks if workers are unreachable292 PoolGuard guard;293 294 //!> shutdown thread to keep the operation queue free to run295 boost::thread *shutdown_thread;296 251 }; 297 252 -
ThirdParty/JobMarket/src/JobMarket/FragmentScheduler_ControllerListener.cpp
raeec58 rab2e834 136 136 JobsQueue.removeWaitingJobs(); 137 137 LOG(2, "DEBUG: Queue has " << JobsQueue.getPresentJobs() << " results."); 138 break;139 }140 case RemoveAll:141 {142 removeallWorkers();143 break;144 }145 case NumberOfWorkers:146 {147 LOG(1, "INFO: Sending total number of workers to a controller ...");148 std::vector<size_t> number_workers;149 number_workers += pool.getNoBusyWorkers();150 number_workers += pool.getNoIdleWorkers();151 conn->async_write(number_workers,152 boost::bind(&FragmentScheduler::ControllerListener_t::handle_SendNumberWorkers, this,153 boost::asio::placeholders::error, conn));154 138 break; 155 139 } … … 360 344 LOG(1, "INFO: Results have been sent."); 361 345 } 362 363 /** Controller callback function when number of workers has been sent.364 *365 * \param e error code if something went wrong366 * \param conn reference with the connection367 */368 void FragmentScheduler::ControllerListener_t::handle_SendNumberWorkers(const boost::system::error_code& e, connection_ptr conn)369 {370 DEBUG_FUNCTION_ENTRYEXIT371 // do nothing372 LOG(1, "INFO: Number of workers have been sent.");373 } -
ThirdParty/JobMarket/src/JobMarket/FragmentScheduler_WorkerListener.cpp
raeec58 rab2e834 110 110 break; 111 111 } 112 case EnrollInPool:113 {114 if (pool.presentInPool(data->address)) {115 ELOG(1, "INFO: worker "+toString(data->address)+" is already contained in pool.");116 conn->async_write(false,117 boost::bind(&FragmentScheduler::WorkerListener_t::handle_enrolled, this,118 boost::asio::placeholders::error, conn, data));119 // mark as idle if a rejoining worker120 unmarkBusyWorkerFunction(data->address);121 LOG(1, "INFO: There are " << pool.getNoTotalWorkers() << " workers in the queue, "122 << pool.getNoIdleWorkers() << " of which are idle.");123 } else {124 // insert as its new worker125 LOG(1, "INFO: Adding " << data->address << " to pool ...");126 const bool addedToPool = pool.addWorker(data->address);127 conn->async_write(addedToPool,128 boost::bind(&FragmentScheduler::WorkerListener_t::handle_enrolled, this,129 boost::asio::placeholders::error, conn, data));130 }131 break;132 }133 112 case SendResult: 134 113 { 135 if (pool.presentInPool(data->address)) { 136 // check whether its priority is busy_priority 137 if (pool.isWorkerBusy(data->address)) { 138 conn->async_read(data->result, 139 boost::bind(&FragmentScheduler::WorkerListener_t::handle_ReceiveResultFromWorker, this, 140 boost::asio::placeholders::error, conn, data)); 141 } else { 142 ELOG(1, "Worker " << data->address << " trying to send result who is not marked as busy."); 143 conn->async_read(data->result, 144 boost::bind(&FragmentScheduler::WorkerListener_t::handle_RejectResultFromWorker, this, 145 boost::asio::placeholders::error, conn, data)); 146 } 147 } else { 148 ELOG(1, "Worker " << data->address << " trying to send result who is not in pool."); 149 conn->async_read(data->result, 150 boost::bind(&FragmentScheduler::WorkerListener_t::handle_RejectResultFromWorker, this, 151 boost::asio::placeholders::error, conn, data)); 152 } 153 break; 154 } 155 case RemoveFromPool: 156 { 157 if (pool.presentInPool(data->address)) { 158 conn->async_write(true, 159 
boost::bind(&FragmentScheduler::WorkerListener_t::handle_removed, this, 160 boost::asio::placeholders::error, conn, data)); 161 } else { 162 ELOG(1, "Shutting down Worker " << data->address << " not contained in pool."); 163 conn->async_write(false, 164 boost::bind(&FragmentScheduler::WorkerListener_t::handle_removed, this, 165 boost::asio::placeholders::error, conn, data)); 166 } 114 // check whether its priority is busy_priority 115 conn->async_read(data->result, 116 boost::bind(&FragmentScheduler::WorkerListener_t::handle_ReceiveResultFromWorker, this, 117 boost::asio::placeholders::error, conn, data)); 167 118 break; 168 119 } … … 179 130 else 180 131 { 181 // An error occurred. Log it and return. Since we are not starting a new182 // accept operation the io_service will run out of work to do and the183 // server will exit.184 setExitflag( ErrorFlag );185 ELOG(0, e.message());186 }187 }188 189 190 /** Callback function when new worker has enrolled.191 *192 * \param e error code if something went wrong193 * \param conn reference with the connection194 * \param data shared pointer with data associated to this chain of handlers.195 */196 void FragmentScheduler::WorkerListener_t::handle_enrolled(const boost::system::error_code& e, connection_ptr conn, HandlerData::ptr &data)197 {198 DEBUG_FUNCTION_ENTRYEXIT199 if (!e) {200 LOG(2, "DEBUG: Successfully enrolled.");201 LOG(1, "INFO: There are " << pool.getNoTotalWorkers() << " workers in the queue, "202 << pool.getNoIdleWorkers() << " of which are idle.");203 } else {204 // An error occurred. Log it and return. 
Since we are not starting a new205 // accept operation the io_service will run out of work to do and the206 // server will exit.207 setExitflag( ErrorFlag );208 ELOG(0, e.message());209 }210 }211 212 /** Callback function when new worker has enrolled.213 *214 * \param e error code if something went wrong215 * \param conn reference with the connection216 * \param data shared pointer with data associated to this chain of handlers.217 */218 void FragmentScheduler::WorkerListener_t::handle_removed(const boost::system::error_code& e, connection_ptr conn, HandlerData::ptr &data)219 {220 DEBUG_FUNCTION_ENTRYEXIT221 if (!e) {222 // removing present worker223 pool.removeWorker(data->address);224 } else {225 132 // An error occurred. Log it and return. Since we are not starting a new 226 133 // accept operation the io_service will run out of work to do and the … … 251 158 JobsQueue.pushResult(data->result); 252 159 253 // mark as idle254 unmarkBusyWorkerFunction(data->address);255 LOG(1, "INFO: There are " << pool.getNoTotalWorkers() << " workers in the queue, "256 << pool.getNoIdleWorkers() << " of which are idle.");257 258 160 LOG(1, "INFO: JobsQueue has " << JobsQueue.getDoneJobs() << " results."); 259 161 } -
ThirdParty/JobMarket/src/JobMarket/Makefile.am
raeec58 rab2e834 40 40 Operations/Controllers/CheckResultsOperation.cpp \ 41 41 Operations/Controllers/GetNextJobIdOperation.cpp \ 42 Operations/Controllers/GetNumberWorkersOperation.cpp \43 42 Operations/Controllers/ReceiveResultsOperation.cpp \ 44 43 Operations/Controllers/RemoveAllJobsOperation.cpp \ 45 44 Operations/Controllers/RemoveAllResultsOperation.cpp \ 46 Operations/Controllers/RemoveAllWorkerOperation.cpp \47 45 Operations/Controllers/SendJobsOperation.cpp \ 48 46 Operations/Controllers/ShutdownOperation.cpp \ 49 Operations/Servers/CheckAliveWorkerOperation.cpp \50 47 Operations/Servers/SendJobToWorkerOperation.cpp \ 51 Operations/Servers/ShutdownWorkerOperation.cpp \52 Operations/Workers/EnrollInPoolOperation.cpp \53 48 Operations/Workers/ObtainJobOperation.cpp \ 54 Operations/Workers/RemoveFromPoolOperation.cpp \55 49 Operations/Workers/SubmitResultOperation.cpp 56 50 … … 64 58 Operations/Controllers/CheckResultsOperation.hpp \ 65 59 Operations/Controllers/GetNextJobIdOperation.hpp \ 66 Operations/Controllers/GetNumberWorkersOperation.hpp \67 60 Operations/Controllers/ReceiveResultsOperation.hpp \ 68 61 Operations/Controllers/RemoveAllJobsOperation.hpp \ 69 62 Operations/Controllers/RemoveAllResultsOperation.hpp \ 70 Operations/Controllers/RemoveAllWorkerOperation.hpp \71 63 Operations/Controllers/SendJobsOperation.hpp \ 72 64 Operations/Controllers/ShutdownOperation.hpp \ 73 Operations/Servers/CheckAliveWorkerOperation.hpp \74 65 Operations/Servers/SendJobToWorkerOperation.hpp \ 75 Operations/Servers/ShutdownWorkerOperation.hpp \76 Operations/Workers/EnrollInPoolOperation.hpp \77 66 Operations/Workers/ObtainJobOperation.hpp \ 78 Operations/Workers/RemoveFromPoolOperation.hpp \79 67 Operations/Workers/SubmitResultOperation.hpp \ 80 68 ServerChoices.hpp \ … … 87 75 88 76 POOLSOURCE = \ 89 Pool/PoolGuard.cpp \ 90 Pool/WorkerPool.cpp 77 Pool/PoolWorker.cpp 91 78 92 79 POOLHEADER = \ 93 Pool/PoolGuard.hpp \ 94 Pool/WorkerPool.hpp 80 Pool/PoolWorker.hpp 95 
81 96 82 noinst_LTLIBRARIES += libJobMarketPool.la -
ThirdParty/JobMarket/src/JobMarket/Pool/PoolWorker.cpp
raeec58 rab2e834 35 35 #include "CodePatterns/Log.hpp" 36 36 #include "JobMarket/Jobs/FragmentJob.hpp" 37 #include "Operations/Workers/EnrollInPoolOperation.hpp"38 #include "Operations/Workers/RemoveFromPoolOperation.hpp"39 37 #include "Operations/Workers/SubmitResultOperation.hpp" 40 38 #include "JobMarket/Results/FragmentResult.hpp" … … 71 69 72 70 if (PoolListener.getExitflag() == Listener::OkFlag) { 73 // always enroll and make listenining initiation depend on its success 74 const boost::function<void ()> initiateme = 75 boost::bind(&PoolWorker::initiateSockets, this); 76 const boost::function<void ()> shutdown = 77 boost::bind(&PoolWorker::finish, this); // no need to remove from pool 78 AsyncOperation *enrollOp = new EnrollInPoolOperation(connection_, MyAddress, initiateme, shutdown); 79 LOG(2, "DEBUG: Putting enroll in pool operation into queue ..."); 80 OpQueue.push_back(enrollOp, ServerAddress); 71 PoolListener.initiateSocket(); 72 HealthListener.initiateSocket(); 81 73 } else { 82 74 setExitflag( ErrorFlag ); 83 ELOG(0, "Not enrolling, we just exit due to failed listen bind."); 84 } 85 } 86 87 void PoolWorker::initiateSockets() 88 { 89 PoolListener.initiateSocket(); 90 HealthListener.initiateSocket(); 75 ELOG(0, "Exit due to failed listen bind."); 76 } 91 77 } 92 78 … … 146 132 { 147 133 LOG(1, "INFO: Received request for operation " << choice << "."); 148 // switch over the desired choice read previously149 bool shutdownflag = false;150 134 switch(choice) { 151 135 case NoServerOperation: 152 136 { 153 137 ELOG(1, "PoolListener_t::handle_ReadChoice() - called with NoOperation."); 154 break;155 }156 case CheckAlive:157 {158 // send address as pong to ping (check alive signal)159 conn->async_write(callback.MyAddress,160 boost::bind(&PoolWorker::PoolListener_t::handle_SendAddress, this,161 boost::asio::placeholders::error, conn));162 138 break; 163 139 } … … 178 154 break; 179 155 } 180 case ShutdownWorker:181 {182 shutdownflag = true;183 
callback.shutdown();184 break;185 }186 156 } 187 157 // and listen for following connections if not to shutdown 188 if (!shutdownflag) 189 initiateSocket(); 158 initiateSocket(); 190 159 } 191 160 else … … 267 236 } 268 237 269 /// Controller callback function when address has been sent270 void PoolWorker::PoolListener_t::handle_SendAddress(const boost::system::error_code& e, connection_ptr conn)271 {272 DEBUG_FUNCTION_ENTRYEXIT273 274 if (!e)275 {276 LOG(1, "INFO: Sent address " << callback.MyAddress << " as alive confirmation.");277 }278 else279 {280 // An error occurred. Log it and return. Since we are not starting a new281 // accept operation the io_service will run out of work to do and the282 // server will exit.283 setExitflag( ErrorFlag );284 ELOG(0, e.message());285 }286 }287 288 238 void PoolWorker::PoolListener_t::handle_RemoveThread() 289 239 { … … 341 291 void PoolWorker::shutdown() 342 292 { 343 // remove us from pool 344 boost::function<void ()> closingdown = boost::bind(&PoolWorker::finish, this); 345 AsyncOperation *removeOp = new RemoveFromPoolOperation(connection_, MyAddress, closingdown, failed); 346 LOG(2, "DEBUG: Putting remove from pool operation into queue ..."); 347 OpQueue.push_back(removeOp, ServerAddress); 293 finish(); 348 294 // block queue such that io_service may stop 349 295 OpQueue.block(); -
ThirdParty/JobMarket/src/JobMarket/Pool/PoolWorker.hpp
raeec58 rab2e834 61 61 62 62 void WorkOnJob(FragmentJob::ptr &job); 63 void removeFromPool();64 63 void shutdown(int sig); 65 64 void shutdown(); … … 148 147 149 148 private: 150 void initiateSockets();151 152 private:153 149 //!> reference to io_service which we use for connections 154 150 boost::asio::io_service& io_service; 155 151 156 //!> The listener for the WorkerPool152 //!> The listener for the server 157 153 PoolListener_t PoolListener; 158 154 159 //!> The listener for the WorkerPool155 //!> The listener for the health probe 160 156 HealthProbeListener_t HealthListener; 161 157 -
ThirdParty/JobMarket/src/JobMarket/ServerChoices.hpp
raeec58 rab2e834 18 18 NoServerOperation, 19 19 ReceiveJob, 20 CheckAlive,21 ShutdownWorker22 20 }; 23 21 -
ThirdParty/JobMarket/src/JobMarket/ServerOptions.cpp
raeec58 rab2e834 74 74 } 75 75 76 bool ServerOptions::parseGuardFromStart(boost::program_options::variables_map &vm) 77 { 78 if (vm.count("guard-from-start")) { 79 try { 80 guardFromStart = vm["guard-from-start"].as< bool >(); 81 } catch (boost::bad_lexical_cast) { 82 ELOG(1, "Could not read " << guardFromStart << " as boolean."); 83 return 255; 84 } 76 int ServerOptions::parseWorkerAddress(boost::program_options::variables_map &vm) { 77 if (vm.count("workeraddress")) { 78 const std::string worker = vm["workeraddress"].as< std::string >(); 79 workeraddressport = worker.substr(worker.find_last_of(':')+1, std::string::npos); 80 workeraddresshost = worker.substr(0, worker.find_last_of(':')); 81 LOG(1, "INFO: Using " << workeraddresshost << ":" << workeraddressport << " as worker's general address."); 85 82 } else { 86 guardFromStart = false; 83 ELOG(1, "Requiring worker's address (host:port) to connect to."); 84 return 255; 87 85 } 88 LOG(1, "INFO: Using " << guardFromStart << " as guard from start.");89 86 return 0; 90 87 } -
ThirdParty/JobMarket/src/JobMarket/ServerOptions.hpp
raeec58 rab2e834 14 14 #endif 15 15 16 #include <string> 17 16 18 #include <boost/program_options.hpp> 17 19 … … 24 26 int parseControllerPort(boost::program_options::variables_map &vm); 25 27 int parseTimeout(boost::program_options::variables_map &vm); 26 bool parseGuardFromStart(boost::program_options::variables_map &vm);28 int parseWorkerAddress(boost::program_options::variables_map &vm); 27 29 28 30 unsigned short workerport; 29 31 unsigned short controllerport; 32 std::string workeraddresshost; 33 std::string workeraddressport; 30 34 size_t timeout; 31 bool guardFromStart;32 35 }; 33 36 -
ThirdParty/JobMarket/src/JobMarket/WorkerChoices.hpp
raeec58 rab2e834 17 17 enum WorkerChoices { 18 18 NoWorkerOperation, 19 EnrollInPool, 20 SendResult, 21 RemoveFromPool 19 SendResult 22 20 }; 23 21 -
ThirdParty/JobMarket/src/JobMarket/server_main.cpp
raeec58 rab2e834 55 55 ("verbosity,v", boost::program_options::value<size_t>(), "set verbosity level") 56 56 ("signal", boost::program_options::value< std::vector<size_t> >(), "set signal to catch (can be given multiple times)") 57 ("workeraddress", boost::program_options::value< std::string>(), "connect to all workers at this address (host:port)") 57 58 ("workerport", boost::program_options::value< unsigned short >(), "listen on this port for connecting workers") 58 59 ("controllerport", boost::program_options::value< unsigned short >(), "listen on this port for connecting controller") 59 60 ("timeout", boost::program_options::value< size_t >(), "timeout in seconds for alive status of workers") 60 ("guard-from-start", boost::program_options::value< bool >(), "activate pool guard from server start directly")61 61 ; 62 62 … … 78 78 status = ServerOpts.parseSignals(vm); 79 79 if (status) return status; 80 status = ServerOpts.parse GuardFromStart(vm);80 status = ServerOpts.parseWorkerAddress(vm); 81 81 if (status) return status; 82 82 … … 85 85 { 86 86 boost::asio::io_service io_service; 87 FragmentScheduler Server(io_service, ServerOpts.workerport, ServerOpts.controllerport, ServerOpts.timeout, ServerOpts.guardFromStart); 87 FragmentScheduler Server( 88 io_service, 89 ServerOpts.workerport, 90 ServerOpts.controllerport, 91 WorkerAddress(ServerOpts.workeraddresshost, ServerOpts.workeraddressport), 92 ServerOpts.timeout); 88 93 89 94 // catch ctrl-c and shutdown worker properly -
ThirdParty/JobMarket/src/unittests/Makefile.am
raeec58 rab2e834 19 19 OperationQueueUnitTest.cpp \ 20 20 SystemCommandJobUnitTest.cpp \ 21 WorkerAddressUnitTest.cpp \ 22 WorkerPoolUnitTest.cpp 21 WorkerAddressUnitTest.cpp 23 22 24 23 FRAGMENTATIONAUTOMATIONTESTSHEADERS = \ … … 28 27 OperationQueueUnitTest.hpp \ 29 28 SystemCommandJobUnitTest.hpp \ 30 WorkerAddressUnitTest.hpp \ 31 WorkerPoolUnitTest.hpp 29 WorkerAddressUnitTest.hpp 32 30 33 31 FRAGMENTATIONAUTOMATIONTESTS = \ … … 37 35 OperationQueueUnitTest \ 38 36 SystemCommandJobUnitTest \ 39 WorkerAddressUnitTest \ 40 WorkerPoolUnitTest 37 WorkerAddressUnitTest 41 38 42 39 -
ThirdParty/JobMarket/tests/regression/Makefile.am
raeec58 rab2e834 17 17 $(srcdir)/testsuite-addingjobs.at \ 18 18 $(srcdir)/testsuite-checkstate.at \ 19 $(srcdir)/testsuite-checkalive.at \20 19 $(srcdir)/testsuite-completerun.at \ 21 $(srcdir)/testsuite-enrollinpool.at \22 $(srcdir)/testsuite-falsehost.at \23 20 $(srcdir)/testsuite-getresults.at \ 24 $(srcdir)/testsuite-numberworkers.at \25 21 $(srcdir)/testsuite-resubmitjobs.at \ 26 22 $(srcdir)/testsuite-server-shutdown.at \ -
ThirdParty/JobMarket/tests/regression/testsuite-addingjobs.at
raeec58 rab2e834 10 10 11 11 # start service in background 12 ${AUTOTEST_PATH}/JobMarketServer --worker port $WORKERPORT --controllerport $CONTROLLERPORT &12 ${AUTOTEST_PATH}/JobMarketServer --workeraddress 127.0.0.1:$WORKERLISTENPORT --workerport $WORKERPORT --controllerport $CONTROLLERPORT & 13 13 server_pid=$! 14 14 AT_CHECK([sleep 1], 0, [ignore], [ignore], [kill $server_pid]) … … 18 18 AT_CHECK([fgrep "Sending 1 jobs ..." stdout], 0, [ignore], [ignore], [kill $server_pid]) 19 19 20 # a nnone NoOpJob20 # add one NoOpJob 21 21 AT_CHECK([${AUTOTEST_PATH}/JobMarketController --server 127.0.0.1:$CONTROLLERPORT --command createjobs], 0, [stdout], [ignore], [kill $server_pid]) 22 22 AT_CHECK([fgrep "Creating 1 new NoOpJob(s)." stdout], 0, [ignore], [ignore], [kill $server_pid]) -
ThirdParty/JobMarket/tests/regression/testsuite-checkstate.at
raeec58 rab2e834 7 7 WORKERPORT=11040 8 8 CONTROLLERPORT=11041 9 WORKERLISTENPORT=11042 9 10 10 11 # start service in background 11 ${AUTOTEST_PATH}/JobMarketServer --workerport $WORKERPORT --controllerport $CONTROLLERPORT & 12 ${AUTOTEST_PATH}/JobMarketServer --workeraddress 127.0.0.1:$WORKERLISTENPORT --workerport $WORKERPORT --controllerport $CONTROLLERPORT & 12 13 server_pid=$! 13 14 AT_CHECK([sleep 1], 0, [ignore], [ignore], [kill $server_pid]) -
ThirdParty/JobMarket/tests/regression/testsuite-completerun.at
raeec58 rab2e834 10 10 11 11 # start service in background 12 ${AUTOTEST_PATH}/JobMarketServer --worker port $WORKERPORT --controllerport $CONTROLLERPORT &12 ${AUTOTEST_PATH}/JobMarketServer --workeraddress 127.0.0.1:$WORKERLISTENPORT --workerport $WORKERPORT --controllerport $CONTROLLERPORT & 13 13 server_pid=$! 14 14 AT_CHECK([sleep 1], 0, [ignore], [ignore], [kill $server_pid]) 15 15 16 # add thre jobs via JobAdder 16 # add a worker to work on jobs 17 ${AUTOTEST_PATH}/JobMarketPoolWorker -v 5 --signal 2 --server 127.0.0.1:${WORKERPORT} --hostname 127.0.0.1 --listen $WORKERLISTENPORT & 18 worker_pid=$! 19 20 # add three jobs via JobAdder 17 21 AT_CHECK([${AUTOTEST_PATH}/JobMarketController --server 127.0.0.1:$CONTROLLERPORT --command createjobs --executable "cat" --jobcommand "Nothing" --times 3], 0, [stdout], [ignore], [kill $server_pid]) 18 22 AT_CHECK([fgrep "Sending 3 jobs ..." stdout], 0, [ignore], [ignore], [kill $server_pid]) 19 20 # Checking results 21 AT_CHECK([${AUTOTEST_PATH}/JobMarketController --server 127.0.0.1:$CONTROLLERPORT --command checkresults], 0, [stdout], [ignore], [kill $server_pid]) 22 AT_CHECK([fgrep "#3 are waiting in the queue and #0 jobs are calculated so far." stdout], 0, [ignore], [ignore], [kill $server_pid]) 23 24 # enlist a worker to work on jobs 25 ${AUTOTEST_PATH}/JobMarketPoolWorker -v 5 --signal 2 --server 127.0.0.1:${WORKERPORT} --hostname 127.0.0.1 --listen $WORKERLISTENPORT & 26 worker_pid=$! 27 AT_CHECK([sleep 2], 0, [ignore], [ignore], [kill $server_pid $worker_pid]) 23 AT_CHECK([sleep 5], 0, [ignore], [ignore], [kill $server_pid $worker_pid]) 28 24 29 25 # send kill signal to worker such that it shuts down -
ThirdParty/JobMarket/tests/regression/testsuite-getresults.at
raeec58 rab2e834 7 7 WORKERPORT=11045 8 8 CONTROLLERPORT=11046 9 WORKERLISTENPORT=11047 9 10 10 11 # start service in background 11 ${AUTOTEST_PATH}/JobMarketServer --workerport $WORKERPORT --controllerport $CONTROLLERPORT & 12 ${AUTOTEST_PATH}/JobMarketServer --workeraddress 127.0.0.1:$WORKERLISTENPORT --workerport $WORKERPORT --controllerport $CONTROLLERPORT & 12 13 server_pid=$! 13 14 AT_CHECK([sleep 1], 0, [ignore], [ignore], [kill $server_pid]) -
ThirdParty/JobMarket/tests/regression/testsuite-resubmitjobs.at
raeec58 rab2e834 10 10 11 11 # start service in background 12 ${AUTOTEST_PATH}/JobMarketServer --worker port $WORKERPORT --controllerport $CONTROLLERPORT &12 ${AUTOTEST_PATH}/JobMarketServer --workeraddress 127.0.0.1:$WORKERLISTENPORT --workerport $WORKERPORT --controllerport $CONTROLLERPORT & 13 13 server_pid=$! 14 14 AT_CHECK([sleep 1], 0, [ignore], [ignore], [kill $server_pid]) 15 16 # add a worker to work on jobs 17 ${AUTOTEST_PATH}/JobMarketPoolWorker -v 5 --signal 2 --server 127.0.0.1:${WORKERPORT} --hostname 127.0.0.1 --listen $WORKERLISTENPORT 2>workererr & 18 worker_pid=$! 15 19 16 20 # add one always failing job via JobAdder 17 21 AT_CHECK([${AUTOTEST_PATH}/JobMarketController --server 127.0.0.1:$CONTROLLERPORT --command createjobs --executable "/bin/false" --jobcommand "Nothing"], 0, [stdout], [ignore], [kill $server_pid]) 18 22 AT_CHECK([fgrep "Sending 1 jobs ..." stdout], 0, [ignore], [ignore], [kill $server_pid]) 19 20 # enlist a worker to work on jobs 21 ${AUTOTEST_PATH}/JobMarketPoolWorker -v 5 --signal 2 --server 127.0.0.1:${WORKERPORT} --hostname 127.0.0.1 --listen $WORKERLISTENPORT 2>workererr & 22 worker_pid=$! 23 AT_CHECK([sleep 2], 0, [ignore], [ignore], [kill $server_pid $worker_pid]) 23 AT_CHECK([sleep 3], 0, [ignore], [ignore], [kill $server_pid $worker_pid]) 24 24 25 25 # send kill signal to worker such that it shuts down -
ThirdParty/JobMarket/tests/regression/testsuite-server-shutdown.at
raeec58 rab2e834 7 7 WORKERPORT=11030 8 8 CONTROLLERPORT=11031 9 WORKERLISTENPORT=11032 9 10 10 11 # start service in background 11 ${AUTOTEST_PATH}/JobMarketServer --workerport $WORKERPORT --controllerport $CONTROLLERPORT & 12 ${AUTOTEST_PATH}/JobMarketServer --workeraddress 127.0.0.1:$WORKERLISTENPORT --workerport $WORKERPORT --controllerport $CONTROLLERPORT & 12 13 server_pid=$! 13 14 AT_CHECK([sleep 1], 0, [ignore], [ignore], [kill $server_pid]) -
ThirdParty/JobMarket/tests/regression/testsuite-server-worker.at
raeec58 rab2e834 10 10 11 11 # start service in background 12 ${AUTOTEST_PATH}/JobMarketServer --worker port $WORKERPORT --controllerport $CONTROLLERPORT &12 ${AUTOTEST_PATH}/JobMarketServer --workeraddress 127.0.0.1:$WORKERLISTENPORT --workerport $WORKERPORT --controllerport $CONTROLLERPORT & 13 13 server_pid=$! 14 14 AT_CHECK([sleep 1], 0, [ignore], [ignore], [kill $server_pid]) 15 15 16 # add a worker to work on jobs 17 ${AUTOTEST_PATH}/JobMarketPoolWorker -v 5 --signal 2 --server 127.0.0.1:${WORKERPORT} --hostname 127.0.0.1 --listen $WORKERLISTENPORT & 18 worker_pid=$! 19 16 20 # add one jobs 17 21 AT_CHECK([${AUTOTEST_PATH}/JobMarketController --server 127.0.0.1:$CONTROLLERPORT --command createjobs --executable "cat" --jobcommand "Nothing"], 0, [ignore], [ignore], [kill $server_pid]) 18 19 # enlist a worker to work on jobs 20 ${AUTOTEST_PATH}/JobMarketPoolWorker -v 5 --signal 2 --server 127.0.0.1:${WORKERPORT} --hostname 127.0.0.1 --listen $WORKERLISTENPORT & 21 worker_pid=$! 22 AT_CHECK([sleep 2], 0, [ignore], [ignore], [kill $server_pid $worker_pid]) 22 AT_CHECK([sleep 3], 0, [ignore], [ignore], [kill $server_pid $worker_pid]) 23 23 24 24 # send kill signal to worker such that it shuts down -
ThirdParty/JobMarket/tests/regression/testsuite.at
raeec58 rab2e834 17 17 m4_include([testsuite-server-shutdown.at]) 18 18 19 # check whether poolworker may succesfully enroll in pool20 m4_include([testsuite-enrollinpool.at])21 22 19 # check whether adding jobs works 23 20 m4_include([testsuite-addingjobs.at]) 24 25 # check whether dead busy poolworker is automatically removed26 m4_include([testsuite-checkalive.at])27 21 28 22 # check whether checking state works … … 40 34 # check whether complete run works 41 35 m4_include([testsuite-completerun.at]) 42 43 # check whether false hostname does not shutdown server44 m4_include([testsuite-falsehost.at])45 46 # check whether total number of workers is returned47 m4_include([testsuite-numberworkers.at])
Note:
See TracChangeset
for help on using the changeset viewer.
