/*
 * Project: JobMarket
 * Description: asynchronous Server/Controller/Client-approach to parallel computing, based on boost::asio
 * Copyright (C)  2011 Frederik Heber. All rights reserved.
 *
 */

/*
 * FragmentController.cpp
 *
 *  Created on: Nov 27, 2011
 *      Author: heber
 */

// include config.h
#ifdef HAVE_CONFIG_H
#include <config.h>
#endif

// standard library
#include <string>
#include <utility>
#include <vector>

// boost asio needs specific operator new
#include <boost/asio.hpp>

#include "CodePatterns/MemDebug.hpp"

#include "JobMarket/Controller/FragmentController.hpp"

#include "JobMarket/Connection.hpp" // Must come before boost/serialization headers.
#include "CodePatterns/Info.hpp"
#include "CodePatterns/Log.hpp"
#include "Operations/Controllers/CheckResultsOperation.hpp"
#include "Operations/Controllers/GetNextJobIdOperation.hpp"
#include "Operations/Controllers/ReceiveResultsOperation.hpp"
#include "Operations/Controllers/RemoveAllJobsOperation.hpp"
#include "Operations/Controllers/RemoveAllResultsOperation.hpp"
#include "Operations/Controllers/RemoveAllWorkerOperation.hpp"
#include "Operations/Controllers/SendJobsOperation.hpp"
#include "Operations/Controllers/ShutdownOperation.hpp"
#include "JobMarket/JobId.hpp"

