订单并发5000的排队机制

最近有个需求,要求订单提交5000并发,具体实现思路如下,不足之处望指教。

1、利用Redis来缓存订单,用户查询订单状态从缓存中取

2、因为阿里云的消息服务可控性很好所以用阿里云的消息服务中的消息队列来进行订单处理

实验1:用户提交订单后直接存Redis并写入阿里云消息服务

问题:Redis写入5000并发没有问题,但是写入阿里云消息服务速度太慢

实验2:用户提交订单后直接存Redis并开多线程写入阿里云消息服务

问题:Redis写入还是没有问题,但是多线程一开CPU果断飙到100%并且线程到达极限有切换和等待问题,速度也达不到5000并发

实验3:用户提交订单后直接存Redis后将订单号加入Redis队列,然后用任务从队列中使用RPOPLPUSH命令取出并加入阿里云消息服务

问题:Redis队列有个队列安全的问题,因为在这个过程中,一个客户端可能在取出一个消息之后崩溃,而未处理完的消息也就因此丢失。虽然取出时使用 RPOPLPUSH 命令(或者它的阻塞版本 BRPOPLPUSH )可以解决这个问题:因为它不仅返回一个消息,同时还将这个消息添加到另一个备份列表当中,如果一切正常的话,当一个客户端完成某个消息的处理之后,可以用 LREM 命令将这个消息从备份表删除。但是要是没有处理的话那备份表里会一直存在,我们要想办法把多长时间未处理的消息自动放入到正式队列中重新排队。

实验4:Redis写入订单信息后将订单号加入Redis队列,然后用任务从队列中使用RPOPLPUSH命令取出并同时添加一个对应订单号及取出时间的监控表然后加入阿里云消息服务,最后使用任务从监控表中判断备份表中的多长时间未处理的数据重新加入正式队列排队

问题:这样应该已经很好了,但是备份表和监控表的存取删除不是事物性的,极小概率会出问题,不过最多也就是重新排队,最后订单处理时肯定是需要判断这个订单是否已经处理过的,所以最后结果就是这样,下面上一些代码吧

代码1:根据Redis 的 Incr生成订单号,Redis客户端使用的是 StackExchange.Redis。当然也可以根据Twitter 雪花算法来生成订单号

        /// <summary>
        /// 生成规则:时间+redis自增
        ///缺点:可以根据redis自增知道每日订单量
        /// </summary>
        public static string CreateByRedis()
        {
            //Redis服务器链接失败会报异常
            try
            {
                //单例获取Redis链接
                var redis = RedisConnection.GetRedisConn;
                var db = redis.GetDatabase();
                //根据当前日期获取Redis订单号自增的key
                var incrkey = "Incr_" + DateTime.Now.ToString("yyyyMMdd");
                if (!db.KeyExists(incrkey))
                {
                    db.StringSet(incrkey, 1, new TimeSpan(24, 0, 0));
                }

                var orderDateTime = DateTime.Now.ToString("yyMMdd"); //因为我们的Redis Incr是按照天数来的 所以原则上用天的区分就好了
                var orderIncr = db.StringIncrement(incrkey).ToString("0000000");
                return orderDateTime + orderIncr;
            }
            catch (Exception)
            {
                //TODO 这里可以写个日志
                return "";
            }
        } 

代码2:订单信息加入缓存并将订单号加入队列

            try
            {
                //① 生成订单号
                var orderNumber = OrderNumberUtil.CreateByRedis();
                //② 链接Redis数据库
                var redisconn = RedisConnection.GetRedisConn;
                var redisdb = redisconn.GetDatabase();
                //③ 编辑参数字典
                var redisDictionary = new Dictionary<string, string>
                {
                    {"ordernumber", orderNumber ?? ""},
                    {"customerid", model.customerid ?? ""},
                    {"customerorderno", model.customerorderno ?? ""},
                    {"productid", model.productid ?? ""},
                    {"buynum", model.buynum ?? ""}
                };
                var redisJson = (new JavaScriptSerializer()).Serialize(redisDictionary);
                redisdb.StringSet(orderNumber, redisJson, new TimeSpan(0, 20, 0)); //Todo 这里暂时只存20分钟

                //加入队列
                redisdb.ListRightPush("Queue_OrderInfo", orderNumber);
            }
            catch (RedisException re)
            {
                //Todo Redis异常,如Redis服务器挂了链接不上,写日志
            }
            catch (Exception ex)
            {
                //Todo 其它异常,写日志
            }

3、从Redis队列中取出添加副本和时间监控 并加入消息队列,如果加入成功则从取出副本和监控中删除中删除

