2015年1月5日

Concurrency in Java

關於 Java Concurrency 議題,Java 並發編程的藝術 將所有相關的文章集結成了一本迷你書。在實作 java server side 程式時,一定會面臨到的問題就是 thread 之間的記憶體資料共享,另外還有在面對耗時的工作時,必須要同時執行的工作,也就是非同步呼叫。

synchronized vs volatile

volatile 是一種輕量級的同步機制,使用這種變數,就可以保證不會發生 context switching 或是 thread 切換的動作,但 volatile 的限制是,當變數的新值是依據舊值產生的時候,就不能使用 volatile。volatile 保證了共享變數的 visibility,當一個 thread 修改此共享變數時,另一個 thread 就能夠讀到這個修改後的值,簡單地說,就是 JVM 保證所有 thread 看到這個變數的值,都是一致的。

java 的 atomic operations 如下

  1. all assignments of primitive data types except for long and double
  2. all assignments of references
  3. all operations of java.concurrent.Atomic* classes
  4. all assignments of volatile longs and doubles

long foo = 87654321L;
並不是 thread-safe 的 operation,因為 java 會分成兩個步驟,先寫入 32bits 再寫入後 32bits,所以要改成

volatile long foo = 87654321L;

volatile 並不能完全取代 synchronied,甚至我們要注意,不能誤用 volatile,要不然就很容易出錯。一般基本判斷要不要使用 volatile 的條件,就是對一個變數讀取的次數遠高於寫入的次數。

可以使用 volatile 的情境如下:

狀態標籤

在另一個 thread 中呼叫 shutdown 用以中止 doWork 的無窮迴圈,這樣的寫法可保障 doWork 不回被惡意強制中斷。

volatile boolean shutdownRequested;
...
public void shutdown() { shutdownRequested = true; }

public void doWork() { 
    while (!shutdownRequested) { 
        // do stuff
    }
}

one-time safe publication

建立 singleton 物件,可使用 volatile 搭配 Double-checked locking 處理。

public class SingletonVolatile {

    private static volatile SingletonVolatile _instance;

    public static SingletonVolatile getInstance() {
        if (_instance == null) {
            synchronized (SingletonVolatile.class) {
                if (_instance == null)
                    _instance = new SingletonVolatile();
            }
        }

        return _instance;
    }

    public static void main(String[] args) {
        System.out.println(SingletonVolatile.getInstance());
        System.out.println(SingletonVolatile.getInstance());
        System.out.println(SingletonVolatile.getInstance());
    }
}

independent observation

定期 「發佈」 觀察結果供程序內部使用,一個後台thread可能會每隔幾秒讀取一次感應器,並更新包含當前文檔的 volatile 變量。然後,其他thread可以讀取這個變量,從而隨時能夠看到最新的溫度值。

以下是身份驗證機制記憶最近一次登錄的用戶的名字的範例。

public class UserManager {
    public volatile String lastUser;

    public boolean authenticate(String user, String password) {
        boolean valid = passwordIsValid(user, password);
        if (valid) {
            User u = new User();
            activeUsers.add(u);
            lastUser = user;
        }
        return valid;
    }
}

volatile bean

JavaBean 的所有數據成員都是 volatile 類型的,並且 getter 和 setter 方法必須非常普通 —— 除了獲取或設置相應的屬性外,不能包含任何邏輯,對於物件引用的成員,引用的對象必須是有效不可變的。

@ThreadSafe
public class Person {
    private volatile String firstName;
    private volatile String lastName;
    private volatile int age;

    public String getFirstName() { return firstName; }
    public String getLastName() { return lastName; }
    public int getAge() { return age; }

    public void setFirstName(String firstName) { 
        this.firstName = firstName;
    }

    public void setLastName(String lastName) { 
        this.lastName = lastName;
    }

    public void setAge(int age) { 
        this.age = age;
    }
}

開銷較低的讀-寫鎖策略

如果讀操作遠遠超過寫操作,可以結合使用內部鎖和 volatile 變量來減少公共代碼路徑的開銷。

@ThreadSafe
public class CheesyCounter {
    // Employs the cheap read-write lock trick
    // All mutative operations MUST be done with the 'this' lock held
    @GuardedBy("this") private volatile int value;

    public int getValue() { return value; }

    public synchronized int increment() {
        return value++;
    }
}

CAS(compare and swap)

