1. libevent 幾個結構體
/*--------------------------------------------------------------------------*/
event_base // 事件集合
event // 事件
event_init() // 初始化預設事件集合
event_base_new() // 建立事件集合
event_dispatch(); // 使用預設事件集合,開始分發事件
event_base_dispatch() // 開始分發事件,循環分發,如果事件集合沒有事件,會終止循環
event_base_free() // 釋放事件集合
// 将未激活或未等待的事件加入到事件集合,可以加入不同的事件集合
event_base_set(struct event_base *, struct event *)
// 事件的關聯項設定,包括關聯fd/socket/signal,事件類型,事件回調,事件回調參數
event_set(struct event *, evutil_socket_t, short, void (*)(evutil_socket_t, short, void *), void *);
// 事件的關聯項設定,包括關聯 event_base, fd/socket/signal, 事件類型,事件回調,事件回調參數
event_assign(struct event *, struct event_base *, evutil_socket_t, short, event_callback_fn, void *)
// 添加事件到等待的事件集合
event_add(struct event *ev, const struct timeval *timeout)
// 從監聽事件集合中删除
event_del(struct event *)
// 生效事件,并觸發事件回調
event_active(struct event *ev, int res, short ncalls)
// 查詢事件是否在等待,存在傳回非 0 值(為真),不存在傳回 0
event_pending(const struct event *ev, short events, struct timeval *tv)
/*--------------------------------------------------------------------------*/
// 事件 socket 監聽器
evconnlistener
// 建立監聽器,做一些參數綁定,flags 可以設定是否初始化就啟動監聽器
struct evconnlistener *evconnlistener_new_bind(struct event_base *base,
evconnlistener_cb cb, void *ptr, unsigned flags, int backlog,
const struct sockaddr *sa, int socklen)
// 設定監聽回調
evconnlistener_set_cb
// 使能監聽器
evconnlistener_enable
// 釋放監聽器
evconnlistener_free
/*--------------------------------------------------------------------------*/
bufferevent // 事件緩存
// 建立 bufferevent, 關聯 event_base, fd/socket/signal, 設定讀寫 options
// options 常用 BEV_OPT_CLOSE_ON_FREE
struct bufferevent *bufferevent_socket_new(struct event_base *base, evutil_socket_t fd, int options);
// 設定 bufferevent 讀寫等回調
void bufferevent_setcb(struct bufferevent *bufev,
bufferevent_data_cb readcb, bufferevent_data_cb writecb,
bufferevent_event_cb eventcb, void *cbarg);
// 釋放 bufferevent
bufferevent_free(struct bufferevent *bufev)
// 設定 bufferevent 關聯 event_base
bufferevent_base_set(struct event_base *base, struct bufferevent *bufev)
// 擷取 bufferevent 關聯的 event_base
bufferevent_get_base(struct bufferevent *bev)
// 設定 bufferevent 關聯的檔案描述符
int bufferevent_setfd(struct bufferevent *bufev, evutil_socket_t fd);
// 擷取 bufferevent 關聯的檔案描述符
evutil_socket_t bufferevent_getfd(struct bufferevent *bufev);
// 讀 bufferevent
size_t bufferevent_read(struct bufferevent *bufev, void *data, size_t size);
int bufferevent_read_buffer(struct bufferevent *bufev, struct evbuffer *buf);
// 寫 bufferevent
int bufferevent_write(struct bufferevent *bufev, const void *data, size_t size);
int bufferevent_write_buffer(struct bufferevent *bufev, struct evbuffer *buf);
2. libevent pipe
#include <iostream>
#include "event.h"
#include <sys/types.h>
#include <sys/stat.h>
#include <fcntl.h>
#include <unistd.h>
#include <stdlib.h>
int event_assign(struct event *, struct event_base *, evutil_socket_t, short, event_callback_fn, void *);
/**
 * @brief libevent callback invoked when the FIFO descriptor is readable.
 *
 * Performs one read() on @p fd and prints the received text to stdout.
 * The event-mask and user-argument parameters are unused.
 *
 * @param fd Descriptor to read from.
 */
void fifo_read(evutil_socket_t fd, short, void *)
{
    char buf[128] = {0};
    // Read at most sizeof(buf) - 1 bytes so the zero-initialised last byte
    // always terminates the string; a full 128-byte read would otherwise make
    // the stream insertion below run past the end of the array.
    const ssize_t ret = read(fd, buf, sizeof(buf) - 1);
    if (ret == -1) {
        std::cout << "fifo_read error" << std::endl;
    } else {
        std::cout << "read msg : " << buf << std::endl;
    }
}
/**
 * @brief FIFO reader demo: watches /tmp/ev_fifo with the default event base.
 *
 * Creates the FIFO if it does not exist, opens it read-only, registers a
 * persistent read event whose callback is fifo_read, and runs the dispatch
 * loop.
 *
 * @return 0 on normal termination; the process exits with status 1 when the
 *         FIFO cannot be created or opened, or the event cannot be registered.
 */
