TOC
一、基础概念
1、protobuf
简介
Protobuf是谷歌开源的一种平台无关、语言无关、可扩展且轻便高效的序列化数据结构的协议,可以用于网络通信和数据存储。
优缺点
安装protobuf
http://google.github.io/proto-lens/installing-protoc.html
考虑到和flink的兼容性,建议使用3.8版本。
idea也包含一个protobuf的插件,方便我们开发使用。
2、kafka-connector
https://ci.apache.org/projects/flink/flink-docs-stable/dev/connectors/kafka.html
参考相关文档
二、实际案例
1、背景介绍
在我们skywalking项目中,除了探针将Trace数据写入OAPServer中外,我们还需要通过Flink的kafka-connector消费其protobuf序列化后的数据,进行一些自定义的实时计算。
2、protoc生成java代码
代码语言:txt复制protoc --proto_path=${SKYWALKING_HOME}/apm-protocol/apm-network/src/main/proto/ --java_out=${RESULT_HOME}/src/main/java $(find ${SKYWALKING_HOME}/apm-protocol/apm-network/src/main/proto/ -iname "*.proto")
通过这个脚本可以批量转换所有的proto为java代码。
3、构建Deserializer
类
以Skywalking的JVMMetricCollection
为例:
public class KafkaJvmMetricDeserializer implements KafkaDeserializationSchema<JVMMetricCollection> {
@Override
public boolean isEndOfStream(JVMMetricCollection jvmMetricCollection) {
return false;
}
@Override
public JVMMetricCollection deserialize(ConsumerRecord<byte[], byte[]> consumerRecord) throws Exception {
return JVMMetricCollection.parseFrom(consumerRecord.value());
}
@Override
public TypeInformation<JVMMetricCollection> getProducedType() {
return TypeInformation.of(JVMMetricCollection.class);
}
}
注意: TypeInformation方法必须要写,否则env调用时会显示异常,需要人工指定returnType。
4、注册registerTypeWithKryoSerializer
代码语言:txt复制 env.getConfig().registerTypeWithKryoSerializer(JVMMetricCollection.class, ProtobufSerializer.class);
注册完才能在Flink的DataFlow里面识别。
5、FlinkKafkaConsumer
启动消费
代码语言:txt复制 FlinkKafkaConsumer<JVMMetricCollection> kafkaConsumer = new FlinkKafkaConsumer<>(topic,
new KafkaJvmMetricDeserializer(), properties);
DataStream<JVMMetricCollection> stream = env.addSource(kafkaConsumer);
三、问题排查
1、protobuf版本问题
回退到protobuf 3.8版本就ok
四、附录
1、maven配置
代码语言:txt复制<dependency>
<groupId>io.grpc</groupId>
<artifactId>grpc-protobuf</artifactId>
<version>${grpc.version}</version>
</dependency>
<dependency>
<groupId>com.google.protobuf</groupId>
<artifactId>protobuf-java</artifactId>
<version>${protobuf.version}</version>
<scope>compile</scope>
</dependency>
<dependency>
<groupId>com.twitter</groupId>
<artifactId>chill-protobuf</artifactId>
<version>0.7.6</version>
<!-- exclusions for dependency conversion -->
<exclusions>
<exclusion>
<groupId>com.esotericsoftware.kryo</groupId>
<artifactId>kryo</artifactId>
</exclusion>
</exclusions>
</dependency>
更多内容可以关注我的公众号~