/* * Project: JobMarket * Description: asynchronous Server/Controller/Client-approach to parallel computing, based on boost::asio * Copyright (C) 2011-2012 Frederik Heber. All rights reserved. * */ /* * ReceiveResultsOperation.cpp * * Created on: Dec 11, 2011 * Author: heber */ // include config.h #ifdef HAVE_CONFIG_H #include #endif // boost asio needs specific operator new #include #include "CodePatterns/MemDebug.hpp" #include "JobMarket/Operations/Controllers/ReceiveResultsOperation.hpp" #include #include #include #include "JobMarket/Connection.hpp" // Must come before boost/serialization headers. #include #include #include "CodePatterns/Info.hpp" #include "CodePatterns/Log.hpp" #include "JobMarket/ControllerChoices.hpp" #include "JobMarket/Results/FragmentResult.hpp" // static entities const std::set ReceiveResultsOperation::emptyids; /** Handle connect operation to send results. * * \param e error code if something went wrong * \param endpoint_iterator endpoint of the connection */ void ReceiveResultsOperation::handle_connect(const boost::system::error_code& e, boost::asio::ip::tcp::resolver::iterator endpoint_iterator) { DEBUG_FUNCTION_ENTRYEXIT if (!e) { // Successfully established connection. Give choice. enum ControllerChoices choice = ReceiveResults; connection_.async_write(choice, boost::bind(&ReceiveResultsOperation::handle_SendJobIds, this, boost::asio::placeholders::error)); } else if (endpoint_iterator != boost::asio::ip::tcp::resolver::iterator()) { // Try the next endpoint. connection_.socket().close(); boost::asio::ip::tcp::endpoint endpoint = *endpoint_iterator; connection_.socket().async_connect(endpoint, boost::bind(&ReceiveResultsOperation::handle_connect, this, boost::asio::placeholders::error, ++endpoint_iterator)); } else { // An error occurred. Log it and return. Since we are not starting a new // operation the io_service will run out of work to do and the client will // exit. ELOG(1, e.message()); AsyncOperation::handle_FinishOperation(e); } } /** Callback function when sending desired job ids * * \param e error code if something went wrong */ void ReceiveResultsOperation::handle_SendJobIds(const boost::system::error_code& e) { DEBUG_FUNCTION_ENTRYEXIT if (!e) { // The connection::async_write() function will automatically // decode the data that is written to the underlying socket. LOG(1, "INFO: Sending vector of desired " << jobids.size() << " jobids ..."); connection_.async_write(jobids, boost::bind(&ReceiveResultsOperation::handle_ReceivingResults, this, boost::asio::placeholders::error)); } else { // An error occurred. ELOG(1, e.message()); AsyncOperation::handle_FinishOperation(e); } } /** Callback function when preparing to receive results * * \param e error code if something went wrong */ void ReceiveResultsOperation::handle_ReceivingResults(const boost::system::error_code& e) { DEBUG_FUNCTION_ENTRYEXIT if (!e) { // The connection::async_read() function will automatically // decode the data that is written to the underlying socket. connection_.async_read(results, boost::bind(&ReceiveResultsOperation::handle_ReceivedResults, this, boost::asio::placeholders::error)); } else { // An error occurred. ELOG(1, e.message()); AsyncOperation::handle_FinishOperation(e); } } /** Callback function when results have been received. * * \param e error code if something went wrong */ void ReceiveResultsOperation::handle_ReceivedResults(const boost::system::error_code& e) { DEBUG_FUNCTION_ENTRYEXIT LOG(1, "INFO: Received "+toString(results.size())+" results."); ReceiveResultsOperation::handle_FinishOperation(e); } /** Getter for results. * * \sa calculateResults() * \return vector of results for the added jobs (\sa addJobs()). */ std::vector ReceiveResultsOperation::getResults() { DEBUG_FUNCTION_ENTRYEXIT return results; }