Java并發編程(3)- FutureTask詳解與池化思想的設計和實戰二

作者: 修羅debug
版權聲明:本文為博主原創文章,遵循 CC 4.0 by-sa 版權協議,轉載請附上原文出處鏈接和本聲明。


Java并發編程領域,FutureTask可以說是一個非常強大的利器,它通過實現RunnableFuture接口間接擁有了RunnableFuture接口的相關特性,既可以用于充當線程執行的任務(Runnable),也可以用于獲取線程異步執行任務后返回的結果(Future);本文將基于FutureTask實戰一個高級案例:設計一款簡化版的池容器,以此學習鞏固池化思想. 

寫在前面的話:debug最近又出了一本新書:Spring Boot企業級項目-入門到精通》感興趣的小伙伴可以前往各大商城平臺(淘寶、天貓、當當、京東等)一睹為快!書籍的封面如下所示,后續debug會專門出篇文章專門介紹這本書(同時提供優惠購書渠道):   


言歸正傳,在上篇文章中:Java并發編程(2)-FutureTask詳解與池化思想的設計和實戰一,我們已經從源碼的角度結合多線程ThreadPoolExecuto,深入剖析并解讀了FutureTask 的相關API,從任務的創建、到任務的執行 最后再到 線程執行完任務后異步獲取執行結果;整個過程下來,想必各位看官老爺們應該收獲頗豐。

而本文我們將趁熱打鐵,進一步介紹FutureTask在實際項目開發中的作用;總的來說,FutureTask在實際項目開發中起到的作用有兩個:

(一)FutureTask執行多任務計算的場景

比如網站“程序員實戰基地fightjava.com”的首頁的數據是由多個功能模塊組成的:輪播圖、最新課程、最新博客、最新學習路線、最新資料、友情鏈接等模塊數據;

這些模塊由于具有獨立性、互不相關性,因此可以開啟多個FutureTask,然后交由多線程去執行,最終再統一通過get()方法獲取多線程異步執行任務的結果返回給前端瀏覽器(這一場景在debug最新擼的課程:Java工程師核心技術-典型案例與面試實戰系列二 就有重點介紹過,感興趣的小伙伴可以前往觀看學習?。?span lang="EN-US">


(二)在高并發場景下確保任務只被執行一次

在很多高并發的場景下,往往我們只需要某些任務被執行一次,這種情景FutureTask就能勝任(當然啦,可能還要借助像ConcurrentHashMap這樣的組件輔助;而本文要介紹的便是在這一場景下FutureTask所發揮的作用!


按照慣例,我們還是先來介紹下這一場景/需求吧:假設有一個帶Key的連接池,當Key存在時,則直接返回Key對應的連接對象;當Key不存在時,則創建一個 “連接”對象;


對于這樣的應用場景,通常采用的方法是使用一個Map來存儲Key和連接池對應的對應關系,而由于這是出于高并發的應用場景,因此穩妥的方式是采用ConcurrentHashMap,下面debug將采用N種方式對此進行實現,當然啦,最后的實現方式當然是FutureTask啦,畢竟大佬總是最后才出場的?。?!


 

話不多說,進入代碼實戰環節

1)首先出場的是傳統的實現方式,即按照常規的業務邏輯、算法實現的方式:   

@Component
@Slf4j
public class MyConnectionPool {
private ConcurrentHashMap<String,MyConnection> connMap=new ConcurrentHashMap<>();

//獲取鏈接
public MyConnection getConn(final String key){
MyConnection conn=connMap.get(key);
if (conn!=null){
return conn;
}
conn=createConn(key);
connMap.putIfAbsent(key,conn);
return conn;
}
}

代碼的含義應該不難理解哈,這但凡有點英語基礎的閉著眼睛都能猜出來是啥意思,除非你英語是體育老師教的:

 

那么到底這種方式行不行呢?其實行不行并不是由你我說了算,而是得需要先經過壓測哈,因為我們上面已經說了,必須要滿足“高并發環境”的前提;OK,啪的一下打開JMeter.sh,然后設置QPS=1000,甚至10000 ,很快啊,如下圖所示:


 

從上圖中就可以看出,此種方式雖然最終是可以得到想要的結果,但是卻產生了大量的、很有可能會被閑置的連接資源,因此這種方式不值得推薦!


2)第二種要出場的是“synchronized”,其實現代碼如下所示:   

public synchronized MyConnection getConnA(final String key){
MyConnection conn=connMap.get(key);
if (conn!=null){
return conn;
}
conn=createConn(key);
connMap.putIfAbsent(key,conn);
return conn;
}

