0%

Linux 网络大全

JJUHE.png

不是很喜欢起这么大的一个名字,不过好像也没啥办法,这次带大家一起聊聊,Linux Network 的全貌(从软件工程师的角度)

修订记录

  • 2020.9.30: 增加 ebpf 相关

基础概念

觉得啰嗦的可以:直接跳转至正文

Linux 分层

JRPSU.png

内核提供了一组 API,通常称为“系统调用”。这些 API 不同于常规的库 API,因为它们是执行从用户模式切换到内核模式的边界。

为了提供应用程序兼容性,很少更改系统调用。Linux特别强调这一点(与内核 API 不同,内核 API 可以根据需要进行更改)。

内核代码本身可以在逻辑上分为核心内核代码和设备驱动程序代码。设备驱动程序代码负责访问特定的设备,而核心内核代码是通用的。核心内核可以进一步划分为多个逻辑子系统(如文件访问、联网、进程管理等)

SMP

JRR2O.png

在现在的硬件体系下,因为我们的系统都是 多核 架构,因此大部分的时候,我们的Linux都运行在 Symmetric MultiProcessing 状态下,也就是内核可能在多个 CPU 中进行运行。如果两个进程运行访问相同内存位置的内核函数,则会在内核中产生竞争条件。因此在内核代码的大多数地方都需要使用到 synchronization 机制,来保证代码的线程安全线。

线程

在单纯的应用侧我们无感知 CPU 的线程概念,在操作系统这个层面就不得不去面对了。

  • 操作系统 中的 Thread: 任务的单位,保存当前执行的栈,和同一个进程共享地址空间等,这个就是我们熟悉的线程。
  • CPU 中的 Thread: 执行单位,当前 CPU 上可以同时执行的基本单位。

mmio

MMIO(Memory mapping I/O)即内存映射I/O,它是PCI规范的一部分,I/O设备被放置在内存空间而不是I/O空间。从处理器的角度看,内存映射I/O后系统设备访问起来和内存一样。

Network Packet Travel

开局一张图,谨防迷路,代码原理大部分针对 Linux 2.4,具体的实现部分可能会截取 Linux 4.19

Jc8rR.png

Network Driver

本文试图从软件的角度解释一个网络请求的今生前世,我们这里选取 RTL8139 作为我们了解网络驱动的硬件设备。

设备数据结构定义:net_device

Linux 内核中设备的抽象对应的结构体是 struct net_device

net_devicesource code
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
struct net_device {
char name[IFNAMSIZ];
/*
* I/O 相关
*/
unsigned long mem_end;
unsigned long mem_start;
unsigned long base_addr;
int irq;

// 设备 Index
int ifindex;

// stats 数据
struct net_device_stats stats;

// 接受队列
struct netdev_rx_queue *_rx;
unsigned int num_rx_queues;
unsigned int real_num_rx_queues;

// 发送队列
struct netdev_queue *_tx;
unsigned int num_tx_queues;
unsigned int real_num_tx_queues;
};

对于现在硬件设备都会基于 mmio 模式进行工作,因此我们读取数据也是直接和读取内存类似的方式进行读取的。

网卡数据的读取

外部数据的读取会涉及到中断体系,Linux中断 分为了两个部分: top halfbottom halftop half是在呼叫 request_irq() 时所指定的 interrupt handler 函数,bottom half 则是真正负责响应中断的 task

硬中断部分

对于数据初次抵达硬件,会触发硬件中断,如下图所示(网卡类似)。

J6IXC.png
因此从中断的入口观察是最好的,下面的代码逻辑从 linux-1.x 版本中截取,比最近的要简单一些,容易理解,另外就是新版的已经改为 NAPI 模式,下文提到再说。

