source: ThirdParty/mpqc_open/src/lib/util/group/messshm.cc

Candidate_v1.6.1
Last change on this file was 860145, checked in by Frederik Heber <heber@…>, 8 years ago

Merge commit '0b990dfaa8c6007a996d030163a25f7f5fc8a7e7' as 'ThirdParty/mpqc_open'

  • Property mode set to 100644
File size: 11.2 KB
Line 
1//
2// messshm.cc
3//
4// Copyright (C) 1996 Limit Point Systems, Inc.
5//
6// Author: Curtis Janssen <cljanss@limitpt.com>
7// Maintainer: LPS
8//
9// This file is part of the SC Toolkit.
10//
11// The SC Toolkit is free software; you can redistribute it and/or modify
12// it under the terms of the GNU Library General Public License as published by
13// the Free Software Foundation; either version 2, or (at your option)
14// any later version.
15//
16// The SC Toolkit is distributed in the hope that it will be useful,
17// but WITHOUT ANY WARRANTY; without even the implied warranty of
18// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
19// GNU Library General Public License for more details.
20//
21// You should have received a copy of the GNU Library General Public License
22// along with the SC Toolkit; see the file COPYING.LIB. If not, write to
23// the Free Software Foundation, 675 Mass Ave, Cambridge, MA 02139, USA.
24//
25// The U.S. Government is granted a limited license as per AL 91-7.
26//
27
28
#include <errno.h>
#include <unistd.h>
#include <sys/types.h>
#include <sys/ipc.h>
#include <sys/sem.h>
#include <sys/shm.h>
34
35
36#include <util/misc/bug.h>
37#include <util/misc/formio.h>
38#include <util/group/messshm.h>
39
40using namespace std;
41using namespace sc;
42
43//#define DEBUG
44
45#ifndef SEM_A
46# define SEM_A 0200
47#endif
48
49#ifndef SEM_R
50# define SEM_R 0400
51#endif
52
53/* NALIGN is the byte boundary that we align data on. */
54#define NALIGN 8
55#define ROUNDUPTOALIGN(n) (((n) + (NALIGN-1)) & ~(NALIGN-1))
56
57static ClassDesc ShmMessageGrp_cd(
58 typeid(ShmMessageGrp),"ShmMessageGrp",1,"public intMessageGrp",
59 0, create<ShmMessageGrp>, 0);
60
61ShmMessageGrp::ShmMessageGrp()
62{
63 initialize();
64}
65
66ShmMessageGrp::ShmMessageGrp(int nprocs)
67{
68 initialize(nprocs);
69}
70
71ShmMessageGrp::ShmMessageGrp(const Ref<KeyVal>& keyval):
72 intMessageGrp(keyval)
73{
74 int nprocs = keyval->intvalue("n");
75 if (keyval->error() != KeyVal::OK) initialize();
76 else initialize(nprocs);
77}
78
79void ShmMessageGrp::sync()
80{
81 int i;
82 for (i=0; i<n(); i++) {
83 if (me() == i) continue;
84 wait_for_write(i);
85 commbuf[i]->n_sync++;
86 if (commbuf[i]->n_sync >= n()-1) {
87 while(commbuf[i]->n_wait_for_change) {
88 put_change(i);
89 commbuf[i]->n_wait_for_change--;
90 }
91 }
92 release_write(i);
93 }
94 wait_for_write(me());
95 while (commbuf[me()]->n_sync < n()-1) {
96 commbuf[me()]->n_wait_for_change++;
97 release_write(me());
98 get_change(me());
99 wait_for_write(me());
100 }
101 commbuf[me()]->n_sync -= n()-1;
102 while(commbuf[me()]->n_wait_for_change) {
103 put_change(me());
104 commbuf[me()]->n_wait_for_change--;
105 }
106 release_write(me());
107}
108
109ShmMessageGrp::~ShmMessageGrp()
110{
111 // sync the nodes
112 sync();
113
114 // make sure node zero is las to touch the shared memory
115 if (me() == 0) {
116 wait_for_write(0);
117 while (commbuf[0]->n_sync < n()-1) {
118 commbuf[0]->n_wait_for_change++;
119 release_write(0);
120 get_change(0);
121 wait_for_write(0);
122 }
123 release_write(0);
124 shmdt((SHMTYPE)sharedmem);
125 // release the memory
126 shmctl(shmid,IPC_RMID,0);
127
128 for (int i=0; i<n(); i++) {
129#ifdef SEMCTL_REQUIRES_SEMUN
130 semun junk;
131 junk.val = 0;
132#else
133 int junk = 0;
134#endif
135 semctl(semid,i,IPC_RMID,junk);
136 semctl(change_semid,i,IPC_RMID,junk);
137 }
138 }
139 else {
140 wait_for_write(0);
141 commbuf[0]->n_sync++;
142 while(commbuf[0]->n_wait_for_change) {
143 put_change(0);
144 commbuf[0]->n_wait_for_change--;
145 }
146 shmdt((SHMTYPE)sharedmem);
147 release_write(0);
148 }
149}
150
151void ShmMessageGrp::initialize()
152{
153 int nprocs = atoi(getenv("NUMPROC"));
154 if (!nprocs) nprocs = 1;
155 initialize(nprocs);
156}
157
158void ShmMessageGrp::initialize(int nprocs)
159{
160 int i;
161 void* nextbuf;
162
163 semdec.sem_num = 0;
164 semdec.sem_op = -1;
165 semdec.sem_flg = 0;
166 seminc.sem_num = 0;
167 seminc.sem_op = 1;
168 seminc.sem_flg = 0;
169
170 // Each node gets a buffer for incoming data.
171 shmid = shmget(IPC_PRIVATE,
172 nprocs * sizeof(commbuf_t),
173 IPC_CREAT | SHM_R | SHM_W);
174
175 // Attach the shared segment.
176 nextbuf = sharedmem = shmat(shmid,0,0);
177
178#ifdef SEMCTL_REQUIRES_SEMUN
179 semun semzero;
180 semzero.val = 0;
181 semun semone;
182 semone.val = 1;
183#else
184 int semzero = 0;
185 int semone = 1;
186#endif
187
188 // Each shared memory segment gets a semaphore to synchronize access.
189 semid = semget(IPC_PRIVATE,nprocs,IPC_CREAT | SEM_R | SEM_A );
190 if (semid == -1) {
191 perror("semget");
192 exit(-1);
193 }
194
195 change_semid = semget(IPC_PRIVATE,nprocs,IPC_CREAT | SEM_R | SEM_A );
196 if (change_semid == -1) {
197 perror("semget");
198 exit(-1);
199 }
200
201 for (i=0; i<nprocs; i++) {
202
203 // Mark all of the segments as available for writing.
204 if (semctl(semid,i,SETVAL,semone) == -1) {
205 perror("semctl");
206 exit(-1);
207 }
208
209 if (semctl(change_semid,i,SETVAL,semzero) == -1) {
210 perror("semctl");
211 exit(-1);
212 }
213
214 // Alloc shm for each node's commbuf.
215 commbuf[i] = (commbuf_t*) nextbuf;
216 // Mark the commbuf as having no messages.
217 commbuf[i]->nmsg = 0;
218 // and no outstanding waits
219 commbuf[i]->n_wait_for_change = 0;
220 commbuf[i]->n_sync = 0;
221 nextbuf = (void*)(((char*)nextbuf) + sizeof(commbuf_t));
222 }
223
224 // Create the remaining nodes.
225 int mynodeid = 0;
226 for (i=1; i<nprocs; i++) {
227 int pid = fork();
228 if (!pid) {
229 mynodeid = i;
230 break;
231 }
232 }
233
234 // Initialize the base class.
235 intMessageGrp::initialize(mynodeid, nprocs, 30);
236}
237
238Ref<MessageGrp>
239ShmMessageGrp::clone(void)
240{
241 Ref<MessageGrp> smgrp = new ShmMessageGrp(n());
242 return smgrp;
243}
244
245int ShmMessageGrp::basic_probe(int msgtype)
246{
247 int i;
248 msgbuf_t *message;
249
250 wait_for_write(me());
251
252 message = (msgbuf_t*)commbuf[me()]->buf;
253 for (i=0; i<commbuf[me()]->nmsg; i++) {
254 if ((msgtype == -1) || (msgtype == message->type)) {
255 release_write(me());
256 return 1;
257 }
258 message = NEXT_MESSAGE(message);
259 }
260
261 release_write(me());
262
263 return 0;
264}
265
266void ShmMessageGrp::basic_recv(int type, void* buf, int bytes)
267{
268 int size;
269 int i;
270 msgbuf_t *message,*lastmessage;
271
272#ifdef DEBUG
273 ExEnv::outn() << "ShmGrp: basic_recv: "
274 << "type = " << type << ' '
275 << "buf = " << buf << ' '
276 << "bytes = " << bytes << ' '
277 << "me = " << me() << endl;
278 print_buffer(me(),me());
279#endif
280
281 look_for_message:
282
283 wait_for_write(me());
284
285 message = (msgbuf_t*)commbuf[me()]->buf;
286 for (i=0; i<commbuf[me()]->nmsg; i++) {
287 if (message->type == type) break;
288 message = NEXT_MESSAGE(message);
289 }
290 if (i==commbuf[me()]->nmsg) {
291 commbuf[me()]->n_wait_for_change++;
292 release_write(me());
293 get_change(me());
294 goto look_for_message;
295 }
296
297 if (bytes < message->size) {
298 ExEnv::errn() << scprintf("messshm.cc: recv buffer too small\n");
299 abort();
300 }
301 if (bytes < message->size) size = bytes;
302 else size = message->size;
303
304 // Copy the data.
305 memcpy(buf,((char*)message) + sizeof(msgbuf_t),size);
306
307 // Find the lastmessage.
308 lastmessage = (msgbuf_t*)commbuf[me()]->buf;
309 for (i=0; i<commbuf[me()]->nmsg; i++) {
310 lastmessage = NEXT_MESSAGE(lastmessage);
311 }
312
313 // Repack the message buffer.
314 memmove(message,NEXT_MESSAGE(message),
315 (size_t)((char*)lastmessage)-(size_t)((char*)NEXT_MESSAGE(message)));
316
317 commbuf[me()]->nmsg--;
318
319 while(commbuf[me()]->n_wait_for_change) {
320 put_change(me());
321 commbuf[me()]->n_wait_for_change--;
322 }
323
324 release_write(me());
325}
326
327void ShmMessageGrp::basic_send(int dest, int type, const void* buf, int bytes)
328{
329 int i;
330 msgbuf_t *availmsg;
331
332#ifdef DEBUG
333 ExEnv::outn() << "ShmGrp: basic_send: "
334 << "dest = " << dest << ' '
335 << "type = " << type << ' '
336 << "buf = " << buf << ' '
337 << "bytes = " << bytes << ' '
338 << "me = " << me() << endl;
339#endif
340
341 if (dest>=n()) {
342 //debug_start("bad destination");
343 ExEnv::errn() << scprintf("ShmMessageGrp::basic_send: bad destination\n");
344 abort();
345 }
346
347 try_send_again:
348
349 // Obtain write access to the dest's incoming buffer.
350 wait_for_write(dest);
351
352 availmsg = (msgbuf_t*)commbuf[dest]->buf;
353 for (i=0; i<commbuf[dest]->nmsg; i++) {
354 availmsg = NEXT_MESSAGE(availmsg);
355 }
356 if ( (((char*)availmsg) + ROUNDUPTOALIGN(sizeof(msgbuf_t) + bytes))
357 > (((char*)commbuf[dest]) + sizeof(commbuf_t))) {
358 if (me() == dest) {
359 // sending a message to myself and the buffer is full
360 // --cannot recover
361 ExEnv::errn() << scprintf("commbuf size exceeded on %d\n",me());
362 ExEnv::errn() << scprintf(" availmsg = 0x%x\n",availmsg);
363 ExEnv::errn() << scprintf(" commbuf[%d] + sizeof(commbuf_t) = 0x%x\n",
364 dest,((char*)commbuf[dest]) + sizeof(commbuf_t));
365 ExEnv::errn() << scprintf(" size = %d\n",bytes);
366 abort();
367 }
368 else {
369 // try to recover from a full buffer by waiting for the dest
370 // to read some data.
371 commbuf[dest]->n_wait_for_change++;
372 release_write(dest);
373 get_change(dest);
374 goto try_send_again;
375 }
376 }
377 availmsg->from = me();
378 availmsg->type = type;
379 availmsg->size = bytes;
380 memcpy(((char*)availmsg) + sizeof(msgbuf_t),buf,bytes);
381 commbuf[dest]->nmsg++;
382
383 // let the dest know that there is more data in the buffer
384 while(commbuf[dest]->n_wait_for_change) {
385 put_change(dest);
386 commbuf[dest]->n_wait_for_change--;
387 }
388
389 // Release write access to the dest's buffer.
390 release_write(dest);
391}
392
393msgbuf_t * ShmMessageGrp::NEXT_MESSAGE(msgbuf_t *m)
394{
395 msgbuf_t *r;
396 if (m->size < 0) {
397 ExEnv::errn() << scprintf("NEXT_MESSAGE: m->size = %d (real %d)\n",
398 m->size,sizeof(msgbuf_t) + m->size + m->size%8);
399 //debug_start("m->size < 0");
400 ExEnv::errn() << scprintf("messshm.cc: m->size < 0\n");
401 abort();
402 }
403 r = ((msgbuf_t*)(((char*)m) + ROUNDUPTOALIGN(sizeof(msgbuf_t) + m->size)));
404 return r;
405}
406
407void ShmMessageGrp::get_change(int node)
408{
409 semdec.sem_num = node;
410 semop(change_semid,&semdec,1);
411 semdec.sem_num = 0;
412}
413
414void ShmMessageGrp::put_change(int node)
415{
416 seminc.sem_num = node;
417 semop(change_semid,&seminc,1);
418 seminc.sem_num = 0;
419
420}
421
422// Obtain a lock for writing to the node's buffer.
423void ShmMessageGrp::wait_for_write(int node)
424{
425 semdec.sem_num = node;
426 semop(semid,&semdec,1);
427 semdec.sem_num = 0;
428}
429
430// Release lock for writing to node's buffer.
431void ShmMessageGrp::release_write(int node)
432{
433 seminc.sem_num = node;
434 semop(semid,&seminc,1);
435 seminc.sem_num = 0;
436}
437
438#ifdef DEBUG
439void ShmMessageGrp::print_buffer(int node, int me)
440{
441 int i;
442 msgbuf_t *message;
443 message = (msgbuf_t*)commbuf[node]->buf;
444
445 ExEnv::outn() << scprintf("Printing buffer for node %d on node %d\n",node,me);
446 for (i=0; i<commbuf[node]->nmsg; i++) {
447 ExEnv::outn() <<
448 scprintf(" on node %2d: to=%2d, bytes=%6d, type=%10d, from=%2d\n",
449 me,
450 node,
451 message->size,
452 message->type,
453 message->from);
454 ExEnv::outn().flush();
455 message = NEXT_MESSAGE(message);
456 }
457
458}
459#endif
460
461/////////////////////////////////////////////////////////////////////////////
462
463// Local Variables:
464// mode: c++
465// c-file-style: "CLJ"
466// End:
Note: See TracBrowser for help on using the repository browser.