Redis秒杀

全局唯一ID

按照我们平常做的自增的id来做的数据库,存在许多问题

  • id规律性过于明显,可以对其进行破解;比如可以通过id的变化来判断商城的订单,通过id来爬取数据等
  • 会受到单表数据量的限制

全局ID生成器

现在我们就需要生成一个全局唯一的ID,这就需要全局ID生成器,其需要具有唯一性、高性能、高可用、递增性和安全性

为了增加ID的安全性,我们不可以直接使用Redis自增的数值,而是要拼接一些其他信息。(其实就是雪花算法)

image-20240206181418686

初版秒杀代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
@Override
@Transactional
public Result seckillVoucher(Long voucherId) {
// 1.查询优惠券
SeckillVoucher voucher = seckillVoucherService.getById(voucherId);
// 2.判断秒杀是否开始
if (voucher.getBeginTime().isAfter(LocalDateTime.now())) {
return Result.fail("秒杀尚未开始!");
}
// 3.判断秒杀是否结束
if (voucher.getEndTime().isBefore(LocalDateTime.now())) {
return Result.fail("秒杀已结束!");
}
// 4.判断库存是否充足
if(voucher.getStock()<1){
return Result.fail("当前优惠券已被抢光!");
}
// 5.扣减库存
boolean success = seckillVoucherService.update()
.setSql("stock = stock - 1")
.eq("voucher_id", voucherId)
.update();
if(!success){
return Result.fail("库存不足!");
}
// 6.创建订单
VoucherOrder voucherOrder = new VoucherOrder();
Long orderId = redisIdWorker.nextId("order");
voucherOrder.setId(orderId);
voucherOrder.setUserId(UserHolder.getUser().getId());
voucherOrder.setVoucherId(voucherId);
this.save(voucherOrder);
return Result.ok(orderId);
}

这样库存出现了超卖现象

image-20240207023744388

显然这种情况是不可接受的

在上面的代码中,我们先对他进行了查询判断,最后对数据库进行减操作;若在判断完成后,操作数据库前这一区间内存在多个线程,就可能造成超卖

image-20240207024049064

利用锁来解决超卖

这里就可以使用加锁的办法来防止超卖,所以这里用悲观锁还是乐观锁

  • 悲观锁假设线程安全问题一定会发生,所以在操作数据前会获取锁,确保线程串行(这样跟单线程还有区别吗。。。)

  • 乐观锁认为线程安全问题不一定发生,所以不会无脑加锁;但是它会在更新数据时去判断有没有其他线程对数据进行修改(若未修改,其认为安全,就更新数据;若已修改,则认为可能出现了问题,就会进行重试或异常操作)

乐观锁的判断方法

  • 基于版本号:每次修改都会改变版本号,每个线程拿自己手上的版本号来比较,如果版本号不一致就不会进行操作

    image-20240207024818818

    在本例子中,版本号可以和库存数进行简化嘛,大家下单就是为了争抢库存,所以直接拿库存数当版本号用即可,这样就是CAS法(Compare and Swap)

    这里扩展ABA问题,因为在其他场景下的不正确的CAS使用可能造成ABA问题;

    简单来说,由于对版本的判断依赖于需要改变的值,这时就可能出现 进程1修改为A,进程2修改为B,进程3修改回A,对于又来进程4,你能说“版本”没有改变吗?

    举个恶心的例子,沙漠中有一潭水,张三喝了一口,李四往里面撒了泡尿,喝的和尿的一样多,这样虽然水量没变,但你能说潭水没有变化吗,那新来的王五还该不该喝呢?(非常的新鲜,非常的美味,请全部端上来吧!)

    提一嘴,上面的秒杀场景,由于这里的库存是递减的,所以不会产生ABA问题

我们按照上面的逻辑对秒杀代码进行修改,其实仅仅需要在查询数据库时加上对stock的判断即可

1
2
3
4
5
// 5.扣减库存
boolean success = seckillVoucherService.update()
.setSql("stock = stock - 1")
.eq("voucher_id", voucherId).eq("stock",voucher.getStock())
.update();

但是这样又产生了问题

image-20240208015633891

在经过了200个线程的秒杀后,异常率竟然达到接近90,查看数据库发现优惠券的库存甚至还没被卖完。这是什么情况?

这样生硬的使用乐观锁会出现一个问题,若你是前一百个到达服务器的请求,你不一定会得到你应得到的优惠券,因为你的“版本号”可能会因为收到其他线程操作的影响而失效,这样就会直接让你本应得到的优惠券落入别人的手里,而你只能重新提交请求或者灰溜溜的放弃。

