/* * Project: JobMarket * Description: asynchronous Server/Controller/Client-approach to parallel computing, based on boost::asio * Copyright (C) 2011 Frederik Heber. All rights reserved. * */ /* * \file controller_main.cpp * * Created on: Nov 27, 2011 * Author: heber */ // include config.h #ifdef HAVE_CONFIG_H #include #endif // boost asio needs specific operator new #include // program_options must have some strange static stuff, causes double free or // corruption if included after MemDebug #include #include "CodePatterns/MemDebug.hpp" #include #include #include #include #include #include #include #include #include "CodePatterns/Assert.hpp" #include "CodePatterns/Info.hpp" #include "CodePatterns/Log.hpp" #include "JobMarket/atexit.hpp" #include "JobMarket/Controller/controller_AddOn.hpp" #include "JobMarket/Controller/ControllerCommandRegistry.hpp" #include "JobMarket/Controller/ControllerOptions.hpp" #include "JobMarket/Controller/FragmentController.hpp" /** Print the status of scheduled and done jobs. * * @param status pair of number of schedule and done jobs */ void printJobStatus(const std::pair &JobStatus) { LOG(1, "INFO: #" << JobStatus.first << " are waiting in the queue and #" << JobStatus.second << " jobs are calculated so far."); } inline std::vector getListOfCommands(const ControllerCommandRegistry &ControllerCommands) { std::vector Commands; for (ControllerCommandRegistry::const_iterator iter = ControllerCommands.getBeginIter(); iter != ControllerCommands.getEndIter(); ++iter) Commands.push_back(iter->first); return Commands; } int controller_main(int argc, char* argv[]) { // from this moment on, we need to be sure to deeinitialize in the correct order // this is handled by the cleanup function atexit(cleanUp); size_t Exitflag = 0; controller_AddOn *AddOn = getAddOn(); ASSERT(AddOn != NULL, "main() - returned AddOn is NULL."); ControllerOptions *ControllerInfo = AddOn->allocateControllerInfo(); boost::asio::io_service io_service; FragmentController controller(io_service); boost::program_options::variables_map vm; // prepare ControllerCommand // note: we need "< ControllerCommand::commands_t >" because parseExecutable(),... return int // in contrast to other functions that return void ControllerCommandRegistry ControllerCommands; boost::function registrator = (boost::bind(&Registry::registerInstance, boost::ref(ControllerCommands), _1)); registrator(new ControllerCommand("checkresults", boost::assign::list_of< ControllerCommand::commands_t > (boost::bind(&FragmentController::checkResults, boost::ref(controller), boost::cref(ControllerInfo->server), boost::cref(ControllerInfo->serverport))) (boost::bind(&printJobStatus, boost::bind(&FragmentController::getJobStatus, boost::ref(controller)))) )); registrator(new ControllerCommand("removeall", boost::assign::list_of< ControllerCommand::commands_t > (boost::bind(&FragmentController::removeall, boost::ref(controller), boost::cref(ControllerInfo->server), boost::cref(ControllerInfo->serverport))) )); registrator(new ControllerCommand("removealljobs", boost::assign::list_of< ControllerCommand::commands_t > (boost::bind(&FragmentController::removeWaitingJobs, boost::ref(controller), boost::cref(ControllerInfo->server), boost::cref(ControllerInfo->serverport))) )); registrator(new ControllerCommand("removeallresults", boost::assign::list_of< ControllerCommand::commands_t > (boost::bind(&FragmentController::removeWaitingResults, boost::ref(controller), boost::cref(ControllerInfo->server), boost::cref(ControllerInfo->serverport))) )); registrator(new ControllerCommand("shutdown", boost::assign::list_of< ControllerCommand::commands_t > (boost::bind(&FragmentController::shutdown, boost::ref(controller), boost::cref(ControllerInfo->server), boost::cref(ControllerInfo->serverport))) )); AddOn->addSpecificCommands(registrator, controller, *ControllerInfo); // Declare the supported options. boost::program_options::options_description desc("Allowed options"); desc.add_options() ("help,h", "produce help message") ("verbosity,v", boost::program_options::value(), "set verbosity level") ("server", boost::program_options::value< std::string >(), "connect to server at this address (host:port)") ("command", boost::program_options::value< std::string >(), (std::string("command to send to server: ")+toString(getListOfCommands(ControllerCommands))).c_str()) ; AddOn->addSpecificOptions(desc.add_options()); // parse command line boost::program_options::store(boost::program_options::parse_command_line(argc, argv, desc), vm); boost::program_options::notify(vm); // set controller information int status = 0; status = ControllerInfo->parseHelp(vm, desc); if (status) return status; status = ControllerInfo->parseVerbosity(vm); if (status) return status; status = ControllerInfo->parseServerPort(vm); if (status) return status; status = ControllerInfo->parseCommand(vm, getListOfCommands(ControllerCommands)); if (status) return status; // all later parse functions depend on parsed command status = AddOn->addOtherParsings(*ControllerInfo, vm); if (status) return status; // parse given ControllerCommand if(!ControllerCommands.isPresentByName(ControllerInfo->command)) { ELOG(1, "Unrecognized command '"+toString(ControllerInfo->command)+"'."); return 255; } const ControllerCommand *commands = ControllerCommands.getByName(ControllerInfo->command); try { // execute each command in the queue synchronously size_t phase = 1; for (ControllerCommand::const_iterator iter = commands->begin(); iter != commands->end(); ++iter) { (*iter)(); { io_service.reset(); //Info info((std::string("io_service: ")+toString(phase)).c_str()); io_service.run(); } } Exitflag = controller.getExitflag(); } catch (std::exception& e) { std::cerr << e.what() << std::endl; } delete AddOn; return Exitflag; }