天天看點

libevent 程式設計介紹:1. libevent 幾個結構體 2. libevent pipe 3. libevent signal 4. libevent listener

1. libevent 幾個結構體

/*--------------------------------------------------------------------------*/
event_base  				// 事件集合
event						// 事件
   
event_init() 				// 初始化預設事件集合
event_base_new() 			// 建立事件集合
event_dispatch();			// 使用預設事件集合,開始分發事件
event_base_dispatch()		// 開始分發事件,循環分發,如果事件集合沒有事件,會終止循環
event_base_free()			// 釋放事件集合

// 将未激活或未等待的事件加入到事件集合,可以加入不同的事件集合
event_base_set(struct event_base *, struct event *) 

// 事件的關聯項設定,包括關聯fd/socket/signal,事件類型,事件回調,事件回調參數
event_set(struct event *, evutil_socket_t, short, void (*)(evutil_socket_t, short, void *), void *);

// 事件的關聯項設定,包括關聯 event_base, fd/socket/signal, 事件類型,事件回調,事件回調參數
event_assign(struct event *, struct event_base *, evutil_socket_t, short, event_callback_fn, void *)

// 添加事件到等待的事件集合
event_add(struct event *ev, const struct timeval *timeout)
// 從監聽事件集合中删除
event_del(struct event *)
// 激活事件,并觸發事件回調
event_active(struct event *ev, int res, short ncalls)
// 查詢事件是否在等待,存在傳回 1,不存在傳回 0
event_pending(const struct event *ev, short events, struct timeval *tv)

/*--------------------------------------------------------------------------*/
// 事件 socket 監聽器
evconnlistener

// 建立監聽器,做一些參數綁定,flags 可以設定是否初始化就啟動監聽器
struct evconnlistener *evconnlistener_new_bind(struct event_base *base,
    evconnlistener_cb cb, void *ptr, unsigned flags, int backlog,
    const struct sockaddr *sa, int socklen)

// 設定監聽回調
evconnlistener_set_cb

// 使能監聽器
evconnlistener_enable

// 釋放監聽器
evconnlistener_free

/*--------------------------------------------------------------------------*/
bufferevent		// 事件緩存

// 建立 bufferevent, 關聯 event_base, fd/socket/signal, 設定讀寫 options
// options 常用 BEV_OPT_CLOSE_ON_FREE
struct bufferevent *bufferevent_socket_new(struct event_base *base, evutil_socket_t fd, int options);

// 設定 bufferevent 讀寫等回調
void bufferevent_setcb(struct bufferevent *bufev,
    bufferevent_data_cb readcb, bufferevent_data_cb writecb,
    bufferevent_event_cb eventcb, void *cbarg);

// 釋放 bufferevent
bufferevent_free(struct bufferevent *bufev)

// 設定 bufferevent 關聯 event_base
bufferevent_base_set(struct event_base *base, struct bufferevent *bufev)
// 擷取 bufferevent 關聯的 event_base
bufferevent_get_base(struct bufferevent *bev)

// 設定 bufferevent 關聯的檔案描述符
int bufferevent_setfd(struct bufferevent *bufev, evutil_socket_t fd);
// 擷取 bufferevent 關聯的檔案描述符
evutil_socket_t bufferevent_getfd(struct bufferevent *bufev);

// 讀 bufferevent
size_t bufferevent_read(struct bufferevent *bufev, void *data, size_t size);
int bufferevent_read_buffer(struct bufferevent *bufev, struct evbuffer *buf);
// 寫 bufferevent
int bufferevent_write(struct bufferevent *bufev, const void *data, size_t size);
int bufferevent_write_buffer(struct bufferevent *bufev, struct evbuffer *buf);

           

2. libevent pipe

#include <iostream>
#include "event.h"
#include <sys/types.h>
#include <sys/stat.h>
#include <fcntl.h>
#include <unistd.h>
#include <stdlib.h>

// Forward declaration of event_assign().
// NOTE(review): this looks redundant — "event.h" presumably already declares
// event_assign(), and this program never calls it. Confirm and drop if so.
int event_assign(struct event *, struct event_base *, evutil_socket_t, short, event_callback_fn, void *);

/**
 * Read-ready callback for the FIFO: reads one chunk and prints it.
 *
 * @param fd  descriptor of the FIFO opened for reading.
 *
 * The event flags and the user argument are unused.
 *
 * Behaviour:
 *  - read error   -> prints "fifo_read error" and returns.
 *  - end of file  -> every writer has closed the FIFO. The event is
 *                    persistent, so it would keep firing with zero-length
 *                    reads; the default event loop is asked to stop instead.
 *  - data         -> prints the chunk as a NUL-terminated string.
 */
