天天看點

一文搞懂select、poll和epoll差別(上)1 select

1 select

select本質上是通過設定或檢查存放fd标志位的資料結構進行下一步處理。

這帶來缺點:

單個程序可監視的fd數量被限制,即能監聽端口的數量有限

單個程序所能打開的最大連接配接數有FD_SETSIZE宏定義,其大小是32個整數的大小(在32位的機器上,大小就是3232,同理64位機器上FD_SETSIZE為3264),當然我們可以對進行修改,然後重新編譯核心,但是性能可能會受到影響,這需要進一步的測試

一般該數和系統記憶體關系很大,具體數目可以cat /proc/sys/fs/file-max察看。32位機預設1024個,64位預設2048。

一文搞懂select、poll和epoll差別(上)1 select

對socket是線性掃描,即輪詢,效率較低:

僅知道有I/O事件發生,卻不知是哪幾個流,隻會無差異輪詢所有流,找出能讀資料或寫資料的流進行操作。同時處理的流越多,無差别輪詢時間越長 - O(n)。

當socket較多時,每次select都要通過周遊

FD_SETSIZE

個socket,不管是否活躍,這會浪費很多CPU時間。如果能給 socket 注冊某個回調函數,當他們活躍時,自動完成相關操作,即可避免輪詢,這就是epoll與kqueue。

1.1 調用過程

一文搞懂select、poll和epoll差別(上)1 select
asmlinkage long sys_poll(struct pollfd * ufds, unsigned int nfds, long timeout)
{
    int i, j, fdcount, err;
    struct pollfd **fds;
    struct poll_wqueues table, *wait;
    int nchunks, nleft;

    /* Do a sanity check on nfds ... */
    if (nfds > NR_OPEN)
        return -EINVAL;

    if (timeout) {
        /* Careful about overflow in the intermediate values */
        if ((unsigned long) timeout < MAX_SCHEDULE_TIMEOUT / HZ)
            timeout = (unsigned long)(timeout*HZ+999)/1000+1;
        else /* Negative or overflow */
            timeout = MAX_SCHEDULE_TIMEOUT;
    }
    // 2. 注冊回調函數__pollwait
    poll_initwait(&table);
    wait = &table;
    if (!timeout)
        wait = NULL;

    err = -ENOMEM;
    fds = NULL;
    if (nfds != 0) {
        fds = (struct pollfd **)kmalloc(
            (1 + (nfds - 1) / POLLFD_PER_PAGE) * sizeof(struct pollfd *),
            GFP_KERNEL);
        if (fds == NULL)
            goto out;
    }

    nchunks = 0;
    nleft = nfds;
    while (nleft > POLLFD_PER_PAGE) { /* allocate complete PAGE_SIZE chunks */
        fds[nchunks] = (struct pollfd *)__get_free_page(GFP_KERNEL);
        if (fds[nchunks] == NULL)
            goto out_fds;
        nchunks++;
        nleft -= POLLFD_PER_PAGE;
    }
    if (nleft) { /* allocate last PAGE_SIZE chunk, only nleft elements used */
        fds[nchunks] = (struct pollfd *)__get_free_page(GFP_KERNEL);
        if (fds[nchunks] == NULL)
            goto out_fds;
    }

    err = -EFAULT;
    for (i=0; i < nchunks; i++)
        // 
        if (copy_from_user(fds[i], ufds + i*POLLFD_PER_PAGE, PAGE_SIZE))
            goto out_fds1;
    if (nleft) {
        if (copy_from_user(fds[nchunks], ufds + nchunks*POLLFD_PER_PAGE, 
                nleft * sizeof(struct pollfd)))
            goto out_fds1;
    }

    fdcount = do_poll(nfds, nchunks, nleft, fds, wait, timeout);

    /* OK, now copy the revents fields back to user space. */
    for(i=0; i < nchunks; i++)
        for (j=0; j < POLLFD_PER_PAGE; j++, ufds++)
            __put_user((fds[i] + j)->revents, &ufds->revents);
    if (nleft)
        for (j=0; j < nleft; j++, ufds++)
            __put_user((fds[nchunks] + j)->revents, &ufds->revents);

    err = fdcount;
    if (!fdcount && signal_pending(current))
        err = -EINTR;

out_fds1:
    if (nleft)
        free_page((unsigned long)(fds[nchunks]));
out_fds:
    for (i=0; i < nchunks; i++)
        free_page((unsigned long)(fds[i]));
    if (nfds != 0)
        kfree(fds);
out:
    poll_freewait(&table);
    return err;
}      
static int do_poll(unsigned int nfds, unsigned int nchunks, unsigned int nleft, 
    struct pollfd *fds[], struct poll_wqueues *wait, long timeout)
{
    int count;
    poll_table* pt = &wait->pt;

