Introduction
當下主要有兩種流行的線程庫:
Windows Threads
和
Posix
實作. libuv的thread API類似于
pthread
庫.
libuv 允許你通過開啟線程并且在結束時收集結果來模拟異步操作,而實際上是阻塞操作
是以 libuv 提供自己實作的 thread 方法,是為了能夠讓某些在 loop 中的 handle 能夠在獨立的線程中運作, 也提供方法來與他們進行線程間通信.
接下來的例子假設: libuv 隻有一個
event loop
, 并且隻在單線程上運作.
核心 Thread 操作
建立線程:
int uv_thread_create(uv_thread_t* tid, uv_thread_cb entry, void* arg)
等待線程結束:
int uv_thread_join(uv_thread_t *tid)
龜兔賽跑完整代碼:
#include <stdio.h>
#include <unistd.h>
#include <uv.h>
void hare(void *arg) {
int tracklen = *((int *) arg);
while (tracklen) {
tracklen--;
sleep(1);
fprintf(stderr, "Hare ran another step\n");
}
fprintf(stderr, "Hare done running!\n");
}
void tortoise(void *arg) {
int tracklen = *((int *) arg);
while (tracklen) {
tracklen--;
fprintf(stderr, "Tortoise ran another step\n");
sleep(3);
}
fprintf(stderr, "Tortoise done running!\n");
}
int main() {
int tracklen = 10;
uv_thread_t hare_id;
uv_thread_t tortoise_id;
uv_thread_create(&hare_id, hare, &tracklen);
uv_thread_create(&tortoise_id, tortoise, &tracklen);
uv_thread_join(&hare_id);
uv_thread_join(&tortoise_id);
return 0;
}
線程同步
libuv也提供了相應的同步機制,包括
Mutex
,
rwlocks
,
semaphores
,
condition variables
,
barriers
等
以最經典的讀者寫者問題為例:
#include <stdio.h>
#include <uv.h>
uv_barrier_t blocker;
uv_rwlock_t numlock;
int shared_num;
void reader(void *n)
{
int num = *(int *)n;
int i;
for (i = 0; i < 20; i++) {
uv_rwlock_rdlock(&numlock);
printf("Reader %d: acquired lock\n", num);
printf("Reader %d: shared num = %d\n", num, shared_num);
uv_rwlock_rdunlock(&numlock);
printf("Reader %d: released lock\n", num);
}
uv_barrier_wait(&blocker);
}
void writer(void *n)
{
int num = *(int *)n;
int i;
for (i = 0; i < 20; i++) {
uv_rwlock_wrlock(&numlock);
printf("Writer %d: acquired lock\n", num);
shared_num++;
printf("Writer %d: incremented shared num = %d\n", num, shared_num);
uv_rwlock_wrunlock(&numlock);
printf("Writer %d: released lock\n", num);
}
uv_barrier_wait(&blocker);
}
int main()
{
uv_barrier_init(&blocker, 4);
shared_num = 0;
uv_rwlock_init(&numlock);
uv_thread_t threads[3];
int thread_nums[] = {1, 2, 1};
uv_thread_create(&threads[0], reader, &thread_nums[0]);
uv_thread_create(&threads[1], reader, &thread_nums[1]);
uv_thread_create(&threads[2], writer, &thread_nums[2]);
uv_barrier_wait(&blocker);
uv_barrier_destroy(&blocker);
uv_rwlock_destroy(&numlock);
return 0;
}
到這裡為止, 使用libuv的線程操作可以說與 pthread 太大差別.
libuv work queue
uv_queue_work() 是一個很友善的函數允許應用在一個隔離的線程運作,并且結束之後調用 callback 函數.
int uv_queue_work(uv_loop_t* loop, uv_work_t* req, uv_work_cb work_cb, uv_after_work_cb after_work_cb)
用起來十分簡單, 可以通過在 uv_work_t._data 添加自己的資料來實作與線程通信.
調用 uv_queue_work 一次就類似于之前的調用了 uv_TYPE_t 一次,在loop中添加了一個
handle
(或者說事件), 在
uv_run
的時候,就會被取出并且進行處理
舉個栗子, 異步版斐波那契:
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <uv.h>
#define FIB_UNTIL 25
uv_loop_t *loop;
uv_work_t fib_reqs[FIB_UNTIL];
long fib_(long t) {
if (t == 0 || t == 1)
return 1;
else
return fib_(t-1) + fib_(t-2);
}
void fib(uv_work_t *req) {
int n = *(int *) req->data;
if (random() % 2)
sleep(1);
else
sleep(3);
long fib = fib_(n);
fprintf(stderr, "%dth fibonacci is %lu\n", n, fib);
}
void after_fib(uv_work_t *req, int status) {
if (status == UV_ECANCELED)
fprintf(stderr, "Calculation of %d cancelled.\n", *(int *) req->data);
}
void signal_handler(uv_signal_t *req, int signum)
{
printf("Signal received!\n");
int i;
for (i = 0; i < FIB_UNTIL; i++) {
uv_cancel((uv_req_t*) &fib_reqs[i]);
}
uv_signal_stop(req);
}
int main() {
loop = uv_default_loop();
int data[FIB_UNTIL];
int i;
for (i = 0; i < FIB_UNTIL; i++) {
data[i] = i;
fib_reqs[i].data = (void *) &data[i];
uv_queue_work(loop, &fib_reqs[i], fib, after_fib);
}
uv_signal_t sig;
uv_signal_init(loop, &sig);
uv_signal_start(&sig, signal_handler, SIGINT);
return uv_run(loop, UV_RUN_DEFAULT);
}
其中, uv_cancel()用來取消還沒有正在運作的 task, 如果已經開始運作,将會取消失敗.
線程間通信
異步線程通信通過 loop 來溝通, 任何一個線程可以作為 message sender, 但是隻有在 loop 中的線程能夠作為 receiver. 需要注意的是, 由于
uv_async_send
是異步的, 它作用是隻能保證回調函數會被調用一次, 如果有多個 pending 的 uv_async_send 的話,也隻會調用一次
舉個栗子,就以浏覽器下載下傳檔案,需要把下載下傳進度實時彙報給主程序:
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <uv.h>
uv_loop_t *loop;
uv_async_t async;
double percentage;
void fake_download(uv_work_t *req) {
int size = *((int*) req->data);
int downloaded = 0;
while (downloaded < size) {
percentage = downloaded*100.0/size;
async.data = (void*) &percentage;
uv_async_send(&async);
sleep(1);
downloaded += (200+random())%1000; // can only download max 1000bytes/sec,
// but at least a 200;
}
}
void after(uv_work_t *req, int status) {
fprintf(stderr, "Download complete\n");
uv_close((uv_handle_t*) &async, NULL);
}
void print_progress(uv_async_t *handle) {
double percentage = *((double*) handle->data);
fprintf(stderr, "Downloaded %.2f%%\n", percentage);
}
int main() {
loop = uv_default_loop();
uv_work_t req;
int size = 10240;
req.data = (void*) &size;
uv_async_init(loop, &async, print_progress);
uv_queue_work(loop, &req, fake_download, after);
return uv_run(loop, UV_RUN_DEFAULT);
}
本文簡化翻譯自 uvbook