flink使用13-开始体验 Flink SQL

SQL API 是 Flink 中最顶级的 API , 它构建了 Table API 之上, 也可以方便的和 Table 做转换, 构建 SQL 所使用的Environment 也是 Table Environment . Flink SQL 底层使用 Apache Calcite 框架, 将标准的 Flink SQL 语句解析并转换成底层的算子处理逻辑. 下面就直接用 Flink 官方仓库中的 案例 Code Link来做一个演示.

  1. 获取执行环境
1
2
3
4
5
// 首先同样有流处理和批处理的区别, 
// 获取对应的environment之后直接转换为Table environment ,
// 就可以使用SQL API,
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
  1. 拿到要操作的表
1
2
3
4
5
6
// 将 Stream 转换为 Table, 可以采用不同的办法

// 将 DataStream 转换为 Table
Table tableA = tEnv.fromDataStream(orderA, "user, product, amount");
// 将 DataStream 注册成 Table
tEnv.registerDataStream("OrderB", orderB, "user, product, amount");
  1. 执行SQL语句
1
2
3
4
// TableEnvironment 有 SqlQuery 和 SqlUpdate 两种操作符可以使用

// union 两个 table
Table result = tEnv.sqlQuery("SELECT * FROM " + tableA + " WHERE amount > 2 UNION ALL " +"SELECT * FROM OrderB WHERE amount > 2");

SQL可以执行许多复杂的操作,本文先简单的了解下 SQL 的API

0%