source: ThirdParty/mpqc_open/src/lib/util/group/memmtmpi.cc@ 482400e

Last change on this file since 482400e was 860145, checked in by Frederik Heber <heber@…>, 8 years ago

Merge commit '0b990dfaa8c6007a996d030163a25f7f5fc8a7e7' as 'ThirdParty/mpqc_open'

//
// memmtmpi.cc
// based on memmpi.cc
//
// Copyright (C) 1996 Limit Point Systems, Inc.
//
// Author: Curtis Janssen <cljanss@limitpt.com>
// Maintainer: LPS
//
// This file is part of the SC Toolkit.
//
// The SC Toolkit is free software; you can redistribute it and/or modify
// it under the terms of the GNU Library General Public License as published by
// the Free Software Foundation; either version 2, or (at your option)
// any later version.
//
// The SC Toolkit is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
// GNU Library General Public License for more details.
//
// You should have received a copy of the GNU Library General Public License
// along with the SC Toolkit; see the file COPYING.LIB.  If not, write to
// the Free Software Foundation, 675 Mass Ave, Cambridge, MA 02139, USA.
//
// The U.S. Government is granted a limited license as per AL 91-7.
//

#ifndef _util_group_memmtmpi_cc
#define _util_group_memmtmpi_cc

#ifdef __GNUC__
#pragma implementation
#endif

#include <assert.h>

#include <util/misc/formio.h>
#include <util/group/messmpi.h>
#include <util/group/memmtmpi.h>

#define MPICH_SKIP_MPICXX
#include <mpi.h>

using namespace std;

// Define this to use immediate mode.  This was added to work around
// bugs in non-immediate-mode optimizations in an MPI implementation.
#undef USE_IMMEDIATE_MODE
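// (To enable immediate mode, change the #undef above to
// "#define USE_IMMEDIATE_MODE"; the MPI_Send/MPI_Recv calls guarded by
// this macro below then switch to the MPI_Isend/MPI_Irecv + MPI_Wait
// variants in the corresponding #else branches.)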

namespace sc {

static const int dbufsize = 32768;

///////////////////////////////////////////////////////////////////////
// The MTMPIThread class

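// An MTMPIThread is one handler thread of an MTMPIMemoryGrp.  It blocks
// in MPI_Recv waiting for MemoryDataRequest messages arriving with tag
// req_tag_ on the group's communication communicator and services
// Retrieve, Replace, and DoubleSum requests against this node's local
// data, until it receives a Deactivate request.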
class MTMPIThread: public Thread {
  private:
    MTMPIMemoryGrp *mem_;
    int req_tag_;
    int tag_;
    unsigned int nreq_recd_;
    double chunk[dbufsize];
  public:
    MTMPIThread(MTMPIMemoryGrp *, int reqtype, int tag);
    void run();
    int run_one();
    unsigned int nreq_recd() { return nreq_recd_; }
    void set_nreq_recd(unsigned int val) { nreq_recd_ = val; }
};

MTMPIThread::MTMPIThread(MTMPIMemoryGrp *mem,
                         int reqtype, int tag)
{
  mem_ = mem;
  req_tag_ = reqtype;
  tag_ = tag;
  nreq_recd_ = 0;
}

void
MTMPIThread::run()
{
  while(run_one());
}

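// Receive and service a single request.  Returns 0 when a Deactivate
// request is received, which ends the run() loop above, and 1 otherwise.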
int
MTMPIThread::run_one()
{
  int i;
  int dsize;
  int dremain;
  int doffset;
  long l;
  MemoryDataRequest req;
  MPI_Status status;
#ifndef USE_IMMEDIATE_MODE
  MPI_Recv(req.data(),req.nbytes(),MPI_BYTE,MPI_ANY_SOURCE,
           req_tag_,mem_->comm_comm_,&status);
#else
  MPI_Request mpireq;
  MPI_Irecv(req.data(),req.nbytes(),MPI_BYTE,MPI_ANY_SOURCE,
            req_tag_,mem_->comm_comm_,&mpireq);
  MPI_Wait(&mpireq,&status);
#endif // USE_IMMEDIATE_MODE
  int rtag = req.serial_number();
  if (mem_->debug_) {
      mem_->print_lock_->lock();
      req.print("RECV",mem_->hout);
      mem_->print_lock_->unlock();
    }
  if (req.touches_data()) {
      assert(req.size() >= 0);
      if (req.offset()+req.size() > mem_->localsize()) {
          mem_->print_lock_->lock();
          req.print("BAD RECV");
          ExEnv::outn() << "mem_->localsize() = " << mem_->localsize() << endl;
          mem_->print_lock_->unlock();
        }
      assert(req.offset()+req.size() <= mem_->localsize());
    }
  switch (req.request()) {
  case MemoryDataRequest::Deactivate:
      return 0;
  case MemoryDataRequest::Retrieve:
      nreq_recd_++;
      if (req.lock())
          mem_->obtain_local_lock(req.offset(), req.offset()+req.size());
      MPI_Send(&mem_->data_[req.offset()],req.size(),MPI_BYTE,
               req.node(),rtag,mem_->comp_comm_);
      break;
  case MemoryDataRequest::Replace:
      nreq_recd_++;
      // May be able to get rid of this MPI_Send - MLL
      MPI_Send(&tag_,1,MPI_INT,req.node(),rtag,mem_->comp_comm_);
      MPI_Recv(&mem_->data_[req.offset()],req.size(),MPI_BYTE,
               req.node(),tag_,mem_->comm_comm_,&status);
      if (req.lock())
          mem_->release_local_lock(req.offset(), req.offset()+req.size());
      break;
  case MemoryDataRequest::DoubleSum:
      nreq_recd_++;
//      MPI_Send(&tag_,1,MPI_INT,req.node(),rtag,mem_->comm_);
      dsize = req.size()/sizeof(double);
      dremain = dsize;
      doffset = req.offset()/sizeof(double);
      mem_->obtain_local_lock(req.offset(), req.offset()+req.size());
      while(dremain>0) {
          int dchunksize = dbufsize;
          if (dremain < dchunksize) dchunksize = dremain;
#ifndef USE_IMMEDIATE_MODE
          MPI_Recv(chunk,dchunksize,MPI_DOUBLE,
                   req.node(),rtag,mem_->comm_comm_,&status);
#else
          MPI_Request mpireq;
          MPI_Irecv(chunk,dchunksize,MPI_DOUBLE,
                    req.node(),rtag,mem_->comm_comm_,&mpireq);
          MPI_Wait(&mpireq,&status);
#endif // USE_IMMEDIATE_MODE
          double *source_data = &((double*)mem_->data_)[doffset];
          for (i=0; i<dchunksize; i++) {
              source_data[i] += chunk[i];
            }
          dremain -= dchunksize;
          doffset += dchunksize;
        }
      mem_->release_local_lock(req.offset(), req.offset()+req.size());
      break;
  default:
      mem_->print_lock_->lock();
      ExEnv::outn() << "MTMPIThread: bad memory data request" << endl;
      mem_->print_lock_->unlock();
      abort();
    }
  return 1;
}

///////////////////////////////////////////////////////////////////////
// The MTMPIMemoryGrp class

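// MTMPIMemoryGrp implements the distributed-memory operations declared
// by ActiveMsgMemoryGrp on top of MPI: the calling thread sends a
// MemoryDataRequest to the target node, and the handler threads created
// in init_mtmpimg() carry out the access against that node's data_.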
static ClassDesc MTMPIMemoryGrp_cd(
  typeid(MTMPIMemoryGrp),"MTMPIMemoryGrp",1,"public ActiveMsgMemoryGrp",
  0, create<MTMPIMemoryGrp>, 0);

MTMPIMemoryGrp::MTMPIMemoryGrp(const Ref<MessageGrp>& msg,
                               const Ref<ThreadGrp>& th,
                               MPI_Comm comm):
  ActiveMsgMemoryGrp(msg)
{
  if (debug_) ExEnv::outn() << "MTMPIMemoryGrp CTOR entered" << endl;

  th_ = th;

  init_mtmpimg(comm,th_->nthread());
}

MTMPIMemoryGrp::MTMPIMemoryGrp(const Ref<KeyVal>& keyval):
  ActiveMsgMemoryGrp(keyval)
{
  if (debug_) ExEnv::outn() << "MTMPIMemoryGrp keyval CTOR entered" << endl;

  th_ = ThreadGrp::get_default_threadgrp();

  KeyValValueint nthreaddef(th_->nthread());
  int nthread = keyval->intvalue("num_threads",nthreaddef);
  ExEnv::out0() << indent << "MTMPIMemoryGrp: num_threads = " << nthread << endl;

  init_mtmpimg(MPI_COMM_WORLD,nthread);
}

MTMPIMemoryGrp::~MTMPIMemoryGrp()
{
  deactivate();
  for (int i=0; i<th_->nthread()-1; i++) {
      delete thread_[i];
    }
  delete[] thread_;
  delete[] nreq_sent_;
  delete[] nreq_sent_buf_;
}

void
MTMPIMemoryGrp::init_mtmpimg(MPI_Comm comm, int nthread)
{
  int i;
  active_ = 0;

  if (nthread < 2) nthread = 2;
  th_ = th_->clone(nthread);
  nthread = th_->nthread();

  if (debug_) {
      char name[256];
      sprintf(name, "mpqc.hand.%d", me());
      hout.open(name);
      sprintf(name, "mpqc.main.%d", me());
      mout.open(name);
    }

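  // Duplicate the caller's communicator twice so that this class's
  // traffic cannot collide with other MPI communication: comm_comm_
  // carries requests and bulk data sent to the handler threads, while
  // comp_comm_ carries the handlers' replies back to the requesting
  // compute threads.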
  MPI_Comm_dup(comm, &comp_comm_);
  MPI_Comm_dup(comm, &comm_comm_);

  MPI_Errhandler_set(comp_comm_, MPI_ERRORS_ARE_FATAL);
  MPI_Errhandler_set(comm_comm_, MPI_ERRORS_ARE_FATAL);

  serial_ = 0;
  req_tag_ = 15001;

  serial_lock_ = th_->new_lock();

  thread_ = new MTMPIThread*[nthread-1];
  th_->add_thread(0,0);
  for (i=1; i<nthread; i++) {
      thread_[i-1] = new MTMPIThread(this,req_tag_,req_tag_ + i);
      th_->add_thread(i,thread_[i-1]);
    }
  print_lock_ = th_->new_lock();

  nreq_sent_ = new unsigned int[n()];
  memset(nreq_sent_, 0, sizeof(unsigned int)*n());
  nreq_sent_buf_ = new unsigned int[n()];
}

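// Hand out the next request serial number for a message to the given
// node and count the request for the bookkeeping done in sync().  The
// serial number doubles as the MPI reply tag, so it wraps around before
// reaching req_tag_ to keep the two tag ranges disjoint.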
int
MTMPIMemoryGrp::serial(int node)
{
  serial_lock_->lock();
  nreq_sent_[node]++;
  int r = serial_;
  serial_++;
  if (serial_ == req_tag_) serial_ = 0;
  serial_lock_->unlock();
  return r;
}

void
MTMPIMemoryGrp::retrieve_data(void *data, int node, int offset, int size,
                              int lock)
{
  MemoryDataRequest req(MemoryDataRequest::Retrieve,me(),offset,size,lock,
                        serial(node));
  int tag = req.serial_number();

  // send the request
  if (debug_) {
      print_lock_->lock();
      req.print("SEND",mout);
      print_lock_->unlock();
    }
  MPI_Send(req.data(),req.nbytes(),MPI_BYTE,node,req_tag_,comm_comm_);

  // receive the data
  MPI_Status status;
  MPI_Recv(data,size,MPI_BYTE,node,tag,comp_comm_,&status);
}

void
MTMPIMemoryGrp::replace_data(void *data, int node, int offset, int size,
                             int unlock)
{
  MemoryDataRequest req(MemoryDataRequest::Replace,me(),offset,size,unlock,
                        serial(node));
  int tag = req.serial_number();

  if (debug_) {
      print_lock_->lock();
      req.print("SEND",mout);
      print_lock_->unlock();
    }
  MPI_Send(req.data(),req.nbytes(),MPI_BYTE,node,req_tag_,comm_comm_);

  // wait for the go ahead message
  MPI_Status status;
  int rtag;
  MPI_Recv(&rtag,1,MPI_INT,node,tag,comp_comm_,&status);

  // send the data
  MPI_Send(data,size,MPI_BYTE,node,rtag,comm_comm_);
}

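// Add size bytes worth of doubles into the target node's memory.  The
// data are streamed in chunks of at most dbufsize doubles so that the
// receiving handler thread can accumulate each chunk from its fixed-size
// buffer (see the DoubleSum branch of MTMPIThread::run_one()).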
void
MTMPIMemoryGrp::sum_data(double *data, int node, int offset, int size)
{
  MemoryDataRequest req(MemoryDataRequest::DoubleSum,me(),offset,size,
                        0, serial(node));
  int tag = req.serial_number();

  // send the request
  if (debug_) {
      print_lock_->lock();
      req.print("SEND",mout);
      print_lock_->unlock();
    }
#ifndef USE_IMMEDIATE_MODE
  MPI_Send(req.data(),req.nbytes(),MPI_BYTE,node,req_tag_,comm_comm_);
#else
  MPI_Status status;
  MPI_Request mpireq;
  MPI_Isend(req.data(),req.nbytes(),MPI_BYTE,node,req_tag_,comm_comm_,&mpireq);
  MPI_Wait(&mpireq,&status);
#endif // USE_IMMEDIATE_MODE

  // wait for the go ahead message
//  MPI_Status status;
//  int rtag;
//  MPI_Recv(&rtag,1,MPI_INT,node,tag,comm_,&status);

  int dsize = size/sizeof(double);
  int dremain = dsize;
  int dcurrent = 0;
  while(dremain>0) {
      int dchunksize = dbufsize;
      if (dremain < dchunksize) dchunksize = dremain;
      // send the data
#ifndef USE_IMMEDIATE_MODE
      MPI_Send(&data[dcurrent],dchunksize,MPI_DOUBLE,
               node,tag,comm_comm_);
#else
      MPI_Request mpireq;
      MPI_Isend(&data[dcurrent],dchunksize,MPI_DOUBLE,
                node,tag,comm_comm_,&mpireq);
      MPI_Wait(&mpireq,&status);
#endif // USE_IMMEDIATE_MODE
      dcurrent += dchunksize;
      dremain -= dchunksize;
    }
}

void
MTMPIMemoryGrp::activate()
{
  // Only remote requests require the handler.  There are only remote
  // requests if there is more than one node.
  if (n() == 1) return;

  if (th_->nthread() < 2) {
      ExEnv::outn() << "MTMPIMemoryGrp didn't get enough threads" << endl;
      abort();
    }

  if (active_) return;
  active_ = 1;

  th_->start_threads();
}

void
MTMPIMemoryGrp::deactivate()
{
  if (!active_) return;
  active_ = 0;

  // send a shutdown message to each handler thread
  MemoryDataRequest req(MemoryDataRequest::Deactivate);
  if (debug_) {
      print_lock_->lock();
      req.print("SEND",mout);
      print_lock_->unlock();
    }
  for (int i=1; i<th_->nthread(); i++) {
#ifndef USE_IMMEDIATE_MODE
      MPI_Send(req.data(),req.nbytes(),MPI_BYTE,me(),req_tag_,comm_comm_);
#else
      MPI_Request mpireq;
      MPI_Status status;
      MPI_Isend(req.data(),req.nbytes(),MPI_BYTE,me(),req_tag_,comm_comm_,&mpireq);
      MPI_Wait(&mpireq,&status);
#endif // USE_IMMEDIATE_MODE
    }

  // wait for the handler threads to shut down
  th_->wait_threads();
}

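// Collective synchronization.  Every node adds up how many requests were
// sent to each node, shuts its handler threads down, and then uses the
// main thread to service any requests that were addressed to it but not
// yet received, so that no request is lost across the barrier.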
void
MTMPIMemoryGrp::sync()
{
  if (active_) {
      MPI_Allreduce(nreq_sent_, nreq_sent_buf_,
                    n(), MPI_UNSIGNED, MPI_SUM, comm_comm_);
      deactivate();
      unsigned int nreq_recd = 0;
      for (int i=0; i<th_->nthread()-1; i++) {
          nreq_recd += thread_[i]->nreq_recd();
          thread_[i]->set_nreq_recd(0);
        }
      int n_outstanding = nreq_sent_buf_[me()] - nreq_recd;
      for (int i=0; i<n_outstanding; i++) {
          thread_[0]->run_one();
        }
      memset(nreq_sent_, 0, sizeof(unsigned int)*n());
      // Make sure processing of all outstanding requests is finished
      // before starting the next phase.
      MPI_Barrier(comm_comm_);
      activate();
    }
  else {
      MPI_Barrier(comm_comm_);
    }
}

}

#endif

/////////////////////////////////////////////////////////////////////////////

// Local Variables:
// mode: c++
// c-file-style: "CLJ"
// End: