25#if defined(HAVE_MPI_H) && (defined(HAVE_LIBMPI) || defined(HAVE_LIBMPICH))
/* Forward declaration of YAP_Write: takes a Term, a callback of type
   void (*)(int), and an int; it returns nothing.  Within this file it is
   called with mpi_putc as the callback and
   Quote_illegal_f|Handle_vars_f as the int argument. */
40void YAP_Write(Term,
void (*)(
int),
int);
/* Forward declarations of the C predicates implemented in this file.
   Each one is registered with Yap_InitCPred under the Prolog names
   mpi_open/3, mpi_close/0, mpi_send/3, mpi_receive/3, mpi_bcast/3,
   mpi_bcast/2 and mpi_barrier/0 respectively. */
42static Int p_mpi_open( USES_REGS1 );
43static Int p_mpi_close( USES_REGS1 );
44static Int p_mpi_send( USES_REGS1 );
45static Int p_mpi_receive( USES_REGS1 );
46static Int p_mpi_bcast3( USES_REGS1 );
47static Int p_mpi_bcast2( USES_REGS1 );
48static Int p_mpi_barrier( USES_REGS1 );
/* Process-wide MPI state filled in by p_mpi_open:
   rank and numprocs come from MPI_Comm_rank / MPI_Comm_size on
   MPI_COMM_WORLD; processor_name and namelen come from
   MPI_Get_processor_name. */
55static int rank, numprocs, namelen;
56static char processor_name[MPI_MAX_PROCESSOR_NAME];
/* Argument vector passed (by address) to MPI_Init; its entries are
   strdup'ed copies of GLOBAL_argv entries. */
59static char **mpi_argv;
/* Size in bytes of the initial message buffer, and the amount by which
   the buffer is grown when a write reaches its end.  The replacement
   list is parenthesised so the macro behaves as a single value inside
   any larger expression (e.g. x / RECV_BUF_SIZE or x % RECV_BUF_SIZE). */
#define RECV_BUF_SIZE (1024*32)
/* bufsize:   size used for buf (set to RECV_BUF_SIZE at start-up;
              expand_buffer allocates bufsize + space bytes).
   bufstrlen: length of the message text currently held in buf.
   NOTE(review): both are size_t, yet elsewhere in this file &bufstrlen
   is passed to MPI_Get_count (whose count parameter is int *), bufstrlen
   is printed with %d, and it is broadcast as MPI_INT elements.  Confirm
   the intended type; these uses only agree if it is int. */
69static size_t bufsize, bufstrlen;
/* expand_buffer(space): obtain storage of bufsize + space bytes for buf.
   Two strategies appear below: a fresh malloc followed by a memmove of
   the first bufsize bytes, and an in-place realloc.
   NOTE(review): presumably a preprocessor switch selects one of the two
   and each Yap_Error/Yap_exit pair is the allocation-failure path --
   confirm against the full function. */
74expand_buffer(
int space )
/* Variant 1: allocate a new block and copy the old contents into it. */
86 tmp = malloc( bufsize + space );
88 Yap_Error(SYSTEM_ERROR_INTERNAL, TermNil,
"out of memory" );
89 Yap_exit( EXIT_FAILURE );
91 memmove( tmp, buf, bufsize );
/* Variant 2: let realloc grow the block.
   NOTE(review): the result is assigned straight back to buf, so the old
   pointer is lost if realloc fails; that is only harmless because
   Yap_exit is called right afterwards -- confirm it never returns. */
95 buf = realloc( buf, bufsize + space );
97 Yap_Error(SYSTEM_ERROR_INTERNAL, TermNil,
"out of memory");
98 Yap_exit( EXIT_FAILURE );
109 if( bufptr >= bufsize ) expand_buffer( RECV_BUF_SIZE );
/* mpi_open(Rank, NumProcs, ProcName): initialise MPI and report this
   process's rank, the number of processes in MPI_COMM_WORLD and the
   processor name by unifying them with the three arguments. */