/// <summary>
        ///从Redis队列中取出并加入阿里云消息队列
        /// </summary>
        private static void PopQueue()
        {
            try
            {
                //链接Redis数据库
                var redisconn = RedisConnection.GetRedisConn;
                var redisdb = redisconn.GetDatabase();
                while (true)
                {
                    Thread.Sleep(1000);//1秒钟检查一次
                    if (redisdb.ListLength("Queue_OrderInfo") > 0)
                    {
                        //为了线程安全 这里获取队列的时候还写一个hash副本
                        var orderNumber = redisdb.ListRightPopLeftPush("Queue_OrderInfo", "Destination_OrderInfo");
                        //为Hash副本创建一张Hash监控表来存时间戳
                        redisdb.HashSet("Destination_Time", orderNumber, DateTimeUtil.GetTimeStamp());
                        try
                        {
                            //加入阿里云消息队列去了
                            // ① 获取Queue的实例
                            var nativeQueue = QueueConnection.GetQueueConn.GetNativeQueue(QueueConnection._queueName);
                            var sendMessageRequest = new SendMessageRequest(orderNumber);//只把orderNumber存到消息队列,如果要处理就根据这个key去缓存里查找
                            // ③ 发送消息
                            // 3.1 也可以直接使用nativeQueue.SendMessage("MessageBody");
                            nativeQueue.SendMessage(sendMessageRequest);
                        }
                        catch (MNSException me)//如果消息队列处理异常就将副本数据重新加入正式队列并删除副本
                        {
                            //加入队列正式队列
                            redisdb.ListRightPush("Queue_OrderInfo", orderNumber);
                            //移除副本中的记录
                            redisdb.ListRemove("Destination_OrderInfo", orderNumber);
                            //移除Hash中的字段
                            redisdb.HashDelete("Destination_Time", orderNumber);
                        }
                        //处理完后将副本及监控表删除 证明这条数据已经处理完了
                        //移除副本中的记录
                        redisdb.ListRemove("Destination_OrderInfo", orderNumber);
                        //移除Hash中的字段
                        redisdb.HashDelete("Destination_Time", orderNumber);

                        Console.WriteLine("订单号 【" + orderNumber + "】 已加入阿里云消息队列");
                    }
                }
            }
            catch (RedisException re)
            {
                //Todo Redis异常,如Redis服务器挂了链接不上,写日志
            }
            catch (Exception ex)
            {
                //Todo 写日志
            }
        }

4、检查副本中的数据 超过时间未处理自动添加到正式队列

        /// <summary>
        /// 检查副本中的数据 超过时间未处理自动添加到正式队列
        /// </summary>
        public static void CheckDestination_OrderInfo()
        {
            var redisconn = RedisConnection.GetRedisConn;
            var redisdb = redisconn.GetDatabase();
            var hashAll = redisdb.HashGetAll("Destination_Time");
            while (true)
            {
                Thread.Sleep(3000);//3秒钟检查一次
                if (!hashAll.Any(s => DateTimeUtil.GetTimeStamp() - Convert.ToDouble(s.Value) > 60 * 3))
                    return;

                foreach (var item in hashAll.Where(item => DateTimeUtil.GetTimeStamp() - Convert.ToDouble(item.Value) > 60 * 3))
                {
                    //加入队列正式队列
                    redisdb.ListRightPush("Queue_OrderInfo", item.Name);
                    //移除副本中的记录
                    redisdb.ListRemove("Destination_OrderInfo", item.Name);
                    //移除Hash中的字段
                    redisdb.HashDelete("Destination_Time", item.Name);

                    Console.WriteLine("取出后有未处理完成的订单 [" + item.Name + "] 重新加入队列");
                }
            }
        }

5、开线程来处理  

        static void Main(string[] args)
        {
            //开启一个线程用来检查队列中取出数据是否已经全部处理
            var checkThread = new Thread(CheckDestination_OrderInfo);
            checkThread.Start();

            //开启多线程来加入阿里云消息队列
            var threads = new Thread[2];
            for (int i = 0; i < threads.Length; i++)
            {
                threads[i] = new Thread(PopQueue);
                threads[i].Start();
            }
            Console.ReadLine();
        }

刚弄完的感觉还行

时间: 2024-10-29 19:06:48

订单并发5000的排队机制的相关文章

Java开源生鲜电商平台-OMS订单系统中并发问题和锁机制的探讨与解决方案(源码可下载)

Java开源生鲜电商平台-OMS订单系统中并发问题和锁机制的探讨与解决方案(源码可下载) 说明:Java开源生鲜电商中OMS订单系统中并发问题和锁机制的探讨与解决方案: 问题由来     假设在一个订单系统中(以火车票订单系统为例),用户A,用户B都要预定从成都到北京的火车票,A.B在不同的售票窗口均同时查询到了某车厢卧铺中.下铺位有空位.用户A正在犹豫订中铺还是下铺,这时用户B果断订购了下铺.当用户A决定订下铺时,系统提示下铺已经被预订,请重新选择铺位.在这个系统场景中,我们来探讨一下,火车票

Mysql事务并发问题,锁机制

