Libuv Event循环

libuv通过uv_work_queue来交付任务给工作队列的, 这个api也是libuv实现文件异步操作的基础:

UV_EXTERN int uv_queue_work(uv_loop_t* loop,                            uv_work_t* req,                            uv_work_cb work_cb,                            uv_after_work_cb after_work_cb);
int uv_queue_work(uv_loop_t* loop,                  uv_work_t* req,                  uv_work_cb work_cb,                  uv_after_work_cb after_work_cb) {  if (work_cb == NULL)    return UV_EINVAL;  uv__req_init(loop, req, UV_WORK);  req->loop = loop;  req->work_cb = work_cb;  req->after_work_cb = after_work_cb;  uv__work_submit(loop, &req->work_req, uv__queue_work, uv__queue_done);  return 0;}

可以看到函数首先初始化这个请求(req), 然后就调用uv__work_submit完成剩余的工作.

下面我们主要就来分析一下uv__work_submit的操作过程. 看看libuv是如何调用work_cb来完成任务并调用到after_work_cb回调函数的.

void uv__work_submit(uv_loop_t* loop,                     struct uv__work* w,                     void (*work)(struct uv__work* w),                     void (*done)(struct uv__work* w, int status)) {  uv_once(&once, init_once);  w->loop = loop;  w->work = work;  w->done = done;  post(&w->wq);}

uv_once(&once, init_once);用来初始化libuv的线程池, 只会被调用一次. 线程池中的线程都是执行这样一个函数:

static void worker(void* arg) {  struct uv__work* w;  QUEUE* q;  (void) arg;  for (;;) {    uv_mutex_lock(&mutex);    while (QUEUE_EMPTY(&wq)) {      idle_threads += 1;      uv_cond_wait(&cond, &mutex);      idle_threads -= 1;    }    ......  }}

可以看到每个线程都是等待在uv_cond_wait(&cond, &mutex);.

初始化完成后, 会调用post(&w->wq):

static void post(QUEUE* q) {  uv_mutex_lock(&mutex);  QUEUE_INSERT_TAIL(&wq, q);  if (idle_threads > 0)    uv_cond_signal(&cond);  uv_mutex_unlock(&mutex);}

可以看到post(&w->wq)就是把w挂到全局的wq上面, 然后调用uv_cond_signal, 这就会唤醒一个前面的正在等待的线程来处理这个任务, 一个线程唤醒后, 就会执行work函数的后续部分.

static void worker(void* arg) {  struct uv__work* w;  QUEUE* q;  (void) arg;  for (;;) {    uv_mutex_lock(&mutex);    while (QUEUE_EMPTY(&wq)) {      idle_threads += 1;      uv_cond_wait(&cond, &mutex);      idle_threads -= 1;    }    q = QUEUE_HEAD(&wq);    if (q == &exit_message)      uv_cond_signal(&cond);    else {      QUEUE_REMOVE(q);      QUEUE_INIT(q);  /* Signal uv_cancel() that the work req is                             executing. */    }    uv_mutex_unlock(&mutex);    if (q == &exit_message)      break;    w = QUEUE_DATA(q, struct uv__work, wq);    w->work(w);    uv_mutex_lock(&w->loop->wq_mutex);    w->work = NULL;  /* Signal uv_cancel() that the work req is done                        executing. */    QUEUE_INSERT_TAIL(&w->loop->wq, &w->wq);    uv_async_send(&w->loop->wq_async);    uv_mutex_unlock(&w->loop->wq_mutex);  }}

线程先取出一个链表的元素q.

exit_message相关的操作是用来退出所有线程的, 在libuv退出时, post(&exit_message);被调用, 这会让libuv的所有线程都退出.

如果没有exit_message, 线程正常往后执行. 先把qwq链表中删除, 然后调用w->work(就是最开始的work_cb, 相当于执行工作), 然后把w放到loop->wq中并调用uv_async_send向event loop发送信号. event loop会注意这个信号并作出相应的处理.

为了弄清楚event loop是如何注意到这个信号的, 我们先来看看uv_async_send都干了什么

