kgthreads.c (12061B)
1 /* 2 ** kgstrings.c 3 ** Strings features for the ground environment 4 ** See Copyright Notice in klisp.h 5 */ 6 7 #include <assert.h> 8 #include <stdlib.h> 9 #include <stdbool.h> 10 #include <stdint.h> 11 12 #include "kstate.h" 13 #include "ktable.h" 14 #include "kobject.h" 15 #include "kmutex.h" 16 #include "kcondvar.h" 17 #include "kghelpers.h" 18 19 /* ?.1? thread? */ 20 /* uses typep */ 21 22 /* ?.2? get-current-thread */ 23 static void get_current_thread(klisp_State *K) 24 { 25 TValue *xparams = K->next_xparams; 26 TValue ptree = K->next_value; 27 TValue denv = K->next_env; 28 klisp_assert(ttisenvironment(K->next_env)); 29 UNUSED(xparams); 30 UNUSED(denv); 31 check_0p(K, ptree); 32 kapply_cc(K, gc2th(K)); 33 } 34 35 static void *thread_run(void *data) 36 { 37 klisp_State *K = (klisp_State *) data; 38 39 /* XXX/REFACTOR This is more or less the same that is repeated 40 over and over again in the repl code (klisp.c), move to a helper 41 routine somewhere */ 42 bool errorp = false; /* may be set to true in error handler */ 43 bool rootp = true; /* may be set to false in continuation */ 44 45 /* ???/TODO should the fact that the thread thrown an exception 46 be reported to the error output??? */ 47 48 /* We have already the appropriate environment, 49 operative and arguments in place, but we still need the 50 continuations/guards */ 51 /* LOCK: We need the GIL for allocating the objects */ 52 klisp_lock(K); 53 54 K->status = KLISP_THREAD_RUNNING; 55 /* create the guard set error flag after errors */ 56 TValue exit_int = kmake_operative(K, do_int_mark_error, 57 1, p2tv(&errorp)); 58 krooted_tvs_push(K, exit_int); 59 TValue exit_guard = kcons(K, G(K)->error_cont, exit_int); 60 krooted_tvs_pop(K); /* already in guard */ 61 krooted_tvs_push(K, exit_guard); 62 TValue exit_guards = kcons(K, exit_guard, KNIL); 63 krooted_tvs_pop(K); /* already in guards */ 64 krooted_tvs_push(K, exit_guards); 65 66 TValue entry_guards = KNIL; 67 68 /* this is needed for interception code */ 69 TValue env = kmake_empty_environment(K); 70 krooted_tvs_push(K, env); 71 TValue outer_cont = kmake_continuation(K, G(K)->root_cont, 72 do_pass_value, 2, entry_guards, env); 73 kset_outer_cont(outer_cont); 74 krooted_tvs_push(K, outer_cont); 75 TValue inner_cont = kmake_continuation(K, outer_cont, 76 do_pass_value, 2, exit_guards, env); 77 kset_inner_cont(inner_cont); 78 krooted_tvs_pop(K); krooted_tvs_pop(K); krooted_tvs_pop(K); 79 80 krooted_tvs_push(K, inner_cont); 81 82 /* This continuation will discard the result of the evaluation 83 and return #inert instead, it will also signal via rootp = false 84 that the evaluation didn't explicitly invoke the root continuation 85 */ 86 TValue discard_cont = kmake_continuation(K, inner_cont, do_int_mark_root, 87 1, p2tv(&rootp)); 88 89 krooted_tvs_pop(K); /* pop inner cont */ 90 krooted_tvs_push(K, discard_cont); 91 92 kset_cc(K, discard_cont); 93 krooted_tvs_pop(K); /* pop discard cont */ 94 95 klisp_unlock(K); 96 97 /* LOCK: run will acquire the lock, and release it when done */ 98 klispT_run(K); 99 100 klisp_lock(K); 101 102 /* thread is done, we can remove it from the thread table */ 103 /* XXX what happens if this threads terminates abnormally?? */ 104 TValue *node = klispH_set(K, tv2table(G(K)->thread_table), 105 gc2th(K)); 106 *node = KFREE; 107 108 K->status = errorp? KLISP_THREAD_ERROR : KLISP_THREAD_DONE; 109 /* the thrown object/return value remains in K->next_obj */ 110 /* NOTICE that unless root continuation is explicitly invoked 111 the value returned by the function is discarded!! 112 This may change in the future */ 113 114 /* signal all threads waiting to join */ 115 int32_t ret = pthread_cond_broadcast(&K->joincond); 116 klisp_assert(ret == 0); /* shouldn't happen */ 117 klisp_unlock(K); 118 return NULL; 119 } 120 121 /* ?.3? make-thread */ 122 static void make_thread(klisp_State *K) 123 { 124 TValue *xparams = K->next_xparams; 125 TValue ptree = K->next_value; 126 TValue denv = K->next_env; 127 klisp_assert(ttisenvironment(K->next_env)); 128 UNUSED(xparams); 129 UNUSED(denv); 130 131 bind_1tp(K, ptree, "combiner", ttiscombiner, comb); 132 TValue top = comb; 133 while(ttisapplicative(top)) 134 top = kunwrap(top); 135 136 /* GC: threads are fixed, no need to protect it */ 137 klisp_State *new_K = klispT_newthread(K); 138 TValue new_th = gc2th(new_K); 139 /* Prepare the new_K state to call the passed combiner with 140 no arguments and an empty environment */ 141 /* TODO set_cc */ 142 klispT_set_cc(new_K, G(K)->root_cont); 143 /* This will protect it from GC */ 144 new_K->next_env = kmake_empty_environment(K); 145 TValue si = ktry_get_si(new_K, top); 146 klispT_tail_call_si(new_K, top, KNIL, new_K->next_env, si); 147 148 pthread_attr_t attr; 149 int32_t ret = pthread_attr_init(&attr); 150 klisp_assert(ret == 0); /* this shouldn't really happen... */ 151 /* make threads detached, the running state and return value 152 will be kept in the corresponding klisp_State struct */ 153 pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED); 154 klisp_assert(ret == 0); /* this shouldn't really happen... */ 155 156 K->status = KLISP_THREAD_STARTING; 157 ret = pthread_create(&new_K->thread, &attr, thread_run, new_K); 158 159 if (ret != 0) { 160 /* let the GC collect the failed State */ 161 resetbit(new_K->gct, FIXEDBIT); 162 klispE_throw_simple_with_irritants(K, "Error creating thread", 163 1, i2tv(ret)); 164 return; 165 } 166 167 /* this shouldn't fail */ 168 UNUSED(pthread_attr_destroy(&attr)); 169 170 /* thread created correctly, return it */ 171 kapply_cc(K, new_th); 172 } 173 174 static void thread_join(klisp_State *K) 175 { 176 TValue *xparams = K->next_xparams; 177 TValue ptree = K->next_value; 178 TValue denv = K->next_env; 179 klisp_assert(ttisenvironment(K->next_env)); 180 UNUSED(xparams); 181 UNUSED(denv); 182 183 bind_1tp(K, ptree, "thread", ttisthread, thread); 184 185 if (tv_equal(gc2th(K), thread)) { 186 klispE_throw_simple(K, "Thread can't join with itself"); 187 return; 188 } else if (tv_equal(gc2th(G(K)->mainthread), thread)) { 189 klispE_throw_simple(K, "Can't join with main thread"); 190 return; 191 } 192 193 klisp_State *K2 = tv2th(thread); 194 195 while(true) { 196 fflush(stdout); 197 if (K2->status == KLISP_THREAD_DONE) { 198 /* NOTICE that unless root continuation was explicitly invoked 199 the value returned by the thread is discarded!! 200 This may change in the future */ 201 kapply_cc(K, K2->next_value); 202 } else if (K2->status == KLISP_THREAD_ERROR) { 203 /* throw the same object, but in this thread */ 204 kcall_cont(K, G(K)->error_cont, K2->next_value); 205 return; 206 } else { 207 /* must wait for this thread to end */ 208 /* LOCK: the GIL should be acquired exactly once */ 209 int32_t ret = pthread_cond_wait(&K2->joincond, &G(K)->gil); 210 klisp_assert(ret == 0); /* shouldn't happen */ 211 } 212 } 213 } 214 215 /* make-mutex */ 216 static void make_mutex(klisp_State *K) 217 { 218 TValue *xparams = K->next_xparams; 219 TValue ptree = K->next_value; 220 TValue denv = K->next_env; 221 klisp_assert(ttisenvironment(K->next_env)); 222 UNUSED(xparams); 223 UNUSED(denv); 224 225 check_0p(K, ptree); 226 227 TValue new_mutex = kmake_mutex(K); 228 kapply_cc(K, new_mutex); 229 } 230 231 /* mutex-lock */ 232 static void mutex_lock(klisp_State *K) 233 { 234 TValue *xparams = K->next_xparams; 235 TValue ptree = K->next_value; 236 TValue denv = K->next_env; 237 klisp_assert(ttisenvironment(K->next_env)); 238 UNUSED(xparams); 239 UNUSED(denv); 240 241 bind_1tp(K, ptree, "mutex", ttismutex, mutex); 242 kmutex_lock(K, mutex); 243 kapply_cc(K, KINERT); 244 } 245 246 /* mutex-unlock */ 247 static void mutex_unlock(klisp_State *K) 248 { 249 TValue *xparams = K->next_xparams; 250 TValue ptree = K->next_value; 251 TValue denv = K->next_env; 252 klisp_assert(ttisenvironment(K->next_env)); 253 UNUSED(xparams); 254 UNUSED(denv); 255 256 bind_1tp(K, ptree, "mutex", ttismutex, mutex); 257 kmutex_unlock(K, mutex); 258 kapply_cc(K, KINERT); 259 } 260 261 /* mutex-trylock */ 262 static void mutex_trylock(klisp_State *K) 263 { 264 TValue *xparams = K->next_xparams; 265 TValue ptree = K->next_value; 266 TValue denv = K->next_env; 267 klisp_assert(ttisenvironment(K->next_env)); 268 UNUSED(xparams); 269 UNUSED(denv); 270 271 bind_1tp(K, ptree, "mutex", ttismutex, mutex); 272 bool res = kmutex_trylock(K, mutex); 273 kapply_cc(K, b2tv(res)); 274 } 275 276 /* make-condition-variable */ 277 static void make_condvar(klisp_State *K) 278 { 279 TValue *xparams = K->next_xparams; 280 TValue ptree = K->next_value; 281 TValue denv = K->next_env; 282 klisp_assert(ttisenvironment(K->next_env)); 283 UNUSED(xparams); 284 UNUSED(denv); 285 286 bind_1tp(K, ptree, "mutex", ttismutex, mutex); 287 288 TValue new_condvar = kmake_condvar(K, mutex); 289 kapply_cc(K, new_condvar); 290 } 291 292 /* condition-variable-wait */ 293 static void condvar_wait(klisp_State *K) 294 { 295 TValue *xparams = K->next_xparams; 296 TValue ptree = K->next_value; 297 TValue denv = K->next_env; 298 klisp_assert(ttisenvironment(K->next_env)); 299 UNUSED(xparams); 300 UNUSED(denv); 301 302 bind_1tp(K, ptree, "condition-variable", ttiscondvar, condvar); 303 kcondvar_wait(K, condvar); 304 kapply_cc(K, KINERT); 305 } 306 307 /* condition-variable-signal / condition-variable-broadcast */ 308 static void condvar_signal(klisp_State *K) 309 { 310 TValue *xparams = K->next_xparams; 311 TValue ptree = K->next_value; 312 TValue denv = K->next_env; 313 klisp_assert(ttisenvironment(K->next_env)); 314 UNUSED(denv); 315 /* 316 ** xparams[0]: broadcast? 317 */ 318 bool broadcast = bvalue(xparams[0]); 319 320 bind_1tp(K, ptree, "condition-variable", ttiscondvar, condvar); 321 kcondvar_signal(K, condvar, broadcast); 322 kapply_cc(K, KINERT); 323 } 324 325 /* init ground */ 326 void kinit_threads_ground_env(klisp_State *K) 327 { 328 TValue ground_env = G(K)->ground_env; 329 TValue symbol, value; 330 331 /* 332 ** This section is still missing from the report. The bindings here are 333 ** taken from a mix of scheme implementations and the pthreads library 334 */ 335 336 /* ?.1? thread? */ 337 add_applicative(K, ground_env, "thread?", typep, 2, symbol, 338 i2tv(K_TTHREAD)); 339 340 /* ?.2? get-current-thread */ 341 add_applicative(K, ground_env, "get-current-thread", get_current_thread, 0); 342 343 /* ?.3? make-thread */ 344 add_applicative(K, ground_env, "make-thread", make_thread, 0); 345 346 /* ?.4? thread-join */ 347 add_applicative(K, ground_env, "thread-join", thread_join, 0); 348 349 /* Mutexes */ 350 /* mutex? */ 351 add_applicative(K, ground_env, "mutex?", typep, 2, symbol, 352 i2tv(K_TMUTEX)); 353 354 /* make-mutex */ 355 add_applicative(K, ground_env, "make-mutex", make_mutex, 0); 356 /* REFACTOR: should lock and unlock have an '!'? 357 What about try lock?? '!', '?', '!?', neither? */ 358 /* mutex-lock */ 359 add_applicative(K, ground_env, "mutex-lock", mutex_lock, 0); 360 /* mutex-unlock */ 361 add_applicative(K, ground_env, "mutex-unlock", mutex_unlock, 0); 362 /* mutex-trylock */ 363 add_applicative(K, ground_env, "mutex-trylock", mutex_trylock, 0); 364 365 /* Condition variables */ 366 /* condition-variable? */ 367 add_applicative(K, ground_env, "condition-variable?", typep, 2, symbol, 368 i2tv(K_TCONDVAR)); 369 370 /* make-condition-variable */ 371 add_applicative(K, ground_env, "make-condition-variable", 372 make_condvar, 0); 373 /* REFACTOR: should signal have an '!'? */ 374 /* condition-variable-wait */ 375 add_applicative(K, ground_env, "condition-variable-wait", 376 condvar_wait, 0); 377 /* condition-variable-signal */ 378 add_applicative(K, ground_env, "condition-variable-signal", 379 condvar_signal, 1, b2tv(false)); 380 /* condition-variable-broadcast */ 381 add_applicative(K, ground_env, "condition-variable-broadcast", 382 condvar_signal, 1, b2tv(true)); 383 }