Hive知識點

    Hive知識點

    2020-04-14 3523點熱度 0人點贊 0條評論

    hive知識點

    工作中hive常用知識點。

    Hive簡介

    hive是基于Hadoop的一個數據倉庫工具,可以將結構化的數據文件映射為一張數據庫表,并提供簡單的sql查詢功能,可以將sql語句轉換為MapReduce任務進行運行。其優點是學習成本低,可以通過類SQL語句快速實現簡單的MapReduce統計,不必開發專門的MapReduce應用,十分適合數據倉庫的統計分析。

    創建hive表

    # 新建個數據庫test
    create database test;
    create external table if not exists test.test(id string, name string);

    這里創建了一個名為testhive外部表,外部表與普通表的區別:

    1. 在導入數據到外部表,數據并沒有移動到自己的數據倉庫目錄下,也就是說外部表中的數據并不是由它自己來管理的!而表則不一樣;
    2. 在刪除表的時候,Hive將會把屬于表的元數據和數據全部刪掉;而刪除外部表的時候,Hive僅僅刪除外部表的元數據,數據是不會刪除的!那么,應該如何選擇使用哪種表呢?在大多數情況沒有太多的區別,因此選擇只是個人喜好的問題。但是作為一個經驗,如果所有處理都需要由Hive完成,那么你應該創建表,否則使用外部表!

    創建分區表

    通過partition關鍵字指定分區字段,分區表方便hive快速查詢索引數據。

    create table if not exists test.test
    (
    id string,
    name string
    )
    partitioned by (dt string,hour string)
    row format delimited fields terminated by '\t';
    SQL

    這里指定了兩個分區:dthour,對應hdfs的2級目錄dt,hour

    [hadoop@qcloud-test-hadoop01 ~]$ hdfs dfs -ls -R /hive/warehouse/test.db/test
    drwxr-xr-x   - hadoop supergroup          0 2019-09-10 19:07 /hive/warehouse/test.db/test/dt=2019-09-10
    drwxr-xr-x   - hadoop supergroup          0 2019-09-10 17:59 /hive/warehouse/test.db/test/dt=2019-09-10/hour=02
    -rwxr-xr-x   2 hadoop supergroup         39 2019-09-10 17:59 /hive/warehouse/test.db/test/dt=2019-09-10/hour=02/test.txt
    drwxr-xr-x   - hadoop supergroup          0 2019-09-10 19:07 /hive/warehouse/test.db/test/dt=2019-09-10/hour=03
    -rwxr-xr-x   2 hadoop supergroup         39 2019-09-10 19:07 /hive/warehouse/test.db/test/dt=2019-09-10/hour=03/test.txt

    刪除分區

    ALTER TABLE table_name DROP IF EXISTS PARTITION(year = 2015, month = 10, day = 1);

    創建orc存儲格式表

    hive創建orc格式表不能像textfile格式一樣直接load數據到表中,一般需要創建臨時textfile表,然后通過insert into 或者insert overwriteorc存儲格式表中。

    1) 臨時表testfile存儲格式

    create table if not exists test.test
    (
    id string,
    name string
    )
    partitioned by (dt string,hour string)
    row format delimited fields terminated by '\t';
    SQL

    test.txt

    001 keguang
    002 kg
    003 kk
    004 ikeguang

    load data local inpath '/home/hadoop/data/test.txt' into table test.test partition(dt = '2019-09-10', hour = '02');

    2) 導入數據到orc表

    create table if not exists test.test2
    (
    id string,
    name string
    )
    partitioned by (dt string,hour string)
    row format delimited fields terminated by '\t'
    stored as orc;

    insert select 導入數據

    insert overwrite table test.test2 partition(dt, hour) select `(dt|hour)?+.+`,dt,hour from test.test;

    這里(dt|hour)?+.+表示排除dt,hour兩個字段,由于動態分區partition(dt, hour)是按照select出來的最后2個字段作為分區字段的。其實這里select * 也是可以的,因為分區表查詢結果,最后兩個字段就是分區字段。

    select * from test.test2;
    
    結果
    001 keguang 2019-09-10  02
    002 kg  2019-09-10  02
    003 kk  2019-09-10  02
    004 ikeguang    2019-09-10  02

    所以說,非textfile存儲格式表導入數據步驟:

    1. 導入數據到textfile
    2. 查詢數據插入orc格式表

    select 排除字段

    選擇tableName表中除了name、id、pwd之外的所有字段

    set hive.support.quoted.identifiers=None;
    select `(name|id|pwd)?+.+` from tableName;

    UDF用法

    添加臨時函數

    add jar /home/hadoop/codejar/flash_format.jar;
    create temporary function gamelabel as 'com.js.dataclean.hive.udf.hm2.GameLabel';
    SQL

    刪除臨時函數

    drop temporary function 數據庫名.函數名;
    SQL

    添加永久函數

    create function hm2.gamelabel as 'com.js.dataclean.hive.udf.hm2.GameLabel' using jar 'hdfsJarPath'
    SQL

    ==注意==:1). 需要指定數據庫.函數名,即hm2.gamelabel,否則默認在default數據庫下面:default.gamelabel;2). hdfsJarPath即該jar包需要上傳到hdfs目錄;

    刪除永久函數:

    drop function 數據庫名.函數名字;
    SQL

    如果客戶端通過hiveserver2連接hive,為了正常使用自定義的永久udf,需要執行reload function;

    查看函數用法

    查month 相關的函數

    show functions like '*month*'

    查看 add_months 函數的用法

    desc function add_months;

    查看 add_months 函數的詳細說明并舉例

    desc function extended add_months;

    UDAF用法

    關于UDAF開發注意點:

    1.需要import org.apache.hadoop.hive.ql.exec.UDAF以及org.apache.hadoop.hive.ql.exec.UDAFEvaluator,這兩個包都是必須的

    2.函數類需要繼承UDAF類,內部類Evaluator實現UDAFEvaluator接口

    3.Evaluator需要實現 init、iterate、terminatePartial、merge、terminate這幾個函數

    1)init函數類似于構造函數,用于UDAF的初始化
    
    2)iterate接收傳入的參數,并進行內部的輪轉。其返回類型為boolean
    
    3)terminatePartial無參數,其為iterate函數輪轉結束后,返回亂轉數據,iterate和terminatePartial類似于hadoop的Combiner
    
    4)merge接收terminatePartial的返回結果,進行數據merge操作,其返回類型為boolean
    
    5)terminate返回最終的聚集函數結果  

    hive hbase 關聯

    create 'flash_people','info','label'
    
    create external table if not exists hm2.flash_people(
    guid string comment "people guid",
    firsttime string comment "首次入庫時間",
    ip string comment "用戶ip",
    jstimestamp bigint comment "json時間戳"
    )
    stored by 'org.apache.hadoop.hive.hbase.HBaseStorageHandler'
    with serdeproperties("hbase.columns.mapping" = ":key,info:firsttime,info:ip,:timestamp")
    tblproperties("hbase.table.name" = "hm2:flash_people");

    hive -e 用法

    hive -e主要用來在命令行執行sql

    hive -e 'set mapred.reduce.tasks = 30;insert into hm2.flash_people select guid,dt,remote_addr,(32523145869-ngx_timestamp) from hm2.data where dt = "2018-07-01" and length(guid) = 38 and ngx_timestamp is not null and ngx_timestamp != '' and ngx_timestamp is regexp '\\d{8}' and remote_addr is not null and remote_addr != '';'
    set mapred.reduce.tasks = 30;insert into hm2.flash_people select guid,dt,remote_addr,(32523145869-ngx_timestamp) from hm2.data where dt = "2018-07-01" and length(guid) = 38 and ngx_timestamp is not null and ngx_timestamp != '' and ngx_timestamp rlike'^\\d+$' and remote_addr is not null and remote_addr != '';
    Bash

    hive一些優化參數

    set hive.auto.convert.join = false;
    set hive.ignore.mapjoin.hint=false;
    set hive.exec.parallel=true;
    set mapred.reduce.tasks = 60;
    SQL

    字段變更

    添加字段

    alter table hm2.helper add columns(country string, province string, city string);

    hive添加字段后,前面數據會有空值,就算將前面數據hdfs文件刪除,重新導入,仍然查詢出來是 NULL,這個問題有待解決。

    • 解決
    • 未解決

    添加 map 復雜類型字段

    alter table hm2.helper add columns(data_map map<string, string>);
    
    hive> desc formatted hm2.helper;
    OK
    # col_name              data_type               comment             
    
    time                    string                                      
    uuid                    string                                      
    country                 string                                      
    province                string                                      
    city                    string                                      
    data_map                map<string,string>                        
    
    # Partition Information      
    # col_name              data_type               comment             
    
    dt                      string                                      
    hour                    string                                      
    msgtype                 string                                      
    
    # Detailed Table Information         
    Database:               hm2                      
    Owner:                  hadoop                   
    CreateTime:             Wed Apr 24 10:12:30 CST 2019     
    LastAccessTime:         UNKNOWN                  
    Protect Mode:           None                     
    Retention:              0                        
    Location:               hdfs://nameser/hive/warehouse/hm2.db/helper  
    Table Type:             EXTERNAL_TABLE           
    Table Parameters:        
        EXTERNAL                TRUE                
        last_modified_by        hadoop              
        last_modified_time      1556072221          
        transient_lastDdlTime   1556072221          
    
    # Storage Information        
    SerDe Library:          org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe   
    InputFormat:            org.apache.hadoop.mapred.TextInputFormat     
    OutputFormat:           org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat   
    Compressed:             No                       
    Num Buckets:            -1                       
    Bucket Columns:         []                       
    Sort Columns:           []                       
    Storage Desc Params:         
        field.delim             \t                  
        serialization.format    \t                  
    Time taken: 0.105 seconds, Fetched: 59 row(s)

    仿照一張在創建表時定義了map類型字段的表的屬性描述

    Storage Desc Params:         
        colelction.delim        ,                   
        field.delim             \t                  
        mapkey.delim            :                   
        serialization.format    \t                  

    只需要將map屬性修改為:

    hive> alter table hm2.helper set serdeproperties('colelction.delim' = ',', 'mapkey.delim' = ':');
    OK
    Time taken: 0.132 seconds

    那么

    hive> desc formatted hm2.helper;
    OK
    # col_name              data_type               comment             
    
    time                    string                                      
    uuid                    string                                      
    country                 string                                      
    province                string                                      
    city                    string                                      
    data_map                map<string,string>                        
    
    # Partition Information      
    # col_name              data_type               comment             
    
    dt                      string                                      
    hour                    string                                      
    msgtype                 string                                      
    
    # Detailed Table Information         
    Database:               hm2                      
    Owner:                  hadoop                   
    CreateTime:             Wed Apr 24 10:12:30 CST 2019     
    LastAccessTime:         UNKNOWN                  
    Protect Mode:           None                     
    Retention:              0                        
    Location:               hdfs://nameser/hive/warehouse/hm2.db/helper  
    Table Type:             EXTERNAL_TABLE           
    Table Parameters:        
        EXTERNAL                TRUE                
        last_modified_by        hadoop              
        last_modified_time      1556072669          
        transient_lastDdlTime   1556072669          
    
    # Storage Information        
    SerDe Library:          org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe   
    InputFormat:            org.apache.hadoop.mapred.TextInputFormat     
    OutputFormat:           org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat   
    Compressed:             No                       
    Num Buckets:            -1                       
    Bucket Columns:         []                       
    Sort Columns:           []                       
    Storage Desc Params:         
        colelction.delim        ,                   
        field.delim             \t                  
        mapkey.delim            :                   
        serialization.format    \t                  
    Time taken: 0.079 seconds, Fetched: 61 row(s)

    即可

    刪除字段

    CREATE TABLE test (
    creatingTs BIGINT,
    a STRING,
    b BIGINT,
    c STRING,
    d STRING,
    e BIGINT,
    f BIGINT
    );

    如果需要刪除 column f 列,可以使用以下語句:

    ALTER TABLE test REPLACE COLUMNS (
    creatingTs BIGINT,
    a STRING,
    b BIGINT,
    c STRING,
    d STRING,
    e BIGINT
    );

    增加列:

    alter table of_test columns (judegment int)

    hive-1.2.1 支持insert,update,delete的配置

    hive-site.xml中添加配置

    <property>
        <name>hive.support.concurrency</name>
        <value>true</value>
    </property>
    <property>
        <name>hive.exec.dynamic.partition.mode</name>
        <value>nonstrict</value>
    </property>
    <property>
        <name>hive.txn.manager</name>
        <value>org.apache.hadoop.hive.ql.lockmgr.DbTxnManager</value>
    </property>
    <property>
        <name>hive.compactor.initiator.on</name>
        <value>true</value>
    </property>
    <property>
        <name>hive.compactor.worker.threads</name>
        <value>1</value>
    </property>
    <property>
      <name>hive.enforce.bucketing</name>
      <value>true</value>
    </property>
    XML

    建表語句

    create external table if not exists hm2.history_helper
    (
    guid string,
    starttime string,
    endtime string,
    num int
    )
    clustered by(guid) into 50 buckets
    stored as orc TBLPROPERTIES ('transactional'='true');

    ==說明:建表語句必須帶有into buckets子句和stored as orc TBLPROPERTIES ('transactional'='true')子句,并且不能帶有sorted by子句。==

    這樣,這個表就可以就行insert,update,delete操作了。

    ==注意:== 上面規則在 hive-1.2.1 是可以的,在 hive-2.1.1 中需要將external關鍵字去掉,即高版本不支持外部表update了。

    hive表中的鎖

    場景:

    在執行insert into或insert overwrite任務時,中途手動將程序停掉,會出現卡死情況(無法提交MapReduce),只能執行查詢操作,而drop insert操作均不可操作,無論執行多久,都會保持卡死狀態

    臨時解決辦法是……把表名換一個……

    根本原因是:hive表被鎖或者某個分區被鎖,需要解鎖

    show locks 表名:

    可以查看表被鎖的情況

    解鎖

    unlock table 表名;  -- 解鎖表
    unlock table 表名 partition(dt='2014-04-01');  -- 解鎖某個分區

    表鎖和分區鎖是兩個不同的鎖,對表解鎖,對分區是無效的,分區需要單獨解鎖

    高版本hive默認插入數據時,不能查詢,因為有鎖

    hive> show locks;
    OK
    test@helper EXCLUSIVE

    解決辦法:關閉鎖機制

    set hive.txn.manager = org.apache.hadoop.hive.ql.lockmgr.DummyTxnManager; // 這是默認值
    set hive.support.concurrency=false; 默認為true

    基本知識

    查看表結構信息

     desc formatted table_name;
     desc table_name;

    導入數據到hive表

    load命令

    hive -e 'load data inpath "/data/MROutput/hm2_data_gamelabel_output2/part-r-*" into table hm2.game_label partition(dt="2018-10-11");'

    ==注意==:inpath 后接的hdfs路徑需要引號

    python中的hive命令字符串示例:

    cmd = 'hive -e \'load data inpath "%s/part-r-*" into table hm2.game_label partition(dt=%s);\''%(outpath, formatDate(day))

    orc格式表

    • hive創建orc格式表不能像textfile格式一樣直接load數據到表中,一般需要load創建臨時textfile表,然后通過insert into 或者insert overwrite到orc存儲格式表中。
    • 或者將現有orc文件cp到hive對應表的目錄

      map,reduce知識






      什么情況下只有一個reduce?

    很多時候你會發現任務中不管數據量多大,不管你有沒有設置調整reduce個數的參數,任務中一直都只有一個reduce任務;其實只有一個reduce任務的情況,除了數據量小于hive.exec.reducers.bytes.per.reducer參數值的情況外,還有以下原因:

    1. 沒有group by的匯總,比如把select pt,count(1) from popt_tbaccountcopy_mes where pt = '2012-07-04' group by pt; 寫成 select count(1) from popt_tbaccountcopy_mes where pt = '2012-07-04';
      這點非常常見,希望大家盡量改寫。
    2. 用了Order by
    3. 有笛卡爾積
      通常這些情況下,除了找辦法來變通和避免,我暫時沒有什么好的辦法,因為這些操作都是全局的,所以hadoop不得不用一個reduce去完成;
      同樣的,在設置reduce個數的時候也需要考慮這兩個原則:使大數據量利用合適的reduce數;使單個reduce任務處理合適的數據量。

    hive 優化

    1. hive mapreduce參數優化
      設置map,reduce任務分配的資源
      set mapreduce.map.memory.mb = 4096 ;
      set mapreduce.reduce.memory.mb = 4096 ; 
      set mapreduce.map.java.opts=-Xmx3686m;
      set mapreduce.reduce.java.opts=-Xmx3428m;
    2. hive.exec.parallel參數控制在同一個sql中的不同的job是否可以同時運行,默認為false.

    下面是對于該參數的測試過程:

    測試sql:

    select r1.a 
    from 
    (select t.a from sunwg_10 t join sunwg_10000000 s on t.a=s.b) r1 
    join 
    (select s.b from sunwg_100000 t join sunwg_10 s on t.a=s.b) r2 
    on (r1.a=r2.b);
    • 當參數為false的時候,三個job是順序的執行
      set hive.exec.parallel=false;
    • 但是可以看出來其實兩個子查詢中的sql并無關系,可以并行的跑
      set hive.exec.parallel=true;
      1. 設置reduce個數
        set mapred.reduce.tasks = 15;

        ==總結==:
        在資源充足的時候hive.exec.parallel會讓那些存在并發job的sql運行得更快,但同時消耗更多的資源
        可以評估下hive.exec.parallel對我們的刷新任務是否有幫助.

    1. 參數設置

      set mapred.max.split.size=256000000;        -- 決定每個map處理的最大的文件大小,單位為B
    2. orc 小文件合并

      set hive.execution.engine = mr; # 不是必須的
      alter table hm3.hm3_format_log partition (dt="2019-09-17", hour="16", msgtype="web", action="image") concatenate;
      SQL

  1. msck repair修復大量分區

    set hive.msck.path.validation=ignore;
    msck repair table tbName;

