【网络通信 -- 直播】SRS 源码分析 -- 协程基础知识点总结(一)
【1】协程简介
IO 同步操作的逻辑代码
IO 异步操作的逻辑代码
IO 异步操作与 IO 同步操作对比
协程主要解决的问题,提供同时具有异步性能与同步代码逻辑的解决方案;C/C++ 典型的协程库 NtyCo,https://github.com/wangbojing/NtyCo;
【2】NtyCo 简介
【2.1】NtyCo 相关 API 简介
NtyCo 封装了若干接口,一类是协程本身的,一类是 posix 异步封装协程 API;
协程相关 API 函数接口
协程创建
int nty_coroutine_create(nty_coroutine **new_co, proc_coroutine func, void *arg);
协程调度器运行
void nty_schedule_run(void);
POSIX 异步封装 API 函数接口
创建 socke 实例
int nty_socket(int domain, int type, int protocol);
接收请求
int nty_accept(int fd, struct sockaddr *addr, socklen_t *len);
接收数据
ssize_t nty_recv(int fd, void *buf, size_t len, int flags);
发送数据
ssize_t nty_send(int fd, const void *buf, size_t len, int flags);
关闭 socket 实例
int nty_close(int fd);
【2.2】协程的实现
1. 协程的实现之创建协程
int nty_coroutine_create(nty_coroutine **new_co, proc_coroutine func, void *arg);
- 参数 1 : nty_coroutine **new_co,需要传入空的协程的对象,这个对象是由函数内部创建并返回的;
- 参数 2 : proc_coroutine func 协程的子过程;当协程被调度的时候,就会执行该函数;
- 参数 3 : void *arg 需要传入到新协程中的参数;
协程不存在亲属关系,都是一致的调度关系,接受调度器的调度;调用 create API 就会创建一个新协程,新协程就会加入到调度器的就绪队列中;
2. 协程的实现之实现 IO 异步操作
调度器与协程的上下文切换图示
在协程的上下文 IO 异步操作函数,步骤如下
- 1. 将 sockfd 添加到 epoll 管理中
- 2. 进行上下文环境切换,由协程上下文 yield 到调度器的上下文
- 3. 调度器获取下一个协程上下文,resume 新的协程
IO 异步操作的上下文切换的时序图
3. 协程的实现之回调协程的子过程
关键示例代码
static void nty_coroutine_init(nty_coroutine *co) {
void **stack = (void **)(co->stack + co->stack_size);
stack[-3] = NULL;
stack[-2] = (void *)co;
// ctx 协程的上下文
// 可以将回调函数地址存入 EIP 中,将相应参数存储到相应的参数寄存器中
co->ctx.esp = (void*)stack - (4 * sizeof(void*));
co->ctx.ebp = (void*)stack - (3 * sizeof(void*));
// CPU 寄存器 EIP 用于存储 CPU 运行下一条指令的地址
co->ctx.eip = (void*)_exec; // 设置回调函数入口
co->status = BIT(NTY_COROUTINE_STATUS_READY);
}
static void _exec(void *lt) {
#if defined(__lvm__) && defined(__x86_64__)
__asm__("movq 16(%%rbp), %[lt]" : [lt] "=r" (lt));
#endif
nty_coroutine *co = (nty_coroutine*)lt;
co->func(co->arg); //
co->status |= (BIT(NTY_COROUTINE_STATUS_EXITED) | BIT(NTY_COROUTINE_STATUS_FDEOF)
| BIT(NTY_COROUTINE_STATUS_DETACH));
#if 1
nty_coroutine_yield(co);
#else
co->ops = 0;
_switch(&co->sched->ctx, &co->ctx);
#endif
}
4. 协程实现之原语操作
协程的核心原语操作 : create, resume, yield;
协程的原语操作有 create 但却没有 exit 的分析
- 以 NtyCo 为例,协程一旦创建就不能由用户自己销毁,必须在子过程执行结束时便自动销毁协程的上下文数据;
- 以 _exec 执行入口函数返回而销毁协程的上下文与相关信息;
- co->func(co->arg) 是子过程,若用户需要长久运行协程,就必须要在 func 函数里面写入循环等操作;
- 因此 NtyCo 里面没有实现 exit 的原语操作
原语 create : 创建一个协程
// 1. 调度器是否存在,不存在也创建;调度器作为全局的单例;将调度器的实例存储在线程的私有空间 pthread_setspecific
// 2. 分配一个 coroutine 的内存空间,分别设置
// coroutine 的数据项,栈空间,栈大小,初始状态,创建时间,子过程回调函数,子过程的调用参数
// 3. 将新分配协程添加到就绪队列 ready_queue 中
int nty_coroutine_create(nty_coroutine **new_co, proc_coroutine func, void *arg)
原语 yield : 让出 CPU
// 参数:需要恢复运行的协程实例
// 调用后该函数也不会立即返回,而是切换到运行协程实例的 yield 的位置;
// 返回是在等协程相应事务处理完成后,主动 yield 会返回到 resume 的地方;
void nty_coroutine_yield(nty_coroutine *co)
原语 resume : 恢复协程的运行权
// 参数:需要恢复运行的协程实例
// 调用后该函数也不会立即返回,而是切换到运行协程实例的 yield 的位置;
// 返回是在等协程相应事务处理完成后,主动 yield 会返回到 resume 的地方;
int nty_coroutine_resume(nty_coroutine *co)
5. 协程实现之切换
上下文切换,就是将 CPU 的寄存器暂时保存,再将即将运行的协程的上下文寄存器,分别 mov 到相对应的寄存器上,从而完成上下文的切换;
nty_cpu_ctx 结构体
// 关联相关寄存器的结构体
typedef struct _nty_cpu_ctx {
void *esp;
void *ebp;
void *eip;
void *edi;
void *esi;
void *ebx;
void *r1;
void *r2;
void *r3;
void *r4;
void *r5;
} nty_cpu_ctx;
_switch 函数 API
// 参数 1 : 即将运行协程的上下文,寄存器列表
// 参数 2 : 正在运行协程的上下文,寄存器列表
int _switch(nty_cpu_ctx *new_ctx, nty_cpu_ctx *cur_ctx);
// _switch 返回后,执行即将运行协程的上下文,从而实现上下文的切换
_switch 函数实现代码分析
// %rdi 函数第一个参数, %rsi 函数第二个参数
__asm__ (
" .text \n"
" .p2align 4,,15 \n"
".globl _switch \n"
".globl __switch \n"
"_switch: \n"
"__switch: \n"
" movq %rsp, 0(%rsi) # save stack_pointer \n" // 保存栈指针到 cur_ctx 实例的 rsp 项
" movq %rbp, 8(%rsi) # save frame_pointer \n" // 保存帧指针到 cur_ctx 实例的 rbp 项
" movq (%rsp), %rax # save insn_pointer \n" // 将栈顶地址里面的值存储到 rax 寄存器中
" movq %rax, 16(%rsi) \n" // 保存 rax,rbx,r12 - r15
" movq %rbx, 24(%rsi) # save rbx,r12-r15 \n"
" movq %r12, 32(%rsi) \n"
" movq %r13, 40(%rsi) \n"
" movq %r14, 48(%rsi) \n"
" movq %r15, 56(%rsi) \n"
" movq 56(%rdi), %r15 \n" // 将 rdi 对应偏移量中的值保存入相应的寄存器中
" movq 48(%rdi), %r14 \n" // 即将 rdi 对应偏移量中的值保存入 new_ctx 的每一个项中
" movq 40(%rdi), %r13 # restore rbx,r12-r15 \n"
" movq 32(%rdi), %r12 \n"
" movq 24(%rdi), %rbx \n"
" movq 8(%rdi), %rbp # restore frame_pointer \n"
" movq 0(%rdi), %rsp # restore stack_pointer \n"
" movq 16(%rdi), %rax # restore insn_pointer \n" // 将指令指针 rip 的值存储到 rax 中
" movq %rax, (%rsp) \n" // 将存储 rip 值的 rax 寄存器赋值给栈指针指向的地址处
" ret \n" // 出栈,回到栈指针,执行 rip 指向的指令
);
6. 协程实现之定义
协程的运行体 R 与运行体调度器 S
- 1. 运行体 R:包含运行状态 { 就绪,睡眠,等待 },运行体回调函数,回调参数,栈指针,栈大小,当前运行体;
- 2. 调度器 S:包含执行集合 { 就绪,睡眠,等待 };
6.1 运行体在多种状态集合的切换
协程的状态
- 创建新的协程,创建完成后,便加入到就绪集合,等待调度器的调度;
- 协程在运行完成后,进行 IO 操作,此时 IO 并未准备好,便进入等待状态集合;
- IO 准备就绪,协程开始运行,后续进行 sleep 操作,便进入到睡眠状态集合;
就绪 (ready),睡眠 (sleep),等待 (wait) 集合对应的数据结构
- 就绪 (ready) 集合并没有设置优先级的选型,所有的协程优先级一致,可以使用队列来存储就绪的协程,简称为就绪队列 (ready_queue)
- 睡眠 (sleep) 集合需要按照睡眠时长进行排序,因此采用红黑树来存储,简称睡眠树 (sleep_tree);红黑树在工程中为 <key, value>, key 为睡眠时长,value 为对应的协程结点
- 等待 (wait) 集合,其功能是在等待 IO 准备就绪,等待 IO 是有时长的,因此等待 (wait) 集合采用红黑树存储,简称等待树 (wait_tree)
图示中,Coroutine 是协程的相应属性,status 表示协程的运行状态,并包含 sleep 与 wait 两棵红黑树以及 ready 队列;前提条件是不管何种运行状态的协程,都在就绪队列中,只是同时包含有其他的运行状态;
6.2 调度器与协程的功能界限
协程属性 : 每一协程都需要使用的而且可能会不同的属性;调度器属性 : 每一协程都需要的而且一致的数据;
协程的核心结构体
typedef struct _nty_coroutine {
//private
// 协程自身的上下文,需要保存 CPU 的寄存器 ctx
nty_cpu_ctx ctx;
// 子过程的回调函数 func
proc_coroutine func;
// 子过程回调函数的参数 arg 以及数据 data
void *arg;
void *data;
// 协程自身的栈空间的大小 stack_size
size_t stack_size;
size_t last_stack_size;
// 协程当前的运行状态
nty_coroutine_status status;
// 调度器的全局对象
nty_schedule *sched;
// 协程的创建时间
uint64_t birth;
// 协程 id
uint64_t id;
#if CANCEL_FD_WAIT_UINT64
int fd;
unsigned short events; //POLL_EVENT
#else
int64_t fd_wait;
#endif
char funcname[64];
struct _nty_coroutine *co_join;
void **co_exit_ptr;
// 协程自身的栈空间
void *stack;
void *ebp;
uint32_t ops;
uint64_t sleep_usecs;
// 当前运行状态的结点
RB_ENTRY(_nty_coroutine) sleep_node;
RB_ENTRY(_nty_coroutine) wait_node;
LIST_ENTRY(_nty_coroutine) busy_next; //
TAILQ_ENTRY(_nty_coroutine) ready_next;
TAILQ_ENTRY(_nty_coroutine) defer_next;
TAILQ_ENTRY(_nty_coroutine) cond_next;
TAILQ_ENTRY(_nty_coroutine) io_next;
TAILQ_ENTRY(_nty_coroutine) compute_next;
struct {
void *buf;
size_t nbytes;
int fd;
int ret;
int err;
} io;
struct _nty_coroutine_compute_sched *compute_sched;
int ready_fds;
struct pollfd *pfds;
nfds_t nfds;
} nty_coroutine;
调度器的属性,需要有保存 CPU 的寄存器上下文 ctx,可以从协程运行状态 yield 到调度器运行;从协程到调度器用 yield,从调度器到协程用 resume;
调度器结构体
typedef struct _nty_coroutine_link nty_coroutine_link;
typedef struct _nty_coroutine_queue nty_coroutine_queue;
typedef struct _nty_coroutine_rbtree_sleep nty_coroutine_rbtree_sleep;
typedef struct _nty_coroutine_rbtree_wait nty_coroutine_rbtree_wait;
typedef struct _nty_schedule {
uint64_t birth;
nty_cpu_ctx ctx;
void *stack;
size_t stack_size;
int spawned_coroutines;
uint64_t default_timeout;
struct _nty_coroutine *curr_thread;
int page_size;
int poller_fd;
int eventfd;
struct epoll_event eventlist[NTY_CO_MAX_EVENTS];
int nevents;
int num_new_events;
pthread_mutex_t defer_mutex;
nty_coroutine_queue ready;
nty_coroutine_queue defer;
nty_coroutine_link busy;
nty_coroutine_rbtree_sleep sleeping;
nty_coroutine_rbtree_wait waiting;
//private
} nty_schedule;
7. 协程实现之调度器
调度器的实现,有两种方案,一种是生产者消费者模式,另一种是多状态运行;
7.1 生产者消费者模式
逻辑代码示例
while (1) {
// 遍历睡眠集合,将满足条件的协程加入到 ready 队列
nty_coroutine *expired = NULL;
while ((expired = sleep_tree_expired(sched)) != NULL) {
TAILQ_ADD(&sched->ready, expired);
}
// 遍历等待集合,将满足条件的协程加入到 ready 队列
nty_coroutine *wait = NULL;
int nready = epoll_wait(sched->epfd, events, EVENT_MAX, 1);
for (i = 0; i < nready; i ++) {
wait = wait_tree_search(events[i].data.fd);
TAILQ_ADD(&sched->ready, wait);
}
// 使用 resume 恢复 ready 的协程运行权
while (!TAILQ_EMPTY(&sched->ready)) {
nty_coroutine *ready = TAILQ_POP(sched->ready);
resume(ready);
}
}
7.2 多状态运行
逻辑代码示例
while (1) {
// 遍历睡眠集合,使用 resume 恢复 expired 的协程运行权
nty_coroutine *expired = NULL;
while ((expired = sleep_tree_expired(sched)) != NULL) {
resume(expired);
}
// 遍历等待集合,使用 resume 恢复 wait 的协程运行权
nty_coroutine *wait = NULL;
int nready = epoll_wait(sched->epfd, events, EVENT_MAX, 1);
for (i = 0; i < nready; i ++) {
wait = wait_tree_search(events[i].data.fd);
resume(wait);
}
// 使用 resume 恢复 ready 的协程运行权
while (!TAILQ_EMPTY(sched->ready)) {
nty_coroutine *ready = TAILQ_POP(sched->ready);
resume(ready);
}
}
补充知识点
1. X86-64寄存器和栈帧
1.1 寄存器
通用寄存器
X86_64 有 16 个 64 位寄存器,分别是:%rax,%rbx,%rcx,%rdx,%rsi,%rdi,%rbp,%rsp,%r8,%r9,%r10,%r11,%r12,%r13,%r14,%r15;
- %rax 作为函数返回值使用
- %rsp 栈指针寄存器,指向栈顶
- %rdi,%rsi,%rdx,%rcx,%r8,%r9 用作函数参数,依次对应第 1 参数,第 2 参数...
- %rbx,%rbp,%r12,%r13,%r14,%r15 用作数据存储,遵循被调用者使用规则,调用子函数之前要备份上述寄存器,以防被修改
- %r10,%r11 用作数据存储,遵循调用者使用规则,即使用之前要先保存原值
段寄存器
- cs 代码段寄存器
- ds, es, fs, gs 数据段寄存器
- ss 堆栈段寄存器
状态和控制寄存器 eflags
该寄存器表示的意义非常丰富,程序中并不直接操作此寄存器,并由此衍生出很多操作指令,除去一些保留位,其他每位都代表一个具体的含义,
其中 bits 0, 2, 4, 6, 7, 11 是状态位,标识了此操作后的状态
- CF (bit 0) —— 进位标识,算术操作进行了进位和借位,则此位被设置
- PF (bit 2) —— 奇偶标识,结果包含奇数个1,则设置此位
- AF (bit 4) —— 辅助进位标识,结果的第3位向第4位借位,则此位被设置
- ZF (bit 6) —— 零标识,结果为零,此位设置
- SF (bit 7) —— 符号标识,若为负数则设置此位
- OF (bit 11) —— 溢出标识,结果向最高位符号位进行借位或者进位,此标志被设置
8, 9, 10 位为控制标识
- TF (bit 8) —— 陷阱标识,设置进程可以被单步调试
- IF (bit 9) —— 中断标识,设置能够响应中断请求
- DF (bit 10) —— 方向标识,用于标示字符处理过程中指针移动方向
指令寄存器 EIP
EIP —— 标志当前进程将要执行指令位置,在64位模式下扩展为 RIP 64位指令寄存器
1.2 栈帧
1.2.1 栈帧结构
C 语言属于面向过程语言,其最大特点就是把一个程序分解成若干过程 (函数),比如:入口函数是 main,然后调用各个子函数;在对应机器语言中,GCC 把过程转化成栈帧 (frame),每个栈帧对应一个过程;X86-32 典型栈帧结构中,由 %ebp 指向栈帧开始,%esp 指向栈顶;
1.2.2 C 函数与汇编分析
int foo ( int x )
{
int array[] = {1,3,5};
return array[x];
} /* ----- end of function foo ----- */
int main ( int argc, char *argv[] )
{
int i = 1;
int j = foo(i);
fprintf(stdout, "i=%d,j=%d\n", i, j);
return EXIT_SUCCESS;
} /* ----- end of function main ----- */
未经过优化的结果
优化后的结果
从优化可见,此处直接引用了栈顶之外的空间;
访问栈顶之外
通过 readelf 查看可执行程序的 header 信息
根据红色区域可见 x86-64 遵循 ABI 规则的版本,其定义了一些规范,遵循 ABI 的具体实现应该满足这些规范,其中,便规定了程序可以使用栈顶之外 128 字节的地址;
汇编相关分析
一、movl value, %eax
- movl value, %eax 这条指令是把 value 的值 (32位),送入 %eax 寄存器;
- value 表达式格式 : base_address (offset_address, index, size);计算公式 : base_address + offset_address + index * size;
二、call foo
- Pushl %rip // 保存下一条指令的地址,用于函数返回继续执行
- Jmp foo // 跳转到函数 foo
三、ret
- popl %rip // 恢复指令指针寄存器
四、cltq
- cltq 指令,特指 %eax->%rax 的符号拓展转换,等价于 movslq %eax, %rax
五、.LC0
.LC0 指一个字符串,此处代表 fprintf 的格式化字符串;
六、leave
leave 指令等价于
Movq %rbp %rsp // 撤销栈空间,回滚 %rsp
Popq %rbp // 恢复上一个栈帧的 %rbp
1.3 寄存器保存惯例
C 语言示例代码
#include <stdio.h>
#include <stdlib.h>
void sfact_helper ( long int x, long int * resultp)
{
if (x <= 1)
*resultp = 1;
else {
long int nresult;
sfact_helper(x-1, &nresult);
*resultp = x * nresult;
}
} /* ----- end of function foo ----- */
long int sfact ( long int x )
{
long int result;
sfact_helper(x, &result);
return result;
} /* ----- end of function sfact ----- */
int
main ( int argc, char *argv[] )
{
int sum = sfact(10);
fprintf(stdout, "sum=%d\n", sum);
return EXIT_SUCCESS;
} /* ---------- end of function main ---------- */
sfact_helper 函数生成的汇编代码
在函数 sfact_helper 中,用到了寄存器 %rbx 和 %rbp,在覆盖之前,GCC 选择了先保存他们的值,如代码 6~9 所示;在函数返回之前,GCC 依次恢复了他们,如代码 27-28 所示;
%rbx 在函数进入的时候指向的是 -16(%rsp),而在退出的时候,指向的是32(%rsp);此处使用了访问栈帧之外的空间的特性,即 GCC 不用预先分配空间再使用,而是先使用栈空间,然后在适当的时机分配,如第 11 行代码分配了空间,之后栈指针发生变化,所以同一个地址的引用偏移也相应做出调整;
注意事项
- 尽量使用 6 个以下的参数列表
- 传递大对象,尽量使用指针或者引用,鉴于寄存器只有 64 位,而且只能存储整形数值,寄存器存不下大对象
汇编相关分析
一、lea 指令
- 指令格式 : LEA 目的 源
- 指令功能 : 取源操作数地址的偏移量,并把它传送到目的操作数所在的单元
二、imul 指令
- (1) 双操作数的有符号乘指令
- 语句格式 : IMUL OPD, OPS
- 功能 : (OPD) * (OPS) ----> OPD
- 其中 OPD 可为 16/32 的寄存器,OPS 可为同类型的寄存器、存储器操作数或立即数
- (2) 3 个操作数的有符号乘指令
- 语句格式 : IMUL OPD, OPS, N
- 功能 : (OPS * N) -----> OPD
- 其中 OPD 可为 16/32 的寄存器,OPS 可为同类型的寄存器、存储器操作数,n 为立即数
- (3) 单操作数的有符号乘指令
- 语句格式 : IMUL OPS
- 功能 :
- 字节乘法 : (AL) * (OPS) ----> AX
- 字乘法 : (AX) * (OPS) ----> DX, AX
- 双字乘法 : (EAX) * (OPS) ----> EDX, EAX
三、mul 指令
- 语句格式 : MUL OPS
- 功能 :
- 字节乘法 : (AL) * (OPS) ----> AX
- 字乘法 : (AX) * (OPS) ----> DX, AX
- 双字乘法 : (EAX) * (OPS) ----> EDX, EAX
- 功能 :
1.4 参数传递范例
C 示例代码
#include <stdio.h>
#include <stdlib.h>
int foo ( int arg1, int arg2, int arg3, int arg4, int arg5, int arg6, int arg7 )
{
int array[] = {100, 200, 300, 400, 500, 600, 700};
int sum = array[ arg1 ] + array[ arg7 ];
return sum;
} /* ----- end of function foo ----- */
int
main ( int argc, char *argv[] )
{
int i = 1;
int j = foo(0, 1, 2, 3, 4, 5, 6);
fprintf(stdout, "i = %d, j = %d\n", i, j);
return EXIT_SUCCESS;
} /* ---------- end of function main ---------- */
生成的汇编代码
Main 函数中,代码 31~37 准备函数 foo 的参数
- 参数 7 存储在栈上即 %rsp 指向的位置;
- 参数 6 存储在寄存器 %r9d;
- 参数 5 存储在寄存器 %r8d;
- 参数 4 对应于 %ecx;
- 参数 3 对应于 %edx;
- 参数 2 对应于 %esi;
- 参数 1 对应于 %edi;
Foo 函数中,代码 14-15,分别取出参数 7 和参数 1,参与运算;
此处数组引用,使用了最经典的寻址方式,-40(%rsp,%rdi,4) = %rsp + %rdi * 4 + (-40); 其中 %rsp 用作数组基地址,%rdi 用作了数组的下标,数字 4 表示 sizeof(int) = 4
1.5 结构体传参
C 示例代码
#include <stdio.h>
#include <stdlib.h>
struct demo_s {
char var8;
int var32;
long var64;
};
struct demo_s foo (struct demo_s d)
{
d.var8 = 8;
d.var32 = 32;
d.var64 = 64;
return d;
} /* ----- end of function foo ----- */
int
main ( int argc, char *argv[] )
{
struct demo_s d, result;
result = foo (d);
fprintf(stdout, "demo: %d, %d, %ld\n", result.var8, result.var32, result.var64);
return EXIT_SUCCESS;
} /* ---------- end of function main ---------- */
生成汇编代码
- 问题1 : 结构体如何传递;被分成了两个部分,var8 和 var32 合并成 8 个字节的大小,放在寄存器 %rdi 中,var64 放在寄存器的 %rsi 中,即结构体分解
- 问题2 : 结构体如何存储;如 foo 函数的第 15~17 行所示,结构体的引用变成了一个偏移量访问;类似于数组,只不过其元素大小可变;
- 问题3 : 结构体如何返回,原本 %rax 充当返回值的角色,现在添加第二个返回值角色 %rdx;同样,GCC 用两个寄存器来表示结构体;
- 即使在缺省情况下,GCC 依然想尽办法使用寄存器;随着结构变得越来越大,寄存器不够用了,则只能使用栈了;
参考与致谢
本博客为博主的学习实践总结,并参考了众多博主的博文,在此表示感谢,博主若有不足之处,请批评指正。
【1】协程的实现与原理 [ M ]
【2】X86-64寄存器和栈帧
【3】x64 ASM 常用汇编指令
【4】movl 16(%ebx,%esi,4),%eax的含义是?
【5】寄存器介绍
【6】Push, Pop, call, leave 和 Ret 指令图解
【7】镇宅之宝扛鼎之作
【8】协程的实现之切换