thread.h 14 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401
  1. /*
  2. *
  3. * Copyright (c) 2011, Jue Ruan <ruanjue@gmail.com>
  4. *
  5. *
  6. * This program is free software: you can redistribute it and/or modify
  7. * it under the terms of the GNU General Public License as published by
  8. * the Free Software Foundation, either version 3 of the License, or
  9. * (at your option) any later version.
  10. *
  11. * This program is distributed in the hope that it will be useful,
  12. * but WITHOUT ANY WARRANTY; without even the implied warranty of
  13. * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  14. * GNU General Public License for more details.
  15. *
  16. * You should have received a copy of the GNU General Public License
  17. * along with this program. If not, see <http://www.gnu.org/licenses/>.
  18. */
  19. #ifndef __THEAD_RJ_H
  20. #define __THEAD_RJ_H
  #include <assert.h>
#include <pthread.h>
#include <stdio.h>
#include <stdlib.h>
#include <time.h>
  /*
   * lock_cmpxchg(location, value, comparand)
   *
   * Atomic compare-and-swap on *location: if *location == comparand, value is
   * stored into it.  Evaluates to the value *location held before the
   * operation, with the type of *location, so the swap happened exactly when
   * the result equals comparand.  Acts as a full memory barrier.
   *
   * Implemented with the GCC/Clang __sync_val_compare_and_swap builtin, which
   * emits "lock cmpxchg" on x86.  The previous hand-written assembly paired
   * "cmpxchgl" with the 16-bit register modifier "%w2", so the 4-byte case did
   * not assemble, and it could only be built for x86.
   */
  #define lock_cmpxchg(location, value, comparand) \
      __sync_val_compare_and_swap((location), (comparand), (value))
  /*
   * thread_typeof(tname) expands to the parenthesised type
   * "(struct tname_struct)".  Because of the parentheses it is usable in casts
   * and sizeof, not as the type of a declaration.
   */
  #define thread_typeof(tname) (struct tname##_struct)
  /*
   * thread_begin_def(tname) ... thread_end_def(tname)
   *
   * Open and close "struct tname_struct", the per-thread parameter block.  The
   * bookkeeping members below come first; members written between the two
   * macros follow them.  The caller supplies the ';' after each macro.  The
   * member order is the struct layout and must stay as it is.
   */
  #define thread_begin_def(tname) \
      struct tname##_struct { \
          /* both are set to element 0 of the per-thread array */ \
          struct tname##_struct *tname##_params; \
          struct tname##_struct *tname##_array; \
          /* number of threads in the group, and this thread's index */ \
          int n_cpu; \
          int t_idx; \
          /* the worker loop keeps going while this is non-zero */ \
          volatile int running; \
          /* 3 = loop not entered yet, 0 = idle, 1 = woken, 2 = pass/loop finished */ \
          volatile int state; \
          /* non-zero: state is set to 2 after every pass of the loop body */ \
          volatile int once; \
          /* group-wide locks, the same pointers in every thread */ \
          pthread_mutex_t *mutex_lock; \
          pthread_rwlock_t *rw_lock; \
          /* per-thread mutex/condition pair guarding the state hand-off */ \
          pthread_mutex_t _COND_LOCK; \
          pthread_cond_t _COND
  #define thread_end_def(tname) }
  /*
   * thread_def_shared_vars(tname)
   *
   * Declare the caller-side handles of a thread group: the per-thread array,
   * the pthread ids, the shared mutex and rwlock, a cursor "tname", and the
   * counters tname_i, tname_j and tname_var_next.
   */
  #define thread_def_shared_vars(tname) \
      struct tname##_struct *tname##_params; \
      pthread_t *tname##_pids; \
      pthread_mutex_t *tname##_mlock; \
      pthread_rwlock_t *tname##_rwlock; \
      struct tname##_struct *tname; \
      int tname##_i, tname##_j, tname##_var_next
  /* Short alias of thread_begin_def. */
  #define thread_beg_def(tname) thread_begin_def(tname)
  /*
   * thread_begin_func_core(tname)
   *
   * Open the definition of the worker function
   *     void *thread_tname_func(void *obj)
   * (close it with thread_end_func).  Inside the body:
   *   tname         - this thread's struct (obj)
   *   tname_params  - the array pointer stored in that struct
   *   tname_var_i   - scratch counter used by thread_begin_loop
   * If obj is not element t_idx of that array, a diagnostic is printed to
   * stderr; execution continues regardless.
   */
  #define thread_begin_func_core(tname) inline void* thread_##tname##_func(void *obj){ \
      struct tname##_struct * tname = (struct tname##_struct *)obj; \
      int tname##_var_i; \
      struct tname##_struct * tname##_params; \
      tname##_params = tname->tname##_params; \
      if(tname##_params + tname->t_idx != tname){ \
          fprintf(stderr, " -- Unexcepted error in thread [%s] in %s -- %s:%d --\n", #tname, __FUNCTION__, __FILE__, __LINE__); \
      }
  /* Worker function with internal linkage (static inline). */
  #define thread_begin_func(tname) static thread_begin_func_core(tname)
  #define thread_beg_func(tname) thread_begin_func(tname)
  /*
   * Worker function with plain "inline" linkage.  This used to be a verbatim
   * copy of thread_begin_func_core; it now expands to it so the two prologues
   * cannot drift apart.
   */
  #define thread_beg_func_inline(tname) thread_begin_func_core(tname)
  /*
   * thread_begin_loop(tname) ... thread_end_loop(tname)
   *
   * Main loop of a worker function (needs tname_var_i from
   * thread_begin_func_core).  On entry the thread sets its state to 0 under
   * _COND_LOCK and signals _COND, which is what thread_end_init waits for.
   * Then, while tname->running is non-zero:
   *   - if state is not 1, it waits on _COND for at most 1 ms and re-checks;
   *   - if state is 1, the code between the two macros runs once.
   * The body sits in a one-iteration for loop, so a "break" in it skips to
   * the state update in thread_end_loop.
   *
   * The deadline is normalised so tv_nsec stays below 1e9; otherwise
   * pthread_cond_timedwait fails with EINVAL instead of waiting.
   */
  #define thread_begin_loop(tname) \
      pthread_mutex_lock(&tname->_COND_LOCK); \
      tname->state = 0; \
      pthread_cond_signal(&tname->_COND); \
      pthread_mutex_unlock(&tname->_COND_LOCK); \
      while(tname->running){ \
          if(tname->state != 1){ \
              struct timespec _timeout; \
              pthread_mutex_lock(&tname->_COND_LOCK); \
              clock_gettime(CLOCK_REALTIME, &_timeout); \
              _timeout.tv_nsec += 1000000; \
              if(_timeout.tv_nsec >= 1000000000L){ \
                  _timeout.tv_sec += 1; \
                  _timeout.tv_nsec -= 1000000000L; \
              } \
              pthread_cond_timedwait(&tname->_COND, &tname->_COND_LOCK, &_timeout); \
              pthread_mutex_unlock(&tname->_COND_LOCK); \
              continue; \
          } \
          for(tname##_var_i=0;tname##_var_i<1;tname##_var_i++){
  #define thread_beg_loop(tname) thread_begin_loop(tname)
  /*
   * Group-wide critical sections.
   * thread_beg_syn / thread_end_syn lock and unlock the mutex shared by every
   * thread of the group; the *_read / *_write variants take the shared rwlock
   * as a reader or as a writer.  Each evaluates to the pthread call's result.
   */
  #define thread_begin_syn(tname) pthread_mutex_lock(tname->mutex_lock)
  #define thread_end_syn(tname) pthread_mutex_unlock(tname->mutex_lock)
  #define thread_beg_syn(tname) thread_begin_syn(tname)
  #define thread_beg_syn_read(tname) pthread_rwlock_rdlock(tname->rw_lock)
  #define thread_beg_syn_write(tname) pthread_rwlock_wrlock(tname->rw_lock)
  /* Readers and writers release the rwlock the same way. */
  #define thread_end_syn_write(tname) pthread_rwlock_unlock(tname->rw_lock)
  #define thread_end_syn_read(tname) thread_end_syn_write(tname)
  /*
   * thread_end_loop(tname): closes thread_begin_loop.
   *
   * After each pass of the body, if tname->once is non-zero, state is set to 2
   * under _COND_LOCK and _COND is signalled, so a waiter sees the work as done
   * and the loop goes back to waiting.  With once == 0 state is left alone
   * and, while it stays 1, the body simply runs again.  When running becomes 0
   * the loop exits and state 2 is published one final time.
   */
  #define thread_end_loop(tname) \
              } \
              if(!tname->once) continue; \
              pthread_mutex_lock(&tname->_COND_LOCK); \
              tname->state = 2; \
              pthread_cond_signal(&tname->_COND); \
              pthread_mutex_unlock(&tname->_COND_LOCK); \
          } \
          do { \
              pthread_mutex_lock(&tname->_COND_LOCK); \
              tname->state = 2; \
              pthread_cond_signal(&tname->_COND); \
              pthread_mutex_unlock(&tname->_COND_LOCK); \
          } while(0)
  /* Close a worker function opened with thread_beg_func*: it returns NULL. */
  #define thread_end_func(tname) return NULL; }
  /*
   * thread_preprocess(tname) / thread_prepare(tname)
   *
   * Declare the caller-side variables of a thread group (via
   * thread_def_shared_vars), evaluate the pointer handles in (void) casts
   * (presumably to silence unused-variable warnings) and zero the counters
   * tname_i, tname_j and tname_var_next.
   */
  #define thread_preprocess(tname) \
      thread_def_shared_vars(tname); \
      (void)(tname); \
      (void)(tname##_params); \
      (void)(tname##_pids); \
      (void)(tname##_mlock); \
      (void)(tname##_rwlock); \
      tname##_i = tname##_j = tname##_var_next = 0
  #define thread_prepare(tname) thread_preprocess(tname)
  /*
   * thread_begin_init(tname, n_thread) ... thread_end_init(tname)
   *
   * Allocate and start a group of n_thread (> 0) worker threads.  Needs the
   * variables declared by thread_preprocess(tname).  The code between the two
   * macros runs once per thread, before that thread is created, with "tname"
   * pointing at its struct and tname_i holding its index, so user members can
   * be initialised there.  thread_end_init creates the thread and waits until
   * it has set its state to 0.  Afterwards "tname" points at thread 0 and
   * tname_i, tname_j and tname_var_next are 0.
   *
   * An allocation failure or a pthread_create failure prints a message to
   * stderr and calls exit(1).  Previously, failed allocations were
   * dereferenced.  n_thread is parenthesised so expression arguments work.
   */
  #define thread_begin_init(tname, n_thread) \
      assert((n_thread) > 0); \
      tname##_params = (struct tname##_struct *)malloc(sizeof(struct tname##_struct) * (n_thread)); \
      tname##_pids = (pthread_t *)malloc(sizeof(pthread_t) * (n_thread)); \
      tname##_mlock = calloc(1, sizeof(pthread_mutex_t)); \
      tname##_rwlock = calloc(1, sizeof(pthread_rwlock_t)); \
      if(tname##_params == NULL || tname##_pids == NULL || tname##_mlock == NULL || tname##_rwlock == NULL){ \
          fprintf(stderr, " -- Failed to allocate thread [%s] in %s -- %s:%d --\n", #tname, __FUNCTION__, __FILE__, __LINE__); \
          exit(1); \
      } \
      *tname##_mlock = (pthread_mutex_t)PTHREAD_MUTEX_INITIALIZER; \
      *tname##_rwlock = (pthread_rwlock_t)PTHREAD_RWLOCK_INITIALIZER; \
      for(tname##_i=0,tname##_j=0;tname##_i<(int)(n_thread);tname##_i++){ \
          tname = tname##_params + tname##_i; \
          tname->mutex_lock = tname##_mlock; \
          tname->rw_lock = tname##_rwlock; \
          tname->_COND_LOCK = (pthread_mutex_t)PTHREAD_MUTEX_INITIALIZER; \
          tname->_COND = (pthread_cond_t)PTHREAD_COND_INITIALIZER; \
          tname->n_cpu = (n_thread); \
          tname->t_idx = tname##_i; \
          tname->running = 1; \
          tname->state = 3; \
          tname->once = 1; \
          tname->tname##_params = (struct tname##_struct*)tname##_params; \
          tname->tname##_array = (struct tname##_struct*)tname##_params
  #define thread_beg_init(tname, n_thread) thread_begin_init(tname, n_thread)
  #define thread_end_init(tname) \
          if(pthread_create(tname##_pids + tname##_i, NULL, thread_##tname##_func, (void*)tname) != 0){ \
              fprintf(stderr, " -- Failed to create thread [%s, %04d] in %s -- %s:%d --\n", #tname, tname##_i, __FUNCTION__, __FILE__, __LINE__); \
              exit(1); \
          } \
          /* poll in 1 ms slices until the new thread reports state 0 */ \
          while(1){ \
              int _stop; \
              _stop = 0; \
              if(tname->state == 0) _stop = 1; \
              else { \
                  struct timespec _timeout; \
                  pthread_mutex_lock(&tname->_COND_LOCK); \
                  clock_gettime(CLOCK_REALTIME, &_timeout); \
                  _timeout.tv_nsec += 1000000; \
                  /* keep tv_nsec below 1e9: timedwait rejects larger values with EINVAL */ \
                  if(_timeout.tv_nsec >= 1000000000L){ \
                      _timeout.tv_sec += 1; \
                      _timeout.tv_nsec -= 1000000000L; \
                  } \
                  pthread_cond_timedwait(&tname->_COND, &tname->_COND_LOCK, &_timeout); \
                  if(tname->state == 0) _stop = 1; \
                  pthread_mutex_unlock(&tname->_COND_LOCK); \
              } \
              if(_stop) break; \
          } \
      } \
      tname = tname##_params + 0; \
      tname##_var_next = 0; \
      tname##_i = 0; \
      tname##_j = 0
  /*
   * thread_export(tname, obj) / thread_import(tname, obj)
   *
   * Copy the caller-side variables declared by thread_preprocess(tname) into
   * (export) or out of (import) the same-named members of *obj, so a running
   * thread group can be handed from one function to another.  The *_core
   * forms take every destination/source expression explicitly; assignments
   * happen left to right in the listed order.
   */
  #define thread_export_core(tname, _tname, params, pids, mlock, rwlock, i, j, next) \
      do { \
          (_tname) = tname, \
          (params) = tname##_params, \
          (pids) = tname##_pids, \
          (mlock) = tname##_mlock, \
          (rwlock) = tname##_rwlock, \
          (i) = tname##_i, \
          (j) = tname##_j, \
          (next) = tname##_var_next; \
      } while(0)
  #define thread_export(tname, obj) thread_export_core(tname, (obj)->tname, (obj)->tname##_params, (obj)->tname##_pids, (obj)->tname##_mlock, (obj)->tname##_rwlock, (obj)->tname##_i, (obj)->tname##_j, (obj)->tname##_var_next)
  #define thread_import_core(tname, _tname, params, pids, mlock, rwlock, i, j, next) \
      do { \
          tname = (_tname), \
          tname##_params = (params), \
          tname##_pids = (pids), \
          tname##_mlock = (mlock), \
          tname##_rwlock = (rwlock), \
          tname##_i = (i), \
          tname##_j = (j), \
          tname##_var_next = (next); \
      } while(0)
  #define thread_import(tname, obj) thread_import_core(tname, (obj)->tname, (obj)->tname##_params, (obj)->tname##_pids, (obj)->tname##_mlock, (obj)->tname##_rwlock, (obj)->tname##_i, (obj)->tname##_j, (obj)->tname##_var_next)
  /*
   * thread_beg_operate(tname, idx): point "tname" at thread idx of the group.
   * idx is not range-checked; it is now parenthesised so that expression
   * arguments such as "c ? 0 : 1" index correctly.
   */
  #define thread_begin_operate(tname, idx) tname = tname##_params + (idx)
  #define thread_beg_operate(tname, idx) thread_begin_operate(tname, idx)
  /*
   * thread_wake(tname): under the thread's _COND_LOCK, set its state to 1 and
   * signal its _COND, so its worker loop runs the body.
   */
  #define thread_wake(tname) do { pthread_mutex_lock(&tname->_COND_LOCK); tname->state = 1; pthread_cond_signal(&tname->_COND); pthread_mutex_unlock(&tname->_COND_LOCK); } while(0)
  /*
   * thread_wake_all(tname): thread_wake every thread of the group.  The
   * expansion ends at "while(0)" without its own ';' (the old trailing ';'
   * broke "if (c) thread_wake_all(t); else ...").
   */
  #define thread_wake_all(tname) do { thread_beg_iter(tname); thread_wake(tname); thread_end_iter(tname); } while(0)
  /*
   * thread_waitfor_idle(tname) / thread_wait(tname)
   *
   * Block until the thread "tname" points at is no longer in state 1, checking
   * under _COND_LOCK and waiting on _COND in 1 us slices, then reset its state
   * to 0.  Fixes:
   *   - the deadline is normalised so tv_nsec stays below 1e9; otherwise
   *     pthread_cond_timedwait fails with EINVAL;
   *   - the expansion is wrapped in do { } while(0) so it is a single
   *     statement, safe in an unbraced if/else.
   */
  #define thread_waitfor_idle(tname) \
      do { \
          while(1){ \
              int _stop; \
              _stop = 0; \
              pthread_mutex_lock(&tname->_COND_LOCK); \
              if(tname->state != 1) _stop = 1; \
              else { \
                  struct timespec _timeout; \
                  clock_gettime(CLOCK_REALTIME, &_timeout); \
                  _timeout.tv_nsec += 1000; \
                  if(_timeout.tv_nsec >= 1000000000L){ \
                      _timeout.tv_sec += 1; \
                      _timeout.tv_nsec -= 1000000000L; \
                  } \
                  pthread_cond_timedwait(&tname->_COND, &tname->_COND_LOCK, &_timeout); \
                  if(tname->state != 1) _stop = 1; \
              } \
              pthread_mutex_unlock(&tname->_COND_LOCK); \
              if(_stop) break; \
          } \
          tname->state = 0; \
      } while(0)
  #define thread_wait(tname) thread_waitfor_idle(tname)
  /* Wait for thread tname_var_next, then advance tname_var_next round-robin. */
  #define thread_wait_next(tname) do { thread_beg_operate(tname, tname##_var_next); thread_wait(tname); tname##_var_next = (tname##_var_next + 1) % tname##_params[0].n_cpu; } while(0)
  /* Forget the current thread: sets tname to NULL (idx is ignored). */
  #define thread_end_operate(tname, idx) tname = NULL
  /*
   * thread_beg_iter(tname) ... thread_end_iter(tname)
   *
   * Run the enclosed statements once per thread of the group, in index order,
   * with a block-local "tname" (shadowing any outer one) pointing at each
   * thread's struct.  "break" ends the iteration early.
   */
  #define thread_begin_iter(tname) { struct tname##_struct * tname = NULL; int tname##_i; for(tname##_i=0;tname##_i<tname##_params[0].n_cpu;tname##_i++){ tname = tname##_params + tname##_i
  #define thread_beg_iter(tname) thread_begin_iter(tname)
  #define thread_end_iter(tname) } }
  /* Per-thread queries; "idle" means the thread is not in state 1. */
  #define thread_is_idle(tname) (tname->state != 1)
  #define thread_n_cpus(tname) (tname->n_cpu)
  #define thread_index(tname) (tname->t_idx)
  /*
   * Address of thread idx's struct.  idx is parenthesised so that expression
   * arguments (e.g. "c ? 0 : 2") are added as a whole.
   */
  #define thread_access(tname, idx) (tname##_params + (idx))
  /*
   * thread_beg_monitor(tname, usec) ... thread_end_monitor(tname)
   *
   * Repeatedly call nano_sleep(usec) (defined outside this header) and run the
   * enclosed statements, until no thread of the group is in state 1.  When the
   * loop ends, tname_j equals the thread count; otherwise it holds the index
   * of the first thread still in state 1.
   */
  #define thread_beg_monitor(tname, usec) \
      for(;;){ \
          nano_sleep(usec)
  #define thread_end_monitor(tname) \
          tname##_j = 0; \
          while(tname##_j < tname##_params[0].n_cpu && tname##_params[tname##_j].state != 1) tname##_j++; \
          if(!(tname##_j < tname##_params[0].n_cpu)) break; \
      }
  /*
   * thread_waitfor_one_idle(tname) / thread_wait_one(tname)
   *
   * Round-robin search, starting at the cursor tname_j, for a thread not in
   * state 1.  When none is found, it rewinds the cursor to 0, calls
   * nano_sleep(10) (defined outside this header) and retries.  On success,
   * "tname" points at the thread, tname_j moves past it (mod thread count),
   * and the thread's state is reset to 0.
   * Wrapped in do { } while(0) so the expansion is a single statement.
   */
  #define thread_waitfor_one_idle(tname) \
      do { \
          while(1){ \
              for(;tname##_j<tname##_params[0].n_cpu;tname##_j++){ \
                  if(tname##_params[tname##_j].state != 1){ \
                      tname = tname##_params + tname##_j; \
                      break; \
                  } \
              } \
              if(tname##_j >= tname##_params[0].n_cpu){ \
                  tname##_j = 0; \
                  nano_sleep(10); \
              } else { \
                  tname##_j = (tname##_j + 1) % tname##_params[0].n_cpu; \
                  break; \
              } \
          } \
          tname->state = 0; \
      } while(0)
  #define thread_wait_one(tname) thread_waitfor_one_idle(tname)
  /*
   * thread_wait_done(tname)
   *
   * Scan from index 0 for a thread in state 2 (finished) and pick the first
   * one.  If none is finished and none is in state 1, thread 0 is picked.
   * Otherwise it calls nano_sleep(10) and rescans.  On return "tname" points
   * at the picked thread, tname_j is one past it (mod thread count), and that
   * thread's state is reset to 0.
   * Wrapped in do { } while(0) so the expansion is a single statement.
   */
  #define thread_wait_done(tname) \
      do { \
          while(1){ \
              int _nrun_; \
              _nrun_ = 0; \
              for(tname##_j=0;tname##_j<tname##_params[0].n_cpu;tname##_j++){ \
                  if(tname##_params[tname##_j].state == 2){ \
                      tname = tname##_params + tname##_j; \
                      break; \
                  } else if(tname##_params[tname##_j].state == 1){ \
                      _nrun_ ++; \
                  } \
              } \
              if(tname##_j >= tname##_params[0].n_cpu){ \
                  tname##_j = 0; \
                  if(_nrun_ == 0){ \
                      tname = tname##_params + tname##_j; \
                      tname##_j = (tname##_j + 1) % tname##_params[0].n_cpu; \
                      break; \
                  } else { \
                      nano_sleep(10); \
                  } \
              } else { \
                  tname##_j = (tname##_j + 1) % tname##_params[0].n_cpu; \
                  break; \
              } \
          } \
          tname->state = 0; \
      } while(0)
  /*
   * thread_test_all(tname, expr): GNU statement expression; 1 if expr is true
   * for every thread (expr sees the iterated "tname"), else 0.  Stops at the
   * first thread for which expr is false.
   * thread_count_all(tname, expr): number of threads for which expr is true.
   * The accumulator is now named _thread_res instead of "ret", so it cannot
   * capture a caller variable called "ret" used inside expr.
   */
  #define thread_test_all(tname, expr) ({int _thread_res = 1; thread_beg_iter(tname); if(!(expr)){ _thread_res = 0; break;} thread_end_iter(tname); _thread_res;})
  #define thread_count_all(tname, expr) ({int _thread_res = 0; thread_beg_iter(tname); if((expr)){ _thread_res ++; } thread_end_iter(tname); _thread_res;})
  #define thread_all_idle(tname) thread_test_all(tname, (tname)->state != 1)
  /*
   * thread_waitfor_all_idle(tname) / thread_wait_all(tname): thread_waitfor_idle
   * on every thread, in index order.
   * thread_apply_all(tname, expr): for each thread evaluate expr and wake it,
   * then wait for all of them.
   * Both are now do { } while(0) statements (previously bare braces, which
   * broke "if (c) thread_wait_all(t); else ...").
   */
  #define thread_waitfor_all_idle(tname) do { thread_begin_iter(tname); thread_waitfor_idle(tname); thread_end_iter(tname); } while(0)
  #define thread_wait_all(tname) thread_waitfor_all_idle(tname)
  #define thread_apply_all(tname, expr) do { thread_begin_iter(tname); (expr); thread_wake(tname); thread_end_iter(tname); thread_waitfor_all_idle(tname); } while(0)
  /*
   * thread_beg_close(tname) ... thread_end_close(tname)
   *
   * Shut the group down.  For each thread, in index order:
   *   - clear running under _COND_LOCK and signal _COND;
   *   - wait for it to leave state 1 (thread_wait);
   *   - join it.
   * The statements between the macros then run with "tname" pointing at that
   * thread, e.g. to free per-thread members.
   *
   * thread_end_close then destroys every thread's _COND/_COND_LOCK and the
   * shared mutex and rwlock, and frees the memory allocated by
   * thread_begin_init.  Previously the synchronisation objects were freed
   * without being destroyed.
   */
  #define thread_begin_close(tname) for(tname##_i=0;tname##_i<tname##_params[0].n_cpu;tname##_i++){ \
      tname = tname##_params + tname##_i; \
      pthread_mutex_lock(&tname->_COND_LOCK); \
      tname->running = 0; \
      pthread_cond_signal(&tname->_COND); \
      pthread_mutex_unlock(&tname->_COND_LOCK); \
      thread_wait(tname); \
      pthread_join(tname##_pids[tname##_i], NULL)
  #define thread_beg_close(tname) thread_begin_close(tname)
  #define thread_end_close(tname) } \
      for(tname##_i=0;tname##_i<tname##_params[0].n_cpu;tname##_i++){ \
          pthread_cond_destroy(&tname##_params[tname##_i]._COND); \
          pthread_mutex_destroy(&tname##_params[tname##_i]._COND_LOCK); \
      } \
      pthread_mutex_destroy(tname##_mlock); \
      pthread_rwlock_destroy(tname##_rwlock); \
      free((void*)tname##_params); free(tname##_pids); free(tname##_mlock); free(tname##_rwlock)
  /*
   * thread_run(tname, ncpu, vars_expr, init_expr, free_expr, pre_expr,
   *            loop_expr, post_expr, invoke_expr)
   *
   * Define, start, drive and shut down a thread group inside one compound
   * statement:
   *   vars_expr    extra members of struct tname_struct
   *   pre_expr     worker code run before its loop
   *   loop_expr    body of the worker loop
   *   post_expr    worker code run after its loop ends
   *   init_expr    per-thread set-up, run by the caller before each thread starts
   *   invoke_expr  caller code that wakes and waits for the workers
   *   free_expr    per-thread clean-up, run by the caller after each join
   * The worker function is defined inside a block, which relies on the GCC
   * nested-function extension.
   */
  #define thread_run(tname, ncpu, vars_expr, init_expr, free_expr, pre_expr, loop_expr, post_expr, invoke_expr) \
      { \
          /* per-thread struct */ \
          thread_begin_def(tname); \
          vars_expr \
          thread_end_def(tname); \
          /* worker function */ \
          thread_begin_func_core(tname); \
          pre_expr \
          thread_begin_loop(tname); \
          loop_expr \
          thread_end_loop(tname); \
          post_expr \
          thread_end_func(tname); \
          /* caller side: start, drive, shut down */ \
          { \
              thread_preprocess(tname); \
              thread_begin_init(tname, ncpu); \
              init_expr \
              thread_end_init(tname); \
              invoke_expr \
              thread_begin_close(tname); \
              free_expr \
              thread_end_close(tname); \
          } \
      }
  /* Pass a code fragment that contains commas as a single macro argument. */
  #define THREAD_EXPR(...) __VA_ARGS__
  /*
   * thread_fast_run(tname, ncpu, loop_expr)
   *
   * Start ncpu threads, wake each to run loop_expr, wait for all of them, then
   * shut the group down.  Inside loop_expr, NCPU is the thread count and TIDX
   * is this thread's index.  Both are now cast to void, so a loop_expr that
   * ignores them compiles without unused-variable warnings (as in
   * thread_fast_run2).
   */
  #define thread_fast_run(tname, ncpu, loop_expr) thread_run(tname, ncpu, , , , THREAD_EXPR(int NCPU; int TIDX; NCPU = tname->n_cpu; TIDX = tname->t_idx; (void)NCPU; (void)TIDX;), THREAD_EXPR(loop_expr;), , THREAD_EXPR(thread_wake_all(tname); thread_wait_all(tname);))
  /*
   * thread_fast_run2(tname, ncpu, expr): same as thread_fast_run, written out
   * without thread_run.  NCPU/TIDX are marked used with plain (void) casts
   * instead of the UNUSED macro, which this header does not define.
   */
  #define thread_fast_run2(tname, ncpu, expr) \
      { \
          thread_beg_def(tname); \
          thread_end_def(tname); \
          thread_begin_func_core(tname); \
              int NCPU, TIDX; \
              NCPU = tname->n_cpu; \
              TIDX = tname->t_idx; \
              (void)(NCPU); \
              (void)(TIDX); \
              thread_beg_loop(tname); \
                  (expr); \
              thread_end_loop(tname); \
          thread_end_func(tname); \
          { \
              thread_preprocess(tname); \
              thread_beg_init(tname, ncpu); \
              thread_end_init(tname); \
              thread_wake_all(tname); \
              thread_wait_all(tname); \
              thread_beg_close(tname); \
              thread_end_close(tname); \
          } \
      }
  369. }
  370. #endif