[关闭]
@22221cjp 2020-03-02T08:19:32.000000Z 字数 3506 阅读 734

Kinesis Stream中的窗口

flink


Kinesis是AWS提供的流处理技术平台,主要包括:

本文章主要总结下在Kinesis Analytic中,流处理的窗口的分类和特点。在Kinesis Analytic中,窗口主要有三种:

滚动窗口

滚动窗口:

是在一个固定的时间线上根据固定的时间间隔生成的窗口。

什么是固定的时间线上?
时间是不断递增的,线性的。

时间有三种:

第一次插入到内部流中的时间

在真正处理的时候,事件时间可能是乱序的。
到达时间 基本上都是顺序的,但是在第一次插入内部流后,这个时间还可能变化。

所以真正的线性时间,最准确的就是 处理时间,也就是kinesis analytic中的 rowtime

所以滑动窗口通常是以 rowtime为时间线进行划分窗口。滚动窗口不能处理明确以时间发生时间作为统计维度的计算,比如计算每个小时商家的订单成交金额。

语法:就是使用group by

  1. SELECT STREAM ROWTIME,
  2. Ticker_Symbol,
  3. MIN(Price) AS Price,
  4. MAX(Price) AS Price
  5. FROM "SOURCE_SQL_STREAM_001"
  6. GROUP BY Ticker_Symbol,
  7. STEP("SOURCE_SQL_STREAM_001".ROWTIME BY INTERVAL '60' SECOND);

滑动窗口

滑动窗口有两种:

按照时间跨度滑动

按照时间跨度滑动,可以理解为每隔1s窗口就滑动一次,假设时间间隔是5s,在t0时刻启动

那么 t0+1,t0+2,t0+3,t0+4,t0+5 ,这几个时间窗口在逐渐扩大,但是并未“滑动”

接着,t0+2 - t0+6 向前滑动一次,如此下去,不断的移动窗口。

记录的处理方式

什么叫记录落在窗口中?

这个窗口是根据启动后,起始时间根据窗口的步长时间不断滑动确定的。如果窗口是以时间作为度量的话,那么根据记录的什么时间作为判定条件,就说这个记录落在了窗口中呢?

aws的文档中没说按什么时间,将记录落在窗口中,但是根据分析,应该不是事件时间,而是记录的处理时间 row time。因为sql的规范如下:

  1. CREATE OR REPLACE STREAM "DESTINATION_SQL_STREAM" (
  2. ticker_symbol VARCHAR(10),
  3. Min_Price double,
  4. Max_Price double,
  5. Avg_Price double);
  6. CREATE OR REPLACE PUMP "STREAM_PUMP" AS
  7. INSERT INTO "DESTINATION_SQL_STREAM"
  8. SELECT STREAM ticker_symbol,
  9. MIN(Price) OVER W1 AS Min_Price,
  10. MAX(Price) OVER W1 AS Max_Price,
  11. AVG(Price) OVER W1 AS Avg_Price
  12. FROM "SOURCE_SQL_STREAM_001"
  13. WINDOW W1 AS (
  14. PARTITION BY ticker_symbol
  15. RANGE INTERVAL '1' MINUTE PRECEDING);

这其中根本没有使用event time。

所以按时间作为划分维度的滑动窗口,不能作为事件时间分析维度的数据来计算。

注意:

这个注意是kinesis sql给出的建议。

有趣的是:

可以在同一个sql中,一次定义多个类型的滑动窗口。

上面定义的w1是第一个窗口,也可以定义多个窗口。

以记录数作为划分窗口条件

在以下示例中,查询将发送输出股票行情机、价格、a2 和 a10。查询将发送股票代码的输出,这些代码的两行移动平均值超过了 10 行移动平均值。a2 和 a10 列值派生自 2 行和 10 行滑动窗口。

  1. CREATE OR REPLACE STREAM "DESTINATION_SQL_STREAM" (
  2. ticker_symbol VARCHAR(12),
  3. price double,
  4. average_last2rows double,
  5. average_last10rows double);
  6. CREATE OR REPLACE PUMP "myPump" AS INSERT INTO "DESTINATION_SQL_STREAM"
  7. SELECT STREAM ticker_symbol,
  8. price,
  9. avg(price) over last2rows,
  10. avg(price) over last10rows
  11. FROM SOURCE_SQL_STREAM_001
  12. WINDOW
  13. last2rows AS (PARTITION BY ticker_symbol ROWS 2 PRECEDING),
  14. last10rows AS (PARTITION BY ticker_symbol ROWS 10 PRECEDING);

总结:

总之,滑动窗口的使用还是有一定限制的。比如按时间划分的窗口,窗口时间不宜太长,一般不超过1小时。

滑动窗口不是按记录的event time作为落在窗口的依据的。所以不能用作按记录发生时间作为计算维度的场景下。

交错窗口

因为滚动窗口和滑动窗口都不是根据事件发生时间作为划分窗口条件的窗口,所以在应用上会有很多限制。而交错窗口解决了这样的问题。

只要不是以事件发生时间作为窗口条件,那么任务执行就不会是幂等的,也就是说相同的源数据,多次执行可能会等到不同的结果,这个是不能接受的。

交错窗口事实上是以event time作为窗口条件的窗口。也能在一定层度上解决数据延迟到达的问题。

交错窗口打开的时间是不确定的,当符合分区键的第一条记录到达时打开,而不是一个固定时间打开,然后可以定义交错窗口打开的时间长度,这个要根据业务的需要设定,比如要统计每个小时成交额,那么这个时间长度可以定义在1小时多10秒。那么可以等待延迟到达的数据时间是多少呢?应该是至少10s,因为这取决于第一条记录何时到达。

交错窗口的分区条件一定要以event time作为条件。交错窗口是同时会开多个窗口,因为不同的分区key就会产生不同的窗口,也正是因为如此,所以kinesis文档翻译中叫“交错窗口”。

下面是交错窗口使用语法:

  1. CREATE OR REPLACE STREAM "DESTINATION_SQL_STREAM" (
  2. ticker_symbol VARCHAR(4),
  3. event_time TIMESTAMP,
  4. ticker_count DOUBLE);
  5. CREATE OR REPLACE PUMP "STREAM_PUMP" AS
  6. INSERT INTO "DESTINATION_SQL_STREAM"
  7. SELECT STREAM
  8. TICKER_SYMBOL,
  9. FLOOR(EVENT_TIME TO MINUTE),
  10. COUNT(TICKER_SYMBOL) AS ticker_count
  11. FROM "SOURCE_SQL_STREAM_001"
  12. WINDOWED BY STAGGER (
  13. PARTITION BY FLOOR(EVENT_TIME TO MINUTE), TICKER_SYMBOL RANGE INTERVAL '1' MINUTE);

可以看到分区的条件是以事件时间作为条件的,而且这里设定的窗口打开时间是1分钟,这样的话,最坏的情况下就是不等延迟到达的数据,这取决窗口打开的时间离FLOOR(EVENT_TIME TO MINUTE)有多远了。

在实际的开发中,感觉大部分情况下都应该使用交错窗口,因为任务执行时幂等的,可重复的,且任务是基于事件时间的。

参考:
https://docs.aws.amazon.com/zh_cn/kinesisanalytics/latest/dev/stagger-window-concepts.html

添加新批注
在作者公开此批注前,只有你和作者可见。
回复批注