- Timestamp:
- Aug 11, 2025, 5:45:37 PM (3 months ago)
- Branches:
- Candidate_v1.7.0, stable
- Children:
- ae9ad6
- Parents:
- aeec58
- git-author:
- Frederik Heber <frederik.heber@…> (07/16/25 19:05:29)
- git-committer:
- Frederik Heber <frederik.heber@…> (08/11/25 17:45:37)
- File:
-
- 1 edited
Legend:
- Unmodified
- Added
- Removed
-
ThirdParty/JobMarket/src/JobMarket/FragmentScheduler.hpp
raeec58 rab2e834 17 17 #include <boost/function.hpp> 18 18 #include <boost/shared_ptr.hpp> 19 #include <boost/thread.hpp>20 19 #include <deque> 21 20 #include <set> … … 27 26 #include "JobMarket/Operations/AsyncOperation.hpp" 28 27 #include "JobMarket/Operations/OperationQueue.hpp" 29 #include "Operations/Servers/ShutdownWorkerOperation.hpp"30 28 #include "JobMarket/ControllerChoices.hpp" 31 29 #include "JobMarket/FragmentQueue.hpp" … … 35 33 #include "JobMarket/Results/FragmentResult.hpp" 36 34 #include "JobMarket/types.hpp" 37 #include "JobMarket/Pool/PoolGuard.hpp"38 #include "JobMarket/Pool/WorkerPool.hpp"39 35 #include "JobMarket/WorkerAddress.hpp" 40 36 #include "JobMarket/WorkerChoices.hpp" … … 53 49 unsigned short workerport, 54 50 unsigned short controllerport, 55 const size_t timeout,56 bool guardFromStart51 const WorkerAddress _workeraddress, 52 const size_t timeout 57 53 ); 58 54 ~FragmentScheduler(); … … 63 59 void sendAvailableJobToNextIdleWorker(); 64 60 bool shutdown(); 65 void shutdownWorker(const WorkerAddress &address);66 61 void shutdownAfterSignal(); 67 void removeAllWorkers();68 void removeAllIdleWorkers();69 void removeWorker(const WorkerAddress address);70 void unmarkWorkerBusy(const WorkerAddress address);71 62 void cleanupOperationQueue(AsyncOperation *op); 72 63 void workerAcceptsJob(const WorkerAddress _address, const JobId_t _id); … … 84 75 unsigned short port, 85 76 FragmentQueue &_JobsQueue, 86 WorkerPool &_pool, 87 boost::function<void (const WorkerAddress&, FragmentJob::ptr&)> _callback, 88 boost::function<void (const WorkerAddress)> _unmarkBusyWorkerFunction) : 77 boost::function<void (const WorkerAddress&, FragmentJob::ptr&)> _callback) : 89 78 Listener(io_service, port), 90 79 JobsQueue(_JobsQueue), 91 pool(_pool), 92 callback_sendJobToWorker(_callback), 93 unmarkBusyWorkerFunction(_unmarkBusyWorkerFunction) 80 callback_sendJobToWorker(_callback) 94 81 {} 95 82 virtual ~WorkerListener_t() {} … … 136 123 void handle_ReadAddress(const boost::system::error_code& e, connection_ptr conn, HandlerData::ptr &data); 137 124 138 /// Worker callback function when new worker has enrolled.139 void handle_enrolled(const boost::system::error_code& e, connection_ptr conn, HandlerData::ptr &data);140 141 /// Worker callback function when worker has been informed of successful removal.142 void handle_removed(const boost::system::error_code& e, connection_ptr conn, HandlerData::ptr &data);143 144 125 /// Worker callback function when result has been received. 145 126 void handle_ReceiveResultFromWorker(const boost::system::error_code& e, connection_ptr conn, HandlerData::ptr &data); … … 152 133 FragmentQueue &JobsQueue; 153 134 154 //!> callback reference to container class155 WorkerPool &pool;156 157 135 //!> callback function to access send job function 158 136 boost::function<void (const WorkerAddress&, FragmentJob::ptr&)> callback_sendJobToWorker; 159 160 //!> function to call when a busy worker has finished and sent its result161 boost::function<void (const WorkerAddress)> unmarkBusyWorkerFunction;162 137 }; 163 138 … … 169 144 unsigned short port, 170 145 FragmentQueue &_JobsQueue, 171 WorkerPool &_pool,172 boost::function<void ()> _removeallWorkers,173 146 boost::function<bool ()> _shutdownAllSockets) : 174 147 Listener(io_service, port), 175 148 JobsQueue(_JobsQueue), 176 pool(_pool),177 149 jobInfo((size_t)2, 0), 178 150 NumberIds(0), 179 151 choice(NoControllerOperation), 180 removeallWorkers(_removeallWorkers),181 152 shutdownAllSockets(_shutdownAllSockets), 182 153 globalId(0) … … 209 180 void handle_SendResults(const boost::system::error_code& e, connection_ptr conn); 210 181 211 /// Controller callback function when result are to be sent.212 void handle_SendNumberWorkers(const boost::system::error_code& e, connection_ptr conn);213 214 182 /// Controller callback function when results has been sent. 215 183 void handle_finishSendResults(const boost::system::error_code& e, connection_ptr conn); … … 219 187 FragmentQueue & JobsQueue; 220 188 221 //!> const reference to external pool of Workers222 const WorkerPool & pool;223 224 189 //!> bunch of jobs received from controller before placed in JobsQueue 225 190 std::vector<FragmentJob::ptr> jobs; … … 236 201 //!> choice 237 202 enum ControllerChoices choice; 238 239 //!> bound function to remove all workers240 boost::function<void ()> removeallWorkers;241 203 242 204 //!> bound function to shutdown all sockets … … 258 220 FragmentQueue JobsQueue; 259 221 260 //!> Pool of Workers261 WorkerPool pool;262 263 222 //!> Listener instance that waits for a worker 264 223 WorkerListener_t WorkerListener; … … 266 225 //!> Listener instance that waits for a controller 267 226 ControllerListener_t ControllerListener; 227 228 //!> Ingress address to reach the next worker 229 const WorkerAddress workeraddress; 268 230 269 231 public: … … 287 249 //!> internal queue for all asynchronous operations 288 250 OperationQueue OpQueue; 289 290 private:291 //!> Guard that checks if workers are unreachable292 PoolGuard guard;293 294 //!> shutdown thread to keep the operation queue free to run295 boost::thread *shutdown_thread;296 251 }; 297 252
Note:
See TracChangeset
for help on using the changeset viewer.
