YAP 7.1.0
threads.c
1/*************************************************************************
2 * *
3 * YAP Prolog *
4 * *
5 * Yap Prolog was developed at NCCUP - Universidade do Porto *
6 * *
7 * Copyright L.Damas, V.S.Costa and Universidade do Porto 1985-1997 *
8 * *
9 **************************************************************************
10 * *
11 * File: threads.c *
12 * Last rev: *
13 * mods: *
14 * comments: threads *
15 * *
16 *************************************************************************/
17#ifdef SCCS
18static char SccsId[] = "%W% %G%";
19#endif
20
26#include "Yap.h"
27#include "Yatom.h"
28#include "YapHeap.h"
29#include "YapEval.h"
30#include "yapio.h"
31#include "YapBlobs.h"
32#include <stdio.h>
33#if HAVE_UNISTD_H
34#include <unistd.h>
35#endif
36#if HAVE_STRING_H
37#include <string.h>
38#endif
39#if HAVE_SYS_SYSCALL_H
40#include <sys/syscall.h>
41#endif
42#ifdef TABLING
43#include "tab.macros.h"
44#endif /* TABLING */
45
46
47blob_type_t PL_Message_Queue = {
48 YAP_BLOB_MAGIC_B,
49 PL_BLOB_UNIQUE | PL_BLOB_NOCOPY,
50 "message_queue",
51 0, // release
52 0, // compare
53 0, // write
54 0 // acquire
55};
56
57
58#if DEBUG_LOCKS||DEBUG_PE_LOCKS
59
60bool debug_locks = true, debug_pe_locks = true;
61static Int p_debug_locks( USES_REGS1 ) { debug_pe_locks = 1; return TRUE; }
62
63static Int p_nodebug_locks( USES_REGS1 ) { debug_locks = 0; debug_pe_locks = 0; return TRUE; }
64
65#endif
66
67#if THREADS
68
69#include "threads.h"
70
71
72
73int
74Yap_ThreadID( void )
75{
76 int new_worker_id = 0;
77 pthread_t self = pthread_self();
78 while(new_worker_id < MAX_THREADS &&
79 Yap_local[new_worker_id] &&
80 (REMOTE_ThreadHandle(new_worker_id).in_use == TRUE ||
81 REMOTE_ThreadHandle(new_worker_id).zombie == TRUE) ) {
82 if (pthread_equal(self , REMOTE_ThreadHandle(new_worker_id).pthread_handle) ) {
83 return new_worker_id;
84 }
85 new_worker_id++;
86 }
87 return -1;
88}
89
90int
91Yap_NOfThreads(void) {
92 // GLOBAL_ThreadHandlesLock is held
93 return GLOBAL_NOfThreads;
94}
95
96static int
97allocate_new_tid(void)
98{
99 int new_worker_id = 0;
100 LOCK(GLOBAL_ThreadHandlesLock);
101 while(new_worker_id < MAX_THREADS &&
102 Yap_local[new_worker_id] &&
103 (REMOTE_ThreadHandle(new_worker_id).in_use == TRUE ||
104 REMOTE_ThreadHandle(new_worker_id).zombie == TRUE) )
105 new_worker_id++;
106 if (new_worker_id >= MAX_THREADS) {
107 new_worker_id = -1;
108 } else if (!Yap_local[new_worker_id]) {
109 if (!Yap_InitThread(new_worker_id)) {
110 UNLOCK(GLOBAL_ThreadHandlesLock);
111 return -1;
112 }
113 MUTEX_LOCK(&(REMOTE_ThreadHandle(new_worker_id).tlock));
114 REMOTE_ThreadHandle(new_worker_id).in_use = TRUE;
115 } else if (new_worker_id < MAX_THREADS) {
116 // reuse existing thread
117 MUTEX_LOCK(&(REMOTE_ThreadHandle(new_worker_id).tlock));
118 REMOTE_ThreadHandle(new_worker_id).in_use = TRUE;
119 } else {
120 new_worker_id = -1;
121 }
122 UNLOCK(GLOBAL_ThreadHandlesLock);
123 return new_worker_id;
124}
125
126
127static bool
128mboxCreate( Term namet, mbox_t *mboxp USES_REGS )
129{
130 pthread_mutex_t *mutexp;
131 pthread_cond_t *condp;
132 struct idb_queue *msgsp;
133
134 memset(mboxp, 0, sizeof(mbox_t));
135 condp = & mboxp->cond;
136 pthread_cond_init(condp, NULL);
137 mutexp = & mboxp->mutex;
138 pthread_mutex_init(mutexp, NULL);
139 msgsp = & mboxp->msgs;
140 mboxp->nmsgs = 0;
141 mboxp->nclients = 0;
142 Yap_init_tqueue(msgsp);
143 // match at the end, when everything is built.
144 mboxp->name = namet;
145 mboxp->open = true;
146 return true;
147}
148
149static bool
150mboxDestroy( mbox_t *mboxp USES_REGS )
151{
152 pthread_mutex_t *mutexp = &mboxp->mutex;
153 pthread_cond_t *condp = &mboxp->cond;
154 struct idb_queue *msgsp = &mboxp->msgs;
155 mboxp->open = false;
156 if (mboxp->nclients == 0 ) {
157 pthread_cond_destroy(condp);
158 pthread_mutex_destroy(mutexp);
159 Yap_destroy_tqueue(msgsp PASS_REGS);
160 // at this point, there is nothing left to unlock!
161 return true;
162 } else {
163 /* we have clients in the mailbox, try to wake them up one by one */
164 pthread_cond_broadcast(condp);
165 pthread_mutex_unlock(mutexp);
166 return true;
167 }
168}
169
170static bool
171mboxSend( mbox_t *mboxp, Term t USES_REGS )
172{
173 pthread_mutex_t *mutexp = &mboxp->mutex;
174 pthread_cond_t *condp = &mboxp->cond;
175 struct idb_queue *msgsp = &mboxp->msgs;
176
177 if (!mboxp->open) {
178 // oops, dead mailbox
179 return false;
180 }
181 Yap_enqueue_tqueue(msgsp, t PASS_REGS);
182 // printf("+ (%d) %d/%d\n", worker_id,mboxp->nclients, mboxp->nmsgs);
183 mboxp->nmsgs++;
184 pthread_cond_broadcast(condp);
185 pthread_mutex_unlock(mutexp);
186 return true;
187}
188
189static bool
190mboxReceive( mbox_t *mboxp, Term t USES_REGS )
191{
192 pthread_mutex_t *mutexp = &mboxp->mutex;
193 pthread_cond_t *condp = &mboxp->cond;
194 struct idb_queue *msgsp = &mboxp->msgs;
195 bool rc;
196
197 if (!mboxp->open){
198 return false; // don't try to read if someone else already closed down...
199 }
200 mboxp->nclients++;
201 do {
202 rc = mboxp->nmsgs && Yap_dequeue_tqueue(msgsp, t, false, true PASS_REGS);
203 if (rc) {
204 mboxp->nclients--;
205 mboxp->nmsgs--;
206 //printf("- (%d) %d/%d\n", worker_id,mboxp->nclients, mboxp->nmsgs);
207 // Yap_do_low_level_trace=1;
208 pthread_mutex_unlock(mutexp);
209 return true;
210 } else if (!mboxp->open) {
211 //printf("o (%d)\n", worker_id);
212 mboxp->nclients--;
213 if (!mboxp->nclients) {// release
214 pthread_cond_destroy(condp);
215 pthread_mutex_destroy(mutexp);
216 Yap_destroy_tqueue(msgsp PASS_REGS);
217 // at this point, there is nothing left to unlock!
218 } else {
219 pthread_cond_broadcast(condp);
220 pthread_mutex_unlock(mutexp);
221 }
222 return false;
223 } else {
224 pthread_cond_wait(condp, mutexp);
225 }
226 } while (!rc);
227 return rc;
228}
229
230static bool
231mboxPeek( mbox_t *mboxp, Term t USES_REGS )
232{
233 pthread_mutex_t *mutexp = &mboxp->mutex;
234 struct idb_queue *msgsp = &mboxp->msgs;
235 bool rc = Yap_dequeue_tqueue(msgsp, t, false, false PASS_REGS);
236 pthread_mutex_unlock(mutexp);
237 return rc;
238}
239
240static int
241store_specs(int new_worker_id, UInt ssize, UInt tsize, UInt sysize, Term tgoal, Term tdetach, Term texit)
242{
243 CACHE_REGS
244 UInt pm; /* memory to be requested */
245 Term tmod;
246
247 if (tsize < MinTrailSpace)
248 tsize = MinTrailSpace;
249 if (ssize < MinStackSpace)
250 ssize = MinStackSpace;
251 REMOTE_ThreadHandle(new_worker_id).ssize = ssize;
252 REMOTE_ThreadHandle(new_worker_id).tsize = tsize;
253 REMOTE_ThreadHandle(new_worker_id).sysize = sysize;
254
255 if ((REGSTORE *)pthread_getspecific(Yap_yaamregs_key)) {
256 REMOTE_c_input_stream(new_worker_id) = LOCAL_c_input_stream;
257 REMOTE_c_output_stream(new_worker_id) = LOCAL_c_output_stream;
258 REMOTE_c_error_stream(new_worker_id) = LOCAL_c_error_stream;
259 } else {
260 // thread is created by a thread that has never run Prolog
261 REMOTE_c_input_stream(new_worker_id) = REMOTE_c_input_stream(0);
262 REMOTE_c_output_stream(new_worker_id) = REMOTE_c_output_stream(0);
263 REMOTE_c_error_stream(new_worker_id) = REMOTE_c_error_stream(0);
264 }
265 pm = (ssize + tsize)*K1;
266 if (!(REMOTE_ThreadHandle(new_worker_id).stack_address = malloc(pm))) {
267 return FALSE;
268 }
269 REMOTE_ThreadHandle(new_worker_id).tgoal =
270 Yap_StoreTermInDB(Deref(tgoal), 7);
271
272 if (CurrentModule) {
273 REMOTE_ThreadHandle(new_worker_id).cmod =
274 CurrentModule;
275 } else {
276 REMOTE_ThreadHandle(new_worker_id).cmod = USER_MODULE;
277 }
278 tdetach = Deref(tdetach);
279 if (IsVarTerm(tdetach)){
280 REMOTE_ThreadHandle(new_worker_id).tdetach =
281 MkAtomTerm(AtomFalse);
282 } else {
283 REMOTE_ThreadHandle(new_worker_id).tdetach =
284 tdetach;
285 }
286 tmod = CurrentModule;
287 texit = Yap_StripModule(Deref(texit), &tmod);
288 if (IsAtomTerm(tmod)) {
289 REMOTE_ThreadHandle(new_worker_id).texit_mod = tmod;
290 } else {
291 Yap_Error(TYPE_ERROR_ATOM,tmod,"module in exit call should be an atom");
292 }
293 REMOTE_ThreadHandle(new_worker_id).texit =
294 Yap_StoreTermInDB(texit,7);
295 REMOTE_ThreadHandle(new_worker_id).local_preds =
296 NULL;
297 REMOTE_ThreadHandle(new_worker_id).start_of_timesp =
298 NULL;
299 REMOTE_ThreadHandle(new_worker_id).last_timep =
300 NULL;
301 REMOTE_ScratchPad(new_worker_id).ptr =
302 NULL;
303 // reset arena info
304 REMOTE_GlobalArena(new_worker_id) =0;
305 return TRUE;
306}
307
308
309static void
310kill_thread_engine (int wid, int always_die)
311{
312 Prop p0 = AbsPredProp(REMOTE_ThreadHandle(wid).local_preds);
313 GlobalEntry *gl = REMOTE_GlobalVariables(wid);
314
315 REMOTE_ThreadHandle(wid).local_preds = NIL;
316 REMOTE_GlobalVariables(wid) = NULL;
317 /* kill all thread local preds */
318 while(p0) {
319 PredEntry *ap = RepPredProp(p0);
320 p0 = ap->NextOfPE;
321 Yap_Abolish(ap);
322 Yap_FreeCodeSpace((char *)ap);
323 }
324 while (gl) {
325 gl->global = TermFoundVar;
326 gl = gl->NextGE;
327 }
328 Yap_KillStacks(wid);
329 REMOTE_Signals(wid) = 0L;
330 // must be done before relessing the memory used to store
331 // thread local time.
332 if (!always_die) {
333 /* called by thread itself */
334 GLOBAL_ThreadsTotalTime += Yap_cputime();
335 }
336 if (REMOTE_ScratchPad(wid).ptr)
337 free(REMOTE_ScratchPad(wid).ptr);
338// if (REMOTE_TmpPred(wid).ptr)
339// free(REMOTE_TmpPred(wid).ptr);
340 REMOTE_ThreadHandle(wid).current_yaam_regs = NULL;
341 if (REMOTE_ThreadHandle(wid).start_of_timesp)
342 free(REMOTE_ThreadHandle(wid).start_of_timesp);
343 if (REMOTE_ThreadHandle(wid).last_timep)
344 free(REMOTE_ThreadHandle(wid).last_timep);
345 if (REMOTE_ThreadHandle(wid).texit) {
346 Yap_FreeCodeSpace((ADDR)REMOTE_ThreadHandle(wid).texit);
347 }
348 /* FreeCodeSpace requires LOCAL requires yaam_regs */
349 free(REMOTE_ThreadHandle(wid).default_yaam_regs);
350 REMOTE_ThreadHandle(wid).default_yaam_regs = NULL;
351 LOCK(GLOBAL_ThreadHandlesLock);
352 GLOBAL_NOfThreads--;
353 UNLOCK(GLOBAL_ThreadHandlesLock);
354 MUTEX_LOCK(&(REMOTE_ThreadHandle(wid).tlock));
355 if (REMOTE_ThreadHandle(wid).tdetach == MkAtomTerm(AtomTrue) ||
356 always_die) {
357 REMOTE_ThreadHandle(wid).zombie = FALSE;
358 REMOTE_ThreadHandle(wid).in_use = FALSE;
359 }
360 MUTEX_UNLOCK(&(REMOTE_ThreadHandle(wid).tlock));
361}
362
/* Terminate worker wid; a plain forwarder to kill_thread_engine(). */
static void
thread_die(int wid, int always_die)
{
  kill_thread_engine(wid, always_die);
}
368
369static int
370setup_engine(int myworker_id, int init_thread)
371{
372 CACHE_REGS
373 REGSTORE *standard_regs;
374
375 standard_regs = (REGSTORE *)calloc(1,sizeof(REGSTORE));
376 if (!standard_regs)
377 return FALSE;
378 regcache = standard_regs;
379 /* create the YAAM descriptor */
380 REMOTE_ThreadHandle(myworker_id).default_yaam_regs = standard_regs;
381 REMOTE_ThreadHandle(myworker_id).current_yaam_regs = standard_regs;
382 Yap_InitExStacks(myworker_id, REMOTE_ThreadHandle(myworker_id).tsize, REMOTE_ThreadHandle(myworker_id).ssize);
383 REMOTE_SourceModule(myworker_id) = CurrentModule = REMOTE_ThreadHandle(myworker_id).cmod;
384 // create a mbox
385 mboxCreate( MkIntTerm(myworker_id), &REMOTE_ThreadHandle(myworker_id).mbox_handle PASS_REGS );
386 Yap_InitTime( myworker_id );
387 Yap_InitYaamRegs( myworker_id, true] );
388 REFRESH_CACHE_REGS
389 Yap_ReleasePreAllocCodeSpace(Yap_PreAllocCodeSpace());
390 /* I exist */
391 GLOBAL_NOfThreadsCreated++;
392 GLOBAL_NOfThreads++;
393 MUTEX_UNLOCK(&(REMOTE_ThreadHandle(myworker_id).tlock));
394#ifdef TABLING
395 new_dependency_frame(REMOTE_top_dep_fr(myworker_id), FALSE, NULL, NULL, B, NULL, FALSE, NULL); /* same as in Yap_init_root_frames() */
396#endif /* TABLING */
397 return TRUE;
398}
399
400static void
401start_thread(int myworker_id)
402{
403 CACHE_REGS
404 pthread_setspecific(Yap_yaamregs_key, (void *)REMOTE_ThreadHandle(myworker_id).default_yaam_regs);
405 REFRESH_CACHE_REGS;
406 worker_id = myworker_id;
407 LOCAL = REMOTE(myworker_id);
408}
409
410static void *
411thread_run(void *widp)
412{
413 CACHE_REGS
414 Term tgoal, t;
415 Term tgs[2];
416 int myworker_id = *((int *)widp);
417#ifdef OUTPUT_THREADS_TABLING
418 char thread_name[25];
419 char filename[MAX_PATH];
420
421 sprintf(thread_name, "/thread_output_%d", myworker_id);
422 strcpy(filename, YAP_BINDIR);
423 strncat(filename, thread_name, 25);
424 REMOTE_thread_output(myworker_id) = fopen(filename, "w");
425#endif /* OUTPUT_THREADS_TABLING */
426 start_thread(myworker_id);
427 REFRESH_CACHE_REGS;
428 do {
429 t = tgs[0] = Yap_PopTermFromDB(LOCAL_ThreadHandle.tgoal);
430 if (t == 0) {
431 if (LOCAL_Error_TYPE == RESOURCE_ERROR_ATTRIBUTED_VARIABLES) {
432 LOCAL_Error_TYPE = YAP_NO_ERROR;
433 if (!Yap_growglobal(NULL)) {
434 Yap_Error(RESOURCE_ERROR_ATTRIBUTED_VARIABLES, TermNil, LOCAL_ErrorMessage);
435 thread_die(worker_id, FALSE);
436 return NULL;
437 }
438 } else {
439 LOCAL_Error_TYPE = YAP_NO_ERROR;
440 if (!Yap_growstack(LOCAL_ThreadHandle.tgoal->NOfCells*CellSize)) {
441 Yap_Error(RESOURCE_ERROR_STACK, TermNil, LOCAL_ErrorMessage);
442 thread_die(worker_id, FALSE);
443 return NULL;
444 }
445 }
446 }
447 } while (t == 0);
448 REMOTE_ThreadHandle(myworker_id).tgoal = NULL;
449 tgs[1] = LOCAL_ThreadHandle.tdetach;
450 tgoal = Yap_MkApplTerm(FunctorThreadRun, 2, tgs);
451 Yap_RunTopGoal(tgoal);
452#ifdef TABLING
453 {
454 tab_ent_ptr tab_ent;
455
456 tab_ent = GLOBAL_root_tab_ent;
457 while (tab_ent) {
458 abolish_table(tab_ent);
459 tab_ent = TabEnt_next(tab_ent);
460 }
461 FREE_DEPENDENCY_FRAME(REMOTE_top_dep_fr(worker_id));
462 REMOTE_top_dep_fr(worker_id) = NULL;
463#ifdef USE_PAGES_MALLOC
464 DETACH_PAGES(_pages_void);
465#endif /* USE_PAGES_MALLOC */
466 DETACH_PAGES(_pages_tab_ent);
467#if defined(THREADS_FULL_SHARING) || defined(THREADS_CONSUMER_SHARING)
468 DETACH_PAGES(_pages_sg_ent);
469#endif /* THREADS_FULL_SHARING || THREADS_CONSUMER_SHARING */
470 DETACH_PAGES(_pages_sg_fr);
471 DETACH_PAGES(_pages_dep_fr);
472 DETACH_PAGES(_pages_sg_node);
473 DETACH_PAGES(_pages_sg_hash);
474 DETACH_PAGES(_pages_ans_node);
475 DETACH_PAGES(_pages_ans_hash);
476#if defined(THREADS_FULL_SHARING)
477 DETACH_PAGES(_pages_ans_ref_node);
478#endif /* THREADS_FULL_SHARING */
479 DETACH_PAGES(_pages_gt_node);
480 DETACH_PAGES(_pages_gt_hash);
481#ifdef OUTPUT_THREADS_TABLING
482 fclose(LOCAL_thread_output);
483#endif /* OUTPUT_THREADS_TABLING */
484
485 }
486#endif /* TABLING */
487 thread_die(worker_id, FALSE);
488 return NULL;
489}
490
491static Int
492p_thread_new_tid( USES_REGS1 )
493{
494 int new_worker = allocate_new_tid();
495 if (new_worker == -1) {
496 Yap_Error(RESOURCE_ERROR_MAX_THREADS, MkIntegerTerm(MAX_THREADS), "");
497 return FALSE;
498 }
499 return Yap_unify(MkIntegerTerm(new_worker), ARG1);
500}
501
502static int
503init_thread_engine(int new_worker_id, UInt ssize, UInt tsize, UInt sysize, Term tgoal, Term tdetach, Term texit)
504{
505 return store_specs(new_worker_id, ssize, tsize, sysize, tgoal, tdetach, texit);
506}
507
508static Int
509p_create_thread( USES_REGS1 )
510{
511 UInt ssize;
512 UInt tsize;
513 UInt sysize;
514 Term x2 = Deref(ARG2);
515 Term x3 = Deref(ARG3);
516 Term x4 = Deref(ARG4);
517 int new_worker_id = IntegerOfTerm(Deref(ARG7)),
518 owid = worker_id;
519
520 // fprintf(stderr," %d --> %d\n", worker_id, new_worker_id);
521 if (IsBigIntTerm(x2))
522 return FALSE;
523 if (IsBigIntTerm(x3))
524 return FALSE;
525 ssize = IntegerOfTerm(x2);
526 tsize = IntegerOfTerm(x3);
527 sysize = IntegerOfTerm(x4);
528 /* UInt systemsize = IntegerOfTerm(Deref(ARG4)); */
529 if (new_worker_id == -1) {
530 /* YAP ERROR */
531 return FALSE;
532 }
533 /* make sure we can proceed */
534 if (!init_thread_engine(new_worker_id, ssize, tsize, sysize, ARG1, ARG5, ARG6))
535 return FALSE;
536 //REMOTE_ThreadHandle(new_worker_id).pthread_handle = 0L;
537 REMOTE_ThreadHandle(new_worker_id).id = new_worker_id;
538 REMOTE_ThreadHandle(new_worker_id).ref_count = 1;
539 setup_engine(new_worker_id, FALSE);
540 if ((REMOTE_ThreadHandle(new_worker_id).ret = pthread_create(&REMOTE_ThreadHandle(new_worker_id).pthread_handle, NULL, thread_run, (void *)(&(REMOTE_ThreadHandle(new_worker_id).id)))) == 0) {
541 pthread_setspecific(Yap_yaamregs_key, (const void *)REMOTE_ThreadHandle(owid).current_yaam_regs);
542 /* wait until the client is initialized */
543 return TRUE;
544 }
545 pthread_setspecific(Yap_yaamregs_key, (const void *)REMOTE_ThreadHandle(owid).current_yaam_regs);
546 return FALSE;
547}
548
549static Int
550p_thread_sleep( USES_REGS1 )
551{
552 UInt time = IntegerOfTerm(Deref(ARG1));
553#if HAVE_NANOSLEEP
554 UInt ntime = IntegerOfTerm(Deref(ARG2));
555 struct timespec req, oreq ;
556 req.tv_sec = time;
557 req.tv_nsec = ntime;
558 if (nanosleep(&req, &oreq)) {
559#if HAVE_STRERROR
560 Yap_Error(SYSTEM_ERROR_OPERATING_SYSTEM, ARG1, "%s in thread_sleep/1", strerror(errno));
561#else
562 Yap_Error(SYSTEM_ERROR_OPERATING_SYSTEM, ARG1, "error %d in thread_sleep/1", errno);
563#endif
564 return FALSE;
565 }
566 return Yap_unify(ARG3,MkIntegerTerm(oreq.tv_sec)) &&
567 Yap_unify(ARG4,MkIntegerTerm(oreq.tv_nsec));
568#elif HAVE_SLEEP
569 UInt rtime;
570 if ((rtime = sleep(time)) < 0) {
571#if HAVE_STRERROR
572 Yap_Error(SYSTEM_ERROR_OPERATING_SYSTEM, ARG1, "%s in thread_sleep/1", strerror(errno));
573#else
574 Yap_Error(SYSTEM_ERROR_OPERATING_SYSTEM, ARG1, "error %d in thread_sleep/1", errno);
575#endif
576 }
577 return Yap_unify(ARG3,MkIntegerTerm(rtime)) &&
578 Yap_unify(ARG4,MkIntTerm(0L));
579#else
580 Yap_Error(SYSTEM_ERROR_OPERATING_SYSTEM, ARG1, "no support for thread_sleep/1 in this YAP configuration");
581#endif
582}
583
584static Int
585p_thread_self( USES_REGS1 )
586{
587 if (pthread_getspecific(Yap_yaamregs_key) == NULL)
588 return Yap_unify(MkIntegerTerm(-1), ARG1);
589 return Yap_unify(MkIntegerTerm(worker_id), ARG1);
590}
591
592
593static Int
594p_thread_zombie_self( USES_REGS1 )
595{
596 /* make sure the lock is available */
597 if (pthread_getspecific(Yap_yaamregs_key) == NULL)
598 return Yap_unify(MkIntegerTerm(-1), ARG1);
599 if (Yap_has_signal( YAP_ITI_SIGNAL )) {
600 return FALSE;
601 }
602 // fprintf(stderr," -- %d\n", worker_id);
603 LOCAL_ThreadHandle.in_use = FALSE;
604 LOCAL_ThreadHandle.zombie = TRUE;
605 MUTEX_UNLOCK(&(LOCAL_ThreadHandle.tlock));
606 return Yap_unify(MkIntegerTerm(worker_id), ARG1);
607}
608
609static Int
610p_thread_status_lock( USES_REGS1 )
611{
612 /* make sure the lock is available */
613 if (pthread_getspecific(Yap_yaamregs_key) == NULL)
614 return FALSE;
615 MUTEX_LOCK(&(LOCAL_ThreadHandle.tlock_status));
616 return Yap_unify(MkIntegerTerm(worker_id), ARG1);
617}
618
619static Int
620p_thread_status_unlock( USES_REGS1 )
621{
622 /* make sure the lock is available */
623 if (pthread_getspecific(Yap_yaamregs_key) == NULL)
624 return FALSE;
625 MUTEX_UNLOCK(&(LOCAL_ThreadHandle.tlock_status));
626 return Yap_unify(MkIntegerTerm(worker_id), ARG1);
627}
628
629Int
630Yap_thread_self(void)
631{
632 CACHE_REGS
633 if (pthread_getspecific(Yap_yaamregs_key) == NULL)
634 return -1;
635 return worker_id;
636}
637
638CELL
639Yap_thread_create_engine(YAP_thread_attr *ops)
640{
641 YAP_thread_attr opsv;
642 int new_id = allocate_new_tid();
643 Term t = TermNil;
644
645 /*
646 ok, this creates a problem, because we are initializing an engine from
647 some "empty" thread.
648 We need first to fool the thread into believing it is the main thread
649 */
650 if (new_id == -1) {
651 /* YAP ERROR */
652 return -1;
653 }
654 if (ops == NULL) {
655 ops = &opsv;
656 ops->tsize = DefHeapSpace;
657 ops->ssize = DefStackSpace;
658 ops->sysize = 0;
659 ops->egoal = t;
660 }
661 if (!pthread_equal(pthread_self() , GLOBAL_master_thread) ) {
662 /* we are worker_id 0 for now, lock master thread so that no one messes with us */
663 pthread_setspecific(Yap_yaamregs_key, (const void *)&Yap_standard_regs);
664 MUTEX_LOCK(&(REMOTE_ThreadHandle(0).tlock));
665 }
666 if (!init_thread_engine(new_id, ops->ssize, ops->tsize, ops->sysize, t, t, (ops->egoal)))
667 return -1;
668 //REMOTE_ThreadHandle(new_id).pthread_handle = 0L;
669 REMOTE_ThreadHandle(new_id).id = new_id;
670 REMOTE_ThreadHandle(new_id).ref_count = 0;
671 if (!setup_engine(new_id, FALSE))
672 return -1;
673 if (!pthread_equal(pthread_self(), GLOBAL_master_thread)) {
674 pthread_setspecific(Yap_yaamregs_key, NULL);
675 MUTEX_UNLOCK(&(REMOTE_ThreadHandle(0).tlock));
676 }
677 return new_id;
678}
679
680Int
681Yap_thread_attach_engine(int wid)
682{
683 /*
684 already locked
685 MUTEX_LOCK(&(REMOTE_ThreadHandle(wid).tlock));
686 */
687 if (REMOTE_ThreadHandle(wid).ref_count ) {
688 REMOTE_ThreadHandle(wid).ref_count++;
689 REMOTE_ThreadHandle(wid).pthread_handle = pthread_self();
690 MUTEX_UNLOCK(&(REMOTE_ThreadHandle(wid).tlock));
691 return TRUE;
692 }
693 REMOTE_ThreadHandle(wid).pthread_handle = pthread_self();
694 REMOTE_ThreadHandle(wid).ref_count++;
695 pthread_setspecific(Yap_yaamregs_key, (const void *)REMOTE_ThreadHandle(wid).current_yaam_regs);
696 MUTEX_UNLOCK(&(REMOTE_ThreadHandle(wid).tlock));
697 return TRUE;
698}
699
700Int
701Yap_thread_detach_engine(int wid)
702{
703 MUTEX_LOCK(&(REMOTE_ThreadHandle(wid).tlock));
704 //REMOTE_ThreadHandle(wid).pthread_handle = 0;
705 REMOTE_ThreadHandle(wid).ref_count--;
706 pthread_setspecific(Yap_yaamregs_key, NULL);
707 MUTEX_UNLOCK(&(REMOTE_ThreadHandle(wid).tlock));
708 return TRUE;
709}
710
711Int
712Yap_thread_destroy_engine(int wid)
713{
714 MUTEX_LOCK(&(REMOTE_ThreadHandle(wid).tlock));
715 if (REMOTE_ThreadHandle(wid).ref_count == 0) {
716 kill_thread_engine(wid, TRUE);
717 return TRUE;
718 } else {
719 MUTEX_UNLOCK(&(REMOTE_ThreadHandle(wid).tlock));
720 return FALSE;
721 }
722}
723
724
725static Int
726p_thread_join( USES_REGS1 )
727{
728 Int tid = IntegerOfTerm(Deref(ARG1));
729 pthread_t thread;
730
731 MUTEX_LOCK(&(REMOTE_ThreadHandle(tid).tlock));
732 if (!(REMOTE_ThreadHandle(tid).in_use ||
733 REMOTE_ThreadHandle(tid).zombie)) {
734 // he's dead, jim
735 MUTEX_UNLOCK(&(REMOTE_ThreadHandle(tid).tlock));
736 return FALSE;
737 }
738 if (!REMOTE_ThreadHandle(tid).tdetach == MkAtomTerm(AtomTrue)) {
739 MUTEX_UNLOCK(&(REMOTE_ThreadHandle(tid).tlock));
740 return FALSE;
741 }
742 thread = REMOTE_ThreadHandle(tid).pthread_handle;
743 MUTEX_UNLOCK(&(REMOTE_ThreadHandle(tid).tlock));
744 /* make sure this lock is accessible */
745 if (pthread_join(thread, NULL) < 0) {
746 /* ERROR */
747 return FALSE;
748 }
749 return TRUE;
750}
751
752static Int
753p_thread_destroy( USES_REGS1 )
754{
755 Int tid = IntegerOfTerm(Deref(ARG1));
756
757 MUTEX_LOCK(&(REMOTE_ThreadHandle(tid).tlock));
758 REMOTE_ThreadHandle(tid).zombie = FALSE;
759 REMOTE_ThreadHandle(tid).in_use = FALSE;
760 MUTEX_UNLOCK(&(REMOTE_ThreadHandle(tid).tlock));
761 return TRUE;
762}
763
764static Int
765p_thread_detach( USES_REGS1 )
766{
767 Int tid = IntegerOfTerm(Deref(ARG1));
768 MUTEX_LOCK(&(REMOTE_ThreadHandle(tid).tlock));
769 if (pthread_detach(REMOTE_ThreadHandle(tid).pthread_handle) < 0) {
770 /* ERROR */
771 MUTEX_UNLOCK(&(REMOTE_ThreadHandle(tid).tlock));
772 return FALSE;
773 }
774 REMOTE_ThreadHandle(tid).tdetach =
775 MkAtomTerm(AtomTrue);
776 MUTEX_UNLOCK(&(REMOTE_ThreadHandle(tid).tlock));
777 return TRUE;
778}
779
780static Int
781p_thread_detached( USES_REGS1 )
782{
783 if (LOCAL_ThreadHandle.tdetach)
784 return Yap_unify(ARG1,LOCAL_ThreadHandle.tdetach);
785 else
786 return FALSE;
787}
788
789static Int
790p_thread_detached2( USES_REGS1 )
791{
792 Int tid = IntegerOfTerm(Deref(ARG1));
793 if (REMOTE_ThreadHandle(tid).tdetach)
794 return Yap_unify(ARG2,REMOTE_ThreadHandle(tid).tdetach);
795 else
796 return FALSE;
797}
798
799static Int
800p_thread_exit( USES_REGS1 )
801{
802 thread_die(worker_id, FALSE);
803 pthread_exit(NULL);
804 /* done, just make gcc happy */
805 return TRUE;
806}
807
808static Int
809p_thread_set_concurrency( USES_REGS1 )
810{
811#if HAVE_PTHREAD_GETCONCURRENCY
812 int newc;
813 int cur;
814 Term tnew = Deref(ARG2);
815 if (IsVarTerm(tnew)) {
816 newc = 0;
817 } else if (IsIntegerTerm(tnew)) {
818 newc = IntegerOfTerm(tnew);
819 } else {
820 Yap_Error(TYPE_ERROR_INTEGER,tnew,"thread_set_concurrency/2");
821 return(FALSE);
822 }
823 cur = MkIntegerTerm(pthread_getconcurrency());
824 if (pthread_setconcurrency(newc) != 0) {
825 return FALSE;
826 }
827 return Yap_unify(ARG1, MkIntegerTerm(cur));
828#else
829 return FALSE;
830#endif
831}
832
833static Int
834p_thread_yield( USES_REGS1 )
835{
836 if (sched_yield() != 0) {
837 return FALSE;
838 }
839 return TRUE;
840}
841
842static Int
843p_valid_thread( USES_REGS1 )
844{
845 Int i = IntegerOfTerm(Deref(ARG1));
846 return REMOTE_ThreadHandle(i).in_use || REMOTE_ThreadHandle(i).zombie;
847}
848
849/* Mutex Support */
850
851typedef struct swi_mutex {
852 UInt owners;
853 Int tid_own;
854 MutexEntry *alias;
855 pthread_mutex_t m;
856 UInt timestamp;
857 struct swi_mutex *backbone; // chain of all mutexes
858 struct swi_mutex *prev, *next; // chain of locked mutexes
859} SWIMutex;
860
861static SWIMutex *NewMutex(void) {
862 SWIMutex* mutp;
863 pthread_mutexattr_t mat;
864#if defined(HAVE_PTHREAD_MUTEXATTR_SETKIND_NP) && !defined(__MINGW32__)
865 extern int pthread_mutexattr_setkind_np(pthread_mutexattr_t *attr, int kind);
866#endif
867
868 LOCK(GLOBAL_MUT_ACCESS);
869 mutp = GLOBAL_FreeMutexes;
870 while (mutp) {
871 if ((Int)(mutp->owners) < 0) {
872 // just making sure
873 break;
874 }
875 mutp = mutp->next;
876 }
877 if (mutp == NULL) {
878 mutp = (SWIMutex *)Yap_AllocCodeSpace(sizeof(SWIMutex));
879 if (mutp == NULL) {
880 UNLOCK(GLOBAL_MUT_ACCESS);
881 return NULL;
882 } else {
883 pthread_mutexattr_init(&mat);
884 mutp->timestamp = 0;
885#if defined(HAVE_PTHREAD_MUTEXATTR_SETKIND_NP) && !defined(__MINGW32__)
886 pthread_mutexattr_setkind_np(&mat, PTHREAD_MUTEX_RECURSIVE_NP);
887#else
888#ifdef HAVE_PTHREAD_MUTEXATTR_SETTYPE
889 pthread_mutexattr_settype(&mat, PTHREAD_MUTEX_RECURSIVE);
890#endif
891#endif
892 pthread_mutex_init(&mutp->m, &mat);
893 }
894 mutp->backbone = GLOBAL_mutex_backbone;
895 GLOBAL_mutex_backbone = mutp;
896 } else {
897 // reuse existing mutex
898 mutp->timestamp++;
899 }
900 mutp->owners = 0;
901 mutp->tid_own = 0;
902 mutp->alias = NIL;
903 UNLOCK(GLOBAL_MUT_ACCESS);
904 return mutp;
905}
906
907#define MutexOfTerm(t) MutexOfTerm__(t PASS_REGS)
908
909static SWIMutex *MutexOfTerm__(Term t USES_REGS){
910 Term t1 = Deref(t);
911 SWIMutex *mut = NULL;
912
913 if (IsVarTerm(t1)) {
914 Yap_Error(INSTANTIATION_ERROR, t1, "mutex operation");
915 return NULL;
916 } else if (IsApplTerm(t1) && FunctorOfTerm(t1) == FunctorMutex) {
917 mut = AddressOfTerm(ArgOfTerm(1,t1));
918 if ((Int)(mut->owners) < 0 ||
919 IntegerOfTerm(ArgOfTerm(2,t1)) != mut->timestamp) {
920 Yap_Error(EXISTENCE_ERROR_MUTEX, t1, "mutex access");
921 return NULL;
922 }
923 } else if (IsAtomTerm(t1)) {
924 mut = Yap_GetMutexFromProp(AtomOfTerm(t1));
925 if (!mut) {
926 mut = NewMutex();
927 if ( !Yap_PutAtomMutex( AtomOfTerm(t1), mut ) ) {
928 return NULL;
929 }
930 }
931 }
932 return mut;
933}
934
935static Int
936p_new_mutex( USES_REGS1 ){
937 SWIMutex* mutp;
938 Term t1;
939 if (IsVarTerm((t1 = Deref(ARG1)))) {
940 Term ts[2];
941
942 if (!(mutp = NewMutex()))
943 return FALSE;
944 ts[0] = MkAddressTerm(mutp);
945 ts[1] = MkIntegerTerm(mutp->timestamp);
946 if (Yap_unify(ARG1, Yap_MkApplTerm(FunctorMutex, 2, ts) ) ) {
947 return TRUE;
948 }
949 Yap_Error(UNINSTANTIATION_ERROR, t1, "mutex_create on an existing mutex");
950 return FALSE;
951 } else if(IsAtomTerm(t1)) {
952 if (!(mutp = NewMutex()))
953 return FALSE;
954 return Yap_PutAtomMutex( AtomOfTerm(t1), mutp );
955 } else if (IsApplTerm(t1) && FunctorOfTerm(t1) == FunctorMutex) {
956 Yap_Error(UNINSTANTIATION_ERROR, t1, "mutex_create on an existing mutex");
957 return FALSE;
958 }
959 return FALSE;
960}
961
967static Int p_destroy_mutex( USES_REGS1 )
968{
969 SWIMutex *mut = MutexOfTerm(Deref(ARG1));
970 if (!mut)
971 return FALSE;
972 if (pthread_mutex_destroy(&mut->m) < 0)
973 return FALSE;
974 if (mut->alias) {
975 mut->alias->Mutex = NULL;
976 }
977 mut->owners = -1;
978 mut->tid_own = -1;
979 LOCK(GLOBAL_MUT_ACCESS);
980 if (GLOBAL_FreeMutexes)
981 mut->prev = GLOBAL_FreeMutexes->prev;
982 else
983 mut->prev = NULL;
984 mut->next = GLOBAL_FreeMutexes;
985 GLOBAL_FreeMutexes = mut;
986 UNLOCK(GLOBAL_MUT_ACCESS);
987 return TRUE;
988}
989
990static bool
991LockMutex( SWIMutex *mut USES_REGS)
992{
993#if DEBUG_LOCKS
994 MUTEX_LOCK(&mut->m);
995#else
996 if (MUTEX_LOCK(&mut->m) < 0)
997 return FALSE;
998#endif
999 mut->owners++;
1000 mut->tid_own = worker_id;
1001 if (LOCAL_Mutexes)
1002 mut->prev = LOCAL_Mutexes->prev;
1003 else
1004 mut->prev = NULL;
1005 mut->next = LOCAL_Mutexes;
1006 LOCAL_Mutexes = NULL;
1007 return true;
1008}
1009
1010static bool
1011UnLockMutex( SWIMutex *mut USES_REGS)
1012{
1013#if DEBUG_LOCKS
1014 MUTEX_UNLOCK(&mut->m);
1015#else
1016 if (MUTEX_UNLOCK(&mut->m) < 0)
1017 return FALSE;
1018#endif
1019 mut->owners--;
1020 if (mut->prev) {
1021 mut->prev->next = mut->next;
1022 } else {
1023 LOCAL_Mutexes = mut->next;
1024 if (mut->next)
1025 mut->next->prev = NULL;
1026 }
1027 if (mut->next)
1028 mut->next->prev = mut->prev;
1029 return true;
1030}
1031
1053static Int
1054p_lock_mutex( USES_REGS1 )
1055{
1056 SWIMutex *mut = MutexOfTerm(Deref(ARG1));
1057 if (!mut || !LockMutex( mut PASS_REGS))
1058 return FALSE;
1059 return TRUE;
1060}
1061
1070static Int
1071p_trylock_mutex( USES_REGS1 )
1072{
1073 SWIMutex *mut = MutexOfTerm(Deref(ARG1));
1074 if (!mut)
1075 return FALSE;
1076
1077 if (MUTEX_TRYLOCK(&mut->m) == EBUSY)
1078 return FALSE;
1079 mut->owners++;
1080 mut->tid_own = worker_id;
1081 return TRUE;
1082}
1083
1093static Int
1094p_unlock_mutex( USES_REGS1 )
1095{
1096 SWIMutex *mut = MutexOfTerm(Deref(ARG1));
1097 if (!mut || !UnLockMutex( mut PASS_REGS))
1098 return FALSE;
1099 return TRUE;
1100}
1101
1118static Int
1119p_with_mutex( USES_REGS1 )
1120{
1121 Term excep;
1122 Int rc = FALSE;
1123 Int creeping = Yap_get_signal(YAP_CREEP_SIGNAL);
1124 PredEntry *pe;
1125 Term tm = CurrentModule;
1126 Term tg = Deref(ARG2);
1127 SWIMutex *mut = MutexOfTerm( ARG1 );
1128
1129 if (!mut || !LockMutex(mut PASS_REGS)) {
1130 return FALSE;
1131 }
1132
1133 tg = Yap_StripModule(tg, &tm);
1134 if (IsVarTerm(tg)) {
1135 Yap_Error(INSTANTIATION_ERROR, ARG2, "with_mutex/2");
1136 goto end;
1137 } else if (IsApplTerm(tg)) {
1138 register Functor f = FunctorOfTerm(tg);
1139 register CELL *pt;
1140 size_t i, arity;
1141
1142 f = FunctorOfTerm(tg);
1143 if (IsExtensionFunctor(f)) {
1144 Yap_Error(TYPE_ERROR_CALLABLE, tg, "with_mutex/2");
1145 goto end;
1146 }
1147 arity = ArityOfFunctor(f);
1148 if (arity > MaxTemps) {
1149 Yap_Error(TYPE_ERROR_CALLABLE, tg, "with_mutex/2");
1150 goto end;
1151 }
1152 pe = RepPredProp(PredPropByFunc(f, tm));
1153 pt = RepAppl(tg)+1;
1154 for (i= 0; i < arity; i++ )
1155 XREGS[i+1] = pt[i];
1156 } else if (IsAtomTerm(tg)) {
1157 pe = RepPredProp(PredPropByAtom(AtomOfTerm(tg), tm));
1158 } else if (IsPairTerm(tg)) {
1159 register CELL *pt;
1160 Functor f;
1161
1162 f = FunctorDot;
1163 pe = RepPredProp(PredPropByFunc(f, tm));
1164 pt = RepPair(tg);
1165 XREGS[1] = pt[0];
1166 XREGS[2] = pt[1];
1167 } else {
1168 Yap_Error(TYPE_ERROR_CALLABLE, tg, "with_mutex/2");
1169 goto end;
1170 }
1171 if (
1172 pe->OpcodeOfPred != FAIL_OPCODE &&
1173 Yap_execute_pred(pe, NULL, true PASS_REGS) ) {
1174 rc = TRUE;
1175 }
1176 end:
1177 excep = Yap_GetException();
1178 if ( !UnLockMutex(mut PASS_REGS) ) {
1179 return FALSE;
1180 }
1181 if (creeping) {
1182 Yap_signal( YAP_CREEP_SIGNAL );
1183 } else if ( excep != 0) {
1184 return Yap_JumpToEnv(excep);
1185 }
1186 return rc;
1187}
1188
1189
1190static Int
1191p_with_with_mutex( USES_REGS1 )
1192{
1193 if (GLOBAL_WithMutex == NULL) {
1194 p_new_mutex( PASS_REGS1 );
1195 GLOBAL_WithMutex = (SWIMutex*)IntegerOfTerm(Deref(ARG1));
1196 } else {
1197 ARG1 = MkIntegerTerm((Int)GLOBAL_WithMutex);
1198 }
1199 return p_lock_mutex( PASS_REGS1 );
1200}
1201
1202static Int
1203p_unlock_with_mutex( USES_REGS1 )
1204{
1205 ARG1 = MkIntegerTerm((Int)GLOBAL_WithMutex);
1206 return p_unlock_mutex( PASS_REGS1 );
1207}
1208
1209static Int
1210p_mutex_info( USES_REGS1 )
1211{
1212 SWIMutex *mut = MutexOfTerm(Deref(ARG1));
1213 if (!mut)
1214 return FALSE;
1215
1216 return Yap_unify(ARG2, MkIntegerTerm(mut->owners)) &&
1217 Yap_unify(ARG3, MkIntegerTerm(mut->tid_own));
1218 return TRUE;
1219}
1220
1221typedef struct {
1222 UInt indx;
1223 mbox_t mbox;
1224} counted_mbox;
1225
1226 static Int
1227 p_mbox_create( USES_REGS1 )
1228 {
1229 Term namet = Deref(ARG1);
1230 mbox_t* mboxp = GLOBAL_named_mboxes;
1231
1232 if (IsVarTerm(namet)) {
1233 AtomEntry *ae;
1234 int new;
1235 mbox_t mbox;
1236
1237 ae = Yap_lookupBlob(&mbox, sizeof(mbox), &PL_Message_Queue, &new);
1238 namet = MkAtomTerm(RepAtom(ae));
1239 mboxp = (mbox_t *)(ae->rep.blob[0].data);
1240 Yap_unify(ARG1, namet);
1241 LOCK(GLOBAL_mboxq_lock);
1242 } else if (IsAtomTerm(namet)) {
1243 LOCK(GLOBAL_mboxq_lock);
1244 while( mboxp && mboxp->name != namet)
1245 mboxp = mboxp->next;
1246 if (mboxp) {
1247 UNLOCK(GLOBAL_mboxq_lock);
1248 return FALSE;
1249 }
1250 mboxp = (mbox_t *)Yap_AllocCodeSpace(sizeof(mbox_t));
1251 if (mboxp == NULL) {
1252 UNLOCK(GLOBAL_mboxq_lock);
1253 return FALSE;
1254 }
1255 // global mbox, for now we'll just insert in list
1256 mboxp->next = GLOBAL_named_mboxes;
1257 GLOBAL_named_mboxes = mboxp;
1258 }
1259 bool rc = mboxCreate( namet, mboxp PASS_REGS );
1260 UNLOCK(GLOBAL_mboxq_lock);
1261 return rc;
1262 }
1263
1264
1265static Int
1266p_mbox_destroy( USES_REGS1 )
1267{
1268 Term namet = Deref(ARG1);
1269 mbox_t* mboxp = GLOBAL_named_mboxes, *prevp;
1270
1271 if (IsVarTerm(namet) )
1272 return FALSE;
1273 if (IsIntTerm(namet) ) {
1274 return FALSE;
1275 }
1276 LOCK(GLOBAL_mboxq_lock);
1277 prevp = NULL;
1278 while( mboxp && mboxp->name != namet) {
1279 prevp = mboxp;
1280 mboxp = mboxp->next;
1281 }
1282 if (!mboxp) {
1283 UNLOCK(GLOBAL_mboxq_lock);
1284 return FALSE;
1285 }
1286 if (mboxp == GLOBAL_named_mboxes) {
1287 GLOBAL_named_mboxes = mboxp->next;
1288 } else {
1289 prevp->next = mboxp->next;
1290 }
1291 UNLOCK(GLOBAL_mboxq_lock);
1292 mboxDestroy(mboxp PASS_REGS);
1293 Yap_FreeCodeSpace( (char *)mboxp );
1294 return TRUE;
1295 }
1296
1297 static mbox_t*
1298 getMbox(Term t)
1299 {
1300 mbox_t* mboxp;
1301
1302 if (IsAtomTerm(t=Deref(t))) {
1303 Atom at = AtomOfTerm(t);
1304 LOCK(GLOBAL_mboxq_lock);
1305 if (IsBlob(at)) {
1306 mboxp = (mbox_t *)(RepAtom(at)->rep.blob[0].data);
1307 } else {
1308 mboxp = GLOBAL_named_mboxes;
1309 while( mboxp && mboxp->name != t) {
1310 mboxp = mboxp->next;
1311 }
1312 }
1313 if (!mboxp->open)
1314 mboxp = NULL;
1315 if (mboxp) {
1316 pthread_mutex_lock(& mboxp->mutex);
1317 }
1318 UNLOCK(GLOBAL_mboxq_lock);
1319 } else if (IsIntTerm(t)) {
1320 int wid = IntOfTerm(t);
1321 if (REMOTE(wid) &&
1322 (REMOTE_ThreadHandle(wid).in_use || REMOTE_ThreadHandle(wid).zombie))
1323 {
1324 return &REMOTE_ThreadHandle(wid).mbox_handle;
1325 } else {
1326 return NULL;
1327 }
1328 if (!mboxp->open)
1329 mboxp = NULL;
1330 if (mboxp) {
1331 pthread_mutex_lock(& mboxp->mutex);
1332 }
1333 } else {
1334 return NULL;
1335 }
1336 return mboxp;
1337 }
1338
1339
1340 static Int
1341 p_mbox_send( USES_REGS1 )
1342 {
1343 Term namet = Deref(ARG1);
1344 mbox_t* mboxp = getMbox(namet) ;
1345
1346 if (!mboxp)
1347 return FALSE;
1348 return mboxSend(mboxp, Deref(ARG2) PASS_REGS);
1349 }
1350
1351 static Int
1352 p_mbox_size( USES_REGS1 )
1353 {
1354 Term namet = Deref(ARG1);
1355 mbox_t* mboxp = getMbox(namet) ;
1356
1357 if (!mboxp)
1358 return FALSE;
1359 return Yap_unify( ARG2, MkIntTerm(mboxp->nmsgs));
1360 }
1361
1362
1363 static Int
1364 p_mbox_receive( USES_REGS1 )
1365 {
1366 Term namet = Deref(ARG1);
1367 mbox_t* mboxp = getMbox(namet) ;
1368
1369 if (!mboxp)
1370 return FALSE;
1371 return mboxReceive(mboxp, Deref(ARG2) PASS_REGS);
1372 }
1373
1374
1375 static Int
1376 p_mbox_peek( USES_REGS1 )
1377 {
1378 Term namet = Deref(ARG1);
1379 mbox_t* mboxp = getMbox(namet) ;
1380
1381 if (!mboxp)
1382 return FALSE;
1383 return mboxPeek(mboxp, Deref(ARG2) PASS_REGS);
1384 }
1385
1386static Int
1387p_cond_create( USES_REGS1 )
1388{
1389 pthread_cond_t* condp;
1390
1391 condp = (pthread_cond_t *)Yap_AllocCodeSpace(sizeof(pthread_cond_t));
1392 if (condp == NULL) {
1393 return FALSE;
1394 }
1395 pthread_cond_init(condp, NULL);
1396 return Yap_unify(ARG1, MkIntegerTerm((Int)condp));
1397}
1398
1399static Int
1400 p_cond_destroy( USES_REGS1 )
1401 {
1402 pthread_cond_t *condp = (pthread_cond_t *)IntegerOfTerm(Deref(ARG1));
1403
1404 if (pthread_cond_destroy(condp) < 0)
1405 return FALSE;
1406 Yap_FreeCodeSpace((void *)condp);
1407 return TRUE;
1408 }
1409
1410 static Int
1411 p_cond_signal( USES_REGS1 )
1412 {
1413 pthread_cond_t *condp = (pthread_cond_t *)IntegerOfTerm(Deref(ARG1));
1414
1415 if (pthread_cond_signal(condp) < 0)
1416 return FALSE;
1417 return TRUE;
1418 }
1419
1420 static Int
1421 p_cond_broadcast( USES_REGS1 )
1422 {
1423 pthread_cond_t *condp = (pthread_cond_t *)IntegerOfTerm(Deref(ARG1));
1424
1425 if (pthread_cond_broadcast(condp) < 0)
1426 return FALSE;
1427 return TRUE;
1428 }
1429
1430 static Int
1431 p_cond_wait( USES_REGS1 )
1432 {
1433 pthread_cond_t *condp = (pthread_cond_t *)IntegerOfTerm(Deref(ARG1));
1434 SWIMutex *mut = (SWIMutex*)IntegerOfTerm(Deref(ARG2));
1435 pthread_cond_wait(condp, &mut->m);
1436 return TRUE;
1437}
1438
1439static Int
1440p_thread_stacks( USES_REGS1 )
1441{ /* '$thread_signal'(+P) */
1442 Int tid = IntegerOfTerm(Deref(ARG1));
1443 Int status= TRUE;
1444
1445 MUTEX_LOCK(&(REMOTE_ThreadHandle(tid).tlock));
1446 if (REMOTE(tid) &&
1447 (REMOTE_ThreadHandle(tid).in_use || REMOTE_ThreadHandle(tid).zombie)) {
1448 status &= Yap_unify(ARG2,MkIntegerTerm(REMOTE_ThreadHandle(tid).ssize));
1449 status &= Yap_unify(ARG3,MkIntegerTerm(REMOTE_ThreadHandle(tid).tsize));
1450 status &= Yap_unify(ARG4,MkIntegerTerm(REMOTE_ThreadHandle(tid).sysize));
1451 MUTEX_UNLOCK(&(REMOTE_ThreadHandle(tid).tlock));
1452 return status;
1453 }
1454 MUTEX_UNLOCK(&(REMOTE_ThreadHandle(tid).tlock));
1455 return FALSE;
1456}
1457
1458static Int
1459p_thread_atexit( USES_REGS1 )
1460{ /* '$thread_signal'(+P) */
1461 Term t;
1462
1463 if (LOCAL_ThreadHandle.texit == NULL ||
1464 LOCAL_ThreadHandle.texit->Entry == MkAtomTerm(AtomTrue)) {
1465 return FALSE;
1466 }
1467 do {
1468 t = Yap_PopTermFromDB(LOCAL_ThreadHandle.texit);
1469 if (t == 0) {
1470 if (LOCAL_Error_TYPE == RESOURCE_ERROR_ATTRIBUTED_VARIABLES) {
1471 LOCAL_Error_TYPE = YAP_NO_ERROR;
1472 if (!Yap_growglobal(NULL)) {
1473 Yap_Error(RESOURCE_ERROR_ATTRIBUTED_VARIABLES, TermNil, LOCAL_ErrorMessage);
1474 thread_die(worker_id, FALSE);
1475 return FALSE;
1476 }
1477 } else {
1478 LOCAL_Error_TYPE = YAP_NO_ERROR;
1479 if (!Yap_growstack(LOCAL_ThreadHandle.tgoal->NOfCells*CellSize)) {
1480 Yap_Error(RESOURCE_ERROR_STACK, TermNil, LOCAL_ErrorMessage);
1481 thread_die(worker_id, FALSE);
1482 return FALSE;
1483 }
1484 }
1485 }
1486 } while (t == 0);
1487 LOCAL_ThreadHandle.texit = NULL;
1488 return Yap_unify(ARG1, t) && Yap_unify(ARG2, LOCAL_ThreadHandle.texit_mod);
1489}
1490
1491
1492
1493static Int
1494p_thread_signal( USES_REGS1 )
1495{ /* '$thread_signal'(+P) */
1496 Int wid = IntegerOfTerm(Deref(ARG1));
1497 /* make sure the lock is available */
1498 MUTEX_LOCK(&(REMOTE_ThreadHandle(wid).tlock));
1499 if (!REMOTE_ThreadHandle(wid).in_use ||
1500 !REMOTE_ThreadHandle(wid).current_yaam_regs) {
1501 MUTEX_UNLOCK(&(REMOTE_ThreadHandle(wid).tlock));
1502 return TRUE;
1503 }
1504 Yap_external_signal( wid, YAP_ITI_SIGNAL );
1505 MUTEX_UNLOCK(&(REMOTE_ThreadHandle(wid).tlock));
1506 return TRUE;
1507}
1508
1509static Int
1510p_no_threads( USES_REGS1 )
1511{ /* '$thread_signal'(+P) */
1512 return FALSE;
1513}
1514
1515static Int
1516p_nof_threads( USES_REGS1 )
1517{ /* '$nof_threads'(+P) */
1518 int i = 0, wid;
1519 LOCK(GLOBAL_ThreadHandlesLock);
1520 for (wid = 0; wid < MAX_THREADS; wid++) {
1521 if (!Yap_local[wid]) break;
1522 if (REMOTE_ThreadHandle(wid).in_use)
1523 i++;
1524 }
1525 UNLOCK(GLOBAL_ThreadHandlesLock);
1526 return Yap_unify(ARG1,MkIntegerTerm(i));
1527}
1528
1529static Int
1530p_max_workers( USES_REGS1 )
1531{ /* '$max_workers'(+P) */
1532 return Yap_unify(ARG1,MkIntegerTerm(MAX_WORKERS));
1533}
1534
1535static Int
1536p_max_threads( USES_REGS1 )
1537{ /* '$max_threads'(+P) */
1538 return Yap_unify(ARG1,MkIntegerTerm(MAX_THREADS));
1539}
1540
1541static Int
1542p_nof_threads_created( USES_REGS1 )
1543{ /* '$nof_threads'(+P) */
1544 return Yap_unify(ARG1,MkIntTerm(GLOBAL_NOfThreadsCreated));
1545}
1546
1547static Int
1548p_thread_runtime( USES_REGS1 )
1549{ /* '$thread_runtime'(+P) */
1550 return Yap_unify(ARG1,MkIntegerTerm(GLOBAL_ThreadsTotalTime));
1551}
1552
1553static Int
1554p_thread_self_lock( USES_REGS1 )
1555{ /* '$thread_unlock' */
1556 MUTEX_LOCK(&(LOCAL_ThreadHandle.tlock));
1557 return Yap_unify(ARG1,MkIntegerTerm(worker_id));
1558}
1559
1560static Int
1561p_thread_unlock( USES_REGS1 )
1562{ /* '$thread_unlock' */
1563 Int wid = IntegerOfTerm(Deref(ARG1));
1564 MUTEX_UNLOCK(&(REMOTE_ThreadHandle(wid).tlock));
1565 return TRUE;
1566}
1567
/*
 * Return the operating-system id of the calling thread.
 *
 * The mechanism is chosen at compile time from the platform and
 * configuration macros below.  When none of them applies the function
 * returns -1; the original had no return statement on that path.
 */