相對于第一種方式,雖然解決了安全性問題,但是卻大大犧牲了性能,無法提高前端的并發量;即 synchronized這種同步方式,雖然犧牲了性能,但卻沒有浪費由于大量創建的連接所占用的空間資源(這其實是一種加  獨占/悲觀 鎖的方式)

JMeter一通壓測下來,發現確實沒啥問題哈,如下圖所示:


3)第三種實現方式仍然是加鎖,只是在這里加的是ZooKeeper的分布式鎖,話不多說,直接上代碼:   

//基于zk的分布式鎖 ~ 這種就集成、依賴了第三方的中間件(不具備獨立對外提供服務的特性)
public MyConnection getConnB(final String key) throws Exception{
MyConnection conn=null;
conn=connMap.get(key);
if (conn!=null){
return conn;
}
//操作zookeeper的客戶端實例
InterProcessMutex mutex=new InterProcessMutex(client,"/pool/V4");
try {
if (mutex.acquire(4,TimeUnit.SECONDS)){
conn=connMap.get(key);
if (conn!=null){
return conn;
}
conn=createConn(key);
connMap.putIfAbsent(key,conn);
}
}catch (Exception e){
}finally {
mutex.release();
}
return conn;
}

對于ZooKeeper不熟悉的小伙伴可以前往 “程序員實戰基地fightjava.com”觀看debug以前擼過的課程,比如“分布式鎖實戰視頻教程(基于Spring Boot”等等。上述的實現方式經過壓測后也沒有啥問題:



雖然在高并發的場景下性能是沒得說的、結果也是正確的;但是呢,它的缺陷也是很明顯的,那就是集成、依賴了第三方的中間件ZooKeeper,不具備獨立對外提供服務的特性,對外不友好 (所謂的“獨立”,指的是最好能直接基于JDK、而不依賴于任何第三方組件,且可移植性良好的特點)

 

4)最后要登場的自然是FutureTask啦,老規矩,還是先上代碼哈:   

private ConcurrentHashMap<String,FutureTask<MyConnection>> connHashMap=new ConcurrentHashMap<>();

//基于futureTask
public MyConnection getConnC(final String key) throws Exception{
FutureTask<MyConnection> futureTask=connHashMap.get(key);
if (futureTask!=null){
return futureTask.get();
}

//多線程訪問這段代碼都可以執行,即都創建了 “獲取連接” 的任務,
//但是注意:此時還沒真正地執行 獲取連接對象實例 的代碼
Callable<MyConnection> callable= () -> createConn(key);
FutureTask<MyConnection> newTask= new FutureTask<>(callable);

//同一時刻,對于同個key并發的多個線程只有一個可以成功此行代碼,即 putIfAbsent
//當返回null時表示現在本地映射map還沒有該key對應的task,否則調取get()方法
//堵塞式等待獲取執行結果即可
futureTask=connHashMap.putIfAbsent(key,newTask);
if (futureTask==null){
futureTask=newTask;
futureTask.run();
}
return futureTask.get();
}


JMeter壓測過后,觀察最終的結果,發現也是沒啥問題的,如下圖所示:


 

有小伙伴可能會問,這是為啥呢?為啥這種方式也可行呢?這主要有兩個原因:

A. ConcurrentHashMap<String,FutureTask<MyConnection>> 該方法從名字上就可以看出大概的意思:適用于高并發下的場景,它的Key具有唯一性,而且putIfAbsent() 方法的作用在于同一時間只會有一個線程執行該方法成功,當返回null時,表示還不存在該key,否則,表示已經存在該key了,你再put進去也沒用了,某種程度上,這其實也是一個加鎖的過程(Redis SETEX 也正有此種功效)

 

B.建立在A的基礎上,只要保證FutureTask中任務的“創建”早于ConcurrentHashMapputIfAbsent()方法、而真正“執行其真正的代碼邏輯”時則晚于ConcurrentHashMapputIfAbsent()方法即可,之所以要如此做,正是因為 A 中提到的 putIfAbsent() 起到了加鎖的作用,同一時間將只會有一個線程趟過去,牛逼吧:


 

OK,至此,我們也就擼完了,收工!咱們下期再見?。?!

總結

1)代碼下載:文章涉及到的代碼可以通過關注“程序員實戰基地”微信公眾號(掃描下圖微信公眾號即可),回復數字:100,即可獲取代碼下載鏈接:

我是debug,一個相信技術改變生活、技術成就夢想 的攻城獅;如果本文對你有幫助,歡迎你關注debug的技術公眾號一起學習干貨技術,并動動手指點贊、收藏以及轉發,你的三連可是debug分享的動力哦