Spark內核解析

Spark內核概述

Spark內核泛指Spark的核心運行機制,包括Spark核心組件的運行機制、Spark任務調度機制、Spark內存管理機制、Spark核心功能的運行原理等,熟練掌握Spark內核原理。

一、Spark核心組件回顧

Driver

Spark驅動器節點,用于執行Spark任務中的main方法,負責實際代碼的執行工作。Driver在Spark作業執行時主要負責:

1、將用戶程序轉化為任務(Job);

2、在Executor之間調度任務(task);

3、跟蹤Executor的執行情況;

4、通過UI展示查詢運行情況。

Executor

Spark Executor節點是一個JVM進程,負責在Spark作業中運行具體任務,任務彼此之間相互獨立。Spark應用啟動時,Executor節點被同時啟動,并且始終伴隨著整個Spark應用的生命周期而存在。如果有Executor節點發生了故障或崩潰,Spark應用也可以繼續執行,會將出錯節點上的任務調度到其他Executor節點上繼續運行。

Executor有兩個核心功能:

1、負責運行組成Spark應用的任務,并將結果返回給Driver進程;

2、它們通過自身的塊管理器(Block Manager)為用戶程序中要求緩存的RDD提供內存式存儲。RDD是直接緩存在Executor進程內的,因此任務可以在運行時充分利用緩存數據加速運算。

Spark通用運行流程概述

上圖為Spark通用運行流程,不論Spark以何種模式進行部署,任務提交后,都會先啟動Driver進程,隨后Driver進程向集群管理器注冊應用程序,之后集群管理器根據此任務的配置文件分配Executor并啟動,當Driver所需的資源全部滿足后,Driver開始執行main函數,Spark查詢為懶執行,當執行到action算子時開始反向推算,根據寬依賴進行stage的劃分,隨后每一個stage對應一個taskset,taskset中有多個task,根據本地化原則,task會被分發到指定的Executor去執行,在任務執行的過程中,Executor也會不斷與Driver進行通信,報告任務運行情況。

二、Spark部署模式

Spark支持三種集群管理器(Cluster Manager),分別為:

1、Standalone:獨立模式,Spark原生的簡單集群管理器,自帶完整的服務,可單獨部署到一個集群中,無需依賴任何其他資源管理系統,使用Standalone可以很方便地搭建一個集群;

2、Apache Mesos:一個強大的分布式資源管理框架,它允許多種不同的框架部署在其上,包括yarn;

3、Hadoop YARN:統一的資源管理機制,在上面可以運行多套計算框架,如map reduce、storm等,根據driver在集群中的位置不同,分為yarn client和yarn cluster。

實際上,除了上述這些通用的集群管理器外,Spark內部也提供了一些方便用戶測試和學習的簡單集群部署模式。由于在實際工廠環境下使用的絕大多數的集群管理器是Hadoop YARN,因此我們關注的重點是Hadoop YARN模式下的Spark集群部署。

Spark的運行模式取決于傳遞給SparkContext的MASTER環境變量的值,個別模式還需要輔助的程序接口來配合使用,目前支持的Master字符串及URL包括:

img

用戶在提交任務給Spark處理時,以下兩個參數共同決定了Spark的運行方式。

- master MASTER_URL:決定了Spark任務提交給哪種集群處理。

- deploy-mode DEPLOY_MODE:決定了Driver的運行方式,可選值為Client或者Cluster。

Standalone模式運行機制

Standalone集群有四個重要組成部分,分別是:

(1)Driver:是一個進程,我們編寫的Spark應用程序就運行在Driver上,由Driver進程執行;

(2)Master:是一個進程,主要負責資源調度和分配,并進行集群的監控等職責;

(3)Worker:是一個進程,一個Worker運行在集群中的一臺服務器上,主要負責兩個職責,一個是用自己的內存存儲RDD的某個或某些partition;另一個是啟動其他進程和線程(Executor),對RDD上的partition進行并行的處理和計算。

(4)Executor:是一個進程,一個Worker上可以運行多個Executor,Executor通過啟動多個線程(task)來執行對RDD的partition進行并行計算,也就是執行我們對RDD定義的例如map、flatMap、reduce等算子操作。

Standalone Client模式

img

1、在Standalone Client模式下,Driver在任務提交的本地機器上運行;

2、Driver啟動后向Master注冊應用程序,Master根據submit腳本的資源需求找到內部資源至少可以啟動一個Executor的所有Worker,然后在這些Worker之間分配Executor;

3、Worker上的Executor啟動后會向Driver反向注冊;

4、當所有的Executor注冊完成后,Driver開始執行main函數;

5、之后執行到Action算子時,開始劃分stage;

6、每個stage生成對應的taskSet,之后將task 分發到各個Executor上執行。

Standalone Cluster模式

img

1、在Standalone Cluster模式下,任務提交后,Master會找到一個Worker啟動Driver進程;

2、Driver啟動后向 Master注冊應用程序;

3、Master根據submit腳本的資源需求找到內部資源至少可以啟動一個Executor的所有 Worker,然后在這些Worker之間分配Executor;

4、Worker上的Executor啟動后會向Driver反向注冊;

5、所有的 Executor注冊完成后,Driver開始執行main函數;

6、之后執行到Action算子時,開始劃分stage,每個stage生成對應的taskSet;

7、之后將task分發到各個Executor上執行。

注意,Standalone的兩種模式下(client/Cluster),Master在接到Driver注冊Spark應用程序的請求后,會獲取其所管理的剩余資源能夠啟動一個 Executor的所有Worker,然后在這些Worker之間分發Executor,此時的分發只考慮Worker上的資源是否足夠使用,直到當前應用程序所需的所有Executor都分配完畢,Executor反向注冊完畢后,Driver開始執行main程序。

YARN模式運行機制

YARN Client模式

img

1、在YARN Client模式下,Driver在任務提交的本地機器上運行;

2、Driver啟動后會和ResourceManager通訊申請啟動ApplicationMaster;

3、隨后ResourceManager分配container,在合適的NodeManager上啟動ApplicationMaster,此時的ApplicationMaster的功能相當于一個ExecutorLaucher(執行者發射器),只負責向ResourceManager申請Executor內存;

4、ResourceManager接到ApplicationMaster的資源申請后會分配container,然后ApplicationMaster在資源分配指定的NodeManager上啟動Executor進程;

5、Executor進程啟動后會向Driver反向注冊;

6、Executor全部注冊完成后Driver開始執行main函數;

7、之后執行到Action算子時,觸發一個job,并根據寬依賴開始劃分stage;

8、每個stage生成對應的taskSet,之后將task分發到各個Executor上執行。

YARN Cluster模式

img

1、在YARN Cluster模式下,任務提交后會和ResourceManager通訊申請啟動ApplicationMaster;

2、隨后ResourceManager分配container,在合適的NodeManager上啟動ApplicationMaster;(此時的ApplicationMaster就是Driver)

3、Driver啟動后向ResourceManager申請Executor內存,ResourceManager接到ApplicationMaster的資源申請后會分配container,然后在合適的NodeManager上啟動Executor進程;

4、Executor進程啟動后會向Driver反向注冊;

5、Executor全部注冊完成后Driver開始執行main函數;

6、之后執行到Action算子時,觸發一個job,并根據寬依賴開始劃分stage;

7、每個stage生成對應的taskSet,之后將task分發到各個Executor上執行。

三、Spark通訊架構

Spark通信架構概述

Spark2.x版本使用Netty通訊架構作為內部通訊組件。Spark基于Netty新的rpc框架借鑒了Akka中的設計,它是基于Actor模型,如下圖所示:

