redis用來做什麼?除了緩存,第一個想到的就是分佈式鎖了,現在基本上也是面試必問八股文了。
我們知道單機系統可以用JUC提供的鎖像synchronized或者Lock,但這些在分佈式系統中就不起作用了。
一個靠譜的分佈式鎖需要滿足以下條件
- 獨佔性,任何時刻只能有且僅有一個線程持有
- 高可用,若redis集群環境下,不能因為某一個節點掛了而出現獲取鎖和釋放鎖失敗的情況
- 防死鎖,杜絕死鎖,必須有超時控制機制或者撤銷操作,有個兜底終止跳出方案
- 不亂搶,不能私下unlock別人的鎖,只能自己加鎖自己釋放。
- 重入性,同一個節點的同一個線程如果獲得鎖之後,它也可以再次獲取這個鎖。
經典的超賣問題
我們來看一個秒殺的案例,有一個商品有10件庫存, 每搶一次減一次庫存
private void buyGoods() {
//獲取商品剩餘庫存
Integer result = (Integer) redisTemplate.opsForValue().get("goods:001");
int goodsNumber = result == null ? 0 : result;
if (goodsNumber > 0) {
//還有庫存則數量減一併且存入真實庫存
int realNumber = goodsNumber - 1;
redisTemplate.opsForValue().set("goods:001", realNumber);
System.out.println(Thread.currentThread().getName() + " 已經成功秒殺商品,此時還剩餘:" + realNumber + "件");
} else {
System.out.println(Thread.currentThread().getName() + " 商品已經售罄,歡迎下次光臨");
}
}
模擬十個人來搶
public void testOverBuy() throws Exception {
for (int i = 0; i < 10; i++) {
new Thread(() -> buyGoods()).start();
}
}
看輸出,十個人搶了庫存卻還是9,原因是減庫存和保存操作不是原子操作,需要加synchronized鎖或者lock鎖,但是在分佈式環境下這樣做就不行了,需要用redis來實現鎖。
Thread-13 已經成功秒殺商品,此時還剩餘:9件
Thread-12 已經成功秒殺商品,此時還剩餘:9件
Thread-6 已經成功秒殺商品,此時還剩餘:9件
Thread-7 已經成功秒殺商品,此時還剩餘:9件
Thread-8 已經成功秒殺商品,此時還剩餘:9件
Thread-5 已經成功秒殺商品,此時還剩餘:9件
Thread-4 已經成功秒殺商品,此時還剩餘:9件
Thread-11 已經成功秒殺商品,此時還剩餘:9件
Thread-10 已經成功秒殺商品,此時還剩餘:9件
Thread-9 已經成功秒殺商品,此時還剩餘:9件
redis實現分佈式鎖
我們來看看自己如何實現分佈式鎖,首先要滿足獨佔性,用redis的setnx命令,給key設置value如果返回1表示加鎖成功,返回0表示鎖已被占,為了防止死鎖要加個過期時間,這樣業務線程掛了的話鎖也能釋放,在finally語句塊中釋放鎖
private void testRedisLock() {
String key = "goodsRedisKey";
//設置標識位,如果設置失敗說明已被鎖,加過期時間防止死鎖
boolean flagLock = redisTemplate.opsForValue().setIfAbsent(key, 1, Duration.ofSeconds(30));
if (!flagLock) {
//沒搶到鎖直接返回
return;
}
try {
//業務邏輯
} finally {
//釋放鎖
redisTemplate.delete(key);
}
}
以上代碼有個缺陷沒有實現不亂搶,也就是別的線程可以刪除你佔用的鎖,所以需要在標誌位中設置線程id,釋放鎖的時候判斷下是否是同一個線程,同一個線程才允許釋放
private void testRedisLock() {
//生成線程標識
String value = UUID.randomUUID().toString() + Thread.currentThread().getName();
String key = "goodsRedisKey";
//設置標識位,如果設置失敗說明已被鎖,加過期時間防止死鎖
boolean flagLock = redisTemplate.opsForValue().setIfAbsent(key, value, Duration.ofSeconds(30));
if (!flagLock) {
//沒搶到鎖直接失敗
return;
}
try {
//業務邏輯
} finally {
// 判斷加鎖與解鎖是不是同一個客戶端,他們不是同一個原子操作
if (redisTemplate.opsForValue().get(key).equals(value)) {
redisTemplate.delete(key);
}
}
}
redisson實現分佈式鎖
以上代碼還有很多不嚴謹的地方,比如沒有實現鎖的自動續期,刪除鎖的操作不是原子操作。其實工作中實現分佈式鎖都是直接用redisson的,講解上面的代碼主要是了解下分佈式鎖的實現思路。
使用redisson實現分佈式鎖,可以看到代碼很簡潔
private void testRedisLock() {
String key = "goodsRedisKey";
RLock lock = redissonClient.getLock(key);
lock.lock();
try {
//業務邏輯
} finally {
lock.unlock();
}
}
接下去我們看redisson的實現原理,首先嘗試獲取鎖,線程一加鎖成功繼續執行,線程二加鎖失敗自旋等待。
加鎖源碼:
<T> RFuture<T> tryLockInnerAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) {
return evalWriteAsync(getRawName(), LongCodec.INSTANCE, command,
"if (redis.call('exists', KEYS[1]) == 0) then " +
"redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
"redis.call('pexpire', KEYS[1], ARGV[1]); " +
"return nil; " +
"end; " +
"if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
"redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
"redis.call('pexpire', KEYS[1], ARGV[1]); " +
"return nil; " +
"end; " +
"return redis.call('pttl', KEYS[1]);",
Collections.singletonList(getRawName()), unit.toMillis(leaseTime), getLockName(threadId));
}
redisson裏面很多操作為了實現原子性使用了lua腳本,上面代碼分為三段
第一段exists KEYS[1],KEYS[1]就是用戶自定義的key,上文中的goodsRedisKey,判斷這個key存不存在,不存在則加鎖,並設置過期時間,默認30S,下圖就是加的鎖
第二段實現可重入,如果是自己加的鎖再lock時value加1
第三段實現互斥,如果上面都不滿足,則返回鎖的過期時間
釋放鎖源碼:
protected RFuture<Boolean> unlockInnerAsync(long threadId) {
return evalWriteAsync(getRawName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
"if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then " +
"return nil;" +
"end; " +
"local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1); " +
"if (counter > 0) then " +
"redis.call('pexpire', KEYS[1], ARGV[2]); " +
"return 0; " +
"else " +
"redis.call('del', KEYS[1]); " +
"redis.call('publish', KEYS[2], ARGV[1]); " +
"return 1; " +
"end; " +
"return nil;",
Arrays.asList(getRawName(), getChannelName()), LockPubSub.UNLOCK_MESSAGE, internalLockLeaseTime, getLockName(threadId));
}
第一段判斷是否自己加的鎖,不是自己加的鎖或者鎖已過期,則返回
第二段把鎖的計數減1,如果剩餘鎖計數為0,則刪除key,釋放鎖
自動續期-看門狗
上面說到,鎖的過期時間默認是30S,如果被鎖定的業務執行時間很長,超過30S那就會自動解鎖,所以需要有個機制來自動續期,這就是看門狗。
Timeout task = commandExecutor.getConnectionManager().newTimeout(new TimerTask() {
@Override
public void run(Timeout timeout) throws Exception {
ExpirationEntry ent = EXPIRATION_RENEWAL_MAP.get(getEntryName());
if (ent == null) {
return;
}
Long threadId = ent.getFirstThreadId();
if (threadId == null) {
return;
}
RFuture<Boolean> future = renewExpirationAsync(threadId);
future.onComplete((res, e) -> {
if (e != null) {
log.error("Can't update lock " + getRawName() + " expiration", e);
EXPIRATION_RENEWAL_MAP.remove(getEntryName());
return;
}
if (res) {
// reschedule itself
renewExpiration();
} else {
cancelExpirationRenewal(null);
}
});
}
}, internalLockLeaseTime / 3, TimeUnit.MILLISECONDS);
可以看到,看門狗就是一個TimerTask,每過internalLockLeaseTime/3也就是10S續期一次,調用renewExpirationAsync來異步續期,續期成功調用renewExpiration來繼續等待續期。
正確釋放鎖的方式
一些操作像tryLock不一定能保證拿到鎖,所以釋放鎖的時候要判斷下,否則會拋異常
try {
lock.tryLock(5, TimeUnit.SECONDS);
//業務邏輯
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
if (lock.isLocked() && lock.isHeldByCurrentThread()) {
lock.unlock();
}
}
最強王者-RedLock
redisson的分佈式鎖已經實現了5大條件的4個,獨佔性,防死鎖,不亂搶,防重入,那高可用能實現么?redis的高可用就是redis cluster和哨兵模式。
第一種是都是master的情況,由於redisson的鎖只會存在於一台master,如果那台master掛了,鎖就消失了
第二種是主從的情況,master掛了slave頂上去,鎖還在,似乎沒啥問題,但要知道redis集群是AP模式,啥意思呢?高可用,分區容錯,但不滿足一致性,因為主從同步是異步的,有可能master掛了鎖的數據還沒同步過去,導致slave頂上來的時候鎖消失了。
如果想要絕對的高可用,那就需要最強王者-RedLock登場了。
想要保證高可用,必須要有2*N+1台master服務器,N表示允許掛掉的服務器數量,比如允許1台服務器掛掉,就需要3台master.
- 依次嘗試從3個實例,使用相同的 key 和隨機值(例如 UUID)獲取鎖。當向Redis 請求獲取鎖時,客戶端應該設置一個超時時間,這個超時時間應該小於鎖的失效時間。例如你的鎖自動失效時間為 10 秒,則超時時間應該在 5-50 毫秒之間。這樣可以防止客戶端在試圖與一個宕機的 Redis 節點對話時長時間處於阻塞狀態。如果一個實例不可用,客戶端應該儘快嘗試去另外一個 Redis 實例請求獲取鎖;
- 客戶端通過當前時間減去步驟 1 記錄的時間來計算獲取鎖使用的時間。當且僅當從大多數(N/2+1,這裡是 3 個節點)的 Redis 節點都取到鎖,並且獲取鎖使用的時間小於鎖失效時間時,鎖才算獲取成功;
- 如果取到了鎖,其真正有效時間等於初始有效時間減去獲取鎖所使用的時間(步驟 2 計算的結果)。
- 如果由於某些原因未能獲得鎖(無法在至少 N/2 + 1 個 Redis 實例獲取鎖、或獲取鎖的時間超過了有效時間),客戶端應該在所有的 Redis 實例上進行解鎖(即便某些Redis實例根本就沒有加鎖成功,防止某些節點獲取到鎖但是客戶端沒有得到響應而導致接下來的一段時間不能被重新獲取鎖)。
redisson實現RedLock
public void getlock() {
//CACHE_KEY_REDLOCK為redis 分佈式鎖的key
RLock lock1 = redissonClient1.getLock(CACHE_KEY_REDLOCK);
RLock lock2 = redissonClient2.getLock(CACHE_KEY_REDLOCK);
RLock lock3 = redissonClient3.getLock(CACHE_KEY_REDLOCK);
RedissonRedLock redLock = new RedissonRedLock(lock1, lock2, lock3);
boolean isLock;
try {
//waitTime 鎖的等待時間處理,正常情況下 等5s
//leaseTime就是redis key的過期時間,正常情況下等5分鐘。
isLock = redLock.tryLock(5, 300, TimeUnit.SECONDS);
log.info("線程{},是否拿到鎖:{} ",Thread.currentThread().getName(),isLock);
if (isLock) {
//業務處理
}
} catch (Exception e) {
log.error("redlock exception ",e);
} finally {
// 無論如何, 最後都要解鎖
redLock.unlock();
}
}