天天看點

libuv學習(四) : Threads

Introduction

當下主要有兩種流行的線程庫:

Windows Threads

Posix

實作. libuv的thread API類似于

pthread

庫.

libuv 允許你通過開啟線程并且在結束時收集結果來模拟異步操作,而實際上是阻塞操作

是以 libuv 提供自己實作的 thread 方法,是為了能夠讓某些在 loop 中的 handle 能夠在獨立的線程中運作, 也提供方法來與他們進行線程間通信.

接下來的例子假設: libuv 隻有一個

event loop

, 并且隻在單線程上運作.

核心 Thread 操作

建立線程:

int uv_thread_create(uv_thread_t* tid, uv_thread_cb entry, void* arg)
           

等待線程結束:

int uv_thread_join(uv_thread_t *tid)
           

龜兔賽跑完整代碼:

#include <stdio.h>
#include <unistd.h>
#include <uv.h>

void hare(void *arg) {
    int tracklen = *((int *) arg);
    while (tracklen) {
        tracklen--;
        sleep(1);
        fprintf(stderr, "Hare ran another step\n");
    }
    fprintf(stderr, "Hare done running!\n");
}

void tortoise(void *arg) {
    int tracklen = *((int *) arg);
    while (tracklen) {
        tracklen--;
        fprintf(stderr, "Tortoise ran another step\n");
        sleep(3);
    }
    fprintf(stderr, "Tortoise done running!\n");
}

int main() {
    int tracklen = 10;
    uv_thread_t hare_id;
    uv_thread_t tortoise_id;
    uv_thread_create(&hare_id, hare, &tracklen);
    uv_thread_create(&tortoise_id, tortoise, &tracklen);

    uv_thread_join(&hare_id);
    uv_thread_join(&tortoise_id);
    return 0;
}
           

線程同步

libuv也提供了相應的同步機制,包括

Mutex

,

rwlocks

,

semaphores

,

condition variables

,

barriers

以最經典的讀者寫者問題為例:

#include <stdio.h>
#include <uv.h>

uv_barrier_t blocker;
uv_rwlock_t numlock;
int shared_num;

void reader(void *n)
{
    int num = *(int *)n;
    int i;
    for (i = 0; i < 20; i++) {
        uv_rwlock_rdlock(&numlock);
        printf("Reader %d: acquired lock\n", num);
        printf("Reader %d: shared num = %d\n", num, shared_num);
        uv_rwlock_rdunlock(&numlock);
        printf("Reader %d: released lock\n", num);
    }
    uv_barrier_wait(&blocker);
}

void writer(void *n)
{
    int num = *(int *)n;
    int i;
    for (i = 0; i < 20; i++) {
        uv_rwlock_wrlock(&numlock);
        printf("Writer %d: acquired lock\n", num);
        shared_num++;
        printf("Writer %d: incremented shared num = %d\n", num, shared_num);
        uv_rwlock_wrunlock(&numlock);
        printf("Writer %d: released lock\n", num);
    }
    uv_barrier_wait(&blocker);
}

int main()
{
    uv_barrier_init(&blocker, 4);

    shared_num = 0;
    uv_rwlock_init(&numlock);

    uv_thread_t threads[3];

    int thread_nums[] = {1, 2, 1};
    uv_thread_create(&threads[0], reader, &thread_nums[0]);
    uv_thread_create(&threads[1], reader, &thread_nums[1]);

    uv_thread_create(&threads[2], writer, &thread_nums[2]);

    uv_barrier_wait(&blocker);
    uv_barrier_destroy(&blocker);

    uv_rwlock_destroy(&numlock);
    return 0;
}
           

到這裡為止, 使用libuv的線程操作可以說與 pthread 太大差別.

libuv work queue

uv_queue_work() 是一個很友善的函數允許應用在一個隔離的線程運作,并且結束之後調用 callback 函數.

int uv_queue_work(uv_loop_t* loop, uv_work_t* req, uv_work_cb work_cb, uv_after_work_cb after_work_cb)
           

用起來十分簡單, 可以通過在 uv_work_t._data 添加自己的資料來實作與線程通信.

調用 uv_queue_work 一次就類似于之前的調用了 uv_TYPE_t 一次,在loop中添加了一個

handle

(或者說事件), 在

uv_run

的時候,就會被取出并且進行處理

舉個栗子, 異步版斐波那契:

#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>

#include <uv.h>

#define FIB_UNTIL 25
uv_loop_t *loop;
uv_work_t fib_reqs[FIB_UNTIL];

long fib_(long t) {
    if (t == 0 || t == 1)
        return 1;
    else
        return fib_(t-1) + fib_(t-2);
}

void fib(uv_work_t *req) {
    int n = *(int *) req->data;
    if (random() % 2)
        sleep(1);
    else
        sleep(3);
    long fib = fib_(n);
    fprintf(stderr, "%dth fibonacci is %lu\n", n, fib);
}

void after_fib(uv_work_t *req, int status) {
    if (status == UV_ECANCELED)
        fprintf(stderr, "Calculation of %d cancelled.\n", *(int *) req->data);
}

void signal_handler(uv_signal_t *req, int signum)
{
    printf("Signal received!\n");
    int i;
    for (i = 0; i < FIB_UNTIL; i++) {
        uv_cancel((uv_req_t*) &fib_reqs[i]);
    }
    uv_signal_stop(req);
}

int main() {
    loop = uv_default_loop();

    int data[FIB_UNTIL];
    int i;
    for (i = 0; i < FIB_UNTIL; i++) {
        data[i] = i;
        fib_reqs[i].data = (void *) &data[i];
        uv_queue_work(loop, &fib_reqs[i], fib, after_fib);
    }

    uv_signal_t sig;
    uv_signal_init(loop, &sig);
    uv_signal_start(&sig, signal_handler, SIGINT);
    return uv_run(loop, UV_RUN_DEFAULT);
}
           

其中, uv_cancel()用來取消還沒有正在運作的 task, 如果已經開始運作,将會取消失敗.

線程間通信

異步線程通信通過 loop 來溝通, 任何一個線程可以作為 message sender, 但是隻有在 loop 中的線程能夠作為 receiver. 需要注意的是, 由于

uv_async_send

是異步的, 它作用是隻能保證回調函數會被調用一次, 如果有多個 pending 的 uv_async_send 的話,也隻會調用一次

舉個栗子,就以浏覽器下載下傳檔案,需要把下載下傳進度實時彙報給主程序:

#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>

#include <uv.h>

uv_loop_t *loop;
uv_async_t async;

double percentage;

void fake_download(uv_work_t *req) {
    int size = *((int*) req->data);
    int downloaded = 0;
    while (downloaded < size) {
        percentage = downloaded*100.0/size;
        async.data = (void*) &percentage;
        uv_async_send(&async);

        sleep(1);
        downloaded += (200+random())%1000; // can only download max 1000bytes/sec,
                                           // but at least a 200;
    }
}

void after(uv_work_t *req, int status) {
    fprintf(stderr, "Download complete\n");
    uv_close((uv_handle_t*) &async, NULL);
}

void print_progress(uv_async_t *handle) {
    double percentage = *((double*) handle->data);
    fprintf(stderr, "Downloaded %.2f%%\n", percentage);
}

int main() {
    loop = uv_default_loop();

    uv_work_t req;
    int size = 10240;
    req.data = (void*) &size;

    uv_async_init(loop, &async, print_progress);
    uv_queue_work(loop, &req, fake_download, after);

    return uv_run(loop, UV_RUN_DEFAULT);
}
           

本文簡化翻譯自 uvbook

繼續閱讀