/* * SpecificFragmentController_impl.hpp * * Created on: Aug 27, 2012 * Author: heber */ #ifndef SPECIFICFRAGMENTCONTROLLER_IMPL_HPP_ #define SPECIFICFRAGMENTCONTROLLER_IMPL_HPP_ // include config.h #ifdef HAVE_CONFIG_H #include #endif #include "SpecificFragmentController.hpp" #include "CodePatterns/Assert.hpp" #include "CodePatterns/toString.hpp" template size_t SpecificFragmentController::ResultContainer::receiveResults( SpecificFragmentController &callback) { // receive (and remove the respective ids) callback.receiveResults(callback.host, callback.port); callback.RunService("Requesting results"); std::vector fragmentresults = callback.getReceivedResults(); // convert std::vector fragmentData; ConvertFragmentResultTo(fragmentresults, fragmentData); // insert into map std::vector::const_iterator resultiter = fragmentresults.begin(); typename std::vector::const_iterator dataiter = fragmentData.begin(); for(;resultiter != fragmentresults.end(); ++resultiter, ++dataiter) { const JobId_t _id = (*resultiter)->getId(); #ifndef NDEBUG std::pair::iterator, bool> inserter = #endif IdData.insert( std::make_pair( _id, *dataiter) ); ASSERT( inserter.second, "SpecificFragmentController::ResultContainer::receiveResults() - result id " +toString(inserter.first->first)+" already present."); } ASSERT( dataiter == fragmentData.end(), "SpecificFragmentController::ResultContainer::receiveResults() - fragmentresults and fragmentdata differ in size."); return fragmentData.size(); } template void SpecificFragmentController::ResultContainer::waitforResults( const size_t NoExpectedResults, boost::asio::io_service &io_service, SpecificFragmentController &callback) { // wait but receive all results that are already done size_t NoReceivedResults = 0; while (NoReceivedResults != NoExpectedResults) { // wait a bit boost::asio::deadline_timer timer(io_service); timer.expires_from_now(boost::posix_time::milliseconds(500)); timer.wait(); // then request status callback.checkResults(callback.host, callback.port); callback.RunService("Checking on results"); const std::pair JobStatus = callback.getJobStatus(); const size_t NoCalculatedResults = JobStatus.second; // if some are done, get them if (NoCalculatedResults != 0) { NoReceivedResults += receiveResults(callback); callback.handler(NoReceivedResults, NoExpectedResults); LOG(1, "INFO: #" << JobStatus.first << " are waiting in the queue and #" << NoReceivedResults << " of " << NoExpectedResults << " jobs are calculated so far."); } } } template void SpecificFragmentController::ResultContainer::ConvertFragmentResultTo( const std::vector &results, std::vector &fragmentData) { // extract results fragmentData.clear(); fragmentData.reserve(results.size()); LOG(2, "DEBUG: Parsing now through " << results.size() << " results."); for (std::vector::const_iterator iter = results.begin(); iter != results.end(); ++iter) { //LOG(1, "RESULT: job #"+toString((*iter)->getId())+": "+toString((*iter)->result)); T extractedData; std::stringstream inputstream((*iter)->result); LOG(2, "DEBUG: First 50 characters FragmentResult's string: "+(*iter)->result.substr(0, 50)); boost::archive::text_iarchive ia(inputstream); ia >> extractedData; LOG(1, "INFO: extracted data is " << extractedData << "."); fragmentData.push_back(extractedData); } ASSERT( results.size() == fragmentData.size(), "ConvertFragmentResultTo() - the number of extracted data differs from the number of results."); } #endif /* SPECIFICFRAGMENTCONTROLLER_IMPL_HPP_ */