YAP 7.1.0
yap_mpi.c
/*
 Copyright (C) 2004,2005,2006 (Nuno A. Fonseca) <nuno.fonseca@gmail.com>

 This program is free software; you can redistribute it and/or
modify it under the terms of the GNU General Public License
as published by the Free Software Foundation; either
version 2 of the License, or (at your option) any later
version.

This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
GNU General Public License for more details.

You should have received a copy of the GNU General Public License
along with this program; if not, write to the Free Software
Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.


Last rev: $Id: yap_mpi.c,v 1.4 2006-09-28 11:42:51 vsc Exp $
Comments: YAP interface to LAM/MPI
*/
#include "YapConfig.h"
#include <stdio.h>
#if HAVE_STRING_H
#include <string.h>
#endif
#if HAVE_MALLOC_H
#include <malloc.h>
#endif
#if HAVE_UNISTD_H
#include <unistd.h>
#endif
#if HAVE_SYS_TIMES_H
#include <sys/times.h>
#endif

#if HAVE_MPI_H
#include <mpi.h>

#include "prologterms2c.h"

#include <YapInterface.h>

#include "hash.h"

/*********************************************************************/
struct broadcast_req {
  void *ptr;  // pointer to an allocated memory buffer associated with the broadcast
  int nreq;   // number of requests associated with the broadcast
};
typedef struct broadcast_req BroadcastRequest;
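/*
 * Lifecycle sketch (as implemented by new_broadcast(), new_broadcast_request()
 * and free_broadcast_request() below): my_ibcast() allocates one
 * BroadcastRequest, issues one MPI_Isend per destination rank and registers
 * every resulting MPI_Request against that same BroadcastRequest; nreq counts
 * the outstanding requests, and the shared buffer in ptr is released only when
 * the last of them completes (nreq drops to 0).
 */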

/*********************************************************************/
#define IDTYPE long
#define HANDLE2INT(ptr) (IDTYPE)ptr
#define INT2HANDLE(id) (MPI_Request*)id

#define BREQ2INT(ptr) (IDTYPE)ptr
#define INT2BREQ(ptr) (BroadcastRequest*)ptr
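/*
 * Handles are exposed to Prolog as plain integers: a heap-allocated
 * MPI_Request* is cast to an IDTYPE with HANDLE2INT() before being unified
 * with the handle argument (see mpi_isend()/mpi_irecv()), and cast back with
 * INT2HANDLE() when the handle comes back in (see mpi_wait()/mpi_test()).
 * Sketch of the round trip:
 *
 *   MPI_Request *h = (MPI_Request*)malloc(sizeof(MPI_Request));
 *   YAP_Term t = YAP_MkIntTerm(HANDLE2INT(h));          // handed to Prolog
 *   MPI_Request *back = INT2HANDLE(YAP_IntOfTerm(t));   // back == h
 */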

#define MPI_CALL(function) ((mpi_status=function)==MPI_SUCCESS?MPI_SUCCESS:mpi_error(mpi_status))

#ifdef USE_THREADS
#include <pthread.h>
#endif

/********************************************************************
 * Auxiliary data
 ********************************************************************/
static YAP_Bool mpi_statuss[1024];
#define mpi_status (mpi_statuss[YAP_ThreadSelf()])

extern int GLOBAL_argc;

#define HASHSIZE 1777
static hashtable requests=NULL;
static hashtable broadcasts=NULL;

X_API void init_mpi(void);

/********************************************************************
 * Time accounting
 ********************************************************************/
#ifdef MPISTATS

#include <sys/time.h>
#include <time.h>

/* Statistics */
static unsigned long bytes_sent;     // bytes sent (mpi headers are ignored)
static unsigned long bytes_recv;     // bytes received
static unsigned long num_msgs_sent;  // number of messages sent
static unsigned long num_msgs_recv;  // number of messages received
static unsigned long max_s_recv_msg; // maximum size of a message received
static unsigned long max_s_sent_msg; // maximum size of a message sent
static double total_time_spent;      // total time spent in communication code

/* MSG ACCOUNTING */
#define RESET_STATS() {total_time_spent=0;bytes_sent=bytes_recv=num_msgs_recv=num_msgs_sent=max_s_recv_msg=max_s_sent_msg=0;}
#define MSG_SENT(size) {bytes_sent+=size;++num_msgs_sent;if(max_s_sent_msg<size)max_s_sent_msg=size;}
#define MSG_RECV(size) {bytes_recv+=size;++num_msgs_recv;if(max_s_recv_msg<size)max_s_recv_msg=size;}

#define MPITIME total_time_spent

/* Timer */
#define CONT_TIMER() {tstart();}
#define PAUSE_TIMER() {tend();total_time_spent+=tval();}

#define return(p) {PAUSE_TIMER();return (p);}
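/*
 * Note: with MPISTATS enabled, `return` is redefined as a function-like macro,
 * so every `return(p)` written with parentheses in the primitives below stops
 * the communication timer before actually returning; the #else branch further
 * down defines the same macro as a plain return, so the primitives can use the
 * parenthesised form unconditionally.
 */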

static struct timeval _tstarts[1024], _tends[1024];

#define _tstart (_tstarts[YAP_ThreadSelf()])
#define _tend   (_tends[YAP_ThreadSelf()])

#include <sys/time.h>
#include <sys/resource.h>
#include <unistd.h>

void tstart(void) {
  struct rusage r;
  getrusage(RUSAGE_SELF,&r);
  _tstart=r.ru_utime;
}
void tend(void) {
  struct rusage r;
  getrusage(RUSAGE_SELF,&r);
  _tend=r.ru_utime;
}
//
double tval(){
  double t1, t2, elapsed;
  t1 = (double)_tstart.tv_sec + (double)_tstart.tv_usec/(1000*1000);
  t2 = (double)_tend.tv_sec + (double)_tend.tv_usec/(1000*1000);
  elapsed=t2-t1;
  if (elapsed==0) return 0.000001;
  return elapsed;
}
/*
 * returns the statistics
 */
static YAP_Bool mpi_stats(void){
  fprintf(stderr,"%f %ld %ld %ld %ld %ld %ld\n",MPITIME,num_msgs_recv,bytes_recv,max_s_recv_msg,num_msgs_sent,bytes_sent,max_s_sent_msg);
  return (YAP_Unify(YAP_ARG1, YAP_MkFloatTerm((float)(MPITIME))) &&
          YAP_Unify(YAP_ARG2, YAP_MkIntTerm((long)num_msgs_recv)) &&
          YAP_Unify(YAP_ARG3, YAP_MkIntTerm((long)bytes_recv)) &&
          YAP_Unify(YAP_ARG4, YAP_MkIntTerm((long)max_s_recv_msg)) &&
          YAP_Unify(YAP_ARG5, YAP_MkIntTerm((long)num_msgs_sent)) &&
          YAP_Unify(YAP_ARG6, YAP_MkIntTerm((long)bytes_sent)) &&
          YAP_Unify(YAP_ARG7, YAP_MkIntTerm((long)max_s_sent_msg))
          );
}
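/*
 * Example query (only available when the module is built with MPISTATS):
 *   ?- mpi_stats(Time, MsgsRecv, BytesRecv, MaxRecv, MsgsSent, BytesSent, MaxSent).
 */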
/*
 *
 */
static YAP_Bool mpi_reset_stats(void) {RESET_STATS(); return true;}
#else

#define PAUSE_TIMER()
#define CONT_TIMER()


#define RESET_STATS()
#define MSG_SENT(size)
#define MSG_RECV(size)
#define return(p) {return (p);}
#endif

/********************************************************************
 * Functions to store/fetch/delete requests
 ********************************************************************/

static inline int
new_request(MPI_Request *handle,void* ptr) {
  return insere(requests,(ulong)HANDLE2INT(handle),ptr);
}

static inline void*
get_request(MPI_Request *handle) {
  return get_object(requests,(ulong)HANDLE2INT(handle));
}

static inline void
free_request(MPI_Request *handle) {
  void* ptr;
  ptr=delete(requests,(ulong)HANDLE2INT(handle));
  free(ptr);
  free(handle);
}
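/*
 * Usage pattern (as implemented below): mpi_isend()/mpi_irecv() call
 * new_request() to associate the serialised-term buffer with the MPI_Request
 * handed back to Prolog; mpi_wait(), mpi_test(), mpi_wait_recv(),
 * mpi_test_recv() and mpi_gc() call free_request() once the operation has
 * completed, releasing both the buffer and the MPI_Request.
 */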
/********************************************************************
 * Functions to store/fetch/delete broadcast requests
 ********************************************************************/
/*
 * Returns a new BroadcastRequest object
 */
static inline BroadcastRequest*
new_broadcast(void) {
  BroadcastRequest* b=(BroadcastRequest *)malloc(sizeof(BroadcastRequest));
  if ( b!=NULL) {
    b->ptr=NULL;
    b->nreq=0;
  }
  // write_msg(__FUNCTION__,__FILE__,__LINE__,"new broadcast: %p\n",b);
  return b;
}
/*
 *
 */
static inline void
free_broadcast_request(MPI_Request *handle) {
  BroadcastRequest* b;
  b=(BroadcastRequest*)delete(broadcasts,(ulong)BREQ2INT(handle)); // get the ptr to the broadcast object
  b->nreq--;
  if ( !b->nreq ) {
    // all requests received
    free(b->ptr);
    free(b);
  }
  // write_msg(__FUNCTION__,__FILE__,__LINE__,"free broadcast_request: %p->%p\n",b,handle);
  free(handle);
}
/*
 *
 */
static inline void*
get_broadcast_request(MPI_Request *handle) {
  return get_object(broadcasts,(ulong)HANDLE2INT(handle));
}
/*
 *
 */
static inline int
new_broadcast_request(BroadcastRequest* b,MPI_Request *handle,void* ptr) {
  b->ptr=ptr;
  b->nreq++;
  //write_msg(__FUNCTION__,__FILE__,__LINE__,"new broadcast_request: %p->%p\n",b,handle);
  return insere(broadcasts,(ulong)HANDLE2INT(handle),b);
}

/*********************************************************************/
static YAP_Bool mpi_error(int errcode){
  char err_msg[MPI_MAX_ERROR_STRING];
  int len;

  MPI_Error_string(errcode,&err_msg[0],&len);
  err_msg[len]='\0';
#ifdef MPI_DEBUG
  write_msg(__FUNCTION__,__FILE__,__LINE__,"MPI_Error: %s\n",err_msg);
#endif
  return errcode;
}
/********************************************************************

 ********************************************************************/
/*
 * Sets up the MPI environment. This function should be called before any other MPI
 * function.
 */
static YAP_Bool
mpi_init(void){
  int thread_level;
  char ** my_argv;
  int my_argc = YAP_Argv(&my_argv);
  //  MPI_Init(&GLOBAL_argc, &GLOBAL_argv);
  MPI_Init_thread(&my_argc, &my_argv, MPI_THREAD_MULTIPLE, &thread_level);
#ifdef MPI_DEBUG
  write_msg(__FUNCTION__,__FILE__,__LINE__,"Thread level: %d\n",thread_level);
#endif
#ifdef MPISTATS
  RESET_STATS();
#endif
  return true;
}

#ifdef USE_THREADS
/*
 * Sets up the MPI environment. This function should be called before any other MPI
 * function.
 * The argument is the name of the predicate that will be invoked when a message is received.
 */
static YAP_Bool
rcv_msg_thread(char *handle_pred) {
  YAP_Term pred=YAP_MkAtomTerm(YAP_LookupAtom(handle_pred));
  MPI_Status status;

  while(1) {
    write_msg(__FUNCTION__,__FILE__,__LINE__,"Waiting for MPI msg\n");
    if( MPI_CALL(MPI_Probe( MPI_ANY_SOURCE, MPI_ANY_TAG, MPI_COMM_WORLD, &status )) == MPI_SUCCESS ) {
      // call handler
      write_msg(__FUNCTION__,__FILE__,__LINE__,"MPI Msg received\n");
      YAP_CallProlog(pred);
    } else
      write_msg(__FUNCTION__,__FILE__,__LINE__,"Error in MPI_Probe\n");
  }
  return 1;
}
/*
 *
 */
static YAP_Bool
mpi_init_rcv_thread(void){
  int thread_level;
  //  MPI_Init(&GLOBAL_argc, &GLOBAL_argv);
  pthread_t thread;
  char *arg="handle_msg";

  MPI_Init_thread(&GLOBAL_argc, &GLOBAL_argv,MPI_THREAD_SINGLE,&thread_level);
  if(pthread_create(&thread,NULL,(void*)&rcv_msg_thread,arg)) {
    return false;
  }

  pthread_detach(thread);
  write_msg(__FUNCTION__,__FILE__,__LINE__,"Thread level: %d\n",thread_level);
  return true;
}
#endif

/*
 * Terminates the MPI execution environment. Every process must call this function before
 * exiting.
 * mpi_comm_finalize.
 */
static YAP_Bool
mpi_finalize(void){
  return (MPI_Finalize()==MPI_SUCCESS?true:false);
}
/*
 * Returns the number of workers associated with the MPI_COMM_WORLD communicator.
 * mpi_comm_size(-Size).
 */
static YAP_Bool
mpi_comm_size(void){
  int size;
  MPI_CALL(MPI_Comm_size(MPI_COMM_WORLD, &size));
  return (YAP_Unify(YAP_ARG1, YAP_MkIntTerm(size)));
}
/*
 * Returns the rank of the current process.
 * mpi_comm_rank(-Rank).
 */
static YAP_Bool
mpi_comm_rank(void){
  int rank;
  MPI_CALL(MPI_Comm_rank(MPI_COMM_WORLD, &rank));
  return(YAP_Unify(YAP_ARG1,YAP_MkIntTerm(rank)));
}
/*
 * Returns the major and minor version of MPI.
 * mpi_version(-Major,-Minor).
 */
static YAP_Bool
mpi_version(void){
  int major,minor;

  MPI_CALL(MPI_Get_version(&major,&minor));
  return (YAP_Unify(YAP_ARG1,YAP_MkIntTerm(major)) && YAP_Unify(YAP_ARG2,YAP_MkIntTerm(minor)));
}
/*
 * Returns the name of the processor running the current process.
 * mpi_get_processor_name(-Name).
 */
static YAP_Bool
mpi_get_processor_name(void) {
  char name[MPI_MAX_PROCESSOR_NAME];
  int length;
  MPI_CALL(MPI_Get_processor_name(name,&length));
  return (YAP_Unify(YAP_ARG1,YAP_MkAtomTerm(YAP_LookupAtom(name))));
}
/*
 * Non-blocking communication function. The message is sent when possible. To check the status of the message,
 * mpi_wait or mpi_test should be used. Until mpi_wait is called, the memory allocated for the buffer containing
 * the message is not released.
 *
 * mpi_isend(+Data, +Destination, +Tag, -Handle).
 */
static YAP_Bool
mpi_isend(void) {
  YAP_Term t1 = YAP_Deref(YAP_ARG1),
    t2 = YAP_Deref(YAP_ARG2),
    t3 = YAP_Deref(YAP_ARG3),
    t4 = YAP_Deref(YAP_ARG4);
  char *str=NULL;
  int dest,tag;
  size_t len=0;
  MPI_Request *handle=(MPI_Request*)malloc(sizeof(MPI_Request));

  CONT_TIMER();
  if ( handle==NULL ) return false;

  if (YAP_IsVarTerm(t1) || !YAP_IsIntTerm(t2) || !YAP_IsIntTerm(t3) || !YAP_IsVarTerm(t4)) {
    PAUSE_TIMER();
    return false;
  }
  //
  dest = YAP_IntOfTerm(t2);
  tag  = YAP_IntOfTerm(t3);
  // the data is packaged as a string
  str=term2string(t1);
  len=strlen(str)+1;
  MSG_SENT(len);
  // send the data
  if( MPI_CALL(MPI_Isend( str, len, MPI_CHAR, dest, tag, MPI_COMM_WORLD ,handle)) != MPI_SUCCESS ) {
    PAUSE_TIMER();
    return false;
  }

#ifdef MPI_DEBUG
  write_msg(__FUNCTION__,__FILE__,__LINE__,"%s(%s,%u, MPI_CHAR,%d,%d)\n",__FUNCTION__,str,len,dest,tag);
#endif
  USED_BUFFER(); // informs the prologterms2c module that the buffer is now in use and should not be touched
  // We must associate the string with each handle
  new_request(handle,str);
  PAUSE_TIMER();
  return(YAP_Unify(YAP_ARG4,YAP_MkIntTerm(HANDLE2INT(handle)))); // it should always succeed
}
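/*
 * Example use from Prolog (a sketch; assumes rank 1 exists and runs a matching
 * mpi_recv/3, and do_other_work is a placeholder goal):
 *   ?- mpi_isend(msg(hello,42), 1, 0, Handle), do_other_work, mpi_wait(Handle, Status).
 * The buffer holding the serialised term is kept until mpi_wait/2 (or
 * mpi_test/2, or mpi_gc/0) observes completion and releases it.
 */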

/*
 * Blocking communication function. The message is sent immediately.
 * mpi_send(+Data, +Destination, +Tag).
 */
static YAP_Bool
mpi_send(void) {

  YAP_Term t1 = YAP_Deref(YAP_ARG1),
    t2 = YAP_Deref(YAP_ARG2),
    t3 = YAP_Deref(YAP_ARG3);
  char *str=NULL;
  int dest,tag;
  size_t len;
  int val;
  if (YAP_IsVarTerm(t1) || !YAP_IsIntTerm(t2) || !YAP_IsIntTerm(t3)) {
    return false;
  }

  CONT_TIMER();
  //
  dest = YAP_IntOfTerm(t2);
  tag  = YAP_IntOfTerm(t3);
  // the data is packaged as a string
  str=term2string(t1);
  len=strlen(str)+1;
#if defined(MPI_DEBUG)
  write_msg(__FUNCTION__,__FILE__,__LINE__,"%s(%s,%u, MPI_CHAR,%d,%d)\n",__FUNCTION__,str,len,dest,tag);
#endif
  // send the data
  val=(MPI_CALL(MPI_Send( str, len, MPI_CHAR, dest, tag, MPI_COMM_WORLD))==MPI_SUCCESS?true:false);

  PAUSE_TIMER();
  return(val);
}
/*
 * Implements a blocking receive operation.
 * mpi_recv(?Source,?Tag,-Data).
 */
static YAP_Bool
mpi_recv(void) {
  YAP_Term t1 = YAP_Deref(YAP_ARG1),
    t2 = YAP_Deref(YAP_ARG2),
    t3 = YAP_Deref(YAP_ARG3),
    t4;
  int tag, orig;
  MPI_Status status;
  char *tmp;

  // The third argument (data) must be unbound
  if(!YAP_IsVarTerm(t3)) {
    return false;
  }
  /* The first argument (Source) must be bound to an integer
     (the rank of the source) or left unbound (i.e. any source
     is OK) */
  if (YAP_IsVarTerm(t1)) orig = MPI_ANY_SOURCE;
  else if( !YAP_IsIntTerm(t1) ) return false;
  else orig = YAP_IntOfTerm(t1);

  /* The second argument must be bound to an integer (the tag)
     or left unbound (i.e. any tag is OK) */
  if (YAP_IsVarTerm(t2)) tag = MPI_ANY_TAG;
  else if( !YAP_IsIntTerm(t2) ) return false;
  else tag = YAP_IntOfTerm( t2 );

  CONT_TIMER();
  // probe for the size of the incoming term
  if( MPI_CALL(MPI_Probe( orig, tag, MPI_COMM_WORLD, &status )) != MPI_SUCCESS) {
    PAUSE_TIMER();
    return false;
  }
  int count;
  if( MPI_CALL(MPI_Get_count( &status, MPI_CHAR, &count )) != MPI_SUCCESS ||
      status.MPI_TAG==MPI_UNDEFINED ||
      status.MPI_SOURCE==MPI_UNDEFINED) {
    PAUSE_TIMER();
    return false;
  }
  // allocate a larger buffer if the message does not fit in the default one
  char *buf;
  if (count>BLOCK_SIZE) {
    buf = malloc(count);
    tmp = buf;
  } else {
    buf = buffer.ptr;
    tmp = NULL;
  }
  // We already know the source from MPI_Probe()
  if( orig == MPI_ANY_SOURCE ) {
    orig = status.MPI_SOURCE;
    if( !YAP_Unify(t1, YAP_MkIntTerm(orig))) {
      PAUSE_TIMER();
      return false;
    }
  }
  // We already know the tag from MPI_Probe()
  if( tag == MPI_ANY_TAG ) {
    tag = status.MPI_TAG;
    if( !YAP_Unify(t2, YAP_MkIntTerm(status.MPI_TAG))) {
      PAUSE_TIMER();
      return false;
    }
  }
  // Receive the message as a string
  if( MPI_CALL(MPI_Recv( buf, count, MPI_CHAR, orig, tag,
                         MPI_COMM_WORLD, &status )) != MPI_SUCCESS ) {
    /* Getting here should never happen; it means that the first
       package (containing the size) was sent properly, but there was a glitch with
       the actual content! */
    PAUSE_TIMER();
    if (tmp)
      free(tmp);
    return false;
  }

#ifdef MPI_DEBUG
  write_msg(__FUNCTION__,__FILE__,__LINE__,"%s(%s,%u, MPI_CHAR,%d,%d)\n",__FUNCTION__,buf, count, orig, tag);
#endif
  MSG_RECV(count);
  t4=string2term(buf,NULL);
  PAUSE_TIMER();
  if (tmp)
    free(tmp);
  return(YAP_Unify(YAP_ARG3,t4));
}
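/*
 * Example (sketch of a two-process exchange, ranks 0 and 1):
 *   rank 0:  ?- mpi_send(result(foo,[1,2,3]), 1, 7).
 *   rank 1:  ?- mpi_recv(0, 7, Data).   % Data = result(foo,[1,2,3])
 * Source and tag may also be left unbound on the receiving side, in which case
 * they are bound to the actual sender and tag of the matched message.
 */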

/*
 * Implements a non-blocking receive operation.
 * mpi_irecv(?Source,?Tag,-Handle).
 */
static YAP_Bool
mpi_irecv(void) {
  YAP_Term t1 = YAP_Deref(YAP_ARG1),
    t2 = YAP_Deref(YAP_ARG2),
    t3 = YAP_Deref(YAP_ARG3);
  int tag, orig;
  MPI_Request *mpi_req=(MPI_Request*)malloc(sizeof(MPI_Request));

  // The third argument (handle) must be unbound
  if(!YAP_IsVarTerm(t3)) {
    //Yap_Error(INSTANTIATION_ERROR, t_data, "mpi_receive");
    return false;
  }
  /* The first argument (Source) must be bound to an integer
     (the rank of the source) or left unbound (i.e. any source
     is OK) */
  if (YAP_IsVarTerm(t1)) orig = MPI_ANY_SOURCE;
  else if( !YAP_IsIntTerm(t1) ) return false;
  else orig = YAP_IntOfTerm(t1);

  /* The second argument must be bound to an integer (the tag)
     or left unbound (i.e. any tag is OK) */
  if (YAP_IsVarTerm(t2)) tag = MPI_ANY_TAG;
  else if( !YAP_IsIntTerm(t2) ) return false;
  else tag = YAP_IntOfTerm( t2 );

  CONT_TIMER();
  if( MPI_CALL(MPI_Irecv( BUFFER_PTR, BLOCK_SIZE, MPI_CHAR, orig, tag,
                          MPI_COMM_WORLD, mpi_req )) != MPI_SUCCESS ) {
    PAUSE_TIMER();
    return false;
  }
  new_request(mpi_req,BUFFER_PTR);
  DEL_BUFFER();
  PAUSE_TIMER();
  return YAP_Unify(t3,YAP_MkIntTerm(HANDLE2INT(mpi_req)));
}

/*
 * Completes a non-blocking operation. If the operation was a send, the
 * function waits until the message is buffered or sent by the runtime
 * system. At this point the send buffer is released. If the operation
 * was a receive, it waits until the message is copied to the receive
 * buffer.
 * mpi_wait(+Handle,-Status).
 */
static YAP_Bool
mpi_wait(void) {
  YAP_Term t1 = YAP_Deref(YAP_ARG1), // Handle
    t2 = YAP_Deref(YAP_ARG2);        // Status
  MPI_Status status;
  MPI_Request *handle;

  // The first argument must be an integer (a handle)
  if(!YAP_IsIntTerm(t1)) {
    return false;
  }
  handle=INT2HANDLE(YAP_IntOfTerm(t1));
  CONT_TIMER();
  // wait for the operation to complete
  if( MPI_CALL(MPI_Wait( handle , &status )) != MPI_SUCCESS ) {
    PAUSE_TIMER();
    return false;
  }
  free_request(handle);
  PAUSE_TIMER();
  return(YAP_Unify(t2,YAP_MkIntTerm(status.MPI_ERROR)));
}

/*
 * mpi_test(+Handle,-Status)
 *
 * Provides information about a handle, i.e. whether a communication operation has been completed.
 * If the operation has been completed the predicate succeeds with the completion status,
 * otherwise it fails.
 */
static YAP_Bool
mpi_test(void) {
  YAP_Term t1 = YAP_Deref(YAP_ARG1), // Handle
    t2 = YAP_Deref(YAP_ARG2);        // Status
  MPI_Status status;
  MPI_Request *handle;
  int flag;

  // The first argument (handle) must be an integer
  if(!YAP_IsIntTerm(t1)) {
    return false;
  }
  CONT_TIMER();

  handle=INT2HANDLE(YAP_IntOfTerm(t1));
  //
  MPI_CALL(MPI_Test( handle , &flag, &status ));
  if( flag != true ) {
    PAUSE_TIMER();
    return false;
  }
  free_request(handle);
  PAUSE_TIMER();
  return(YAP_Unify(t2,YAP_MkIntTerm(status.MPI_ERROR)));
}

/*
 * Completes a non-blocking receive started with mpi_irecv/3, binding the
 * received term once the message has arrived.
 * mpi_wait_recv(+Handle,-Status,-Data).
 */
static YAP_Bool
mpi_wait_recv(void) {
  YAP_Term t1 = YAP_Deref(YAP_ARG1); // Handle
  MPI_Status status;
  MPI_Request *handle;
  char *s;
  int len,ret;
  YAP_Term out;

  // The first argument (handle) must be an integer
  if(!YAP_IsIntTerm(t1)) {
    return false;
  }
  CONT_TIMER();

  handle=INT2HANDLE(YAP_IntOfTerm(t1));
  s=(char*)get_request(handle);
  // wait for communication completion
  if( MPI_CALL(MPI_Wait( handle , &status )) != MPI_SUCCESS) {
    PAUSE_TIMER();
    return false;
  }
  len=YAP_SizeOfExportedTerm(s);
  // make sure we only fetch ARG3 after constructing the term
  out = string2term(s,(size_t*)&len);
  MSG_RECV(len);
  free_request(handle);
  PAUSE_TIMER();
  ret=YAP_Unify(YAP_ARG3,out);
  return(ret & YAP_Unify(YAP_ARG2,YAP_MkIntTerm(status.MPI_ERROR)));
}

/*
 * Provides information about a handle, i.e. whether a communication operation has been completed.
 * If the operation has been completed the predicate succeeds with the completion status and the
 * received term, otherwise it fails.
 *
 * mpi_test_recv(+Handle,-Status,-Data).
 */
static YAP_Bool
mpi_test_recv(void) {
  YAP_Term t1 = YAP_Deref(YAP_ARG1); // Handle

  MPI_Status status;
  MPI_Request *handle;
  int flag,len,ret;
  char *s;
  YAP_Term out;

  // The first argument (handle) must be an integer
  if(!YAP_IsIntTerm(t1)) {
    return false;
  }
  CONT_TIMER();

  handle=INT2HANDLE(YAP_IntOfTerm(t1));
  //
  if( MPI_CALL(MPI_Test( handle , &flag, &status ))!=MPI_SUCCESS) {
    PAUSE_TIMER();
    return false;
  }
  if( flag != true ) { // message has not arrived yet
    PAUSE_TIMER();
    return false;
  }
  s=(char*)get_request(handle);
  len=strlen(s);
  out = string2term(s,(size_t*)&len);
  // make sure we only fetch ARG3 after constructing the term
  ret=YAP_Unify(YAP_ARG3,out);
  free_request(handle);
  PAUSE_TIMER();
  return(ret & YAP_Unify(YAP_ARG2,YAP_MkIntTerm(status.MPI_ERROR)));
}
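/*
 * Example of a non-blocking receive from Prolog (a sketch; do_other_work is a
 * placeholder goal):
 *   ?- mpi_irecv(0, 7, Handle),
 *      do_other_work,
 *      ( mpi_test_recv(Handle, Status, Data)
 *      -> true
 *      ;  mpi_wait_recv(Handle, Status, Data) ).
 * mpi_test_recv/3 fails if the message has not arrived yet; mpi_wait_recv/3
 * blocks until it does. Note that mpi_irecv/3 posts the receive into a buffer
 * of the default block size (see mpi_default_buffer_size).
 */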

/*
 * Collective communication function that performs a barrier synchronization among all processes.
 * mpi_barrier/0
 */
static YAP_Bool
mpi_barrier(void) {
  CONT_TIMER();
  int ret=MPI_CALL(MPI_Barrier(MPI_COMM_WORLD));
  PAUSE_TIMER();
  return (ret==MPI_SUCCESS?true:false);
}
/***********************************
 * Broadcast
 ***********************************/
/*
 * Broadcasts a message from the process with rank "root" to
 * all other processes of the group.
 * Note: collective communication means that all processes within a communicator call the same routine,
 * so every process (root and non-root) must call mpi_bcast/2. To receive broadcast messages with a
 * regular mpi_recv/3 instead, use mpi_bcast2/mpi_bcast3.
 *
 * mpi_bcast(+Root,?Data).
 */
static YAP_Bool
mpi_bcast(void) {
  YAP_Term t1 = YAP_Deref(YAP_ARG1),
    t2 = YAP_Deref(YAP_ARG2);
  int root,val;
  size_t len=0;
  char *str;
  int rank;
  // The first argument (root) must be bound
  if(!YAP_IsIntTerm(t1)) {
    return false;
  }
  MPI_CALL(MPI_Comm_rank(MPI_COMM_WORLD, &rank));

  CONT_TIMER();
  root = YAP_IntOfTerm(t1);
  if (root == rank) {
    str=term2string(t2);
    len=strlen(str)+1;
#ifdef MPI_DEBUG
    write_msg(__FUNCTION__,__FILE__,__LINE__,"mpi_bcast(%s,%u, MPI_CHAR,%d)\n",str,len,root);
#endif
  } else {
    RESET_BUFFER();
    str = BUFFER_PTR;
    len = BLOCK_SIZE;
  }
  // broadcast the data
  val=(MPI_CALL(MPI_Bcast( str, len, MPI_CHAR, root, MPI_COMM_WORLD))==MPI_SUCCESS?true:false);


#ifdef MPISTATS
  {
    int size;
    MPI_CALL(MPI_Comm_size(MPI_COMM_WORLD, &size));
    MSG_SENT(len*size);
  }
#endif
  PAUSE_TIMER();
  if (root != rank) {
    YAP_Term out;
    len=YAP_SizeOfExportedTerm(str);
    // make sure we only fetch ARG2 after constructing the term
    out = string2term(str,(size_t*)&len);
    MSG_RECV(len);
    if (!YAP_Unify(YAP_ARG2, out))
      return false;
  }
  return(val);
}
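/*
 * Example (every process in the communicator runs the same goal; rank 2 is the root):
 *   ?- mpi_comm_rank(Rank),
 *      ( Rank =:= 2 -> mpi_bcast(2, config(alpha,0.5)) ; mpi_bcast(2, Data) ).
 * On non-root processes Data is bound to the broadcast term.
 */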

/*
 * Broadcasts a message from the process with rank "root" to
 * all other processes of the group, using one point-to-point MPI_Send per
 * destination, so the recipients can use a regular mpi_recv/3.
 * Implements mpi_bcast2(+Root,+Data) and mpi_bcast3(+Root,+Data,+Tag).
 */
static YAP_Bool
my_bcast(YAP_Term t1,YAP_Term t2, YAP_Term t3) {
  int root;
  int k,worldsize;
  size_t len=0;
  char *str;
  int tag;

  // The arguments should be bound
  if(YAP_IsVarTerm(t2) || !YAP_IsIntTerm(t1) || !YAP_IsIntTerm(t3)) {
    return false;
  }

  CONT_TIMER();

  MPI_CALL(MPI_Comm_size(MPI_COMM_WORLD,&worldsize));

  root = YAP_IntOfTerm(t1);
  tag  = YAP_IntOfTerm(t3);
  str=term2string(t2);
  len=strlen(str)+1;

  for(k=0;k<=worldsize-1;++k)
    if(k!=root) {
      // Use async send?
      MSG_SENT(len);
      if(MPI_CALL(MPI_Send( str, len, MPI_CHAR, k, tag, MPI_COMM_WORLD))!=MPI_SUCCESS) {
        PAUSE_TIMER();
        return false;
      }
#ifdef MPI_DEBUG
      write_msg(__FUNCTION__,__FILE__,__LINE__,"bcast2(%s,%u, MPI_CHAR,%d,%d)\n",str,len,k,tag);
#endif
    }
  PAUSE_TIMER();
  return true;
}
/*
 * mpi_bcast2(+Root,+Data).
 */
static YAP_Bool
mpi_bcast2(void) {
  return my_bcast(YAP_ARG1,YAP_ARG2,YAP_MkIntTerm(0));
}
/*
 * Broadcasts a message from the process with rank "root" to
 * all other processes of the group, using one point-to-point MPI_Send per
 * destination, so the recipients can use a regular mpi_recv/3.
 *
 * mpi_bcast3(+Root,+Data,+Tag).
 */
static YAP_Bool
mpi_bcast3(void) {
  return my_bcast(YAP_ARG1,YAP_ARG2,YAP_ARG3);
}
/*
 * Broadcasts a message from the process with rank "root" to
 * all other processes of the group, using one non-blocking MPI_Isend per destination.
 * Implements mpi_ibcast(+Root,+Data) and mpi_ibcast(+Root,+Data,+Tag).
 */
static YAP_Bool
my_ibcast(YAP_Term t1,YAP_Term t2, YAP_Term t3) {
  int root;
  int k,worldsize;
  size_t len=0;
  char *str;
  int tag;
  BroadcastRequest *b;

  //fprintf(stderr,"ibcast1");
  // The arguments should be bound
  if(YAP_IsVarTerm(t2) || !YAP_IsIntTerm(t1) || !YAP_IsIntTerm(t3)) {
    return false;
  }

  CONT_TIMER();

  //  fprintf(stderr,"ibcast2");
  MPI_CALL(MPI_Comm_size(MPI_COMM_WORLD,&worldsize));

  root = YAP_IntOfTerm(t1);
  tag  = YAP_IntOfTerm(t3);
  str  = term2string(t2);
  len  = strlen(str)+1;
  b=new_broadcast();
  if ( b==NULL ) {
    PAUSE_TIMER();
    return false;
  }
  //fprintf(stderr,"ibcast3");
  for(k=0;k<=worldsize-1;++k) {
    if(k!=root) {
      MPI_Request *handle=(MPI_Request*)malloc(sizeof(MPI_Request));
      MSG_SENT(len);
      // Use async send
      if(MPI_CALL(MPI_Isend(str, len, MPI_CHAR, k, tag, MPI_COMM_WORLD,handle))!=MPI_SUCCESS) {
        free(handle);
        PAUSE_TIMER();
        return false;
      }
      new_broadcast_request(b,handle,str);
      //new_request(handle,str);
      USED_BUFFER();
    }
  }
  if(!b->nreq) // release b if no messages were sent (worldsize==1)
    free(b);

#if defined(MPI_DEBUG) && defined(MALLINFO)
  {
    struct mallinfo s = mallinfo();
    printf("%d: %d=%d/%d\n",getpid(),s.arena,s.uordblks,s.fordblks); //vsc
  }
#endif
  PAUSE_TIMER();
  //fprintf(stderr,"ibcast4");
  return true;
}
/*
 * Broadcasts a message from the process with rank "root" to
 * all other processes of the group.
 * To receive the message the recipients use mpi_recv;
 * the message is sent using MPI_Isend.
 * mpi_ibcast(+Root,+Data,+Tag).
 */
static YAP_Bool
mpi_ibcast3(void) {
  return my_ibcast(YAP_ARG1,YAP_ARG2,YAP_ARG3);
}
/*
 * mpi_ibcast(+Root,+Data).
 */
static YAP_Bool
mpi_ibcast2(void) {
  return my_ibcast(YAP_ARG1,YAP_ARG2,YAP_MkIntTerm(0));
}
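/*
 * Example (a sketch; root is rank 0, tag 3):
 *   root:    ?- mpi_ibcast(0, update(42), 3).
 *   others:  ?- mpi_recv(0, 3, Data).
 * The Isend requests and the shared send buffer are only released when mpi_gc/0
 * finds them completed, so long-running programs that use mpi_ibcast should
 * call mpi_gc/0 periodically.
 */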
/*******************************************
 * Garbage collection
 *******************************************/
/*
 * Attempts to release the request structures used in asynchronous communications
 */
static void
gc(hashtable ht) {
  MPI_Request *handle;
  hashnode* node;
  MPI_Status status;
  int flag;

  node=(hashnode*)next_hashnode(ht);
  if ( node==NULL) return;

  gc(ht); // start at the end

  handle=INT2HANDLE(node->value);
  MPI_CALL(MPI_Test( handle , &flag, &status ));
  if ( flag==true) {
    MPI_CALL(MPI_Wait(handle,&status));
#ifdef MPI_DEBUG
    write_msg(__FUNCTION__,__FILE__,__LINE__,"Released handle...%s\n",(char*)node->obj);
#endif
    if (ht==requests)
      free_request(handle);
    else
      free_broadcast_request(handle);
  }
}
/*
 *
 */
static YAP_Bool
mpi_gc(void) {
  //write_msg(__FUNCTION__,__FILE__,__LINE__,"MPI_gc>: requests=%d\n",requests->n_entries);
  CONT_TIMER();
  init_hash_traversal(requests);
  gc(requests);
  init_hash_traversal(broadcasts);
  gc(broadcasts);
  //write_msg(__FUNCTION__,__FILE__,__LINE__,"MPI_gc<: requests=%d\n",requests->n_entries);
  PAUSE_TIMER();
  return true;
}

//size_t BLOCK_SIZE=4*1024;

static YAP_Bool
mpi_default_buffer_size(void)
{
  if (!YAP_Unify(YAP_ARG1,YAP_MkIntTerm(BLOCK_SIZE))) {
    return false;
  }
  return true;
}

/********************************************************************
 * Init
 *******************************************************************/
X_API void
init_mpi(void) {

  YAP_SetYAPFlag(YAP_MkAtomTerm(YAP_LookupAtom("readline")),
                 YAP_MkAtomTerm(YAP_LookupAtom("false")));
  requests=new_hashtable(HASHSIZE);
  broadcasts=new_hashtable(HASHSIZE);
  RESET_BUFFER();
  YAP_UserCPredicate( "mpi_init", mpi_init,0);                             // mpi_init/0
#ifdef USE_THREADS
  YAP_UserCPredicate( "mpi_init_rcv_thread", mpi_init_rcv_thread,1);       // mpi_init_rcv_thread(+HandleMsgGoal/1)
#endif
  YAP_UserCPredicate( "mpi_finalize", mpi_finalize,0);                     // mpi_finalize/0
  YAP_UserCPredicate( "mpi_comm_size", mpi_comm_size,1);                   // mpi_comm_size(-Size)
  YAP_UserCPredicate( "mpi_comm_rank", mpi_comm_rank,1);                   // mpi_comm_rank(-Rank)
  YAP_UserCPredicate( "mpi_version", mpi_version,2);                       // mpi_version(-Major,-Minor)
  YAP_UserCPredicate( "mpi_get_processor_name", mpi_get_processor_name,1); // mpi_get_processor_name(-Name)
  YAP_UserCPredicate( "mpi_send", mpi_send,3);                             // mpi_send(+Data, +Destination, +Tag)
  YAP_UserCPredicate( "mpi_isend", mpi_isend,4);                           // mpi_isend(+Data, +Destination, +Tag, -Handle)
  YAP_UserCPredicate( "mpi_recv", mpi_recv,3);                             // mpi_recv(?Source,?Tag,-Data)
  YAP_UserCPredicate( "mpi_irecv", mpi_irecv,3);                           // mpi_irecv(?Source,?Tag,-Handle)
  YAP_UserCPredicate( "mpi_wait", mpi_wait,2);                             // mpi_wait(+Handle,-Status)
  YAP_UserCPredicate( "mpi_wait_recv", mpi_wait_recv,3);                   // mpi_wait_recv(+Handle,-Status,-Data)
  YAP_UserCPredicate( "mpi_test", mpi_test,2);                             // mpi_test(+Handle,-Status)
  YAP_UserCPredicate( "mpi_test_recv", mpi_test_recv,3);                   // mpi_test_recv(+Handle,-Status,-Data)
  YAP_UserCPredicate( "mpi_bcast", mpi_bcast,2);                           // mpi_bcast(Root,Term)
  YAP_UserCPredicate( "mpi_bcast2", mpi_bcast2,2);                         // mpi_bcast2(Root,Term)
  YAP_UserCPredicate( "mpi_bcast3", mpi_bcast3,3);                         // mpi_bcast3(Root,Term,Tag)
  YAP_UserCPredicate( "mpi_ibcast", mpi_ibcast2,2);                        // mpi_ibcast(Root,Term)
  YAP_UserCPredicate( "mpi_ibcast", mpi_ibcast3,3);                        // mpi_ibcast(Root,Term,Tag)
  YAP_UserCPredicate( "mpi_barrier", mpi_barrier,0);                       // mpi_barrier/0
  YAP_UserCPredicate( "mpi_gc", mpi_gc,0);                                 // mpi_gc/0
  YAP_UserCPredicate( "mpi_default_buffer_size", mpi_default_buffer_size,2); // buffer size
#ifdef MPISTATS
  YAP_UserCPredicate( "mpi_stats", mpi_stats,7);                           // mpi_stats(-Time,#MsgsRecv,BytesRecv,MaxRecv,#MsgsSent,BytesSent,MaxSent)
  YAP_UserCPredicate( "mpi_reset_stats", mpi_reset_stats,0);               // resets the timers and counters
  RESET_STATS();
#endif
  // YAP_UserCPredicate( "mpi_gather", mpi_gather,0); // mpi_gather(+RootRank,?SendData,?RecvData)
  // Each process (root process included) sends the contents of its send buffer to the root process. The root process receives the messages and stores them in rank order. The outcome is as if each of the n processes in the group (including the root process) had executed a call to MPI_Send and the root had executed n calls to MPI_Recv. The receive buffer is ignored for all non-root processes.
  // MPI_Scatter
#ifdef MPI_DEBUG
  fprintf(stderr,"MPI module successfully loaded.\n");
  fflush(stderr);
#endif
}
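/*
 * From Prolog, the predicates registered above are normally reached through the
 * LAM/MPI library wrapper shipped with YAP distributions that include this
 * module (the library name below is an assumption; check your installation):
 *   :- use_module(library(lam_mpi)).
 *   ?- mpi_init, mpi_comm_rank(Rank), mpi_comm_size(Size), mpi_finalize.
 */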

#endif /* HAVE_MPI_H */