Java中AQS框架的原理及自定義同步組件

aqs 的核心原理是基于模板方法模式,通過維護 volatile int state 變量和 fifo 隊列實現同步機制。1. 它定義了 tryacquire 和 tryrelease 等抽象方法供子類實現;2. 使用 clh 隊列管理等待線程,acquire() 和 release() 控制鎖的獲取與釋放;3. 支持獨占與共享兩種模式,分別適用于 reentrantlock 和 semaphore 等場景;4. condition 對象用于線程等待與通知,提升條件阻塞控制能力;5. 性能優化可通過減少 cas 競爭、降低線程阻塞喚醒開銷及優化隊列操作實現。開發者繼承 aqs 并實現其核心方法即可構建自定義同步組件。

Java中AQS框架的原理及自定義同步組件

AQS (AbstractQueuedSynchronizer) 是 Java 并發包 java.util.concurrent 的核心基石。它提供了一個構建鎖和同步器的框架,簡化了鎖的實現。簡單來說,它通過維護一個 volatile int state 變量和一個 FIFO 隊列來管理并發狀態,并提供了一套原子性操作 state 變量的方法。

Java中AQS框架的原理及自定義同步組件

AQS 的核心原理是基于模板方法模式。它定義了同步器需要實現的抽象方法,例如 tryAcquire (嘗試獲取鎖) 和 tryRelease (嘗試釋放鎖)。開發者只需要繼承 AQS 并實現這些方法,就可以構建自定義的同步組件。

Java中AQS框架的原理及自定義同步組件

解決方案

立即學習Java免費學習筆記(深入)”;

Java中AQS框架的原理及自定義同步組件

AQS 的運作可以分解為以下幾個關鍵步驟:

  1. 狀態管理: AQS 維護一個 volatile int state 變量,代表同步狀態。getState()、setState() 和 compareAndSetState() 方法提供了對狀態的原子性操作。

  2. CLH 隊列: AQS 使用一個 FIFO 隊列 (CLH 隊列的變體) 來管理等待獲取鎖的線程。當一個線程嘗試獲取鎖失敗時,它會被加入到隊列的尾部,并進入阻塞狀態。

  3. 獲取鎖: 線程調用 acquire(int arg) 方法嘗試獲取鎖。acquire() 方法會調用 tryAcquire(int arg) 方法,該方法由子類實現,用于嘗試獲取鎖。如果 tryAcquire() 成功,則 acquire() 方法返回;否則,線程會被加入到 CLH 隊列中,并阻塞等待。

  4. 釋放鎖: 線程調用 release(int arg) 方法釋放鎖。release() 方法會調用 tryRelease(int arg) 方法,該方法由子類實現,用于嘗試釋放鎖。如果 tryRelease() 成功,則 release() 方法會喚醒 CLH 隊列中的下一個線程。

  5. 獨占模式和共享模式: AQS 支持獨占模式和共享模式。獨占模式下,只有一個線程可以獲取鎖;共享模式下,多個線程可以同時獲取鎖。acquire() 和 release() 方法用于獨占模式,acquireShared() 和 releaseShared() 方法用于共享模式。

AQS 的代碼實現細節相當復雜,涉及到 CAS 操作、線程阻塞/喚醒等底層機制。理解這些細節有助于更深入地掌握 AQS 的原理。

自定義同步組件,需要繼承 AQS,并重寫以下方法:

  • tryAcquire(int arg):獨占模式下嘗試獲取鎖。
  • tryRelease(int arg):獨占模式下嘗試釋放鎖。
  • tryAcquireShared(int arg):共享模式下嘗試獲取鎖。
  • tryReleaseShared(int arg):共享模式下嘗試釋放鎖。
  • isHeldExclusively():當前同步器是否在獨占模式下被線程占用。

如何選擇合適的同步模式:獨占還是共享?

選擇獨占模式還是共享模式取決于你的同步組件的用途。如果你的組件需要保證同一時刻只有一個線程可以訪問共享資源,那么應該選擇獨占模式。例如,ReentrantLock 就是一個獨占鎖。如果你的組件允許多個線程同時訪問共享資源,那么應該選擇共享模式。例如,Semaphore 和 CountDownLatch 就是共享同步器。

例如,你想實現一個簡單的讀寫鎖,讀鎖是共享的,寫鎖是獨占的。那么你可以基于 AQS 實現一個 ReadWriteLock 類,其中讀鎖使用 tryAcquireShared() 和 tryReleaseShared(),寫鎖使用 tryAcquire() 和 tryRelease()。

