接Flink用户画像
创建用户画像偏爱品牌标签
创建一个商品品牌标签类
代码语言:javascript复制@Data
public class Brand {
private Long userId;
private Long productId;
private String brand;
private Long nums = 0L;
private String groupField;
private Long timeInfo;
}
一个BrandMap实现MapFunction接口的转换类
代码语言:javascript复制public class BrandMap implements MapFunction<String,Brand> {
@Override
public Brand map(String value) throws Exception {
ScanOpertor scanOpertor = JSONObject.parseObject(value,ScanOpertor.class);
Long userId = scanOpertor.getUserId();
Long productId = scanOpertor.getProductId();
String tablename = "product";
String rowkey = productId "";
String famliyname = "info";
String colum = "product_brand";
//获取历史用户偏好品牌
String brandString = HbaseUtil.getdata(tablename, rowkey, famliyname, colum);
Brand brand = new Brand();
brand.setBrand(brandString);
Long timeInfo = DateUntil.getCurrentHourStart(System.currentTimeMillis());
String groupField = "brand==" timeInfo "==" userId
"==" brandString;
brand.setGroupField(groupField);
brand.setTimeInfo(timeInfo);
brand.setUserId(userId);
brand.setProductId(productId);
brand.setNums(1L);
return brand;
}
}
当我们新增一个商品
-- Sample product row; its brand column ('大牛') is what later appears in HBase as info:product_brand.
INSERT INTO `product` VALUES (1, 1, '大牛洗发水', '洗发水', 23.2000000000, 23, '2021-11-01 19:13:44', '2021-11-01 19:13:51', '内蒙古包头市', '大牛', 10.0000000000, 'RF-666');
可以在HBase中查看到该商品的品牌
# HBase shell: list the info:product_brand column of every row in the 'product' table.
scan 'product',{COLUMNS=>'info:product_brand'}
ROW COLUMN CELL
1 column=info:product_brand, timestamp=1636809177766, value=\xE5\xA4\xA7\xE7\x89\x9B
当用户浏览该商品时就会留下浏览痕迹。此处是为了存储用户每小时点击过的品牌和点击次数。
一个BrandReduce实现了ReduceFunction接口的统计类
代码语言:javascript复制public class BrandReduce implements ReduceFunction<Brand> {
@Override
public Brand reduce(Brand value1, Brand value2) throws Exception {
Long numbers1 = value1.getNums();
String groupField = value1.getGroupField();
Long userId = value1.getUserId();
String brandString = value1.getBrand();
Long timeInfo = value1.getTimeInfo();
Long numbers2 = value2.getNums();
Brand brand = new Brand();
brand.setUserId(userId);
brand.setBrand(brandString);
brand.setGroupField(groupField);
brand.setTimeInfo(timeInfo);
brand.setNums(numbers1 numbers2);
return brand;
}
}
一个BrandSink实现了SinkFunction的存储类
代码语言:javascript复制public class BrandSink implements SinkFunction<Brand> {
private ClickUntil clickUntil = ClickUntilFactory.createClickUntil();
@Override
public void invoke(Brand value, Context context) throws Exception {
if (value != null) {
Long timeInfo = value.getTimeInfo();
String brandString = value.getBrand();
Long numbers = value.getNums();
String tablename = "brand_info";
Map<String, String> dataMap = new HashMap<>();
dataMap.put("timeinfo",timeInfo "");
dataMap.put("brandlabel",brandString);
dataMap.put("numbers",numbers "");
Set<String> fields = new HashSet<>();
fields.add("timeinfo");
fields.add("numbers");
clickUntil.saveData(tablename,dataMap,fields);
}
}
}
一个UserBrandSaveMap实现了MapFunction接口的转换类
代码语言:javascript复制public class UserBrandSaveMap implements MapFunction<Brand,Brand> {
@Override
public Brand map(Brand value) throws Exception {
Long userId = value.getUserId();
String brandString = value.getBrand();
Long timeInfo = value.getTimeInfo();
String tablename = "user_info";
String rowkey = userId "";
String famliyname = "info";
String colum = "brandlist";
//获取用户偏爱的品牌历史数据
String brandListString = HbaseUtil.getdata(tablename, rowkey, famliyname, colum);
List<Map> temp = new ArrayList<>();
List<Map<String,String>> result = new ArrayList<>();
if (StringUtils.isNotBlank(brandListString)) {
temp = JSONObject.parseArray(brandListString,Map.class);
}
for (Map map : temp) {
String brandStr = map.get("key").toString();
Long value1 = Long.parseLong(map.get("value").toString());
//如果新的商品品牌与历史商品品牌相同,偏好值 1
if (brandString.equals(brandStr)) {
value1 ;
map.put("value",value1 "");
}
result.add(map);
}
//对用户偏爱的品牌进行排序,取前5各品牌
Collections.sort(result,(o1, o2) -> {
Long value1 = Long.parseLong(o1.get("value"));
Long value2 = Long.parseLong(o2.get("value"));
return value2.compareTo(value1);
});
if (result.size() > 5) {
result = result.subList(0,5);
}
String data = JSONObject.toJSONString(result);
HbaseUtil.putdata(tablename,rowkey,famliyname,colum,data);
Brand brand = new Brand();
String groupField = "brandBy==" timeInfo "==" brandString;
brand.setGroupField(groupField);
brand.setTimeInfo(timeInfo);
brand.setBrand(brandString);
brand.setNums(1L);
return brand;
}
}
此处是为了存储用户最偏爱的前5名品牌的排名,用户每点击一次该品牌,就会使用户对该品牌的偏爱度+1,并重新排序存储。
一个UserBrandReduce实现了ReduceFunction接口的统计类
代码语言:javascript复制public class UserBrandReduce implements ReduceFunction<Brand> {
@Override
public Brand reduce(Brand value1, Brand value2) throws Exception {
Long numbers1 = value1.getNums();
String groupField = value1.getGroupField();
String brandString = value1.getBrand();
Long timeInfo = value1.getTimeInfo();
Long numbers2 = value2.getNums();
Brand brand = new Brand();
brand.setBrand(brandString);
brand.setGroupField(groupField);
brand.setTimeInfo(timeInfo);
brand.setNums(numbers1 numbers2);
return brand;
}
}
一个UserBrandSink实现了SinkFunction接口的存储类
代码语言:javascript复制public class UserBrandSink implements SinkFunction<Brand> {
private ClickUntil clickUntil = ClickUntilFactory.createClickUntil();
@Override
public void invoke(Brand value, Context context) throws Exception {
if (value != null) {
Long timeInfo = value.getTimeInfo();
String brandString = value.getBrand();
Long numbers = value.getNums();
String tablename = "user_brand_info";
Map<String, String> dataMap = new HashMap<>();
dataMap.put("timeinfo",timeInfo "");
dataMap.put("userbrandlabel",brandString);
dataMap.put("numbers",numbers "");
Set<String> fields = new HashSet<>();
fields.add("timeinfo");
fields.add("numbers");
clickUntil.saveData(tablename,dataMap,fields);
}
}
}
然后是Flink的流处理
代码语言:javascript复制public class BrandAnaly {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
Properties properties = new Properties();
properties.setProperty("bootstrap.servers","127.0.0.1:9092");
properties.setProperty("group.id","portrait");
FlinkKafkaConsumer<String> myConsumer = new FlinkKafkaConsumer<>("scan",
new SimpleStringSchema(),properties);
DataStreamSource<String> data = env.addSource(myConsumer);
env.enableCheckpointing(5000);
DataStream<Brand> map = data.map(new BrandMap());
DataStream<Brand> reduce = map.keyBy(Brand::getGroupField)
.timeWindowAll(Time.hours(1))
.reduce(new BrandReduce());
reduce.addSink(new BrandSink());
DataStream<Brand> userMap = reduce.map(new UserBrandSaveMap());
DataStream<Brand> userReduce = userMap.keyBy(Brand::getGroupField).timeWindowAll(Time.hours(1))
.reduce(new UserBrandReduce());
userReduce.addSink(new UserBrandSink());
env.execute("portrait brand type");
}
}
推荐部分
热门商品统计
所谓热门商品就是用户购买数量最多的商品。
创建一个历史热门商品类
代码语言:javascript复制@Data
public class HistoryHotProduct {
private Long productId;
private Long productTypeId;
private Long numbers;
private String groupField;
}
一个HistoryHotProductMap实现了FlatMapFunction接口的转换类
代码语言:javascript复制public class HistoryHotProductMap implements FlatMapFunction<String,HistoryHotProduct> {
@Override
public void flatMap(String value, Collector<HistoryHotProduct> out) throws Exception {
Order order = JSONObject.parseObject(value,Order.class);
Integer payStatus = order.getPayStatus();
//如果订单不是未支付状态
if (payStatus > 0) {
HistoryHotProduct historyHotProduct = new HistoryHotProduct();
historyHotProduct.setProductId(order.getProductId());
historyHotProduct.setProductTypeId(order.getProductTypeId());
historyHotProduct.setNumbers(1L);
String groupFiled = "HistoryHotProduct==" order.getProductId() "=="
order.getProductTypeId();
historyHotProduct.setGroupField(groupFiled);
out.collect(historyHotProduct);
}
}
}
一个HistoryHotProductReduce实现了ReduceFunction接口的统计类
代码语言:javascript复制public class HistoryHotProductReduce implements ReduceFunction<HistoryHotProduct> {
@Override
public HistoryHotProduct reduce(HistoryHotProduct value1, HistoryHotProduct value2) throws Exception {
Long numbers1 = value1.getNumbers();
Long numbers2 = value2.getNumbers();
value1.setNumbers(numbers1 numbers2);
return value1;
}
}
一个HistoryHotProductSink实现了SinkFunction接口的存储类
代码语言:javascript复制public class HistoryHotProductSink implements SinkFunction<HistoryHotProduct> {
private ClickUntil clickUntil = ClickUntilFactory.createClickUntil();
@Override
public void invoke(HistoryHotProduct value, Context context) throws Exception {
if (value != null) {
Map<String, String> data = new HashMap<>();
Long productId = value.getProductId();
Long productTypeId = value.getProductTypeId();
Long numbers = value.getNumbers();
data.put("productId", productId "");
data.put("productTypeId", productTypeId "");
data.put("numbers", numbers "");
String tablename = "history_hot_product";
Set<String> fields = new HashSet<>();
fields.add("productId");
fields.add("productTypeId");
fields.add("numbers");
clickUntil.saveData(tablename,data,fields);
}
}
}
然后是Flink的流处理
代码语言:javascript复制public class HistoryHotProductAnaly {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
Properties properties = new Properties();
properties.setProperty("bootstrap.servers","127.0.0.1:9092");
properties.setProperty("group.id","portrait");
FlinkKafkaConsumer<String> myConsumer = new FlinkKafkaConsumer<>("order",
new SimpleStringSchema(),properties);
//指定偏移量
myConsumer.setStartFromLatest();
DataStreamSource<String> data = env.addSource(myConsumer);
env.enableCheckpointing(5000);
DataStream<HistoryHotProduct> map = data.flatMap(new HistoryHotProductMap());
DataStream<HistoryHotProduct> reduce = map.keyBy(HistoryHotProduct::getGroupField)
.timeWindow(Time.hours(5))
.reduce(new HistoryHotProductReduce());
reduce.addSink(new HistoryHotProductSink());
env.execute("history hot product");
}
}
历史评分统计
在数据库中新建评价表
-- Product review table; one row per review of an ordered product.
DROP TABLE IF EXISTS `evaluate`;
CREATE TABLE `evaluate` (
  `id` bigint(20) NOT NULL,
  `user_id` bigint(20) DEFAULT NULL,
  `order_id` bigint(20) DEFAULT NULL,
  `product_id` bigint(20) DEFAULT NULL,
  `product_type_id` bigint(20) DEFAULT NULL,
  `evaluate_time` datetime DEFAULT NULL,
  `score` int(255) DEFAULT NULL,
  `content` varchar(500) DEFAULT NULL,
  PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;
对应实体类
代码语言:javascript复制@Data
public class Evaluate {
private Long id;
private Long userId;
private Long orderId;
private Long productId;
private Long productTypeId;
private Date evaluateTime;
private Integer score; //1-5 1-2差评 3-4中评 5好评
private String content; //评价内容
}
在HBase中执行
# HBase shell: create table 'evaluate' with a single column family 'info'.
create 'evaluate','info'
在Kafka的bin目录下执行
# Create the 'evaluate' topic (1 partition, replication factor 1) via the local ZooKeeper.
./kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic evaluate
一个评分标签实体类
代码语言:javascript复制@Data
public class Score {
private Long productId;
private Long productTypeId;
private Long numbers;
private String groupField;
}
一个ScoreMap实现了FlatMapFunction接口的转换类
代码语言:javascript复制public class ScoreMap implements FlatMapFunction<String,Score> {
@Override
public void flatMap(String value, Collector<Score> out) throws Exception {
Evaluate evaluate = JSONObject.parseObject(value,Evaluate.class);
Integer score = evaluate.getScore();
if (score > 2) {
Score scoreResult = new Score();
Long productId = evaluate.getProductId();
Long productTypeId = evaluate.getProductTypeId();
scoreResult.setProductId(productId);
scoreResult.setProductTypeId(productTypeId);
scoreResult.setNumbers(1L);
String groupField = "score==" productId "==" productTypeId;
scoreResult.setGroupField(groupField);
out.collect(scoreResult);
}
}
}
一个ScoreReduce实现了ReduceFunction的统计类
代码语言:javascript复制public class ScoreReduce implements ReduceFunction<Score> {
@Override
public Score reduce(Score value1, Score value2) throws Exception {
Long numbers1 = value1.getNumbers();
Long numbers2 = value2.getNumbers();
value1.setNumbers(numbers1 numbers2);
return value1;
}
}
一个ScoreSink实现了SinkFunction接口的存储类
代码语言:javascript复制public class ScoreSink implements SinkFunction<Score> {
private ClickUntil clickUntil = ClickUntilFactory.createClickUntil();
@Override
public void invoke(Score value, Context context) throws Exception {
if (value != null) {
Map<String, String> data = new HashMap<>();
Long productId = value.getProductId();
Long productTypeId = value.getProductTypeId();
Long numbers = value.getNumbers();
data.put("productId", productId "");
data.put("productTypeId", productTypeId "");
data.put("numbers", numbers "");
String tablename = "score";
Set<String> fields = new HashSet<>();
fields.add("productId");
fields.add("productTypeId");
fields.add("numbers");
clickUntil.saveData(tablename,data,fields);
}
}
}
然后是Flink的流处理
代码语言:javascript复制public class ScoreAnaly {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
Properties properties = new Properties();
properties.setProperty("bootstrap.servers","127.0.0.1:9092");
properties.setProperty("group.id","portrait");
FlinkKafkaConsumer<String> myConsumer = new FlinkKafkaConsumer<>("evaluate",
new SimpleStringSchema(),properties);
DataStreamSource<String> data = env.addSource(myConsumer);
env.enableCheckpointing(5000);
DataStream<Score> map = data.flatMap(new ScoreMap());
DataStream<Score> reduct = map.keyBy(Score::getGroupField)
.timeWindow(Time.hours(5))
.reduce(new ScoreReduce());
reduct.addSink(new ScoreSink());
env.execute("portrait score");
}
}
近期热门商品统计
近期热门指的是最近10天的热门商品
创建一个近期热门商品标签实体类
代码语言:javascript复制@Data
public class RecentHotProduct {
private Long productId;
private Long productTypeId;
private Long numbers;
private String dateTime;
private String groupField;
}
DateUntil增加一个静态方法
代码语言:javascript复制public static String transferDate(Date date,String dateFormatStr) {
DateFormat dateFormat = new SimpleDateFormat(dateFormatStr);
return dateFormat.format(date);
}
一个RecentHotProductMap实现了FlatMapFunction接口的转换类
代码语言:javascript复制public class RecentHotProductMap implements FlatMapFunction<String,RecentHotProduct> {
@Override
public void flatMap(String value, Collector<RecentHotProduct> out) throws Exception {
Order order = JSONObject.parseObject(value,Order.class);
Integer payStatus = order.getPayStatus();
if (payStatus > 0) {
RecentHotProduct recentHotProduct = new RecentHotProduct();
Date date = order.getCreateTime();
Long productId = order.getProductId();
Long productTypeId = order.getProductTypeId();
String dateString = DateUntil.transferDate(date,"yyyyMMdd");
String groupField = "RecentHotProduct==" productId "=="
productTypeId "==" dateString;
recentHotProduct.setDateTime(dateString);
recentHotProduct.setGroupField(groupField);
recentHotProduct.setProductId(productId);
recentHotProduct.setProductTypeId(productTypeId);
recentHotProduct.setNumbers(1L);
out.collect(recentHotProduct);
}
}
}
一个RecentHotProductReduce实现了ReduceFunction接口的统计类
代码语言:javascript复制public class RecentHotProductReduce implements ReduceFunction<RecentHotProduct> {
@Override
public RecentHotProduct reduce(RecentHotProduct value1, RecentHotProduct value2) throws Exception {
Long numbers1 = value1.getNumbers();
Long numbers2 = value2.getNumbers();
value1.setNumbers(numbers1 numbers2);
return value1;
}
}
一个RecentHotProductSink实现了SinkFunction接口的存储类
代码语言:javascript复制public class RecentHotProductSink implements SinkFunction<RecentHotProduct> {
private ClickUntil clickUntil = ClickUntilFactory.createClickUntil();
@Override
public void invoke(RecentHotProduct value, Context context) throws Exception {
if (value != null) {
Map<String, String> data = new HashMap<>();
Long productId = value.getProductId();
Long productTypeId = value.getProductTypeId();
Long numbers = value.getNumbers();
String dateTime = value.getDateTime();
data.put("productId", productId "");
data.put("productTypeId", productTypeId "");
data.put("numbers", numbers "");
data.put("dateTime",dateTime);
String tablename = "recent_hot_product";
Set<String> fields = new HashSet<>();
fields.add("productId");
fields.add("productTypeId");
fields.add("numbers");
clickUntil.saveData(tablename,data,fields);
}
}
}
然后是Flink的流处理
代码语言:javascript复制public class RecentHotProductAnaly {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
Properties properties = new Properties();
properties.setProperty("bootstrap.servers","127.0.0.1:9092");
properties.setProperty("group.id","portrait");
FlinkKafkaConsumer<String> myConsumer = new FlinkKafkaConsumer<>("order",
new SimpleStringSchema(),properties);
//指定偏移量
myConsumer.setStartFromLatest();
DataStreamSource<String> data = env.addSource(myConsumer);
env.enableCheckpointing(5000);
DataStream<RecentHotProduct> map = data.flatMap(new RecentHotProductMap());
DataStream<RecentHotProduct> reduce = map.keyBy(RecentHotProduct::getGroupField)
.timeWindow(Time.hours(5))
.reduce(new RecentHotProductReduce());
reduce.addSink(new RecentHotProductSink());
env.execute("recent hot product");
}
}
优质商品统计
创建一个优质商品标签实体类
代码语言:javascript复制@Data
public class HighQualityProduct {
private Long productId;
private Long productTypeId;
private Long numbers;
private Integer scoreTotal;
private String dateTime;
private String groupField;
}
一个HighQualityProductMap实现了FlatMapFunction接口的转换类
代码语言:javascript复制public class HighQualityProductMap implements FlatMapFunction<String,HighQualityProduct> {
@Override
public void flatMap(String value, Collector<HighQualityProduct> out) throws Exception {
Evaluate evaluate = JSONObject.parseObject(value, Evaluate.class);
Integer score = evaluate.getScore();
Date date = evaluate.getEvaluateTime();
String dateTime = DateUntil.transferDate(date,"yyyyMMdd");
HighQualityProduct highQualityProduct = new HighQualityProduct();
Long productId = evaluate.getProductId();
Long productTypeId = evaluate.getProductTypeId();
highQualityProduct.setProductId(productId);
highQualityProduct.setProductTypeId(productTypeId);
highQualityProduct.setNumbers(1L);
highQualityProduct.setScoreTotal(score);
highQualityProduct.setDateTime(dateTime);
String groupField = "HighQualityProduct==" productId "==" productTypeId
"==" dateTime;
highQualityProduct.setGroupField(groupField);
out.collect(highQualityProduct);
}
}
一个HighQualityProductReduce实现了ReduceFunction接口的统计类
代码语言:javascript复制public class HighQualityProductReduce implements ReduceFunction<HighQualityProduct> {
@Override
public HighQualityProduct reduce(HighQualityProduct value1, HighQualityProduct value2) throws Exception {
Long numbers1 = value1.getNumbers();
Long numbers2 = value2.getNumbers();
Integer score1 = value1.getScoreTotal();
Integer score2 = value2.getScoreTotal();
value1.setNumbers(numbers1 numbers2);
value1.setScoreTotal(score1 score2);
return value1;
}
}
一个HighQualityProductSink实现了SinkFunction接口的存储类
代码语言:javascript复制public class HighQualityProductSink implements SinkFunction<HighQualityProduct> {
private ClickUntil clickUntil = ClickUntilFactory.createClickUntil();
@Override
public void invoke(HighQualityProduct value, Context context) throws Exception {
if (value != null) {
Map<String, String> data = new HashMap<>();
Long productId = value.getProductId();
Long productTypeId = value.getProductTypeId();
Long numbers = value.getNumbers();
Integer scoreTotal = value.getScoreTotal();
String dateTime = value.getDateTime();
data.put("productId", productId "");
data.put("productTypeId", productTypeId "");
data.put("numbers", numbers "");
data.put("scoreTotal",scoreTotal "");
data.put("dateTime",dateTime);
String tablename = "recent_hot_product";
Set<String> fields = new HashSet<>();
fields.add("productId");
fields.add("productTypeId");
fields.add("numbers");
fields.add("scoreTotal");
clickUntil.saveData(tablename,data,fields);
}
}
}
然后是Flink的流处理
代码语言:javascript复制public class HighQualityProductAnaly {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
Properties properties = new Properties();
properties.setProperty("bootstrap.servers","127.0.0.1:9092");
properties.setProperty("group.id","portrait");
FlinkKafkaConsumer<String> myConsumer = new FlinkKafkaConsumer<>("evaluate",
new SimpleStringSchema(),properties);
//指定偏移量
myConsumer.setStartFromLatest();
DataStreamSource<String> data = env.addSource(myConsumer);
env.enableCheckpointing(5000);
DataStream<HighQualityProduct> map = data.flatMap(new HighQualityProductMap());
DataStream<HighQualityProduct> reduce = map.keyBy(HighQualityProduct::getGroupField)
.timeWindow(Time.hours(5))
.reduce(new HighQualityProductReduce());
reduce.addSink(new HighQualityProductSink());
env.execute("high quality product");
}
}
浏览次数统计
创建一个浏览次数标签实体类
代码语言:javascript复制@Data
public class ScanTimes {
private Long productId;
private Long productTypeId;
private Long numbers;
private String dateTime;
private String groupField;
}
DateUntil增加一个静态方法
代码语言:javascript复制public static Date getCurrentTime(Long visitTime) {
return new Date(visitTime);
}
一个ScanTimesMap实现了FlatMapFunction接口的转换类
代码语言:javascript复制public class ScanTimesMap implements FlatMapFunction<String,ScanTimes> {
@Override
public void flatMap(String value, Collector<ScanTimes> out) throws Exception {
ScanOpertor scanOpertor = JSONObject.parseObject(value,ScanOpertor.class);
ScanTimes scanTimes = new ScanTimes();
Long productId = scanOpertor.getProductId();
Long productTypeId = scanOpertor.getProductTypeId();
Long time = scanOpertor.getScanTime();
Date date = DateUntil.getCurrentTime(time);
String dateTime = DateUntil.transferDate(date,"yyyyMMdd");
scanTimes.setProductId(productId);
scanTimes.setProductTypeId(productTypeId);
scanTimes.setDateTime(dateTime);
String groupField = "ScanTimes==" productId "==" productTypeId
"==" dateTime;
scanTimes.setGroupField(groupField);
scanTimes.setNumbers(1L);
out.collect(scanTimes);
}
}
一个ScanTimesReduce实现了ReduceFunction接口的统计类
代码语言:javascript复制public class ScanTimesReduce implements ReduceFunction<ScanTimes> {
@Override
public ScanTimes reduce(ScanTimes value1, ScanTimes value2) throws Exception {
Long numbers1 = value1.getNumbers();
Long numbers2 = value2.getNumbers();
value1.setNumbers(numbers1 numbers2);
return value1;
}
}
一个ScanTimesSink实现了SinkFunction接口的存储类
代码语言:javascript复制public class ScanTimesSink implements SinkFunction<ScanTimes> {
private ClickUntil clickUntil = ClickUntilFactory.createClickUntil();
@Override
public void invoke(ScanTimes value, Context context) throws Exception {
if (value != null) {
Map<String, String> data = new HashMap<>();
Long productId = value.getProductId();
Long productTypeId = value.getProductTypeId();
Long numbers = value.getNumbers();
String dateTime = value.getDateTime();
data.put("productId", productId "");
data.put("productTypeId", productTypeId "");
data.put("numbers", numbers "");
data.put("dateTime",dateTime);
String tablename = "scan_times";
Set<String> fields = new HashSet<>();
fields.add("productId");
fields.add("productTypeId");
fields.add("numbers");
clickUntil.saveData(tablename,data,fields);
}
}
}
然后是Flink的流处理
代码语言:javascript复制public class ScanTimesAnaly {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
Properties properties = new Properties();
properties.setProperty("bootstrap.servers","127.0.0.1:9092");
properties.setProperty("group.id","portrait");
FlinkKafkaConsumer<String> myConsumer = new FlinkKafkaConsumer<>("scan",
new SimpleStringSchema(),properties);
DataStreamSource<String> data = env.addSource(myConsumer);
env.enableCheckpointing(5000);
DataStream<ScanTimes> map = data.flatMap(new ScanTimesMap());
DataStream<ScanTimes> reduct = map.keyBy(ScanTimes::getGroupField)
.timeWindow(Time.hours(5))
.reduce(new ScanTimesReduce());
reduct.addSink(new ScanTimesSink());
env.execute("scan times");
}
}