int uv_async_send(uv_async_t* handle) {  /* Do a cheap read first. */  if (ACCESS_ONCE(int, handle->pending) != 0)    return 0;  if (cmpxchgi(&handle->pending, 0, 1) == 0)    uv__async_send(handle->loop);  return 0;}

这里async事件在还没有被处理时(penging=1)多次发送也只有一次生效

判断handle->pending如果是1, 表示已经发送过并且还没处理, 所以直接返回.

如果是0就表明没有pending事件, 原子的设置pending为1, 并调用uv__async_send, 这个函数会往loop->async_io_watcher.fd(一个eventfd)里面写入’\n’, 从而event_loop会在epoll中发现. 发现后会调用相应的回调函数, 那么回调函数是什么呢?

uv_loop_init中会调用uv_async_init(loop, &loop->wq_async, uv__work_done)来指定回调函数是uv__work_done:

int uv_async_init(uv_loop_t* loop, uv_async_t* handle, uv_async_cb async_cb) {  int err;  err = uv__async_start(loop);  if (err)    return err;  uv__handle_init(loop, (uv_handle_t*)handle, UV_ASYNC);  handle->async_cb = async_cb;  handle->pending = 0;  QUEUE_INSERT_TAIL(&loop->async_handles, &handle->queue);  uv__handle_start(handle);  return 0;}
uv__async_start`中创建eventfd, 并将其POLLIN事件加入event loop, 事件发生时会调用`uv__async_io
uv__io_init(&loop->async_io_watcher, uv__async_io, pipefd[0]);uv__io_start(loop, &loop->async_io_watcher, POLLIN);

然后把handle初始化并配置async_cb和pending加入到event loop的async_handles中, 并启动handle.

事件发生后调用uv__async_io:

static void uv__async_io(uv_loop_t* loop, uv__io_t* w, unsigned int events) {  char buf[1024];  ssize_t r;  QUEUE queue;  QUEUE* q;  uv_async_t* h;  assert(w == &loop->async_io_watcher);  for (;;) {    r = read(w->fd, buf, sizeof(buf));    if (r == sizeof(buf))      continue;    if (r != -1)      break;    if (errno == EAGAIN || errno == EWOULDBLOCK)      break;    if (errno == EINTR)      continue;    abort();  }  QUEUE_MOVE(&loop->async_handles, &queue);  while (!QUEUE_EMPTY(&queue)) {    q = QUEUE_HEAD(&queue);    h = QUEUE_DATA(q, uv_async_t, queue);    QUEUE_REMOVE(q);    QUEUE_INSERT_TAIL(&loop->async_handles, q);    if (cmpxchgi(&h->pending, 1, 0) == 0)      continue;    if (h->async_cb == NULL)      continue;    h->async_cb(h);  }}

可以看到这个函数先把eventfd的内容读空, 然后一次对async_handles中的元素判断其pending, 如果为1就原子的将其至0, 这也表示handle有待处理的异步通知, 因此就会调用h->async_cb(h)

前面说过(uv_async_init(loop, &loop->wq_async, uv__work_done))对于我们的线程池来说这个回调是uv__work_done:

void uv__work_done(uv_async_t* handle) {  struct uv__work* w;  uv_loop_t* loop;  QUEUE* q;  QUEUE wq;  int err;  loop = container_of(handle, uv_loop_t, wq_async);  uv_mutex_lock(&loop->wq_mutex);  QUEUE_MOVE(&loop->wq, &wq);  uv_mutex_unlock(&loop->wq_mutex);  while (!QUEUE_EMPTY(&wq)) {    q = QUEUE_HEAD(&wq);    QUEUE_REMOVE(q);    w = container_of(q, struct uv__work, wq);    err = (w->work == uv__cancelled) ? UV_ECANCELED : 0;    w->done(w, err);  }}

函数把loop->wq的元素挨个删除并调用done回调函数. 这个done回调函数就是我们前面说的after_work_cb回调. 至此, libuv的工作队列的实现就说完了.