yap_mpi.c
/*
 Copyright (C) 2004,2005,2006 (Nuno A. Fonseca) <nuno.fonseca@gmail.com>

 This program is free software; you can redistribute it and/or
 modify it under the terms of the GNU General Public License
 as published by the Free Software Foundation; either
 version 2 of the License, or (at your option) any later
 version.

 This program is distributed in the hope that it will be useful,
 but WITHOUT ANY WARRANTY; without even the implied warranty of
 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
 GNU General Public License for more details.

 You should have received a copy of the GNU General Public License
 along with this program; if not, write to the Free Software
 Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.


 Last rev: $Id: yap_mpi.c,v 1.4 2006-09-28 11:42:51 vsc Exp $
 Comments: YAP interface to LAM/MPI
*/
#include "YapConfig.h"
#include <stdio.h>
#include <stdlib.h>
#if HAVE_STRING_H
#include <string.h>
#endif
#if HAVE_MALLOC_H
#include <malloc.h>
#endif
#if HAVE_UNISTD_H
#include <unistd.h>
#endif
#if HAVE_SYS_TIMES_H
#include <sys/times.h>
#endif

#if HAVE_MPI_H
#include <mpi.h>

#include "prologterms2c.h"

#include <YapInterface.h>

#include "hash.h"

/*********************************************************************/
struct broadcast_req {
  void *ptr; // pointer to an allocated memory buffer associated to the broadcast
  int nreq;  // number of requests associated to the broadcast
};
typedef struct broadcast_req BroadcastRequest;

/*********************************************************************/
#define IDTYPE long
#define HANDLE2INT(ptr) (IDTYPE) ptr
#define INT2HANDLE(id) (MPI_Request *)id

#define BREQ2INT(ptr) (IDTYPE) ptr
#define INT2BREQ(ptr) (BroadcastRequest *)ptr

#define MPI_CALL(function) \
  ((mpi_status = function) == MPI_SUCCESS ? MPI_SUCCESS : mpi_error(mpi_status))
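/*
 Illustrative note (a sketch, not part of the original source): MPI_CALL
 evaluates an MPI primitive once, records its return code in the per-thread
 mpi_status slot, and routes failures through mpi_error(). A typical call
 site in this file looks like:

   if (MPI_CALL(MPI_Probe(orig, tag, MPI_COMM_WORLD, &status)) != MPI_SUCCESS)
     return false;
*/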

#ifdef USE_THREADS
#include <pthread.h>
#endif

/********************************************************************
 * Auxiliary data
 ********************************************************************/
static YAP_Bool mpi_statuss[1024];
#define mpi_status (mpi_statuss[YAP_ThreadSelf()])

extern int GLOBAL_argc;
extern char **GLOBAL_argv; // declared here because mpi_init_rcv_thread() takes its address

#define HASHSIZE 1777
static hashtable requests = NULL;
static hashtable broadcasts = NULL;

static YAP_Functor FuncPrepare;

X_API void init_mpi(void);


/********************************************************************
 * Time accounting
 ********************************************************************/
#ifdef MPISTATS

#include <sys/time.h>
#include <time.h>

/* Statistics */
static unsigned long bytes_sent;     // bytes sent (MPI headers are ignored)
static unsigned long bytes_recv;     // bytes received
static unsigned long num_msgs_sent;  // number of messages sent
static unsigned long num_msgs_recv;  // number of messages received
static unsigned long max_s_recv_msg; // maximum size of a message received
static unsigned long max_s_sent_msg; // maximum size of a message sent
static double total_time_spent;      // total time spent in communication code

/* MSG ACCOUNTING */
#define RESET_STATS() \
  { \
    total_time_spent = 0; \
    bytes_sent = bytes_recv = num_msgs_recv = num_msgs_sent = max_s_recv_msg = \
        max_s_sent_msg = 0; \
  }
#define MSG_SENT(size) \
  { \
    bytes_sent += size; \
    ++num_msgs_sent; \
    if (max_s_sent_msg < size) \
      max_s_sent_msg = size; \
  }
#define MSG_RECV(size) \
  { \
    bytes_recv += size; \
    ++num_msgs_recv; \
    if (max_s_recv_msg < size) \
      max_s_recv_msg = size; \
  }

#define MPITIME total_time_spent

/* Timer */
#define CONT_TIMER() \
  { tstart(); }
#define PAUSE_TIMER() \
  { \
    tend(); \
    total_time_spent += tval(); \
  }

#define RETURN(p) \
  { \
    PAUSE_TIMER(); \
    return (p); \
  }

static struct timeval _tstarts[1024], _tends[1024];

#define _tstart (_tstarts[YAP_ThreadSelf()])
#define _tend (_tends[YAP_ThreadSelf()])

#include <sys/resource.h>
#include <sys/time.h>
#include <unistd.h>

void tstart(void) {
  struct rusage r;
  getrusage(RUSAGE_SELF, &r);
  _tstart = r.ru_utime;
}
void tend(void) {
  struct rusage r;
  getrusage(RUSAGE_SELF, &r);
  _tend = r.ru_utime;
}
//
double tval(void) {
  double t1, t2, elapsed;
  t1 = (double)_tstart.tv_sec + (double)_tstart.tv_usec / (1000 * 1000);
  t2 = (double)_tend.tv_sec + (double)_tend.tv_usec / (1000 * 1000);
  elapsed = t2 - t1;
  if (elapsed == 0)
    return 0.000001;
  return elapsed;
}
/*
 * Returns the statistics
 */
static YAP_Bool mpi_stats(void) {
  fprintf(stderr, "%f %ld %ld %ld %ld %ld %ld\n", MPITIME, num_msgs_recv,
          bytes_recv, max_s_recv_msg, num_msgs_sent, bytes_sent,
          max_s_sent_msg);
  return (YAP_Unify(YAP_ARG1, YAP_MkFloatTerm((float)(MPITIME))) &&
          YAP_Unify(YAP_ARG2, YAP_MkIntTerm((long)num_msgs_recv)) &&
          YAP_Unify(YAP_ARG3, YAP_MkIntTerm((long)bytes_recv)) &&
          YAP_Unify(YAP_ARG4, YAP_MkIntTerm((long)max_s_recv_msg)) &&
          YAP_Unify(YAP_ARG5, YAP_MkIntTerm((long)num_msgs_sent)) &&
          YAP_Unify(YAP_ARG6, YAP_MkIntTerm((long)bytes_sent)) &&
          YAP_Unify(YAP_ARG7, YAP_MkIntTerm((long)max_s_sent_msg)));
}
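/*
 Illustrative Prolog usage (a sketch; the predicate is only registered when
 the module is built with MPISTATS defined):

   ?- mpi_stats(Time, MsgsRecv, BytesRecv, MaxRecv, MsgsSent, BytesSent, MaxSent).
*/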
/*
 *
 */
static YAP_Bool mpi_reset_stats(void) {
  RESET_STATS();
  return true;
}
#else

#define PAUSE_TIMER()
#define CONT_TIMER()

#define RESET_STATS()
#define MSG_SENT(size)
#define MSG_RECV(size)
#define RETURN(p) { return (p); }
#endif

/********************************************************************
 * Functions to store/fetch/delete requests
 ********************************************************************/

static int new_request(MPI_Request *handle, void *ptr) {
  return insere(requests, (ulong)HANDLE2INT(handle), ptr);
}

static void *get_request(MPI_Request *handle) {
  return get_object(requests, (ulong)HANDLE2INT(handle));
}

static void free_request(MPI_Request *handle) {
  void *ptr;
  ptr = delete(requests, (ulong)HANDLE2INT(handle));
  free(ptr);
  free(handle);
}
/********************************************************************
 * Functions to store/fetch/delete broadcast requests
 ********************************************************************/
/*
 * Returns a new BroadcastRequest object
 */
static BroadcastRequest *new_broadcast(void) {
  BroadcastRequest *b = (BroadcastRequest *)malloc(sizeof(BroadcastRequest));
  if (b != NULL) {
    b->ptr = NULL;
    b->nreq = 0;
  }
  // write_msg(__FUNCTION__,__FILE__,__LINE__,"new broadcast: %p\n",b);
  return b;
}
/*
 *
 */
static void free_broadcast_request(MPI_Request *handle) {
  BroadcastRequest *b;
  b = (BroadcastRequest *)delete(
      broadcasts, (ulong)BREQ2INT(handle)); // get the ptr to the broadcast object
  b->nreq--;
  if (!b->nreq) {
    // all requests received
    free(b->ptr);
    free(b);
  }
  // write_msg(__FUNCTION__,__FILE__,__LINE__,"free broadcast_request: %p->%p\n",b,handle);
  free(handle);
}
/*
 *
 */
static int new_broadcast_request(BroadcastRequest *b,
                                 MPI_Request *handle, void *ptr) {
  b->ptr = ptr;
  b->nreq++;
  // write_msg(__FUNCTION__,__FILE__,__LINE__,"new broadcast_request: %p->%p\n",b,handle);
  return insere(broadcasts, (ulong)HANDLE2INT(handle), b);
}

/*********************************************************************/
static YAP_Bool mpi_error(int errcode) {
  char err_msg[MPI_MAX_ERROR_STRING];
  int len;

  MPI_Error_string(errcode, &err_msg[0], &len);
  err_msg[len] = '\0';
#ifdef MPI_DEBUG
  write_msg(__FUNCTION__, __FILE__, __LINE__, "MPI_Error: %s\n", err_msg);
#endif
  return errcode;
}
/********************************************************************

 ********************************************************************/
/*
 * Sets up the MPI environment. This function should be called before any
 * other MPI function.
 */
static YAP_Bool mpi_init(void) {
  int thread_level;
  char **my_argv;
  int my_argc = YAP_Argv(&my_argv);
  // MPI_Init(&GLOBAL_argc, &GLOBAL_argv);
  MPI_Init_thread(&my_argc, &my_argv, MPI_THREAD_MULTIPLE, &thread_level);
#ifdef MPI_DEBUG
  write_msg(__FUNCTION__, __FILE__, __LINE__, "Thread level: %d\n",
            thread_level);
#endif
#ifdef MPISTATS
  RESET_STATS();
#endif
  return true;
}

#ifdef USE_THREADS
/*
 * Sets up the MPI environment. This function should be called before any
 * other MPI function. The argument is the name of the predicate that will be
 * invoked when a message is received.
 */
static YAP_Bool rcv_msg_thread(char *handle_pred) {
  YAP_Term pred = YAP_MkAtomTerm(YAP_LookupAtom(handle_pred));
  MPI_Status status;

  while (1) {
    write_msg(__FUNCTION__, __FILE__, __LINE__, "Waiting for MPI msg\n");
    if (MPI_CALL(MPI_Probe(MPI_ANY_SOURCE, MPI_ANY_TAG, MPI_COMM_WORLD,
                           &status)) == MPI_SUCCESS) {
      // call the handler
      write_msg(__FUNCTION__, __FILE__, __LINE__, "MPI Msg received\n");
      YAP_CallProlog(pred);
    } else
      write_msg(__FUNCTION__, __FILE__, __LINE__, "Error in MPI_Probe\n");
  }
  return 1;
}
/*
 *
 */
static YAP_Bool mpi_init_rcv_thread(void) {
  int thread_level;
  // MPI_Init(&GLOBAL_argc, &GLOBAL_argv);
  pthread_t thread;
  char *arg = "handle_msg";

  MPI_Init_thread(&GLOBAL_argc, &GLOBAL_argv, MPI_THREAD_SINGLE, &thread_level);
  if (pthread_create(&thread, NULL, (void *)&rcv_msg_thread, arg)) {
    return false;
  }

  pthread_detach(thread);
  write_msg(__FUNCTION__, __FILE__, __LINE__, "Thread level: %d\n",
            thread_level);
  return true;
}
#endif

/*
 * Terminates the MPI execution environment. Every process must call this
 * function before exiting. mpi_finalize/0.
 */
static YAP_Bool mpi_finalize(void) {
  MPI_Finalize();
  return true;
}
/*
 * Returns the number of workers associated to the MPI_COMM_WORLD communicator.
 * mpi_comm_size(-Size).
 */
static YAP_Bool mpi_comm_size(void) {
  int size;
  MPI_CALL(MPI_Comm_size(MPI_COMM_WORLD, &size));
  return (YAP_Unify(YAP_ARG1, YAP_MkIntTerm(size)));
}
/*
 * Returns the rank of the current process.
 * mpi_comm_rank(-Rank).
 */
static YAP_Bool mpi_comm_rank(void) {
  int rank;
  MPI_CALL(MPI_Comm_rank(MPI_COMM_WORLD, &rank));
  return (YAP_Unify(YAP_ARG1, YAP_MkIntTerm(rank)));
}
/*
 * Returns the major and minor version of MPI.
 * mpi_version(-Major,-Minor).
 */
static YAP_Bool mpi_version(void) {
  int major, minor;

  MPI_CALL(MPI_Get_version(&major, &minor));
  return (YAP_Unify(YAP_ARG1, YAP_MkIntTerm(major)) &&
          YAP_Unify(YAP_ARG2, YAP_MkIntTerm(minor)));
}
/*
 * Returns the name of the processor running the current process.
 * mpi_get_processor_name(-Name).
 */
static YAP_Bool mpi_get_processor_name(void) {
  char name[MPI_MAX_PROCESSOR_NAME];
  int length;
  MPI_CALL(MPI_Get_processor_name(name, &length));
  return (YAP_Unify(YAP_ARG1, YAP_MkAtomTerm(YAP_LookupAtom(name))));
}
/*
 * Non-blocking communication function. The message is sent when possible. To
 * check the status of the message, mpi_wait and mpi_test should be used.
 * Until mpi_wait is called, the memory allocated for the buffer containing
 * the message is not released.
 *
 * mpi_isend(+Data, +Destination, +Tag, -Handle).
 */
static YAP_Bool mpi_isend(void) {
  YAP_Term t1 = YAP_Deref(YAP_ARG1), t2 = YAP_Deref(YAP_ARG2),
           t3 = YAP_Deref(YAP_ARG3), t4 = YAP_Deref(YAP_ARG4);
  char *str = NULL;
  int dest, tag;
  size_t len = 0;
  MPI_Request *handle = (MPI_Request *)malloc(sizeof(MPI_Request));

  CONT_TIMER();
  if (handle == NULL)
    return false;

  if (YAP_IsVarTerm(t1) || !YAP_IsIntTerm(t2) || !YAP_IsIntTerm(t3) ||
      !YAP_IsVarTerm(t4)) {
    PAUSE_TIMER();
    return false;
  }
  //
  dest = YAP_IntOfTerm(t2);
  tag = YAP_IntOfTerm(t3);
  // the data is packaged as a string
  str = term2string(t1);
  len = strlen(str) + 1;
  MSG_SENT(len);
  // send the data
  if (MPI_CALL(MPI_Isend(str, len, MPI_CHAR, dest, tag, MPI_COMM_WORLD,
                         handle)) != MPI_SUCCESS) {
    PAUSE_TIMER();
    return false;
  }

#ifdef MPI_DEBUG
  write_msg(__FUNCTION__, __FILE__, __LINE__, "%s(%s,%u, MPI_CHAR,%d,%d)\n",
            __FUNCTION__, str, len, dest, tag);
#endif
  USED_BUFFER(); // informs the prologterms2c module that the buffer is now
                 // used and should not be messed with
  // We must associate the string to each handle
  new_request(handle, str);
  PAUSE_TIMER();
  return (YAP_Unify(
      YAP_ARG4, YAP_MkIntTerm(HANDLE2INT(handle)))); // it should always succeed
}
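/*
 Illustrative Prolog usage (a sketch, assuming a two-process run under
 mpiexec; predicate names are the ones registered in init_mpi(), and
 do_other_work stands for arbitrary intervening goals):

   % on rank 0: post the send, keep working, then complete it
   ?- mpi_isend(foo(bar, 42), 1, 7, Handle),
      do_other_work,
      mpi_wait(Handle, Status).
*/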

/*
 * Blocking communication function. The message is sent immediately.
 * mpi_send(+Data, +Destination, +Tag).
 */
static YAP_Bool mpi_send(void) {

  YAP_Term t1 = YAP_Deref(YAP_ARG1), t2 = YAP_Deref(YAP_ARG2),
           t3 = YAP_Deref(YAP_ARG3);
  char *str = NULL;
  int dest, tag;
  size_t len = 0;
  int val;
  if (YAP_IsVarTerm(t1) || !YAP_IsIntTerm(t2) || !YAP_IsIntTerm(t3)) {
    return false;
  }

  CONT_TIMER();
  //
  dest = YAP_IntOfTerm(t2);
  tag = YAP_IntOfTerm(t3);
  // the data is packaged as a string
  str = term2string(t1);
  len = strlen(str) + 1;
#if defined(MPI_DEBUG) && 0
  write_msg(__FUNCTION__, __FILE__, __LINE__, "%s(%s,%u, MPI_CHAR,%d,%d)\n",
            __FUNCTION__, str, len, dest, tag);
#endif
  // send the data
  val = (MPI_CALL(MPI_Send(str, len, MPI_CHAR, dest, tag, MPI_COMM_WORLD)) ==
                 MPI_SUCCESS
             ? true
             : false);

  PAUSE_TIMER();
  return (val);
}
/*
 * Implements a blocking receive operation.
 * mpi_recv(?Source,?Tag,-Data).
 */
static YAP_Bool mpi_recv(void) {
  YAP_Term t1 = YAP_Deref(YAP_ARG1), t2 = YAP_Deref(YAP_ARG2),
           t3 = YAP_Deref(YAP_ARG3), t4;
  int tag, orig;
  int len = 0;
  MPI_Status status;

  // The third argument (data) must be unbound
  if (!YAP_IsVarTerm(t3)) {
    return false;
  }
  /* The first argument (Source) must be bound to an integer
     (the rank of the source) or left unbound (i.e. any source
     is OK) */
  if (YAP_IsVarTerm(t1))
    orig = MPI_ANY_SOURCE;
  else if (!YAP_IsIntTerm(t1))
    return false;
  else
    orig = YAP_IntOfTerm(t1);

  /* The second argument must be bound to an integer (the tag)
     or left unbound (i.e. any tag is OK) */
  if (YAP_IsVarTerm(t2))
    tag = MPI_ANY_TAG;
  else if (!YAP_IsIntTerm(t2))
    return false;
  else
    tag = YAP_IntOfTerm(t2);

  CONT_TIMER();
  // probe for the term's size
  if (MPI_CALL(MPI_Probe(orig, tag, MPI_COMM_WORLD, &status)) != MPI_SUCCESS) {
    PAUSE_TIMER();
    return false;
  }
  if (MPI_CALL(MPI_Get_count(&status, MPI_CHAR, &len)) != MPI_SUCCESS ||
      status.MPI_TAG == MPI_UNDEFINED || status.MPI_SOURCE == MPI_UNDEFINED) {
    PAUSE_TIMER();
    return false;
  }
  // realloc memory buffer
  change_buffer_size((size_t)(len + 1));
  BUFFER_LEN = len;
  // We already know the source from MPI_Probe()
  if (orig == MPI_ANY_SOURCE) {
    orig = status.MPI_SOURCE;
    if (!YAP_Unify(t1, YAP_MkIntTerm(orig))) {
      PAUSE_TIMER();
      return false;
    }
  }
  // We already know the tag from MPI_Probe()
  if (tag == MPI_ANY_TAG) {
    tag = status.MPI_TAG;
    if (!YAP_Unify(t2, YAP_MkIntTerm(status.MPI_TAG))) {
      PAUSE_TIMER();
      return false;
    }
  }
  // Receive the message as a string
  if (MPI_CALL(MPI_Recv(BUFFER_PTR, BUFFER_LEN, MPI_CHAR, orig, tag,
                        MPI_COMM_WORLD, &status)) != MPI_SUCCESS) {
    /* Getting in here should never happen; it means that the first
       package (containing the size) was sent properly, but there was a glitch
       with the actual content! */
    PAUSE_TIMER();
    return false;
  }
#ifdef MPI_DEBUG
  write_msg(__FUNCTION__, __FILE__, __LINE__, "%s(%s,%u, MPI_CHAR,%d,%d)\n",
            __FUNCTION__, BUFFER_PTR, BUFFER_LEN, orig, tag);
#endif
  MSG_RECV(BUFFER_LEN);
  t4 = string2term(BUFFER_PTR, &BUFFER_LEN);
  PAUSE_TIMER();
  return (YAP_Unify(YAP_ARG3, t4));
}
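/*
 Illustrative Prolog usage (a sketch, assuming two ranks started via mpiexec):

   % on rank 0:                         % on rank 1:
   ?- mpi_send(msg(hello), 1, 0).       ?- mpi_recv(Source, Tag, Data).
                                        %  Source = 0, Tag = 0, Data = msg(hello)
*/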

/*
 * Implements a non-blocking receive operation.
 * mpi_irecv(?Source,?Tag,-Handle).
 */
static YAP_Bool mpi_irecv(void) {
  YAP_Term t1 = YAP_Deref(YAP_ARG1), t2 = YAP_Deref(YAP_ARG2),
           t3 = YAP_Deref(YAP_ARG3);
  int tag, orig;
  MPI_Request *mpi_req = (MPI_Request *)malloc(sizeof(MPI_Request));

  if (mpi_req == NULL)
    return false;
  // The third argument (handle) must be unbound
  if (!YAP_IsVarTerm(t3)) {
    // Yap_Error(INSTANTIATION_ERROR, t_data, "mpi_receive");
    return false;
  }
  /* The first argument (Source) must be bound to an integer
     (the rank of the source) or left unbound (i.e. any source
     is OK) */
  if (YAP_IsVarTerm(t1))
    orig = MPI_ANY_SOURCE;
  else if (!YAP_IsIntTerm(t1))
    return false;
  else
    orig = YAP_IntOfTerm(t1);

  /* The second argument must be bound to an integer (the tag)
     or left unbound (i.e. any tag is OK) */
  if (YAP_IsVarTerm(t2))
    tag = MPI_ANY_TAG;
  else if (!YAP_IsIntTerm(t2))
    return false;
  else
    tag = YAP_IntOfTerm(t2);

  CONT_TIMER();
  RESET_BUFFER();
  if (MPI_CALL(MPI_Irecv(BUFFER_PTR, BLOCK_SIZE, MPI_CHAR, orig, tag,
                         MPI_COMM_WORLD, mpi_req)) != MPI_SUCCESS) {
    PAUSE_TIMER();
    return false;
  }
  new_request(mpi_req, BUFFER_PTR);
  DEL_BUFFER();
  PAUSE_TIMER();
  return YAP_Unify(t3, YAP_MkIntTerm(HANDLE2INT(mpi_req)));
}
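/*
 Illustrative Prolog usage (a sketch; do_other_work stands for arbitrary
 intervening goals): post a non-blocking receive, then later retrieve the
 term with mpi_wait_recv/3, or poll for it with mpi_test_recv/3:

   ?- mpi_irecv(0, 0, Handle),
      do_other_work,
      mpi_wait_recv(Handle, Status, Data).
*/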

/*
 * Completes a non-blocking operation. If the operation was a send, the
 * function waits until the message is buffered or sent by the runtime
 * system. At this point the send buffer is released. If the operation
 * was a receive, it waits until the message is copied to the receive
 * buffer.
 * mpi_wait(+Handle,-Status).
 */
static YAP_Bool mpi_wait(void) {
  YAP_Term t1 = YAP_Deref(YAP_ARG1), // Handle
      t2 = YAP_Deref(YAP_ARG2);      // Status
  MPI_Status status;
  MPI_Request *handle;

  // The first argument must be an integer (a handle)
  if (!YAP_IsIntTerm(t1)) {
    return false;
  }
  handle = INT2HANDLE(YAP_IntOfTerm(t1));
  CONT_TIMER();
  // wait for the operation to complete
  if (MPI_CALL(MPI_Wait(handle, &status)) != MPI_SUCCESS) {
    PAUSE_TIMER();
    return false;
  }
  free_request(handle);
  PAUSE_TIMER();
  return (YAP_Unify(t2, YAP_MkIntTerm(status.MPI_ERROR)));
}

/*
 * mpi_test(+Handle,-Status)
 *
 * Provides information regarding a handle, i.e. whether a communication
 * operation has been completed. If the operation has been completed the
 * predicate succeeds with the completion status, otherwise it fails.
 */
static YAP_Bool mpi_test(void) {
  YAP_Term t1 = YAP_Deref(YAP_ARG1), // Handle
      t2 = YAP_Deref(YAP_ARG2);      // Status
  MPI_Status status;
  MPI_Request *handle;
  int flag;

  // The first argument (handle) must be an integer
  if (!YAP_IsIntTerm(t1)) {
    return false;
  }
  CONT_TIMER();

  handle = INT2HANDLE(YAP_IntOfTerm(t1));
  //
  MPI_CALL(MPI_Test(handle, &flag, &status));
  if (flag != true) {
    PAUSE_TIMER();
    return false;
  }
  free_request(handle);
  PAUSE_TIMER();
  return (YAP_Unify(t2, YAP_MkIntTerm(status.MPI_ERROR)));
}

/*
 * Completes a non-blocking receive operation and returns the received term.
 * mpi_wait_recv(+Handle,-Status,-Data).
 */
static YAP_Bool mpi_wait_recv(void) {
  YAP_Term t1 = YAP_Deref(YAP_ARG1); // handle
  MPI_Status status;
  MPI_Request *handle;
  char *s;
  int len, ret;
  YAP_Term out;

  // The first argument (handle) must be an integer
  if (!YAP_IsIntTerm(t1)) {
    return false;
  }
  CONT_TIMER();

  handle = INT2HANDLE(YAP_IntOfTerm(t1));
  s = (char *)get_request(handle);
  // wait for communication completion
  if (MPI_CALL(MPI_Wait(handle, &status)) != MPI_SUCCESS) {
    PAUSE_TIMER();
    return false;
  }
  // probe for the term's size
  if (MPI_CALL(MPI_Probe(0, 0, MPI_COMM_WORLD, &status)) != MPI_SUCCESS) {
    PAUSE_TIMER();
    return false;
  }
  if (MPI_CALL(MPI_Get_count(&status, MPI_CHAR, &len)) != MPI_SUCCESS ||
      status.MPI_TAG == MPI_UNDEFINED || status.MPI_SOURCE == MPI_UNDEFINED) {
    PAUSE_TIMER();
    return false;
  }
  // realloc memory buffer
  RESET_BUFFER();
  change_buffer_size((size_t)(len + 1));
  BUFFER_LEN = len;
  // len = YAP_SizeOfExportedTerm(s);
  // make sure we only fetch ARG3 after constructing the term
  if (MPI_CALL(MPI_Irecv(BUFFER_PTR, BLOCK_SIZE, MPI_CHAR, 0, 0,
                         MPI_COMM_WORLD, handle)) != MPI_SUCCESS) {
    PAUSE_TIMER();
    return false;
  }
  // new_request(mpi_req, BUFFER_PTR);
  out = string2term(BUFFER_PTR, (size_t *)NULL);
  len = strlen(s) + 1;
  MSG_RECV(len);
  DEL_BUFFER();
  free_request(handle);
  PAUSE_TIMER();
  ret = YAP_Unify(YAP_ARG3, out);
  return (ret && YAP_Unify(YAP_ARG2, YAP_MkIntTerm(status.MPI_ERROR)));
}

/*
 * Provides information regarding a handle, i.e. whether a communication
 * operation has been completed. If the operation has been completed the
 * predicate succeeds with the completion status, otherwise it fails.
 *
 * mpi_test_recv(+Handle,-Status,-Data).
 */
static YAP_Bool mpi_test_recv(void) {
  YAP_Term t1 = YAP_Deref(YAP_ARG1); // handle

  MPI_Status status;
  MPI_Request *handle;
  int flag, len, ret;
  char *s;
  YAP_Term out;

  // The first argument (handle) must be an integer
  if (!YAP_IsIntTerm(t1)) {
    return false;
  }
  CONT_TIMER();

  handle = INT2HANDLE(YAP_IntOfTerm(t1));
  //
  if (MPI_CALL(MPI_Test(handle, &flag, &status)) != MPI_SUCCESS) {
    PAUSE_TIMER();
    return false;
  }
  // fail if the operation has not completed yet
  if (flag != true) {
    PAUSE_TIMER();
    return false;
  }
  s = (char *)get_request(handle);
  len = strlen(s);
  out = string2term(s, (size_t *)&len);
  // make sure we only fetch ARG3 after constructing the term
  ret = YAP_Unify(YAP_ARG3, out);
  free_request(handle);
  PAUSE_TIMER();
  return (ret && YAP_Unify(YAP_ARG2, YAP_MkIntTerm(status.MPI_ERROR)));
}

/*
 * Collective communication function that performs a barrier synchronization
 * among all processes. mpi_barrier/0.
 */
static YAP_Bool mpi_barrier(void) {
  CONT_TIMER();
  int ret = MPI_CALL(MPI_Barrier(MPI_COMM_WORLD));
  PAUSE_TIMER();
  return (ret == MPI_SUCCESS ? true : false);
}
/***********************************
 * Broadcast
 ***********************************/
/*
 * Broadcasts a message from the process with rank "root" to
 * all other processes of the group.
 * Note: collective communication means all processes within a communicator
 * call the same routine. To be able to use a regular MPI_Recv to receive the
 * messages, one should use mpi_ibcast instead.
 *
 * mpi_bcast(+Root,+Data).
 */
static YAP_Bool mpi_bcast_3(int root, YAP_Term data, int tag) {
  int val;
  size_t len = 0;
  char *str;
  int rank;
  MPI_CALL(MPI_Comm_rank(MPI_COMM_WORLD, &rank));

  CONT_TIMER();
  if (root == rank) {
    str = term2string(data);
    len = strlen(str) + 1;
#ifdef MPI_DEBUG
    write_msg(__FUNCTION__, __FILE__, __LINE__,
              "mpi_bcast(%s,%u, MPI_CHAR,%d)\n", str, len, root);
#endif
    // if the term does not fit in the default block, first broadcast a
    // $<<prepare>>$ term carrying the real length
    if (len >= BLOCK_SIZE) {
      YAP_Term t = YAP_MkIntTerm(len + 1);
      t = YAP_MkApplTerm(FuncPrepare, 1, &t);
      mpi_bcast_3(root, t, 0);
    }
    val = (MPI_CALL(MPI_Bcast(str, len, MPI_CHAR, root, MPI_COMM_WORLD)) ==
                   MPI_SUCCESS
               ? true
               : false);
    PAUSE_TIMER();
    return (val);
  } else {
    RESET_BUFFER();
    str = BUFFER_PTR;
    len = BLOCK_SIZE;

#ifdef MPISTATS
    {
      int size;
      MPI_CALL(MPI_Comm_size(MPI_COMM_WORLD, &size));
      MSG_SENT(len * size);
    }
#endif

    YAP_Term out;
    val = (MPI_CALL(MPI_Bcast(str, len, MPI_CHAR, root, MPI_COMM_WORLD)) ==
                   MPI_SUCCESS
               ? true
               : false);
    len = YAP_SizeOfExportedTerm(str);
    // make sure we only fetch ARG2 after constructing the term
    out = string2term(str, (size_t *)&len);
    // a $<<prepare>>$ term announces an oversized payload: allocate a buffer
    // of the announced length and receive the actual term in a second bcast
    if (YAP_IsApplTerm(out) && YAP_FunctorOfTerm(out) == FuncPrepare) {
      len = YAP_IntOfTerm(YAP_ArgOfTerm(1, out));
      str = malloc(len);
      val = (MPI_CALL(MPI_Bcast(str, len, MPI_CHAR, root, MPI_COMM_WORLD)) ==
                     MPI_SUCCESS
                 ? true
                 : false);
      out = string2term(str, (size_t *)&len);
      if (str != BUFFER_PTR)
        free(str);
    }
    MSG_RECV(len);
    if (!YAP_Unify(YAP_ARG2, out))
      return false;
  }
  return (val);
}

/*
 * Broadcasts a message from the process with rank "root" to
 * all other processes of the group.
 * mpi_bcast(+Root,+Data,+Tag).
 */
static YAP_Bool my_bcast(YAP_Term t1, YAP_Term t2, YAP_Term t3) {
  int root;
  int tag;

  // The arguments should be bound
  if (YAP_IsVarTerm(t2) || !YAP_IsIntTerm(t1) || !YAP_IsIntTerm(t3)) {
    return false;
  }

  root = YAP_IntOfTerm(t1);
  tag = YAP_IntOfTerm(t3);
  // serialization is done inside mpi_bcast_3, on the root only
  return mpi_bcast_3(root, t2, tag);
}

/*
 * mpi_bcast(+Root,+Data).
 */
static YAP_Bool mpi_bcast2(void) {
  return my_bcast(YAP_ARG1, YAP_ARG2, YAP_MkIntTerm(0));
}
/*
 * Broadcasts a message from the process with rank "root" to
 * all other processes of the group.
 * Note: collective communication means all processes within a communicator
 * call the same routine.
 *
 * mpi_bcast(+Root,+Data,+Tag).
 */
static YAP_Bool mpi_bcast3(void) {
  return my_bcast(YAP_ARG1, YAP_ARG2, YAP_ARG3);
}
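/*
 Illustrative Prolog usage (a sketch): every rank in the communicator must
 execute the same call; on the root the term is sent, on the other ranks it
 is unified with the received term:

   % on every rank (here the root is rank 0):
   ?- mpi_bcast(0, hello(world), 0).
*/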

/*
 * Broadcasts a message from the process with rank "root" to
 * all other processes of the group.
 * To receive the message the recipients use MPI_Recv.
 * The message is sent using MPI_Isend.
 * mpi_ibcast(+Root,+Data,+Tag).
 */
static YAP_Bool mpi_ibcast3(void) {
  return my_bcast(YAP_ARG1, YAP_ARG2, YAP_ARG3);
}
/*
 * mpi_ibcast(+Root,+Data).
 */
static YAP_Bool mpi_ibcast2(void) {
  return my_bcast(YAP_ARG1, YAP_ARG2, YAP_MkIntTerm(0));
}
/*******************************************
 * Garbage collection
 *******************************************/
/*
 * Attempts to release the request structures used in asynchronous
 * communications
 */
static void gc(hashtable ht) {
  MPI_Request *handle;
  hashnode *node;
  MPI_Status status;
  int flag;

  node = (hashnode *)next_hashnode(ht);
  if (node == NULL)
    return;

  gc(ht); // recurse first, so entries are released starting from the end

  handle = INT2HANDLE(node->value);
  MPI_CALL(MPI_Test(handle, &flag, &status));
  if (flag == true) {
    MPI_CALL(MPI_Wait(handle, &status));
#ifdef MPI_DEBUG
    write_msg(__FUNCTION__, __FILE__, __LINE__, "Released handle...%s\n",
              (char *)node->obj);
#endif
    if (ht == requests)
      free_request(handle);
    else
      free_broadcast_request(handle);
  }
}
/*
 *
 */
static YAP_Bool mpi_gc(void) {
  // write_msg(__FUNCTION__,__FILE__,__LINE__,"MPI_gc>: requests=%d\n",requests->n_entries);
  CONT_TIMER();
  init_hash_traversal(requests);
  gc(requests);
  init_hash_traversal(broadcasts);
  gc(broadcasts);
  // write_msg(__FUNCTION__,__FILE__,__LINE__,"MPI_gc<: requests=%d\n",requests->n_entries);
  PAUSE_TIMER();
  return true;
}

size_t BLOCK_SIZE = 4 * 1024;

static YAP_Bool mpi_default_buffer_size(void) {
  YAP_Term t2;
  intptr_t IBLOCK_SIZE;
  if (!YAP_Unify(YAP_ARG1, YAP_MkIntTerm(BLOCK_SIZE)))
    return false;
  t2 = YAP_ARG2;
  if (YAP_IsVarTerm(t2))
    return true;
  if (!YAP_IsIntTerm(t2))
    return false;
  IBLOCK_SIZE = YAP_IntOfTerm(t2);
  if (IBLOCK_SIZE < 0)
    return false;
  BLOCK_SIZE = IBLOCK_SIZE;
  return true;
}
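/*
 Illustrative Prolog usage (a sketch): the first argument always reports the
 current default receive-buffer size; binding the second argument installs a
 new one:

   ?- mpi_default_buffer_size(Old, 65536).  % query and set in one call
   ?- mpi_default_buffer_size(Cur, _).      % query only
*/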

/********************************************************************
 * Init
 *******************************************************************/
X_API void init_mpi(void) {

  requests = new_hashtable(HASHSIZE);
  broadcasts = new_hashtable(HASHSIZE);
  FuncPrepare = YAP_MkFunctor(YAP_LookupAtom("$<<prepare>>$"), 1);
  DEL_BUFFER();
  YAP_UserCPredicate("mpi_init", mpi_init, 0); // mpi_init/0
#ifdef USE_THREADS
  YAP_UserCPredicate("mpi_init_rcv_thread", mpi_init_rcv_thread,
                     1); // mpi_init_rcv_thread(+HandleMsgGoal/1)
#endif
  YAP_UserCPredicate("mpi_finalize", mpi_finalize, 0);   // mpi_finalize/0
  YAP_UserCPredicate("mpi_comm_size", mpi_comm_size, 1); // mpi_comm_size(-Size)
  YAP_UserCPredicate("mpi_comm_rank", mpi_comm_rank, 1); // mpi_comm_rank(-Rank)
  YAP_UserCPredicate("mpi_version", mpi_version,
                     2); // mpi_version(-Major,-Minor)
  YAP_UserCPredicate("mpi_get_processor_name", mpi_get_processor_name,
                     1); // mpi_get_processor_name(-Name)
  YAP_UserCPredicate("mpi_send", mpi_send,
                     3); // mpi_send(+Data, +Destination, +Tag)
  YAP_UserCPredicate("mpi_isend", mpi_isend,
                     4); // mpi_isend(+Data, +Destination, +Tag, -Handle)
  YAP_UserCPredicate("mpi_recv", mpi_recv, 3); // mpi_recv(?Source,?Tag,-Data)
  YAP_UserCPredicate("mpi_irecv", mpi_irecv,
                     3); // mpi_irecv(?Source,?Tag,-Handle)
  YAP_UserCPredicate("mpi_wait", mpi_wait, 2); // mpi_wait(+Handle,-Status)
  YAP_UserCPredicate("mpi_wait_recv", mpi_wait_recv,
                     3); // mpi_wait_recv(+Handle,-Status,-Data)
  YAP_UserCPredicate("mpi_test", mpi_test, 2); // mpi_test(+Handle,-Status)
  YAP_UserCPredicate("mpi_test_recv", mpi_test_recv,
                     3); // mpi_test_recv(+Handle,-Status,-Data)
  YAP_UserCPredicate("mpi_bcast2", mpi_bcast2, 2);  // mpi_bcast2(Root,Term)
  YAP_UserCPredicate("mpi_bcast", mpi_bcast3, 3);   // mpi_bcast(Root,Term,Tag)
  YAP_UserCPredicate("mpi_ibcast", mpi_ibcast2, 2); // mpi_ibcast(Root,Term)
  YAP_UserCPredicate("mpi_ibcast", mpi_ibcast3, 3); // mpi_ibcast(Root,Term,Tag)
  YAP_UserCPredicate("mpi_barrier", mpi_barrier, 0); // mpi_barrier/0
  YAP_UserCPredicate("mpi_gc", mpi_gc, 0);           // mpi_gc/0
  YAP_UserCPredicate("mpi_default_buffer_size", mpi_default_buffer_size,
                     2); // mpi_default_buffer_size(-Current,?New)
#ifdef MPISTATS
  YAP_UserCPredicate(
      "mpi_stats", mpi_stats,
      7); // mpi_stats(-Time,#MsgsRecv,BytesRecv,MaxRecv,#MsgsSent,BytesSent,MaxSent)
  YAP_UserCPredicate("mpi_reset_stats", mpi_reset_stats,
                     0); // resets the timers and counters
  RESET_STATS();
#endif
  // YAP_UserCPredicate("mpi_gather", mpi_gather, 0);
  // mpi_gather(+RootRank,?SendData,?RecvData)
  // Each process (root process included) sends the contents of its send buffer
  // to the root process. The root process receives the messages and stores them
  // in rank order. The outcome is as if each of the n processes in the group
  // (including the root process) had executed a call to MPI_Send and the root
  // had executed n calls to MPI_Recv. The receive buffer is ignored for all
  // non-root processes. MPI_Scatter
#ifdef MPI_DEBUG
  fprintf(stderr, "MPI module successfully loaded.\n");
  fflush(stderr);
#endif
}
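/*
 Illustrative end-to-end sketch (not from the original source): a minimal
 Prolog program using this module, launched with something like
 "mpiexec -n 2 yap -l ping.pl". Predicate names are the ones registered in
 init_mpi(); the file name ping.pl is hypothetical.

   main :-
       mpi_init,
       mpi_comm_rank(Rank),
       (   Rank =:= 0
       ->  mpi_send(ping, 1, 0),
           mpi_recv(1, 0, Answer),
           write(Answer), nl
       ;   mpi_recv(0, 0, Msg),
           mpi_send(pong(Msg), 0, 0)
       ),
       mpi_barrier,
       mpi_finalize.
*/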

#endif /* HAVE_MPI_H */