Ignore:
Timestamp:
May 4, 2012, 2:19:06 PM (13 years ago)
Author:
Frederik Heber <heber@…>
Branches:
Action_Thermostats, Add_AtomRandomPerturbation, Add_FitFragmentPartialChargesAction, Add_RotateAroundBondAction, Add_SelectAtomByNameAction, Added_ParseSaveFragmentResults, AddingActions_SaveParseParticleParameters, Adding_Graph_to_ChangeBondActions, Adding_MD_integration_tests, Adding_ParticleName_to_Atom, Adding_StructOpt_integration_tests, AtomFragments, Automaking_mpqc_open, AutomationFragmentation_failures, Candidate_v1.5.4, Candidate_v1.6.0, Candidate_v1.6.1, ChangeBugEmailaddress, ChangingTestPorts, ChemicalSpaceEvaluator, CombiningParticlePotentialParsing, Combining_Subpackages, Debian_Package_split, Debian_package_split_molecuildergui_only, Disabling_MemDebug, Docu_Python_wait, EmpiricalPotential_contain_HomologyGraph, EmpiricalPotential_contain_HomologyGraph_documentation, Enable_parallel_make_install, Enhance_userguide, Enhanced_StructuralOptimization, Enhanced_StructuralOptimization_continued, Example_ManyWaysToTranslateAtom, Exclude_Hydrogens_annealWithBondGraph, FitPartialCharges_GlobalError, Fix_BoundInBox_CenterInBox_MoleculeActions, Fix_ChargeSampling_PBC, Fix_ChronosMutex, Fix_FitPartialCharges, Fix_FitPotential_needs_atomicnumbers, Fix_ForceAnnealing, Fix_IndependentFragmentGrids, Fix_ParseParticles, Fix_ParseParticles_split_forward_backward_Actions, Fix_PopActions, Fix_QtFragmentList_sorted_selection, Fix_Restrictedkeyset_FragmentMolecule, Fix_StatusMsg, Fix_StepWorldTime_single_argument, Fix_Verbose_Codepatterns, Fix_fitting_potentials, Fixes, ForceAnnealing_goodresults, ForceAnnealing_oldresults, ForceAnnealing_tocheck, ForceAnnealing_with_BondGraph, ForceAnnealing_with_BondGraph_continued, ForceAnnealing_with_BondGraph_continued_betteresults, ForceAnnealing_with_BondGraph_contraction-expansion, FragmentAction_writes_AtomFragments, FragmentMolecule_checks_bonddegrees, GeometryObjects, Gui_Fixes, Gui_displays_atomic_force_velocity, ImplicitCharges, IndependentFragmentGrids, IndependentFragmentGrids_IndividualZeroInstances, IndependentFragmentGrids_IntegrationTest, IndependentFragmentGrids_Sole_NN_Calculation, JobMarket_RobustOnKillsSegFaults, JobMarket_StableWorkerPool, JobMarket_unresolvable_hostname_fix, MoreRobust_FragmentAutomation, ODR_violation_mpqc_open, PartialCharges_OrthogonalSummation, PdbParser_setsAtomName, PythonUI_with_named_parameters, QtGui_reactivate_TimeChanged_changes, Recreated_GuiChecks, Rewrite_FitPartialCharges, RotateToPrincipalAxisSystem_UndoRedo, SaturateAtoms_findBestMatching, SaturateAtoms_singleDegree, StoppableMakroAction, Subpackage_CodePatterns, Subpackage_JobMarket, Subpackage_LinearAlgebra, Subpackage_levmar, Subpackage_mpqc_open, Subpackage_vmg, Switchable_LogView, ThirdParty_MPQC_rebuilt_buildsystem, TrajectoryDependenant_MaxOrder, TremoloParser_IncreasedPrecision, TremoloParser_MultipleTimesteps, TremoloParser_setsAtomName, Ubuntu_1604_changes, stable
Children:
083490
Parents:
630a6e
git-author:
Frederik Heber <heber@…> (11/27/11 18:38:26)
git-committer:
Frederik Heber <heber@…> (05/04/12 14:19:06)
Message:

Server and Worker now also correctly exchange result.

  • Server uses FragmentQueue which also stores the obtained result.
  • Worker calls received FragmentJob::Work() function and returns the thereby calculated FragmentResult.
  • Regression test for Server/Worker now checks whether job #1 is listed twice, this basically checks whether the result has been exchanged.

There have been some concepts to understand to get this working and these

shall be briefly recorded here:

  • asynchronous communication can only work on objects that live beyond the scope of where they have been called, e.g. therefore FragmentScheduler contains a FragmentResult and FragmentWorker a FragmentJob instance. They receive this object and need the write access in the scope of the aync. comm. and not of the caller's scope. This is probably because the initial argument is only used to set up a buffer of correct length. However, the received instance is created/deserialized first when the communication is completed. And this may well be after the caller's scope has been left.
  • This is different to read operations as probably there the object to send is immediately serialized and placed into an internal buffer such that later access is only to this buffer and not to the original instance which therefore does not need to exist anymore. That's why the above Schedulder and Worker do not have the "other" instance as class members as well.
  • chaining asynchronous communications, e.g. a write after a read has been performed, can only be done by using the callback functions that the async_write/read gets as parameters. They are called when the one operation has finished and therein the next operation can then be launched. This way a successful chain is executed.
File:
1 edited

Legend:

Unmodified
Added
Removed
  • src/Fragmentation/Automation/FragmentWorker.cpp

    r630a6e ref2767  
    3434#include "CodePatterns/Log.hpp"
    3535#include "FragmentJob.hpp"
     36#include "FragmentResult.hpp"
    3637#include "FragmentWorker.hpp"
     38
     39FragmentResult FragmentWorker::EmptyResult(JobId::NoJob, std::string("EmptyResult"));
    3740
    3841/// Constructor starts the asynchronous connect operation.
     
    6467    boost::asio::ip::tcp::resolver::iterator endpoint_iterator)
    6568{
    66     Info info(__FUNCTION__);
     69  Info info(__FUNCTION__);
    6770  if (!e)
    6871  {
     
    7073    // of jobs. The connection::async_read() function will automatically
    7174    // decode the data that is read from the underlying socket.
     75    LOG(1, "INFO: Receiving a job ...");
    7276    connection_.async_read(job,
    7377      boost::bind(&FragmentWorker::handle_read, this,
     
    9599  if (!e)
    96100  {
     101    LOG(1, "INFO: Received job #" << job.getId() << ".");
    97102    if (job.getId() != JobId::NoJob) {
    98103      // Print out the data that was received.
    99104      std::cout << "Job output: " << job.outputfile << "\n";
    100105      std::cout << "Job id: " << job.getId() << "\n";
     106
    101107      // do something .. right now: wait
    102       result = job.Work();
     108      LOG(1, "INFO: Calculating job #" << job.getId() << " ...");
     109      FragmentResult result(job.Work());
     110
     111      // write the result to the server
     112      LOG(1, "INFO: Sending result for job #" << job.getId() << " ...");
     113      connection_.async_write(result,
     114        boost::bind(&FragmentWorker::handle_write, this,
     115        boost::asio::placeholders::error));
     116
    103117    } else {
    104118      std::cout << "The server has no job for me." << std::endl;
     119      // send out empty result
     120      LOG(1, "INFO: Sending empty result ...");
     121      connection_.async_write(EmptyResult,
     122        boost::bind(&FragmentWorker::handle_write, this,
     123        boost::asio::placeholders::error));
    105124      Exitflag = NoJobFlag;
    106125    }
     
    117136}
    118137
     138/// Handle completion of a write operation.
     139void FragmentWorker::handle_write(const boost::system::error_code& e)
     140{
     141  Info info(__FUNCTION__);
     142  // Nothing to do.
     143  LOG(1, "INFO: Job #" << job.getId() << " calculated and sent.");
     144  // erase job
     145  job = FragmentJob();
     146}
     147
Note: See TracChangeset for help on using the changeset viewer.