多线程安全的滑动窗口设计实现

滑动窗口是日志模块重要的数据结构,用于日志发送接收以及日志索引查询,和组内同学讨论了的多线程安全的滑动窗口设计,有三种实现方案,写此文档记录下。

1.接口描述

滑动窗口内部使用数组,每个数组项的是一个结构体:

Structentry

{

Struct ValueNode *head;

Struct ValueNode *tail;

Int64_t cnt;

Int64_tstat;

}

由于在对同一项多次写入不同值的情况下,写入的多个值会以链表组织,head指向链表头,tail指向链表尾,cnt表明读取当前entry的引用计数,包括链表中所有节点。

StructValueNode的定义如下:

StructValueNode

{

Void *value

Struct ValueNode *next;

}

滑动窗口需要提供以下接口:

1.  Init(int64-_t size)

初始化滑动窗口,size 用于指明滑动窗口的大小。

2.  set(int64-_t id, const void*val)

set接口用于向滑动窗口中写入数据,id用于指明所写入数据的序号,val指向写入    的数据指针。对同一个id插入不同的值,会发生覆盖。

3.  get(int64-_t id,  void* &val)

get接口用于从滑动窗口中读出数据,id用于指明所读数据的序号,val指向所读到    的数据的指针。

4.  revert (int64-_t id)

读取某一项结束时候,需要调用revert接口。

5.  move_foward ()

move_foward用于将滑动窗口向前移动,对于移除滑动窗口的项,需要调用其revert   接口,将entry重置,方便后续复用此接口。

2方案一:读写锁保护start_id

方案一是并发度较低但思路比较简单的实现方案,此方案中,滑动窗口需要维护的成员变量:

1.  size:此变量用于指明滑动窗口的大小;

2.  start_id:此变量用于指明滑动窗口中最小的id;

3.  end_id:此变量用于指明滑动窗口中最大的id;

4.  rw_lock:用于保护start_id;

接口实现描述:

1.  Init(int64_t size):

将size记录到成员变量中,并申请数组内存(大小为size),将start_id end_id设置为0;

2.  set(int64_t id, const void*val)

1)  对rw_lock 加读锁;

2)  判断是否满足start_id <= id <start_id + size,如果不满足则跳转到步骤4;

3)  使用id对size取模,找到对应的entry,读取此entry的tail指针,根据实现注册的判断函数,判断是否可以覆盖写入,如果不可以则跳转到步骤4;如果可以覆盖写入或者tail指针为空,则新建一个ValueNode,将其append到链表尾部,并修改tail指针并递增cnt引用计数(此处有多线程并发问题,可以使用CAS128);

4)  对rw_lock解锁。

3.  get(int64_t id,  void* &val)

1)  判断是否满足start_id <= id <start_id + size,如果不满足则跳转到步骤4;

2)  将当前start_id记录到临时变量tmp_start_id中;

3)  根据id取模,找到对应的entry,如果cnt == 0,跳转步骤7;

4)  将cnt引用计数递增,然后将tail指针所值的vallue 赋值给val;

5)  读取当前start_id和tmp_start_id比较,如果不相等,则跳转步骤1;

6)  返回;

4.  revert (int64-_t id)

1)  根据id找到都应的entry,如果cnt ==0,报错;

2)  递减cnt,如果递减后大于0,则退出;

3)  如果递减后的cnt ==0,则遍历head指向的链表,调用每个value的revert函数,并将每个ValueNode内存释放;

5.  Move_forwad()

1)  调用get接口,读取start_id位置的状态,根据注册的函数,判断是否可以将其移动出滑动窗口,如果不可以则返回;

2)  将rw_lock加写锁,从start_id开始,扫描滑动窗口,对于每个entry采用以下操作:

a)  根据注册的函数判断是否可以移出滑动窗口,如果不可以,跳转到步骤3;

b)  如果可以,则递减其引用计数,如果递减后不为0,则需要阻塞等待;

c)  将start_id递增1;

3)  对rw_lock解锁,函数返回。

3.方案二:无锁(一)

在方案一中,读写锁在一定程度上影响了并发度,方案二将介绍一种可以不用读写锁的线程安全实现。需要补充的是,方案二需要在Entry中增加stat字段,stat有三个可选值:

1.  NULL

表明此entry无人使用,可以写入数据;

2.  USE

表明此entry正在被使用,可以读;

3.  LOCKED

表明此entry正在处于NULL和USE中间状态,不可读也不可写入;

此外,滑动窗口还需要维护成员变量Last_start_id,其代表在一次move_forward()过程中,上一次的start_id,last_start_id 和start之间的entry,是需要调用reset()清除掉的。

在这些基础上,接口描述如下:

1.  Get():

与方案一一致

2.  Revert():

与方案一一致。

3.  set():