Spark通訊框架中各個組件(Client/Master/Worker)可以認為是一個個獨立的實體,各個實體之間通過消息來進行通信。具體各個組件之間的關系如下:

Endpoint(Client/Master/Worker)有一個InBox和N個OutBox(N>=1,N取決于當前Endpoint與多少其他的Endpoint進行通信,一個與其通訊的其他Endpoint對應一個OutBox),Endpoint接收到的消息被寫入InBox,發送出去的消息寫入OutBox并被發送到其他Endpoint的InBox中。

Spark通訊架構解析

Spark通信架構如下圖所示:

img

1) RpcEndpoint:RPC端點,Spark針對每個節點(Client/Master/Worker)都稱之為一個Rpc 端點,且都實現RpcEndpoint接口,內部根據不同端點的需求,設計不同的消息和不同的業務處理,如果需要發送(詢問)則調用 Dispatcher;

2) RpcEnv:RPC上下文環境,每個RPC端點運行時依賴的上下文環境稱為 RpcEnv;

3) Dispatcher:消息分發器,針對于RPC端點需要發送消息或者從遠程 RPC 接收到的消息,分發至對應的指令收件箱/發件箱。如果指令接收方是自己則存入收件箱,如果指令接收方不是自己,則放入發件箱;

4) Inbox:指令消息收件箱,一個本地RpcEndpoint對應一個收件箱,Dispatcher在每次向Inbox存入消息時,都將對應EndpointData加入內部ReceiverQueue中,另外Dispatcher創建時會啟動一個單獨線程進行輪詢ReceiverQueue,進行收件箱消息消費;

5) RpcEndpointRef:RpcEndpointRef是對遠程RpcEndpoint的一個引用。當我 們需要向一個具體的RpcEndpoint發送消息時,一般我們需要獲取到該RpcEndpoint的引用,然后通過該應用發送消息。

6) OutBox:指令消息發件箱,對于當前RpcEndpoint來說,一個目標RpcEndpoint對應一個發件箱,如果向多個目標RpcEndpoint發送信息,則有多個OutBox。當消息放入Outbox后,緊接著通過TransportClient將消息發送出去。消息放入發件箱以及發送過程是在同一個線程中進行;

7) RpcAddress:表示遠程的RpcEndpointRef的地址,Host + Port。

8) TransportClient:Netty通信客戶端,一個OutBox對應一個TransportClient,TransportClient不斷輪詢OutBox,根據OutBox消息的receiver信息,請求對應的遠程TransportServer;

9) TransportServer:Netty通信服務端,一個RpcEndpoint對應一個TransportServer,接受遠程消息后調用 Dispatcher分發消息至對應收發件箱;

根據上面的分析,Spark通信架構的高層視圖如下圖所示:

img

四、SparkContext解析

在Spark中由SparkContext負責與集群進行通訊、資源的申請以及任務的分配和監控等。當 Worker節點中的Executor運行完畢Task后,Driver同時負責將SparkContext關閉。

通常也可以使用SparkContext來代表驅動程序(Driver)。

img

SparkContext是用戶通往Spark集群的唯一入口,可以用來在Spark集群中創建RDD、累加器和廣播變量。

SparkContext也是整個Spark應用程序中至關重要的一個對象,可以說是整個Application運行調度的核心(不包括資源調度)。

SparkContext的核心作用是初始化Spark應用程序運行所需的核心組件,包括高層調度器(DAGScheduler)、底層調度器(TaskScheduler)和調度器的通信終端(SchedulerBackend),同時還會負責Spark程序向Cluster Manager的注冊等。

img

在實際的編碼過程中,我們會先創建SparkConf實例,并對SparkConf的屬性進行自定義設置,隨后,將SparkConf作為SparkContext類的唯一構造參數傳入來完成SparkContext實例對象的創建。SparkContext在實例化的過程中會初始化DAGScheduler、TaskScheduler和SchedulerBackend,當RDD的action算子觸發了作業(Job)后,SparkContext會調用DAGScheduler根據寬窄依賴將Job劃分成幾個小的階段(Stage),TaskScheduler會調度每個Stage的任務(Task),另外,SchedulerBackend負責申請和管理集群為當前Application分配的計算資源(即Executor)。

如果我們將Spark Application比作汽車,那么SparkContext就是汽車的引擎,而SparkConf就是引擎的配置參數。

下圖描述了Spark-On-Yarn模式下在任務調度期間,ApplicationMaster、Driver以及Executor內部模塊的交互過程:

img

Driver初始化SparkContext過程中,會分別初始化DAGScheduler、TaskScheduler、SchedulerBackend以及HeartbeatReceiver,并啟動SchedulerBackend以及HeartbeatReceiver。SchedulerBackend通過ApplicationMaster申請資源,并不斷從TaskScheduler中拿到合適的Task分發到Executor執行。HeartbeatReceiver負責接收Executor的心跳信息,監控Executor的存活狀況,并通知到TaskScheduler。

五、Spark任務調度機制

在工廠環境下,Spark集群的部署方式一般為YARN-Cluster模式,之后的內核分析內容中我們默認集群的部署方式為YARN-Cluster模式。

Spark任務提交流程

img

? Spark YARN-Cluster模式下的任務提交流程

下面的時序圖清晰地說明了一個Spark應用程序從提交到運行的完整流程:

img

1、提交一個Spark應用程序,首先通過Client向ResourceManager請求啟動一個Application,同時檢查是否有足夠的資源滿足Application的需求,如果資源條件滿足,則準備ApplicationMaster的啟動上下文,交給ResourceManager,并循環監控Application狀態。

2、當提交的資源隊列中有資源時,ResourceManager會在某個 NodeManager上啟動ApplicationMaster進程,ApplicationMaster會單獨啟動Driver后臺線程,當Driver啟動后,ApplicationMaster會通過本地的RPC連接Driver,并開始向ResourceManager申請Container資源運行Executor進程(一個Executor對應與一個Container),當ResourceManager返回Container資源,ApplicationMaster則在對應的Container上啟動Executor。

3、Driver線程主要是初始化SparkContext對象,準備運行所需的上下文,然后一方面保持與ApplicationMaster的RPC連接,通過ApplicationMaster申請資源,另一方面根據用戶業務邏輯開始調度任務,將任務下發到已有的空閑Executor上。

4、當ResourceManager向ApplicationMaster返回Container資源時,ApplicationMaster就嘗試在對應的Container上啟動Executor進程,Executor進程起來后,會向Driver反向注冊,注冊成功后保持與Driver的心跳,同時等待Driver分發任務,當分發的任務執行完畢后,將任務狀態上報給 Driver。

從上述時序圖可知,Client只負責提交Application并監控Application 的狀態。對于Spark的任務調度主要是集中在兩個方面: 資源申請和任務分發,其主要是通過ApplicationMaster、Driver以及Executor之間來完成。

Spark任務調度概述

當Driver起來后,Driver則會根據用戶程序邏輯準備任務,并根據Executor資源情況逐步分發任務。在詳細闡述任務調度前,首先說明下Spark里的幾個概念。一個Spark應用程序包括Job、Stage以及Task三個概念:

Job是以Action方法為界,遇到一個Action方法則觸發一個Job;

Stage是Job的子集,以RDD寬依賴(即 Shuffle)為界,遇到Shuffle做一次劃分;

Task是Stage的子集,以并行度(分區數)來衡量,分區數是多少,則有多少個task。






Spark的任務調度總體來說分兩路進行,一路是Stage級的調度,一路是Task級的調度,總體調度流程如下圖所示:

img

Spark RDD通過其Transactions操作,形成了RDD血緣關系圖,即DAG,最后通過Action的調用,觸發Job并調度執行。DAGScheduler負責Stage級的調度,主要是將job切分成若干個Stage,并將每個Stage打包成TaskSet交給TaskScheduler調度。TaskScheduler負責Task級的調度,將DAGScheduler給過來的TaskSet按照指定的調度策略分發到Executor上執行,調度過程中SchedulerBackend負責提供可用資源,其中SchedulerBackend有多種實現,分別對接不同的資源管理系統。

Spark Stage級調度

Spark的任務調度是從DAG切割開始,主要是由DAGScheduler來完成。當遇到一個Action操作后就會觸發一個Job的計算,并交給DAGScheduler來提交,下圖是涉及到Job提交的相關方法調用流程圖。

img

Job由最終的RDD和Action方法封裝而成,SparkContext 將Job交給DAGScheduler提交,它會根據RDD的血緣關系構成的DAG進行切分,將一個Job劃分為若干Stages,具體劃分策略是,由最終的RDD不斷通過依賴回溯判斷父依賴 是否是寬依賴,即以Shuffle為界,劃分Stage,窄依賴的RDD之間被劃分到同一個Stage中,可以進行pipeline式的計算,如上圖紫色流程部分。劃分的Stages分兩類,一類叫做ResultStage,為DAG最下游的Stage,由Action方法決定,另一類叫做ShuffleMapStage,為下游Stage準備數據,下面看一個簡單的例子WordCount。

img

Job由saveAsTextFile觸發,該Job由RDD-3和saveAsTextFile方法組成,根據RDD之間的依賴關系從RDD-3開始回溯搜索,直到沒有依賴的RDD-0,在回溯搜索過程中,RDD-3依賴RDD-2,并且是寬依賴,所以在RDD-2和RDD-3之間劃分Stage,RDD-3被劃到最后一個Stage,即ResultStage中,RDD-2依賴RDD-1,RDD-1依賴RDD-0,這些依賴都是窄依賴,所以將RDD-0、RDD-1和RDD-2劃分到同一個 Stage,即 ShuffleMapStage中,實際執行的時候,數據記錄會一氣呵成地執行RDD-0到RDD-2的轉化。不難看出,其本質上是一個深度優先搜索算法。一個Stage是否被提交,需要判斷它的父Stage是否執行,只有在父Stage執行完畢才能提交當前Stage,如果一個Stage沒有父Stage,那么從該Stage開始提交。Stage提交時會將Task信息(分區信息以及方法等)序列化并被打包成TaskSet 交給TaskScheduler,一個Partition對應一個Task,另一方面TaskScheduler會監控Stage的運行狀態,只有Executor丟失或者Task由于Fetch失敗才需要重新提交失敗的Stage以調度運行失敗的任務,其他類型的Task失敗會在TaskScheduler的調度過程中重試。相對來說DAGScheduler做的事情較為簡單,僅僅是在Stage層面上劃分DAG,提交Stage并監控相關狀態信息。TaskScheduler則相對較為復雜,下面詳細闡述其細節。

Spark Task級調度

Spark Task的調度是由TaskScheduler來完成,DAGScheduler將Stage打包到TaskSet交給TaskScheduler,TaskScheduler會將TaskSet封裝為TaskSetManager加入到調度隊列中,TaskSetManager結構如下圖所示。

TaskSetManager負責監控管理同一個Stage中的Tasks,TaskScheduler就是以TaskSetManager為單元來調度任務。

TaskScheduler初始化后會啟動SchedulerBackend,它負責跟外界打交道,接收Executor的注冊信息,并維護Executor的狀態,所以說SchedulerBackend是管“糧食”的,同時它在啟動后會定期地去“詢問”TaskScheduler有沒有任務要運行,也就是說,它會定期地“問”TaskScheduler“我有這么余量,你要不要啊”,TaskScheduler在SchedulerBackend“問 ”它的時候,會從調度隊列中按照指定的調度策略選擇TaskSetManager去調度運行,大致方法調用流程如下圖所示:

img

將TaskSetManager加入rootPool調度池中之后,調用SchedulerBackend的riviveOffers方法給driverEndpoint發送ReviveOffer消息;driverEndpoint收到ReviveOffer消息后調用makeOffers方法,過濾出活躍狀態的Executor(這些Executor都是任務啟動時反向注冊到Driver的Executor),然后將Executor封裝成WorkerOffer對象;準備好計算資源(WorkerOffer)后,taskScheduler基于這些資源調用resourceOffer在Executor上分配task。

六、調度策略

前面講到,TaskScheduler會先把DAGScheduler給過來的TaskSet封裝成TaskSetManager扔到任務隊列里,然后再從任務隊列里按照一定的規則把它們取出來在SchedulerBackend給過來的Executor上運行。這個調度過程實際上還是比較粗粒度的,是面向TaskSetManager的。調度隊列的層次結構如下圖所示:

img

TaskScheduler是以樹的方式來管理任務隊列,樹中的節點類型為Schdulable,葉子節點為TaskSetManager,非葉子節點為Pool,下圖是它們之間的繼承關系。

img

TaskScheduler支持兩種調度策略,一種是FIFO,也是默認的調度策略,另一種是FAIR。在TaskScheduler初始化過程中會實例化rootPool,表示樹的根節點,是Pool類型。

1、FIFO調度策略

FIFO調度策略執行步驟如下:

1)對s1和s2兩個Schedulable的優先級(Schedulable類的一個屬性,記為priority,值越小,優先級越高);

2)如果兩個Schedulable的優先級相同,則對s1,s2所屬的Stage的身份進行標識進行比較(Schedulable類的一個屬性,記為priority,值越小,優先級越高);

3)如果比較的結果小于0,則優先調度s1,否則優先調度s2。

img

2、FAIR 調度策略

FAIR 調度策略的樹結構如下圖所示:

img

FAIR模式中有一個rootPool和多個子Pool,各個子Pool中存儲著所有待分配的TaskSetMagager。

可以通過在Properties中指定spark.scheduler.pool屬性,指定調度池中的某個調度池作為TaskSetManager的父調度池,如果根調度池不存在此屬性值對應的調度池,會創建以此屬性值為名稱的調度池作為TaskSetManager的父調度池,并將此調度池作為根調度池的子調度池。

在FAIR模式中,需要先對子Pool進行排序,再對子Pool里面的TaskSetMagager進行排序,因為Pool和TaskSetMagager都繼承了Schedulable特質,因此使用相同的排序算法。

排序過程的比較是基于Fair-share來比較的,每個要排序的對象包含三個屬性:runningTasks值(正在運行的Task數)、minShare值、weight值,比較時會綜合考量runningTasks值,minShare值以及weight值。

注意,minShare、weight的值均在公平調度配置文件fairscheduler.xml中被指定,調度池在構建階段會讀取此文件的相關配置。

1)如果A對象的runningTasks大于它的minShare,B對象的runningTasks小于它的minShare,那么B排在A前面;(runningTasks比minShare小的先執行)

2)如果A、B對象的runningTasks都小于它們的minShare,那么就比較runningTasks與minShare的比值(minShare使用率),誰小誰排前面;(minShare使用率低的先執行)

3)如果A、B對象的runningTasks都大于它們的minShare,那么就比較runningTasks與weight的比值(權重使用率),誰小誰排前面。(權重使用率低的先執行)

4)如果上述比較均相等,則比較名字。

