[0b990d] | 1 | //
|
---|
| 2 | // messshm.cc
|
---|
| 3 | //
|
---|
| 4 | // Copyright (C) 1996 Limit Point Systems, Inc.
|
---|
| 5 | //
|
---|
| 6 | // Author: Curtis Janssen <cljanss@limitpt.com>
|
---|
| 7 | // Maintainer: LPS
|
---|
| 8 | //
|
---|
| 9 | // This file is part of the SC Toolkit.
|
---|
| 10 | //
|
---|
| 11 | // The SC Toolkit is free software; you can redistribute it and/or modify
|
---|
| 12 | // it under the terms of the GNU Library General Public License as published by
|
---|
| 13 | // the Free Software Foundation; either version 2, or (at your option)
|
---|
| 14 | // any later version.
|
---|
| 15 | //
|
---|
| 16 | // The SC Toolkit is distributed in the hope that it will be useful,
|
---|
| 17 | // but WITHOUT ANY WARRANTY; without even the implied warranty of
|
---|
| 18 | // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
---|
| 19 | // GNU Library General Public License for more details.
|
---|
| 20 | //
|
---|
| 21 | // You should have received a copy of the GNU Library General Public License
|
---|
| 22 | // along with the SC Toolkit; see the file COPYING.LIB. If not, write to
|
---|
| 23 | // the Free Software Foundation, 675 Mass Ave, Cambridge, MA 02139, USA.
|
---|
| 24 | //
|
---|
| 25 | // The U.S. Government is granted a limited license as per AL 91-7.
|
---|
| 26 | //
|
---|
| 27 |
|
---|
| 28 |
|
---|
#include <errno.h>
#include <unistd.h>
#include <sys/types.h>
#include <sys/ipc.h>
#include <sys/sem.h>
#include <sys/shm.h>
| 34 |
|
---|
| 35 |
|
---|
| 36 | #include <util/misc/bug.h>
|
---|
| 37 | #include <util/misc/formio.h>
|
---|
| 38 | #include <util/group/messshm.h>
|
---|
| 39 |
|
---|
| 40 | using namespace std;
|
---|
| 41 | using namespace sc;
|
---|
| 42 |
|
---|
| 43 | //#define DEBUG
|
---|
| 44 |
|
---|
/* Fallback definitions of the semaphore permission bits that are OR-ed
   into the semget() flags below, for platforms whose headers do not
   provide them. */
#if !defined(SEM_A)
#  define SEM_A 0200
#endif

#if !defined(SEM_R)
#  define SEM_R 0400
#endif
| 52 |
|
---|
/* Data placed in a communication buffer is aligned on NALIGN-byte
   boundaries.  The mask used by ROUNDUPTOALIGN requires NALIGN to be a
   power of two. */
#define NALIGN 8

