Redis秒杀代码(逐步测试修改)


因为主要体现秒杀的业务,所以这里没融合jwt或者Session的登录校验,故id是直接显示传入的

秒杀代码.Ver1 (单例)

本次秒杀没有通过redis,在单例的情况下,直接通过查询数据库来实现防止超卖和一人一单,最简单,最暴力

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
public Result seckillItem(SecKillDTO secKillDTO) {
Long itemId = secKillDTO.getItemId();
Long userId = secKillDTO.getUserId();
//1.获取对应的秒杀对象
SeckillItem item = seckillItemService.getById(itemId);
LocalDateTime now = LocalDateTime.now();
//2.判断是否在秒杀时间内
//2.1 不在秒杀时间内
if(now.isBefore(item.getBeginTime())||now.isAfter(item.getEndTime())){
return Result.fail(400,"当前不在秒杀时间段内!");
}

// --> 从这里开始需要进行事务操作
synchronized (userId.toString().intern()){
//3.判断是否下过单
Long count = this.query().eq("user_id", userId).eq("item_id", itemId).count();
if(count>0){
return Result.fail(400,"您已经抢购过了,请勿重复下单!");
}
//4.判断库存是否充足
//找到对应秒杀商品,校验库存并扣减
boolean success = seckillItemService.update()
.setSql("stock = stock - 1")
.eq("item_id",itemId)
.gt("stock",0)
.update();
if(!success){
return Result.fail(400,"库存不足");
}
itemService.update().
setSql("stock = stock - 1")
.eq("id",itemId)
.update();

//5.创建订单
ItemOrder itemOrder = new ItemOrder();
itemOrder.setUserId(userId);
itemOrder.setItemId(itemId);
this.save(itemOrder);

return Result.success(itemOrder);
}
}

现在开100个进程,商品库存设置为100,每个进程循环2次,相当于200个人抢100个商品,未出现异常,响应时间均值144ms

测试结果

image-20240221121052378


秒杀代码.Ver2 (单例)

第二版代码加上了Redis; 通过Redis中的Hash数据结构来存储被秒杀的商品信息, 使用set来存存储购买商品了的用户的信息;

秒杀代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
//秒杀ver2(单例 + redis优化)
@Override
public Result seckillItem(SecKillDTO secKillDTO) {
Long itemId = secKillDTO.getItemId();
Long userId = secKillDTO.getUserId();
Object o = stringRedisTemplate.opsForHash().get("itemkill:stock:" + itemId, "stock");
if(o==null){
return Result.fail(404,"当前秒杀商品不存在");
}
//1.获取对应的秒杀对象
LocalDateTime beginTime = LocalDateTime.parse(
stringRedisTemplate.opsForHash()
.get("itemkill:stock:" + itemId, "beginTime")
.toString(),
DateTimeFormatter.ISO_LOCAL_DATE_TIME
);
LocalDateTime endTime = LocalDateTime.parse(
stringRedisTemplate.opsForHash()
.get("itemkill:stock:" + itemId, "endTime")
.toString(),
DateTimeFormatter.ISO_LOCAL_DATE_TIME
);

LocalDateTime now = LocalDateTime.now();
//2.判断是否在秒杀时间内
//2.1 不在秒杀时间内
if(now.isBefore(beginTime)||now.isAfter(endTime)){
return Result.fail(400,"当前不在秒杀时间段内!");
}
return this.runOrder(userId,itemId);
}

@Transactional
Result runOrder(Long userId, Long itemId){
// --> 从这里开始需要进行事务操作
synchronized (userId.toString().intern()){
//3.判断是否下过单
Boolean isOrdered = stringRedisTemplate.opsForSet().isMember("itemkill:order:" + itemId, userId.toString());
if(Boolean.TRUE.equals(isOrdered)){
return Result.fail(400,"您已经抢购过了,请勿重复下单!");
}
//4.判断库存是否充足
//获取库存数据
Integer stock = Integer.parseInt(stringRedisTemplate.opsForHash().get("itemkill:stock:" + itemId, "stock").toString());
//找到对应秒杀商品,校验库存并扣减
if(stock<0){
return Result.fail(400,"当前商品库存不足!");
}
stock -= 1;
stringRedisTemplate.opsForHash().put("itemkill:stock:" + itemId,"stock",stock.toString());

seckillItemService.update()
.setSql("stock = stock - 1")
.eq("item_id",itemId)
.update();
itemService.update()
.setSql("stock = stock - 1")
.eq("id",itemId)
.update();

//5.创建订单
ItemOrder itemOrder = new ItemOrder();
itemOrder.setUserId(userId);
itemOrder.setItemId(itemId);
this.save(itemOrder);

stringRedisTemplate.opsForSet().add("itemkill:order:" + itemId, userId.toString());

return Result.success(itemOrder);
}
}

