背景介绍
近期我们遇到了一位客户提出的问题:MySQL 建表时,数据库表定义的字符集是 latin1,里面的数据是以 GBK 编码的方式写入的。当 Flink 的 JDBC Connector 在读取此维表时,输出数据的中文出现了乱码现象,如下图:
原因分析
对于 Oceanus 平台而言,内部的数据处理都是以 Unicode 为标准的。对于非 Unicode 的字符集,在 JDBC Connector 读取时,可能会出现各种异常情况,即使 JDBC 连接 URL 参数中指定了characterEncoding
也无法避免中文乱码问题。
对于 MySQL 数据而言,最怕的不是数据乱码,而是变成问号 (????)。通常来讲,如果遇到了全是问号的情况,则数据基本无法还原了;而对于乱码来说,很可能源数据还在,只是编码选错了,通过恰当的解码方式,还是有希望恢复原有的数据。
因此我们需要编写一个 UDF(用户自定义函数),将 JDBC Connector 读到的 Latin1(这里实际上是 GBK)数据进行解码。
首先我们来看一下数据库中的原始数据(首先需要将终端的编码改为 GBK,否则显示的仍然是乱码):
以 id 为 1 的数据为例,这里喵的 GBK 编码是0xDF 0xF7
。
那问题来了,既然 Flink 并没有报类型错误,说明输入输出还是当作字符串看待的,只是字符串没有经过妥善解码。那 Flink 将其读取成了什么呢?我们来写一个 UDF 自定义函数看看。
UDF 编写
对于这种编解码的场景,适合使用 Flink 的标量函数(Scalar Function),即单条记录进,单条记录出,无需聚合等复杂操作。
在当前的流计算 Oceanus 版本中,已经支持通过CREATE TEMPORARY SYSTEM FUNCTION
的方式来 声明 UDF。声明 UDF 后,在 程序包管理 界面,可以上传具体的实现类 JAR 包。
我们先编写一个打印出 String 里每个 Char 内容的函数,类名为DecodeLatin1
.
初步代码
请先在 pom.xml 中引入 Flink 相关依赖,随后可以开始编写 UDF:
代码语言:txt复制package com.tencent.cloud.oceanus.udf;
import org.apache.flink.table.functions.ScalarFunction;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.stream.IntStream;
public class DecodeLatin1 extends ScalarFunction {
private static final Logger LOGGER = LoggerFactory.getLogger(DecodeLatin1.class);
public String eval(String input) {
char[] inputCharArray = input.toCharArray(); // 不能用 getBytes() 方法, 否则原始内容会被再次编码
IntStream.range(0, inputCharArray.length).forEach(i -> {
LOGGER.info("{}", Integer.toHexString(inputCharArray[i]));
});
return input;
}
}
编写完成并打包后,可以将程序包上传(对于自建的 Flink 集群,则是放入 Flink 的 lib 目录):
随后可以在 SQL 代码中,引用这个程序包:
作业提交运行后,我们可以尝试读取 id=1 的数据,发现打印出来的日志里,字符串中实际上保留了原始字符的 GBK 编码,只是没有经过妥善解码,导致输出时误当作 Unicode 处理了。
另外还注意到,对于原始 Latin1 而言,每个字符占 1 个字节,而这里 Java String 中使用的是 Char 结构,每个字符占了 2 个字节,且高位字节恒为 0. 此猜想在 这篇 MySQL 官方文档 中得到了验证。
那么给我们的启示是:可以直接将 char[] 数组转为等长的 byte[] 数组,而不能按照传统思路,创建一个长度为 char[] 数组两倍的 byte[] 数组。
改版后的代码
按照上面的思路,我们重新实现了一版,该版本可以实现解码并重新生成正确 String。
代码语言:txt复制package com.tencent.cloud.oceanus.udf;
import org.apache.flink.table.functions.ScalarFunction;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.UnsupportedEncodingException;
import java.util.stream.IntStream;
/**
* 如果 JDBC 数据库的 VARCHAR 为 Latin1 (或 GBK 等) 编码
* 可以使用这个函数转换为标准字符串
*
* SQL 代码声明方式:
* CREATE TEMPORARY SYSTEM FUNCTION DECODE_LATIN1 AS 'com.tencent.cloud.oceanus.udf.DecodeLatin1' LANGUAGE JAVA;
*/
public class DecodeLatin1 extends ScalarFunction {
private static final Logger LOGGER = LoggerFactory.getLogger(DecodeLatin1.class);
public String eval(String input) {
return eval(input, "latin1");
}
public String eval(String input, String fromCharset) {
char[] inputCharArray = input.toCharArray();
// JDBC Driver 读取的 Latin1 字符, 高 8 位都是 0x00, 因此只考虑低 8 位即可, byte 和 char 数据部分等长, 长度无需乘以二
byte[] inputBytes = new byte[inputCharArray.length];
IntStream.range(0, inputCharArray.length).forEach(i -> {
inputBytes[i] = (byte) inputCharArray[i];
LOGGER.debug("{}", String.format("0xX ", inputBytes[i]));
});
try {
return new String(inputBytes, fromCharset);
} catch (UnsupportedEncodingException e) {
// 与 GET_JSON_OBJECT 的异常处理方式保持一致, 遇到异常数据时输出 null, 避免日志过量打印
LOGGER.debug("Unsupported charset {} for input string {}", fromCharset, input, e);
return null;
}
}
}
上传新版的 UDF,然后再次运行(注意本次增加了一个新字段FromCharset
,表示解码使用的实际字符集):
然后我们再读取数据库中 id 为 1 的数据,现在输出就正常了:
总结
在遇到数据乱码等原生 Flink 无法轻易解决的问题时,可以尝试自定义函数来定位和排查,一旦确认问题根源,可以同样使用自定义函数来对数据进行校正。大大扩展了 Flink SQL 的功能。
另外,程序包可以分版本在不同的作业之间复用,基础包(UDF)和业务包(调用 UDF 的主程序)可以实现解耦。如果有更优化的实现,可以只更新基础包,避免对业务包的改动引入的风险。