flink使用17-如何自定义各种UDF并在SQL中使用

今天主要讲一下Flink SQL 中怎样使用 UDF ,目前1.9版本可用的UDF包括Scalar Function 、 Table Function 、 Aggregateion Function 、 Table Aggregation Function。完整代码见仓库 -> Github

如果注册一个 UDF?

注册的方法很简单,使用 TableEnvironment的registerFunction()方法就可以了。

Scalar Function

scalar Function 是一个一对一的转换。

这里我们以一个求字符串长度的方式来演示,首先需要继承ScalarFunction,需要重新实现eval()方法。

1
2
3
4
5
6
7
8
9
10
public class StringLength extends ScalarFunction {

public int eval(String s){

if (s == null) {
return 0;
}
return s.length();
}
}

之后只需要注册该方法就可以使用了

1
2
3
tEnv.registerFunction("stringLength",new StringLength());

tEnv.sqlQuery("select word,stringLength(word) from t");

Table Function

Table Function是将一行数据转为多行数据的一个方法。

在这里用一个将一个字符串切割成多行的例子来演示

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
public class StringSplit extends TableFunction<Row> {

public void eval(String str) {
for (String s : str.split("#")) {
Row row = new Row(2);
row.setField(0, s);
row.setField(1, s.length());
// 使用collect()来收集需要返回的数据
collect(row);
}
}

@Override
public TypeInformation<Row> getResultType() {
return new RowTypeInfo(Types.STRING,Types.INT);
}

}

需要注意在实现eval()方法的同时,也需要实现getResultType()方法,用来告诉flink返回值的类型,方便其转换。

1
2
tEnv.registerFunction("split", new StringSplit());
tEnv.sqlQuery("SELECT a,word, length FROM t, LATERAL TABLE(split(a)) as T(word, length)");

LATERAL TABLE(split(a)) 的作用相当于 join。

Aggregateion Function

Aggregateion Function函数顾名思义是用来做聚合操作的。

这里我用一个求均值的函数来演示

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
public class Mean extends AggregateFunction<Integer, MeanValue> {

@Override
public Integer getValue(MeanValue accumulator) {
return accumulator.sum/accumulator.count;
}

@Override
public MeanValue createAccumulator() {
return new MeanValue();
}

public void accumulate(MeanValue acc, int iValue) {
acc.sum += iValue;
acc.count ++;
}
public void merge(MeanValue acc, Iterable<MeanValue> it) {
Iterator<MeanValue> iter = it.iterator();
while (iter.hasNext()) {
MeanValue a = iter.next();
acc.count += a.count;
acc.sum += a.sum;
}
}
public void resetAccumulator(MeanValue acc) {
acc.count = 0;
acc.sum = 0;
}

}

Aggregateion Function 与流处理的Aggregateion Function也是很类似的,都需要自己创建累加器,之后就是控制对累加器的操作就可以了。

1
2
3

tEnv.registerFunction("get_mean", new Mean());
tEnv.sqlQuery("SELECT name,get_mean(v) as mean_value FROM t3 GROUP BY name");

参考文章1

参考文章2

0%