Changeset f91f47


Timestamp: Feb 2, 2023, 8:14:03 PM (3 years ago)
Author: Frederik Heber <frederik.heber@…>
Branches: Candidate_v1.7.0, stable
Children: 1b84b8
Parents: 75a0cb
Message:

FIX: JobMarket's PoolGuard did not check for existing result.

  • When a worker re-enrolls, the PoolGuard notices that the worker is no longer on its busy list. It now also checks whether the result of the job that worker was processing is actually present. If the result is missing, the job is resubmitted to the queue.
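
The behaviour described in the message can be sketched roughly as follows. This is a minimal, self-contained illustration and not the JobMarket code: `MiniGuard`, `markBusy`, and `onNoLongerBusy` are hypothetical names, `std::function` stands in for `boost::function`, and `int` and `std::string` stand in for `JobId_t` and `WorkerAddress`. The real PoolGuard performs this check while comparing its busy lists, which the sketch reduces to a single call.

```cpp
#include <cassert>
#include <functional>
#include <map>
#include <string>
#include <utility>

// Hypothetical stand-ins for the real JobMarket types.
using WorkerAddress = std::string;
using JobId_t = int;

// Simplified guard: remembers which job each busy worker was last seen with.
class MiniGuard {
public:
  MiniGuard(std::function<void(JobId_t)> resubmitJob,
            std::function<bool(JobId_t)> isResultPresent)
      : resubmitJob_(std::move(resubmitJob)),
        isResultPresent_(std::move(isResultPresent)) {
    // Both callbacks must be bound, otherwise the check cannot run.
    assert(resubmitJob_ && isResultPresent_);
  }

  // Record that a worker is busy with the given job.
  void markBusy(const WorkerAddress& address, JobId_t job) {
    lastSeenBusy_[address] = job;
  }

  // Called when a worker is found to be idle again (e.g. after re-enrolling).
  // Returns true if its former job had to be resubmitted.
  bool onNoLongerBusy(const WorkerAddress& address) {
    auto iter = lastSeenBusy_.find(address);
    if (iter == lastSeenBusy_.end())
      return false;  // worker was not known to be busy
    const JobId_t job = iter->second;
    lastSeenBusy_.erase(iter);
    if (isResultPresent_(job))
      return false;  // job finished normally, nothing to do
    resubmitJob_(job);  // result is missing: put the job back on the queue
    return true;
  }

private:
  std::function<void(JobId_t)> resubmitJob_;
  std::function<bool(JobId_t)> isResultPresent_;
  std::map<WorkerAddress, JobId_t> lastSeenBusy_;
};
```

Passing the result check in as a callback, as the changeset does, keeps the guard independent of how results are stored.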
Location: ThirdParty/JobMarket/src/JobMarket
Files: 3 edited

  • ThirdParty/JobMarket/src/JobMarket/FragmentScheduler.cpp

    r75a0cb rf91f47
         69      69       boost::bind(&FragmentScheduler::removeWorker, boost::ref(*this), _1),
         70      70       boost::bind(&FragmentQueue::resubmitJob, boost::ref(JobsQueue), _1),
                 71       boost::bind(&FragmentQueue::isResultPresent, boost::cref(JobsQueue), _1),
         71      72       OpQueue)
         72      73 {
  • ThirdParty/JobMarket/src/JobMarket/Pool/PoolGuard.cpp

    r75a0cb rf91f47
         42      42     const boost::function<void (const WorkerAddress)> _removeWorkerfunction,
         43      43     const boost::function<void (const JobId_t)> _resubmitJobfunction,
                 44     const boost::function<bool (const JobId_t)> _checkResultPresentfunction,
         44      45     OperationQueue &_OpQueue) :
         45      46   CheckAtNextInterval(false),
          …       …
         48      49   removeWorkerfunction(_removeWorkerfunction),
         49      50   resubmitJobfunction(_resubmitJobfunction),
                 51   checkResultPresentfunction(_checkResultPresentfunction),
         50      52   OpQueue(_OpQueue),
         51      53   connection(_connection),
          …       …
         84      86               << currentiter->second << " than last time "
         85      87               << iter->second << ", scheduling for checkalive.");
                 88         }
                 89       } else {
                 90         // check whether the result is not present
                 91         if (!checkResultPresentfunction(iter->second)) {
                 92           LOG(1, "INFO: Worker " << address << " is no longer busy but the job #"
                 93               << iter->second << "'s result is not present, resubmitting.");
                 94           resubmitJobfunction(iter->second);
         86      95         }
         87      96       }
  • ThirdParty/JobMarket/src/JobMarket/Pool/PoolGuard.hpp

    r75a0cb rf91f47
         44      44    * @param _removeWorkerfunction bound function to remove worker by its WorkerAddress
         45      45    * @param _resubmitJobfunction bound function to resubmit job by its JobId_t
                 46    * @param _checkResultPresentfunction bound function to check whether a job's result by its JobId_t is present
         46      47    * @param _OpQueue access to operation queue for placing CheckAliveWorkerOperations
         47      48    */
          …       …
         52      53       const boost::function<void (const WorkerAddress)> _removeWorkerfunction,
         53      54       const boost::function<void (const JobId_t)> _resubmitJobfunction,
                 55       const boost::function<bool (const JobId_t)> _checkResultPresentfunction,
         54      56       OperationQueue &_OpQueue
         55      57       );
          …       …
        131     133   const boost::function<void (const JobId_t)> resubmitJobfunction;
        132     134
                135   //!> bound function for checking for a job result's presence
                136   const boost::function<bool (const JobId_t)> checkResultPresentfunction;
                137
        133     138   //!> typedef for the internal list of workers
        134     139   typedef std::map<WorkerAddress, JobId_t> WorkerList_t;
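
The wiring in FragmentScheduler.cpp binds a const query on the queue through `boost::cref` and a mutating call through `boost::ref`. The sketch below shows the same pattern with the standard-library equivalents (`std::bind`, `std::ref`, `std::cref`) so that it compiles without Boost. `MiniQueue`, `GuardCallbacks`, `makeCallbacks`, and `exampleUse` are hypothetical names for illustration only; the real `FragmentQueue` interface is not shown in this changeset beyond the two member functions named in the diff.

```cpp
#include <cassert>
#include <functional>
#include <set>

using JobId_t = int;  // assumption: the real JobId_t may be a different type

// Hypothetical, much simplified queue.
class MiniQueue {
public:
  void resubmitJob(JobId_t id) { pending_.insert(id); }
  void storeResult(JobId_t id) {
    results_.insert(id);
    pending_.erase(id);
  }
  // Const query: safe to bind through a const reference.
  bool isResultPresent(JobId_t id) const { return results_.count(id) != 0; }
  bool isPending(JobId_t id) const { return pending_.count(id) != 0; }

private:
  std::set<JobId_t> pending_;
  std::set<JobId_t> results_;
};

// The two callbacks a guard would receive.
struct GuardCallbacks {
  std::function<void(JobId_t)> resubmitJob;
  std::function<bool(JobId_t)> isResultPresent;
};

GuardCallbacks makeCallbacks(MiniQueue& queue) {
  using std::placeholders::_1;
  return GuardCallbacks{
      // Mutating call: needs a non-const reference to the queue.
      std::bind(&MiniQueue::resubmitJob, std::ref(queue), _1),
      // Read-only call: a const reference is sufficient.
      std::bind(&MiniQueue::isResultPresent, std::cref(queue), _1)};
}

// Example use: a job without a stored result is put back on the queue.
inline void exampleUse() {
  MiniQueue queue;
  GuardCallbacks callbacks = makeCallbacks(queue);
  if (!callbacks.isResultPresent(7))
    callbacks.resubmitJob(7);
  assert(queue.isPending(7));
}
```

Binding the query through a const reference documents that the guard only reads the result state and cannot modify the queue through that callback.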