int main()
{
    const std::string fifo_name = "/tmp/ev_fifo";
    // Create the FIFO only when nothing exists at that path yet.
    if (access(fifo_name.c_str(), F_OK) == -1) {
        if (mkfifo(fifo_name.c_str(), 0777) == -1) {
            std::cout << "mkfifo error" << std::endl;
            exit(1);
        }
    }
    int fd_read = open(fifo_name.c_str(), O_RDONLY);
    if (fd_read == -1) {
        std::cout << "open read error" << std::endl;
        exit(1);
    }
    struct event ev;
    // Initialise the default event base (the original author notes that
    // event_init stores it in a global named current_base).
    event_init();
    // Bind the descriptor, the event types and the callback to the event.
    event_set(&ev, fd_read, EV_READ | EV_PERSIST, fifo_read, NULL);
    // Register the event; without a registered event the dispatch loop would
    // have nothing to wait for, so a failure here is fatal.
    if (event_add(&ev, NULL) == -1) {
        std::cout << "event_add error" << std::endl;
        close(fd_read);
        exit(1);
    }
    // Run the event loop.
    event_dispatch();
    close(fd_read);
    return 0;
}
#include <iostream>
#include <sys/types.h>
#include <sys/stat.h>
#include <fcntl.h>
#include <unistd.h>
#include <stdlib.h>
#include <string>
/**
 * @brief FIFO writer demo: forwards whitespace-separated words from stdin
 *        into /tmp/ev_fifo until "bye" or end of input.
 *
 * Creates the FIFO if it does not exist, opens it write-only, and writes each
 * word read from std::cin to it without any delimiter.
 *
 * @return 0 on normal termination; the process exits with status 1 when the
 *         FIFO cannot be created or opened.
 */
int main()
{
    const std::string fifo_name = "/tmp/ev_fifo";
    // Create the FIFO only when nothing exists at that path yet.
    if (access(fifo_name.c_str(), F_OK) == -1) {
        if (mkfifo(fifo_name.c_str(), 0777) == -1) {
            std::cout << "mkfifo error" << std::endl;
            exit(1);
        }
    }
    // Use the same path variable that was checked/created above.
    int fd_write = open(fifo_name.c_str(), O_WRONLY);
    if (fd_write == -1) {
        std::cout << "open write error" << std::endl;
        exit(1);
    }
    std::string str;
    // A successful formatted extraction never yields an empty string, so no
    // extra emptiness check is needed inside the loop.
    while (std::cin >> str) {
        if (str == "bye") {
            break;
        }
        // Report a failed write instead of claiming the message was sent.
        // NOTE(review): if the reading side has gone away, SIGPIPE's default
        // action may end the process before write() returns - confirm whether
        // the signal should be ignored for this demo.
        const ssize_t written = write(fd_write, str.c_str(), str.length());
        if (written == -1) {
            std::cout << "write error" << std::endl;
            break;
        }
        std::cout << "write msg : " << str << std::endl;
    }
    close(fd_write);
    return 0;
}
3. libevent signal
#include "event.h"
#include <signal.h>
#include <iostream>
// Number of SIGINT deliveries handled by singnal_cb so far.
int count = 0;

/**
 * @brief Signal event callback.
 *
 * Prints a message and bumps the global counter when the delivered signal is
 * SIGINT. Once the counter exceeds 3, the event passed through @p arg is
 * removed with event_del().
 *
 * @param sock  Signal number that fired.
 * @param event Event mask (unused).
 * @param arg   Pointer to the struct event to delete when the limit is hit.
 */
void singnal_cb(evutil_socket_t sock, short event, void *arg) {
    const bool is_sigint = (sock == SIGINT);
    if (is_sigint) {
        std::cout << "SIGINT signal callback" << std::endl;
        ++count;
    }
    if (count <= 3) {
        return;
    }
    // Limit exceeded: stop monitoring the event handed in by the caller.
    event_del(static_cast<struct event*>(arg));
}
int main()
{
// 建立事件集合
struct event_base *ev_base = event_base_new();
struct event ev;
// 監控 CTRL + C 信号
// EV_PERSIST 說明可以多次處理
event_assign(&ev, ev_base, SIGINT, EV_SIGNAL | EV_PERSIST, singnal_cb, &ev);
// 注冊事件到集合
event_add(&ev, NULL);
// 對指定事件集合啟動分發線程,當沒有注冊事件時退出
event_base_dispatch(ev_base);
// 釋放事件集合
event_base_free(ev_base);
return 0;
}
4. libevent listener
#include <iostream>
#include <stdlib.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <string.h>
#include "event.h"
#include "event2/listener.h"
void bufev_data_read_cb(struct bufferevent *bev, void *ctx)
{
std::cout << "enter bufev_data_read_cb" << std::endl;
char buf[128] = {0};
size_t read = bufferevent_read(bev, buf, sizeof(buf));
evutil_socket_t fd = bufferevent_getfd(bev);
if (read > 0) {
std::cout << "from " << fd << " read msg : " << buf << std::endl;
}
}
/**
 * @brief bufferevent write callback; only logs that it was entered.
 *
 * @param bev Bufferevent the callback is attached to (unused).
 * @param ctx User argument supplied to bufferevent_setcb (unused).
 */
