大数据技术栈之-实时数仓构建

2023-03-02 19:08:44 浏览数 (2)

前言

之前说了数据采集方案,数据库的数据,前端埋点数据,IOT数据经过一些中间件或者应用程序采集到Kafka后,分为了两条路线,一条是走离线,一条走实时,离线的会存储到HDFS,然后时候Hive构建离线数据仓库,实时的则进入flink做流式计算后再根据需求建模,然后写入到对应的数据库中提供使用,今天我们来说一下实时这条线路。

flink流式处理

flink是一个流批一体处理框架,不过我们一般都是用它来做流式处理,flink提供了丰富的connector,我们可以轻松地对接不同的数据源,如flink-doris-connector,flink-connector-kafka,flink-connector-jdbc,flink-connector-redis等,下面我们主要演示flink从kafka中获取数据,然后经过流式处理后,写入到doris中 ,当然,写入redis,mysql,es这些也是比较简单。

main方法

main方法就是flink的处理流程,主要分为几步,配置运行环境的一些选项,读取kfaka数据源,构建doris sink,进行计算,sink数据到doris。

代码语言:javascript复制
/**
 * 功能说明:kafka -> flink -> doris
 * <p>
 * Original @Author: steakliu-刘牌, 2022-11-14  17:17
 * <p>
 * Copyright (C)2020-2022  steakliu All rights reserved.
 */
public class Kafka2Flink2DorisApp {
  public static void main(String[] args) throws Exception {
    StreamExecutionEnvironment env = MyExecutionEnvironment.getWebUIExecutionEnvironment();
    //1.设置环境
    ExecutionEnvironmentUtil.setExecutionEnv(env);
    //2.获取kafka数据源
    KafkaSource<String> kafkaSource = KafkaDataSource.getKafkaSource();
    //3.获取Doris builder
    DorisSink.Builder<String> builder = DorisBuilder.build();
    //4. 计算
    SingleOutputStreamOperator<String> operator = env.fromSource(kafkaSource, WatermarkStrategy.noWatermarks(), "kafka-source")
      .flatMap(new FlatMapFunction<String, String>() {
        @Override
        public void flatMap(String s, Collector<String> collector) throws Exception {
            String json = s.replace("成都","chengdu");
          collector.collect(json);
        }
      });
    //5. sink到doris
    operator.sinkTo(builder.build()).setParallelism(8);
    env.execute("kafka-flink-doris");
  }
}

环境配置

要使用flink流式计算,我们首先要获取StreamExecutionEnvironment,并按需进行配置,如下配置了checkpoint保存点的时间间隔,设置了并发度等,还有许多配置项,我们可以按需配置。

代码语言:javascript复制
/**
 * 功能说明: 环境配置
 * <p>
 * Original @Author: steakliu-刘牌, 2022-11-14  17:24
 * <p>
 * Copyright (C)2020-2022  steakliu All rights reserved.
 */