intptr_t
system_thread_id(void)
{
#if defined(__APPLE__)
  return syscall(SYS_thread_selfid);
#elif HAVE_SYS_GETTID
  /* NOTE(review): Linux headers spell this constant SYS_gettid -- confirm
     that SYS_GETTID is defined wherever HAVE_SYS_GETTID is set. */
  return syscall( SYS_GETTID );
#elif HAVE_GETTID_SYSCALL
  return syscall(__NR_gettid);
#elif defined( HAVE_GETTID_MACRO )
  return gettid();
#elif defined(__WINDOWS__)
  return GetCurrentThreadId();
#else
  /* no supported way to query the thread id on this configuration */
  return -1;
#endif
}
1584
1585
1586
1587void
1588Yap_InitFirstWorkerThreadHandle(void)
1589{
1590 CACHE_REGS
1591 LOCAL_ThreadHandle.id = 0;
1592 LOCAL_ThreadHandle.in_use = TRUE;
1593 LOCAL_ThreadHandle.default_yaam_regs =
1594 &Yap_standard_regs;
1595 LOCAL_ThreadHandle.current_yaam_regs =
1596 &Yap_standard_regs;
1597 LOCAL_ThreadHandle.pthread_handle = pthread_self();
1598 pthread_mutex_init(&REMOTE_ThreadHandle(0).tlock, NULL);
1599 pthread_mutex_init(&REMOTE_ThreadHandle(0).tlock_status, NULL);
1600 LOCAL_ThreadHandle.tdetach = MkAtomTerm(AtomFalse);
1601 LOCAL_ThreadHandle.ref_count = 1;
1602}
1603
1604void Yap_InitThreadPreds(void)
1605{
1606
1607
1608 Yap_InitCPred("$no_threads", 0, p_no_threads, 0);
1609 Yap_InitCPred("$max_workers", 1, p_max_workers, 0);
1610 Yap_InitCPred("$max_threads", 1, p_max_threads, 0);
1611 Yap_InitCPred("$thread_new_tid", 1, p_thread_new_tid, 0);
1612 Yap_InitCPred("$create_thread", 7, p_create_thread, 0);
1613 Yap_InitCPred("$thread_self", 1, p_thread_self, SafePredFlag);
1614 Yap_InitCPred("$thread_status_lock", 1, p_thread_status_lock, SafePredFlag);
1615 Yap_InitCPred("$thread_status_unlock", 1, p_thread_status_unlock, SafePredFlag);
1616 Yap_InitCPred("$thread_zombie_self", 1, p_thread_zombie_self, SafePredFlag);
1617 Yap_InitCPred("$thread_join", 1, p_thread_join, 0);
1618 Yap_InitCPred("$thread_destroy", 1, p_thread_destroy, 0);
1619 Yap_InitCPred("thread_yield", 0, p_thread_yield, 0);
1627 Yap_InitCPred("$detach_thread", 1, p_thread_detach, 0);
1628 Yap_InitCPred("$thread_detached", 1, p_thread_detached, 0);
1629 Yap_InitCPred("$thread_detached", 2, p_thread_detached2, 0);
1630 Yap_InitCPred("$thread_exit", 0, p_thread_exit, 0);
1631 Yap_InitCPred("thread_setconcurrency", 2, p_thread_set_concurrency, 0);
1645 Yap_InitCPred("$valid_thread", 1, p_valid_thread, 0);
1646 Yap_InitCPred("mutex_create", 1, p_new_mutex, SafePredFlag);
1647 Yap_InitCPred("mutex_destroy", 1, p_destroy_mutex, SafePredFlag);
1648 Yap_InitCPred("mutex_lock", 1, p_lock_mutex, SafePredFlag);
1649 Yap_InitCPred("mutex_trylock", 1, p_trylock_mutex, SafePredFlag);
1650 Yap_InitCPred("mutex_unlock", 1, p_unlock_mutex, SafePredFlag);
1651 Yap_InitCPred("with_mutex", 2, p_with_mutex, MetaPredFlag);
1652 Yap_InitCPred("$with_with_mutex", 1, p_with_with_mutex, 0);
1653 Yap_InitCPred("$unlock_with_mutex", 1, p_unlock_with_mutex, 0);
1654 Yap_InitCPred("$mutex_info", 3, p_mutex_info, SafePredFlag);
1655 Yap_InitCPred("$cond_create", 1, p_cond_create, SafePredFlag);
1656 Yap_InitCPred("$cond_destroy", 1, p_cond_destroy, SafePredFlag);
1657 Yap_InitCPred("$cond_signal", 1, p_cond_signal, SafePredFlag);
1658 Yap_InitCPred("$cond_broadcast", 1, p_cond_broadcast, SafePredFlag);
1659 Yap_InitCPred("$cond_wait", 2, p_cond_wait, SafePredFlag);
1660 Yap_InitCPred("$message_queue_create", 1, p_mbox_create, SafePredFlag);
1661 Yap_InitCPred("$message_queue_destroy", 1, p_mbox_destroy, SafePredFlag);
1662 Yap_InitCPred("$message_queue_send", 2, p_mbox_send, SafePredFlag);
1663 Yap_InitCPred("$message_queue_receive", 2, p_mbox_receive, SafePredFlag);
1664 Yap_InitCPred("$message_queue_size", 2, p_mbox_size, SafePredFlag);
1665 Yap_InitCPred("$message_queue_peek", 2, p_mbox_peek, SafePredFlag);
1666 Yap_InitCPred("$thread_stacks", 4, p_thread_stacks, SafePredFlag);
1667 Yap_InitCPred("$signal_thread", 1, p_thread_signal, SafePredFlag);
1668 Yap_InitCPred("$nof_threads", 1, p_nof_threads, SafePredFlag);
1669 Yap_InitCPred("$nof_threads_created", 1, p_nof_threads_created, SafePredFlag);
1670 Yap_InitCPred("$thread_sleep", 4, p_thread_sleep, SafePredFlag);
1671 Yap_InitCPred("$thread_runtime", 1, p_thread_runtime, SafePredFlag);
1672 Yap_InitCPred("$thread_self_lock", 1, p_thread_self_lock, SafePredFlag);
1673 Yap_InitCPred("$thread_run_at_exit", 2, p_thread_atexit, SafePredFlag);
1674 Yap_InitCPred("$thread_unlock", 1, p_thread_unlock, SafePredFlag);
1675#if DEBUG_LOCKS||DEBUG_PE_LOCKS
1676 Yap_InitCPred("debug_locks", 0, p_debug_locks, SafePredFlag);
1677 Yap_InitCPred("nodebug_locks", 0, p_nodebug_locks, SafePredFlag);
1678#endif
1679}
1680
1681#else
1682
/*
 * Thread count reported when the file is built without THREADS:
 * 2 if YAPOR is defined, 1 otherwise.
 */
