大家好,泥腿子安尼特又和大家见面了。转眼一年又要过去了,我也跌跌撞撞的算是翻完了这本。
就像读书的时候周日晚上补作业一样,我也想在2020年再写一篇文章。前段时间倒腾了下配置中心,但是因为学艺不精,再加上连个CAP都不懂的人,我感觉写了可能也没卵用,不能提升姿势。那我整点啥,那就来简介下今年火到爆的Flink。
开篇
那Flink到底是个啥,来我们来看下它官网的介绍。
是不是和我第一眼看到的一样,不知所云,先不用管,主要这个东西前面带个Apache就很牛逼。(扯个题外话
,几年前,我刚入行PHP的时候,我清晰的记得有个面试题,web服务器,nginx与apache比,然后为啥nginx牛逼,那时候我记得就百度到的答案默念一遍,然后apache在我心中一直是个拉胯的存在= =)
那Flink又有多牛逼呢!我来上个图,最近股价猛跌的福报厂双11的时候用Flink进行实时计算是这样的
是不是很牛逼!
统计关键词
好的 废话不多说,我们基于官网的demo 开始进入Flink的旅程
我们先不管什么是流什么是批,对着代码就是干
代码语言:javascript复制package org.myorg.quickstart;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;
public class BatchJob {
public static void main(String[] args) throws Exception {
// set up the batch execution environment
final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
DataSet<String> text = env.readTextFile("/path/xxxx.log");
DataSet<Tuple2<String, Integer>> counts =
text.flatMap(new LogLevelFilter())
.groupBy(0)
.sum(1);
counts.print();
env.execute("Flink Batch Java API Skeleton");
}
// 自定义函数
public static class LogLevelFilter implements FlatMapFunction<String, Tuple2<String, Integer>> {
@Override
public void flatMap(String value, Collector<Tuple2<String, Integer>> out) {
// json decode每一行的log
JSONObject jsonObject = JSON.parseObject(value);
// 统计不同level的log
if (jsonObject.containsKey("level")) {
String level = jsonObject.getString("level");
out.collect(new Tuple2<>(level, 1));
}
}
}
}
那输出结果如下
这时候有小伙伴要问了,这就是大数据,实时流计算???
差不多一行linux命令可以搞定
这只是个demo,能统计关键词了 那我们再扩展一下,基于nginx的access.log 我们搞个实时统计网站qps
实时QPS统计
我们先开启nginx access log 顺便把每一行的log记录成json串 比如这样
代码语言:javascript复制{
"@timestamp":"2020-12-27T18:58:38 08:00",
"remote_addr":"127.0.0.1",
"referer":"-",
"request":"GET /wechat/config HTTP/1.1",
"status":200,
"bytes":64,
"user_agent":"Mozilla/5.0 (Macintosh; Intel Mac OS X 11_0_1) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/87.0.4280.88 Safari/537.36",
"http_x_forwarded":"-",
"request_time":"0.145"
}
然后 上代码 实时读取文件流 算qps
flink 读取文件流有两种模式 一种是直接一次性读完 一种是持续性检测,因为nginx access log是会不断增加的 所以我们选择第二种 来实时统计网站请求状态码的count
代码语言:javascript复制package org.myorg.quickstart;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.io.TextInputFormat;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.FileProcessingMode;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class StreamingJob {
public static final Logger logger = LoggerFactory.getLogger(StreamingJob.class);
public static void main(String[] args) throws Exception {
// set up the streaming execution environment
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
TextInputFormat textInputFormat = new TextInputFormat(null);
DataStream<String> nginxAccessLog = env.readFile(textInputFormat, "path/to/nginx_log/access.log", FileProcessingMode.PROCESS_CONTINUOUSLY, 100);
DataStream<StatusCount> nginxAccessLogStream = nginxAccessLog.flatMap((FlatMapFunction<String, StatusCount>) (s, collector) -> {
try {
JSONObject jsonObject = JSON.parseObject(s);
if (jsonObject.containsKey("status")) {
Integer status = jsonObject.getInteger("status");
if (status.compareTo(500) < 0) {
collector.collect(StatusCount.of(status "", 1L));
}
}
} catch (Exception e) {
logger.error("解析nginx access log 错误 {}", e.getMessage());
}
});
DataStream<StatusCount> windowsCounts = nginxAccessLogStream.
keyBy("status").
timeWindow(Time.seconds(1)).
sum("count");
windowsCounts.print().setParallelism(1);
env.execute("Flink Streaming Java API Skeleton");
}
public static class StatusCount {
public String status;
public Long count;
public static StatusCount of(String word, Long count) {
StatusCount statusCount = new StatusCount();
statusCount.status = word;
statusCount.count = count;
return statusCount;
}
@Override
public String toString() {
return "status:" status " count:" count;
}
}
}
效果就是这样的
这里就实时的打印出了每秒中nginx access log中状态小于500的所有status
这样岂不是完成了实时统计QPS 而且还可以按状态分组。
这时候又有小伙伴要问了
我实时cat nginx log也差不多也行啊!
假如你公司有50台api服务器 每台每天产生500G的日志 而且日志按小时或者文件名分割 你cat给我一个看看!
实际生产环境,现在主流都是ELK一套来管理log(我之前也大致介绍过),运维也不会直接把log往ES插,因为高峰期的时候 ES的写入速度并不快 可能会插崩它。所以,运维一般还是把log收集到kafka,然后消费kafka的方式插入ES,flink也可以消费kafka,只要把这里的文件流换成消费kakfa就可以做到算出API整体的QPS了。
如果你看到了这里,实操之后,我们再回过头来解释下刚才的代码,再了解下flink是个啥。因为如果开篇就大肆介绍名词 简介,我感觉你们也不会看,因为感觉跟自己没啥关系。
我们看上述两个例子的代码,都是先读取一个文件流,然后用自定义的类来解析每行文本,然后第一个例子group就像你们sql中groupby 因为我把每行文本的level提取出来了,然后还有个计数,所以有个Tuple2。
第二个例子稍微麻烦点,可能也难以理解点,因为用到了时间窗口。就是我把每秒读取文本里的内容当做一个独立的时间窗口,这样每秒access log里各种status都打印出来了。而且他是可以一直在不断运行并且一直打印下去的。
那我还是不明白flink牛逼在哪啊!我再来介绍一个概念,是什么是有界流,什么是无界流
假如李老某年某月开了个网站,
那么网站的数据的开始时间就是他第一次网站发布的时候。现在这个网站也还开着,每天都不断有人陆陆续续的访问,数据一致在积累,假如50年后李老嗝屁了,但是小李还继续维护着这个网站,100年后,小李也嗝屁了,小小李说不定还继续维护着这个网站。所以你不知道这个数据的边界在哪,数据从现在到未来一直会源源不断的流进来,这就是无界的数据流。就像我上面两个demo,第一个我一次性读了这个文本,那么数据是有界限的,第二个例子,因为我nginx access log就可以类比李老的网站,没有界限,所以可以叫它无界流。而flink就是非常方便能处理这些无界流的数据。
我们再来看官网那句话 ——
Stateful Computations Over Streams
在流上进行有状态的计算,是不是有点觉得牛逼了呢。当然我只是单机随便演示下demo。flink可以稳定的运行在大数据成熟的yarn集群上,一个flink job可以消费多个流 而且可以保存多个状态。flink集成了消费kafka、rabbit MQ 等等之类的数据源,所以用起来也很方便。比如你可以消费kafka里的上报数据,kafka里的binlog数据,来实时计算比如一分钟的订单数啊,一分钟内的GVM啊等等之类。至于其它一些高端的概念,比如什么滑动窗口、滚动窗口、什么水印、什么反压机制,我也不懂。
本文主要基于一些简单的demo简介了flink 里面很多概念跟代码都没解释清楚,主要是不想让大家入门的时候接收太多的名词概念
本泥腿子在线上刚发布过一个flink job 目前也有很多不懂的,所以可能表述的不是那么好 大家见谅
如果大家有兴趣学这个玩意,真的学好了真的高薪,现在这东西火的一逼
如果有兴趣的话 可以对着官网看几眼,官网介绍的挺全的。
https://flink.apache.org/zh/flink-architecture.html