《Linux高性能服务器编程》学习总结(十一)——定时器

第十一章 定时器

  这里的定时器主要指的是定时器容器,TCP连接中有保活定时器,为了定期查看TCP连接是否断开,可以用socket选项实现,但是较为麻烦,所以一般都由应用层负责保活,这是定时器的一个运用场景,或者在应用层需要执行一些定时操作,这样就需要一个高效的定时器容器,主要是时间轮和时间堆,当然定时器也可以用SIGALRM信号以及I/O复用实现。

  socket选项中我们使用SO_RCVTIMEO和SO_SNDTIMEO两个选项进行设置,我们来看一个例子:

 1 /*************************************************************************
 2     > File Name: 11-1.cpp
 3     > Author: Torrance_ZHANG
 4     > Mail: [email protected]
 5     > Created Time: Sat 10 Feb 2018 11:13:03 PM PST
 6  ************************************************************************/
 7
 8 #include"head.h"
 9 using namespace std;
10
11 int timeout_connect(const char* ip, int port, int time) {
12     int ret = 0;
13     struct sockaddr_in address;
14     bzero(&address, sizeof(address));
15     address.sin_family = AF_INET;
16     inet_pton(AF_INET, ip, &address.sin_addr);
17     address.sin_port = htons(port);
18
19     int sockfd = socket(AF_INET, SOCK_STREAM, 0);
20     assert(sockfd >= 0);
21
22     struct timeval timeout;
23     timeout.tv_sec = time;
24     timeout.tv_usec = 0;
25     socklen_t len = sizeof(timeout);
26     ret = setsockopt(sockfd, SOL_SOCKET, SO_SNDTIMEO, &timeout, len);
27     assert(ret != -1);
28
29     ret = connect(sockfd, (struct sockaddr*)&address, sizeof(address));
30     if(ret == -1) {
31         if(errno == EINPROGRESS) {
32             printf("connection timeout, process timeout logic \n");
33             return -1;
34         }
35         printf("error occur when connection to server\n");
36         return -1;
37     }
38     return sockfd;
39 }
40
41 int main(int argc, char** argv) {
42     if(argc <= 2) {
43         printf("usage: %s ip_address port_number\n", basename(argv[0]));
44         return 1;
45     }
46     const char* ip = argv[1];
47     int port = atoi(argv[2]);
48
49     int sockfd = timeout_connect(ip, port, 10);
50     if(sockfd < 0) return 1;
51     else return 0;
52 }

  我们用5-7的服务器,将监听队列改成1,用两个telnet客户端占用监听队列,这样再运行这个客户端后,等待10s出现超时的结果。

  关于SIGALRM信号,我们知道他是当定时器到时时系统给应用程序的通知,所以基于这个信号,我们设计一个基于升序链表的定时器结构,为了引用方便,我们写在头文件中,其按照超时时间升序排序:

  1 /*************************************************************************
  2     > File Name: lst_timer.h
  3     > Author: Torrance_ZHANG
  4     > Mail: [email protected]
  5     > Created Time: Sat 10 Feb 2018 11:39:56 PM PST
  6  ************************************************************************/
  7
  8 #ifndef _LST_TIMER_H
  9 #define _LST_TIMER_H
 10
 11 #include<time.h>
 12 #define BUFFER_SIZE 64
 13 class util_timer;
 14
 15 struct client_data {
 16     sockaddr_in address;
 17     int sockfd;
 18     char buf[BUFFER_SIZE];
 19     util_timer* timer;
 20 };
 21
 22 class util_timer {
 23 public:
 24     util_timer() : prev(NULL), next(NULL) {}
 25 public:
 26     time_t expire;
 27     void (*cb_func)(client_data*);
 28     client_data* user_data;
 29     util_timer* prev;
 30     util_timer* next;
 31 };
 32
 33 class sort_timer_lst {
 34 public:
 35     sort_timer_lst() : head(NULL), tail(NULL) {}
 36     ~sort_timer_lst() {
 37         util_timer* tmp = head;
 38         while(tmp) {
 39             head = tmp -> next;
 40             delete tmp;
 41             tmp = head;
 42         }
 43     }
 44     void add_timer(util_timer* timer) {
 45         if(!timer) return;
 46         if(!head) {
 47             head = tail = timer;
 48             return;
 49         }
 50         if(timer -> expire < head -> expire) {
 51             timer -> next = head;
 52             head -> prev = timer;
 53             head = timer;
 54             return;
 55         }
 56         add_timer(timer, head);
 57     }
 58     void adjust_timer(util_timer* timer) {
 59         if(!timer) return;
 60         util_timer* tmp = timer -> next;
 61         if(!tmp || (timer -> expire < tmp -> expire)) return;
 62         if(timer == head) {
 63             head = head -> next;
 64             head -> prev = NULL;
 65             timer -> next = NULL;
 66             add_timer(timer, head);
 67         }
 68         else {
 69             timer -> prev -> next = timer -> next;
 70             timer -> next -> prev = timer -> prev;
 71             add_timer(timer, timer -> next);
 72         }
 73     }
 74     void del_timer(util_timer* timer) {
 75         if(!timer) return;
 76         if((timer == head) && (timer == tail)) {
 77             delete timer;
 78             head = NULL;
 79             tail = NULL;
 80             return;
 81         }
 82         if(timer == head) {
 83             head = head -> next;
 84             head -> prev = NULL;
 85             delete timer;
 86             return;
 87         }
 88         if(timer == tail) {
 89             tail = tail -> prev;
 90             tail -> next = NULL;
 91             delete timer;
 92             return;
 93         }
 94         timer -> prev -> next = timer -> next;
 95         timer -> next -> prev = timer -> prev;
 96         delete timer;
 97     }
 98     void tick() {
 99         if(!head) return;
100         printf("timer tick\n");
101         time_t cur = time(NULL);
102         util_timer* tmp = head;
103         while(tmp) {
104             if(cur < tmp -> expire) break;
105             tmp -> cb_func(tmp -> user_data);
106             head = tmp -> next;
107             if(head) head -> prev = NULL;
108             delete tmp;
109             tmp = head;
110         }
111     }
112 private:
113     void add_timer(util_timer* timer, util_timer* lst_head) {
114         util_timer* prev = lst_head;
115         util_timer* tmp = prev -> next;
116         while(tmp) {
117             if(timer -> expire < tmp -> expire) {
118                 prev -> next = timer;
119                 timer -> next = tmp;
120                 tmp -> prev = timer;
121                 timer -> prev = prev;
122                 break;
123             }
124             prev = tmp;
125             tmp = tmp -> next;
126         }
127         if(!tmp) {
128             prev -> next = timer;
129             timer -> prev = prev;
130             timer -> next = NULL;
131             tail = timer;
132         }
133     }
134 private:
135     util_timer* head;
136     util_timer* tail;
137 };
138
139 #endif

  接下来我们看一下其实际应用——处理非活动的连接,在服务器运行时,对于很多已经建立的TCP连接并不是时时刻刻都有数据发送,所以我们对于每个TCP连接都要有一个定时器,一旦其没有数据的状态持续一定的时间就可以认为这是非活动的连接并且将之关闭,目的是节省服务器的资源:

  1 /*************************************************************************
  2     > File Name: 11-3.cpp
  3     > Author: Torrance_ZHANG
  4     > Mail: [email protected]
  5     > Created Time: Sun 11 Feb 2018 04:23:50 AM PST
  6  ************************************************************************/
  7
  8 #include"head.h"
  9 #include"lst_timer.h"
 10 using namespace std;
 11
 12 #define FD_LIMIT 65535
 13 #define MAX_EVENT_NUMBER 1024
 14 #define TIMESLOT 5
 15
 16 static int pipefd[2];
 17 static sort_timer_lst timer_lst;
 18 static int epollfd = 0;
 19
 20 int setnonblocking(int fd) {
 21     int old_option = fcntl(fd, F_GETFL);
 22     int new_option = old_option | O_NONBLOCK;
 23     fcntl(fd, F_SETFL, new_option);
 24     return old_option;
 25 }
 26
 27 void addfd(int epollfd, int fd) {
 28     epoll_event event;
 29     event.data.fd = fd;
 30     event.events = EPOLLIN | EPOLLET;
 31     epoll_ctl(epollfd, EPOLL_CTL_ADD, fd, &event);
 32     setnonblocking(fd);
 33 }
 34
 35 void sig_handler(int sig) {
 36     int save_errno = errno;
 37     int msg = sig;
 38     send(pipefd[1], (char*)&msg, 1, 0);
 39     errno = save_errno;
 40 }
 41
 42 void addsig(int sig) {
 43     struct sigaction sa;
 44     memset(&sa, 0, sizeof(sa));
 45     sa.sa_handler = sig_handler;
 46     sa.sa_flags |= SA_RESTART;
 47     sigfillset(&sa.sa_mask);
 48     assert(sigaction(sig, &sa, NULL) != -1);
 49 }
 50
 51 void timer_handler() {
 52     timer_lst.tick();
 53     alarm(TIMESLOT);
 54 }
 55
 56 void cb_func(client_data* user_data) {
 57     epoll_ctl(epollfd, EPOLL_CTL_DEL, user_data -> sockfd, 0);
 58     assert(user_data);
 59     close(user_data -> sockfd);
 60     printf("close fd %d\n", user_data -> sockfd);
 61 }
 62
 63 int main(int argc, char** argv) {
 64     if(argc <= 2) {
 65         printf("usage: %s ip_address port_number\n", basename(argv[0]));
 66         return 1;
 67     }
 68     const char* ip = argv[1];
 69     int port = atoi(argv[2]);
 70
 71     int ret = 0;
 72     struct sockaddr_in address;
 73     bzero(&address, sizeof(address));
 74     address.sin_family = AF_INET;
 75     inet_pton(AF_INET, ip, &address.sin_addr);
 76     address.sin_port = htons(port);
 77
 78     int listenfd = socket(AF_INET, SOCK_STREAM, 0);
 79     assert(listenfd >= 0);
 80
 81     ret = bind(listenfd, (struct sockaddr*)&address, sizeof(address));
 82     if(ret == -1) {
 83         printf("errno is %d\n", errno);
 84         return 1;
 85     }
 86     ret = listen(listenfd, 5);
 87     assert(ret != -1);
 88
 89     epoll_event events[MAX_EVENT_NUMBER];
 90     int epollfd = epoll_create(5);
 91     assert(epollfd != -1);
 92     addfd(epollfd, listenfd);
 93
 94     ret = socketpair(PF_UNIX, SOCK_STREAM, 0, pipefd);
 95     assert(ret != -1);
 96     setnonblocking(pipefd[1]);
 97     addfd(epollfd, pipefd[0]);
 98
 99     addsig(SIGALRM);
100     addsig(SIGTERM);
101     bool stop_server = false;
102     client_data* users = new client_data[FD_LIMIT];
103     bool timeout = false;
104     alarm(TIMESLOT);
105
106     while(!stop_server) {
107         int number = epoll_wait(epollfd, events, MAX_EVENT_NUMBER, -1);
108         if((number < 0) && (errno != EINTR)) {
109             printf("epoll failure\n");
110             break;
111         }
112         for(int i = 0; i < number; i ++) {
113             int sockfd = events[i].data.fd;
114             if(sockfd == listenfd) {
115                 struct sockaddr_in client_address;
116                 socklen_t client_addrlength = sizeof(address);
117                 int connfd = accept(listenfd, (struct sockaddr*)&client_address, &client_addrlength);
118                 addfd(epollfd, connfd);
119                 users[connfd].address = client_address;
120                 users[connfd].sockfd = connfd;
121
122                 util_timer* timer = new util_timer;
123                 timer -> user_data = &users[connfd];
124                 timer -> cb_func = cb_func;
125                 time_t cur = time(NULL);
126                 timer -> expire = cur + 3 * TIMESLOT;
127                 users[connfd].timer = timer;
128                 timer_lst.add_timer(timer);
129             }
130             else if((sockfd == pipefd[0]) && (events[i].events & EPOLLIN)) {
131                 int sig;
132                 char signals[1024];
133                 ret = recv(pipefd[0], signals, sizeof(signals), 0);
134                 if(ret == -1) continue;
135                 else if(ret == 0) continue;
136                 else {
137                     for(int i = 0; i < ret; i ++) {
138                         switch(signals[i]) {
139                             case SIGALRM: {
140                                 timeout = true;
141                                 break;
142                             }
143                             case SIGTERM: {
144                                 stop_server = true;
145                             }
146                         }
147                     }
148                 }
149             }
150             else if(events[i].events & EPOLLIN) {
151                 memset(users[sockfd].buf, 0, BUFFER_SIZE);
152                 ret = recv(sockfd, users[sockfd].buf, BUFFER_SIZE, 0);
153                 printf("get %d bytes of client data %s from %d\n", ret, users[sockfd].buf, sockfd);
154
155                 util_timer* timer = users[sockfd].timer;
156                 if(ret < 0) {
157                     if(errno != EAGAIN) {
158                         cb_func(&users[sockfd]);
159                         if(timer) {
160                             timer_lst.del_timer(timer);
161                         }
162                     }
163                 }
164                 else if(ret == 0) {
165                     cb_func(&users[sockfd]);
166                     if(timer) {
167                         timer_lst.del_timer(timer);
168                     }
169                 }
170                 else {
171                     if(timer) {
172                         time_t cur = time(NULL);
173                         timer -> expire = cur + 3 * TIMESLOT;
174                         printf("adjust timer once\n");
175                         timer_lst.adjust_timer(timer);
176                     }
177                 }
178             }
179             else {}
180         }
181         if(timeout) {
182             timer_handler();
183             timeout = false;
184         }
185     }
186     close(listenfd);
187     close(pipefd[1]);
188     close(pipefd[0]);
189     delete [] users;
190     return 0;
191 }

  我们可以看到,客户端连接一段时间后没有数据发送,服务器端就自动关闭连接,以节省服务器资源。

  前面提到I/O复用系统调用的最后一个timeval参数时说过这个是用来控制超时时间的,所以我们也可以用这个参数作为定时,但是我们需要不断更新这个定时参数,用一个代码来说明一下:

 1 /*************************************************************************
 2     > File Name: 11-4.cpp
 3     > Author: Torrance_ZHANG
 4     > Mail: [email protected]
 5     > Created Time: Mon 12 Feb 2018 01:49:52 AM PST
 6  ************************************************************************/
 7
 8 #include<iostream>
 9 using namespace std;
