Changeset 950cd4


Ignore:
Timestamp:
Aug 11, 2025, 5:44:42 PM (2 months ago)
Author:
Frederik Heber <frederik.heber@…>
Branches:
Candidate_v1.7.0, stable
Children:
74d798
Parents:
e5f9e7
git-author:
Frederik Heber <frederik.heber@…> (07/04/25 20:57:33)
git-committer:
Frederik Heber <frederik.heber@…> (08/11/25 17:44:42)
Message:

Added HealthProbeListener to PoolWorker.

  • this allows a simple tcp liveness probe where kubernetes only checks for a connection on a port can be established.
Location:
ThirdParty/JobMarket/src/JobMarket
Files:
5 edited

Legend:

Unmodified
Added
Removed
  • ThirdParty/JobMarket/src/JobMarket/Pool/PoolWorker.cpp

    re5f9e7 r950cd4  
    5656    const std::string& _service,
    5757    const std::string& listenhost,
    58     const std::string& listenservice) :
     58    const std::string& listenservice,
     59    const std::string& healthport) :
    5960  io_service(_io_service),
    6061  PoolListener(_io_service, boost::lexical_cast<unsigned short>(listenservice), *this),
     62  HealthListener(_io_service, boost::lexical_cast<unsigned short>(healthport), PoolListener),
    6163  MyAddress(listenhost, listenservice),
    6264  ServerAddress(_host, _service),
     
    7072  if (PoolListener.getExitflag() == Listener::OkFlag) {
    7173    // always enroll and make listenining initiation depend on its success
    72     const boost::function<void ()> initiateme =
    73         boost::bind(&PoolListener_t::initiateSocket, boost::ref(PoolListener));
     74    const boost::function<void ()> initiateme = 
     75        boost::bind(&PoolWorker::initiateSockets, this);
    7476    const boost::function<void ()> shutdown =
    7577        boost::bind(&PoolWorker::finish, this); // no need to remove from pool
     
    8284  }
    8385}
     86
     87void PoolWorker::initiateSockets()
     88{
     89  PoolListener.initiateSocket();
     90  HealthListener.initiateSocket();
     91}
     92
     93/// Handle completion of a accept server operation.
     94void PoolWorker::HealthProbeListener_t::handle_Accept(const boost::system::error_code& e, connection_ptr conn)
     95{
     96  DEBUG_FUNCTION_ENTRYEXIT
     97  if (!e)
     98  {
     99    if (PoolListener.getExitflag() == Listener::OkFlag) {
     100      LOG(2, "DEBUG: Ping from health check");
     101      // and listen for following connections
     102      initiateSocket();
     103    } else {
     104      LOG(2, "INFO: Exitflag is " <<  PoolListener.getExitflag() << ", unhealthy ...");
     105      closeSocket();
     106    }
     107  }
     108  else
     109  {
     110    // An error occurred. Log it and return. Since we are not starting a new
     111    // accept operation the io_service will run out of work to do and the
     112    // server will exit.
     113    setExitflag( ErrorFlag );
     114    ELOG(0, e.message());
     115  }
     116}
     117
    84118
    85119/// Handle completion of a accept server operation.
     
    324358{
    325359  // somehow stop listener
     360  HealthListener.closeSocket();
     361
     362  // somehow stop listener
    326363  PoolListener.closeSocket();
    327364
  • ThirdParty/JobMarket/src/JobMarket/Pool/PoolWorker.hpp

    re5f9e7 r950cd4  
    4545      const std::string& service,
    4646      const std::string& listenhost,
    47       const std::string& listenservice);
     47      const std::string& listenservice,
     48      const std::string& healthport);
    4849
    4950  /** Returns the flag of the handled operation.
     
    122123  };
    123124
     125    /**
     126   * Handles TCP liveness probes.
     127   */
     128  class HealthProbeListener_t : public Listener
     129  {
     130  public:
     131    HealthProbeListener_t(
     132        boost::asio::io_service& io_service,
     133        unsigned short port,
     134        const PoolListener_t& _PoolListener) :
     135      Listener(io_service, port),
     136      PoolListener(_PoolListener)
     137    {}
     138
     139    virtual ~HealthProbeListener_t() {}
     140
     141  protected:
     142    /// Handle completion of a accept controller operation.
     143    void handle_Accept(const boost::system::error_code& e, connection_ptr conn);
     144
     145  private:
     146    const PoolListener_t& PoolListener;
     147  };
     148
     149private:
     150    void initiateSockets();
     151
    124152private:
    125153  //!> reference to io_service which we use for connections
     
    128156  //!> The listener for the WorkerPool
    129157  PoolListener_t PoolListener;
     158
     159  //!> The listener for the WorkerPool
     160  HealthProbeListener_t HealthListener;
    130161
    131162  //!> address of this worker
  • ThirdParty/JobMarket/src/JobMarket/WorkerOptions.cpp

    re5f9e7 r950cd4  
    8585  return 0;
    8686}
     87
     88int WorkerOptions::parseHealthProbePort(boost::program_options::variables_map &vm) {
     89  if (vm.count("healthport")) {
     90    try {
     91      healthprobeport = vm["healthport"].as< std::string >();
     92    } catch (boost::bad_lexical_cast) {
     93      ELOG(1, "Could not read " << vm["healthport"].as< std::string >() << " as digits.");
     94      return 255;
     95    }
     96    LOG(1, "INFO: Using port " << healthprobeport << " to listen fdor health probe connects.");
     97  } else {
     98    healthprobeport = "";
     99  }
     100  return 0;
     101}
  • ThirdParty/JobMarket/src/JobMarket/WorkerOptions.hpp

    re5f9e7 r950cd4  
    2929  int parseLocalhost(boost::program_options::variables_map &vm);
    3030  int parseListenPort(boost::program_options::variables_map &vm);
     31  int parseHealthProbePort(boost::program_options::variables_map &vm);
    3132
    3233  std::string server;
     
    3435  std::string hostname;
    3536  std::string listenport;
     37  std::string healthprobeport;
    3638};
    3739
  • ThirdParty/JobMarket/src/JobMarket/poolworker_main.cpp

    re5f9e7 r950cd4  
    5858      ("server", boost::program_options::value< std::string>(), "connect to server at this address (host:port)")
    5959      ("listen", boost::program_options::value< std::string >(), "listen on this port")
     60      ("healthport", boost::program_options::value< std::string >(), "listen on this port for health probes")
    6061      ("hostname", boost::program_options::value< std::string>(), "name of host on which this codes runs and which server can resolve")
    6162  ;
     
    7677  status = WorkerOpts.parseLocalhost(vm);
    7778  if (status) return status;
     79  status = WorkerOpts.parseHealthProbePort(vm);
     80  if (status) return status;
    7881  status = WorkerOpts.parseSignals(vm);
    7982  if (status) return status;
     
    8386  {
    8487    boost::asio::io_service io_service;
    85     PoolWorker client(io_service, WorkerOpts.server, WorkerOpts.serverport, WorkerOpts.hostname, WorkerOpts.listenport);
     88    PoolWorker client(io_service, WorkerOpts.server, WorkerOpts.serverport, WorkerOpts.hostname, WorkerOpts.listenport, "8092");
    8689
    8790    // catch ctrl-c and shutdown worker properly
Note: See TracChangeset for help on using the changeset viewer.