Flink之Watermark詳解


在上一篇文章中我們介紹了窗口相關的內容,那么問題來了,比如公司組織春游,規定周六早晨8:00 ~ 8:30清查人數,人齊則發車出發,可是總有那么個同學會睡懶覺遲到,這時候通常也會等待20分鐘,但是不能一直等下去,如果到了20分鐘則認為,想自己在家過周末,不參與春游活動了,不會繼續等待了,直接出發。

這種機制跟這里要講的watermark機制是一個意思。指的是,由于網絡延遲等原因,一條數據會遲到計算,比如使用event time來劃分窗口,我們知道窗口中的數據是計算一段時間的數據,如果一個數據來晚了,它的時間范圍已經不屬于這個窗口了,則會被丟棄,但他的event time實際上是屬于這個窗口的。引入watermark機制則會等待晚到的數據一段時間,等待時間到則觸發計算,如果數據延遲很大,通常也會被丟棄或者另外處理。

1. 基本概念是什么

  • Window:Window是處理無界流的關鍵,Windows將流拆分為一個個有限大小的buckets,可以可以在每一個buckets中進行計算
  • start_time,end_time:當Window時時間窗口的時候,每個window都會有一個開始時間和結束時間(前開后閉),這個時間是系統時間
  • event-time: 事件發生時間,是事件發生所在設備的當地時間,比如一個點擊事件的時間發生時間,是用戶點擊操作所在的手機或電腦的時間
  • Watermarks:可以把他理解為一個水位線,等于evevtTime - delay(比如規定為20分鐘),一旦Watermarks大于了某個window的end_time,就會觸發此window的計算,Watermarks就是用來觸發window計算的。

推遲窗口觸發的時間,實現方式:通過當前窗口中最大的eventTime-延遲時間所得到的Watermark與窗口原始觸發時間進行對比,當Watermark大于窗口原始觸發時間時則觸發窗口執行?。?!我們知道,流處理從事件產生,到流經source,再到operator,中間是有一個過程和時間的,雖然大部分情況下,流到operator的數據都是按照事件產生的時間順序來的,但是也不排除由于網絡、分布式等原因,導致亂序的產生,所謂亂序,就是指Flink接收到的事件的先后順序不是嚴格按照事件的Event Time順序排列的。

那么此時出現一個問題,一旦出現亂序,如果只根據eventTime決定window的運行,我們不能明確數據是否全部到位,但又不能無限期的等下去,此時必須要有個機制來保證一個特定的時間后,必須觸發window去進行計算了,這個特別的機制,就是Watermark。

Watermark是一種衡量Event Time進展的機制。
Watermark是用于處理亂序事件的,而正確的處理亂序事件,通常用Watermark機制結合window來實現。
數據流中的Watermark用于表示timestamp小于Watermark的數據,都已經到達了,因此,window的執行也是由Watermark觸發的。
Watermark可以理解成一個延遲觸發機制,我們可以設置Watermark的延時時長t,每次系統會校驗已經到達的數據中最大的maxEventTime,然后認定eventTime小于maxEventTime - t的所有數據都已經到達,如果有窗口的停止時間等于maxEventTime – t,那么這個窗口被觸發執行。
有序流的Watermarker如下圖所示:(Watermark設置為0)

亂序流的Watermarker如下圖所示:(Watermark設置為2)

當Flink接收到數據時,會按照一定的規則去生成Watermark,這條Watermark就等于當前所有到達數據中的maxEventTime - 延遲時長,也就是說,Watermark是由數據攜帶的,一旦數據攜帶的Watermark比當前未觸發的窗口的停止時間要晚,那么就會觸發相應窗口的執行。由于Watermark是由數據攜帶的,因此,如果運行過程中無法獲取新的數據,那么沒有被觸發的窗口將永遠都不被觸發。

上圖中,我們設置的允許最大延遲到達時間為2s,所以時間戳為5s的事件對應的Watermark是3s,時間戳為9s的事件的Watermark是7s,如果我們的窗口1是1s-3s,窗口2是4s-6s,那么時間戳為5s的事件到達時的Watermarker恰好觸發窗口1,時間戳為9s的事件到達時的Watermark觸發窗口2。

Watermark 就是觸發前一窗口的“關窗時間”,一旦觸發關門那么以當前時刻為準在窗口范圍內的所有所有數據都會收入窗中。只要沒有達到水位那么不管現實中的時間推進了多久都不會觸發關窗。

2. Watermark的引入

watermark的引入很簡單,對于亂序數據,最常見的引用方式如下:

datastream
.assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor<String>(Time.milliseconds(maxOutOfOrderness)) {
    @Override
    public long extractTimestamp(String element) {
        return Long.valueOf(JSON.parseObject(element).getString("time"));
    }
})
Java

Event Time的使用通常要指定數據源中的時間戳,否則程序無法知道事件的事件時間是什么(數據源里的數據沒有時間戳的話,就只能使用Processing Time了)。
我們看到上面的例子中創建了一個看起來有點復雜的類,這個匿名類實現的其實就是分配時間戳的接口。Flink暴露了TimestampAssigner接口供我們實現,使我們可以自定義如何從事件數據中抽取時間戳,必須通過env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)指定通過時間事件EventTime來分配數據。

