41#include "prologterms2c.h"
52typedef struct broadcast_req BroadcastRequest;
/*
 * Conversions between request pointers and the integer ids derived from them.
 *
 * HANDLE2INT / INT2HANDLE cast an MPI_Request pointer to IDTYPE and back;
 * BREQ2INT / INT2BREQ do the same for BroadcastRequest pointers.  The integer
 * form is what this file uses as a hash-table key and wraps in a Prolog
 * integer term.
 *
 * Both the argument and the whole expansion are parenthesised so the cast
 * applies to the complete argument expression (e.g. HANDLE2INT(p + 1) converts
 * the advanced pointer rather than adding 1 to the converted value) and so the
 * result can be embedded in a larger expression safely.
 */
#define HANDLE2INT(ptr) ((IDTYPE)(ptr))
#define INT2HANDLE(id) ((MPI_Request *)(id))

#define BREQ2INT(ptr) ((IDTYPE)(ptr))
#define INT2BREQ(ptr) ((BroadcastRequest *)(ptr))
62#define MPI_CALL(function) ((mpi_status=function)==MPI_SUCCESS?MPI_SUCCESS:mpi_error(mpi_status))
71static YAP_Bool mpi_statuss[1024];
72#define mpi_status (mpi_statuss[YAP_ThreadSelf()])
74extern int GLOBAL_argc;
80X_API
void init_mpi(
void);
/*
 * Communication statistics: byte and message totals for each direction, the
 * largest single message seen in each direction, and the accumulated time.
 */
static unsigned long bytes_sent;
static unsigned long bytes_recv;
static unsigned long num_msgs_sent;
static unsigned long num_msgs_recv;
static unsigned long max_s_recv_msg;
static unsigned long max_s_sent_msg;
static double total_time_spent;

/* Zero every counter and the accumulated time. */
#define RESET_STATS() { \
    total_time_spent = 0; \
    bytes_sent = bytes_recv = num_msgs_recv = num_msgs_sent = \
      max_s_recv_msg = max_s_sent_msg = 0; \
  }

/*
 * MSG_SENT(size) / MSG_RECV(size): account for one message of `size` bytes.
 *
 * The argument is evaluated exactly once into a local, so an argument with
 * side effects or a low-precedence operator (e.g. a ?: expression) is counted
 * correctly.  The brace-block form (rather than do { } while (0)) is kept on
 * purpose so call sites written with or without a trailing semicolon keep
 * compiling.
 */
#define MSG_SENT(size) { \
    unsigned long msg_size_ = (unsigned long)(size); \
    bytes_sent += msg_size_; \
    ++num_msgs_sent; \
    if (max_s_sent_msg < msg_size_) { max_s_sent_msg = msg_size_; } \
  }
#define MSG_RECV(size) { \
    unsigned long msg_size_ = (unsigned long)(size); \
    bytes_recv += msg_size_; \
    ++num_msgs_recv; \
    if (max_s_recv_msg < msg_size_) { max_s_recv_msg = msg_size_; } \
  }
/* Name under which the accumulated time is read. */
#define MPITIME total_time_spent

/*
 * Timer brackets: CONT_TIMER() calls tstart(); PAUSE_TIMER() calls tend() and
 * adds tval() to the accumulated time.
 */
#define CONT_TIMER() { tstart(); }
#define PAUSE_TIMER() { tend(); total_time_spent += tval(); }

/*
 * Function-like override of `return`: a `return` directly followed by a
 * parenthesised expression pauses the timer before returning.  A plain
 * `return x;` is not followed by '(' and therefore is not expanded, so it
 * leaves the timer untouched.
 */
#define return(p) { PAUSE_TIMER(); return (p); }
/*
 * Per-thread start/end timestamps for the statistics timer, one slot per
 * thread selected by YAP_ThreadSelf().
 * NOTE(review): the tables have 1024 entries and the index is not
 * range-checked here.
 */