import java.util.concurrent.locks.AbstractQueuedSynchronizer;  public class ReadWriteLock {      private final Sync sync = new Sync();      public void readLock() {         sync.acquireShared(1);     }      public void readUnlock() {         sync.releaseShared(1);     }      public void writeLock() {         sync.acquire(1);     }      public void writeUnlock() {         sync.release(1);     }      private static class Sync extends AbstractQueuedSynchronizer {          @Override         protected int tryAcquireShared(int acquires) {             // 實現讀鎖的獲取邏輯             return super.tryAcquireShared(acquires);         }          @Override         protected boolean tryReleaseShared(int releases) {             // 實現讀鎖的釋放邏輯             return super.tryReleaseShared(releases);         }          @Override         protected boolean tryAcquire(int acquires) {             // 實現寫鎖的獲取邏輯             return super.tryAcquire(acquires);         }          @Override         protected boolean tryRelease(int releases) {             // 實現寫鎖的釋放邏輯             return super.tryRelease(releases);         }     } }

上面的代碼只是一個框架,你需要填充 tryAcquireShared、tryReleaseShared、tryAcquire 和 tryRelease 方法的具體實現。這涉及到維護讀寫狀態,以及處理并發競爭。

AQS 中的 Condition 對象有什么作用?

Condition 對象是 AQS 的一個重要組成部分,它提供了一種線程等待/通知機制,類似于 Object.wait() 和 Object.notify() 方法。Condition 對象允許線程在獲取鎖之后,因為某些條件不滿足而進入等待狀態,并在條件滿足時被喚醒。

每個 Condition 對象都關聯著一個等待隊列。當線程調用 Condition.await() 方法時,它會被加入到等待隊列中,并釋放持有的鎖。當其他線程調用 Condition.signal() 或 Condition.signalAll() 方法時,等待隊列中的線程會被喚醒,并嘗試重新獲取鎖。

例如,在生產者-消費者模型中,可以使用 Condition 對象來實現緩沖區為空時消費者等待,緩沖區滿時生產者等待的邏輯。

import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.ReentrantLock;  public class BoundedBuffer {      final ReentrantLock lock = new ReentrantLock();     final Condition notFull  = lock.newCondition();     final Condition notEmpty = lock.newCondition();      final Object[] items = new Object[100];     int putptr, takeptr, count;      public void put(Object x) throws InterruptedException {         lock.lock();         try {             while (count == items.length)                 notFull.await();             items[putptr] = x;             if (++putptr == items.length) putptr = 0;             ++count;             notEmpty.signal();         } finally {             lock.unlock();         }     }      public Object take() throws InterruptedException {         lock.lock();         try {             while (count == 0)                 notEmpty.await();             Object x = items[takeptr];             if (++takeptr == items.length) takeptr = 0;             --count;             notFull.signal();             return x;         } finally {             lock.unlock();         }     } }

在這個例子中,notFull 和 notEmpty 兩個 Condition 對象分別用于控制生產者和消費者的等待和喚醒。

AQS 的性能瓶頸及優化策略

AQS 雖然強大,但并非完美。在高并發場景下,AQS 的性能可能會成為瓶頸。

  • CAS 競爭: AQS 依賴于 CAS 操作來更新 state 變量。在高并發場景下,CAS 競爭可能會非常激烈,導致大量的重試,降低性能。

  • 線程阻塞/喚醒: 線程的阻塞和喚醒涉及到用戶態和內核態的切換,開銷較大。頻繁的線程阻塞/喚醒會影響性能。

  • 隊列操作: AQS 使用 CLH 隊列來管理等待線程。隊列的操作,例如入隊和出隊,也需要一定的開銷。

針對這些瓶頸,可以采取以下優化策略:

  • 減少 CAS 競爭: 可以通過使用更細粒度的鎖,或者使用無鎖數據結構來減少 CAS 競爭。

  • 減少線程阻塞/喚醒: 可以通過使用自旋鎖或者使用 CompletableFuture 等異步編程技術來減少線程阻塞/喚醒。

  • 優化隊列操作: 可以通過使用更高效的隊列數據結構,或者使用批量操作來優化隊列操作。

例如,Java 8 中引入的 StampedLock 就是一種優化的讀寫鎖,它使用了樂觀讀和 CAS 操作來減少鎖的競爭,從而提高性能。

? 版權聲明
THE END
喜歡就支持一下吧
點贊14 分享