/* * Project: MoleCuilder * Description: creates and alters molecular systems * Copyright (C) 2011 University of Bonn. All rights reserved. * Please see the LICENSE file or "Copyright notice" in builder.cpp for details. */ /* * FragmentController.cpp * * Created on: Nov 27, 2011 * Author: heber */ // include config.h #ifdef HAVE_CONFIG_H #include #endif // boost asio needs specific operator new #include #include "CodePatterns/MemDebug.hpp" #include #include #include #include #include "Connection.hpp" // Must come before boost/serialization headers. #include #include "CodePatterns/Info.hpp" #include "CodePatterns/Log.hpp" #include "FragmentJob.hpp" #include "FragmentResult.hpp" #include "ControllerChoices.hpp" #include "FragmentController.hpp" /** Constructor of class FragmentController. * * \param io_service io_service for the asynchronous operations * \param _host hostname of server that accepts jobs * \param _service of server */ FragmentController::FragmentController( boost::asio::io_service& io_service, const std::string& _host, const std::string& _service) : connection_(io_service), host(_host), service(_service), recjobs(connection_, _host, _service), checkres(connection_, _host, _service), sendres(connection_, _host, _service), shutdown(connection_, _host, _service) { Info info(__FUNCTION__); } /** Destructor of class FragmentController. * */ FragmentController::~FragmentController() {} /** Constructor for class Operation. * * \param _connection connection to operate on */ FragmentController::Operation::Operation(Connection &_connection, const std::string& _host, const std::string& _service) : connection_(_connection), host(_host), service(_service), Exitflag(OkFlag) {} /** Destructor for class Operation. * */ FragmentController::Operation::~Operation() {} /** Handle connect operation to receive jobs from controller * * \param e error code if something went wrong * \param endpoint_iterator endpoint of the connection */ void FragmentController::ReceiveJobsOperation::handle_connect_calc(const boost::system::error_code& e, boost::asio::ip::tcp::resolver::iterator endpoint_iterator) { Info info(__FUNCTION__); if (!e) { // Successfully established connection. Give choice. enum ControllerChoices choice = ReceiveJobs; connection_.async_write(choice, boost::bind(&FragmentController::ReceiveJobsOperation::handle_SendJobs, this, boost::asio::placeholders::error)); } else if (endpoint_iterator != boost::asio::ip::tcp::resolver::iterator()) { // Try the next endpoint. connection_.socket().close(); boost::asio::ip::tcp::endpoint endpoint = *endpoint_iterator; connection_.socket().async_connect(endpoint, boost::bind(&FragmentController::ReceiveJobsOperation::handle_connect_calc, this, boost::asio::placeholders::error, ++endpoint_iterator)); } else { // An error occurred. Log it and return. Since we are not starting a new // operation the io_service will run out of work to do and the client will // exit. Exitflag = ErrorFlag; ELOG(1, e.message()); } } /** Callback function when an operation has been completed. * * \param e error code if something went wrong */ void FragmentController::ReceiveJobsOperation::handle_FinishOperation(const boost::system::error_code& e) { Info info(__FUNCTION__); LOG(1, "INFO: Jobs have been sent. Clearing."); jobs.clear(); Operation::handle_FinishOperation(e); } /** Handle connect operation to send number of done jobs. * * \param e error code if something went wrong * \param endpoint_iterator endpoint of the connection */ void FragmentController::CheckResultsOperation::handle_connect_check(const boost::system::error_code& e, boost::asio::ip::tcp::resolver::iterator endpoint_iterator) { Info info(__FUNCTION__); if (!e) { // Successfully established connection. Give choice. enum ControllerChoices choice = CheckState; connection_.async_write(choice, boost::bind(&FragmentController::CheckResultsOperation::handle_ReceiveDoneJobs, this, boost::asio::placeholders::error)); } else if (endpoint_iterator != boost::asio::ip::tcp::resolver::iterator()) { // Try the next endpoint. connection_.socket().close(); boost::asio::ip::tcp::endpoint endpoint = *endpoint_iterator; connection_.socket().async_connect(endpoint, boost::bind(&FragmentController::CheckResultsOperation::handle_connect_check, this, boost::asio::placeholders::error, ++endpoint_iterator)); } else { // An error occurred. Log it and return. Since we are not starting a new // operation the io_service will run out of work to do and the client will // exit. Exitflag = ErrorFlag; ELOG(1, e.message()); } } /** Handle connect operation to send results. * * \param e error code if something went wrong * \param endpoint_iterator endpoint of the connection */ void FragmentController::SendResultsOperation::handle_connect_get(const boost::system::error_code& e, boost::asio::ip::tcp::resolver::iterator endpoint_iterator) { Info info(__FUNCTION__); if (!e) { // Successfully established connection. Give choice. enum ControllerChoices choice = SendResults; connection_.async_write(choice, boost::bind(&FragmentController::SendResultsOperation::handle_ReceivingResults, this, boost::asio::placeholders::error)); } else if (endpoint_iterator != boost::asio::ip::tcp::resolver::iterator()) { // Try the next endpoint. connection_.socket().close(); boost::asio::ip::tcp::endpoint endpoint = *endpoint_iterator; connection_.socket().async_connect(endpoint, boost::bind(&FragmentController::SendResultsOperation::handle_connect_get, this, boost::asio::placeholders::error, ++endpoint_iterator)); } else { // An error occurred. Log it and return. Since we are not starting a new // operation the io_service will run out of work to do and the client will // exit. Exitflag = ErrorFlag; ELOG(1, e.message()); } } /** Handle connect operation to shutdown scheduler. * * \param e error code if something went wrong * \param endpoint_iterator endpoint of the connection */ void FragmentController::ShutdownOperation::handle_connect_shutdown(const boost::system::error_code& e, boost::asio::ip::tcp::resolver::iterator endpoint_iterator) { Info info(__FUNCTION__); if (!e) { // Successfully established connection. Give choice. enum ControllerChoices choice = Shutdown; connection_.async_write(choice, boost::bind(&FragmentController::ShutdownOperation::handle_FinishOperation, this, boost::asio::placeholders::error)); } else if (endpoint_iterator != boost::asio::ip::tcp::resolver::iterator()) { // Try the next endpoint. connection_.socket().close(); boost::asio::ip::tcp::endpoint endpoint = *endpoint_iterator; connection_.socket().async_connect(endpoint, boost::bind(&FragmentController::ShutdownOperation::handle_connect_shutdown, this, boost::asio::placeholders::error, ++endpoint_iterator)); } else { // An error occurred. Log it and return. Since we are not starting a new // operation the io_service will run out of work to do and the client will // exit. Exitflag = ErrorFlag; ELOG(1, e.message()); } } /** Callback function when an operation has been completed. * * \param e error code if something went wrong */ void FragmentController::Operation::handle_FinishOperation(const boost::system::error_code& e) { Info info(__FUNCTION__); if (!e) { LOG(1, "INFO: Operation completed."); } else { // An error occurred. Exitflag = ErrorFlag; ELOG(1, e.message()); } // Since we are not starting a new operation the io_service will run out of // work to do and the client will exit. } /** Callback function when jobs have been sent. * * \param e error code if something went wrong */ void FragmentController::ReceiveJobsOperation::handle_SendJobs(const boost::system::error_code& e) { Info info(__FUNCTION__); if (!e) { // Successfully established connection. Start operation to read the vector // of jobs. The connection::async_write() function will automatically // encode the data that is written to the underlying socket. LOG(1, "INFO: Sending "+toString(jobs.size())+" jobs ..."); connection_.async_write(jobs, boost::bind(&FragmentController::ReceiveJobsOperation::handle_FinishOperation, this, boost::asio::placeholders::error)); } else { // An error occurred. Exitflag = ErrorFlag; ELOG(1, e.message()); } // Since we are not starting a new operation the io_service will run out of // work to do and the client will exit. } /** Callback function when results have been received. * * \param e error code if something went wrong */ void FragmentController::SendResultsOperation::handle_ReceivingResults(const boost::system::error_code& e) { Info info(__FUNCTION__); if (!e) { // The connection::async_read() function will automatically // decode the data that is written to the underlying socket. connection_.async_read(results, boost::bind(&FragmentController::SendResultsOperation::handle_ReceivedResults, this, boost::asio::placeholders::error)); } else { // An error occurred. Exitflag = ErrorFlag; ELOG(1, e.message()); } // Since we are not starting a new operation the io_service will run out of // work to do and the client will exit. } /** Callback function when doneJobs have been received. * * \param e error code if something went wrong */ void FragmentController::SendResultsOperation::handle_ReceivedResults(const boost::system::error_code& e) { Info info(__FUNCTION__); LOG(1, "INFO: Received "+toString(results.size())+" results ..."); handle_FinishOperation(e); } /** Callback function when doneJobs have been received. * * \param e error code if something went wrong */ void FragmentController::CheckResultsOperation::handle_ReceiveDoneJobs(const boost::system::error_code& e) { Info info(__FUNCTION__); if (!e) { // The connection::async_read() function will automatically // decode the data that is written to the underlying socket. LOG(1, "INFO: Checking number of done jobs ..."); connection_.async_read(doneJobs, boost::bind(&FragmentController::Operation::handle_FinishOperation, this, boost::asio::placeholders::error)); } else { // An error occurred. Exitflag = ErrorFlag; ELOG(1, e.message()); } } /** Internal function to resolve all possible connection endpoints. * * \return endpoint iterator of connection */ boost::asio::ip::tcp::resolver::iterator FragmentController::Operation::getEndpointIterator() { // Resolve the host name into an IP address. boost::asio::ip::tcp::resolver resolver(connection_.socket().get_io_service()); boost::asio::ip::tcp::resolver::query query(host, service); boost::asio::ip::tcp::resolver::iterator endpoint_iterator = resolver.resolve(query); return endpoint_iterator; } /** Internal function to connect to the endpoint of the server asynchronuously. * * We require internal connetion_ and host and service to be set up for this. */ void FragmentController::ReceiveJobsOperation::connect_calc() { Info info(__FUNCTION__); // Resolve the host name into an IP address. boost::asio::ip::tcp::resolver::iterator endpoint_iterator = getEndpointIterator(); boost::asio::ip::tcp::endpoint endpoint = *endpoint_iterator; // Start an asynchronous connect operation. std::cout << "Connecting to endpoint " << endpoint << " to calc " << std::endl; connection_.socket().async_connect(endpoint, boost::bind(&FragmentController::ReceiveJobsOperation::handle_connect_calc, this, boost::asio::placeholders::error, ++endpoint_iterator)); } /** Internal function to connect to the endpoint of the server asynchronuously. * * We require internal connetion_ and host and service to be set up for this. */ void FragmentController::CheckResultsOperation::connect_check() { Info info(__FUNCTION__); // Resolve the host name into an IP address. boost::asio::ip::tcp::resolver::iterator endpoint_iterator = getEndpointIterator(); boost::asio::ip::tcp::endpoint endpoint = *endpoint_iterator; // Start an asynchronous connect operation. std::cout << "Connecting to endpoint " << endpoint << " to check " << std::endl; connection_.socket().async_connect(endpoint, boost::bind(&FragmentController::CheckResultsOperation::handle_connect_check, this, boost::asio::placeholders::error, ++endpoint_iterator)); } /** Internal function to connect to the endpoint of the server asynchronuously. * * We require internal connetion_ and host and service to be set up for this. */ void FragmentController::SendResultsOperation::connect_get() { Info info(__FUNCTION__); // Resolve the host name into an IP address. boost::asio::ip::tcp::resolver::iterator endpoint_iterator = getEndpointIterator(); boost::asio::ip::tcp::endpoint endpoint = *endpoint_iterator; // Start an asynchronous connect operation. std::cout << "Connecting to endpoint " << endpoint << " to get results " << std::endl; connection_.socket().async_connect(endpoint, boost::bind(&FragmentController::SendResultsOperation::handle_connect_get, this, boost::asio::placeholders::error, ++endpoint_iterator)); } /** Internal function to connect to the endpoint of the server asynchronuously. * * We require internal connetion_ and host and service to be set up for this. */ void FragmentController::ShutdownOperation::connect_shutdown() { Info info(__FUNCTION__); // Resolve the host name into an IP address. boost::asio::ip::tcp::resolver::iterator endpoint_iterator = getEndpointIterator(); boost::asio::ip::tcp::endpoint endpoint = *endpoint_iterator; // Start an asynchronous connect operation. std::cout << "Connecting to endpoint " << endpoint << " to get results " << std::endl; connection_.socket().async_connect(endpoint, boost::bind(&FragmentController::ShutdownOperation::handle_connect_shutdown, this, boost::asio::placeholders::error, ++endpoint_iterator)); } /** Internal function to disconnect connection_ correctly. * */ void FragmentController::Operation::disconnect() { //connection_.socket().close(); } /** Place number of jobs into this controller. * * \param _jobs jobs to add */ void FragmentController::ReceiveJobsOperation::addJobs(const std::vector &_jobs) { jobs.reserve(jobs.size()+_jobs.size()); BOOST_FOREACH(FragmentJob job, _jobs) { jobs.push_back(job); } } /** Prepares the calculation of the results for the current jobs. */ void FragmentController::ReceiveJobsOperation::operator()() { Info info(__FUNCTION__); // connect connect_calc(); //disconnect disconnect(); } /** Prepares the calculation of the results for the current jobs. */ void FragmentController::CheckResultsOperation::operator()() { Info info(__FUNCTION__); // connect connect_check(); //disconnect disconnect(); } /** Getter for results. * * \return vector of results for the added jobs (\sa addJobs()). */ std::vector FragmentController::SendResultsOperation::getResults() { Info info(__FUNCTION__); return results; } /** Function to initiate receival of results. * */ void FragmentController::SendResultsOperation::operator()() { // connect connect_get(); //disconnect disconnect(); } /** Function to initiate shutdown of server. * */ void FragmentController::ShutdownOperation::operator()() { // connect connect_shutdown(); //disconnect disconnect(); } /** Getter for doneJobs. * * \sa checkResults() * \param doneJobs */ size_t FragmentController::CheckResultsOperation::getDoneJobs() const { return doneJobs; } /** Getter for number of jobs present in the queue. * * \return jobs.size() */ size_t FragmentController::ReceiveJobsOperation::getPresentJobs() const { return jobs.size(); }