Hive基于UDF進行文本分詞

Hive系列文章

  1. Hive表的基本操作
  2. Hive中的集合數據類型
  3. Hive動態分區詳解
  4. hive中orc格式表的數據導入
  5. Java通過jdbc連接hive
  6. 通過HiveServer2訪問Hive
  7. SpringBoot連接Hive實現自助取數
  8. hive關聯hbase表
  9. Hive udf 使用方法
  10. Hive基于UDF進行文本分詞
  11. Hive窗口函數row number的用法
  12. 數據倉庫之拉鏈表

本文大綱

UDF 簡介

Hive作為一個sql查詢引擎,自帶了一些基本的函數,比如count(計數),sum(求和),有時候這些基本函數滿足不了我們的需求,這時候就要寫hive hdf(user defined funation),又叫用戶自定義函數。編寫Hive UDF的步驟:

  • 添加相關依賴,創建項目,這里我用的管理工具是maven,所以我創建的也是一個maven 項目(這個時候你需要選擇合適的依賴版本,主要是Hadoop 和 Hive,可以使用hadoop versionhive --version 來分別查看版本)
  • 繼承org.apache.hadoop.hive.ql.exec.UDF類,實現evaluate方法,然后打包;
  • 使用 add方法添加jar 包到分布式緩存,如果jar包是上傳到$HIVE_HOME/lib/目錄以下,就不需要執行add命令了;
  • 通過create temporary function創建臨時函數,不加temporary就創建了一個永久函數;
  • 在SQL 中使用你創建的UDF;

UDF分詞

這個是一個比較常見的場景,例如公司的產品有每天都會產生大量的彈幕或者評論,這個時候我們可能會想去分析一下大家最關心的熱點話題是什么,或者是我們會分析最近一段時間的網絡趨勢是什么,但是這里有一個問題就是你的詞庫建設的問題,因為你使用通用的詞庫可能不能達到很好的分詞效果,尤其有很多網絡流行用語它是不在詞庫里的,還有一個就是停用詞的問題了,因為很多時候停用詞是沒有意義的,所以這里我們需要將其過濾,而過濾的方式就是通過停用詞詞表進行過濾。

這個時候我們的解決方案主要有兩種,一種是使用第三方提供的一些詞庫,還有一種是自建詞庫,然后有專人去維護,這個也是比較常見的一種情況。

最后一個就是我們使用的分詞工具,因為目前主流的分詞器很多,選擇不同的分詞工具可能對我們的分詞結果有很多影響。

分詞工具

1:Elasticsearch的開源中文分詞器 IK Analysis(Star:2471)

IK中文分詞器在Elasticsearch上的使用。原生IK中文分詞是從文件系統中讀取詞典,es-ik本身可擴展成從不同的源讀取詞典。目前提供從sqlite3數據庫中讀取。es-ik-plugin-sqlite3使用方法: 1. 在elasticsearch.yml中設置你的sqlite3詞典的位置: ik_analysis_db_path: /opt/ik/dictionary.db

2:開源的java中文分詞庫 IKAnalyzer(Star:343)

IK Analyzer 是一個開源的,基于java語言開發的輕量級的中文分詞工具包。從2006年12月推出1.0版開始, IKAnalyzer已經推出了4個大版本。最初,它是以開源項目Luence為應用主體的,結合詞典分詞和文法分析算法的中文分詞組件。從3.0版本開始,IK發展為面向Java的公用分詞組件,獨立于Lucene項目

3:java開源中文分詞 Ansj(Star:3019)

Ansj中文分詞 這是一個ictclas的java實現.基本上重寫了所有的數據結構和算法.詞典是用的開源版的ictclas所提供的.并且進行了部分的人工優化 分詞速度達到每秒鐘大約200萬字左右,準確率能達到96%以上。

目前實現了.中文分詞. 中文姓名識別 . 詞性標注、用戶自定義詞典,關鍵字提取,自動摘要,關鍵字標記等功能。

可以應用到自然語言處理等方面,適用于對分詞效果要求高的各種項目.

4:結巴分詞 ElasticSearch 插件(Star:188)

elasticsearch官方只提供smartcn這個中文分詞插件,效果不是很好,好在國內有medcl大神(國內最早研究es的人之一)寫的兩個中文分詞插件,一個是ik的,一個是mmseg的

5:Java分布式中文分詞組件 - word分詞(Star:672)

word分詞是一個Java實現的分布式的中文分詞組件,提供了多種基于詞典的分詞算法,并利用ngram模型來消除歧義。能準確識別英文、數字,以及日期、時間等數量詞,能識別人名、地名、組織機構名等未登錄詞

6:Java開源中文分詞器jcseg(Star:400)