1)  判断是否满足start_id <= id < last_start_id+ size,如果不满足则跳转到步骤5;

2)  使用id对size取模,找到对应的entry,读取此entry的状态,如果是LOCKED,则跳转步骤1;

3)  将entry状态修改为LOCKED,再次判断是否满足start_id <= id <last_start_id+ size;如果不满足则将entry状态修改回原来状态,并跳转步骤5;

4)  读取start的tail指针,根据实现注册的判断函数,判断是否可以覆盖写入,如果不可以则跳转到步骤4;如果可以覆盖写入或者tail指针为空,则新建一个ValueNode,将其append到链表尾部,并修改tail指针并递增cnt引用计数(此处有多线程并发问题,可以使用CAS128);将ENTRY状态修改为USE;

5)  返回

4.  move_foward()

1)  先获取到要将滑动窗口起点向后移动的目标id,记录为target_start;

2)  将start_id记录到临时变量tmp_start,然后将start修改为target_start,这需要在一个原子操作中完成

3)  对于tmp_start到target_start中间的每一个entry,执行以下操作:

a)  判断其stat是否为LOCKED,如果是则阻塞等待;

b)  将stat修改为LOCKED,;

c)  递减其引用计数,如果递减后不为0,则需要阻塞等待,如果为0,则释放其内存,并设置其状态为NULL;

4)  比较last_start_id 和tmp_start的大小关系,如果相等,则将last_start 修改为target_start ,否则阻塞等待。

在这个方案中,其实是通过给每个entry添加状态值,对每个entry的修改做并发控制,相对于方案一,减小了锁粒度。

在move_foward()接口实现的第四步中,比较last_start和tmp_start的大小关系,事实上是为了保证,当多个线程同时调用move_foward()接口,同时修改last_start_id时,能够做到串行化,即保证last_start_id顺序递增修改。

4.方案二:无锁(二)

下面介绍第二种无锁实现,在此方案中,无需维护last_log_id,但还需要维护entry状态。

在这些基础上,接口描述如下:

1.  Get():

与方案一基本一致,但需要判断所读的ENTRY状态,如果是LOCKED,则需要返回步骤一重新判断。

2.  Revert():

与方案一一致。

3.  set():

1)  判断是否满足start_id <= id <start_id + size,如果不满足则跳转到步骤5;

2)  使用id对size取模,找到对应的entry,读取此entry的状态,如果是LOCKED,则跳转步骤1;

3)  将entry状态修改为LOCKED,再次判断是否满足start_id <= id<start_id + size:如果不满足则将entry状态修改回原来状态,并跳转步骤5;

4)  读取start的tail指针,根据实现注册的判断函数,判断是否可以覆盖写入,如果不可以则跳转到步骤5;如果可以覆盖写入或者tail指针为空,则新建一个ValueNode,将其append到链表尾部,并修改tail指针并递增cnt引用计数(此处有多线程并发问题,可以使用CAS128);将ENTRY状态修改为USE;

5)  返回。

4.  move_foward()

1)  先获取到要将滑动窗口起点向后移动的目标id,记录为target_start;

2)  将start_id记录到临时变量,tmp_start中,对于start到target_start中间的每一个entry,执行以下操作:

a)  判断其stat是否为LOCKED,如果是则阻塞等待;

b)  将stat修改为LOCKED,重新读取start,判断start_id是否等于tmp_start,如果不等,则跳转到步骤2开始;

c)  递减其引用计数,如果递减后不为0,则需要阻塞等待;如果为0,则释放其内存,并设置其状态为NULL;

d)  将start_id递增1;

4.特殊需求:

在我们的设计中,新当选的leader需要写一条sync barrier日志,之后才能处理滑动窗口中的未决日志。如果此时滑动窗口中普通的未决日志已经写满,则无法再写入sync barrier日志,导致恢复流程失败。

因此,滑动窗口需要提供一种特殊接口:set_common_entry()和set_special_entry(),同时初始化时候需要传入common_size和special_size,通常,special_size>common_size。两个不同的接口使用不同的size,保证sync_barrier日志可以写入滑动窗口。

时间: 2024-10-10 06:48:35

多线程安全的滑动窗口设计实现的相关文章

基于滑动窗口的免锁队列设计实现

消息队列是一些平台的通信的基石,各个任务的通信基于消息队列,消息队列的处理速度往往影响整个系统的性能,为了避免多任务同时处理消息队列,通常有任务处理队列时需要加锁来互斥访问. 1:假设每个模块有自己的消息队列,任何模块都可以给这个模块发消息,但只有本模块会从消息队列中取消息处理,如下图所示一个消息队列可能多个任务同时写,一个任务读. 2:为了避免多个任务处理时使用锁导致的效率底线我们可以使用免锁设计来实现队列操作,见http://www.cnblogs.com/chencheng/p/35276