val env = StreamExecutionEnvironment.getExecutionEnvironment

// 必須指定
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
Java

MyAssigner有兩種類型

AssignerWithPeriodicWatermarks
AssignerWithPunctuatedWatermarks

以上兩個接口都繼承自TimestampAssigner,區別是

定期水位線(Assigner with periodic watermarks)

上面講述了根據從事件數據中去獲取時間戳設置水位線,但存在的問題是沒有達到水位線時不管現實中的時間推進了多久都不會觸發關窗,所以接下來我們就來介紹下定期水位線(Periodic Watermark)按照固定時間間隔生成新的水位線,不管是否有新的消息抵達,水位線提升的時間間隔是由用戶設置的,在兩次水位線提升時隔內會有一部分消息流入,用戶可以根據這部分數據來計算出新的水位線。舉個例子,最簡單的水位線算法就是取目前為止最大的事件時間,然而這種方式比較暴力,對亂序事件的容忍程度比較低,容易出現大量遲到事件。

應用定期水位線需要實現AssignerWithPeriodicWatermarks API,以下是 Flink 1.9 官網提供的定期水位線的實現例子。

public class BoundedOutOfOrdernessGenerator implements AssignerWithPeriodicWatermarks<MyEvent> {

    private final long maxOutOfOrderness = 3500; // 3.5 seconds

    private long currentMaxTimestamp;

    @Override
    public long extractTimestamp(MyEvent element, long previousElementTimestamp) {
        long timestamp = element.getCreationTime();
        currentMaxTimestamp = Math.max(timestamp, currentMaxTimestamp);
        return timestamp;
    }

    @Override
    public Watermark getCurrentWatermark() {
        // return the watermark as current highest timestamp minus the out-of-orderness bound
        // 當前最大時間戳減去maxOutOfOrderness,就是watermark
        return new Watermark(currentMaxTimestamp - maxOutOfOrderness);
    }
}
Java

其中extractTimestamp用于從消息中提取事件時間,而getCurrentWatermark用于生成新的水位線,新的水位線只有大于當前水位線才是有效的。每個窗口都會有該類的一個實例,因此可以利用實例的成員變量保存狀態,比如上例中的當前最大時間戳






注:周期性的(一定時間間隔或者達到一定的記錄條數)產生一個Watermark。在實際的生產中Periodic的方式必須結合時間和積累條數兩個維度繼續周期性(默認200ms)產生Watermark,否則在極端情況下會有很大的延時。

@PublicEvolving
public void setStreamTimeCharacteristic(TimeCharacteristic characteristic) {
    this.timeCharacteristic = Preconditions.checkNotNull(characteristic);
    if (characteristic == TimeCharacteristic.ProcessingTime) {
        // 如果是ProcessingTime,那么默認時間間隔是0,一直不會過濾時間。
        getConfig().setAutoWatermarkInterval(0);
    } else {
        // 如果是EventTime,則autoWatermarkInterval設置為200ms
        getConfig().setAutoWatermarkInterval(200);
    }
}
Java

深入到assignTimestampsAndWatermarks里面,TimestampsAndPeriodicWatermarksOperator有一個定時回調任務:

@Override
public void open() throws Exception {
    super.open();

    currentWatermark = Long.MIN_VALUE;
    watermarkInterval = getExecutionConfig().getAutoWatermarkInterval();

    if (watermarkInterval > 0) {
        long now = getProcessingTimeService().getCurrentProcessingTime();
        getProcessingTimeService().registerTimer(now + watermarkInterval, this);
    }
}
Java

里面大家感興趣可以繼續看一下,定時回調的方法,將符合要求的watermark發送出去并且注冊下一個定時器。

標點水位線(Assigner with punctuated watermarks)

標點水位線(Punctuated Watermark)通過數據流中某些特殊標記事件來觸發新水位線的生成。這種方式下窗口的觸發與時間無關,而是決定于何時收到標記事件。
應用標點水位線需要實現AssignerWithPunctuatedWatermarks API,以下是 Flink 1.9 官網提供的標點水位線的實現例子。

public class PunctuatedAssigner implements AssignerWithPunctuatedWatermarks<MyEvent> {

    @Override
    public long extractTimestamp(MyEvent element, long previousElementTimestamp) {
        return element.getCreationTime();
    }

    @Override
    public Watermark checkAndGetNextWatermark(MyEvent lastElement, long extractedTimestamp) {
        return lastElement.hasWatermarkMarker() ? new Watermark(extractedTimestamp) : null;
    }
}
Java

其中extractTimestamp用于從消息中提取事件時間,checkAndGetNextWatermark用于檢查事件是否標點事件,若是則生成新的水位線。不同于定期水位線定時調用getCurrentWatermark,標點水位線是每接受一個事件就需要調用checkAndGetNextWatermark,若返回值非 null 且新水位線大于當前水位線,則觸發窗口計算