添加秒杀商品代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
@Override
@Transactional // 整个过程需要作为一个事务
public Result<Void> add(AddSeckillItemDTO dto) {
// 先同步到数据库
Long itemId = dto.getItemId();
LocalDateTime beginTime = dto.getBeginTime();
LocalDateTime endTime = dto.getEndTime();
SeckillItem seckillItem = new SeckillItem();
Item item = itemService.getById(itemId);
if(item == null){
return Result.fail(404,"不存在该商品");
}
seckillItem.setItemId(item.getId());
seckillItem.setStock(item.getStock());
seckillItem.setBeginTime(beginTime);
seckillItem.setEndTime(endTime);
this.saveOrUpdate(seckillItem);

// 再将优惠券信息同步到Redis中作为热点数据存储
Map<String, String> properties = new HashMap<>();
properties.put("stock", seckillItem.getStock().toString());
properties.put("beginTime", seckillItem.getBeginTime().toString());
properties.put("endTime", seckillItem.getEndTime().toString());
stringRedisTemplate.opsForHash().putAll("itemkill:stock:"+seckillItem.getItemId().toString(), properties);

return Result.success();
}

虽然但是,本来打算这一版做了再写有分布式锁的集群的版本,但是在测试时出现了超卖的问题,进过排查,我这个多线程的测试方法就是需要加分布式锁,不然会超卖,所以这一版到此为止,在第三版中会加入分布式锁Redisson来解决多线程操作Redis导致的超卖问题

具体来说,在下面这段代码中,多线程状态下,会导致不同时间下各个线程分别获取的stock数据被自减后反复写回Redis,最终造成超卖,然后我采用的数据结构又没法采用Redis的原子操作decr()来解决这个问题,所以需要添加分布式锁

1
2
3
4
5
6
7
8
//获取库存数据
Integer stock = Integer.parseInt(stringRedisTemplate.opsForHash().get("itemkill:stock:" + itemId, "stock").toString());
//找到对应秒杀商品,校验库存并扣减
if(stock<0){
return Result.fail(400,"当前商品库存不足!");
}
stock -= 1;
stringRedisTemplate.opsForHash().put("itemkill:stock:" + itemId,"stock",stock.toString());

image-20240221224423364

(延迟够低吧? 超卖换的(笑))

秒杀代码.Ver3 (Redisson)

