最近有个需求,要求订单提交5000并发,具体实现思路如下,不足之处望指教。
1、利用Redis来缓存订单,用户查询订单状态从缓存中取
2、因为阿里云的消息服务可控性很好所以用阿里云的消息服务中的消息队列来进行订单处理
实验1:用户提交订单后直接存Redis并写入阿里云消息服务
问题:Redis写入5000并发没有问题,但是写入阿里云消息服务速度太慢
实验2:用户提交订单后直接存Redis并开多线程写入阿里云消息服务
问题:Redis写入还是没有问题,但是多线程一开CPU果断飙到100%并且线程到达极限有切换和等待问题,速度也达不到5000并发
实验3:用户提交订单后直接存Redis后将订单号加入Redis队列,然后用任务从队列中使用RPOPLPUSH命令取出并加入阿里云消息服务
问题:Redis队列有个队列安全的问题,因为在这个过程中,一个客户端可能在取出一个消息之后崩溃,而未处理完的消息也就因此丢失。虽然取出时使用 RPOPLPUSH 命令(或者它的阻塞版本 BRPOPLPUSH )可以解决这个问题:因为它不仅返回一个消息,同时还将这个消息添加到另一个备份列表当中,如果一切正常的话,当一个客户端完成某个消息的处理之后,可以用 LREM 命令将这个消息从备份表删除。但是要是没有处理的话那备份表里会一直存在,我们要想办法把多长时间未处理的消息自动放入到正式队列中重新排队。
实验4:Redis写入订单信息后将订单号加入Redis队列,然后用任务从队列中使用RPOPLPUSH命令取出并同时添加一个对应订单号及取出时间的监控表然后加入阿里云消息服务,最后使用任务从监控表中判断备份表中的多长时间未处理的数据重新加入正式队列排队
问题:这样应该已经很好了,但是备份表和监控表的存取删除不是事物性的,极小概率会出问题,不过最多也就是重新排队,最后订单处理时肯定是需要判断这个订单是否已经处理过的,所以最后结果就是这样,下面上一些代码吧
代码1:根据Redis 的 Incr生成订单号,Redis客户端使用的是 StackExchange.Redis。当然也可以根据Twitter 雪花算法来生成订单号
/// <summary> /// 生成规则:时间+redis自增 ///缺点:可以根据redis自增知道每日订单量 /// </summary> public static string CreateByRedis() { //Redis服务器链接失败会报异常 try { //单例获取Redis链接 var redis = RedisConnection.GetRedisConn; var db = redis.GetDatabase(); //根据当前日期获取Redis订单号自增的key var incrkey = "Incr_" + DateTime.Now.ToString("yyyyMMdd"); if (!db.KeyExists(incrkey)) { db.StringSet(incrkey, 1, new TimeSpan(24, 0, 0)); } var orderDateTime = DateTime.Now.ToString("yyMMdd"); //因为我们的Redis Incr是按照天数来的 所以原则上用天的区分就好了 var orderIncr = db.StringIncrement(incrkey).ToString("0000000"); return orderDateTime + orderIncr; } catch (Exception) { //TODO 这里可以写个日志 return ""; } }
代码2:订单信息加入缓存并将订单号加入队列
try { //① 生成订单号 var orderNumber = OrderNumberUtil.CreateByRedis(); //② 链接Redis数据库 var redisconn = RedisConnection.GetRedisConn; var redisdb = redisconn.GetDatabase(); //③ 编辑参数字典 var redisDictionary = new Dictionary<string, string> { {"ordernumber", orderNumber ?? ""}, {"customerid", model.customerid ?? ""}, {"customerorderno", model.customerorderno ?? ""}, {"productid", model.productid ?? ""}, {"buynum", model.buynum ?? ""} }; var redisJson = (new JavaScriptSerializer()).Serialize(redisDictionary); redisdb.StringSet(orderNumber, redisJson, new TimeSpan(0, 20, 0)); //Todo 这里暂时只存20分钟 //加入队列 redisdb.ListRightPush("Queue_OrderInfo", orderNumber); } catch (RedisException re) { //Todo Redis异常,如Redis服务器挂了链接不上,写日志 } catch (Exception ex) { //Todo 其它异常,写日志 }
3、从Redis队列中取出添加副本和时间监控 并加入消息队列,如果加入成功则从取出副本和监控中删除中删除
/// <summary> ///从Redis队列中取出并加入阿里云消息队列 /// </summary> private static void PopQueue() { try { //链接Redis数据库 var redisconn = RedisConnection.GetRedisConn; var redisdb = redisconn.GetDatabase(); while (true) { Thread.Sleep(1000);//1秒钟检查一次 if (redisdb.ListLength("Queue_OrderInfo") > 0) { //为了线程安全 这里获取队列的时候还写一个hash副本 var orderNumber = redisdb.ListRightPopLeftPush("Queue_OrderInfo", "Destination_OrderInfo"); //为Hash副本创建一张Hash监控表来存时间戳 redisdb.HashSet("Destination_Time", orderNumber, DateTimeUtil.GetTimeStamp()); try { //加入阿里云消息队列去了 // ① 获取Queue的实例 var nativeQueue = QueueConnection.GetQueueConn.GetNativeQueue(QueueConnection._queueName); var sendMessageRequest = new SendMessageRequest(orderNumber);//只把orderNumber存到消息队列,如果要处理就根据这个key去缓存里查找 // ③ 发送消息 // 3.1 也可以直接使用nativeQueue.SendMessage("MessageBody"); nativeQueue.SendMessage(sendMessageRequest); } catch (MNSException me)//如果消息队列处理异常就将副本数据重新加入正式队列并删除副本 { //加入队列正式队列 redisdb.ListRightPush("Queue_OrderInfo", orderNumber); //移除副本中的记录 redisdb.ListRemove("Destination_OrderInfo", orderNumber); //移除Hash中的字段 redisdb.HashDelete("Destination_Time", orderNumber); } //处理完后将副本及监控表删除 证明这条数据已经处理完了 //移除副本中的记录 redisdb.ListRemove("Destination_OrderInfo", orderNumber); //移除Hash中的字段 redisdb.HashDelete("Destination_Time", orderNumber); Console.WriteLine("订单号 【" + orderNumber + "】 已加入阿里云消息队列"); } } } catch (RedisException re) { //Todo Redis异常,如Redis服务器挂了链接不上,写日志 } catch (Exception ex) { //Todo 写日志 } }
4、检查副本中的数据 超过时间未处理自动添加到正式队列
/// <summary> /// 检查副本中的数据 超过时间未处理自动添加到正式队列 /// </summary> public static void CheckDestination_OrderInfo() { var redisconn = RedisConnection.GetRedisConn; var redisdb = redisconn.GetDatabase(); var hashAll = redisdb.HashGetAll("Destination_Time"); while (true) { Thread.Sleep(3000);//3秒钟检查一次 if (!hashAll.Any(s => DateTimeUtil.GetTimeStamp() - Convert.ToDouble(s.Value) > 60 * 3)) return; foreach (var item in hashAll.Where(item => DateTimeUtil.GetTimeStamp() - Convert.ToDouble(item.Value) > 60 * 3)) { //加入队列正式队列 redisdb.ListRightPush("Queue_OrderInfo", item.Name); //移除副本中的记录 redisdb.ListRemove("Destination_OrderInfo", item.Name); //移除Hash中的字段 redisdb.HashDelete("Destination_Time", item.Name); Console.WriteLine("取出后有未处理完成的订单 [" + item.Name + "] 重新加入队列"); } } }
5、开线程来处理
static void Main(string[] args) { //开启一个线程用来检查队列中取出数据是否已经全部处理 var checkThread = new Thread(CheckDestination_OrderInfo); checkThread.Start(); //开启多线程来加入阿里云消息队列 var threads = new Thread[2]; for (int i = 0; i < threads.Length; i++) { threads[i] = new Thread(PopQueue); threads[i].Start(); } Console.ReadLine(); }
刚弄完的感觉还行