整體上來說就是通過minShare和weight這兩個參數控制比較過程,可以做到讓minShare使用率和權重使用率少(實際運行task比例較少)的先運行。

FAIR模式排序完成后,所有的TaskSetManager被放入一個ArrayBuffer里,之后依次被取出并發送給Executor執行。

從調度隊列中拿到TaskSetManager后,由于TaskSetManager封裝了一個Stage的所有Task,并負責管理調度這些Task,那么接下來的工作就是TaskSetManager按照一定的規則一個個取出Task給TaskScheduler,TaskScheduler再交給SchedulerBackend去發到Executor上執行。

本地化調度

DAGScheduler切割Job,劃分Stage,通過調用submitStage來提交一個Stage對應的tasks,submitStage會調用submitMissingTasks,submitMissingTasks確定每個需要計算的task的preferredLocations,通過調用getPreferrdeLocations()得到partition的優先位置,由于一個partition對應一個task,此partition的優先位置就是task的優先位置,對于要提交到TaskScheduler的TaskSet中的每一個task,該task優先位置與其對應的partition對應的優先位置一致。從調度隊列中拿到TaskSetManager后,那么接下來的工作就是TaskSetManager按照一定的規則一個個取出task給TaskScheduler,TaskScheduler再交給SchedulerBackend去發到Executor上執行。前面也提到,TaskSetManager封裝了一個Stage的所有task,并負責管理調度這些task。根據每個task的優先位置,確定task的Locality級別,Locality一共有五種,優先級由高到低順序:

img

在調度執行時,Spark調度總是會盡量讓每個task以最高的本地性級別來啟動,當一個task以X本地性級別啟動,但是該本地性級別對應的所有節點都沒有空閑資源而啟動失敗,此時并不會馬上降低本地性級別啟動而是在某個時間長度內再次以X本地性級別來啟動該task,若超過限時時間則降級啟動,去嘗試下一個本地性級別,依次類推??梢酝ㄟ^調大每個類別的最大容忍延遲時間,在等待階段對應的Executor可能就會有相應的資源去執行此task,這就在在一定程度上提到了運行性能。

失敗重試與黑名單機制

除了選擇合適的Task調度運行外,還需要監控Task的執行狀態,前面也提到,與外部打交道的是SchedulerBackend,Task被提交到Executor啟動執行后,Executor會將執行狀態上報給SchedulerBackend,SchedulerBackend則告訴TaskScheduler,TaskScheduler找到該Task對應的TaskSetManager,并通知到該TaskSetManager,這樣TaskSetManager就知道Task的失敗與成功狀態,對于失敗的Task,會記錄它失敗的次數,如果失敗次數還沒有超過最大重試次數,那么就把它放回待調度的Task池子中,否則整個Application失敗。在記錄Task失敗次數過程中,會記錄它上一次失敗所在的ExecutorId和Host,這樣下次再調度這個Task時,會使用黑名單機制,避免它被調度到上一次失敗的節點上,起到一定的容錯作用。黑名單記錄Task上一次失敗所在的ExecutorId和Host,以及其對應的“拉黑”時間,“拉黑”時間是指這段時間內不要再往這個節點上調度這個Task了。

七、Spark Shuffle解析

ShuffleMapStage與FinalStage

img

在劃分stage時,最后一個stage成為FinalStage,它本質上是一個ResultStage對象,前面的所有stage被稱為ShuffleMapStage。

ShuffleMapStage的結束伴隨著shuffle文件的寫磁盤。

ResultStage基本上對應代碼中的action算子,即將一個函數應用在RDD的各個partition的數據集上,意味著一個job的運行結束。

Shuffle中的任務個數

map端task個數的確定

Shuffle過程中的task個數由RDD分區數決定,而RDD的分區個數與參數spark.default.parallelism有密切關系。

在Yarn Cluster模式下,如果沒有手動設置spark.default.parallelism,則有:

Others: total number of cores on all executor nodes or 2, whichever is larger. spark.default.parallelism = max(所有executor使用的core總數,2)

如果進行了手動配置,則:

spark.default.parallelism = 配置值

還有一個重要的配置:

The maximum number of bytes to pack into a single partition when reading files. spark.files.maxPartitionBytes = 128 M (默認)

代表著rdd的一個分區能存放數據的最大字節數,如果一個400MB的文件,只分了兩個區,則在action時會發生錯誤。

當一個spark應用程序執行時,生成sparkContext,同時會生成兩個參數,由上面得到的spark.default.parallelism推導出這兩個參數的值:

sc.defaultParallelism = spark.default.parallelism

sc.defaultMinPartitions = min(spark.default.parallelism,2)

當以上參數確定后,就可以推算RDD分區數目了。

(1)通過scala集合方式parallelize生成的RDD

val rdd = sc.parallelize(1 to 10)

這種方式下,如果在parallelize操作時沒有指定分區數,則有:

rdd的分區數 = sc.defaultParallelism

(2)在本地文件系統通過textFile方式生成的RDD

val rdd = sc.textFile("path/file")

rdd的分區數 = max(本地file的分片數,sc.defaultMinPartitions)

(3)在HDFS文件系統生成的RDD

rdd的分區數 = max(HDFS文件的Block數目,sc.defaultMinPartitions)

(4)從HBase數據表獲取數據并轉換為RDD

rdd的分區數 = Table的region個數

(5)通過獲取json(或者parquet等等)文件轉換成的DataFrame

rdd的分區數 = 該文件在文件系統中存放的Block數目

(6)Spark Streaming獲取Kafka消息對應的分區數

基于Receiver:

在Receiver的方式中,Spark中的partition和kafka中的partition并不是相關的,所以如果我們加大每個topic的partition數量,僅僅是增加線程來處理由單一Receiver消費的主題。但是這并沒有增加Spark在處理數據上的并行度。

基于DirectDStream:

Spark會創建跟Kafka partition一樣多的RDD partition,并且會并行從Kafka中讀取數據,所以在Kafka partition和RDD partition之間,有一個一對一的映射關系。

reduce端task個數的確定

Reduce端進行數據的聚合,一部分聚合算子可以手動指定reduce task的并行度,如果沒有指定,則以map端的最后一個RDD的分區數作為其分區數,那么分區數就決定了reduce端的task的個數。

reduce端數據的讀取

根據stage的劃分我們知道,map端task和reduce端task不在相同的stage中,map task位于ShuffleMapStage,reduce task位于ResultStage,map task會先執行,那么后執行的reduce task如何知道從哪里去拉去map task落盤后的數據呢?

reduce端的數據拉取過程如下:

1、map task執行完畢后會將計算狀態以及磁盤小文件位置等信息封裝到mapStatue對象中,然后由本進程中的MapOutPutTrackerWorker對象將mapstatus對象發送給Driver進程的MapOutPutTrackerMaster對象;

2、在reduce task開始執行之前會先讓本進程中的MapOutPutTrackerWorker向Driver進程中的MapOutPutTrackerMaster發動請求,請求磁盤小文件位置信息;

3、當所有的Map task執行完畢后,Driver進程中的MapOutPutTrackerMaster就掌握了所有的磁盤小文件的位置信息。此時MapOutPutTrackerMaster會告訴MapOutPutTrackerWorker磁盤小文件的位置信息;

4、完成之前的操作之后,由BlockerTransforService去Executor所在的節點拉數據,默認會啟動五個子線程。每次拉取的數據量不能超過48M(reduce task每次最多拉取48M數據,將拉來的數據存儲到Executor內存的20%內存中)。

HashShuffle解析

以下的討論都假設每個Executor有一個CPU core。