獨佔鎖是一種悲觀鎖,synchronized 是一種獨佔鎖,它只有在確保其它線程不會造成干擾的情況下執行。樂觀鎖假設沒有衝突而去完成某項操作,如果因為衝突失敗就重試,直到成功為止。

實現 lock-fre 的 non-blocking 演算法,最有名的方式就是使用 CAS (compare and swap),CAS是個樂觀鎖技術,當有多個threads嘗試同時更新同一個變數時,只有其中一個 thread 能成功更新變數的值,而其它threads都會被告知更新失敗,然後再重試一次,失敗的threads並不會被暫停執行的程序。

CAS內部有3個內部變數,(1) 記憶體實際值V (2) 目標變數的舊預期值A (3) 目標變數要被修改的新值B,當 A==V時,才能將 V 修改為 B,否則什麼都不做。

ConcurrentHashMap

ConcurrentHashMap 是 HashMap 的 multithread 版本,只要會有多個 thread 會同時存取的資料結構,就必須使用 ConcurrentHashMap。至於早期 Java 版本使用的 HashTable,由於內部是以 synchronized 的方式來實作,再面對大量儲存資料的狀況下,其使用效率就會遠低於比 ConcurrentHashMap。

ConcurrentHashMap 是以 Segment 與 HashEntry 組成,Segment 就類似於 HashMap,因此我們可以同時存取對多個 Segment,但到了 Segment 這一層,就會是 synchronzied 的呼叫,所以只要 hash 過後的資料,落入了不同的 Segment,這樣才能保障 ConcurrentHashMap 可以發揮它的功能。

ConcurrentHashMap 的 get method 把所有分享的變數都設定為 volatile,在處理過程中不需要加鎖,除非讀到的值是空的才會加鎖重讀,volatile 可保證只能被單一 thread 修改,且修改後也不會讀到舊的資料,重要的是可讓多個 thread 同時讀取。

put method 為了 thread-safe 必須加鎖處理。size method 裡面是以累加 Segment 裡面的 volatile 變數 count 實作的,由於 count 的變化機率很小,累加過程多半沒有發生 count 的變化,size 會先嘗試兩次不加鎖累加。但如果發生了,就會再進行加鎖累加。

count 的變化判斷是以 modCount 變數進行,在 put, remove, clean 之前,都會先將 modCount 加 1。

ThreadPool

通常在使用 J2EE Server 連結 DB 時,會利用 DB Connection Pool 當作中間層,這可以減少 DB Connection 建立與銷毀所消耗掉的資源。

至於 JVM 的 thread 也一樣,可以使用 ThreadPool 來進行 Thread 的 reuse 與 管理,不僅可減少建立與銷毀 thread 所消耗掉的資源,也可以縮短任務處理的時間。

thread 跟 db connection 都屬於 JVM 的外部資源,這些資源都有使用的上限,以 pool 的方式進行使用管理,而不是無限制地建立與使用,可以保護 server 並提高穩定度。

建立 ThreadPool

new ThreadPoolExecutor(corePoolSize, maximumPoolSize, keepAliveTime, milliseconds,runnableTaskQueue, handler);

相關的參數

  1. corePoolSize(pool的基本大小)
    ThreadPool 會持續建立 thread,直到 thread 的數量達到 corePoolSize。如果呼叫 prestartAllCoreThreads method,ThreadPool 就會提前建立所有基本線程。

  2. runnableTaskQueue
    儲存等待執行的任務的blocking queue

    2.1 ArrayBlockingQueue:以 Array 實作,FIFO(先進先出)

    2.2 LinkedBlockingQueue:以 LinkedList 實作,FIFO (先進先出),throughput比ArrayBlockingQueue好。Executors.newFixedThreadPool() 就是使用這個queue。

    2.3 SynchronousQueue:enqueue必須等到另一個線程呼叫移除才能執行,否則enqueue就會一直處於阻塞狀態,throughput比LinkedBlockingQueue高,Executors.newCachedThreadPool使用了這個queue。

    2.4 PriorityBlockingQueue

  3. maximumPoolSize(pool最大大小)
    ThreadPool允許建立的最大線程數。

  4. ThreadFactory
    用於建立thread的factory

  5. RejectedExecutionHandler(飽和策略)
    當 Queue 與 ThreadPool 都滿了,必須採取一種策略處理提交的新任務。以下是JDK1.5提供的四種策略。
    5.1 AbortPolicy:預設值,直接拋出異常。
    5.2 CallerRunsPolicy:使用呼叫端所在線程來執行任務
    5.3 DiscardOldestPolicy:丟棄 queue 裡最近的一個任務,並執行當前任務
    5.4 DiscardPolicy:不處理,直接丟棄
    5.5 自訂 RejectedExecutionHandler

  6. keepAliveTime
    thread 空閒後,保持存活的時間。如果任務很多,並且每個任務執行的時間比較短,可以調大這個時間,提高線程的利用率。

  7. TimeUnit
    天(DAYS),小時(HOURS),分鐘(MINUTES),毫秒(MILLISECONDS),微秒(MICROSECONDS, 千分之一毫秒)和毫微秒(NANOSECONDS, 千分之一微秒)