hive on spark 知識

cdh 6.0.1 下通過設置:

set hive.execution.engine=spark;

也可以將默認的application執行引擎切換為spark;
apache hadoop 下配置 hive on spark

參數調優

了解完了Spark作業運行的基本原理之后,對資源相關的參數就容易理解了。所謂的Spark資源參數調優,其實主要就是對Spark運行過程中各個使用資源的地方,通過調節各種參數,來優化資源使用的效率,從而提升Spark作業的執行性能。以下參數就是Spark中主要的資源參數,每個參數都對應著作業運行原理中的某個部分。

num-executors/spark.executor.instances

  • 參數說明:該參數用于設置Spark作業總共要用多少個Executor進程來執行。Driver在向YARN集群管理器申請資源時,YARN集群管理器會盡可能按照你的設置來在集群的各個工作節點上,啟動相應數量的Executor進程。這個參數非常之重要,如果不設置的話,默認只會給你啟動少量的Executor進程,此時你的Spark作業的運行速度是非常慢的。
  • 參數調優建議:每個Spark作業的運行一般設置50~100個左右的Executor進程比較合適,設置太少或太多的Executor進程都不好。設置的太少,無法充分利用集群資源;設置的太多的話,大部分隊列可能無法給予充分的資源。

executor-memory/spark.executor.memory

  • 參數說明:該參數用于設置每個Executor進程的內存。Executor內存的大小,很多時候直接決定了Spark作業的性能,而且跟常見的JVM OOM異常,也有直接的關聯。
  • 參數調優建議:每個Executor進程的內存設置4G8G較為合適。但是這只是一個參考值,具體的設置還是得根據不同部門的資源隊列來定??梢钥纯醋约簣F隊的資源隊列的最大內存限制是多少,num-executors乘以executor-memory,是不能超過隊列的最大內存量的。此外,如果你是跟團隊里其他人共享這個資源隊列,那么申請的內存量最好不要超過資源隊列最大總內存的1/31/2,避免你自己的Spark作業占用了隊列所有的資源,導致別的同學的作業無法運行。