1、未經優化的HashShuffleManager

shuffle write階段,主要就是在一個stage結束計算之后,為了下一個stage可以執行shuffle類的算子(比如reduceByKey),而將每個task處理的數據按key進行“劃分”。所謂“劃分”,就是對相同的key執行hash算法,從而將相同key都寫入同一個磁盤文件中,而每一個磁盤文件都只屬于下游stage的一個task。在將數據寫入磁盤之前,會先將數據寫入內存緩沖中,當內存緩沖填滿之后,才會溢寫到磁盤文件中去。

下一個stage的task有多少個,當前stage的每個task就要創建多少份磁盤文件。比如下一個stage總共有100個task,那么當前stage的每個task都要創建100份磁盤文件。如果當前stage有50個task,總共有10個Executor,每個Executor執行5個task,那么每個Executor上總共要創建500個磁盤文件,所有Executor上會創建5000個磁盤文件。由此可見,未經優化的shuffle write操作所產生的磁盤文件的數量是極其驚人的。

shuffle read階段,通常就是一個stage剛開始時要做的事情。此時該stage的每一個task就需要將上一個stage的計算結果中的所有相同key,從各個節點上通過網絡都拉取到自己所在的節點上,然后進行key的集合或鏈接等操作。由于shuffle write的過程中,map task個下游stage的每個reduce task都創建了一個磁盤文件,因此shuffle read的過程中,每個reduce task只要從上游stage的所有map task所在的節點上,拉取屬于自己的那一個磁盤文件即可。

shuffle read的拉取過程是一邊拉取一邊進行聚合的。每個shuffle read task都會有一個自己的buffer緩沖,每次都只能拉取與buffer緩沖相同大小的數據,然后通過你村中的一個Map進行聚合等操作。聚合完一批數據后,再拉取下一批數據,并放到buffer緩沖中進行聚合操作。以此類推,知道最后將所有數據到拉取完,并得到最終的結果。

未經優化的HashShuffleManager工作原理如下圖所示:

img

2、優化后的HashShuffleManager

為了優化HashShuffleManager我們可以設置一個參數,spark.shuffle.consolidateFiles,該參數默認值為false,將其設置為true即可開啟優化機制,通常來說,如果我們使用HashShuffleManager,那么都建議開啟這個選項。

開啟consolidate機制之后,在shuffle write過程中,task就不是為了下游stage的每個task創建一個磁盤文件了,此時會出現shuffleFileGroup的概念,每個shuffleFileGroup會對應一批磁盤文件,磁盤文件的數量與下游stage的task數量是相同的。一個Executor上有多少個CPU core,就可以并行執行多少個task。而第一批并行執行的每個task都會闖將一個shuffleFileGroup,并將數據寫入對應的磁盤文件內。

當Executor的CPU core執行完一批task,接著執行下一批task時,下一批task就會復用之前已有的shuffleFileGroup,包括其中的磁盤文件,也就是說,此時task會將數據寫入已有的磁盤文件中,而不會寫入新的磁盤文件中。因此,consolidate機制允許不同的task復用同一批磁盤文件,這樣就可以有效將多個task的磁盤文件進行一定程度上的合并,從而大幅度減少磁盤文件的數量,進而提升shuffle write的性能。

假設第二個stage有100個task,第一個stage有50個task,總共還是有10個Executor(Executor CPU個數為1),每個Executor執行5個task。那么原本使用未經優化的HashSHuffleManager時,每個Executor會產生500個磁盤文件,所有Executor會產生5000個磁盤文件的。但是此時經過優化之后,每個Executor創建的磁盤文件的數量的計算公式為:CPU core的數量 * 下一個stage的task數量,也就是說,每個Executor此時只會創建100個磁盤文件,所有Executor只會創建1000個磁盤文件。

優化后的HashShuffleManager工作原理如下圖所示:

img

SortShuffle解析

SortShuffleManager的運行機制主要分為兩種,一種是普通運行機制,另一種是bypass運行機制。當shuffle read task的數量小于等于spark.shuffle.sort.bypassMergeThreshold參數的值時(默認為200),就會啟用bypass機制。

1、普通運行機制

在該模式下,數據會先寫入一個內存數據結構中此時根據不同的shuffle算子,可能選用不同的數據結構,如果是reduceByKey這種聚合類的shuffle算子,那么會選用Map數據結構,一邊通過Map進行聚合,一邊寫入內存;如果是join這種普通的shuffle算子,那么會選用Array數據結構,直接寫入內存。接著,每寫一條數據進如內存數據結構之后,就會判斷一下,是否達到了某個臨界閾值。如果達到臨界閾值的話,那么就會嘗試將內存數據結構中的數據溢寫到磁盤,然后清空內存數據結構。

在溢寫到磁盤文件之前,會先根據key對內存數據結構中已有的數據進行排序。排序過后,會分批將數據寫入磁盤文件。默認的batch數量是10000條,也就是說,排序好的數據,會以每批1萬條數據的形式分批寫入磁盤文件。寫入磁盤文件是通過Java的BufferedOutputStream實現的。BufferedOutputStream是Java的緩沖輸出流,首先會將數據緩沖在內存中,當內存緩沖滿溢之后再一次寫入磁盤文件中,這樣可以減少磁盤IO次數,提升性能。

一個task將所有數據寫入內存數據結構的過程中,會發生多次磁盤溢寫操作,也就會產生多個臨時文件。最后會將之前所有的臨時磁盤文件都進行合并,這就是merge過程,此時會將之前所有臨時磁盤文件中的數據讀取出來,然后依次寫入最終的磁盤文件之中。此外,由于一個task就只對應一個磁盤文件,也就意味著該task為下游stage的task準備的數據都在這一個文件中,一次你還會單獨寫一份索引文件,其中標識了下游各個task的數據在文件中的start offset與end offset。

SortShuffleManager由于有一個磁盤文件merge的過程,因此大大減少了文件數量。比如第一個stage有50個task,總共有10個Executor,每個Executor執行5個task,而第二個stage有100個task。由于每個task最終只有一個磁盤文件,因此此時每個Executor上只有5個磁盤文件,所有Executor只有50個磁盤文件。

普通運行機制的SortShuffleManager工作原理如下圖所示:

img

2、bypass運行機制

bypass運行機制的觸發條件如下:

(1)shuffle map task數量小于spark.shuffle.sort.bypassMergeThreshold參數的值。

(2)不是聚合類的shuffle算子。

此時,每個task會為每個下游task都創建一個臨時磁盤文件,并將數據按key進行hash然后根據key的hash值,將key寫入對應的磁盤文件之中。當然,寫入磁盤文件時也是先寫入內存緩沖,緩沖寫滿之后再溢寫到磁盤文件的。最后,同樣會將所有臨時磁盤文件都合并成一個磁盤文件,并創建一個單獨的索引文件。

該過程的磁盤寫機制其實跟未經優化的HashShuffleManager是一模一樣的,因為都要創建數量驚人的磁盤文件,只是在最后會做一個磁盤文件的合并而已。因此少量的最終磁盤文件,也讓該機制相對未經優化的HashShuffleManager來說,shuffleread的性能會更好。

而該機制與普通SortShuffleManager運行機制的不同在于:第一,磁盤寫機制不同;第二,不會進行排序。也就是說,啟用該機制的最大好處在于,shuffle write過程中,不需要進行數據的排序操作,也就節省掉了這部分的性能開銷。

普通運行機制的SortShuffleManager工作原理如下圖所示:

img

八、Spark內存管理

