commit 42406a0400c3ba8779c4de6023df6162132d24ea
parent fcd8ffa31614baff9fc6f92daf3b5f6055130048
Author: Andres Navarro <canavarro82@gmail.com>
Date: Tue, 28 Aug 2012 20:09:09 -0300
Added create-thread to the ground environment (no options for now). Added pthreads to the libraries param in Makefile. Fixed memory leak in thread GC (doing an explicit join to free the thread stack).
Diffstat:
9 files changed, 189 insertions(+), 45 deletions(-)
diff --git a/src/Makefile b/src/Makefile
@@ -17,7 +17,7 @@ RANLIB= ranlib
# Use "RM= del /q /f" if you want to compile with MinGW without using MSYS
RM= rm -f
-LIBS=-lm $(MYLIBS)
+LIBS=-lm -lpthread $(MYLIBS)
# Set USE_LIBFFI=1 (or other nonempty string) to enable libffi-dependent
# code.
diff --git a/src/kgc.c b/src/kgc.c
@@ -50,7 +50,7 @@
/* klisp: NOT USED YET */
#define isfinalized(u) testbit((u)->gct, FINALIZEDBIT)
-#define markfinalized(u) l_setbit((u)->gct, FINALIZEDBIT)
+#define markfinalized(u) k_setbit((u)->gct, FINALIZEDBIT)
/* klisp: NOT USED YET */
#define KEYWEAK bitmask(KEYWEAKBIT)
@@ -541,9 +541,13 @@ static void freeobj (klisp_State *K, GCObject *o) {
klispM_free(K, (Library *)o);
break;
case K_TTHREAD: {
- klisp_assert((klisp_State *) o != K &&
- (klisp_State *) o != G(K)->mainthread);
- klispT_freethread(K, (klisp_State *) o);
+ klisp_State *K2 = (klisp_State *) o;
+ klisp_assert(K2 != K && K2 != G(K)->mainthread);
+ /* do join to avoid memory leak, thread is guaranteed to have
+ completed execution, so join should not block (but it can fail
+ if a join was performed already) */
+ UNUSED(pthread_join(K2->thread, NULL));
+ klispT_freethread(K, K2);
break;
}
default:
diff --git a/src/kgc.h b/src/kgc.h
@@ -35,7 +35,7 @@
#define testbits(x,m) ((x) & (m))
#define bitmask(b) (1<<(b))
#define bit2mask(b1,b2) (bitmask(b1) | bitmask(b2))
-#define l_setbit(x,b) setbits(x, bitmask(b))
+#define k_setbit(x,b) setbits(x, bitmask(b))
#define resetbit(x,b) resetbits(x, bitmask(b))
#define testbit(x,b) testbits(x, bitmask(b))
#define set2bits(x,b1,b2) setbits(x, (bit2mask(b1, b2)))
@@ -77,7 +77,7 @@
#define isdead(g,v) ((v)->gch.gct & otherwhite(g) & WHITEBITS)
#define changewhite(x) ((x)->gch.gct ^= WHITEBITS)
-#define gray2black(x) l_setbit((x)->gch.gct, BLACKBIT)
+#define gray2black(x) k_setbit((x)->gch.gct, BLACKBIT)
#define valiswhite(x) (iscollectable(x) && iswhite(gcvalue(x)))
diff --git a/src/kghelpers.c b/src/kghelpers.c
@@ -1930,3 +1930,37 @@ void guard_dynamic_extent(klisp_State *K)
ktail_eval(K, expr, denv);
}
+
+
+void do_int_mark_error(klisp_State *K)
+{
+ TValue *xparams = K->next_xparams;
+ TValue ptree = K->next_value;
+ TValue denv = K->next_env;
+ klisp_assert(ttisenvironment(K->next_env));
+ /*
+ ** xparams[0]: errorp pointer
+ */
+ UNUSED(denv);
+ bool *errorp = (bool *) pvalue(xparams[0]);
+ *errorp = true;
+ /* ptree is (object divert) */
+ TValue error_obj = kcar(ptree);
+ /* pass the error along after setting the flag */
+ kapply_cc(K, error_obj);
+}
+
+void do_int_mark_root(klisp_State *K)
+{
+ TValue *xparams = K->next_xparams;
+ TValue obj = K->next_value;
+ klisp_assert(ttisnil(K->next_env));
+ /*
+ ** xparams[0]: rootp pointer
+ */
+ UNUSED(obj); /* ignore obj */
+ bool *rootp = (bool *) pvalue(xparams[0]);
+ *rootp = false; /* mark that we didn't explicitly call the root cont */
+ /* pass #INERT to the root continuation */
+ kapply_cc(K, KINERT);
+}
diff --git a/src/kghelpers.h b/src/kghelpers.h
@@ -513,6 +513,13 @@ TValue map_for_each_transpose(klisp_State *K, TValue lss,
int32_t res_apairs, int32_t res_cpairs);
+/* for thread continuation guarding */
+void do_int_mark_root(klisp_State *K);
+void do_int_mark_error(klisp_State *K);
+
+/* TODO add handler for entry guards to avoid
+ continuations to cross threads */
+
/*
** Macros for ground environment initialization
*/
diff --git a/src/kgthreads.c b/src/kgthreads.c
@@ -30,6 +30,124 @@ static void get_current_thread(klisp_State *K)
kapply_cc(K, gc2th(K));
}
+static void *thread_run(void *data)
+{
+ klisp_State *K = (klisp_State *) data;
+
+/* XXX/REFACTOR This is more or less the same that is repeated
+ over and over again in the repl code (klisp.c), move to a helper
+routine somewhere */
+ bool errorp = false; /* may be set to true in error handler */
+ bool rootp = true; /* may be set to false in continuation */
+
+ /* We have already the appropriate environment,
+ operative and arguments in place, but we still need the
+ continuations/guards */
+ /* LOCK: We need the GIL for allocating the objects */
+ klisp_lock(K);
+ /* create the guard set error flag after errors */
+ TValue exit_int = kmake_operative(K, do_int_mark_error,
+ 1, p2tv(&errorp));
+ krooted_tvs_push(K, exit_int);
+ TValue exit_guard = kcons(K, G(K)->error_cont, exit_int);
+ krooted_tvs_pop(K); /* already in guard */
+ krooted_tvs_push(K, exit_guard);
+ TValue exit_guards = kcons(K, exit_guard, KNIL);
+ krooted_tvs_pop(K); /* already in guards */
+ krooted_tvs_push(K, exit_guards);
+
+ TValue entry_guards = KNIL;
+
+ /* this is needed for interception code */
+ TValue env = kmake_empty_environment(K);
+ krooted_tvs_push(K, env);
+ TValue outer_cont = kmake_continuation(K, G(K)->root_cont,
+ do_pass_value, 2, entry_guards, env);
+ kset_outer_cont(outer_cont);
+ krooted_tvs_push(K, outer_cont);
+ TValue inner_cont = kmake_continuation(K, outer_cont,
+ do_pass_value, 2, exit_guards, env);
+ kset_inner_cont(inner_cont);
+ krooted_tvs_pop(K); krooted_tvs_pop(K); krooted_tvs_pop(K);
+
+ krooted_tvs_push(K, inner_cont);
+
+ /* This continuation will discard the result of the evaluation
+ and return #inert instead, it will also signal via rootp = false
+ that the evaluation didn't explicitly invoke the root continuation
+ */
+ TValue discard_cont = kmake_continuation(K, inner_cont, do_int_mark_root,
+ 1, p2tv(&rootp));
+
+ krooted_tvs_pop(K); /* pop inner cont */
+ krooted_tvs_push(K, discard_cont);
+
+ kset_cc(K, discard_cont);
+ krooted_tvs_pop(K); /* pop discard cont */
+
+ klisp_unlock(K);
+
+ /* LOCK: run will acquire the lock */
+ klispT_run(K);
+
+ /* TODO get the return value */
+/*
+ int status = errorp? STATUS_ERROR :
+ (rootp? STATUS_ROOT : STATUS_CONTINUE);
+*/
+
+/* /XXX */
+
+
+ /* thread is done, we can remove the fixed bit */
+ /* XXX what happens if this threads terminates abnormally?? */
+ klisp_lock(K);
+ resetbit(K->gct, FIXEDBIT);
+ klisp_unlock(K);
+ /* TODO return value */
+ return NULL;
+}
+
+/* ?.3? make-thread */
+static void make_thread(klisp_State *K)
+{
+ TValue *xparams = K->next_xparams;
+ TValue ptree = K->next_value;
+ TValue denv = K->next_env;
+ klisp_assert(ttisenvironment(K->next_env));
+ UNUSED(xparams);
+ UNUSED(denv);
+
+ bind_1tp(K, ptree, "combiner", ttiscombiner, comb);
+ TValue top = comb;
+ while(ttisapplicative(top))
+ top = kunwrap(top);
+
+ /* GC: threads are fixed, no need to protect it */
+ klisp_State *new_K = klispT_newthread(K);
+ TValue new_th = gc2th(new_K);
+ /* Prepare the new_K state to call the passed combiner with
+ no arguments and an empty environment */
+ /* TODO set_cc */
+ klispT_set_cc(new_K, G(K)->root_cont);
+ /* This will protect it from GC */
+ new_K->next_env = kmake_empty_environment(K);
+ TValue si = ktry_get_si(new_K, top);
+ klispT_tail_call_si(new_K, top, KNIL, new_K->next_env, si);
+ int32_t ret = pthread_create(&new_K->thread, NULL, thread_run, new_K);
+
+ if (ret != 0) {
+ /* let the GC collect the failed State */
+ resetbit(new_K->gct, FIXEDBIT);
+ klispE_throw_simple_with_irritants(K, "Error creating thread",
+ 1, i2tv(ret));
+ return;
+ }
+
+ /* thread created correctly, return it */
+ kapply_cc(K, new_th);
+}
+
/* init ground */
void kinit_threads_ground_env(klisp_State *K)
{
@@ -47,4 +165,7 @@ void kinit_threads_ground_env(klisp_State *K)
/* ?.2? get-current-thread */
add_applicative(K, ground_env, "get-current-thread", get_current_thread, 0);
+
+ /* ?.3? make-thread */
+ add_applicative(K, ground_env, "make-thread", make_thread, 0);
}
diff --git a/src/klisp.c b/src/klisp.c
@@ -40,7 +40,7 @@
#include "kerror.h"
#include "krepl.h"
#include "ksystem.h"
-#include "kghelpers.h" /* for do_pass_value and do_seq */
+#include "kghelpers.h" /* for do_pass_value and do_seq, mark_root & mark_error */
static const char *progname = KLISP_PROGNAME;
@@ -168,39 +168,6 @@ static void print_version(void)
printf("%s\n", KLISP_RELEASE " " KLISP_COPYRIGHT);
}
-void do_int_mark_error(klisp_State *K)
-{
- TValue *xparams = K->next_xparams;
- TValue ptree = K->next_value;
- TValue denv = K->next_env;
- klisp_assert(ttisenvironment(K->next_env));
- /*
- ** xparams[0]: errorp pointer
- */
- UNUSED(denv);
- bool *errorp = (bool *) pvalue(xparams[0]);
- *errorp = true;
- /* ptree is (object divert) */
- TValue error_obj = kcar(ptree);
- /* pass the error along after setting the flag */
- kapply_cc(K, error_obj);
-}
-
-void do_int_mark_root(klisp_State *K)
-{
- TValue *xparams = K->next_xparams;
- TValue obj = K->next_value;
- klisp_assert(ttisnil(K->next_env));
- /*
- ** xparams[0]: rootp pointer
- */
- UNUSED(obj); /* ignore obj */
- bool *rootp = (bool *) pvalue(xparams[0]);
- *rootp = false; /* mark that we didn't explicitly call the root cont */
- /* pass #INERT to the root continuation */
- kapply_cc(K, KINERT);
-}
-
static int dostring (klisp_State *K, const char *s, const char *name)
{
bool errorp = false; /* may be set to true in error handler */
@@ -727,6 +694,9 @@ int main(int argc, char *argv[])
{
struct Smain s;
klisp_State *K = klispL_newstate();
+ /* Set the main thread as the current thread */
+ /* XXX/TEMP this could be made in run... */
+ K->thread = pthread_self();
if (K == NULL) {
k_message(argv[0], "cannot create state: not enough memory");
diff --git a/src/kstate.c b/src/kstate.c
@@ -162,7 +162,7 @@ static void close_state(klisp_State *K)
klispM_freemem(K, ks_sbuf(K), ks_ssize(K) * sizeof(TValue));
klispM_freemem(K, ks_tbuf(K), ks_tbsize(K));
/* free string/symbol table */
- klispM_freearray(K, g->strt.hash, G(K)->strt.size, GCObject *);
+ klispM_freearray(K, g->strt.hash, g->strt.size, GCObject *);
/* destroy the GIL */
pthread_mutex_destroy(&g->gil);
@@ -404,6 +404,12 @@ klisp_State *klispT_newthread(klisp_State *K)
{
klisp_State *K1 = tostate(klispM_malloc(K, state_size(klisp_State)));
klispC_link(K, (GCObject *) K1, K_TTHREAD, 0);
+
+ /* This is added in klisp to avoid the collection
+ of running, thread objects, they are unfixed
+ when the native threads terminate */
+ k_setbit(K->gct, FIXEDBIT);
+
preinit_state(K1, G(K));
/* initialize temp stacks */
@@ -577,16 +583,16 @@ void klispT_run(klisp_State *K)
/* TEMP: do nothing, the loop will call the continuation */
klisp_unlock_all(K);
} else {
- klisp_lock(K);
+ klisp_lock(K);
/* all ok, continue with next func */
while (K->next_func) {
/* next_func is either operative or continuation
but in any case the call is the same */
(*(K->next_func))(K);
- klispi_threadyield(K);
+ klispi_threadyield(K);
}
/* K->next_func is NULL, this means we should exit already */
- klisp_unlock(K);
+ klisp_unlock(K);
break;
}
}
diff --git a/src/kstate.h b/src/kstate.h
@@ -140,6 +140,8 @@ typedef struct global_State {
struct klisp_State {
CommonHeader; /* This represents a thread object */
global_State *k_G;
+ pthread_t thread;
+
/* Current state of execution */
int32_t gil_count; /* the number of times the GIL was acquired */
TValue curr_cont; /* the current continuation of this thread */