之前我写了如何实现分布式锁和分布式限流, 这次我们继续在这块功能上推进, 实现一个秒杀系统, 采用 spring boot 2.x + mybatis+ Redis + swagger2 + lombok 实现.
先说说基本流程, 就是提供一个秒杀接口, 然后针对秒杀接口进行限流, 限流的方式目前我实现了两种, 上次实现的是累计计数方式, 这次还有这个功能, 并且我增加了令牌桶方式的 lua 脚本进行限流.
然后不被限流的数据进来之后, 加一把分布式锁, 获取分布式锁之后就可以对数据库进行操作了. 直接操作数据库的方式可以, 但是速度会比较慢, 咱们直接通过一个初始化接口, 将库存数据放到缓存中, 然后对缓存中的数据进行操作. 写库的操作采用异步方式, 实现的方式就是将操作好的数据放入到队列中, 然后由另一个线程对队列进行消费. 当然, 也可以将数据直接写入 mq 中, 由另一个线程进行消费, 这样也更稳妥.
好了, 看一下项目的基本结构:
看一下入口 controller 类, 入口类有两个方法, 一个是初始化订单的方法, 即秒杀开始的时候, 秒杀接口才会有效, 这个方法可以采用定时任务自动实现也可以. 初始化后就可以调用 placeOrder 的方法了. 在 placeOrder 上面有个自定义的注解 DistriLimitAnno, 这个是我在上篇文章写的, 用作限流使用. 采用的方式目前有两种, 一种是使用计数方式限流, 一种方式是令牌桶, 上次使用了计数, 咱们这次采用令牌桶方式实现.
- package com.hqs.flashsales.controller;
- import com.hqs.flashsales.annotation.DistriLimitAnno;
- import com.hqs.flashsales.aspect.LimitAspect;
- import com.hqs.flashsales.lock.DistributedLock;
- import com.hqs.flashsales.limit.DistributedLimit;
- import com.hqs.flashsales.service.OrderService;
- import lombok.extern.slf4j.Slf4j;
- import org.springframework.beans.factory.annotation.Autowired;
- import org.springframework.data.Redis.core.RedisTemplate;
- import org.springframework.data.Redis.core.script.RedisScript;
- import org.springframework.stereotype.Controller;
- import org.springframework.web.bind.annotation.GetMapping;
- import org.springframework.Web.bind.annotation.PostMapping;
- import org.springframework.Web.bind.annotation.ResponseBody;
- import javax.annotation.Resource;
- import java.util.Collections;
- /**
- * @author huangqingshi
- * @Date 2019-01-23
- */
- @Slf4j
- @Controller
- public class FlashSaleController {
- @Autowired
- OrderService orderService;
- @Autowired
- DistributedLock distributedLock;
- @Autowired
- LimitAspect limitAspect;
- // 注意 RedisTemplate 用的 String,String, 后续所有用到的 key 和 value 都是 String 的
- @Autowired
- RedisTemplate<String, String> redisTemplate;
- private static final String LOCK_PRE = "LOCK_ORDER";
- @PostMapping("/initCatalog")
- @ResponseBody
- public String initCatalog() {
- try {
- orderService.initCatalog();
- } catch (Exception e) {
- log.error("error", e);
- }
- return "init is ok";
- }
- @PostMapping("/placeOrder")
- @ResponseBody
- @DistriLimitAnno(limitKey = "limit", limit = 100, seconds = "1")
- public Long placeOrder(Long orderId) {
- Long saleOrderId = 0L;
- boolean locked = false;
- String key = LOCK_PRE + orderId;
- String uuid = String.valueOf(orderId);
- try {
- locked = distributedLock.distributedLock(key, uuid,
- "10" );
- if(locked) {
- // 直接操作数据库
- // saleOrderId = orderService.placeOrder(orderId);
- // 操作缓存 异步操作数据库
- saleOrderId = orderService.placeOrderWithQueue(orderId);
- }
- log.info("saleOrderId:{}", saleOrderId);
- } catch (Exception e) {
- log.error(e.getMessage());
- } finally {
- if(locked) {
- distributedLock.distributedUnlock(key, uuid);
- }
- }
- return saleOrderId;
- }
- }
令牌桶的方式比直接计数更加平滑, 直接计数可能会瞬间达到最高值, 令牌桶则把最高峰给削掉了, 令牌桶的基本原理就是有一个桶装着令牌, 然后又一队人排队领取令牌, 领到令牌的人就可以去做做自己想做的事情了, 没有领到令牌的人直接就走了 (也可以重新排队). 发令牌是按照一定的速度发放的, 所以这样在多人等令牌的时候, 很多人是拿不到的. 当桶里边的令牌在一定时间内领完后, 则没有令牌可领, 都直接走了. 如果过了一定的时间之后可以再次把令牌桶装满供排队的人领. 基本原理是这样的, 看一下脚本简单了解一下, 里边有一个 key 和四个参数, 第一个参数是获取一个令牌桶的时间间隔, 第二个参数是重新填装令牌的时间 (精确到毫秒), 第三个是令牌桶的数量限制, 第四个是隔多长时间重新填装令牌桶.
- -- bucket name
- local key = KEYS[1]
- -- token generate interval
- local intervalPerPermit = tonumber(ARGV[1])
- -- grant timestamp
- local refillTime = tonumber(ARGV[2])
- -- limit token count
- local limit = tonumber(ARGV[3])
- -- ratelimit time period
- local interval = tonumber(ARGV[4])
- local counter = Redis.call('hgetall', key)
- if table.getn(counter) == 0 then
- -- first check if bucket not exists, if yes, create a new one with full capacity, then grant access
- Redis.call('hmset', key, 'lastRefillTime', refillTime, 'tokensRemaining', limit - 1)
- -- expire will save memory
- Redis.call('expire', key, interval)
- return 1
- elseif table.getn(counter) == 4 then
- -- if bucket exists, first we try to refill the token bucket
- local lastRefillTime, tokensRemaining = tonumber(counter[2]), tonumber(counter[4])
- local currentTokens
- if refillTime> lastRefillTime then
- -- check if refillTime larger than lastRefillTime.
- -- if not, it means some other operation later than this call made the call first.
- -- there is no need to refill the tokens.
- local intervalSinceLast = refillTime - lastRefillTime
- if intervalSinceLast> interval then
- currentTokens = limit
- Redis.call('hset', key, 'lastRefillTime', refillTime)
- else
- local grantedTokens = math.floor(intervalSinceLast / intervalPerPermit)
- if grantedTokens> 0 then
- -- ajust lastRefillTime, we want shift left the refill time.
- local padMillis = math.fmod(intervalSinceLast, intervalPerPermit)
- Redis.call('hset', key, 'lastRefillTime', refillTime - padMillis)
- end
- currentTokens = math.min(grantedTokens + tokensRemaining, limit)
- end
- else
- -- if not, it means some other operation later than this call made the call first.
- -- there is no need to refill the tokens.
- currentTokens = tokensRemaining
- end
- assert(currentTokens>= 0)
- if currentTokens == 0 then
- -- we didn't consume any keys
- Redis.call('hset', key, 'tokensRemaining', currentTokens)
- return 0
- else
- -- we take 1 token from the bucket
- Redis.call('hset', key, 'tokensRemaining', currentTokens - 1)
- return 1
- end
- else
- error("Size of counter is" .. table.getn(counter) .. ", Should Be 0 or 4.")
- end
看一下调用令牌桶 lua 的 JAVA 代码, 也比较简单:
- public Boolean distributedRateLimit(String key, String limit, String seconds) {
- Long id = 0L;
- long intervalInMills = Long.valueOf(seconds) * 1000;
- long limitInLong = Long.valueOf(limit);
- long intervalPerPermit = intervalInMills / limitInLong;
- // Long refillTime = System.currentTimeMillis();
- // log.info("调用 redis 执行 lua 脚本, {} {} {} {} {}", "ratelimit", intervalPerPermit, refillTime,
- // limit, intervalInMills);
- try {
- id = redisTemplate.execute(rateLimitScript, Collections.singletonList(key),
- String.valueOf(intervalPerPermit), String.valueOf(System.currentTimeMillis()),
- String.valueOf(limitInLong), String.valueOf(intervalInMills));
- } catch (Exception e) {
- log.error("error", e);
- }
- if(id == 0L) {
- return false;
- } else {
- return true;
- }
- }
创建两张简单表, 一个库存表, 一个是销售订单表:
- CREATE TABLE `catalog` (
- `id` int(11) unsigned NOT NULL AUTO_INCREMENT,
- `name` varchar(50) NOT NULL DEFAULT ''COMMENT'名称',
- `total` int(11) NOT NULL COMMENT '库存',
- `sold` int(11) NOT NULL COMMENT '已售',
- `version` int(11) NULL COMMENT '乐观锁, 版本号',
- PRIMARY KEY (`id`)
- ) ENGINE=InnoDB DEFAULT CHARSET=utf8;
- CREATE TABLE `sales_order` (
- `id` int(11) unsigned NOT NULL AUTO_INCREMENT,
- `cid` int(11) NOT NULL COMMENT '库存 ID',
- `name` varchar(30) NOT NULL DEFAULT ''COMMENT'商品名称',
- `create_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '创建时间',
- PRIMARY KEY (`id`)
- ) ENGINE=InnoDB DEFAULT CHARSET=utf8;
基本已经准备完毕, 然后启动程序, 打开 swagger(http://localhost:8080/swagger-ui.html#), 执行初始化方法 initCatalog:
日志里边会输出初始化的记录内容, 初始化库存为 1000:
初始化执行的方法, 十分简单, 写到缓存中.
- @Override
- public void initCatalog() {
- Catalog catalog = new Catalog();
- catalog.setName("mac");
- catalog.setTotal(1000L);
- catalog.setSold(0L);
- catalogMapper.insertCatalog(catalog);
- log.info("catalog:{}", catalog);
- redisTemplate.opsForValue().set(CATALOG_TOTAL + catalog.getId(), catalog.getTotal().toString());
- redisTemplate.opsForValue().set(CATALOG_SOLD + catalog.getId(), catalog.getSold().toString());
- log.info("redis value:{}", redisTemplate.opsForValue().get(CATALOG_TOTAL + catalog.getId()));
- handleCatalog();
- }
我写了一个测试类, 启动 3000 个线程, 然后去进行下单请求:
- package com.hqs.flashsales;
- import lombok.extern.slf4j.Slf4j;
- import org.junit.Test;
- import org.junit.runner.RunWith;
- import org.springframework.beans.factory.annotation.Autowired;
- import org.springframework.boot.test.context.SpringBootTest;
- import org.springframework.boot.test.Web.client.TestRestTemplate;
- import org.springframework.test.context.junit4.SpringRunner;
- import org.springframework.util.LinkedMultiValueMap;
- import org.springframework.util.MultiValueMap;
- import java.util.concurrent.TimeUnit;
- @Slf4j
- @RunWith(SpringRunner.class)
- @SpringBootTest(classes = FlashsalesApplication.class, webEnvironment = SpringBootTest.WebEnvironment.RANDOM_PORT)
- public class FlashSalesApplicationTests {
- @Autowired
- private TestRestTemplate testRestTemplate;
- @Test
- public void flashsaleTest() {
- String url = "http://localhost:8080/placeOrder";
- for(int i = 0; i <3000; i++) {
- try {
- TimeUnit.MILLISECONDS.sleep(20);
- new Thread(() -> {
- MultiValueMap<String, String> params = new LinkedMultiValueMap<>();
- params.add("orderId", "1");
- Long result = testRestTemplate.postForObject(url, params, Long.class);
- if(result != 0) {
- System.out.println("-------------" + result);
- }
- }
- ).start();
- } catch (Exception e) {
- log.info("error:{}", e.getMessage());
- }
- }
- }
- @Test
- public void contextLoads() {
- }
- }
然后开始运行测试代码, 查看一下测试日志和程序日志, 均显示卖了 1000 后直接显示 SOLD OUT 了. 分别看一下日志和数据库:
商品库存 catalog 表和订单明细表 sales_order 表, 都是 1000 条, 没有问题.
总结:
通过采用分布式锁和分布式限流, 即可实现秒杀流程, 当然分布式限流也可以用到很多地方, 比如限制某些 IP 在多久时间访问接口多少次, 都可以的. 令牌桶的限流方式使得请求可以得到更加平滑的处理, 不至于瞬间把系统达到最高负载. 在这其中其实还有一个小细节, 就是 Redis 的锁, 单机情况下没有任何问题, 如果是集群的话需要注意, 一个 key 被 hash 到同一个 slot 的时候没有问题, 如果说扩容或者缩容的话, 如果 key 被 hash 到不同的 slot, 程序可能会出问题. 在写代码的过程中还出现了一个小问题, 就是写 controller 的方法的时候, 方法一定要声明成 public 的, 否则自定义的注解用不了, 其他 service 的注解直接变为空, 这个问题也是找了很久才找到.
好了代码地址: https://github.com/stonehqs/flashsales.git
欢迎拍砖~
来源: https://www.cnblogs.com/huangqingshi/p/10325574.html