42#include "prologterms2c.h"
/* Alias for struct broadcast_req: the record that new_broadcast() allocates
 * and that is stored as the value in the `broadcasts` hash table.  The struct
 * body itself is defined elsewhere. */
struct broadcast_req;
typedef struct broadcast_req BroadcastRequest;
/* Convert between an MPI_Request pointer and the integer id that is handed
 * to Prolog (via YAP_MkIntTerm) and used as the key of the `requests` table.
 * Both the argument and the whole expansion are parenthesized so that an
 * expression argument such as HANDLE2INT(base + 1) or INT2HANDLE(id + k) is
 * converted as a unit rather than casting only its first operand. */
#define HANDLE2INT(ptr) ((IDTYPE)(ptr))
#define INT2HANDLE(id) ((MPI_Request *)(id))
/* Convert between a pointer and the integer id used as a key of the
 * `broadcasts` table, and back to a BroadcastRequest pointer.  BREQ2INT takes
 * any pointer (free_broadcast_request applies it to an MPI_Request *).
 * Arguments and expansions are fully parenthesized so expression arguments
 * are converted as a whole. */
#define BREQ2INT(ptr) ((IDTYPE)(ptr))
#define INT2BREQ(id) ((BroadcastRequest *)(id))
/* Run one MPI call and save its return code in mpi_status.  The macro yields
 * MPI_SUCCESS when the call succeeded; otherwise it yields whatever
 * mpi_error() returns for the saved code. */
#define MPI_CALL(call)                                                         \
  ((mpi_status = (call)) != MPI_SUCCESS ? mpi_error(mpi_status) : MPI_SUCCESS)
/* Most recent MPI return code, one slot per YAP thread (indexed by
 * YAP_ThreadSelf()).  MPI_CALL stores the raw code here and then passes it to
 * mpi_error(int).  The slots are `int` (the type MPI functions return) so the
 * actual error code is preserved; storing it in a YAP_Bool would collapse
 * every failure code to the same value if YAP_Bool is a C boolean.
 * NOTE(review): YAP_ThreadSelf() is not bounds-checked against the 1024
 * slots; confirm it can never be negative or >= 1024 in this build. */
static int mpi_statuss[1024];
#define mpi_status (mpi_statuss[YAP_ThreadSelf()])
77extern int GLOBAL_argc;
83static YAP_Functor FuncPrepare;
85X_API
void init_mpi(
void);
/* Communication statistics, updated by the MSG_SENT/MSG_RECV macros and
 * reported by mpi_stats. */
static unsigned long bytes_sent, bytes_recv;         /* payload byte totals */
static unsigned long num_msgs_sent, num_msgs_recv;   /* message counts */
static unsigned long max_s_sent_msg, max_s_recv_msg; /* largest single message */

/* Accumulated time, added to by PAUSE_TIMER(). */
static double total_time_spent;
106#define RESET_STATS() \
108 total_time_spent = 0; \
109 bytes_sent = bytes_recv = num_msgs_recv = num_msgs_sent = max_s_recv_msg = \
110 max_s_sent_msg = 0; \
112#define MSG_SENT(size) \
114 bytes_sent += size; \
116 if (max_s_sent_msg < size) \
117 max_s_sent_msg = size; \
119#define MSG_RECV(size) \
121 bytes_recv += size; \
123 if (max_s_recv_msg < size) \
124 max_s_recv_msg = size; \
127#define MPITIME total_time_spent
130#define CONT_TIMER() \
132#define PAUSE_TIMER() \
135 total_time_spent += tval(); \
/* Per-thread start/end time slots for the timing helpers, indexed by
 * YAP_ThreadSelf().  The start slot is filled from getrusage() user time.
 * NOTE(review): the thread index is not bounds-checked against the 1024
 * slots; confirm. */
static struct timeval _tstarts[1024], _tends[1024];

/* Accessors for the calling thread's slots.  The timer code refers to them
 * as _tstart/_tend, so the names must match exactly. */