注:數據流中每一個遞增的EventTime都會產生一個Watermark。在實際的生產中Punctuated方式在TPS很高的場景下會產生大量的Watermark在一定程度上對下游算子造成壓力,所以只有在實時性要求非常高的場景才會選擇Punctuated的方式進行Watermark的生成。

遲到事件

雖說水位線表明著早于它的事件不應該再出現,但是上如上文所講,接收到水位線以前的的消息是不可避免的,這就是所謂的遲到事件。實際上遲到事件是亂序事件的特例,和一般亂序事件不同的是它們的亂序程度超出了水位線的預計,導致窗口在它們到達之前已經關閉。
遲到事件出現時窗口已經關閉并產出了計算結果,因此處理的方法有3種:

  • 重新激活已經關閉的窗口并重新計算以修正結果。
  • 將遲到事件收集起來另外處理。
  • 將遲到事件視為錯誤消息并丟棄。

Flink 默認的處理方式是第3種直接丟棄,其他兩種方式分別使用Side Output和Allowed Lateness。

Side Output機制可以將遲到事件單獨放入一個數據流分支,這會作為 window 計算結果的副產品,以便用戶獲取并對其進行特殊處理。

Allowed Lateness機制允許用戶設置一個允許的最大遲到時長。Flink 會再窗口關閉后一直保存窗口的狀態直至超過允許遲到時長,這期間的遲到事件不會被丟棄,而是默認會觸發窗口重新計算。因為保存窗口狀態需要額外內存,并且如果窗口計算使用了 ProcessWindowFunction API 還可能使得每個遲到事件觸發一次窗口的全量計算,代價比較大,所以允許遲到時長不宜設得太長,遲到事件也不宜過多,否則應該考慮降低水位線提高的速度或者調整算法。

// 側輸出
SingleOutputStreamOperator<Tuple2<String, Long>> lateOutputTag = new OutputTag<>("lateOutputTag");

DataStream<Tuple2<String, Long>> dataStream = senv.addSource(
        new FlinkKafkaConsumer010<>(
                config.get("kafka-topic"),
                new SimpleStringSchema(),
                kafkaProps
        ))
        //設置watermark
        .assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor<String>(Time.milliseconds(maxOutOfOrderness)) {
            @Override
            public long extractTimestamp(String element) {
                return Long.valueOf(JSON.parseObject(element).getString("time"));
            }
        }).map(x -> {
            JSONObject message = JSON.parseObject(x);
            return Tuple2.of(message.getString("name"), 1L);
        })
        .returns(Types.TUPLE(Types.STRING, Types.LONG))
        .keyBy(value -> value.f0)
        .window(TumblingEventTimeWindows.of(Time.seconds(3)))
        // 窗口會等待5s
        .allowedLateness(Time.milliseconds(5000))
        // 另外收集起來
        .sideOutputLateData(lateOutputTag)
        .process(new ProcessWindowFunction<Tuple2<String, Long>, Tuple2<String, Long>, String, TimeWindow>() {

            @Override
            public void process(String key, Context context, Iterable<Tuple2<String, Long>> elements, Collector<Tuple2<String, Long>> out) throws Exception {

                long count = 0;
                for(Tuple2<String, Long> element : elements){
                    count = count + 1;
                }

                out.collect(Tuple2.of("pv-" + key, count));
            }
        });

// 獲取遲到數據并寫入對應Sink
dataStream.getSideOutput(lateOutputTag).addSink(new RichSinkFunction<Tuple2<String, Long>>() {
...
});
Java

每個Kafka分區的Timestamp

當使用Apache Kafka座位數據源時,每個Kafka分區可能有一個簡單的事件時間模式(遞增的timestamp或者有界的無序)。然而,當消費Kafka中的數據時,多個分區通常是并發進行的,將事件從分區中分離開來,并銷毀分區模式(這是Kafka consumer客戶端固有的工作模式)。
在這種情況下,你可以使用Flink的 Kafka-partition-aware(譯作:Kafka分區識別或者Kafka分區敏感)水印生成,使用這個特性,水印會在Kafka消費端的每個分區中生成,并且每個分區的水印會在stream shuffle中進行合并。
例如:如果每個Kafka分區中的事件timestamp是嚴格遞增的話,使用ascending timestamps watermark generator(遞增時間戳水印生成器)將會得到完美的整體水印。
下圖展示了如何使用per-kafka-partition水印生成,以及水印是如何在流式數據流中傳播的。

FlinkKafkaConsumer09<MyType> kafkaSource = new FlinkKafkaConsumer09<>("myTopic", schema, props);
kafkaSource.assignTimestampsAndWatermarks(new AscendingTimestampExtractor<MyType>() {

    @Override
    public long extractAscendingTimestamp(MyType element) {
        return element.eventTimestamp();
    }
});

DataStream<MyType> stream = env.addSource(kafkaSource);


作者:柯廣的網絡日志

微信公眾號:Java大數據與數據倉庫