Jcseg是什么? Jcseg是基于mmseg算法的一個輕量級開源中文分詞器,同時集成了關鍵字提取,關鍵短語提取,關鍵句子提取和文章自動摘要等功能,并且提供了最新版本的lucene, solr, elasticsearch的分詞接口, Jcseg自帶了一個 jcseg.properties文件...

7:中文分詞庫Paoding

庖丁中文分詞庫是一個使用Java開發的,可結合到Lucene應用中的,為互聯網、企業內部網使用的中文搜索引擎分詞組件。Paoding填補了國內中文分詞方面開源組件的空白,致力于此并希翼成為互聯網網站首選的中文分詞開源組件。 Paoding中文分詞追求分詞的高效率和用戶良好體驗。

8:中文分詞器mmseg4j

mmseg4j 用 Chih-Hao Tsai 的 MMSeg 算法(http://technology.chtsai.org/mmseg/ )實現的中文分詞器,并實現 lucene 的 analyzer 和 solr 的TokenizerFactory 以方便在Lucene和Solr中使...

9:中文分詞Ansj(Star:3015)

Ansj中文分詞 這是一個ictclas的java實現.基本上重寫了所有的數據結構和算法.詞典是用的開源版的ictclas所提供的.并且進行了部分的人工優化 內存中中文分詞每秒鐘大約100萬字(速度上已經超越ictclas) 文件讀取分詞每秒鐘大約30萬字 準確率能達到96%以上 目前實現了....

10:Lucene中文分詞庫ICTCLAS4J

ictclas4j中文分詞系統是sinboy在中科院張華平和劉群老師的研制的FreeICTCLAS的基礎上完成的一個java開源分詞項目,簡化了原分詞程序的復雜度,旨在為廣大的中文分詞愛好者一個更好的學習機會。

代碼實現

第一步:引入依賴

這里我們引入了兩個依賴,其實是兩個不同分詞工具

<dependency>
  <groupId>org.ansj</groupId>
  <artifactId>ansj_seg</artifactId>
  <version>5.1.6</version>
  <scope>compile</scope>
</dependency>
<dependency>
  <groupId>com.janeluo</groupId>
  <artifactId>ikanalyzer</artifactId>
  <version>2012_u6</version>
</dependency>
Java

在開始之前我們先寫一個demo 玩玩,讓大家有個基本的認識

@Test
public  void testAnsjSeg() {
    String str = "我叫李太白,我是一個詩人,我生活在唐朝" ;
    // 選擇使用哪種分詞器 BaseAnalysis ToAnalysis NlpAnalysis  IndexAnalysis
    Result result = ToAnalysis.parse(str);
    System.out.println(result);
    KeyWordComputer kwc = new KeyWordComputer(5);
    Collection<Keyword> keywords = kwc.computeArticleTfidf(str);
    System.out.println(keywords);
}
Java

輸出結果

我/r,叫/v,李太白/nr,,/w,我/r,是/v,一個/m,詩人/n,,/w,我/r,生活/vn,在/p,唐朝/t
[李太白/24.72276098504223, 詩人/3.0502185968368885, 唐朝/0.8965677022546215, 生活/0.6892230219652541]

第二步:引入停用詞詞庫

因為是停用詞詞庫,本身也不是很大,所以我直接放在項目里了,當然你也可以放在其他地方,例如HDFS 上

第三步:編寫UDF

代碼很簡單我就不不做詳細解釋了,需要注意的是GenericUDF 里面的一些方法的使用規則,至于代碼設計的好壞以及還有什么改進的方案我們后面再說,下面兩套實現的思路幾乎是一致的,不一樣的是在使用的分詞工具上的不一樣

ansj的實現

/**
 * Chinese words segmentation with user-dict in com.kingcall.dic
 * use Ansj(a java open source analyzer)
 */

// 這個信息就是你每次使用desc 進行獲取函數信息的時候返回的
@Description(name = "ansj_seg", value = "_FUNC_(str) - chinese words segment using ansj. Return list of words.",
        extended = "Example: select _FUNC_('我是測試字符串') from src limit 1;\n"
                + "[\"我\", \"是\", \"測試\", \"字符串\"]")

public class AnsjSeg extends GenericUDF {
    private transient ObjectInspectorConverters.Converter[] converters;
    private static final String userDic = "/app/stopwords/com.kingcall.dic";

    //load userDic in hdfs
    static {
        try {
            FileSystem fs = FileSystem.get(new Configuration());
            FSDataInputStream in = fs.open(new Path(userDic));
            BufferedReader br = new BufferedReader(new InputStreamReader(in));

            String line = null;
            String[] strs = null;
            while ((line = br.readLine()) != null) {
                line = line.trim();
                if (line.length() > 0) {
                    strs = line.split("\t");
                    strs[0] = strs[0].toLowerCase();
                    DicLibrary.insert(DicLibrary.DEFAULT, strs[0]); //ignore nature and freq
                }
            }
            MyStaticValue.isNameRecognition = Boolean.FALSE;
            MyStaticValue.isQuantifierRecognition = Boolean.TRUE;
        } catch (Exception e) {
            System.out.println("Error when load userDic" + e.getMessage());
        }
    }

    @Override
    public ObjectInspector initialize(ObjectInspector[] arguments) throws UDFArgumentException {
        if (arguments.length < 1 || arguments.length > 2) {
            throw new UDFArgumentLengthException(
                    "The function AnsjSeg(str) takes 1 or 2 arguments.");
        }

        converters = new ObjectInspectorConverters.Converter[arguments.length];
        converters[0] = ObjectInspectorConverters.getConverter(arguments[0], PrimitiveObjectInspectorFactory.writableStringObjectInspector);
        if (2 == arguments.length) {
            converters[1] = ObjectInspectorConverters.getConverter(arguments[1], PrimitiveObjectInspectorFactory.writableIntObjectInspector);
        }
        return ObjectInspectorFactory.getStandardListObjectInspector(PrimitiveObjectInspectorFactory.writableStringObjectInspector);
    }

    @Override
    public Object evaluate(DeferredObject[] arguments) throws HiveException {
        boolean filterStop = false;
        if (arguments[0].get() == null) {
            return null;
        }
        if (2 == arguments.length) {
            IntWritable filterParam = (IntWritable) converters[1].convert(arguments[1].get());
            if (1 == filterParam.get()) filterStop = true;
        }

        Text s = (Text) converters[0].convert(arguments[0].get());
        ArrayList<Text> result = new ArrayList<>();

        if (filterStop) {
            for (Term words : DicAnalysis.parse(s.toString()).recognition(StopLibrary.get())) {
                if (words.getName().trim().length() > 0) {
                    result.add(new Text(words.getName().trim()));
                }
            }
        } else {
            for (Term words : DicAnalysis.parse(s.toString())) {
                if (words.getName().trim().length() > 0) {
                    result.add(new Text(words.getName().trim()));
                }
            }
        }
        return result;
    }

    @Override
    public String getDisplayString(String[] children) {
        return getStandardDisplayString("ansj_seg", children);
    }
}
Java






ikanalyzer的實現

@Description(name = "ansj_seg", value = "_FUNC_(str) - chinese words segment using Iknalyzer. Return list of words.",
        extended = "Example: select _FUNC_('我是測試字符串') from src limit 1;\n"
                + "[\"我\", \"是\", \"測試\", \"字符串\"]")
public class IknalyzerSeg extends GenericUDF {
    private transient ObjectInspectorConverters.Converter[] converters;
    //用來存放停用詞的集合
    Set<String> stopWordSet = new HashSet<String>();

    @Override
    public ObjectInspector initialize(ObjectInspector[] arguments) throws UDFArgumentException {
        if (arguments.length < 1 || arguments.length > 2) {
            throw new UDFArgumentLengthException(
                    "The function AnsjSeg(str) takes 1 or 2 arguments.");
        }
        //讀入停用詞文件
        BufferedReader StopWordFileBr = null;
        try {
            StopWordFileBr = new BufferedReader(new InputStreamReader(new FileInputStream(new File("stopwords/baidu_stopwords.txt"))));
            //初如化停用詞集
            String stopWord = null;
            for(; (stopWord = StopWordFileBr.readLine()) != null;){
                stopWordSet.add(stopWord);
            }
        } catch (FileNotFoundException e) {
            e.printStackTrace();
        } catch (IOException e) {
            e.printStackTrace();
        }

        converters = new ObjectInspectorConverters.Converter[arguments.length];
        converters[0] = ObjectInspectorConverters.getConverter(arguments[0], PrimitiveObjectInspectorFactory.writableStringObjectInspector);
        if (2 == arguments.length) {
            converters[1] = ObjectInspectorConverters.getConverter(arguments[1], PrimitiveObjectInspectorFactory.writableIntObjectInspector);
        }
        return ObjectInspectorFactory.getStandardListObjectInspector(PrimitiveObjectInspectorFactory.writableStringObjectInspector);

    }

    @Override
    public Object evaluate(DeferredObject[] arguments) throws HiveException {
        boolean filterStop = false;
        if (arguments[0].get() == null) {
            return null;
        }
        if (2 == arguments.length) {
            IntWritable filterParam = (IntWritable) converters[1].convert(arguments[1].get());
            if (1 == filterParam.get()) filterStop = true;
        }
        Text s = (Text) converters[0].convert(arguments[0].get());
        StringReader reader = new StringReader(s.toString());
        IKSegmenter iks = new IKSegmenter(reader, true);
        List<Text> list = new ArrayList<>();
        if (filterStop) {
            try {
                Lexeme lexeme;
                while ((lexeme = iks.next()) != null) {
                    if (!stopWordSet.contains(lexeme.getLexemeText())) {
                        list.add(new Text(lexeme.getLexemeText()));
                    }
                }
            } catch (IOException e) {
            }
        } else {
            try {
                Lexeme lexeme;
                while ((lexeme = iks.next()) != null) {
                    list.add(new Text(lexeme.getLexemeText()));
                }
            } catch (IOException e) {
            }
        }
        return list;
    }

    @Override
    public String getDisplayString(String[] children) {
        return "Usage: evaluate(String str)";
    }
}
Java

第四步:編寫測試用例

GenericUDF 給我們提供了一些方法,這些方法可以用來構建測試需要的環境和參數,這樣我們就可以測試這些代碼了

@Test
public void testAnsjSegFunc() throws HiveException {
    AnsjSeg udf = new AnsjSeg();
    ObjectInspector valueOI0 = PrimitiveObjectInspectorFactory.javaStringObjectInspector;
    ObjectInspector valueOI1 = PrimitiveObjectInspectorFactory.javaIntObjectInspector;
    ObjectInspector[] init_args = {valueOI0, valueOI1};
    udf.initialize(init_args);

    Text str = new Text("我是測試字符串");

    GenericUDF.DeferredObject valueObj0 = new GenericUDF.DeferredJavaObject(str);
    GenericUDF.DeferredObject valueObj1 = new GenericUDF.DeferredJavaObject(0);
    GenericUDF.DeferredObject[] args = {valueObj0, valueObj1};
    ArrayList<Object> res = (ArrayList<Object>) udf.evaluate(args);
    System.out.println(res);
}

@Test
public void testIkSegFunc() throws HiveException {
    IknalyzerSeg udf = new IknalyzerSeg();
    ObjectInspector valueOI0 = PrimitiveObjectInspectorFactory.javaStringObjectInspector;
    ObjectInspector valueOI1 = PrimitiveObjectInspectorFactory.javaIntObjectInspector;
    ObjectInspector[] init_args = {valueOI0, valueOI1};
    udf.initialize(init_args);

    Text str = new Text("我是測試字符串");

    GenericUDF.DeferredObject valueObj0 = new GenericUDF.DeferredJavaObject(str);
    GenericUDF.DeferredObject valueObj1 = new GenericUDF.DeferredJavaObject(0);
    GenericUDF.DeferredObject[] args = {valueObj0, valueObj1};
    ArrayList<Object> res = (ArrayList<Object>) udf.evaluate(args);
    System.out.println(res);
}
Java

我們看到加載停用詞沒有找到,但是整體還是跑起來了,因為讀取不到HDFS 上的文件

但是我們第二個樣例是不需要從HDFS 上加載停用詞信息,所以可以完美的測試運行

后來為了能在外部更新文件,我將其放在了HDFS 上,和AnsjSeg 中的代碼一樣

第五步:創建UDF 并使用

add jar /Users/liuwenqiang/workspace/code/idea/HiveUDF/target/HiveUDF-0.0.4.jar;
create temporary function ansjSeg as 'com.kingcall.bigdata.HiveUDF.AnsjSeg';
select ansjSeg("我是字符串,你是啥");
-- 開啟停用詞過濾
select ansjSeg("我是字符串,你是啥",1);
create temporary function ikSeg as 'com.kingcall.bigdata.HiveUDF.IknalyzerSeg';
select ikSeg("我是字符串,你是啥");
select ikSeg("我是字符串,你是啥",1);

上面方法的第二個參數,就是是否開啟停用詞過濾,我們使用ikSeg函數演示一下

下面我們嘗試獲取一下函數的描述信息

如果沒有寫的話,就是下面的這樣的

其它應用場景

通過編寫Hive UDF可以輕松幫我們實現大量常見需求,其它應該場景還有:

  • ip地址轉地區:將上報的用戶日志中的ip字段轉化為國家-省-市格式,便于做地域分布統計分析;
  • 使用Hive SQL計算的標簽數據,不想編寫Spark程序,可以通過UDF在靜態代碼塊中初始化連接池,利用Hive啟動的并行MR任務,并行快速導入大量數據到codis中,應用于一些推薦業務;
  • 還有其它sql實現相對復雜的任務,都可以編寫永久Hive UDF進行轉化;

總結

  1. 這一節我們學習了一個比較常見的UDF,通過實現GenericUDF 抽象類來實現,這一節的重點在于代碼的實現以及對GenericUDF類中方法的理解
  2. 上面的代碼實現上有一個問題,那就是關于停用詞的加載,就是我們能不能動態加載停用詞呢?

  1. 作者:柯廣的網絡日志

    微信公眾號:Java大數據與數據倉庫