redis用来做什么?除了缓存,第一个想到的就是分布式锁了,现在基本上也是面试必问八股文了。
我们知道单机系统可以用JUC提供的锁像synchronized或者Lock,但这些在分布式系统中就不起作用了。
一个靠谱的分布式锁需要满足以下条件
- 独占性,任何时刻只能有且仅有一个线程持有
- 高可用,若redis集群环境下,不能因为某一个节点挂了而出现获取锁和释放锁失败的情况
- 防死锁,杜绝死锁,必须有超时控制机制或者撤销操作,有个兜底终止跳出方案
- 不乱抢,不能私下unlock别人的锁,只能自己加锁自己释放。
- 重入性,同一个节点的同一个线程如果获得锁之后,它也可以再次获取这个锁。
经典的超卖问题
我们来看一个秒杀的案例,有一个商品有10件库存, 每抢一次减一次库存
private void buyGoods() {
//获取商品剩余库存
Integer result = (Integer) redisTemplate.opsForValue().get("goods:001");
int goodsNumber = result == null ? 0 : result;
if (goodsNumber > 0) {
//还有库存则数量减一并且存入真实库存
int realNumber = goodsNumber - 1;
redisTemplate.opsForValue().set("goods:001", realNumber);
System.out.println(Thread.currentThread().getName() + " 已经成功秒杀商品,此时还剩余:" + realNumber + "件");
} else {
System.out.println(Thread.currentThread().getName() + " 商品已经售罄,欢迎下次光临");
}
}
模拟十个人来抢
public void testOverBuy() throws Exception {
for (int i = 0; i < 10; i++) {
new Thread(() -> buyGoods()).start();
}
}
看输出,十个人抢了库存却还是9,原因是减库存和保存操作不是原子操作,需要加synchronized锁或者lock锁,但是在分布式环境下这样做就不行了,需要用redis来实现锁。
Thread-13 已经成功秒杀商品,此时还剩余:9件
Thread-12 已经成功秒杀商品,此时还剩余:9件
Thread-6 已经成功秒杀商品,此时还剩余:9件
Thread-7 已经成功秒杀商品,此时还剩余:9件
Thread-8 已经成功秒杀商品,此时还剩余:9件
Thread-5 已经成功秒杀商品,此时还剩余:9件
Thread-4 已经成功秒杀商品,此时还剩余:9件
Thread-11 已经成功秒杀商品,此时还剩余:9件
Thread-10 已经成功秒杀商品,此时还剩余:9件
Thread-9 已经成功秒杀商品,此时还剩余:9件
redis实现分布式锁
我们来看看自己如何实现分布式锁,首先要满足独占性,用redis的setnx命令,给key设置value如果返回1表示加锁成功,返回0表示锁已被占,为了防止死锁要加个过期时间,这样业务线程挂了的话锁也能释放,在finally语句块中释放锁
private void testRedisLock() {
String key = "goodsRedisKey";
//设置标识位,如果设置失败说明已被锁,加过期时间防止死锁
boolean flagLock = redisTemplate.opsForValue().setIfAbsent(key, 1, Duration.ofSeconds(30));
if (!flagLock) {
//没抢到锁直接返回
return;
}
try {
//业务逻辑
} finally {
//释放锁
redisTemplate.delete(key);
}
}
以上代码有个缺陷没有实现不乱抢,也就是别的线程可以删除你占用的锁,所以需要在标志位中设置线程id,释放锁的时候判断下是否是同一个线程,同一个线程才允许释放
private void testRedisLock() {
//生成线程标识
String value = UUID.randomUUID().toString() + Thread.currentThread().getName();
String key = "goodsRedisKey";
//设置标识位,如果设置失败说明已被锁,加过期时间防止死锁
boolean flagLock = redisTemplate.opsForValue().setIfAbsent(key, value, Duration.ofSeconds(30));
if (!flagLock) {
//没抢到锁直接失败
return;
}
try {
//业务逻辑
} finally {
// 判断加锁与解锁是不是同一个客户端,他们不是同一个原子操作
if (redisTemplate.opsForValue().get(key).equals(value)) {
redisTemplate.delete(key);
}
}
}
redisson实现分布式锁
以上代码还有很多不严谨的地方,比如没有实现锁的自动续期,删除锁的操作不是原子操作。其实工作中实现分布式锁都是直接用redisson的,讲解上面的代码主要是了解下分布式锁的实现思路。
使用redisson实现分布式锁,可以看到代码很简洁
private void testRedisLock() {
String key = "goodsRedisKey";
RLock lock = redissonClient.getLock(key);
lock.lock();
try {
//业务逻辑
} finally {
lock.unlock();
}
}
接下去我们看redisson的实现原理,首先尝试获取锁,线程一加锁成功继续执行,线程二加锁失败自旋等待。
加锁源码:
<T> RFuture<T> tryLockInnerAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) {
return evalWriteAsync(getRawName(), LongCodec.INSTANCE, command,
"if (redis.call('exists', KEYS[1]) == 0) then " +
"redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
"redis.call('pexpire', KEYS[1], ARGV[1]); " +
"return nil; " +
"end; " +
"if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
"redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
"redis.call('pexpire', KEYS[1], ARGV[1]); " +
"return nil; " +
"end; " +
"return redis.call('pttl', KEYS[1]);",
Collections.singletonList(getRawName()), unit.toMillis(leaseTime), getLockName(threadId));
}
redisson里面很多操作为了实现原子性使用了lua脚本,上面代码分为三段
第一段exists KEYS[1],KEYS[1]就是用户自定义的key,上文中的goodsRedisKey,判断这个key存不存在,不存在则加锁,并设置过期时间,默认30S,下图就是加的锁
第二段实现可重入,如果是自己加的锁再lock时value加1
第三段实现互斥,如果上面都不满足,则返回锁的过期时间
释放锁源码:
protected RFuture<Boolean> unlockInnerAsync(long threadId) {
return evalWriteAsync(getRawName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
"if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then " +
"return nil;" +
"end; " +
"local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1); " +
"if (counter > 0) then " +
"redis.call('pexpire', KEYS[1], ARGV[2]); " +
"return 0; " +
"else " +
"redis.call('del', KEYS[1]); " +
"redis.call('publish', KEYS[2], ARGV[1]); " +
"return 1; " +
"end; " +
"return nil;",
Arrays.asList(getRawName(), getChannelName()), LockPubSub.UNLOCK_MESSAGE, internalLockLeaseTime, getLockName(threadId));
}
第一段判断是否自己加的锁,不是自己加的锁或者锁已过期,则返回
第二段把锁的计数减1,如果剩余锁计数为0,则删除key,释放锁
自动续期-看门狗
上面说到,锁的过期时间默认是30S,如果被锁定的业务执行时间很长,超过30S那就会自动解锁,所以需要有个机制来自动续期,这就是看门狗。
Timeout task = commandExecutor.getConnectionManager().newTimeout(new TimerTask() {
@Override
public void run(Timeout timeout) throws Exception {
ExpirationEntry ent = EXPIRATION_RENEWAL_MAP.get(getEntryName());
if (ent == null) {
return;
}
Long threadId = ent.getFirstThreadId();
if (threadId == null) {
return;
}
RFuture<Boolean> future = renewExpirationAsync(threadId);
future.onComplete((res, e) -> {
if (e != null) {
log.error("Can't update lock " + getRawName() + " expiration", e);
EXPIRATION_RENEWAL_MAP.remove(getEntryName());
return;
}
if (res) {
// reschedule itself
renewExpiration();
} else {
cancelExpirationRenewal(null);
}
});
}
}, internalLockLeaseTime / 3, TimeUnit.MILLISECONDS);
可以看到,看门狗就是一个TimerTask,每过internalLockLeaseTime/3也就是10S续期一次,调用renewExpirationAsync来异步续期,续期成功调用renewExpiration来继续等待续期。
正确释放锁的方式
一些操作像tryLock不一定能保证拿到锁,所以释放锁的时候要判断下,否则会抛异常
try {
lock.tryLock(5, TimeUnit.SECONDS);
//业务逻辑
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
if (lock.isLocked() && lock.isHeldByCurrentThread()) {
lock.unlock();
}
}
最强王者-RedLock
redisson的分布式锁已经实现了5大条件的4个,独占性,防死锁,不乱抢,防重入,那高可用能实现么?redis的高可用就是redis cluster和哨兵模式。
第一种是都是master的情况,由于redisson的锁只会存在于一台master,如果那台master挂了,锁就消失了
第二种是主从的情况,master挂了slave顶上去,锁还在,似乎没啥问题,但要知道redis集群是AP模式,啥意思呢?高可用,分区容错,但不满足一致性,因为主从同步是异步的,有可能master挂了锁的数据还没同步过去,导致slave顶上来的时候锁消失了。
如果想要绝对的高可用,那就需要最强王者-RedLock登场了。
想要保证高可用,必须要有2*N+1台master服务器,N表示允许挂掉的服务器数量,比如允许1台服务器挂掉,就需要3台master.
- 依次尝试从3个实例,使用相同的 key 和随机值(例如 UUID)获取锁。当向Redis 请求获取锁时,客户端应该设置一个超时时间,这个超时时间应该小于锁的失效时间。例如你的锁自动失效时间为 10 秒,则超时时间应该在 5-50 毫秒之间。这样可以防止客户端在试图与一个宕机的 Redis 节点对话时长时间处于阻塞状态。如果一个实例不可用,客户端应该尽快尝试去另外一个 Redis 实例请求获取锁;
- 客户端通过当前时间减去步骤 1 记录的时间来计算获取锁使用的时间。当且仅当从大多数(N/2+1,这里是 3 个节点)的 Redis 节点都取到锁,并且获取锁使用的时间小于锁失效时间时,锁才算获取成功;
- 如果取到了锁,其真正有效时间等于初始有效时间减去获取锁所使用的时间(步骤 2 计算的结果)。
- 如果由于某些原因未能获得锁(无法在至少 N/2 + 1 个 Redis 实例获取锁、或获取锁的时间超过了有效时间),客户端应该在所有的 Redis 实例上进行解锁(即便某些Redis实例根本就没有加锁成功,防止某些节点获取到锁但是客户端没有得到响应而导致接下来的一段时间不能被重新获取锁)。
redisson实现RedLock
public void getlock() {
//CACHE_KEY_REDLOCK为redis 分布式锁的key
RLock lock1 = redissonClient1.getLock(CACHE_KEY_REDLOCK);
RLock lock2 = redissonClient2.getLock(CACHE_KEY_REDLOCK);
RLock lock3 = redissonClient3.getLock(CACHE_KEY_REDLOCK);
RedissonRedLock redLock = new RedissonRedLock(lock1, lock2, lock3);
boolean isLock;
try {
//waitTime 锁的等待时间处理,正常情况下 等5s
//leaseTime就是redis key的过期时间,正常情况下等5分钟。
isLock = redLock.tryLock(5, 300, TimeUnit.SECONDS);
log.info("线程{},是否拿到锁:{} ",Thread.currentThread().getName(),isLock);
if (isLock) {
//业务处理
}
} catch (Exception e) {
log.error("redlock exception ",e);
} finally {
// 无论如何, 最后都要解锁
redLock.unlock();
}
}