int
Yap_NOfThreads(void)
{
#ifdef YAPOR
  const int count = 2;
#else
  const int count = 1;
#endif

  return count;
}
1692
1693
1694static Int
1695p_no_threads(void)
1696{ /* '$thread_signal'(+P) */
1697 return TRUE;
1698}
1699
1700static Int
1701p_nof_threads(void)
1702{ /* '$nof_threads'(+P) */
1703 return Yap_unify(ARG1,MkIntTerm(1));
1704}
1705
1706static Int
1707p_max_threads(void)
1708{ /* '$nof_threads'(+P) */
1709 return Yap_unify(ARG1,MkIntTerm(1));
1710}
1711
1712static Int
1713p_nof_threads_created(void)
1714{ /* '$nof_threads'(+P) */
1715 return Yap_unify(ARG1,MkIntTerm(1));
1716}
1717
1718static Int
1719p_thread_runtime(void)
1720{ /* '$thread_runtime'(+P) */
1721 return Yap_unify(ARG1,MkIntTerm(0));
1722}
1723
1724static Int
1725p_thread_self(void)
1726{ /* '$thread_runtime'(+P) */
1727 return Yap_unify(ARG1,MkIntTerm(0));
1728}
1729
1730static Int
1731p_thread_stacks(void)
1732{ /* '$thread_runtime'(+P) */
1733 return FALSE;
1734}
1735
1736static Int
1737p_thread_unlock(void)
1738{ /* '$thread_runtime'(+P) */
1739 return TRUE;
1740}
1741
1742static Int
1743p_max_workers(void)
1744{ /* '$max_workers'(+P) */
1745 return Yap_unify(ARG1,MkIntTerm(1));
1746}
1747
1748static Int
1749p_new_mutex(void)
1750{ /* '$max_workers'(+P) */
1751 static int mutexes = 1;
1752 return Yap_unify(ARG1, MkIntegerTerm(mutexes++) );
1753}
1754
1755 static Int
1756 p_with_mutex( USES_REGS1 )
1757 {
1758 Int mut;
1759 Term t1 = Deref(ARG1), excep;
1760 Int rc = FALSE;
1761 Int creeping = Yap_get_signal(YAP_CREEP_SIGNAL);
1762 PredEntry *pe;
1763 Term tm = CurrentModule;
1764 Term tg = Deref(ARG2);
1765
1766 if (IsVarTerm(t1)) {
1767 p_new_mutex( PASS_REGS1 );
1768 }
1769 t1 = Deref(ARG1);
1770 mut = IntOfTerm(t1);
1771 tg = Yap_StripModule(tg, &tm);
1772 if (IsVarTerm(tg)) {
1773 Yap_Error(INSTANTIATION_ERROR, ARG2, "with_mutex/2");
1774 goto end;
1775 } else if (IsApplTerm(tg)) {
1776 register Functor f = FunctorOfTerm(tg);
1777 register CELL *pt;
1778 size_t i, arity;
1779
1780 f = FunctorOfTerm(tg);
1781 if (IsExtensionFunctor(f)) {
1782 Yap_Error(TYPE_ERROR_CALLABLE, tg, "with_mutex/2");
1783 goto end;
1784 }
1785 arity = ArityOfFunctor(f);
1786 if (arity > MaxTemps) {
1787 Yap_Error(TYPE_ERROR_CALLABLE, tg, "with_mutex/2");
1788 goto end;
1789 }
1790 pe = RepPredProp(PredPropByFunc(f, tm));
1791 pt = RepAppl(tg)+1;
1792 for (i= 0; i < arity; i++ )
1793 XREGS[i+1] = pt[i];
1794 } else if (IsAtomTerm(tg)) {
1795 pe = RepPredProp(PredPropByAtom(AtomOfTerm(tg), tm));
1796 } else if (IsPairTerm(tg)) {
1797 register CELL *pt;
1798 Functor f;
1799
1800 f = FunctorDot;
1801 pe = RepPredProp(PredPropByFunc(f, tm));
1802 pt = RepPair(tg);
1803 XREGS[1] = pt[0];
1804 XREGS[2] = pt[1];
1805 } else {
1806 Yap_Error(TYPE_ERROR_CALLABLE, tg, "with_mutex/2");
1807 goto end;
1808 }
1809 if (
1810 pe->OpcodeOfPred != FAIL_OPCODE &&
1811 Yap_execute_pred(pe, NULL, false PASS_REGS) ) {
1812 rc = TRUE;
1813 }
1814 end:
1815 ARG1 = MkIntegerTerm(mut);
1816 excep = MkAddressTerm(Yap_GetException());
1817 if (creeping) {
1818 Yap_signal( YAP_CREEP_SIGNAL );
1819 } else if ( excep != 0) {
1820 return Yap_JumpToEnv();
1821 }
1822 return rc;
1823}
1824
/* Without THREADS there is no thread handle to set up: intentionally empty. */
void
Yap_InitFirstWorkerThreadHandle(void)
{
  /* nothing to do */
}
1829
1830void Yap_InitThreadPreds(void)
1831{
1832 Yap_InitCPred("with_mutex", 2, p_with_mutex, MetaPredFlag);
1833 Yap_InitCPred("mutex_create", 1, p_new_mutex, SafePredFlag);
1834 Yap_InitCPred("$max_workers", 1, p_max_workers, 0);
1835 Yap_InitCPred("$thread_self", 1, p_thread_self, SafePredFlag);
1836 Yap_InitCPred("$no_threads", 0, p_no_threads, SafePredFlag);
1837 Yap_InitCPred("$max_threads", 1, p_max_threads, SafePredFlag);
1838 Yap_InitCPred("$nof_threads", 1, p_nof_threads, SafePredFlag);
1839 Yap_InitCPred("$nof_threads_created", 1, p_nof_threads_created, SafePredFlag);
1840 Yap_InitCPred("$thread_stacks", 4, p_thread_stacks, SafePredFlag);
1841 Yap_InitCPred("$thread_runtime", 1, p_thread_runtime, SafePredFlag);
1842 Yap_InitCPred("$thread_unlock", 1, p_thread_unlock, SafePredFlag);
1843#if DEBUG_LOCKS||DEBUG_PE_LOCKS
1844 Yap_InitCPred("debug_locks", 0, p_debug_locks, SafePredFlag);
1845 Yap_InitCPred("nodebug_locks", 0, p_nodebug_locks, SafePredFlag);
1846#endif
1847}
1848
1849
1850#endif /* THREADS */
1851
1852
Main definitions.
yap_error_descriptor_t * Yap_GetException(void)
clone Active Error
Definition: errors.c:1389
@ system_thread_id
report the thread running YAP
Definition: YapGFlagInfo.h:648
Definition: Yatom.h:151
Definition: Yap.h:606
A matrix.
Definition: matrix.c:68
Definition: Yatom.h:1020
Definition: Yatom.h:544
Definition: tab.structs.h:22