executor-cores/spark.executor.cores

  • 參數說明:該參數用于設置每個Executor進程的CPU core數量。這個參數決定了每個Executor進程并行執行task線程的能力。因為每個CPU core同一時間只能執行一個task線程,因此每個Executor進程的CPU core數量越多,越能夠快速地執行完分配給自己的所有task線程。
  • 參數調優建議:Executor的CPU core數量設置為2~4個較為合適。同樣得根據不同部門的資源隊列來定,可以看看自己的資源隊列的最大CPU core限制是多少,再依據設置的Executor數量,來決定每個Executor進程可以分配到幾個CPU core。同樣建議,如果是跟他人共享這個隊列,那么num-executors * executor-cores不要超過隊列總CPU core的1/3~1/2左右比較合適,也是避免影響其他同學的作業運行。

driver-memory

  • 參數說明:該參數用于設置Driver進程的內存。
  • 參數調優建議:Driver的內存通常來說不設置,或者設置1G左右應該就夠了。唯一需要注意的一點是,如果需要使用collect算子將RDD的數據全部拉取到Driver上進行處理,那么必須確保Driver的內存足夠大,否則會出現OOM內存溢出的問題。

spark.default.parallelism

  • 參數說明:該參數用于設置每個stage的默認task數量。這個參數極為重要,如果不設置可能會直接影響你的Spark作業性能。
  • 參數調優建議:Spark作業的默認task數量為500~1000個較為合適。很多同學常犯的一個錯誤就是不去設置這個參數,那么此時就會導致Spark自己根據底層HDFS的block數量來設置task的數量,默認是一個HDFS block對應一個task。通常來說,Spark默認設置的數量是偏少的(比如就幾十個task),如果task數量偏少的話,就會導致你前面設置好的Executor的參數都前功盡棄。試想一下,無論你的Executor進程有多少個,內存和CPU有多大,但是task只有1個或者10個,那么90%的Executor進程可能根本就沒有task執行,也就是白白浪費了資源!因此Spark官網建議的設置原則是,設置該參數為num-executors * executor-cores的2~3倍較為合適,比如Executor的總CPU core數量為300個,那么設置1000個task是可以的,此時可以充分地利用Spark集群的資源。

