不是很喜欢起这么大的一个名字,不过好像也没啥办法,这次带大家一起聊聊,Linux Network
的全貌(从软件工程师的角度)
修订记录
基础概念 觉得啰嗦的可以:直接跳转至正文
Linux 分层
内核提供了一组 API,通常称为“系统调用”。这些 API 不同于常规的库 API,因为它们是执行从用户模式切换到内核模式的边界。
为了提供应用程序兼容性,很少更改系统调用。Linux特别强调这一点(与内核 API 不同,内核 API 可以根据需要进行更改)。
内核代码本身可以在逻辑上分为核心内核代码和设备驱动程序代码。设备驱动程序代码负责访问特定的设备,而核心内核代码是通用的。核心内核可以进一步划分为多个逻辑子系统(如文件访问、联网、进程管理等)
SMP
在现在的硬件体系下,因为我们的系统都是 多核
架构,因此大部分的时候,我们的Linux都运行在 Symmetric MultiProcessing
状态下,也就是内核可能在多个 CPU 中进行运行。如果两个进程运行访问相同内存位置的内核函数,则会在内核中产生竞争条件。因此在内核代码的大多数地方都需要使用到 synchronization
机制,来保证代码的线程安全线。
线程 在单纯的应用侧我们无感知 CPU
的线程概念,在操作系统这个层面就不得不去面对了。
操作系统
中的 Thread
: 任务的单位,保存当前执行的栈,和同一个进程共享地址空间等,这个就是我们熟悉的线程。
CPU
中的 Thread
: 执行单位,当前 CPU 上可以同时执行的基本单位。
mmio MMIO(Memory mapping I/O)即内存映射I/O,它是PCI规范的一部分,I/O设备被放置在内存空间而不是I/O空间。从处理器的角度看,内存映射I/O后系统设备访问起来和内存一样。
Network Packet Travel 开局一张图,谨防迷路,代码原理大部分针对 Linux 2.4
,具体的实现部分可能会截取 Linux 4.19
。
Network Driver 本文试图从软件的角度解释一个网络请求的今生前世,我们这里选取 RTL8139 作为我们了解网络驱动的硬件设备。
设备数据结构定义:net_device Linux
内核中设备的抽象对应的结构体是 struct net_device
net_device source code 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 struct net_device { char name[IFNAMSIZ]; unsigned long mem_end; unsigned long mem_start; unsigned long base_addr; int irq; int ifindex; struct net_device_stats stats ; struct netdev_rx_queue *_rx ; unsigned int num_rx_queues; unsigned int real_num_rx_queues; struct netdev_queue *_tx ; unsigned int num_tx_queues; unsigned int real_num_tx_queues; };
对于现在硬件设备都会基于 mmio
模式进行工作,因此我们读取数据也是直接和读取内存类似的方式进行读取的。
网卡数据的读取 外部数据的读取会涉及到中断体系,Linux
将 中断
分为了两个部分: top half
与 bottom half
。top half
是在呼叫 request_irq()
时所指定的 interrupt handler
函数,bottom half
则是真正负责响应中断的 task
。
硬中断部分 对于数据初次抵达硬件,会触发硬件中断,如下图所示(网卡类似)。
因此从中断的入口观察是最好的,下面的代码逻辑从 linux-1.x
版本中截取,比最近的要简单一些,容易理解,另外就是新版的已经改为 NAPI
模式,下文提到再说。
rtl8139_interrupt source code 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 static void rtl8139_interrupt (int irq, void *dev_instance, struct pt_regs *regs) { struct net_device *dev = (struct net_device *) dev_instance; struct rtl8139_private *tp = dev->priv; void *ioaddr = tp->mmio_addr; unsigned short isr = readw(ioaddr + ISR); if (isr & RxOK) { LOG_MSG("receive interrupt received\n" ); while ((readb(ioaddr + CR) & RxBufEmpty) == 0 ) { unsigned int rx_status; unsigned short rx_size; unsigned short pkt_size; struct sk_buff *skb ; if (tp->cur_rx > RX_BUF_LEN) tp->cur_rx = tp->cur_rx % RX_BUF_LEN; rx_status = *(unsigned int *) (tp->rx_ring + tp->cur_rx); rx_size = rx_status >> 16 ; pkt_size = rx_size - 4 ; skb = dev_alloc_skb(pkt_size + 2 ); if (skb) { skb->dev = dev; skb_reserve(skb, 2 ); eth_copy_and_sum( skb, tp->rx_ring + tp->cur_rx + 4 , pkt_size, 0 ); skb_put(skb, pkt_size); skb->protocol = eth_type_trans(skb, dev); netif_rx(skb); dev->last_rx = jiffies; tp->stats.rx_bytes += pkt_size; tp->stats.rx_packets++; } else { LOG_MSG("Memory squeeze, dropping packet.\n" ); tp->stats.rx_dropped++; }
中断模式下,我们收到一个数据包是非常的好理解的,将数据从网卡中 COPY
到我们的内核中即可,netif_rx
就是我们从 Hardware
向 Kernel
转换的核心点。
netif_rx source code 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 int netif_rx (struct sk_buff *skb) { int this_cpu = smp_processor_id(); struct softnet_data *queue ; unsigned long flags; queue = &softnet_data[this_cpu]; if (queue ->input_pkt_queue.qlen <= netdev_max_backlog) { if (queue ->input_pkt_queue.qlen) { if (queue ->throttle) goto drop; enqueue: dev_hold(skb->dev); __skb_queue_tail(&queue ->input_pkt_queue, skb); return softnet_data[this_cpu].cng_level; } goto enqueue; } drop: netdev_rx_stat[this_cpu].dropped++; kfree_skb(skb); return NET_RX_DROP; }
硬中断部分到此结束,我们将数据压入内核的虚拟接受列队中结束。
[选读] 硬中断成本 硬中断
是一个有硬件主导,软件配合的出来体系,但是这样的模式不是没有成本的,打断正在运行的软件进程,我们不得不涉及到进程的切换,还有中断标记的处理等工作。It’s not free. ,有兴趣的可以参考 Profiling I/O Interrupts in Modern Architectures ,
截止至今,我们知道了对于从网卡来的数据,我们至少会涉及到 ➊ Hard Interrupt
导致的 进程切换
,并且 ➋ 涉及到一次数据从网卡的 Buffer
向内核拷贝的过程。
软中断部分 软中断的逻辑在 net_rx_action
中
net_rx_action source code 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 static void net_rx_action (struct softirq_action *h) { int this_cpu = smp_processor_id(); struct softnet_data *queue = &softnet_data[this_cpu]; int bugdet = netdev_max_backlog; for (;;) { struct sk_buff *skb ; struct net_device *rx_dev ; skb = __skb_dequeue(&queue ->input_pkt_queue); if (skb == NULL ) break ; skb->h.raw = skb->nh.raw = skb->data; { struct packet_type *ptype , *pt_prev ; unsigned short type = skb->protocol; pt_prev = NULL ; for (ptype = ptype_all; ptype; ptype = ptype->next) { if (!ptype->dev || ptype->dev == skb->dev) { if (pt_prev) { if (!pt_prev->data) { deliver_to_old_ones(pt_prev, skb, 0 ); } else { atomic_inc (&skb->users); pt_prev->func(skb,skb->dev,pt_prev); } } pt_prev = ptype; } } } return ; } }
回调的函数签名如下:
int (*func) (struct sk_buff *, struct net_device *, struct packet_type *);
很标准的指针函数。不过也很讨厌看这部分的代码,如果不是 Runtime
阶段很难找到这个定义的函数是什么。这里实际上在接收到 TCP/IP
协议包的时候,回调的是 int ip_rcv(struct sk_buff *skb, struct net_device *dev, struct packet_type *pt)
函数。在看 IP
处理的之前,我们先看看这个 packet_type
的相关函数。
[选读] Packet Type 钩子 packet_type
的定义很简单
packet_type 1 2 3 4 5 6 7 struct packet_type { unsigned short type; struct net_device *dev ; int (*func)(struct sk_buff *, struct net_device *, struct packet_type *); void *data; struct packet_type *next ; };
在 dev.c
的源码中,我们可以动态的向 ptype_all
注册我们需要的 ptype
dev_add_pack source code 1 2 3 4 5 6 7 8 9 10 11 void dev_add_pack (struct packet_type *pt) { if (pt->type == htons(ETH_P_ALL)) { netdev_nit++; pt->next = ptype_all; ptype_all = pt; } else { hash = ntohs(pt->type) & 15 ; pt->next = ptype_base[hash]; ptype_base[hash] = pt; } }
因此我们如果需要处理比如 arp
的请求,可以单独增加一个,所以在 arp.c
初始化的时候,就可以出发。
arp_init 1 2 3 4 5 6 7 8 9 static struct packet_type arp_packet_type = { type: __constant_htons(ETH_P_ARP), func: arp_rcv, data: (void *) 1 , }; void __init arp_init (void ) { dev_add_pack(&arp_packet_type); }
TCPDUMP tcpdump
二层抓包,用的是 libpcap
库,它的基本原理是
先创建socket,内核dev_add_packet()挂上自己的钩子函数
然后在钩子函数中,把skb放到自己的接收队列中,
接着系统调用recv取出skb来,把数据包skb->data拷贝到用户空间
最后关闭socket,内核dev_remove_packet()删除自己的钩子函数
因此代码如下表述
af_packet.c:packet_create source code 1 2 3 4 5 6 7 8 9 10 static int packet_create (struct socket *sock, int protocol) { if (protocol) { sk->protinfo.af_packet->prot_hook.type = protocol; dev_add_pack(&sk->protinfo.af_packet->prot_hook); sock_hold(sk); sk->protinfo.af_packet->running = 1 ; } return (0 ); }
直到此处,我们走到这里算是和硬件越来越远了。再下来的历程,我们要去面对的是更加高纬度的 Socket
抽象。
[选读] 软断成本 对于常见的硬件中断来说,比如键入了一个字母,过程简单,很快可以处理完成,对于网络IO就复杂的多,涉及到从驱动拷贝数据,然后到虚拟的网络协议栈,再到Socket的Buffer中,因此才才产生的了软中断的,将中断一分为二,软中断优先级较低但是因为软件中断复杂,成本上是要更多的。
软中断依然需要 ➊ 进程上下文的切换(这里切换到 内核线程 ksoftirqd
),这里只不过相对于硬中断是操作系统自己进行调度的 ➋ 就是和系统开销调用一下,因为对于正常运行的用户态进程,我们需要把当前的上下文保存。
内核线程 ksoftirqd
也会充分利用多核的能力,对于每一个 core
都会启动一个 ksoftirqd
,暂且可以把它当做一个普通的进程看待。
NAPI 单纯的看,上面的开销,我们一眼就可以发现硬中断部分有一个非常不合理的情况,考虑如果出现以下情况
网卡每隔 10 ms 获得一个数据包,我们硬中断处理需要 9ms
那会出现一个很神奇的现象就是我们虽然每次都要将中断结束,但是每一次刚刚结束又要唤醒,结果我们会浪费大量的时间在处理中断请求上。针对这一的情况,Linux 2.6
之后提供了一个全新的API来处理这种情况(实际上对于大部分的数据,网络请求从不间断)。
既然每次进中断都很浪费,那我们就不要每一次都进中断了,NAPI
的解决之道就是,当我们接收到请求之后,我们将从此网卡读取作为一个 Task
注册到 napi_scheluder
中,然后屏蔽中断,内核定时的去 poll
数据即可。主要就是减少了硬中断切换的成本。
NAPI 定义 napi source code 1 2 3 4 5 6 7 8 9 10 struct napi_struct { struct list_head poll_list ; unsigned long state; int weight; int (*poll)(struct napi_struct *, int ); };
核心的逻辑都是在 poll
的回调函数中。
NAPI 之下的硬中断处理 rtl8139_interrupt source code 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 static irqreturn_t rtl8139_interrupt (int irq, void *dev_instance) { struct net_device *dev = (struct net_device *) dev_instance; struct rtl8139_private *tp = netdev_priv(dev); if (status & RxAckBits) { if (napi_schedule_prep(&tp->napi)) { RTL_W16_F(IntrMask, rtl8139_norx_intr_mask); __napi_schedule(&tp->napi); } } netdev_dbg(dev, "exiting interrupt, intr_status=%#4.4x\n" , RTL_R16(IntrStatus)); return IRQ_RETVAL(handled); }
对于 poll
函数的实现这里就不做展开了,和传统的体系一样,直接将数据拷贝到内核态没有区别。
软中断唤醒 因此到此版本,网络请求的软中断环境代码也相对应的有所改变
net_rx_action source code 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 static void net_rx_action (struct softirq_action *h) { struct softnet_data *sd = &__get_cpu_var(softnet_data); unsigned long time_limit = jiffies + 2 ; int budget = netdev_budget; void *have; while (!list_empty(&sd->poll_list)) { struct napi_struct *n ; int work, weight; n = list_first_entry(&sd->poll_list, struct napi_struct, poll_list); work = 0 ; if (test_bit(NAPI_STATE_SCHED, &n->state)) { work = n->poll(n, weight); } } }
只将核心的逻辑展示出来,比如代码中 budget
防止饥饿的操作都留于读者自行理解了, NAPI
解决了我们依靠硬中断导致的大量的进程切换的问题,加上现在的操作系统都是基于 DMA
,因此整个过程中的成本就只有 ➊ 软中断导致的线程切换 ➋ 数据的拷贝
IP Level 我们从 软中断
归来,数据开始进入了我们熟悉的协议处理部分,从 ip_rcv
开始作为我们正式的处理逻辑。
Linux
这也使用了 ip_rt_hash_table
的一种设计方式。
ip_rcv source code 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 int ip_rcv (struct sk_buff *skb, struct net_device *dev, struct packet_type *pt, struct net_device *orig_dev) { struct iphdr *iph ; u32 len; iph = ip_hdr(skb); if (iph->ihl < 5 || iph->version != 4 ) goto inhdr_error; if (unlikely(ip_fast_csum((u8 *) iph, iph->ihl))) goto inhdr_error; len = ntohs(iph->tot_len); if (skb->len < len) { IP_INC_STATS_BH(dev_net(dev), IPSTATS_MIB_INTRUNCATEDPKTS); goto drop; } else if (len < (iph->ihl * 4 )) goto inhdr_error; if (pskb_trim_rcsum(skb, len)) { IP_INC_STATS_BH(dev_net(dev), IPSTATS_MIB_INDISCARDS); goto drop; } memset (IPCB(skb), 0 , sizeof (struct inet_skb_parm)); return NF_HOOK(NFPROTO_IPV4, NF_INET_PRE_ROUTING, skb, dev, NULL , ip_rcv_finish); inhdr_error: IP_INC_STATS_BH(dev_net(dev), IPSTATS_MIB_INHDRERRORS); drop: kfree_skb(skb); out: return NET_RX_DROP; }
对于 NF_HOOK
的逻辑不在本文的探讨范围内,我们直接看最后在 NF_INET_PRE_ROUTING
阶段完成,进入了下一步的逻辑处理 ip_rcv_finish
ip_rcv_finish source code 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 static inline int ip_rcv_finish (struct sk_buff *skb) { struct net_device *dev = skb->dev; struct iphdr *iph = skb->nh.iph; if (skb->dst == NULL ) { if (ip_route_input(skb, iph->daddr, iph->saddr, iph->tos, dev)) goto drop; } if (iph->ihl > 5 ) { } return skb->dst->input(skb); drop: kfree_skb(skb); return NET_RX_DROP; }
我们只能从中得知我们下一步最重要的就是去处理 route
关系。
[选读] IP Route Linux
在这里进行了大量的 Cache
操作,我们滤过那些 Cache 的部分,读者如果愿意可以自行阅读代码。
route.c 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 static int ip_route_input_slow (struct sk_buff *skb, __be32 daddr, __be32 saddr, u8 tos, struct net_device *dev) { struct fib_result res ; struct in_device *in_dev = __in_dev_get_rcu(dev); struct flowi4 fl4 ; unsigned flags = 0 ; u32 itag = 0 ; struct rtable *rth ; unsigned hash; __be32 spec_dst; int err = -EINVAL; struct net *net = dev_net(dev); if (!in_dev) goto out; if (ipv4_is_multicast(saddr) || ipv4_is_lbcast(saddr) || ipv4_is_loopback(saddr)) goto martian_source; if (ipv4_is_lbcast(daddr) || (saddr == 0 && daddr == 0 )) goto brd_input; if (ipv4_is_zeronet(saddr)) goto martian_source; if (ipv4_is_zeronet(daddr) || ipv4_is_loopback(daddr)) goto martian_destination; fl4.flowi4_oif = 0 ; fl4.flowi4_iif = dev->ifindex; fl4.flowi4_mark = skb->mark; fl4.flowi4_tos = tos; fl4.flowi4_scope = RT_SCOPE_UNIVERSE; fl4.daddr = daddr; fl4.saddr = saddr; err = fib_lookup(net, &fl4, &res); if (err != 0 ) { if (!IN_DEV_FORWARD(in_dev)) goto e_hostunreach; goto no_route; } RT_CACHE_STAT_INC(in_slow_tot); if (res.type == RTN_BROADCAST) goto brd_input; if (res.type == RTN_LOCAL) { } if (!IN_DEV_FORWARD(in_dev)) goto e_hostunreach; if (res.type != RTN_UNICAST) goto martian_destination; err = ip_mkroute_input(skb, &res, &fl4, in_dev, daddr, saddr, tos); out: return err; }
对于路由算法部分,请参考 Linux内核分析 - 网络[四]:路由表 这里不做展开了,最终我们将我们的 skb->dst->input
函数指向了 int ip_local_deliver(struct sk_buff *skb)
函数,因为这个数据包是需要我们本地进行处理,而不是进行转发等操作。
IP Deliver ip_local_deliver source code 1 2 3 4 5 6 7 8 9 10 11 int ip_local_deliver (struct sk_buff *skb) { if (ip_hdr(skb)->frag_off & htons(IP_MF | IP_OFFSET)) { if (ip_defrag(skb, IP_DEFRAG_LOCAL_DELIVER)) return 0 ; } return NF_HOOK(NFPROTO_IPV4, NF_INET_LOCAL_IN, skb, skb->dev, NULL , ip_local_deliver_finish); }
最终在 ip_local_deliver_finish
函数中
ip_local_deliver_finish source code 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 static int ip_local_deliver_finish (struct sk_buff *skb) { struct net *net = dev_net(skb->dev); __skb_pull(skb, ip_hdrlen(skb)); int protocol = ip_hdr(skb)->protocol; if (ipprot != NULL ) { int ret; ret = ipprot->handler(skb); if (ret < 0 ) { protocol = -ret; goto resubmit; } } }
因为 IP 层之上,还有除了 TCP
之外其他的协议,因此这里需要针对不同的 protocol
调用相对应的 函数
,这里其实就是我们的 int tcp_v4_rcv(struct sk_buff *skb)
,再往下走就是我们熟悉的 TCP
层。
TCP Level 对于我们来说,网络编程大部分的时候都在和 Socket
这个抽象进行斗争,TCP
层开始,我们就将 IP
包组成 TCP
报文发送给 Socket
了。对于 socket
有两种抽象:
struct socket 接近用户态的抽象,为用户实现 BSD Socket
抽象
struct sock 内核中表示的网络连接抽象。
Sock 定义 对于 Sock
有着一个很巨大复杂的定义,我们尽量剥离出那些比较重要的。
struct_sock 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 struct sock { struct sock_common __sk_common ; socket_lock_t sk_lock; struct sk_buff_head sk_receive_queue ; struct sk_buff_head sk_write_queue ; struct timer_list sk_timer ; struct socket *sk_socket ; void *sk_user_data; void (*sk_state_change)(struct sock *sk); void (*sk_data_ready)(struct sock *sk); void (*sk_write_space)(struct sock *sk); void (*sk_error_report)(struct sock *sk); int (*sk_backlog_rcv)(struct sock *sk, struct sk_buff *skb); void (*sk_destruct)(struct sock *sk); };
从最简单的设计定义上,我们可以发现,对于 sock
,我们知道我们有一个 Recv
的队列,和一个 Send
的队列,现在我们的大部分的操作都会在这两个队列进行操作。
Sock 读取数据 在 sock.c
中写的非常的清晰
sock_queue_rcv_skb 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 int sock_queue_rcv_skb (struct sock *sk, struct sk_buff *skb) { int err; unsigned long flags; struct sk_buff_head *list = &sk->sk_receive_queue; if (atomic_read (&sk->sk_rmem_alloc) >= sk->sk_rcvbuf) { atomic_inc (&sk->sk_drops); trace_sock_rcvqueue_full(sk, skb); return -ENOMEM; } err = sk_filter(sk, skb); if (err) return err; skb->dev = NULL ; skb_set_owner_r(skb, sk); __skb_queue_tail(list , skb); if (!sock_flag(sk, SOCK_DEAD)) sk->sk_data_ready(sk); return 0 ; }
Sock 写入数据类似,这里不做展开。我们先暂且不考虑被我忽略的 Lock
的部分的逻辑。其实代码的逻辑非常的好理解,我们将受到的 bytes 数据置于队列的尾部,通知下数据就绪就可以了。
谁负责写入队列 TCP/IP 模式 tcp_v4_rcv source code 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 int tcp_v4_rcv (struct sk_buff *skb) { struct net *net = dev_net(skb->dev); int sdif = inet_sdif(skb); const struct iphdr *iph ; const struct tcphdr *th ; bool refcounted; struct sock *sk ; int ret; th = (const struct tcphdr *) skb->data; iph = ip_hdr(skb); lookup: sk = __inet_lookup_skb(&tcp_hashinfo, skb, __tcp_hdrlen(th), th->source, th->dest, sdif, &refcounted); if (!sk) goto no_tcp_socket; process: if (sk->sk_state == TCP_TIME_WAIT) goto do_time_wait; if (sk->sk_state == TCP_NEW_SYN_RECV) { } if (tcp_filter(sk, skb)) goto discard_and_relse; th = (const struct tcphdr *) skb->data; iph = ip_hdr(skb); tcp_v4_fill_cb(skb, iph, th); skb->dev = NULL ; if (sk->sk_state == TCP_LISTEN) { ret = tcp_v4_do_rcv(sk, skb); goto put_and_return; } bh_lock_sock_nested(sk); ret = 0 ; if (!sock_owned_by_user(sk)) { ret = tcp_v4_do_rcv(sk, skb); } else if (tcp_add_backlog(sk, skb)) { goto discard_and_relse; } bh_unlock_sock(sk); }
对于 tcp_v4_rcv()
函数来说,针对现在 sock
所在的不同状态进行处理我们收到的数据,逻辑也非常的容易理解。
tcp_ipv4.c:tcp_v4_do_rcv 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 int tcp_v4_do_rcv (struct sock *sk, struct sk_buff *skb) { struct sock *rsk ; if (sk->sk_state == TCP_ESTABLISHED) { tcp_rcv_established(sk, skb); return 0 ; } if (sk->sk_state == TCP_LISTEN) { struct sock *nsk = tcp_v4_cookie_check(sk, skb); if (!nsk) goto discard; if (nsk != sk) { if (tcp_child_process(sk, nsk, skb)) { rsk = nsk; goto reset; } return 0 ; } } if (tcp_rcv_state_process(sk, skb)) { rsk = sk; goto reset; } return 0 ; }
从入口处我们就发现了,对于Linux
内核来说,根据不同的状态处理 Socket
就所谓的内核虚拟协议栈最重要的事情。我们来看看接受数据的的最终地,对于 TCP/IP4
模式,反而没有直接调用 sock_queue_rcv_skb
而是选择了另外一种方式,因为在此模式下,我们已经在事先检查了大量的阈值状态。
tcp_queue_rcv source code 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 static int tcp_queue_rcv (struct sock *sk, struct sk_buff *skb, int hdrlen, bool *fragstolen) { int eaten; struct sk_buff *tail = skb_peek_tail(&sk->sk_receive_queue); __skb_pull(skb, hdrlen); eaten = (tail && tcp_try_coalesce(sk, tail, skb, fragstolen)) ? 1 : 0 ; if (!eaten) { __skb_queue_tail(&sk->sk_receive_queue, skb); skb_set_owner_r(skb, sk); } return eaten; }
TCP STATE PROCESS 对 TCP
协议来说,本质上我们就是在维护一个有限状态的自动机,状态切换的代码在 ipv4/tcp_input.c:tcp_rcv_state_process
函数,比较的长就不做展开,处理状态,就是如下图进行各种状态的变换。
[可选] RAW 模式 Raw也就是混杂模式,这里的实现在 ipv4/raw.c
中
raw_v4_input 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 static int raw_v4_input (struct sk_buff *skb, const struct iphdr *iph, int hash) { struct sock *sk ; struct hlist_head *head ; int delivered = 0 ; struct net *net ; read_lock(&raw_v4_hashinfo.lock); head = &raw_v4_hashinfo.ht[hash]; if (hlist_empty(head)) goto out; net = dev_net(skb->dev); sk = __raw_v4_lookup(net, __sk_head(head), iph->protocol, iph->saddr, iph->daddr, skb->dev->ifindex); while (sk) { delivered = 1 ; if ((iph->protocol != IPPROTO_ICMP || !icmp_filter(sk, skb)) && ip_mc_sf_allow(sk, iph->daddr, iph->saddr, skb->dev->ifindex)) { struct sk_buff *clone = skb_clone(skb, GFP_ATOMIC); if (clone) raw_rcv(sk, clone); } sk = __raw_v4_lookup(net, sk_next(sk), iph->protocol, iph->saddr, iph->daddr, skb->dev->ifindex); } out: read_unlock(&raw_v4_hashinfo.lock); return delivered; }
特点是不检测 目标端口
只要是 IP 匹配的上就可以传递
Linux Networking Stack Linux
不仅仅的单纯的提供实现了 TCP/IP
协议,在历代的版本中都尝试在 数据流
处理的过程中增加一些额外的管控能力。ip firewall,ipchains,iptanles,nftables 都是为了让用户在处理的过程中做一些额外的工作。
netfilter netfilter 应该是最出名的项目了,从 Linux 2.4.X
版本开始作为内置的功能提供给运维管理员。针对 Netfilter
的分析比较的多,这里就不展开了,可以查阅 A Deep Dive into Iptables and Netfilter Architecture
但是仍有一些不致命但是很烦人的缺点。
iptables规则不支持增量配置,时间复杂度均为 O(n)
基于配置化回调函数,性能不高
ebpf ebpf
是 Linux
社区的一个新星。为了追求性能上的提高,在内核里提供了一个虚拟机,将用户的过滤规则代码编译成字节码
然后交付给虚拟机,内核根据这些指令来过滤网络数据包。
基础知识 要理解 ebpf
还需要明白 bpf_map
, 其本质上是以「键/值」方式存储在内核中的数据结构,它们可以被任何知道它们的 BPF 程序访问。在内核空间的程序创建 BPF Map 并返回对应的文件描述符,在用户空间运行的程序就可以通过这个文件描述符来访问并操作 BPF Map,这就是为什么 BPF Map 在 BPF 世界中是桥梁的存在了。
Example sockex1_kern.c[内核态逻辑] source code 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 SEC("socket1" ) int bpf_prog1 (struct __sk_buff *skb) { int index = load_byte(skb, ETH_HLEN + offsetof( struct iphdr, protocol)); long *value; if (skb->pkt_type != PACKET_OUTGOING) return 0 ; value = bpf_map_lookup_elem(&my_map, &index); if (value) __sync_fetch_and_add(value, skb->len); return 0 ; } char _license[] SEC("license" ) = "GPL" ;
sockex1_user.c[用户态逻辑] source code 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 int main (int ac, char **argv) { struct bpf_object *obj ; int map_fd, prog_fd; char filename[256 ]; int i, sock; FILE *f; snprintf (filename, sizeof (filename), "%s_kern.o" , argv[0 ]); if (bpf_prog_load(filename, BPF_PROG_TYPE_SOCKET_FILTER, &obj, &prog_fd)) return 1 ; map_fd = bpf_object__find_map_fd_by_name(obj, "my_map" ); sock = open_raw_sock("lo" ); assert(setsockopt(sock, SOL_SOCKET, SO_ATTACH_BPF, &prog_fd, sizeof (prog_fd)) == 0 ); f = popen("ping -4 -c5 localhost" , "r" ); (void ) f; for (i = 0 ; i < 5 ; i++) { long long tcp_cnt, udp_cnt, icmp_cnt; int key; key = IPPROTO_TCP; assert(bpf_map_lookup_elem(map_fd, &key, &tcp_cnt) == 0 ); key = IPPROTO_UDP; assert(bpf_map_lookup_elem(map_fd, &key, &udp_cnt) == 0 ); key = IPPROTO_ICMP; assert(bpf_map_lookup_elem(map_fd, &key, &icmp_cnt) == 0 ); printf ("TCP %lld UDP %lld ICMP %lld bytes\n" , tcp_cnt, udp_cnt, icmp_cnt); sleep(1 ); } return 0 ; }
内核 Hook 从例子中我们也可以很清晰的发现,核心点在于 setsockopt(sock, SOL_SOCKET, SO_ATTACH_BPF, &prog_fd, sizeof(prog_fd)
我们在内核中一顿操作之后就可以发现在
sock_setsockopt source code 1 2 3 4 5 6 7 8 9 10 11 12 case SO_ATTACH_BPF: ret = -EINVAL; if (optlen == sizeof (u32)) { u32 ufd; ret = -EFAULT; if (copy_from_user(&ufd, optval, sizeof (ufd))) break ; ret = sk_attach_bpf(ufd, sk); } break ;
这行代码也非常的简单,直接构建了一个全新的 sk_filter
,然后给此 sock
设置即可。
sk_filter source code 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 static int __sk_attach_prog(struct bpf_prog *prog, struct sock *sk) { struct sk_filter *fp , *old_fp ; fp = kmalloc(sizeof (*fp), GFP_KERNEL); if (!fp) return -ENOMEM; fp->prog = prog; old_fp = rcu_dereference_protected(sk->sk_filter, lockdep_sock_is_held(sk)); rcu_assign_pointer(sk->sk_filter, fp); return 0 ; }
而调用的地方也非常的清晰,比如说在接受数据的地方。
sock_queue_rcv_skb link 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 int sock_queue_rcv_skb (struct sock *sk, struct sk_buff *skb) { int err; err = sk_filter(sk, skb); if (err) return err; return __sock_queue_rcv_skb(sk, skb); } int sk_filter_trim_cap (struct sock *sk, struct sk_buff *skb, unsigned int cap) { rcu_read_lock(); filter = rcu_dereference(sk->sk_filter); if (filter) { struct sock *save_sk = skb->sk; unsigned int pkt_len; skb->sk = sk; pkt_len = bpf_prog_run_save_cb(filter->prog, skb); skb->sk = save_sk; err = pkt_len ? pskb_trim(skb, max(cap, pkt_len)) : -EPERM; } rcu_read_unlock(); return err; }
小结 对于 ebpf
不算是什么新的技术 The BSD Packet Filter: A New Architecture for User-level Packet Capture 发表在 1992
年,其核心还是基于 编译器
优化成特定平台的代码以提高效率,并且因为配置项都是 case by case
的模式,因为也不存在维护一个巨大的 tables
的困扰,最终再通过 mmap
映射的方式,将数据在内核和用户侧进行共享。
ebpf
代表了内核向用户开放能力的趋势。现在能使用的点也并不少。
可以畅想下,未来可能会出现如下的功能架构
我们可以直接通过可编程的模式,从 云端
直接推送可插拔在 内核
中的代码逻辑。
用户空间 到上面为止,我们已经走完了所有内核部分的网络代码,下面的逻辑就是用户态的部分了。
BIO 模型 这个模型大家都熟悉,我们通过 Read()
来读取数据
Example bio 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 int main (int argc, char *argv[]) { sockfd = socket(AF_INET, SOCK_STREAM, 0 ); if (bind(sockfd, (struct sockaddr *) &serv_addr, sizeof (serv_addr)) < 0 ) { perror("ERROR on binding" ); exit (1 ); } listen(sockfd, 5 ); clilen = sizeof (cli_addr); newsockfd = accept(sockfd, (struct sockaddr *) &cli_addr, &clilen); if (newsockfd < 0 ) { perror("ERROR on accept" ); exit (1 ); } bzero(buffer, 256 ); n = read(newsockfd, buffer, 255 ); if (n < 0 ) { perror("ERROR reading from socket" ); exit (1 ); } return 0 ; }
Read 读取消息的话,内部API经历的多个版本的变更,因为为了后面讲解 epoll
,我们看比较新的 4.19
版本
do_sys_recvmmsg source code 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 static int do_sys_recvmmsg (int fd, struct mmsghdr __user *mmsg, unsigned int vlen, unsigned int flags, struct timespec __user *timeout) { int datagrams; struct timespec timeout_sys ; if (flags & MSG_CMSG_COMPAT) return -EINVAL; if (!timeout) return __sys_recvmmsg(fd, mmsg, vlen, flags, NULL ); datagrams = __sys_recvmmsg(fd, mmsg, vlen, flags, &timeout_sys); if (datagrams > 0 && copy_to_user(timeout, &timeout_sys, sizeof (timeout_sys))) datagrams = -EFAULT; return datagrams; } int __sys_recvmmsg(int fd, struct mmsghdr __user *mmsg, unsigned int vlen, unsigned int flags, struct timespec *timeout) { sock = sockfd_lookup_light(fd, &err, &fput_needed); if (!sock) return err; entry = mmsg; compat_entry = (struct compat_mmsghdr __user *)mmsg; while (datagrams < vlen) { err = ___sys_recvmsg(sock, (struct user_msghdr __user *)entry, &msg_sys, flags & ~MSG_WAITFORONE, datagrams); if (err < 0 ) break ; err = put_user(err, &entry->msg_len); ++entry; if (err) break ; ++datagrams; if (flags & MSG_WAITFORONE) flags |= MSG_DONTWAIT; if (msg_sys.msg_flags & MSG_OOB) break ; cond_resched(); } if (err == 0 ) goto out_put; out_put: fput_light(sock->file, fput_needed); return datagrams; }
因此我们在配置 FD
的时候,如果是非阻塞的,我们哪怕获取不到数据也会立即返回,在最初的 Blocking IO
模式下,我们需要等待数据的获取,才能返回,因此我们会在 ➊ 处进行 LOOP,我们在 cond_resched
主动将自己释放出来,然后等待下次的调用。
我们再往下走一层,Read
函数的下一层则是 tcp.c:tcp_recvmsg()
tcp_recvmsg source code 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 do { u32 offset; last = skb_peek_tail(&sk->sk_receive_queue); skb_queue_walk(&sk->sk_receive_queue, skb) { last = skb; offset = *seq - TCP_SKB_CB(skb)->seq; if (offset < skb->len) goto found_ok_skb; if (TCP_SKB_CB(skb)->tcp_flags & TCPHDR_FIN) goto found_fin_ok; } found_ok_skb: used = skb->len - offset; if (len < used) used = len; if (!(flags & MSG_TRUNC)) { err = skb_copy_datagram_msg(skb, offset, msg, used); } *seq += used; copied += used; len -= used; found_fin_ok: ++*seq; if (!(flags & MSG_PEEK)) sk_eat_skb(sk, skb); break ; } while (len > 0 );
直到这里我们已经把握到了数据读取的命门,真实调用的就是 skb_copy_datagram_msg()
函数,而他所读取的恰好就是我们的 sock
的 sk_receive_queue
,因此到此为止,我们已经将用户态至网卡的所有流程全面打通。
BIO的成本 在 Blocking
模式下,读取数据需要两方面的成本:模态切换
和线程等待
模态切换 用户态
切换至 内核态
我们使用的 Syscall
模式,因此对于这一步的切换本身也是有成本的,我们需要切换堆栈
,切换 cs/ip
寄存器,这一步也不是免费的,Protection ring
In most existing systems, switching from user mode to kernel mode has an associated high cost in performance. It has been measured, on the basic request getpid, to cost 1000–1500 cycles on most machines. Of these just around 100 are for the actual switch (70 from user to kernel space, and 40 back), the rest is “kernel overhead”. In the L3 microkernel, the minimization of this overhead reduced the overall cost to around 150 cycles.
线程等待
因为对于如果我们 read
不到数据,也就是在等待的过程中,我们就不得不去面对 cond_resched()
我们将自己执行的CPU时间让出来,然后等待下次的调用,这个数量和我们的 socket
数量成正比,尤其是我们传统的每一个 socket
配给一个 thread
进行处理的时候,我们要付出大量的线程切换时间。
NIO 模型 为了解决 线程等待 的问题,Linux
提供了新的编程模式 epoll
。
Example epoll source code 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 while (1 ) { int n, i; n = epoll_wait(efd, events, MAXEVENTS, -1 ); for ( i = 0 ; i < n; i++) { if ((events[i].events & EPOLLERR) || (events[i].events & EPOLLHUP) || (!(events[i].events & EPOLLIN))) { fprintf (stderr , "epoll error\n" ); close(events[i] .data.fd); continue ; } else if (sfd == events[i].data.fd) { while (1 ) { struct sockaddr in_addr; socklen_t in_len; int infd; char hbuf[NI_MAXHOST], sbuf[NI_MAXSERV]; in_len = sizeof in_addr; infd = accept(sfd, &in_addr, &in_len); if (infd == -1 ) { if ((errno == EAGAIN) || (errno == EWOULDBLOCK)) { break ; } else { perror("accept" ); break ; } } s = getnameinfo(&in_addr, in_len, hbuf, sizeof hbuf, sbuf, sizeof sbuf, NI_NUMERICHOST | NI_NUMERICSERV); if (s == 0 ) { printf ("Accepted connection on descriptor %d " "(host=%s, port=%s)\n" , infd, hbuf, sbuf); } s = make_socket_non_blocking(infd); if (s == -1 ) abort (); event.data. fd = infd; event. events = EPOLLIN | EPOLLET; s = epoll_ctl(efd, EPOLL_CTL_ADD, infd, &event); if (s == -1 ) { perror("epoll_ctl" ); abort (); } } continue ; } } }
Read 数据 对于 epoll
来说,没有数据就直接返回了,这块就没有什么特别,在上面的分析我们也可以看到。最重要的是如何维护 events
列表的
Event epoll_evnet source code 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 struct eventpoll { struct rb_root rbr ; struct list_head rdllist ; wait_queue_head_t wq; wait_queue_head_t poll_wait; }; typedef union epoll_data { void *ptr; int fd; uint32_t u32; uint64_t u64; } epoll_data_t ;
执行epoll_ctl()时,如果增加socket句柄,则检查在红黑树中是否存在,存在立即返回,不存在则添加到树干上,然后向内核注册回调函数,用于当中断事件来临时向准备就绪链表中插入数据。因此我们在一个 evnetpoll
对象上可以维护大量的 socket
对象,但是这样还不能足够我们完成事件通知,真正的奥秘还在 rdllist
就绪列表中。
注册文件 epoll
的就绪列表是基于事件驱动的,这样的就避免了我们一直不断的 轮训
数据导致的 CPU
浪费。因此在我们对 eventpoll
里面增加一个 fd
的时候代码如下:
ep_insert source code 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 static int ep_insert (struct eventpoll *ep, const struct epoll_event *event, struct file *tfile, int fd, int full_check) { user_watches = atomic_long_read(&ep->user->epoll_watches); if (unlikely(user_watches >= max_user_watches)) return -ENOSPC; epq.epi = epi; init_poll_funcptr(&epq.pt, ep_ptable_queue_proc); revents = ep_item_poll(epi, &epq.pt, 1 ); ep_rbtree_insert(ep, epi); } typedef struct poll_table_struct { poll_queue_proc _qproc; unsigned long _key; } poll_table;
ep_item_poll
对应的不同的类型的 poll
函数不同,我们看看最常用的 tcp
tcp_poll 1 2 3 4 5 6 7 8 9 10 11 12 13 __poll_t tcp_poll (struct file *file, struct socket *sock, poll_table *wait) { __poll_t mask; struct sock *sk = sock->sk; const struct tcp_sock *tp = tcp_sk(sk); int state; sock_poll_wait(file, sock, wait); state = inet_sk_state_load(sk); }
又去调用了 sock_poll_wait
,而这个函数如下
poll_wait 1 2 3 4 5 static inline void poll_wait (struct file * filp, wait_queue_head_t * wait_address, poll_table *p) { if (p && p->_qproc && wait_address) p->_qproc(filp, wait_address, p); }
_qproc
的定义如下
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 static void ep_ptable_queue_proc (struct file *file, wait_queue_head_t *whead, poll_table *pt) { struct epitem *epi = ep_item_from_epqueue(pt); struct eppoll_entry *pwq ; if (epi->nwait >= 0 && (pwq = kmem_cache_alloc(pwq_cache, GFP_KERNEL))) { init_waitqueue_func_entry(&pwq->wait, ep_poll_callback); pwq->whead = whead; pwq->base = epi; if (epi->event.events & EPOLLEXCLUSIVE) add_wait_queue_exclusive(whead, &pwq->wait); else add_wait_queue(whead, &pwq->wait); list_add_tail(&pwq->llink, &epi->pwqlist); epi->nwait++; } else { epi->nwait = -1 ; } }
帮大家锊一下,到现在到底发生了什么?
callback ep_poll_callback source code 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 static int ep_poll_callback (wait_queue_entry_t *wait, unsigned mode, int sync, void *key) { int pwake = 0 ; unsigned long flags; struct epitem *epi = ep_item_from_wait(wait); struct eventpoll *ep = epi->ep; __poll_t pollflags = key_to_poll(key); int ewake = 0 ; if (pollflags && !(pollflags & epi->event.events)) goto out_unlock; if (!ep_is_linked(epi)) { list_add_tail(&epi->rdllink, &ep->rdllist); ep_pm_stay_awake_rcu(epi); } if (waitqueue_active(&ep->wq)) { wake_up_locked(&ep->wq); } }
什么时候会 Invoke
这个 callback
,还记得们这个 callback
实际上是在 sock->wq->wait
中,而在实际的逻辑中 sock->wq->wait
的唤醒是被我们在更早之前所看到的 sock
定义的那些回调函数中
sock_callback source code 1 2 3 4 5 void (*sk_state_change)(struct sock *sk);void (*sk_data_ready)(struct sock *sk);void (*sk_write_space)(struct sock *sk);void (*sk_error_report)(struct sock *sk);int (*sk_backlog_rcv)(struct sock *sk, struct sk_buff *skb);
随便点开一个实现,我们都会看到熟悉的 wq
tipc_data_ready source code 1 2 3 4 5 6 7 static void tipc_data_ready (struct sock *sk) { struct socket_wq *wq ; wq = rcu_dereference(sk->sk_wq); if (skwq_has_sleeper(wq)) wake_up_interruptible_sync_poll(&wq->wait, EPOLLIN | EPOLLRDNORM | EPOLLRDBAND); }
ep_poll() ep_poll
函数比较的简单,
ep_poll source code 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 static int ep_poll (struct eventpoll *ep, struct epoll_event __user *events, int maxevents, long timeout) { fetch_events: if (!ep_events_available(ep)) { init_waitqueue_entry(&wait, current); __add_wait_queue_exclusive(&ep->wq, &wait); for (;;) { set_current_state(TASK_INTERRUPTIBLE); if (fatal_signal_pending(current)) { res = -EINTR; break ; } if (ep_events_available(ep) || timed_out) break ; if (signal_pending(current)) { res = -EINTR; break ; } spin_unlock_irq(&ep->wq.lock); if (!schedule_hrtimeout_range(to, slack, HRTIMER_MODE_ABS)) timed_out = 1 ; spin_lock_irq(&ep->wq.lock); } __remove_wait_queue(&ep->wq, &wait); __set_current_state(TASK_RUNNING); } check_events: eavail = ep_events_available(ep); spin_unlock_irq(&ep->wq.lock); if (!res && eavail && !(res = ep_send_events(ep, events, maxevents)) && !timed_out) goto fetch_events; return res; }
对于 Epoll
的分析到此结束,epoll
基于 rdlist
来维护我们就绪的事件 fd
,然后使用 redblack tree
快速检索这些 fd
,而我认为最核心的部分还是莫过于 callback
机制,我们可以在收到数据之后才触发,这样减少了我们 轮训
的时间。
epoll的成本 epoll
显然已经将 thread blocking
的成本降低至极限,因为显然我们只需要一个 thread
就可以完成就绪事件的扫描和处理,但是依然有几个成本无法回避 ➊ 模态切换,用户态和内核态转换 ➋ 数据拷贝,因为 Read
依然是需要拷贝数据从内核态到用户态。
Zero Copy 传统读写 对于 Linux
的读写操作,想象一下从文件中读取一些数据然后写到 socket
中
read/write 1 2 read(file, tmp_buf, len); write(socket, tmp_buf, len);
读取数据远比想象中的复杂:
read
系统调用导致从用户模式到内核模式[上下文切换
]。第一次复制是由DMA
执行的,它从磁盘 中读取内容并将其存储到内核地址空间缓冲区中。
将数据从内核缓冲区复制到用户缓冲区,然后 read
系统调用返回。调用的返回导致上下文从内核模式切换回用户模式。现在数据被存储在用户地址空间缓冲区中,它可以再次开始向下运行。
write系统调用导致从用户模式到内核模式的上下文切换。执行第三次复制,再次将数据放入内核地址空间缓冲区。但是,这一次数据被放到了一个不同的缓冲区中,这个缓冲区专门与套接字相关联。
write系统调用返回,创建第四个上下文切换。
很明显的,我们从用户态
向内核态
需要拷贝一次数据,显然这次如果有一些机制可以节约下来这次开销,会更加合理一点。
mmap 机制 mmap
提供一种对于用户态和内核态的贡献数据的方式。
mmap 1 2 tmp_buf = mmap(file, len); write(socket, tmp_buf, len);
mmap
系统调用导致DMA
将文件内容复制到内核缓冲区中。然后与用户进程共享缓冲区,而不需要在内核和用户内存空间之间执行任何复制。
write
系统调用导致内核将数据从内核缓冲区复制到与套接字相关联的内核缓冲区中。
DMA
将数据从内核套接字缓冲区传递到协议引擎时发生第三次复制。
通过使用 mmap
而非 read
,我们减少了内核必须复制的数据量的一半。当传输大量数据时,这会产生相当好的结果。
基于类似于 mmap
的机制, Linux
自从 4.14
版本对 Socket
增加了 MSG_ZEROCOPY
选项,只需要执行如下代码:
设置标记位 1 2 if (setsockopt(fd, SOL_SOCKET, SO_ZEROCOPY, &one, sizeof (one))) error(1 , errno, "setsockopt zerocopy" );
在读取和发送的时候使用,详细可以参考 内核文档: msg_zerocopy
零拷贝读写操作 1 2 3 ret = recvmsg(fd, &msg, MSG_ERRQUEUE); ret = send(fd, buf, sizeof (buf), MSG_ZEROCOPY);
How it works linux
内核使用 vm_area_struct
结构来表示一个独立的虚拟内存区域,由于每个不同质的虚拟内存区域功能和内部机制都不同,因此一个进程使用多个vm_area_struct结构来分别表示不同类型的虚拟内存区域。各个 vm_area_struct
结构使用链表或者树形结构链接,方便进程快速访问,如下图所示:
因此从使用者角度看
进程在用户空间调用库函数mmap
在当前进程的虚拟地址空间中,寻找一段空闲的满足要求的连续的虚拟地址
为此虚拟区分配一个vm_area_struct结构,接着对这个结构的各个域进行了初始化
将新建的虚拟区结构(vm_area_struct)插入进程的虚拟地址区域链表或树中
进程的读或写操作访问虚拟地址空间,触发 缺页异常
,内核去开辟新的物理页
sendfile 机制 sendfile 1 sendfile(socket,file,len);
sendfile
系统调用使 DMA
将文件内容复制到内核缓冲区中。
没有数据复制到套接字缓冲区中。将具有有关数据的地址和长度信息附加到套接字缓冲区。
这个可以说是真正的 Zero COPY
技术了。不过缺点也很明显,只能用在 FD
之上,对于大部分时候我们需要处理的是 Byte[]
对于内存的数据,这样的函数接口显然不是那么好用。
除此之外:Direct I/O Asynchronous direct I/O 都是优化 IO
的操作,对于网络请求一般不使用这些模式,大家可以自行阅读。
不过 Zero Copy
不一定有大家想象中的有效,在 Zero-copy networking 有详细的描述。
As was noted in the introduction, the benefits from zero-copy operation are often less than one might hope. Copying is expensive, but the setup required to avoid a copy operation also has its costs. In this case, the author claims that a simple benchmark (netperf blasting out data) runs 39% faster, while a more realistic production workload sees a 5-8% improvement. So the benefit for real-world systems is not huge, but it may well be enough to be worth going for on highly-loaded systems that transmit a lot of data.
不过从提交 PR
的作者的测试结果看还是不错的。
传输 10G 的流量,前三列是 netperf
进程的耗费的时钟周期,后三列是 系统级
的时钟周期。 std
是标准的读写, zc
代表零拷贝。
1 2 3 4 5 6 7 --process cycles-- ----cpu cycles---- std zc % std zc % 4K 27,609 11,217 41 49,217 39,175 79 16K 21,370 3,823 18 43,540 29,213 67 64K 20,557 2,312 11 42,189 26,910 64 256K 21,110 2,134 10 43,006 27,104 63 1M 20,987 1,610 8 42,759 25,931 61
io_uring 我们使用 zero copy
技术将我们数据拷贝减少至极致,但是我们依然涉及到 模块切换
,在 内核态
和 用户态
之间切换也不是免费的,因此制约我们这个特性又应该怎么解决呢?在 Kernel
解决这个之前,社区提出了一个很出名的解决之道 dpdk
提出了 UIO
的概念。
因此 Linux 5.1
参考了 DPDK
的实现,提出了 io_uring
的方案,白皮书
Example io_uring source code 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 #define MAX_CONNECTIONS 4096 #define BACKLOG 512 #define MAX_MESSAGE_LEN 2048 #define BUFFERS_COUNT MAX_CONNECTIONS void add_accept (struct io_uring *ring, int fd, struct sockaddr *client_addr, socklen_t *client_len, unsigned flags) ;void add_socket_read (struct io_uring *ring, int fd, unsigned gid, size_t size, unsigned flags) ;void add_socket_write (struct io_uring *ring, int fd, __u16 bid, size_t size, unsigned flags) ;void add_provide_buf (struct io_uring *ring, __u16 bid, unsigned gid) ;enum { ACCEPT, READ, WRITE, PROV_BUF, }; typedef struct conn_info { __u32 fd; __u16 type; __u16 bid; } conn_info; char bufs[BUFFERS_COUNT][MAX_MESSAGE_LEN] = {0 };int group_id = 1337 ;int main (int argc, char *argv[]) { int portno = strtol(argv[1 ], NULL , 10 ); struct sockaddr_in serv_addr , client_addr ; socklen_t client_len = sizeof (client_addr); int sock_listen_fd = socket(AF_INET, SOCK_STREAM, 0 ); const int val = 1 ; setsockopt(sock_listen_fd, SOL_SOCKET, SO_REUSEADDR, &val, sizeof (val)); if (bind(sock_listen_fd, (struct sockaddr *)&serv_addr, sizeof (serv_addr)) < 0 ) { perror("Error binding socket...\n" ); exit (1 ); } if (listen(sock_listen_fd, BACKLOG) < 0 ) { perror("Error listening on socket...\n" ); exit (1 ); } printf ("io_uring echo server listening for connections on port: %d\n" , portno); struct io_uring_params params ; struct io_uring ring ; memset (¶ms, 0 , sizeof (params)); if (io_uring_queue_init_params(2048 , &ring, ¶ms) < 0 ) { perror("io_uring_init_failed...\n" ); exit (1 ); } struct io_uring_sqe *sqe ; struct io_uring_cqe *cqe ; add_accept(&ring, sock_listen_fd, (struct sockaddr *)&client_addr, &client_len, 0 ); while (1 ) { io_uring_submit_and_wait(&ring, 1 ); struct io_uring_cqe *cqe ; unsigned head; unsigned count = 0 ; io_uring_for_each_cqe(&ring, head, cqe) { ++count; struct conn_info conn_i ; memcpy (&conn_i, &cqe->user_data, sizeof (conn_i)); int type = conn_i.type; if (type == ACCEPT) { int sock_conn_fd = cqe->res; if (sock_conn_fd >= 0 ) { add_socket_read(&ring, sock_conn_fd, group_id, MAX_MESSAGE_LEN, IOSQE_BUFFER_SELECT); } add_accept(&ring, sock_listen_fd, (struct sockaddr *)&client_addr, &client_len, 0 ); } else if (type == READ) { int bytes_read = cqe->res; } } io_uring_cq_advance(&ring, count); } } void add_accept (struct io_uring *ring, int fd, struct sockaddr *client_addr, socklen_t *client_len, unsigned flags) { struct io_uring_sqe *sqe = io_uring_get_sqe(ring); io_uring_prep_accept(sqe, fd, client_addr, client_len, 0 ); io_uring_sqe_set_flags(sqe, flags); conn_info conn_i = { .fd = fd, .type = ACCEPT, }; memcpy (&sqe->user_data, &conn_i, sizeof (conn_i)); } void add_socket_read (struct io_uring *ring, int fd, unsigned gid, size_t message_size, unsigned flags) { struct io_uring_sqe *sqe = io_uring_get_sqe(ring); io_uring_prep_recv(sqe, fd, NULL , message_size, 0 ); io_uring_sqe_set_flags(sqe, flags); sqe->buf_group = gid; conn_info conn_i = { .fd = fd, .type = READ, }; memcpy (&sqe->user_data, &conn_i, sizeof (conn_i)); }
How it works Linux
只增加了 3 个 syscall
1 2 3 4 no syscall 425 io_uring_setup 426 io_uring_enter 427 io_uring_register
数据结构 并没有常见的 read
和 write
,因为现在的这两步可以在用户态进行了(这不废话,我们就是要解决模态切换的问题)。为了实现用户态和内核态通讯,io_uring
提供了2个队列:提交队列(submission queue)和完成队列(completion queue)
定义如下
io_uring.h source code 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 struct io_uring_params { __u32 sq_entries; __u32 cq_entries; __u32 flags; __u32 sq_thread_cpu; __u32 sq_thread_idle; __u32 features; __u32 wq_fd; __u32 resv[3 ]; struct io_sqring_offsets sq_off ; struct io_cqring_offsets cq_off ; }; struct io_sqring_offsets { __u32 head; __u32 tail; __u32 ring_mask; __u32 ring_entries; __u32 flags; __u32 dropped; __u32 array ; __u32 resv1; __u64 resv2; }; struct io_cqring_offsets { __u32 head; __u32 tail; __u32 ring_mask; __u32 ring_entries; __u32 overflow; __u32 cqes; __u32 flags; __u32 resv1; __u64 resv2; }; struct io_uring_cqe { __u64 user_data; __s32 res; __u32 flags; };
SQ
,CQ
,SQEs
是在内核中分配的,所以用户态程序并不能直接访问。io_setup
的返回值是一个 fd
,应用程序使用这个 fd
进行 mmap,和 kernel
共享一块内存。
IO submit IO 提交的做法是找到一个空闲的 SQE,根据请求设置 SQE,并将这个 SQE 的索引放到 SQ 中。SQ 是一个典型的 RingBuffer,有 head,tail 两个成员,如果 head == tail,意味着队列为空。SQE 设置完成后,需要修改 SQ 的 tail,以表示向 RingBuffer 中插入一个请求。当所有请求都加入 SQ 后,就可以使用 :
1 int io_uring_enter (unsigned int fd, u32 to_submit, u32 min_complete, u32 flags) ;
不过这样还是会涉及到 模态切换
,因此还提供了另外一个模式。
offload 模式 调用 io_uring_setup
时设置了 IORING_SETUP_SQPOLL
的 flag,内核会额外启动一个内核线程,我们称作 SQ
线程。
IO Completion 当 IO
完成时,内核负责将完成 IO
在 SQEs
中的 index
放到 CQ
中。由于 IO
在提交的时候可以顺便返回完成的 IO
,所以收割 IO
不需要额外系统调用。这是跟 libaio
比较大的不同,省去了一次系统调用。
框架层 在 OS
上提供的 epoll
io_uring
的情况下,在应用层如何高效的使用也是一个相对复杂的问题。不过在这层上来说,我们更需要的是配合 编程语言
的并发模式提供一种高效的编程框架。
Thread 编程模型:Netty & Reactor epoll
也不算是一个新东西,业界也整理出一套比较高效的 框架模型
,常用的高性能 IO 模型 Reactor
。
Netty
抽象的 Acceptor
作为接受者,对应的线程是 bossGroup
,从图上看好像是单个的,但是并不是 Acceptor
也可以有多个线程,Dispatch
将接收到的请求分发到 workGroup
的线程组进行繁重的读写工作。
对于读写操作,提供了 pipeline
的机制
Goroutines:netpoller go
语言提供一套将 异步io
转化为 同步io
的机制,这样对于开发者来说会更加的轻松,得益于 goroutine
并不会带来不可承担的成本。
go
会初始化一个全局的 netpoll
,在 linux
的实现是创建一个 epfd
netpollinit source code 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 var ( epfd int32 = -1 netpollBreakRd, netpollBreakWr uintptr ) func netpollinit () { epfd = epollcreate1(_EPOLL_CLOEXEC) r, w, _ := nonblockingPipe() ev := epollevent{ events: _EPOLLIN, } *(**uintptr )(unsafe.Pointer(&ev.data)) = &netpollBreakRd epollctl(epfd, _EPOLL_CTL_ADD, r, &ev) netpollBreakRd = uintptr (r) netpollBreakWr = uintptr (w) }
当我们打开一个 file
的时候,就会向 epoll
注册事件
netpollopen source code 1 2 3 4 5 6 func netpollopen (fd uintptr , pd *pollDesc) int32 { var ev epollevent ev.events = _EPOLLIN | _EPOLLOUT | _EPOLLRDHUP | _EPOLLET *(**pollDesc)(unsafe.Pointer(&ev.data)) = pd return -epollctl(epfd, _EPOLL_CTL_ADD, int32 (fd), &ev) }
当我们在文件描述符上执行读写操作时,如果文件描述符不可读或者不可写,当前 Goroutine 就会执行 runtime.poll_runtime_pollWait
检查 runtime.pollDesc
的状态并调用 runtime.netpollblock
等待文件描述符的可读或者可写
netpollblock source code 1 2 3 4 5 6 7 8 9 10 11 12 13 func netpollblock (pd *pollDesc, mode int32 , waitio bool ) bool { gpp := &pd.rg if mode == 'w' { gpp = &pd.wg } ... if waitio || netpollcheckerr(pd, mode) == 0 { gopark(netpollblockcommit, unsafe.Pointer(gpp), waitReasonIOWait, traceEvGoBlockNet, 5 ) } ... }
唤醒操作也很简单
netpollready source code 1 2 3 4 5 6 7 8 9 10 11 12 func netpollready (toRun *gList, pd *pollDesc, mode int32 ) { var rg, wg *g ... if mode == 'w' || mode == 'r' +'w' { wg = netpollunblock(pd, 'w' , true ) } ... if wg != nil { toRun.push(wg) } }
小结 大概前前后后写了半个月才将本文写完,也没有预计到可以写这么多,试图帮助大家从 硬件
到 框架
整理出一套体系,但是为了达成高性能的 Web Server
,除了我们本文提到了 OS
的一些优化,还有很多在框架层面上可以优化的地方。连接池
Fast Path算法
缓存
等等。
希望大家在网络的世界里面玩的开心 ;)
参考