前言
之前说了数据采集方案,数据库的数据,前端埋点数据,IOT数据经过一些中间件或者应用程序采集到Kafka后,分为了两条路线,一条是走离线,一条走实时,离线的会存储到HDFS,然后时候Hive构建离线数据仓库,实时的则进入flink做流式计算后再根据需求建模,然后写入到对应的数据库中提供使用,今天我们来说一下实时这条线路。
flink流式处理
flink是一个流批一体处理框架,不过我们一般都是用它来做流式处理,flink提供了丰富的connector,我们可以轻松地对接不同的数据源,如flink-doris-connector,flink-connector-kafka,flink-connector-jdbc,flink-connector-redis等,下面我们主要演示flink从kafka中获取数据,然后经过流式处理后,写入到doris中 ,当然,写入redis,mysql,es这些也是比较简单。
main方法
main方法就是flink的处理流程,主要分为几步,配置运行环境的一些选项,读取kfaka数据源,构建doris sink,进行计算,sink数据到doris。
代码语言:javascript复制/**
* 功能说明:kafka -> flink -> doris
* <p>
* Original @Author: steakliu-刘牌, 2022-11-14 17:17
* <p>
* Copyright (C)2020-2022 steakliu All rights reserved.
*/
public class Kafka2Flink2DorisApp {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = MyExecutionEnvironment.getWebUIExecutionEnvironment();
//1.设置环境
ExecutionEnvironmentUtil.setExecutionEnv(env);
//2.获取kafka数据源
KafkaSource<String> kafkaSource = KafkaDataSource.getKafkaSource();
//3.获取Doris builder
DorisSink.Builder<String> builder = DorisBuilder.build();
//4. 计算
SingleOutputStreamOperator<String> operator = env.fromSource(kafkaSource, WatermarkStrategy.noWatermarks(), "kafka-source")
.flatMap(new FlatMapFunction<String, String>() {
@Override
public void flatMap(String s, Collector<String> collector) throws Exception {
String json = s.replace("成都","chengdu");
collector.collect(json);
}
});
//5. sink到doris
operator.sinkTo(builder.build()).setParallelism(8);
env.execute("kafka-flink-doris");
}
}
环境配置
要使用flink流式计算,我们首先要获取StreamExecutionEnvironment,并按需进行配置,如下配置了checkpoint保存点的时间间隔,设置了并发度等,还有许多配置项,我们可以按需配置。
代码语言:javascript复制/**
* 功能说明: 环境配置
* <p>
* Original @Author: steakliu-刘牌, 2022-11-14 17:24
* <p>
* Copyright (C)2020-2022 steakliu All rights reserved.
*/
public class ExecutionEnvironmentUtil {
public static void setExecutionEnv(StreamExecutionEnvironment env){
//设置保存时间,3s保存一下,
env.enableCheckpointing(10000);
env.setParallelism(4);
CheckpointConfig checkpointConfig = env.getCheckpointConfig();
//checkpoit失败重试次数
checkpointConfig.setTolerableCheckpointFailureNumber(3);
//job取消时是否清除checkpoit数据,设置为不清除
checkpointConfig.setExternalizedCheckpointCleanup(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
//chenckpoit最大并发数
checkpointConfig.setMaxConcurrentCheckpoints(3);
//保证只有一次,是基于2PC提交
checkpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
env.setRestartStrategy(RestartStrategies.fixedDelayRestart(5, Time.milliseconds(30000)));
env.setRuntimeMode(RuntimeExecutionMode.STREAMING);
}
}
获取kafka数据源
通过kafka connector获取kafka数据源,从kafka的dorisUser主题获取数据,消费者组为userInfo,读取数据偏移量的策略是earliest,表示从最新的偏移量位置获取数据。
代码语言:javascript复制/**
* 功能说明: 获取kafka数据源
* <p>
* Original @Author: steakliu-刘牌, 2022-11-14 17:20
* <p>
* Copyright (C)2020-2022 steakliu All rights reserved.
*/
@Slf4j
public class KafkaDataSource {
public static KafkaSource<String> getKafkaSource() {
return KafkaSource.<String>builder()
.setBootstrapServers("localhost:9092")
.setTopics("dorisUser")
.setGroupId("userInfo")
.setStartingOffsets(OffsetsInitializer.earliest())
.setDeserializer(new KafkaRecordDeserializationSchema<String>() {
@Override
public void deserialize(ConsumerRecord<byte[], byte[]> consumerRecord, Collector<String> collector) throws IOException {
String s = new String(consumerRecord.value());
System.out.println("从kafka获取的数据 =======> " s);
collector.collect(s);
}
@Override
public TypeInformation<String> getProducedType() {
return TypeInformation.of(String.class);
}
})
.build();
}
}
Doris建表
使用的AGGREGATE模型,Doris有Aggregate,Unique,Duplicate三种数据模型,根据需求选择合适自己业务的模型。
代码语言:javascript复制CREATE TABLE `user`
(
`user_id` largeint(40) NOT NULL COMMENT "用户id",
`date` date NOT NULL COMMENT "数据灌入日期时间",
`city` varchar(20) NULL COMMENT "用户所在城市",
`age` smallint(6) NULL COMMENT "用户年龄",
`sex` tinyint(4) NULL COMMENT "用户性别",
`last_visit_date` datetime REPLACE NULL DEFAULT "1970-01-01 00:00:00" COMMENT "用户最后一次访问时间",
`cost` bigint(20) SUM NULL DEFAULT "0" COMMENT "用户总消费",
`max_dwell_time` int(11) MAX NULL DEFAULT "0" COMMENT "用户最大停留时间",
`min_dwell_time` int(11) MIN NULL DEFAULT "99999" COMMENT "用户最小停留时间"
) ENGINE=OLAP
AGGREGATE KEY(`user_id`, `date`, `city`, `age`, `sex`)
COMMENT "OLAP"
DISTRIBUTED BY HASH(`user_id`) BUCKETS 1
PROPERTIES (
"replication_allocation" = "tag.location.default: 1",
"in_memory" = "false",
"storage_format" = "V2"
);
构建doris sink
格式使用的是json,因为从kafka传过来的是json字符串,这里在构建doris sink的时候设置格式为json,doris sink会帮忙解析。
代码语言:javascript复制/**
* 功能说明:doris sink
* <p>
* Original @Author: steakliu-刘牌, 2022-11-14 17:27
* <p>
* Copyright (C)2020-2022 steakliu All rights reserved.
*/
public class DorisBuilder {
public static DorisSink.Builder<String> build() {
DorisSink.Builder<String> builder = DorisSink.builder();
final DorisReadOptions.Builder readOptionBuilder = DorisReadOptions.builder();
Properties pro = new Properties();
pro.setProperty("format", "json");
pro.setProperty("read_json_by_line", "true");
pro.setProperty("line_delimiter", "n");
DorisOptions.Builder dorisBuilder = DorisOptions.builder();
dorisBuilder.setFenodes("localhost:8030")
.setTableIdentifier("demo.user")
.setUsername("root")
.setPassword("");
DorisExecutionOptions.Builder executionBuilder = DorisExecutionOptions.builder();
executionBuilder
.setStreamLoadProp(pro)
.setLabelPrefix("a1");
builder.setDorisReadOptions(readOptionBuilder.build())
.setDorisExecutionOptions(executionBuilder.build())
.setSerializer(new SimpleStringSerializer())
.setDorisOptions(dorisBuilder.build());
return builder;
}
}
上面一个基本的流程就走完了,主要就是获取数据源,然后进行计算,最后写入到目标库,上面flink做计算案例中只是简单的使用了FloatMap算子,做了一个字符替换,flink提供了丰富的算子供我们使用,可以根据实际需求进行选择。
❝通过上面简单的案例,我们能对实时数仓的构建路线有一个简单的认知,当然,实际使用还需要考虑很多问题,踩很多坑,不过最重要的是心中要有一个蓝图,这样才有去选择和考虑其他方式或框架的基础,不然就是一把抓黑。
❝今天的分享就到这里,感谢你的观看,我们下期见。