Java并發編程(2)- FutureTask詳解與池化思想的設計和實戰一

作者: 修羅debug
版權聲明:本文為博主原創文章,遵循 CC 4.0 by-sa 版權協議,轉載請附上原文出處鏈接和本聲明。




Java并發編程領域,FutureTask可以說是一個非常強大的利器,它通過實現RunnableFuture接口間接擁有了RunnableFuture接口的相關特性,既可以用于充當線程執行的任務(Runnable),也可以用于獲取線程異步執行任務后返回的結果(Future);本文將通過剖析解讀FutureTask底層相關的核心源碼,并基于FutureTask自設計并實戰一款“池容器”,即池化思想的設計和實戰;


寫在前面的話:debug最近又出了一本新書:Spring Boot企業級項目-入門到精通》 感興趣的小伙伴可以前往各大商城平臺(淘寶、天貓、當當、京東等)一睹為快!書籍的封面如下所示,后續debug會專門出篇文章專門介紹這本書(同時提供優惠購書渠道):



言歸正傳,在上篇文章中:Java并發編程(1): Callable、Future和FutureTask ,我們已經介紹并實戰過了Java并發編程中Callable、Future以及FutureTask的相關基本概念以及API,本文將不再贅述;


值得一提的是,Future或者FutureTask需要通過線程池才能發揮出實際的功效,因此在實際應用中它跟線程池又有著千絲萬縷的聯系,本文將從源碼的角度進行剖析,通過解讀FutureTask底層相關的核心源碼,并基于FutureTask自設計并實戰一款“池容器”,即池化思想的設計和實戰;


1)在上篇文章中想必各位觀看老爺們已經基本知道了Future、FutureTask需要結合線程池來使用,看下方代碼:   

ArrayBlockingQueue queue=new ArrayBlockingQueue(2);
ExecutorService executor=new ThreadPoolExecutor(2,4,1, TimeUnit.MINUTES,queue);
FutureTask<Map<String,Object>> futureTask=new FutureTask<Map<String, Object>>(new ProductThread());
executor.execute(futureTask);
Map<String,Object> resMap=futureTask.get();
System.out.println("--子線程執行任務后得到的結果:"+resMap);


簡短解說:在上述該代碼中,ProductThread是一個實現了Callable接口的類,其中的call()方法便是真正要執行的任務代碼邏輯,在此就不貼出來了(在上篇文章有源碼);然后通過它構造一FutureTask,最后便是提交給線程池的execute()方法進行執行,執行完成之后,通過futureTaskget()方法獲取線程異步執行后返回的結果;

而本文我們將基于這一段“解說”進行核心源碼的剖析。


2)首先是new FutureTask<Map<String, Object>>(new ProductThread()),即創建FutureTask,其底層源碼如下所示:   

//創建任務.等待被線程池中的線程執行
public FutureTask(Callable<V> callable) {
if (callable == null)
throw new NullPointerException();
this.callable = callable;
this.state = NEW; // ensure visibility of callable
}


在上述源碼中,值得一提的是,任務的運行狀態變量state的設計個人覺得相當巧妙,采用volatile關鍵字進行定義,而volatile的作用想必有些小伙伴是比較熟悉的:

1)保證線程之間可見性:即對 用volatile關鍵字修飾的變量 的可見性,即 “當一個線程修改了這個變量的值時,volatile 可以保證新值能立即同步到主內存,以及每次使用前立即從主內存刷新”


2)禁止指令重排序:A.那么什么是“指令重排序”呢,我們寫的代碼最終都將轉化為相應的指令交付給底層控制單元、計算單元執行,即CPU執行,重排序 則指的是CPU采用了允許將多條指令不按程序規定的順序分開發送給各相應電路單元處理,這樣做的弊端在于多核處理器下各處理器會發生亂序執行,從而導致我們所謂的 “并發安全”問題,而volatile關鍵詞就禁止了這種現象,即通過在本地代碼中插入許多內存屏障指令來保證處理器不發生亂序執行;

 

OK,回到線程待執行任務的運行狀態變量state,其定義和取值如下所示(總共有6個狀態的取值,其含義直接翻譯過來就行了):   

private volatile int state;
private static final int NEW = 0;
private static final int COMPLETING = 1;
private static final int NORMAL = 2;
private static final int EXCEPTIONAL = 3;
private static final int CANCELLED = 4;
private static final int INTERRUPTING = 5;
private static final int INTERRUPTED = 6;


從官方的源碼注釋中可以看出 一個任務從創建到完畢,可能經歷的狀態變化為:   

NEW -> COMPLETING -> NORMAL
NEW -> COMPLETING -> EXCEPTIONAL
NEW -> CANCELLED
NEW -> INTERRUPTING -> INTERRUPTED


