/* * Project: MoleCuilder * Description: creates and alters molecular systems * Copyright (C) 2012 University of Bonn. All rights reserved. * Please see the LICENSE file or "Copyright notice" in builder.cpp for details. */ /* * \file PoolWorker.cpp * * This file strongly follows the Serialization example from the boost::asio * library (see client.cpp). * * Created on: Feb 28, 2012 * Author: heber */ // include config.h #ifdef HAVE_CONFIG_H #include #endif // boost asio needs specific operator new #include #include "CodePatterns/MemDebug.hpp" #include #include #include #include #include "Connection.hpp" // Must come before boost/serialization headers. #include #include "CodePatterns/Info.hpp" #include "CodePatterns/Log.hpp" #include "Jobs/FragmentJob.hpp" #include "Jobs/MPQCCommandJob.hpp" #include "Jobs/SystemCommandJob.hpp" #include "Results/FragmentResult.hpp" #include "PoolWorker.hpp" /** Helper function to enforce binding of PoolWorker to possible derived * FragmentJob classes. */ void dummyInit() { SystemCommandJob("/bin/false", "something", JobId::IllegalJob); MPQCCommandJob("nofile", JobId::IllegalJob); } /** Constructor for class PoolWorker. * * We automatically connect to the given pool and enroll. * * @param io_service io service for creating connections * @param _host host part of address of pool to connect to * @param _service service part of address of pool to connect to * @param listenhost host part of address of this instance for listening to pool connections * @param listenservice seervice part of address of this instance for listening to pool connections */ PoolWorker::PoolWorker( boost::asio::io_service& io_service, const std::string& _host, const std::string& _service, const std::string& listenhost, const std::string& listenservice) : connection_(io_service), PoolListener(io_service, boost::lexical_cast(listenservice), *this), address(listenhost, listenservice), enrollOp(connection_, address), submitOp(connection_, address), submitresult(boost::bind(&Operation::operator(), boost::ref(submitOp), _host, _service)) { Info info(__FUNCTION__); // always enroll enrollOp(_host,_service); // initiate listening PoolListener.initiateSocket(); } /// Handle completion of a accept server operation. void PoolWorker::PoolListener_t::handle_Accept(const boost::system::error_code& e, connection_ptr conn) { Info info(__FUNCTION__); if (!e) { conn->async_read(job, boost::bind(&PoolWorker::PoolListener_t::handle_ReceiveJob, this, boost::asio::placeholders::error, conn)); // and listen for following connections //initiateSocket(); } else { // An error occurred. Log it and return. Since we are not starting a new // accept operation the io_service will run out of work to do and the // server will exit. ELOG(0, e.message()); } } /// Controller callback function when job has been sent. void PoolWorker::PoolListener_t::handle_ReceiveJob(const boost::system::error_code& e, connection_ptr conn) { Info info(__FUNCTION__); if (!e) { LOG(1, "INFO: Received job " << job->getId() << "."); callback.WorkOnJob(job); } else { // An error occurred. Log it and return. Since we are not starting a new // accept operation the io_service will run out of work to do and the // server will exit. ELOG(0, e.message()); } } /** Works on the given job and send the results. * * @param job job to work on */ void PoolWorker::WorkOnJob(FragmentJob::ptr &job) { // work on job and create result LOG(2, "DEBUG: Beginning to work on " << job->getId() << "."); FragmentResult::ptr result = job->Work(); LOG(2, "DEBUG: Setting result " << result->getId() << "."); submitOp.setResult(result); // submit result LOG(2, "DEBUG: Sending result ..."); submitresult(); }