static struct timeval _tstarts[1024], _tends[1024];

/*
 * Accessors for the calling thread's slots.  The timing code reads
 * `_tstart.tv_sec` / `_tend.tv_sec`, so the start accessor must be spelled
 * `_tstart`; it was previously defined only under the misspelling `_tsart`.
 * The old spelling is kept as an alias so any existing reference still
 * compiles.
 */
#define _tstart (_tstarts[YAP_ThreadSelf()])
#define _tsart _tstart
#define _tend (_tends[YAP_ThreadSelf()])
118#include <sys/resource.h>
123 getrusage(RUSAGE_SELF,&r);
128 getrusage(RUSAGE_SELF,&r);
133 double t1, t2,elapsed;
134 t1 = (double)_tstart.tv_sec + (
double)_tstart.tv_usec/(1000*1000);
135 t2 = (double)_tend.tv_sec + (
double)_tend.tv_usec/(1000*1000);
137 if (elapsed==0)
return 0.000001;
143static YAP_Bool mpi_stats(
void){
144 fprintf(stderr,
"%f %ld %ld %ld %ld %ld %ld\n",MPITIME,num_msgs_recv,bytes_recv,max_s_recv_msg,num_msgs_sent,bytes_sent,max_s_sent_msg);
145 return (YAP_Unify(YAP_ARG1, YAP_MkFloatTerm((
float)(MPITIME))) &&
146 YAP_Unify(YAP_ARG2, YAP_MkIntTerm((
long)num_msgs_recv)) &&
147 YAP_Unify(YAP_ARG3, YAP_MkIntTerm((
long)bytes_recv)) &&
148 YAP_Unify(YAP_ARG4, YAP_MkIntTerm((
long)max_s_recv_msg)) &&
149 YAP_Unify(YAP_ARG5, YAP_MkIntTerm((
long)num_msgs_sent)) &&
150 YAP_Unify(YAP_ARG6, YAP_MkIntTerm((
long)bytes_sent)) &&
151 YAP_Unify(YAP_ARG7, YAP_MkIntTerm((
long)max_s_sent_msg))
157static YAP_Bool mpi_reset_stats(
void) {RESET_STATS();
return true;}
/*
 * No-op forms of the accounting hooks: the argument is discarded without
 * being evaluated.  Presumably the branch used when statistics are compiled
 * out -- the enclosing conditional is not visible here.
 */
#define MSG_SENT(size)
#define MSG_RECV(size)

/* `return(p)` without any timer handling: simply returns p. */
#define return(p) { return (p); }
175new_request(MPI_Request *handle,
void* ptr) {
176 return insere(requests,(ulong)HANDLE2INT(handle),ptr);
180get_request(MPI_Request *handle) {
181 return get_object(requests,(ulong)HANDLE2INT(handle));
185free_request(MPI_Request *handle) {
187 ptr=
delete(requests,(ulong)HANDLE2INT(handle));
197static inline BroadcastRequest*
199 BroadcastRequest* b=(BroadcastRequest *)malloc(
sizeof(BroadcastRequest));
211free_broadcast_request(MPI_Request *handle) {
213 b=(BroadcastRequest*)
delete(broadcasts,(ulong)BREQ2INT(handle));
227get_broadcast_request(MPI_Request *handle) {
228 return get_object(broadcasts,(ulong)HANDLE2INT(handle));
234new_broadcast_request(BroadcastRequest* b,MPI_Request *handle,
void* ptr) {
238 return insere(broadcasts,(ulong)HANDLE2INT(handle),b);
242static YAP_Bool mpi_error(
int errcode){
243 char err_msg[MPI_MAX_ERROR_STRING];
246 MPI_Error_string(errcode,&err_msg[0],&len);
249 write_msg(__FUNCTION__,__FILE__,__LINE__,
"MPI_Error: %s\n",err_msg);
264 int my_argc = YAP_Argv(&my_argv);
266 MPI_Init_thread(&my_argc, &my_argv, MPI_THREAD_MULTIPLE, &thread_level);
268 write_msg(__FUNCTION__,__FILE__,__LINE__,
"Thread level: %d\n",thread_level);
283rcv_msg_thread(
char *handle_pred) {
284 YAP_Term pred=YAP_MkAtomTerm(YAP_LookupAtom(handle_pred));
288 write_msg(__FUNCTION__,__FILE__,__LINE__,
"Waiting for MPI msg\n");
289 if( MPI_CALL(MPI_Probe( MPI_ANY_SOURCE, MPI_ANY_TAG, MPI_COMM_WORLD, &status )) == MPI_SUCCESS ) {
291 write_msg(__FUNCTION__,__FILE__,__LINE__,
"MPI Msg received\n");
292 YAP_CallProlog(pred);
294 write_msg(__FUNCTION__,__FILE__,__LINE__,
"Error in MPI_Probe\n");
302mpi_init_rcv_thread(
void){
306 char *arg=
"handle_msg";
308 MPI_Init_thread(&GLOBAL_argc, &GLOBAL_argv,MPI_THREAD_SINGLE,&thread_level);
309 if(pthread_create(&thread,NULL,(
void*)&rcv_msg_thread,arg)) {
313 pthread_detach(thread);
314 write_msg(__FUNCTION__,__FILE__,__LINE__,
"Thread level: %d\n",thread_level);
326 return (MPI_Finalize()==MPI_SUCCESS?
true:
false);
335 MPI_CALL(MPI_Comm_size(MPI_COMM_WORLD, &size));
336 return (YAP_Unify(YAP_ARG1, YAP_MkIntTerm(size)));
345 MPI_CALL(MPI_Comm_rank(MPI_COMM_WORLD, &rank));
346 return(YAP_Unify(YAP_ARG1,YAP_MkIntTerm(rank)));
356 MPI_CALL(MPI_Get_version(&major,&minor));
357 return (YAP_Unify(YAP_ARG1,YAP_MkIntTerm(major)) && YAP_Unify(YAP_ARG2,YAP_MkIntTerm(minor)));
364mpi_get_processor_name(
void) {
365 char name[MPI_MAX_PROCESSOR_NAME];
367 MPI_CALL(MPI_Get_processor_name(name,&length));
368 return (YAP_Unify(YAP_ARG1,YAP_MkAtomTerm(YAP_LookupAtom(name))));
386 MPI_Request *handle=(MPI_Request*)malloc(
sizeof(MPI_Request));
389 if ( handle==NULL )
return false;
391 if (YAP_IsVarTerm(t1) || !YAP_IsIntTerm(t2) || !YAP_IsIntTerm(t3) || !YAP_IsVarTerm(t4)) {
396 dest = YAP_IntOfTerm(t2);
397 tag = YAP_IntOfTerm(t3);
403 if( MPI_CALL(MPI_Isend( str, len, MPI_CHAR, dest, tag, MPI_COMM_WORLD ,handle)) != MPI_SUCCESS ) {
409 write_msg(__FUNCTION__,__FILE__,__LINE__,
"%s(%s,%u, MPI_CHAR,%d,%d)\n",__FUNCTION__,str,len,dest,tag);
413 new_request(handle,str);
415 return(YAP_Unify(YAP_ARG4,YAP_MkIntTerm(HANDLE2INT(handle))));
432 if (YAP_IsVarTerm(t1) || !YAP_IsIntTerm(t2) || !YAP_IsIntTerm(t3)) {
438 dest = YAP_IntOfTerm(t2);
439 tag = YAP_IntOfTerm(t3);
443#if defined(MPI_DEBUG)
444 write_msg(__FUNCTION__,__FILE__,__LINE__,
"%s(%s,%u, MPI_CHAR,%d,%d)\n",__FUNCTION__,str,len,dest,tag);
447 val=(MPI_CALL(MPI_Send( str, len, MPI_CHAR, dest, tag, MPI_COMM_WORLD))==MPI_SUCCESS?
true:
false);
467 if(!YAP_IsVarTerm(t3)) {
473 if (YAP_IsVarTerm(t1)) orig = MPI_ANY_SOURCE;
474 else if( !YAP_IsIntTerm(t1) )
return false;
475 else orig = YAP_IntOfTerm(t1);
479 if (YAP_IsVarTerm(t2)) tag = MPI_ANY_TAG;
480 else if( !YAP_IsIntTerm(t2) )
return false;
481 else tag = YAP_IntOfTerm( t2 );
485 if( MPI_CALL(MPI_Probe( orig, tag, MPI_COMM_WORLD, &status )) != MPI_SUCCESS) {
490 if( MPI_CALL(MPI_Get_count( &status, MPI_CHAR, &count )) != MPI_SUCCESS ||
491 status.MPI_TAG==MPI_UNDEFINED ||
492 status.MPI_SOURCE==MPI_UNDEFINED) {
498 if (count>BLOCK_SIZE) {
506 if( orig == MPI_ANY_SOURCE ) {
507 orig = status.MPI_SOURCE;
508 if( !YAP_Unify(t1, YAP_MkIntTerm(orig))) {
514 if( tag == MPI_ANY_TAG ) {
515 tag = status.MPI_TAG;
516 if( !YAP_Unify(t2, YAP_MkIntTerm(status.MPI_TAG))) {
522 if( MPI_CALL(MPI_Recv( buf, count, MPI_CHAR, orig, tag,
523 MPI_COMM_WORLD, &status )) != MPI_SUCCESS ) {
534 write_msg(__FUNCTION__,__FILE__,__LINE__,
"%s(%s,%u, MPI_CHAR,%d,%d)\n",__FUNCTION__,buf, count, orig, tag);
537 t4=string2term(buf,NULL);
541 return(YAP_Unify(YAP_ARG3,t4));
554 MPI_Request *mpi_req=(MPI_Request*)malloc(
sizeof(MPI_Request));
557 if(!YAP_IsVarTerm(t3)) {
564 if (YAP_IsVarTerm(t1)) orig = MPI_ANY_SOURCE;
565 else if( !YAP_IsIntTerm(t1) )
return false;
566 else orig = YAP_IntOfTerm(t1);
570 if (YAP_IsVarTerm(t2)) tag = MPI_ANY_TAG;
571 else if( !YAP_IsIntTerm(t2) )
return false;
572 else tag = YAP_IntOfTerm( t2 );
575 if( MPI_CALL(MPI_Irecv( BUFFER_PTR, BLOCK_SIZE, MPI_CHAR, orig, tag,
576 MPI_COMM_WORLD, mpi_req )) != MPI_SUCCESS ) {
580 new_request(mpi_req,BUFFER_PTR);
583 return YAP_Unify(t3,YAP_MkIntTerm(HANDLE2INT(mpi_req)));
602 if(!YAP_IsIntTerm(t1)) {
605 handle=INT2HANDLE(YAP_IntOfTerm(t1));
608 if( MPI_CALL(MPI_Wait( handle , &status )) != MPI_SUCCESS ) {
612 free_request(handle);
614 return(YAP_Unify(t2,YAP_MkIntTerm(status.MPI_ERROR)));
634 if(!YAP_IsIntTerm(t1)) {
639 handle=INT2HANDLE(YAP_IntOfTerm(t1));
641 MPI_CALL(MPI_Test( handle , &flag, &status ));
646 free_request(handle);
648 return(YAP_Unify(t2,YAP_MkIntTerm(status.MPI_ERROR)));
670 if(!YAP_IsIntTerm(t1)) {
675 handle=INT2HANDLE(YAP_IntOfTerm(t1));
676 s=(
char*)get_request(handle);
678 if( MPI_CALL(MPI_Wait( handle , &status )) != MPI_SUCCESS) {
682 len=YAP_SizeOfExportedTerm(s);
684 out = string2term(s,(
size_t*)&len);
686 free_request(handle);
688 ret=YAP_Unify(YAP_ARG3,out);
689 return(ret & YAP_Unify(YAP_ARG2,YAP_MkIntTerm(status.MPI_ERROR)));
710 if(!YAP_IsIntTerm(t1)) {
715 handle=INT2HANDLE(YAP_IntOfTerm(t1));
717 if( MPI_CALL(MPI_Test( handle , &flag, &status ))!=MPI_SUCCESS) {
721 s=(
char*)get_request(handle);
723 out = string2term(s,(
size_t*)&len);
725 ret=YAP_Unify(YAP_ARG3,out);
726 free_request(handle);
728 return(ret & YAP_Unify(YAP_ARG2,YAP_MkIntTerm(status.MPI_ERROR)));
738 int ret=MPI_CALL(MPI_Barrier(MPI_COMM_WORLD));
740 return (ret==MPI_SUCCESS?
true:
false);
762 if(!YAP_IsIntTerm(t1)) {
765 MPI_CALL(MPI_Comm_rank(MPI_COMM_WORLD, &rank));
768 root = YAP_IntOfTerm(t1);
772 write_msg(__FUNCTION__,__FILE__,__LINE__,
"mpi_bcast(%s,%u, MPI_CHAR,%d)\n",str,len,root);
780 val=(MPI_CALL(MPI_Bcast( str, len, MPI_CHAR, root, MPI_COMM_WORLD))==MPI_SUCCESS?
true:
false);
786 MPI_CALL(MPI_Comm_size(MPI_COMM_WORLD, &size));
793 len=YAP_SizeOfExportedTerm(str);
795 out = string2term(str,(
size_t*)&len);
797 if (!YAP_Unify(YAP_ARG2, out))
811my_bcast(YAP_Term t1,YAP_Term t2, YAP_Term t3) {
819 if(YAP_IsVarTerm(t2) || !YAP_IsIntTerm(t1) || !YAP_IsIntTerm(t3)) {
825 MPI_CALL(MPI_Comm_size(MPI_COMM_WORLD,&worldsize));
827 root = YAP_IntOfTerm(t1);
828 tag = YAP_IntOfTerm(t3);
831 for(k=0;k<=worldsize-1;++k)
835 if(MPI_CALL(MPI_Send( str, len, MPI_CHAR, k, tag, MPI_COMM_WORLD))!=MPI_SUCCESS) {
840 write_msg(__FUNCTION__,__FILE__,__LINE__,
"bcast2(%s,%u, MPI_CHAR,%d,%d)\n",str,len,k,tag);
851 return my_bcast(YAP_ARG1,YAP_ARG2,YAP_MkIntTerm(0));
863 return my_bcast(YAP_ARG1,YAP_ARG2,YAP_ARG3);
871my_ibcast(YAP_Term t1,YAP_Term t2, YAP_Term t3) {
881 if(YAP_IsVarTerm(t2) || !YAP_IsIntTerm(t1) || !YAP_IsIntTerm(t3)) {
888 MPI_CALL(MPI_Comm_size(MPI_COMM_WORLD,&worldsize));
890 root = YAP_IntOfTerm(t1);
891 tag = YAP_IntOfTerm(t3);
892 str = term2string(t2);
899 for(k=0;k<=worldsize-1;++k) {
901 MPI_Request *handle=(MPI_Request*)malloc(
sizeof(MPI_Request));
904 if(MPI_CALL(MPI_Isend(str, len, MPI_CHAR, k, tag, MPI_COMM_WORLD,handle))!=MPI_SUCCESS) {
909 new_broadcast_request(b,handle,str);
917#if defined(MPI_DEBUG) && defined(MALLINFO)
919 struct mallinfo s = mallinfo();
920 printf(
"%d: %d=%d/%d\n",getpid(),s.arena,s.uordblks,s.fordblks);
936 return my_ibcast(YAP_ARG1,YAP_ARG2,YAP_ARG3);
943 return my_ibcast(YAP_ARG1,YAP_ARG2,YAP_MkIntTerm(0));
959 if ( node==NULL)
return;
963 handle=INT2HANDLE(node->value);
964 MPI_CALL(MPI_Test( handle , &flag, &status ));
966 MPI_CALL(MPI_Wait(handle,&status));
968 write_msg(__FUNCTION__,__FILE__,__LINE__,
"Released handle...%s\n",(
char*)node->obj);
971 free_request(handle);
973 free_broadcast_request(handle);
983 init_hash_traversal(requests);
985 init_hash_traversal(broadcasts);
995mpi_default_buffer_size(
void)
997 if (!YAP_Unify(YAP_ARG1,YAP_MkIntTerm(BLOCK_SIZE))) {
1009 YAP_SetYAPFlag(YAP_MkAtomTerm(YAP_LookupAtom(
"readline")),
1010 YAP_MkAtomTerm(YAP_LookupAtom(
"false")));
1011 requests=new_hashtable(HASHSIZE);
1012 broadcasts=new_hashtable(HASHSIZE);
1014 YAP_UserCPredicate(
"mpi_init", mpi_init,0);
1016 YAP_UserCPredicate(
"mpi_init_rcv_thread", mpi_init_rcv_thread,1);
1018 YAP_UserCPredicate(
"mpi_finalize", mpi_finalize,0);
1019 YAP_UserCPredicate(
"mpi_comm_size", mpi_comm_size,1);
1020 YAP_UserCPredicate(
"mpi_comm_rank", mpi_comm_rank,1);
1021 YAP_UserCPredicate(
"mpi_version", mpi_version,2);
1022 YAP_UserCPredicate(
"mpi_get_processor_name", mpi_get_processor_name,1);
1023 YAP_UserCPredicate(
"mpi_send", mpi_send,3);
1024 YAP_UserCPredicate(
"mpi_isend",mpi_isend,4);
1025 YAP_UserCPredicate(
"mpi_recv", mpi_recv,3);
1026 YAP_UserCPredicate(
"mpi_irecv", mpi_irecv,3);
1027 YAP_UserCPredicate(
"mpi_wait", mpi_wait,2);
1028 YAP_UserCPredicate(
"mpi_wait_recv", mpi_wait_recv,3);
1029 YAP_UserCPredicate(
"mpi_test", mpi_test,2);
1030 YAP_UserCPredicate(
"mpi_test_recv", mpi_test_recv,3);
1031 YAP_UserCPredicate(
"mpi_bcast", mpi_bcast,2);
1032 YAP_UserCPredicate(
"mpi_bcast2", mpi_bcast2,2);
1033 YAP_UserCPredicate(
"mpi_bcast3", mpi_bcast3,3);
1042 YAP_UserCPredicate(
"mpi_ibcast", mpi_ibcast2,2);
1043 YAP_UserCPredicate(
"mpi_ibcast", mpi_ibcast3,3);
1053 YAP_UserCPredicate(
"mpi_barrier", mpi_barrier,0);
1054 YAP_UserCPredicate(
"mpi_gc", mpi_gc,0);
1055 YAP_UserCPredicate(
"mpi_default_buffer_size", mpi_default_buffer_size,2);
1069 YAP_UserCPredicate(
"mpi_stats", mpi_stats,7);
1070 YAP_UserCPredicate(
"mpi_reset_stats", mpi_reset_stats,0);
1077 fprintf(stderr,
"MPI module succesfully loaded.");
#define YAP_Deref(t)
X_API macro.
@ gc
controls garbage collection