3FutureTask任務定義好了之后,接下來就應該是交給線程準備執行了,即:executor.execute(futureTask) ,其底層源碼如下所示:   

public void execute(Runnable command){
//如果任務對象為null,拋異常
if (command == null)
throw new NullPointerException();

//先獲取當前池中工作中的線程(活躍的、可用的線程數)
//如果當前池中的線程數小于核心線程數,就會調用addWorker檢查運行狀態和正在運行的
//線程數量
//通過return操作可以用于防止錯誤地添加線程、然后執行當前任務
//(因為隨意的添加線程只會造成資源浪費)
int c = ctl.get();
if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true))
return;
c = ctl.get();
}
//否則當 池中的線程數大于核心線程數的時候 且 任務可以被加入任務隊列時
//我們需要來個雙重判斷,判斷是否真的需要添加一個新的線程來執行這個任務,
//因為可能已經存在這樣的情況:線程執行完畢任務后的那一刻可能處于空閑狀態,
//這個時候該線程就可以直接復用;
//否則直接創建一個新的線程來執行此任務
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
//判斷池中是否有可用的線程,如果沒有 而且 也無法將當前任務加入到任務隊列時
//則拒絕執行當前的任務(拒絕的策略取決于創建線程池時指定的策略)
if (!isRunning(recheck) && remove(command))
reject(command);
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
//如果不能創建新的線程去執行新任務的話,就拒絕當前任務
else if (!addWorker(command, false))
reject(command);
}


在上述源碼中,addWorker()方法很搶眼,因此不得不稍微介紹一番,因為該方法代碼有點長,debug特意將其截成長圖供各位看官老爺們閱讀,如下所示:



在上圖中有一小段代碼便是真正觸發“線程執行任務”的時機,即:

if (workerAdded) {
t.start();
workerStarted = true;
}


start()方法一調用,便最終會調用FutureTask中的run()方法,其底層源碼如下所示:

public void run() {
if (state != NEW ||
!UNSAFE.compareAndSwapObject(this, runnerOffset,
null, Thread.currentThread()))
return;
try {
//將前面執行 new FutureTask(callable) 代碼時傳入該構造方法的任務對象
//callable引用 交給新的 c
//如果當前任務處于 NEW,即創建的狀態,則執行callable原生定義的call()方法的
//代碼邏輯,其實就是 ProductThread 中的 call()方法的代碼邏輯
Callable<V> c = callable;
if (c != null && state == NEW) {
V result;
boolean ran;
try {
result = c.call();
ran = true;
} catch (Throwable ex) {
result = null;
ran = false;
setException(ex);
}
//執行完成之后,將得到的結果通過 set() 方法設置到 FutureTask 中的
//私有變量 outCome中
if (ran)
set(result);
}
} finally {
runner = null;
int s = state;
if (s >= INTERRUPTING)
handlePossibleCancellationInterrupt(s);
}
}


其中,if (ran)  set(result); 表示將線程異步執行完之后得到的結果通過 set() 方法設置到 FutureTask 中的私有變量 outCome中,其代碼如下所示:

//上述執行完run()方法后,會得到一個結果V,將該結果通過 set(V) 方法可以設置//到 FutureTask 
//中的私有變量 outCome中
//其中設置回去的過程其實也是加了鎖,只不過是一個樂觀鎖,即通過cas機制來實現
//即“判斷當前任務的舊狀態old=stateOffset是否真的為New,如果是,
//則將其設置為COMPLETING,
//代表任務正在執行中”,其他線程就執行不成功
//然后設置完成后,將該任務的運行狀態設置為NORMAL,即最終態為
//“任務已執行完成”
protected void set(V v) {
if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
outcome = v;
UNSAFE.putOrderedInt(this, stateOffset, NORMAL); // final state
finishCompletion();
}
}


最后,便是通過futureTaskget()方法獲取線程異步執行后返回的結果,即對應的業務代碼為: Map<String,Object> resMap=futureTask.get(); 接下來一起研讀get()方法底層的源碼:

//獲取線程異步執行后的結果,過程:先獲取當前任務的運行狀態
//如果是已完成,即Normal狀態時,則直接report(s),即獲取結果
//如果是未完成(即處于運行期間 : <= COMPLETING),
//則進入等待的邏輯,即awaitDone()方法里面的代碼邏輯
public V get() throws InterruptedException, ExecutionException {
int s = state;
if (s <= COMPLETING)
s = awaitDone(false, 0L);
return report(s);
}


report()方法的底層源碼也很簡潔,如下所示:

//其中report()方法的核心邏輯,就是通過判斷當前任務的運行狀態(已完成)
//從而將線程的執行結果 result 返回:
private V report(int s) throws ExecutionException {
Object x = outcome;
if (s == NORMAL)
return (V)x;
if (s >= CANCELLED)
throw new CancellationException();
throw new ExecutionException((Throwable)x);
}


如果當前任務仍然處于運行中的狀態時,則執行 awaitDone() 方法進入堵塞隊列等待獲取執行結果的代碼邏輯,如下所示:

/**
* Awaits completion or aborts on interrupt or timeout.
* 等待任務執行完成、或者任務被終止、或者任務被中斷、或者任務執行超時
* @param timed true if use timed waits
* @param nanos time to wait, if timed
*
* 如果設置timed為true,則代表在獲取線程異步執行的結果時 可以等待一定的時間nanos
*/
private int awaitDone(boolean timed, long nanos)
throws InterruptedException {
final long deadline = timed ? System.nanoTime() + nanos : 0L;
WaitNode q = null;
boolean queued = false;
//這其實也是一個死循環 - CAS自旋 + AQS 即同步隊列控制器 的原理
for (;;) {
//看看執行任務的線程是不是被中斷interrupt,如果是的話做出:
//1.在等待隊列中移除這個調用get方法的線程結點WaitNode
//(怎么移除呢:很簡單,只需要調整鏈表中的線程節點即可)
//2. 拋出異常
if (Thread.interrupted()) {
removeWaiter(q);
throw new InterruptedException();
}
int s = state;
if (s > COMPLETING) {
//表示任務執行完畢:可能正常完成也可能拋異常,總之就是結束了
//就把這個waitNode的執行線程thread指向null
if (q != null)
q.thread = null;
return s;
}
//如果state==COMPLETING,意味著基本完成但還沒保存結果,就yield,
//表示線程掛起
else if (s == COMPLETING) // cannot time out yet
Thread.yield();
//q == null時,即等待節點q為null,就創建等待節點,
//這個節點后面會被插入阻塞隊列
//第一次循環時一般會執行到
else if (q == null)
q = new WaitNode();
//一般第二次循環時會執行到
//大概的含義為:判斷queued,即是否入隊列成功,
//這里是將創建的線程節點q加入隊列頭
//使用Unsafe的CAS方法,對waiters進行賦值,
//waiters也是一個WaitNode節點,
//相當于隊列頭,
//或者理解為隊列的頭指針,通過WaitNode可以遍歷整個阻塞隊列
//(頭插法)
else if (!queued)
queued = UNSAFE.compareAndSwapObject(this, waitersOffset,
q.next = waiters, q);
else if (timed) {
//設置了超時的處理機制:設置超時時間之后,
//調用get()的線程最多阻塞nanos 納秒,
//就會從阻塞狀態醒過來。
//如果最終真的超時的話,就移除 調用get()方法的線程wait結點,
//并返回state
nanos = deadline - System.nanoTime();
if (nanos <= 0L) {
removeWaiter(q);
return state;
}
//否則就進入堵塞等待狀態(nanos納秒),等待被喚醒
LockSupport.parkNanos(this, nanos);
}
else
//沒有設置超時時間但任務又還沒執行出結果,就直接進入阻塞狀態,
//等待被其他線程喚醒
LockSupport.park(this);
}
}


至此,對于FutureTask的核心源碼剖析我們已經擼完了,當然啦,還有像cancel()方法,即取消任務的執行就留給各位看官老爺們的研讀了(很簡單,通過CAS機制判斷任務的運行狀態 以及 mayInterruptIfRunning 參數決定最終是否可以中斷該任務的執行線程,同時也改變了任務的運行狀態: New-> INTERRUPTING->NORMAL ; New-> CANCELLED):

public boolean cancel(boolean mayInterruptIfRunning) {
if (!(state == NEW &&
UNSAFE.compareAndSwapInt(this, stateOffset, NEW,
mayInterruptIfRunning ? INTERRUPTING : CANCELLED)))
return false;
try { if (mayInterruptIfRunning) {
try {
Thread t = runner;
if (t != null)
t.interrupt();
} finally { // final state
UNSAFE.putOrderedInt(this, stateOffset, INTERRUPTED);
}
}
} finally {
finishCompletion();
}
return true;
}


finishCompletion()等方法的源碼解讀也是如此,就留給各位小伙伴細品了!本文我們就到這里吧!OK,打完收工,下期還有個高級案例實戰,即“池化思想的設計與實戰”,我們下期再見?。?!


總結:

1)代碼下載:關注“程序員實戰基地”微信公眾號(掃描下圖微信公眾號即可),回復“100”,即可獲取代碼下載鏈接;歡迎關注debug的技術公眾號一起學習干貨技術吧!