/* * Project: MoleCuilder * Description: creates and alters molecular systems * Copyright (C) 2011 University of Bonn. All rights reserved. * Please see the LICENSE file or "Copyright notice" in builder.cpp for details. */ /* * \file FragmentScheduler.cpp * * This file strongly follows the Serialization example from the boost::asio * library (see server.cpp) * * Created on: Oct 19, 2011 * Author: heber */ // include config.h #ifdef HAVE_CONFIG_H #include #endif // boost asio needs specific operator new #include #include "CodePatterns/MemDebug.hpp" #include #include #include #include #include "Connection.hpp" // Must come before boost/serialization headers. #include #include "CodePatterns/Info.hpp" #include "CodePatterns/Log.hpp" #include "Jobs/FragmentJob.hpp" #include "JobId.hpp" #include "FragmentScheduler.hpp" FragmentJob::ptr FragmentScheduler::NoJob(new FragmentJob(std::string("/bin/false"), std::string("NoJob"), JobId::NoJob)); /** Constructor of class FragmentScheduler. * * We setup both acceptors to accept connections from workers and Controller. * * \param io_service io_service of the asynchronous communications * \param workerport port to listen for worker connections * \param controllerport port to listen for controller connections. */ FragmentScheduler::FragmentScheduler(boost::asio::io_service& io_service, unsigned short workerport, unsigned short controllerport) : worker_acceptor_(io_service, boost::asio::ip::tcp::endpoint(boost::asio::ip::tcp::v4(), workerport) ), controller_acceptor_(io_service, boost::asio::ip::tcp::endpoint(boost::asio::ip::tcp::v4(), controllerport) ), result(JobId::NoJob), choice(NoOperation), Exitflag(OkFlag) { Info info(__FUNCTION__); // only initiate socket if jobs are already present if (JobsQueue.isJobPresent()) { LOG(1, "Listening for workers on port " << workerport << "."); initiateWorkerSocket(); } initiateControllerSocket(); LOG(1, "Listening for controller on port " << controllerport << "."); } /** Internal function to start worker connection. * */ void FragmentScheduler::initiateWorkerSocket() { // Start an accept operation for worker connections. connection_ptr new_conn(new Connection(worker_acceptor_.get_io_service())); worker_acceptor_.async_accept(new_conn->socket(), boost::bind(&FragmentScheduler::handle_AcceptWorker, this, boost::asio::placeholders::error, new_conn)); } /** Internal function to start controller connection. * */ void FragmentScheduler::initiateControllerSocket() { // Start an accept operation for controller connection. connection_ptr new_conn(new Connection(controller_acceptor_.get_io_service())); controller_acceptor_.async_accept(new_conn->socket(), boost::bind(&FragmentScheduler::handle_AcceptController, this, boost::asio::placeholders::error, new_conn)); } /** Handle a new worker connection. * * We check whether jobs are in the JobsQueue. If present, job is sent. * * \sa handle_SendJobtoWorker() * * \param e error code if something went wrong * \param conn reference with the connection */ void FragmentScheduler::handle_AcceptWorker(const boost::system::error_code& e, connection_ptr conn) { Info info(__FUNCTION__); if (!e) { // Successfully accepted a new connection. // Check whether there are jobs in the queue if (JobsQueue.isJobPresent()) { // pop a job and send it to the client. FragmentJob::ptr job(JobsQueue.popJob()); // The connection::async_write() function will automatically // serialize the data structure for us. LOG(1, "INFO: Sending job #" << job->getId() << "."); conn->async_write(job, boost::bind(&FragmentScheduler::handle_SendJobtoWorker, this, boost::asio::placeholders::error, conn)); } else { // send the static NoJob conn->async_write(NoJob, boost::bind(&FragmentScheduler::handle_SendJobtoWorker, this, boost::asio::placeholders::error, conn)); // then there must be no read necesary ELOG(2, "There is currently no job present in the queue."); } } else { // An error occurred. Log it and return. Since we are not starting a new // accept operation the io_service will run out of work to do and the // server will exit. Exitflag = WorkerErrorFlag; ELOG(0, e.message()); } // Start an accept operation for a new Connection only when there // are still jobs present if (JobsQueue.isJobPresent()) initiateWorkerSocket(); } /** Callback function when job has been sent. * * After job has been sent we start async_read() for the result. * * \sa handle_ReceiveResultFromWorker() * * \param e error code if something went wrong * \param conn reference with the connection */ void FragmentScheduler::handle_SendJobtoWorker(const boost::system::error_code& e, connection_ptr conn) { Info info(__FUNCTION__); LOG(1, "INFO: Job sent."); // obtain result LOG(1, "INFO: Receiving result for a job ..."); conn->async_read(result, boost::bind(&FragmentScheduler::handle_ReceiveResultFromWorker, this, boost::asio::placeholders::error, conn)); } /** Callback function when result has been received. * * \param e error code if something went wrong * \param conn reference with the connection */ void FragmentScheduler::handle_ReceiveResultFromWorker(const boost::system::error_code& e, connection_ptr conn) { Info info(__FUNCTION__); LOG(1, "INFO: Received result for job #" << result.getId() << " ..."); ASSERT(result.getId() != (JobId_t)JobId::NoJob, "FragmentScheduler::handle_ReceiveResultFromWorker() - result received has NoJob id."); ASSERT(result.getId() != (JobId_t)JobId::IllegalJob, "FragmentScheduler::handle_ReceiveResultFromWorker() - result received has IllegalJob id."); // place id into expected if ((result.getId() != (JobId_t)JobId::NoJob) && (result.getId() != (JobId_t)JobId::IllegalJob)) JobsQueue.pushResult(result); // erase result result = FragmentResult(JobId::NoJob); LOG(1, "INFO: JobsQueue has " << JobsQueue.getDoneJobs() << " results."); } /** Handle a new controller connection. * * \sa handle_ReceiveJobs() * \sa handle_CheckResultState() * \sa handle_SendResults() * * \param e error code if something went wrong * \param conn reference with the connection */ void FragmentScheduler::handle_AcceptController(const boost::system::error_code& e, connection_ptr conn) { Info info(__FUNCTION__); if (!e) { conn->async_read(choice, boost::bind(&FragmentScheduler::handle_ReadChoice, this, boost::asio::placeholders::error, conn)); } else { // An error occurred. Log it and return. Since we are not starting a new // accept operation the io_service will run out of work to do and the // server will exit. Exitflag = ControllerErrorFlag; ELOG(0, e.message()); } } /** Controller callback function to read the choice for next operation. * * \param e error code if something went wrong * \param conn reference with the connection */ void FragmentScheduler::handle_ReadChoice(const boost::system::error_code& e, connection_ptr conn) { Info info(__FUNCTION__); if (!e) { bool LaunchNewAcceptor = true; // switch over the desired choice read previously switch(choice) { case NoOperation: { ELOG(1, "FragmentScheduler::handle_ReadChoice() - called with NoOperation."); break; } case ReceiveJobs: { // The connection::async_write() function will automatically // serialize the data structure for us. LOG(1, "INFO: Receiving bunch of jobs from a controller ..."); conn->async_read(jobs, boost::bind(&FragmentScheduler::handle_ReceiveJobs, this, boost::asio::placeholders::error, conn)); break; } case CheckState: { // first update number doneJobs = JobsQueue.getDoneJobs(); // now we accept connections to check for state of calculations LOG(1, "INFO: Sending state that "+toString(doneJobs)+" jobs are done to controller ..."); conn->async_write(doneJobs, boost::bind(&FragmentScheduler::handle_CheckResultState, this, boost::asio::placeholders::error, conn)); break; } case SendResults: { const std::vector results = JobsQueue.getAllResults(); // ... or we give the results LOG(1, "INFO: Sending "+toString(results.size())+" results to controller ..."); conn->async_write(results, boost::bind(&FragmentScheduler::handle_SendResults, this, boost::asio::placeholders::error, conn)); break; } case Shutdown: { LaunchNewAcceptor = false; break; } default: Exitflag = ControllerErrorFlag; ELOG(1, "FragmentScheduler::handle_ReadChoice() - called with no valid choice."); break; } // restore NoOperation choice such that choice is not read twice choice = NoOperation; if (LaunchNewAcceptor) { LOG(1, "Launching new acceptor on socket."); // Start an accept operation for a new Connection. connection_ptr new_conn(new Connection(controller_acceptor_.get_io_service())); controller_acceptor_.async_accept(new_conn->socket(), boost::bind(&FragmentScheduler::handle_AcceptController, this, boost::asio::placeholders::error, new_conn)); } } else { // An error occurred. Log it and return. Since we are not starting a new // accept operation the io_service will run out of work to do and the // server will exit. Exitflag = ControllerErrorFlag; ELOG(0, e.message()); } } /** Controller callback function when job has been sent. * * We check here whether the worker socket is accepting, if there * have been no jobs we re-activate it, as it is shut down after * last job. * * \param e error code if something went wrong * \param conn reference with the connection */ void FragmentScheduler::handle_ReceiveJobs(const boost::system::error_code& e, connection_ptr conn) { Info info(__FUNCTION__); bool initiateSocket = !JobsQueue.isJobPresent(); // jobs are received, hence place in JobsQueue if (!jobs.empty()) { LOG(1, "INFO: Pushing " << jobs.size() << " jobs into queue."); JobsQueue.pushJobs(jobs); // initiate socket if we had no jobs before if (initiateSocket) initiateWorkerSocket(); } jobs.clear(); } /** Controller callback function when checking on state of results. * * \param e error code if something went wrong * \param conn reference with the connection */ void FragmentScheduler::handle_CheckResultState(const boost::system::error_code& e, connection_ptr conn) { Info info(__FUNCTION__); // do nothing LOG(1, "INFO: Sent that " << doneJobs << " jobs are done."); } /** Controller callback function when result has been received. * * \param e error code if something went wrong * \param conn reference with the connection */ void FragmentScheduler::handle_SendResults(const boost::system::error_code& e, connection_ptr conn) { Info info(__FUNCTION__); // do nothing LOG(1, "INFO: Results have been sent."); }