public class ExecutionEnvironmentUtil {
  public static void setExecutionEnv(StreamExecutionEnvironment env){
    //设置保存时间,3s保存一下,
    env.enableCheckpointing(10000);
    env.setParallelism(4);
    CheckpointConfig checkpointConfig = env.getCheckpointConfig();
    //checkpoit失败重试次数
    checkpointConfig.setTolerableCheckpointFailureNumber(3);
    //job取消时是否清除checkpoit数据,设置为不清除
    checkpointConfig.setExternalizedCheckpointCleanup(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
    //chenckpoit最大并发数
    checkpointConfig.setMaxConcurrentCheckpoints(3);
    //保证只有一次,是基于2PC提交
    checkpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
    env.setRestartStrategy(RestartStrategies.fixedDelayRestart(5, Time.milliseconds(30000)));
    env.setRuntimeMode(RuntimeExecutionMode.STREAMING);
  }
}

获取kafka数据源

通过kafka connector获取kafka数据源,从kafka的dorisUser主题获取数据,消费者组为userInfo,读取数据偏移量的策略是earliest,表示从最新的偏移量位置获取数据。

代码语言:javascript复制
/**
 * 功能说明: 获取kafka数据源
 * <p>
 * Original @Author: steakliu-刘牌, 2022-11-14  17:20
 * <p>
 * Copyright (C)2020-2022  steakliu All rights reserved.
 */
@Slf4j
public class KafkaDataSource {
  public static KafkaSource<String> getKafkaSource() {
    return KafkaSource.<String>builder()
      .setBootstrapServers("localhost:9092")
      .setTopics("dorisUser")
      .setGroupId("userInfo")
      .setStartingOffsets(OffsetsInitializer.earliest())
      .setDeserializer(new KafkaRecordDeserializationSchema<String>() {
        @Override
        public void deserialize(ConsumerRecord<byte[], byte[]> consumerRecord, Collector<String> collector) throws IOException {
          String s = new String(consumerRecord.value());
          System.out.println("从kafka获取的数据 =======> " s);
          collector.collect(s);
        }
        @Override
        public TypeInformation<String> getProducedType() {
          return TypeInformation.of(String.class);
        }
      })
      .build();
  }
}

Doris建表

使用的AGGREGATE模型,Doris有Aggregate,Unique,Duplicate三种数据模型,根据需求选择合适自己业务的模型。

代码语言:javascript复制
CREATE TABLE `user`
(
  `user_id`         largeint(40) NOT NULL COMMENT "用户id",
  `date`            date NOT NULL COMMENT "数据灌入日期时间",
  `city`            varchar(20) NULL COMMENT "用户所在城市",
  `age`             smallint(6) NULL COMMENT "用户年龄",
  `sex`             tinyint(4) NULL COMMENT "用户性别",
  `last_visit_date` datetime REPLACE NULL DEFAULT "1970-01-01 00:00:00" COMMENT "用户最后一次访问时间",
  `cost`            bigint(20) SUM NULL DEFAULT "0" COMMENT "用户总消费",
  `max_dwell_time`  int(11) MAX NULL DEFAULT "0" COMMENT "用户最大停留时间",
  `min_dwell_time`  int(11) MIN NULL DEFAULT "99999" COMMENT "用户最小停留时间"
) ENGINE=OLAP
AGGREGATE KEY(`user_id`, `date`, `city`, `age`, `sex`)
COMMENT "OLAP"
DISTRIBUTED BY HASH(`user_id`) BUCKETS 1
PROPERTIES (
"replication_allocation" = "tag.location.default: 1",
"in_memory" = "false",
"storage_format" = "V2"
);  

构建doris sink

格式使用的是json,因为从kafka传过来的是json字符串,这里在构建doris sink的时候设置格式为json,doris sink会帮忙解析。

代码语言:javascript复制
/**
 * 功能说明:doris sink
 * <p>
 * Original @Author: steakliu-刘牌, 2022-11-14  17:27
 * <p>
 * Copyright (C)2020-2022  steakliu All rights reserved.
 */
public class DorisBuilder {
  public static DorisSink.Builder<String> build() {
    DorisSink.Builder<String> builder = DorisSink.builder();
    final DorisReadOptions.Builder readOptionBuilder = DorisReadOptions.builder();
    Properties pro = new Properties();
    pro.setProperty("format", "json");
    pro.setProperty("read_json_by_line", "true");
    pro.setProperty("line_delimiter", "n");

    DorisOptions.Builder dorisBuilder = DorisOptions.builder();
    dorisBuilder.setFenodes("localhost:8030")
      .setTableIdentifier("demo.user")
      .setUsername("root")
      .setPassword("");

    DorisExecutionOptions.Builder executionBuilder = DorisExecutionOptions.builder();
    executionBuilder
      .setStreamLoadProp(pro)
      .setLabelPrefix("a1");

    builder.setDorisReadOptions(readOptionBuilder.build())
      .setDorisExecutionOptions(executionBuilder.build())
      .setSerializer(new SimpleStringSerializer())
      .setDorisOptions(dorisBuilder.build());
    return builder;
  }
}

上面一个基本的流程就走完了,主要就是获取数据源,然后进行计算,最后写入到目标库,上面flink做计算案例中只是简单的使用了FloatMap算子,做了一个字符替换,flink提供了丰富的算子供我们使用,可以根据实际需求进行选择。

❝通过上面简单的案例,我们能对实时数仓的构建路线有一个简单的认知,当然,实际使用还需要考虑很多问题,踩很多坑,不过最重要的是心中要有一个蓝图,这样才有去选择和考虑其他方式或框架的基础,不然就是一把抓黑。

❝今天的分享就到这里,感谢你的观看,我们下期见。

0 人点赞