1. libevent 几个结构体
/*--------------------------------------------------------------------------*/
event_base // 事件集合
event // 事件
event_init() // 初始化默认事件集合
event_base_new() // 新建事件集合
event_dispatch(); // 使用默认事件集合,开始分发事件
event_base_dispatch() // 开始分发事件,循环分发,如果时间集合没有事件,会终止循环
event_base_free() // 释放事件集合
// 将未激活或未等待的事件加入到事件集合,可以加入不同的事件集合
event_base_event(struct event_base *, struct event *)
// 事件的关联项设置,包括关联fd/socket/signal,事件类型,事件回调,事件回调参数
event_set(struct event *, evutil_socket_t, short, void (*)(evutil_socket_t, short, void *), void *);
// 事件的关联项设置,包括关联 event_base, fd/socket/signal, 事件类型,事件回调,事件回调参数
event_assign(struct event *, struct event_base *, evutil_socket_t, short, event_callback_fn, void *)
// 添加事件到等待的事件集合
event_add(struct event *ev, const struct timeval *timeout)
// 从监听事件集合中删除
event_del(struct event *)
// 生效事件,并出发事件回调
event_active(struct event *ev, int res, short ncalls)
// 查询事件是否在等待,存在返回 1,不存在返回 0
event_pending(const struct event *ev, short events, struct timeval *tv)
/*--------------------------------------------------------------------------*/
// 事件 socket 监听器
evconnlistener
// 创建监听器,做一些参数绑定,flags 可以设置是否初始化就启动监听器
struct evconnlistener *evconnlistener_new_bind(struct event_base *base,
evconnlistener_cb cb, void *ptr, unsigned flags, int backlog,
const struct sockaddr *sa, int socklen)
// 设置监听回调
evconnlistener_set_cb
// 使能监听器
evconnlistener_enable
// 释放监听器
evconnlistener_free
/*--------------------------------------------------------------------------*/
bufferevent // 事件缓存
// 创建 bufferevent, 关联 event_base, fd/socket/signal, 设置读写 options
// options 常用 BEV_OPT_CLOSE_ON_FREE
struct bufferevent *bufferevent_socket_new(struct event_base *base, evutil_socket_t fd, int options);
// 设置 bufferevent 读写等回调
void bufferevent_setcb(struct bufferevent *bufev,
bufferevent_data_cb readcb, bufferevent_data_cb writecb,
bufferevent_event_cb eventcb, void *cbarg);
// 释放 bufferevent
bufferevent_free(struct bufferevent *bufev)
// 设置 bufferevent 关联 event_base
bufferevent_base_set(struct event_base *base, struct bufferevent *bufev)
// 获取 bufferevent 关联的 event_base
bufferevent_get_base(struct bufferevent *bev)
// 设置 bufferevent 关联的文件描述符
int bufferevent_setfd(struct bufferevent *bufev, evutil_socket_t fd);
// 获取 bufferevent 关联的文件描述符
evutil_socket_t bufferevent_getfd(struct bufferevent *bufev);
// 读 bufferevent
size_t bufferevent_read(struct bufferevent *bufev, void *data, size_t size);
int bufferevent_read_buffer(struct bufferevent *bufev, struct evbuffer *buf);
// 写 bufferevent
int bufferevent_write(struct bufferevent *bufev, const void *data, size_t size);
int bufferevent_write_buffer(struct bufferevent *bufev, struct evbuffer *buf);
2. libevent pipe
#include <iostream>
#include "event.h"
#include <sys/types.h>
#include <sys/stat.h>
#include <fcntl.h>
#include <unistd.h>
#include <stdlib.h>
int event_assign(struct event *, struct event_base *, evutil_socket_t, short, event_callback_fn, void *);
void fifo_read(evutil_socket_t fd, short, void *)
{
char buf[128] = {0};
int ret = read(fd, buf, sizeof(buf));
if (ret == -1) {
std::cout << "fifo_read error" << std::endl;
} else {
std::cout << "read msg : " << buf << std::endl;
}
}
int main()
{
const std::string fifo_name = "/tmp/ev_fifo";
if (access(fifo_name.c_str(), F_OK) == -1) {
if (mkfifo(fifo_name.c_str(), 0777) == -1) {
std::cout << "mkfifo error" << std::endl;
exit(1);
}
}
int fd_read = open(fifo_name.c_str(), O_RDONLY);
if (fd_read == -1) {
std::cout << "open read error" << std::endl;
exit(1);
}
struct event ev;
// 初始化,event_init 中会初始化 event_base(一个全局变量 current_base)
event_init();
// 设置事件触发的回调和 fd
event_set(&ev, fd_read, EV_READ | EV_PERSIST, fifo_read, NULL);
// 注册事件
event_add(&ev, NULL);
// 事件分发开始,循环开始了
event_dispatch();
close(fd_read);
return 0;
}
#include <iostream>
#include <sys/types.h>
#include <sys/stat.h>
#include <fcntl.h>
#include <unistd.h>
#include <stdlib.h>
#include <string>
int main()
{
const std::string fifo_name = "/tmp/ev_fifo";
if (access(fifo_name.c_str(), F_OK) == -1) {
if (mkfifo(fifo_name.c_str(), 0777) == -1) {
std::cout << "mkfifo error" << std::endl;
exit(1);
}
}
int fd_write = open("/tmp/ev_fifo", O_WRONLY);
if (fd_write == -1) {
std::cout << "open write error" << std::endl;
exit(1);
}
std::string str;
while (std::cin >> str) {
if (str == "bye") {
break;
}
if (!str.empty()) {
write(fd_write, str.c_str(), str.length());
std::cout << "write msg : " << str << std::endl;
}
}
close(fd_write);
return 0;
}
3. libevent signal
#include "event.h"
#include <signal.h>
#include <iostream>
int count = 0;
void singnal_cb(evutil_socket_t sock, short event, void *arg) {
if (sock == SIGINT) {
std::cout << "SIGINT signal callback" << std::endl;
count++;
}
if (count > 3) {
event_del((struct event*)arg);
}
}
int main()
{
// 创建事件集合
struct event_base *ev_base = event_base_new();
struct event ev;
// 监控 CTRL + C 信号
// EV_PERSIST 说明可以多次处理
event_assign(&ev, ev_base, SIGINT, EV_SIGNAL | EV_PERSIST, singnal_cb, &ev);
// 注册事件到集合
event_add(&ev, NULL);
// 对指定事件集合启动分发线程,当没有注册事件时退出
event_base_dispatch(ev_base);
// 释放事件集合
event_base_free(ev_base);
return 0;
}
4. libevent listener
#include <iostream>
#include <stdlib.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <string.h>
#include "event.h"
#include "event2/listener.h"
void bufev_data_read_cb(struct bufferevent *bev, void *ctx)
{
std::cout << "enter bufev_data_read_cb" << std::endl;
char buf[128] = {0};
size_t read = bufferevent_read(bev, buf, sizeof(buf));
evutil_socket_t fd = bufferevent_getfd(bev);
if (read > 0) {
std::cout << "from " << fd << " read msg : " << buf << std::endl;
}
}
void bufev_data_write_cb(struct bufferevent *bev, void *ctx)
{
std::cout << "enter bufev_data_write_cb" << std::endl;
}
void bufev_event_cb(struct bufferevent *bev, short what, void *ctx)
{
std::cout << "enter bufferevent_event_cb" << std::endl;
if (what & BEV_EVENT_EOF) {
std::cout << "BEV_EVENT_EOF" << std::endl;
bufferevent_free(bev); // 释放 bufferevent 对象
} else {
std::cout << "Unknown error" << std::endl;
}
}
void listen_cb(struct evconnlistener *listener, evutil_socket_t fd, struct sockaddr *sockaddr, int socklen, void *ptr)
{
std::cout << "enter listen_cb" << std::endl;
// 获取事件集合 - 作为 ptr 示例
struct event_base* ev_base = (struct event_base*)ptr;
// 创建 bufferevent
struct bufferevent *buf_ev = bufferevent_socket_new(ev_base, fd, BEV_OPT_CLOSE_ON_FREE);
if (buf_ev == NULL) {
std::cout << "listen_cb - bufferevent error." << std::endl;
exit(1);
}
// 设置回调
bufferevent_setcb(buf_ev,
bufev_data_read_cb, // 读回调
NULL, // 写回调
bufev_event_cb, // 文件描述符上有事件时触发的回调
ev_base); // 回调参数
// 使能读
bufferevent_enable(buf_ev, EV_READ);
}
void listen_err_cb(struct evconnlistener *listener, void *arg)
{
std::cout << "enter listen_err_cb" << std::endl;
}
void init_socketaddr(struct sockaddr_in& addr, std::string host, int port) {
memset(addr, 0, sizeof(sockaddr_in));
addr.sin_family = AF_INET;
addr.sin_port = htons(port);
addr.sin_addr.s_addr = inet_addr(host.c_str());
}
int main()
{
std::string host;
int port;
std::cout << "Input host address : ";
std::cin >> host;
std::cout << "Input listen port : ";
std::cin >> port;
struct event_base *ev_base = event_base_new();
struct sockaddr_in server_addr;
init_socketaddr(server_addr, host, port);
struct evconnlistener *listener = evconnlistener_new_bind(ev_base, listen_cb, (void*)ev_base,
// LEV_OPT_CLOSE_ON_FREE | LEV_OPT_REUSEABLE | LEV_OPT_THREADSAFE | LEV_OPT_DISABLED,
LEV_OPT_CLOSE_ON_FREE | LEV_OPT_REUSEABLE,
100,
(struct sockaddr *)&server_addr, sizeof(server_addr));
if (listener == NULL) {
std::cout << "listen error" << std::endl;
exit(1);
}
// 设置监听回调, 也可以在初始化中设置
// evconnlistener_set_cb(listener, listen_cb, (void*)ev_base);
// 设置监听错误回调
// evconnlistener_set_error_cb(listener, listen_err_cb);
// 使能 listener, 创建时 LEV_OPT_DISABLED
// evconnlistener_enable(listener);
// 启动事件分发
event_base_dispatch(ev_base);
// 释放监听器
evconnlistener_free(listener);
// 释放事件集合
event_base_free(ev_base);
return 0;
}
client python 代码
import socket
import threading
from time import sleep
ENCODING = 'utf-8'
def toBytes(msg):
return msg.encode(ENCODING)
def fromBytes(byteArray):
return byteArray.decode(ENCODING)
class Client:
def __init__(self):
self.__socket = None
self.__isRecv = False
self.__isClosed = False
self.__recvThread = None
def __recv(self):
while self.__isRecv:
data = None
try:
data = self.__socket.recv(1024)
except:
self.close()
print("recv exception, connection closed")
return
if not data:
self.close()
break
msg = fromBytes(data)
print("\nrecv msg : \n" + msg)
sleep(0.1)
def connet(self, host, port):
self.__socket = socket.socket(family=socket.AF_INET, type=socket.SOCK_STREAM)
self.__socket.connect((host, port))
def send(self, msg):
self.__socket.send(toBytes(msg))
print("send msg : \n" + msg)
def close(self):
self.stopRecv()
if not self.__isClosed:
sleep(0.1)
self.__socket.close()
self.__isClosed = True
def isClosed(self):
return self.__isClosed
def startRecv(self):
self.__isRecv = True
self.__recvThread = threading.Thread(target=self.__recv, daemon=True)
self.__recvThread.start()
def stopRecv(self):
self.__isRecv = False
host = input("Please input server ipv4 address: ")
port = int(input("Please input server port(1025-65535): "))
client = Client()
client.connet(host, port)
client.startRecv()
cmd = input("Please input cmd: ")
while cmd != "exit" and not client.isClosed():
client.send(cmd)
cmd = input("Please input cmd: ")