上一篇<<并发扣减库存方案一>>中使用了基于CAS和幂等思想,
使用当前值和期望值比较以及版本号变更比较来完成并发场景下
控制库存不被少扣或者扣减成负值,
此篇幅使用另外一个思路解决并发修改库存的问题,
利用redis单线程阻塞操作特性,以及redis执行lua脚本原子性,
来完成控制并发场景下库存扣减问题;首先简单
介绍一下此方案用到的一些redis和lua的特性:
1.redis阻塞操作
代码语言:javascript复制鉴于redis自身独有特性,我们知道redis执行命令是阻塞操作
(单进程单线程),举例说明:A服务向redis发送了两条指令,
redis.call('get',age)和redis.call('incr',age),
那么redis-server在接收到命令的时候会按照顺序先后执行;
A和B服务分别向redis-server发送了一条指令
redsi.call('get',age),那么redis-server也会按照
接收到指令的顺序阻塞执行
2.lua原子特性
代码语言:javascript复制redis2.6版本以后,内置了lua脚本的运行环境,默认用
eval命令执行;我之所以用到redis执行lua脚本来
批量执行redis指令操作,最大原因是lua脚本被redis
执行的时候,能够
保证原子特性(也就是中途不会存在类似java多线程下
的cpu时间被剥夺导致挂起),这样的话就能保证数据
变更的可靠性和一致性.
背景
代码语言:javascript复制常用的扣减操作是service层操作数据库执行update
Stock set stock = stock - ? where id = ?
存在问题
代码语言:javascript复制单线程一切都运行正常,但是多线程情况下出现数据
不一致问题,两个线程在同一个stock基础上进行
不同的扣减,导致后者覆盖前者
案例分析
代码语言:javascript复制两个线程A和B同时查到库存为5,A执行操作update
Stock set stock = stock - 2 where id = 1,
B执行操作update Stock set stock = stock-3
where id = 1,
而操作的结果可能是2或者3,而不是我们期望的0;
因为A B两个线程查询的时候stock=5,都是在此
数字上扣减,导致其中一个结果被覆盖,拿到
错误的扣减结果,还有一种情况是,设计往往有容错
机制,例如“重试”,如果通过扣减接口来修改库存,
在重试时,可能会得到错误的数据,导致重复扣减;
重试导致错误的根本原因,是因为“扣减”操作是一个
非幂等的操作,不能够重复执行,改成设置操作则
不会有这个问题
解决方案
代码语言:javascript复制把库存放到redis缓存中,在多线程场景下,让各个服务
去检查和修改缓存中
的库存量,如果redis中修改成功,
再把数据变更落地,把库存扣减持久化到数据库层;
根据上述redis和lua特性,
每次请求调用过来扣减库存,
都是去使用redis执行lua脚本去做原子操作检查和
修改缓存中的库存;具体看代码分析.
以下是代码和并发测试
1.数据库表依赖上一篇
2.编写接口及实现
12345678910111213141516171819202122232425262728293031323334353637 | @Overridepublic StockResp deductStockV5(StockReq req) { StockResp resp = new StockResp(); long stockId = req.getId(); Jedis jedis = null; long result = 0; Calendar cal = Calendar.getInstance(); try { //查询实际库存 Stock stock = this.stockDao.queryByPrimaryKey(req.getId()); if(null == stock) { return resp.error("不存在"); } //获取redis连接 jedis = jedisPool.getResource(); List<String> keys = new ArrayList<>(1); keys.add(CACHE_STOCK_WAITING_FOR_DEDUCT_PREFIX stockId); List<String> args = new ArrayList<>(1); args.add(req.getNeedDeduct() ""); args.add(stock.getStock() ""); String deductLuaScript = **buildLuaDeductScript()**; //执行lua脚本 result = Long.parseLong(jedis.eval(deductLuaScript, keys, args) ""); if(result < 0) {//库存不足 return resp.error("库存不足"); } stock.setStock((int)result); stock.setUpdateTime(cal.getTime()); this.stockDao.update(stock); } catch (Exception e) { //获取连接失败,或者其他异常,抛出去,用来终止程序 throw new ServiceRuntimeException("异常",e); } finally { IOUtils.closeQuietly(jedis); } return resp;} |
---|
以下是lua脚本
1234567891011121314151617181920 | private String buildLuaDeductScript() { StringBuilder lua = new StringBuilder(); lua.append("nlocal c"); //KEYS[1]是keys的第一个参数, //ARGV[1]是args的第一个额参数 //setnx是如果是不存在该key时才会set成功 lua.append("nredis.call('setnx',KEYS[1],ARGV[1])"); //获取缓存中库存 lua.append("nc = redis.call('get',KEYS[1])"); //判断缓存库存是否小于需要扣减的数量 lua.append("nif tonumber(c) < tonumber(ARGV[2]) then"); //c=-1表示扣减失败 lua.append("nc = -1"); lua.append("nelse"); //库存足额情况下,把库存扣减 lua.append("nc = redis.call('decrby',KEYS[1],ARGV[1])"); lua.append("nend"); lua.append("nreturn c;"); return lua.toString();} |
---|
3.单元测试
代码语言:javascript复制单个线程操作
12345678910111213141516171819202122232425262728293031323334 | @Testpublic void testDeductStockConcurrentV5() { int threadNum = 20; StockReq req = new StockReq(); req.setId(2L); req.setNeedDeduct(3); BlockingQueue<Future<StockResp>> queue = new ArrayBlockingQueue<>(threadNum); Future<StockResp> stockTask = null; for (int i = 0; i < threadNum; i ) { stockTask = executors.submit(new DeductStockV5Task(req, stockService)); try { queue.put(stockTask); } catch (InterruptedException e) { LOGGER.error("", e); } } List<StockResp> list = new ArrayList<>(queue.size()); while (!queue.isEmpty()) { try { list.add(queue.take().get(10, TimeUnit.SECONDS)); } catch (Exception e) { LOGGER.error("", e); } } if (!list.isEmpty()) { for (int i = 0, size = list.size(); i < size; i ) { LOGGER.info("响应:{}", JSON.toJSONString(list.get(i))); } } // this.stockService.deductStock(req);} |
---|
并发操作
1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253 | /*** 测试并发扣减库存操作线程V5** @author Typhoon* @date 2017-07-12 16:44 Tuesday* @since V1.3.0*/class DeductStockV5Task implements Callable<StockResp> { private StockReq req; private StockService stockService; public DeductStockV5Task(StockReq req, StockService stockService) { super(); this.req = req; this.stockService = stockService; } public StockReq getReq() { return req; } public void setReq(StockReq req) { this.req = req; } public StockService getStockService() { return stockService; } public void setStockService(StockService stockService) { this.stockService = stockService; } @Override public StockResp call() throws Exception { LOGGER.info("当前线程:{}", Thread.currentThread().getName()); try { Thread.sleep(1 * 1000);// 休眠3秒 StockResp resp = this.stockService.deductStockV5(req); LOGGER.info("当前线程:{},扣减库存响应:{}", Thread.currentThread().getName(), JSON.toJSONString(resp)); return resp; } catch (Exception e) { LOGGER.error("扣减操作异常", e); return null; } }} |
---|
用20个线程跑了多次,状态良好,
欢迎各位大佬指点和拍砖
......