    for (;;) {
        unsigned int i;

        set_current_state(TASK_INTERRUPTIBLE);
        count = 0;
        for (i=0; i < nchunks; i++)
            do_pollfd(POLLFD_PER_PAGE, fds[i], &pt, &count);
        if (nleft)
            do_pollfd(nleft, fds[nchunks], &pt, &count);
        pt = NULL;
        if (count || !timeout || signal_pending(current))
            break;
        count = wait->error;
        if (count)
            break;
        timeout = schedule_timeout(timeout);
    }
    current->state = TASK_RUNNING;
    return count;
}      
  1. 使用copy_from_user從使用者空間拷貝fd_set到核心空間
  2. 注冊回調函數__pollwait
    一文搞懂select、poll和epoll差別(上)1 select
  3. 周遊所有fd,調用其對應的poll方法(對于socket,這個poll方法是sock_poll,sock_poll根據情況會調用到tcp_poll,udp_poll或datagram_poll)
  4. 以tcp_poll為例,核心實作就是

    __pollwait

    ,即上面注冊的回調函數
  5. __pollwait,就是把current(目前程序)挂到裝置的等待隊列,不同裝置有不同等待隊列,如tcp_poll的等待隊列是sk->sk_sleep(把程序挂到等待隊列中并不代表程序已睡眠)。在裝置收到一條消息(網絡裝置)或填寫完檔案資料(磁盤裝置)後,會喚醒裝置等待隊列上睡眠的程序,這時current便被喚醒。
void __pollwait(struct file *filp, wait_queue_head_t *wait_address, poll_table *_p)
{
    struct poll_wqueues *p = container_of(_p, struct poll_wqueues, pt);
    struct poll_table_page *table = p->table;

    if (!table || POLL_TABLE_FULL(table)) {
        struct poll_table_page *new_table;

        new_table = (struct poll_table_page *) __get_free_page(GFP_KERNEL);
        if (!new_table) {
            p->error = -ENOMEM;
            __set_current_state(TASK_RUNNING);
            return;
        }
        new_table->entry = new_table->entries;
        new_table->next = table;
        p->table = new_table;
        table = new_table;
    }

    /* 添加新節點 */
    {
        struct poll_table_entry * entry = table->entry;
        table->entry = entry+1;
        get_file(filp);
        entry->filp = filp;
        entry->wait_address = wait_address;
        init_waitqueue_entry(&entry->wait, current);
        add_wait_queue(wait_address,&entry->wait);
    }
}      
static void do_pollfd(unsigned int num, struct pollfd * fdpage,
    poll_table ** pwait, int *count)
{
    int i;

    for (i = 0; i < num; i++) {
        int fd;
        unsigned int mask;
        struct pollfd *fdp;

        mask = 0;
        fdp = fdpage+i;
        fd = fdp->fd;
        if (fd >= 0) {
            struct file * file = fget(fd);
            mask = POLLNVAL;
            if (file != NULL) {
                mask = DEFAULT_POLLMASK;
                if (file->f_op && file->f_op->poll)
                    mask = file->f_op->poll(file, *pwait);
                mask &= fdp->events | POLLERR | POLLHUP;
                fput(file);
            }
            if (mask) {
                *pwait = NULL;
                (*count)++;
            }
        }
        fdp->revents = mask;
    }
}      
  1. poll方法傳回時會傳回一個描述讀寫操作是否就緒的mask掩碼,根據這個mask掩碼給fd_set指派
  2. 若周遊完所有fd,還沒傳回一個可讀寫的mask掩碼,則調schedule_timeout是調用select的程序(也就是current)進入睡眠。當裝置驅動發生自身資源可讀寫後,會喚醒其等待隊列上睡眠的程序。若超過一定逾時時間(schedule_timeout指定),還沒人喚醒,則調用select的程序會重新被喚醒獲得CPU,進而重新周遊fd,判斷有無就緒的fd
  3. 把fd_set從核心空間拷貝到使用者空間

繼續閱讀