spark.storage.memoryFraction

  • 參數說明:該參數用于設置RDD持久化數據在Executor內存中能占的比例,默認是0.6。也就是說,默認Executor 60%的內存,可以用來保存持久化的RDD數據。根據你選擇的不同的持久化策略,如果內存不夠時,可能數據就不會持久化,或者數據會寫入磁盤。
  • 參數調優建議:如果Spark作業中,有較多的RDD持久化操作,該參數的值可以適當提高一些,保證持久化的數據能夠容納在內存中。避免內存不夠緩存所有的數據,導致數據只能寫入磁盤中,降低了性能。但是如果Spark作業中的shuffle類操作比較多,而持久化操作比較少,那么這個參數的值適當降低一些比較合適。此外,如果發現作業由于頻繁的gc導致運行緩慢(通過spark web ui可以觀察到作業的gc耗時),意味著task執行用戶代碼的內存不夠用,那么同樣建議調低這個參數的值。

spark.shuffle.memoryFraction

  • 參數說明:該參數用于設置shuffle過程中一個task拉取到上個stage的task的輸出后,進行聚合操作時能夠使用的Executor內存的比例,默認是0.2。也就是說,Executor默認只有20%的內存用來進行該操作。shuffle操作在進行聚合時,如果發現使用的內存超出了這個20%的限制,那么多余的數據就會溢寫到磁盤文件中去,此時就會極大地降低性能。
  • 參數調優建議:如果Spark作業中的RDD持久化操作較少,shuffle操作較多時,建議降低持久化操作的內存占比,提高shuffle操作的內存占比比例,避免shuffle過程中數據過多時內存不夠用,必須溢寫到磁盤上,降低了性能。此外,如果發現作業由于頻繁的gc導致運行緩慢,意味著task執行用戶代碼的內存不夠用,那么同樣建議調低這個參數的值。

