flink之Datastram3

2024-06-16 23:07:10 浏览数 (2)

七、Sink输出算子

Flink的DataStream API专门提供了向外部写入数据的方法:addSink。与addSource类似,addSink方法对应着一个“Sink”算子,主要就是用来实现与外部系统连接、并将数据提交写入的;Flink程序中所有对外的输出操作,一般都是利用Sink算子完成的。

老版本:

Flink1.12以前(当前使用的是flink1.17),Sink算子的创建是通过调用DataStream的.addSink()方法实现的。

代码语言:javascript复制
stream.addSink(new SinkFunction(…));

addSink方法同样需要传入一个参数,实现的是SinkFunction接口。在这个接口中只需要重写一个方法invoke(),用来将指定的值写入到外部系统中。这个方法在每条数据记录到来时都会调用。

新版本:

代码语言:javascript复制
stream.sinkTo(…)

Sink多数情况下同样并不需要我们自己实现。之前我们一直在使用的print方法其实就是一种Sink,它表示将数据流写入标准控制台打印输出。Flink官方为我们提供了一部分的框架的Sink连接器。如下图所示,列出了Flink官方目前支持的第三方系统连接器:

可以看到,像Kafka之类流式系统,Flink提供了完美对接,source/sink两端都能连接,可读可写;而对于Elasticsearch、JDBC等数据存储系统,则只提供了输出写入的sink连接器。

除Flink官方之外,Apache Bahir框架(doris也有了适配Flink的API ),也实现了一些其他第三方系统与Flink的连接器。

1、输出到文件

Flink专门提供了一个流式文件系统的连接器:FileSink,为批处理和流处理提供了一个统一的Sink,它可以将分区文件写入Flink支持的文件系统。

FileSink支持行编码(Row-encoded)和批量编码(Bulk-encoded)格式。这两种不同的方式都有各自的构建器(builder),可以直接调用FileSink的静态方法:

代码语言:javascript复制
 行编码: FileSink.forRowFormat(basePath,rowEncoder)。
 批量编码: FileSink.forBulkFormat(basePath,bulkWriterFactory)。

案例:

代码语言:javascript复制
public static void main(String[] args) throws Exception {
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

    // 每个目录中,都有 并行度个数的 文件在写入
    env.setParallelism(2);

    // 必须开启checkpoint,否则一直都是 .inprogress
    env.enableCheckpointing(2000, CheckpointingMode.EXACTLY_ONCE);


    DataGeneratorSource<String> dataGeneratorSource = new DataGeneratorSource<>(
            new GeneratorFunction<Long, String>() {
                @Override
                public String map(Long value) throws Exception {
                    return "Number:"   value;
                }
            },
            Long.MAX_VALUE,
            RateLimiterStrategy.perSecond(1000),
            Types.STRING
    );

    DataStreamSource<String> dataGen = env.fromSource(dataGeneratorSource, WatermarkStrategy.noWatermarks(), "data-generator");

    // 输出到文件系统
    FileSink<String> fieSink = FileSink
            // 输出行式存储的文件,指定路径、指定编码
            .<String>forRowFormat(new Path("f:/tmp"), new SimpleStringEncoder<>("UTF-8"))
            // 输出文件的一些配置: 文件名的前缀、后缀
            .withOutputFileConfig(
                    OutputFileConfig.builder()
                            .withPartPrefix("atguigu-")
                            .withPartSuffix(".log")
                            .build()
            )
            // 按照目录分桶:如下,就是每个小时一个目录
            .withBucketAssigner(new DateTimeBucketAssigner<>("yyyy-MM-dd HH", ZoneId.systemDefault()))
            // 文件滚动策略:  1分钟 或 1m
            .withRollingPolicy(
                    DefaultRollingPolicy.builder()
                            .withRolloverInterval(Duration.ofMinutes(1))
                            .withMaxPartSize(new MemorySize(1024*1024))
                            .build()
            )
            .build();


    dataGen.sinkTo(fieSink);

    env.execute();
}

