flink使用15-在Flink SQL中连接kafka和mysql

本文主要介绍如何使用 FLink SQL 自己的 DDL语言来构建基于 kafka 的表和 基于Mysql 的表,并直接把从 kafka 接过来的 Json 格式的数据转换为 表结构后直接写入到Mysql,有了这样的经验之后,大家可以自行修改 DML操作来实现不同的业务。文章内容参考了一些阿里云邪大佬的文章Link,写的很好。

环境配置如下:

  • zookeeper : 3.4.6
  • kafka: 2.12-2.2.1
  • mysql: 8.0

以上三个组件全部是通过Docker搭建,我的环境是使用VirtualBox搭建的Centos7虚拟机,在虚拟机上安装Docker, 之后在本地主机IDE内调试代码。其中遇到了不少坑,主要是FLink与Kafka的通信,可以参考我的Docker-Compose文件的配置,已经解决了网络问题。注意KAFKA_ADVERTISED_LISTENERS的地址修改成自己的虚拟机IP地址。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
version: '2.1'
services:
zookeeper:
image: wurstmeister/zookeeper:3.4.6
ports:
- "2181:2181"
kafka:
image: wurstmeister/kafka:2.12-2.2.1
ports:
- "9092:9092"
depends_on:
- zookeeper
expose:
- "9093"
environment:
KAFKA_ADVERTISED_LISTENERS: INSIDE://kafka:9093,OUTSIDE://192.168.56.103:9092
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: INSIDE:PLAINTEXT,OUTSIDE:PLAINTEXT
KAFKA_LISTENERS: INSIDE://0.0.0.0:9093,OUTSIDE://0.0.0.0:9092
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
KAFKA_INTER_BROKER_LISTENER_NAME: INSIDE
KAFKA_CREATE_TOPICS: "flink:1:1"
volumes:
- /var/run/docker.sock:/var/run/docker.sock
mysql:
image: mysql
command: --default-authentication-plugin=mysql_native_password
restart: always
ports:
- "3306:3306"
environment:
MYSQL_ROOT_PASSWORD: 123456

环境搭建好之后就是正式的代码部分了:

核心是两段SQL代码,分别是用来连接Kafka和MYSQL的。

其中kafka使用json格式来解析。

样例数据({“t”:1570,”user_name”:”xiaoming”,”cnt”:100})

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
-- source
CREATE TABLE user_log (
user_id VARCHAR,
item_id VARCHAR,
category_id VARCHAR,
behavior VARCHAR,
ts TIMESTAMP
) WITH (
'connector.type' = 'kafka',
'connector.version' = 'universal',
'connector.topic' = 'user_behavior',
'connector.startup-mode' = 'earliest-offset',
'connector.properties.0.key' = 'group.id',
'connector.properties.0.value' = 'test-group',
'connector.properties.1.key' = 'bootstrap.servers',
'connector.properties.1.value' = 'localhost:9092',
'connector.specific-offsets.0.partition' = '0',
'connector.specific-offsets.0.offset' = '0',
'update-mode' = 'append',
'format.type' = 'json',
'format.derive-schema' = 'true'
);
-- sink
CREATE TABLE pvuv_sink (
dt VARCHAR,
pv BIGINT,
uv BIGINT
) WITH (
'connector.type' = 'jdbc',
'connector.url' = 'jdbc:mysql://localhost:3306/flink-test',
'connector.table' = 'pvuv_sink',
'connector.username' = 'root',
'connector.password' = '123456',
'connector.write.flush.max-rows' = '1'
);

在 Java代码中,可以直接使用tEnv的sqlUpdate()方法来注册这两张表,之后就可以直接使用了。具体操作如下:

1
2
3
4
5
6
7
8
9
10
// 1. 连接kafka构建源表
tEnv.sqlUpdate(kafkaSourceSql);

// 2. 定义要输出的表
tEnv.sqlUpdate(mysqlSinkSql);

// 3. 自定义具体的 DML 操作,这里我直接将kafka写入到mysql
// 对于Insert Into 操作,同样还是要使用sqlUpdate()方法
tEnv.sqlUpdate("INSERT INTO sink " +
"SELECT * from log where cnt=100");

可以直接通过mysql的客户端看到我们的写入结果!

以上就是使用 Flink SQL 的 DDL 语言通过不同的外部数据源建立表的过程。

0%