Flink用户画像(二)推荐部分

2021-11-15 17:15:06 浏览数 (1)

接Flink用户画像

创建用户画像偏爱品牌标签

创建一个商品品牌标签类

代码语言:javascript复制
/**
 * Brand-preference label record passed between the brand map, reduce and sink stages.
 */
@Data
public class Brand {
    // Id of the user who viewed the product (BrandMap copies it from the scan event).
    private Long userId;
    // Id of the viewed product.
    private Long productId;
    // Brand name; BrandMap reads it from HBase column info:product_brand.
    private String brand;
    // Number of views represented by this record; the reduce functions sum it.
    private Long nums = 0L;
    // Composite string the streaming jobs use as the keyBy key.
    private String groupField;
    // Value returned by DateUntil.getCurrentHourStart; presumably the start of the hour — confirm.
    private Long timeInfo;
}

一个BrandMap实现MapFunction接口的转换类

代码语言:javascript复制
public class BrandMap implements MapFunction<String,Brand> {
    @Override
    public Brand map(String value) throws Exception {
        ScanOpertor scanOpertor = JSONObject.parseObject(value,ScanOpertor.class);
        Long userId = scanOpertor.getUserId();
        Long productId = scanOpertor.getProductId();
        String tablename = "product";
        String rowkey = productId   "";
        String famliyname = "info";
        String colum = "product_brand";
        //获取历史用户偏好品牌
        String brandString = HbaseUtil.getdata(tablename, rowkey, famliyname, colum);
        Brand brand = new Brand();
        brand.setBrand(brandString);
        Long timeInfo = DateUntil.getCurrentHourStart(System.currentTimeMillis());
        String groupField = "brand=="   timeInfo   "=="   userId
                  "=="   brandString;
        brand.setGroupField(groupField);
        brand.setTimeInfo(timeInfo);
        brand.setUserId(userId);
        brand.setProductId(productId);
        brand.setNums(1L);

        return brand;
    }
}

当我们新增一个商品

代码语言:javascript复制
-- Sample product row (id 1) used for the walkthrough; '大牛' is the brand value that is later read from HBase column info:product_brand.
INSERT INTO `product` VALUES (1, 1, '大牛洗发水', '洗发水', 23.2000000000, 23, '2021-11-01 19:13:44', '2021-11-01 19:13:51', '内蒙古包头市', '大牛', 10.0000000000, 'RF-666')

可以在HBase中查看到该商品的品牌

代码语言:javascript复制
scan 'product',{COLUMNS=>'info:product_brand'}
ROW                   COLUMN CELL                                               
 1                    column=info:product_brand, timestamp=1636809177766, value=
                      \xE5\xA4\xA7\xE7\x89\x9B

当用户浏览该商品时就会留下浏览痕迹。此处是为了存储用户每小时点击过的品牌和点击次数。

一个BrandReduce实现了ReduceFunction接口的统计类

代码语言:javascript复制
public class BrandReduce implements ReduceFunction<Brand> {
    @Override
    public Brand reduce(Brand value1, Brand value2) throws Exception {
        Long numbers1 = value1.getNums();
        String groupField = value1.getGroupField();
        Long userId = value1.getUserId();
        String brandString = value1.getBrand();
        Long timeInfo = value1.getTimeInfo();

        Long numbers2 = value2.getNums();

        Brand brand = new Brand();
        brand.setUserId(userId);
        brand.setBrand(brandString);
        brand.setGroupField(groupField);
        brand.setTimeInfo(timeInfo);
        brand.setNums(numbers1   numbers2);
        return brand;
    }
}

一个BrandSink实现了SinkFunction的存储类

代码语言:javascript复制
public class BrandSink implements SinkFunction<Brand> {
    private ClickUntil clickUntil = ClickUntilFactory.createClickUntil();

