大家好啊,老李最近高产如母猪,我也来凑个热闹。说起来挺魔幻的,去年这时候,我还是一个连java curd都不会的菜鸡,今天却在这里大谈大数据开发- -。我也没想到,等以后有机会可以讲讲写java的心路历程,目前还是一个java菜鸟,也因为目前的公司部门里没有足够的数据开发,我自己硬着头皮写了几个Flink应用,没想到这东西上手还是挺简单的,所以就很想分享给大家。
都2021年了,我们看看现在的大数据开发什么东西火,毫无疑问,Flink这个新兴之子,占了很大一块。随便一搜某招聘网站
是不是心动了
。
简介&&准备
回过头来,
我们看看Flink到底是个啥,其实我之前有一篇文章讲过 这次来整个高端的API实时QPS流计算 。还是引用官网的那句话——Apache Flink is a framework and distributed processing engine for stateful computations over unbounded and bounded data streams. 简而言之,就是一个可以对流进行有状态的计算的这么个东西。之前的那篇文章可能对没接触过的人来说比较不友好,所以我想开个系列来让各位也可以“精通”Flink。
准备工作
- 开发工具,IntelliJ IDEA
- 安装java java的安装比php还简单,下载好jdk,设置好JAVA_HOME就OK,目前主流还是jdk8。可以参考 https://juejin.cn/post/6844903895504797710
- 安装maven maven的话,也一样,下载下来直接配个环境变量就行。可以参考 https://cloud.tencent.com/developer/article/1680711
直接开干
我们先来安装运行Flink job的集群(反正这些运维会做好的,Flink还可以运行在yarn集群上,还有Flink on k8s,当然,它也可以单独以jar包的形式运行)
代码语言:javascript复制wget https://mirrors.tuna.tsinghua.edu.cn/apache/flink/flink-1.13.0/flink-1.13.0-bin-scala_2.11.tgz
代码语言:javascript复制tar -xzf flink-1.13.0-bin-scala_2.11.tgz
代码语言:javascript复制cd flink-1.13.0
./bin/start-cluster.sh
这些都没什么坑,装好了之后是这样的。
然后执行
代码语言:javascript复制./bin/flink run examples/streaming/WordCount.jar
然后我们打开 http://127.0.0.1:8081 可以看到
这上面就是控制台了,我们提交的Flink任务都能在这上面看到。刚才执行的WordCount.jar就是我们刚才运行的那个job。
能运行到这一步说明你已经成功了百分之99了,马上你就要“精通”了
“手动编写”个Flink任务
我们在一个目录下执行如下命令
代码语言:javascript复制 mvn archetype:generate
-DarchetypeGroupId=org.apache.flink
-DarchetypeArtifactId=flink-walkthrough-datastream-java
-DarchetypeVersion=1.13.0
-DgroupId=frauddetection
-DartifactId=frauddetection
-Dversion=0.1
-Dpackage=spendreport
-DinteractiveMode=false
然后我们就可以看到一个maven生成的项目叫frauddetection
这里我们需要把pom.xml的这两行删掉。为什么呢,这里就涉及到了maven打包的知识点了,如果有这个provided的,那么项目依赖的包就不会被打包到jar包里。具体可以参考 https://segmentfault.com/a/1190000038594247
为什么这里官方的demo这么设计呢,因为我们刚才装好了可以运行Flink任务的集群,集群里面已经有这些了,所以我们打成的jar包可以不用这些,这样可以大大减少jar包的大小,因为我们要在idea里面运行(不去掉idea里打出来的jar会找不到该class,会报错运行不起来),所以我这里还是建议大家可以去掉。
然后我们再用idea自带的maven插件执行clean跟install,就可以跑了。
这个demo任务的输出是这样的。
当你完成这一步的时候,Congratulations,You are Big Data开发工程师!!!你已经编写了一个反欺诈实时流计算应用了,是不是有点成就感。
代码解析
因为有些观众只喜欢听我扯淡,应该很少有人去手动实践,我这里就把这个frauddetection项目的代码贴下,顺便把代码的含义也用注释的方式写在下面,方便大家阅读
主要两个类 一个FraudDetectionJob
代码语言:javascript复制package spendreport;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.walkthrough.common.sink.AlertSink;
import org.apache.flink.walkthrough.common.entity.Alert;
import org.apache.flink.walkthrough.common.entity.Transaction;
import org.apache.flink.walkthrough.common.source.TransactionSource;
public class FraudDetectionJob {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
/*
所有的flink任务都要有一个数据源
这里的TransactionSource是flink包里自带的,
里面无限循环生成信用卡模拟交易数据
每条交易数据包括了信用卡 ID (accountId),
交易发生的时间 (timestamp) 以及交易的金额(amount)
name就是给这个环节起个名字的意思,不参与实际业务,下面也一样。
*/
DataStream<Transaction> transactions = env
.addSource(new TransactionSource())
.name("transactions");
/*
这里的keyBy我们以后可以展开来学习,
先简单的理解为就是把数据流按accountId相同的一个个分开去处理了
process就是真正的处理类,
就是一个自定义叫做FraudDetector的类,下面会讲到这个类里做了啥
*/
DataStream<Alert> alerts = transactions
.keyBy(Transaction::getAccountId)
.process(new FraudDetector())
.name("fraud-detector");
/*
这个sink可以暂时也不同管他是干嘛的,
我们只要明白这里就是把数据处理结果通过终端输出就OK了
*/
alerts
.addSink(new AlertSink())
.name("send-alerts");
env.execute("Fraud Detection");
}
}
另一个类FraudDetector
代码语言:javascript复制package spendreport;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;
import org.apache.flink.walkthrough.common.entity.Alert;
import org.apache.flink.walkthrough.common.entity.Transaction;
public class FraudDetector extends KeyedProcessFunction<Long, Transaction, Alert> {
private static final long serialVersionUID = 1L;
private static final double SMALL_AMOUNT = 1.00;
private static final double LARGE_AMOUNT = 500.00;
private static final long ONE_MINUTE = 60 * 1000;
@Override
public void processElement(
Transaction transaction,
Context context,
Collector<Alert> collector) throws Exception {
/*
这里实例化了一个Alert类,
把前面数据流里的accountId设置到了Alert类里的Id
然后通过一个Collector类来收集并输出
*/
Alert alert = new Alert();
alert.setId(transaction.getAccountId());
collector.collect(alert);
}
}
这时候,如果产品来了个小需求,对于一个账户,如果出现小于 1 的交易后紧跟着一个大于 500 的交易,就可能疑似在洗钱,需要输出告警一下。
按照业务开发思维,我们需要保存一下上一笔的交易状态,当然,flink里面也自带一些能存储状态的State。
我们只要稍微改下之前的FraudDetector处理类
代码语言:javascript复制package spendreport;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;
import org.apache.flink.walkthrough.common.entity.Alert;
import org.apache.flink.walkthrough.common.entity.Transaction;
/**
* Skeleton code for implementing a fraud detector.
*/
public class FraudDetector extends KeyedProcessFunction<Long, Transaction, Alert> {
private static final long serialVersionUID = 1L;
/**
* 定义小额交易的金额
*/
private static final double SMALL_AMOUNT = 1.00;
/**
* 定义大额交易的金额
*/
private static final double LARGE_AMOUNT = 500.00;
/**
* 定义一个存储布尔值的状态类
*/
private transient ValueState<Boolean> flagState;
@Override
public void open(Configuration parameters) throws Exception {
//重写类的open方法,初始化我们的状态类
ValueStateDescriptor<Boolean> flagDescriptor = new ValueStateDescriptor<>(
"flag",
Types.BOOLEAN);
flagState = getRuntimeContext().getState(flagDescriptor);
super.open(parameters);
}
@Override
public void processElement(
Transaction transaction,
Context context,
Collector<Alert> collector) throws Exception {
//先从状态类里获取上一笔交易是否小额交易
Boolean lastTransactionWasSmall = flagState.value();
//如果上一笔交易是小额交易,并且当前这笔是大额交易
if (lastTransactionWasSmall != null) {
if (transaction.getAmount() > LARGE_AMOUNT) {
//告警输出
Alert alert = new Alert();
alert.setId(transaction.getAccountId());
collector.collect(alert);
}
//清空State
flagState.clear();
}
//如果当前笔是小额交易,设置State
if (transaction.getAmount() < SMALL_AMOUNT) {
flagState.update(true);
}
}
}
然后我们再看运行结果
完美,我们抓到了这个账户3的疑似欺诈交易的人。
是不是flink还是有点有趣的,而且是容易上手的,你们脑海里是不是也有一个flink到底是个啥的大致轮廓了。可能这个东西平时大家后端开发可能都接触不到,可能大数据也没大家想的那么高端,如果老哥们感兴趣的话,可以自己玩玩,我感觉是可以扩宽后端业务开发思维的,说不定能帮你解决一些你的疑难杂症需求。
感觉写的还行的话,各位老哥点个赞可好
本篇是是在官方教程文档来展开讲解
这一篇文章也算给我自己开了个新坑,后面应该还会有三四篇
参考资料
https://ci.apache.org/projects/flink/flink-docs-release-1.13/zh/