10
11 #define TIMEOUT 5000
12
13 int main() {
14     time_t start = time(NULL);
15     time_t end = time(NULL);
16     int timeout = TIMEOUT;
17
18     while(1) {
19         printf("the timeout is now %d mil-seconds\n", timeout);
20         start = time(NULL);
21         int number = epoll_wait(epollfd, events, MAX_EVENT_NUMBER, timeout);
22         if((number < ) && (errno != EINTR)) {
23             printf("epoll failure\n");
24             break;
25         }
26         if(number == 0) {
27             timeout = TIMEOUT;
28             continue;
29         }
30         end = time(NULL);
31         timeout -= (end - start) * 1000;
32         if(timeout <= 0) timeout = TIMEOUT;
33     }
34 }

  前面我们提到的基于升序链表的定时器容器存在着一个问题,就是添加和删除定时器的效率偏低,当定时器很多的时候,每次添加都要从头开始遍历寻找应该插入的位置,所以我们用了一种新的定时器容器——时间轮就很好地解决了这个问题。

  什么是时间轮,这个名字很形象,时间轮就像是一个轮子,我们虚拟一个指针指向轮子上的每一个槽,这个指针每隔一个时间间隔就走到下一个槽,这个时间间隔就是心搏时间,也是时间轮的槽时间si,而每一个槽其实就是一个指向一条定时器链表的头指针,每条链表上的定时器的定时时间相差N*si的整数倍,这个思想和哈希的思想很像。

  1 /*************************************************************************
  2     > File Name: time_wheel_timer.h
  3     > Author: Torrance_ZHANG
  4     > Mail: [email protected]
  5     > Created Time: Mon 12 Feb 2018 05:13:13 AM PST
  6  ************************************************************************/
  7
  8 #ifndef _TIME_WHEEL_TIMER_H
  9 #define _TIME_WHEEL_TIMER_H
 10
 11 #include<time.h>
 12 #include<netinet/in.h>
 13 #include<stdio.h>
 14
 15 #define BUFFER_SIZE 64
 16 class tw_timer;
 17
 18 struct client_data {
 19     sockaddr_in address;
 20     int sockfd;
 21     char buf[BUFFER_SIZE];
 22     tw_timer* timer;
 23 };
 24
 25 class tw_timer {
 26 public:
 27     tw_timer(int rot, int ts) : next(NULL), prev(NULL), rotation(rot), time_slot(ts) {}
 28 public:
 29     int rotation;
 30     int time_slot;
 31     void (*cb_func)(client_data*);
 32     client_data* user_data;
 33     tw_timer* next;
 34     tw_timer* prev;
 35 };
 36
 37 class time_wheel {
 38 public:
 39     time_wheel() : cur_slot(0) {
 40         for(int i = 0; i < N; ++ i) {
 41             slots[i] = NULL;
 42         }
 43     }
 44     ~time_wheel() {
 45         for(int i = 0; i < N; i ++) {
 46             tw_timer* tmp = slots[i];
 47             while(tmp) {
 48                 slots[i] = tmp -> next;
 49                 delete tmp;
 50                 tmp = slots[i];
 51             }
 52         }
 53     }
 54     tw_timer* add_timer(int timeout) {
 55         if(timeout < 0) return NULL;
 56         int ticks = 0;
 57         if(timeout < SI) {
 58             ticks = 1;
 59         }
 60         else ticks = timeout / SI;
 61         int rotation = ticks / N;
 62         int ts = (cur_slot + (ticks % N)) % N;
 63         tw_timer* timer = new tw_timer(rotation, ts);
 64         if(!slots[ts]) {
 65             printf("add timer, rotation is %d, ts is %d, cur_slot is %d\n", rotation, ts, cur_slot);
 66             slots[ts] = timer;
 67         }
 68         else {
 69             timer -> next = slots[ts];
 70             slots[ts] -> prev = timer;
 71             slots[ts] = timer;
 72         }
 73         return timer;
 74     }
 75     void del_timer(tw_timer* timer) {
 76         if(!timer) return;
 77         int ts = timer -> time_slot;
 78         if(timer == slots[ts]) {
 79             slots[ts] = slots[ts] -> next;
 80             if(slots[ts]) {
 81                 slots[ts] -> prev = NULL;
 82             }
 83             delete timer;
 84         }
 85         else {
 86             timer -> prev -> next = timer -> next;
 87             if(timer -> next) {
 88                 timer -> next -> prev = timer -> prev;
 89             }
 90             delete timer;
 91         }
 92     }
 93     void tick() {
 94         tw_timer* tmp = slots[cur_slot];
 95         printf("current slot is %d\n", cur_slot);
 96         while(tmp) {
 97             printf("tick the timer once\n");
 98             if(tmp -> rotation > 0) {
 99                 tmp -> rotation --;
100                 tmp = tmp -> next;
101             }
102             else {
103                 tmp -> cb_func(tmp -> user_data);
104                 if(tmp == slots[cur_slot]) {
105                     printf("delete header in cur_slot\n");
106                     slots[cur_slot] = tmp -> next;
107                     delete tmp;
108                     if(slots[cur_slot]) {
109                         slots[cur_slot] -> prev = NULL;
110                     }
111                     tmp = slots[cur_slot];
112                 }
113                 else {
114                     tmp -> prev -> next = tmp -> next;
115                     if(tmp -> next) {
116                         tmp -> next -> prev = tmp -> prev;
117                     }
118                     tw_timer* tmp2 = tmp -> next;
119                     delete tmp;
120                     tmp = tmp2;
121                 }
122             }
123         }
124         cur_slot = ++cur_slot % N;
125     }
126 private:
127     static const int N = 60;
128     static const int SI = 1;
129     tw_timer* slots[N];
130     int cur_slot;
131 };
132
133 #endif

  我们来将11-3的程序修改一下,使用时间轮来进行定时:

  1 /*************************************************************************
  2     > File Name: 11-3.cpp
  3     > Author: Torrance_ZHANG
  4     > Mail: [email protected]
  5     > Created Time: Sun 11 Feb 2018 04:23:50 AM PST
  6  ************************************************************************/
  7
  8 #include"head.h"
  9 #include"time_wheel_timer.h"
 10 using namespace std;
 11
 12 #define FD_LIMIT 65535
 13 #define MAX_EVENT_NUMBER 1024
 14 #define TIMESLOT 1
 15
 16 static int pipefd[2];
 17 static time_wheel t_wheel;
 18 static int epollfd = 0;
 19
 20 int setnonblocking(int fd) {
 21     int old_option = fcntl(fd, F_GETFL);
 22     int new_option = old_option | O_NONBLOCK;
 23     fcntl(fd, F_SETFL, new_option);
 24     return old_option;
 25 }
 26
 27 void addfd(int epollfd, int fd) {
 28     epoll_event event;
 29     event.data.fd = fd;
 30     event.events = EPOLLIN | EPOLLET;
 31     epoll_ctl(epollfd, EPOLL_CTL_ADD, fd, &event);
 32     setnonblocking(fd);
 33 }
 34
 35 void sig_handler(int sig) {
 36     int save_errno = errno;
 37     int msg = sig;
 38     send(pipefd[1], (char*)&msg, 1, 0);
 39     errno = save_errno;
 40 }
 41
 42 void addsig(int sig) {
 43     struct sigaction sa;
 44     memset(&sa, 0, sizeof(sa));
 45     sa.sa_handler = sig_handler;
 46     sa.sa_flags |= SA_RESTART;
 47     sigfillset(&sa.sa_mask);
 48     assert(sigaction(sig, &sa, NULL) != -1);
 49 }
 50
 51 void timer_handler() {
 52     t_wheel.tick();
 53     alarm(TIMESLOT);
 54 }
 55
 56 void cb_func(client_data* user_data) {
 57     epoll_ctl(epollfd, EPOLL_CTL_DEL, user_data -> sockfd, 0);
 58     assert(user_data);
 59     close(user_data -> sockfd);
 60     printf("close fd %d\n", user_data -> sockfd);
 61 }
 62
 63 int main(int argc, char** argv) {
 64     if(argc <= 2) {
 65         printf("usage: %s ip_address port_number\n", basename(argv[0]));
 66         return 1;
 67     }
 68     const char* ip = argv[1];
 69     int port = atoi(argv[2]);
 70
 71     int ret = 0;
 72     struct sockaddr_in address;
 73     bzero(&address, sizeof(address));
 74     address.sin_family = AF_INET;
 75     inet_pton(AF_INET, ip, &address.sin_addr);
 76     address.sin_port = htons(port);
 77
 78     int listenfd = socket(AF_INET, SOCK_STREAM, 0);
 79     assert(listenfd >= 0);
 80
 81     ret = bind(listenfd, (struct sockaddr*)&address, sizeof(address));
 82     if(ret == -1) {
 83         printf("errno is %d\n", errno);
 84         return 1;
 85     }
 86     ret = listen(listenfd, 5);
 87     assert(ret != -1);
 88
 89     epoll_event events[MAX_EVENT_NUMBER];
 90     int epollfd = epoll_create(5);
 91     assert(epollfd != -1);
 92     addfd(epollfd, listenfd);
 93
 94     ret = socketpair(PF_UNIX, SOCK_STREAM, 0, pipefd);
 95     assert(ret != -1);
 96     setnonblocking(pipefd[1]);
 97     addfd(epollfd, pipefd[0]);
 98
 99     addsig(SIGALRM);
100     addsig(SIGTERM);
101     bool stop_server = false;
102     client_data* users = new client_data[FD_LIMIT];
103     bool timeout = false;
104     alarm(TIMESLOT);
105
106     while(!stop_server) {
107         int number = epoll_wait(epollfd, events, MAX_EVENT_NUMBER, -1);
108         if((number < 0) && (errno != EINTR)) {
109             printf("epoll failure\n");
110             break;
111         }
112         for(int i = 0; i < number; i ++) {
113             int sockfd = events[i].data.fd;
114             if(sockfd == listenfd) {
115                 struct sockaddr_in client_address;
116                 socklen_t client_addrlength = sizeof(address);
117                 int connfd = accept(listenfd, (struct sockaddr*)&client_address, &client_addrlength);
118                 addfd(epollfd, connfd);
119                 users[connfd].address = client_address;
120                 users[connfd].sockfd = connfd;
121
122                 tw_timer* new_timer = t_wheel.add_timer(15 * TIMESLOT);
123                 new_timer -> user_data = &users[connfd];
124                 new_timer -> cb_func = cb_func;
125                 users[connfd].timer = new_timer;
126             }
127             else if((sockfd == pipefd[0]) && (events[i].events & EPOLLIN)) {
128                 int sig;
129                 char signals[1024];
130                 ret = recv(pipefd[0], signals, sizeof(signals), 0);
131                 if(ret == -1) continue;
132                 else if(ret == 0) continue;
133                 else {
134                     for(int i = 0; i < ret; i ++) {
135                         switch(signals[i]) {
136                             case SIGALRM: {
137                                 timeout = true;
138                                 break;
139                             }
140                             case SIGTERM: {
141                                 stop_server = true;
142                             }
143                         }
144                     }
145                 }
146             }
147             else if(events[i].events & EPOLLIN) {
148                 memset(users[sockfd].buf, 0, BUFFER_SIZE);
149                 ret = recv(sockfd, users[sockfd].buf, BUFFER_SIZE, 0);
150                 printf("get %d bytes of client data %s from %d\n", ret, users[sockfd].buf, sockfd);
151
152                 tw_timer* timer = users[sockfd].timer;
153                 if(ret < 0) {
154                     if(errno != EAGAIN) {
155                         cb_func(&users[sockfd]);
156                         if(timer) {
157                             t_wheel.del_timer(timer);
158                         }
159                     }
160                 }
161                 else if(ret == 0) {
162                     cb_func(&users[sockfd]);
163                     if(timer) {
164                         t_wheel.del_timer(timer);
165                     }
166                 }
167                 else {
168                     if(timer) {
169                         t_wheel.del_timer(timer);
170                         tw_timer* new_timer = t_wheel.add_timer(15 * TIMESLOT);
171                         new_timer -> user_data = &users[sockfd];
172                         new_timer -> cb_func = cb_func;
173                         users[sockfd].timer = new_timer;
174                     }
175                 }
176             }
177             else {}
178         }
179         if(timeout) {
180             timer_handler();
181             timeout = false;
182         }
183     }
184     close(listenfd);
185     close(pipefd[1]);
186     close(pipefd[0]);
187     delete [] users;
188     return 0;
189 }

  时间轮每秒转动一格,当15s没有消息发送的时候就会自动断开连接。对于时间轮而言,添加和删除一个定时器的时间复杂度是O(1),执行一个定时器的时间复杂度表面上看是O(n),但是由于我们将所有定时器散列到不同的链表中,所以每条链表上的定时器其实很少,当时用多个时间轮的时候其复杂度可以降至接近O(1),所以其效率是很高的。

  对于时间堆,有数据结构基础的话其实很好理解,就是一个基于时间排序的小根堆,每一次添加操作可以用O(lgn)的复杂度完成,而执行和删除堆顶的操作复杂度为O(1),只是删除后需要将堆进行调整,其复杂度也约为O(lgn)。整体来看时间堆的复杂度也是很好的,我们来看一下代码:

  1 /*************************************************************************
  2     > File Name: min_heap.h
  3     > Author: Torrance_ZHANG
  4     > Mail: [email protected]
  5     > Created Time: Mon 12 Feb 2018 09:31:19 PM PST
  6  ************************************************************************/
  7
  8 #ifndef _MIN_HEAP_H
  9 #define _MIN_HEAP_H
 10
 11 #include<iostream>
 12 #include<netinet/in.h>
 13 #include<time.h>
 14
 15 using std::exception;
 16
 17 #define BUFFER_SIZE 64
 18 class heap_timer;
 19
 20 struct client_data {
 21     sockaddr_in address;
 22     int sockfd;
 23     char buf[BUFFER_SIZE];
 24     heap_timer* timer;
 25 };
 26
 27 class heap_timer {
 28 public:
 29     heap_timer(int delay) {
 30         expire = time(NULL) + delay;
 31     }
 32 public:
 33     time_t expire;
 34     void (*cb_func)(client_data*);
 35     client_data* user_data;
 36 };
 37
 38 class time_heap {
 39 public:
 40     time_heap(int cap) throw (std::exception) : capacity(cap), cur_size(0) {
 41         array = new heap_timer* [capacity];
 42         if(!array) throw std::exception();
 43         for(int i = 0; i < capacity; i ++) {
 44             array[i] = NULL;
 45         }
 46     }
 47     time_heap(heap_timer** init_array, int size, int capacity) throw (std::exception) : cur_size(size), capacity(capacity) {
 48         if(capacity < size) throw std::exception();
 49         array = new heap_timer* [capacity];
 50         if(!array) throw std::exception();
 51         for(int i = 0; i < capacity; i ++) {
 52             array[i] = NULL;
 53         }
 54         if(size != 0) {
 55             for(int i = 0; i < size; i ++) {
 56                 array[i] = init_array[i];
 57             }
 58             for(int i = (cur_size - 1) / 2; i >= 0; -- i) {
 59                 percolate_down(i);
 60             }
 61         }
 62     }
 63     ~time_heap() {
 64         for(int i = 0; i < cur_size; i ++) {
 65             delete array[i];
 66         }
 67         delete [] array;
 68     }
 69 public:
 70     void add_timer(heap_timer* timer) throw (std::exception) {
 71         if(!timer) return;
 72         if(cur_size >= capacity) resize();
 73         int hole = cur_size ++;
 74         int parent = 0;
 75         for(; hole > 0; hole = parent) {
 76             parent = (hole - 1) / 2;
 77             if(array[parent] -> expire <= timer -> expire) break;
 78             array[hole] = array[parent];
 79         }
 80         array[hole] = timer;
 81     }
 82     void del_timer(heap_timer* timer) {
 83         if(!timer) return;
 84         //延迟销毁,这样可以节省删除定时器的开销,但是会使堆数组膨胀,空间换时间
 85         timer -> cb_func = NULL;
 86     }
 87     heap_timer* top() const {
 88         if(empty()) return NULL;
 89         return array[0];
 90     }
 91     void pop_timer() {
 92         if(empty()) return;
 93         if(array[0]) {
 94             delete array[0];
 95             array[0] = array[-- cur_size];
 96             percolate_down(0);
 97         }
 98     }
 99     void tick() {
100         heap_timer* tmp = array[0];
101         time_t cur = time(NULL);
102         while(!empty()) {
103             if(!tmp) break;
104             if(tmp -> expire > cur) break;
105             if(array[0] -> cb_func) array[0] -> cb_func(array[0] -> user_data);
106             pop_timer();
107             tmp = array[0];
108         }
109     }
110     bool empty() const {
111         return cur_size == 0;
112     }
113 private:
114     void percolate_down(int hole) {
115         heap_timer* temp = array[hole];
116         int child = 0;
117         for(; ((hole * 2 + 1) <= (cur_size - 1)); hole = child) {
118             child = hole * 2 + 1;
119             if((child < (cur_size - 1)) && (array[child + 1] -> expire < array[child] -> expire)) ++child;
120             if(array[child] -> expire < temp -> expire) array[hole] = array[child];
121             else break;
122         }
123         array[hole] = temp;
124     }
125     void resize() throw (std::exception) {
126         heap_timer** temp = new heap_timer* [2 * capacity];
127         for(int i = 0; i < 2 * capacity; i ++) {
128             temp[i] = NULL;
129         }
130         if(!temp) {
131             throw std::exception();
132         }
133         capacity = 2 * capacity;
134         for(int i = 0; i < cur_size; i ++) {
135             temp[i] = array[i];
136         }
137         delete [] array;
138         array = temp;
139     }
140 private:
141     heap_timer** array;
142     int capacity;
143     int cur_size;
144 };
145
146 #endif