算法设计与优化策略——滑动窗口

"滑动窗口"和上篇博客中介绍的"等价转换"一样也为一种算法优化的思想.同样,下面通过一个例子,来介绍这种思想.唯一的雪花(Unique snowflake,UVa 11572)输入一个长度为n(n<=10^6)的序列A,找到一个尽量长的连续子序列AL~AR,使得该序列中没有相同的元素.在读完题目以后,我们不难有思路.最简单的思路就是,我们可以通过循环的方法,对每一个元素都找出一它为开头的最长序列(没有相同元素).这个方法也能做出来,但似乎有点太麻烦了.下面,我

tcp协议头窗口,滑动窗口,流控制,拥塞控制关系

参考文章 TCP 的那些事儿(下) http://coolshell.cn/articles/11609.html tcp/ip详解--拥塞控制 & 慢启动 快恢复 拥塞避免 http://blog.csdn.net/kinger0/article/details/48206999 TCP window Full http://blog.csdn.net/abccheng/article/details/50503457 名词解释 MTU:maximum transmission unit,最大

TCP 滑动窗口的简介(写得太好,转载过来的)

TCP 滑动窗口的简介 POSTED BY ADMIN ON AUG 1, 2012 IN FLOWS34ARTICLES | 0 COMMENTS TCP的滑动窗口主要有两个作用,一是提供TCP的可靠性,二是提供TCP的流控特性.同时滑动窗口机制还体现了TCP面向字节流的设计思路.TCP 段中窗口的相关字段. TCP的Window是一个16bit位字段,它代表的是窗口的字节容量,也就是TCP的标准窗口最大为2^16-1=65535个字节. 另外在TCP的选项字段中还包含了一个TCP窗口扩大因子

TCP/IP(十一)TCP滑动窗口和用赛控制

目前建立在TCP协议上的网络协议特别多,有telnet,ssh,有ftp,有http等等.这些协议又可以根据数据吞吐量来大致分成两大类:(1)交互数据类型,例如telnet,ssh,这种类型的协议在大多数情况下只是做小流量的数据交换,比如说按一下键盘,回显一些文字等等.(2)数据成块类型,例如ftp,这种类型的协议要求TCP能尽量的运载数据,把数据的吞吐量做到最大,并尽可能的提高效率.针对这两种情况,TCP给出了两种不同的策略来进行数据传输. 1.TCP的交互数据流 对于交互性要求比较高的应用,

TCP 滑动窗口和 拥塞窗口

转http://coolshell.cn/articles/11609.html 滑动窗口 -- 表征发送端和接收端的接收能力 拥塞窗口-- 表征中间设备的传输能力 TCP滑动窗口 需要说明一下,如果你不了解TCP的滑动窗口这个事,你等于不了解TCP协议.我们都知道,TCP必需要解决的可靠传输以及包乱序(reordering)的问题,所以,TCP必需要知道网络实际的数据处理带宽或是数据处理速度,这样才不会引起网络拥塞,导致丢包. 所以,TCP引入了一些技术和设计来做网络流控,Sliding Wi

TCP 滑动窗口(发送窗口和接收窗口)

TCP的滑动窗口主要有两个作用,一是提供TCP的可靠性,二是提供TCP的流控特性.同时滑动窗口机制还体现了TCP面向字节流的设计思路. TCP的Window是一个16bit位字段,它代表的是窗口的字节容量,也就是TCP的标准窗口最大为2^16-1=65535个字节. 另外在TCP的选项字段中还包含了一个TCP窗口扩大因子,option-kind为3,option-length为3个字节,option-data取值范围0-14.窗口扩大因子用来扩大TCP窗口,可把原来16bit的窗口,扩大为31b

滑动窗口的最大值问题

给出一个序列,要求找出滑动窗口中的最大值,比如: # 序列: 2, 6, 1, 5, 3, 9, 7, 4 # 窗口大小: 4 [2, 6, 1, 5], 3, 9, 7, 4 => 6 2, [6, 1, 5, 3], 9, 7, 4 => 6 2, 6, [1, 5, 3, 9], 7, 4 => 9 2, 6, 1, [5, 3, 9, 7], 4 => 9 2, 6, 1, 5, [3, 9, 7, 4] => 9 # 期望结果: [6, 6, 9, 9, 9] 并要

堆的相关题目—滑动窗口

1.数据流滑动窗口平均值 描述 给出一串整数流和窗口大小,计算滑动窗口中所有整数的平均值. 样例 样例1 : MovingAverage m = new MovingAverage(3); m.next(1) = 1 // 返回 1.00000 m.next(10) = (1 + 10) / 2 // 返回 5.50000 m.next(3) = (1 + 10 + 3) / 3 // 返回 4.66667 m.next(5) = (10 + 3 + 5) / 3 // 返回 6.00000来源