/* Round n up to the nearest multiple of NALIGN. */
#define ROUNDUPTOALIGN(n) (((n) + (NALIGN - 1)) & ~(NALIGN - 1))
| 56 |
|
---|
| 57 | static ClassDesc ShmMessageGrp_cd(
|
---|
| 58 | typeid(ShmMessageGrp),"ShmMessageGrp",1,"public intMessageGrp",
|
---|
| 59 | 0, create<ShmMessageGrp>, 0);
|
---|
| 60 |
|
---|
| 61 | ShmMessageGrp::ShmMessageGrp()
|
---|
| 62 | {
|
---|
| 63 | initialize();
|
---|
| 64 | }
|
---|
| 65 |
|
---|
| 66 | ShmMessageGrp::ShmMessageGrp(int nprocs)
|
---|
| 67 | {
|
---|
| 68 | initialize(nprocs);
|
---|
| 69 | }
|
---|
| 70 |
|
---|
| 71 | ShmMessageGrp::ShmMessageGrp(const Ref<KeyVal>& keyval):
|
---|
| 72 | intMessageGrp(keyval)
|
---|
| 73 | {
|
---|
| 74 | int nprocs = keyval->intvalue("n");
|
---|
| 75 | if (keyval->error() != KeyVal::OK) initialize();
|
---|
| 76 | else initialize(nprocs);
|
---|
| 77 | }
|
---|
| 78 |
|
---|
| 79 | void ShmMessageGrp::sync()
|
---|
| 80 | {
|
---|
| 81 | int i;
|
---|
| 82 | for (i=0; i<n(); i++) {
|
---|
| 83 | if (me() == i) continue;
|
---|
| 84 | wait_for_write(i);
|
---|
| 85 | commbuf[i]->n_sync++;
|
---|
| 86 | if (commbuf[i]->n_sync >= n()-1) {
|
---|
| 87 | while(commbuf[i]->n_wait_for_change) {
|
---|
| 88 | put_change(i);
|
---|
| 89 | commbuf[i]->n_wait_for_change--;
|
---|
| 90 | }
|
---|
| 91 | }
|
---|
| 92 | release_write(i);
|
---|
| 93 | }
|
---|
| 94 | wait_for_write(me());
|
---|
| 95 | while (commbuf[me()]->n_sync < n()-1) {
|
---|
| 96 | commbuf[me()]->n_wait_for_change++;
|
---|
| 97 | release_write(me());
|
---|
| 98 | get_change(me());
|
---|
| 99 | wait_for_write(me());
|
---|
| 100 | }
|
---|
| 101 | commbuf[me()]->n_sync -= n()-1;
|
---|
| 102 | while(commbuf[me()]->n_wait_for_change) {
|
---|
| 103 | put_change(me());
|
---|
| 104 | commbuf[me()]->n_wait_for_change--;
|
---|
| 105 | }
|
---|
| 106 | release_write(me());
|
---|
| 107 | }
|
---|
| 108 |
|
---|
| 109 | ShmMessageGrp::~ShmMessageGrp()
|
---|
| 110 | {
|
---|
| 111 | // sync the nodes
|
---|
| 112 | sync();
|
---|
| 113 |
|
---|
| 114 | // make sure node zero is las to touch the shared memory
|
---|
| 115 | if (me() == 0) {
|
---|
| 116 | wait_for_write(0);
|
---|
| 117 | while (commbuf[0]->n_sync < n()-1) {
|
---|
| 118 | commbuf[0]->n_wait_for_change++;
|
---|
| 119 | release_write(0);
|
---|
| 120 | get_change(0);
|
---|
| 121 | wait_for_write(0);
|
---|
| 122 | }
|
---|
| 123 | release_write(0);
|
---|
| 124 | shmdt((SHMTYPE)sharedmem);
|
---|
| 125 | // release the memory
|
---|
| 126 | shmctl(shmid,IPC_RMID,0);
|
---|
| 127 |
|
---|
| 128 | for (int i=0; i<n(); i++) {
|
---|
| 129 | #ifdef SEMCTL_REQUIRES_SEMUN
|
---|
| 130 | semun junk;
|
---|
| 131 | junk.val = 0;
|
---|
| 132 | #else
|
---|
| 133 | int junk = 0;
|
---|
| 134 | #endif
|
---|
| 135 | semctl(semid,i,IPC_RMID,junk);
|
---|
| 136 | semctl(change_semid,i,IPC_RMID,junk);
|
---|
| 137 | }
|
---|
| 138 | }
|
---|
| 139 | else {
|
---|
| 140 | wait_for_write(0);
|
---|
| 141 | commbuf[0]->n_sync++;
|
---|
| 142 | while(commbuf[0]->n_wait_for_change) {
|
---|
| 143 | put_change(0);
|
---|
| 144 | commbuf[0]->n_wait_for_change--;
|
---|
| 145 | }
|
---|
| 146 | shmdt((SHMTYPE)sharedmem);
|
---|
| 147 | release_write(0);
|
---|
| 148 | }
|
---|
| 149 | }
|
---|
| 150 |
|
---|
| 151 | void ShmMessageGrp::initialize()
|
---|
| 152 | {
|
---|
| 153 | int nprocs = atoi(getenv("NUMPROC"));
|
---|
| 154 | if (!nprocs) nprocs = 1;
|
---|
| 155 | initialize(nprocs);
|
---|
| 156 | }
|
---|
| 157 |
|
---|
| 158 | void ShmMessageGrp::initialize(int nprocs)
|
---|
| 159 | {
|
---|
| 160 | int i;
|
---|
| 161 | void* nextbuf;
|
---|
| 162 |
|
---|
| 163 | semdec.sem_num = 0;
|
---|
| 164 | semdec.sem_op = -1;
|
---|
| 165 | semdec.sem_flg = 0;
|
---|
| 166 | seminc.sem_num = 0;
|
---|
| 167 | seminc.sem_op = 1;
|
---|
| 168 | seminc.sem_flg = 0;
|
---|
| 169 |
|
---|
| 170 | // Each node gets a buffer for incoming data.
|
---|
| 171 | shmid = shmget(IPC_PRIVATE,
|
---|
| 172 | nprocs * sizeof(commbuf_t),
|
---|
| 173 | IPC_CREAT | SHM_R | SHM_W);
|
---|
| 174 |
|
---|
| 175 | // Attach the shared segment.
|
---|
| 176 | nextbuf = sharedmem = shmat(shmid,0,0);
|
---|
| 177 |
|
---|
| 178 | #ifdef SEMCTL_REQUIRES_SEMUN
|
---|
| 179 | semun semzero;
|
---|
| 180 | semzero.val = 0;
|
---|
| 181 | semun semone;
|
---|
| 182 | semone.val = 1;
|
---|
| 183 | #else
|
---|
| 184 | int semzero = 0;
|
---|
| 185 | int semone = 1;
|
---|
| 186 | #endif
|
---|
| 187 |
|
---|
| 188 | // Each shared memory segment gets a semaphore to synchronize access.
|
---|
| 189 | semid = semget(IPC_PRIVATE,nprocs,IPC_CREAT | SEM_R | SEM_A );
|
---|
| 190 | if (semid == -1) {
|
---|
| 191 | perror("semget");
|
---|
| 192 | exit(-1);
|
---|
| 193 | }
|
---|
| 194 |
|
---|
| 195 | change_semid = semget(IPC_PRIVATE,nprocs,IPC_CREAT | SEM_R | SEM_A );
|
---|
| 196 | if (change_semid == -1) {
|
---|
| 197 | perror("semget");
|
---|
| 198 | exit(-1);
|
---|
| 199 | }
|
---|
| 200 |
|
---|
| 201 | for (i=0; i<nprocs; i++) {
|
---|
| 202 |
|
---|
| 203 | // Mark all of the segments as available for writing.
|
---|
| 204 | if (semctl(semid,i,SETVAL,semone) == -1) {
|
---|
| 205 | perror("semctl");
|
---|
| 206 | exit(-1);
|
---|
| 207 | }
|
---|
| 208 |
|
---|
| 209 | if (semctl(change_semid,i,SETVAL,semzero) == -1) {
|
---|
| 210 | perror("semctl");
|
---|
| 211 | exit(-1);
|
---|
| 212 | }
|
---|
| 213 |
|
---|
| 214 | // Alloc shm for each node's commbuf.
|
---|
| 215 | commbuf[i] = (commbuf_t*) nextbuf;
|
---|
| 216 | // Mark the commbuf as having no messages.
|
---|
| 217 | commbuf[i]->nmsg = 0;
|
---|
| 218 | // and no outstanding waits
|
---|
| 219 | commbuf[i]->n_wait_for_change = 0;
|
---|
| 220 | commbuf[i]->n_sync = 0;
|
---|
| 221 | nextbuf = (void*)(((char*)nextbuf) + sizeof(commbuf_t));
|
---|
| 222 | }
|
---|
| 223 |
|
---|
| 224 | // Create the remaining nodes.
|
---|
| 225 | int mynodeid = 0;
|
---|
| 226 | for (i=1; i<nprocs; i++) {
|
---|
| 227 | int pid = fork();
|
---|
| 228 | if (!pid) {
|
---|
| 229 | mynodeid = i;
|
---|
| 230 | break;
|
---|
| 231 | }
|
---|
| 232 | }
|
---|
| 233 |
|
---|
| 234 | // Initialize the base class.
|
---|
| 235 | intMessageGrp::initialize(mynodeid, nprocs, 30);
|
---|
| 236 | }
|
---|
| 237 |
|
---|
| 238 | Ref<MessageGrp>
|
---|
| 239 | ShmMessageGrp::clone(void)
|
---|
| 240 | {
|
---|
| 241 | Ref<MessageGrp> smgrp = new ShmMessageGrp(n());
|
---|
| 242 | return smgrp;
|
---|
| 243 | }
|
---|
| 244 |
|
---|
| 245 | int ShmMessageGrp::basic_probe(int msgtype)
|
---|
| 246 | {
|
---|
| 247 | int i;
|
---|
| 248 | msgbuf_t *message;
|
---|
| 249 |
|
---|
| 250 | wait_for_write(me());
|
---|
| 251 |
|
---|
| 252 | message = (msgbuf_t*)commbuf[me()]->buf;
|
---|
| 253 | for (i=0; i<commbuf[me()]->nmsg; i++) {
|
---|
| 254 | if ((msgtype == -1) || (msgtype == message->type)) {
|
---|
| 255 | release_write(me());
|
---|
| 256 | return 1;
|
---|
| 257 | }
|
---|
| 258 | message = NEXT_MESSAGE(message);
|
---|
| 259 | }
|
---|
| 260 |
|
---|
| 261 | release_write(me());
|
---|
| 262 |
|
---|
| 263 | return 0;
|
---|
| 264 | }
|
---|
| 265 |
|
---|
| 266 | void ShmMessageGrp::basic_recv(int type, void* buf, int bytes)
|
---|
| 267 | {
|
---|
| 268 | int size;
|
---|
| 269 | int i;
|
---|
| 270 | msgbuf_t *message,*lastmessage;
|
---|
| 271 |
|
---|
| 272 | #ifdef DEBUG
|
---|
| 273 | ExEnv::outn() << "ShmGrp: basic_recv: "
|
---|
| 274 | << "type = " << type << ' '
|
---|
| 275 | << "buf = " << buf << ' '
|
---|
| 276 | << "bytes = " << bytes << ' '
|
---|
| 277 | << "me = " << me() << endl;
|
---|
| 278 | print_buffer(me(),me());
|
---|
| 279 | #endif
|
---|
| 280 |
|
---|
| 281 | look_for_message:
|
---|
| 282 |
|
---|
| 283 | wait_for_write(me());
|
---|
| 284 |
|
---|
| 285 | message = (msgbuf_t*)commbuf[me()]->buf;
|
---|
| 286 | for (i=0; i<commbuf[me()]->nmsg; i++) {
|
---|
| 287 | if (message->type == type) break;
|
---|
| 288 | message = NEXT_MESSAGE(message);
|
---|
| 289 | }
|
---|
| 290 | if (i==commbuf[me()]->nmsg) {
|
---|
| 291 | commbuf[me()]->n_wait_for_change++;
|
---|
| 292 | release_write(me());
|
---|
| 293 | get_change(me());
|
---|
| 294 | goto look_for_message;
|
---|
| 295 | }
|
---|
| 296 |
|
---|
| 297 | if (bytes < message->size) {
|
---|
| 298 | ExEnv::errn() << scprintf("messshm.cc: recv buffer too small\n");
|
---|
| 299 | abort();
|
---|
| 300 | }
|
---|
| 301 | if (bytes < message->size) size = bytes;
|
---|
| 302 | else size = message->size;
|
---|
| 303 |
|
---|
| 304 | // Copy the data.
|
---|
| 305 | memcpy(buf,((char*)message) + sizeof(msgbuf_t),size);
|
---|
| 306 |
|
---|
| 307 | // Find the lastmessage.
|
---|
| 308 | lastmessage = (msgbuf_t*)commbuf[me()]->buf;
|
---|
| 309 | for (i=0; i<commbuf[me()]->nmsg; i++) {
|
---|
| 310 | lastmessage = NEXT_MESSAGE(lastmessage);
|
---|
| 311 | }
|
---|
| 312 |
|
---|
| 313 | // Repack the message buffer.
|
---|
| 314 | memmove(message,NEXT_MESSAGE(message),
|
---|
| 315 | (size_t)((char*)lastmessage)-(size_t)((char*)NEXT_MESSAGE(message)));
|
---|
| 316 |
|
---|
| 317 | commbuf[me()]->nmsg--;
|
---|
| 318 |
|
---|
| 319 | while(commbuf[me()]->n_wait_for_change) {
|
---|
| 320 | put_change(me());
|
---|
| 321 | commbuf[me()]->n_wait_for_change--;
|
---|
| 322 | }
|
---|
| 323 |
|
---|
| 324 | release_write(me());
|
---|
| 325 | }
|
---|
| 326 |
|
---|
| 327 | void ShmMessageGrp::basic_send(int dest, int type, const void* buf, int bytes)
|
---|
| 328 | {
|
---|
| 329 | int i;
|
---|
| 330 | msgbuf_t *availmsg;
|
---|
| 331 |
|
---|
| 332 | #ifdef DEBUG
|
---|
| 333 | ExEnv::outn() << "ShmGrp: basic_send: "
|
---|
| 334 | << "dest = " << dest << ' '
|
---|
| 335 | << "type = " << type << ' '
|
---|
| 336 | << "buf = " << buf << ' '
|
---|
| 337 | << "bytes = " << bytes << ' '
|
---|
| 338 | << "me = " << me() << endl;
|
---|
| 339 | #endif
|
---|
| 340 |
|
---|
| 341 | if (dest>=n()) {
|
---|
| 342 | //debug_start("bad destination");
|
---|
| 343 | ExEnv::errn() << scprintf("ShmMessageGrp::basic_send: bad destination\n");
|
---|
| 344 | abort();
|
---|
| 345 | }
|
---|
| 346 |
|
---|
| 347 | try_send_again:
|
---|
| 348 |
|
---|
| 349 | // Obtain write access to the dest's incoming buffer.
|
---|
| 350 | wait_for_write(dest);
|
---|
| 351 |
|
---|
| 352 | availmsg = (msgbuf_t*)commbuf[dest]->buf;
|
---|
| 353 | for (i=0; i<commbuf[dest]->nmsg; i++) {
|
---|
| 354 | availmsg = NEXT_MESSAGE(availmsg);
|
---|
| 355 | }
|
---|
| 356 | if ( (((char*)availmsg) + ROUNDUPTOALIGN(sizeof(msgbuf_t) + bytes))
|
---|
| 357 | > (((char*)commbuf[dest]) + sizeof(commbuf_t))) {
|
---|
| 358 | if (me() == dest) {
|
---|
| 359 | // sending a message to myself and the buffer is full
|
---|
| 360 | // --cannot recover
|
---|
| 361 | ExEnv::errn() << scprintf("commbuf size exceeded on %d\n",me());
|
---|
| 362 | ExEnv::errn() << scprintf(" availmsg = 0x%x\n",availmsg);
|
---|
| 363 | ExEnv::errn() << scprintf(" commbuf[%d] + sizeof(commbuf_t) = 0x%x\n",
|
---|
| 364 | dest,((char*)commbuf[dest]) + sizeof(commbuf_t));
|
---|
| 365 | ExEnv::errn() << scprintf(" size = %d\n",bytes);
|
---|
| 366 | abort();
|
---|
| 367 | }
|
---|
| 368 | else {
|
---|
| 369 | // try to recover from a full buffer by waiting for the dest
|
---|
| 370 | // to read some data.
|
---|
| 371 | commbuf[dest]->n_wait_for_change++;
|
---|
| 372 | release_write(dest);
|
---|
| 373 | get_change(dest);
|
---|
| 374 | goto try_send_again;
|
---|
| 375 | }
|
---|
| 376 | }
|
---|
| 377 | availmsg->from = me();
|
---|
| 378 | availmsg->type = type;
|
---|
| 379 | availmsg->size = bytes;
|
---|
| 380 | memcpy(((char*)availmsg) + sizeof(msgbuf_t),buf,bytes);
|
---|
| 381 | commbuf[dest]->nmsg++;
|
---|
| 382 |
|
---|
| 383 | // let the dest know that there is more data in the buffer
|
---|
| 384 | while(commbuf[dest]->n_wait_for_change) {
|
---|
| 385 | put_change(dest);
|
---|
| 386 | commbuf[dest]->n_wait_for_change--;
|
---|
| 387 | }
|
---|
| 388 |
|
---|
| 389 | // Release write access to the dest's buffer.
|
---|
| 390 | release_write(dest);
|
---|
| 391 | }
|
---|
| 392 |
|
---|
| 393 | msgbuf_t * ShmMessageGrp::NEXT_MESSAGE(msgbuf_t *m)
|
---|
| 394 | {
|
---|
| 395 | msgbuf_t *r;
|
---|
| 396 | if (m->size < 0) {
|
---|
| 397 | ExEnv::errn() << scprintf("NEXT_MESSAGE: m->size = %d (real %d)\n",
|
---|
| 398 | m->size,sizeof(msgbuf_t) + m->size + m->size%8);
|
---|
| 399 | //debug_start("m->size < 0");
|
---|
| 400 | ExEnv::errn() << scprintf("messshm.cc: m->size < 0\n");
|
---|
| 401 | abort();
|
---|
| 402 | }
|
---|
| 403 | r = ((msgbuf_t*)(((char*)m) + ROUNDUPTOALIGN(sizeof(msgbuf_t) + m->size)));
|
---|
| 404 | return r;
|
---|
| 405 | }
|
---|
| 406 |
|
---|
| 407 | void ShmMessageGrp::get_change(int node)
|
---|
| 408 | {
|
---|
| 409 | semdec.sem_num = node;
|
---|
| 410 | semop(change_semid,&semdec,1);
|
---|
| 411 | semdec.sem_num = 0;
|
---|
| 412 | }
|
---|
| 413 |
|
---|
| 414 | void ShmMessageGrp::put_change(int node)
|
---|
| 415 | {
|
---|
| 416 | seminc.sem_num = node;
|
---|
| 417 | semop(change_semid,&seminc,1);
|
---|
| 418 | seminc.sem_num = 0;
|
---|
| 419 |
|
---|
| 420 | }
|
---|
| 421 |
|
---|
| 422 | // Obtain a lock for writing to the node's buffer.
|
---|
| 423 | void ShmMessageGrp::wait_for_write(int node)
|
---|
| 424 | {
|
---|
| 425 | semdec.sem_num = node;
|
---|
| 426 | semop(semid,&semdec,1);
|
---|
| 427 | semdec.sem_num = 0;
|
---|
| 428 | }
|
---|
| 429 |
|
---|
| 430 | // Release lock for writing to node's buffer.
|
---|
| 431 | void ShmMessageGrp::release_write(int node)
|
---|
| 432 | {
|
---|
| 433 | seminc.sem_num = node;
|
---|
| 434 | semop(semid,&seminc,1);
|
---|
| 435 | seminc.sem_num = 0;
|
---|
| 436 | }
|
---|
| 437 |
|
---|
#ifdef DEBUG
// Debugging aid: list the size, type and sender of every message queued
// in node's incoming buffer.
//
//   node  node whose buffer is listed
//   me    node number printed as the one doing the listing
//
// This routine does not take the buffer's write lock itself.
void ShmMessageGrp::print_buffer(int node, int me)
{
  commbuf_t *box = commbuf[node];
  msgbuf_t *entry = (msgbuf_t*)box->buf;

  ExEnv::outn() << scprintf("Printing buffer for node %d on node %d\n",node,me);
  for (int k = 0; k < box->nmsg; ++k, entry = NEXT_MESSAGE(entry)) {
    ExEnv::outn() <<
      scprintf(" on node %2d: to=%2d, bytes=%6d, type=%10d, from=%2d\n",
               me,
               node,
               entry->size,
               entry->type,
               entry->from);
    ExEnv::outn().flush();
  }
}
#endif
| 460 |
|
---|
| 461 | /////////////////////////////////////////////////////////////////////////////
|
---|
| 462 |
|
---|
| 463 | // Local Variables:
|
---|
| 464 | // mode: c++
|
---|
| 465 | // c-file-style: "CLJ"
|
---|
| 466 | // End:
|
---|