@22221cjp
2020-03-09T13:52:28.000000Z
字数 4320
阅读 2882
flink
Flink中的Join有很多种,比如说可以基于各种窗口做Join,这里主要讨论下Flink SQL当中的双流Join。Flink SQL中双流Join支持 inner join(join),left join,right join,full join。
可能会比较难理解,因为两个流动的数据怎么能Join起来,假如第一个流的第一条数据和1小时后第二个流的其中一个数据才能Join上,这时时间属性不就更乱了么,刚开始这条数据还需要输出么?
因为双流Join不能确定流中的数据什么时候能够Join上,所以这里有一个非常重要的结论:
双流Join会导致watermark属性丢失,两个流Join后的流不能有Join参与方定义的任何的watermark列
例子:
-- 创建源表CREATE TABLE order_datasource (order_id BIGINT,user_id BIGINT,store_id BIGINT,industry_id BIGINT,amount BIGINT,status int,ts TIMESTAMP(3),proctime as PROCTIME(), -- 通过计算列产生一个处理时间列WATERMARK FOR ts as ts - INTERVAL '5' SECOND -- 在ts上定义watermark,ts成为事件时间列) WITH ('connector.type' = 'kafka', -- 使用 kafka connector'connector.version' = 'universal', -- kafka 版本,universal 支持 0.11 以上的版本'connector.topic' = 'user_order', -- kafka topic'connector.startup-mode' = 'earliest-offset', -- 从起始 offset 开始读取'connector.properties.zookeeper.connect' = 'localhost:2181', -- zookeeper 地址'connector.properties.bootstrap.servers' = 'localhost:9092', -- kafka broker 地址'format.type' = 'json' -- 数据源格式为 json);--user nick流表CREATE TABLE user_info (user_id BIGINT,user_nick varchar,ts TIMESTAMP(3),WATERMARK FOR ts as ts - INTERVAL '5' SECOND -- 在ts上定义watermark,ts成为事件时间列) WITH ('connector.type' = 'kafka', -- 使用 kafka connector'connector.version' = 'universal', -- kafka 版本,universal 支持 0.11 以上的版本'connector.topic' = 'user_info', -- kafka topic'connector.startup-mode' = 'earliest-offset', -- 从起始 offset 开始读取'connector.properties.zookeeper.connect' = 'localhost:2181', -- zookeeper 地址'connector.properties.bootstrap.servers' = 'localhost:9092', -- kafka broker 地址'format.type' = 'json' -- 数据源格式为 json);create view order_with_user_nick asselecto.user_id,o.ts,u.user_nickfrom order_datasource o join user_info uon o.user_id=u.user_id;
当执行select * from order_with_user_nick; 会报如下错误:
[ERROR] Could not execute SQL statement. Reason:
org.apache.flink.table.api.TableException: Rowtime attributes must not be in the input rows of a regular join. As a workaround you can cast the time attributes of input tables to TIMESTAMP before.

当然如果源表中不指定watermark列,包含ts就完全没有问题了。如下:
--删掉了watermark列CREATE TABLE order_datasource_1 (order_id BIGINT,user_id BIGINT,store_id BIGINT,industry_id BIGINT,amount BIGINT,status int,ts TIMESTAMP(3),proctime as PROCTIME() -- 通过计算列产生一个处理时间列) WITH ('connector.type' = 'kafka', -- 使用 kafka connector'connector.version' = 'universal', -- kafka 版本,universal 支持 0.11 以上的版本'connector.topic' = 'user_order', -- kafka topic'connector.startup-mode' = 'earliest-offset', -- 从起始 offset 开始读取'connector.properties.zookeeper.connect' = 'localhost:2181', -- zookeeper 地址'connector.properties.bootstrap.servers' = 'localhost:9092', -- kafka broker 地址'format.type' = 'json' -- 数据源格式为 json);create view order_with_user_nick asselecto.user_id,o.ts,u.user_nickfrom order_datasource_1 o join user_info uon o.user_id=u.user_id;
这样执行select * from order_with_user_nick 不会有什么问题。
下面看下不同的双流Join,它们的行为是咋样的。双流Join在物理上会保存两个流到目前为止所有的历史数据,然后也会有一点的淘汰机制,因为这样存储的压力比较大,默认淘汰时间为36小时,对于按天聚合的数据,肯定没有问题的。这个时间可设定。
既然会保存所有的历史数据,这就相当于到目前为止所有数据的聚合。
inner join和join语义上是一样的。当双流进行join时,只有两个流Join的key相同的数据才会出现在Join后的流中。如果Key相同,其他的数据变了,那么每个变化的值,都会扩展到Join后的结果流中,相当于做的是笛卡尔积。注:一定要注意这里是扩展,而不是撤销(Retraction)之前的Join后的值
例如:
create view order_with_user_nick_1 asselecto.order_id ,o.user_id ,o.store_id ,o.industry_id ,o.amount ,o.status ,u.user_nickfrom order_datasource o join user_info uon o.user_id=u.user_id;
order_datasource数据
--{"order_id":"10000","user_id": "952483","store_id":"20000", "industry_id": "1", "amount":"100","status":"1", "ts": "2017-11-27T11:22:33Z"}
--{"order_id":"10000","user_id": "952484","store_id":"20000", "industry_id": "1", "amount":"100","status":"1", "ts": "2017-11-27T11:59:33Z"}user_info数据
{"user_id": "952483","user_nick":"zhang", "ts": "2017-11-27T11:22:33Z"}
{"user_id": "952483","user_nick":"li", "ts": "2017-12-27T11:22:33Z"}
刚开始结果流中不会有任务数据,即使向order_datasource源表中添加两条数据。当向user_info表中添加第一条数据时user_id=952483这时候Join后的流中出现第一条数据,但是当继续发送第二条user_info数据后,Join后的结果流中又会出现Join后的第二条数据,注意这里并不是撤销(Retraction)之前的结果。
Left Join和Join最大的不同是:当左边的表没有和右边的表Join上的时候,并不会等着Join上才输出,而是user_nick列直接输出null。如果随着右边的表数据的流动,以后再能Join上会咋样呢?这时候会触发撤销(Retraction),将之前输出的结果null改为对应Join上的值。如果后面还能Join上但是user_nick值变了,那么这时候还是笛卡尔积会扩展为多条数据。
总结:左表不等直接输出,右表没Join上不输出。后来第一次Join上会撤销左表第一次输出的null,后来再Join上扩展开。
create view order_with_user_nick_leftjoin asselecto.order_id ,o.user_id ,o.store_id ,o.industry_id ,o.amount ,o.status ,u.user_nickfrom order_datasource o left join user_info uon o.user_id=u.user_id;
根据Left Join可以理解Full Join的行为。
左表、右表没Join上都不等,直接输出,没Join上的列为null。后面不管是左表还是右表第一次Join上触发撤销,将前面的null改为对应的值,如果后面再次Join上,就展开。