使用管道(PipeLine)和批量(Batch)操作

使用管道(PipeLine)和批量(Batch)操作

前段时间在做用户画像的时候,遇到了这样的一个问题,记录某一个商品的用户购买群,刚好这种需求就可以用到Redis中的Set,key作为productID,value

就是具体的customerid集合,后续的话,我就可以通过productid来查看该customerid是否买了此商品,如果购买了,就可以有相关的关联推荐,当然这只是系统中

的一个小业务条件,这时候我就可以用到SADD操作方法,代码如下:

        static void Main(string[] args)
        {
            ConnectionMultiplexer redis = ConnectionMultiplexer.Connect("192.168.23.151:6379");

            var db = redis.GetDatabase();

            var productID = string.Format("productID_{0}", 1);

            for (int i = 0; i < 10; i++)
            {
                var customerID = i;

                db.SetAdd(productID, customerID);
            }
        }

一:问题

但是上面的这段代码很明显存在一个大问题,Redis本身就是基于tcp的一个Request/Response protocol模式,不信的话,可以用wireshark监视一下:

从图中可以看到,有很多次的192.168.23.1 => 192.168.23.151 之间的数据往返,从传输内容中大概也可以看到有一个叫做productid_xxx的前缀,

那如果有百万次局域网这样的round trip,那这个延迟性可想而知,肯定达不到我们预想的高性能。

二:解决方案【Batch】

刚好基于我们现有的业务,我可以定时的将批量的productid和customerid进行分组整合,然后用batch的形式插入到某一个具体的product的set中去,

接下来我可以把上面的代码改成类似下面这样:

 1         static void Main(string[] args)
 2         {
 3             ConnectionMultiplexer redis = ConnectionMultiplexer.Connect("192.168.23.151:6379");
 4
 5             var db = redis.GetDatabase();
 6
 7             var productID = string.Format("productID_{0}", 1);
 8
 9             var list = new List<int>();
10
11
12             for (int i = 0; i < 10; i++)
13             {
14                 list.Add(i);
15             }
16
17             db.SetAdd(productID, list.Select(i => (RedisValue)i).ToArray());
18         }

从截图中传输的request,response可以看到,这次我们一次性提交过去,极大的较少了在网络传输方面带来的尴尬性。。

三:再次提出问题

  product维度的画像我们可以解决了,但是我们还有一个customerid的维度,也就是说我需要维护一个customerid为key的set集合,其中value的值为

该customerid的各种平均值,比如说“总交易次数”,“总交易金额”。。。等等这样的聚合信息,然后推送过来的是批量的customerid,也就是说你需要定时

维护一小嘬set集合,在这种情况下某一个set的批量操作就搞不定了。。。原始代码如下:

 1         static void Main(string[] args)
 2         {
 3             ConnectionMultiplexer redis = ConnectionMultiplexer.Connect("192.168.23.151:6379");
 4
 5             var db = redis.GetDatabase();
 6
 7
 8             //批量过来的数据: customeridlist, ordertotalprice,具体业务逻辑省略
 9             var orderTotalPrice = 100;
10
11             var customerIDList = new List<int>();
12
13             for (int i = 0; i < 10; i++)
14             {
15                 customerIDList.Add(i);
16             }
17
18             //foreach更新每个redis 的set集合
19             foreach (var item in customerIDList)
20             {
21                 var customerID = string.Format("customerid_{0}", item);
22
23                 db.SetAdd(customerID, orderTotalPrice);
24             }
25         }

四:解决方案【PipeLine】

上面这种代码在生产上当然是行不通的,不过针对这种问题,redis早已经提出了相关的解决方案,那就是pipeline机制,原理还是一样,将命令集整合起来通过

一条request请求一起送过去,由redis内部fake出一个client做批量执行操作,代码如下:

 1         static void Main(string[] args)
 2         {
 3             ConnectionMultiplexer redis = ConnectionMultiplexer.Connect("192.168.23.151:6379");
 4
 5             var db = redis.GetDatabase();
 6
 7
 8             //批量过来的数据: customeridlist, ordertotalprice,具体业务逻辑省略
 9             var orderTotalPrice = 100;
10
11             var customerIDList = new List<int>();
12
13             for (int i = 0; i < 10; i++)
14             {
15                 customerIDList.Add(i);
16             }
17
18             var batch = db.CreateBatch();
19
20             foreach (var item in customerIDList)
21             {
22                 var customerID = string.Format("customerid_{0}", item);
23
24                 batch.SetAddAsync(customerID, orderTotalPrice);
25             }
26
27             batch.Execute();
28         }

然后,我们再看下面的wireshark截图,可以看到有很多的SADD这样的小命令,这就说明有很多命令是一起过去的,大大的提升了性能。

最后可以再看一下redis,数据也是有的,是不是很爽~~~

192.168.23.151:6379> keys *
 1) "customerid_0"
 2) "customerid_9"
 3) "customerid_1"
 4) "customerid_3"
 5) "customerid_8"
 6) "customerid_2"
 7) "customerid_7"
 8) "customerid_5"
 9) "customerid_6"
10) "customerid_4"

好了,先就说到这里了,希望本篇对你有帮助。

分类: redis

时间: 2024-11-04 18:43:57