原文地址:https://www.cnblogs.com/Torrance/p/8446606.html

时间: 2024-10-17 01:16:05

《Linux高性能服务器编程》学习总结(十一)——定时器的相关文章

Linux高性能服务器编程——定时器

 定时器 服务器程序通常管理着众多定时事件,因此有效组织这些定时事件,使之能在预期的时间点被触发且不影响服务器的主要逻辑,对于服务器的性能有着至关重要的影响.位置我们要将每个定时事件封装成定时器,并使用某种容器类型的数据结构,比如链表.排序链表和时间轮将所有定时器串联起来,以实现对定时事件的统一管理. Linux提供三种定时方法: 1.socket选项SO_RECVTIMEO和SO_SNDTIMEO. 2.SIGALRM信号 3.I/O复用系统调用的超时参数 socket选项SO_RCVTI

linux高性能服务器编程

<Linux高性能服务器编程>:当当网.亚马逊 目录: 第一章:tcp/ip协议族 第二章:ip协议族 第三章:tcp协议详解 第四章:tcp/ip通信案例:访问Internet 第五章:linux网络编程基础API 第六章:高级IO函数 第七章:linux服务器程序规范 第八章:高性能服务器框架 第九章:IO复用 第十章:信号 第十一章:定时器 第十二章:高性能IO框架库libevent 第十三章:多进程编程 第十四章:多线程编程 第十五章:进程池和线程池 第十六章:服务器调制.调试和测试

