flink使用16-正确打包Flink程序并使用Cli提交任务

本文的计划是使用正确的maven插件打包当前教程代码库batch模块下的WordCount代码,并通过命令行的方式提交到Flink来启动任务。WordCount类即为Flink主方法类,该部分代码是Flink官方example的简单修改,只是对map方法填加了一点sleep来方便观察运行情况。

项目的运行环境使用Docker来部署Flink, Flink镜像可以从Docker hub上拉去,其Docker-Compose文件如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
version: "2.1"
services:
jobmanager:
image: ${FLINK_DOCKER_IMAGE_NAME:-flink}
expose:
- "6123"
ports:
- "8081:8081"
command: jobmanager
environment:
- JOB_MANAGER_RPC_ADDRESS=jobmanager

taskmanager:
image: ${FLINK_DOCKER_IMAGE_NAME:-flink}
expose:
- "6121"
- "6122"
depends_on:
- jobmanager
command: taskmanager
links:
- "jobmanager:jobmanager"
environment:
- JOB_MANAGER_RPC_ADDRESS=jobmanager

正确启动Flink之后,就可以在WebUI上看到我们的环境了。

下面就开始打包我们的应用程序了。

官方推荐我们使用maven-shade-plugin插件,复制一下代码到POM中指定我们的主方法类即可。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
<version>3.0.0</version>
<executions>
<execution>
<phase>package</phase>
<goals>
<goal>shade</goal>
</goals>
<configuration>
<artifactSet>
<excludes>
<exclude>com.google.code.findbugs:jsr305</exclude>
<exclude>org.slf4j:*</exclude>
<exclude>log4j:*</exclude>
</excludes>
</artifactSet>
<filters>
<filter>
<!-- Do not copy the signatures in the META-INF folder.
Otherwise, this might cause SecurityExceptions when using the JAR. -->
<artifact>*:*</artifact>
<excludes>
<exclude>META-INF/*.SF</exclude>
<exclude>META-INF/*.DSA</exclude>
<exclude>META-INF/*.RSA</exclude>
</excludes>
</filter>
</filters>
<transformers>
<transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
<mainClass>my.programs</mainClass>
</transformer>
</transformers>
</configuration>
</execution>
</executions>
</plugin>
</plugins>
</build>

需要注意的是一般来说我们是不会将flink的一些相关的包直接打到项目里,通常有两种方案:

  • 将相关的jar包统一都放到flink/lib目录下

  • 构建一个单独的common模块,所以使用到的包都放在这个模块中打包并上传到集群,之后其他模块只需要引用该common模块即可

    具体的操作可以见这篇文章

打包好后就可以直接是用 FLink Cli 提交到集群来开始job了 。

Flink Cli 一般来讲主要作用有:提交并执行任务、取消任务、获取任务状态信息、列出正在运行和等待的任务、触发savepoint等。

我们将已经打包好的jar包放到docker中

1
docker cp /opt/flink/wordcount.jar flink_jobmanager_1:/opt/

然后就可以通过命令行启动任务了,启动完成后我们可以在webUI上看到任务的执行情况。

1
docker exec -ti flink_jobmanager_1 bash -c 'flink run /opt/wordcount.jar'

Flink Cli 的命令有很多,具体的内容可以参考官网示例:

Flink Cli Examples

0%