source: src/Fragmentation/Automation/Connection.hpp@ 7dd8bc

Action_Thermostats Add_AtomRandomPerturbation Add_FitFragmentPartialChargesAction Add_RotateAroundBondAction Add_SelectAtomByNameAction Added_ParseSaveFragmentResults AddingActions_SaveParseParticleParameters Adding_Graph_to_ChangeBondActions Adding_MD_integration_tests Adding_ParticleName_to_Atom Adding_StructOpt_integration_tests AtomFragments Automaking_mpqc_open AutomationFragmentation_failures Candidate_v1.5.4 Candidate_v1.6.0 Candidate_v1.6.1 ChangeBugEmailaddress ChangingTestPorts ChemicalSpaceEvaluator CombiningParticlePotentialParsing Combining_Subpackages Debian_Package_split Debian_package_split_molecuildergui_only Disabling_MemDebug Docu_Python_wait EmpiricalPotential_contain_HomologyGraph EmpiricalPotential_contain_HomologyGraph_documentation Enable_parallel_make_install Enhance_userguide Enhanced_StructuralOptimization Enhanced_StructuralOptimization_continued Example_ManyWaysToTranslateAtom Exclude_Hydrogens_annealWithBondGraph FitPartialCharges_GlobalError Fix_BoundInBox_CenterInBox_MoleculeActions Fix_ChargeSampling_PBC Fix_ChronosMutex Fix_FitPartialCharges Fix_FitPotential_needs_atomicnumbers Fix_ForceAnnealing Fix_IndependentFragmentGrids Fix_ParseParticles Fix_ParseParticles_split_forward_backward_Actions Fix_PopActions Fix_QtFragmentList_sorted_selection Fix_Restrictedkeyset_FragmentMolecule Fix_StatusMsg Fix_StepWorldTime_single_argument Fix_Verbose_Codepatterns Fix_fitting_potentials Fixes ForceAnnealing_goodresults ForceAnnealing_oldresults ForceAnnealing_tocheck ForceAnnealing_with_BondGraph ForceAnnealing_with_BondGraph_continued ForceAnnealing_with_BondGraph_continued_betteresults ForceAnnealing_with_BondGraph_contraction-expansion FragmentAction_writes_AtomFragments FragmentMolecule_checks_bonddegrees GeometryObjects Gui_Fixes Gui_displays_atomic_force_velocity ImplicitCharges IndependentFragmentGrids IndependentFragmentGrids_IndividualZeroInstances IndependentFragmentGrids_IntegrationTest IndependentFragmentGrids_Sole_NN_Calculation 
JobMarket_RobustOnKillsSegFaults JobMarket_StableWorkerPool JobMarket_unresolvable_hostname_fix MoreRobust_FragmentAutomation ODR_violation_mpqc_open PartialCharges_OrthogonalSummation PdbParser_setsAtomName PythonUI_with_named_parameters QtGui_reactivate_TimeChanged_changes Recreated_GuiChecks Rewrite_FitPartialCharges RotateToPrincipalAxisSystem_UndoRedo SaturateAtoms_findBestMatching SaturateAtoms_singleDegree StoppableMakroAction Subpackage_CodePatterns Subpackage_JobMarket Subpackage_LinearAlgebra Subpackage_levmar Subpackage_mpqc_open Subpackage_vmg Switchable_LogView ThirdParty_MPQC_rebuilt_buildsystem TrajectoryDependenant_MaxOrder TremoloParser_IncreasedPrecision TremoloParser_MultipleTimesteps TremoloParser_setsAtomName Ubuntu_1604_changes stable
Last change on this file since 7dd8bc was 7dd8bc, checked in by Frederik Heber <heber@…>, 13 years ago

Added template functions for sync_read() and sync_write() to Connection.

  • also factored out prepare_write() from async_write(), since it is needed by sync_write() as well.
  • Property mode set to 100644