2、输出到kafka

步骤:

(1)添加Kafka 连接器依赖

(2)启动Kafka集群

(3)编写输出到Kafka的代码

代码语言:javascript复制
public static void main(String[] args) throws Exception {
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.setParallelism(1);

    // 如果是精准一次,必须开启checkpoint(后续章节介绍)
    env.enableCheckpointing(2000, CheckpointingMode.EXACTLY_ONCE);


    SingleOutputStreamOperator<String> sensorDS = env
            .socketTextStream("hadoop102", 7777);

    /**
     * Kafka Sink:
     * TODO 注意:如果要使用 精准一次 写入Kafka,需要满足以下条件,缺一不可
     * 1、开启checkpoint(后续介绍)
     * 2、设置事务前缀
     * 3、设置事务超时时间:   checkpoint间隔 <  事务超时时间  < max的15分钟
     */
    KafkaSink<String> kafkaSink = KafkaSink.<String>builder()
            // 指定 kafka 的地址和端口
            .setBootstrapServers("hadoop102:9092,hadoop103:9092,hadoop104:9092")
            // 指定序列化器:指定Topic名称、具体的序列化
            .setRecordSerializer(
                    KafkaRecordSerializationSchema.<String>builder()
                            .setTopic("ws")
                            .setValueSerializationSchema(new SimpleStringSchema())
                            .build()
            )
            // 写到kafka的一致性级别: 精准一次、至少一次
            .setDeliveryGuarantee(DeliveryGuarantee.EXACTLY_ONCE)
            // 如果是精准一次,必须设置 事务的前缀
            .setTransactionalIdPrefix("atguigu-")
            // 如果是精准一次,必须设置 事务超时时间: 大于checkpoint间隔,小于 max 15分钟
            .setProperty(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG, 10*60*1000 "")
            .build();


    sensorDS.sinkTo(kafkaSink);


    env.execute();
}

自定义序列化器,实现带key的record:

代码语言:javascript复制
public static void main(String[] args) throws Exception {
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.setParallelism(1);

    env.enableCheckpointing(2000, CheckpointingMode.EXACTLY_ONCE);
    env.setRestartStrategy(RestartStrategies.noRestart());


    SingleOutputStreamOperator<String> sensorDS = env
            .socketTextStream("hadoop102", 7777);


    /**
     * 如果要指定写入kafka的key,可以自定义序列化器:
     * 1、实现 一个接口,重写 序列化 方法
     * 2、指定key,转成 字节数组
     * 3、指定value,转成 字节数组
     * 4、返回一个 ProducerRecord对象,把key、value放进去
     */
    KafkaSink<String> kafkaSink = KafkaSink.<String>builder()
            .setBootstrapServers("hadoop102:9092,hadoop103:9092,hadoop104:9092")
            .setRecordSerializer(
                    new KafkaRecordSerializationSchema<String>() {

                        @Nullable
                        @Override
                        public ProducerRecord<byte[], byte[]> serialize(String element, KafkaSinkContext context, Long timestamp) {
                            String[] datas = element.split(",");
                            byte[] key = datas[0].getBytes(StandardCharsets.UTF_8);
                            byte[] value = element.getBytes(StandardCharsets.UTF_8);
                            return new ProducerRecord<>("ws", key, value);
                        }
                    }
            )
            .setDeliveryGuarantee(DeliveryGuarantee.EXACTLY_ONCE)
            .setTransactionalIdPrefix("atguigu-")
            .setProperty(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG, 10 * 60 * 1000   "")
            .build();


    sensorDS.sinkTo(kafkaSink);


    env.execute();
}

在这里 要提一嘴 , 当从kafka获取数据的时候,即kafka成为source算子的时候,需要注意空值的传递,此时需要筛选控制

