hive udf 使用方法

Hive系列文章

  1. Hive表的基本操作
  2. Hive中的集合數據類型
  3. Hive動態分區詳解
  4. hive中orc格式表的數據導入
  5. Java通過jdbc連接hive
  6. 通過HiveServer2訪問Hive
  7. SpringBoot連接Hive實現自助取數
  8. hive關聯hbase表
  9. Hive udf 使用方法
  10. Hive基于UDF進行文本分詞
  11. Hive窗口函數row number的用法
  12. 數據倉庫之拉鏈表

hive作為一個sql查詢引擎,自帶了一些基本的函數,比如count(計數),sum(求和),有時候這些基本函數滿足不了我們的需求,這時候就要寫hive hdf(user defined funation),又叫用戶自定義函數,應用與select 語句中。

哪些情況滿足不了我們的需求呢,比如:

  • 需要將字段與數據庫中查詢一下,做個比對;
  • 需要對數據進行復雜處理;
  • 等等

hive udf 用法

下面是一個判斷hive表字段是否包含'100'的簡單udf:

package com.js.dataclean.hive.udf.hm2

import org.apache.hadoop.hive.ql.exec.UDF;

public class IsContains100 extends UDF{

    public String evaluate(String s){

        if(s == null || s.length() == 0){
            return "0";
        }

        return s.contains("100")?"1":"0";
    }
}
Java

使用maven將其打包,進入hive cli,輸入命令:

add jar /home/hadoop/codejar/flash_format.jar;
create temporary function isContains100 as 'com.js.dataclean.hive.udf.hm2.IsContains100';
SQL

創建完臨時函數,即可使用這個函數了:

select isContains100('abc100def') from table limit 1;
1
SQL

hive udf 創建與使用步驟

  • 繼承org.apache.hadoop.hive.ql.exec.UDF類,實現evaluate方法;
  • 打包上傳到集群,通過create temporary function創建臨時函數,不加temporary就創建了一個永久函數;
  • 通過select 語句使用;

下面是一個例子,通過讀取mysql數據庫中的規則,為hive中的workflow返回對應的,類型:

type workflow
a   1
a   2
b   11
b   22
b   33

我們希望,將hive的某一個字段取值為,1,2的變為a,取值為11,22,33的全部變為b,就是歸類的意思。
這個udf可以這么實現:

package com.js.dataclean.hive.udf.hm2.workflow;

import org.apache.hadoop.hive.ql.exec.UDF;

import java.sql.*;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/**
 * @ Author: keguang
 * @ Date: 2018/12/13 16:24
 * @ version: v1.0.0
 * @ description:
 */
public class GetWorkflow extends UDF{

    private static final String host = "0.0.0.0";
    private static final String port = "3306";
    private static final String database = "root";
    private static final String userName = "root";
    private static final String password = "123456";
    private static String url = "";
    private static final String driver = "com.mysql.jdbc.Driver";
    private static Connection conn = null;
    private static Map<String, List<String>> workflowType = null;

    static {
        url = "jdbc:mysql://" + host + ":" + port + "/" + database;
        try {
            Class.forName(driver);
            conn = DriverManager.getConnection(url, userName, password);
            workflowType = getWorkflowType(conn);
        } catch (Exception e) {
            e.printStackTrace();
        }

    }

    private static Map<String, List<String>> getWorkflowType(Connection conn){
        Map<String, List<String>> workflowType = new HashMap<>();
        String sql = "select * from flash_player_workflow";
        PreparedStatement ps = null;
        try {
            ps = conn.prepareStatement(sql);
            ResultSet rs = ps.executeQuery();
            while (rs.next()){
                String workflow = rs.getString("workflow");
                String type = rs.getString("flag");

                List<String> workflows = workflowType.get(type);
                if(workflows == null){
                    workflows = new ArrayList<>();
                }
                workflows.add(workflow);
                workflowType.put(type, workflows);
            }
        } catch (SQLException e) {
            e.printStackTrace();
        }finally {

            // 關閉鏈接
            if(conn != null){
                try {
                    conn.close();
                } catch (SQLException e) {
                    e.printStackTrace();
                }
            }
        }
        return workflowType;

    }

    public String evaluate(String s){
        assert workflowType != null;

        for(String type:workflowType.keySet()){
            List<String> workflows = workflowType.get(type);
            if(workflows.contains(s)){
                return type;
            }
        }

        return s;
    }

}
Java

查看hive function的用法:

查month 相關的函數

show functions like '*month*';
SQL

查看 add_months 函數的用法

desc function add_months;
SQL

查看 add_months 函數的詳細說明并舉例

desc function extended add_months;
SQL

hive 中的 UDAF

可以看出,udf就是一個輸入一個輸出,輸入一個性別,返回'男'或者'女',如果我們想實現select date,count(1) from table,統計每天的流量呢?這就是一個分組統計,顯然是多個輸入,一個輸出,這時候udf已經不能滿足我們的需要,就需要寫udaf,user defined aggregare function(用戶自定義聚合函數)。

這里寫一個字符串連接函數,相當于concat的功能,將多行輸入,合并為一個字符串:

package com.js.dataclean.hive.udaf.hm2;

import com.js.dataclean.utils.StringUtil;
import org.apache.hadoop.hive.ql.exec.UDAF;
import org.apache.hadoop.hive.ql.exec.UDAFEvaluator;

/**
 * 實現字符串連接聚合的UDAF
 * @version v1.0.0
 * @Author:keguang
 * @Date:2018/10/22 14:36
 */
public class MutiStringConcat extends UDAF{
    public static class SumState{
        private String sumStr;
    }

    public static class SumEvaluator implements UDAFEvaluator{
        SumState sumState;

        public SumEvaluator(){
            super();
            sumState = new SumState();
            init();
        }

        @Override
        public void init() {
            sumState.sumStr = "";
        }

        /**
         * 來了一行數據
         * @param s
         * @return
         */
        public boolean iterate(String s){
            if(!StringUtil.isNull(s)){
                sumState.sumStr += s;
            }
            return true;
        }

        /**
         * 狀態傳遞
         * @return
         */
        public SumState terminatePartial() {
            return sumState;
        }

        /**
         * 子任務合并
         * @param state
         * @return
         */
        public boolean merge(SumState state){
            if(state != null){
                sumState.sumStr += state.sumStr;
            }
            return true;
        }

        /**
         * 返回最終結果
         * @return
         */
        public String terminate(){
            return sumState.sumStr;
        }
    }
}
Java

用法,與udf一樣,還是需要打包并且到hive cli中注冊使用。

關于UDAF開發注意點:

  • 需要import org.apache.hadoop.hive.ql.exec.UDAF以及org.apache.hadoop.hive.ql.exec.UDAFEvaluator,這兩個包都是必須的

  • 函數類需要繼承UDAF類,內部類Evaluator實現UDAFEvaluator接口

  • Evaluator需要實現 init、iterate、terminatePartial、merge、terminate這幾個函數

    • init函數類似于構造函數,用于UDAF的初始化

    • iterate接收傳入的參數,并進行內部的輪轉。其返回類型為boolean

    • terminatePartial無參數,其為iterate函數輪轉結束后,返回亂轉數據,iterate和terminatePartial類似于hadoop的Combiner

    • merge接收terminatePartial的返回結果,進行數據merge操作,其返回類型為boolean

    • terminate返回最終的聚集函數結果










作者:柯廣的網絡日志

微信公眾號:Java大數據與數據倉庫