原文鏈接


Spark On Yarn執行中executor內存限制問題

解決Spark On Yarn執行中executor內存限制問題

集群版本 Spark 2.2.0 + Hadoop 3.0-CDH6.0.1

hive on saprk , 設置:

hive> set hive.execution.engine=spark;
hive> set spark.executor.memory=31.5g;
hive> set spark.executor.cores=11;
hive> set spark.serializer=org.apache.spark.serializer.KryoSerializer;

提示內存不足

Failed to execute spark task, with exception 'org.apache.hadoop.hive.ql.metadata.HiveException(Failed to create Spark client for Spark session 50288c8b-96aa-44ad-9eea-3cb4abb1ae5b)'
FAILED: Execution Error, return code 30041 from org.apache.hadoop.hive.ql.exec.spark.SparkTask. Failed to create Spark client for Spark session 50288c8b-96aa-44ad-9eea-3cb4abb1ae5b

解決方案,修改Yarn的配置文件:

1、yarn.nodemanager.resource.memory-mb 容器內存

設置為 至少 : executor-memory(15g) + driver(512m)的內存,如上例可配置為 16g

2、yarn.scheduler.maximum-allocation-mb 最大容器內存

設置為 至少 : executor-memory(15g) + driver(512m)的內存,如上例可配置為 16g

第一個參數為NodeManager`的配置 ,第二個參數為 `ResourceManager的配置。