121p_mpi_open( USES_REGS1 )
123 Term t_rank = Deref(ARG1), t_numprocs = Deref(ARG2), t_procname = Deref(ARG3);
/* MPI_Init receives the argument vector prepared at start-up. */
145 retv = MPI_Init( &mpi_argc, &mpi_argv );
/* The MPI_Init return code is wrapped in an integer term and attached
   to the internal-error report. */
149 t = MkIntegerTerm(retv);
150 Yap_Error( SYSTEM_ERROR_INTERNAL, t,
"MPI_Init() returned non-zero" );
/* Cache communicator size, own rank and processor name in the
   file-level statics. */
153 MPI_Comm_size( MPI_COMM_WORLD, &numprocs );
154 MPI_Comm_rank( MPI_COMM_WORLD, &rank );
155 MPI_Get_processor_name( processor_name, &namelen );
/* Unify the outputs one by one; && short-circuits, so later
   unifications are skipped once one has failed. */
157 retv = Yap_unify(t_rank, MkIntTerm(rank));
158 retv = retv && Yap_unify(t_numprocs, MkIntTerm(numprocs));
159 retv = retv && Yap_unify(t_procname, MkAtomTerm(Yap_LookupAtom(processor_name)));
/* Implementation of the mpi_close/0 predicate (registered with
   Yap_InitCPred under that name with arity 0). */
166p_mpi_close( USES_REGS1 )
/* mpi_send(Data, Dest, Tag): write Data as text into buf and send it
   with MPI_Send to rank Dest using message tag Tag.
   Raises INSTANTIATION_ERROR if any argument is unbound and
   TYPE_ERROR_INTEGER if Dest or Tag is not an integer.  Returns FALSE
   when MPI_Send does not return MPI_SUCCESS. */
