-
Notifications
You must be signed in to change notification settings - Fork 276
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Support MSG_ZEROCOPY for streaming server. #13
Comments
看起来是Kernel 4.18新增的功能:https://www.douban.com/note/686726381/ 这篇文章说如果不重用内存的话可以怎样,但不重用的话那还是会多一次拷贝了:https://zhuanlan.zhihu.com/p/28575308 从Linux文章介绍说可能不一定会拷贝成功:https://www.kernel.org/doc/html/latest/networking/msg_zerocopy.html#loopback
其中提到的
|
ST SchedulerZero Copy可能涉及到ST对于 IDLE Thread以一个测试程序为例,分析ST的调度过程:
下面是IDLE协程做的事情的分析:
总结起来说,IDLE协程,就是IO事件+超时事件驱动协程,本质上就是 Scheduler我们看到了 首先,调度器的工作函数是
上面大致这些函数就是用户代码中,调用了
这个调度宏定义,就是将当前协程的信息保存,然后进入调度函数
可见run_q实际上是要马上执行的活动协程,由于IDLE叫IDLE空闲协程,所以在创建完它后,会把它从run_q中删除(默认创建线程后就会加入到run_q执行):
我们分析下下面例子程序的调度过程:
Cond & Mux条件变量cond和锁mux,它们的调度实际上是由用户行为调度的,比如下面的程序:
这个程序的运行序列是:
|
这是目前SRS的UDP的调度结构图。基本上还是很高效的,所以st没有出现在瓶颈中。可以改进的点包括:
|
ST IO关于ST的IO机制,主要看下ST基于Linux的epoll如何实现,其他事件框架类似。 OpenST提供几个open的函数:
以上三个函数,最终都是调用 Accept作为TCP服务器,ST提供了
_st_netfd_t *st_accept(_st_netfd_t *fd, struct sockaddr *addr, int *addrlen, st_utime_t timeout)
{
for ( ; ; ) {
osfd = accept(fd->osfd, addr, (socklen_t *)addrlen);
if (osfd >= 0)
break;
if (errno == EINTR)
continue;
if (!_IO_NOT_READY_ERROR)
return NULL;
/* Wait until the socket becomes readable */
if (st_netfd_poll(fd, POLLIN, timeout) < 0)
return NULL;
}
newfd = _st_netfd_new(osfd, 1, 1);
return newfd;
}
#if EAGAIN != EWOULDBLOCK
#define _IO_NOT_READY_ERROR ((errno == EAGAIN) || (errno == EWOULDBLOCK))
#else
#define _IO_NOT_READY_ERROR (errno == EAGAIN)
#endif Polling当FD没有准备好时,也就是EAGAIN时,异步回调的框架,就会逐步返回函数,下次cycle大循环再继续处理。而ST则更好解决这个问题,就会调用 int st_netfd_poll(_st_netfd_t *fd, int how, st_utime_t timeout)
{
if ((n = st_poll(&pd, 1, timeout)) < 0)
}
// in sched.c
int st_poll(struct pollfd *pds, int npds, st_utime_t timeout)
{
if ((*_st_eventsys->pollset_add)(pds, npds) < 0)
return -1;
pq.pds = pds;
pq.npds = npds;
pq.thread = me;
pq.on_ioq = 1;
_ST_ADD_IOQ(pq);
if (timeout != ST_UTIME_NO_TIMEOUT)
_ST_ADD_SLEEPQ(me, timeout);
me->state = _ST_ST_IO_WAIT;
_ST_SWITCH_CONTEXT(me);
}
我们看下 ST_HIDDEN int _st_epoll_pollset_add(struct pollfd *pds, int npds)
{
for (i = 0; i < npds; i++) {
fd = pds[i].fd;
old_events = _ST_EPOLL_EVENTS(fd);
if (pds[i].events & POLLIN)
_ST_EPOLL_READ_CNT(fd)++;
if (pds[i].events & POLLOUT)
_ST_EPOLL_WRITE_CNT(fd)++;
if (pds[i].events & POLLPRI)
_ST_EPOLL_EXCEP_CNT(fd)++;
events = _ST_EPOLL_EVENTS(fd);
if (events != old_events) {
op = old_events ? EPOLL_CTL_MOD : EPOLL_CTL_ADD;
ev.events = events;
ev.data.fd = fd;
if (epoll_ctl(_st_epoll_data->epfd, op, fd, &ev)
// 全局缓存的epoll对象,有每个fd的缓存。
// (gdb) p *_st_epoll_data
// $43 = {fd_data = 0xd511f0, evtlist = 0xd61200, evtlist_size = 4096, evtlist_cnt = 0, epfd = 7}
// (gdb) p _st_epoll_data->fd_data[11]
// $45 = {rd_ref_cnt = 0, wr_ref_cnt = 0, ex_ref_cnt = 0, revents = 0}
加入到epoll后,就开始等待IO事件了。 IO Event如果来了一个TCP客户端,比如RTMP推流,那么epoll_wait返回就是1了。处理流程如下: ST_HIDDEN void _st_epoll_dispatch(void)
{
nfd = epoll_wait(_st_epoll_data->epfd, _st_epoll_data->evtlist, _st_epoll_data->evtlist_size, timeout);
if (nfd > 0) {
for (i = 0; i < nfd; i++) {
osfd = _st_epoll_data->evtlist[i].data.fd;
_ST_EPOLL_REVENTS(osfd) = _st_epoll_data->evtlist[i].events;
if (_ST_EPOLL_REVENTS(osfd) & (EPOLLERR | EPOLLHUP)) {
/* Also set I/O bits on error */
_ST_EPOLL_REVENTS(osfd) |= _ST_EPOLL_EVENTS(osfd);
}
}
for (q = _ST_IOQ.next; q != &_ST_IOQ; q = q->next) {
首先,添加到io_q就一个地方,在 int st_poll(struct pollfd *pds, int npds, st_utime_t timeout)
{
_st_pollq_t pq;
pq.pds = pds;
pq.npds = npds;
pq.thread = me;
pq.on_ioq = 1;
_ST_ADD_IOQ(pq);
me->state = _ST_ST_IO_WAIT;
// pq就是代表当前等待的事件和线程,内容如下:
// (gdb) p &pq
// $96 = (_st_pollq_t *) 0x7ffff7fc4bb0
// (gdb) p pq.links
// $93 = {next = 0x0, prev = 0x0}
// 调用前,io_q是这样的,io_q指向自己,也就是空的:
// (gdb) p &_st_this_vp.io_q
// $90 = (_st_clist_t *) 0xc8e640 <_st_this_vp+32>
// (gdb) p _st_this_vp.io_q
// $91 = {next = 0xc8e640 <_st_this_vp+32>, prev = 0xc8e640 <_st_this_vp+32>}
// 调用后,io_q和qp.links链接起来了:
// (gdb) p _st_this_vp.io_q
// $97 = {next = 0x7ffff7fc4bb0, prev = 0x7ffff7fc4bb0}
// (gdb) p pq.links
// $107 = {next = 0xc8e640 <_st_this_vp+32>, prev = 0xc8e640 <_st_this_vp+32>}
// 注意:_st_pollq_t被直接转成了_st_clist_t,它们两个的内存布局开头是相同的。
// (gdb) p _st_this_vp.io_q.next
// $103 = (struct _st_clist *) 0x7ffff7fc4bb0
// 所以我们要将io_q的next,直接转成_st_pollq_t,才是它真正指向的对象。
// (gdb) p *(_st_pollq_t*)_st_this_vp.io_q.next
// $105 = {links = {next = 0xc8e640 <_st_this_vp+32>, prev = 0xc8e640 <_st_this_vp+32>}, thread = 0x7ffff7fc4e30, pds = 0x7ffff7fc4c30, npds = 1, on_ioq = 1}
现在协程已经在io_q中,我们看epoll_wait读取到新的TCP连接时的处理: ST_HIDDEN void _st_epoll_dispatch(void)
{
nfd = epoll_wait(_st_epoll_data->epfd, _st_epoll_data->evtlist, _st_epoll_data->evtlist_size, timeout);
if (nfd > 0) {
for (q = _ST_IOQ.next; q != &_ST_IOQ; q = q->next) {
pq = _ST_POLLQUEUE_PTR(q);
// 和上面一样,`q: _st_clist_t*`是链表,我们需要将它转成实际的类型`pq: _st_pollq_t*`。
// _ST_POLLQUEUE_PTR这个宏,就是计算两个结构转换需要的偏移量。
// (gdb) p q
// $109 = (_st_clist_t *) 0x7ffff7fc4bb0
// (gdb) p pq
// $110 = (_st_pollq_t *) 0x7ffff7fc4bb0
notify = 0;
epds = pq->pds + pq->npds;
for (pds = pq->pds; pds < epds; pds++) {
// 这段就是遍历所有的pds,也就是所有的侦听。
// (gdb) p pq->pds
// $118 = (struct pollfd *) 0x7ffff7fc4c30
// (gdb) p pq->npds
// $119 = 1
// (gdb) p *pds
// $123 = {fd = 11, events = 1, revents = 0}
// 这段就是看fd是否有事件,如果没有就忽略,继续找下一个。
// revents就是这次循环会设置的标记,代表fd的活跃事件。
if (_ST_EPOLL_REVENTS(pds->fd) == 0) {
pds->revents = 0;
continue;
}
// 下面这段就是fd有活跃事件,设置pds的revents和notify,激活这个协程。
// 全局缓存中的revents,和pq的revents,都代表了这次poll的事件。
osfd = pds->fd;
events = pds->events;
pds->revents = revents;
revents = 0;
if ((events & POLLIN) && (_ST_EPOLL_REVENTS(osfd) & EPOLLIN))
revents |= POLLIN;
......
if (revents) {
notify = 1;
}
// 这里将fd对应的pds设置事件。
// (gdb) p *pds
// $127 = {fd = 11, events = 1, revents = 1}
}
if (notify) {
// 如果协程有侦听的fd被激活,那么就把协程从io_q移除。
ST_REMOVE_LINK(&pq->links);
pq->on_ioq = 0;
// 减少fd的全局计数器,代表这个协程已经不再侦听这个fd了。
// 调用结束后,可以看这个fd的cache信息,rd_ref_cnt已经从1变成0了:
// (gdb) p _st_epoll_data->fd_data[11]
// $146 = {rd_ref_cnt = 0, wr_ref_cnt = 0, ex_ref_cnt = 0, revents = 1}
// 但是注意,如果有数据,并不会调用epoll_ctl删除,因为是属于fired的fd,revents非0。
// 比如线程侦听了两个fd,一个活跃一个不活跃,只会把不活跃的给删除侦听了。
_st_epoll_pollset_del(pq->pds, pq->npds);
// 设置协程为活跃,调度器将跳到这个协程执行。
pq->thread->state = _ST_ST_RUNNABLE;
_ST_ADD_RUNQ(pq->thread);
}
// 对于fired也就是活跃的fd,重置revents为0,并修改它的侦听。
// 比如如果有两个协程在侦听,一个侦听的是IN,一个侦听的是OUT,
// 这次收到的是IN事件,那么就会把IN去掉不侦听,只侦听OUT。
// 如果fd只有一个协程在侦听,那么就直接DEL掉侦听了。
for (i = 0; i < nfd; i++) {
/* Delete/modify descriptors that fired */
osfd = _st_epoll_data->evtlist[i].data.fd;
_ST_EPOLL_REVENTS(osfd) = 0;
events = _ST_EPOLL_EVENTS(osfd);
op = events ? EPOLL_CTL_MOD : EPOLL_CTL_DEL;
ev.events = events;
ev.data.fd = osfd;
if (epoll_ctl(_st_epoll_data->epfd, op, osfd, &ev) == 0 && op == EPOLL_CTL_DEL) {
_st_epoll_data->evtlist_cnt--;
}
} |
看了下ST的IO机制,是可以支持Zero Copy的,在收到EPOLLERR时,对应poll这个fd的协程会被唤起和激活,我们就可以接收反馈消息了。 可以写一个测试程序,序列如下:
需要确认的是:
实例代码:https://github.com/ossrs/srs/tree/develop/trunk/research/msg_zerocopy Linux Kernel需要Linux 5.0内核,才能支持UDP的Zero Copy,参考4.19: udpgso_bench_tx: setsockopt zerocopy: Unknown error 524
升级CentOS8内核到5.0:
重启服务器后,查看内核版本:
Reception使用Zero Copy需要设置socket:
发送时,设置flags:
然后,我们接收内核的反馈消息,参考Reception:
发送后,ST在
ST发现是EPOLLERR(0x08),则会把侦听的事件EPOLLIN(0x01)也加到revents了:
这时候如果有st_recvmsg,则会唤醒所有的协程,不管是否是侦听了EPOLLERR。比如如果有另外协程,是在st_recvfrom接收消息,也一样会被激活:
也就是所有侦听这个fd的协程都会唤醒,然后再次尝试读取消息,当然如果一般接收数据的协程就会进入EAGAIN继续下一次等待。 也就是说,在实际的服务器上,一般上会有两个协程,一个收一个发:
当协程2使用ZeroCopy发送数据后,协程1和2会被依次唤醒继续尝试recvmsg,协程1会发现EAGAIN实际上没有数据,协程2会收到ZeroMessage的消息。
Notification Parsing收到control消息后,解析内核确认的序列,参考Notification Parsing:
比如发送第一个消息后,range=1,lo=0,hi=0,表示内核确认了这个消息。 |
Sendmmsg内核是以msghdr为单位确认的。对于sendmmsg,内核是每个msghdr分配一个id,比如sendmmsg一次发送2个消息,内核确认是:
range=2, lo=0, hi=1,确认了这两个消息。 Multiple Sendmmsg若发送了4个消息,分两次发送,每次用sendmmsg发送2个,那么kernel反馈的消息如下:
Merged Reception如果发送了多次消息,但只在最后收一次,那么内核会一次确认这4个消息:
range=4, lo=0, hi=3,表示这4个消息都被确认了。 |
Mix支持ZeroCopy的消息和非ZeroCopy的消息混合发送,参考Mixing CopyAvoidance and Copying。 如果混合了其他消息,kernel确认的ID只对zerocopy的msghdr编号,比如我们发送的序列如下:
确认的序列:
或者一次确认:
也就是range=4,lo=0, hi=3,并不包含非zerocopy的消息。 |
发现ZeroCopy时是有些限制的:
下表是测出来实际可用的数据:
MTU实测发现如果包超过1472字节,就会出现defered copies:
UDP MaxSize如果包超过65508字节,就会出现errno=90(Message too long)错误:
GSO可以使用GSO将包分成多个包,当然就不是在一个Payload中发送,而是在多个UDP包了。 比如1500字节的内容,我们GSO设为1400,那么会分成两个UDP包:
但是注意,GSO也不能超过65508字节内容,比如:
还有,GSO的包个数不能超过36个,比如:
IOVEC单个msghdr的iovs不能超过134个:
另外,iov的长度变长时,iovs个数会更少,比如iov每个是6字节时,不能超过24:
如果有GSO,iovs最多不能超过36个:
SENSMMSG对于sendmmsg没有限制:
或者无GSO:
|
测试了下,不需要ST做修改,在调用层接收内核的消息就可以。 另外,从数据看,ZeroCopy降低了copy_user_enhanced_fast_string,但是也导致一个新的函数成为热点,比如get_user_pages_fast和_raw_spin_unlock_irqrestore。总体看起来和没有COPY差不多。 代码:https://github.com/winlinvip/srs/tree/feature/msg_zerocopy |
参考:ossrs/srs#307 (comment)
目前内核最热的函数是
copy_user_enhanced_fast_string
,它主要是将用户空间的数据,拷贝到内核,可以想到是因为要将发送的UDP的payload拷贝到内核发送。同样的,TCP也是这个是瓶颈,实际上Linux内核支持了很多种零拷贝方式,比如sendfile、splice、tee还有MSG_ZEROCOPY。
它提到是有代价的,如果要发送大量的数据,那么比较值得:
若使用sendmmsg,600Kbps码率的流,1个连接观看时一次发送50KB数据,1000个连接观看一次发送8.5MB的数据,2000个连接观看一次发送14.4MB数据,3000个连接观看一次发送20MB数据。
这可能需要修改ST做支持,参考:#13
The text was updated successfully, but these errors were encountered: