多个进程绑定(bind)同一个端口,当客户断发起连接(connect)时,内核会通过一个hash算法决定分配到那个进程上。
Linux 4.5之前的reuseport查找实现(4.3内核)
以下是未优化前的Linux 4.3内核的实现,可见是多么地不直观。它采用了遍历HASH冲突链表的方式进行reuseport套接字的精确定位:
result = NULL; badness = 0; udp_portaddr_for_each_entry_rcu(sk, node, &hslot2->head) { score = compute_score2(sk, net, saddr, sport, daddr, hnum, dif); if (score > badness) { // 冒泡排序 // 找到了更加合适的socket,需要重新hash result = sk; badness = score; reuseport = sk->sk_reuseport; if (reuseport) { hash = udp_ehashfn(net, daddr, hnum, saddr, sport); matches = 1; } } else if (score == badness && reuseport) { // reuseport套接字散列定位 // 找到了同样reuseport的socket,进行定位 matches++; if (reciprocal_scale(hash, matches) == 0) result = sk; hash = next_pseudo_random32(hash); } }
Linux 4.5(针对UDP)/4.6(针对TCP)的reuseport查找实现
我们来看看在4.5和4.6内核中对于reuseport的查找增加了一些什么神奇的新东西:
result = NULL; badness = 0; udp_portaddr_for_each_entry_rcu(sk, node, &hslot2->head) { score = compute_score2(sk, net, saddr, sport, daddr, hnum, dif); if (score > badness) { // 在reuseport情形下,意味着找到了更加合适的socket组,需要重新hash result = sk; badness = score; reuseport = sk->sk_reuseport; if (reuseport) { hash = udp_ehashfn(net, daddr, hnum, saddr, sport); if (select_ok) { struct sock *sk2; // 找到了一个组,接着进行组内hash。 sk2 = reuseport_select_sock(sk, hash, skb, sizeof(struct udphdr)); if (sk2) { result = sk2; select_ok = false; goto found; } } matches = 1; } } else if (score == badness && reuseport) { // 这个else if分支的期待是,在分层查找不适用的时候,寻找更加匹配的reuseport组,注意4.5/4.6以后直接寻找的是一个reuseport组。 // 在某种意义上,这回退到了4.5之前的算法。 matches++; if (reciprocal_scale(hash, matches) == 0) result = sk; hash = next_pseudo_random32(hash); } }
struct sock *reuseport_select_sock(struct sock *sk, u32 hash, struct sk_buff *skb, int hdr_len) { ... prog = rcu_dereference(reuse->prog); socks = READ_ONCE(reuse->num_socks); if (likely(socks)) { /* paired with smp_wmb() in reuseport_add_sock() */ smp_rmb(); if (prog && skb) // 可以用BPF来从用户态注入自己的定位逻辑,更好实现基于策略的负载均衡 sk2 = run_bpf(reuse, socks, prog, skb, hdr_len); else // reciprocal_scale简单地将结果限制在了[0,socks)这个区间内 sk2 = reuse->socks[reciprocal_scale(hash, socks)]; } ... }
单机上的 连接服务器 则可以用端口复用的方式实现负载均衡;也完美解决了nginx之前的惊群现象,也不需要像nginx后来的做法去避免惊群。
下面给出测试用的demo
#include <stdio.h> #include <stdlib.h> #include <sys/socket.h> #include <netinet/in.h> #include <arpa/inet.h> #include <sys/types.h> #include <unistd.h> #include <pthread.h> #define MAXLINE 100 void* thread_(void* agr) { int listenfd,connfd; struct sockaddr_in servaddr; char buff[MAXLINE+1]; time_t ticks; unsigned short port; int flag=1,len=sizeof(int); port=10013; if( (listenfd=socket(AF_INET,SOCK_STREAM,0)) == -1) { perror("socket"); exit(1); } bzero(&servaddr,sizeof(servaddr)); servaddr.sin_family=AF_INET; servaddr.sin_addr.s_addr=htonl(INADDR_ANY); servaddr.sin_port=htons(port); //SO_REUSEPORT if( setsockopt(listenfd, SOL_SOCKET, SO_REUSEADDR, &flag, len) == -1) { perror("SO_REUSEADDR"); exit(1); } if( setsockopt(listenfd, SOL_SOCKET, SO_REUSEPORT, &flag, len) == -1) { perror("SO_REUSEPORT"); exit(1); } if( bind(listenfd,(struct sockaddr*)&servaddr,sizeof(servaddr)) ==-1) { perror("bind"); exit(1); } else printf("bind call OK!\n"); if( listen(listenfd,5) == -1) { perror("listen"); exit(1); } char buf[] = "hello world."; for(;;) { if( (connfd=accept(listenfd,(struct sockaddr*)NULL,NULL)) == -1) { perror("accept"); exit(1); } send(connfd,buf,sizeof(buf),0); close(connfd); printf("pid %d : once\n",pthread_self()); } } int main(int argc, char** argv) { pthread_t pt; if( 0!=pthread_create(&pt,NULL,thread_,(void*)0) ) { perror("pthread_create"); } thread_((void*)1); return 0; }
时间: 2024-10-10 00:06:27