YAP 7.1.0
mpi.c
1/*************************************************************************
2* *
3* YAP Prolog *
4* *
5* Yap Prolog was developed at NCCUP - Universidade do Porto *
6* *
7* Copyright S. Konstantopoulos and Universidade do Porto 2002-2003 *
8* *
9**************************************************************************
10* *
11* File: mpi.c *
12* Last rev: $Date: 2003-07-03 15:01:18 $ *
13* mods: *
14* comments: Interface to MPI libraries *
15* *
16*************************************************************************/
17
18#ifndef lint
19// static char *rcsid = "$Header: /Users/vitor/Yap/yap-cvsbackup/library/mpi/mpi.c,v 1.20 2003-07-03 15:01:18 stasinos Exp $";
20#endif
21
22#include "Yap.h"
23
24/* Should we use MPI ? */
25#if defined(HAVE_MPI_H) && (defined(HAVE_LIBMPI) || defined(HAVE_LIBMPICH))
26 #define HAVE_MPI 1
27#else
28 #define HAVE_MPI 0
29#endif
30
31#if HAVE_MPI
32
33#include "Yatom.h"
34#include "yapio.h"
35
36#include <stdlib.h>
37#include <string.h>
38#include <mpi.h>
39
40void YAP_Write(Term, void (*)(int), int);
41
42static Int p_mpi_open( USES_REGS1 );
43static Int p_mpi_close( USES_REGS1 );
44static Int p_mpi_send( USES_REGS1 );
45static Int p_mpi_receive( USES_REGS1 );
46static Int p_mpi_bcast3( USES_REGS1 );
47static Int p_mpi_bcast2( USES_REGS1 );
48static Int p_mpi_barrier( USES_REGS1 );
49
50
51/*
52 * Auxiliary Data
53 */
54
55static int rank, numprocs, namelen;
56static char processor_name[MPI_MAX_PROCESSOR_NAME];
57
58static int mpi_argc;
59static char **mpi_argv;
60
61/* this should eventually be moved to config.h */
62#define RECV_BUF_SIZE 1024*32
63
64
65/*
66 * A simple stream
67 */
68
69static size_t bufsize, bufstrlen;
70static char *buf;
71static int bufptr;
72
73static void
74expand_buffer( int space )
75{
76#if MPI_AVOID_REALLOC
77 /*
78 realloc() has been SIGSEGV'ing on HP-UX 10.20, but there is
79 no problem in HP-UX 11.0. We can remove this bit here as soon
80 as Yap stops compiling on 10.20 anyway. If removed, also remove
81 the MPI_AVOID_REALLOC bits from configure.in and config.h.in
82 */
83
84 char *tmp;
85
86 tmp = malloc( bufsize + space );
87 if( tmp == NULL ) {
88 Yap_Error(SYSTEM_ERROR_INTERNAL, TermNil, "out of memory" );
89 Yap_exit( EXIT_FAILURE );
90 }
91 memmove( tmp, buf, bufsize );
92 free( buf );
93 buf = tmp;
94#else /* use realloc */
95 buf = realloc( buf, bufsize + space );
96 if( buf == NULL ) {
97 Yap_Error(SYSTEM_ERROR_INTERNAL, TermNil, "out of memory");
98 Yap_exit( EXIT_FAILURE );
99 }
100#endif
101
102 bufsize += space;
103}
104
105static void
106mpi_putc(Int ch)
107{
108 if( ch > 0 ) {
109 if( bufptr >= bufsize ) expand_buffer( RECV_BUF_SIZE );
110 buf[bufptr++] = ch;
111 }
112}
113
114
115/*
116 * C Predicates
117 */
118
119
120static Int
121p_mpi_open( USES_REGS1 ) /* mpi_open(?rank, ?num_procs, ?proc_name) */
122{
123 Term t_rank = Deref(ARG1), t_numprocs = Deref(ARG2), t_procname = Deref(ARG3);
124 Int retv;
125
126/*
127With MPICH MPI_Init() must be called during initialisation,
128but with LAM it can be called from Prolog (mpi_open/3)
129
130The symptoms match a known RedHat bug, see
131http://email.osc.edu/pipermail/mpiexec/2002-July/000067.html
132for a suggested workaround:
133 Redhat have somehow broken their sem.h and ipc.h. If you use your own
134 kernel, copy from ../src/kernel/include/asm & ../src/kernel/include/linux
135 the file ipc.h and sem.h to /usr/include/sys, recompile your mpich and
136 everything might start working. (it did for us)
137*/
138
139/*
140Note that if MPI_Init() fails, Yap/MPICH and Yap/LAM behave differently:
141in Yap/MPICH we are still at the Yap initialisation phase, so we get
142Yap exit(FAILURE), whereas in Yap/LAM mpi_open/3 simply fails.
143*/
144
145 retv = MPI_Init( &mpi_argc, &mpi_argv );
146 if( retv ) {
147 Term t;
148
149 t = MkIntegerTerm(retv);
150 Yap_Error( SYSTEM_ERROR_INTERNAL, t, "MPI_Init() returned non-zero" );
151 return FALSE;
152 }
153 MPI_Comm_size( MPI_COMM_WORLD, &numprocs );
154 MPI_Comm_rank( MPI_COMM_WORLD, &rank );
155 MPI_Get_processor_name( processor_name, &namelen );
156
157 retv = Yap_unify(t_rank, MkIntTerm(rank));
158 retv = retv && Yap_unify(t_numprocs, MkIntTerm(numprocs));
159 retv = retv && Yap_unify(t_procname, MkAtomTerm(Yap_LookupAtom(processor_name)));
160
161 return retv;
162}
163
164
165static Int /* mpi_close */
166p_mpi_close( USES_REGS1 )
167{
168 MPI_Finalize();
169 return TRUE;
170}
171
172
173static Int
174p_mpi_send( USES_REGS1 ) /* mpi_send(+data, +destination, +tag) */
175{
176 Term t_data = Deref(ARG1), t_dest = Deref(ARG2), t_tag = Deref(ARG3);
177 int tag, dest, retv;
178
179 /* The first argument (data) must be bound */
180 if (IsVarTerm(t_data)) {
181 Yap_Error(INSTANTIATION_ERROR, t_data, "mpi_send");
182 return (FALSE);
183 }
184
185 /* The second and third args must be bount to integers */
186 if (IsVarTerm(t_dest)) {
187 Yap_Error(INSTANTIATION_ERROR, t_dest, "mpi_send");
188 return (FALSE);
189 } else if( !IsIntegerTerm(t_dest) ) {
190 Yap_Error(TYPE_ERROR_INTEGER, t_dest, "mpi_send");
191 return (FALSE);
192 } else {
193 dest = IntOfTerm( t_dest );
194 }
195 if (IsVarTerm(t_tag)) {
196 Yap_Error(INSTANTIATION_ERROR, t_tag, "mpi_send");
197 return (FALSE);
198 } else if( !IsIntegerTerm(t_tag) ) {
199 Yap_Error(TYPE_ERROR_INTEGER, t_tag, "mpi_send");
200 return (FALSE);
201 } else {
202 tag = IntOfTerm( t_tag );
203 }
204
205 /* Turn the term into its ASCII representation */
206 bufptr = 0;
207 YAP_Write( t_data, mpi_putc, Quote_illegal_f|Handle_vars_f );
208
209 /* The buf is not NULL-terminated and does not have the
210 trailing ". " required by the parser */
211 mpi_putc( '.' );
212 mpi_putc( ' ' );
213 mpi_putc( 0 );
214 bufstrlen = strlen(buf);
215
216 /* send the data */
217 bufptr = 0;
218 retv = MPI_Send( &buf[bufptr], bufstrlen, MPI_CHAR, dest, tag, MPI_COMM_WORLD );
219 if( retv != MPI_SUCCESS ) return FALSE;
220
221 return TRUE;
222}
223
224
225static Int
226p_mpi_receive( USES_REGS1 ) /* mpi_receive(-data, ?orig, ?tag) */
227{
228 Term t, t_data = Deref(ARG1), t_orig = Deref(ARG2), t_tag = Deref(ARG3);
229 int tag, orig, retv;
230 MPI_Status status;
231
232 /* The first argument (data) must be unbound */
233 if(!IsVarTerm(t_data)) {
234 Yap_Error(INSTANTIATION_ERROR, t_data, "mpi_receive");
235 return FALSE;
236 }
237
238 /* The second argument (source) must be bound to an integer
239 (the rank of the source) or left unbound (i.e. any source
240 is OK) */
241 if (IsVarTerm(t_orig)) {
242 orig = MPI_ANY_SOURCE;
243 } else if( !IsIntegerTerm(t_orig) ) {
244 Yap_Error(TYPE_ERROR_INTEGER, t_orig, "mpi_receive");
245 return (FALSE);
246 } else {
247 orig = IntOfTerm( t_orig );
248 }
249
250 /* The third argument must be bound to an integer (the tag)
251 or left unbound (i.e. any tag is OK) */
252 if (IsVarTerm(t_tag)) {
253 tag = MPI_ANY_TAG;
254 } else if( !IsIntegerTerm(t_tag) ) {
255 Yap_Error(TYPE_ERROR_INTEGER, t_tag, "mpi_receive");
256 return (FALSE);
257 } else
258 tag = IntOfTerm( t_tag );
259
260 /* probe for the size of the term */
261 retv = MPI_Probe( orig, tag, MPI_COMM_WORLD, &status );
262 if( retv != MPI_SUCCESS ) {
263 return FALSE;
264 }
265 MPI_Get_count( &status, MPI_CHAR, &bufstrlen );
266
267 /* adjust the buffer */
268 if( bufsize < bufstrlen ) expand_buffer(bufstrlen-bufsize);
269
270 /* Already know the source from MPI_Probe() */
271 if( orig == MPI_ANY_SOURCE ) {
272 orig = status.MPI_SOURCE;
273 retv = Yap_unify(t_orig, MkIntTerm(orig));
274 if( retv == FALSE ) {
275 printf("PROBLEM: file %s, line %d\n", __FILE__, __LINE__);
276 }
277 }
278
279 /* Already know the tag from MPI_Probe() */
280 if( tag == MPI_ANY_TAG ) {
281 tag = status.MPI_TAG;
282 retv = Yap_unify(t_tag, MkIntTerm(status.MPI_TAG));
283 if( retv == FALSE ) {
284 printf("PROBLEM: file %s, line %d\n", __FILE__, __LINE__);
285 }
286 }
287
288 /* Receive the message as a C string */
289 retv = MPI_Recv( buf, bufstrlen, MPI_CHAR, orig, tag,
290 MPI_COMM_WORLD, &status );
291 if( retv != MPI_SUCCESS ) {
292 /* Getting in here would be weird; it means the first package
293 (size) was sent properly, but there was a glitch with
294 the actual content! */
295 return FALSE;
296 }
297
298 /* parse received string into a Prolog term */
299
300 bufptr = 0;
301 t = YAP_ReadBuffer( buf, NULL );
302
303 if( t == TermNil ) {
304 retv = FALSE;
305 }
306 else {
307 retv = Yap_unify(t, t_data);
308 }
309
310 return retv;
311}
312
313
314static Int
315p_mpi_bcast3( USES_REGS1 ) /* mpi_bcast( ?data, +root, +max_size ) */
316{
317 Term t_data = Deref(ARG1), t_root = Deref(ARG2), t_max_size = Deref(ARG3);
318 int root, retv, max_size;
319
320 /* The second argument must be bound to an integer (the rank of
321 root processor */
322 if (IsVarTerm(t_root)) {
323 Yap_Error(INSTANTIATION_ERROR, t_root, "mpi_bcast");
324 return FALSE;
325 }
326 root = IntOfTerm( t_root );
327
328 /* If this is the root processor, then the first argument must
329 be bound to the term to be sent. */
330 if( root == rank ) {
331 if( IsVarTerm(t_data) ) {
332 Yap_Error(INSTANTIATION_ERROR, t_data, "mpi_bcast");
333 return FALSE;
334 }
335 /* Turn the term into its ASCII representation */
336 bufptr = 0;
337 YAP_Write( t_data, mpi_putc, Quote_illegal_f|Handle_vars_f );
338 /* NULL-terminate the string and add the ". " termination
339 required by the parser. */
340 mpi_putc( '.' );
341 mpi_putc( ' ' );
342 mpi_putc( 0 );
343 bufstrlen = strlen(buf);
344 }
345
346 /* The third argument must be bound to an integer (the maximum length
347 of the broadcast term's ASCII representation */
348 if (IsVarTerm(t_max_size)) {
349 Yap_Error(INSTANTIATION_ERROR, t_max_size, "mpi_bcast");
350 return FALSE;
351 }
352 /* allow for the ". " bit and the NULL at the end */
353 max_size = IntOfTerm( t_max_size ) + 3;
354
355 if( max_size < bufstrlen ) {
356 /* issue a warning? explode? bcast s'thing unparsable? */
357 printf( "MAYDAY: max_size == %d, bufstrlen == %d\n ", max_size, bufstrlen);
358 return FALSE;
359 }
360
361 /* adjust the buffer size, if necessary */
362 if( max_size > bufsize ) {
363 expand_buffer( max_size-bufsize );
364 }
365
366 retv = MPI_Bcast( buf, max_size, MPI_CHAR, root, MPI_COMM_WORLD );
367 if( retv != MPI_SUCCESS ) {
368 printf( "OOOPS! MPI_Bcast() returned %d.\n", retv );
369 return FALSE;
370 }
371
372 if( root == rank ) return TRUE;
373 else {
374 /* ARG1 must be unbound so that it can receive data */
375 if( !IsVarTerm(t_data) ) {
376 Yap_Error(INSTANTIATION_ERROR, t_data, "mpi_bcast");
377 return FALSE;
378 }
379
380 bufstrlen = strlen(buf);
381 bufptr = 0;
382
383 /* parse received string into a Prolog term */
384 return Yap_unify( YAP_ReadBuffer( buf, NULL ), ARG1 );
385 }
386}
387
388
389/*
390 This is the same as above, but for dynamic data size.
391 It is implemented as two broadcasts, the first being the size
392 and the second the actual data.
393*/
394
395static Int
396p_mpi_bcast2( USES_REGS1 ) /* mpi_bcast( ?data, +root ) */
397{
398 Term t_data = Deref(ARG1), t_root = Deref(ARG2);
399 int root, retv;
400
401 /* The second argument must be bound to an integer (the rank of
402 root processor */
403 if (IsVarTerm(t_root)) {
404 Yap_Error(INSTANTIATION_ERROR, t_root, "mpi_bcast");
405 return FALSE;
406 }
407 root = IntOfTerm( t_root );
408
409
410 /* If this is the root processor, then the first argument must
411 be bound to the term to be sent. */
412 if( root == rank ) {
413 if( IsVarTerm(t_data) ) {
414 Yap_Error(INSTANTIATION_ERROR, t_data, "mpi_bcast");
415 return FALSE;
416 }
417 bufptr = 0;
418 /* Turn the term into its ASCII representation */
419 YAP_Write( t_data, mpi_putc, Quote_illegal_f|Handle_vars_f );
420 /* NULL-terminate the string and add the ". " termination
421 required by the parser. */
422 buf[bufptr] = 0;
423 strcat( buf, ". " );
424 bufstrlen = bufptr + 2;
425 }
426 /* Otherwise, it must a variable */
427 else {
428 if( !IsVarTerm(t_data) ) {
429 Yap_Error(INSTANTIATION_ERROR, t_data, "mpi_bcast");
430 return FALSE;
431 }
432 }
433
434
435 /* Broadcast the data size */
436 retv = MPI_Bcast( &bufstrlen, sizeof bufstrlen, MPI_INT, root, MPI_COMM_WORLD );
437 if( retv != MPI_SUCCESS ) {
438 printf("PROBLEM: file %s, line %d\n", __FILE__, __LINE__);
439 return FALSE;
440 }
441
442 /* adjust the buffer size, if necessary */
443 if( bufstrlen > bufsize ) {
444 expand_buffer( bufstrlen - bufsize );
445 }
446 /* Broadcast the data */
447 retv = MPI_Bcast( buf, bufstrlen, MPI_CHAR, root, MPI_COMM_WORLD );
448 if( retv != MPI_SUCCESS ) {
449 printf("PROBLEM: file %s, line %d\n", __FILE__, __LINE__);
450 return FALSE;
451 }
452
453 if( root == rank ) return TRUE;
454 else {
455 /* ARG1 must be unbound so that it can receive data */
456 if( !IsVarTerm(t_data) ) {
457 Yap_Error(INSTANTIATION_ERROR, t_data, "mpi_bcast");
458 return FALSE;
459 }
460
461 bufstrlen = strlen(buf);
462 bufptr = 0;
463
464 return Yap_unify(YAP_ReadBuffer( buf, NULL ), ARG1);
465 }
466}
467
468
469static Int
470p_mpi_barrier( USES_REGS1 ) /* mpi_barrier/0 */
471{
472 int retv;
473
474 retv = MPI_Barrier( MPI_COMM_WORLD );
475
476 return (retv == 0);
477}
478
479
480
481/*
482 * Init
483 */
484
485
486void
487Yap_InitMPI(void)
488{
489 int i,j;
490
491 mpi_argv = malloc( GLOBAL_argc * sizeof(char *) );
492 mpi_argv[0] = strdup( GLOBAL_argv[0] );
493
494 bufsize = RECV_BUF_SIZE;
495 buf = malloc(bufsize * sizeof(char));
496
497 for( i=1; i<GLOBAL_argc; ++i ) {
498 if( !strcmp(GLOBAL_argv[i], "--") ) { ++i; break; }
499 }
500 for( j=1; i<GLOBAL_argc; ++i, ++j ) {
501 mpi_argv[j] = strdup( GLOBAL_argv[i] );
502 }
503 mpi_argc = j;
504
505 mpi_argv[0] = strdup( GLOBAL_argv[0] );
506
507 Yap_InitCPred( "mpi_open", 3, p_mpi_open, SafePredFlag|SyncPredFlag );
508 Yap_InitCPred( "mpi_close", 0, p_mpi_close, SafePredFlag|SyncPredFlag );
509 Yap_InitCPred( "mpi_send", 3, p_mpi_send, SafePredFlag|SyncPredFlag );
510 Yap_InitCPred( "mpi_receive", 3, p_mpi_receive, SafePredFlag|SyncPredFlag );
511 Yap_InitCPred( "mpi_bcast", 3, p_mpi_bcast3, SafePredFlag|SyncPredFlag );
512 Yap_InitCPred( "mpi_bcast", 2, p_mpi_bcast2, SafePredFlag|SyncPredFlag );
513 Yap_InitCPred( "mpi_barrier", 0, p_mpi_barrier, SafePredFlag|SyncPredFlag );
514}
515
516#endif /* HAVE_MPI */
Main definitions.