代码语言:javascript复制
KafkaSource.<String>builder()
        .setBootstrapServers(Constant.KAFKA_BROKERS)
        .setGroupId(groupId)
        .setTopics(topic)
        .setStartingOffsets(OffsetsInitializer.earliest())
        .setValueOnlyDeserializer(new DeserializationSchema<String>() {
            @Override
            public String deserialize(byte[] message) throws IOException {
                if (message != null) {
                    return new String(message, StandardCharsets.UTF_8);
                }
                return null;
            }

            @Override
            public boolean isEndOfStream(String nextElement) {
                return false;
            }

            @Override
            public TypeInformation<String> getProducedType() {
                return Types.STRING;
            }
        })
        .build();

定义了一个自定义的 DeserializationSchema<String> 实例。在这个实例中:

deserialize(byte[] message) throws IOException 方法用于将字节数组形式的消息反序列化为字符串。它根据字节数组是否为空进行相应处理,将其转换为字符串,使用了指定的 StandardCharsets.UTF_8 字符集。

isEndOfStream(String nextElement) 方法用于判断是否到达流的结尾,这里返回 false 表示未到达。

getProducedType() 方法返回值的类型信息,这里明确为字符串类型。

通过这样的设置,确保了从 Kafka 中读取到的数据能够按照指定的方式正确地进行值的反序列化,以便后续程序进行处理和使用。例如,在后续的流程中,可以方便地将反序列化得到的字符串进行各种操作和分析。

3、输出到MySQL(JDBC)

写入数据的MySQL的测试步骤如下。

(1)添加依赖

(2)启动MySQL,在目标数据库下建对应的表 , 此博客 在test库下建表ws

代码语言:javascript复制
//ws对应的表结构
CREATE TABLE `ws` (
        `id` varchar(100) NOT NULL,
        `ts` bigint(20) DEFAULT NULL,
        `vc` int(11) DEFAULT NULL,
        PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8

(3)编写输出到MySQL的代码

代码语言:javascript复制
public static void main(String[] args) throws Exception {
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.setParallelism(1);


    SingleOutputStreamOperator<WaterSensor> sensorDS = env
            .socketTextStream("hadoop102", 7777)
            .map(new WaterSensorMapFunction());


    /**
     * TODO 写入mysql
     * 1、只能用老的sink写法: addsink
     * 2、JDBCSink的4个参数:
     *    第一个参数: 执行的sql,一般就是 insert into
     *    第二个参数: 预编译sql, 对占位符填充值
     *    第三个参数: 执行选项 ---》 攒批、重试
     *    第四个参数: 连接选项 ---》 url、用户名、密码
     */
    SinkFunction<WaterSensor> jdbcSink = JdbcSink.sink(
            "insert into ws values(?,?,?)",
            new JdbcStatementBuilder<WaterSensor>() {
                @Override
                public void accept(PreparedStatement preparedStatement, WaterSensor waterSensor) throws SQLException {
                    //每收到一条WaterSensor,如何去填充占位符
                    preparedStatement.setString(1, waterSensor.getId());
                    preparedStatement.setLong(2, waterSensor.getTs());
                    preparedStatement.setInt(3, waterSensor.getVc());
                }
            },
            JdbcExecutionOptions.builder()
                    .withMaxRetries(3) // 重试次数
                    .withBatchSize(100) // 批次的大小:条数
                    .withBatchIntervalMs(3000) // 批次的时间
                    .build(),
            new JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
                    .withUrl("jdbc:mysql://hadoop102:3306/test?serverTimezone=Asia/Shanghai&useUnicode=true&characterEncoding=UTF-8")
                    .withUsername("root")
                    .withPassword("000000")
                    .withConnectionCheckTimeoutSeconds(60) // 重试的超时时间
                    .build()
    );


    sensorDS.addSink(jdbcSink);


    env.execute();
}

(4)运行代码,用客户端连接MySQL,查看是否成功写入数据。

0 人点赞