通过请求队列的方式来缓解高并发抢购(初探)
一、背景
在移动互联网高速发展的时代,各种电商平台的抢购业务变得越来越火爆,抢购业务所带来的高并发问题值得我们去探索,主要涉及的方面包括处理和响应速度、数据的一致性等。抢购开放的一瞬间,可能有成千上万的下订单请求发送到服务器去处理,如果只是简单的请求处理响应方式,不做任何处理,导致的结果很可能是很多客户很长时间得不到响应,根本不知道自己是否下订单成功,或者下订单的数量已经超过了商品的数量,这就导致了超发的问题。
二、设计思路
1、用户在下订单之前当然是先查询到这个商品,在这个查询的时候,将数据库中商品的剩余数量存到redis中;
2、服务器在一瞬间接到成千上万的下订单请求,在控制层没有直接处理请求数据,而是先根据redis中商品的剩余数量来判断,如果>0,就将请求放到请求队列中,否则直接响
应客户端“卖完了”;
3、考虑到数据的一致性,队列采用的是线程安全的队列LinkedBlockingQueue,
三、实现步骤
说明:用java语言,springmvc框架+redis实现
1、准备工作,查询商品信息,将剩余数量同步到redis中
Jedis jedis = jedisPool.getResource();
BuyGood good=buyGoodService.getById(good_id);
jedis.set("residue"+good_id, good.getResidue()+"");
jedisPool.returnResource(jedis);
2、下订单的方法,下面直接展示代码,包括请求对象,控制层方法,请求处理线程类的具体实现
2.1 请求封装对象
public class BuyRequest {
private int good_id;//商品id
private int user_id;//用户ID
private int order_id;//订单id
private BuyOrders buyOrders;//订单信息
private int response_status;//0:未处理;1:正常;2:异常
public BuyOrders getBuyOrders() {
return buyOrders;
}
public void setBuyOrders(BuyOrders buyOrders) {
this.buyOrders = buyOrders;
}
public int getGood_id() {
return good_id;
}
public void setGood_id(int good_id) {
this.good_id = good_id;
}
public int getOrder_id() {
return order_id;
}
public void setOrder_id(int order_id) {
this.order_id = order_id;
}
public int getResponse_status() {
return response_status;
}
public void setResponse_status(int response_status) {
this.response_status = response_status;
}
public int getUser_id() {
return user_id;
}
public void setUser_id(int user_id) {
this.user_id = user_id;
}
}
2.2 处理请求的controller
@Controller
@RequestMapping("/buy")
public class BuyController {
private static BuyQueue<BuyRequest> buyqueue =null;//线程安全的请求队列
@RequestMapping("/addOrders.do")
@ResponseBody
public Object addOrders(BuyRequest buyrequest){
Map<String, Object> results = new HashMap<>();
Jedis jedis = jedisPool.getResource();
try {
//下订单之前,先获取商品的剩余数量
int residue = Integer.valueOf(jedis.get("residue"+buyrequest.getGood_id()));
if(residue<1){//如果剩余数量不足,直接响应客户端“卖完了”
results.put("msg", "卖完了");
results.put("done", false);
BaseLog.info("addOrders results="+JSON.toJSONString(results));
return results;
}
//如果还有剩余商品,就准备将请求放到请求队列中
if(buyqueue==null){//第一次初始化请求队列,队列的容量为当前的商品剩余数量
buyqueue=new BuyQueue<BuyRequest>(residue);
}
if(buyqueue.remainingCapacity()>0){//当队列的可用容量大于0时,将请求放到请求队列中
buyqueue.put(buyrequest);
}else{//当请求队列已满,本次请求不能处理,直接响应客户端提示请求队列已满
results.put("msg", "抢购队列已满,请稍候重试!");
results.put("done", false);
return results;
}
if(!DealQueueThread.excute){//如果线程类的当前执行标志为未执行,即空闲状态,通过线程池启动线程
DealQueueThread dealQueue = new DealQueueThread(buyqueue);
ThreadPoolUtil.pool.execute(dealQueue);
BaseLog.info("Thread.activeCount()="+Thread.activeCount());
}
//将请求放到请求队列中之后,等待该请求处理完成
while(buyrequest.getResponse_status()==0){
//请求没有处理
}
BaseLog.info("wait_out..."+buyrequest.getResponse_status());
//response_status;//0:未处理;1:正常;2:异常
if(buyrequest.getResponse_status()==1){//请求正常处理完成,下单成功
results.put("orders", buyrequest.getBuyOrders());
results.put("done", true);
results.put("msg", "下订单成功");
}else if(buyrequest.getResponse_status()==3){//请求正常处理完成,没抢到
results.put("msg", "没有了");
results.put("done", false);
}
} catch (Exception e) {
results.put("done", false);
results.put("msg", "请求无效");
BaseLog.info("addOrders results="+JSON.toJSONString(results));
BaseLog.error("addOrders",e);
}finally{
jedisPool.returnResource(jedis);
}
return results;
}
}
2.3 处理请求的线程类,线程类中涉及到的service代码就不必写出来了,你懂的
@Component
public class DealQueueThread implements Runnable {
private static DealQueueThread dealQueueThread;
@Autowired
BuyGoodService buyGoodService;
@Autowired
BuyOrdersService buyOrdersService;
@Autowired
JedisPool jedisPool;
private Jedis jedis;
private BuyQueue<BuyRequest> buyqueue;
public static boolean excute = false;//线程的默认执行标志为未执行,即空闲状态
public DealQueueThread() {
}
public DealQueueThread(BuyQueue<BuyRequest> buyqueue) {
this.buyqueue = buyqueue;
jedis = dealQueueThread.jedisPool.getResource();
}
@PostConstruct
public void init() {
dealQueueThread = this;
dealQueueThread.buyGoodService = this.buyGoodService;
dealQueueThread.buyOrdersService = this.buyOrdersService;
dealQueueThread.jedisPool = this.jedisPool;
}
@Override
public void run() {
try {
excute = true;//修改线程的默认执行标志为执行状态
//开始处理请求队列中的请求,按照队列的FIFO的规则,先处理先放入到队列中的请求
while (buyqueue != null && buyqueue.size() > 0) {
BuyRequest buyreq = buyqueue.take();//取出队列中的请求
dealWithQueue(buyreq);//处理请求
}
} catch (InterruptedException e) {
BaseLog.error("DealQueueThread:", e);
} finally {
excute = false;
}
}
public synchronized void dealWithQueue(BuyRequest buyreq) {
try {
//为了尽量确保数据的一致性,处理之前先从redis中获取当前抢购商品的剩余数量
int residue = Integer.valueOf(jedis.get("residue" + buyreq.getGood_id()));
if (residue < 1) {//如果没有剩余商品,就直接返回
buyreq.setResponse_status(3);
return;
}
//如果有剩余商品,先在redis中将剩余数量减一,再开始下订单
jedis.decr("residue" + buyreq.getGood_id());
//将数据库中将剩余数量减一,这一步处理可以在队列处理完成之后一次性更新剩余数量
dealQueueThread.buyGoodService.minusResidue(buyreq.getGood_id());
//处理请求,下订单
BuyOrders bo = new BuyOrders();
bo.setGood_id(buyreq.getGood_id());
bo.setUser_id(buyreq.getUser_id());
int order_id = dealQueueThread.buyOrdersService.insert(bo);
BuyOrders orders = dealQueueThread.buyOrdersService.getById(order_id);
buyreq.setOrder_id(order_id);//订单id
buyreq.setBuyOrders(orders);//订单信息
buyreq.setResponse_status(1);//处理完成状态
} catch (Exception e) {
buyreq.setResponse_status(2);//异常状态
BaseLog.error("DealQueueThread dealWithQueue:", e);
}
}
}
经过测试在并发量超过五百的时候会出现超发现象,程序还有待完善,欢迎大家给出自己的见解,谢谢!