在執行Spark應用程序時,Spark集群會啟動Driver和Executor兩種JVM進程,前者為主控進程,負責創建Spark上下文,提交Spark作業(Job),并將作業轉化為計算任務(Task),在各個Executor進程間協調任務的調度,后者負責在工作節點上執行具體的計算任務,并將結果返回給Driver,同時為需要持久化的RDD提供存儲功能。

堆內和堆外內存規劃

作為一個JVM進程,Executor的內存管理建立在JVM的內存管理之上,Spark對JVM的堆內(On-heap)空間進行了更為詳細的分配,以充分利用內存。同時,Spark引入了堆外(Off-heap)內存,使之可以直接在工作節點的系統內存中開辟空間,進一步優化了內存的使用。

堆內內存受到JVM統一管理,堆外內存是直接向操作系統進行內存的申請和釋放。

1、堆內內存

堆內內存的大小,由Spark應用程序啟動時的- executor-memory或spark.executor.memory參數配置。Executor內運行的并發任務共享JVM堆內內存,這些任務在緩存RDD數據和廣播(Broadcast)數據時占用的內存被規劃為存儲(Storage)內存,而這些任務在執行Shuffle時占用的內存被規劃為執行(Execution)內存,剩余的部分不做特殊規劃,那些Spark內部的對象實例,或者用戶定義的Spark應用程序中的對象實例,均占用剩余的空間。不同的管理模式下,這三部分占用的空間大小各不相同。

Spark對堆內內存的管理是一種邏輯上的俄“規劃式”的管理,因為對象實例占用內存的申請和釋放都由JVM完成,Spark只能在申請后和釋放前記錄這些內存。其具體流程如下:

1、Spark在代碼中new一個對象實例;

2、JVM從堆內內存分配空間,創建對象并返回對象引用;

3、Spark保存該對象的引用,記錄該對象占用的內存。

釋放內存流程如下:

1、Spark記錄該對象釋放的內存,刪除該對象的引用;

2、等待JVM的垃圾回收機制釋放該對象占用的堆內內存。

我們知道,JVM的對象可以以序列化的方式存儲,序列化的過程是將對象轉換為二進制字節流,本質上可以理解為將非連續空間的鏈式存儲轉化為連續空間或塊存儲,在訪問時則需要進行序列化的逆過程--反序列化,將字節流轉化為對象,序列化的方式可以節省存儲空間,但增加了存儲和讀取時候的計算開銷。

對于Spark中序列化的對象,由于是字節流的形式,其占用的內存大小可直接計算,而對于非序列化的對象,其占用的內存是通過周期性地采樣近似估算而得,即并不是每次新增的數據項都會計算一次占用的內存大小,這種方法降低了時間開銷但是有可能誤差較大,導致某一時刻的實際內存可能遠遠超出預期。此外,在被Spark標記為釋放的對象實例,很有可能在實際上并沒有被JVM回收,導致實際可用的內存小于Spark記錄的可用內存。所以Spark并不能準確記錄實際可用的堆內內存,從而也就無法完全避免內存溢出(OOM,Out of Memory)的異常。

雖然不能精確控制堆內內存的申請和釋放,但Spark通過對存儲內存和執行內存各自獨立的規劃管理,可以決定是否要在存儲內存里緩沖新的RDD,以及是否為新的任務分配執行內存,在一定程度上可以提升內存的利用率,減少異常的出現。

2、堆外內存

為了進一步優化內存的使用以及提高Shuffle時排序的效率,Spark引入了堆外(Off-heap)內存,使之可以直接在工作節點的系統內存中開辟空間,存儲經過序列化的二進制數據。

堆外內存意味著把內存對象分配在Java虛擬機的堆以外的內存,這些內存直接受操作系統管理(而不是虛擬機)。這樣做的結果就是能保持一個較小的堆,以減少垃圾收集對應用的影響。

利用JDK Unsafe API(從spark2.0開始,在管理堆外的存儲內存時不再基于Tachyon,而是與堆外的執行內存一樣,基于JDK Unsafe API實現),Spark可以直接操作系統堆外內存,減少了不必要的內存開銷,以及頻繁的GC掃描和回收,提升了處理性能。堆外內存可以被精確地申請和釋放(堆外內存之所以能夠被精確的申請和釋放,是由于內存的申請和釋放不再通過JVM機制,而是直接向操作系統申請,JVM對于內存的清理是無法準確指定時間點的,因此無法實現精確的釋放),而且序列化的數據占用的空間可以被精確計算,所以相比堆內內存來說降低了管理的難度,也降低了誤差。

在默認情況下堆外內存并不啟用,可以通過配置spark.memory.offHeap.enabled參數啟用,并由spark.memory.offHeap.size參數設定堆外空間的大小。除了沒有other空間,堆外內存與堆內內存的劃分方式相同,所有運行中的并發任務共享存儲內存和執行內存。

(該部分內存主要用于程序的共享庫,Perm Space、線程Stack和一些Memory mapping等,或者類C方式allocate object)

內存空間分配

1、靜態內存管理

在Spark最初采用的靜態內存管理機制下,存儲內存、執行內存和其他內存的大小在Spark應用程序運行期間均為固定的,但用戶可以應用程序啟動前進行配置,堆內內存的分配如下圖所示:

img

可以看到,可用的堆內內存的大小需要按照代碼清單的方式計算:

可用的存儲內存 = systemMaxMemory spark.storage.memoryFraction spark.storage.safety Fraction

可用的執行內存 = systemMaxMemory spark.shuffle.memoryFraction spark.shuffle.safety Fraction






其中systemMaxMemory取決于當前JVM堆內內存的大小,最后可用的執行內存或者存儲內存要在此基礎上與各自的memoryFraction參數和safetyFraction參數相乘得出。上述計算公式中的兩個safetyFraction參數,其意義在于在邏輯預留出1-safetyFraction這么一塊保險區域,降低因實際內存超出當前預設范圍而導致OOM的風險(上文提到,對于非序列化對象的內存采樣估算會產生誤差)。值得注意的是,這個預留的保險區域僅僅是一種邏輯上的規劃,再具體使用時Spark并沒有區別對待,和“其他內存”一樣交給了JVM去管理。

Storage內存和Executor內存都有預留空間,目的是防止OOM,因為Spark堆內內存大小的記錄是不準確的,需要留出保險區域。

堆外的空間分配較為簡單,只有存儲內存和執行內存??捎玫膱绦袃却婧痛鎯却嬲加玫目臻g大小直接由參數spark.memory.storageFraction決定,由于堆外內存占用的空間可以被精確計算,所以無需再設定保險區域。

img

靜態內存管理機制實現起來較為簡單,但如果用戶不熟悉Spark的鵆機制,或沒有根據具體的數據規模和計算任務或做相應的配置,很容易造成“一般海水,一般火焰”的局面,即存儲內存和執行內存中的一方剩余大量的空間,而另一方卻早早被占滿,不得不淘汰或移出舊的內容以存儲新的內容。由于新的內存管理機制的出現,這種方式目前已經很少有開發者使用,出于兼容舊版本的應用程序的目的,Spark依然保留了它的實現。

2、統一內存管理

Spark1.6之后引入的統一內存管理機制,與靜態內存管理的區別在于存儲內存和執行內存共享同一塊空間,可以動態占用對方的空閑區域,統一內存管理的堆內內存結構如下圖所示:

img

統一內存管理的堆外內存結構如下圖所示:

img

其中最重要的優化在于動態占用機制,其規則如下:

1、設定基本的存儲內存和執行內存區域(spark.storage.storageFraction參數),該設定確定了雙方各自擁有的空間的范圍;