使用管道(PipeLine)和批量(Batch)操作的相关文章

redis使用管道pipeline提升批量操作性能(php演示)

Redis是一个TCP服务器,支持请求/响应协议. 在Redis中,请求通过以下步骤完成: 客户端向服务器发送查询,并从套接字读取,通常以阻塞的方式,用于服务器响应. 服务器处理命令并将响应发送回客户端. 如果需要一次执行多个redis命令,以往的方式需要发送多次命令请求,有redis服务器依次执行,并返回结果, 为了解决此类问题,设计者设计出了redis管道命令: 客户端可以向服务器发送多个请求,而不必等待回复,并最终在一个步骤中读取回复,从而大大增加了协议性能 做了测试,使用pipeline

PreparedStatement批量(batch)插入数据

JDBC操作数据库的时候,需要一次性插入大量的数据的时候,如果每次只执行一条SQL语句,效率可能会比较低.这时可以使用batch操作,每次批量执行SQL语句,调高效率. public Boolean doCreateBatch(List<Emp> values) throws Exception { try { String sql = " INSERT INTO emp(empno,ename,job,hiredate,sal,comm) VALUES " + "

redis之管道——pipeline

redis 是 CS 模式,Redis客户端与Redis之间使用TCP协议进行连接,一个客户端可以通过一个socket连接发起多个请求命令,每个请求命令发出后client通常会阻塞并等待redis服务处理,redis处理完后请求命令后会将结果通过响应报文返回给client,因此当执行多条命令的时候都需要等待上一条命令执行完毕才能执行.如果一次性批量数据单次操作,会有网络延迟.而redis也是单线程的. 而Pipelining可以满足批量的操作,把多个命令连续的发送给Redis Server,然后

[Linux] 流 ( Stream )、管道 ( Pipeline ) 、Filter- 笔记

流 ( Stream ) 1. 流,是指可使用的数据元素一个序列. 2. 流,可以想象为是传送带上等待加工处理的物品,也可以想象为工厂流水线上的物品. 3. 流,可以是无限的数据. 4. 有一种功能,处理这一个流同时产生着另一个流.这种功能被成为 过滤 ( Filter ).使用管道 ( pipelie ) 将这些功能进行连接. Unix 管道 ( Pipeline ) 1. 管道连接着处理元素,一个处理元素的输出是下一个处理处理元素的输入. 2. 管道能加快数据处理速度. 2. Unix 下的

【转】批量复制操作(SqlBulkCopy)的出错处理:事务提交、回滚

原文地址:http://blog.csdn.net/westsource/article/details/6658109 默认情况下,批量复制操作作为独立的操作执行. 批量复制操作以非事务性方式发生,不可能使其回滚. 如果需要在出错时回滚全部批量复制或它的一部分,可以使用 SqlBulkCopy 托管的事务,在现有事务中执行批量复制操作,或者在 System.Transactions Transaction 中登记它. 由于不同批次在不同事务中执行,因此,如果在批量复制操作期间发生错误,则当前批

IO包中的其他类 打印流,序列流,操作对象,管道流,RandomAccessFile,操作基本数据类型,操作字节数组

打印流,序列流,操作对象,管道流,RandomAccessFile,操作基本数据类型,操作字节数组 一.打印流: 该流提供了打印方法,可以将各种数据类型的数据都原样打印. 字节打印流PrintStream构造函数可以接收的参数类型1.File对象 File2.字符串路径 String3.字节输出流 OutputStream 字符打印流PrintWriter(更常用)1.File对象 File2.字符串路径 String3.字节输出流 OutputStream4.字符输出流 Writer publ

各种数据库的批量插入操作_Oracle

最近工作中需要优化以前各种的Excel批量导入功能,目前将能优化的方面做个记录. 选用技术: 目前.Net可以访问Oracle常用的Dll,有三种: 微软自带的 System.Data.OracleClient Oracle 公司提供的 Oracle.DataAccess System.Data.OleDb  通过比对以后,使用第二种,其原因如下: 访问数据库方式 优点 缺点 System.Data.OracleClient 操作简单,无论32,64直接引用即可使用 微软在4.0之后不会对其更新

Xcode批量替换操作

原文链接: Xcode批量替换操作 简书主页:http://www.jianshu.com/users/37f2920f6848 Github主页:https://github.com/MajorLMJ iOS开发者公会-技术1群 QQ群号:87440292 iOS开发者公会-技术2群 QQ群号:232702419 iOS开发者公会-议事区   QQ群号:413102158

在WebGrid中做 批量删除操作

一般的MVC WebGrid都是在每一行中加入 Edit Detail Delete 这些Link 去对每条记录去单独操作. 稍微研究了一下总结一个 做批量删除的办法. 1. 首先是在WebGrid中加入一列CheckBox代码如下 grid.Column(header: " ", format: @<text><input class="check-box" id="chkbox" name="chkbox"

hibernate 批量处理操作

一: hibernate的批量插入操作: hibernate进行批量插入时候,调用jdbc的批量插入效率要远高于hibernate的批量插入: 1 通过hibernate调用jdbc API完成100000条数据插入: package com.tem.test; import java.sql.Connection; import java.sql.PreparedStatement; import java.sql.SQLException; import java.util.Iterator