数据源JSON格式数据
代码语言:javascript复制{
"deviceType":"iPhone 10",
"uid":"user_1",
"product":{
"name":"宝马",
"category":"车"
},
"os":"iOS",
"ip":"171.11.85.21",
"nu":1,
"channel":"华为商城",
"time":1735419335423,
"event":"browse",
"net":"WiFi",
"device":"4759947c-cd47-433c-ac8f-ae923a6d38b6",
"version":"V1.2.0"
}
统计分析
- 关键字
- 省份 :ip ==>省份,城市,运行商,经纬度…
- 解决方案: 1)请求商业接口 ,高德百度 2)开源版 github.com ipparse 3) 从缓存中找 ,请求
- https://www.free-api.com/doc/90 免费接口
- IP测试类: 导入maven依赖http请求
Apache HttpComponents是Apache软件基金会的开源项目,它提供了一系列的高性能,高可用性的Java组件,用于实现HTTP协议,包括客户端,服务器,代理,缓存,身份验证,Cookie管理和HTTP协议处理。它的目标是提供一个完整的,高性能的HTTP解决方案,而不需要任何外部依赖。HttpComponents是一个基于Java的客户端/服务器HTTP协议实现,它提供了一个完整的,高性能的HTTP解决方案,而不需要任何外部依赖。
<dependency>
<groupId>org.apache.httpcomponents</groupId>
<artifactId>httpclient</artifactId>
<version>4.5.6</version>
</dependency>
IP提取测试
代码语言:javascript复制public class IPRequest {
public static void main(String[] args) {
String ip="120.79.75.140";
String province="-";
String city="-";
String url="https://apis.juhe.cn/ip/ipNew?ip=" ip "&key=" Key.key;
CloseableHttpResponse response=null;
System.out.println(url);
CloseableHttpClient httpClient = HttpClients.createDefault();
try {
HttpGet httpGet = new HttpGet(url);
response = httpClient.execute(httpGet);
int code = response.getStatusLine().getStatusCode();
if(code==200){
HttpEntity entity = response.getEntity();
String result = EntityUtils.toString(entity, "UTF-8");
//{"resultcode":"200","reason":"查询成功","result":{"Country":"中国","Province":"广东省","City":"深圳市","Isp":"阿里云"},"error_code":0}
JSONObject jsonData = JSON.parseObject(result);
JSONObject data = jsonData.getJSONObject("result");
province=data.getString("Province");
city=data.getString("City");
System.out.println(province city);
}
}catch (Exception e){
e.printStackTrace();
}
}
}
结果:
广东省深圳市
- 总体代码 写入Redis复用上一篇文章的代码修改即可
public class ProvinceUserCntV1 {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
DataStreamSource<String> stream = environment.readTextFile("data/access.json");
environment.setParallelism(1); //设置并行度为1方便观察
SingleOutputStreamOperator<Access> filter = stream.map(new MapFunction<String, Access>() {
@Override
public Access map(String s) throws Exception {
// json 转 Access
try {
return JSON.parseObject(s, Access.class);
} catch (Exception e) {
e.printStackTrace();
return null;
}
}
//这里是只要不为空的数据 x != null等于把上面的空的数据过滤掉
}).filter(x -> x != null).filter(
new FilterFunction<Access>() {
@Override
public boolean filter(Access access) throws Exception {
//只过滤出来 event='startup'的数据
return "startup".equals(access.event);
}
});
//使用Rich额外定义一个类来实现map
SingleOutputStreamOperator<Access> result = filter.map(new IPMapFunction());
DataStreamSink<Tuple3<String, Integer, Integer>> user = result.map(new MapFunction<Access, Tuple3<String, Integer, Integer>>() {
@Override
public Tuple3<String, Integer, Integer> map(Access access) throws Exception {
return Tuple3.of(access.province, access.nu, 1);
}
}).keyBy(new KeySelector<Tuple3<String, Integer, Integer>, Tuple2<String, Integer>>() {
@Override
public Tuple2<String, Integer> getKey(Tuple3<String, Integer, Integer> value) throws Exception {
return Tuple2.of(value.f0, value.f1);
}
}).sum(2).print("按照省份维度统计新老用户");
environment.execute("OsUserCntAppV1");
}
}
- 关于flink异步IO的一篇文章 https://zhuanlan.zhihu.com/p/365232338
- 上面的操作写入是同步的 IO 这样会存在问题,出现问题的时候后面的数据就阻塞了**(流处理吞吐量问题)**
- 日志中是商品与相关信息是记录的ID,需要去连表查询数据库补全数据,考虑与外部数据连通的性能
- 需要支持异步的数据库 只需要按照Flink给的模板实现一个RichAsync就可以了
- 后面有单独的一篇文章完成Flink异步IO
思考:上面的代码还存在哪些问题?
- 工作中:很大程度都是各种维度的统计分析
- 离线数仓
- 实时数仓
- 较多的维度
- 操作系统 新老用户
- 新老用户
- 省份 新老用户
- 操作系统 省份 新老用户
- 运营商 省份 新老用户
- 运营商 新老用户
- ==> KeyBy(…).sum(index)
- 会遇到的统计问题
- 每N(小时/分钟)统计一次
- 每10分钟统计一次
- 从xxxx==>xxxx事件段内的各种维度(…) 统计