174p_mpi_send( USES_REGS1 )
176 Term t_data = Deref(ARG1), t_dest = Deref(ARG2), t_tag = Deref(ARG3);
/* The term to be sent must be bound. */
180 if (IsVarTerm(t_data)) {
181 Yap_Error(INSTANTIATION_ERROR, t_data,
"mpi_send");
/* Destination rank: bound and an integer. */
186 if (IsVarTerm(t_dest)) {
187 Yap_Error(INSTANTIATION_ERROR, t_dest,
"mpi_send");
189 }
else if( !IsIntegerTerm(t_dest) ) {
190 Yap_Error(TYPE_ERROR_INTEGER, t_dest,
"mpi_send");
/* NOTE(review): the check above is IsIntegerTerm but the accessor is
   IntOfTerm -- confirm the two are meant to be paired. */
193 dest = IntOfTerm( t_dest );
/* Message tag: same validation as the destination. */
195 if (IsVarTerm(t_tag)) {
196 Yap_Error(INSTANTIATION_ERROR, t_tag,
"mpi_send");
198 }
else if( !IsIntegerTerm(t_tag) ) {
199 Yap_Error(TYPE_ERROR_INTEGER, t_tag,
"mpi_send");
202 tag = IntOfTerm( t_tag );
/* Serialise the term through mpi_putc, with quoting and variable
   handling enabled by the two flags. */
207 YAP_Write( t_data, mpi_putc, Quote_illegal_f|Handle_vars_f );
/* strlen requires buf to be NUL-terminated at this point. */
214 bufstrlen = strlen(buf);
/* NOTE(review): the payload starts at &buf[bufptr]; presumably bufptr
   has been reset to 0 before this call -- confirm. */
218 retv = MPI_Send( &buf[bufptr], bufstrlen, MPI_CHAR, dest, tag, MPI_COMM_WORLD );
219 if( retv != MPI_SUCCESS )
return FALSE;
/* mpi_receive(Data, Orig, Tag): probe for a message, receive its text
   into buf, parse it with YAP_ReadBuffer and unify the result with Data.
   Data must be unbound.  An unbound Orig selects MPI_ANY_SOURCE and is
   unified with the actual sender afterwards; a bound Orig or Tag must be
   an integer (TYPE_ERROR_INTEGER otherwise). */
226p_mpi_receive( USES_REGS1 )
228 Term t, t_data = Deref(ARG1), t_orig = Deref(ARG2), t_tag = Deref(ARG3);
/* Data has to be a free variable.
   NOTE(review): the error raised for an already-bound Data is
   INSTANTIATION_ERROR -- confirm that this is the intended error class. */
233 if(!IsVarTerm(t_data)) {
234 Yap_Error(INSTANTIATION_ERROR, t_data,
"mpi_receive");
/* Source rank: unbound means "accept from any sender". */
241 if (IsVarTerm(t_orig)) {
242 orig = MPI_ANY_SOURCE;
243 }
else if( !IsIntegerTerm(t_orig) ) {
244 Yap_Error(TYPE_ERROR_INTEGER, t_orig,
"mpi_receive");
247 orig = IntOfTerm( t_orig );
/* Tag.  NOTE(review): the unbound case presumably selects MPI_ANY_TAG
   (tag is compared against it further down) -- confirm. */
252 if (IsVarTerm(t_tag)) {
254 }
else if( !IsIntegerTerm(t_tag) ) {
255 Yap_Error(TYPE_ERROR_INTEGER, t_tag,
"mpi_receive");
258 tag = IntOfTerm( t_tag );
/* Probe first, so the message length is known before receiving. */
261 retv = MPI_Probe( orig, tag, MPI_COMM_WORLD, &status );
262 if( retv != MPI_SUCCESS ) {
/* NOTE(review): MPI_Get_count's last parameter is int *, but bufstrlen
   is declared size_t in this file; only part of the object would be
   written on platforms where the two sizes differ -- confirm/fix. */
265 MPI_Get_count( &status, MPI_CHAR, &bufstrlen );
/* Grow the buffer by the shortfall when the message does not fit. */
268 if( bufsize < bufstrlen ) expand_buffer(bufstrlen-bufsize);
/* A wildcard source is replaced by the real sender, which is also
   reported back through the Orig argument. */
271 if( orig == MPI_ANY_SOURCE ) {
272 orig = status.MPI_SOURCE;
273 retv = Yap_unify(t_orig, MkIntTerm(orig));
274 if( retv == FALSE ) {
275 printf(
"PROBLEM: file %s, line %d\n", __FILE__, __LINE__);
/* Same treatment for a wildcard tag. */
280 if( tag == MPI_ANY_TAG ) {
281 tag = status.MPI_TAG;
282 retv = Yap_unify(t_tag, MkIntTerm(status.MPI_TAG));
283 if( retv == FALSE ) {
284 printf(
"PROBLEM: file %s, line %d\n", __FILE__, __LINE__);
/* Receive exactly the probed message (source and tag are concrete by
   now when they started as wildcards). */
289 retv = MPI_Recv( buf, bufstrlen, MPI_CHAR, orig, tag,
290 MPI_COMM_WORLD, &status );
291 if( retv != MPI_SUCCESS ) {
/* Parse the received text back into a term and hand it to the caller. */
301 t = YAP_ReadBuffer( buf, NULL );
307 retv = Yap_unify(t, t_data);
/* mpi_bcast(Data, Root, MaxSize): broadcast a term from rank Root using
   a fixed-size transfer of IntOfTerm(MaxSize) + 3 characters.
   The process whose rank equals Root returns TRUE after the broadcast;
   every other process parses the received text and unifies it with its
   first argument. */
315p_mpi_bcast3( USES_REGS1 )
317 Term t_data = Deref(ARG1), t_root = Deref(ARG2), t_max_size = Deref(ARG3);
318 int root, retv, max_size;
/* Root must be bound.  NOTE(review): no integer type check is visible
   before IntOfTerm, unlike mpi_send -- confirm. */
322 if (IsVarTerm(t_root)) {
323 Yap_Error(INSTANTIATION_ERROR, t_root,
"mpi_bcast");
326 root = IntOfTerm( t_root );
/* Sending side: Data must be bound before it is written into buf.
   NOTE(review): presumably guarded by root == rank -- confirm. */
331 if( IsVarTerm(t_data) ) {
332 Yap_Error(INSTANTIATION_ERROR, t_data,
"mpi_bcast");
337 YAP_Write( t_data, mpi_putc, Quote_illegal_f|Handle_vars_f );
343 bufstrlen = strlen(buf);
/* MaxSize must be bound. */
348 if (IsVarTerm(t_max_size)) {
349 Yap_Error(INSTANTIATION_ERROR, t_max_size,
"mpi_bcast");
/* NOTE(review): the meaning of the extra 3 characters is not evident
   here; presumably room for text appended after the term plus a
   terminator -- confirm. */
353 max_size = IntOfTerm( t_max_size ) + 3;
/* Warn when the serialised term is longer than the declared maximum.
   NOTE(review): max_size is int while bufstrlen is declared size_t, so
   this comparison is signed/unsigned and the second %d below does not
   match its argument type. */
355 if( max_size < bufstrlen ) {
357 printf(
"MAYDAY: max_size == %d, bufstrlen == %d\n ", max_size, bufstrlen);
/* Make sure buf can hold max_size characters. */
362 if( max_size > bufsize ) {
363 expand_buffer( max_size-bufsize );
/* Every process takes part in the broadcast with the same count. */
366 retv = MPI_Bcast( buf, max_size, MPI_CHAR, root, MPI_COMM_WORLD );
367 if( retv != MPI_SUCCESS ) {
368 printf(
"OOOPS! MPI_Bcast() returned %d.\n", retv );
/* The root has nothing to read back. */
372 if( root == rank )
return TRUE;
/* Receiving side: Data must be unbound so it can take the result. */
375 if( !IsVarTerm(t_data) ) {
376 Yap_Error(INSTANTIATION_ERROR, t_data,
"mpi_bcast");
380 bufstrlen = strlen(buf);
384 return Yap_unify( YAP_ReadBuffer( buf, NULL ), ARG1 );
/* mpi_bcast(Data, Root): broadcast a term from rank Root in two steps,
   first the length (bufstrlen) and then the text itself.
   The process whose rank equals Root returns TRUE after the second
   broadcast; every other process parses the received text and unifies
   it with its first argument. */
396p_mpi_bcast2( USES_REGS1 )
398 Term t_data = Deref(ARG1), t_root = Deref(ARG2);
/* Root must be bound. */
403 if (IsVarTerm(t_root)) {
404 Yap_Error(INSTANTIATION_ERROR, t_root,
"mpi_bcast");
407 root = IntOfTerm( t_root );
/* Sending side: Data must be bound before it is written into buf. */
413 if( IsVarTerm(t_data) ) {
414 Yap_Error(INSTANTIATION_ERROR, t_data,
"mpi_bcast");
419 YAP_Write( t_data, mpi_putc, Quote_illegal_f|Handle_vars_f );
/* Length to transmit: characters written so far plus two more.
   NOTE(review): presumably the two extra characters are a term
   terminator appended to the text -- confirm. */
424 bufstrlen = bufptr + 2;
/* Receiving side: Data must be unbound. */
428 if( !IsVarTerm(t_data) ) {
429 Yap_Error(INSTANTIATION_ERROR, t_data,
"mpi_bcast");
/* Step 1: broadcast the length.
   NOTE(review): MPI_Bcast's second argument is an element count, not a
   byte count.  Passing sizeof bufstrlen with MPI_INT transfers that many
   ints from/to a single object, reading and writing past bufstrlen.
   This looks like it should be a count of 1 with a datatype matching
   bufstrlen's declared type -- confirm and fix. */
436 retv = MPI_Bcast( &bufstrlen,
sizeof bufstrlen, MPI_INT, root, MPI_COMM_WORLD );
437 if( retv != MPI_SUCCESS ) {
438 printf(
"PROBLEM: file %s, line %d\n", __FILE__, __LINE__);
/* Make sure buf can hold the announced number of characters. */
443 if( bufstrlen > bufsize ) {
444 expand_buffer( bufstrlen - bufsize );
/* Step 2: broadcast the text. */
447 retv = MPI_Bcast( buf, bufstrlen, MPI_CHAR, root, MPI_COMM_WORLD );
448 if( retv != MPI_SUCCESS ) {
449 printf(
"PROBLEM: file %s, line %d\n", __FILE__, __LINE__);
/* The root has nothing to read back. */
453 if( root == rank )
return TRUE;
456 if( !IsVarTerm(t_data) ) {
457 Yap_Error(INSTANTIATION_ERROR, t_data,
"mpi_bcast");
461 bufstrlen = strlen(buf);
464 return Yap_unify(YAP_ReadBuffer( buf, NULL ), ARG1);
/* mpi_barrier: call MPI_Barrier on MPI_COMM_WORLD and keep its return
   code in retv. */
470p_mpi_barrier( USES_REGS1 )
474 retv = MPI_Barrier( MPI_COMM_WORLD );
/* Start-up: build the argument vector later handed to MPI_Init and
   allocate the initial message buffer.
   NOTE(review): neither malloc result is checked in the lines below, and
   the vector has exactly GLOBAL_argc slots, leaving no room for a
   terminating NULL entry -- confirm whether either matters here. */
491 mpi_argv = malloc( GLOBAL_argc *
sizeof(
char *) );
492 mpi_argv[0] = strdup( GLOBAL_argv[0] );
/* The message buffer starts at RECV_BUF_SIZE bytes. */
494 bufsize = RECV_BUF_SIZE;
495 buf = malloc(bufsize *
sizeof(
char));
/* Skip forward to the first argument after a literal "--". */
497 for( i=1; i<GLOBAL_argc; ++i ) {
498 if( !strcmp(GLOBAL_argv[i],
"--") ) { ++i;
break; }
/* Copy every remaining argument into mpi_argv, starting at slot 1. */
500 for( j=1; i<GLOBAL_argc; ++i, ++j ) {
501 mpi_argv[j] = strdup( GLOBAL_argv[i] );
/* NOTE(review): mpi_argv[0] is assigned a second strdup here;
   presumably this sits in an alternative branch, otherwise the first
   copy leaks -- confirm. */
505 mpi_argv[0] = strdup( GLOBAL_argv[0] );
/* Register the C predicates with the Prolog engine.  Each call gives the
   Prolog name, the arity and the implementing C function; all of them
   are registered with SafePredFlag|SyncPredFlag.  mpi_bcast exists with
   arity 3 (p_mpi_bcast3) and arity 2 (p_mpi_bcast2). */
507 Yap_InitCPred(
"mpi_open", 3, p_mpi_open, SafePredFlag|SyncPredFlag );
508 Yap_InitCPred(
"mpi_close", 0, p_mpi_close, SafePredFlag|SyncPredFlag );
509 Yap_InitCPred(
"mpi_send", 3, p_mpi_send, SafePredFlag|SyncPredFlag );
510 Yap_InitCPred(
"mpi_receive", 3, p_mpi_receive, SafePredFlag|SyncPredFlag );
511 Yap_InitCPred(
"mpi_bcast", 3, p_mpi_bcast3, SafePredFlag|SyncPredFlag );
512 Yap_InitCPred(
"mpi_bcast", 2, p_mpi_bcast2, SafePredFlag|SyncPredFlag );
513 Yap_InitCPred(
"mpi_barrier", 0, p_mpi_barrier, SafePredFlag|SyncPredFlag );