本文主要介绍如何使用 FLink SQL 自己的 DDL语言来构建基于 kafka 的表和 基于Mysql 的表,并直接把从 kafka 接过来的 Json 格式的数据转换为 表结构后直接写入到Mysql,有了这样的经验之后,大家可以自行修改 DML操作来实现不同的业务。文章内容参考了一些阿里云邪大佬的文章Link,写的很好。
环境配置如下:
- zookeeper : 3.4.6
- kafka: 2.12-2.2.1
- mysql: 8.0
以上三个组件全部是通过Docker搭建,我的环境是使用VirtualBox搭建的Centos7虚拟机,在虚拟机上安装Docker, 之后在本地主机IDE内调试代码。其中遇到了不少坑,主要是FLink与Kafka的通信,可以参考我的Docker-Compose文件的配置,已经解决了网络问题。注意KAFKA_ADVERTISED_LISTENERS
的地址修改成自己的虚拟机IP地址。
1 | version: '2.1' |
环境搭建好之后就是正式的代码部分了:
核心是两段SQL代码,分别是用来连接Kafka和MYSQL的。
其中kafka使用json格式来解析。
样例数据({“t”:1570,”user_name”:”xiaoming”,”cnt”:100})
1 | -- source |
在 Java代码中,可以直接使用tEnv的sqlUpdate()方法来注册这两张表,之后就可以直接使用了。具体操作如下:
1 | // 1. 连接kafka构建源表 |
可以直接通过mysql的客户端看到我们的写入结果!
以上就是使用 Flink SQL 的 DDL 语言通过不同的外部数据源建立表的过程。