commit 7ef007a12521d76e90b989602124cbde6a12a240
parent 7ca9b7adb656acb42a3db4b0fb33b020afa1e201
Author: Andres Navarro <canavarro82@gmail.com>
Date: Fri, 31 Aug 2012 00:00:53 -0300
Added thread-join to query the result of other threads (but only if they do exit, throw an error or pass a value to the root cont, otherwise they return #inert, at least for now).
Bugfix: (GC) Added a thread table to properly root the running threads (just setting the fixed bit protected the klisp_States but not the objects contained within).
Diffstat:
5 files changed, 138 insertions(+), 30 deletions(-)
diff --git a/src/kgc.c b/src/kgc.c
@@ -558,11 +558,11 @@ static void freeobj (klisp_State *K, GCObject *o) {
break;
case K_TTHREAD: {
klisp_State *K2 = (klisp_State *) o;
- klisp_assert(K2 != K && K2 != G(K)->mainthread);
- /* do join to avoid memory leak, thread is guaranteed to have
- completed execution, so join should not block (but it can fail
- if a join was performed already) */
- UNUSED(pthread_join(K2->thread, NULL));
+
+ klisp_assert(K2 != G(K)->mainthread);
+ klisp_assert(K2 != K);
+ /* threads are always created detached, so there's no
+ need to do a join here */
klispT_freethread(K, K2);
break;
}
@@ -679,12 +679,11 @@ static void markroot (klisp_State *K) {
g->grayagain = NULL;
g->weak = NULL;
- /* TEMP: this is quite awful, think of other way to do this */
- /* MAYBE: some of these could be FIXED */
- markobject(g, g->mainthread);
+ markobject(g, g->mainthread); /* this is also in the thread table */
markvalue(g, g->name_table);
markvalue(g, g->cont_name_table);
+ markvalue(g, g->thread_table);
markvalue(g, g->eval_op);
markvalue(g, g->list_app);
diff --git a/src/kgthreads.c b/src/kgthreads.c
@@ -10,6 +10,7 @@
#include <stdint.h>
#include "kstate.h"
+#include "ktable.h"
#include "kobject.h"
#include "kmutex.h"
#include "kcondvar.h"
@@ -49,6 +50,8 @@ routine somewhere */
continuations/guards */
/* LOCK: We need the GIL for allocating the objects */
klisp_lock(K);
+
+ K->status = KLISP_THREAD_RUNNING;
/* create the guard set error flag after errors */
TValue exit_int = kmake_operative(K, do_int_mark_error,
1, p2tv(&errorp));
@@ -91,24 +94,27 @@ routine somewhere */
klisp_unlock(K);
- /* LOCK: run will acquire the lock */
+ /* LOCK: run will acquire the lock, and release it when done */
klispT_run(K);
- /* TODO get the return value */
-/*
- int status = errorp? STATUS_ERROR :
- (rootp? STATUS_ROOT : STATUS_CONTINUE);
-*/
-
-/* /XXX */
-
+ klisp_lock(K);
- /* thread is done, we can remove the fixed bit */
+ /* thread is done, we can remove it from the thread table */
/* XXX what happens if this threads terminates abnormally?? */
- klisp_lock(K);
- resetbit(K->gct, FIXEDBIT);
+ TValue *node = klispH_set(K, tv2table(G(K)->thread_table),
+ gc2th(K));
+ *node = KFREE;
+
+ K->status = errorp? KLISP_THREAD_ERROR : KLISP_THREAD_DONE;
+ /* the thrown object/return value remains in K->next_obj */
+ /* NOTICE that unless root continuation is explicitly invoked
+ the value returned by the function is discarded!!
+ This may change in the future */
+
+ /* signal all threads waiting to join */
+ int32_t ret = pthread_cond_broadcast(&K->joincond);
+ klisp_assert(ret == 0); /* shouldn't happen */
klisp_unlock(K);
- /* TODO return value */
return NULL;
}
@@ -138,7 +144,17 @@ static void make_thread(klisp_State *K)
new_K->next_env = kmake_empty_environment(K);
TValue si = ktry_get_si(new_K, top);
klispT_tail_call_si(new_K, top, KNIL, new_K->next_env, si);
- int32_t ret = pthread_create(&new_K->thread, NULL, thread_run, new_K);
+
+ pthread_attr_t attr;
+ int32_t ret = pthread_attr_init(&attr);
+ klisp_assert(ret == 0); /* this shouldn't really happen... */
+ /* make threads detached, the running state and return value
+ will be kept in the corresponding klisp_State struct */
+ pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED);
+ klisp_assert(ret == 0); /* this shouldn't really happen... */
+
+ K->status = KLISP_THREAD_STARTING;
+ ret = pthread_create(&new_K->thread, &attr, thread_run, new_K);
if (ret != 0) {
/* let the GC collect the failed State */
@@ -148,10 +164,53 @@ static void make_thread(klisp_State *K)
return;
}
+ /* this shouldn't fail */
+ UNUSED(pthread_attr_destroy(&attr));
+
/* thread created correctly, return it */
kapply_cc(K, new_th);
}
+static void thread_join(klisp_State *K)
+{
+ TValue *xparams = K->next_xparams;
+ TValue ptree = K->next_value;
+ TValue denv = K->next_env;
+ klisp_assert(ttisenvironment(K->next_env));
+ UNUSED(xparams);
+ UNUSED(denv);
+
+ bind_1tp(K, ptree, "thread", ttisthread, thread);
+
+ if (tv_equal(gc2th(K), thread)) {
+ klispE_throw_simple(K, "Thread can't join with itself");
+ return;
+ } else if (tv_equal(gc2th(G(K)->mainthread), thread)) {
+ klispE_throw_simple(K, "Can't join with main thread");
+ return;
+ }
+
+ klisp_State *K2 = tv2th(thread);
+
+ while(true) {
+ fflush(stdout);
+ if (K2->status == KLISP_THREAD_DONE) {
+ /* NOTICE that unless root continuation was explicitly invoked
+ the value returned by the thread is discarded!!
+ This may change in the future */
+ kapply_cc(K, K2->next_value);
+ } else if (K2->status == KLISP_THREAD_ERROR) {
+ /* throw the same object, but in this thread */
+ kcall_cont(K, G(K)->error_cont, K2->next_value);
+ return;
+ } else {
+ /* must wait for this thread to end */
+ /* LOCK: the GIL should be acquired exactly once */
+ int32_t ret = pthread_cond_wait(&K2->joincond, &G(K)->gil);
+ klisp_assert(ret == 0); /* shouldn't happen */
+ }
+ }
+}
/* make-mutex */
static void make_mutex(klisp_State *K)
@@ -284,6 +343,9 @@ void kinit_threads_ground_env(klisp_State *K)
/* ?.3? make-thread */
add_applicative(K, ground_env, "make-thread", make_thread, 0);
+ /* ?.4? thread-join */
+ add_applicative(K, ground_env, "thread-join", thread_join, 0);
+
/* Mutexes */
/* mutex? */
add_applicative(K, ground_env, "mutex?", typep, 2, symbol,
diff --git a/src/klimits.h b/src/klimits.h
@@ -62,6 +62,10 @@
#define MINCONTNAMETABSIZE 32
#endif
+#ifndef MINTHREADTABSIZE
+#define MINTHREADTABSIZE 32
+#endif
+
/* minimum size for the require table (must be power of 2) */
#ifndef MINREQUIRETABSIZE
#define MINREQUIRETABSIZE 32
diff --git a/src/kstate.c b/src/kstate.c
@@ -62,6 +62,7 @@ typedef struct KG {
/*
** open parts that may cause memory-allocation errors
*/
+/* TODO move other stuff that cause allocs here */
static void f_klispopen (klisp_State *K, void *ud) {
global_State *g = G(K);
UNUSED(ud);
@@ -99,6 +100,7 @@ static void f_klispopen (klisp_State *K, void *ud) {
static void preinit_state (klisp_State *K, global_State *g) {
G(K) = g;
+ K->status = KLISP_THREAD_CREATED;
K->gil_count = 0;
K->curr_cont = KNIL;
K->next_obj = KINERT;
@@ -209,6 +211,7 @@ klisp_State *klisp_newstate(klisp_Alloc f, void *ud)
g->strt.hash = NULL;
g->name_table = KINERT;
g->cont_name_table = KINERT;
+ g->thread_table = KINERT;
g->empty_string = KINERT;
g->empty_bytevector = KINERT;
@@ -291,6 +294,9 @@ klisp_State *klisp_newstate(klisp_Alloc f, void *ud)
/* here the keys are uncollectable */
g->cont_name_table = klispH_new(K, 0, MINCONTNAMETABSIZE,
K_FLAG_WEAK_NOTHING);
+ /* here the keys are uncollectable */
+ g->thread_table = klispH_new(K, 0, MINTHREADTABSIZE,
+ K_FLAG_WEAK_NOTHING);
/* Empty string */
/* MAYBE: fix it so we can remove empty_string from roots */
@@ -383,6 +389,10 @@ klisp_State *klisp_newstate(klisp_Alloc f, void *ud)
kinit_ground_env(K);
kinit_cont_names(K);
+ /* put the main thread in the thread table */
+ TValue *node = klispH_set(K, tv2table(g->thread_table), gc2th(K));
+ *node = KTRUE;
+
/* create a std environment and leave it in g->next_env */
K->next_env = kmake_table_environment(K, g->ground_env);
@@ -397,7 +407,7 @@ klisp_State *klisp_newstate(klisp_Alloc f, void *ud)
klisp_State *klisp_newthread(klisp_State *K)
{
/* TODO */
- return K;
+ return NULL;
}
klisp_State *klispT_newthread(klisp_State *K)
@@ -405,13 +415,11 @@ klisp_State *klispT_newthread(klisp_State *K)
klisp_State *K1 = tostate(klispM_malloc(K, state_size(klisp_State)));
klispC_link(K, (GCObject *) K1, K_TTHREAD, 0);
- /* This is added in klisp to avoid the collection
- of running, thread objects, they are unfixed
- when the native threads terminate */
- k_setbit(K->gct, FIXEDBIT);
-
preinit_state(K1, G(K));
+ /* protect from gc */
+ krooted_tvs_push(K, gc2th(K1));
+
/* initialize temp stacks */
ks_sbuf(K1) = (TValue *) klispM_malloc(K, KS_ISSIZE * sizeof(TValue));
ks_ssize(K1) = KS_ISSIZE;
@@ -420,7 +428,21 @@ klisp_State *klispT_newthread(klisp_State *K)
ks_tbuf(K1) = (char *) klispM_malloc(K, KS_ITBSIZE);
ks_tbsize(K1) = KS_ITBSIZE;
ks_tbidx(K1) = 0; /* buffer is empty */
-
+
+ /* initialize condition variable for joining */
+ int32_t ret = pthread_cond_init(&K1->joincond, NULL);
+
+ if (ret != 0) {
+ klispE_throw_simple_with_irritants(K, "Error creating joincond for "
+ "new thread", 1, i2tv(ret));
+ return NULL;
+ }
+
+ /* everything went well, put the thread in the thread table */
+ TValue *node = klispH_set(K, tv2table(G(K)->thread_table), gc2th(K1));
+ *node = KTRUE;
+ krooted_tvs_pop(K);
+
klisp_assert(iswhite((GCObject *) (K1)));
return K1;
}
@@ -428,6 +450,11 @@ klisp_State *klispT_newthread(klisp_State *K)
void klispT_freethread (klisp_State *K, klisp_State *K1)
{
+ /* main thread can't come here, so it's safe to remove the
+ condvar here */
+ int32_t ret = pthread_cond_destroy(&K1->joincond);
+ klisp_assert(ret == 0); /* shouldn't happen */
+
klispM_freemem(K, ks_sbuf(K1), ks_ssize(K1) * sizeof(TValue));
klispM_freemem(K, ks_tbuf(K1), ks_tbsize(K1));
/* userstatefree() */
diff --git a/src/kstate.h b/src/kstate.h
@@ -56,6 +56,7 @@ typedef struct global_State {
stringtable strt; /* hash table for immutable strings & symbols */
TValue name_table; /* hash tables for naming objects */
TValue cont_name_table; /* hash tables for naming continuation functions */
+ TValue thread_table; /* hash table for all live (non done/error) threads */
/* Memory allocator */
klisp_Alloc frealloc; /* function to reallocate memory */
@@ -137,11 +138,26 @@ typedef struct global_State {
pthread_mutex_t gil;
} global_State;
+/*
+** Possible states of a thread/klisp_State,
+** currently threads are started as soon as they are created, but
+** that may change in the future. If the state is done, or error,
+** the returned/thrown object is kept in next_value
+*/
+#define KLISP_THREAD_CREATED (0)
+#define KLISP_THREAD_STARTING (1)
+#define KLISP_THREAD_RUNNING (2)
+#define KLISP_THREAD_DONE (3)
+#define KLISP_THREAD_ERROR (4)
+
struct klisp_State {
CommonHeader; /* This represents a thread object */
global_State *k_G;
pthread_t thread;
-
+ int32_t status; /* the execution status of this thread */
+ /* The main thread doesn't have a condition variable here because
+ you can't join it. This may be changed in the future */
+ pthread_cond_t joincond; /* the condition variable for joining */
/* Current state of execution */
int32_t gil_count; /* the number of times the GIL was acquired */
TValue curr_cont; /* the current continuation of this thread */