内核接收分组理解

背景:

内核接收分组的方式有两种:第一种:传统方式,使用中断的方式;第二种:NAPI,使用中断和轮询结合的方式。

中断方式:

下图为一个分组到达NIC之后,该分组穿过内核到达网络层函数的路径。

此图的下半部分为中断处理,上半部分为软中断。在中断处理中,函数net_interupt是

设备驱动程序的中断处理程序,它将确认此中断是否由接收到分组引发的,如果确实如此,

则控制权移交到函数net_rx。函数net_rx也是特定于NIC,首先创建一个新的套接字缓冲区,

分组的内容接下来从NIC传输到缓冲区(进入到物理内存中),然后使用内核源码中针对

各个传输类型的库函数来分析首部的数据。函数netif_rx函数不是特定于网络驱动函数,该

函数位于net/core/dev.c,调用该函数,标志着控制由特定于网卡的代码转移到了网络层的

通用接口部分。此函数的作用在于,将接收分组放置到一个特定于CPU的等待队列上,并

退出中断上下文。内核使用softnet_data来管理进出流量,其定义入下:

2352 /*
2353  * Incoming packets are placed on per-cpu queues
2354  */
2355 struct softnet_data {
2356     struct list_head    poll_list;
2357     struct sk_buff_head process_queue;
2358
2359     /* stats */
2360     unsigned int        processed;
2361     unsigned int        time_squeeze;
2362     unsigned int        cpu_collision;
2363     unsigned int        received_rps;
2364 #ifdef CONFIG_RPS
2365     struct softnet_data *rps_ipi_list;
2366 #endif
2367 #ifdef CONFIG_NET_FLOW_LIMIT
2368     struct sd_flow_limit __rcu *flow_limit;
2369 #endif
2370     struct Qdisc        *output_queue;
2371     struct Qdisc        **output_queue_tailp;
2372     struct sk_buff      *completion_queue;
2373
2374 #ifdef CONFIG_RPS
2375     /* Elements below can be accessed between CPUs for RPS */
2376     struct call_single_data csd ____cacheline_aligned_in_smp;
2377     struct softnet_data *rps_ipi_next;
2378     unsigned int        cpu;
2379     unsigned int        input_queue_head;
2380     unsigned int        input_queue_tail;
2381 #endif
2382     unsigned int        dropped;
2383     struct sk_buff_head input_pkt_queue;   //对所有进入分组建立一个链表。
2384     struct napi_struct  backlog;
2385
2386 };

第2383行的input_pkt_queue即是CPU的等待队列。netif_rx的实现如下:

3329 static int netif_rx_internal(struct sk_buff *skb)
3330 {
3331     int ret;
3332
3333     net_timestamp_check(netdev_tstamp_prequeue, skb);
3334
3335     trace_netif_rx(skb);
3336 #ifdef CONFIG_RPS  //RPS 和 RFS 相关代码
3337     if (static_key_false(&rps_needed)) {
3338         struct rps_dev_flow voidflow, *rflow = &voidflow;
3339         int cpu;
3340
3341         preempt_disable();
3342         rcu_read_lock();
3343
3344         cpu = get_rps_cpu(skb->dev, skb, &rflow);   //选择合适的CPU id
3345         if (cpu < 0)
3346             cpu = smp_processor_id();
3347
3348         ret = enqueue_to_backlog(skb, cpu, &rflow->last_qtail);
3349
3350         rcu_read_unlock();
3351         preempt_enable();
3352     } else
3353 #endif
3354     {
3355         unsigned int qtail;
3356         ret = enqueue_to_backlog(skb, get_cpu(), &qtail);     //将skb入队
3357         put_cpu();
3358     }
3359     return ret;
3360 }
3361
3362 /**
3363  *  netif_rx    -   post buffer to the network code
3364  *  @skb: buffer to post
3365  *
3366  *  This function receives a packet from a device driver and queues it for
3367  *  the upper (protocol) levels to process.  It always succeeds. The buffer
3368  *  may be dropped during processing for congestion control or by the
3369  *  protocol layers.
3370  *
3371  *  return values:
3372  *  NET_RX_SUCCESS  (no congestion)
3373  *  NET_RX_DROP     (packet was dropped)
3374  *
3375  */
3376
3377 int netif_rx(struct sk_buff *skb)
3378 {
3379     trace_netif_rx_entry(skb);
3380
3381     return netif_rx_internal(skb);
3382 }