* * \param io_service io_service for the asynchronous operations * \param _host hostname of server that accepts jobs * \param _service of server */ FragmentController::FragmentController( boost::asio::io_service& io_service) : connection_(io_service), failed(boost::bind(&ExitflagContainer::setExitflag, this, ExitflagContainer::ErrorFlag)) { DEBUG_FUNCTION_ENTRYEXIT // insert commands into registry Commands.registerInstance(new CheckResultsOperation(connection_, AsyncOperation::NoOpCallback, failed)); Commands.registerInstance(new GetNextJobIdOperation(connection_, AsyncOperation::NoOpCallback, failed)); Commands.registerInstance(new ReceiveResultsOperation(connection_, AsyncOperation::NoOpCallback, failed)); Commands.registerInstance(new RemoveAllJobsOperation(connection_)); Commands.registerInstance(new RemoveAllResultsOperation(connection_)); Commands.registerInstance(new RemoveAllWorkerOperation(connection_)); Commands.registerInstance(new SendJobsOperation(connection_, AsyncOperation::NoOpCallback, failed)); Commands.registerInstance(new ShutdownOperation(connection_)); } /** Destructor of class FragmentController. * */ FragmentController::~FragmentController() { Commands.cleanup(); } /** Requests an available id from server * * @param host address of server * @param service port/service of server * @param NumberIds number of desired ids to request with this connect */ void FragmentController::requestIds( const std::string &host, const std::string &service, const size_t NumberIds) { GetNextJobIdOperation *getnextid = static_cast( Commands.getByName("getnextjobid")); (*getnextid).setDesiredIds(NumberIds); (*getnextid)(host,service); } /** Returns another available id from a finished GetNextJobIdOperation. 
* * @return next available id */ JobId_t FragmentController::getAvailableId() { GetNextJobIdOperation *getnextid = static_cast( Commands.getByName("getnextjobid")); if( !getnextid->isNextIdAvailable()) { ELOG(1, "FragmentController::getAvailableId() - there are no more requested ids."); return JobId::IllegalJob; } const JobId_t nextid = getnextid->getNextId(); LOG(1, "INFO: Next available id is " << nextid << "."); return nextid; } /** Adds a vector of jobs to the send operation. * * @param jobs jobs to add */ void FragmentController::addJobs(std::vector &jobs) { SendJobsOperation *sendjobs = static_cast( Commands.getByName("sendjobs")); // place all ids in internal list for (std::vector::const_iterator iter = jobs.begin(); iter != jobs.end(); ++iter) { #ifndef NDEBUG std::pair< std::set::iterator,bool > inserter = #endif jobids.insert((*iter)->getId()); ASSERT( inserter.second, "FragmentController::addJobs() - id "+toString((*iter)->getId()) +" is presently marked as sent to the server."); } // mark as to be sent to server sendjobs->addJobs(jobs); const size_t presentJobs = sendjobs->getPresentJobs(); LOG(1, "INFO: #" << presentJobs << " jobs are now waiting to be transmitted."); } /** Sends contained jobs in operation to server * * @param host address of server * @param service port/service of server */ void FragmentController::sendJobs( const std::string &host, const std::string &service) { SendJobsOperation *sendjobs = static_cast( Commands.getByName("sendjobs")); const size_t presentJobs = sendjobs->getPresentJobs(); LOG(1, "INFO: #" << presentJobs << " jobs are being sent to the server."); (*sendjobs)(host, service); } /** Obtains scheduled and done jobs from server * * @param host address of server * @param service port/service of server */ void FragmentController::checkResults( const std::string &host, const std::string &service) { CheckResultsOperation *checkres = static_cast( Commands.getByName("checkresults")); checkres->setJobIds(jobids); 
(*checkres)(host, service); } /** Return scheduled and done jobs. * * @return pair of number of still scheduled and already done jobs */ std::pair FragmentController::getJobStatus() const { const CheckResultsOperation * const checkres = static_cast( Commands.getByName("checkresults")); const size_t doneJobs = checkres->getDoneJobs(); const size_t presentJobs = checkres->getPresentJobs(); return make_pair(presentJobs, doneJobs); } /** Requests removal of all pending results from server. * * @param host address of server * @param service port/service of server */ void FragmentController::removeWaitingResults( const std::string &host, const std::string &service) { RemoveAllResultsOperation *removeall = static_cast( Commands.getByName("removeallresults")); (*removeall)(host, service); } /** Requests removal of all pending jobs from server. * * @param host address of server * @param service port/service of server */ void FragmentController::removeWaitingJobs( const std::string &host, const std::string &service) { RemoveAllJobsOperation *removeall = static_cast( Commands.getByName("removealljobs")); (*removeall)(host, service); } /** Requests removal of all idle workers from server. * * @param host address of server * @param service port/service of server */ void FragmentController::removeall( const std::string &host, const std::string &service) { RemoveAllWorkerOperation *removeall = static_cast( Commands.getByName("removeallworker")); (*removeall)(host, service); } /** Obtains results from done jobs from server. * * @param host address of server * @param service port/service of server */ void FragmentController::receiveResults( const std::string &host, const std::string &service) { ReceiveResultsOperation *receiveres = static_cast( Commands.getByName("receiveresults")); receiveres->setJobIds(jobids); (*receiveres)(host, service); } /** Getter for received results. 
* * @return vector with all received results */ std::vector FragmentController::getReceivedResults() { ReceiveResultsOperation *receiveres = static_cast( Commands.getByName("receiveresults")); const std::vector &results = receiveres->getResults(); for (std::vector::const_iterator iter = results.begin(); iter != results.end(); ++iter) { // ASSERT( jobids.count((*iter)->getId()), // "FragmentController::getReceivedResults() - id " // +toString((*iter)->getId())+" not present in jobids."); if (jobids.count((*iter)->getId())) jobids.erase((*iter)->getId()); } return results; } /** Sends shutdown signal to server * * @param host address of server * @param service port/service of server */ void FragmentController::shutdown( const std::string &host, const std::string &service) { ShutdownOperation *shutdown = static_cast( Commands.getByName("shutdown")); (*shutdown)(host, service); } /** Helper function to allow all derived controllers to change a job's id. * * As the controller is the one that obtains a unique id, it must also have * the authority to set/change a FragmentJob's id. * * \param _newid job is set to this id */ void FragmentController::changeJobId(FragmentJob::ptr &job, const JobId_t _newid) const { job->setId(_newid); }