/* * FragmentScheduler.hpp * * Created on: Oct 19, 2011 * Author: heber */ #ifndef FRAGMENTSCHEDULER_HPP_ #define FRAGMENTSCHEDULER_HPP_ // include config.h #ifdef HAVE_CONFIG_H #include #endif #include #include #include #include "Connection.hpp" #include "ControllerChoices.hpp" #include "Controller/Commands/SendJobToWorkerOperation.hpp" #include "FragmentQueue.hpp" #include "GlobalJobId.hpp" #include "Jobs/FragmentJob.hpp" #include "Listener.hpp" #include "Results/FragmentResult.hpp" #include "types.hpp" #include "Pool/WorkerPool.hpp" #include "WorkerAddress.hpp" /** FragmentScheduler serves FragmentJobs to Workers and accepts commands from * a Controller. * */ class FragmentScheduler { public: /// Constructor opens the acceptor and starts waiting for the first incoming /// Connection. FragmentScheduler(boost::asio::io_service& io_service, unsigned short workerport, unsigned short controllerport); private: void sendJobToWorker(const WorkerAddress &address, FragmentJob::ptr &job); // void shutdownWorker(const WorkerAddress &address); // void removeAllWorkers(); class WorkerListener_t : public Listener { public: WorkerListener_t( boost::asio::io_service& io_service, unsigned short port, FragmentQueue &_JobsQueue, WorkerPool &_pool, boost::function _callback) : Listener(io_service, port), address("127.0.0.1", "0"), JobsQueue(_JobsQueue), pool(_pool), result( new FragmentResult(JobId::NoJob) ), callback_sendJobToWorker(_callback) {} virtual ~WorkerListener_t() {} protected: /// Handle completion of a accept worker operation. void handle_Accept(const boost::system::error_code& e, connection_ptr conn); /// Worker callback function when job has been sent. void handle_checkAddress(const boost::system::error_code& e, connection_ptr conn); /// Worker callback function when new worker has enrolled. void handle_enrolled(const boost::system::error_code& e, connection_ptr conn); /// Worker callback function when result has been received. void handle_ReceiveResultFromWorker(const boost::system::error_code& e, connection_ptr conn); private: //!> address of new Worker WorkerAddress address; //!> reference to Queue FragmentQueue &JobsQueue; //!> callback reference to container class WorkerPool &pool; /// result that is received from the client. FragmentResult::ptr result; //!> callback function to access send job function boost::function callback_sendJobToWorker; // static entity to indicate to clients that the queue is empty. static FragmentJob::ptr NoJob; }; class ControllerListener_t : public Listener { public: ControllerListener_t( boost::asio::io_service& io_service, unsigned short port, FragmentQueue &_JobsQueue, boost::function _initiateWorkerSocket) : Listener(io_service, port), JobsQueue(_JobsQueue), jobInfo((size_t)2, 0), choice(NoControllerOperation), globalId(0), initiateWorkerSocket(_initiateWorkerSocket) {} virtual ~ControllerListener_t() {} protected: /// Handle completion of a accept controller operation. void handle_Accept(const boost::system::error_code& e, connection_ptr conn); /// Handle completion of controller operation to read choice void handle_ReadChoice(const boost::system::error_code& e, connection_ptr conn); /// Controller callback function when job has been sent. void handle_ReceiveJobs(const boost::system::error_code& e, connection_ptr conn); /// Controller callback function when checking on state of results. void handle_CheckResultState(const boost::system::error_code& e, connection_ptr conn); /// Controller callback function when checking on state of results. void handle_GetNextJobIdState(const boost::system::error_code& e, connection_ptr conn); /// Controller callback function when result has been received. void handle_SendResults(const boost::system::error_code& e, connection_ptr conn); private: //!> reference to external FragmentQueue containing jobs to work on FragmentQueue & JobsQueue; /// bunch of jobs received from controller before placed in JobsQueue std::vector jobs; /// number of jobs that are waiting to be and are calculated, required for returning status std::vector jobInfo; // choice enum ControllerChoices choice; // TODO: replace this instance by a IdPool. //!> global id to give next available job id GlobalJobId globalId; //!> callback function to tell that worker socket should be enabled boost::function initiateWorkerSocket; }; private: /// Queue with data to be sent to each client. FragmentQueue JobsQueue; //!> Pool of Workers WorkerPool pool; //!> Listener instance that waits for a worker WorkerListener_t WorkerListener; //!> Listener instance that waits for a controller ControllerListener_t ControllerListener; public: /** Getter for Exitflag. * * @return Exitflag of operations */ size_t getExitflag() const { if (WorkerListener.getExitflag() != 0) return WorkerListener.getExitflag(); if (ControllerListener.getExitflag() != 0) return ControllerListener.getExitflag(); return 0; } private: //!> Connection for sending jobs to workers Connection connection; //!> internal operation to send jobs to workers mutable SendJobToWorkerOperation sendJobOp; }; #endif /* FRAGMENTSCHEDULER_HPP_ */