1 | //
|
---|
2 | // messshm.cc
|
---|
3 | //
|
---|
4 | // Copyright (C) 1996 Limit Point Systems, Inc.
|
---|
5 | //
|
---|
6 | // Author: Curtis Janssen <cljanss@limitpt.com>
|
---|
7 | // Maintainer: LPS
|
---|
8 | //
|
---|
9 | // This file is part of the SC Toolkit.
|
---|
10 | //
|
---|
11 | // The SC Toolkit is free software; you can redistribute it and/or modify
|
---|
12 | // it under the terms of the GNU Library General Public License as published by
|
---|
13 | // the Free Software Foundation; either version 2, or (at your option)
|
---|
14 | // any later version.
|
---|
15 | //
|
---|
16 | // The SC Toolkit is distributed in the hope that it will be useful,
|
---|
17 | // but WITHOUT ANY WARRANTY; without even the implied warranty of
|
---|
18 | // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
---|
19 | // GNU Library General Public License for more details.
|
---|
20 | //
|
---|
21 | // You should have received a copy of the GNU Library General Public License
|
---|
22 | // along with the SC Toolkit; see the file COPYING.LIB. If not, write to
|
---|
23 | // the Free Software Foundation, 675 Mass Ave, Cambridge, MA 02139, USA.
|
---|
24 | //
|
---|
25 | // The U.S. Government is granted a limited license as per AL 91-7.
|
---|
26 | //
|
---|
27 |
|
---|
28 |
|
---|
#include <errno.h>
#include <unistd.h>
#include <sys/types.h>
#include <sys/ipc.h>
#include <sys/sem.h>
#include <sys/shm.h>


#include <util/misc/bug.h>
#include <util/misc/formio.h>
#include <util/group/messshm.h>
|
---|
39 |
|
---|
40 | using namespace std;
|
---|
41 | using namespace sc;
|
---|
42 |
|
---|
43 | //#define DEBUG
|
---|
44 |
|
---|
/* Fallback values for the semaphore permission bits passed to semget(),
   used only when the system headers do not already define them. */
#if !defined(SEM_A)
#  define SEM_A 0200
#endif

#if !defined(SEM_R)
#  define SEM_R 0400
#endif

/* NALIGN is the byte boundary that we align data on. */
#define NALIGN 8
/* Round n up to the nearest multiple of NALIGN.  The mask trick relies on
   NALIGN being a power of two. */
