您现在的位置是:亿华云 > 数据库
Flink无法将聚合结果直接写入Kafka怎么办?
亿华云2025-10-05 10:36:11【数据库】3人已围观
简介抛出疑无路?【Flink 1.10】- 有一种情况是所有的系统或应用之间的桥梁都是Kafka,而这个时候恰恰是上游需要做Unbound的聚合统计。From @PyFlink 企业用户。示例代码:INS
【Flink 1.10】- 有一种情况是所有的系统或应用之间的桥梁都是Kafka,而这个时候恰恰是直接上游需要做Unbound的聚合统计。From @PyFlink 企业用户。写入
示例代码:
INSERT INTO kafkaSink
SELECT
id,聚合结果
SUM(cnt)
FROM csvSource
GROUP BY id执行这个SQL,在【Flink 1.10】版本会抛出如下异常:
【Flink-1.10】这个问题是写入因Flink内部Retract机制导致,在没有考虑对Chanage log全链路支持之前,聚合结果无法在Kafka这样的直接Append only的消息队列增加对Retract/Upsert的支持。这个做法是写入出于语义完整性考虑做出的决定。但现实业务场景总是聚合结果有着这样或那样的实际业务需求,业务不关心你语义是直接否okay,业务关心我不改变我原有的写入技术选型。
在这个基础之上只要你告诉我Sink到Kafka的聚合结果行为就行,我会根据你的直接产出行为,源码库在业务上面做适配,写入所以这个时候就是实用为主,不管什么语义不语义了......,所以这个时候应该怎么办呢?
我们的做法是将 Kafka的sink由原有的AppendStreamTableSink变成UpsertStreamTableSink或者RetractStreamTableSink。但出于性能考虑,我们改变成UpsertStreamTableSink,这个改动不多,但是对于初学者来讲还是不太愿意动手改代码,所以为大家提供一份:
KafkaTableSinkBase.java
https://github.com/sunjincheng121/know_how_know_why/blob/master/QA/upsertKafka/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSinkBase.java
KafkaTableSourceSinkFactoryBase.javahttps://github.com/sunjincheng121/know_how_know_why/blob/master/QA/upsertKafka/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSourceSinkFactoryBase.java
在你的项目创建 org.apache.flink.streaming.connectors.kafka包 并把上面的两个类放入该包,用于覆盖官方KafkaConnector里面的实现。
特别强调:这样的变化会导致写入Kafka的结果不会是每个Group Key只有一条结果,而是每个Key可能有很多条结果。这个大家可以自行测试一下:
package cdc
import org.apache.flink.api.scala._
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.table.api.scala._
/
*** Test for sink data to Kafka with upsert mode.
*/
object UpsertKafka {
def main(args: Array[String]): Unit = {
val sourceData = "file:///Users/jincheng.sunjc/work/know_how_know_why/QA/upsertKafka/src/main/scala/cdc/id_cnt_data.csv"
val env = StreamExecutionEnvironment.getExecutionEnvironment
val tEnv = StreamTableEnvironment.create(env)
val sourceDDL = "CREATE TABLE csvSource (" +
" id VARCHAR," +
" cnt INT" +
") WITH (" +
"connector.type = filesystem," +
"connector.path = " + sourceData + "," +
"format.type = csv" +
")"
val sinkDDL = "CREATE TABLE kafkaSink (" +
" id VARCHAR," +
" cnt INT " +
") WITH (" +
"connector.type = kafka," +
"connector.version = 0.10," +
"connector.topic = test," +
"connector.properties.zookeeper.connect = localhost:2181," +
"connector.properties.bootstrap.servers = localhost:9092," +
"connector.properties.group.id = data_Group," +
"format.type = json)"
tEnv.sqlUpdate(sourceDDL)
tEnv.sqlUpdate(sinkDDL)
val sql = "INSERT INTO kafkaSink" +
" SELECT id, SUM(cnt) FROM csvSource GROUP BY id"
tEnv.sqlUpdate(sql)
env.execute("RetractKafka")
}
}当然,也可以clone我的git代码【https://github.com/sunjincheng121/know_how_know_why/tree/master/QA/upsertKafka】直观体验一下。由于本系列文章只关注解决问题,不论述细节原理,高防服务器有关原理性知识,我会在我的视频课程《Apache 知其然,知其所以然》中进行介绍。
Flink 的锅?...看到上面的问题有些朋友可能会问,既然知道问题,知道有实际业务需求,为啥Flink不改进,不把这种情况支持掉呢?问的好,就这个问题而言,Flink是委屈的,Flink已经在努力支持这个场景了,预期Flink-1.12的版本大家会体验到完整的CDC(change data capture)支持。
众人拾柴期待你典型问题的抛出... 我将知无不言...言无不尽... 我在又一村等你...
作者介绍孙金城,社区编辑,Apache Flink PMC 成员,Apache Beam Committer,Apache IoTDB PMC 成员,ALC Beijing 成员,Apache ShenYu 导师,Apache 软件基金会成员。关注技术领域流计算和时序数据存储。
香港云服务器很赞哦!(9395)
相关文章
- .com域名是国际最广泛流行的通用域名,目前全球注册量第一的域名,公司企业注册域名的首选。国际化公司通常会注册该类域名。
- 谁还不懂分布式系统性能调优,请把这篇文章甩给他~
- 在线协同成为办公日常,2020年超过3亿场会在腾讯会议上进行
- GitHub宣布全站清理不必要的Cookie提示栏
- 3.dns修改成功后,点击“域名解析”,按提示进行操作。解析格式一般如下:
- Python如何破解加密zip文件的密码!
- 并发编程之定时任务&定时线程池原理解析
- Python带你理解用于信号同步的CAZAC序列
- 打开https://www.aizhan.com/输入自己想要查询的域名然后按回车键,如果做过网站都会有数据显示出来
- Python循环语句代码详解:while、for、break