Mysql事务并发问题,锁机制 1.什么是事务 事务是一条或多条数据库操作语句的组合,具备ACID,4个特点. 原子性:要不全部成功,要不全部撤销 隔离性:事务之间相互独立,互不干扰 一致性:数据库正确地改变状态后,数据库的一致性约束没有被破坏 持久性:事务的提交结果,将持久保存在数据库中 2.事务并发会产生什么问题 1)第一类丢失更新:在没有事务隔离的情况下,两个事务都同时更新一行数据,但是第二个事务却中途失败退出, 导致对数据的两个修改都失效了. 例如: 张三的工资为5000,事务A中获取工

MySql 事务,并发问题 ,锁机制 &lt;转&gt;

Mysql事务,并发问题,锁机制 1.什么是事务 事务是一条或多条数据库操作语句的组合,具备ACID,4个特点. 原子性:要不全部成功,要不全部撤销 隔离性:事务之间相互独立,互不干扰 一致性:数据库正确地改变状态后,数据库的一致性约束没有被破坏 持久性:事务的提交结果,将持久保存在数据库中 2.事务并发会产生什么问题 1)第一类丢失更新:在没有事务隔离的情况下,两个事务都同时更新一行数据,但是第二个事务却中途失败退出, 导致对数据的两个修改都失效了. 例如: 张三的工资为5000,事务A中获取

探索 ConcurrentHashMap 高并发性的实现机制

简介 ConcurrentHashMap 是 util.concurrent 包的重要成员.本文将结合 Java 内存模型,分析 JDK 源代码,探索 ConcurrentHashMap 高并发的具体实现机制. 由于 ConcurrentHashMap 的源代码实现依赖于 Java 内存模型,所以阅读本文需要读者了解 Java 内存模型.同时,ConcurrentHashMap 的源代码会涉及到散列算法和链表数据结构,所以,读者需要对散列算法和基于链表的数据结构有所了解. Java 内存模型 由

Sqlite3 排队机制的应用

sqlite 是单线程的,在node-webkit软件中执行事务时再执行别的操作会失败.因此创建排队机制,所有执行等待事务执行完成再执行.应用了"sqlite3-transactions"库,方便快速的处理.在sqlite中事务执行前设置"PRAGMA synchronous=OFF;"会加快执行速度,特别是对大量数据批量操作的事务中.缺点是数据安全性差一点.如果在这个执行过程中程序强制退出或电脑断电等会造成数据丢失或损坏.另外如果表之间没有关联性,或没有关联查询的

SQL Server 锁的排队机制

1.新建一个表,插入1010000数据: create table test(id int identity(1,1) ,name varchar(600)) go insert into test values(replicate('a',600)); go 1010000 create index idx_test_id on test(id) 2.新开一个会话(A),运行如下语句,由于没有提交,所以会阻塞其他药修改相同数据的会话: begin tran update test set na

【Java 并发】Executor框架机制与线程池配置使用

[Java 并发]Executor框架机制与线程池配置使用 一,Executor框架Executor框架便是Java 5中引入的,其内部使用了线程池机制,在java.util.cocurrent 包下,通过该框架来控制线程的启动.执行和关闭,可以简化并发编程的操作.因此,在Java 5之后,通过Executor来启动线程比使用Thread的start方法更好,更易管理,效率更好(用线程池实现,节约开销). Executor框架主要包括:Executor,Executors,ExecutorSer

【巨杉数据库SequoiaDB】巨杉 Tech | 并发性与锁机制解析与实践

01 概述 数据库是一个多用户使用的共享资源.当多个用户并发地存取数据时,在数据库中就会产生多个事务同时存取同一数据的情况.若对并发操作不加控制就可能会读取和存储不正确的数据,破坏数据库的一致性.加锁是实现数据库并发控制的一个非常重要的技术.当事务在对某个数据对象进行操作前,先向系统发出请求,对其加锁.加锁后事务就对该数据对象有了一定的控制,在该事务释放锁之前,其他的事务不能对此数据对象进行更新操作. OLTP 场景下通常要求具有很高的并发性.并发事务实际上取决于资源的使用状况,原则上应尽量减少

论火车票订单系统中并发问题和锁机制的探讨

问题由来 假设在一个订单系统中(以火车票订单系统为例),用户A,用户B都要预定从成都到北京的火车票,A.B在不同的售票窗口均同时查询到了某车厢卧铺中.下铺 位有空位.用户A正在犹豫订中铺还是下铺,这时用户B果断订购了下铺.当用户A决定订下铺时,系统提示下铺已经被预订,请重新选择铺位.在这个系统场景 中,我们来探讨一下,火车票系统是怎样处理并发事件以及怎么利用锁机制来避免重复订票的. 设想的方案 方案1: 为了避免重复订票,大部分人会想到在做订票操作前,去数据库查询该铺位是否已经被预订,假设“铺位