Ignore:
Timestamp:
Aug 11, 2025, 5:45:37 PM (3 months ago)
Author:
Frederik Heber <frederik.heber@…>
Branches:
Candidate_v1.7.0, stable
Children:
ae9ad6
Parents:
aeec58
git-author:
Frederik Heber <frederik.heber@…> (07/16/25 19:05:29)
git-committer:
Frederik Heber <frederik.heber@…> (08/11/25 17:45:37)
Message:

Reduces JobMarket features for use in kubernetes clusters.

  • removes all functionality where we can enumerate the workers, enroll or remove them.
  • removes PoolGuard and CheckAlive functionality.
  • removes the shutdown thread: not needed as we don't wait for busy workers anymore.
  • removes WorkerPool. This is now handled by the kubernetes deployment.
  • TESTS: Removed respective unit and regression tests.
  • TESTS: Adapted regression tests as worker now needs to be present when first job is sent. Moreover, server needs to be equipped with worker host and port.
File:
1 edited

Legend:

Unmodified
Added
Removed
  • ThirdParty/JobMarket/src/JobMarket/FragmentScheduler.hpp

    raeec58 rab2e834  
    1717#include <boost/function.hpp>
    1818#include <boost/shared_ptr.hpp>
    19 #include <boost/thread.hpp>
    2019#include <deque>
    2120#include <set>
     
    2726#include "JobMarket/Operations/AsyncOperation.hpp"
    2827#include "JobMarket/Operations/OperationQueue.hpp"
    29 #include "Operations/Servers/ShutdownWorkerOperation.hpp"
    3028#include "JobMarket/ControllerChoices.hpp"
    3129#include "JobMarket/FragmentQueue.hpp"
     
    3533#include "JobMarket/Results/FragmentResult.hpp"
    3634#include "JobMarket/types.hpp"
    37 #include "JobMarket/Pool/PoolGuard.hpp"
    38 #include "JobMarket/Pool/WorkerPool.hpp"
    3935#include "JobMarket/WorkerAddress.hpp"
    4036#include "JobMarket/WorkerChoices.hpp"
     
    5349      unsigned short workerport,
    5450      unsigned short controllerport,
    55       const size_t timeout,
    56       bool guardFromStart
     51      const WorkerAddress _workeraddress,
     52      const size_t timeout
    5753      );
    5854  ~FragmentScheduler();
     
    6359  void sendAvailableJobToNextIdleWorker();
    6460  bool shutdown();
    65   void shutdownWorker(const WorkerAddress &address);
    6661  void shutdownAfterSignal();
    67   void removeAllWorkers();
    68   void removeAllIdleWorkers();
    69   void removeWorker(const WorkerAddress address);
    70   void unmarkWorkerBusy(const WorkerAddress address);
    7162  void cleanupOperationQueue(AsyncOperation *op);
    7263  void workerAcceptsJob(const WorkerAddress _address, const JobId_t _id);
     
    8475        unsigned short port,
    8576        FragmentQueue &_JobsQueue,
    86         WorkerPool &_pool,
    87         boost::function<void (const WorkerAddress&, FragmentJob::ptr&)> _callback,
    88         boost::function<void (const WorkerAddress)> _unmarkBusyWorkerFunction) :
     77        boost::function<void (const WorkerAddress&, FragmentJob::ptr&)> _callback) :
    8978      Listener(io_service, port),
    9079      JobsQueue(_JobsQueue),
    91       pool(_pool),
    92       callback_sendJobToWorker(_callback),
    93       unmarkBusyWorkerFunction(_unmarkBusyWorkerFunction)
     80      callback_sendJobToWorker(_callback)
    9481    {}
    9582    virtual ~WorkerListener_t() {}
     
    136123    void handle_ReadAddress(const boost::system::error_code& e, connection_ptr conn, HandlerData::ptr &data);
    137124
    138     /// Worker callback function when new worker has enrolled.
    139     void handle_enrolled(const boost::system::error_code& e, connection_ptr conn, HandlerData::ptr &data);
    140 
    141     /// Worker callback function when worker has been informed of successful removal.
    142     void handle_removed(const boost::system::error_code& e, connection_ptr conn, HandlerData::ptr &data);
    143 
    144125    /// Worker callback function when result has been received.
    145126    void handle_ReceiveResultFromWorker(const boost::system::error_code& e, connection_ptr conn, HandlerData::ptr &data);
     
    152133    FragmentQueue &JobsQueue;
    153134
    154     //!> callback reference to container class
    155     WorkerPool &pool;
    156 
    157135    //!> callback function to access send job function
    158136    boost::function<void (const WorkerAddress&, FragmentJob::ptr&)> callback_sendJobToWorker;
    159 
    160     //!> function to call when a busy worker has finished and sent its result
    161     boost::function<void (const WorkerAddress)> unmarkBusyWorkerFunction;
    162137  };
    163138
     
    169144        unsigned short port,
    170145        FragmentQueue &_JobsQueue,
    171         WorkerPool &_pool,
    172         boost::function<void ()> _removeallWorkers,
    173146        boost::function<bool ()> _shutdownAllSockets) :
    174147      Listener(io_service, port),
    175148      JobsQueue(_JobsQueue),
    176       pool(_pool),
    177149      jobInfo((size_t)2, 0),
    178150      NumberIds(0),
    179151      choice(NoControllerOperation),
    180       removeallWorkers(_removeallWorkers),
    181152      shutdownAllSockets(_shutdownAllSockets),
    182153      globalId(0)
     
    209180    void handle_SendResults(const boost::system::error_code& e, connection_ptr conn);
    210181
    211     /// Controller callback function when result are to be sent.
    212     void handle_SendNumberWorkers(const boost::system::error_code& e, connection_ptr conn);
    213 
    214182    /// Controller callback function when results has been sent.
    215183    void handle_finishSendResults(const boost::system::error_code& e, connection_ptr conn);
     
    219187    FragmentQueue & JobsQueue;
    220188
    221     //!> const reference to external pool of Workers
    222     const WorkerPool & pool;
    223 
    224189    //!> bunch of jobs received from controller before placed in JobsQueue
    225190    std::vector<FragmentJob::ptr> jobs;
     
    236201    //!> choice
    237202    enum ControllerChoices choice;
    238 
    239     //!> bound function to remove all workers
    240     boost::function<void ()> removeallWorkers;
    241203
    242204    //!> bound function to shutdown all sockets
     
    258220  FragmentQueue JobsQueue;
    259221
    260   //!> Pool of Workers
    261   WorkerPool pool;
    262 
    263222  //!> Listener instance that waits for a worker
    264223  WorkerListener_t  WorkerListener;
     
    266225  //!> Listener instance that waits for a controller
    267226  ControllerListener_t  ControllerListener;
     227
     228  //!> Ingress address to reach the next worker
     229  const WorkerAddress workeraddress;
    268230
    269231public:
     
    287249  //!> internal queue for all asynchronous operations
    288250  OperationQueue OpQueue;
    289 
    290 private:
    291   //!> Guard that checks if workers are unreachable
    292   PoolGuard guard;
    293 
    294   //!> shutdown thread to keep the operation queue free to run
    295   boost::thread *shutdown_thread;
    296251};
    297252
Note: See TracChangeset for help on using the changeset viewer.