#define _tstart (_tstarts[YAP_ThreadSelf()])
#define _tend (_tends[YAP_ThreadSelf()])
149#include <sys/resource.h>
155 getrusage(RUSAGE_SELF, &r);
156 _tstart = r.ru_utime;
160 getrusage(RUSAGE_SELF, &r);
165 double t1, t2, elapsed;
166 t1 = (double)_tstart.tv_sec + (
double)_tstart.tv_usec / (1000 * 1000);
167 t2 = (double)_tend.tv_sec + (
double)_tend.tv_usec / (1000 * 1000);
176static YAP_Bool mpi_stats(
void) {
177 fprintf(stderr,
"%f %ld %ld %ld %ld %ld %ld\n", MPITIME, num_msgs_recv,
178 bytes_recv, max_s_recv_msg, num_msgs_sent, bytes_sent,
180 return (YAP_Unify(YAP_ARG1, YAP_MkFloatTerm((
float)(MPITIME))) &&
181 YAP_Unify(YAP_ARG2, YAP_MkIntTerm((
long)num_msgs_recv)) &&
182 YAP_Unify(YAP_ARG3, YAP_MkIntTerm((
long)bytes_recv)) &&
183 YAP_Unify(YAP_ARG4, YAP_MkIntTerm((
long)max_s_recv_msg)) &&
184 YAP_Unify(YAP_ARG5, YAP_MkIntTerm((
long)num_msgs_sent)) &&
185 YAP_Unify(YAP_ARG6, YAP_MkIntTerm((
long)bytes_sent)) &&
186 YAP_Unify(YAP_ARG7, YAP_MkIntTerm((
long)max_s_sent_msg)));
191static YAP_Bool mpi_reset_stats(
void) {
/* No-op variants of the message-accounting hooks (presumably the build
 * without statistics; the selecting conditional is outside this view).  The
 * size argument is never evaluated. */
#define MSG_SENT(size) ((void)0)
#define MSG_RECV(size) ((void)0)
210static int new_request(MPI_Request *handle,
void *ptr) {
211 return insere(requests, (ulong)HANDLE2INT(handle), ptr);
214static void *get_request(MPI_Request *handle) {
215 return get_object(requests, (ulong)HANDLE2INT(handle));
218static void free_request(MPI_Request *handle) {
220 ptr =
delete (requests, (ulong)HANDLE2INT(handle));
230static BroadcastRequest *new_broadcast(
void) {
231 BroadcastRequest *b = (BroadcastRequest *)malloc(
sizeof(BroadcastRequest));
242static void free_broadcast_request(MPI_Request *handle) {
244 b = (BroadcastRequest *)
delete (
245 broadcasts, (ulong)BREQ2INT(handle));
259static int new_broadcast_request(BroadcastRequest *b,
260 MPI_Request *handle,
void *ptr) {
265 return insere(broadcasts, (ulong)HANDLE2INT(handle), b);
269static YAP_Bool mpi_error(
int errcode) {
270 char err_msg[MPI_MAX_ERROR_STRING];
273 MPI_Error_string(errcode, &err_msg[0], &len);
276 write_msg(__FUNCTION__, __FILE__, __LINE__,
"MPI_Error: %s\n", err_msg);
287static YAP_Bool mpi_init(
void) {
290 int my_argc = YAP_Argv(&my_argv);
292 MPI_Init_thread(&my_argc, &my_argv, MPI_THREAD_MULTIPLE, &thread_level);
294 write_msg(__FUNCTION__, __FILE__, __LINE__,
"Thread level: %d\n",
309static YAP_Bool rcv_msg_thread(
char *handle_pred) {
310 YAP_Term pred = YAP_MkAtomTerm(YAP_LookupAtom(handle_pred));
314 write_msg(__FUNCTION__, __FILE__, __LINE__,
"Waiting for MPI msg\n");
315 if (MPI_CALL(MPI_Probe(MPI_ANY_SOURCE, MPI_ANY_TAG, MPI_COMM_WORLD,
316 &status)) == MPI_SUCCESS) {
318 write_msg(__FUNCTION__, __FILE__, __LINE__,
"MPI Msg received\n");
319 YAP_CallProlog(pred);
321 write_msg(__FUNCTION__, __FILE__, __LINE__,
"Error in MPI_Probe\n");
328static YAP_Bool mpi_init_rcv_thread(
void) {
332 char *arg =
"handle_msg";
334 MPI_Init_thread(&GLOBAL_argc, &GLOBAL_argv, MPI_THREAD_SINGLE, &thread_level);
335 if (pthread_create(&thread, NULL, (
void *)&rcv_msg_thread, arg)) {
339 pthread_detach(thread);
340 write_msg(__FUNCTION__, __FILE__, __LINE__,
"Thread level: %d\n",
350static YAP_Bool mpi_finalize(
void) {
358static YAP_Bool mpi_comm_size(
void) {
360 MPI_CALL(MPI_Comm_size(MPI_COMM_WORLD, &size));
361 return (YAP_Unify(YAP_ARG1, YAP_MkIntTerm(size)));
367static YAP_Bool mpi_comm_rank(
void) {
369 MPI_CALL(MPI_Comm_rank(MPI_COMM_WORLD, &rank));
370 return (YAP_Unify(YAP_ARG1, YAP_MkIntTerm(rank)));
376static YAP_Bool mpi_version(
void) {
379 MPI_CALL(MPI_Get_version(&major, &minor));
380 return (YAP_Unify(YAP_ARG1, YAP_MkIntTerm(major)) &&
381 YAP_Unify(YAP_ARG2, YAP_MkIntTerm(minor)));
387static YAP_Bool mpi_get_processor_name(
void) {
388 char name[MPI_MAX_PROCESSOR_NAME];
390 MPI_CALL(MPI_Get_processor_name(name, &length));
391 return (YAP_Unify(YAP_ARG1, YAP_MkAtomTerm(YAP_LookupAtom(name))));
401static YAP_Bool mpi_isend(
void) {
407 MPI_Request *handle = (MPI_Request *)malloc(
sizeof(MPI_Request));
413 if (YAP_IsVarTerm(t1) || !YAP_IsIntTerm(t2) || !YAP_IsIntTerm(t3) ||
414 !YAP_IsVarTerm(t4)) {
419 dest = YAP_IntOfTerm(t2);
420 tag = YAP_IntOfTerm(t3);
422 str = term2string( t1);
423 MSG_SENT(strlen(str)+1);
425 if (MPI_CALL(MPI_Isend(str, len, MPI_CHAR, dest, tag, MPI_COMM_WORLD,
426 handle)) != MPI_SUCCESS) {
432 write_msg(__FUNCTION__, __FILE__, __LINE__,
"%s(%s,%u, MPI_CHAR,%d,%d)\n",
433 __FUNCTION__, str, len, dest, tag);
438 new_request(handle, str);
441 YAP_ARG4, YAP_MkIntTerm(HANDLE2INT(handle))));
448static YAP_Bool mpi_send(
void) {
456 if (YAP_IsVarTerm(t1) || !YAP_IsIntTerm(t2) || !YAP_IsIntTerm(t3)) {
462 dest = YAP_IntOfTerm(t2);
463 tag = YAP_IntOfTerm(t3);
465 str = term2string( t1);
467#if defined(MPI_DEBUG) && 0
468 write_msg(__FUNCTION__, __FILE__, __LINE__,
"%s(%s,%u, MPI_CHAR,%d,%d)\n",
469 __FUNCTION__, str, len, dest, tag);
472 val = (MPI_CALL(MPI_Send(str, len, MPI_CHAR, dest, tag, MPI_COMM_WORLD)) ==
484static YAP_Bool mpi_recv(
void) {
492 if (!YAP_IsVarTerm(t3)) {
498 if (YAP_IsVarTerm(t1))
499 orig = MPI_ANY_SOURCE;
500 else if (!YAP_IsIntTerm(t1))
503 orig = YAP_IntOfTerm(t1);
507 if (YAP_IsVarTerm(t2))
509 else if (!YAP_IsIntTerm(t2))
512 tag = YAP_IntOfTerm(t2);
516 if (MPI_CALL(MPI_Probe(orig, tag, MPI_COMM_WORLD, &status)) != MPI_SUCCESS) {
520 if (MPI_CALL(MPI_Get_count(&status, MPI_CHAR, &len)) != MPI_SUCCESS ||
521 status.MPI_TAG == MPI_UNDEFINED || status.MPI_SOURCE == MPI_UNDEFINED) {
526 change_buffer_size((
size_t)(len + 1));
529 if (orig == MPI_ANY_SOURCE) {
530 orig = status.MPI_SOURCE;
531 if (!YAP_Unify(t1, YAP_MkIntTerm(orig))) {
537 if (tag == MPI_ANY_TAG) {
538 tag = status.MPI_TAG;
539 if (!YAP_Unify(t2, YAP_MkIntTerm(status.MPI_TAG))) {
545 if (MPI_CALL(MPI_Recv(BUFFER_PTR, BUFFER_LEN, MPI_CHAR, orig, tag,
546 MPI_COMM_WORLD, &status)) != MPI_SUCCESS) {
554 write_msg(__FUNCTION__, __FILE__, __LINE__,
"%s(%s,%u, MPI_CHAR,%d,%d)\n",
555 __FUNCTION__, BUFFER_PTR, BUFFER_LEN, orig, tag);
557 MSG_RECV(BUFFER_LEN);
558 t4 = string2term(BUFFER_PTR, &BUFFER_LEN);
560 return (YAP_Unify(YAP_ARG3, t4));
567static YAP_Bool mpi_irecv(
void) {
571 MPI_Request *mpi_req = (MPI_Request *)malloc(
sizeof(MPI_Request));
574 if (!YAP_IsVarTerm(t3)) {
581 if (YAP_IsVarTerm(t1))
582 orig = MPI_ANY_SOURCE;
583 else if (!YAP_IsIntTerm(t1))
586 orig = YAP_IntOfTerm(t1);
590 if (YAP_IsVarTerm(t2))
592 else if (!YAP_IsIntTerm(t2))
595 tag = YAP_IntOfTerm(t2);
599 if (MPI_CALL(MPI_Irecv(BUFFER_PTR, BLOCK_SIZE, MPI_CHAR, orig, tag,
600 MPI_COMM_WORLD, mpi_req)) != MPI_SUCCESS) {
604 new_request(mpi_req, BUFFER_PTR);
607 return YAP_Unify(t3, YAP_MkIntTerm(HANDLE2INT(mpi_req)));
618static YAP_Bool mpi_wait(
void) {
625 if (!YAP_IsIntTerm(t1)) {
628 handle = INT2HANDLE(YAP_IntOfTerm(t1));
631 if (MPI_CALL(MPI_Wait(handle, &status)) != MPI_SUCCESS) {
635 free_request(handle);
637 return (YAP_Unify(t2, YAP_MkIntTerm(status.MPI_ERROR)));
648static YAP_Bool mpi_test(
void) {
656 if (!YAP_IsIntTerm(t1)) {
661 handle = INT2HANDLE(YAP_IntOfTerm(t1));
663 MPI_CALL(MPI_Test(handle, &flag, &status));
668 free_request(handle);
670 return (YAP_Unify(t2, YAP_MkIntTerm(status.MPI_ERROR)));
682static YAP_Bool mpi_wait_recv(
void) {
691 if (!YAP_IsIntTerm(t1)) {
696 handle = INT2HANDLE(YAP_IntOfTerm(t1));
697 s = (
char *)get_request(handle);
699 if (MPI_CALL(MPI_Wait(handle, &status)) != MPI_SUCCESS) {
704 if (MPI_CALL(MPI_Probe(0, 0, MPI_COMM_WORLD, &status)) != MPI_SUCCESS) {
708 if (MPI_CALL(MPI_Get_count(&status, MPI_CHAR, &len)) != MPI_SUCCESS ||
709 status.MPI_TAG == MPI_UNDEFINED || status.MPI_SOURCE == MPI_UNDEFINED) {
715 change_buffer_size((
size_t)(len + 1));
719 if (MPI_CALL(MPI_Irecv(BUFFER_PTR, BLOCK_SIZE, MPI_CHAR, 0, 0,
720 MPI_COMM_WORLD, handle)) != MPI_SUCCESS) {
725 out = string2term(BUFFER_PTR, (
size_t *)NULL);
729 free_request(handle);
731 ret = YAP_Unify(YAP_ARG3, out);
732 return (ret & YAP_Unify(YAP_ARG2, YAP_MkIntTerm(status.MPI_ERROR)));
742static YAP_Bool mpi_test_recv(
void) {
752 if (!YAP_IsIntTerm(t1)) {
757 handle = INT2HANDLE(YAP_IntOfTerm(t1));
759 if (MPI_CALL(MPI_Test(handle, &flag, &status)) != MPI_SUCCESS) {
763 s = (
char *)get_request(handle);
765 out = string2term(s, (
size_t *)&len);
767 ret = YAP_Unify(YAP_ARG3, out);
768 free_request(handle);
770 return (ret & YAP_Unify(YAP_ARG2, YAP_MkIntTerm(status.MPI_ERROR)));
777static YAP_Bool mpi_barrier(
void) {
779 int ret = MPI_CALL(MPI_Barrier(MPI_COMM_WORLD));
781 return (ret == MPI_SUCCESS ?
true :
false);
795static YAP_Bool mpi_bcast_3(
int root, YAP_Term data,
int tag) {
801 MPI_CALL(MPI_Comm_rank(MPI_COMM_WORLD, &rank));
805 str = term2string(data);
808 write_msg(__FUNCTION__, __FILE__, __LINE__,
809 "mpi_bcast(%s,%u, MPI_CHAR,%d)\n", str, len, root);
811 if (len >= BLOCK_SIZE) {
812 YAP_Term t = YAP_MkIntTerm(len+1);
813 t = YAP_MkApplTerm(FuncPrepare,1,&t);
814 mpi_bcast_3(root,t,0);
816 val = (MPI_CALL(MPI_Bcast(str, len, MPI_CHAR, root, MPI_COMM_WORLD)) ==
830 MPI_CALL(MPI_Comm_size(MPI_COMM_WORLD, &size));
831 MSG_SENT(len * size);
836 val = (MPI_CALL(MPI_Bcast(str, len, MPI_CHAR, root, MPI_COMM_WORLD)) ==
840 len = YAP_SizeOfExportedTerm(str);
842 out = string2term(str, (
size_t *)&len);
843 if (YAP_IsApplTerm(out) && YAP_FunctorOfTerm(out)==FuncPrepare) {
844 len = YAP_IntOfTerm(YAP_ArgOfTerm(1,out));
846 val = (MPI_CALL(MPI_Bcast(str, len, MPI_CHAR, root, MPI_COMM_WORLD)) ==
850 out = string2term(str, (
size_t *)&len);
851 if (str != BUFFER_PTR)
855 if (!YAP_Unify(YAP_ARG2, out))
869static YAP_Bool my_bcast(YAP_Term t1, YAP_Term t2, YAP_Term t3) {
877 if (YAP_IsVarTerm(t2) || !YAP_IsIntTerm(t1) || !YAP_IsIntTerm(t3)) {
883 MPI_CALL(MPI_Comm_size(MPI_COMM_WORLD, &worldsize));
885 root = YAP_IntOfTerm(t1);
886 tag = YAP_IntOfTerm(t3);
887 str = term2string( t2);
889 return mpi_bcast_3(root, t2, tag);
895static YAP_Bool mpi_bcast2(
void) {
896 return my_bcast(YAP_ARG1, YAP_ARG2, YAP_MkIntTerm(0));
907static YAP_Bool mpi_bcast3(
void) {
908 return my_bcast(YAP_ARG1, YAP_ARG2, YAP_ARG3);
918static YAP_Bool mpi_ibcast3(
void) {
919 return my_bcast(YAP_ARG1, YAP_ARG2, YAP_ARG3);
924static YAP_Bool mpi_ibcast2(
void) {
925 return my_bcast(YAP_ARG1, YAP_ARG2, YAP_MkIntTerm(0));
940 node = (
hashnode *)next_hashnode(ht);
946 handle = INT2HANDLE(node->value);
947 MPI_CALL(MPI_Test(handle, &flag, &status));
949 MPI_CALL(MPI_Wait(handle, &status));
951 write_msg(__FUNCTION__, __FILE__, __LINE__,
"Released handle...%s\n",
955 free_request(handle);
957 free_broadcast_request(handle);
963static YAP_Bool mpi_gc(
void) {
967 init_hash_traversal(requests);
969 init_hash_traversal(broadcasts);
977size_t BLOCK_SIZE = 4 * 1024;
979static YAP_Bool mpi_default_buffer_size(
void) {
981 intptr_t IBLOCK_SIZE;
982 if (!YAP_Unify(YAP_ARG1, YAP_MkIntTerm(BLOCK_SIZE)))
985 if (YAP_IsVarTerm(t2))
987 if (!YAP_IsIntTerm(t2))
989 IBLOCK_SIZE = YAP_IntOfTerm(t2);
990 if (IBLOCK_SIZE < 0) {
991 IBLOCK_SIZE = 4 * 1024;
994 BLOCK_SIZE = IBLOCK_SIZE;
1001X_API
void init_mpi(
void) {
1003 requests = new_hashtable(HASHSIZE);
1004 broadcasts = new_hashtable(HASHSIZE);
1005 FuncPrepare = YAP_MkFunctor(YAP_LookupAtom(
"$<<prepare>>$"),1);
1007 YAP_UserCPredicate(
"mpi_init", mpi_init, 0);
1009 YAP_UserCPredicate(
"mpi_init_rcv_thread", mpi_init_rcv_thread,
1012 YAP_UserCPredicate(
"mpi_finalize", mpi_finalize, 0);
1013 YAP_UserCPredicate(
"mpi_comm_size", mpi_comm_size, 1);
1014 YAP_UserCPredicate(
"mpi_comm_rank", mpi_comm_rank, 1);
1015 YAP_UserCPredicate(
"mpi_version", mpi_version,
1017 YAP_UserCPredicate(
"mpi_get_processor_name", mpi_get_processor_name,
1019 YAP_UserCPredicate(
"mpi_send", mpi_send,
1021 YAP_UserCPredicate(
"mpi_isend", mpi_isend, 4);
1022 YAP_UserCPredicate(
"mpi_recv", mpi_recv, 3);
1023 YAP_UserCPredicate(
"mpi_irecv", mpi_irecv,
1025 YAP_UserCPredicate(
"mpi_wait", mpi_wait, 2);
1026 YAP_UserCPredicate(
"mpi_wait_recv", mpi_wait_recv,
1028 YAP_UserCPredicate(
"mpi_test", mpi_test, 2);
1029 YAP_UserCPredicate(
"mpi_test_recv", mpi_test_recv,
1031 YAP_UserCPredicate(
"mpi_bcast2", mpi_bcast2, 2);
1032 YAP_UserCPredicate(
"mpi_bcast", mpi_bcast3, 3);
1041 YAP_UserCPredicate(
"mpi_ibcast", mpi_ibcast2, 2);
1042 YAP_UserCPredicate(
"mpi_ibcast", mpi_ibcast3, 3);
1052 YAP_UserCPredicate(
"mpi_barrier", mpi_barrier, 0);
1053 YAP_UserCPredicate(
"mpi_gc", mpi_gc, 0);
1054 YAP_UserCPredicate(
"mpi_default_buffer_size", mpi_default_buffer_size,
1070 "mpi_stats", mpi_stats,
1072 YAP_UserCPredicate(
"mpi_reset_stats", mpi_reset_stats,
1085 fprintf(stderr,
"MPI module succesfully loaded.");
#define YAP_Deref(t)
X_API macro.
@ gc
controls garbage collection