2、雙方的空間都不足時,則存儲到磁盤;若己方空間不足而對方空余時,可借用對方的空間;(存儲空間不足是指不足以放下一個完整的Block)

3、執行內存的空間被對方占用后,可讓對方將占用的部分轉存到磁盤,然后“歸還”借用的空間;

4、存儲內存的空間被對方占用后,無法讓對方“歸還”,因為需要考慮Shuffle過程中的很多因素,實現起來較為復雜。

統一內存管理的動態占用機制如下圖所示:

img

憑借統一內存管理機制,spark在一定程度上提高了堆內和堆外內存資源的利用率,降低了開發者維護spark內存的難度。如果存儲內存的空間太大或者說緩存的數據過多,反而會導致頻繁的全量垃圾回收,降低任務執行時的性能,因為緩存的RDD數據通常都是長期主流內存的。所以要想充分發揮Spark的性能,需要開發者進一步了解存儲內存和執行內存各自管理方式和實現原理。

存儲內存管理

1、RDD持久化機制

彈性分布式數據集(RDD)作為Spark最根本的數據抽象,是只讀的分區記錄(Partition)的集合,只能基于在穩定物理存儲中的數據集上創建,或者在其他已有的RDD上執行轉換(Transformation)操作產生一個新的RDD。轉換后的RDD與原始的RDD之間產生了依賴關系,構成了血統(Lineage)。憑借血統,Spark保證了每一個RDD都可以被重新恢復。但是RDD的所有轉換都是有惰性的,即只有當一個返回結果給Driver的行動(Action)發生時,Spark才會創建任務讀取RDD,然后真正觸發轉換的執行。

Task在啟動之初讀取一個分區時,會先判斷這個分區是否已經被持久化,如果沒有則需要檢查Checkpoint或按照血統重新計算。所以如果一個RDD上要執行多次行動,可以在第一次行動中使用persist或cache方法,在內存或磁盤中持久化或緩存這個RDD,從而在后面的行動中提升計算速度。

事實上,cache方法是使用默認的MEMORY_ONLY的存儲級別將RDD持久化到內存,故緩存是一種特殊的持久化。堆內和堆外存儲內存的設計,便可以對緩存RDD時使用的內存做統一的規劃和管理。

RDD的持久化由Spark的Storage模塊負責,實現了RDD與物理存儲的解耦合。Storage模塊負責管理Spark在計算過程中產生的數據,將那些在內存或磁盤、在本地或遠程存取數據的功能封裝了起來。在具體實現時Driver端和Executor端的Storage模塊構成了主從式的架構,即Driver端的BlockManager為Master,Executor端的BlockManager為Slave。

Storage模塊在邏輯上以Block為基本存儲單位,RDD的每個Partition經過處理后位移對應一個Block(BlockId的格式為rdd_RDD-ID_PARTITION-ID)。Driver端的Master負責整個Spark應用程序的Block的元數據信息的管理和維護,而Executor端的Slave需要將Block的更新等狀態上報到Master,同時接受Master的命令,例如新增或刪除一個RDD。

img

在對RDD持久化時,Spark規定了MEMORY_ONLY、MEMORY_AND_DISK等7中不同的存儲級別,而存儲級別是以下5個變量的組合:

class StorageLevel private(

private var _useDisk: Boolean, //磁盤

private var _useMemory: Boolean, //這里其實是指堆內內存

private var _useOffHeap: Boolean, //堆外內存

private var _deserialized: Boolean, //是否為非序列化

private var _replication: Int = 1 //副本個數

)

Spark中7中存儲級別如下:

img

通過對數據結構的分析,可以看出存儲級別從三個維度定義了RDD的Partition(同時也就是Block)的存儲方式:

(1)存儲位置:磁盤/堆內內存/堆外內存。如MEMORY_AND_DISK是同時在磁盤和堆內內存上存儲,實現了冗余備份。OFF_HEAP則是只在堆外內存存儲,目前選擇堆外內存時不能同時存儲到其他位置。

(2)存儲形式:Block緩存到存儲內存后,是否為非序列化的形式。如MEMORY_ONLY是非序列化方式存儲,OFF_HEAP是序列化方式存儲。

(3)副本數量:大于1時需要遠程冗余備份到其他節點。如DISK_ONLY_2需要遠程備份1個副本。

2、RDD的緩存過程

RDD在緩存到存儲內存之前,Partition中的數據一般以迭代器(Iterator)的數據結構來訪問,這是Scala語言中一種遍歷數據集合的方法。通過Iterator可以獲取分區中每一條序列化或者非序列化的數據項(Record),這些Record的對象實例在邏輯上占用了JVM堆內內存的other部分的空間,同一Partition的不同Record的存儲空間并不連續。

RDD在緩存到存儲內存之后,Partition被轉換成Block,Record在堆內或堆外存儲內存中占用一塊連續的空間。將Partition由不連續的存儲空間轉換為連續存儲空間的過程,Spark稱之為“展開”(Unroll)。

Block有序列化和非序列化兩種存儲格式,具體以哪種方式取決于該RDD的存儲級別。非序列化的Block以一種DeserializedMemoryEntry的數據結構定義,用一個數組存儲所有的對象實例,序列化的Block則以SerializedMemoryEntry的數據結構定義,用字節緩沖區(ByteBuffer)來存儲二進制數據。每個Executor的Storage模塊用一個鏈式Map結構(LinkedHashMap)來管理堆內和堆外存儲內存中所有的Block對象的實例,對這個LinkedHashMap新增和刪除間接記錄了內存的申請和釋放。

因為不能保證存儲空間可以一次容納Iterator中的所有數據,當前的計算任務在Unroll時要向MemoryManager申請足夠的Unroll空間來臨時占位,空間不足則Unroll失敗,空間足夠時可以繼續進行。

對于序列化的Partition,其所需的Unroll空間可以直接累加計算,一次申請。

對于非序列化的Partition則要在便利Record的過程中一次申請,即每讀取一條Record,采樣估算其所需的Unroll空間并進行申請,空間不足時可以中斷,釋放已占用的Unroll空間。

如果最終Unroll成功,當前Partition所占用的Unroll空間被轉換為正常的緩存RDD的存儲空間,如下圖所示。

img

在靜態內存管理時,Spark在存儲內存中專門劃分了一塊Unroll空間,其大小是固定的,統一內存管理時則沒有對Unroll空間進行特別區分,當存儲空間不足時會根據動態占用機制進行處理。

3、淘汰與落盤

由于同一個Executor的所有的計算任務共享有限的存儲內存空間,當有新的Block需要緩存單數剩余空間不足且無法動態占用時,就要對LinkedHashMap中的舊Block進行淘汰(Eviction),而被淘汰的Block如果其存儲級別中同時包含存儲到磁盤的要求,則要對其進行落盤(Drop),否則直接刪除該Block。

存儲內存的淘汰規則為:

被淘汰的舊Block要與新的Block的MemoryMode相同,即同屬于堆外或堆內內存;

新舊Block不能屬于同一個RDD,避免循環淘汰;

舊Block所屬RDD不能處于被讀狀態,避免引發一致性問題;

遍歷LinkedHashMap中Block,按照最近最少使用(LRU)的順序淘汰,直到滿足新Block所需的空間。其中LRU是LinkedHashMap的特性。

落盤的流程則比較簡單,如果其存儲級別符合_useDisk為true的條件,再根據其_deserialized判斷是否是非序列化的形式,若是則對其進行序列化,最后將數據存儲到磁盤,在Storage模塊中更新其信息。