    @Override
    public void invoke(Brand value, Context context) throws Exception {
        if (value != null) {
            Long timeInfo = value.getTimeInfo();
            String brandString = value.getBrand();
            Long numbers = value.getNums();
            String tablename = "brand_info";
            Map<String, String> dataMap = new HashMap<>();
            dataMap.put("timeinfo",timeInfo   "");
            dataMap.put("brandlabel",brandString);
            dataMap.put("numbers",numbers   "");
            Set<String> fields = new HashSet<>();
            fields.add("timeinfo");
            fields.add("numbers");
            clickUntil.saveData(tablename,dataMap,fields);
        }
    }
}

一个UserBrandSaveMap实现了MapFunction接口的转换类

代码语言:javascript复制
public class UserBrandSaveMap implements MapFunction<Brand,Brand> {
    @Override
    public Brand map(Brand value) throws Exception {
        Long userId = value.getUserId();
        String brandString = value.getBrand();
        Long timeInfo = value.getTimeInfo();
        String tablename = "user_info";
        String rowkey = userId   "";
        String famliyname = "info";
        String colum = "brandlist";
        //获取用户偏爱的品牌历史数据
        String brandListString = HbaseUtil.getdata(tablename, rowkey, famliyname, colum);
        List<Map> temp = new ArrayList<>();
        List<Map<String,String>> result = new ArrayList<>();
        if (StringUtils.isNotBlank(brandListString)) {
            temp = JSONObject.parseArray(brandListString,Map.class);
        }
        for (Map map : temp) {
            String brandStr = map.get("key").toString();
            Long value1 = Long.parseLong(map.get("value").toString());
            //如果新的商品品牌与历史商品品牌相同,偏好值 1
            if (brandString.equals(brandStr)) {
                value1  ;
                map.put("value",value1   "");
            }
            result.add(map);
        }
        //对用户偏爱的品牌进行排序,取前5各品牌
        Collections.sort(result,(o1, o2) -> {
            Long value1 = Long.parseLong(o1.get("value"));
            Long value2 = Long.parseLong(o2.get("value"));
            return value2.compareTo(value1);
        });
        if (result.size() > 5) {
            result = result.subList(0,5);
        }
        String data = JSONObject.toJSONString(result);
        HbaseUtil.putdata(tablename,rowkey,famliyname,colum,data);
        Brand brand = new Brand();
        String groupField = "brandBy=="   timeInfo   "=="   brandString;
        brand.setGroupField(groupField);
        brand.setTimeInfo(timeInfo);
        brand.setBrand(brandString);
        brand.setNums(1L);

        return brand;
    }
}

此处是为了存储用户最为偏爱的前5名品牌的排名,用户每点击一次该品牌,就会使用户对该品牌的偏爱度+1,并重新排序存储。

一个UserBrandReduce实现了ReduceFunction接口的统计类

代码语言:javascript复制
public class UserBrandReduce implements ReduceFunction<Brand> {
    @Override
    public Brand reduce(Brand value1, Brand value2) throws Exception {
        Long numbers1 = value1.getNums();
        String groupField = value1.getGroupField();
        String brandString = value1.getBrand();
        Long timeInfo = value1.getTimeInfo();

        Long numbers2 = value2.getNums();

        Brand brand = new Brand();
        brand.setBrand(brandString);
        brand.setGroupField(groupField);
        brand.setTimeInfo(timeInfo);
        brand.setNums(numbers1   numbers2);
        return brand;
    }
}

一个UserBrandSink实现了SinkFunction接口的存储类

代码语言:javascript复制
public class UserBrandSink implements SinkFunction<Brand> {
    private ClickUntil clickUntil = ClickUntilFactory.createClickUntil();

    @Override
    public void invoke(Brand value, Context context) throws Exception {
        if (value != null) {
            Long timeInfo = value.getTimeInfo();
            String brandString = value.getBrand();
            Long numbers = value.getNums();
            String tablename = "user_brand_info";
            Map<String, String> dataMap = new HashMap<>();
            dataMap.put("timeinfo",timeInfo   "");
            dataMap.put("userbrandlabel",brandString);
            dataMap.put("numbers",numbers   "");
            Set<String> fields = new HashSet<>();
            fields.add("timeinfo");
            fields.add("numbers");
            clickUntil.saveData(tablename,dataMap,fields);
        }
    }
}

