filewriter.h 5.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205
  1. /*
  2. *
  3. * Copyright (c) 2011, Jue Ruan <ruanjue@gmail.com>
  4. *
  5. *
  6. * This program is free software: you can redistribute it and/or modify
  7. * it under the terms of the GNU General Public License as published by
  8. * the Free Software Foundation, either version 3 of the License, or
  9. * (at your option) any later version.
  10. *
  11. * This program is distributed in the hope that it will be useful,
  12. * but WITHOUT ANY WARRANTY; without even the implied warranty of
  13. * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  14. * GNU General Public License for more details.
  15. *
  16. * You should have received a copy of the GNU General Public License
  17. * along with this program. If not, see <http://www.gnu.org/licenses/>.
  18. */
  19. #ifndef __FILEWRITER_RJ_H
  20. #define __FILEWRITER_RJ_H
  21. #include "mem_share.h"
  22. #include "thread.h"
  23. #include "pgzf.h"
  24. typedef size_t (*write_data_func)(void *obj, void *dat, size_t len);
  25. typedef void (*close_output_func)(void *obj);
  26. static inline size_t _write_data_file(void *obj, void *dat, size_t len){ return fwrite(dat, 1, len, (FILE*)obj); }
  27. static inline void _close_output_file(void *obj){ if(obj) fclose((FILE*)obj); }
  28. /**
  29. * BufferedWriter
  30. */
  31. typedef struct {
  32. FILE *bios[2];
  33. FILE *out;
  34. void *_file;
  35. write_data_func _write;
  36. close_output_func _close;
  37. int bidx;
  38. size_t buf_size;
  39. char *buffs[2];
  40. size_t blens[2];
  41. size_t nbytes;
  42. pthread_mutex_t lock;
  43. pthread_t pid;
  44. int running, flush;
  45. } BufferedWriter;
  46. static inline void* _buffered_writer_thread_func(void *obj){
  47. BufferedWriter *bw;
  48. size_t bsize[2];
  49. int bidx, lock;
  50. bw = (BufferedWriter*)obj;
  51. bw->running = 1;
  52. bw->flush = 0;
  53. bw->nbytes = 0;
  54. while(bw->running){
  55. bidx = bw->bidx;
  56. bsize[0] = ftell(bw->bios[0]);
  57. bsize[1] = ftell(bw->bios[1]);
  58. if(bsize[bidx] >= bw->buf_size || (bsize[bidx] && bw->flush == 1)){
  59. lock = 1;
  60. pthread_mutex_lock(&bw->lock);
  61. } else {
  62. lock = 0;
  63. }
  64. if(bsize[!bidx]){
  65. fflush(bw->bios[!bidx]);
  66. bw->_write(bw->_file, bw->buffs[!bidx], bsize[!bidx]);
  67. bw->nbytes += bsize[!bidx];
  68. fseek(bw->bios[!bidx], 0, SEEK_SET);
  69. }
  70. if(lock){
  71. bw->bidx = !bidx;
  72. pthread_mutex_unlock(&bw->lock);
  73. } else if(bsize[bidx]){
  74. pthread_mutex_lock(&bw->lock);
  75. bw->bidx = !bidx;
  76. pthread_mutex_unlock(&bw->lock);
  77. }
  78. if(bw->flush && bsize[0] == 0 && bsize[1] == 0){
  79. bw->flush = 2;
  80. while(bw->flush == 2){
  81. nano_sleep(1);
  82. }
  83. bw->flush = 0;
  84. }
  85. nano_sleep(10);
  86. }
  87. {
  88. bsize[0] = ftell(bw->bios[0]);
  89. bsize[1] = ftell(bw->bios[1]);
  90. fflush(bw->bios[0]);
  91. fflush(bw->bios[1]);
  92. bidx = bw->bidx;
  93. if(bsize[!bidx]){
  94. bw->_write(bw->_file, bw->buffs[!bidx], bsize[!bidx]);
  95. bw->nbytes += bsize[!bidx];
  96. }
  97. if(bsize[bidx]){
  98. bw->_write(bw->_file, bw->buffs[bidx], bsize[bidx]);
  99. bw->nbytes += bsize[bidx];
  100. }
  101. }
  102. return NULL;
  103. }
  104. static inline BufferedWriter* open2_bufferedwriter(void *obj, write_data_func _write, close_output_func _close, size_t buf_size){
  105. BufferedWriter *bw;
  106. bw = malloc(sizeof(BufferedWriter));
  107. bw->_file = obj;
  108. bw->_write = _write;
  109. bw->_close = _close;
  110. bw->buffs[0] = NULL;
  111. bw->buffs[1] = NULL;
  112. bw->blens[0] = 0;
  113. bw->blens[1] = 0;
  114. bw->bios[0] = open_memstream(bw->buffs + 0, bw->blens + 0);
  115. bw->bios[1] = open_memstream(bw->buffs + 1, bw->blens + 1);
  116. bw->out = NULL;
  117. bw->bidx = 0;
  118. bw->buf_size = buf_size? buf_size : 4 * 1024;
  119. bw->nbytes = 0;
  120. bw->lock = (pthread_mutex_t)PTHREAD_MUTEX_INITIALIZER;
  121. bw->running = 0;
  122. bw->flush = 0;
  123. if(pthread_create(&bw->pid, NULL, _buffered_writer_thread_func, bw) != 0){
  124. fprintf(stderr, " -- Failed to create thread [%s] in %s -- %s:%d --\n", "_buffered_writer_thread_func", __FUNCTION__, __FILE__, __LINE__);
  125. bw->pid = 0;
  126. }
  127. while(bw->running != 1){ nano_sleep(1); }
  128. return bw;
  129. }
  130. static inline BufferedWriter* open_bufferedwriter(FILE *out, size_t buf_size){
  131. return open2_bufferedwriter(out, _write_data_file, NULL, buf_size);
  132. }
  133. static inline BufferedWriter* zopen_bufferedwriter(FILE *out, size_t buf_size, int ncpu, int level){
  134. PGZF *pz;
  135. pz = open_pgzf_writer(out, buf_size, ncpu, level);
  136. return open2_bufferedwriter(pz, write_pgzf4filewriter, close_pgzf4filewriter, pz->bufsize);
  137. }
  138. static inline int beg_bufferedwriter(BufferedWriter *bw){
  139. if(bw->pid){
  140. while(bw->flush){ nano_sleep(1); }
  141. pthread_mutex_lock(&bw->lock);
  142. bw->out = bw->bios[bw->bidx];
  143. return 0;
  144. } else {
  145. bw->out = NULL;
  146. return 1; // error
  147. }
  148. }
  149. static inline int end_bufferedwriter(BufferedWriter *bw){
  150. if(bw->pid){
  151. pthread_mutex_unlock(&bw->lock);
  152. }
  153. bw->out = NULL;
  154. return 0;
  155. }
  156. static inline size_t flush_bufferedwriter(BufferedWriter *bw){
  157. size_t ret;
  158. if(bw->pid){
  159. pthread_mutex_unlock(&bw->lock);
  160. while(bw->flush == 1){ nano_sleep(1); }
  161. bw->flush = 1;
  162. while(bw->flush == 1){
  163. nano_sleep(1);
  164. }
  165. pthread_mutex_lock(&bw->lock);
  166. bw->flush = 0;
  167. bw->out = bw->bios[bw->bidx];
  168. ret = bw->nbytes;
  169. } else {
  170. ret = 0;
  171. }
  172. return ret;
  173. }
  174. static inline size_t close_bufferedwriter(BufferedWriter *bw){
  175. size_t ret;
  176. if(bw->pid){
  177. bw->running = 0;
  178. pthread_join(bw->pid, NULL);
  179. }
  180. fclose(bw->bios[0]);
  181. fclose(bw->bios[1]);
  182. if(bw->buffs[0]) free(bw->buffs[0]);
  183. if(bw->buffs[1]) free(bw->buffs[1]);
  184. if(bw->_close){
  185. bw->_close(bw->_file);
  186. }
  187. ret = bw->nbytes;
  188. free(bw);
  189. return ret;
  190. }
  191. #endif