16:实时计算需求及技术方案
目标:了解实时计算需求及技术方案
路径
- step1:实时计算需求
- step2:技术方案
实施
实时计算需求
实时统计消息总量
代码语言:javascript复制select count(*) from tbname;
实时统计各个地区发送消息的总量
代码语言:javascript复制select sender_area,count(*) from tbname group by sender_area;
实时统计各个地区接收消息的总量
代码语言:javascript复制select receiver_area,count(*) from tbname group by receiver_area;
实时统计每个用户发送消息的总量
代码语言:javascript复制select sender_account,count(*) from tbname group by sender_account;
实时统计每个用户接收消息的总量
代码语言:javascript复制select receiver_account,count(*) from tbname group by receiver_account;
|
构建实时统计报表
技术方案
- 实时采集:Flume
- 实时存储:Kafka
- 实时计算:Flink
- 实时结果:MySQL / Redis
- 实时报表:FineBI / JavaWeb可视化
小结
- 了解实时计算需求及技术选型
17:Flink的基本介绍
目标:了解Flink的功能、特点及应用场景
路径
- step1:功能
- step2:特点
- step3:应用
实施
代码语言:javascript复制Apache Flink is a framework and distributed processing engine for stateful computations over unbounded and bounded data streams. Flink has been designed to run in all common cluster environments, perform computations at in-memory speed and at any scale.
- 功能:可以基于任何普通的集群平台,对有界的数据流或者无界的数据流实现高性能的有状态的分布式实时计算
- 特点
- 支持高吞吐、低延迟、高性能的流处理
- 支持带有事件时间的窗口(Window)操作
- 支持有状态计算的Exactly-once语义
- 支持高度灵活的窗口(Window)操作,支持基于time、count、session,以及data-driven的窗口操作
- 支持具有Backpressure功能的持续流模型
- 支持基于轻量级分布式快照(Snapshot)实现的容错
- 一个运行时同时支持Batch on Streaming处理和Streaming处理
- Flink在JVM内部实现了自己的内存管理
- 支持迭代计算
- 支持程序自动优化:避免特定情况下Shuffle、排序等昂贵操作,中间结果有必要进行缓存
- 应用:所有实时及离线数据计算场景
- 数据分析应用:实时数据仓库
- Batch analytics(批处理分析)
- Streaming analytics(流处理分析)
- 事件驱动类应用
- 欺诈检测(Fraud detection)
- 异常检测(Anomaly detection)
- 基于规则的告警(Rule-based alerting)
- 业务流程监控(Business process monitoring)
- Web应用程序(社交网络)
- 数据管道ETL
- Periodic ETL和Data Pipeline
- 公司
- 阿里:https://developer.aliyun.com/article/72242
- 腾讯:https://mp.weixin.qq.com/s/tyq6ZdwsgiuXYGi-VR_6KA
- 美团:https://tech.meituan.com/2021/08/26/data-warehouse-in-meituan-waimai.html
- 有赞:https://tech.youzan.com/flink-practice/
- Oppo、爱奇艺、唯品会……
- 数据分析应用:实时数据仓库
小结
- 了解Flink的功能、特点及应用场景
18:代码模块构建
目标:实现开发环境的代码模块构建
实施
导入包中代码到IDEA中
flink包:应用类包,用于存放实际的应用类
- MoMoFlinkCount:用于实现对每个需求的统计计算
- MySQLSink:用于将计算的结果写入MySQL
pojo包:实体类包,用于存放所有实体类
MoMoCountBean:用于封装统计分析的结果
代码语言:javascript复制private Integer id ; //结果id
private Long moMoTotalCount ; //总消息数
private String moMoProvince ; //省份
private String moMoUsername ; //用户
private Long moMo_MsgCount ; //消息数
//结果类型:1-总消息数 2-各省份发送消息数 3-各省份接受消息数 4-每个用户发送消息数 5-每个用户接受消息数
private String groupType ;
utils包:工具类包,用于存放所有工具类
HttpClientUtils:用于实现将经纬度地址解析为省份的工具类
代码语言:javascript复制public static String findByLatAndLng(String lat , String lng)
- 参数:经度,维度
- 返回值:省份
小结
- 实现开发环境的代码模块构建
19:省份解析工具类测试
目标:了解省份解析的实现
路径
- step1:基本设计
- step2:注册百度开发者
- step3:测试省份解析
实施
- 基本设计
- 业务场景:根据IP或者经纬度解析得到用户的国家、省份、城市信息
- 方案一:离线解析库【本地解析,快,不精准】
- 方案二:在线解析库【远程解析,并发限制,精准】
注册百度开发者
百度地图开放平台:https://lbsyun.baidu.com/
逆地理编码:https://lbsyun.baidu.com/index.php?title=webapi/guide/webservice-geocoding-abroad
代码语言:javascript复制 https://api.map.baidu.com/reverse_geocoding/v3/?ak=您的ak&output=json&coordtype=wgs84ll&location=31.225696563611,121.49884033194 //GET请求
注册开放平台,获取AK码:参考《附录三》
测试省份解析
- 注意:将代码中的AK码更改为自己的
package bigdata.itcast.cn.momo.online.utils;
import com.alibaba.fastjson.JSONObject;
import org.apache.http.HttpEntity;
import org.apache.http.client.methods.CloseableHttpResponse;
import org.apache.http.client.methods.HttpGet;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.HttpClients;
import org.apache.http.util.EntityUtils;
import javax.swing.text.html.parser.Entity;
import java.io.IOException;
import java.util.Map;
public class HttpClientUtils {
//传入经纬度, 返回查询的地区
public static String findByLatAndLng(String lat , String lng){
try {
CloseableHttpClient httpClient = HttpClients.createDefault();
String url = "http://api.map.baidu.com/reverse_geocoding/v3/?ak=l8hKKRCuX2zrRa93jneDrPmc2UspGatO&output=json&coordtype=wgs84ll&location=" lat "," lng;
System.out.println(url);
//请求解析
HttpGet httpGet = new HttpGet(url);
//得到结果
CloseableHttpResponse response = httpClient.execute(httpGet);
//获取数据
HttpEntity httpEntity = response.getEntity();
//转换成JSON
String json = EntityUtils.toString(httpEntity);
//从JSON中返回省份
Map<String,Object> result = JSONObject.parseObject(json, Map.class);
if(result.get("status").equals(0)){
Map<String,Object> resultMap = (Map<String,Object>)result.get("result");
resultMap = (Map<String, Object>) resultMap.get("addressComponent");
String province = (String) resultMap.get("province");
return province;
}
} catch (IOException e) {
e.printStackTrace();
}
return null;
}
public static void main(String[] args) {
//测试
String sf = findByLatAndLng("43.921297","124.655376");
System.out.println(sf);
}
}
小结
- 了解省份解析的实现
20:Flink代码解读
目标:了解Flink代码的基本实现
路径
- step1:消费Kafka
- step2:实时统计分析
- step3:实时更新结果到MySQL
实施
消费Kafka
代码语言:javascript复制//构建Kafka配置
Properties props = new Properties();
props.setProperty("bootstrap.servers", "node1:9092,node2:9092,node3:9092");
props.setProperty("group.id", "momo2");
//构建消费者
FlinkKafkaConsumer<String> kafkaConsumer = new FlinkKafkaConsumer<String>("MOMO_MSG", new SimpleStringSchema(),props);
//Flink加载消费者
DataStreamSource<String> streamSource = env.addSource(kafkaConsumer);
实时统计分析
代码语言:javascript复制//todo:3. 进行转换统计操作:
//3.1: 统计总消息量
countTotalMsg(streamSource);
//3.2: 基于经纬度, 统计各省份发送消息量
countProvinceSenderMsg(streamSource);
//3.3: 基于经纬度, 统计各省份接收消息量
countProvinceReceiverMsg(streamSource);
//3.4: 统计各个用户, 发送消息量
countUserNameSenderMsg(streamSource);
//3.5: 统计各个用户, 接收消息量
countUserNameReceiverMsg(streamSource);
//5. 执行flink操作
env.execute("momoFlinkCount");
实时更新结果到MySQL
代码语言:javascript复制streamOperator.addSink(new MysqlSink("2"));
代码语言:javascript复制if (status.equals("2")){
String sql = "select * from momo_count where momo_groupType = '2' and momo_province= '" value.getMoMoProvince() "' ";
ResultSet resultSet = stat.executeQuery(sql);
boolean flag = resultSet.next();
if(flag) {
sql = "update momo_count set momo_msgcount= '" value.getMoMo_MsgCount() "' where momo_groupType = '2' and momo_province= '" value.getMoMoProvince() "' ";
}else {
sql = "insert into momo_count( momo_province,momo_msgcount,momo_groupType) values ('" value.getMoMoProvince() "'," value.getMoMo_MsgCount() ",'2') ";
}
stat.executeUpdate(sql);
}
小结
- 了解Flink代码的基本实现
21:Flink实时计算测试
目标:实现Flink实时分析测试
路径
- step1:MySQL准备
- step2:运行测试
实施
MySQL准备
- 找到SQL文件
- 运行SQL文件创建结果数据库、表
运行测试
启动Flink程序:运行MoMoFlinkCount
启动Flume程序
代码语言:javascript复制cd /export/server/flume-1.9.0-bin
bin/flume-ng agent -c conf/ -n a1 -f usercase/momo_mem_kafka.properties -Dflume.root.logger=INFO,console
启动模拟数据
代码语言:javascript复制java -jar /export/data/momo_init/MoMo_DataGen.jar
/export/data/momo_init/MoMo_Data.xlsx
/export/data/momo_data/
100
观察MySQL结果
小结
- 实现Flink实时分析测试