void bufev_data_write_cb(struct bufferevent *bev, void *ctx)
{
    static const char kEnterMsg[] = "enter bufev_data_write_cb";
    std::cout << kEnterMsg << std::endl;
}
/**
 * @brief bufferevent event callback: logs the event and releases the
 *        connection.
 *
 * Logs "BEV_EVENT_EOF" when the EOF bit is set in @p what and
 * "Unknown error" otherwise, then frees the bufferevent in both cases.
 * Previously only the EOF path freed it, so a connection that reported any
 * other event was never released.
 *
 * NOTE(review): this treats every non-EOF event as fatal; that matches the
 * "Unknown error" wording, but revisit it if timeouts or outgoing
 * connections are ever added to this demo.
 *
 * @param bev  Bufferevent that reported the event; freed before returning.
 * @param what Bit mask describing the event.
 * @param ctx  User argument supplied to bufferevent_setcb (unused).
 */
void bufev_event_cb(struct bufferevent *bev, short what, void *ctx)
{
    std::cout << "enter bufferevent_event_cb" << std::endl;
    if (what & BEV_EVENT_EOF) {
        std::cout << "BEV_EVENT_EOF" << std::endl;
    } else {
        std::cout << "Unknown error" << std::endl;
    }
    // Release the bufferevent object on every path.
    bufferevent_free(bev);
}
/**
 * @brief Listener callback: wraps the given connection descriptor in a
 *        bufferevent and enables reading on it.
 *
 * @param listener Listener that invoked the callback (unused).
 * @param fd       Descriptor of the connection.
 * @param sockaddr Address passed by the listener (unused).
 * @param socklen  Length of @p sockaddr (unused).
 * @param ptr      User argument; used here as the struct event_base pointer.
 *
 * Exits the process with status 1 if the bufferevent cannot be created.
 */
void listen_cb(struct evconnlistener *listener, evutil_socket_t fd, struct sockaddr *sockaddr, int socklen, void *ptr)
{
    std::cout << "enter listen_cb" << std::endl;

    // The user argument carries the event base (serves as a ptr usage example).
    struct event_base *const base = static_cast<struct event_base*>(ptr);

    // Create the bufferevent for this connection.
    struct bufferevent *const conn = bufferevent_socket_new(base, fd, BEV_OPT_CLOSE_ON_FREE);
    if (!conn) {
        std::cout << "listen_cb - bufferevent error." << std::endl;
        exit(1);
    }

    // Install callbacks: read callback, no write callback, event callback,
    // and the event base as the callback argument.
    bufferevent_setcb(conn, bufev_data_read_cb, NULL, bufev_event_cb, base);

    // Enable reading.
    bufferevent_enable(conn, EV_READ);
}
/**
 * @brief Listener error callback; only logs that it was entered.
 *
 * @param listener Listener that reported the error (unused).
 * @param arg      User argument (unused).
 */
void listen_err_cb(struct evconnlistener *listener, void *arg)
{
    static const char kEnterMsg[] = "enter listen_err_cb";
    std::cout << kEnterMsg << std::endl;
}
/**
 * @brief Fills an IPv4 socket address from a dotted-decimal host and a port.
 *
 * Zeroes the whole structure, then sets the family to AF_INET, the port in
 * network byte order, and the address as converted by inet_addr().
 *
 * @param addr Structure to initialise (overwritten).
 * @param host IPv4 address in text form, passed to inet_addr().
 * @param port Port number in host byte order.
 */
void init_socketaddr(struct sockaddr_in& addr, std::string host, int port) {
    // memset needs the object's address; passing the reference itself does
    // not compile.
    memset(&addr, 0, sizeof(addr));
    addr.sin_family = AF_INET;
    addr.sin_port = htons(port);
    addr.sin_addr.s_addr = inet_addr(host.c_str());
}
/**
 * @brief Listener demo: accepts TCP connections on a user-supplied address
 *        and port and hands them to listen_cb.
 *
 * Reads host and port from stdin, creates an event base and a bound listener,
 * runs the dispatch loop, then frees the listener and the base.
 *
 * @return 0 on normal termination; the process exits with status 1 when the
 *         listener cannot be created.
 */
