相信我,你也能成为大数据开发工程师(一)

2021-06-17 20:26:40 浏览数 (1)

大家好啊,老李最近高产如母猪,我也来凑个热闹。说起来挺魔幻的,去年这时候,我还是一个连java curd都不会的菜鸡,今天却在这里大谈大数据开发- -。我也没想到,等以后有机会可以讲讲写java的心路历程,目前还是一个java菜鸟,也因为目前的公司部门里没有足够的数据开发,我自己硬着头皮写了几个Flink应用,没想到这东西上手还是挺简单的,所以就很想分享给大家。

都2021年了,我们看看现在的大数据开发什么东西火,毫无疑问,Flink这个新兴之子,占了很大一块。随便一搜某招聘网站

是不是心动了

简介&&准备

回过头来,

我们看看Flink到底是个啥,其实我之前有一篇文章讲过 这次来整个高端的API实时QPS流计算 。还是引用官网的那句话——Apache Flink is a framework and distributed processing engine for stateful computations over unbounded and bounded data streams. 简而言之,就是一个可以对流进行有状态的计算的这么个东西。之前的那篇文章可能对没接触过的人来说比较不友好,所以我想开个系列来让各位也可以“精通”Flink。

准备工作

  • 开发工具,IntelliJ IDEA
  • 安装java java的安装比php还简单,下载好jdk,设置好JAVA_HOME就OK,目前主流还是jdk8。可以参考 https://juejin.cn/post/6844903895504797710
  • 安装maven maven的话,也一样,下载下来直接配个环境变量就行。可以参考 https://cloud.tencent.com/developer/article/1680711

直接开干

我们先来安装运行Flink job的集群(反正这些运维会做好的,Flink还可以运行在yarn集群上,还有Flink on k8s,当然,它也可以单独以jar包的形式运行)

代码语言:javascript复制
wget https://mirrors.tuna.tsinghua.edu.cn/apache/flink/flink-1.13.0/flink-1.13.0-bin-scala_2.11.tgz
代码语言:javascript复制
tar -xzf flink-1.13.0-bin-scala_2.11.tgz
代码语言:javascript复制
cd flink-1.13.0
./bin/start-cluster.sh

这些都没什么坑,装好了之后是这样的。

然后执行

代码语言:javascript复制
./bin/flink run examples/streaming/WordCount.jar

然后我们打开 http://127.0.0.1:8081 可以看到

这上面就是控制台了,我们提交的Flink任务都能在这上面看到。刚才执行的WordCount.jar就是我们刚才运行的那个job。

能运行到这一步说明你已经成功了百分之99了,马上你就要“精通”了

“手动编写”个Flink任务

我们在一个目录下执行如下命令

代码语言:javascript复制
 mvn archetype:generate 
    -DarchetypeGroupId=org.apache.flink 
    -DarchetypeArtifactId=flink-walkthrough-datastream-java 
    -DarchetypeVersion=1.13.0 
    -DgroupId=frauddetection 
    -DartifactId=frauddetection 
    -Dversion=0.1 
    -Dpackage=spendreport 
    -DinteractiveMode=false

然后我们就可以看到一个maven生成的项目叫frauddetection

这里我们需要把pom.xml的这两行删掉。为什么呢,这里就涉及到了maven打包的知识点了,如果有这个provided的,那么项目依赖的包就不会被打包到jar包里。具体可以参考 https://segmentfault.com/a/1190000038594247

为什么这里官方的demo这么设计呢,因为我们刚才装好了可以运行Flink任务的集群,集群里面已经有这些了,所以我们打成的jar包可以不用这些,这样可以大大减少jar包的大小,因为我们要在idea里面运行(不去掉idea里打出来的jar会找不到该class,会报错运行不起来),所以我这里还是建议大家可以去掉。

然后我们再用idea自带的maven插件执行clean跟install,就可以跑了。

这个demo任务的输出是这样的。

当你完成这一步的时候,Congratulations,You are Big Data开发工程师!!!你已经编写了一个反欺诈实时流计算应用了,是不是有点成就感。

代码解析

因为有些观众只喜欢听我扯淡,应该很少有人去手动实践,我这里就把这个frauddetection项目的代码贴下,顺便把代码的含义也用注释的方式写在下面,方便大家阅读

主要两个类 一个FraudDetectionJob

代码语言:javascript复制
package spendreport;

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.walkthrough.common.sink.AlertSink;
import org.apache.flink.walkthrough.common.entity.Alert;
import org.apache.flink.walkthrough.common.entity.Transaction;
import org.apache.flink.walkthrough.common.source.TransactionSource;


