Changeset f67dfb for ThirdParty


Ignore:
Timestamp:
Mar 9, 2017, 10:16:58 PM (8 years ago)
Author:
Frederik Heber <heber@…>
Branches:
Action_Thermostats, Add_AtomRandomPerturbation, Add_RotateAroundBondAction, Add_SelectAtomByNameAction, Adding_Graph_to_ChangeBondActions, Adding_MD_integration_tests, Adding_StructOpt_integration_tests, AutomationFragmentation_failures, Candidate_v1.6.0, Candidate_v1.6.1, ChangeBugEmailaddress, ChangingTestPorts, ChemicalSpaceEvaluator, Debian_Package_split, Debian_package_split_molecuildergui_only, Disabling_MemDebug, Docu_Python_wait, EmpiricalPotential_contain_HomologyGraph_documentation, Enhance_userguide, Enhanced_StructuralOptimization, Enhanced_StructuralOptimization_continued, Example_ManyWaysToTranslateAtom, Exclude_Hydrogens_annealWithBondGraph, FitPartialCharges_GlobalError, Fix_ChronosMutex, Fix_StatusMsg, Fix_StepWorldTime_single_argument, Fix_Verbose_Codepatterns, ForceAnnealing_goodresults, ForceAnnealing_oldresults, ForceAnnealing_tocheck, ForceAnnealing_with_BondGraph, ForceAnnealing_with_BondGraph_continued, ForceAnnealing_with_BondGraph_continued_betteresults, ForceAnnealing_with_BondGraph_contraction-expansion, GeometryObjects, Gui_displays_atomic_force_velocity, IndependentFragmentGrids_IntegrationTest, JobMarket_RobustOnKillsSegFaults, JobMarket_StableWorkerPool, JobMarket_unresolvable_hostname_fix, PartialCharges_OrthogonalSummation, PythonUI_with_named_parameters, QtGui_reactivate_TimeChanged_changes, Recreated_GuiChecks, RotateToPrincipalAxisSystem_UndoRedo, StoppableMakroAction, TremoloParser_IncreasedPrecision, TremoloParser_MultipleTimesteps, Ubuntu_1604_changes, stable
Children:
4e8108, cc85e4
Parents:
8e7a1b
git-author:
Frederik Heber <heber@…> (02/23/17 15:17:19)
git-committer:
Frederik Heber <heber@…> (03/09/17 22:16:58)
Message:

FIX: FragmentScheduler and PoolWorker now handle unresolvable addresses properly.

  • OperationQueue now checks whether last operation has succeeded of failed and calls a failure callback in error case which is a NoOp by default.
  • if PoolWorker's Ops fail, he will automatically shutdown now.
  • if FragmentScheduler's Ops fail, he will automatically remove the worker from its pool.
Location:
ThirdParty/JobMarket
Files:
5 edited

Legend:

Unmodified
Added
Removed
  • TabularUnified ThirdParty/JobMarket/src/JobMarket/FragmentScheduler.cpp

    r8e7a1b rf67dfb  
    6565      boost::bind(&FragmentScheduler::shutdown, boost::ref(*this))),
    6666  connection(_io_service),
     67  OpQueue(boost::bind(&FragmentScheduler::removeWorker, boost::ref(*this), _1)),
    6768  guard(_io_service, timeout, connection,
    6869      boost::bind(&FragmentScheduler::removeWorker, boost::ref(*this), _1),
  • TabularUnified ThirdParty/JobMarket/src/JobMarket/Operations/OperationQueue.cpp

    r8e7a1b rf67dfb  
    3333#include "JobMarket/Operations/AsyncOperation.hpp"
    3434#include "JobMarket/Operations/OperationQueue.hpp"
    35 #include "JobMarket/WorkerAddress.hpp"
     35
     36static void NoOp(const WorkerAddress) {}
     37
     38// static instances
     39const boost::function<void (const WorkerAddress)> OperationQueue::NoOpCallback = boost::bind(&NoOp, _1);
    3640
    3741size_t OperationQueue::max_connections = 1;
     
    7478            boost::bind(&AddressMap_t::count, boost::ref(AddressMap), boost::lambda::_1) );
    7579    if (queueiter != queue.end()) {
    76       AddressMap_t::iterator mapiter = AddressMap.find(*queueiter);
     80      const AddressMap_t::iterator mapiter = AddressMap.find(*queueiter);
    7781      ASSERT( mapiter != AddressMap.end(),
    7882          "OperationQueue::LaunchNextOp() - cannot find connection "+toString((*queueiter)->getName())+" in AddressMap.");
    79       const WorkerAddress address = mapiter->second;
    80       AsyncOp_ptr ptr = mapiter->first;
     83      currentOpsAddress = mapiter->second;
     84      const AsyncOp_ptr ptr = mapiter->first;
    8185      // always erase the op from the list of ones pending for launch
    8286      AddressMap.erase(mapiter);
    8387      // only launch when not a debug op
    84       if ((!address.host.empty()) && (!address.service.empty())) {
     88      if ((!currentOpsAddress.host.empty()) && (!currentOpsAddress.service.empty())) {
    8589        LOG(2, "DEBUG: Launching next operation " << ptr->getName() << ".");
    86        (*ptr)(address.host, address.service);
     90       (*ptr)(currentOpsAddress.host, currentOpsAddress.service);
    8791      } else {
    8892        LOG(3, "DEBUG: Skipping debug operation " << ptr->getName() << " with empty address.");
     
    117121  AsyncOperation *op = static_cast<AsyncOperation *>(publisher);
    118122  if (op != NULL) {
    119     LOG(1, "INFO: We are notified that " << op->getName() << " is done, removing ...");
     123    // check for error code of operation
     124    if (op->getStatus() != Operation::success) {
     125      // remove the worker from the queue
     126      failure_callback(currentOpsAddress);
     127      LOG(1, "INFO: We are notified that " << op->getName() << " has failed, removing ...");
     128    } else {
     129      LOG(1, "INFO: We are notified that " << op->getName() << " is done, removing ...");
     130    }
    120131    // remove from queue
    121132    remove(op, this);
  • TabularUnified ThirdParty/JobMarket/src/JobMarket/Operations/OperationQueue.hpp

    r8e7a1b rf67dfb  
    1515#endif
    1616
     17#include <boost/function.hpp>
    1718#include <boost/shared_ptr.hpp>
    1819#include <deque>
     
    2021#include "CodePatterns/Observer/Observer.hpp"
    2122
     23#include "JobMarket/WorkerAddress.hpp"
     24
    2225class AsyncOperation;
    2326class Observer;
    2427class OperationQueueTest;
    25 class WorkerAddress;
    2628
    2729/** This class is a container for \ref AsyncOperation's that are kept as shared_ptr
     
    3234  //!> grant unit test access to private part
    3335  friend class OperationQueueTest;
     36
     37  //!> static function as default callback on failure of operation that does nothing
     38  static const boost::function<void (const WorkerAddress)> NoOpCallback;
    3439public:
    3540  /** Default constructor for class OperationQueue.
     
    3944    Observer("OperationQueue"),
    4045    RunningOps(0),
    41     IsBlockedFlag(false)
     46    currentOpsAddress("localhost", "default"),
     47    IsBlockedFlag(false),
     48    failure_callback(NoOpCallback)
    4249  {}
     50
     51  /** Default constructor for class OperationQueue.
     52   *
     53   */
     54  OperationQueue(const boost::function<void (const WorkerAddress)> &_failure_callback) :
     55    Observer("OperationQueue"),
     56    RunningOps(0),
     57    currentOpsAddress("localhost", "default"),
     58    IsBlockedFlag(false),
     59    failure_callback(_failure_callback)
     60  {}
     61
    4362  /** Default destructor for class OperationQueue.
    4463   *
     
    140159  AddressMap_t AddressMap;
    141160
     161  //!> address of current operation in case of failure
     162  WorkerAddress currentOpsAddress;
     163
    142164  //!> status flag whether queue is blocked or operations may be pushed.
    143165  bool IsBlockedFlag;
     166
     167  //!> failure callback to call when operation's address could not be resolved
     168  const boost::function<void (const WorkerAddress)> failure_callback;
    144169};
    145170
  • TabularUnified ThirdParty/JobMarket/src/JobMarket/Pool/PoolWorker.cpp

    r8e7a1b rf67dfb  
    6363  connection_(_io_service),
    6464  failed(boost::bind(&ExitflagContainer::setExitflag, this, ExitflagContainer::ErrorFlag)),
     65  OpQueue(boost::bind(&PoolWorker::finish, boost::ref(*this))),
    6566  thread_remover(boost::bind(&PoolWorker::PoolListener_t::handle_RemoveThread, boost::ref(PoolListener)))
    6667{
  • TabularUnified ThirdParty/JobMarket/tests/regression/testsuite-falsehost.at

    r8e7a1b rf67dfb  
    33AT_SETUP([Fragmentation - Automation: Worker sets false hostname])
    44AT_KEYWORDS([fragmentation automation controller poolworker server falsehost])
    5 AT_XFAIL_IF([/bin/true])
    65
    76WORKERPORT=1080
Note: See TracChangeset for help on using the changeset viewer.