123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205 |
- typedef size_t (*write_data_func)(void *obj, void *dat, size_t len)
- typedef void (*close_output_func)(void *obj)
- static inline size_t _write_data_file(void *obj, void *dat, size_t len){ return fwrite(dat, 1, len, (FILE*)obj)
- static inline void _close_output_file(void *obj){ if(obj) fclose((FILE*)obj)
- typedef struct {
- FILE *bios[2];
- FILE *out
- void *_file
- write_data_func _write
- close_output_func _close
- int bidx;
- size_t buf_size;
- char *buffs[2];
- size_t blens[2];
- size_t nbytes
- pthread_mutex_t lock
- pthread_t pid
- int running, flush
- } BufferedWriter;
- static inline void* _buffered_writer_thread_func(void *obj){
- BufferedWriter *bw;
- size_t bsize[2];
- int bidx, lock
- bw = (BufferedWriter*)obj;
- bw->running = 1
- bw->flush = 0
- bw->nbytes = 0
- while(bw->running){
- bidx = bw->bidx;
- bsize[0] = ftell(bw->bios[0]);
- bsize[1] = ftell(bw->bios[1]);
- if(bsize[bidx] >= bw->buf_size || (bsize[bidx] && bw->flush == 1)){
- lock = 1
- pthread_mutex_lock(&bw->lock);
- } else {
- lock = 0
- }
- if(bsize[!bidx]){
- fflush(bw->bios[!bidx]);
- bw->_write(bw->_file, bw->buffs[!bidx], bsize[!bidx]);
- bw->nbytes += bsize[!bidx];
- fseek(bw->bios[!bidx], 0, SEEK_SET)
- }
- if(lock){
- bw->bidx = !bidx;
- pthread_mutex_unlock(&bw->lock);
- } else if(bsize[bidx]){
- pthread_mutex_lock(&bw->lock);
- bw->bidx = !bidx;
- pthread_mutex_unlock(&bw->lock);
- }
- if(bw->flush && bsize[0] == 0 && bsize[1] == 0){
- bw->flush = 2
- while(bw->flush == 2){
- nano_sleep(1)
- }
- bw->flush = 0
- }
- nano_sleep(10)
- }
- {
- bsize[0] = ftell(bw->bios[0]);
- bsize[1] = ftell(bw->bios[1]);
- fflush(bw->bios[0]);
- fflush(bw->bios[1]);
- bidx = bw->bidx;
- if(bsize[!bidx]){
- bw->_write(bw->_file, bw->buffs[!bidx], bsize[!bidx]);
- bw->nbytes += bsize[!bidx];
- }
- if(bsize[bidx]){
- bw->_write(bw->_file, bw->buffs[bidx], bsize[bidx]);
- bw->nbytes += bsize[bidx];
- }
- }
- return NULL
- }
- static inline BufferedWriter* open2_bufferedwriter(void *obj, write_data_func _write, close_output_func _close, size_t buf_size){
- BufferedWriter *bw;
- bw = malloc(sizeof(BufferedWriter));
- bw->_file = obj
- bw->_write = _write
- bw->_close = _close
- bw->buffs[0] = NULL
- bw->buffs[1] = NULL
- bw->blens[0] = 0
- bw->blens[1] = 0
- bw->bios[0] = open_memstream(bw->buffs + 0, bw->blens + 0)
- bw->bios[1] = open_memstream(bw->buffs + 1, bw->blens + 1)
- bw->out = NULL
- bw->bidx = 0
- bw->buf_size = buf_size? buf_size : 4 * 1024
- bw->nbytes = 0
- bw->lock = (pthread_mutex_t)PTHREAD_MUTEX_INITIALIZER
- bw->running = 0
- bw->flush = 0
- if(pthread_create(&bw->pid, NULL, _buffered_writer_thread_func, bw) != 0){
- fprintf(stderr, " -- Failed to create thread [%s] in %s -- %s:%d --\n", "_buffered_writer_thread_func", __FUNCTION__, __FILE__, __LINE__)
- bw->pid = 0
- }
- while(bw->running != 1){ nano_sleep(1)
- return bw;
- }
- static inline BufferedWriter* open_bufferedwriter(FILE *out, size_t buf_size){
- return open2_bufferedwriter(out, _write_data_file, NULL, buf_size);
- }
- static inline BufferedWriter* zopen_bufferedwriter(FILE *out, size_t buf_size, int ncpu, int level){
- PGZF *pz
- pz = open_pgzf_writer(out, buf_size, ncpu, level)
- return open2_bufferedwriter(pz, write_pgzf4filewriter, close_pgzf4filewriter, pz->bufsize);
- }
- static inline int beg_bufferedwriter(BufferedWriter *bw){
- if(bw->pid){
- while(bw->flush){ nano_sleep(1)
- pthread_mutex_lock(&bw->lock);
- bw->out = bw->bios[bw->bidx];
- return 0
- } else {
- bw->out = NULL
- return 1
- }
- }
- static inline int end_bufferedwriter(BufferedWriter *bw){
- if(bw->pid){
- pthread_mutex_unlock(&bw->lock);
- }
- bw->out = NULL
- return 0
- }
- static inline size_t flush_bufferedwriter(BufferedWriter *bw){
- size_t ret
- if(bw->pid){
- pthread_mutex_unlock(&bw->lock);
- while(bw->flush == 1){ nano_sleep(1)
- bw->flush = 1
- while(bw->flush == 1){
- nano_sleep(1)
- }
- pthread_mutex_lock(&bw->lock);
- bw->flush = 0
- bw->out = bw->bios[bw->bidx];
- ret = bw->nbytes;
- } else {
- ret = 0
- }
- return ret
- }
- static inline size_t close_bufferedwriter(BufferedWriter *bw){
- size_t ret
- if(bw->pid){
- bw->running = 0
- pthread_join(bw->pid, NULL)
- }
- fclose(bw->bios[0]);
- fclose(bw->bios[1]);
- if(bw->buffs[0]) free(bw->buffs[0]);
- if(bw->buffs[1]) free(bw->buffs[1]);
- if(bw->_close){
- bw->_close(bw->_file);
- }
- ret = bw->nbytes;
- free(bw);
- return ret
- }
|