入队函数 enqueue_to_backlog的实现如下:

3286 static int enqueue_to_backlog(struct sk_buff *skb, int cpu,
3287                   unsigned int *qtail)
3288 {
3289     struct softnet_data *sd;
3290     unsigned long flags;
3291     unsigned int qlen;
3292
3293     sd = &per_cpu(softnet_data, cpu);  //获得此cpu上的softnet_data
3294
3295     local_irq_save(flags);
3296
3297     rps_lock(sd);
3298     qlen = skb_queue_len(&sd->input_pkt_queue);
3299     if (qlen <= netdev_max_backlog && !skb_flow_limit(skb, qlen)) {
3300         if (qlen) {
3301 enqueue:
3302             __skb_queue_tail(&sd->input_pkt_queue, skb);
3303             input_queue_tail_incr_save(sd, qtail);
3304             rps_unlock(sd);
3305             local_irq_restore(flags);
3306             return NET_RX_SUCCESS;
3307         }
3308
3309         /* Schedule NAPI for backlog device
3310          * We can use non atomic operation since we own the queue lock
3311          */
3312         if (!__test_and_set_bit(NAPI_STATE_SCHED, &sd->backlog.state)) {
3313             if (!rps_ipi_queued(sd))
3314                 ____napi_schedule(sd, &sd->backlog); //只有qlen是0的时候才执行到这里
3315         }
3316         goto enqueue;
3317     }
3318
3319     sd->dropped++;
3320     rps_unlock(sd);
3321
3322     local_irq_restore(flags);
3323
3324     atomic_long_inc(&skb->dev->rx_dropped);
3325     kfree_skb(skb);
3326     return NET_RX_DROP;
3327 }

NAPI

NAPI是混合了中断和轮询机制,当一个新的分组到达,而前一个分组依然在处理,这时内核

并不需要产生中断,内核会继续处理并在处理完毕后将中断开启。这样内核就利用了中断和轮询的好处,

只有有分组到达的时候,才会进行轮询。

NAPI存在两个优势

1.减少了CPU使用率,因为更少的中断。

2.处理各种设备更加公平

只有设备满足如下两个条件时,才能实现NAPI:

1.设备必须能够保留多个接收的分组

2.设备必须能够禁用用于分组接收的IRQ。而且,发送分组或其他可能通过IRQ进行的操作,都仍然

必须是启用的。

其运行概览如下:

如上所示,各个设备在进入poll list前需要禁用IRQ,而设备中的分组都处理完毕后重新开启IRQ。poll list使用

napi_struct来管理设备,其定义如下:

296 /*
297  * Structure for NAPI scheduling similar to tasklet but with weighting
298  */
299 struct napi_struct {
300     /* The poll_list must only be managed by the entity which
301      * changes the state of the NAPI_STATE_SCHED bit.  This means
302      * whoever atomically sets that bit can add this napi_struct
303      * to the per-cpu poll_list, and whoever clears that bit
304      * can remove from the list right before clearing the bit.
305      */
306     struct list_head    poll_list;    //用作链表元素
307
308     unsigned long       state;
309     int         weight;        //设备权重
310     unsigned int        gro_count;
311     int         (*poll)(struct napi_struct *, int);   //设备提供的轮询函数
312 #ifdef CONFIG_NETPOLL
313     spinlock_t      poll_lock;
314     int         poll_owner;
315 #endif
316     struct net_device   *dev;
317     struct sk_buff      *gro_list;
318     struct sk_buff      *skb;
319     struct hrtimer      timer;
320     struct list_head    dev_list;
321     struct hlist_node   napi_hash_node;
322     unsigned int        napi_id;
323 };    

变量state可以为NAPI_STATE_SCHED或NAPI_STATE_DISABLE, 前者表示设备将在

内核的下一次循环时被轮询,后者表示轮询已经结束且没有更多的分组等待处理,但

设备并没有从poll list移除。

支持NAPI的NIC需要修改中断处理程序,将此设备放置在poll list上。示例代码如下:

2215 static irqreturn_t e100_intr(int irq, void *dev_id)
2216 {
2217     struct net_device *netdev = dev_id;
2218     struct nic *nic = netdev_priv(netdev);
2219     u8 stat_ack = ioread8(&nic->csr->scb.stat_ack);
2220
2221     netif_printk(nic, intr, KERN_DEBUG, nic->netdev,
2222              "stat_ack = 0x%02X\n", stat_ack);
2223
2224     if (stat_ack == stat_ack_not_ours ||    /* Not our interrupt */
2225        stat_ack == stat_ack_not_present)    /* Hardware is ejected */
2226         return IRQ_NONE;
2227
2228     /* Ack interrupt(s) */
2229     iowrite8(stat_ack, &nic->csr->scb.stat_ack);
2230
2231     /* We hit Receive No Resource (RNR); restart RU after cleaning */
2232     if (stat_ack & stat_ack_rnr)
2233         nic->ru_running = RU_SUSPENDED;
2234
2235     if (likely(napi_schedule_prep(&nic->napi))) { //设置state为NAPI_STATE_SCHED
2236         e100_disable_irq(nic);
2237         __napi_schedule(&nic->napi); //将设备添加到 poll list,并开启软中断。
2238     }
2239
2240     return IRQ_HANDLED;
2241 }

函数__napi_schedule的定义入下:

3013 /* Called with irq disabled */
3014 static inline void ____napi_schedule(struct softnet_data *sd,
3015                      struct napi_struct *napi)
3016 {
3017     list_add_tail(&napi->poll_list, &sd->poll_list);
3018     __raise_softirq_irqoff(NET_RX_SOFTIRQ);
3019 } 

4375 /**
4376  * __napi_schedule - schedule for receive
4377  * @n: entry to schedule
4378  *
4379  * The entry‘s receive function will be scheduled to run.
4380  * Consider using __napi_schedule_irqoff() if hard irqs are masked.
4381  */
4382 void __napi_schedule(struct napi_struct *n)
4383 {
4384     unsigned long flags;
4385
4386     local_irq_save(flags);
4387     ____napi_schedule(this_cpu_ptr(&softnet_data), n);
4388     local_irq_restore(flags);
4389 }
4390 EXPORT_SYMBOL(__napi_schedule);

设备除了对中断处理进行修改,还需要提供一个poll函数,使用此函数从NIC中获取分组。示例代码如下:

特定于硬件的方法 hyper_do_poll 从NIC中获取分组,返回值work_done为处理的分组数目。当分组处理完后,

会调用netif_rx_complete将此设备从poll list中移除。ixgbe网卡的poll函数为ixgbe_poll, 而对于非NAPI的函数,

内核提供默认的处理函数process_backlog。

     

软中断处理相关

无论是NAPI接口还是非NAPI最后都是使用 net_rx_action 作为软中断处理函数。因此整个流程如下:

上图中有些函数名已经发生变更,但是流程依然如此。在最新的3.19内核代码中,非NAPI的调用流程如下:

neif_rx会调用enqueue_to_backlog 将skb存入softnet_data,并调用____napi_schedule函数。

netif_rx===>netif_rx_internal===>enqueue_to_backlog===>____napi_schedule===>net_rx_action===>process_backlog===>__netif_receive_skb

e100网卡的NAPI调用流程入下:

e100_intr===>__napi_schedule===>net_rx_action===>e100_poll===>e100_rx_clean===>e100_rx_indicate===>netif_receive_skb

时间: 2024-08-29 13:20:02

内核接收分组理解的相关文章

TCP接收/发送滑动窗口与内核接收/发送缓冲区之间的关系

在有关TCP连接的很多配置中,有很多选项有的配置 net.ipv4.tcp_rmem:这个参数定义了TCP接收缓冲(用于TCP接收滑动窗口)的最小值.默认值.最大值 net.ipv4.tcp_wmem:这个参数定义了TCP发送缓冲(用于TCP发送滑动窗口)的最小值.默认值.最大值 netdev_max_backlog:当网卡接收数据包的速度大于内核处理的速度时,会有一个队列保存这些数据包.这个参数表示队列的最大值 rmem_default:这个参数表示内核套接字接收缓存区默认的大小 wmem_d

Android内核开发:理解和掌握repo工具

由于Android源码是用repo工具来管理的,因此,搞Android内核开发,首先要搞清楚repo是什么东西,它该怎么使用?作为<Android内核开发>系列文章的第二篇,我们首先谈谈对repo工具的理解和使用. 1. repo是什么? repo是一种代码版本管理工具,它是由一系列的Python脚本组成,封装了一系列的Git命令,用来统一管理多个Git仓库. 2. 为什么要用repo? 因为Android源码引用了很多开源项目,每一个子项目都是一个Git仓库,每个Git仓库都有很多分支版本,

