[关闭]
@22221cjp 2020-03-09T13:52:28.000000Z 字数 4320 阅读 2882

Flink中双流JOIN

flink


Flink中的Join有很多种,比如说可以基于各种窗口做Join,这里主要讨论下Flink SQL当中的双流Join。Flink SQL中双流Join支持 inner join(join),left join,right join,full join。

可能会比较难理解,因为两个流动的数据怎么能Join起来,假如第一个流的第一条数据和1小时后第二个流的其中一个数据才能Join上,这时时间属性不就更乱了么,刚开始这条数据还需要输出么?

Join流的时间属性

因为双流Join不能确定流中的数据什么时候能够Join上,所以这里有一个非常重要的结论:

双流Join会导致watermark属性丢失,两个流Join后的流不能有Join参与方定义的任何的watermark列

例子:

  1. -- 创建源表
  2. CREATE TABLE order_datasource (
  3. order_id BIGINT,
  4. user_id BIGINT,
  5. store_id BIGINT,
  6. industry_id BIGINT,
  7. amount BIGINT,
  8. status int,
  9. ts TIMESTAMP(3),
  10. proctime as PROCTIME(), -- 通过计算列产生一个处理时间列
  11. WATERMARK FOR ts as ts - INTERVAL '5' SECOND -- ts上定义watermarkts成为事件时间列
  12. ) WITH (
  13. 'connector.type' = 'kafka', -- 使用 kafka connector
  14. 'connector.version' = 'universal', -- kafka 版本,universal 支持 0.11 以上的版本
  15. 'connector.topic' = 'user_order', -- kafka topic
  16. 'connector.startup-mode' = 'earliest-offset', -- 从起始 offset 开始读取
  17. 'connector.properties.zookeeper.connect' = 'localhost:2181', -- zookeeper 地址
  18. 'connector.properties.bootstrap.servers' = 'localhost:9092', -- kafka broker 地址
  19. 'format.type' = 'json' -- 数据源格式为 json
  20. );
  21. --user nick流表
  22. CREATE TABLE user_info (
  23. user_id BIGINT,
  24. user_nick varchar,
  25. ts TIMESTAMP(3),
  26. WATERMARK FOR ts as ts - INTERVAL '5' SECOND -- ts上定义watermarkts成为事件时间列
  27. ) WITH (
  28. 'connector.type' = 'kafka', -- 使用 kafka connector
  29. 'connector.version' = 'universal', -- kafka 版本,universal 支持 0.11 以上的版本
  30. 'connector.topic' = 'user_info', -- kafka topic
  31. 'connector.startup-mode' = 'earliest-offset', -- 从起始 offset 开始读取
  32. 'connector.properties.zookeeper.connect' = 'localhost:2181', -- zookeeper 地址
  33. 'connector.properties.bootstrap.servers' = 'localhost:9092', -- kafka broker 地址
  34. 'format.type' = 'json' -- 数据源格式为 json
  35. );
  36. create view order_with_user_nick as
  37. select
  38. o.user_id,
  39. o.ts,
  40. u.user_nick
  41. from order_datasource o join user_info u
  42. on o.user_id=u.user_id;

当执行select * from order_with_user_nick; 会报如下错误:

[ERROR] Could not execute SQL statement. Reason:
org.apache.flink.table.api.TableException: Rowtime attributes must not be in the input rows of a regular join. As a workaround you can cast the time attributes of input tables to TIMESTAMP before.

image_1e2vmorb1es3r8jpnn1ph31lua9.png-37.4kB