Linux 高性能服务器编程——高级I/O函数

重定向dup和dup2函数 [cpp] view plaincopyprint? #include <unistd.h> int dup(int file_descriptor); int dup2(int file_descriptor_one, int file_descriptor_two); dup创建一个新的文件描述符, 此描述符和原有的file_descriptor指向相同的文件.管道或者网络连接. dup返回的文件描述符总是取系统当前可用的最小整数值. dup2函数通过使用参数f

Linux高性能服务器编程——信号及应用

 信号 信号是由用户.系统或者进程发送给目标进程的信息,以通知目标进程某个状态的改变或系统异常.Linux信号可由如下条件产生: 对于前台进程,用户可以通过输入特殊的终端字符来给它发送信号.比如输入Ctrl+C通常会给进程发送一个终端信号. 2.系统异常 系统状态变化 运行kill命令或调用kill函数 Linux信号概述 发送信号 Linux下,一个进程给其他进程发送信号的API是kill函数.其定义如下: #include <sys/types.h> #include <sign

Linux高性能服务器编程——多进程编程

多进程编程 多进程编程包括如下内容: 复制进程影映像的fork系统调用和替换进程映像的exec系列系统调用. 僵尸进程以及如何避免僵尸进程 进程间通信(Inter-Process Communication,IPC)最简单的方式:管道 3种进程间通信方式:信号量,消息队列和共享内存 fork系统调用 #include<unistd.h> pid_tfork(void); 该函数的每次都用都返回两次,在父进程中返回的是子进程的PID,在子进程中返回的是0.该返回值是后续代码判断当前进程是父进程还