執行內存管理

執行內存主要用來存儲任務再在執行Shuffle時占用的內存,Shuffle是按照一定規則對RDD數據重新分區的過程,Shuffle的Write和Read兩階段對執行內存的使用:

Shuffle Write

在map端會采用ExternalSorter進行外排,在內存中存儲數據時主要占用堆內執行空間。

Shuffle Read

(1)在對reduce端的數據進行聚合時,要將數據交給Aggregator處理,在內存中存儲數據時占用堆內執行空間。

(2)如果需要進行最終結果排序,則要將再次將數據交給ExternalSorter處理,占用堆內執行空間。

在ExternalSorter和Aggregator中,Spark會使用一種叫做AppendOnlyMap的哈希表在堆內執行內存中存儲數據,但是Shuffle過程中所有數據并不能都保存到該哈希表中,當這個哈希表占用的內存會進行周期性地采樣估算,當其大到一定程度,無法再從MemoryManager申請到新的執行內存時,Spark就會將其全部內容存儲到磁盤文件中,這個過程被稱為溢存(Spill),溢存到磁盤的文件最后會被歸并(Merge)。

Spark的存儲內存和執行內存有著截然不同的管理方式:對于存儲內存來說,Spark用一個LinkedHashMap來集中管理所有的Block,Block由需要緩存的RDD的Partition轉化而成;而對于執行內存,Spark用AppendOnlyMap來存儲Shuffle過程中的數據,在Tungsten排序中甚至抽象稱為頁式內存管理,開辟了全新的JVM內存管理機制。

九、Spark核心組件解析

BlockManager數據存儲與管理機制

BlockManager是整個Spark底層負責數據存儲與管理的一個組件,Driver和Executor的所有數據都由對應的BlockManager進行管理。

Driver上有BlockManagerMaster,負責對各個節點上的BlockManager內部管理的數據的元數據進行維護,比如block的增刪改等操作,都會在這里維護好元數據的變更。

每個節點都有一個BlockManager,每個BlockManager創建之后,第一件事即使去向BlockManagerMaster進行注冊,此時BlockManagerMaster會為其創建對應的BlockManagerInfo。

img

BlockManagerMaster與BlockManager的關系非常像NameNode與DataNode的關系,BlockManagerMaster中保存BlockManager內部管理數據的元數據,進行維護,當BlockManager進行Block增刪改等操作時,都會在BlockManagerMaster中進行元數據的變更,這與NameNode維護DataNode的元數據信息,DataNode中數據發生變化時NameNode中的元數據也會相應變化是一致的。

每個節點上都有一個BlockManager,BlockManager中有三個非常重要的組件:

DisStore:負責對磁盤數據進行讀寫;

MemoryStore:負責對內存數據進行讀寫;

BlockTransferService:負責建立BlockManager到遠程其他節點的BlockManager的連接,負責對遠程其他節點的BlockManager的數據進行讀寫;

每個BlockManager創建之后,做的第一件事就是向BlockManagerMaster進行注冊,此時BlockManagerMaster會為其創建對應的BlockManagerInfo。

使用BlockManager進行寫操作時,比如說,RDD運行過程中的一些中間數據,或者我們手動指定了persist(),會優先將數據寫入內存中,如果內存大小不夠,會使用自己的算法,將內存中的部分數據寫入磁盤;此外,如果persist()指定了要replica,那么會使用BlockTransferService將數據replicate一份到其他節點的BlockManager上去。

使用BlockManager進行讀操作時,比如說,shuffleRead操作,如果能從本地讀取,就利用DisStore或者MemoryStore從本地讀取數據,但是本地沒有數據的話,那么會用BlockTransferService與有數據的BlockManager建立連接,然后用BlockTransferService從遠程BlockManager讀取數據;例如,shuffle Read操作中,很有可能要拉取的數據本地沒有,那么此時就會從遠程有數據的節點上,找那個節點的BlockManager來拉取需要的數據。

只要使用BlockManager執行了數據增刪改的操作,那么必須將Block的BlockStatus上報到BlockManagerMaster,在BlockManagerMaster上會對指定BlockManager的BlockManagerInfo內部的BlockStatus進行增刪改操作,從而達到元數據的維護功能。

Spark共享變量底層實現

Spark一個非常重要的特性就是共享變量。

默認情況下,如果在一個算子的函數中使用到了某個外部的變量,那么這個變量的值會被拷貝到每個task中,此時每個task只能操作自己的那份變量副本。如果多個task想要共享某個變量,那么這種方式是做不到的。

Spark為此提供了兩種共享變量,一種是Broadcast Variable(廣播變量),另一種是Accumulator(累加變量)。Broadcast Variable會將用到的變量,僅僅為每個節點拷貝一份,即每個Executor拷貝一份,更大的用途是優化性能,見上網絡傳輸以及內存損耗。Accumulator則可以讓多個task共同操作一份變量,主要可以進行累加操作。Broadcast Variable是共享讀變量,task不能去修改它,而Accumulator可以讓多個task操作一個變量。

廣播變量

廣播變量允許編程者在每個Executor上暴力外部數據的只讀變量,而不是給每個任務發送一個副本。

每個task都會保存一份它所使用的外部變量的副本,當一個Executor上的多個task都使用一個外部變量時,對于Executor內存的消耗是非常大的,因此,我們可以將大型外部變量封裝為廣播變量,此時一個Executor保存一個變量副本,此Executor上的所有task共用此變量,不再是一個task單獨保存一個副本,這在一定程度上降低了Spark任務的內存占用。

使用外部變量

使用廣播變量

Spark還嘗試使用高效的廣播算法分發廣播變量,以降低通信成本。

Spark提供的Broadcast Variable是只讀的,并且在每個Executor上只會有一個副本,而不會為每個task都拷貝一份副本,因此,它的最大作用,就是減少變量到各個節點的網絡傳輸消耗,以及在各個節點上的內存消耗。此外,Spark內部也是用了高效的廣播算法來減少網絡消耗。

可以通過調用SparkContext的broadcast()方法來針對每個變量創建廣播變量。然后再算子的函數內,使用到廣播變量時,每個Executor只會拷貝一份副本了,每個task可以使用廣播變量的value()方法獲取值。

在任務運行時,Executor并不獲取廣播變量,當task執行到使用廣播變量的代碼時,會向Executor的內存中請求廣播變量,如下圖所示:

img

之后Executor會通過BlockManager向Driver拉取廣播變量,然后提供給task進行使用,如下圖所示:

img

廣播大變量是Spark中常用的基礎優化方法,通過減少內存占用實現任務執行性能的提升。

累加器

累加器(accumulator):Accumulator是僅僅被相關操作累加的變量,因此可以在并行中被有效地支持。它們可用于實現計數器(如MapReduce)或總和計數。

Accumulator是存在于Driver端的,集群上運行的task進行Accumulator的累加,隨后把值發送到Driver端,在Driver端匯總(Spark UI在SparkContext創建時被創建,即在Driver端被創建,因此它可以讀取Accumulator的數值),由于Accumulator存在于Driver端,從節點讀取不到Accumulator的數值。

Spark提供的Accumulator主要用于多個節點對一個變量進行共享性的操作。Accumulator只提供了累加的功能,但是卻給我們提供了多個task對于同一個變量并行操作的功能,但是task只能對Accumulator進行累加操作,不能讀取它的值,只有Driver程序可以讀取Accumulator的值。

Accumulator的底層原理如下圖所示:

img








作者:柯廣的網絡日志

微信公眾號:Java大數據與數據倉庫