当然如果源表中不指定watermark列,包含ts就完全没有问题了。如下:

  1. --删掉了watermark
  2. CREATE TABLE order_datasource_1 (
  3. order_id BIGINT,
  4. user_id BIGINT,
  5. store_id BIGINT,
  6. industry_id BIGINT,
  7. amount BIGINT,
  8. status int,
  9. ts TIMESTAMP(3),
  10. proctime as PROCTIME() -- 通过计算列产生一个处理时间列
  11. ) WITH (
  12. 'connector.type' = 'kafka', -- 使用 kafka connector
  13. 'connector.version' = 'universal', -- kafka 版本,universal 支持 0.11 以上的版本
  14. 'connector.topic' = 'user_order', -- kafka topic
  15. 'connector.startup-mode' = 'earliest-offset', -- 从起始 offset 开始读取
  16. 'connector.properties.zookeeper.connect' = 'localhost:2181', -- zookeeper 地址
  17. 'connector.properties.bootstrap.servers' = 'localhost:9092', -- kafka broker 地址
  18. 'format.type' = 'json' -- 数据源格式为 json
  19. );
  20. create view order_with_user_nick as
  21. select
  22. o.user_id,
  23. o.ts,
  24. u.user_nick
  25. from order_datasource_1 o join user_info u
  26. on o.user_id=u.user_id;

这样执行select * from order_with_user_nick 不会有什么问题。

下面看下不同的双流Join,它们的行为是咋样的。双流Join在物理上会保存两个流到目前为止所有的历史数据,然后也会有一点的淘汰机制,因为这样存储的压力比较大,默认淘汰时间为36小时,对于按天聚合的数据,肯定没有问题的。这个时间可设定。

既然会保存所有的历史数据,这就相当于到目前为止所有数据的聚合。

Inner Join/Join行为

inner join和join语义上是一样的。当双流进行join时,只有两个流Join的key相同的数据才会出现在Join后的流中。如果Key相同,其他的数据变了,那么每个变化的值,都会扩展到Join后的结果流中,相当于做的是笛卡尔积。注:一定要注意这里是扩展,而不是撤销(Retraction)之前的Join后的值

例如:

  1. create view order_with_user_nick_1 as
  2. select
  3. o.order_id ,
  4. o.user_id ,
  5. o.store_id ,
  6. o.industry_id ,
  7. o.amount ,
  8. o.status ,
  9. u.user_nick
  10. from order_datasource o join user_info u
  11. on o.user_id=u.user_id;

order_datasource数据
--{"order_id":"10000","user_id": "952483","store_id":"20000", "industry_id": "1", "amount":"100","status":"1", "ts": "2017-11-27T11:22:33Z"}
--{"order_id":"10000","user_id": "952484","store_id":"20000", "industry_id": "1", "amount":"100","status":"1", "ts": "2017-11-27T11:59:33Z"}

user_info数据
{"user_id": "952483","user_nick":"zhang", "ts": "2017-11-27T11:22:33Z"}
{"user_id": "952483","user_nick":"li", "ts": "2017-12-27T11:22:33Z"}

刚开始结果流中不会有任务数据,即使向order_datasource源表中添加两条数据。当向user_info表中添加第一条数据时user_id=952483这时候Join后的流中出现第一条数据,但是当继续发送第二条user_info数据后,Join后的结果流中又会出现Join后的第二条数据,注意这里并不是撤销(Retraction)之前的结果。

Left Join行为

Left Join和Join最大的不同是:当左边的表没有和右边的表Join上的时候,并不会等着Join上才输出,而是user_nick列直接输出null。如果随着右边的表数据的流动,以后再能Join上会咋样呢?这时候会触发撤销(Retraction),将之前输出的结果null改为对应Join上的值。如果后面还能Join上但是user_nick值变了,那么这时候还是笛卡尔积会扩展为多条数据。

总结:左表不等直接输出,右表没Join上不输出。后来第一次Join上会撤销左表第一次输出的null,后来再Join上扩展开。

  1. create view order_with_user_nick_leftjoin as
  2. select
  3. o.order_id ,
  4. o.user_id ,
  5. o.store_id ,
  6. o.industry_id ,
  7. o.amount ,
  8. o.status ,
  9. u.user_nick
  10. from order_datasource o left join user_info u
  11. on o.user_id=u.user_id;

Full Join行为

根据Left Join可以理解Full Join的行为。

左表、右表没Join上都不等,直接输出,没Join上的列为null。后面不管是左表还是右表第一次Join上触发撤销,将前面的null改为对应的值,如果后面再次Join上,就展开。

添加新批注
在作者公开此批注前,只有你和作者可见。
回复批注