public class FraudDetectionJob {
  public static void main(String[] args) throws Exception {
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

    /*
      所有的flink任务都要有一个数据源
      这里的TransactionSource是flink包里自带的,
      里面无限循环生成信用卡模拟交易数据
      每条交易数据包括了信用卡 ID (accountId),
      交易发生的时间 (timestamp) 以及交易的金额(amount)
      name就是给这个环节起个名字的意思,不参与实际业务,下面也一样。
     */
    DataStream<Transaction> transactions = env
      .addSource(new TransactionSource())
      .name("transactions");

    /*
      这里的keyBy我们以后可以展开来学习,
      先简单的理解为就是把数据流按accountId相同的一个个分开去处理了
      process就是真正的处理类,
      就是一个自定义叫做FraudDetector的类,下面会讲到这个类里做了啥
     */
    DataStream<Alert> alerts = transactions
      .keyBy(Transaction::getAccountId)
      .process(new FraudDetector())
      .name("fraud-detector");

    /*
      这个sink可以暂时也不同管他是干嘛的,
      我们只要明白这里就是把数据处理结果通过终端输出就OK了
     */
    alerts
      .addSink(new AlertSink())
      .name("send-alerts");

    env.execute("Fraud Detection");
  }
}

另一个类FraudDetector

代码语言:javascript复制
package spendreport;

import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;
import org.apache.flink.walkthrough.common.entity.Alert;
import org.apache.flink.walkthrough.common.entity.Transaction;

public class FraudDetector extends KeyedProcessFunction<Long, Transaction, Alert> {

    private static final long serialVersionUID = 1L;

    private static final double SMALL_AMOUNT = 1.00;
    private static final double LARGE_AMOUNT = 500.00;
    private static final long ONE_MINUTE = 60 * 1000;

    @Override
    public void processElement(
            Transaction transaction,
            Context context,
            Collector<Alert> collector) throws Exception {

        /*
          这里实例化了一个Alert类,
          把前面数据流里的accountId设置到了Alert类里的Id
          然后通过一个Collector类来收集并输出
         */
        Alert alert = new Alert();
        alert.setId(transaction.getAccountId());
        collector.collect(alert);
    }
}

这时候,如果产品来了个小需求,对于一个账户,如果出现小于 1 的交易后紧跟着一个大于 500 的交易,就可能疑似在洗钱,需要输出告警一下。

按照业务开发思维,我们需要保存一下上一笔的交易状态,当然,flink里面也自带一些能存储状态的State。

我们只要稍微改下之前的FraudDetector处理类

代码语言:javascript复制
package spendreport;

import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;
import org.apache.flink.walkthrough.common.entity.Alert;
import org.apache.flink.walkthrough.common.entity.Transaction;

/**
 * Skeleton code for implementing a fraud detector.
 */
public class FraudDetector extends KeyedProcessFunction<Long, Transaction, Alert> {

    private static final long serialVersionUID = 1L;
    
    /**
     * 定义小额交易的金额
     */
    private static final double SMALL_AMOUNT = 1.00;

    /**
     * 定义大额交易的金额
     */
    private static final double LARGE_AMOUNT = 500.00;

    /**
     * 定义一个存储布尔值的状态类
     */
    private transient ValueState<Boolean> flagState;

    @Override
    public void open(Configuration parameters) throws Exception {
        //重写类的open方法,初始化我们的状态类
        ValueStateDescriptor<Boolean> flagDescriptor = new ValueStateDescriptor<>(
                "flag",
                Types.BOOLEAN);
        flagState = getRuntimeContext().getState(flagDescriptor);
        super.open(parameters);
    }

    @Override
    public void processElement(
            Transaction transaction,
            Context context,
            Collector<Alert> collector) throws Exception {

        //先从状态类里获取上一笔交易是否小额交易
        Boolean lastTransactionWasSmall = flagState.value();

        //如果上一笔交易是小额交易,并且当前这笔是大额交易
        if (lastTransactionWasSmall != null) {
            if (transaction.getAmount() > LARGE_AMOUNT) {
                //告警输出
                Alert alert = new Alert();
                alert.setId(transaction.getAccountId());

                collector.collect(alert);
            }
            //清空State
            flagState.clear();
        }
        //如果当前笔是小额交易,设置State
        if (transaction.getAmount() < SMALL_AMOUNT) {
            flagState.update(true);
        }
    }
}

然后我们再看运行结果

完美,我们抓到了这个账户3的疑似欺诈交易的人。

是不是flink还是有点有趣的,而且是容易上手的,你们脑海里是不是也有一个flink到底是个啥的大致轮廓了。可能这个东西平时大家后端开发可能都接触不到,可能大数据也没大家想的那么高端,如果老哥们感兴趣的话,可以自己玩玩,我感觉是可以扩宽后端业务开发思维的,说不定能帮你解决一些你的疑难杂症需求。

感觉写的还行的话,各位老哥点个赞可好


本篇是是在官方教程文档来展开讲解

这一篇文章也算给我自己开了个新坑,后面应该还会有三四篇

参考资料

https://ci.apache.org/projects/flink/flink-docs-release-1.13/zh/

0 人点赞