#define ROUNDUPTOALIGN(n) (((n) + (NALIGN-1)) & ~(NALIGN-1))
|
---|
56 |
|
---|
57 | static ClassDesc ShmMessageGrp_cd(
|
---|
58 | typeid(ShmMessageGrp),"ShmMessageGrp",1,"public intMessageGrp",
|
---|
59 | 0, create<ShmMessageGrp>, 0);
|
---|
60 |
|
---|
61 | ShmMessageGrp::ShmMessageGrp()
|
---|
62 | {
|
---|
63 | initialize();
|
---|
64 | }
|
---|
65 |
|
---|
66 | ShmMessageGrp::ShmMessageGrp(int nprocs)
|
---|
67 | {
|
---|
68 | initialize(nprocs);
|
---|
69 | }
|
---|
70 |
|
---|
71 | ShmMessageGrp::ShmMessageGrp(const Ref<KeyVal>& keyval):
|
---|
72 | intMessageGrp(keyval)
|
---|
73 | {
|
---|
74 | int nprocs = keyval->intvalue("n");
|
---|
75 | if (keyval->error() != KeyVal::OK) initialize();
|
---|
76 | else initialize(nprocs);
|
---|
77 | }
|
---|
78 |
|
---|
79 | void ShmMessageGrp::sync()
|
---|
80 | {
|
---|
81 | int i;
|
---|
82 | for (i=0; i<n(); i++) {
|
---|
83 | if (me() == i) continue;
|
---|
84 | wait_for_write(i);
|
---|
85 | commbuf[i]->n_sync++;
|
---|
86 | if (commbuf[i]->n_sync >= n()-1) {
|
---|
87 | while(commbuf[i]->n_wait_for_change) {
|
---|
88 | put_change(i);
|
---|
89 | commbuf[i]->n_wait_for_change--;
|
---|
90 | }
|
---|
91 | }
|
---|
92 | release_write(i);
|
---|
93 | }
|
---|
94 | wait_for_write(me());
|
---|
95 | while (commbuf[me()]->n_sync < n()-1) {
|
---|
96 | commbuf[me()]->n_wait_for_change++;
|
---|
97 | release_write(me());
|
---|
98 | get_change(me());
|
---|
99 | wait_for_write(me());
|
---|
100 | }
|
---|
101 | commbuf[me()]->n_sync -= n()-1;
|
---|
102 | while(commbuf[me()]->n_wait_for_change) {
|
---|
103 | put_change(me());
|
---|
104 | commbuf[me()]->n_wait_for_change--;
|
---|
105 | }
|
---|
106 | release_write(me());
|
---|
107 | }
|
---|
108 |
|
---|
109 | ShmMessageGrp::~ShmMessageGrp()
|
---|
110 | {
|
---|
111 | // sync the nodes
|
---|
112 | sync();
|
---|
113 |
|
---|
114 | // make sure node zero is las to touch the shared memory
|
---|
115 | if (me() == 0) {
|
---|
116 | wait_for_write(0);
|
---|
117 | while (commbuf[0]->n_sync < n()-1) {
|
---|
118 | commbuf[0]->n_wait_for_change++;
|
---|
119 | release_write(0);
|
---|
120 | get_change(0);
|
---|
121 | wait_for_write(0);
|
---|
122 | }
|
---|
123 | release_write(0);
|
---|
124 | shmdt((SHMTYPE)sharedmem);
|
---|
125 | // release the memory
|
---|
126 | shmctl(shmid,IPC_RMID,0);
|
---|
127 |
|
---|
128 | for (int i=0; i<n(); i++) {
|
---|
129 | #ifdef SEMCTL_REQUIRES_SEMUN
|
---|
130 | semun junk;
|
---|
131 | junk.val = 0;
|
---|
132 | #else
|
---|
133 | int junk = 0;
|
---|
134 | #endif
|
---|
135 | semctl(semid,i,IPC_RMID,junk);
|
---|
136 | semctl(change_semid,i,IPC_RMID,junk);
|
---|
137 | }
|
---|
138 | }
|
---|
139 | else {
|
---|
140 | wait_for_write(0);
|
---|
141 | commbuf[0]->n_sync++;
|
---|
142 | while(commbuf[0]->n_wait_for_change) {
|
---|
143 | put_change(0);
|
---|
144 | commbuf[0]->n_wait_for_change--;
|
---|
145 | }
|
---|
146 | shmdt((SHMTYPE)sharedmem);
|
---|
147 | release_write(0);
|
---|
148 | }
|
---|
149 | }
|
---|
150 |
|
---|
151 | void ShmMessageGrp::initialize()
|
---|
152 | {
|
---|
153 | int nprocs = atoi(getenv("NUMPROC"));
|
---|
154 | if (!nprocs) nprocs = 1;
|
---|
155 | initialize(nprocs);
|
---|
156 | }
|
---|
157 |
|
---|
158 | void ShmMessageGrp::initialize(int nprocs)
|
---|
159 | {
|
---|
160 | int i;
|
---|
161 | void* nextbuf;
|
---|
162 |
|
---|
163 | semdec.sem_num = 0;
|
---|
164 | semdec.sem_op = -1;
|
---|
165 | semdec.sem_flg = 0;
|
---|
166 | seminc.sem_num = 0;
|
---|
167 | seminc.sem_op = 1;
|
---|
168 | seminc.sem_flg = 0;
|
---|
169 |
|
---|
170 | // Each node gets a buffer for incoming data.
|
---|
171 | shmid = shmget(IPC_PRIVATE,
|
---|
172 | nprocs * sizeof(commbuf_t),
|
---|
173 | IPC_CREAT | SHM_R | SHM_W);
|
---|
174 |
|
---|
175 | // Attach the shared segment.
|
---|
176 | nextbuf = sharedmem = shmat(shmid,0,0);
|
---|
177 |
|
---|
178 | #ifdef SEMCTL_REQUIRES_SEMUN
|
---|
179 | semun semzero;
|
---|
180 | semzero.val = 0;
|
---|
181 | semun semone;
|
---|
182 | semone.val = 1;
|
---|
183 | #else
|
---|
184 | int semzero = 0;
|
---|
185 | int semone = 1;
|
---|
186 | #endif
|
---|
187 |
|
---|
188 | // Each shared memory segment gets a semaphore to synchronize access.
|
---|
189 | semid = semget(IPC_PRIVATE,nprocs,IPC_CREAT | SEM_R | SEM_A );
|
---|
190 | if (semid == -1) {
|
---|
191 | perror("semget");
|
---|
192 | exit(-1);
|
---|
193 | }
|
---|
194 |
|
---|
195 | change_semid = semget(IPC_PRIVATE,nprocs,IPC_CREAT | SEM_R | SEM_A );
|
---|
196 | if (change_semid == -1) {
|
---|
197 | perror("semget");
|
---|
198 | exit(-1);
|
---|
199 | }
|
---|
200 |
|
---|
201 | for (i=0; i<nprocs; i++) {
|
---|
202 |
|
---|
203 | // Mark all of the segments as available for writing.
|
---|
204 | if (semctl(semid,i,SETVAL,semone) == -1) {
|
---|
205 | perror("semctl");
|
---|
206 | exit(-1);
|
---|
207 | }
|
---|
208 |
|
---|
209 | if (semctl(change_semid,i,SETVAL,semzero) == -1) {
|
---|
210 | perror("semctl");
|
---|
211 | exit(-1);
|
---|
212 | }
|
---|
213 |
|
---|
214 | // Alloc shm for each node's commbuf.
|
---|
215 | commbuf[i] = (commbuf_t*) nextbuf;
|
---|
216 | // Mark the commbuf as having no messages.
|
---|
217 | commbuf[i]->nmsg = 0;
|
---|
218 | // and no outstanding waits
|
---|
219 | commbuf[i]->n_wait_for_change = 0;
|
---|
220 | commbuf[i]->n_sync = 0;
|
---|
221 | nextbuf = (void*)(((char*)nextbuf) + sizeof(commbuf_t));
|
---|
222 | }
|
---|
223 |
|
---|
224 | // Create the remaining nodes.
|
---|
225 | int mynodeid = 0;
|
---|
226 | for (i=1; i<nprocs; i++) {
|
---|
227 | int pid = fork();
|
---|
228 | if (!pid) {
|
---|
229 | mynodeid = i;
|
---|
230 | break;
|
---|
231 | }
|
---|
232 | }
|
---|
233 |
|
---|
234 | // Initialize the base class.
|
---|
235 | intMessageGrp::initialize(mynodeid, nprocs, 30);
|
---|
236 | }
|
---|
237 |
|
---|
238 | Ref<MessageGrp>
|
---|
239 | ShmMessageGrp::clone(void)
|
---|
240 | {
|
---|
241 | Ref<MessageGrp> smgrp = new ShmMessageGrp(n());
|
---|
242 | return smgrp;
|
---|
243 | }
|
---|
244 |
|
---|
245 | int ShmMessageGrp::basic_probe(int msgtype)
|
---|
246 | {
|
---|
247 | int i;
|
---|
248 | msgbuf_t *message;
|
---|
249 |
|
---|
250 | wait_for_write(me());
|
---|
251 |
|
---|
252 | message = (msgbuf_t*)commbuf[me()]->buf;
|
---|
253 | for (i=0; i<commbuf[me()]->nmsg; i++) {
|
---|
254 | if ((msgtype == -1) || (msgtype == message->type)) {
|
---|
255 | release_write(me());
|
---|
256 | return 1;
|
---|
257 | }
|
---|
258 | message = NEXT_MESSAGE(message);
|
---|
259 | }
|
---|
260 |
|
---|
261 | release_write(me());
|
---|
262 |
|
---|
263 | return 0;
|
---|
264 | }
|
---|
265 |
|
---|
266 | void ShmMessageGrp::basic_recv(int type, void* buf, int bytes)
|
---|
267 | {
|
---|
268 | int size;
|
---|
269 | int i;
|
---|
270 | msgbuf_t *message,*lastmessage;
|
---|
271 |
|
---|
272 | #ifdef DEBUG
|
---|
273 | ExEnv::outn() << "ShmGrp: basic_recv: "
|
---|
274 | << "type = " << type << ' '
|
---|
275 | << "buf = " << buf << ' '
|
---|
276 | << "bytes = " << bytes << ' '
|
---|
277 | << "me = " << me() << endl;
|
---|
278 | print_buffer(me(),me());
|
---|
279 | #endif
|
---|
280 |
|
---|
281 | look_for_message:
|
---|
282 |
|
---|
283 | wait_for_write(me());
|
---|
284 |
|
---|
285 | message = (msgbuf_t*)commbuf[me()]->buf;
|
---|
286 | for (i=0; i<commbuf[me()]->nmsg; i++) {
|
---|
287 | if (message->type == type) break;
|
---|
288 | message = NEXT_MESSAGE(message);
|
---|
289 | }
|
---|
290 | if (i==commbuf[me()]->nmsg) {
|
---|
291 | commbuf[me()]->n_wait_for_change++;
|
---|
292 | release_write(me());
|
---|
293 | get_change(me());
|
---|
294 | goto look_for_message;
|
---|
295 | }
|
---|
296 |
|
---|
297 | if (bytes < message->size) {
|
---|
298 | ExEnv::errn() << scprintf("messshm.cc: recv buffer too small\n");
|
---|
299 | abort();
|
---|
300 | }
|
---|
301 | if (bytes < message->size) size = bytes;
|
---|
302 | else size = message->size;
|
---|
303 |
|
---|
304 | // Copy the data.
|
---|
305 | memcpy(buf,((char*)message) + sizeof(msgbuf_t),size);
|
---|
306 |
|
---|
307 | // Find the lastmessage.
|
---|
308 | lastmessage = (msgbuf_t*)commbuf[me()]->buf;
|
---|
309 | for (i=0; i<commbuf[me()]->nmsg; i++) {
|
---|
310 | lastmessage = NEXT_MESSAGE(lastmessage);
|
---|
311 | }
|
---|
312 |
|
---|
313 | // Repack the message buffer.
|
---|
314 | memmove(message,NEXT_MESSAGE(message),
|
---|
315 | (size_t)((char*)lastmessage)-(size_t)((char*)NEXT_MESSAGE(message)));
|
---|
316 |
|
---|
317 | commbuf[me()]->nmsg--;
|
---|
318 |
|
---|
319 | while(commbuf[me()]->n_wait_for_change) {
|
---|
320 | put_change(me());
|
---|
321 | commbuf[me()]->n_wait_for_change--;
|
---|
322 | }
|
---|
323 |
|
---|
324 | release_write(me());
|
---|
325 | }
|
---|
326 |
|
---|
327 | void ShmMessageGrp::basic_send(int dest, int type, const void* buf, int bytes)
|
---|
328 | {
|
---|
329 | int i;
|
---|
330 | msgbuf_t *availmsg;
|
---|
331 |
|
---|
332 | #ifdef DEBUG
|
---|
333 | ExEnv::outn() << "ShmGrp: basic_send: "
|
---|
334 | << "dest = " << dest << ' '
|
---|
335 | << "type = " << type << ' '
|
---|
336 | << "buf = " << buf << ' '
|
---|
337 | << "bytes = " << bytes << ' '
|
---|
338 | << "me = " << me() << endl;
|
---|
339 | #endif
|
---|
340 |
|
---|
341 | if (dest>=n()) {
|
---|
342 | //debug_start("bad destination");
|
---|
343 | ExEnv::errn() << scprintf("ShmMessageGrp::basic_send: bad destination\n");
|
---|
344 | abort();
|
---|
345 | }
|
---|
346 |
|
---|
347 | try_send_again:
|
---|
348 |
|
---|
349 | // Obtain write access to the dest's incoming buffer.
|
---|
350 | wait_for_write(dest);
|
---|
351 |
|
---|
352 | availmsg = (msgbuf_t*)commbuf[dest]->buf;
|
---|
353 | for (i=0; i<commbuf[dest]->nmsg; i++) {
|
---|
354 | availmsg = NEXT_MESSAGE(availmsg);
|
---|
355 | }
|
---|
356 | if ( (((char*)availmsg) + ROUNDUPTOALIGN(sizeof(msgbuf_t) + bytes))
|
---|
357 | > (((char*)commbuf[dest]) + sizeof(commbuf_t))) {
|
---|
358 | if (me() == dest) {
|
---|
359 | // sending a message to myself and the buffer is full
|
---|
360 | // --cannot recover
|
---|
361 | ExEnv::errn() << scprintf("commbuf size exceeded on %d\n",me());
|
---|
362 | ExEnv::errn() << scprintf(" availmsg = 0x%x\n",availmsg);
|
---|
363 | ExEnv::errn() << scprintf(" commbuf[%d] + sizeof(commbuf_t) = 0x%x\n",
|
---|
364 | dest,((char*)commbuf[dest]) + sizeof(commbuf_t));
|
---|
365 | ExEnv::errn() << scprintf(" size = %d\n",bytes);
|
---|
366 | abort();
|
---|
367 | }
|
---|
368 | else {
|
---|
369 | // try to recover from a full buffer by waiting for the dest
|
---|
370 | // to read some data.
|
---|
371 | commbuf[dest]->n_wait_for_change++;
|
---|
372 | release_write(dest);
|
---|
373 | get_change(dest);
|
---|
374 | goto try_send_again;
|
---|
375 | }
|
---|
376 | }
|
---|
377 | availmsg->from = me();
|
---|
378 | availmsg->type = type;
|
---|
379 | availmsg->size = bytes;
|
---|
380 | memcpy(((char*)availmsg) + sizeof(msgbuf_t),buf,bytes);
|
---|
381 | commbuf[dest]->nmsg++;
|
---|
382 |
|
---|
383 | // let the dest know that there is more data in the buffer
|
---|
384 | while(commbuf[dest]->n_wait_for_change) {
|
---|
385 | put_change(dest);
|
---|
386 | commbuf[dest]->n_wait_for_change--;
|
---|
387 | }
|
---|
388 |
|
---|
389 | // Release write access to the dest's buffer.
|
---|
390 | release_write(dest);
|
---|
391 | }
|
---|
392 |
|
---|
393 | msgbuf_t * ShmMessageGrp::NEXT_MESSAGE(msgbuf_t *m)
|
---|
394 | {
|
---|
395 | msgbuf_t *r;
|
---|
396 | if (m->size < 0) {
|
---|
397 | ExEnv::errn() << scprintf("NEXT_MESSAGE: m->size = %d (real %d)\n",
|
---|
398 | m->size,sizeof(msgbuf_t) + m->size + m->size%8);
|
---|
399 | //debug_start("m->size < 0");
|
---|
400 | ExEnv::errn() << scprintf("messshm.cc: m->size < 0\n");
|
---|
401 | abort();
|
---|
402 | }
|
---|
403 | r = ((msgbuf_t*)(((char*)m) + ROUNDUPTOALIGN(sizeof(msgbuf_t) + m->size)));
|
---|
404 | return r;
|
---|
405 | }
|
---|
406 |
|
---|
407 | void ShmMessageGrp::get_change(int node)
|
---|
408 | {
|
---|
409 | semdec.sem_num = node;
|
---|
410 | semop(change_semid,&semdec,1);
|
---|
411 | semdec.sem_num = 0;
|
---|
412 | }
|
---|
413 |
|
---|
414 | void ShmMessageGrp::put_change(int node)
|
---|
415 | {
|
---|
416 | seminc.sem_num = node;
|
---|
417 | semop(change_semid,&seminc,1);
|
---|
418 | seminc.sem_num = 0;
|
---|
419 |
|
---|
420 | }
|
---|
421 |
|
---|
422 | // Obtain a lock for writing to the node's buffer.
|
---|
423 | void ShmMessageGrp::wait_for_write(int node)
|
---|
424 | {
|
---|
425 | semdec.sem_num = node;
|
---|
426 | semop(semid,&semdec,1);
|
---|
427 | semdec.sem_num = 0;
|
---|
428 | }
|
---|
429 |
|
---|
430 | // Release lock for writing to node's buffer.
|
---|
431 | void ShmMessageGrp::release_write(int node)
|
---|
432 | {
|
---|
433 | seminc.sem_num = node;
|
---|
434 | semop(semid,&seminc,1);
|
---|
435 | seminc.sem_num = 0;
|
---|
436 | }
|
---|
437 |
|
---|
#ifdef DEBUG
// Debugging aid: list size, type and sender of every message queued in
// node's incoming buffer.  `me` is only used to label the output.  The
// buffer is read without taking its lock.
void ShmMessageGrp::print_buffer(int node, int me)
{
  ExEnv::outn() << scprintf("Printing buffer for node %d on node %d\n",node,me);

  msgbuf_t *cur = (msgbuf_t*)commbuf[node]->buf;
  int k = 0;
  while (k < commbuf[node]->nmsg) {
      ExEnv::outn() <<
          scprintf(" on node %2d: to=%2d, bytes=%6d, type=%10d, from=%2d\n",
                   me,
                   node,
                   cur->size,
                   cur->type,
                   cur->from);
      ExEnv::outn().flush();
      cur = NEXT_MESSAGE(cur);
      ++k;
    }
}
#endif
|
---|
460 |
|
---|
461 | /////////////////////////////////////////////////////////////////////////////
|
---|
462 |
|
---|
463 | // Local Variables:
|
---|
464 | // mode: c++
|
---|
465 | // c-file-style: "CLJ"
|
---|
466 | // End:
|
---|