Changeset ab2e834


Ignore:
Timestamp:
Aug 11, 2025, 5:45:37 PM (3 months ago)
Author:
Frederik Heber <frederik.heber@…>
Branches:
Candidate_v1.7.0, stable
Children:
ae9ad6
Parents:
aeec58
git-author:
Frederik Heber <frederik.heber@…> (07/16/25 19:05:29)
git-committer:
Frederik Heber <frederik.heber@…> (08/11/25 17:45:37)
Message:

Reduces JobMarket features for use in Kubernetes clusters.

  • removes all functionality for enumerating, enrolling, or removing workers.
  • removes PoolGuard and CheckAlive functionality.
  • removes the shutdown thread: not needed as we don't wait for busy workers anymore.
  • removes WorkerPool. This is now handled by the Kubernetes deployment.
  • TESTS: Removed respective unit and regression tests.
  • TESTS: Adapted regression tests, as the worker now needs to be present when the first job is sent. Moreover, the server needs to be configured with the worker's host and port.
Location:
ThirdParty/JobMarket
Files:
24 deleted
27 edited

Legend:

Unmodified
Added
Removed
  • ThirdParty/JobMarket/src/JobMarket/Controller/FragmentController.cpp

    raeec58 rab2e834  
    3131#include "Operations/Controllers/CheckResultsOperation.hpp"
    3232#include "Operations/Controllers/GetNextJobIdOperation.hpp"
    33 #include "Operations/Controllers/GetNumberWorkersOperation.hpp"
    3433#include "Operations/Controllers/ReceiveResultsOperation.hpp"
    3534#include "Operations/Controllers/RemoveAllJobsOperation.hpp"
    3635#include "Operations/Controllers/RemoveAllResultsOperation.hpp"
    37 #include "Operations/Controllers/RemoveAllWorkerOperation.hpp"
    3836#include "Operations/Controllers/SendJobsOperation.hpp"
    3937#include "Operations/Controllers/ShutdownOperation.hpp"
     
    5755  Commands.registerInstance(new CheckResultsOperation(connection_, AsyncOperation::NoOpCallback, failed));
    5856  Commands.registerInstance(new GetNextJobIdOperation(connection_, AsyncOperation::NoOpCallback, failed));
    59   Commands.registerInstance(new GetNumberWorkersOperation(connection_, AsyncOperation::NoOpCallback, failed));
    6057  Commands.registerInstance(new ReceiveResultsOperation(connection_, AsyncOperation::NoOpCallback, failed));
    6158  Commands.registerInstance(new RemoveAllJobsOperation(connection_));
    6259  Commands.registerInstance(new RemoveAllResultsOperation(connection_));
    63   Commands.registerInstance(new RemoveAllWorkerOperation(connection_));
    6460  Commands.registerInstance(new SendJobsOperation(connection_, AsyncOperation::NoOpCallback, failed));
    6561  Commands.registerInstance(new ShutdownOperation(connection_));
     
    164160}
    165161
    166 /** Obtains number of workers enrolled at server
    167  *
    168  * @param host address of server
    169  * @param service port/service of server
    170  */
    171 void FragmentController::checkEnrolledWorkers(
    172     const std::string &host,
    173     const std::string &service)
    174 {
    175   GetNumberWorkersOperation *numberworkers = static_cast<GetNumberWorkersOperation *>(
    176       Commands.getByName("numberworkers"));
    177   (*numberworkers)(host, service);
    178 }
    179 
    180162/** Return scheduled and done jobs.
    181163 *
     
    191173}
    192174
    193 /** Return scheduled and done jobs.
    194  *
    195  * @return number of workers in certain states enrolled in server
    196  */
    197 std::vector<size_t> FragmentController::getNumberOfWorkers()
    198 {
    199   const GetNumberWorkersOperation * const numberworkers = static_cast<const GetNumberWorkersOperation *>(
    200       Commands.getByName("numberworkers"));
    201   return numberworkers->get();
    202 }
    203 
    204175/** Requests removal of all pending results from server.
    205176 *
     
    228199  RemoveAllJobsOperation *removeall = static_cast<RemoveAllJobsOperation *>(
    229200      Commands.getByName("removealljobs"));
    230   (*removeall)(host, service);
    231   setExitflagFromStatus(*removeall);
    232 }
    233 
    234 /** Requests removal of all idle workers from server.
    235  *
    236  * @param host address of server
    237  * @param service port/service of server
    238  */
    239 void FragmentController::removeall(
    240     const std::string &host,
    241     const std::string &service)
    242 {
    243   RemoveAllWorkerOperation *removeall = static_cast<RemoveAllWorkerOperation *>(
    244       Commands.getByName("removeallworker"));
    245201  (*removeall)(host, service);
    246202  setExitflagFromStatus(*removeall);
  • ThirdParty/JobMarket/src/JobMarket/Controller/FragmentController.hpp

    raeec58 rab2e834  
    4949  void checkResults(const std::string &host, const std::string &service);
    5050  std::pair<size_t, size_t> getJobStatus() const;
    51   void checkEnrolledWorkers(const std::string &host, const std::string &service);
    5251  std::vector<size_t> getNumberOfWorkers();
    5352  void removeWaitingResults(const std::string &host, const std::string &service);
    5453  void removeWaitingJobs(const std::string &host, const std::string &service);
    55   void removeall(const std::string &host, const std::string &service);
    5654  void receiveResults(const std::string &host, const std::string &service);
    5755  std::vector<FragmentResult::ptr> getReceivedResults();
  • ThirdParty/JobMarket/src/JobMarket/Controller/controller_AddOn.hpp

    raeec58 rab2e834  
    5454 */
    5555struct controller_AddOn {
     56  virtual ~controller_AddOn() {}
     57
    5658  /** Function to allocate a derived variant of ControllerOptions which might
    5759   * be extended by own member variables and parsing functions.
  • ThirdParty/JobMarket/src/JobMarket/Controller/controller_main.cpp

    raeec58 rab2e834  
    5757}
    5858
    59 /** Print the status of workers
    60  *
    61  * @param number_workers vector of workers in certain states (busy, idle, ...)
    62  */
    63 void printNumberWorkers(const std::vector<size_t> &number_workers)
    64 {
    65   ASSERT(number_workers.size() == 2,
    66       "printNumberWorkers() - vector of worker numbers is not of size 2.");
    67   LOG(1, "INFO: " << number_workers[0] << " are busy and "
    68       << number_workers[1] << " are idle.");
    69 }
    70 
    7159inline std::vector<std::string> getListOfCommands(const ControllerCommandRegistry &ControllerCommands)
    7260{
     
    10896        (boost::bind(&printJobStatus,
    10997            boost::bind(&FragmentController::getJobStatus, boost::ref(controller))))
    110       ));
    111   registrator(new ControllerCommand("numberworkers",
    112       boost::assign::list_of< ControllerCommand::commands_t >
    113         (boost::bind(&FragmentController::checkEnrolledWorkers,
    114             boost::ref(controller), boost::cref(ControllerInfo->server), boost::cref(ControllerInfo->serverport)))
    115         (boost::bind(&printNumberWorkers,
    116             boost::bind(&FragmentController::getNumberOfWorkers, boost::ref(controller))))
    117       ));
    118   registrator(new ControllerCommand("removeall",
    119       boost::assign::list_of< ControllerCommand::commands_t >
    120         (boost::bind(&FragmentController::removeall,
    121             boost::ref(controller), boost::cref(ControllerInfo->server), boost::cref(ControllerInfo->serverport)))
    12298      ));
    12399  registrator(new ControllerCommand("removealljobs",
     
    176152  {
    177153    // execute each command in the queue synchronously
    178     size_t phase = 1;
     154    //size_t phase = 1;
    179155    for (ControllerCommand::const_iterator iter = commands->begin();
    180156       iter != commands->end(); ++iter) {
  • ThirdParty/JobMarket/src/JobMarket/ControllerChoices.hpp

    raeec58 rab2e834  
    2323  EmptyResultQueue,
    2424  EmptyJobQueue,
    25   RemoveAll,
    2625  ShutdownControllerSocket,
    27   NumberOfWorkers
    2826};
    2927
  • ThirdParty/JobMarket/src/JobMarket/FragmentScheduler.cpp

    raeec58 rab2e834  
    3535#include "CodePatterns/Log.hpp"
    3636#include "CodePatterns/Observer/Notification.hpp"
    37 #include "Operations/Servers/SendJobToWorkerOperation.hpp"
    38 #include "Operations/Servers/ShutdownWorkerOperation.hpp"
    39 #include "Operations/Workers/EnrollInPoolOperation.hpp"
    4037#include "JobMarket/JobId.hpp"
     38#include "JobMarket/Operations/Servers/SendJobToWorkerOperation.hpp"
     39
    4140
    4241#include "JobMarket/FragmentScheduler.hpp"
     
    5554    unsigned short workerport,
    5655    unsigned short controllerport,
    57     const size_t timeout,
    58     bool guardFromStart) :
     56    const WorkerAddress _workeraddress,
     57    const size_t timeout) :
    5958  Observer("FragmentScheduler"),
    6059  io_service(_io_service),
    61   WorkerListener(_io_service, workerport, JobsQueue, pool,
    62       boost::bind(&FragmentScheduler::sendJobToWorker, boost::ref(*this), _1, _2),
    63       boost::bind(&FragmentScheduler::unmarkWorkerBusy, boost::ref(*this), _1)),
    64   ControllerListener(_io_service, controllerport, JobsQueue, pool,
    65       boost::bind(&FragmentScheduler::removeAllWorkers, boost::ref(*this)),
     60  WorkerListener(_io_service, workerport, JobsQueue,
     61      boost::bind(&FragmentScheduler::sendJobToWorker, boost::ref(*this), _1, _2)),
     62  ControllerListener(_io_service, controllerport, JobsQueue,
    6663      boost::bind(&FragmentScheduler::shutdown, boost::ref(*this))),
     64  workeraddress(_workeraddress),
    6765  connection(_io_service),
    68   OpQueue(boost::bind(&FragmentScheduler::removeWorker, boost::ref(*this), _1)),
    69   guard(_io_service, timeout, connection, guardFromStart,
    70       boost::bind(&FragmentScheduler::removeWorker, boost::ref(*this), _1),
    71       boost::bind(&FragmentQueue::resubmitJob, boost::ref(JobsQueue), _1),
    72       boost::bind(&FragmentQueue::isResultPresent, boost::cref(JobsQueue), _1),
    73       pool.getPoolOfWorkers(),
    74       OpQueue),
    75   shutdown_thread(NULL)
     66  OpQueue()
    7667{
    7768  DEBUG_FUNCTION_ENTRYEXIT
     
    8071   && (ControllerListener.getExitflag() == Listener::OkFlag)) {
    8172    // sign on to idle workers and present jobs
    82     pool.signOn(this, WorkerPool::WorkerIdle);
    8373    JobsQueue.signOn(this, FragmentQueue::JobAdded);
    8474
     
    9585FragmentScheduler::~FragmentScheduler()
    9686{
    97   if (shutdown_thread != NULL) {
    98     // notify shutdown thread that we wish to terminate
    99     shutdown_thread->interrupt();
    100     // wait till it ends
    101     shutdown_thread->join();
    102     delete shutdown_thread;
    103   }
    104 
    10587  // sign off
    106   pool.signOff(this, WorkerPool::WorkerIdle);
    10788  JobsQueue.signOff(this, FragmentQueue::JobAdded);
    10889}
     
    123104void FragmentScheduler::sendJobToWorker(const WorkerAddress &address, FragmentJob::ptr &job)
    124105{
    125   ASSERT( pool.isWorkerBusy(address),
    126       "FragmentScheduler::sendJobToWorker() - Worker "+toString(address)+" is not marked as busy.");
    127106  LOG(1, "INFO: Sending job " << job->getId() << " to worker " << address << ".");
    128107
     
    145124void FragmentScheduler::workerAcceptsJob(const WorkerAddress _address, const JobId_t _id)
    146125{
    147   // inform guard and if not already running, also launch guard
    148   guard.addBusyWorker(_address, _id);
    149   if (!guard.isRunning())
    150     guard.start();
    151 
    152126}
    153127
     
    160134void FragmentScheduler::workerRejectsJob(const WorkerAddress _address, const JobId_t _id)
    161135{
    162   unmarkWorkerBusy(_address);
    163136  JobsQueue.resubmitJob(_id);
    164 }
    165 
    166 /** Helper function to shutdown a single worker.
    167  *
    168  * We send NoJob to indicate shutdown
    169  *
    170  * @param address of worker to shutdown
    171  */
    172 void FragmentScheduler::shutdownWorker(const WorkerAddress &address)
    173 {
    174   ASSERT( !pool.isWorkerBusy(address),
    175       "FragmentScheduler::sendJobToWorker() - Worker "+toString(address)+" is already busy.");
    176   LOG(2, "INFO: Shutting down worker " << address << "...");
    177   AsyncOperation *shutdownWorkerOp = new ShutdownWorkerOperation(connection);
    178   OpQueue.push_back(shutdownWorkerOp, address);
    179 }
    180 
    181 /** Sends shutdown to all current workers in the pool.
    182  *
    183  * This only works if called in an extra thread that does not block the main
    184  * thread. Otherwise, the \a OperationQueue will not be able to send out
    185  * the shutdown operations to the workers and we loop forever.
    186  */
    187 void FragmentScheduler::removeAllWorkers()
    188 {
    189   /**
    190    * Some workers might still be busy. Let's remove all idle workers already.
    191    */
    192   removeAllIdleWorkers();
    193 
    194   // wait for busy workers to finish if there are any
    195   if (pool.hasBusyWorkers()) {
    196     while (pool.hasBusyWorkers()) {
    197       LOG(2, "DEBUG: Waiting for " << pool.getNoBusyWorkers() << " busy workers to finish ...");
    198       sleep(1);
    199     }
    200 
    201     // Finally, remove also the remaining then busy, now idle workers
    202     removeAllIdleWorkers();
    203   }
    204 
    205   // wait for remaining workers to finish
    206   while (pool.presentIdleWorkers()) {
    207     LOG(2, "DEBUG: Waiting for " << pool.getNoIdleWorkers() << " idle workers to finish ...");
    208     sleep(1);
    209   }
    210 }
    211 
    212 /** Sends shutdown to all currently idle workers in the pool.
    213  *
    214  */
    215 void FragmentScheduler::removeAllIdleWorkers()
    216 {
    217   LOG(2, "INFO: Shutting down " << pool.getNoIdleWorkers() << " idle workers ...");
    218   // iterate until there are no more idle workers
    219   // get list of all idle workers
    220   typedef std::vector<std::pair<std::string, std::string> > WorkerList_t;
    221   const WorkerList_t WorkerList = pool.getListOfIdleWorkers();
    222 
    223   // give all workers shutdown signal
    224   for (WorkerList_t::const_iterator iter = WorkerList.begin(); iter != WorkerList.end(); ++iter)
    225     shutdownWorker(WorkerAddress(iter->first, iter->second));
    226137}
    227138
     
    239150  */
    240151  LOG(0, "STATUS: Shutting down due to signal " << sig << ".");
    241 
    242   // sign off from notifications such that no new jobs are given to workers
    243   pool.signOff(this, WorkerPool::WorkerIdle);
    244 
    245   if (shutdown_thread == NULL) {
    246     shutdown_thread = new boost::thread(boost::bind(&FragmentScheduler::shutdownAfterSignal, this));
    247   }
    248152}
    249153
     
    251155 */
    252156void FragmentScheduler::shutdownAfterSignal() {
    253   removeAllWorkers();
    254157  shutdown();
    255158}
     
    263166bool FragmentScheduler::shutdown()
    264167{
    265   if (!pool.presentIdleWorkers() && !pool.hasBusyWorkers()) {
    266     LOG(1, "INFO: Shutting all down ...");
    267 
    268     // close the guard's watch
    269     guard.stop();
    270 
    271     /// close the worker listener's socket (until here need to accept results still)
    272     WorkerListener.closeSocket();
    273 
    274     /// close the controller listener's socket
    275     ControllerListener.closeSocket();
    276 
    277     /// finally, stop the io_service
    278     sleep(1);
    279     io_service.stop();
    280 
    281     return true;
    282   } else {
    283     ELOG(2, "There are still idle or busy workers present.");
    284     return false;
    285   }
     168  LOG(1, "INFO: Shutting all down ...");
     169
     170  /// close the worker listener's socket (until here need to accept results still)
     171  WorkerListener.closeSocket();
     172
     173  /// close the controller listener's socket
     174  ControllerListener.closeSocket();
     175
     176  /// finally, stop the io_service
     177  sleep(1);
     178  io_service.stop();
     179
     180  return true;
    286181}
    287182
     
    291186void FragmentScheduler::sendAvailableJobToNextIdleWorker()
    292187{
    293   const WorkerAddress address = pool.getNextIdleWorker();
    294188  FragmentJob::ptr job = JobsQueue.popJob();
    295   sendJobToWorker(address, job);
     189  sendJobToWorker(workeraddress, job);
    296190}
    297191
     
    303197void FragmentScheduler::recieveNotification(Observable *publisher, Notification_ptr notification)
    304198{
    305   if ((publisher == &pool) && (notification->getChannelNo() == WorkerPool::WorkerIdle)) {
    306     // we have an idle worker
    307     LOG(1, "INFO: We are notified of an idle worker.");
    308     // are jobs available?
    309     if (JobsQueue.isJobPresent()) {
    310       sendAvailableJobToNextIdleWorker();
    311     }
    312   } else if ((publisher == &JobsQueue) && (notification->getChannelNo() == FragmentQueue::JobAdded)) {
     199  if ((publisher == &JobsQueue) && (notification->getChannelNo() == FragmentQueue::JobAdded)) {
    313200    // we have new jobs
    314201    LOG(1, "INFO: We are notified of a new job.");
    315     // check for idle workers
    316     if (pool.presentIdleWorkers()) {
    317       sendAvailableJobToNextIdleWorker();
    318     }
     202    sendAvailableJobToNextIdleWorker();
    319203  } else {
    320204    ASSERT(0, "FragmentScheduler::recieveNotification() - we are not signed on for updates in channel "
     
    325209void FragmentScheduler::subjectKilled(Observable *publisher)
    326210{}
    327 
    328 /** Removes worker at \a address and stops guard in case of last busy one.
    329  *
    330  * This is is specifically meant to remove a busy worker as in this case we
    331  * might also have to call PoolGuard::stop() if it has been the last busy
    332  * worker.
    333  *
    334  * @param address address of worker to remove.
    335  */
    336 void FragmentScheduler::removeWorker(const WorkerAddress address)
    337 {
    338   pool.removeWorker(address);
    339 }
    340 
    341 /** Unmarks worker from being busy in WorkerPool.
    342  *
    343  * This is required to catch when a worker has finished working on a job and to
    344  * inform the PoolGuard
    345  *
    346  * @param address address of once busy worker
    347  */
    348 void FragmentScheduler::unmarkWorkerBusy(const WorkerAddress address)
    349 {
    350   // inform guard
    351   guard.removeBusyWorker(address);
    352   // unmark in pool
    353   pool.unmarkWorkerBusy(address);
    354 }
  • ThirdParty/JobMarket/src/JobMarket/FragmentScheduler.hpp

    raeec58 rab2e834  
    1717#include <boost/function.hpp>
    1818#include <boost/shared_ptr.hpp>
    19 #include <boost/thread.hpp>
    2019#include <deque>
    2120#include <set>
     
    2726#include "JobMarket/Operations/AsyncOperation.hpp"
    2827#include "JobMarket/Operations/OperationQueue.hpp"
    29 #include "Operations/Servers/ShutdownWorkerOperation.hpp"
    3028#include "JobMarket/ControllerChoices.hpp"
    3129#include "JobMarket/FragmentQueue.hpp"
     
    3533#include "JobMarket/Results/FragmentResult.hpp"
    3634#include "JobMarket/types.hpp"
    37 #include "JobMarket/Pool/PoolGuard.hpp"
    38 #include "JobMarket/Pool/WorkerPool.hpp"
    3935#include "JobMarket/WorkerAddress.hpp"
    4036#include "JobMarket/WorkerChoices.hpp"
     
    5349      unsigned short workerport,
    5450      unsigned short controllerport,
    55       const size_t timeout,
    56       bool guardFromStart
     51      const WorkerAddress _workeraddress,
     52      const size_t timeout
    5753      );
    5854  ~FragmentScheduler();
     
    6359  void sendAvailableJobToNextIdleWorker();
    6460  bool shutdown();
    65   void shutdownWorker(const WorkerAddress &address);
    6661  void shutdownAfterSignal();
    67   void removeAllWorkers();
    68   void removeAllIdleWorkers();
    69   void removeWorker(const WorkerAddress address);
    70   void unmarkWorkerBusy(const WorkerAddress address);
    7162  void cleanupOperationQueue(AsyncOperation *op);
    7263  void workerAcceptsJob(const WorkerAddress _address, const JobId_t _id);
     
    8475        unsigned short port,
    8576        FragmentQueue &_JobsQueue,
    86         WorkerPool &_pool,
    87         boost::function<void (const WorkerAddress&, FragmentJob::ptr&)> _callback,
    88         boost::function<void (const WorkerAddress)> _unmarkBusyWorkerFunction) :
     77        boost::function<void (const WorkerAddress&, FragmentJob::ptr&)> _callback) :
    8978      Listener(io_service, port),
    9079      JobsQueue(_JobsQueue),
    91       pool(_pool),
    92       callback_sendJobToWorker(_callback),
    93       unmarkBusyWorkerFunction(_unmarkBusyWorkerFunction)
     80      callback_sendJobToWorker(_callback)
    9481    {}
    9582    virtual ~WorkerListener_t() {}
     
    136123    void handle_ReadAddress(const boost::system::error_code& e, connection_ptr conn, HandlerData::ptr &data);
    137124
    138     /// Worker callback function when new worker has enrolled.
    139     void handle_enrolled(const boost::system::error_code& e, connection_ptr conn, HandlerData::ptr &data);
    140 
    141     /// Worker callback function when worker has been informed of successful removal.
    142     void handle_removed(const boost::system::error_code& e, connection_ptr conn, HandlerData::ptr &data);
    143 
    144125    /// Worker callback function when result has been received.
    145126    void handle_ReceiveResultFromWorker(const boost::system::error_code& e, connection_ptr conn, HandlerData::ptr &data);
     
    152133    FragmentQueue &JobsQueue;
    153134
    154     //!> callback reference to container class
    155     WorkerPool &pool;
    156 
    157135    //!> callback function to access send job function
    158136    boost::function<void (const WorkerAddress&, FragmentJob::ptr&)> callback_sendJobToWorker;
    159 
    160     //!> function to call when a busy worker has finished and sent its result
    161     boost::function<void (const WorkerAddress)> unmarkBusyWorkerFunction;
    162137  };
    163138
     
    169144        unsigned short port,
    170145        FragmentQueue &_JobsQueue,
    171         WorkerPool &_pool,
    172         boost::function<void ()> _removeallWorkers,
    173146        boost::function<bool ()> _shutdownAllSockets) :
    174147      Listener(io_service, port),
    175148      JobsQueue(_JobsQueue),
    176       pool(_pool),
    177149      jobInfo((size_t)2, 0),
    178150      NumberIds(0),
    179151      choice(NoControllerOperation),
    180       removeallWorkers(_removeallWorkers),
    181152      shutdownAllSockets(_shutdownAllSockets),
    182153      globalId(0)
     
    209180    void handle_SendResults(const boost::system::error_code& e, connection_ptr conn);
    210181
    211     /// Controller callback function when result are to be sent.
    212     void handle_SendNumberWorkers(const boost::system::error_code& e, connection_ptr conn);
    213 
    214182    /// Controller callback function when results has been sent.
    215183    void handle_finishSendResults(const boost::system::error_code& e, connection_ptr conn);
     
    219187    FragmentQueue & JobsQueue;
    220188
    221     //!> const reference to external pool of Workers
    222     const WorkerPool & pool;
    223 
    224189    //!> bunch of jobs received from controller before placed in JobsQueue
    225190    std::vector<FragmentJob::ptr> jobs;
     
    236201    //!> choice
    237202    enum ControllerChoices choice;
    238 
    239     //!> bound function to remove all workers
    240     boost::function<void ()> removeallWorkers;
    241203
    242204    //!> bound function to shutdown all sockets
     
    258220  FragmentQueue JobsQueue;
    259221
    260   //!> Pool of Workers
    261   WorkerPool pool;
    262 
    263222  //!> Listener instance that waits for a worker
    264223  WorkerListener_t  WorkerListener;
     
    266225  //!> Listener instance that waits for a controller
    267226  ControllerListener_t  ControllerListener;
     227
     228  //!> Ingress address to reach the next worker
     229  const WorkerAddress workeraddress;
    268230
    269231public:
     
    287249  //!> internal queue for all asynchronous operations
    288250  OperationQueue OpQueue;
    289 
    290 private:
    291   //!> Guard that checks if workers are unreachable
    292   PoolGuard guard;
    293 
    294   //!> shutdown thread to keep the operation queue free to run
    295   boost::thread *shutdown_thread;
    296251};
    297252
  • ThirdParty/JobMarket/src/JobMarket/FragmentScheduler_ControllerListener.cpp

    raeec58 rab2e834  
    136136      JobsQueue.removeWaitingJobs();
    137137      LOG(2, "DEBUG: Queue has " << JobsQueue.getPresentJobs() << " results.");
    138       break;
    139     }
    140     case RemoveAll:
    141     {
    142       removeallWorkers();
    143       break;
    144     }
    145     case NumberOfWorkers:
    146     {
    147       LOG(1, "INFO: Sending total number of workers to a controller ...");
    148       std::vector<size_t> number_workers;
    149       number_workers += pool.getNoBusyWorkers();
    150       number_workers += pool.getNoIdleWorkers();
    151       conn->async_write(number_workers,
    152         boost::bind(&FragmentScheduler::ControllerListener_t::handle_SendNumberWorkers, this,
    153         boost::asio::placeholders::error, conn));
    154138      break;
    155139    }
     
    360344  LOG(1, "INFO: Results have been sent.");
    361345}
    362 
    363 /** Controller callback function when number of workers has been sent.
    364  *
    365  * \param e error code if something went wrong
    366  * \param conn reference with the connection
    367  */
    368 void FragmentScheduler::ControllerListener_t::handle_SendNumberWorkers(const boost::system::error_code& e, connection_ptr conn)
    369 {
    370   DEBUG_FUNCTION_ENTRYEXIT
    371   // do nothing
    372   LOG(1, "INFO: Number of workers have been sent.");
    373 }
  • ThirdParty/JobMarket/src/JobMarket/FragmentScheduler_WorkerListener.cpp

    raeec58 rab2e834  
    110110        break;
    111111      }
    112       case EnrollInPool:
    113       {
    114         if (pool.presentInPool(data->address)) {
    115           ELOG(1, "INFO: worker "+toString(data->address)+" is already contained in pool.");
    116           conn->async_write(false,
    117             boost::bind(&FragmentScheduler::WorkerListener_t::handle_enrolled, this,
    118             boost::asio::placeholders::error, conn, data));
    119           // mark as idle if a rejoining worker
    120           unmarkBusyWorkerFunction(data->address);
    121           LOG(1, "INFO: There are " << pool.getNoTotalWorkers() << " workers in the queue, "
    122               << pool.getNoIdleWorkers() << " of which are idle.");
    123         } else {
    124           // insert as its new worker
    125           LOG(1, "INFO: Adding " << data->address << " to pool ...");
    126           const bool addedToPool = pool.addWorker(data->address);
    127           conn->async_write(addedToPool,
    128             boost::bind(&FragmentScheduler::WorkerListener_t::handle_enrolled, this,
    129             boost::asio::placeholders::error, conn, data));
    130         }
    131         break;
    132       }
    133112      case SendResult:
    134113      {
    135         if (pool.presentInPool(data->address)) {
    136           // check whether its priority is busy_priority
    137           if (pool.isWorkerBusy(data->address)) {
    138             conn->async_read(data->result,
    139               boost::bind(&FragmentScheduler::WorkerListener_t::handle_ReceiveResultFromWorker, this,
    140               boost::asio::placeholders::error, conn, data));
    141           } else {
    142             ELOG(1, "Worker " << data->address << " trying to send result who is not marked as busy.");
    143             conn->async_read(data->result,
    144               boost::bind(&FragmentScheduler::WorkerListener_t::handle_RejectResultFromWorker, this,
    145               boost::asio::placeholders::error, conn, data));
    146           }
    147         } else {
    148           ELOG(1, "Worker " << data->address << " trying to send result who is not in pool.");
    149           conn->async_read(data->result,
    150             boost::bind(&FragmentScheduler::WorkerListener_t::handle_RejectResultFromWorker, this,
    151             boost::asio::placeholders::error, conn, data));
    152         }
    153         break;
    154       }
    155       case RemoveFromPool:
    156       {
    157         if (pool.presentInPool(data->address)) {
    158           conn->async_write(true,
    159             boost::bind(&FragmentScheduler::WorkerListener_t::handle_removed, this,
    160             boost::asio::placeholders::error, conn, data));
    161         } else {
    162           ELOG(1, "Shutting down Worker " << data->address << " not contained in pool.");
    163           conn->async_write(false,
    164             boost::bind(&FragmentScheduler::WorkerListener_t::handle_removed, this,
    165             boost::asio::placeholders::error, conn, data));
    166         }
     114        // check whether its priority is busy_priority
     115        conn->async_read(data->result,
     116          boost::bind(&FragmentScheduler::WorkerListener_t::handle_ReceiveResultFromWorker, this,
     117          boost::asio::placeholders::error, conn, data));
    167118        break;
    168119      }
     
    179130  else
    180131  {
    181     // An error occurred. Log it and return. Since we are not starting a new
    182     // accept operation the io_service will run out of work to do and the
    183     // server will exit.
    184     setExitflag( ErrorFlag );
    185     ELOG(0, e.message());
    186   }
    187 }
    188 
    189 
    190 /** Callback function when new worker has enrolled.
    191  *
    192  * \param e error code if something went wrong
    193  * \param conn reference with the connection
    194  * \param data shared pointer with data associated to this chain of handlers.
    195  */
    196 void FragmentScheduler::WorkerListener_t::handle_enrolled(const boost::system::error_code& e, connection_ptr conn, HandlerData::ptr &data)
    197 {
    198   DEBUG_FUNCTION_ENTRYEXIT
    199   if (!e)  {
    200     LOG(2, "DEBUG: Successfully enrolled.");
    201     LOG(1, "INFO: There are " << pool.getNoTotalWorkers() << " workers in the queue, "
    202         << pool.getNoIdleWorkers() << " of which are idle.");
    203   } else {
    204     // An error occurred. Log it and return. Since we are not starting a new
    205     // accept operation the io_service will run out of work to do and the
    206     // server will exit.
    207     setExitflag( ErrorFlag );
    208     ELOG(0, e.message());
    209   }
    210 }
    211 
    212 /** Callback function when new worker has enrolled.
    213  *
    214  * \param e error code if something went wrong
    215  * \param conn reference with the connection
    216  * \param data shared pointer with data associated to this chain of handlers.
    217  */
    218 void FragmentScheduler::WorkerListener_t::handle_removed(const boost::system::error_code& e, connection_ptr conn, HandlerData::ptr &data)
    219 {
    220   DEBUG_FUNCTION_ENTRYEXIT
    221   if (!e)  {
    222     // removing present worker
    223     pool.removeWorker(data->address);
    224   } else {
    225132    // An error occurred. Log it and return. Since we are not starting a new
    226133    // accept operation the io_service will run out of work to do and the
     
    251158    JobsQueue.pushResult(data->result);
    252159
    253   // mark as idle
    254   unmarkBusyWorkerFunction(data->address);
    255   LOG(1, "INFO: There are " << pool.getNoTotalWorkers() << " workers in the queue, "
    256       << pool.getNoIdleWorkers() << " of which are idle.");
    257 
    258160  LOG(1, "INFO: JobsQueue has " << JobsQueue.getDoneJobs() << " results.");
    259161}
  • ThirdParty/JobMarket/src/JobMarket/Makefile.am

    raeec58 rab2e834  
    4040        Operations/Controllers/CheckResultsOperation.cpp \
    4141        Operations/Controllers/GetNextJobIdOperation.cpp \
    42         Operations/Controllers/GetNumberWorkersOperation.cpp \
    4342        Operations/Controllers/ReceiveResultsOperation.cpp \
    4443        Operations/Controllers/RemoveAllJobsOperation.cpp \
    4544        Operations/Controllers/RemoveAllResultsOperation.cpp \
    46         Operations/Controllers/RemoveAllWorkerOperation.cpp \
    4745        Operations/Controllers/SendJobsOperation.cpp \
    4846        Operations/Controllers/ShutdownOperation.cpp \
    49         Operations/Servers/CheckAliveWorkerOperation.cpp \
    5047        Operations/Servers/SendJobToWorkerOperation.cpp \
    51         Operations/Servers/ShutdownWorkerOperation.cpp \
    52         Operations/Workers/EnrollInPoolOperation.cpp \
    5348        Operations/Workers/ObtainJobOperation.cpp \
    54         Operations/Workers/RemoveFromPoolOperation.cpp \
    5549        Operations/Workers/SubmitResultOperation.cpp
    5650
     
    6458        Operations/Controllers/CheckResultsOperation.hpp \
    6559        Operations/Controllers/GetNextJobIdOperation.hpp \
    66         Operations/Controllers/GetNumberWorkersOperation.hpp \
    6760        Operations/Controllers/ReceiveResultsOperation.hpp \
    6861        Operations/Controllers/RemoveAllJobsOperation.hpp \
    6962        Operations/Controllers/RemoveAllResultsOperation.hpp \
    70         Operations/Controllers/RemoveAllWorkerOperation.hpp \
    7163        Operations/Controllers/SendJobsOperation.hpp \
    7264        Operations/Controllers/ShutdownOperation.hpp \
    73         Operations/Servers/CheckAliveWorkerOperation.hpp \
    7465        Operations/Servers/SendJobToWorkerOperation.hpp \
    75         Operations/Servers/ShutdownWorkerOperation.hpp \
    76         Operations/Workers/EnrollInPoolOperation.hpp \
    7766        Operations/Workers/ObtainJobOperation.hpp \
    78         Operations/Workers/RemoveFromPoolOperation.hpp \
    7967        Operations/Workers/SubmitResultOperation.hpp \
    8068        ServerChoices.hpp \
     
    8775
    8876POOLSOURCE = \
    89         Pool/PoolGuard.cpp \
    90         Pool/WorkerPool.cpp
     77        Pool/PoolWorker.cpp
    9178
    9279POOLHEADER = \
    93         Pool/PoolGuard.hpp \
    94         Pool/WorkerPool.hpp
     80        Pool/PoolWorker.hpp
    9581
    9682noinst_LTLIBRARIES += libJobMarketPool.la
  • ThirdParty/JobMarket/src/JobMarket/Pool/PoolWorker.cpp

    raeec58 rab2e834  
    3535#include "CodePatterns/Log.hpp"
    3636#include "JobMarket/Jobs/FragmentJob.hpp"
    37 #include "Operations/Workers/EnrollInPoolOperation.hpp"
    38 #include "Operations/Workers/RemoveFromPoolOperation.hpp"
    3937#include "Operations/Workers/SubmitResultOperation.hpp"
    4038#include "JobMarket/Results/FragmentResult.hpp"
     
    7169
    7270  if (PoolListener.getExitflag() == Listener::OkFlag) {
    73     // always enroll and make listening initiation depend on its success
    74     const boost::function<void ()> initiateme =
    75         boost::bind(&PoolWorker::initiateSockets, this);
    76     const boost::function<void ()> shutdown =
    77         boost::bind(&PoolWorker::finish, this); // no need to remove from pool
    78     AsyncOperation *enrollOp = new EnrollInPoolOperation(connection_, MyAddress, initiateme, shutdown);
    79     LOG(2, "DEBUG: Putting enroll in pool operation into queue ...");
    80     OpQueue.push_back(enrollOp, ServerAddress);
     71    PoolListener.initiateSocket();
     72    HealthListener.initiateSocket();
    8173  } else {
    8274    setExitflag( ErrorFlag );
    83     ELOG(0, "Not enrolling, we just exit due to failed listen bind.");
    84   }
    85 }
    86 
    87 void PoolWorker::initiateSockets()
    88 {
    89   PoolListener.initiateSocket();
    90   HealthListener.initiateSocket();
     75    ELOG(0, "Exit due to failed listen bind.");
     76  }
    9177}
    9278
     
    146132  {
    147133    LOG(1, "INFO: Received request for operation " << choice << ".");
    148     // switch over the desired choice read previously
    149     bool shutdownflag = false;
    150134    switch(choice) {
    151135      case NoServerOperation:
    152136      {
    153137        ELOG(1, "PoolListener_t::handle_ReadChoice() - called with NoOperation.");
    154         break;
    155       }
    156       case CheckAlive:
    157       {
    158         // send address as pong to ping (check alive signal)
    159         conn->async_write(callback.MyAddress,
    160             boost::bind(&PoolWorker::PoolListener_t::handle_SendAddress, this,
    161             boost::asio::placeholders::error, conn));
    162138        break;
    163139      }
     
    178154        break;
    179155      }
    180       case ShutdownWorker:
    181       {
    182         shutdownflag = true;
    183         callback.shutdown();
    184         break;
    185       }
    186156    }
    187157    // and listen for following connections if not to shutdown
    188     if (!shutdownflag)
    189       initiateSocket();
     158    initiateSocket();
    190159  }
    191160  else
     
    267236}
    268237
    269 /// Controller callback function when address has been sent
    270 void PoolWorker::PoolListener_t::handle_SendAddress(const boost::system::error_code& e, connection_ptr conn)
    271 {
    272   DEBUG_FUNCTION_ENTRYEXIT
    273 
    274   if (!e)
    275   {
    276     LOG(1, "INFO: Sent address " << callback.MyAddress << " as alive confirmation.");
    277   }
    278   else
    279   {
    280     // An error occurred. Log it and return. Since we are not starting a new
    281     // accept operation the io_service will run out of work to do and the
    282     // server will exit.
    283     setExitflag( ErrorFlag );
    284     ELOG(0, e.message());
    285   }
    286 }
    287 
    288238void PoolWorker::PoolListener_t::handle_RemoveThread()
    289239{
     
    341291void PoolWorker::shutdown()
    342292{
    343   // remove us from pool
    344   boost::function<void ()> closingdown = boost::bind(&PoolWorker::finish, this);
    345   AsyncOperation *removeOp = new RemoveFromPoolOperation(connection_, MyAddress, closingdown, failed);
    346   LOG(2, "DEBUG: Putting remove from pool operation into queue ...");
    347   OpQueue.push_back(removeOp, ServerAddress);
     293  finish();
    348294  // block queue such that io_service may stop
    349295  OpQueue.block();
  • ThirdParty/JobMarket/src/JobMarket/Pool/PoolWorker.hpp

    raeec58 rab2e834  
    6161
    6262  void WorkOnJob(FragmentJob::ptr &job);
    63   void removeFromPool();
    6463  void shutdown(int sig);
    6564  void shutdown();
     
    148147
    149148private:
    150     void initiateSockets();
    151 
    152 private:
    153149  //!> reference to io_service which we use for connections
    154150  boost::asio::io_service& io_service;
    155151
    156   //!> The listener for the WorkerPool
     152  //!> The listener for the server
    157153  PoolListener_t PoolListener;
    158154
    159   //!> The listener for the WorkerPool
     155  //!> The listener for the health probe
    160156  HealthProbeListener_t HealthListener;
    161157
  • ThirdParty/JobMarket/src/JobMarket/ServerChoices.hpp

    raeec58 rab2e834  
    1818  NoServerOperation,
    1919  ReceiveJob,
    20   CheckAlive,
    21   ShutdownWorker
    2220};
    2321
  • ThirdParty/JobMarket/src/JobMarket/ServerOptions.cpp

    raeec58 rab2e834  
    7474}
    7575
    76 bool ServerOptions::parseGuardFromStart(boost::program_options::variables_map &vm)
    77 {
    78   if (vm.count("guard-from-start")) {
    79     try {
    80       guardFromStart = vm["guard-from-start"].as< bool >();
    81     } catch (boost::bad_lexical_cast) {
    82       ELOG(1, "Could not read " << guardFromStart << " as boolean.");
    83       return 255;
    84     }
     76int ServerOptions::parseWorkerAddress(boost::program_options::variables_map &vm) {
     77  if (vm.count("workeraddress")) {
     78    const std::string worker = vm["workeraddress"].as< std::string >();
     79    workeraddressport = worker.substr(worker.find_last_of(':')+1, std::string::npos);
     80    workeraddresshost = worker.substr(0, worker.find_last_of(':'));
     81    LOG(1, "INFO: Using " << workeraddresshost << ":" << workeraddressport << " as worker's general address.");
    8582  } else {
    86     guardFromStart = false;
     83    ELOG(1, "Requiring worker's address (host:port) to connect to.");
     84    return 255;
    8785  }
    88   LOG(1, "INFO: Using " << guardFromStart << " as guard from start.");
    8986  return 0;
    9087}
  • ThirdParty/JobMarket/src/JobMarket/ServerOptions.hpp

    raeec58 rab2e834  
    1414#endif
    1515
     16#include <string>
     17
    1618#include <boost/program_options.hpp>
    1719
     
    2426  int parseControllerPort(boost::program_options::variables_map &vm);
    2527  int parseTimeout(boost::program_options::variables_map &vm);
    26   bool parseGuardFromStart(boost::program_options::variables_map &vm);
     28  int parseWorkerAddress(boost::program_options::variables_map &vm);
    2729
    2830  unsigned short workerport;
    2931  unsigned short controllerport;
     32  std::string workeraddresshost;
     33  std::string workeraddressport;
    3034  size_t timeout;
    31   bool guardFromStart;
    3235};
    3336
  • ThirdParty/JobMarket/src/JobMarket/WorkerChoices.hpp

    raeec58 rab2e834  
    1717enum WorkerChoices {
    1818  NoWorkerOperation,
    19   EnrollInPool,
    20   SendResult,
    21   RemoveFromPool
     19  SendResult
    2220};
    2321
  • ThirdParty/JobMarket/src/JobMarket/server_main.cpp

    raeec58 rab2e834  
    5555      ("verbosity,v", boost::program_options::value<size_t>(), "set verbosity level")
    5656      ("signal", boost::program_options::value< std::vector<size_t> >(), "set signal to catch (can be given multiple times)")
     57      ("workeraddress", boost::program_options::value< std::string>(), "connect to all workers at this address (host:port)")
    5758      ("workerport", boost::program_options::value< unsigned short >(), "listen on this port for connecting workers")
    5859      ("controllerport", boost::program_options::value< unsigned short >(), "listen on this port for connecting controller")
    5960      ("timeout", boost::program_options::value< size_t >(), "timeout in seconds for alive status of workers")
    60       ("guard-from-start", boost::program_options::value< bool >(), "activate pool guard from server start directly")
    6161  ;
    6262
     
    7878  status = ServerOpts.parseSignals(vm);
    7979  if (status) return status;
    80   status = ServerOpts.parseGuardFromStart(vm);
     80  status = ServerOpts.parseWorkerAddress(vm);
    8181  if (status) return status;
    8282
     
    8585  {
    8686    boost::asio::io_service io_service;
    87     FragmentScheduler Server(io_service, ServerOpts.workerport, ServerOpts.controllerport, ServerOpts.timeout, ServerOpts.guardFromStart);
     87    FragmentScheduler Server(
     88      io_service,
     89      ServerOpts.workerport,
     90      ServerOpts.controllerport,
     91      WorkerAddress(ServerOpts.workeraddresshost, ServerOpts.workeraddressport),
     92      ServerOpts.timeout);
    8893
    8994    // catch ctrl-c and shutdown worker properly
  • ThirdParty/JobMarket/src/unittests/Makefile.am

    raeec58 rab2e834  
    1919  OperationQueueUnitTest.cpp \
    2020  SystemCommandJobUnitTest.cpp \
    21   WorkerAddressUnitTest.cpp \
    22   WorkerPoolUnitTest.cpp
     21  WorkerAddressUnitTest.cpp
    2322 
    2423FRAGMENTATIONAUTOMATIONTESTSHEADERS = \
     
    2827  OperationQueueUnitTest.hpp \
    2928  SystemCommandJobUnitTest.hpp \
    30   WorkerAddressUnitTest.hpp \
    31   WorkerPoolUnitTest.hpp
     29  WorkerAddressUnitTest.hpp
    3230
    3331FRAGMENTATIONAUTOMATIONTESTS = \
     
    3735  OperationQueueUnitTest \
    3836  SystemCommandJobUnitTest \
    39   WorkerAddressUnitTest \
    40   WorkerPoolUnitTest
     37  WorkerAddressUnitTest
    4138
    4239
  • ThirdParty/JobMarket/tests/regression/Makefile.am

    raeec58 rab2e834  
    1717        $(srcdir)/testsuite-addingjobs.at \
    1818        $(srcdir)/testsuite-checkstate.at \
    19         $(srcdir)/testsuite-checkalive.at \
    2019        $(srcdir)/testsuite-completerun.at \
    21         $(srcdir)/testsuite-enrollinpool.at \
    22         $(srcdir)/testsuite-falsehost.at \
    2320        $(srcdir)/testsuite-getresults.at \
    24         $(srcdir)/testsuite-numberworkers.at \
    2521        $(srcdir)/testsuite-resubmitjobs.at \
    2622        $(srcdir)/testsuite-server-shutdown.at \
  • ThirdParty/JobMarket/tests/regression/testsuite-addingjobs.at

    raeec58 rab2e834  
    1010
    1111# start service in background
    12 ${AUTOTEST_PATH}/JobMarketServer --workerport $WORKERPORT --controllerport $CONTROLLERPORT &
     12${AUTOTEST_PATH}/JobMarketServer --workeraddress 127.0.0.1:$WORKERLISTENPORT --workerport $WORKERPORT --controllerport $CONTROLLERPORT &
    1313server_pid=$!
    1414AT_CHECK([sleep 1], 0, [ignore], [ignore], [kill $server_pid])
     
    1818AT_CHECK([fgrep "Sending 1 jobs ..." stdout], 0, [ignore], [ignore], [kill $server_pid])
    1919
    20 # ann one NoOpJob
     20# add one NoOpJob
    2121AT_CHECK([${AUTOTEST_PATH}/JobMarketController --server 127.0.0.1:$CONTROLLERPORT --command createjobs], 0, [stdout], [ignore], [kill $server_pid])
    2222AT_CHECK([fgrep "Creating 1 new NoOpJob(s)." stdout], 0, [ignore], [ignore], [kill $server_pid])
  • ThirdParty/JobMarket/tests/regression/testsuite-checkstate.at

    raeec58 rab2e834  
    77WORKERPORT=11040
    88CONTROLLERPORT=11041
     9WORKERLISTENPORT=11042
    910
    1011# start service in background
    11 ${AUTOTEST_PATH}/JobMarketServer --workerport $WORKERPORT --controllerport $CONTROLLERPORT &
     12${AUTOTEST_PATH}/JobMarketServer --workeraddress 127.0.0.1:$WORKERLISTENPORT --workerport $WORKERPORT --controllerport $CONTROLLERPORT &
    1213server_pid=$!
    1314AT_CHECK([sleep 1], 0, [ignore], [ignore], [kill $server_pid])
  • ThirdParty/JobMarket/tests/regression/testsuite-completerun.at

    raeec58 rab2e834  
    1010
    1111# start service in background
    12 ${AUTOTEST_PATH}/JobMarketServer --workerport $WORKERPORT --controllerport $CONTROLLERPORT &
     12${AUTOTEST_PATH}/JobMarketServer --workeraddress 127.0.0.1:$WORKERLISTENPORT --workerport $WORKERPORT --controllerport $CONTROLLERPORT &
    1313server_pid=$!
    1414AT_CHECK([sleep 1], 0, [ignore], [ignore], [kill $server_pid])
    1515
    16 # add thre jobs via JobAdder
     16# add a worker to work on jobs
     17${AUTOTEST_PATH}/JobMarketPoolWorker -v 5 --signal 2 --server 127.0.0.1:${WORKERPORT} --hostname 127.0.0.1 --listen $WORKERLISTENPORT &
     18worker_pid=$!
     19
     20# add three jobs via JobAdder
    1721AT_CHECK([${AUTOTEST_PATH}/JobMarketController --server 127.0.0.1:$CONTROLLERPORT --command createjobs --executable "cat" --jobcommand "Nothing" --times 3], 0, [stdout], [ignore], [kill $server_pid])
    1822AT_CHECK([fgrep "Sending 3 jobs ..." stdout], 0, [ignore], [ignore], [kill $server_pid])
    19 
    20 # Checking results
    21 AT_CHECK([${AUTOTEST_PATH}/JobMarketController --server 127.0.0.1:$CONTROLLERPORT --command checkresults], 0, [stdout], [ignore], [kill $server_pid])
    22 AT_CHECK([fgrep "#3 are waiting in the queue and #0 jobs are calculated so far." stdout], 0, [ignore], [ignore], [kill $server_pid])
    23 
    24 # enlist a worker to work on jobs
    25 ${AUTOTEST_PATH}/JobMarketPoolWorker -v 5 --signal 2 --server 127.0.0.1:${WORKERPORT} --hostname 127.0.0.1 --listen $WORKERLISTENPORT &
    26 worker_pid=$!
    27 AT_CHECK([sleep 2], 0, [ignore], [ignore], [kill $server_pid $worker_pid])
     23AT_CHECK([sleep 5], 0, [ignore], [ignore], [kill $server_pid $worker_pid])
    2824
    2925# send kill signal to worker such that it shuts down
  • ThirdParty/JobMarket/tests/regression/testsuite-getresults.at

    raeec58 rab2e834  
    77WORKERPORT=11045
    88CONTROLLERPORT=11046
     9WORKERLISTENPORT=11047
    910
    1011# start service in background
    11 ${AUTOTEST_PATH}/JobMarketServer --workerport $WORKERPORT --controllerport $CONTROLLERPORT &
     12${AUTOTEST_PATH}/JobMarketServer --workeraddress 127.0.0.1:$WORKERLISTENPORT --workerport $WORKERPORT --controllerport $CONTROLLERPORT &
    1213server_pid=$!
    1314AT_CHECK([sleep 1], 0, [ignore], [ignore], [kill $server_pid])
  • ThirdParty/JobMarket/tests/regression/testsuite-resubmitjobs.at

    raeec58 rab2e834  
    1010
    1111# start service in background
    12 ${AUTOTEST_PATH}/JobMarketServer --workerport $WORKERPORT --controllerport $CONTROLLERPORT &
     12${AUTOTEST_PATH}/JobMarketServer --workeraddress 127.0.0.1:$WORKERLISTENPORT --workerport $WORKERPORT --controllerport $CONTROLLERPORT &
    1313server_pid=$!
    1414AT_CHECK([sleep 1], 0, [ignore], [ignore], [kill $server_pid])
     15
     16# add a worker to work on jobs
     17${AUTOTEST_PATH}/JobMarketPoolWorker -v 5 --signal 2 --server 127.0.0.1:${WORKERPORT} --hostname 127.0.0.1 --listen $WORKERLISTENPORT 2>workererr &
     18worker_pid=$!
    1519
    1620# add one always failing job via JobAdder
    1721AT_CHECK([${AUTOTEST_PATH}/JobMarketController --server 127.0.0.1:$CONTROLLERPORT --command createjobs --executable "/bin/false" --jobcommand "Nothing"], 0, [stdout], [ignore], [kill $server_pid])
    1822AT_CHECK([fgrep "Sending 1 jobs ..." stdout], 0, [ignore], [ignore], [kill $server_pid])
    19 
    20 # enlist a worker to work on jobs
    21 ${AUTOTEST_PATH}/JobMarketPoolWorker -v 5 --signal 2 --server 127.0.0.1:${WORKERPORT} --hostname 127.0.0.1 --listen $WORKERLISTENPORT 2>workererr &
    22 worker_pid=$!
    23 AT_CHECK([sleep 2], 0, [ignore], [ignore], [kill $server_pid $worker_pid])
     23AT_CHECK([sleep 3], 0, [ignore], [ignore], [kill $server_pid $worker_pid])
    2424
    2525# send kill signal to worker such that it shuts down
  • ThirdParty/JobMarket/tests/regression/testsuite-server-shutdown.at

    raeec58 rab2e834  
    77WORKERPORT=11030
    88CONTROLLERPORT=11031
     9WORKERLISTENPORT=11032
    910
    1011# start service in background
    11 ${AUTOTEST_PATH}/JobMarketServer --workerport $WORKERPORT --controllerport $CONTROLLERPORT &
     12${AUTOTEST_PATH}/JobMarketServer --workeraddress 127.0.0.1:$WORKERLISTENPORT --workerport $WORKERPORT --controllerport $CONTROLLERPORT &
    1213server_pid=$!
    1314AT_CHECK([sleep 1], 0, [ignore], [ignore], [kill $server_pid])
  • ThirdParty/JobMarket/tests/regression/testsuite-server-worker.at

    raeec58 rab2e834  
    1010
    1111# start service in background
    12 ${AUTOTEST_PATH}/JobMarketServer --workerport $WORKERPORT --controllerport $CONTROLLERPORT &
     12${AUTOTEST_PATH}/JobMarketServer --workeraddress 127.0.0.1:$WORKERLISTENPORT --workerport $WORKERPORT --controllerport $CONTROLLERPORT &
    1313server_pid=$!
    1414AT_CHECK([sleep 1], 0, [ignore], [ignore], [kill $server_pid])
    1515
     16# add a worker to work on jobs
     17${AUTOTEST_PATH}/JobMarketPoolWorker -v 5 --signal 2 --server 127.0.0.1:${WORKERPORT} --hostname 127.0.0.1 --listen $WORKERLISTENPORT &
     18worker_pid=$!
     19
    1620# add one jobs
    1721AT_CHECK([${AUTOTEST_PATH}/JobMarketController --server 127.0.0.1:$CONTROLLERPORT --command createjobs --executable "cat" --jobcommand "Nothing"], 0, [ignore], [ignore], [kill $server_pid])
    18 
    19 # enlist a worker to work on jobs
    20 ${AUTOTEST_PATH}/JobMarketPoolWorker -v 5 --signal 2 --server 127.0.0.1:${WORKERPORT} --hostname 127.0.0.1 --listen $WORKERLISTENPORT &
    21 worker_pid=$!
    22 AT_CHECK([sleep 2], 0, [ignore], [ignore], [kill $server_pid $worker_pid])
     22AT_CHECK([sleep 3], 0, [ignore], [ignore], [kill $server_pid $worker_pid])
    2323
    2424# send kill signal to worker such that it shuts down
  • ThirdParty/JobMarket/tests/regression/testsuite.at

    raeec58 rab2e834  
    1717m4_include([testsuite-server-shutdown.at])
    1818
    19 # check whether poolworker may successfully enroll in pool
    20 m4_include([testsuite-enrollinpool.at])
    21 
    2219# check whether adding jobs works
    2320m4_include([testsuite-addingjobs.at])
    24 
    25 # check whether dead busy poolworker is automatically removed
    26 m4_include([testsuite-checkalive.at])
    2721
    2822# check whether checking state works
     
    4034# check whether complete run works
    4135m4_include([testsuite-completerun.at])
    42 
    43 # check whether false hostname does not shutdown server
    44 m4_include([testsuite-falsehost.at])
    45 
    46 # check whether total number of workers is returned
    47 m4_include([testsuite-numberworkers.at])
Note: See TracChangeset for help on using the changeset viewer.