Libuv Event循环
libuv通过uv_work_queue来交付任务给工作队列的, 这个api也是libuv实现文件异步操作的基础:
UV_EXTERN int uv_queue_work(uv_loop_t* loop, uv_work_t* req, uv_work_cb work_cb, uv_after_work_cb after_work_cb);
int uv_queue_work(uv_loop_t* loop, uv_work_t* req, uv_work_cb work_cb, uv_after_work_cb after_work_cb) { if (work_cb == NULL) return UV_EINVAL; uv__req_init(loop, req, UV_WORK); req->loop = loop; req->work_cb = work_cb; req->after_work_cb = after_work_cb; uv__work_submit(loop, &req->work_req, uv__queue_work, uv__queue_done); return 0;}
可以看到函数首先初始化这个请求(req), 然后就调用uv__work_submit完成剩余的工作.
下面我们主要就来分析一下uv__work_submit的操作过程. 看看libuv是如何调用work_cb来完成任务并调用到after_work_cb回调函数的.
void uv__work_submit(uv_loop_t* loop, struct uv__work* w, void (*work)(struct uv__work* w), void (*done)(struct uv__work* w, int status)) { uv_once(&once, init_once); w->loop = loop; w->work = work; w->done = done; post(&w->wq);}
uv_once(&once, init_once);用来初始化libuv的线程池, 只会被调用一次. 线程池中的线程都是执行这样一个函数:
static void worker(void* arg) { struct uv__work* w; QUEUE* q; (void) arg; for (;;) { uv_mutex_lock(&mutex); while (QUEUE_EMPTY(&wq)) { idle_threads += 1; uv_cond_wait(&cond, &mutex); idle_threads -= 1; } ...... }}
可以看到每个线程都是等待在uv_cond_wait(&cond, &mutex);.
初始化完成后, 会调用post(&w->wq):
static void post(QUEUE* q) { uv_mutex_lock(&mutex); QUEUE_INSERT_TAIL(&wq, q); if (idle_threads > 0) uv_cond_signal(&cond); uv_mutex_unlock(&mutex);}
可以看到post(&w->wq)就是把w挂到全局的wq上面, 然后调用uv_cond_signal, 这就会唤醒一个前面的正在等待的线程来处理这个任务, 一个线程唤醒后, 就会执行work函数的后续部分.
static void worker(void* arg) { struct uv__work* w; QUEUE* q; (void) arg; for (;;) { uv_mutex_lock(&mutex); while (QUEUE_EMPTY(&wq)) { idle_threads += 1; uv_cond_wait(&cond, &mutex); idle_threads -= 1; } q = QUEUE_HEAD(&wq); if (q == &exit_message) uv_cond_signal(&cond); else { QUEUE_REMOVE(q); QUEUE_INIT(q); /* Signal uv_cancel() that the work req is executing. */ } uv_mutex_unlock(&mutex); if (q == &exit_message) break; w = QUEUE_DATA(q, struct uv__work, wq); w->work(w); uv_mutex_lock(&w->loop->wq_mutex); w->work = NULL; /* Signal uv_cancel() that the work req is done executing. */ QUEUE_INSERT_TAIL(&w->loop->wq, &w->wq); uv_async_send(&w->loop->wq_async); uv_mutex_unlock(&w->loop->wq_mutex); }}
线程先取出一个链表的元素q.
exit_message相关的操作是用来退出所有线程的, 在libuv退出时, post(&exit_message);被调用, 这会让libuv的所有线程都退出.
如果没有exit_message, 线程正常往后执行. 先把q从wq链表中删除, 然后调用w->work(就是最开始的work_cb, 相当于执行工作), 然后把w放到loop->wq中并调用uv_async_send向event loop发送信号. event loop会注意这个信号并作出相应的处理.
为了弄清楚event loop是如何注意到这个信号的, 我们先来看看uv_async_send都干了什么
int uv_async_send(uv_async_t* handle) { /* Do a cheap read first. */ if (ACCESS_ONCE(int, handle->pending) != 0) return 0; if (cmpxchgi(&handle->pending, 0, 1) == 0) uv__async_send(handle->loop); return 0;}
这里async事件在还没有被处理时(penging=1)多次发送也只有一次生效
判断handle->pending如果是1, 表示已经发送过并且还没处理, 所以直接返回.
如果是0就表明没有pending事件, 原子的设置pending为1, 并调用uv__async_send, 这个函数会往loop->async_io_watcher.fd(一个eventfd)里面写入’\n’, 从而event_loop会在epoll中发现. 发现后会调用相应的回调函数, 那么回调函数是什么呢?
在uv_loop_init中会调用uv_async_init(loop, &loop->wq_async, uv__work_done)来指定回调函数是uv__work_done:
int uv_async_init(uv_loop_t* loop, uv_async_t* handle, uv_async_cb async_cb) { int err; err = uv__async_start(loop); if (err) return err; uv__handle_init(loop, (uv_handle_t*)handle, UV_ASYNC); handle->async_cb = async_cb; handle->pending = 0; QUEUE_INSERT_TAIL(&loop->async_handles, &handle->queue); uv__handle_start(handle); return 0;}
uv__async_start`中创建eventfd, 并将其POLLIN事件加入event loop, 事件发生时会调用`uv__async_io
uv__io_init(&loop->async_io_watcher, uv__async_io, pipefd[0]);uv__io_start(loop, &loop->async_io_watcher, POLLIN);
然后把handle初始化并配置async_cb和pending加入到event loop的async_handles中, 并启动handle.
事件发生后调用uv__async_io:
static void uv__async_io(uv_loop_t* loop, uv__io_t* w, unsigned int events) { char buf[1024]; ssize_t r; QUEUE queue; QUEUE* q; uv_async_t* h; assert(w == &loop->async_io_watcher); for (;;) { r = read(w->fd, buf, sizeof(buf)); if (r == sizeof(buf)) continue; if (r != -1) break; if (errno == EAGAIN || errno == EWOULDBLOCK) break; if (errno == EINTR) continue; abort(); } QUEUE_MOVE(&loop->async_handles, &queue); while (!QUEUE_EMPTY(&queue)) { q = QUEUE_HEAD(&queue); h = QUEUE_DATA(q, uv_async_t, queue); QUEUE_REMOVE(q); QUEUE_INSERT_TAIL(&loop->async_handles, q); if (cmpxchgi(&h->pending, 1, 0) == 0) continue; if (h->async_cb == NULL) continue; h->async_cb(h); }}
可以看到这个函数先把eventfd的内容读空, 然后一次对async_handles中的元素判断其pending, 如果为1就原子的将其至0, 这也表示handle有待处理的异步通知, 因此就会调用h->async_cb(h)
前面说过(uv_async_init(loop, &loop->wq_async, uv__work_done))对于我们的线程池来说这个回调是uv__work_done:
void uv__work_done(uv_async_t* handle) { struct uv__work* w; uv_loop_t* loop; QUEUE* q; QUEUE wq; int err; loop = container_of(handle, uv_loop_t, wq_async); uv_mutex_lock(&loop->wq_mutex); QUEUE_MOVE(&loop->wq, &wq); uv_mutex_unlock(&loop->wq_mutex); while (!QUEUE_EMPTY(&wq)) { q = QUEUE_HEAD(&wq); QUEUE_REMOVE(q); w = container_of(q, struct uv__work, wq); err = (w->work == uv__cancelled) ? UV_ECANCELED : 0; w->done(w, err); }}
函数把loop->wq的元素挨个删除并调用done回调函数. 这个done回调函数就是我们前面说的after_work_cb回调.
至此, libuv的工作队列的实现就说完了.