【Flink实战】用户统计:按照省份维度统计新老用户

2023-09-14 08:11:28 浏览数 (1)

数据源JSON格式数据

代码语言:javascript复制
{
"deviceType":"iPhone 10",
"uid":"user_1",
"product":{
"name":"宝马",
"category":"车"
},
"os":"iOS",
"ip":"171.11.85.21",
"nu":1,
"channel":"华为商城",
"time":1735419335423,
"event":"browse",
"net":"WiFi",
"device":"4759947c-cd47-433c-ac8f-ae923a6d38b6",
"version":"V1.2.0"
}

统计分析

  • 关键字
    • 省份 :ip ==>省份,城市,运行商,经纬度…
    • 解决方案: 1)请求商业接口 ,高德百度 2)开源版 github.com ipparse 3) 从缓存中找 ,请求
    • https://www.free-api.com/doc/90 免费接口
  • IP测试类: 导入maven依赖http请求
代码语言:javascript复制
Apache HttpComponents是Apache软件基金会的开源项目,它提供了一系列的高性能,高可用性的Java组件,用于实现HTTP协议,包括客户端,服务器,代理,缓存,身份验证,Cookie管理和HTTP协议处理。它的目标是提供一个完整的,高性能的HTTP解决方案,而不需要任何外部依赖。HttpComponents是一个基于Java的客户端/服务器HTTP协议实现,它提供了一个完整的,高性能的HTTP解决方案,而不需要任何外部依赖。

        <dependency>
            <groupId>org.apache.httpcomponents</groupId>
            <artifactId>httpclient</artifactId>
            <version>4.5.6</version>
        </dependency>

IP提取测试

代码语言:javascript复制
public class IPRequest {
    public static void main(String[] args) {
        String ip="120.79.75.140";
        String province="-";
        String city="-";
        String url="https://apis.juhe.cn/ip/ipNew?ip=" ip "&key="  Key.key;
        CloseableHttpResponse response=null;
        System.out.println(url);
        CloseableHttpClient httpClient = HttpClients.createDefault();
        try {
            HttpGet httpGet = new HttpGet(url);
            response = httpClient.execute(httpGet);
            int code = response.getStatusLine().getStatusCode();
            if(code==200){
                HttpEntity entity = response.getEntity();
                String result = EntityUtils.toString(entity, "UTF-8");
                //{"resultcode":"200","reason":"查询成功","result":{"Country":"中国","Province":"广东省","City":"深圳市","Isp":"阿里云"},"error_code":0}
                JSONObject jsonData = JSON.parseObject(result);
                JSONObject data = jsonData.getJSONObject("result");
                province=data.getString("Province");
                city=data.getString("City");
                System.out.println(province city);
            }
        }catch (Exception e){
            e.printStackTrace();
        }

    }
}

结果:
广东省深圳市
  • 总体代码 写入Redis复用上一篇文章的代码修改即可
代码语言:javascript复制
public class ProvinceUserCntV1 {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStreamSource<String> stream = environment.readTextFile("data/access.json");
        environment.setParallelism(1); //设置并行度为1方便观察
        SingleOutputStreamOperator<Access> filter = stream.map(new MapFunction<String, Access>() {
            @Override
            public Access map(String s) throws Exception {
                // json 转 Access
                try {
                    return JSON.parseObject(s, Access.class);
                } catch (Exception e) {
                    e.printStackTrace();
                    return null;
                }
            }
            //这里是只要不为空的数据  x != null等于把上面的空的数据过滤掉
        }).filter(x -> x != null).filter(
                new FilterFunction<Access>() {
            @Override
            public boolean filter(Access access) throws Exception {
                //只过滤出来 event='startup'的数据
                return "startup".equals(access.event);
            }
        });
        //使用Rich额外定义一个类来实现map
        SingleOutputStreamOperator<Access> result = filter.map(new IPMapFunction());
        DataStreamSink<Tuple3<String, Integer, Integer>> user = result.map(new MapFunction<Access, Tuple3<String, Integer, Integer>>() {
            @Override
            public Tuple3<String, Integer, Integer> map(Access access) throws Exception {
                return Tuple3.of(access.province, access.nu, 1);
            }
        }).keyBy(new KeySelector<Tuple3<String, Integer, Integer>, Tuple2<String, Integer>>() {
            @Override
            public Tuple2<String, Integer> getKey(Tuple3<String, Integer, Integer> value) throws Exception {
                return Tuple2.of(value.f0, value.f1);
            }
        }).sum(2).print("按照省份维度统计新老用户");


        environment.execute("OsUserCntAppV1");
    }
}
  • 关于flink异步IO的一篇文章 https://zhuanlan.zhihu.com/p/365232338
  • 上面的操作写入是同步的 IO 这样会存在问题,出现问题的时候后面的数据就阻塞了**(流处理吞吐量问题)**
  • 日志中是商品与相关信息是记录的ID,需要去连表查询数据库补全数据,考虑与外部数据连通的性能
  • 需要支持异步的数据库 只需要按照Flink给的模板实现一个RichAsync就可以了
  • 后面有单独的一篇文章完成Flink异步IO
思考:上面的代码还存在哪些问题?
  • 工作中:很大程度都是各种维度的统计分析
    • 离线数仓
    • 实时数仓
  • 较多的维度
    • 操作系统 新老用户
    • 新老用户
    • 省份 新老用户
    • 操作系统 省份 新老用户
    • 运营商 省份 新老用户
    • 运营商 新老用户
    • ==> KeyBy(…).sum(index)
  • 会遇到的统计问题
    • 每N(小时/分钟)统计一次
    • 每10分钟统计一次
    • 从xxxx==>xxxx事件段内的各种维度(…) 统计

0 人点赞