void fifo_read(evutil_socket_t fd, short, void *)
{
    char buf[128];

    // Keep one byte free so the chunk can always be NUL-terminated.
    const ssize_t ret = read(fd, buf, sizeof(buf) - 1);
    if (ret == -1) {
        std::cout << "fifo_read error" << std::endl;
        return;
    }

    if (ret == 0) {
        std::cout << "fifo writer closed" << std::endl;
        // Stop the loop started by event_dispatch(); both act on the default base.
        event_loopbreak();
        return;
    }

    buf[ret] = '\0';
    std::cout << "read msg : " << buf << std::endl;
}

int main()
{
    const std::string fifo_name = "/tmp/ev_fifo";
    if (access(fifo_name.c_str(), F_OK) == -1) {
        if (mkfifo(fifo_name.c_str(), 0777) == -1) {
            std::cout << "mkfifo error" << std::endl;
            exit(1);
        }
    }

    int fd_read = open(fifo_name.c_str(), O_RDONLY);
    if (fd_read == -1) {
        std::cout << "open read error" << std::endl;
        exit(1);
    }

    struct event ev;

    // 初始化,event_init 中會初始化 event_base(一個全局變量 current_base)
    event_init();

    // 設定事件觸發的回調和 fd
    event_set(&ev, fd_read, EV_READ | EV_PERSIST, fifo_read, NULL);

    // 注冊事件
    event_add(&ev, NULL);

    // 事件分發開始,循環開始了
    event_dispatch();

    close(fd_read);
    
    return 0;
}
           
#include <iostream>
#include <sys/types.h>
#include <sys/stat.h>
#include <fcntl.h>
#include <unistd.h>
#include <stdlib.h>
#include <string>

/**
 * FIFO writer demo: forwards whitespace-separated words from stdin to
 * /tmp/ev_fifo until the word "bye" or end of input.
 *
 * @return 0 on normal termination; exits with status 1 when the FIFO cannot
 *         be created or opened, or when writing to it fails.
 */
int main()
{
    const std::string fifo_name = "/tmp/ev_fifo";
    if (access(fifo_name.c_str(), F_OK) == -1) {
        if (mkfifo(fifo_name.c_str(), 0777) == -1) {
            std::cout << "mkfifo error" << std::endl;
            exit(1);
        }
    }

    // Use the same path variable as the existence check above.
    const int fd_write = open(fifo_name.c_str(), O_WRONLY);
    if (fd_write == -1) {
        std::cout << "open write error" << std::endl;
        exit(1);
    }

    std::string str;
    // operator>> never yields an empty word on success, so no empty() check is needed.
    while (std::cin >> str) {
        if (str == "bye") {
            break;
        }

        // write() may accept fewer bytes than requested; loop until all are sent.
        const char *cursor = str.c_str();
        size_t remaining = str.length();
        while (remaining > 0) {
            const ssize_t written = write(fd_write, cursor, remaining);
            if (written <= 0) {
                std::cout << "write error" << std::endl;
                close(fd_write);
                exit(1);
            }
            cursor += written;
            remaining -= static_cast<size_t>(written);
        }
        std::cout << "write msg : " << str << std::endl;
    }

    close(fd_write);
    return 0;
}
           

3. libevent signal

#include "event.h"
#include <signal.h>
#include <iostream>

// Number of SIGINT deliveries seen so far.
int count = 0;

/**
 * Signal callback: counts SIGINT deliveries and unregisters the event once
 * the counter is greater than 3.
 *
 * @param sock   compared against SIGINT to identify the signal.
 * @param event  event flags (unused).
 * @param arg    pointer to the struct event to remove with event_del().
 */
void singnal_cb(evutil_socket_t sock, short event, void *arg)
{
    const bool is_sigint = (sock == SIGINT);
    if (is_sigint) {
        std::cout << "SIGINT signal callback" << std::endl;
        ++count;
    }

    // The threshold check runs on every invocation.
    if (count <= 3) {
        return;
    }
    event_del(static_cast<struct event *>(arg));
}