Redisson作为第一个较为成熟的Redisson分布式锁, 给多线程下的Redis操作加个锁简直是小菜一碟

  1. 导入依赖

    1
    2
    3
    4
    5
    6
    <!--redisson-->
    <dependency>
    <groupId>org.redisson</groupId>
    <artifactId>redisson</artifactId>
    <version>3.13.6</version>
    </dependency>
  2. 添加配置类RedissonConfig

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    @Configuration
    public class RedissonConfig {

    @Bean
    public RedissonClient redisClient(){
    Config config = new Config();
    config.useSingleServer().setAddress("redis://localhost:6379");
    return Redisson.create(config);
    }


    }
  3. 给ver2的代码加锁

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    55
    56
    57
    58
    59
    60
    61
    62
    63
    64
    65
    66
    67
    68
    69
    70
    71
    72
    73
    74
    75
    76
    77
    78
    //秒杀ver3(Redisson)
    @Override
    public Result seckillItem(SecKillDTO secKillDTO) {
    Long itemId = secKillDTO.getItemId();
    Long userId = secKillDTO.getUserId();
    Object o = stringRedisTemplate.opsForHash().get("itemkill:stock:" + itemId, "stock");
    if(o==null){
    return Result.fail(404,"当前秒杀商品不存在");
    }
    //1.获取对应的秒杀对象
    LocalDateTime beginTime = LocalDateTime.parse(
    stringRedisTemplate.opsForHash()
    .get("itemkill:stock:" + itemId, "beginTime")
    .toString(),
    DateTimeFormatter.ISO_LOCAL_DATE_TIME
    );
    LocalDateTime endTime = LocalDateTime.parse(
    stringRedisTemplate.opsForHash()
    .get("itemkill:stock:" + itemId, "endTime")
    .toString(),
    DateTimeFormatter.ISO_LOCAL_DATE_TIME
    );

    LocalDateTime now = LocalDateTime.now();
    //2.判断是否在秒杀时间内
    //2.1 不在秒杀时间内
    if(now.isBefore(beginTime)||now.isAfter(endTime)){
    return Result.fail(400,"当前不在秒杀时间段内!");
    }
    return this.runOrder(userId,itemId);
    }

    @Transactional
    Result runOrder(Long userId, Long itemId){
    // --> 从这里开始需要进行事务操作
    RedissonClient redissonClient = redissonConfig.redisClient();
    RLock seckillLock = redissonClient.getLock("seckill_lock");
    try {
    seckillLock.lock();
    synchronized (userId.toString().intern()) {
    //3.判断是否下过单
    Boolean isOrdered = stringRedisTemplate.opsForSet().isMember("itemkill:order:" + itemId, userId.toString());
    if (Boolean.TRUE.equals(isOrdered)) {
    return Result.fail(400, "您已经抢购过了,请勿重复下单!");
    }
    //4.判断库存是否充足
    //获取库存数据
    Integer stock = Integer.parseInt(stringRedisTemplate.opsForHash().get("itemkill:stock:" + itemId, "stock").toString());
    //找到对应秒杀商品,校验库存并扣减
    if (stock < 1) {
    return Result.fail(400, "当前商品库存不足!");
    }
    stock -= 1;
    stringRedisTemplate.opsForHash().put("itemkill:stock:" + itemId, "stock", stock.toString());

    seckillItemService.update()
    .setSql("stock = stock - 1")
    .eq("item_id", itemId)
    .update();
    itemService.update()
    .setSql("stock = stock - 1")
    .eq("id", itemId)
    .update();

    //5.创建订单
    ItemOrder itemOrder = new ItemOrder();
    itemOrder.setUserId(userId);
    itemOrder.setItemId(itemId);
    this.save(itemOrder);

    stringRedisTemplate.opsForSet().add("itemkill:order:" + itemId, userId.toString());

    return Result.success(itemOrder);
    }
    } finally{
    seckillLock.unlock();
    }
    }

    这样好像不太对,延迟非常恐怖,甚至比直接访问数据库还要夸张; 直接修改相当于加了两层锁, Redis一层,进了Redis又有一层UserId的锁

    image-20240225205823890

  1. 再修改

    • 首先对锁进行改造

      重新对之前的代码进行分析,发现我们做的有点画蛇添足了; 还记得我们Ver2中提出了什么问题吗? 我们使用Redisson主要是为了解决stock写回时的冲突问题;

      查阅文档,发现Redisson为我们提供了一系列支持分布式的数据结构,其中使用RMap来接管Redis中的Hash数据结构; 对于RMap的put等操作请求会直接同步到Redis中,不用再使用Template来进行上传;

      进过上面的分析,我们发现,这里只需要在对RMap进行操作来判断并更新库存的时候需要加上Redisson的锁,所以只需要加一部分的锁就行,同时恢复之前的对userId加的锁

      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      12
      13
      14
      15
      16
      17
      18
      19
      20
      21
      22
      23
      24
      25
      26
      27
      28
      29
      30
      31
      32
      33
      34
      35
      36
      37
      38
      39
      40
      41
      42
      43
      44
      45
      46
      47
      48
      @Transactional
      Result runOrder(Long userId, Long itemId){
      // --> 从这里开始需要进行事务操作
      synchronized (userId.toString().intern()) {
      RedissonClient redissonClient = redissonConfig.redisClient();
      //3.判断是否下过单
      boolean isOrdered = redissonClient.getSet("itemkill:order:" + itemId).contains(userId.toString());
      //Boolean isOrdered = stringRedisTemplate.opsForSet().isMember("itemkill:order:" + itemId, userId.toString());
      if (Boolean.TRUE.equals(isOrdered)) {
      return Result.fail(400, "您已经抢购过了,请勿重复下单!");
      }
      //4.判断库存是否充足
      //获取库存数据
      RLock stockLock = redissonClient.getLock("redisson_stock_lock");
      try{
      stockLock.lock();
      RMap<String, String> rmap = redissonClient.getMap("itemkill:stock:" + itemId);
      log.warn(rmap.toString());
      int stock = Integer.parseInt(rmap.get("stock"));
      //Integer stock = Integer.parseInt(redissonClient.getMap("itemkill:stock:" + itemId).get("stock").toString());
      //Integer stock = Integer.parseInt(stringRedisTemplate.opsForHash().get("itemkill:stock:" + itemId, "stock").toString());
      //找到对应秒杀商品,校验库存并扣减
      if (stock < 1) {
      return Result.fail(400, "当前商品库存不足!");
      }
      rmap.fastPut("stock",String.valueOf(stock - 1 ));
      //stock -= 1;
      //stringRedisTemplate.opsForHash().put("itemkill:stock:" + itemId, "stock", stock.toString());

      }finally {
      stockLock.unlock();
      }
      seckillItemService.update()
      .setSql("stock = stock - 1")
      .eq("item_id", itemId)
      .update();
      itemService.update()
      .setSql("stock = stock - 1")
      .eq("id", itemId)
      .update();
      //5.创建订单
      ItemOrder itemOrder = new ItemOrder();
      itemOrder.setUserId(userId);
      itemOrder.setItemId(itemId);
      this.save(itemOrder);
      stringRedisTemplate.opsForSet().add("itemkill:order:" + itemId, userId.toString());
      return Result.success(itemOrder);
      }
    • 发现了一个问题

      在以前添加抢购商品的代码中,我直接使用的是StringRedisTemplate中的Hash进行插入;然后在抢购代码中进行拉取RMap的操作时会报错java.io.IOException: Unsupported protocol version 98

      这里我推测是RedisTemplate中的写操作的解析器和Redisson的不同; 所以解决方法也很粗暴,直接把涉及存储和读取的地方都替换成使用Redisson中的方法即可

      1
      2
      3
      4
      5
      6
      7
      Map<String, String> properties = new HashMap<>();
      properties.put("stock", seckillItem.getStock().toString());
      properties.put("beginTime", seckillItem.getBeginTime().toString());
      properties.put("endTime", seckillItem.getEndTime().toString());
      RedissonClient redissonClient = redissonConfig.redisClient();
      RMap<String, String> map = redissonClient.getMap("itemkill:stock:"+seckillItem.getItemId().toString());
      map.putAll(properties);

效果拔群

image-20240225214325737

再看看代码,感觉还是有很大的优化空间; 比如说:

  • 对于数据库的同步太频繁,每次成功秒杀还是要修改数据库的库存;

    是否可以定时或者定量地进行异步的同步呢?

  • 还可以添加消息队列MQ来进行流量的平衡

    仔细想想,来的早的请求没有不给他秒杀资格的道理,使用MQ还可以将资格判断与库存的扣减解耦,进一步地增加了数据的安全


秒杀代码.Ver4 (RabbitMQ)

这里添加RabbitMQ对秒杀业务进行解耦操作; 当秒杀请求打进来后先判断其是否有抢购资格,然后把它丢到消息队列,然后异步地交给负责下单的方法进行判断并下单; 实现起来并不复杂

  1. 引入RabbitMQ依赖

    1
    2
    3
    4
    5
    <--MQ-->
    <dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
    </dependency>
  2. 配置RabbitMQConfig

    1
    2
    3
    4
    5
    6
    7
    8
    9
    @Configuration
    public class RabbitMQConfig {
    @Bean
    public Queue Queue(){
    return QueueBuilder
    .durable("item_seckill")
    .build();
    }
    }
  3. 修改秒杀代码

    总体思路是把对数据库的操作与对RabbitMQ的查询分离,然后在验证请求的秒杀资格成功后将对数据库的操作丢入消息队列

    需要注意锁的范围的修改

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    55
    56
    57
    58
    59
    60
    61
    62
    63
    64
    65
    66
    67
    68
    69
    70
    71
    72
    73
    74
    75
    76
    77
    78
    79
    80
    81
    82
    83
    84
    85
    86
    87
    88
        //秒杀ver4(RabbitMQ 解耦)
    @Override
    @Transactional
    public Result seckillItem(SecKillDTO secKillDTO) {
    Long itemId = secKillDTO.getItemId();
    Long userId = secKillDTO.getUserId();
    RedissonClient redissonClient = redissonConfig.redisClient();
    RMap<String, String> rMap = redissonClient.getMap("itemkill:stock:" + itemId);
    //Object o = stringRedisTemplate.opsForHash().get("itemkill:stock:" + itemId, "stock");
    if(rMap==null||rMap.isEmpty()){
    return Result.fail(404,"当前秒杀商品不存在");
    }
    //1.获取对应的秒杀对象
    LocalDateTime beginTime = LocalDateTime.parse(
    // stringRedisTemplate.opsForHash()
    // .get("itemkill:stock:" + itemId, "beginTime")
    // .toString(),
    rMap.get("beginTime"),
    DateTimeFormatter.ISO_LOCAL_DATE_TIME
    );
    LocalDateTime endTime = LocalDateTime.parse(
    // stringRedisTemplate.opsForHash()
    // .get("itemkill:stock:" + itemId, "endTime")
    // .toString(),
    rMap.get("endTime"),
    DateTimeFormatter.ISO_LOCAL_DATE_TIME
    );

    LocalDateTime now = LocalDateTime.now();
    //2.判断是否在秒杀时间内
    //2.1 不在秒杀时间内
    if(now.isBefore(beginTime)||now.isAfter(endTime)){
    return Result.fail(400,"当前不在秒杀时间段内!");
    }

    synchronized (userId.toString().intern()) {
    //3.判断是否下过单
    boolean isOrdered = redissonClient.getSet("itemkill:order:" + itemId).contains(userId.toString());
    //Boolean isOrdered = stringRedisTemplate.opsForSet().isMember("itemkill:order:" + itemId, userId.toString());
    if (Boolean.TRUE.equals(isOrdered)) {
    return Result.fail(400, "您已经抢购过了,请勿重复下单!");
    }
    //4.判断库存是否充足
    //获取库存数据
    RLock stockLock = redissonClient.getLock("redisson_stock_lock");
    try {
    stockLock.lock();
    RMap<String, String> rmap = redissonClient.getMap("itemkill:stock:" + itemId);
    log.warn(rmap.toString());
    int stock = Integer.parseInt(rmap.get("stock"));
    //Integer stock = Integer.parseInt(redissonClient.getMap("itemkill:stock:" + itemId).get("stock").toString());
    //Integer stock = Integer.parseInt(stringRedisTemplate.opsForHash().get("itemkill:stock:" + itemId, "stock").toString());
    //找到对应秒杀商品,校验库存并扣减
    if (stock < 1) {
    return Result.fail(400, "当前商品库存不足!");
    }
    rmap.fastPut("stock", String.valueOf(stock - 1));
    //stock -= 1;
    //stringRedisTemplate.opsForHash().put("itemkill:stock:" + itemId, "stock", stock.toString());

    } finally {
    stockLock.unlock();
    }
    }

    ItemOrder itemOrder = new ItemOrder();
    itemOrder.setUserId(userId);
    itemOrder.setItemId(itemId);
    stringRedisTemplate.opsForSet().add("itemkill:order:" + itemId, userId.toString());
    amqpTemplate.convertAndSend("item_seckill",itemOrder);
    return Result.success(itemOrder);
    }

    @Transactional
    @Override
    public void runOrder(ItemOrder itemOrder){
    Long itemId = itemOrder.getItemId();
    seckillItemService.update()
    .setSql("stock = stock - 1")
    .eq("item_id", itemId)
    .update();
    itemService.update()
    .setSql("stock = stock - 1")
    .eq("id", itemId)
    .update();
    //5.创建订单
    this.save(itemOrder);
    }
  1. 写MQ的Listener的逻辑

    直接调用刚才上面写好的操作数据库的方法即可

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    @RabbitListener(queues = "item_seckill")
    @Component
    public class itemSeckillListener {

    @Resource
    ItemOrderService itemOrderService;

    @RabbitHandler
    private void doOrder(ItemOrder itemOrder){
    itemOrderService.runOrder(itemOrder);
    }
    }

好吧,加了一个MQ之后又慢了不少; 可能效率和安全性没法完全兼顾

image-20240226142330278


应该还有很大的优化空间,后续找到更好的方法会更新

  • 上面的代码部分还是使用的RedisTemple, 因为没啥大的印象,暂时还没有替换更新