int main()
{
    std::string host;
    std::cout << "Input host address : ";
    std::cin >> host;

    int port;
    std::cout << "Input listen port : ";
    std::cin >> port;

    struct event_base *base = event_base_new();

    struct sockaddr_in bind_addr;
    init_socketaddr(bind_addr, host, port);

    // Alternative flag set from the original notes:
    //   LEV_OPT_CLOSE_ON_FREE | LEV_OPT_REUSEABLE | LEV_OPT_THREADSAFE | LEV_OPT_DISABLED
    const unsigned listener_flags = LEV_OPT_CLOSE_ON_FREE | LEV_OPT_REUSEABLE;
    const int backlog = 100;
    struct evconnlistener *listener = evconnlistener_new_bind(
        base, listen_cb, static_cast<void*>(base),
        listener_flags,
        backlog,
        reinterpret_cast<struct sockaddr *>(&bind_addr), sizeof(bind_addr));
    if (!listener) {
        std::cout << "listen error" << std::endl;
        exit(1);
    }

    // Optional steps kept from the original notes:
    //   set the accept callback after creation instead of at construction:
    //     evconnlistener_set_cb(listener, listen_cb, (void*)ev_base);
    //   set the listener error callback:
    //     evconnlistener_set_error_cb(listener, listen_err_cb);
    //   enable the listener when it was created with LEV_OPT_DISABLED:
    //     evconnlistener_enable(listener);

    // Start dispatching events.
    event_base_dispatch(base);

    // Release the listener, then the event base.
    evconnlistener_free(listener);
    event_base_free(base);
    return 0;
}
client python 代碼
import socket
import threading
from time import sleep
# Text encoding shared by the conversion helpers.
ENCODING = "utf-8"


def toBytes(msg):
    """Return ``msg`` encoded to bytes with the module-level ENCODING."""
    encoded = msg.encode(ENCODING)
    return encoded
def fromBytes(byteArray):
    """Return the text obtained by decoding ``byteArray`` with ENCODING."""
    text = byteArray.decode(ENCODING)
    return text
class Client:
    """Minimal TCP client with an optional background receive thread.

    The socket is created by ``connet``/``connect``. ``startRecv`` launches a
    daemon thread that prints every received message until the connection
    ends, an error occurs, or ``stopRecv``/``close`` is called.
    """

    def __init__(self):
        self.__socket = None       # created in connet()
        self.__isRecv = False      # receive loop keeps running while True
        self.__isClosed = False    # set once close() has run
        self.__recvThread = None   # background thread started by startRecv()

    def __recv(self):
        """Receive loop run by the background thread."""
        while self.__isRecv:
            try:
                data = self.__socket.recv(1024)
            except OSError:
                # Socket-level failure (including the socket being closed by
                # another thread): shut down and leave the loop.
                self.close()
                print("recv exception, connection closed")
                return
            if not data:
                # Empty read: the peer closed the connection.
                self.close()
                break
            msg = fromBytes(data)
            print("\nrecv msg : \n" + msg)
            sleep(0.1)

    def connet(self, host, port):
        """Create an IPv4 stream socket and connect it to ``(host, port)``."""
        self.__socket = socket.socket(family=socket.AF_INET, type=socket.SOCK_STREAM)
        self.__socket.connect((host, port))

    # Correctly spelled alias; ``connet`` is kept for existing callers.
    connect = connet

    def send(self, msg):
        """Encode ``msg`` and send all of it over the socket."""
        # sendall keeps sending until every byte has been handed to the OS;
        # plain send() may transmit only part of the payload.
        self.__socket.sendall(toBytes(msg))
        print("send msg : \n" + msg)

    def close(self):
        """Stop receiving and close the socket; safe to call repeatedly."""
        self.stopRecv()
        if not self.__isClosed:
            # Nothing to close when connet() was never called.
            if self.__socket is not None:
                # Short pause kept from the original implementation;
                # presumably gives the receive loop time to wind down.
                sleep(0.1)
                self.__socket.close()
            self.__isClosed = True

    def isClosed(self):
        """Return True once close() has completed."""
        return self.__isClosed

    def startRecv(self):
        """Start the background receive thread (daemon)."""
        self.__isRecv = True
        self.__recvThread = threading.Thread(target=self.__recv, daemon=True)
        self.__recvThread.start()

    def stopRecv(self):
        """Ask the receive loop to stop after its current iteration."""
        self.__isRecv = False
# Interactive driver: connect to the server, start the receive thread, then
# forward each entered command until "exit" is typed or the connection closes.
host = input("Please input server ipv4 address: ")
port = int(input("Please input server port(1025-65535): "))

client = Client()
client.connet(host, port)
client.startRecv()

while True:
    cmd = input("Please input cmd: ")
    if cmd == "exit" or client.isClosed():
        break
    client.send(cmd)