【Flink笔记】kafka-connector消费protobuf格式数据

2022-02-28 20:20:48 浏览数 (2)

TOC

一、基础概念

1、protobuf

简介

Protobuf是谷歌开源的一种平台无关、语言无关、可扩展且轻便高效的序列化数据结构的协议,可以用于网络通信和数据存储。

优缺点

image.pngimage.png

安装protobuf

http://google.github.io/proto-lens/installing-protoc.html

考虑到和flink的兼容性,建议使用3.8版本。

image.pngimage.png

idea也包含一个protobuf的插件,方便我们开发使用。

2、kafka-connector

https://ci.apache.org/projects/flink/flink-docs-stable/dev/connectors/kafka.html

参考相关文档

二、实际案例

1、背景介绍

image.pngimage.png

在我们skywalking项目中,除了探针将Trace数据写入OAPServer中外,我们还需要通过Flink的kafka-connector消费其protobuf序列化后的数据,进行一些自定义的实时计算。

2、protoc生成java代码

代码语言:txt复制
protoc --proto_path=${SKYWALKING_HOME}/apm-protocol/apm-network/src/main/proto/  --java_out=${RESULT_HOME}/src/main/java $(find ${SKYWALKING_HOME}/apm-protocol/apm-network/src/main/proto/  -iname "*.proto")

通过这个脚本可以批量转换所有的proto为java代码。

3、构建Deserializer

以Skywalking的JVMMetricCollection为例:

代码语言:txt复制
public class KafkaJvmMetricDeserializer implements KafkaDeserializationSchema<JVMMetricCollection> {

    @Override
    public boolean isEndOfStream(JVMMetricCollection jvmMetricCollection) {
        return false;
    }

    @Override
    public JVMMetricCollection deserialize(ConsumerRecord<byte[], byte[]> consumerRecord) throws Exception {

        return JVMMetricCollection.parseFrom(consumerRecord.value());
    }

    @Override
    public TypeInformation<JVMMetricCollection> getProducedType() {
        return TypeInformation.of(JVMMetricCollection.class);
    }

}

注意: TypeInformation方法必须要写,否则env调用时会显示异常,需要人工指定returnType。

4、注册registerTypeWithKryoSerializer

代码语言:txt复制
  env.getConfig().registerTypeWithKryoSerializer(JVMMetricCollection.class, ProtobufSerializer.class);

注册完才能在Flink的DataFlow里面识别。

5、FlinkKafkaConsumer启动消费

代码语言:txt复制
        FlinkKafkaConsumer<JVMMetricCollection> kafkaConsumer = new FlinkKafkaConsumer<>(topic,
                new KafkaJvmMetricDeserializer(), properties);
        DataStream<JVMMetricCollection> stream = env.addSource(kafkaConsumer);

三、问题排查

1、protobuf版本问题

image.pngimage.png

回退到protobuf 3.8版本就ok

四、附录

1、maven配置

代码语言:txt复制
<dependency>
	<groupId>io.grpc</groupId>
	<artifactId>grpc-protobuf</artifactId>
	<version>${grpc.version}</version>
</dependency>

<dependency>
	<groupId>com.google.protobuf</groupId>
	<artifactId>protobuf-java</artifactId>
	<version>${protobuf.version}</version>
	<scope>compile</scope>
</dependency>

<dependency>
	<groupId>com.twitter</groupId>
	<artifactId>chill-protobuf</artifactId>
	<version>0.7.6</version>
	<!-- exclusions for dependency conversion -->
	<exclusions>
		<exclusion>
			<groupId>com.esotericsoftware.kryo</groupId>
			<artifactId>kryo</artifactId>
		</exclusion>
	</exclusions>
</dependency>

更多内容可以关注我的公众号~

0 人点赞