2016-11-15
本来这是在前端驱动后期分析的,但是这部分内容比较多,且分析了后端notify前端的机制,所以还是单独拿出一节分析比较好!
还是拿网络驱动部分做案例,网络驱动部分有两个队列,(忽略控制队列):接收队列和发送队列;每个队列都对应一个virtqueue,两个队列之间是互不影响的。
前后端利用virtqueue的方式如下图所示:
这里再详细的描述下,当两个queue都需要客户机填充buffer,ReceiveQueue需要客户机 driver提前填充分配好的空buffer,然后记录到availRing,并在恰当的时机通知后端,当外部网络有数据包到达时,qemu后端就从availRing 中获取一个buffer,然后填充数据,完事后记录buffer head index到usedRing.最后在恰当的时机通知客户机(向客户机注入中断),客户机接收到信号便知道有数据包到达,这里只需要从usedRing 中获取到index,然后取data数组的第i个元素即可。因为在客户机填充buffer的时候把逻辑buffer的指针保存在data数组中。
而SendQueue同样需要客户机去填充,只不过这里是当客户机需要发送数据包时,把数据包构造成逻辑buffer,然后填充到send Queue,并在恰当的时机通知后端,qemu后端收到通知就知道那个队列有请求到达,如果当前没有处理其他数据包就着手处理这个数据包。具体就同样是从AvailRing中取出buffer head index,然后从描述符表中get到buffer,这时就需要从buffer中copy数据了,因为要把数据包从host发送出去,然后更新usedRing。最后同样要在恰当的时机通知客户机。注意这里客户机同样需要从usedRing 中get index,但是这里主要是用于delay notify,因为数据包由客户机构造,其占用的buffer并不能重复使用,只是每次有数据包就把其构造成buffer而已。
以上便是基本的使用sendqueue和receive的原理,但是还有一点上面我没有提到,就是通知的那个恰当的时机,那么这个恰当的实际究竟是什么时候呢??在virtIO中有两种方式控制前后端的notify.
1、flags字段
2、事件触发
1、在vring_avail和vring_used的flags字段,控制前后端的通信。vring_used中的flags用于通知driver端,当add一个buffer的时候不用notify后端。而vring_avail中的flags用于通知qemu端,当消费一个buffer的时候不用interrupt 客户机。
2、在virtIO中又加入了另一种机制,需要由driver和qemu自己判断是否需要通知,也就是设置一个限额,当一端添加buffer或者消费buffer的数量达到指定数目,就触发事件,从而发生notify或者interrupt。在有这种机制的情况下就忽略了前面所说的flags。
这里我们以receiveQueue为例,分析下前后端的delay notify机制。
在front driver端:
客户机driver通过NAPI接收数据时,会在可用buffer不足的时候调用函数添加,具体就是try_fill_recv:
static bool try_fill_recv(struct receive_queue *rq, gfp_t gfp) { struct virtnet_info *vi = rq->vq->vdev->priv; int err; bool oom; /*循环,每循环一次添加一个buffer,一直到填充满,即描述符表满*/ do { if (vi->mergeable_rx_bufs) err = add_recvbuf_mergeable(rq, gfp); else if (vi->big_packets) err = add_recvbuf_big(rq, gfp); else err = add_recvbuf_small(rq, gfp); oom = err == -ENOMEM; if (err) break; ++rq->num; } while (rq->vq->num_free); if (unlikely(rq->num > rq->max)) rq->max = rq->num; /*通知后端*/ virtqueue_kick(rq->vq); return !oom; }
至于添加的是哪种类型的buffer,我们这里并不关心,循环结束就调用virtqueue_kick(rq->vq)函数,此时参数是接收队列的virtqueue,
接下来就调用到了virtqueue_kick_prepare函数,该函数判断当前应不应该通知后端。先看下函数的代码:
1 bool virtqueue_kick_prepare(struct virtqueue *_vq) 2 { 3 struct vring_virtqueue *vq = to_vvq(_vq); 4 u16 new, old; 5 bool needs_kick; 6 7 START_USE(vq); 8 /* We need to expose available array entries before checking avail 9 * event. */ 10 virtio_mb(vq->weak_barriers); 11 12 old = vq->vring.avail->idx - vq->num_added; 13 new = vq->vring.avail->idx; 14 vq->num_added = 0; 15 16 #ifdef DEBUG 17 if (vq->last_add_time_valid) { 18 WARN_ON(ktime_to_ms(ktime_sub(ktime_get(), 19 vq->last_add_time)) > 100); 20 } 21 vq->last_add_time_valid = false; 22 #endif 23 24 if (vq->event) { 25 needs_kick = vring_need_event(vring_avail_event(&vq->vring), 26 new, old); 27 } else { 28 needs_kick = !(vq->vring.used->flags & VRING_USED_F_NO_NOTIFY); 29 } 30 END_USE(vq); 31 return needs_kick;
这里面涉及到几个变量,old是add_sg之前的avail.idx,而new是当前的avail.idx,还有一个是vring_avail_event(&vq->vring),看具体的实现:
1 #define vring_avail_event(vr) (*(__u16 *)&(vr)->used->ring[(vr)->num])
可以看到这里是VRingUsed中的ring数组最后一项的值,该值在后端驱动从virtqueue中pop一个elem之前设置成相应队列的下一个将要使用的index,即last_avail_index。
看下vring_need_event函数:
1 static inline int vring_need_event(__u16 event_idx, __u16 new_idx, __u16 old) 2 { 3 /* Note: Xen has similar logic for notification hold-off 4 * in include/xen/interface/io/ring.h with req_event and req_prod 5 * corresponding to event_idx + 1 and new_idx respectively. 6 * Note also that req_event and req_prod in Xen start at 1, 7 * event indexes in virtio start at 0. */ 8 return (__u16)(new_idx - event_idx - 1) < (__u16)(new_idx - old); 9 }
前后端通过对比 (__u16)(new_idx - event_idx - 1) < (__u16)(new_idx - old)来判断是否需要notify后端,这在数据量比较大的时候显得很实用。在初始状态下,即在qemu一个buffer还没有使用的情况下,event_idx必然是0,那么此时这里的判断肯定为真,所以notify后端。后端收到通知就从virtqueue中pop buffer,同时在此之前需要设置event_idx,代码见qemu virtio.c的virtqueue_pop函数:
1 void *virtqueue_pop(VirtQueue *vq, size_t sz) 2 { 3 ...... 4 5 i = head = virtqueue_get_head(vq, vq->last_avail_idx++); 6 if (virtio_vdev_has_feature(vdev, VIRTIO_RING_F_EVENT_IDX)) { 7 vring_set_avail_event(vq, vq->last_avail_idx); 8 } 10 ...... 11 }
如果是初始化状态,即当前是首次执行virtqueue_pop函数,last_avail_idx=0,在++后就成了1,然后设置此值到UsedRing.ring[]数组的最后一项:
1 static inline void vring_set_avail_event(VirtQueue *vq, uint16_t val) 2 { 3 hwaddr pa; 4 if (!vq->notification) { 5 return; 6 } 7 pa = vq->vring.used + offsetof(VRingUsed, ring[vq->vring.num]); 8 virtio_stw_phys(vq->vdev, pa, val); 9 }
设置成功后就执行pop之后的处理,写入数据完成后,调用后端的 virtio_notify(vdev, q->rx_vq)函数。该函数执行前同样需要判断是否需要notify,具体函数为virtio_should_notify
bool virtio_should_notify(VirtIODevice *vdev, VirtQueue *vq) { uint16_t old, new; bool v; /* We need to expose used array entries before checking used event. */ smp_mb(); /* Always notify when queue is empty (when feature acknowledge) */ if (virtio_vdev_has_feature(vdev, VIRTIO_F_NOTIFY_ON_EMPTY) && !vq->inuse && virtio_queue_empty(vq)) { return true; } if (!virtio_vdev_has_feature(vdev, VIRTIO_RING_F_EVENT_IDX)) { return !(vring_avail_flags(vq) & VRING_AVAIL_F_NO_INTERRUPT); } v = vq->signalled_used_valid; vq->signalled_used_valid = true; old = vq->signalled_used; new = vq->signalled_used = vq->used_idx; return !v || vring_need_event(vring_get_used_event(vq), new, old); }
该函数逻辑和前端driver总的判断函数大致类似,但是还是有些不同,首先,如果队列为空即当前没有可用buffer了,那么必然会notify前端;
接着判断是否支持这样事件触发式的方式即VIRTIO_RING_F_EVENT_IDX,如果不支持,就通过flags字段来判断。而如果支持,就通过事件触发来通知。
这里有两个条件:第一个是v = vq->signalled_used_valid和vring_need_event(vring_get_used_event(vq), new, old)
v = vq->signalled_used_valid在初始化的时候被设置成false,表示还没有向前端做任何通知,而后再每次的virtio_should_notify中就会设置成true,并更新vq->signalled_used = vq->used_idx;所以如果是首次尝试通知前端,则总能成功,否则需要判断vring_need_event(vring_get_used_event(vq), new, old),该函数具体是根前面逻辑是一样的,正如前面所说,这是第一次尝试通知,所以总能成功。而vring_get_used_event(vq)是VRingAvail.ring[]数组的最后一项的值,该值在客户机driver中被设置
在次回到linux driver中,就会从usedRing中取buffer,同样每取出一个buffer就会设置used_event,代码见virtio_ring.c的virtqueue_get_buf函数,设置的值是vq->last_used_idx,记录客户机处理位置。
1 void *virtqueue_get_buf(struct virtqueue *_vq, unsigned int *len) 2 { 3 4 ...... 5 if (!(vq->vring.avail->flags & VRING_AVAIL_F_NO_INTERRUPT)) { 6 vring_used_event(&vq->vring) = vq->last_used_idx; 7 virtio_mb(vq->weak_barriers); 8 } 9 ...... 10 11 }
到目前为止,基本一次完整的交互已经完成了,但是由于是初次交互,前后端的delay机制都没起作用,判断条件中使用到的event_idx已经更新了,假如说首次add 8个buffer,然后通知了后端,并且后端使用了三个buffer并首次notify前端,此时 后端向第4个buffer中写数据,last_avail_idx=4(从0开始),那么used_event=4,此时前端发现可用buffer不足,需要添加,那么本次添加了5个,即new=8+5=13,old=8,new-old=5,而此时new-used_event-1=8,条件不满足,所以此时前端driver添加的buffer就不用notify后端。而话说这段时间后端又处理好了第二个数据包,使用了3个buffer。但不幸,前端还在处理第二个buffer,即last_used_idx=2,则used_event=2;对于后端来讲new-old=3,new-used_event-1=3,条件不满足,所以也不用通知。这样delay notify的机制便显示出效果了。笔者认为这其实本质上就是一场速度的对决,为了保证公平,即使一方处理快,也不能任意向另一端发送数据,只能待对方处理的差不多了你才能发,这样发送一方可以歇歇,而接受一方也不会因为处理不及而丢弃,从而造成浪费!哈哈,真是无规矩不成方圆!
后记:
到此,virtIO部分已经分析的差不多了,分析期间真实感觉到了自己知识的匮乏,其间多次向开发者求助,并均得到认真回复,在此在此感谢这些优秀的开发者。有时候看内核代码就感觉工程师和硬件在干仗,站在工程师的角度,需要尽其所能榨取硬件的性能。大到实现算法的优化,小到分析程序执行流的概率,从而针对编译做优化。站在硬件的角度,你处理不好,我就不给你工作。而从这方面,工程师自然是完胜,并且还在不遗余力的朝着胜利的另一个境界挺近,即征服硬件!哈哈,不过谁都知道,这是一场没有胜负的战争,工程师自然优秀,但是,因为工程师内部的竞争,这样战斗将永无休止!!唉,瞎扯淡了,各位朋友,下篇文章见!