加鎖了,還有并發問題?redis分布式鎖你真的了解?下面本篇文章就來給大家聊一聊分布式系統下基于redis的分布式鎖,希望對大家有所幫助!
新接手的項目,偶爾會出現賬不平的問題。之前的技術老大臨走時給的解釋是:排查了,沒找到原因,之后太忙就沒再解決,可能是框架的原因……
既然項目交付到手中,這樣的問題是必須要解決的。梳理了所有賬務處理邏輯,最終找到了原因:數據庫并發操作熱點賬戶導致。就這這個問題,來聊一聊分布式系統下基于redis的分布式鎖。順便也分解一下問題形成原因及解決方案。【相關推薦:redis】
原因分析
系統并發量并不高,存在熱點賬戶,但也不至于那么嚴重。問題的根源在于系統架構設計,人為的制造了并發。場景是這樣的:商戶批量導入一批數據,系統會進行前置處理,并對賬戶余額進行增減。
此時,另外一個定時任務,也會對賬戶進行掃描更新。而且對同一賬戶的操作分布到各個系統當中,熱點賬戶也就出現了。
針對此問題的解決方案,從架構層面可以考慮將賬務系統進行抽離,集中在一個系統中進行處理,所有的數據庫事務及執行順序由賬務系統來統籌處理。從技術方面來講,則可以通過鎖機制來對熱點賬戶進行加鎖。
本篇文章就針對熱點賬戶基于分布式鎖的實現方式進行詳細的講解。
鎖的分析
- jvm內存模型級別的鎖,常用的有:synchronized、Lock等;
- 數據庫鎖,比如樂觀鎖,悲觀鎖等;
- 分布式鎖;
JVM內存級別的鎖,可以保證單體服務下線程的安全性,比如多個線程訪問/修改一個全局變量。但當系統進行集群部署時,JVM級別的本地鎖就無能為力了。
悲觀鎖與樂觀鎖
像上述案例中,熱點賬戶就屬于分布式系統中的共享資源,我們通常會采用數據庫鎖或分布式鎖來進行解決。
數據庫鎖,又分為樂觀鎖和悲觀鎖。
悲觀鎖是基于數據庫(mysql的InnoDB)提供的排他鎖來實現的。在進行事務操作時,通過select … for update語句,MySQL會對查詢結果集中每行數據都添加排他鎖,其他線程對該記錄的更新與刪除操作都會阻塞。從而達到共享資源的順序執行(修改);
樂觀鎖是相對悲觀鎖而言的,樂觀鎖假設數據一般情況不會造成沖突,所以在數據進行提交更新的時候,才會正式對數據的沖突與否進行檢測。如果沖突則返回給用戶異常信息,讓用戶決定如何去做。樂觀鎖適用于讀多寫少的場景,這樣可以提高程序的吞吐量。在樂觀鎖實現時通常會基于記錄狀態或添加version版本來進行實現。
悲觀鎖失效場景
項目中使用了悲觀鎖,但悲觀鎖卻失效了。這也是使用悲觀鎖時,常見的誤區,下面來分析一下。
正常使用悲觀鎖的流程:
- 通過select … for update鎖定記錄;
- 計算新余額,修改金額并存儲;
- 執行完成釋放鎖;
經常犯錯的處理流程:
- 查詢賬戶余額,計算新余額;
- 通過select … for update鎖定記錄;
- 修改金額并存儲;
- 執行完成釋放鎖;
錯誤的流程中,比如A和B服務查詢到的余額都是100,A扣減50,B扣減40,然后A鎖定記錄,更新數據庫為50;A釋放鎖之后,B鎖定記錄,更新數據庫為60。顯然,后者把前者的更新給覆蓋掉了。解決的方案就是擴大鎖的范圍,將鎖提前到計算新余額之前。
通常悲觀鎖對數據庫的壓力是非常大的,在實踐中通常會根據場景使用樂觀鎖或分布式鎖等方式來實現。
下面進入正題,講講基于Redis的分布式鎖實現。
Redis分布式鎖實戰演習
這里以spring Boot、Redis、lua腳本為例來演示分布式鎖的實現。為了簡化處理,示例中Redis既承擔了分布式鎖的功能,也承擔了數據庫的功能。
場景構建
集群環境下,對同一個賬戶的金額進行操作,基本步驟:
- 從數據庫讀取用戶金額;
- 程序修改金額;
- 再將最新金額存儲到數據庫;
下面從最初不加鎖,不同步處理,逐步推演出最終的分布式鎖。
基礎集成及類構建
準備一個不加鎖處理的基礎業務環境。
首先在spring boot項目中引入相關依賴:
<dependency> ??<groupid>org.springframework.boot</groupid> ??<artifactid>spring-boot-starter-data-redis</artifactid></dependency><dependency> ??<groupid>org.springframework.boot</groupid> ??<artifactid>spring-boot-starter-web</artifactid></dependency>
賬戶對應實體類UserAccount:
public?class?UserAccount?{ ??//用戶ID ??private?String?userId; ??//賬戶內金額 ??private?int?amount; ??//添加賬戶金額 ??public?void?addAmount(int?amount)?{ ????this.amount?=?this.amount?+?amount; ??} ??//?省略構造方法和getter/setter? }
創建一個線程實現類AccountOperationThread:
public?class?AccountOperationThread?implements?Runnable?{ ??private?final?static?Logger?logger?=?LoggerFactory.getLogger(AccountOperationThread.class); ??private?static?final?Long?RELEASE_SUCCESS?=?1L; ??private?String?userId; ??private?RedisTemplate<object>?redisTemplate; ??public?AccountOperationThread(String?userId,?RedisTemplate<object>?redisTemplate)?{ ????this.userId?=?userId; ????this.redisTemplate?=?redisTemplate; ??} ??@Override ??public?void?run()?{ ????noLock(); ??} ??/** ???*?不加鎖 ???*/ ??private?void?noLock()?{ ????try?{ ??????Random?random?=?new?Random(); ??????//?模擬線程進行業務處理 ??????TimeUnit.MILLISECONDS.sleep(random.nextInt(100)?+?1); ????}?catch?(InterruptedException?e)?{ ??????e.printStackTrace(); ????} ????//模擬數據庫中獲取用戶賬號 ????UserAccount?userAccount?=?(UserAccount)?redisTemplate.opsForValue().get(userId); ????//?金額+1 ????userAccount.addAmount(1); ????logger.info(Thread.currentThread().getName()?+?"?:?user?id?:?"?+?userId?+?"?amount?:?"?+?userAccount.getAmount()); ????//模擬存回數據庫 ????redisTemplate.opsForValue().set(userId,?userAccount); ??} }</object></object>
其中RedisTemplate的實例化交給了Spring Boot:
@Configuration public?class?RedisConfig?{ ??@Bean ??public?RedisTemplate<object>?redisTemplate(RedisConnectionFactory?redisConnectionFactory)?{ ????RedisTemplate<object>?redisTemplate?=?new?RedisTemplate(); ????redisTemplate.setConnectionFactory(redisConnectionFactory); ????Jackson2JsonRedisSerializer<object>?jackson2JsonRedisSerializer?= ????????new?Jackson2JsonRedisSerializer(Object.class); ????ObjectMapper?objectMapper?=?new?ObjectMapper(); ????objectMapper.setVisibility(PropertyAccessor.ALL,?JsonAutoDetect.Visibility.ANY); ????objectMapper.enableDefaultTyping(ObjectMapper.DefaultTyping.NON_FINAL); ????jackson2JsonRedisSerializer.setObjectMapper(objectMapper); ????//?設置value的序列化規則和?key的序列化規則 ????redisTemplate.setValueSerializer(jackson2JsonRedisSerializer); ????redisTemplate.setKeySerializer(new?StringRedisSerializer()); ????redisTemplate.afterPropertiesSet(); ????return?redisTemplate; ??} }</object></object></object>
最后,再準備一個TestController來進行觸發多線程的運行:
@RestController public?class?TestController?{ ??private?final?static?Logger?logger?=?LoggerFactory.getLogger(TestController.class); ??private?static?ExecutorService?executorService?=?Executors.newFixedThreadPool(10); ??@Autowired ??private?RedisTemplate<object>?redisTemplate; ??@GetMapping("/test") ??public?String?test()?throws?InterruptedException?{ ????//?初始化用戶user_001到Redis,賬戶金額為0 ????redisTemplate.opsForValue().set("user_001",?new?UserAccount("user_001",?0)); ????//?開啟10個線程進行同步測試,每個線程為賬戶增加1元 ????for?(int?i?=?0;?i?<p>執行上述程序,正常來說10個線程,每個線程加1,結果應該是10。但多執行幾次,會發現,結果變化很大,基本上都要比10小。</p> <pre class="brush:js;toolbar:false;">[pool-1-thread-5]?c.s.redis.thread.AccountOperationThread??:?pool-1-thread-5?:?user?id?:?user_001?amount?:?1 [pool-1-thread-4]?c.s.redis.thread.AccountOperationThread??:?pool-1-thread-4?:?user?id?:?user_001?amount?:?1 [pool-1-thread-3]?c.s.redis.thread.AccountOperationThread??:?pool-1-thread-3?:?user?id?:?user_001?amount?:?1 [pool-1-thread-1]?c.s.redis.thread.AccountOperationThread??:?pool-1-thread-1?:?user?id?:?user_001?amount?:?1 [pool-1-thread-1]?c.s.redis.thread.AccountOperationThread??:?pool-1-thread-1?:?user?id?:?user_001?amount?:?2 [pool-1-thread-2]?c.s.redis.thread.AccountOperationThread??:?pool-1-thread-2?:?user?id?:?user_001?amount?:?2 [pool-1-thread-5]?c.s.redis.thread.AccountOperationThread??:?pool-1-thread-5?:?user?id?:?user_001?amount?:?2 [pool-1-thread-4]?c.s.redis.thread.AccountOperationThread??:?pool-1-thread-4?:?user?id?:?user_001?amount?:?3 [pool-1-thread-1]?c.s.redis.thread.AccountOperationThread??:?pool-1-thread-1?:?user?id?:?user_001?amount?:?4 [pool-1-thread-3]?c.s.redis.thread.AccountOperationThread??:?pool-1-thread-3?:?user?id?:?user_001?amount?:?5 [nio-8080-exec-1]?c.s.redis.controller.TestController??????:?user?id?:?user_001?amount?:?5
以上述日志為例,前四個線程都將值改為1,也就是后面三個線程都將前面的修改進行了覆蓋,導致最終結果不是10,只有5。這顯然是有問題的。
Redis同步鎖實現
針對上面的情況,在同一個JVM當中,我們可以通過線程加鎖來完成。但在分布式環境下,JVM級別的鎖是沒辦法實現的,這里可以采用Redis同步鎖實現。
基本思路:第一個線程進入時,在Redis中進記錄,當后續線程過來請求時,判斷Redis是否存在該記錄,如果存在則說明處于鎖定狀態,進行等待或返回。如果不存在,則進行后續業務處理。
??/** ???*?1.搶占資源時判斷是否被鎖。 ???*?2.如未鎖則搶占成功且加鎖,否則等待鎖釋放。 ???*?3.業務完成后釋放鎖,讓給其它線程。 ???*?<p> ???*?該方案并未解決同步問題,原因:線程獲得鎖和加鎖的過程,并非原子性操作,可能會導致線程A獲得鎖,還未加鎖時,線程B也獲得了鎖。 ???*/ ??private?void?redisLock()?{ ????Random?random?=?new?Random(); ????try?{ ??????TimeUnit.MILLISECONDS.sleep(random.nextInt(1000)?+?1); ????}?catch?(InterruptedException?e)?{ ??????e.printStackTrace(); ????} ????while?(true)?{ ??????Object?lock?=?redisTemplate.opsForValue().get(userId?+?":syn"); ??????if?(lock?==?NULL)?{ ????????//?獲得鎖?->?加鎖?->?跳出循環 ????????logger.info(Thread.currentThread().getName()?+?":獲得鎖"); ????????redisTemplate.opsForValue().set(userId?+?":syn",?"lock"); ????????break; ??????} ??????try?{ ????????//?等待500毫秒重試獲得鎖 ????????TimeUnit.MILLISECONDS.sleep(500); ??????}?catch?(InterruptedException?e)?{ ????????e.printStackTrace(); ??????} ????} ????try?{ ??????//模擬數據庫中獲取用戶賬號 ??????UserAccount?userAccount?=?(UserAccount)?redisTemplate.opsForValue().get(userId); ??????if?(userAccount?!=?null)?{ ????????//設置金額 ????????userAccount.addAmount(1); ????????logger.info(Thread.currentThread().getName()?+?"?:?user?id?:?"?+?userId?+?"?amount?:?"?+?userAccount.getAmount()); ????????//模擬存回數據庫 ????????redisTemplate.opsForValue().set(userId,?userAccount); ??????} ????}?finally?{ ??????//釋放鎖 ??????redisTemplate.delete(userId?+?":syn"); ??????logger.info(Thread.currentThread().getName()?+?":釋放鎖"); ????} ??}</p>
在while代碼塊中,先判斷對應用戶ID是否在Redis中存在,如果不存在,則進行set加鎖,如果存在,則跳出循環繼續等待。
上述代碼,看起來實現了加鎖的功能,但當執行程序時,會發現與未加鎖一樣,依舊存在并發問題。原因是:獲取鎖和加鎖的操作并不是原子的。比如兩個線程發現lock都是null,都進行了加鎖,此時并發問題依舊存在。
Redis原子性同步鎖
針對上述問題,可將獲取鎖和加鎖的過程原子化處理。基于spring-boot-data-redis提供的原子化API可以實現:
//?該方法使用了redis的指令:SETNX?key?value //?1.key不存在,設置成功返回value,setIfAbsent返回true; //?2.key存在,則設置失敗返回null,setIfAbsent返回false; //?3.原子性操作; Boolean?setIfAbsent(K?var1,?V?var2);
上述方法的原子化操作是對Redis的setnx命令的封裝,在Redis中setnx的使用如下實例:
redis>?SETNX?mykey?"Hello" (integer)?1 redis>?SETNX?mykey?"World" (integer)?0 redis>?GET?mykey "Hello"
第一次,設置mykey時,并不存在,則返回1,表示設置成功;第二次設置mykey時,已經存在,則返回0,表示設置失敗。再次查詢mykey對應的值,會發現依舊是第一次設置的值。也就是說redis的setnx保證了唯一的key只能被一個服務設置成功。
理解了上述API及底層原理,來看看線程中的實現方法代碼如下:
??/** ???*?1.原子操作加鎖 ???*?2.競爭線程循環重試獲得鎖 ???*?3.業務完成釋放鎖 ???*/ ??private?void?atomicityRedisLock()?{ ????//Spring?data?redis?支持的原子性操作 ????while?(!redisTemplate.opsForValue().setIfAbsent(userId?+?":syn",?"lock"))?{ ??????try?{ ????????//?等待100毫秒重試獲得鎖 ????????TimeUnit.MILLISECONDS.sleep(100); ??????}?catch?(InterruptedException?e)?{ ????????e.printStackTrace(); ??????} ????} ????logger.info(Thread.currentThread().getName()?+?":獲得鎖"); ????try?{ ??????//模擬數據庫中獲取用戶賬號 ??????UserAccount?userAccount?=?(UserAccount)?redisTemplate.opsForValue().get(userId); ??????if?(userAccount?!=?null)?{ ????????//設置金額 ????????userAccount.addAmount(1); ????????logger.info(Thread.currentThread().getName()?+?"?:?user?id?:?"?+?userId?+?"?amount?:?"?+?userAccount.getAmount()); ????????//模擬存回數據庫 ????????redisTemplate.opsForValue().set(userId,?userAccount); ??????} ????}?finally?{ ??????//釋放鎖 ??????redisTemplate.delete(userId?+?":syn"); ??????logger.info(Thread.currentThread().getName()?+?":釋放鎖"); ????} ??}
再次執行代碼,會發現結果正確了,也就是說可以成功的對分布式線程進行了加鎖。
Redis分布式鎖的死鎖
雖然上述代碼執行結果沒問題,但如果應用異常宕機,沒來得及執行finally中釋放鎖的方法,那么其他線程則永遠無法獲得這個鎖。
此時可采用setIfAbsent的重載方法:
Boolean?setIfAbsent(K?var1,?V?var2,?long?var3,?TimeUnit?var5);
基于該方法,可以設置鎖的過期時間。這樣即便獲得鎖的線程宕機,在Redis中數據過期之后,其他線程可正常獲得該鎖。
示例代碼如下:
private?void?atomicityAndExRedisLock()?{ ????try?{ ??????//Spring?data?redis?支持的原子性操作,并設置5秒過期時間 ??????while?(!redisTemplate.opsForValue().setIfAbsent(userId?+?":syn", ??????????System.currentTimeMillis()?+?5000,?5000,?TimeUnit.MILLISECONDS))?{ ????????//?等待100毫秒重試獲得鎖 ????????logger.info(Thread.currentThread().getName()?+?":嘗試循環獲取鎖"); ????????TimeUnit.MILLISECONDS.sleep(1000); ??????} ??????logger.info(Thread.currentThread().getName()?+?":獲得鎖--------"); ??????//?應用在這里宕機,進程退出,無法執行?finally; ??????Thread.currentThread().interrupt(); ??????//?業務邏輯... ????}?catch?(InterruptedException?e)?{ ??????e.printStackTrace(); ????}?finally?{ ??????//釋放鎖 ??????if?(!Thread.currentThread().isInterrupted())?{ ????????redisTemplate.delete(userId?+?":syn"); ????????logger.info(Thread.currentThread().getName()?+?":釋放鎖"); ??????} ????} ??}
業務超時及守護線程
上面添加了Redis所的超時時間,看似解決了問題,但又引入了新的問題。
比如,正常情況下線程A在5秒內可正常處理完業務,但偶發會出現超過5秒的情況。如果將超時時間設置為5秒,線程A獲得了鎖,但業務邏輯處理需要6秒。此時,線程A還在正常業務邏輯,線程B已經獲得了鎖。當線程A處理完時,有可能將線程B的鎖給釋放掉。
在上述場景中有兩個問題點:
- 第一,線程A和線程B可能會同時在執行,存在并發問題。
- 第二,線程A可能會把線程B的鎖給釋放掉,導致一系列的惡性循環。
當然,可以通過在Redis中設置value值來判斷鎖是屬于線程A還是線程B。但仔細分析會發現,這個問題的本質是因為線程A執行業務邏輯耗時超出了鎖超時的時間。
那么就有兩個解決方案了:
- 第一,將超時時間設置的足夠長,確保業務代碼能夠在鎖釋放之前執行完成;
- 第二,為鎖添加守護線程,為將要過期釋放但未釋放的鎖增加時間;
第一種方式需要全行大多數情況下業務邏輯的耗時,進行超時時間的設定。
第二種方式,可通過如下守護線程的方式來動態增加鎖超時時間。
public?class?DaemonThread?implements?Runnable?{ ??private?final?static?Logger?logger?=?LoggerFactory.getLogger(DaemonThread.class); ??//?是否需要守護?主線程關閉則結束守護線程 ??private?volatile?boolean?daemon?=?true; ??//?守護鎖 ??private?String?lockKey; ??private?RedisTemplate<object>?redisTemplate; ??public?DaemonThread(String?lockKey,?RedisTemplate<object>?redisTemplate)?{ ????this.lockKey?=?lockKey; ????this.redisTemplate?=?redisTemplate; ??} ??@Override ??public?void?run()?{ ????try?{ ??????while?(daemon)?{ ????????long?time?=?redisTemplate.getExpire(lockKey,?TimeUnit.MILLISECONDS); ????????//?剩余有效期小于1秒則續命 ????????if?(time?<p>上述線程每隔300毫秒獲取一下Redis中鎖的超時時間,如果小于1秒,則延長5秒。當主線程調用關閉時,守護線程也隨之關閉。</p> <p>主線程中相關代碼實現:</p> <pre class="brush:js;toolbar:false;">private?void?deamonRedisLock()?{ ????//守護線程 ????DaemonThread?daemonThread?=?null; ????//Spring?data?redis?支持的原子性操作,并設置5秒過期時間 ????String?uuid?=?UUID.randomUUID().toString(); ????String?value?=?Thread.currentThread().getId()?+?":"?+?uuid; ????try?{ ??????while?(!redisTemplate.opsForValue().setIfAbsent(userId?+?":syn",?value,?5000,?TimeUnit.MILLISECONDS))?{ ????????//?等待100毫秒重試獲得鎖 ????????logger.info(Thread.currentThread().getName()?+?":嘗試循環獲取鎖"); ????????TimeUnit.MILLISECONDS.sleep(1000); ??????} ??????logger.info(Thread.currentThread().getName()?+?":獲得鎖----"); ??????//?開啟守護線程 ??????daemonThread?=?new?DaemonThread(userId?+?":syn",?redisTemplate); ??????Thread?thread?=?new?Thread(daemonThread); ??????thread.start(); ??????//?業務邏輯執行10秒... ??????TimeUnit.MILLISECONDS.sleep(10000); ????}?catch?(InterruptedException?e)?{ ??????e.printStackTrace(); ????}?finally?{ ??????//釋放鎖?這里也需要原子操作,今后通過?Redis?+?Lua?講 ??????String?result?=?(String)?redisTemplate.opsForValue().get(userId?+?":syn"); ??????if?(value.equals(result))?{ ????????redisTemplate.delete(userId?+?":syn"); ????????logger.info(Thread.currentThread().getName()?+?":釋放鎖-----"); ??????} ??????//關閉守護線程 ??????if?(daemonThread?!=?null)?{ ????????daemonThread.stop(); ??????} ????} ??}
其中在獲得鎖之后,開啟守護線程,在finally中將守護線程關閉。
基于Lua腳本的實現
在上述邏輯中,我們是基于spring-boot-data-redis提供的原子化操作來保證鎖判斷和執行的原子化的。在非Spring Boot項目中,則可以基于Lua腳本來實現。
首先定義加鎖和解鎖的Lua腳本及對應的DefaultRedisScript對象,在RedisConfig配置類中添加如下實例化代碼:
@Configuration public?class?RedisConfig?{ ??//lock?script ??private?static?final?String?LOCK_SCRIPT?=?"?if?redis.call('setnx',KEYS[1],ARGV[1])?==?1?"?+ ??????"?then?redis.call('expire',KEYS[1],ARGV[2])?"?+ ??????"?return?1?"?+ ??????"?else?return?0?end?"; ??private?static?final?String?UNLOCK_SCRIPT?=?"if?redis.call('get',?KEYS[1])?==?ARGV[1]?then?return?redis.call"?+ ??????"('del',?KEYS[1])?else?return?0?end"; ??//?...?省略部分代碼 ?? ??@Bean ??public?DefaultRedisScript<boolean>?lockRedisScript()?{ ????DefaultRedisScript<boolean>?defaultRedisScript?=?new?DefaultRedisScript(); ????defaultRedisScript.setResultType(Boolean.class); ????defaultRedisScript.setScriptText(LOCK_SCRIPT); ????return?defaultRedisScript; ??} ??@Bean ??public?DefaultRedisScript<long>?unlockRedisScript()?{ ????DefaultRedisScript<long>?defaultRedisScript?=?new?DefaultRedisScript(); ????defaultRedisScript.setResultType(Long.class); ????defaultRedisScript.setScriptText(UNLOCK_SCRIPT); ????return?defaultRedisScript; ??} }</long></long></boolean></boolean>
再通過在AccountOperationThread類中新建構造方法,將上述兩個對象傳入類中(省略此部分演示)。然后,就可以基于RedisTemplate來調用了,改造之后的代碼實現如下:
??private?void?deamonRedisLockWithLua()?{ ????//守護線程 ????DaemonThread?daemonThread?=?null; ????//Spring?data?redis?支持的原子性操作,并設置5秒過期時間 ????String?uuid?=?UUID.randomUUID().toString(); ????String?value?=?Thread.currentThread().getId()?+?":"?+?uuid; ????try?{ ??????while?(!redisTemplate.execute(lockRedisScript,?Collections.singletonList(userId?+?":syn"),?value,?5))?{ ????????//?等待1000毫秒重試獲得鎖 ????????logger.info(Thread.currentThread().getName()?+?":嘗試循環獲取鎖"); ????????TimeUnit.MILLISECONDS.sleep(1000); ??????} ??????logger.info(Thread.currentThread().getName()?+?":獲得鎖----"); ??????//?開啟守護線程 ??????daemonThread?=?new?DaemonThread(userId?+?":syn",?redisTemplate); ??????Thread?thread?=?new?Thread(daemonThread); ??????thread.start(); ??????//?業務邏輯執行10秒... ??????TimeUnit.MILLISECONDS.sleep(10000); ????}?catch?(InterruptedException?e)?{ ??????logger.error("異常",?e); ????}?finally?{ ??????//使用Lua腳本:先判斷是否是自己設置的鎖,再執行刪除 ??????//?key存在,當前值=期望值時,刪除key;key存在,當前值!=期望值時,返回0; ??????Long?result?=?redisTemplate.execute(unlockRedisScript,?Collections.singletonList(userId?+?":syn"),?value); ??????logger.info("redis解鎖:{}",?RELEASE_SUCCESS.equals(result)); ??????if?(RELEASE_SUCCESS.equals(result))?{ ????????if?(daemonThread?!=?null)?{ ??????????//關閉守護線程 ??????????daemonThread.stop(); ??????????logger.info(Thread.currentThread().getName()?+?":釋放鎖---"); ????????} ??????} ????} ??}
其中while循環中加鎖和finally中的釋放鎖都是基于Lua腳本來實現了。
Redis鎖的其他因素
除了上述實例,在使用Redis分布式鎖時,還可以考慮以下情況及方案。
Redis鎖的不可重入
當線程在持有鎖的情況下再次請求加鎖,如果一個鎖支持一個線程多次加鎖,那么這個鎖就是可重入的。如果一個不可重入鎖被再次加鎖,由于該鎖已經被持有,再次加鎖會失敗。Redis可通過對鎖進行重入計數,加鎖時加 1,解鎖時減 1,當計數歸 0時釋放鎖。
可重入鎖雖然高效但會增加代碼的復雜性,這里就不舉例說明了。
等待鎖釋放
有的業務場景,發現被鎖則直接返回。但有的場景下,客戶端需要等待鎖釋放然后去搶鎖。上述示例就屬于后者。針對等待鎖釋放也有兩種方案:
- 客戶端輪訓:當未獲得鎖時,等待一段時間再重新獲取,直到成功。上述示例就是基于這種方式實現的。這種方式的缺點也很明顯,比較耗費服務器資源,當并發量大時會影響服務器的效率。
- 使用Redis的訂閱發布功能:當獲取鎖失敗時,訂閱鎖釋放消息,獲取鎖成功后釋放時,發送釋放消息。
集群中的主備切換和腦裂
在Redis包含主從同步的集群部署方式中,如果主節點掛掉,從節點提升為主節點。如果客戶端A在主節點加鎖成功,指令還未同步到從節點,此時主節點掛掉,從節點升為主節點,新的主節點中沒有鎖的數據。這種情況下,客戶端B就可能加鎖成功,從而出現并發的場景。
當集群發生腦裂時,Redis master節點跟slave 節點和 sentinel 集群處于不同的網絡分區。sentinel集群無法感知到master的存在,會將 slave 節點提升為 master 節點,此時就會存在兩個不同的 master 節點。從而也會導致并發問題的出現。Redis Cluster集群部署方式同理。
小結
通過生產環境中的一個問題,排查原因,尋找解決方案,到最終對基于Redis分布式的深入研究,這便是學習的過程。
同時,每當面試或被問題如何解決分布式共享資源時,我們會脫口而出”基于Redis實現分布式鎖“,但通過本文的學習會發現,Redis分布式鎖并不是萬能的,而且在使用的過程中還需要注意超時、死鎖、誤解鎖、集群選主/腦裂等問題。
Redis以高性能著稱,但在實現分布式鎖的過程中還是存在一些問題。因此,基于Redis的分布式鎖可以極大的緩解并發問題,但要完全防止并發,還是得從數據庫層面入手。
源碼地址:https://github.com/secbr/springboot-all/tree/master/springboot-redis-lock
更多編程相關知識,請訪問:redis!!