字符串處理

1 字符串連接:

concat(str, str2, str3,...) 字符串連接
concat_ws(separator, str, str2, str3, ...) 將字符串用separator作為間隔連接起來

2 字符串截取

substr(s, 0, 1) 截取第一個字符
substr(s, -1) 截取最后一個字符

3 字符串urldecode

reflect("java.net.URLDecoder", "decode", trim(originalfilename), "UTF-8")
SQL

hive 中 join

mapjoin的優化在于,在mapreduce task開始之前,創建一個local task,小表以hshtable的形式加載到內存,然后序列化到磁盤,把內存的hashtable壓縮為tar文件。然后把文件分發到 Hadoop Distributed Cache,然后傳輸給每一個mapper,mapper在本地反序列化文件并加載進內存在做join

sql

select workflow,count(workflow) from (select guid, substr(workflow, -1) workflow from hm2.workflow_list) m right join hm2.helper helper on m.guid = helper.guid and helper.dt = "2018-10-21" group by workflow;

內存溢出解決辦法:

set hive.auto.convert.join = false;
set hive.ignore.mapjoin.hint=false;
set hive.exec.parallel=true;

Hive中Join的原理和機制

籠統的說,Hive中的Join可分為Common Join(Reduce階段完成join)和Map Join(Map階段完成join)。本文簡單介紹一下兩種join的原理和機制。

Hive Common Join

如果不指定MapJoin或者不符合MapJoin的條件,那么Hive解析器會將Join操作轉換成Common Join,即:在Reduce階段完成join.
整個過程包含Map、Shuffle、Reduce階段。

  • Map階段

讀取源表的數據,Map輸出時候以Join on條件中的列為key,如果Join有多個關聯鍵,則以這些關聯鍵的組合作為key;
Map輸出的value為join之后所關心的(select或者where中需要用到的)列;同時在value中還會包含表的Tag信息,用于標明此value對應哪個表;
按照key進行排序

  • Shuffle階段
    根據key的值進行hash,并將key/value按照hash值推送至不同的reduce中,這樣確保兩個表中相同的key位于同一個reduce中

  • Reduce階段
    根據key的值完成join操作,期間通過Tag來識別不同表中的數據。

以下面的HQL為例,圖解其過程:

SELECT 
 a.id,a.dept,b.age 
FROM a join b 
ON (a.id = b.id);

Hive Map Join

MapJoin通常用于一個很小的表和一個大表進行join的場景,具體小表有多小,由參數hive.mapjoin.smalltable.filesize來決定,該參數表示小表的總大小,默認值為25000000字節,即25M。
Hive0.7之前,需要使用hint提示 /+ mapjoin(table) /才會執行MapJoin,否則執行Common Join,但在0.7版本之后,默認自動會轉換Map Join,由參數hive.auto.convert.join來控制,默認為true.
仍然以9.1中的HQL來說吧,假設a表為一張大表,b為小表,并且hive.auto.convert.join=true,那么Hive在執行時候會自動轉化為MapJoin。

如圖中的流程,首先是Task A,它是一個Local Task(在客戶端本地執行的Task),負責掃描小表b的數據,將其轉換成一個HashTable的數據結構,并寫入本地的文件中,之后將該文件加載到DistributeCache中,該HashTable的數據結構可以抽象為:

key value
1 26
2 34
  • 接下來是Task B,該任務是一個沒有Reduce的MR,啟動MapTasks掃描大表a,在Map階段,根據a的每一條記錄去和DistributeCache中b表對應的HashTable關聯,并直接輸出結果。
  • 由于MapJoin沒有Reduce,所以由Map直接輸出結果文件,有多少個Map Task,就有多少個結果文件。

原文鏈接

轉義字符

hive> select split('a:1||b:2||c:3','\\|\\|') from hm2.test;
OK
["a:1","b:2","c:3"]
["a:1","b:2","c:3"]
["a:1","b:2","c:3"]
["a:1","b:2","c:3"]