Linux高性能服务器编程——系统检测工具

系统检测工具 tcpdump tcpdump是一款经典的转包工具,tcpdump给使用者提供了大量的选项,泳衣过滤数据报或者定制输出格式. lsof lsof是一个列出当前系统打开的文件描述符的工具.通过它我们可以了解感兴趣的进程打开了哪些文件描述符,或者我们感兴趣的文件描述符被哪些进程打卡了. nc nc命令主要被用来快速构建网络连接.我们可以让它以服务器方式运行,监听某个端口并接收客户连接,因此它可用来调试客户端程序.我们也可以使之以客户端方式运行,向服务器发起连接并收发数据,因此它可以用来

Linux高性能服务器编程——进程池和线程池

进程池和线程池 池的概念 由于服务器的硬件资源"充裕",那么提高服务器性能的一个很直接的方法就是以空间换时间,即"浪费"服务器的硬件资源,以换取其运行效率.这就是池的概念.池是一组资源的集合,这组资源在服务器启动之初就完全被创建并初始化,这称为静态资源分配.当服务器进入正是运行阶段,即开始处理客户请求的时候,如果它需要相关的资源,就可以直接从池中获取,无需动态分配.很显然,直接从池中取得所需资源比动态分配资源的速度要快得多,因为分配系统资源的系统调用都是很耗时的.当