int main()
{
    // 建立事件集合
    struct event_base *ev_base = event_base_new();
    struct event ev;

    // 監控 CTRL + C 信号
    // EV_PERSIST 說明可以多次處理
    event_assign(&ev, ev_base, SIGINT, EV_SIGNAL | EV_PERSIST, singnal_cb, &ev);

    // 注冊事件到集合
    event_add(&ev, NULL);

    // 對指定事件集合啟動分發線程,當沒有注冊事件時退出
    event_base_dispatch(ev_base);

    // 釋放事件集合
    event_base_free(ev_base);

    return 0;
}
           

4. libevent listener

#include <iostream>
#include <stdlib.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <string.h>
#include "event.h"
#include "event2/listener.h"

void bufev_data_read_cb(struct bufferevent *bev, void *ctx)
{
    std::cout << "enter bufev_data_read_cb" << std::endl;

    char buf[128] = {0};
    size_t read = bufferevent_read(bev, buf, sizeof(buf));
    evutil_socket_t fd = bufferevent_getfd(bev);
    if (read > 0) {
        std::cout << "from " << fd << " read msg : " << buf << std::endl;
    }
}

/**
 * Write callback of a bufferevent: only logs that it was entered.
 *
 * @param bev  bufferevent the callback is invoked for (unused).
 * @param ctx  user argument (unused).
 */
void bufev_data_write_cb(struct bufferevent *bev, void *ctx)
{
    (void)bev;
    (void)ctx;

    static const char trace[] = "enter bufev_data_write_cb";
    std::cout << trace << std::endl;
}

void bufev_event_cb(struct bufferevent *bev, short what, void *ctx)
{
    std::cout << "enter bufferevent_event_cb" << std::endl;

    if (what & BEV_EVENT_EOF) {
        std::cout << "BEV_EVENT_EOF" << std::endl;
        bufferevent_free(bev); // 釋放 bufferevent 對象
    } else {
        std::cout << "Unknown error" << std::endl;
    }
}

/**
 * Accept callback of the listener: wraps the new connection in a bufferevent
 * and enables reading on it.
 *
 * @param listener  listener that accepted the connection (unused).
 * @param fd        descriptor of the accepted connection.
 * @param sockaddr  peer address (unused).
 * @param socklen   length of the peer address (unused).
 * @param ptr       the event_base, passed as the listener's user argument.
 *
 * Exits the process with status 1 when the bufferevent cannot be created.
 */
void listen_cb(struct evconnlistener *listener, evutil_socket_t fd, struct sockaddr *sockaddr, int socklen, void *ptr)
{
    std::cout << "enter listen_cb" << std::endl;

    // The user argument carries the event collection (example use of ptr).
    struct event_base *base = static_cast<struct event_base *>(ptr);

    struct bufferevent *conn = bufferevent_socket_new(base, fd, BEV_OPT_CLOSE_ON_FREE);
    if (!conn) {
        std::cout << "listen_cb - bufferevent error." << std::endl;
        exit(1);
    }

    // Read and event callbacks are installed; there is no write callback.
    // The event_base is handed on as the callbacks' argument.
    bufferevent_setcb(conn, bufev_data_read_cb, NULL, bufev_event_cb, base);

    // Enable reading.
    bufferevent_enable(conn, EV_READ);
}

/**
 * Error callback for the listener: only logs that it was entered.
 *
 * @param listener  listener the callback is invoked for (unused).
 * @param arg       user argument (unused).
 */
void listen_err_cb(struct evconnlistener *listener, void *arg)
{
    (void)listener;
    (void)arg;

    static const char trace[] = "enter listen_err_cb";
    std::cout << trace << std::endl;
}

/**
 * Fill an IPv4 socket address from a dotted-decimal host and a port.
 *
 * @param addr  structure to initialise; it is zeroed first.
 * @param host  IPv4 address in dotted-decimal notation, e.g. "127.0.0.1".
 * @param port  port number in host byte order.
 *
 * The result of inet_addr() is stored unchecked, so an invalid host string
 * yields whatever inet_addr() reports for it.
 */
void init_socketaddr(struct sockaddr_in& addr, std::string host, int port) {
    // addr is a reference, so memset needs its address.
    memset(&addr, 0, sizeof(addr));
    addr.sin_family = AF_INET;
    addr.sin_port = htons(static_cast<unsigned short>(port));
    addr.sin_addr.s_addr = inet_addr(host.c_str());
}