然后是Flink的流处理

代码语言:javascript复制
/**
 * Flink streaming job for the brand-preference label: reads scan events from Kafka topic
 * {@code scan}, counts views per (time bucket, user, brand) in one-hour windows, stores
 * those counts, then updates each user's preferred-brand list and stores per-brand counts.
 */
public class BrandAnaly {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers","127.0.0.1:9092");
        properties.setProperty("group.id","portrait");
        FlinkKafkaConsumer<String> myConsumer = new FlinkKafkaConsumer<>("scan",
                new SimpleStringSchema(),properties);
        DataStreamSource<String> data = env.addSource(myConsumer);
        env.enableCheckpointing(5000);
        DataStream<Brand> map = data.map(new BrandMap());
        // Use the keyed window (timeWindow). timeWindowAll would ignore the key and
        // reduce records of different users and brands into a single record.
        DataStream<Brand> reduce = map.keyBy(Brand::getGroupField)
                .timeWindow(Time.hours(1))
                .reduce(new BrandReduce());
        reduce.addSink(new BrandSink());

        DataStream<Brand> userMap = reduce.map(new UserBrandSaveMap());
        DataStream<Brand> userReduce = userMap.keyBy(Brand::getGroupField)
                .timeWindow(Time.hours(1))
                .reduce(new UserBrandReduce());
        userReduce.addSink(new UserBrandSink());

        env.execute("portrait brand type");
    }
}

推荐部分

热门商品统计

所谓热门商品就是用户购买数量最多的商品。

创建一个历史热门商品类

代码语言:javascript复制
/**
 * Label record for the historical hot-product statistic (count of orders per product).
 */
@Data
public class HistoryHotProduct {
    // Id of the ordered product.
    private Long productId;
    // Id of the product's type.
    private Long productTypeId;
    // Number of orders represented by this record; summed by the reducer.
    private Long numbers;
    // Composite string the streaming job uses as the keyBy key.
    private String groupField;
}

一个HistoryHotProductMap实现了FlatMapFunction接口的转换类

代码语言:javascript复制
public class HistoryHotProductMap implements FlatMapFunction<String,HistoryHotProduct> {
    @Override
    public void flatMap(String value, Collector<HistoryHotProduct> out) throws Exception {
        Order order = JSONObject.parseObject(value,Order.class);
        Integer payStatus = order.getPayStatus();
        //如果订单不是未支付状态
        if (payStatus > 0) {
            HistoryHotProduct historyHotProduct = new HistoryHotProduct();
            historyHotProduct.setProductId(order.getProductId());
            historyHotProduct.setProductTypeId(order.getProductTypeId());
            historyHotProduct.setNumbers(1L);
            String groupFiled = "HistoryHotProduct=="   order.getProductId()   "=="
                      order.getProductTypeId();
            historyHotProduct.setGroupField(groupFiled);
            out.collect(historyHotProduct);
        }
    }
}

一个HistoryHotProductReduce实现了ReduceFunction接口的统计类

代码语言:javascript复制
public class HistoryHotProductReduce implements ReduceFunction<HistoryHotProduct> {
    @Override
    public HistoryHotProduct reduce(HistoryHotProduct value1, HistoryHotProduct value2) throws Exception {
        Long numbers1 = value1.getNumbers();
        Long numbers2 = value2.getNumbers();
        value1.setNumbers(numbers1   numbers2);
        return value1;
    }
}

一个HistoryHotProductSink实现了SinkFunction接口的存储类

代码语言:javascript复制
public class HistoryHotProductSink implements SinkFunction<HistoryHotProduct> {
    private ClickUntil clickUntil = ClickUntilFactory.createClickUntil();

    @Override
    public void invoke(HistoryHotProduct value, Context context) throws Exception {
        if (value != null) {
            Map<String, String> data = new HashMap<>();
            Long productId = value.getProductId();
            Long productTypeId = value.getProductTypeId();
            Long numbers = value.getNumbers();
            data.put("productId", productId   "");
            data.put("productTypeId", productTypeId   "");
            data.put("numbers", numbers   "");
            String tablename = "history_hot_product";
            Set<String> fields = new HashSet<>();
            fields.add("productId");
            fields.add("productTypeId");
            fields.add("numbers");
            clickUntil.saveData(tablename,data,fields);
        }
    }
}