linux用户态和内核态切换理解

1. 用户态和内核态的概念区别 究竟什么是用户态,什么是内核态,这两个基本概念以前一直理解得不是很清楚,根本原因个人觉得是在于因为大部分时候我们在写程序时关注的重点和着眼的角度放在了实现的功能和代码的逻辑性上,先看一个例子: 1)例子 void testfork(){ if(0 = = fork()){ printf("create new process success!/n"); } printf("testfork ok/n"); } 这段代码很简单,从功能的

Linux内核分析之理解进程调度时机跟踪分析进程调度与进程切换的过程

一.原理分析 1.调度时机 背景不同类型的进程有不同的调度需求第一种分类I/O-bond:频繁的进行I/O:通常会花费很多时间等待I/O操作的完成CPU-bound:计算密集型:需要大量的CPU时间进行运算 第二种分批处理进程(batch process):不必与用户交互,通常在后台运行:不必很快响应.典型的批处理程序:编译程序.科学计算实时进程(real-time process):有实时需求,不应被低优先级的进程阻塞:响应时间要短.要稳定.典型的实时进程:视频/音频.机械控制等交互式进程(i

【Android 内核研究】理解Context

写在前面的话 非常感谢柯元旦所赠的<Android内核剖析>一书.通过对本书的学习,让我对Android内核有了更深一层次的理解.本文是<Android内核剖析>的学习笔记. Context是什么 一个Context意味着一个场景,一个场景就是用户和操作系统交互的一个过程.在广义上,这个所谓的过程应该包括前台界面和后台数据. 举个例子,比如当你打电话的时候,场景包括电话程序对应的界面以及隐藏在界面后的数据. 从程序的角度来看,一个Activity就是一个Context,一个Serv

用户态和内核态的理解和区别

CPU的两种工作状态:内核态(管态)和用户态(目态). 内核态: 1.系统中既有操作系统的程序,也有普通用户程序.为了安全性和稳定性,操作系统的程序不能随便访问,这就是内核态.即需要执行操作系统的程序就必须转换到内核态才能执行!!! 2. 内核态可以使用计算机所有的硬件资源!!! 用户态:不能直接使用系统资源,也不能改变CPU的工作状态,并且只能访问这个用户程序自己的存储空间!!!! 三种从“用户态”转换到“内核态”的最主要(触发)方式: a.系统调用(用户进程主动发起的):这是用户态进程“主动

关于内核编译的理解

内核的编译一般分为 配置 或者图形配置 ,然后再进行make 就行了,也可以将厂家提供的 config_ok  拷贝为 .config 其中.config 文件是干嘛的? 它首先配置文件,对我们最重要的是,它描述 使用了什么驱动,谁选择编译进入内核(嵌入式的资源紧张,有些驱动用户不需要) make s3c2410_defconfig    这是将2410的有关配置,执行这个会生成 .config文件 当make 时,根据.config文件生成autoconfig.h ,系统编译的时候将这个头文件

Linux Kernel - Debug Guide (Linux内核调试指南 )

http://blog.csdn.net/blizmax6/article/details/6747601 linux内核调试指南 一些前言 作者前言 知识从哪里来 为什么撰写本文档 为什么需要汇编级调试 ***第一部分:基础知识*** 总纲:内核世界的陷阱 源码阅读的陷阱 代码调试的陷阱 原理理解的陷阱 建立调试环境 发行版的选择和安装 安装交叉编译工具 bin工具集的使用 qemu的使用 initrd.img的原理与制作 x86虚拟调试环境的建立 arm虚拟调试环境的建立 arm开发板调试环

《TCP/IP详解卷2:实现》笔记--BPF:BSD 分组过滤程序

BSD分组过滤程序(BPF)是一种软件设备,用于过滤网络接口的数据流,即给网络接口加上开关.应用进程打开/dev/bpf0. /dev/bpf1等等后,可以读取BPF设备,每个应用进程一次只能打开一个BPF设备. 通过若干ioctl命令,可以配置BPF设备,把它与某个网络接口相关联,并安装过滤程序,从而能够选择性地接收输入的分组. BPF设备打开后,应用进程通过读写设备来接收分组,或将分组放入网络接口队列中. BPF设备工作的前提是网络接口必须能够支持BPF.之前提到的以太网和环回接口的驱动程序