File size: 6.7 KB
Line 
1/*
2 * Connection.hpp
3 *
4 * Created on: Oct 21, 2011
5 * Author: heber
6 */
7
8#ifndef CONNECTION_HPP_
9#define CONNECTION_HPP_
10
11// include config.h
12#ifdef HAVE_CONFIG_H
13#include <config.h>
14#endif
15
16#include <boost/archive/text_iarchive.hpp>
17#include <boost/archive/text_oarchive.hpp>
18
19#include <boost/asio.hpp>
20#include <boost/bind.hpp>
21#include <boost/shared_ptr.hpp>
22#include <boost/tuple/tuple.hpp>
23#include <iomanip>
24#include <string>
25#include <sstream>
26#include <vector>
27
28#include "CodePatterns/Log.hpp"
29
30/** The Connection class provides serialization primitives on top of a socket.
31 * Each message sent using this class consists of:
32 * @li An 8-byte header containing the length of the serialized data in
33 * hexadecimal.
34 * @li The serialized data.
35 */
36class Connection
37{
38public:
39 /// Constructor.
40 Connection(boost::asio::io_service& io_service)
41 : socket_(io_service)
42 {
43 }
44
45 /// Get the underlying socket. Used for making a Connection or for accepting
46 /// an incoming Connection.
47 boost::asio::ip::tcp::socket& socket()
48 {
49 return socket_;
50 }
51
52 typedef std::vector<boost::asio::const_buffer> buffers_t;
53
54 /** Prepare data for write operation.
55 *
56 * The dara is prepared in two blocks: header and data.
57 *
58 * @param t data to place into buffers
59 * @param vector of buffers
60 * @return true - data placed in buffer, false - some mismatch, data not placed
61 */
62 template <typename T>
63 bool prepare_write(const T& t, buffers_t &buffers)
64 {
65 // Serialize the data first so we know how large it is.
66 std::ostringstream archive_stream;
67 boost::archive::text_oarchive archive(archive_stream);
68 archive << t;
69 outbound_data_ = archive_stream.str();
70
71 // Format the header.
72 std::ostringstream header_stream;
73 header_stream << std::setw(header_length)
74 << std::hex << outbound_data_.size();
75 if (!header_stream || header_stream.str().size() != header_length)
76 return false;
77 outbound_header_ = header_stream.str();
78
79 // Write the serialized data to the socket. We use "gather-write" to send
80 // both the header and the data in a single write operation.
81 buffers.push_back(boost::asio::buffer(outbound_header_));
82 buffers.push_back(boost::asio::buffer(outbound_data_));
83 return true;
84 }
85
86 /// Asynchronously write a data structure to the socket.
87 template <typename T, typename Handler>
88 void async_write(const T& t, Handler handler)
89 {
90 buffers_t buffers;
91 if (!prepare_write(t, buffers)) {
92 // Something went wrong, inform the caller.
93 boost::system::error_code error(boost::asio::error::invalid_argument);
94 socket_.get_io_service().post(boost::bind(handler, error));
95 return;
96 }
97 boost::asio::async_write(socket_, buffers, handler);
98 }
99
100 /// Synchronously write a data structure to the socket.
101 template <typename T>
102 void sync_write(const T& t)
103 {
104 buffers_t buffers;
105 if (prepare_write(t, buffers)) {
106 boost::asio::write(socket_, buffers);
107 } else {
108 ELOG(2, "sync_write failed, data could not be placed in buffers.");
109 }
110 }
111
112 /// Asynchronously read a data structure from the socket.
113 template <typename T, typename Handler>
114 void async_read(T& t, Handler handler)
115 {
116 // Issue a read operation to read exactly the number of bytes in a header.
117 void (Connection::*f)(
118 const boost::system::error_code&,
119 T&, boost::tuple<Handler>)
120 = &Connection::handle_read_header<T, Handler>;
121 boost::asio::async_read(socket_, boost::asio::buffer(inbound_header_),
122 boost::bind(f,
123 this, boost::asio::placeholders::error, boost::ref(t),
124 boost::make_tuple(handler)));
125 }
126
127 /// Synchronously read a data structure from the socket.
128 template <typename T, typename Handler>
129 void sync_read(T& t, Handler handler)
130 {
131 // Issue a read operation to read exactly the number of bytes in a header.
132 void (Connection::*f)(
133 const boost::system::error_code&,
134 T&, boost::tuple<Handler>)
135 = &Connection::handle_read_header<T, Handler>;
136 boost::asio::read(socket_, boost::asio::buffer(inbound_header_));
137 }
138
139 /// Handle a completed read of a message header. The handler is passed using
140 /// a tuple since boost::bind seems to have trouble binding a function object
141 /// created using boost::bind as a parameter.
142 template <typename T, typename Handler>
143 void handle_read_header(const boost::system::error_code& e,
144 T& t, boost::tuple<Handler> handler)
145 {
146 if (e)
147 {
148 boost::get<0>(handler)(e);
149 }
150 else
151 {
152 // Determine the length of the serialized data.
153 std::istringstream is(std::string(inbound_header_, header_length));
154 std::size_t inbound_data_size = 0;
155 if (!(is >> std::hex >> inbound_data_size))
156 {
157 // Header doesn't seem to be valid. Inform the caller.
158 boost::system::error_code error(boost::asio::error::invalid_argument);
159 boost::get<0>(handler)(error);
160 return;
161 }
162
163 // Start an asynchronous call to receive the data.
164 inbound_data_.resize(inbound_data_size);
165 void (Connection::*f)(
166 const boost::system::error_code&,
167 T&, boost::tuple<Handler>)
168 = &Connection::handle_read_data<T, Handler>;
169 boost::asio::async_read(socket_, boost::asio::buffer(inbound_data_),
170 boost::bind(f, this,
171 boost::asio::placeholders::error, boost::ref(t), handler));
172 }
173 }
174
175 /// Handle a completed read of message data.
176 template <typename T, typename Handler>
177 void handle_read_data(const boost::system::error_code& e,
178 T& t, boost::tuple<Handler> handler)
179 {
180 if (e)
181 {
182 boost::get<0>(handler)(e);
183 }
184 else
185 {
186 // Extract the data structure from the data just received.
187 try
188 {
189 std::string archive_data(&inbound_data_[0], inbound_data_.size());
190 std::istringstream archive_stream(archive_data);
191 boost::archive::text_iarchive archive(archive_stream);
192 archive >> t;
193 }
194 catch (std::exception& e)
195 {
196 // Unable to decode data.
197 boost::system::error_code error(boost::asio::error::invalid_argument);
198 boost::get<0>(handler)(error);
199 return;
200 }
201
202 // Inform caller that data has been received ok.
203 boost::get<0>(handler)(e);
204 }
205 }
206
207private:
208 /// The underlying socket.
209 boost::asio::ip::tcp::socket socket_;
210
211 /// The size of a fixed length header.
212 enum { header_length = 8 };
213
214 /// Holds an outbound header.
215 std::string outbound_header_;
216
217 /// Holds the outbound data.
218 std::string outbound_data_;
219
220 /// Holds an inbound header.
221 char inbound_header_[header_length];
222
223 /// Holds the inbound data.
224 std::vector<char> inbound_data_;
225};
226
/// Shared-ownership (boost::shared_ptr) handle to a Connection.
227typedef boost::shared_ptr<Connection> connection_ptr;
228
229#endif /* CONNECTION_HPP_ */
Note: See TracBrowser for help on using the repository browser.