这样的失败率也太高了!我们要进行修改,对于这里库存的情况,对于每个线程,我们只需要保证它在扣减库存时的库存>1即可

突然想到,会不会有多个语句同时进行gt的判断,导致超卖?

其实mybatis-plus会把下面的语句解析成mysql语句,显然会解析成一条update语句,而update语句又有排它锁,所以不存在两个事务同时修改数据的情况

1
2
3
4
5
// 5.扣减库存
boolean success = seckillVoucherService.update()
.setSql("stock = stock - 1")
.eq("voucher_id", voucherId).gt("stock",0)
.update();

这下异常率达到50%了。当当!这样就轻松的解决秒杀问题了。。。吗?

仔细想想,这样本质上是借助了mysql的锁;但如果是在真实的业务中,让这么多的请求直接打到数据库,这样好吗。。。

单例下的一人一单

大额的秒杀券要修改成一人只能下一单

好像很简单,直接通过优惠券id和优用户id来查询优惠券订单数据库中有没有对应订单即可

1
2
3
4
5
6
// 6.一人一单的判断
Long userId = UserHolder.getUser().getId();
Integer count = this.query().eq("user_id", userId).eq("voucher_id", voucherId).count();
if(count>0){
return Result.fail("已达到购买上限!");
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
@Transactional
public Result createVoucherOrder(Long voucherId) {
// 5.一人一单的判断
Long userId = UserHolder.getUser().getId();

synchronized (userId.toString().intern()){
Integer count = this.query().eq("user_id", userId).eq("voucher_id", voucherId).count();
if(count>0){
return Result.fail("已达到购买上限!");
}

// 6.扣减库存
boolean success = seckillVoucherService.update()
.setSql("stock = stock - 1")
.eq("voucher_id", voucherId).gt("stock",0)
.update();
if(!success){
return Result.fail("库存不足!");
}

// 7.创建订单
VoucherOrder voucherOrder = new VoucherOrder();
Long orderId = redisIdWorker.nextId("order");
voucherOrder.setId(orderId);
voucherOrder.setUserId(UserHolder.getUser().getId());
voucherOrder.setVoucherId(voucherId);
this.save(voucherOrder);
return Result.ok(orderId);
}
}

由于需要先释放锁,再提交事务;如果按照上面的加锁的方法,当这个锁内的代码执行完毕后,其他线程也可以重新获得锁,然而这时候提交上去的事务可能未被执行,这样就达不到一人一单的效果了,所以需要将锁加到调用该函数的位置

1
2
3
4
5
6
7
Long userId = UserHolder.getUser().getId();
synchronized (userId.toString().intern()) {
IVoucherOrderService proxy = (IVoucherOrderService)AopContext.currentProxy();
return proxy.createVoucherOrder(voucherId);
//如果直接按照下面的写法,会获取不到代理,造成事务失效
//return createVoucherOrder(voucherId);
}

这下抢成功了

image-20240216143624342

至此完成了单例情况下的一人一单并发安全问题

集群下的一人一单

我们使用nginx进行负载均衡,然后再次进行下单接口的测试,发现两个后端的服务都收到了请求,没有被锁住。在集群部署的这种情况下,仍然会产生并发安全问题。

image-20240216221955515

分布式锁

满足分布式或集群模式下多进程可见并且互斥的锁

分布式锁的种类

image-20240216224310873

基于Redis的基础 的分布式锁

  • 获取锁

    1
    2
    3
    4
    SET lock thread1 EX 10 NX 将获取锁和设置超时时间打包成事务

    // SETNX 获取锁
    // EXPIRE 设置锁的过期时间,避免服务宕机造成死锁
  • 释放锁

    1
    2
    DEL key 删除锁
    或者在过期后自动释放

利用Redis实现一个锁对象

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
public class RedisLock implements ILock{

private StringRedisTemplate stringRedisTemplate;
private String name;
public static final String KEY_PREFIX = "lock:";

public RedisLock(StringRedisTemplate stringRedisTemplate, String name) {
this.stringRedisTemplate = stringRedisTemplate;
this.name = name;
}

@Override
public boolean tryLock(long timeoutSec) {
//获取线程标示
long threadID = Thread.currentThread().getId();
Boolean success = stringRedisTemplate.opsForValue().setIfAbsent(KEY_PREFIX + name, threadID + "", timeoutSec, TimeUnit.SECONDS);
return Boolean.TRUE.equals(success);
}

@Override
public void unlock() {
stringRedisTemplate.delete(KEY_PREFIX + name);
}
}

然后在Impl里使用这个分布式的锁对象来判断

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
// 创建分布式锁对象
RedisLock redisLock = new RedisLock(stringRedisTemplate, "voucherOrder" + userId);
boolean isLock = redisLock.tryLock(10);
if(!isLock){
// 获取锁失败
return Result.fail("请求过多,请稍后再试。");
}
try{
//获取代理对象(事务)
IVoucherOrderService proxy = (IVoucherOrderService)AopContext.currentProxy();
return proxy.createVoucherOrder(voucherId);
//如果直接按照下面的写法,会获取不到代理,造成事务失效
//return createVoucherOrder(voucherId);
}finally {
//释放锁
redisLock.unlock();
}

这样就完成了基础的分布式锁

改进的分布式锁

如果正在执行的线程1出现了业务阻塞导致锁超时了被自动释放,然后线程2获取了刚刚被释放的锁,开始执行线程2的任务,这时候线程1终于完成了业务,它又释放了线程2获取的锁,这时候线程2还在执行;线程3趁虚而入,又上了个锁;这下乱套了

image-20240217112124343

这里要让每个线程只能释放自己加的锁

1
2
3
4
5
6
7
8
public void unlock() {
String threadID = ID_PREFIX + Thread.currentThread().getId();
String id = stringRedisTemplate.opsForValue().get(KEY_PREFIX + name);

if(threadID.equals(id)){
stringRedisTemplate.delete(KEY_PREFIX + name);
}
}

再次改进

没想到吧,还有问题,上次是执行业务的过程中发生了阻塞,这次是在判断锁一直并的即将释放锁的时候发生了阻塞。

所以我们需要保持 判断锁是否一致 和 释放锁 这两个操作的原子性

这里需要使用到Redis的Lua脚本

Redis的Lua脚本
1
2
3
4
if(redis.call('get',KEYS[1]) == ARGV[1]) then
return redis.call('del',KEYS[1])
end
return 0

然后在RedisLock中调用lua脚本

1
2
3
public void unlock() {
stringRedisTemplate.execute(UNLOCK_SCRIPT, Collections.singletonList(KEY_PREFIX+name),ID_PREFIX+Thread.currentThread().getId());
}

但是这样还不够完美

基于setnx的分布式锁存在以下问题

image-20240217215208608

使用redisson完成的分布式锁

  • 导入依赖

    1
    2
    3
    4
    5
    <dependency>
    <groupId>org.redisson</groupId>
    <artifactId>redisson</artifactId>
    <version>3.13.6</version>
    </dependency>
  • 创建对象

    1
    2
    3
    4
    5
    6
    7
    8
    9
    @Configuration
    public class RedisConfig {
    @Bean
    public RedissonClient redissonClient(){
    Config config = new Config();
    config.useSingleServer().setAddress("redis://localhost:6379").setPassword("123456");
    return Redisson.create(config);
    }
    }
  • 修改之前的锁代码

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
           // 创建分布式锁对象
    // RedisLock redisLock = new RedisLock(stringRedisTemplate, "voucherOrder:" + userId);
    RLock lock = redissonClient.getLock("voucherOrder:" + userId);

    // boolean isLock = redisLock.tryLock(1200);
    boolean isLock = lock.tryLock();

    if(!isLock){
    // 获取锁失败
    return Result.fail("请求过多,请稍后再试。");
    }

    try{
    //获取代理对象(事务)
    IVoucherOrderService proxy = (IVoucherOrderService)AopContext.currentProxy();
    return proxy.createVoucherOrder(voucherId);
    //如果直接按照下面的写法,会获取不到代理,造成事务失效
    //return createVoucherOrder(voucherId);
    }finally {
    //释放锁
    lock.unlock();
    }

Redisson可重入锁原理

使用哈希结构来加Redis锁,在删除锁的时候判断value的值

image-20240218001110009

image-20240218002747361

Redis优化秒杀

之前的秒杀还是要频繁地操作Mysql数据库,效率仍然较低

现在我们先把秒杀的热点数据存入Redis中备用,然后请求打进来直接先在Redis里处理;最后找一个合适的时机同步回数据库,这样就降低了Mysql数据库的访问量

image-20240218210745226

基于消息队列优化

这里跳过阻塞队列实现优化,直接使用消息队列来优化Redis异步秒杀

image-20240219120341072