Linux高性能服务器编程——I/O复用

 IO复用 I/O复用使得程序能同时监听多个文件描述符,通常网络程序在下列情况下需要使用I/O复用技术: 客户端程序要同时处理多个socket 客户端程序要同时处理用户输入和网络连接 TCP服务器要同时处理监听socket和连接socket,这是I/O复用使用最多的场合 服务器要同时处理TCP请求和UDP请求.比如本章将要讨论的会社服务器 服务器要同时监听多个端口,或者处理多种服务. I/O复用虽然能同时监听多个文件描述符,但它本身是阻塞的.并且当多个文件描述符同时就绪时,如果不采用额外措施

Linux高性能服务器编程——多线程编程(上)

多线程编程 Linux线程概述 线程模型 线程是程序中完成一个独立任务的完整执行序列,即一个可调度的实体.根据运行环境和调度者的身份,线程可分为内核线程和用户线程.内核线程,在有的系统上也称为LWP(Light Weigth Process,轻量级进程),运行在内核空间,由内核来调度:用户线程运行在用户空间,由线程库来调度.当进程的一个内核线程获得CPU的使用权时,它就加载并运行一个用户线程.可见,内核线程相当于用于线程运行的容器.一个进程可以拥有M个内核线程和N个用户线程,其中M≤N.并且在一

Linux高性能服务器编程——多线程编程(下)

多线程编程 条件变量 如果说互斥锁是用于同步线程对共享数据的访问的话,那么条件变量则是用于线程之间同步共享数据的值.条件变量提供了一种线程间的通信机制:当某个共享数据达到某个值得时候,唤醒等待这个共享数据的线程. 条件本身是由互斥量保护的.线程在改变条件状态前必须首先锁住互斥量,其他现成在获得互斥量之前不会察觉到这种变化,因为必须锁住互斥量以后才能计算条件. 条件变量的相关函数主要有如下5个: #include <pthread.h> int pthread_cond_destroy(pthr