/*
 * Project: JobMarket
 * Description: asynchronous Server/Controller/Client-approach to parallel computing, based on boost::asio
 * Copyright (C) 2011 Frederik Heber. All rights reserved.
 *
 */

/*
 * \file FragmentScheduler.cpp
 *
 * This file strongly follows the Serialization example from the boost::asio
 * library (see server.cpp)
 *
 *  Created on: Oct 19, 2011
 *      Author: heber
 */

// include config.h
#ifdef HAVE_CONFIG_H
#include <config.h>
#endif

// boost asio needs specific operator new
#include <boost/asio.hpp>

#include "CodePatterns/MemDebug.hpp"

#include <string>
#include <utility>
#include <vector>

#include "JobMarket/Connection.hpp" // Must come before boost/serialization headers.

#include <boost/bind.hpp>
#include <boost/function.hpp>

#include "CodePatterns/Info.hpp"
#include "CodePatterns/Log.hpp"
#include "CodePatterns/Observer/Notification.hpp"
#include "Operations/Servers/SendJobToWorkerOperation.hpp"
#include "Operations/Servers/ShutdownWorkerOperation.hpp"
#include "Operations/Workers/EnrollInPoolOperation.hpp"
#include "JobMarket/JobId.hpp"

#include "JobMarket/FragmentScheduler.hpp"

/** Constructor of class FragmentScheduler.
 *
 * We set up both acceptors to accept connections from workers and the Controller.
 *
 * \param _io_service io_service for the asynchronous communications
 * \param workerport port to listen on for worker connections
 * \param controllerport port to listen on for controller connections
 * \param timeout interval in seconds for how often to check on the workers' status
 */
FragmentScheduler::FragmentScheduler(
    boost::asio::io_service& _io_service,
    unsigned short workerport,
    unsigned short controllerport,
    const size_t timeout) :
  Observer("FragmentScheduler"),
  io_service(_io_service),
  WorkerListener(_io_service, workerport, JobsQueue, pool,
      boost::bind(&FragmentScheduler::sendJobToWorker, boost::ref(*this), _1, _2),
      boost::bind(&FragmentScheduler::unmarkWorkerBusy, boost::ref(*this), _1)),
  ControllerListener(_io_service, controllerport, JobsQueue,
      boost::bind(&FragmentScheduler::removeAllWorkers, boost::ref(*this)),
      boost::bind(&FragmentScheduler::shutdown, boost::ref(*this))),
  connection(_io_service),
  guard(_io_service, timeout, connection,
      boost::bind(&FragmentScheduler::removeWorker, boost::ref(*this), _1),
      boost::bind(&FragmentQueue::resubmitJob, boost::ref(JobsQueue), _1),
      OpQueue)
{
  DEBUG_FUNCTION_ENTRYEXIT

  if ((WorkerListener.getExitflag() == Listener::OkFlag)
      && (ControllerListener.getExitflag() == Listener::OkFlag)) {
    // sign on to idle workers and present jobs
    pool.signOn(this, WorkerPool::WorkerIdle);
    JobsQueue.signOn(this, FragmentQueue::JobAdded);

    // listen for controller
    ControllerListener.initiateSocket();

    // listen for workers
    WorkerListener.initiateSocket();
  } else {
    ELOG(0, "Not starting: we just exit due to a failed listen bind.");
  }
}

FragmentScheduler::~FragmentScheduler()
{
  // sign off
  pool.signOff(this, WorkerPool::WorkerIdle);
  JobsQueue.signOff(this, FragmentQueue::JobAdded);
}

FragmentScheduler::WorkerListener_t::HandlerData::HandlerData() :
  address("127.0.0.1", "0"),
  result( new FragmentResult(JobId::NoJob) ),
  choice(NoWorkerOperation)
{}
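/* For orientation, a minimal usage sketch of the scheduler as set up by the
 * constructor above. The port numbers and the surrounding setup are
 * illustrative assumptions, not taken from this project:
 * \code
 * boost::asio::io_service io_service;
 * // listen for workers on 11026, for the controller on 11027,
 * // and check on busy workers every 60 seconds
 * FragmentScheduler scheduler(io_service, 11026, 11027, 60);
 * io_service.run(); // blocks until shutdown() stops the io_service
 * \endcode
 */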
/** Helper function to send a job to a worker.
 *
 * Note that we do not mark the worker as busy here; the caller must have
 * done so already. We simply send it the job.
 *
 * @param address address of the worker
 * @param job job to send
 */
void FragmentScheduler::sendJobToWorker(const WorkerAddress &address, FragmentJob::ptr &job)
{
  ASSERT( pool.isWorkerBusy(address),
      "FragmentScheduler::sendJobToWorker() - Worker "+toString(address)+" is not marked as busy.");
  LOG(1, "INFO: Sending job " << job->getId() << " to worker " << address << ".");

  // create op, callbacks for success and failure, and hand over to queue
  boost::function<void ()> AcceptsJob = boost::bind(
      &FragmentScheduler::workerAcceptsJob, boost::ref(*this), address, job->getId());
  boost::function<void ()> RejectedJob = boost::bind(
      &FragmentScheduler::workerRejectsJob, boost::ref(*this), address, job->getId());
  AsyncOperation *sendJobOp =
      new SendJobToWorkerOperation(connection, job, AcceptsJob, RejectedJob);
  OpQueue.push_back(sendJobOp, address);
}

/** Helper function that is called asynchronously after a worker accepts a new
 * job and we need to add a check-alive guard for it.
 *
 * \param _address address of the worker to guard
 * \param _id id of the job the worker is working on
 */
void FragmentScheduler::workerAcceptsJob(const WorkerAddress _address, const JobId_t _id)
{
  // inform guard and, if not already running, also launch it
  guard.addBusyWorker(_address, _id);
  if (!guard.isRunning())
    guard.start();
}

/** Helper function that is called asynchronously after a worker declines a new
 * job.
 *
 * \param _address address of the worker
 * \param _id id of the job to resubmit
 */
void FragmentScheduler::workerRejectsJob(const WorkerAddress _address, const JobId_t _id)
{
  unmarkWorkerBusy(_address);
  JobsQueue.resubmitJob(_id);
}

/** Helper function to shut down a single worker.
 *
 * We send NoJob to indicate shutdown.
 *
 * @param address address of the worker to shut down
 */
void FragmentScheduler::shutdownWorker(const WorkerAddress &address)
{
  ASSERT( !pool.isWorkerBusy(address),
      "FragmentScheduler::shutdownWorker() - Worker "+toString(address)+" is already busy.");
  LOG(2, "INFO: Shutting down worker " << address << "...");
  AsyncOperation *shutdownWorkerOp = new ShutdownWorkerOperation(connection);
  OpQueue.push_back(shutdownWorkerOp, address);
}

/** Sends shutdown to all current workers in the pool.
 *
 */
void FragmentScheduler::removeAllWorkers()
{
  // first, sign off such that no new jobs are given to workers
  pool.signOff(this, WorkerPool::WorkerIdle);

  LOG(2, "DEBUG: Waiting for busy workers to finish ...");
  // busy-wait until all busy workers have returned from their jobs
  while (pool.hasBusyWorkers())
    ;

  LOG(2, "INFO: Shutting down workers ...");

  // get list of all idle workers
  typedef std::vector<std::pair<std::string, std::string> > WorkerList_t;
  WorkerList_t WorkerList = pool.getListOfIdleWorkers();

  // give all workers the shutdown signal
  for (WorkerList_t::const_iterator iter = WorkerList.begin();
      iter != WorkerList.end(); ++iter)
    shutdownWorker(WorkerAddress(iter->first, iter->second));
}

/** Function to shut down the server properly, e.g. for use as a signal handler.
 *
 * @param sig signal number
 */
void FragmentScheduler::shutdown(int sig)
{
  LOG(0, "STATUS: Shutting down due to signal " << sig << ".");

  if (!pool.presentIdleWorkers() && !pool.hasBusyWorkers()) {
    shutdown();
  } else {
    removeAllWorkers();
  }
}
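/* A sketch of wiring shutdown(int) up as an actual signal handler via
 * boost::asio::signal_set. This is an illustrative assumption; the server
 * binary may install a plain C signal handler instead. The static_cast
 * selects the int overload of the otherwise ambiguous member pointer:
 * \code
 * boost::asio::signal_set signals(io_service, SIGINT, SIGTERM);
 * signals.async_wait(
 *     boost::bind(
 *         static_cast<void (FragmentScheduler::*)(int)>(&FragmentScheduler::shutdown),
 *         boost::ref(scheduler),
 *         _2)); // _2 receives the signal number from signal_set
 * \endcode
 */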
/** Helper function to shut down the server properly.
 *
 * \todo one should idle here until all workers have returned from
 * calculating stuff (or workers need to still listen while they are
 * calculating, which is probably better).
 *
 * \note We only shut down when there are no workers left.
 *
 * @return true - doing shutdown, false - precondition not met, not shutting down
 */
bool FragmentScheduler::shutdown()
{
  if (!pool.presentIdleWorkers() && !pool.hasBusyWorkers()) {
    LOG(1, "INFO: Shutting all down ...");

    // close the guard's watch
    guard.stop();

    /// close the worker listener's socket
    WorkerListener.closeSocket();

    /// close the controller listener's socket
    ControllerListener.closeSocket();

    /// finally, stop the io_service
    io_service.stop();
    return true;
  } else {
    ELOG(2, "There are still idle or busy workers present.");
    return false;
  }
}

/** Internal helper to send the next available job to the next idle worker.
 *
 */
void FragmentScheduler::sendAvailableJobToNextIdleWorker()
{
  const WorkerAddress address = pool.getNextIdleWorker();
  FragmentJob::ptr job = JobsQueue.popJob();
  sendJobToWorker(address, job);
}

void FragmentScheduler::update(Observable *publisher)
{
  ASSERT(0, "FragmentScheduler::update() - we are not signed on for global updates.");
}

void FragmentScheduler::recieveNotification(Observable *publisher, Notification_ptr notification)
{
  if ((publisher == &pool) && (notification->getChannelNo() == WorkerPool::WorkerIdle)) {
    // we have an idle worker
    LOG(1, "INFO: We are notified of an idle worker.");
    // are jobs available?
    if (JobsQueue.isJobPresent()) {
      sendAvailableJobToNextIdleWorker();
    } else {
      // if it was the last busy worker, stop the guard for the moment
      if (!pool.presentBusyWorkers())
        guard.stop();
    }
  } else if ((publisher == &JobsQueue) && (notification->getChannelNo() == FragmentQueue::JobAdded)) {
    // we have new jobs
    LOG(1, "INFO: We are notified of a new job.");
    // check for idle workers
    if (pool.presentIdleWorkers()) {
      sendAvailableJobToNextIdleWorker();
    }
  } else {
    ASSERT(0, "FragmentScheduler::recieveNotification() - we are not signed on for updates in channel "
        +toString(notification->getChannelNo())+".");
  }
}

void FragmentScheduler::subjectKilled(Observable *publisher)
{}

/** Removes the worker at \a address and stops the guard in case of the last busy one.
 *
 * This is specifically meant to remove a busy worker, as in this case we
 * might also have to call PoolGuard::stop() if it has been the last busy
 * worker.
 *
 * @param address address of the worker to remove
 */
void FragmentScheduler::removeWorker(const WorkerAddress address)
{
  pool.removeWorker(address);
  // stop the guard if it has been the last busy worker
  if (!pool.presentBusyWorkers())
    guard.stop();
}

/** Unmarks a worker as busy in the WorkerPool.
 *
 * This is required to catch when a worker has finished working on a job and to
 * inform the PoolGuard.
 *
 * @param address address of the once busy worker
 */
void FragmentScheduler::unmarkWorkerBusy(const WorkerAddress address)
{
  // inform guard
  guard.removeBusyWorker(address);
  // unmark in pool
  pool.unmarkWorkerBusy(address);
}
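/* Summary of the event flow implemented above, as an illustrative trace for
 * one idle worker and one incoming job. pushJob() is an assumed name for the
 * queue's submission call (this file only uses popJob()/resubmitJob()), and
 * the WorkerIdle notification on unmarking is inferred from the sign-on in
 * the constructor:
 * \code
 * JobsQueue.pushJob(job);                  // fires FragmentQueue::JobAdded
 *   -> recieveNotification(&JobsQueue, ...)
 *   -> sendAvailableJobToNextIdleWorker()  // pops job, picks idle worker
 *   -> sendJobToWorker(address, job)       // queues SendJobToWorkerOperation
 *   -> workerAcceptsJob(address, id)       // guard now watches the worker
 * // ... worker sends back its result ...
 *   -> unmarkWorkerBusy(address)           // fires WorkerPool::WorkerIdle
 *   -> recieveNotification(&pool, ...)     // next job, or guard.stop()
 * \endcode
 */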