rtl8139_interruptsource code
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
static void rtl8139_interrupt(int irq, void *dev_instance, struct pt_regs *regs) {
struct net_device *dev = (struct net_device *) dev_instance;
struct rtl8139_private *tp = dev->priv;
void *ioaddr = tp->mmio_addr;
unsigned short isr = readw(ioaddr + ISR);

// 处理接受信号,上面略过了发送部分
if (isr & RxOK) {
LOG_MSG("receive interrupt received\n");
while ((readb(ioaddr + CR) & RxBufEmpty) == 0) {
unsigned int rx_status;
unsigned short rx_size;
unsigned short pkt_size;
struct sk_buff *skb;

// 处理环形队列
if (tp->cur_rx > RX_BUF_LEN)
tp->cur_rx = tp->cur_rx % RX_BUF_LEN;

rx_status = *(unsigned int *) (tp->rx_ring + tp->cur_rx);
rx_size = rx_status >> 16;

pkt_size = rx_size - 4;

/* 创建内核态的 skb 储存接受到的数据 */
skb = dev_alloc_skb(pkt_size + 2);
if (skb) {
skb->dev = dev;
skb_reserve(skb, 2); /* 16 byte align the IP fields */

// 拷贝数据并且检查 checksum
eth_copy_and_sum(
skb, tp->rx_ring + tp->cur_rx + 4, pkt_size, 0);

skb_put(skb, pkt_size);
skb->protocol = eth_type_trans(skb, dev);
netif_rx(skb);

dev->last_rx = jiffies;
tp->stats.rx_bytes += pkt_size;
tp->stats.rx_packets++;
} else {
LOG_MSG("Memory squeeze, dropping packet.\n");
tp->stats.rx_dropped++;
}

/* 移动读取指针 * /
tp->cur_rx = (tp->cur_rx + rx_size + 4 + 3) & ~3;
}
}

return;
}

中断模式下,我们收到一个数据包是非常的好理解的,将数据从网卡中 COPY 到我们的内核中即可,netif_rx 就是我们从 HardwareKernel 转换的核心点。

netif_rxsource code
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
int netif_rx(struct sk_buff *skb) {
int this_cpu = smp_processor_id();
struct softnet_data *queue;
unsigned long flags;

// 我们为每个 Core 分配了一个 待处理的数据队列,这里的处理如注释所言,
// 代码可能被重排,这里只用当前的 CPU 保证了处理路径的最短化
queue = &softnet_data[this_cpu];

//没有超过backlog的限制就置于队列
if (queue->input_pkt_queue.qlen <= netdev_max_backlog) {
if (queue->input_pkt_queue.qlen) {
if (queue->throttle) // 不然就丢弃 -> 因为还没响应,等于丢包
goto drop;
enqueue:
dev_hold(skb->dev);
__skb_queue_tail(&queue->input_pkt_queue, skb);
return softnet_data[this_cpu].cng_level;
}

goto enqueue;
}
drop:
netdev_rx_stat[this_cpu].dropped++;
kfree_skb(skb);
return NET_RX_DROP;
}

硬中断部分到此结束,我们将数据压入内核的虚拟接受列队中结束。

[选读] 硬中断成本

硬中断 是一个有硬件主导,软件配合的出来体系,但是这样的模式不是没有成本的,打断正在运行的软件进程,我们不得不涉及到进程的切换,还有中断标记的处理等工作。It’s not free. ,有兴趣的可以参考 Profiling I/O Interrupts in Modern Architectures

截止至今,我们知道了对于从网卡来的数据,我们至少会涉及到 ➊ Hard Interrupt 导致的 进程切换,并且 ➋ 涉及到一次数据从网卡的 Buffer 向内核拷贝的过程。

软中断部分

软中断的逻辑在 net_rx_action

net_rx_actionsource code
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
static void net_rx_action(struct softirq_action *h) {
int this_cpu = smp_processor_id();
struct softnet_data *queue = &softnet_data[this_cpu];
int bugdet = netdev_max_backlog;

for (;;) {
struct sk_buff *skb;
struct net_device *rx_dev;
// 将数据弹出
skb = __skb_dequeue(&queue->input_pkt_queue);

// 没数据直接结束
if (skb == NULL)
break;

skb->h.raw = skb->nh.raw = skb->data;
{
struct packet_type *ptype, *pt_prev;
unsigned short type = skb->protocol;

pt_prev = NULL;
// 根据 package 类型找到对应的处理函数进行处理
for (ptype = ptype_all; ptype; ptype = ptype->next) {
if (!ptype->dev || ptype->dev == skb->dev) {
if (pt_prev) {
if (!pt_prev->data) {
deliver_to_old_ones(pt_prev, skb, 0);
} else {
atomic_inc(&skb->users);
// 调用预置的回调函数
pt_prev->func(skb,skb->dev,pt_prev);
}
}
pt_prev = ptype;
}
}

}
return;
}
}

回调的函数签名如下:

int (*func) (struct sk_buff *, struct net_device *, struct packet_type *); 很标准的指针函数。不过也很讨厌看这部分的代码,如果不是 Runtime 阶段很难找到这个定义的函数是什么。这里实际上在接收到 TCP/IP 协议包的时候,回调的是 int ip_rcv(struct sk_buff *skb, struct net_device *dev, struct packet_type *pt) 函数。在看 IP 处理的之前,我们先看看这个 packet_type 的相关函数。

[选读] Packet Type 钩子

packet_type 的定义很简单

packet_type
1
2
3
4
5
6
7
struct packet_type {
unsigned short type; /* 协议类型 */
struct net_device *dev; /* 不设置就是通配符 */
int (*func)(struct sk_buff *, struct net_device *, struct packet_type *); /* 回调函数 */
void *data; /* 私有数据 */
struct packet_type *next;
};

dev.c 的源码中,我们可以动态的向 ptype_all 注册我们需要的 ptype

dev_add_packsource code
1
2
3
4
5
6
7
8
9
10
11
void dev_add_pack(struct packet_type *pt) {
if (pt->type == htons(ETH_P_ALL)) {
netdev_nit++;
pt->next = ptype_all;
ptype_all = pt;
} else {
hash = ntohs(pt->type) & 15;
pt->next = ptype_base[hash];
ptype_base[hash] = pt;
}
}

因此我们如果需要处理比如 arp 的请求,可以单独增加一个,所以在 arp.c 初始化的时候,就可以出发。

arp_init
1
2
3
4
5
6
7
8
9
static struct packet_type arp_packet_type = {
type: __constant_htons(ETH_P_ARP),
func: arp_rcv,
data: (void *) 1,
};

void __init arp_init(void) {
dev_add_pack(&arp_packet_type);
}
TCPDUMP

tcpdump 二层抓包,用的是 libpcap 库,它的基本原理是

  1. 先创建socket,内核dev_add_packet()挂上自己的钩子函数
  2. 然后在钩子函数中,把skb放到自己的接收队列中,
  3. 接着系统调用recv取出skb来,把数据包skb->data拷贝到用户空间
  4. 最后关闭socket,内核dev_remove_packet()删除自己的钩子函数

因此代码如下表述

af_packet.c:packet_createsource code
1
2
3
4
5
6
7
8
9
10
static int packet_create(struct socket *sock, int protocol) {
// 略部分代码
if (protocol) {
sk->protinfo.af_packet->prot_hook.type = protocol;
dev_add_pack(&sk->protinfo.af_packet->prot_hook); /* 增加 Hook 在此协议上 */
sock_hold(sk);
sk->protinfo.af_packet->running = 1;
}
return (0);
}

直到此处,我们走到这里算是和硬件越来越远了。再下来的历程,我们要去面对的是更加高纬度的 Socket 抽象。

[选读] 软断成本

对于常见的硬件中断来说,比如键入了一个字母,过程简单,很快可以处理完成,对于网络IO就复杂的多,涉及到从驱动拷贝数据,然后到虚拟的网络协议栈,再到Socket的Buffer中,因此才才产生的了软中断的,将中断一分为二,软中断优先级较低但是因为软件中断复杂,成本上是要更多的。

软中断依然需要 ➊ 进程上下文的切换(这里切换到 内核线程 ksoftirqd ),这里只不过相对于硬中断是操作系统自己进行调度的 ➋ 就是和系统开销调用一下,因为对于正常运行的用户态进程,我们需要把当前的上下文保存。

内核线程 ksoftirqd 也会充分利用多核的能力,对于每一个 core 都会启动一个 ksoftirqd,暂且可以把它当做一个普通的进程看待。

NAPI

单纯的看,上面的开销,我们一眼就可以发现硬中断部分有一个非常不合理的情况,考虑如果出现以下情况

网卡每隔 10 ms 获得一个数据包,我们硬中断处理需要 9ms

那会出现一个很神奇的现象就是我们虽然每次都要将中断结束,但是每一次刚刚结束又要唤醒,结果我们会浪费大量的时间在处理中断请求上。针对这一的情况,Linux 2.6 之后提供了一个全新的API来处理这种情况(实际上对于大部分的数据,网络请求从不间断)。

JULvD.png

既然每次进中断都很浪费,那我们就不要每一次都进中断了,NAPI 的解决之道就是,当我们接收到请求之后,我们将从此网卡读取作为一个 Task 注册到 napi_scheluder 中,然后屏蔽中断,内核定时的去 poll 数据即可。主要就是减少了硬中断切换的成本。

NAPI 定义
napisource code
1
2
3
4
5
6
7
8
9
10
struct napi_struct {
// poll 列表
struct list_head poll_list;

unsigned long state;
int weight;

// 获得数据的 poll 函数
int (*poll)(struct napi_struct *, int);
};

核心的逻辑都是在 poll 的回调函数中。

NAPI 之下的硬中断处理
rtl8139_interruptsource code
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
static irqreturn_t rtl8139_interrupt(int irq, void *dev_instance) {
struct net_device *dev = (struct net_device *) dev_instance;
struct rtl8139_private *tp = netdev_priv(dev);

/* Receive packets are processed by poll routine.
If not running start it now.
如注释所言,现在的处理都早 POLL 的子函数中,我们在这里只是向 __napi_schedule 注册我们的 poll 函数
*/
if (status & RxAckBits) {
if (napi_schedule_prep(&tp->napi)) {
RTL_W16_F(IntrMask, rtl8139_norx_intr_mask);
__napi_schedule(&tp->napi);
}
}
netdev_dbg(dev, "exiting interrupt, intr_status=%#4.4x\n",
RTL_R16(IntrStatus));
return IRQ_RETVAL(handled);
}

对于 poll 函数的实现这里就不做展开了,和传统的体系一样,直接将数据拷贝到内核态没有区别。

软中断唤醒

因此到此版本,网络请求的软中断环境代码也相对应的有所改变

net_rx_actionsource code
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
static void net_rx_action(struct softirq_action *h) {
struct softnet_data *sd = &__get_cpu_var(softnet_data);
unsigned long time_limit = jiffies + 2;
int budget = netdev_budget;
void *have;

while (!list_empty(&sd->poll_list)) {
struct napi_struct *n;
int work, weight;

// 获得可运行的 poll
n = list_first_entry(&sd->poll_list,
struct napi_struct, poll_list);

work = 0;
if (test_bit(NAPI_STATE_SCHED, &n->state)) {
work = n->poll(n, weight); // 执行 poll 操作
}
}
}

只将核心的逻辑展示出来,比如代码中 budget 防止饥饿的操作都留于读者自行理解了, NAPI 解决了我们依靠硬中断导致的大量的进程切换的问题,加上现在的操作系统都是基于 DMA ,因此整个过程中的成本就只有 ➊ 软中断导致的线程切换 ➋ 数据的拷贝

JUWsT.png


IP Level

我们从 软中断 归来,数据开始进入了我们熟悉的协议处理部分,从 ip_rcv 开始作为我们正式的处理逻辑。

Linux 这也使用了 ip_rt_hash_table 的一种设计方式。

ip_rcvsource code
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
int ip_rcv(struct sk_buff *skb, struct net_device *dev, struct packet_type *pt,
struct net_device *orig_dev) {

struct iphdr *iph;
u32 len;

// 获得IP协议的头部数据
iph = ip_hdr(skb);

/*
* 按照 RFC1122 进行检测
*/
if (iph->ihl < 5 || iph->version != 4)
goto inhdr_error;

if (unlikely(ip_fast_csum((u8 *) iph, iph->ihl)))
goto inhdr_error;

len = ntohs(iph->tot_len);
if (skb->len < len) {
IP_INC_STATS_BH(dev_net(dev), IPSTATS_MIB_INTRUNCATEDPKTS);
goto drop;
} else if (len < (iph->ihl * 4))
goto inhdr_error;

if (pskb_trim_rcsum(skb, len)) {
IP_INC_STATS_BH(dev_net(dev), IPSTATS_MIB_INDISCARDS);
goto drop;
}

memset(IPCB(skb), 0, sizeof(struct inet_skb_parm));

// 上面一圈检测没啥问题进入 Nefilter 预留的 HOOK 进行处理
return NF_HOOK(NFPROTO_IPV4, NF_INET_PRE_ROUTING, skb, dev, NULL, ip_rcv_finish);

inhdr_error:
IP_INC_STATS_BH(dev_net(dev), IPSTATS_MIB_INHDRERRORS);
drop:
kfree_skb(skb);
out:
return NET_RX_DROP;
}

对于 NF_HOOK 的逻辑不在本文的探讨范围内,我们直接看最后在 NF_INET_PRE_ROUTING 阶段完成,进入了下一步的逻辑处理 ip_rcv_finish

ip_rcv_finishsource code
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
static inline int ip_rcv_finish(struct sk_buff *skb) {
struct net_device *dev = skb->dev;
struct iphdr *iph = skb->nh.iph;

/*
* Initialise the virtual path cache for the packet. It describes
* how the packet travels inside Linux networking.
*/
if (skb->dst == NULL) {
// 查找 route 表,不可达就直接丢弃
if (ip_route_input(skb, iph->daddr, iph->saddr, iph->tos, dev))
goto drop;
}


// header 长度超过 5,说明还有其他需要处理的
if (iph->ihl > 5) {
// 略,暂时不关心
}

return skb->dst->input(skb);

drop:
kfree_skb(skb);
return NET_RX_DROP;
}

我们只能从中得知我们下一步最重要的就是去处理 route 关系。

[选读] IP Route

Linux 在这里进行了大量的 Cache 操作,我们滤过那些 Cache 的部分,读者如果愿意可以自行阅读代码。

route.c
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
static int ip_route_input_slow(struct sk_buff *skb, __be32 daddr, __be32 saddr,
u8 tos, struct net_device *dev) {
struct fib_result res;
struct in_device *in_dev = __in_dev_get_rcu(dev);
struct flowi4 fl4;
unsigned flags = 0;
u32 itag = 0;
struct rtable *rth;
unsigned hash;
__be32 spec_dst;
int err = -EINVAL;
struct net *net = dev_net(dev);

/* IP on this device is disabled. */

if (!in_dev)
goto out;

// 多播部分的逻辑跳转
if (ipv4_is_multicast(saddr) || ipv4_is_lbcast(saddr) ||
ipv4_is_loopback(saddr))
goto martian_source;

// 广播逻辑的跳转,下略
if (ipv4_is_lbcast(daddr) || (saddr == 0 && daddr == 0))
goto brd_input;

if (ipv4_is_zeronet(saddr))
goto martian_source;

if (ipv4_is_zeronet(daddr) || ipv4_is_loopback(daddr))
goto martian_destination;

/*
* 我们开始处理IP协议逻辑的部分
*/
fl4.flowi4_oif = 0;
fl4.flowi4_iif = dev->ifindex;
fl4.flowi4_mark = skb->mark;
fl4.flowi4_tos = tos;
fl4.flowi4_scope = RT_SCOPE_UNIVERSE;
fl4.daddr = daddr;
fl4.saddr = saddr;
err = fib_lookup(net, &fl4, &res);
if (err != 0) {
// 如果设备不允许转发,host不可达错误
if (!IN_DEV_FORWARD(in_dev))
goto e_hostunreach;
goto no_route;
}

RT_CACHE_STAT_INC(in_slow_tot);

if (res.type == RTN_BROADCAST)
goto brd_input;

// Local 的处理
if (res.type == RTN_LOCAL) {
// ...
}

if (!IN_DEV_FORWARD(in_dev))
goto e_hostunreach;
if (res.type != RTN_UNICAST)
goto martian_destination;
// ip_mkroute_input 使我们的核心逻辑处
err = ip_mkroute_input(skb, &res, &fl4, in_dev, daddr, saddr, tos);
out:
return err;
}

对于路由算法部分,请参考 Linux内核分析 - 网络[四]:路由表 这里不做展开了,最终我们将我们的 skb->dst->input 函数指向了 int ip_local_deliver(struct sk_buff *skb) 函数,因为这个数据包是需要我们本地进行处理,而不是进行转发等操作。

IP Deliver

ip_local_deliversource code
1
2
3
4
5
6
7
8
9
10
11
int ip_local_deliver(struct sk_buff *skb)
{
// 这里要重新排下 IP 的包顺序
if (ip_hdr(skb)->frag_off & htons(IP_MF | IP_OFFSET)) {
if (ip_defrag(skb, IP_DEFRAG_LOCAL_DELIVER))
return 0;
}

// 这里又是熟悉的 Netfilter 的埋点,NF_INET_LOCAL_IN 阶段的回调事件
return NF_HOOK(NFPROTO_IPV4, NF_INET_LOCAL_IN, skb, skb->dev, NULL, ip_local_deliver_finish);
}

最终在 ip_local_deliver_finish 函数中

ip_local_deliver_finishsource code
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
static int ip_local_deliver_finish(struct sk_buff *skb) {
struct net *net = dev_net(skb->dev);
// 跳过 ip 协议的 Header 部分
__skb_pull(skb, ip_hdrlen(skb));
int protocol = ip_hdr(skb)->protocol;
if (ipprot != NULL) {
int ret;
// 针对IP Protocol 的协议进行处理
ret = ipprot->handler(skb);
if (ret < 0) {
protocol = -ret;
goto resubmit;
}
}
}

因为 IP 层之上,还有除了 TCP 之外其他的协议,因此这里需要针对不同的 protocol 调用相对应的 函数,这里其实就是我们的 int tcp_v4_rcv(struct sk_buff *skb),再往下走就是我们熟悉的 TCP 层。


TCP Level

对于我们来说,网络编程大部分的时候都在和 Socket 这个抽象进行斗争,TCP 层开始,我们就将 IP包组成 TCP 报文发送给 Socket了。对于 socket 有两种抽象:

  • struct socket 接近用户态的抽象,为用户实现 BSD Socket 抽象
  • struct sock 内核中表示的网络连接抽象。

Sock 定义

对于 Sock 有着一个很巨大复杂的定义,我们尽量剥离出那些比较重要的。

struct_sock
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
struct sock {
struct sock_common __sk_common;
socket_lock_t sk_lock;
// 接受数据的队列
struct sk_buff_head sk_receive_queue;
// 发送数据队列
struct sk_buff_head sk_write_queue;
// 计时器
struct timer_list sk_timer;
// 对应的用户侧 socket
struct socket *sk_socket;
// 此 socket 的上下文数据
void *sk_user_data;

// 回调函数
void (*sk_state_change)(struct sock *sk);
void (*sk_data_ready)(struct sock *sk);
void (*sk_write_space)(struct sock *sk);
void (*sk_error_report)(struct sock *sk);
int (*sk_backlog_rcv)(struct sock *sk, struct sk_buff *skb);
void (*sk_destruct)(struct sock *sk);
};

从最简单的设计定义上,我们可以发现,对于 sock,我们知道我们有一个 Recv 的队列,和一个 Send 的队列,现在我们的大部分的操作都会在这两个队列进行操作。

Sock 读取数据

sock.c 中写的非常的清晰

sock_queue_rcv_skb
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
// sk 是待接收数据的 sock, skb 是内核接收到的数据
int sock_queue_rcv_skb(struct sock *sk, struct sk_buff *skb) {
int err;
unsigned long flags;
struct sk_buff_head *list = &sk->sk_receive_queue;

// 数据积压超过 rcvbuf 限制,返回异常
if (atomic_read(&sk->sk_rmem_alloc) >= sk->sk_rcvbuf) {
atomic_inc(&sk->sk_drops);
trace_sock_rcvqueue_full(sk, skb);
return -ENOMEM;
}

// 运行 Netfilter 逻辑
err = sk_filter(sk, skb);
if (err)
return err;


// 将 skb 的归属者设置下
skb->dev = NULL;
skb_set_owner_r(skb, sk);

// 将数据置于 receive_queue 的尾部
__skb_queue_tail(list, skb);

// 如何 Socket 不处于 READ 状态,就触发下 sk_data_ready 回调函数
if (!sock_flag(sk, SOCK_DEAD))
sk->sk_data_ready(sk);
return 0;
}

Sock 写入数据类似,这里不做展开。我们先暂且不考虑被我忽略的 Lock 的部分的逻辑。其实代码的逻辑非常的好理解,我们将受到的 bytes 数据置于队列的尾部,通知下数据就绪就可以了。

谁负责写入队列

TCP/IP 模式

tcp_v4_rcvsource code
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
int tcp_v4_rcv(struct sk_buff *skb) {
// 获得设备
struct net *net = dev_net(skb->dev);
int sdif = inet_sdif(skb);
const struct iphdr *iph;
const struct tcphdr *th;
bool refcounted;
struct sock *sk;
int ret;

// 获得 tcp header 部分
th = (const struct tcphdr *) skb->data;

// 获得 ip header 部分
iph = ip_hdr(skb);

lookup: // 查表获得 对应的 sock, 这里会根据 source ip / dist ip / source port / dis port 获得有个唯一的 sock
sk = __inet_lookup_skb(&tcp_hashinfo, skb, __tcp_hdrlen(th), th->source,
th->dest, sdif, &refcounted);

// 没有直接跳转到异常处理
if (!sk)
goto no_tcp_socket;

process:
if (sk->sk_state == TCP_TIME_WAIT)
goto do_time_wait;

if (sk->sk_state == TCP_NEW_SYN_RECV) {
// TCP_NEW_SYN_RECV 处理逻辑
}

// Netfilter 失败直接 discard
if (tcp_filter(sk, skb))
goto discard_and_relse;

// 获得 tcp header 部分
th = (const struct tcphdr *) skb->data;
iph = ip_hdr(skb);

// 调用填充数据的回调
tcp_v4_fill_cb(skb, iph, th);

skb->dev = NULL;

// 在 TCP LISTEN 状态下直接进入 tcp_v4_do_rcv
if (sk->sk_state == TCP_LISTEN) {
ret = tcp_v4_do_rcv(sk, skb);
goto put_and_return;
}


bh_lock_sock_nested(sk); // 先上锁
ret = 0;
if (!sock_owned_by_user(sk)) { // 如果在此 Sock 在内核态中 接收请求数据
ret = tcp_v4_do_rcv(sk, skb);
} else if (tcp_add_backlog(sk, skb)) { // 不在内核态说明在用户侧处理,将数据放置到 backlog 中
goto discard_and_relse;
}
bh_unlock_sock(sk); // 解锁
}

对于 tcp_v4_rcv() 函数来说,针对现在 sock 所在的不同状态进行处理我们收到的数据,逻辑也非常的容易理解。

tcp_ipv4.c:tcp_v4_do_rcv
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
int tcp_v4_do_rcv(struct sock *sk, struct sk_buff *skb) {
struct sock *rsk;

// 如果请求已经建立
if (sk->sk_state == TCP_ESTABLISHED) {
tcp_rcv_established(sk, skb);
return 0;
}

// 如果是监听状态
if (sk->sk_state == TCP_LISTEN) {
struct sock *nsk = tcp_v4_cookie_check(sk, skb);

if (!nsk)
goto discard;
if (nsk != sk) {
if (tcp_child_process(sk, nsk, skb)) {
rsk = nsk;
goto reset;
}
return 0;
}
}

// 状态处理
if (tcp_rcv_state_process(sk, skb)) {
rsk = sk;
goto reset;
}
return 0;
}

从入口处我们就发现了,对于Linux 内核来说,根据不同的状态处理 Socket 就所谓的内核虚拟协议栈最重要的事情。我们来看看接受数据的的最终地,对于 TCP/IP4 模式,反而没有直接调用 sock_queue_rcv_skb 而是选择了另外一种方式,因为在此模式下,我们已经在事先检查了大量的阈值状态。

tcp_queue_rcvsource code
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
static int tcp_queue_rcv(struct sock *sk, struct sk_buff *skb, int hdrlen, bool *fragstolen) {
int eaten;
// 获得队列尾
struct sk_buff *tail = skb_peek_tail(&sk->sk_receive_queue);

// 滤过 Header 部分的数据
__skb_pull(skb, hdrlen);

// 尝试将数据和 tail 进行合并
eaten = (tail &&
tcp_try_coalesce(sk, tail,
skb, fragstolen)) ? 1 : 0;

// 不成功就放入 sk_receive_queue 中
if (!eaten) {
__skb_queue_tail(&sk->sk_receive_queue, skb);
skb_set_owner_r(skb, sk);
}
return eaten;
}
TCP STATE PROCESS

TCP 协议来说,本质上我们就是在维护一个有限状态的自动机,状态切换的代码在 ipv4/tcp_input.c:tcp_rcv_state_process 函数,比较的长就不做展开,处理状态,就是如下图进行各种状态的变换。
JWO8G.png

[可选] RAW 模式

Raw也就是混杂模式,这里的实现在 ipv4/raw.c

raw_v4_input
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
static int raw_v4_input(struct sk_buff *skb, const struct iphdr *iph, int hash) {

struct sock *sk;
struct hlist_head *head;
int delivered = 0;
struct net *net;

read_lock(&raw_v4_hashinfo.lock);

// 根据 Hash 获得对应的链表
head = &raw_v4_hashinfo.ht[hash];
if (hlist_empty(head))
goto out;

// 拿到这个数据从哪个网卡进来的
net = dev_net(skb->dev);

// 查表获得对应的 sock 对象, 这里直接循环了
sk = __raw_v4_lookup(net, __sk_head(head), iph->protocol,
iph->saddr, iph->daddr,
skb->dev->ifindex);

while (sk) {
delivered = 1;
// 这里处理下多播的可能,直到遍历完成就可以跳转至 out 了
if ((iph->protocol != IPPROTO_ICMP || !icmp_filter(sk, skb)) &&
ip_mc_sf_allow(sk, iph->daddr, iph->saddr,
skb->dev->ifindex)) {
struct sk_buff *clone = skb_clone(skb, GFP_ATOMIC);

/* Not releasing hash table! */
if (clone)
raw_rcv(sk, clone);
}
sk = __raw_v4_lookup(net, sk_next(sk), iph->protocol,
iph->saddr, iph->daddr,
skb->dev->ifindex);
}
out:
read_unlock(&raw_v4_hashinfo.lock);
return delivered;
}

特点是不检测 目标端口 只要是 IP 匹配的上就可以传递

Linux Networking Stack

Linux 不仅仅的单纯的提供实现了 TCP/IP 协议,在历代的版本中都尝试在 数据流 处理的过程中增加一些额外的管控能力。ip firewall,ipchains,iptanles,nftables 都是为了让用户在处理的过程中做一些额外的工作。

netfilter

netfilter 应该是最出名的项目了,从 Linux 2.4.X 版本开始作为内置的功能提供给运维管理员。针对 Netfilter 的分析比较的多,这里就不展开了,可以查阅 A Deep Dive into Iptables and Netfilter Architecture

但是仍有一些不致命但是很烦人的缺点。

  • iptables规则不支持增量配置,时间复杂度均为 O(n)
  • 基于配置化回调函数,性能不高

ebpf

ebpfLinux 社区的一个新星。为了追求性能上的提高,在内核里提供了一个虚拟机,将用户的过滤规则代码编译成字节码然后交付给虚拟机,内核根据这些指令来过滤网络数据包。

JkLe8.png

基础知识

要理解 ebpf 还需要明白 bpf_map, 其本质上是以「键/值」方式存储在内核中的数据结构,它们可以被任何知道它们的 BPF 程序访问。在内核空间的程序创建 BPF Map 并返回对应的文件描述符,在用户空间运行的程序就可以通过这个文件描述符来访问并操作 BPF Map,这就是为什么 BPF Map 在 BPF 世界中是桥梁的存在了。

JkGaw.png

Example

sockex1_kern.c[内核态逻辑]source code
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
SEC("socket1")
int bpf_prog1(struct __sk_buff *skb) {
// 从 skb 获得固定改长度的数据 -> 转换成 index
int index = load_byte(skb, ETH_HLEN + offsetof(
struct iphdr, protocol));
long *value;

if (skb->pkt_type != PACKET_OUTGOING)
return 0;
// 从 map 中获得储存的数据
value = bpf_map_lookup_elem(&my_map, &index);

// 向 skb 中增加我们储存在 map 中的数据
if (value)
__sync_fetch_and_add(value, skb->len);

return 0;
}

char _license[] SEC("license") = "GPL";
sockex1_user.c[用户态逻辑]source code
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
int main(int ac, char **argv) {
struct bpf_object *obj;
int map_fd, prog_fd;
char filename[256];
int i, sock;
FILE *f;

snprintf(filename, sizeof(filename), "%s_kern.o", argv[0]);

// 将编译好内核态代码载入 bpf 模块
if (bpf_prog_load(filename, BPF_PROG_TYPE_SOCKET_FILTER, &obj, &prog_fd))
return 1;

map_fd = bpf_object__find_map_fd_by_name(obj, "my_map");

// 打开一个 lo 网卡的 Raw Socket
sock = open_raw_sock("lo");

// 为 Socket 设置 SO_ATTACH_BPF 标记,功能是 prog_fd,也就是我们上面刚刚载入的
assert(setsockopt(sock, SOL_SOCKET, SO_ATTACH_BPF, &prog_fd, sizeof(prog_fd)) == 0);

// 执行 PING 操作
f = popen("ping -4 -c5 localhost", "r");
(void) f;

for (i = 0; i < 5; i++) {
long long tcp_cnt, udp_cnt, icmp_cnt;
int key;

key = IPPROTO_TCP;
assert(bpf_map_lookup_elem(map_fd, &key, &tcp_cnt) == 0);

key = IPPROTO_UDP;
assert(bpf_map_lookup_elem(map_fd, &key, &udp_cnt) == 0);

key = IPPROTO_ICMP;
assert(bpf_map_lookup_elem(map_fd, &key, &icmp_cnt) == 0);

printf("TCP %lld UDP %lld ICMP %lld bytes\n",
tcp_cnt, udp_cnt, icmp_cnt);
sleep(1);
}

return 0;
}

内核 Hook

从例子中我们也可以很清晰的发现,核心点在于 setsockopt(sock, SOL_SOCKET, SO_ATTACH_BPF, &prog_fd, sizeof(prog_fd) 我们在内核中一顿操作之后就可以发现在

sock_setsockoptsource code
1
2
3
4
5
6
7
8
9
10
11
12
case SO_ATTACH_BPF:
ret = -EINVAL;
if (optlen == sizeof(u32)) {
u32 ufd;

ret = -EFAULT;
if (copy_from_user(&ufd, optval, sizeof(ufd)))
break;

ret = sk_attach_bpf(ufd, sk);
}
break;

这行代码也非常的简单,直接构建了一个全新的 sk_filter,然后给此 sock 设置即可。

sk_filtersource code
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
static int __sk_attach_prog(struct bpf_prog *prog, struct sock *sk) {
struct sk_filter *fp, *old_fp;
// 开辟内核空间
fp = kmalloc(sizeof(*fp), GFP_KERNEL);
if (!fp)
return -ENOMEM;

// 设置 sk_filter 程序就是我们用户编写的
fp->prog = prog;

// rcu 设置 fp 对象
old_fp = rcu_dereference_protected(sk->sk_filter,
lockdep_sock_is_held(sk));
rcu_assign_pointer(sk->sk_filter, fp);

return 0;
}

而调用的地方也非常的清晰,比如说在接受数据的地方。

sock_queue_rcv_skblink
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
int sock_queue_rcv_skb(struct sock *sk, struct sk_buff *skb) {
int err;

err = sk_filter(sk, skb);
if (err)
return err;

return __sock_queue_rcv_skb(sk, skb);
}

// 真实调用处
int sk_filter_trim_cap(struct sock *sk, struct sk_buff *skb, unsigned int cap) {

// 上读锁
rcu_read_lock();
// 获得 filter
filter = rcu_dereference(sk->sk_filter);
if (filter) {
struct sock *save_sk = skb->sk;
unsigned int pkt_len;

skb->sk = sk;
// 执行回调
pkt_len = bpf_prog_run_save_cb(filter->prog, skb);
skb->sk = save_sk;
err = pkt_len ? pskb_trim(skb, max(cap, pkt_len)) : -EPERM;
}
rcu_read_unlock();

return err;
}

小结

对于 ebpf 不算是什么新的技术 The BSD Packet Filter: A New Architecture for User-level Packet Capture 发表在 1992 年,其核心还是基于 编译器 优化成特定平台的代码以提高效率,并且因为配置项都是 case by case 的模式,因为也不存在维护一个巨大的 tables 的困扰,最终再通过 mmap 映射的方式,将数据在内核和用户侧进行共享。

ebpf 代表了内核向用户开放能力的趋势。现在能使用的点也并不少。
JFZpO.png

可以畅想下,未来可能会出现如下的功能架构

JkrhS.png

我们可以直接通过可编程的模式,从 云端 直接推送可插拔在 内核 中的代码逻辑。

用户空间

到上面为止,我们已经走完了所有内核部分的网络代码,下面的逻辑就是用户态的部分了。

BIO 模型

这个模型大家都熟悉,我们通过 Read() 来读取数据

Example

bio
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
int main(int argc, char *argv[]) {
sockfd = socket(AF_INET, SOCK_STREAM, 0);

if (bind(sockfd, (struct sockaddr *) &serv_addr, sizeof(serv_addr)) < 0) {
perror("ERROR on binding");
exit(1);
}


listen(sockfd, 5);
clilen = sizeof(cli_addr);

// accept 一个新请求
newsockfd = accept(sockfd, (struct sockaddr *) &cli_addr, &clilen);

if (newsockfd < 0) {
perror("ERROR on accept");
exit(1);
}

bzero(buffer, 256);

// 最多读取 255 个字节的数据
n = read(newsockfd, buffer, 255);

//出错返回
if (n < 0) {
perror("ERROR reading from socket");
exit(1);
}

return 0;
}

Read

读取消息的话,内部API经历的多个版本的变更,因为为了后面讲解 epoll,我们看比较新的 4.19 版本

do_sys_recvmmsgsource code
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
// wrap 下 __sys_recvmmsg 函数
static int do_sys_recvmmsg(int fd, struct mmsghdr __user *mmsg,
unsigned int vlen, unsigned int flags,
struct timespec __user *timeout)
{
int datagrams;
struct timespec timeout_sys;

if (flags & MSG_CMSG_COMPAT)
return -EINVAL;

if (!timeout)
return __sys_recvmmsg(fd, mmsg, vlen, flags, NULL);

datagrams = __sys_recvmmsg(fd, mmsg, vlen, flags, &timeout_sys);

// 这里需要注意的就是大家常说的 零拷贝,这里我们其实需要将内核的数据 COPY 至用户态
if (datagrams > 0 &&
copy_to_user(timeout, &timeout_sys, sizeof(timeout_sys)))
datagrams = -EFAULT;

return datagrams;
}

int __sys_recvmmsg(int fd, struct mmsghdr __user *mmsg, unsigned int vlen,
unsigned int flags, struct timespec *timeout)
{
//找到 FD 对应的 sock 对象
sock = sockfd_lookup_light(fd, &err, &fput_needed);
if (!sock)
return err;

entry = mmsg;
compat_entry = (struct compat_mmsghdr __user *)mmsg;

// ➊ loop 读取数据
while (datagrams < vlen) {
err = ___sys_recvmsg(sock, (struct user_msghdr __user *)entry, &msg_sys, flags & ~MSG_WAITFORONE, datagrams);
if (err < 0)
break;
err = put_user(err, &entry->msg_len);
++entry;

if (err)
break;
++datagrams;

/* 如果是 MSG_WAITFORONE 就设置 MSG_DONTWAIT */
if (flags & MSG_WAITFORONE)
flags |= MSG_DONTWAIT;

/* 无需等待的情况下就直接 Ret 了 */
if (msg_sys.msg_flags & MSG_OOB)
break;

// 如果数据没读够就 yield 掉
cond_resched();
}

if (err == 0)
goto out_put;

out_put:
fput_light(sock->file, fput_needed);

return datagrams;
}

因此我们在配置 FD 的时候,如果是非阻塞的,我们哪怕获取不到数据也会立即返回,在最初的 Blocking IO 模式下,我们需要等待数据的获取,才能返回,因此我们会在 ➊ 处进行 LOOP,我们在 cond_resched 主动将自己释放出来,然后等待下次的调用。

我们再往下走一层,Read 函数的下一层则是 tcp.c:tcp_recvmsg()

tcp_recvmsgsource code
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
do {
u32 offset;

// 从 sk_receive_queue 获得想要的数据
last = skb_peek_tail(&sk->sk_receive_queue);
skb_queue_walk(&sk->sk_receive_queue, skb)
{
last = skb;

offset = *seq - TCP_SKB_CB(skb)->seq;
if (offset < skb->len)
goto found_ok_skb;
if (TCP_SKB_CB(skb)->tcp_flags & TCPHDR_FIN)
goto found_fin_ok;
}
// 找到了 skb
found_ok_skb:
/* 足够需要么? */
used = skb->len - offset;
if (len < used)
used = len;

// 拷贝数据到 MSG 也就是用户态中
if (!(flags & MSG_TRUNC)) {
err = skb_copy_datagram_msg(skb, offset, msg, used);
}

*seq += used;
copied += used;
len -= used;

/* 处理 FIN. */
found_fin_ok:
++*seq;
if (!(flags & MSG_PEEK))
sk_eat_skb(sk, skb);
break;
} while (len > 0);

直到这里我们已经把握到了数据读取的命门,真实调用的就是 skb_copy_datagram_msg() 函数,而他所读取的恰好就是我们的 socksk_receive_queue,因此到此为止,我们已经将用户态至网卡的所有流程全面打通。

BIO的成本

Blocking 模式下,读取数据需要两方面的成本:模态切换线程等待

模态切换

用户态 切换至 内核态 我们使用的 Syscall 模式,因此对于这一步的切换本身也是有成本的,我们需要切换堆栈,切换 cs/ip 寄存器,这一步也不是免费的,Protection ring

In most existing systems, switching from user mode to kernel mode has an associated high cost in performance. It has been measured, on the basic request getpid, to cost 1000–1500 cycles on most machines. Of these just around 100 are for the actual switch (70 from user to kernel space, and 40 back), the rest is “kernel overhead”. In the L3 microkernel, the minimization of this overhead reduced the overall cost to around 150 cycles.

线程等待

JuepH.png

因为对于如果我们 read 不到数据,也就是在等待的过程中,我们就不得不去面对 cond_resched() 我们将自己执行的CPU时间让出来,然后等待下次的调用,这个数量和我们的 socket 数量成正比,尤其是我们传统的每一个 socket 配给一个 thread 进行处理的时候,我们要付出大量的线程切换时间。

NIO 模型

为了解决 线程等待 的问题,Linux 提供了新的编程模式 epoll

Example

epollsource code
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
while (1) {
int n, i;

n = epoll_wait(efd, events, MAXEVENTS, -1);
for (
i = 0;
i < n;
i++) {
if ((events[i].events & EPOLLERR) ||
(events[i].events & EPOLLHUP) ||
(!(events[i].events & EPOLLIN))) {
fprintf(stderr,
"epoll error\n");
close(events[i]
.data.fd);
continue;
} else if (sfd == events[i].data.fd) {
while (1) {
struct sockaddr in_addr;
socklen_t in_len;
int infd;
char hbuf[NI_MAXHOST], sbuf[NI_MAXSERV];

in_len = sizeof in_addr;
infd = accept(sfd, &in_addr, &in_len);
if (infd == -1) {
if ((errno == EAGAIN) ||
(errno == EWOULDBLOCK)) {
break;
} else {
perror("accept");
break;
}
}

s = getnameinfo(&in_addr, in_len,
hbuf, sizeof hbuf,
sbuf, sizeof sbuf,
NI_NUMERICHOST | NI_NUMERICSERV);
if (s == 0) {
printf("Accepted connection on descriptor %d "
"(host=%s, port=%s)\n", infd, hbuf, sbuf);
}

s = make_socket_non_blocking(infd);
if (s == -1)

abort();

event.data.
fd = infd;
event.
events = EPOLLIN | EPOLLET;
s = epoll_ctl(efd, EPOLL_CTL_ADD, infd, &event);
if (s == -1) {
perror("epoll_ctl");

abort();

}
}
continue;
}
}
}

Read 数据

对于 epoll 来说,没有数据就直接返回了,这块就没有什么特别,在上面的分析我们也可以看到。最重要的是如何维护 events 列表的

Event

epoll_evnetsource code
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
struct eventpoll {
/* 红黑树的根节点,这棵树中存储着所有添加到epoll中的事件,也就是这个epoll监控的事件*/
struct rb_root rbr;
  
/* 双向链表 rdllist (rd -> Ready) 保存着将要通过epoll_wait返回给用户的、满足条件的事件*/
struct list_head rdllist;

/* 阻塞在 sys_epoll_wait 的队列 */
wait_queue_head_t wq;

/* 阻塞在 file->poll() 的队列 */
wait_queue_head_t poll_wait;

};

// 每一个在 RBR 元素
typedef union epoll_data {
void *ptr; /* 指向用户自定义数据 */
int fd; /* 注册的文件描述符 */
uint32_t u32; /* 32-bit integer */
uint64_t u64; /* 64-bit integer */
} epoll_data_t;

执行epoll_ctl()时,如果增加socket句柄,则检查在红黑树中是否存在,存在立即返回,不存在则添加到树干上,然后向内核注册回调函数,用于当中断事件来临时向准备就绪链表中插入数据。因此我们在一个 evnetpoll 对象上可以维护大量的 socket 对象,但是这样还不能足够我们完成事件通知,真正的奥秘还在 rdllist 就绪列表中。

注册文件

epoll 的就绪列表是基于事件驱动的,这样的就避免了我们一直不断的 轮训 数据导致的 CPU 浪费。因此在我们对 eventpoll 里面增加一个 fd的时候代码如下:

ep_insertsource code
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
static int ep_insert(struct eventpoll *ep, const struct epoll_event *event,
struct file *tfile, int fd, int full_check) {

/* 限制观察的数量 */
user_watches = atomic_long_read(&ep->user->epoll_watches);
if (unlikely(user_watches >= max_user_watches))
return -ENOSPC;

/* 初始化 poll_table */
epq.epi = epi;
init_poll_funcptr(&epq.pt, ep_ptable_queue_proc);

/*
* ep_ptable_queue_proc 的原型是
* typedef void (*poll_queue_proc)(struct file *, wait_queue_head_t *, struct poll_table_struct *);
*/

/* 向这个FD注册Ready事件 */
revents = ep_item_poll(epi, &epq.pt, 1);

/* 添加到红黑树中 */
ep_rbtree_insert(ep, epi);

//.....
}


typedef struct poll_table_struct {
poll_queue_proc _qproc; /* 回调函数 */
unsigned long _key; /* 监听的事件 */
} poll_table;

ep_item_poll 对应的不同的类型的 poll 函数不同,我们看看最常用的 tcp

tcp_poll
1
2
3
4
5
6
7
8
9
10
11
12
13
__poll_t tcp_poll(struct file *file, struct socket *sock, poll_table *wait)
{
__poll_t mask;
struct sock *sk = sock->sk;
const struct tcp_sock *tp = tcp_sk(sk);
int state;

// 核心的 Poll wait 调用
sock_poll_wait(file, sock, wait);

state = inet_sk_state_load(sk);
//....
}

又去调用了 sock_poll_wait,而这个函数如下

poll_wait
1
2
3
4
5
static inline void poll_wait(struct file * filp, wait_queue_head_t * wait_address, poll_table *p)
{
if (p && p->_qproc && wait_address)
p->_qproc(filp, wait_address, p);
}

_qproc 的定义如下

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
static void ep_ptable_queue_proc(struct file *file, wait_queue_head_t *whead,
poll_table *pt) {
struct epitem *epi = ep_item_from_epqueue(pt);
struct eppoll_entry *pwq;

// 注册 ep_poll_callback 为 epi,将其加入了 wait queue 中
if (epi->nwait >= 0 && (pwq = kmem_cache_alloc(pwq_cache, GFP_KERNEL))) {
init_waitqueue_func_entry(&pwq->wait, ep_poll_callback);
pwq->whead = whead;
pwq->base = epi;
if (epi->event.events & EPOLLEXCLUSIVE)
add_wait_queue_exclusive(whead, &pwq->wait);
else
add_wait_queue(whead, &pwq->wait);
list_add_tail(&pwq->llink, &epi->pwqlist);
epi->nwait++;
} else {
epi->nwait = -1;
}
}

帮大家锊一下,到现在到底发生了什么?

callback

ep_poll_callbacksource code
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
static int ep_poll_callback(wait_queue_entry_t *wait, unsigned mode, int sync, void *key) {
int pwake = 0;
unsigned long flags;
struct epitem *epi = ep_item_from_wait(wait);
struct eventpoll *ep = epi->ep;

// 执行 poll 函数,获得发生的事件
__poll_t pollflags = key_to_poll(key);
int ewake = 0;

/* 比对是否是需要处理的事件类型 */
if (pollflags && !(pollflags & epi->event.events))
goto out_unlock;

/* 如果我们不在rdlink里面的话,我们就把自己加入进去 */
if (!ep_is_linked(epi)) {
list_add_tail(&epi->rdllink, &ep->rdllist);
ep_pm_stay_awake_rcu(epi);
}

/* 如果有人在等待返回 */
if (waitqueue_active(&ep->wq)) {
// ... 一些判断逻辑
wake_up_locked(&ep->wq); // 唤醒epoll 中的 wq 队列
}

//....
}

什么时候会 Invoke 这个 callback,还记得们这个 callback 实际上是在 sock->wq->wait 中,而在实际的逻辑中 sock->wq->wait 的唤醒是被我们在更早之前所看到的 sock 定义的那些回调函数中

sock_callbacksource code
1
2
3
4
5
void			(*sk_state_change)(struct sock *sk);
void (*sk_data_ready)(struct sock *sk);
void (*sk_write_space)(struct sock *sk);
void (*sk_error_report)(struct sock *sk);
int (*sk_backlog_rcv)(struct sock *sk, struct sk_buff *skb);

随便点开一个实现,我们都会看到熟悉的 wq

tipc_data_readysource code
1
2
3
4
5
6
7
static void tipc_data_ready(struct sock *sk){
struct socket_wq *wq;
wq = rcu_dereference(sk->sk_wq);
if (skwq_has_sleeper(wq))
// 唤醒 wq->wait 队列
wake_up_interruptible_sync_poll(&wq->wait, EPOLLIN | EPOLLRDNORM | EPOLLRDBAND);
}

ep_poll()

ep_poll 函数比较的简单,

ep_pollsource code
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
static int ep_poll(struct eventpoll *ep, struct epoll_event __user *events,
int maxevents, long timeout)
{
fetch_events:

// 如果没有就绪事件
if (!ep_events_available(ep)) {

/*
* 没有任何可用的事件的情况下,在这里 Sleep 掉,等别人 Wake up,
* 在 ep_poll_callback 触发的时候会从这里继续执行
*/
init_waitqueue_entry(&wait, current);
__add_wait_queue_exclusive(&ep->wq, &wait);

for (;;) {
/*
* 该函数将当前任务设置为TASK_INTERRUPTIBLE,解锁自旋锁并告诉调度程序重新调度,
* 而且还设置了内核计时器,以在指定的超时时间到期或接收到任何信号时重新调度当前进程。
*/
set_current_state(TASK_INTERRUPTIBLE);

if (fatal_signal_pending(current)) {
res = -EINTR;
break;
}
// 有可用的事件,或者超时了
if (ep_events_available(ep) || timed_out)
break;
if (signal_pending(current)) {
res = -EINTR;
break;
}

spin_unlock_irq(&ep->wq.lock);
if (!schedule_hrtimeout_range(to, slack, HRTIMER_MODE_ABS))
timed_out = 1;

spin_lock_irq(&ep->wq.lock);
}

// 唤醒之后继续把自己从等待队列里面移除
__remove_wait_queue(&ep->wq, &wait);
__set_current_state(TASK_RUNNING);
}
check_events:
/* ep_poll()检查是否有任何要报告的就绪事件。然后释放锁 */
eavail = ep_events_available(ep);

spin_unlock_irq(&ep->wq.lock);

/*
* 如果该函数没有任何事件,并且超时尚未到期(如果该函数遇到过早的唤醒,则可能发生),
* 它只是返回fetch_events 并再次等待。
*/
if (!res && eavail &&
!(res = ep_send_events(ep, events, maxevents)) && !timed_out)
goto fetch_events;

return res;
}

对于 Epoll 的分析到此结束,epoll 基于 rdlist 来维护我们就绪的事件 fd,然后使用 redblack tree 快速检索这些 fd,而我认为最核心的部分还是莫过于 callback 机制,我们可以在收到数据之后才触发,这样减少了我们 轮训 的时间。

epoll的成本

epoll 显然已经将 thread blocking 的成本降低至极限,因为显然我们只需要一个 thread 就可以完成就绪事件的扫描和处理,但是依然有几个成本无法回避 ➊ 模态切换,用户态和内核态转换 ➋ 数据拷贝,因为 Read 依然是需要拷贝数据从内核态到用户态。

Zero Copy

传统读写

对于 Linux 的读写操作,想象一下从文件中读取一些数据然后写到 socket

read/write
1
2
read(file, tmp_buf, len);
write(socket, tmp_buf, len);

JHDOi.png

读取数据远比想象中的复杂:

  1. read 系统调用导致从用户模式到内核模式[上下文切换]。第一次复制是由DMA执行的,它从磁盘中读取内容并将其存储到内核地址空间缓冲区中。
  2. 将数据从内核缓冲区复制到用户缓冲区,然后 read 系统调用返回。调用的返回导致上下文从内核模式切换回用户模式。现在数据被存储在用户地址空间缓冲区中,它可以再次开始向下运行。
  3. write系统调用导致从用户模式到内核模式的上下文切换。执行第三次复制,再次将数据放入内核地址空间缓冲区。但是,这一次数据被放到了一个不同的缓冲区中,这个缓冲区专门与套接字相关联。
  4. write系统调用返回,创建第四个上下文切换。

很明显的,我们从用户态内核态需要拷贝一次数据,显然这次如果有一些机制可以节约下来这次开销,会更加合理一点。

mmap 机制

mmap 提供一种对于用户态和内核态的贡献数据的方式。

mmap
1
2
tmp_buf = mmap(file, len);
write(socket, tmp_buf, len);

JJLdr.png

  1. mmap系统调用导致DMA将文件内容复制到内核缓冲区中。然后与用户进程共享缓冲区,而不需要在内核和用户内存空间之间执行任何复制。
  2. write系统调用导致内核将数据从内核缓冲区复制到与套接字相关联的内核缓冲区中。
  3. DMA将数据从内核套接字缓冲区传递到协议引擎时发生第三次复制。

通过使用 mmap 而非 read,我们减少了内核必须复制的数据量的一半。当传输大量数据时,这会产生相当好的结果。

基于类似于 mmap 的机制, Linux 自从 4.14版本对 Socket 增加了 MSG_ZEROCOPY 选项,只需要执行如下代码:

设置标记位
1
2
if (setsockopt(fd, SOL_SOCKET, SO_ZEROCOPY, &one, sizeof(one)))
error(1, errno, "setsockopt zerocopy");

在读取和发送的时候使用,详细可以参考 内核文档: msg_zerocopy

零拷贝读写操作
1
2
3
ret = recvmsg(fd, &msg, MSG_ERRQUEUE);

ret = send(fd, buf, sizeof(buf), MSG_ZEROCOPY);
How it works

linux 内核使用 vm_area_struct 结构来表示一个独立的虚拟内存区域,由于每个不同质的虚拟内存区域功能和内部机制都不同,因此一个进程使用多个vm_area_struct结构来分别表示不同类型的虚拟内存区域。各个 vm_area_struct 结构使用链表或者树形结构链接,方便进程快速访问,如下图所示:

JSWqf.png

因此从使用者角度看

  1. 进程在用户空间调用库函数mmap
  2. 在当前进程的虚拟地址空间中,寻找一段空闲的满足要求的连续的虚拟地址
  3. 为此虚拟区分配一个vm_area_struct结构,接着对这个结构的各个域进行了初始化
  4. 将新建的虚拟区结构(vm_area_struct)插入进程的虚拟地址区域链表或树中
  5. 进程的读或写操作访问虚拟地址空间,触发 缺页异常,内核去开辟新的物理页

sendfile 机制

sendfile
1
sendfile(socket,file,len);

JJPr5.png

  1. sendfile 系统调用使 DMA 将文件内容复制到内核缓冲区中。
  2. 没有数据复制到套接字缓冲区中。将具有有关数据的地址和长度信息附加到套接字缓冲区。

这个可以说是真正的 Zero COPY 技术了。不过缺点也很明显,只能用在 FD 之上,对于大部分时候我们需要处理的是 Byte[] 对于内存的数据,这样的函数接口显然不是那么好用。


除此之外:Direct I/O Asynchronous direct I/O 都是优化 IO 的操作,对于网络请求一般不使用这些模式,大家可以自行阅读。

不过 Zero Copy 不一定有大家想象中的有效,在 Zero-copy networking 有详细的描述。

As was noted in the introduction, the benefits from zero-copy operation are often less than one might hope. Copying is expensive, but the setup required to avoid a copy operation also has its costs. In this case, the author claims that a simple benchmark (netperf blasting out data) runs 39% faster, while a more realistic production workload sees a 5-8% improvement. So the benefit for real-world systems is not huge, but it may well be enough to be worth going for on highly-loaded systems that transmit a lot of data.

不过从提交 PR 的作者的测试结果看还是不错的。

传输 10G 的流量,前三列是 netperf 进程的耗费的时钟周期,后三列是 系统级 的时钟周期。 std 是标准的读写, zc 代表零拷贝。

1
2
3
4
5
6
7
        --process cycles--      ----cpu cycles----
std zc % std zc %
4K 27,609 11,217 41 49,217 39,175 79
16K 21,370 3,823 18 43,540 29,213 67
64K 20,557 2,312 11 42,189 26,910 64
256K 21,110 2,134 10 43,006 27,104 63
1M 20,987 1,610 8 42,759 25,931 61

io_uring

我们使用 zero copy 技术将我们数据拷贝减少至极致,但是我们依然涉及到 模块切换,在 内核态用户态 之间切换也不是免费的,因此制约我们这个特性又应该怎么解决呢?在 Kernel 解决这个之前,社区提出了一个很出名的解决之道 dpdk 提出了 UIO 的概念。

JtdzO.png

因此 Linux 5.1 参考了 DPDK 的实现,提出了 io_uring 的方案,白皮书

Example

io_uringsource code
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
#define MAX_CONNECTIONS     4096
#define BACKLOG 512
#define MAX_MESSAGE_LEN 2048
#define BUFFERS_COUNT MAX_CONNECTIONS

void add_accept(struct io_uring *ring, int fd, struct sockaddr *client_addr, socklen_t *client_len, unsigned flags);
void add_socket_read(struct io_uring *ring, int fd, unsigned gid, size_t size, unsigned flags);
void add_socket_write(struct io_uring *ring, int fd, __u16 bid, size_t size, unsigned flags);
void add_provide_buf(struct io_uring *ring, __u16 bid, unsigned gid);

enum {
ACCEPT,
READ,
WRITE,
PROV_BUF,
};

typedef struct conn_info {
__u32 fd;
__u16 type;
__u16 bid;
} conn_info;

char bufs[BUFFERS_COUNT][MAX_MESSAGE_LEN] = {0};
int group_id = 1337;

int main(int argc, char *argv[]) {
int portno = strtol(argv[1], NULL, 10);
struct sockaddr_in serv_addr, client_addr;
socklen_t client_len = sizeof(client_addr);

// 初始 listen socket
int sock_listen_fd = socket(AF_INET, SOCK_STREAM, 0);
const int val = 1;
setsockopt(sock_listen_fd, SOL_SOCKET, SO_REUSEADDR, &val, sizeof(val));

// 绑定
if (bind(sock_listen_fd, (struct sockaddr *)&serv_addr, sizeof(serv_addr)) < 0) {
perror("Error binding socket...\n");
exit(1);
}

// 监听
if (listen(sock_listen_fd, BACKLOG) < 0) {
perror("Error listening on socket...\n");
exit(1);
}
printf("io_uring echo server listening for connections on port: %d\n", portno);

// 初始化 io_uring
struct io_uring_params params;
struct io_uring ring;
memset(&params, 0, sizeof(params));

if (io_uring_queue_init_params(2048, &ring, &params) < 0) {
perror("io_uring_init_failed...\n");
exit(1);
}

// 初始化 SQE 和 CQE
struct io_uring_sqe *sqe;
struct io_uring_cqe *cqe;

// 增加 accept SQE 监视新连接
add_accept(&ring, sock_listen_fd, (struct sockaddr *)&client_addr, &client_len, 0);

// 事件循环
while (1) {
// 等待事件的发生
io_uring_submit_and_wait(&ring, 1);
struct io_uring_cqe *cqe;
unsigned head;
unsigned count = 0;

// 遍历 CQE
io_uring_for_each_cqe(&ring, head, cqe) {
++count;
struct conn_info conn_i;
memcpy(&conn_i, &cqe->user_data, sizeof(conn_i));

int type = conn_i.type;
if (type == ACCEPT) {
int sock_conn_fd = cqe->res;
// 为建立的连接增加 read 监听
if (sock_conn_fd >= 0) {
add_socket_read(&ring, sock_conn_fd, group_id, MAX_MESSAGE_LEN, IOSQE_BUFFER_SELECT);
}

// 继续监听
add_accept(&ring, sock_listen_fd, (struct sockaddr *)&client_addr, &client_len, 0);
} else if (type == READ) {
int bytes_read = cqe->res;
// 这里可以读取数据
}
}

io_uring_cq_advance(&ring, count);
}
}

void add_accept(struct io_uring *ring, int fd, struct sockaddr *client_addr, socklen_t *client_len, unsigned flags) {
struct io_uring_sqe *sqe = io_uring_get_sqe(ring);
io_uring_prep_accept(sqe, fd, client_addr, client_len, 0);
io_uring_sqe_set_flags(sqe, flags);

conn_info conn_i = {
.fd = fd,
.type = ACCEPT,
};
memcpy(&sqe->user_data, &conn_i, sizeof(conn_i));
}

void add_socket_read(struct io_uring *ring, int fd, unsigned gid, size_t message_size, unsigned flags) {
struct io_uring_sqe *sqe = io_uring_get_sqe(ring);
io_uring_prep_recv(sqe, fd, NULL, message_size, 0);
io_uring_sqe_set_flags(sqe, flags);
sqe->buf_group = gid;

conn_info conn_i = {
.fd = fd,
.type = READ,
};
memcpy(&sqe->user_data, &conn_i, sizeof(conn_i));
}

How it works

Linux 只增加了 3 个 syscall

1
2
3
4
no         syscall
425 io_uring_setup
426 io_uring_enter
427 io_uring_register
数据结构

并没有常见的 readwrite,因为现在的这两步可以在用户态进行了(这不废话,我们就是要解决模态切换的问题)。为了实现用户态和内核态通讯,io_uring 提供了2个队列:提交队列(submission queue)和完成队列(completion queue)

定义如下

io_uring.hsource code
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
struct io_uring_params {
__u32 sq_entries;
__u32 cq_entries;
__u32 flags;
__u32 sq_thread_cpu;
__u32 sq_thread_idle;
__u32 features;
__u32 wq_fd;
__u32 resv[3];
struct io_sqring_offsets sq_off;
struct io_cqring_offsets cq_off;
};

struct io_sqring_offsets {
__u32 head; /* head offset */
__u32 tail; /* tail offset */
__u32 ring_mask;
__u32 ring_entries;
__u32 flags;
__u32 dropped;
__u32 array;
__u32 resv1;
__u64 resv2;
};

struct io_cqring_offsets {
__u32 head;
__u32 tail;
__u32 ring_mask;
__u32 ring_entries;
__u32 overflow;
__u32 cqes;
__u32 flags;
__u32 resv1;
__u64 resv2;
};

struct io_uring_cqe {
__u64 user_data; /* sqe->data submission passed back */
__s32 res; /* result code for this event */
__u32 flags;
};

JtFjD.png

SQCQSQEs 是在内核中分配的,所以用户态程序并不能直接访问。io_setup 的返回值是一个 fd,应用程序使用这个 fd 进行 mmap,和 kernel 共享一块内存。

IO submit

IO 提交的做法是找到一个空闲的 SQE,根据请求设置 SQE,并将这个 SQE 的索引放到 SQ 中。SQ 是一个典型的 RingBuffer,有 head,tail 两个成员,如果 head == tail,意味着队列为空。SQE 设置完成后,需要修改 SQ 的 tail,以表示向 RingBuffer 中插入一个请求。当所有请求都加入 SQ 后,就可以使用 :

1
int io_uring_enter(unsigned int fd, u32 to_submit, u32 min_complete, u32 flags);

不过这样还是会涉及到 模态切换,因此还提供了另外一个模式。

offload 模式

调用 io_uring_setup 时设置了 IORING_SETUP_SQPOLL 的 flag,内核会额外启动一个内核线程,我们称作 SQ 线程。

Jtm5p.png

IO Completion

IO 完成时,内核负责将完成 IOSQEs 中的 index 放到 CQ 中。由于 IO 在提交的时候可以顺便返回完成的 IO,所以收割 IO 不需要额外系统调用。这是跟 libaio 比较大的不同,省去了一次系统调用。

框架层

OS 上提供的 epoll io_uring 的情况下,在应用层如何高效的使用也是一个相对复杂的问题。不过在这层上来说,我们更需要的是配合 编程语言 的并发模式提供一种高效的编程框架。

Thread 编程模型:Netty & Reactor

epoll 也不算是一个新东西,业界也整理出一套比较高效的 框架模型,常用的高性能 IO 模型 Reactor

Jtv0T.png

Netty 抽象的 Acceptor 作为接受者,对应的线程是 bossGroup,从图上看好像是单个的,但是并不是 Acceptor 也可以有多个线程,Dispatch 将接收到的请求分发到 workGroup 的线程组进行繁重的读写工作。

对于读写操作,提供了 pipeline 的机制
Jt4bQ.png

Goroutines:netpoller

go 语言提供一套将 异步io 转化为 同步io 的机制,这样对于开发者来说会更加的轻松,得益于 goroutine 并不会带来不可承担的成本。

go 会初始化一个全局的 netpoll,在 linux 的实现是创建一个 epfd

netpollinitsource code
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
var (
epfd int32 = -1
netpollBreakRd, netpollBreakWr uintptr
)

func netpollinit() {
epfd = epollcreate1(_EPOLL_CLOEXEC)
r, w, _ := nonblockingPipe()
ev := epollevent{
events: _EPOLLIN,
}
*(**uintptr)(unsafe.Pointer(&ev.data)) = &netpollBreakRd
epollctl(epfd, _EPOLL_CTL_ADD, r, &ev)
netpollBreakRd = uintptr(r)
netpollBreakWr = uintptr(w)
}

当我们打开一个 file 的时候,就会向 epoll 注册事件

netpollopensource code
1
2
3
4
5
6
func netpollopen(fd uintptr, pd *pollDesc) int32 {
var ev epollevent
ev.events = _EPOLLIN | _EPOLLOUT | _EPOLLRDHUP | _EPOLLET
*(**pollDesc)(unsafe.Pointer(&ev.data)) = pd
return -epollctl(epfd, _EPOLL_CTL_ADD, int32(fd), &ev) // 注册关注事件
}

当我们在文件描述符上执行读写操作时,如果文件描述符不可读或者不可写,当前 Goroutine 就会执行 runtime.poll_runtime_pollWait 检查 runtime.pollDesc 的状态并调用 runtime.netpollblock 等待文件描述符的可读或者可写

netpollblocksource code
1
2
3
4
5
6
7
8
9
10
11
12
13
func netpollblock(pd *pollDesc, mode int32, waitio bool) bool {
gpp := &pd.rg
if mode == 'w' {
gpp = &pd.wg
}
...
if waitio || netpollcheckerr(pd, mode) == 0 {

// 让出执行的线程,将 Goroutine 转换到休眠状态并等待运行时的唤醒
gopark(netpollblockcommit, unsafe.Pointer(gpp), waitReasonIOWait, traceEvGoBlockNet, 5)
}
...
}

唤醒操作也很简单

netpollreadysource code
1
2
3
4
5
6
7
8
9
10
11
12
func netpollready(toRun *gList, pd *pollDesc, mode int32) {
var rg, wg *g
...
if mode == 'w' || mode == 'r'+'w' {
// 将可以唤醒的 Goroutine 重新置为可运行
wg = netpollunblock(pd, 'w', true)
}
...
if wg != nil {
toRun.push(wg)
}
}

Go 的大道至简,的确很香。

小结

大概前前后后写了半个月才将本文写完,也没有预计到可以写这么多,试图帮助大家从 硬件框架 整理出一套体系,但是为了达成高性能的 Web Server,除了我们本文提到了 OS 的一些优化,还有很多在框架层面上可以优化的地方。连接池 Fast Path算法 缓存 等等。


希望大家在网络的世界里面玩的开心 ;)

参考

来杯奶茶, 嗝~~~