其它轉義字符還有{, [

insert table select from

insert into tbName select * from tbName2;
insert overwrite table tbName select * from tbName2;

insert overwrite例子

insert overwrite table hm2.helper partition(dt = '2018-06-22', hour = '09',msgtype = 'helper') select time,source,remote_addr,remote_user,body_bytes_sent,request_time,status,host,request_method,http_referrer,http_x_forwarded_for,http_user_agent,upstream_response_time,upstream_addr,guid,helperversion,osversion,ngx_timestamp,get_type,split(ip2area(http_x_forwarded_for,remote_addr), "\t")[0] country,split(ip2area(http_x_forwarded_for,remote_addr), "\t")[1] province,split(ip2area(http_x_forwarded_for,remote_addr), "\t")[2] city from hm2.helper where dt = '2018-06-22' and hour = '09' and msgtype = 'helper';

插入分區表,不用指定分區,可以自動識別

INSERT overwrite TABLE test.dis_helper PARTITION (dt,hour,msgtype) select `(num)?+.+` from (select *,row_number() over (partition by guid order by time asc) num from hm2.helper where dt ='2018-09-06'and hour between '00' and '23' and msgtype='helper') t where t.num=1;

這里把數據去重,插入分區表test.dis_helper中,自動根據dt,hour,msgtype字段的取值進入分區表,并且`(num)?+.+表示除了`num``這個字段。

explain 查看執行計劃

對于這么一個插敘sql

explain
select
workflow,count(workflow) cou from (select guid, split(split(workflow,"\\|\\|")[0], ":")[1] workflow
from 
hm2.workflow_list) m
inner join
hm2.flash flash
on m.guid = flash.guid and flash.dt = "2018-11-04"
group by workflow order by cou;

可以打印出執行計劃

STAGE DEPENDENCIES:
  Stage-1 is a root stage
  Stage-2 depends on stages: Stage-1
  Stage-3 depends on stages: Stage-2
  Stage-0 depends on stages: Stage-3

STAGE PLANS:
  Stage: Stage-1
    Map Reduce
      Map Operator Tree:
          TableScan
            alias: workflow_list
            Statistics: Num rows: 1 Data size: 0 Basic stats: PARTIAL Column stats: NONE
            Filter Operator
              predicate: guid is not null (type: boolean)
              Statistics: Num rows: 1 Data size: 0 Basic stats: PARTIAL Column stats: NONE
              Select Operator
                expressions: guid (type: string), split(split(workflow, '\|\|')[0], ':')[1] (type: string)
                outputColumnNames: _col0, _col1
                Statistics: Num rows: 1 Data size: 0 Basic stats: PARTIAL Column stats: NONE
                Reduce Output Operator
                  key expressions: _col0 (type: string)
                  sort order: +
                  Map-reduce partition columns: _col0 (type: string)
                  Statistics: Num rows: 1 Data size: 0 Basic stats: PARTIAL Column stats: NONE
                  value expressions: _col1 (type: string)
          TableScan
            alias: flash
            Statistics: Num rows: 489153811 Data size: 48915382339 Basic stats: COMPLETE Column stats: NONE
            Filter Operator
              predicate: guid is not null (type: boolean)
              Statistics: Num rows: 244576906 Data size: 24457691219 Basic stats: COMPLETE Column stats: NONE
              Reduce Output Operator
                key expressions: guid (type: string)
                sort order: +
                Map-reduce partition columns: guid (type: string)
                Statistics: Num rows: 244576906 Data size: 24457691219 Basic stats: COMPLETE Column stats: NONE
      Reduce Operator Tree:
        Join Operator
          condition map:
               Inner Join 0 to 1
          keys:
            0 _col0 (type: string)
            1 guid (type: string)
          outputColumnNames: _col1
          Statistics: Num rows: 269034602 Data size: 26903460924 Basic stats: COMPLETE Column stats: NONE
          Group By Operator
            aggregations: count(_col1)
            keys: _col1 (type: string)
            mode: hash
            outputColumnNames: _col0, _col1
            Statistics: Num rows: 269034602 Data size: 26903460924 Basic stats: COMPLETE Column stats: NONE
            File Output Operator
              compressed: false
              table:
                  input format: org.apache.hadoop.mapred.SequenceFileInputFormat
                  output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat
                  serde: org.apache.hadoop.hive.serde2.lazybinary.LazyBinarySerDe

  Stage: Stage-2
    Map Reduce
      Map Operator Tree:
          TableScan
            Reduce Output Operator
              key expressions: _col0 (type: string)
              sort order: +
              Map-reduce partition columns: _col0 (type: string)
              Statistics: Num rows: 269034602 Data size: 26903460924 Basic stats: COMPLETE Column stats: NONE
              value expressions: _col1 (type: bigint)
      Reduce Operator Tree:
        Group By Operator
          aggregations: count(VALUE._col0)
          keys: KEY._col0 (type: string)
          mode: mergepartial
          outputColumnNames: _col0, _col1
          Statistics: Num rows: 134517301 Data size: 13451730462 Basic stats: COMPLETE Column stats: NONE
          File Output Operator
            compressed: false
            table:
                input format: org.apache.hadoop.mapred.SequenceFileInputFormat
                output format: org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat
                serde: org.apache.hadoop.hive.serde2.lazybinary.LazyBinarySerDe

  Stage: Stage-3
    Map Reduce
      Map Operator Tree:
          TableScan
            Reduce Output Operator
              key expressions: _col1 (type: bigint)
              sort order: +
              Statistics: Num rows: 134517301 Data size: 13451730462 Basic stats: COMPLETE Column stats: NONE
              value expressions: _col0 (type: string)
      Reduce Operator Tree:
        Select Operator
          expressions: VALUE._col0 (type: string), KEY.reducesinkkey0 (type: bigint)
          outputColumnNames: _col0, _col1
          Statistics: Num rows: 134517301 Data size: 13451730462 Basic stats: COMPLETE Column stats: NONE
          File Output Operator
            compressed: false
            Statistics: Num rows: 134517301 Data size: 13451730462 Basic stats: COMPLETE Column stats: NONE
            table:
                input format: org.apache.hadoop.mapred.TextInputFormat
                output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
                serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe

  Stage: Stage-0
    Fetch Operator
      limit: -1
      Processor Tree:
        ListSink

常用 sql 查詢

常用SQL查詢。

explode 一行變多行

explode主要是將一行數據按照某個字段變為多行。

select mt.impress,count(mt.guid) from (select (case when t.impressed = '' or t.impressed is null then 'null' else t.impressed end) impress,t.guid from (select guid,impressed from hm2.install lateral view explode(split(replace(replace(offer_impressed,'[',''),']',''), ',')) test_alias as impressed where dt='2018-12-17') t) mt group by mt.impress;

分組后取 top N

row_number() over(partition by)

hm4.ffbrowser_domain_url_2019_07_17數據如下:

www.baidu.com   https://www.baidu.com/link?url=fCBMjU_THQzoAdo0RTAQLQIfVWIlPlgzpEM5Pk5qChKiahWFfMzFo6ckRjd9OFc7w8cj5h8esZXKBab5WqeLPgDYipewXUdz9LFPf-oxOfK&wd=&eqid=d2c80e8e0033424b000000045d2f14e9    1
www.baidu.com   https://www.baidu.com/link?url=3jlP1HLS4aYSnN1L5CG3pPr0zXnqBpDdG8mlFQm47w1RcHFwHiBU0t8Hi0UrMD37lSJvQkGWQ3iBtNpc0AJhEei-v8MdGKgRnVqy62tuCA_&wd=&eqid=e3d5845b002790f7000000045d2f0af8    1
www.baidu.com   http://www.baidu.com/   1
www.baidu.com   https://www.baidu.com/link?url=8S_ziMFwpClJww3C15iXu__wqMrMOxnYuDnZDpQWnbs1PTTqx_wwjIY7QsrFfaKT&wd=&eqid=eb8f38f70039cf51000000045d2e8716   1
www.baidu.com   https://www.baidu.com/link?url=AZvluWbTjZjpaT5lnIpkB-gTIdyX_nZdtoLX_pkbM5i&wd=&eqid=8b09c549000038e7000000035930ca9a    1
www.baidu.com   https://www.baidu.com/link?url=IjStquL7c4YwVDk1zQJFYkwBiGY20Kv2PQsXuTQTHH0BhAPLjUaz-XhLLp5Zfe3fE4hU_KNfEs6JxyESkwGlea&wd=&eqid=e7e404c9000012b1000000045d2ee845 1
www.baidu.com   https://www.baidu.com/link?url=qRaLKHc_ZZIskkWF_f6azkmHqRlfgmuRQZcrzRovBC5MEBR5yTIG20FiR3O__8Jz&wd=&eqid=e13f05290018e7fb000000045d2eed7d   2
www.baidu.com   https://www.baidu.com/s?tn=50000201_hao_pg&usm=3&wd=%E7%AB%AF%E7%81%AB%E9%94%85%E6%B3%BC%E5%A6%BB%E5%AD%90%E5%90%8C%E5%AD%A6&ie=utf-8&rsv_cq=%E4%BA%BA%E7%B1%BB%E7%99%BB%E6%9C%8850%E5%91%A8%E5%B9%B4&rsv_dl=0_right_fyb_pchot_20811_01&rsf=531e82477396136261c6275e8afa58b1_1_10_8&rqid=e3d5845b002790f7   1
www.baidu.com   https://www.baidu.com/link?url=NHmzZVrcbQ1tf6JnR4MJlHXJZFy-4RMgKwjNeDvskMyl17vpdi_8XgVCdRvGFtU2WJpNpHQf4VbwIeQi5qDHskDTrDUK5KMUkrkfKcWYxhy&wd=&eqid=e3d5845b002790f7000000045d2f0af8    1
www.baidu.com   https://www.baidu.com/s?wd=%E5%BE%AE%E5%8D%9A&ie=utf-8  1

sql查詢語句

select a.domain,a.url,a.cou from (select domain,url,cou,row_number() over(partition by domain order by cou desc) as n from hm4.ffbrowser_domain_url_2019_07_17)a where a.n <= 2;

結果

www.baidu.com   https://www.baidu.com/  69
www.baidu.com   https://www.baidu.com/link?url=r6riiF-vxG9OX70KBVx86FuywJYXHu-TpTTSEst9ggK78xIjVvkI_QoS9tEDBAqq&wd=&eqid=ba409e160014f8c9000000045eec97 3

相似問題:每門課程成績的前N名

case when 用法

select reginlistlength, softname, sum(cou) from (select (case when reginlistlength > '1' then 'more' else reginlistlength end) as reginlistlength, softname, count(l.guid) cou from (select reginlistlength, softname, guid from hm2.lnk where dt = '2019-05-24' and softtype = '1' group by guid,reginlistlength, softname, guid)l where l.guid in (select guid from hm2.helper where dt = '2019-05-24' group by guid) group by reginlistlength, softname)m group by reginlistlength, softname;
select (case when reginlistlength > '1' then 'more' else reginlistlength end) as reginlistlength, softname, count(l.guid) cou from (select reginlistlength, softname, guid from hm2.lnk where dt = '2019-05-24' and softtype = '1' group by guid,reginlistlength, softname, guid)l where l.guid in (select guid from hm2.helper where dt = '2019-05-24' group by guid) group by reginlistlength, softname;









作者:柯廣的網絡日志

微信公眾號:Java大數據與數據倉庫