提交處理任務的方式有兩種

  1. execute方法沒有返回值,所以無法判斷任務是否被線程池執行成功

    threadsPool.execute(new Runnable() {
             @Override
             public void run() {
             }
         });
    
  2. 使用 submit 來提交任務,它會返回一個future,我們可以通過這個future來判斷任務是否執行成功。future.get()會 blocking 住,直到任務完成,如果使用 get(long timeout, TimeUnit unit) 則會阻塞一段時間後立即返回。

    Future<Object> future = executor.submit(harReturnValuetask);
    try {
      Object s = future.get();
    } catch (InterruptedException e) {
     // 處理中斷異常
    } catch (ExecutionException e) {
     // 處理無法執行任務異常
    } finally {
     // 關閉線程池
     executor.shutdown();
    }
    

當提交一個新任務到 ThreadPool 處理流程如下:

  1. 判斷基本線程池是否已滿?沒滿,建立一個工作線程來執行任務。滿了,則進入下個流程。
  2. 判斷工作隊列是否已滿?沒滿,則將新提交的任務儲存在工作隊列裡。滿了,則進入下個流程。
  3. 判斷整個線程池是否已滿?沒滿,則建立一個新的工作線程來執行任務,滿了,則交給 RejectedExecutionHandler 飽和策略來處理這個任務。

使用 ThreadPool 之前必須先分析任務特性,可以從以下幾個角度來進行分析:

  1. 任務的性質:CPU密集型任務,IO密集型任務和混合型任務。
    CPU密集型任務,儘可能小的線程,如配置N(cpu)+1個線程的線程池
    IO密集型任務則由於線程並不是一直在執行任務,則配置儘可能多的線程,如2*N(cpu)
    混合型的任務,如果可以拆分,則將其拆成一個CPU密集型任務和一個IO密集型任務
    Runtime.getRuntime().availableProcessors() 可取得當前的CPU個數
  2. 任務的優先級:高,中和低。
    優先級不同的任務可以使用優先級隊列PriorityBlockingQueue來處理。它可以讓優先級高的任務先得到執行,但如果一直有優先級高的任務,那麼優先級低的任務可能永遠會不能被執行。
  3. 任務的執行時間:長,中和短。
    執行時間不同的任務可以交給不同規模的線程池來處理,也可以使用優先級隊列,讓執行時間短的任務先執行。
  4. 任務的依賴性:是否依賴其他系統資源,如DB連接。
    因為發送SQL後需要等待DB返回結果,如果等待的時間越長CPU空閒時間就越長,因此thread 數量應該設置越大,這樣才能更好的利用CPU。

監控 ThreadPool

有一些屬性在監控 ThreadPool 的時候可以使用

  1. taskCount:需要執行的任務數量
  2. completedTaskCount:在執行過程中已完成的任務數量
  3. largestPoolSize:曾經建立過的最大線程數量
  4. getPoolSize:Thread數量如果線程池不銷毀的話,池裡的線程不會自動銷毀,所以這個大小只增不+ getActiveCount:獲取活動的線程數。

可以繼承 ThreadPool 並覆寫 beforeExecute,afterExecute 和 terminated方法,可以在任務執行前,執行後和線程池關閉前,監控任務的平均執行時間,最大執行時間和最小執行時間等。這幾個方法在線程池裡是空的方法。

protected void beforeExecute(Thread t, Runnable r) { }

Reference

聊聊並發系列文章

非阻塞同步算法與CAS(Compare and Swap)無鎖算法

JDK, Jetty, Tomcat 線程池的實現算法分析
Java 理論與實踐: 正確使用 Volatile 變量