Apache Griffin + Flink + Kafka: Streaming Data Quality Monitoring in Practice

2022-04-13 09:41:27

1. Components and Versions

The components used in this article are listed below, following the official example; the exact versions can be checked against the project on GitHub and the pom file there. This article assumes all of the following are already installed.

  • JDK (1.8)
  • MySQL (version 5.6)
  • Hadoop (2.7.2)
  • Hive (version 2.4)
  • Spark (version 2.4.1)
  • Kafka (version 0.11)
  • Griffin (version 0.6.0)
  • Zookeeper (version 3.4.1)

A detailed record of the setup process, including the bugs you may run into along the way, is available separately.

2. Kafka Data Generation Scripts

Since this is a test case, we simply write a script that generates data and pushes it into the Kafka source topic. In a real scenario, data would be written to Kafka continuously (by Flume or some other tool). The script and template below are based on the official demo data.

gen-data.sh

#!/bin/bash

#current time
cur_time=`date +%Y-%m-%d_%H:%M:%S`
sed s/TIME/$cur_time/ /opt/module/data/source.temp > /opt/module/data/source.tp

#create data
for row in 1 2 3 4 5 6 7 8 9 10
do
  sed -n "${row}p" < /opt/module/data/source.tp > sline
  cnt=`shuf -i1-2 -n1`
  clr="red"
  if [ $cnt == 2 ]; then clr="yellow"; fi
  sed s/COLOR/$clr/ sline >> /opt/module/data/source.data
done
rm sline

rm /opt/module/data/source.tp

#import data
kafka-console-producer.sh --broker-list hadoop101:9092 --topic source < /opt/module/data/source.data

rm /opt/module/data/source.data

echo "insert data at ${cur_time}"

streaming-data.sh

#!/bin/bash

#create topics
kafka-topics.sh --create --zookeeper hadoop101:2181 --replication-factor 1 --partitions 1 --topic source
kafka-topics.sh --create --zookeeper hadoop101:2181 --replication-factor 1 --partitions 1 --topic target

#generate a batch roughly every 90 seconds
set +e
while true
do
  /opt/module/data/gen-data.sh
  sleep 90
done
set -e
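
Before starting, make both scripts executable (chmod +x gen-data.sh streaming-data.sh). streaming-data.sh creates the source and target topics once and then invokes gen-data.sh roughly every 90 seconds, so each batch of ten records lands in the source topic with a fresh timestamp.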

source.temp

{"id": 1, "name": "Apple", "color": "COLOR", "time": "TIME"}
{"id": 2, "name": "Banana", "color": "COLOR", "time": "TIME"}
{"id": 3, "name": "Cherry", "color": "COLOR", "time": "TIME"}
{"id": 4, "name": "Durian", "color": "COLOR", "time": "TIME"}
{"id": 5, "name": "Lichee", "color": "COLOR", "time": "TIME"}
{"id": 6, "name": "Peach", "color": "COLOR", "time": "TIME"}
{"id": 7, "name": "Papaya", "color": "COLOR", "time": "TIME"}
{"id": 8, "name": "Lemon", "color": "COLOR", "time": "TIME"}
{"id": 9, "name": "Mango", "color": "COLOR", "time": "TIME"}
{"id": 10, "name": "Pitaya", "color": "COLOR", "time": "TIME"}

3. Stream Processing with Flink

The Flink streaming job consists of three parts: reading from Kafka, business processing, and writing back to Kafka.

  1. First, the dependencies declared in my pom.xml:
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.xxxx</groupId>
    <artifactId>kafka_Flink_kafka_Test</artifactId>
    <version>1.0-SNAPSHOT</version>

    <build>
        <plugins>
            <plugin>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.7.0</version>
                <configuration>
                    <source>1.8</source>
                    <target>1.8</target>
                </configuration>
            </plugin>



            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-shade-plugin</artifactId>
                <version>3.1.0</version>
                <executions>
                    <execution>
                        <phase>package</phase>
                        <goals>
                            <goal>shade</goal>
                        </goals>
                        <configuration>
                            <transformers>
                                <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                                    <mainClass>com.ink.FlinkLambdaTest.FlinkToLambda</mainClass>
                                </transformer>
                                <transformer implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer">
                                    <resource>reference.conf</resource>
                                </transformer>
                            </transformers>
                            <relocations>
                                <relocation>
                                    <pattern>org.codehaus.plexus.util</pattern>
                                    <shadedPattern>org.shaded.plexus.util</shadedPattern>
                                    <excludes>
                                        <exclude>org.codehaus.plexus.util.xml.Xpp3Dom</exclude>
                                        <exclude>org.codehaus.plexus.util.xml.pull.*</exclude>
                                    </excludes>
                                </relocation>
                            </relocations>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>

    <dependencies>
        <!--<dependency>-->
        <!--<groupId>org.apache.flink</groupId>-->
        <!--<artifactId>flink-table_2.10</artifactId>-->
        <!--<version>1.3.2</version>-->
        <!--</dependency>-->
        <dependency>
            <groupId>org.json</groupId>
            <artifactId>json</artifactId>
            <version>20090211</version>
        </dependency>
        <dependency>
            <groupId>com.google.code.gson</groupId>
            <artifactId>gson</artifactId>
            <version>2.6.2</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-java</artifactId>
            <version>1.10.1</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java_2.11</artifactId>
            <version>1.10.1</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients_2.11</artifactId>
            <version>1.10.1</version>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-scala_2.11</artifactId>
            <version>1.10.1</version>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-scala_2.11</artifactId>
            <version>1.10.1</version>
        </dependency>



        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-kafka-0.10_2.11</artifactId>
            <version>1.10.1</version>
        </dependency>
    </dependencies>

</project>
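
One thing to double-check in the shade plugin configuration: the <mainClass> value above (com.ink.FlinkLambdaTest.FlinkToLambda) does not match the entry class used in this article, so if you rely on the manifest when submitting the fat jar it should point to your own main class (the flinkProcess class shown later, with its package prefix).
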
  2. Next, write a simple bean class used to receive the JSON data:
import java.util.Date;

public class Student{
    private int id;
    private String name;
    private String color;
    private Date time;

    public Student(){}

    public Student(int id, String name, String color, Date time) {
        this.id = id;
        this.name = name;
        this.color = color;
        this.time = time;
    }

    public int getId() {
        return id;
    }

    public void setId(int id) {
        this.id = id;
    }

    public String getName() {
        return name;
    }

    public void setName(String name) {
        this.name = name;
    }

    public String getColor() {
        return color;
    }

    public void setColor(String color) {
        this.color = color;
    }

    public Date getTime() {
        return time;
    }

    public void setTime(Date time) {
        this.time = time;
    }

    @Override
    public String toString() {
        return "Student{"  
                "id="   id  
                ", name='"   name   '''  
                ", color='"   color   '''  
                ", time='"   time   '''  
                '}';
    }
}
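
As a quick sanity check of the date handling, the sketch below parses one sample record with the same Gson settings used later in handleData; the class name GsonCheck is made up for illustration.

import com.google.gson.Gson;
import com.google.gson.GsonBuilder;

public class GsonCheck {
    public static void main(String[] args) {
        // The time field in the demo data looks like 2022-04-13_09:41:27,
        // so Gson needs the matching pattern to fill the Date field of Student.
        Gson gson = new GsonBuilder()
                .setLenient()
                .setDateFormat("yyyy-MM-dd_HH:mm:ss")
                .create();

        String line = "{\"id\": 1, \"name\": \"Apple\", \"color\": \"red\", \"time\": \"2022-04-13_09:41:27\"}";
        Student student = gson.fromJson(line, Student.class);
        System.out.println(student);
    }
}
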
  3. Reading from Kafka. The Kafka connection settings for both reading and writing could be factored out into a KafkaUtil helper class; for convenience in this test I simply inline them in the code (a sketch of such a helper follows the code below).
// Create the Flink execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Be sure to enable checkpointing in production!!
        //env.enableCheckpointing(5000);
        //env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
        env.setParallelism(1);

        // Kafka parameters
        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", "hadoop101:9092");
        properties.setProperty("group.id", "consumer-group");
        properties.setProperty("key.deserializer","org.apache.kafka.common.serialization.StringDeserializer");
        properties.setProperty("value.deserializer","org.apache.kafka.common.serialization.StringDeserializer");
        properties.setProperty("auto.offset.reset", "latest");
        String inputTopic = "source";
        String outputTopic = "target";

        // Source
        FlinkKafkaConsumer010<String> consumer =
                new FlinkKafkaConsumer010<String>(inputTopic, new SimpleStringSchema(), properties);
        DataStream<String> stream = env.addSource(consumer);
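
As mentioned above, the connection settings could be factored out. A minimal sketch of such a helper, under the same assumptions as this demo (the class name KafkaUtil and its method names are made up), might look like:

import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer010;
import org.apache.flink.streaming.util.serialization.SimpleStringSchema;

import java.util.Properties;

public class KafkaUtil {
    private static final String BROKERS = "hadoop101:9092";

    private static Properties consumerProps() {
        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", BROKERS);
        properties.setProperty("group.id", "consumer-group");
        properties.setProperty("auto.offset.reset", "latest");
        return properties;
    }

    // String source that reads the given topic
    public static FlinkKafkaConsumer010<String> stringConsumer(String topic) {
        return new FlinkKafkaConsumer010<>(topic, new SimpleStringSchema(), consumerProps());
    }

    // String sink that writes to the given topic
    public static FlinkKafkaProducer010<String> stringProducer(String topic) {
        return new FlinkKafkaProducer010<>(BROKERS, topic, new SimpleStringSchema());
    }
}

With a helper like this, the Source and Sink lines reduce to env.addSource(KafkaUtil.stringConsumer(inputTopic)) and outMap.addSink(KafkaUtil.stringProducer(outputTopic)).
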
  4. The Flink business logic. Since every business is different, this is only a simple demo: with roughly a 20% probability a record is altered (e.g. Apple may become Apple_3) so that it becomes bad data that can be detected, simulating a processing error that causes a data quality issue. One caveat: this example assumes the Flink processing latency is negligible; in a real scenario, processing delay in Flink may cause the target side to be mistaken for having lost data. I am still reading the Griffin source code on this point and will update later; pointers from anyone familiar with it are welcome.
//Process the data with a simple Flink map operator
        // Transformations
        // Each input record is a JSON line of the form:
        //{"id": 1, "name": "Apple", "color": "COLOR", "time": "TIME"}
        DataStream<String> outMap = stream.map(new MapFunction<String, String>() {
            @Override
            public String map(String value) throws Exception {
                return handleData(value);
            }
        });
public static String handleData(String line){
        try {
                if (line!=null&& !line.equals("")){
                    Gson gson = new GsonBuilder().setLenient().setDateFormat("yyyy-MM-dd_HH:mm:ss").create();
                    JsonReader reader = new JsonReader(new StringReader(line));
                    Student student = gson.fromJson(reader, Student.class);
                    int rand = ra.nextInt(10) + 1;
                    if (rand > 8) student.setName(student.getName() + "_" + ra.nextInt(10));
                    return gson.toJson(student);
                }
                else return "";
        }catch (Exception e){
            return "";
        }
    }

The Gson instance is created this way (lenient mode plus an explicit date format) because I ran into a few parsing errors otherwise.

  5. Writing to Kafka. The FlinkKafkaProducer010 constructor we use here is (brokerList, topicId, serializationSchema); an alternative constructor is sketched after the code below.
//Sink
        outMap.addSink(new FlinkKafkaProducer010<String>(
                "hadoop101:9092",
                "target",
                new SimpleStringSchema()
        ));
        outMap.print();
        env.execute();
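
If you prefer to hand the producer a Properties object rather than a bare broker list, FlinkKafkaProducer010 also has a (topicId, serializationSchema, producerConfig) constructor; a small sketch under that assumption:

// equivalent sink configured through Properties instead of a broker-list string
Properties producerProps = new Properties();
producerProps.setProperty("bootstrap.servers", "hadoop101:9092");

outMap.addSink(new FlinkKafkaProducer010<String>(
        "target",
        new SimpleStringSchema(),
        producerProps
));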

4. Apache Griffin Configuration and Launch

Configuring Griffin's streaming mode boils down to two files: dq.json and env.json.

dq.json

{
  "name": "streaming_accu",
  "process.type": "streaming",
  "data.sources": [
    {
      "name": "src",
      "baseline": true,
      "connector": 
        {
          "type": "kafka",
          "version": "0.10",
          "config": {
            "kafka.config": {
              "bootstrap.servers": "hadoop101:9092",
              "group.id": "griffin",
              "auto.offset.reset": "largest",
              "auto.commit.enable": "false"
            },
            "topics": "source_1",
            "key.type": "java.lang.String",
            "value.type": "java.lang.String"
          },
          "pre.proc": [
            {
              "dsl.type": "df-opr",
              "rule": "from_json"
            }
          ]
        }
      ,
      "checkpoint": {
        "type": "json",
        "file.path": "hdfs://hadoop101:9000/griffin/streaming/dump/source",
        "info.path": "source_1",
        "ready.time.interval": "10s",
        "ready.time.delay": "0",
        "time.range": ["-5m", "0"],
        "updatable": true
      }
    }, {
      "name": "tgt",
      "connector": 
        {
          "type": "kafka",
          "version": "0.10",
          "config": {
            "kafka.config": {
              "bootstrap.servers": "hadoop101:9092",
              "group.id": "griffin",
              "auto.offset.reset": "largest",
              "auto.commit.enable": "false"
            },
            "topics": "target_1",
            "key.type": "java.lang.String",
            "value.type": "java.lang.String"
          },
          "pre.proc": [
            {
              "dsl.type": "df-opr",
              "rule": "from_json"
            }
          ]
        }
      ,
      "checkpoint": {
        "type": "json",
        "file.path": "hdfs://hadoop101:9000/griffin/streaming/dump/target",
        "info.path": "target_1",
        "ready.time.interval": "10s",
        "ready.time.delay": "0",
        "time.range": ["-1m", "0"]
      }
    }
  ],
  "evaluate.rule": {
    "rules": [
      {
        "dsl.type": "griffin-dsl",
        "dq.type": "accuracy",
        "out.dataframe.name": "accu",
        "rule": "src.login_id = tgt.login_id AND src.bussiness_id = tgt.bussiness_id AND src.event_id = tgt.event_id",
        "details": {
          "source": "src",
          "target": "tgt",
          "miss": "miss_count",
          "total": "total_count",
          "matched": "matched_count"
        },
        "out":[
          {
            "type":"metric",
            "name": "accu"
          },
          {
            "type":"record",
            "name": "missRecords"
          }
        ]
      }
    ]
  },
  "sinks": ["HdfsSink"]
}
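
For the accuracy rule above, Griffin treats a source record as matched when some target record satisfies the matching condition; per processing window it reports total_count, matched_count, and miss_count = total_count - matched_count, so the accuracy metric is essentially matched_count / total_count. Source records that find no match are written out as missRecords.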

env.json

{
  "spark": {
    "log.level": "WARN",
    "checkpoint.dir": "hdfs://hadoop101:9000/griffin/checkpoint",
    "batch.interval": "20s",
    "process.interval": "1m",
    "init.clear": true,
    "config": {
      "spark.default.parallelism": 4,
      "spark.task.maxFailures": 5,
      "spark.streaming.kafkaMaxRatePerPartition": 1000,
      "spark.streaming.concurrentJobs": 4,
      "spark.yarn.maxAppAttempts": 5,
      "spark.yarn.am.attemptFailuresValidityInterval": "1h",
      "spark.yarn.max.executor.failures": 120,
      "spark.yarn.executor.failuresValidityInterval": "1h",
      "spark.hadoop.fs.hdfs.impl.disable.cache": true
    }
  },
  "sinks": [
    {
      "name":"ConsoleSink",
      "type": "console"
    },
    {
      "name":"HdfsSink",
      "type": "hdfs",
      "config": {
        "path": "hdfs://hadoop101:9000/griffin/persist"
      }
    },
    {
      "name":"ElasticsearchSink",
      "type": "elasticsearch",
      "config": {
        "method": "post",
        "api": "http://hadoop101:9200/griffin/accuracy"
      }
    }
  ],
  "griffin.checkpoint": [
    {
      "type": "zk",
      "config": {
        "hosts": "hadoop101:2181",
        "namespace": "griffin/infocache",
        "lock.path": "lock",
        "mode": "persist",
        "init.clear": true,
        "close.clear": false
      }
    }
  ]
}
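
Note that the sink names defined here (ConsoleSink, HdfsSink, ElasticsearchSink) are what the "sinks" list in dq.json refers to; since dq.json above only lists HdfsSink, the metrics of this job are persisted to HDFS only.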

Finally, submit the measure job to Spark to run the checks on the data:

spark-submit --class org.apache.griffin.measure.Application --master yarn --deploy-mode client --queue default \
--driver-memory 1g --executor-memory 1g --num-executors 3 \
<path>/griffin-measure.jar \
<path>/env.json <path>/dq.json
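
Once the job is running, the per-window accuracy metrics and the missRecords output end up under the HdfsSink path configured in env.json (hdfs://hadoop101:9000/griffin/persist), which is where you can verify that roughly 20% of the records are reported as mismatched.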

5. Complete Code

Create a Maven project locally; since this is just a simple test project, any basic setup will do. I only wrote two classes for the test.

Student.java

import java.util.Date;

public class Student{
    private int id;
    private String name;
    private String color;
    private Date time;

    public Student(){}

    public Student(int id, String name, String color, Date time) {
        this.id = id;
        this.name = name;
        this.color = color;
        this.time = time;
    }

    public int getId() {
        return id;
    }

    public void setId(int id) {
        this.id = id;
    }

    public String getName() {
        return name;
    }

    public void setName(String name) {
        this.name = name;
    }

    public String getColor() {
        return color;
    }

    public void setColor(String color) {
        this.color = color;
    }

    public Date getTime() {
        return time;
    }

    public void setTime(Date time) {
        this.time = time;
    }

    @Override
    public String toString() {
        return "Student{"  
                "id="   id  
                ", name='"   name   '''  
                ", color='"   color   '''  
                ", time='"   time   '''  
                '}';
    }
}

flinkProcess.java

import com.google.gson.Gson;
import com.google.gson.GsonBuilder;
import com.google.gson.stream.JsonReader;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer010;
import org.apache.flink.streaming.util.serialization.SimpleStringSchema;

import java.io.StringReader;
import java.util.Properties;
import java.util.Random;

public class flinkProcess {
    public static Random ra = new Random();
    public static void main(String[] args) throws Exception {
        // Create the Flink execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Be sure to enable checkpointing in production!!
        //env.enableCheckpointing(5000);
        //env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
        env.setParallelism(1);

        // Kafka parameters
        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", "hadoop101:9092");
        properties.setProperty("group.id", "consumer-group");
        properties.setProperty("key.deserializer","org.apache.kafka.common.serialization.StringDeserializer");
        properties.setProperty("value.deserializer","org.apache.kafka.common.serialization.StringDeserializer");
        properties.setProperty("auto.offset.reset", "latest");
        String inputTopic = "source";
        String outputTopic = "target";

        // Source
        FlinkKafkaConsumer010<String> consumer =
                new FlinkKafkaConsumer010<String>(inputTopic, new SimpleStringSchema(), properties);
        DataStream<String> stream = env.addSource(consumer);

        //Process the data with a simple Flink map operator
        // Transformations
        // Each input record is a JSON line of the form:
        //{"id": 1, "name": "Apple", "color": "COLOR", "time": "TIME"}
        DataStream<String> outMap = stream.map(new MapFunction<String, String>() {
            @Override
            public String map(String value) throws Exception {
                return handleData(value);
            }
        });

        //Sink
        outMap.addSink(new FlinkKafkaProducer010<String>(
                "hadoop101:9092",
                "target",
                new SimpleStringSchema()
        ));
        outMap.print();
        env.execute();
    }

    public static String handleData(String line){
        try {
                if (line!=null&& !line.equals("")){
                    Gson gson = new GsonBuilder().setLenient().setDateFormat("yyyy-MM-dd_HH:mm:ss").create();
                    JsonReader reader = new JsonReader(new StringReader(line));
                    Student student = gson.fromJson(reader, Student.class);
                    int rand = ra.nextInt(10) + 1;
                    if (rand > 8) student.setName(student.getName() + "_" + ra.nextInt(10));
                    return gson.toJson(student);
                }
                else return "";
        }catch (Exception e){
            return "";
        }
    }
}

Tip: if some malformed records end up in Kafka, the job will keep throwing errors. In that case you can delete the corresponding Kafka data directory (dataDir) and the ZooKeeper znode data for the topics, regenerate the data, and rerun the code.
