加入收藏 | 设为首页 | 会员中心 | 我要投稿 武陵站长网 (https://www.50888.com.cn/)- 科技、建站、经验、云计算、5G、大数据,站长网!
当前位置: 首页 > 运营中心 > 网站设计 > 教程 > 正文

Apache Flink 漫谈系列 - Watermark

发布时间:2018-10-14 21:55:26 所属栏目:教程 来源:孙金城
导读:【新产品上线啦】51CTO播客,随时随地,碎片化学习 实际问题(乱序) 在介绍Watermark相关内容之前我们先抛出一个具体的问题,在实际的流式计算中数据到来的顺序对计算结果的正确性有至关重要的影响,比如:某数据源中的某些数据由于某种原因(如:网络原因,
副标题[/!--empirenews.page--] 【新产品上线啦】51CTO播客,随时随地,碎片化学习

实际问题(乱序)

在介绍Watermark相关内容之前我们先抛出一个具体的问题,在实际的流式计算中数据到来的顺序对计算结果的正确性有至关重要的影响,比如:某数据源中的某些数据由于某种原因(如:网络原因,外部存储自身原因)会有5秒的延时,也就是在实际时间的第1秒产生的数据有可能在第5秒中产生的数据之后到来(比如到Window处理节点)。选具体某个delay的元素来说,假设在一个5秒的Tumble窗口(详见Window介绍章节),有一个EventTime是 11秒的数据,在第16秒时候到来了。图示第11秒的数据,在16秒到来了,如下图:

Apache Flink 漫谈系列 - Watermark

那么对于一个Count聚合的Tumble(5s)的window,上面的情况如何处理才能window2=4,window3=2 呢?

Apache Flink的时间类型

开篇我们描述的问题是一个很常见的TimeWindow中数据乱序的问题,乱序是相对于事件产生时间和到达Apache Flink 实际处理算子的顺序而言的,关于时间在Apache Flink中有如下三种时间类型,如下图:

Apache Flink

(1)ProcessingTime

ProcessingTime是数据流入到具体某个算子时候相应的系统时间。ProcessingTime 有最好的性能和最低的延迟。但在分布式计算环境中ProcessingTime具有不确定性,相同数据流多次运行有可能产生不同的计算结果。

(2)IngestionTime

IngestionTime是数据进入Apache Flink框架的时间,是在Source Operator中设置的。与ProcessingTime相比可以提供更可预测的结果,因为IngestionTime的时间戳比较稳定(在源处只记录一次),同一数据在流经不同窗口操作时将使用相同的时间戳,而对于ProcessingTime同一数据在流经不同窗口算子会有不同的处理时间戳。

(3)EventTime

EventTime是事件在设备上产生时候携带的。在进入Apache Flink框架之前EventTime通常要嵌入到记录中,并且EventTime也可以从记录中提取出来。在实际的网上购物订单等业务场景中,大多会使用EventTime来进行数据计算。

开篇描述的问题和本篇要介绍的Watermark所涉及的时间类型均是指EventTime类型。

什么是Watermark

Watermark是Apache Flink为了处理EventTime 窗口计算提出的一种机制,本质上也是一种时间戳,由Apache Flink Source或者自定义的Watermark生成器按照需求Punctuated或者Periodic两种方式生成的一种系统Event,与普通数据流Event一样流转到对应的下游算子,接收到Watermark Event的算子以此不断调整自己管理的EventTime clock。 Apache Flink 框架保证Watermark单调递增,算子接收到一个Watermark时候,框架知道不会再有任何小于该Watermark的时间戳的数据元素到来了,所以Watermark可以看做是告诉Apache Flink框架数据流已经处理到什么位置(时间维度)的方式。 Watermark的产生和Apache Flink内部处理逻辑如下图所示:

ProcessingTime

Watermark的产生方式

目前Apache Flink 有两种生产Watermark的方式,如下:

  • Punctuated - 数据流中每一个递增的EventTime都会产生一个Watermark。在实际的生产中Punctuated方式在TPS很高的场景下会产生大量的Watermark在一定程度上对下游算子造成压力,所以只有在实时性要求非常高的场景才会选择Punctuated的方式进行Watermark的生成。
  • Periodic - 周期性的(一定时间间隔或者达到一定的记录条数)产生一个Watermark。在实际的生产中Periodic的方式必须结合时间和积累条数两个维度继续周期性产生Watermark,否则在极端情况下会有很大的延时。

所以Watermark的生成方式需要根据业务场景的不同进行不同的选择。

Watermark的接口定义

对应Apache Flink Watermark两种不同的生成方式,我们了解一下对应的接口定义,如下:

  • Periodic Watermarks - AssignerWithPeriodicWatermarks
    1. /** 
    2. * Returns the current watermark. This method is periodically called by the 
    3. * system to retrieve the current watermark. The method may return {@code null} to 
    4. * indicate that no new Watermark is available. 
    5. * <p>The returned watermark will be emitted only if it is non-null and itsTimestamp 
    6. * is larger than that of the previously emitted watermark (to preserve the contract of 
    7. * ascending watermarks). If the current watermark is still 
    8. * identical to the previous one, no progress in EventTime has happened since 
    9. * the previous call to this method. If a null value is returned, or theTimestamp 
    10. * of the returned watermark is smaller than that of the last emitted one, then no 
    11. * new watermark will be generated. 
    12. * <p>The interval in which this method is called and Watermarks are generated 
    13. * depends on {@link ExecutionConfig#getAutoWatermarkInterval()}. 
    14. * @see org.Apache.flink.streaming.api.watermark.Watermark 
    15. * @see ExecutionConfig#getAutoWatermarkInterval() 
    16. * @return {@code Null}, if no watermark should be emitted, or the next watermark to emit. 
    17. */ 
    18. @Nullable 
    19. Watermark getCurrentWatermark(); 
  • Punctuated Watermarks - AssignerWithPunctuatedWatermarks
    1. public interface AssignerWithPunctuatedWatermarks<T> extendsTimestampAssigner<T> { 
    2.  
    3. /** 
    4. * Asks this implementation if it wants to emit a watermark. This method is called right after 
    5. * the {@link #extractTimestamp(Object, long)} method. 
    6. * <p>The returned watermark will be emitted only if it is non-null and itsTimestamp 
    7. * is larger than that of the previously emitted watermark (to preserve the contract of 
    8. * ascending watermarks). If a null value is returned, or theTimestamp of the returned 
    9. * watermark is smaller than that of the last emitted one, then no new watermark will 
    10. * be generated. 
    11. * <p>For an example how to use this method, see the documentation of 
    12. * {@link AssignerWithPunctuatedWatermarks this class}. 
    13. * @return {@code Null}, if no watermark should be emitted, or the next watermark to emit. 
    14. */ 
    15. @Nullable 
    16. Watermark checkAndGetNextWatermark(T lastElement, long extractedTimestamp);} 
  • AssignerWithPunctuatedWatermarks 继承了TimestampAssigner接口 -TimestampAssigner
    1. public interfaceTimestampAssigner<T> extends Function { 
    2.  
    3. /** 
    4. * Assigns aTimestamp to an element, in milliseconds since the Epoch. 
    5. * <p>The method is passed the previously assignedTimestamp of the element. 
    6. * That previousTimestamp may have been assigned from a previous assigner, 
    7. * by ingestionTime. If the element did not carry aTimestamp before, this value is 
    8. * {@code Long.MIN_VALUE}. 
    9. * @param element The element that theTimestamp is wil be assigned to. 
    10. * @param previousElementTimestamp The previous internalTimestamp of the element, 
    11. * or a negative value, if noTimestamp has been assigned, yet. 
    12. * @return The newTimestamp. 
    13. */ 
    14. long extractTimestamp(T element, long previousElementTimestamp); 

从接口定义可以看出,Watermark可以在Event(Element)中提取EventTime,进而定义一定的计算逻辑产生Watermark的时间戳。

Watermark解决如上问题

(编辑:武陵站长网)

【声明】本站内容均来自网络,其相关言论仅代表作者个人观点,不代表本站立场。若无意侵犯到您的权利,请及时与联系站长删除相关内容!

热点阅读