然后是Flink的流处理

代码语言:javascript复制
/**
 * Flink streaming job for the historical hot-product statistic: reads order events from
 * Kafka topic {@code order}, counts them per product in 5-hour windows and stores the result.
 */
public class HistoryHotProductAnaly {
    public static void main(String[] args) throws Exception {
        Properties kafkaProps = new Properties();
        kafkaProps.setProperty("bootstrap.servers","127.0.0.1:9092");
        kafkaProps.setProperty("group.id","portrait");

        FlinkKafkaConsumer<String> orderConsumer =
                new FlinkKafkaConsumer<>("order", new SimpleStringSchema(), kafkaProps);
        // Choose the start offset: begin at the latest records.
        orderConsumer.setStartFromLatest();

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.enableCheckpointing(5000);

        env.addSource(orderConsumer)
                .flatMap(new HistoryHotProductMap())
                .keyBy(HistoryHotProduct::getGroupField)
                .timeWindow(Time.hours(5))
                .reduce(new HistoryHotProductReduce())
                .addSink(new HistoryHotProductSink());

        env.execute("history hot product");
    }
}

历史评分统计

在数据库中新建评价表

代码语言:javascript复制
-- Product evaluation (review) table. Any existing `evaluate` table is dropped first.
DROP TABLE IF EXISTS `evaluate`;
CREATE TABLE `evaluate` (
  `id` bigint(20) NOT NULL,
  `user_id` bigint(20) DEFAULT NULL,
  `order_id` bigint(20) DEFAULT NULL,
  `product_id` bigint(20) DEFAULT NULL,
  `product_type_id` bigint(20) DEFAULT NULL,
  `evaluate_time` datetime DEFAULT NULL,
  -- Rating from 1 to 5, matching the `score` field of the Evaluate entity.
  `score` int(255) DEFAULT NULL,
  -- Free-text evaluation content.
  `content` varchar(500) DEFAULT NULL,
  PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;

对应实体类

代码语言:javascript复制
/**
 * Entity mirroring one row of the {@code evaluate} table (a user's review of an ordered product).
 */
@Data
public class Evaluate {
    private Long id;
    private Long userId;
    private Long orderId;
    private Long productId;
    private Long productTypeId;
    private Date evaluateTime;
    private Integer score; // 1-5: 1-2 negative, 3-4 neutral, 5 positive
    private String content; // evaluation text
}

在HBase中执行

代码语言:javascript复制
# Create the HBase table 'evaluate' with a single column family 'info'.
create 'evaluate','info'

在Kafka的bin目录下执行

代码语言:javascript复制
# Create the Kafka topic 'evaluate' (1 partition, replication factor 1) via the ZooKeeper at localhost:2181.
./kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic evaluate

一个评分标签实体类

代码语言:javascript复制
/**
 * Label record for the historical score statistic (count of evaluations per product).
 */
@Data
public class Score {
    // Id of the evaluated product.
    private Long productId;
    // Id of the product's type.
    private Long productTypeId;
    // Number of evaluations represented by this record; summed by the reducer.
    private Long numbers;
    // Composite string the streaming job uses as the keyBy key.
    private String groupField;
}

一个ScoreMap实现了FlatMapFunction接口的转换类

代码语言:javascript复制
public class ScoreMap implements FlatMapFunction<String,Score> {
    @Override
    public void flatMap(String value, Collector<Score> out) throws Exception {
        Evaluate evaluate = JSONObject.parseObject(value,Evaluate.class);
        Integer score = evaluate.getScore();
        if (score > 2) {
            Score scoreResult = new Score();
            Long productId = evaluate.getProductId();
            Long productTypeId = evaluate.getProductTypeId();
            scoreResult.setProductId(productId);
            scoreResult.setProductTypeId(productTypeId);
            scoreResult.setNumbers(1L);
            String groupField = "score=="   productId   "=="   productTypeId;
            scoreResult.setGroupField(groupField);
            out.collect(scoreResult);
        }
    }
}

一个ScoreReduce实现了ReduceFunction的统计类

代码语言:javascript复制
public class ScoreReduce implements ReduceFunction<Score> {
    @Override
    public Score reduce(Score value1, Score value2) throws Exception {
        Long numbers1 = value1.getNumbers();
        Long numbers2 = value2.getNumbers();
        value1.setNumbers(numbers1   numbers2);
        return value1;
    }
}

一个ScoreSink实现了SinkFunction接口的存储类

代码语言:javascript复制
public class ScoreSink implements SinkFunction<Score> {
    private ClickUntil clickUntil = ClickUntilFactory.createClickUntil();

    @Override
    public void invoke(Score value, Context context) throws Exception {
        if (value != null) {
            Map<String, String> data = new HashMap<>();
            Long productId = value.getProductId();
            Long productTypeId = value.getProductTypeId();
            Long numbers = value.getNumbers();
            data.put("productId", productId   "");
            data.put("productTypeId", productTypeId   "");
            data.put("numbers", numbers   "");
            String tablename = "score";
            Set<String> fields = new HashSet<>();
            fields.add("productId");
            fields.add("productTypeId");
            fields.add("numbers");
            clickUntil.saveData(tablename,data,fields);
        }
    }
}

然后是Flink的流处理

代码语言:javascript复制
/**
 * Flink streaming job for the historical score statistic: reads evaluation events from
 * Kafka topic {@code evaluate}, counts them per product in 5-hour windows and stores the result.
 */
public class ScoreAnaly {
    public static void main(String[] args) throws Exception {
        Properties kafkaProps = new Properties();
        kafkaProps.setProperty("bootstrap.servers","127.0.0.1:9092");
        kafkaProps.setProperty("group.id","portrait");

        FlinkKafkaConsumer<String> evaluateConsumer =
                new FlinkKafkaConsumer<>("evaluate", new SimpleStringSchema(), kafkaProps);

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.enableCheckpointing(5000);

        env.addSource(evaluateConsumer)
                .flatMap(new ScoreMap())
                .keyBy(Score::getGroupField)
                .timeWindow(Time.hours(5))
                .reduce(new ScoreReduce())
                .addSink(new ScoreSink());

        env.execute("portrait score");
    }
}

近期热门商品统计

近期热门指的是最近10天的热门商品

创建一个近期热门商品标签实体类

代码语言:javascript复制
/**
 * Label record for the recent hot-product statistic (count of orders per product and day).
 */
@Data
public class RecentHotProduct {
    // Id of the ordered product.
    private Long productId;
    // Id of the product's type.
    private Long productTypeId;
    // Number of orders represented by this record; summed by the reducer.
    private Long numbers;
    // Order creation date formatted as yyyyMMdd.
    private String dateTime;
    // Composite string the streaming job uses as the keyBy key.
    private String groupField;
}

DateUntil增加一个静态方法

代码语言:javascript复制
/**
 * Formats a date with the given {@link SimpleDateFormat} pattern.
 *
 * @param date          the date to format
 * @param dateFormatStr pattern such as {@code "yyyyMMdd"}
 * @return the formatted date string
 */
public static String transferDate(Date date,String dateFormatStr) {
    return new SimpleDateFormat(dateFormatStr).format(date);
}

一个RecentHotProductMap实现了FlatMapFunction接口的转换类

代码语言:javascript复制
public class RecentHotProductMap implements FlatMapFunction<String,RecentHotProduct> {
    @Override
    public void flatMap(String value, Collector<RecentHotProduct> out) throws Exception {
        Order order = JSONObject.parseObject(value,Order.class);
        Integer payStatus = order.getPayStatus();
        if (payStatus > 0) {
            RecentHotProduct recentHotProduct = new RecentHotProduct();
            Date date = order.getCreateTime();
            Long productId = order.getProductId();
            Long productTypeId = order.getProductTypeId();
            String dateString = DateUntil.transferDate(date,"yyyyMMdd");
            String groupField = "RecentHotProduct=="   productId   "=="
                      productTypeId   "=="   dateString;
            recentHotProduct.setDateTime(dateString);
            recentHotProduct.setGroupField(groupField);
            recentHotProduct.setProductId(productId);
            recentHotProduct.setProductTypeId(productTypeId);
            recentHotProduct.setNumbers(1L);
            out.collect(recentHotProduct);
        }
    }
}

一个RecentHotProductReduce实现了ReduceFunction接口的统计类

代码语言:javascript复制
public class RecentHotProductReduce implements ReduceFunction<RecentHotProduct> {
    @Override
    public RecentHotProduct reduce(RecentHotProduct value1, RecentHotProduct value2) throws Exception {
        Long numbers1 = value1.getNumbers();
        Long numbers2 = value2.getNumbers();
        value1.setNumbers(numbers1   numbers2);
        return value1;
    }
}

一个RecentHotProductSink实现了SinkFunction接口的存储类

代码语言:javascript复制
public class RecentHotProductSink implements SinkFunction<RecentHotProduct> {
    private ClickUntil clickUntil = ClickUntilFactory.createClickUntil();

    @Override
    public void invoke(RecentHotProduct value, Context context) throws Exception {
        if (value != null) {
            Map<String, String> data = new HashMap<>();
            Long productId = value.getProductId();
            Long productTypeId = value.getProductTypeId();
            Long numbers = value.getNumbers();
            String dateTime = value.getDateTime();
            data.put("productId", productId   "");
            data.put("productTypeId", productTypeId   "");
            data.put("numbers", numbers   "");
            data.put("dateTime",dateTime);
            String tablename = "recent_hot_product";
            Set<String> fields = new HashSet<>();
            fields.add("productId");
            fields.add("productTypeId");
            fields.add("numbers");
            clickUntil.saveData(tablename,data,fields);
        }
    }
}

然后是Flink的流处理

代码语言:javascript复制
/**
 * Flink streaming job for the recent hot-product statistic: reads order events from Kafka
 * topic {@code order}, counts them per product and day in 5-hour windows and stores the result.
 */
public class RecentHotProductAnaly {
    public static void main(String[] args) throws Exception {
        Properties kafkaProps = new Properties();
        kafkaProps.setProperty("bootstrap.servers","127.0.0.1:9092");
        kafkaProps.setProperty("group.id","portrait");

        FlinkKafkaConsumer<String> orderConsumer =
                new FlinkKafkaConsumer<>("order", new SimpleStringSchema(), kafkaProps);
        // Choose the start offset: begin at the latest records.
        orderConsumer.setStartFromLatest();

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.enableCheckpointing(5000);

        env.addSource(orderConsumer)
                .flatMap(new RecentHotProductMap())
                .keyBy(RecentHotProduct::getGroupField)
                .timeWindow(Time.hours(5))
                .reduce(new RecentHotProductReduce())
                .addSink(new RecentHotProductSink());

        env.execute("recent hot product");
    }
}

优质商品统计

创建一个优质商品标签实体类

代码语言:javascript复制
/**
 * Label record for the high-quality product statistic (evaluation count and score sum
 * per product and day).
 */
@Data
public class HighQualityProduct {
    // Id of the evaluated product.
    private Long productId;
    // Id of the product's type.
    private Long productTypeId;
    // Number of evaluations represented by this record; summed by the reducer.
    private Long numbers;
    // Sum of the evaluation scores represented by this record; summed by the reducer.
    private Integer scoreTotal;
    // Evaluation date formatted as yyyyMMdd.
    private String dateTime;
    // Composite string the streaming job uses as the keyBy key.
    private String groupField;
}

一个HighQualityProductMap实现了FlatMapFunction接口的转换类

代码语言:javascript复制
public class HighQualityProductMap implements FlatMapFunction<String,HighQualityProduct> {
    @Override
    public void flatMap(String value, Collector<HighQualityProduct> out) throws Exception {
        Evaluate evaluate = JSONObject.parseObject(value, Evaluate.class);
        Integer score = evaluate.getScore();
        Date date = evaluate.getEvaluateTime();
        String dateTime = DateUntil.transferDate(date,"yyyyMMdd");
        HighQualityProduct highQualityProduct = new HighQualityProduct();
        Long productId = evaluate.getProductId();
        Long productTypeId = evaluate.getProductTypeId();
        highQualityProduct.setProductId(productId);
        highQualityProduct.setProductTypeId(productTypeId);
        highQualityProduct.setNumbers(1L);
        highQualityProduct.setScoreTotal(score);
        highQualityProduct.setDateTime(dateTime);
        String groupField = "HighQualityProduct=="   productId   "=="   productTypeId
                  "=="   dateTime;
        highQualityProduct.setGroupField(groupField);
        out.collect(highQualityProduct);
    }
}

一个HighQualityProductReduce实现了ReduceFunction接口的统计类

代码语言:javascript复制
public class HighQualityProductReduce implements ReduceFunction<HighQualityProduct> {
    @Override
    public HighQualityProduct reduce(HighQualityProduct value1, HighQualityProduct value2) throws Exception {
        Long numbers1 = value1.getNumbers();
        Long numbers2 = value2.getNumbers();
        Integer score1 = value1.getScoreTotal();
        Integer score2 = value2.getScoreTotal();
        value1.setNumbers(numbers1   numbers2);
        value1.setScoreTotal(score1   score2);
        return value1;
    }
}

一个HighQualityProductSink实现了SinkFunction接口的存储类

代码语言:javascript复制
public class HighQualityProductSink implements SinkFunction<HighQualityProduct> {
    private ClickUntil clickUntil = ClickUntilFactory.createClickUntil();
    
    @Override
    public void invoke(HighQualityProduct value, Context context) throws Exception {
        if (value != null) {
            Map<String, String> data = new HashMap<>();
            Long productId = value.getProductId();
            Long productTypeId = value.getProductTypeId();
            Long numbers = value.getNumbers();
            Integer scoreTotal = value.getScoreTotal();
            String dateTime = value.getDateTime();
            data.put("productId", productId   "");
            data.put("productTypeId", productTypeId   "");
            data.put("numbers", numbers   "");
            data.put("scoreTotal",scoreTotal   "");
            data.put("dateTime",dateTime);
            String tablename = "recent_hot_product";
            Set<String> fields = new HashSet<>();
            fields.add("productId");
            fields.add("productTypeId");
            fields.add("numbers");
            fields.add("scoreTotal");
            clickUntil.saveData(tablename,data,fields);
        }
    }
}

然后是Flink的流处理

代码语言:javascript复制
/**
 * Flink streaming job for the high-quality product statistic: reads evaluation events from
 * Kafka topic {@code evaluate}, sums counts and scores per product and day in 5-hour windows
 * and stores the result.
 */
public class HighQualityProductAnaly {
    public static void main(String[] args) throws Exception {
        Properties kafkaProps = new Properties();
        kafkaProps.setProperty("bootstrap.servers","127.0.0.1:9092");
        kafkaProps.setProperty("group.id","portrait");

        FlinkKafkaConsumer<String> evaluateConsumer =
                new FlinkKafkaConsumer<>("evaluate", new SimpleStringSchema(), kafkaProps);
        // Choose the start offset: begin at the latest records.
        evaluateConsumer.setStartFromLatest();

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.enableCheckpointing(5000);

        env.addSource(evaluateConsumer)
                .flatMap(new HighQualityProductMap())
                .keyBy(HighQualityProduct::getGroupField)
                .timeWindow(Time.hours(5))
                .reduce(new HighQualityProductReduce())
                .addSink(new HighQualityProductSink());

        env.execute("high quality product");
    }
}

浏览次数统计

创建一个浏览次数标签实体类

代码语言:javascript复制
/**
 * Label record for the view-count statistic (count of scans per product and day).
 */
@Data
public class ScanTimes {
    // Id of the viewed product.
    private Long productId;
    // Id of the product's type.
    private Long productTypeId;
    // Number of views represented by this record; summed by the reducer.
    private Long numbers;
    // Scan date formatted as yyyyMMdd.
    private String dateTime;
    // Composite string the streaming job uses as the keyBy key.
    private String groupField;
}

DateUntil增加一个静态方法

代码语言:javascript复制
/**
 * Wraps an epoch-millisecond timestamp in a {@link Date}.
 *
 * @param visitTime timestamp in milliseconds since the epoch
 * @return a {@link Date} representing that instant
 */
public static Date getCurrentTime(Long visitTime) {
    final long epochMillis = visitTime;
    return new Date(epochMillis);
}

一个ScanTimesMap实现了FlatMapFunction接口的转换类

代码语言:javascript复制
public class ScanTimesMap implements FlatMapFunction<String,ScanTimes> {
    @Override
    public void flatMap(String value, Collector<ScanTimes> out) throws Exception {
        ScanOpertor scanOpertor = JSONObject.parseObject(value,ScanOpertor.class);
        ScanTimes scanTimes = new ScanTimes();
        Long productId = scanOpertor.getProductId();
        Long productTypeId = scanOpertor.getProductTypeId();
        Long time = scanOpertor.getScanTime();
        Date date = DateUntil.getCurrentTime(time);
        String dateTime = DateUntil.transferDate(date,"yyyyMMdd");
        scanTimes.setProductId(productId);
        scanTimes.setProductTypeId(productTypeId);
        scanTimes.setDateTime(dateTime);
        String groupField = "ScanTimes=="   productId   "=="   productTypeId
                  "=="   dateTime;
        scanTimes.setGroupField(groupField);
        scanTimes.setNumbers(1L);
        out.collect(scanTimes);
    }
}

一个ScanTimesReduce实现了ReduceFunction接口的统计类

代码语言:javascript复制
public class ScanTimesReduce implements ReduceFunction<ScanTimes> {
    @Override
    public ScanTimes reduce(ScanTimes value1, ScanTimes value2) throws Exception {
        Long numbers1 = value1.getNumbers();
        Long numbers2 = value2.getNumbers();
        value1.setNumbers(numbers1   numbers2);
        return value1;
    }
}

一个ScanTimesSink实现了SinkFunction接口的存储类

代码语言:javascript复制
public class ScanTimesSink implements SinkFunction<ScanTimes> {
    private ClickUntil clickUntil = ClickUntilFactory.createClickUntil();
    
    @Override
    public void invoke(ScanTimes value, Context context) throws Exception {
        if (value != null) {
            Map<String, String> data = new HashMap<>();
            Long productId = value.getProductId();
            Long productTypeId = value.getProductTypeId();
            Long numbers = value.getNumbers();
            String dateTime = value.getDateTime();
            data.put("productId", productId   "");
            data.put("productTypeId", productTypeId   "");
            data.put("numbers", numbers   "");
            data.put("dateTime",dateTime);
            String tablename = "scan_times";
            Set<String> fields = new HashSet<>();
            fields.add("productId");
            fields.add("productTypeId");
            fields.add("numbers");
            clickUntil.saveData(tablename,data,fields);
        }
    }
}

然后是Flink的流处理

代码语言:javascript复制
/**
 * Flink streaming job for the view-count statistic: reads scan events from Kafka topic
 * {@code scan}, counts them per product and day in 5-hour windows and stores the result.
 */
public class ScanTimesAnaly {
    public static void main(String[] args) throws Exception {
        Properties kafkaProps = new Properties();
        kafkaProps.setProperty("bootstrap.servers","127.0.0.1:9092");
        kafkaProps.setProperty("group.id","portrait");

        FlinkKafkaConsumer<String> scanConsumer =
                new FlinkKafkaConsumer<>("scan", new SimpleStringSchema(), kafkaProps);

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.enableCheckpointing(5000);

        env.addSource(scanConsumer)
                .flatMap(new ScanTimesMap())
                .keyBy(ScanTimes::getGroupField)
                .timeWindow(Time.hours(5))
                .reduce(new ScanTimesReduce())
                .addSink(new ScanTimesSink());

        env.execute("scan times");
    }
}

0 人点赞