int main()
{
    std::string host;
    int port;
    std::cout << "Input host address : ";
    std::cin >> host;
    std::cout << "Input listen port : ";
    std::cin >> port;

    struct event_base *ev_base = event_base_new();
    struct sockaddr_in server_addr;
    init_socketaddr(server_addr, host, port);
    struct evconnlistener *listener = evconnlistener_new_bind(ev_base, listen_cb, (void*)ev_base, 
        // LEV_OPT_CLOSE_ON_FREE | LEV_OPT_REUSEABLE | LEV_OPT_THREADSAFE | LEV_OPT_DISABLED, 
        LEV_OPT_CLOSE_ON_FREE | LEV_OPT_REUSEABLE,
        100,
        (struct sockaddr *)&server_addr, sizeof(server_addr)); 
    if (listener == NULL) {
        std::cout << "listen error" << std::endl;
        exit(1);
    }

    // 設定監聽回調, 也可以在初始化中設定
    // evconnlistener_set_cb(listener, listen_cb, (void*)ev_base);

    // 設定監聽錯誤回調
    // evconnlistener_set_error_cb(listener, listen_err_cb);

    // 使能 listener, 建立時 LEV_OPT_DISABLED
    // evconnlistener_enable(listener);

    // 啟動事件分發
    event_base_dispatch(ev_base);

    // 釋放監聽器
    evconnlistener_free(listener);

    // 釋放事件集合
    event_base_free(ev_base);

    return 0;
}
           

client python 代碼

import socket
import threading
from time import sleep

# Text encoding used for everything sent to or received from the server.
ENCODING = 'utf-8'


def toBytes(msg):
    """Encode the text ``msg`` into bytes using ENCODING."""
    encoded = msg.encode(encoding=ENCODING)
    return encoded


def fromBytes(byteArray):
    """Decode ``byteArray`` into text using ENCODING."""
    decoded = byteArray.decode(encoding=ENCODING)
    return decoded

class Client:
    """Minimal TCP client with a background receive thread.

    Call ``connet`` (or its correctly spelled alias ``connect``) first, then
    ``startRecv`` to print incoming messages and ``send`` to transmit text.
    ``close`` may be called at any time, including before connecting and more
    than once.
    """

    def __init__(self):
        self.__socket = None
        self.__isRecv = False
        self.__isClosed = False
        self.__recvThread = None

    def __recv(self):
        """Receive loop run by the background thread; prints each message."""
        while self.__isRecv:
            try:
                data = self.__socket.recv(1024)
            except OSError:
                # Only socket failures end the connection; a bare ``except``
                # would also swallow unrelated errors.
                self.close()
                print("recv exception, connection closed")
                return
            if not data:
                # An empty read means the peer closed the connection.
                self.close()
                break
            msg = fromBytes(data)
            print("\nrecv msg : \n" + msg)
            sleep(0.1)

    def connet(self, host, port):
        """Open a TCP connection to ``(host, port)``."""
        self.__socket = socket.socket(family=socket.AF_INET, type=socket.SOCK_STREAM)
        self.__socket.connect((host, port))

    # Correctly spelled alias; ``connet`` is kept for existing callers.
    connect = connet

    def send(self, msg):
        """Send the text ``msg`` to the server."""
        # sendall() keeps writing until the whole payload is out; send() may
        # transmit only part of it.
        self.__socket.sendall(toBytes(msg))
        print("send msg : \n" + msg)

    def close(self):
        """Stop receiving and close the socket (safe to call repeatedly)."""
        self.stopRecv()
        if self.__isClosed:
            return
        # Flag first so a concurrent close() from the other thread is a no-op.
        self.__isClosed = True
        if self.__socket is not None:
            sleep(0.1)
            self.__socket.close()

    def isClosed(self):
        """Return True once ``close`` has run."""
        return self.__isClosed

    def startRecv(self):
        """Start the background receive thread (daemon)."""
        self.__isRecv = True
        self.__recvThread = threading.Thread(target=self.__recv, daemon=True)
        self.__recvThread.start()

    def stopRecv(self):
        """Ask the receive loop to stop after its current iteration."""
        self.__isRecv = False

# Interactive driver: connect to the server, then forward every typed command
# until the user enters "exit" or the connection is closed.
host = input("Please input server ipv4 address: ")
port = int(input("Please input server port(1025-65535): "))

client = Client()
client.connet(host, port)
client.startRecv()

try:
    cmd = input("Please input cmd: ")
    while cmd != "exit" and not client.isClosed():
        client.send(cmd)
        cmd = input("Please input cmd: ")
except (EOFError, KeyboardInterrupt):
    # End of input or Ctrl-C at the prompt is treated like "exit".
    pass
finally:
    # Always release the socket, whichever way the loop ended.
    client.close()