The components we will use are Hadoop 2.6, HBase 1.0.0, MySQL 8, ZooKeeper 3.4.5, Kafka 2.1.0, Flink 1.13, and Canal 1.1.5. For convenience, everything is installed pseudo-distributed or standalone on a single machine.
A simple Hadoop 2.6 installation
hadoop-env.sh
export JAVA_HOME=/Library/Java/JavaVirtualMachines/jdk1.8.0_172.jdk/Contents/Home
core-site.xml
<configuration>
<property>
<name>fs.defaultFS</name>
<value>hdfs://127.0.0.1:9000</value>
</property>
<property>
<name>hadoop.tmp.dir</name>
<value>/Users/admin/Downloads/hadoop2</value>
</property>
</configuration>
hdfs-site.xml
<configuration>
<property>
<name>dfs.replication</name>
<value>1</value>
</property>
<property>
<name>dfs.permissions</name>
<value>false</value>
</property>
</configuration>
Run in the bin directory:
./hdfs namenode -format
Run in the sbin directory:
./start-dfs.sh
Web UI:
http://127.0.0.1:50070/
ZooKeeper 3.4.5 installation
zoo.cfg
dataDir=/Users/admin/Downloads/zookeeper/data
Run in the bin directory:
./zkServer.sh start
HBase 1.0.0 installation
hbase-env.sh
export JAVA_HOME=/Library/Java/JavaVirtualMachines/jdk1.8.0_172.jdk/Contents/Home
hbase-site.xml
<configuration>
<property>
<name>hbase.rootdir</name>
<value>hdfs://127.0.0.1:9000/hbase</value>
</property>
<property>
<name>hbase.cluster.distributed</name>
<value>true</value>
</property>
<property>
<name>hbase.zookeeper.quorum</name>
<value>localhost</value>
</property>
<property>
<name>dfs.replication</name>
<value>1</value>
</property>
</configuration>
Run in the bin directory:
./start-hbase.sh
Web UI:
http://127.0.0.1:60010/
Kafka 2.1.0 installation
server.properties
log.dirs=/Users/admin/Downloads/kafka-logs
Run in the bin directory:
./kafka-server-start.sh ../config/server.properties
./kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test
./kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test
MySQL 8 installation with Docker
Create a directory (mine is named mysql-bin) and inside it a file my.cnf with the following content:
[client]
socket = /var/sock/mysqld/mysqld.sock
[mysql]
socket = /var/sock/mysqld/mysqld.sock
[mysqld]
skip-host-cache
skip-name-resolve
datadir = /var/lib/mysql
user = mysql
port = 3306
bind-address = 0.0.0.0
socket = /var/sock/mysqld/mysqld.sock
pid-file = /var/run/mysqld/mysqld.pid
general_log_file = /var/log/mysql/query.log
slow_query_log_file = /var/log/mysql/slow.log
log-error = /var/log/mysql/error.log
log-bin=mysql-bin
binlog-format=ROW
server-id=1
!includedir /etc/my.cnf.d/
!includedir /etc/mysql/conf.d/
!includedir /etc/mysql/docker-default.d/
Start command:
docker run -d --name mysql -e MYSQL_ROOT_PASSWORD=abcd123 -p 3306:3306 -v /Users/admin/Downloads/mysql-bin/my.cnf:/etc/my.cnf docker.io/cytopia/mysql-8.0
Create a database named portrait.
Create a table:
DROP TABLE IF EXISTS `user_info`;
CREATE TABLE `user_info` (
`id` bigint(20) NOT NULL AUTO_INCREMENT,
`account` varchar(255) DEFAULT NULL,
`password` varchar(255) DEFAULT NULL,
`sex` varchar(255) DEFAULT NULL,
`age` int(11) DEFAULT NULL,
`phone` varchar(255) DEFAULT NULL,
`status` int(255) DEFAULT NULL COMMENT 'membership status: 0 regular, 1 silver, 2 gold',
`wechat_account` varchar(255) DEFAULT NULL,
`zhifubao_account` varchar(255) DEFAULT NULL,
`email` varchar(255) DEFAULT NULL,
`create_time` datetime DEFAULT NULL,
`update_time` datetime DEFAULT NULL,
PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=6 DEFAULT CHARSET=utf8mb4;
SET FOREIGN_KEY_CHECKS = 1;
Canal 1.1.5 installation
canal.properties
canal.zkServers = 127.0.0.1:2181
canal.serverMode = kafka
In instance.properties under conf/example:
canal.instance.master.address=127.0.0.1:3306
canal.instance.dbUsername=root
canal.instance.dbPassword=abcd123
canal.instance.defaultDatabaseName=portrait
canal.mq.topic=test
Run in the bin directory:
./startup.sh
Now, when we insert a row into the database:
insert into user_info (account,password,sex,age,phone,status,wechat_account,zhifubao_account,email,create_time,update_time)
values ('abcd','1234','男',24,'13873697762',0,'火名之月','abstart','981456@qq.com','2021-09-10','2021-10-11')
The Kafka console consumer shows:
[2021-11-05 15:13:05,173] INFO [GroupMetadataManager brokerId=0] Removed 0 expired offsets in 0 milliseconds. (kafka.coordinator.group.GroupMetadataManager)
{
"data":[
{
"id":"8",
"account":"abcd",
"password":"1234",
"sex":"男",
"age":"24",
"phone":"13873697762",
"status":"0",
"wechat_account":"火名之月",
"zhifubao_account":"abstart",
"email":"981456@qq.com",
"create_time":"2021-09-10 00:00:00",
"update_time":"2021-10-11 00:00:00"
}
],
"database":"portrait",
"es":1636096762000,
"id":11,
"isDdl":false,
"mysqlType":{
"id":"bigint(0)",
"account":"varchar(255)",
"password":"varchar(255)",
"sex":"varchar(255)",
"age":"int(0)",
"phone":"varchar(255)",
"status":"int(255)",
"wechat_account":"varchar(255)",
"zhifubao_account":"varchar(255)",
"email":"varchar(255)",
"create_time":"datetime(0)",
"update_time":"datetime(0)"
},
"old":null,
"pkNames":[
"id"
],
"sql":"",
"sqlType":{
"id":-5,
"account":12,
"password":12,
"sex":12,
"age":4,
"phone":12,
"status":4,
"wechat_account":12,
"zhifubao_account":12,
"email":12,
"create_time":93,
"update_time":93
},
"table":"user_info",
"ts":1636096762605,
"type":"INSERT"
}
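Every change event arrives in this same envelope, so it helps to pin the schema down as a type before wiring up Flink. A minimal sketch of the envelope as a fastjson-friendly POJO (the class name CanalMessage is my own, and only the fields used later are covered):
import java.util.List;
import java.util.Map;
import lombok.Data;

@Data
public class CanalMessage {
    private List<Map<String, String>> data; // changed rows; Canal renders every value as a string
    private String database;
    private String table;
    private String type;                    // INSERT / UPDATE / DELETE
    private Boolean isDdl;
    private List<String> pkNames;
    private Long es;                        // event time on the MySQL side
    private Long ts;                        // time Canal processed the event
}
Parsing is then just JSONObject.parseObject(message, CanalMessage.class); the TranferAnaly job below does the same thing field by field instead.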
Stream-processing the messages with Flink
The Java dependencies follow; for more detail on Flink itself, see the earlier Flink write-up (Flink技术整理). We use 1.13.0 here, while that write-up used 1.7.2, so some of the old APIs are no longer available.
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<flink.version>1.13.0</flink.version>
<alink.version>1.4.0</alink.version>
<fastjson.version>1.2.74</fastjson.version>
<java.version>1.8</java.version>
<scala.version>2.11.12</scala.version>
<hadoop.version>2.6.0</hadoop.version>
<hbase.version>1.0.0</hbase.version>
<scala.binary.version>2.11</scala.binary.version>
<maven.compiler.source>${java.version}</maven.compiler.source>
<maven.compiler.target>${java.version}</maven.compiler.target>
</properties>
<dependencies>
<!-- Apache Flink dependencies -->
<!-- These dependencies are provided, because they should not be packaged into the JAR file. -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-java</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>com.alibaba.alink</groupId>
<artifactId>alink_core_flink-1.13_2.11</artifactId>
<version>${alink.version}</version>
</dependency>
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>${fastjson.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-clients_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-scala_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.hbase</groupId>
<artifactId>hbase-client</artifactId>
<version>${hbase.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-scala_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.scala-lang</groupId>
<artifactId>scala-library</artifactId>
<version>${scala.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-planner_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
<version>${hadoop.version}</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-common</artifactId>
<version>${hadoop.version}</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-hdfs</artifactId>
<version>${hadoop.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka_2.11</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>1.1.1</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-statebackend-rocksdb_2.11</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-elasticsearch6_2.11</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>8.0.11</version>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<version>1.18.16</version>
<optional>true</optional>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
<version>1.7.7</version>
<scope>runtime</scope>
</dependency>
<dependency>
<groupId>log4j</groupId>
<artifactId>log4j</artifactId>
<version>1.2.17</version>
<scope>runtime</scope>
</dependency>
</dependencies>
First, let's read the Kafka messages with Flink:
public class Test {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
Properties properties = new Properties();
properties.setProperty("bootstrap.servers","127.0.0.1:9092");
properties.setProperty("group.id","portrait");
FlinkKafkaConsumer<String> myConsumer = new FlinkKafkaConsumer<>("test",
new SimpleStringSchema(),properties);
DataStreamSource<String> data = env.addSource(myConsumer);
env.enableCheckpointing(5000);
data.print();
env.execute("portrait test");
}
}
Output:
16:39:42,070 INFO org.apache.kafka.clients.consumer.internals.AbstractCoordinator - [Consumer clientId=consumer-21, groupId=portrait] Discovered group coordinator admindembp.lan:9092 (id: 2147483647 rack: null)
15> {"data":[{"id":"8","account":"abcd","password":"1234","sex":"男","age":"24","phone":"13873697762","status":"0","wechat_account":"火名之月","zhifubao_account":"abstart","email":"981456@qq.com","create_time":"2021-09-10 00:00:00","update_time":"2021-10-11 00:00:00"}],"database":"portrait","es":1636096762000,"id":11,"isDdl":false,"mysqlType":{"id":"bigint(0)","account":"varchar(255)","password":"varchar(255)","sex":"varchar(255)","age":"int(0)","phone":"varchar(255)","status":"int(255)","wechat_account":"varchar(255)","zhifubao_account":"varchar(255)","email":"varchar(255)","create_time":"datetime(0)","update_time":"datetime(0)"},"old":null,"pkNames":["id"],"sql":"","sqlType":{"id":-5,"account":12,"password":12,"sex":12,"age":4,"phone":12,"status":4,"wechat_account":12,"zhifubao_account":12,"email":12,"create_time":93,"update_time":93},"table":"user_info","ts":1636096762605,"type":"INSERT"}
Now let's parse that data and store it in HBase. Note that Canal emits every column value as a string (e.g. "age":"24") and datetimes as "yyyy-MM-dd HH:mm:ss"; fastjson converts these into the typed fields below.
Create a UserInfo entity class:
@Data
@ToString
public class UserInfo {
private Long id;
private String account;
private String password;
private String sex;
private Integer age;
private String phone;
private Integer status;
private String wechatAccount;
private String zhifubaoAccount;
private String email;
private Date createTime;
private Date updateTime;
}
And an HBase utility class:
@Slf4j
public class HbaseUtil {
private static Admin admin = null;
private static Connection conn = null;
static {
Configuration conf = HBaseConfiguration.create();
conf.set("hbase.rootdir","hdfs://127.0.0.1:9000/hbase");
conf.set("hbase.zookeeper.quorum","127.0.0.1");
conf.set("hbase.client.scanner.timeout.period","600000");
conf.set("hbase.rpc.timeout","600000");
try {
conn = ConnectionFactory.createConnection(conf);
admin = conn.getAdmin();
}catch (IOException e) {
e.printStackTrace();
}
}
public static void createTable(String tableName,String famliyname) throws IOException {
HTableDescriptor tab = new HTableDescriptor(tableName);
HColumnDescriptor colDesc = new HColumnDescriptor(famliyname);
tab.addFamily(colDesc);
admin.createTable(tab);
log.info("over");
}
public static void put(String tablename, String rowkey, String famliyname, Map<String,String> datamap) throws IOException {
Table table = conn.getTable(TableName.valueOf(tablename));
byte[] rowkeybyte = Bytes.toBytes(rowkey);
Put put = new Put(rowkeybyte);
if (datamap != null) {
Set<Map.Entry<String,String>> set = datamap.entrySet();
for (Map.Entry<String,String> entry : set) {
String key = entry.getKey();
Object value = entry.getValue();
put.addColumn(Bytes.toBytes(famliyname),Bytes.toBytes(key),
Bytes.toBytes(value + ""));
}
}
table.put(put);
table.close();
log.info("OK");
}
public static String getdata(String tablename,String rowkey,
String famliyname,String colmn) throws IOException {
Table table = conn.getTable(TableName.valueOf(tablename));
byte[] rowkeybyte = Bytes.toBytes(rowkey);
Get get = new Get(rowkeybyte);
Result result = table.get(get);
byte[] resultbytes = result.getValue(famliyname.getBytes(),colmn.getBytes());
if (resultbytes == null) {
return null;
}
return new String(resultbytes);
}
public static void putdata(String tablename,String rowkey,
String famliyname,String colum,
String data) throws IOException {
Table table = conn.getTable(TableName.valueOf(tablename));
Put put = new Put(rowkey.getBytes());
put.addColumn(famliyname.getBytes(),colum.getBytes(),data.getBytes());
table.put(put);
}
public static void main(String[] args) throws IOException {
// createTable("testinfo","time");
putdata("testinfo","1","time","info","ty");
// Map<String,String> datamap = new HashMap<>();
// datamap.put("info1","ty1");
// datamap.put("info2","ty2");
// put("testinfo","2","time",datamap);
String result = getdata("testinfo","1","time","info");
log.info(result);
}
}
In the HBase bin directory run:
./hbase shell
create "user_info","info"
The job that parses the Canal messages and writes them to HBase:
@Slf4j
public class TranferAnaly {
@SuppressWarnings("unchecked")
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
Properties properties = new Properties();
properties.setProperty("bootstrap.servers","127.0.0.1:9092");
properties.setProperty("group.id","portrait");
FlinkKafkaConsumer<String> myConsumer = new FlinkKafkaConsumer<>("test",
new SimpleStringSchema(),properties);
DataStreamSource<String> data = env.addSource(myConsumer);
env.enableCheckpointing(5000);
DataStream<String> map = data.map(s -> {
JSONObject jsonObject = JSONObject.parseObject(s);
String type = jsonObject.getString("type");
String table = jsonObject.getString("table");
String database = jsonObject.getString("database");
String data1 = jsonObject.getString("data");
List<UserInfo> list = JSONObject.parseArray(data1,UserInfo.class);
log.info(list.toString());
for (UserInfo userInfo : list) {
String tablename = table;
String rowkey = userInfo.getId() + "";
String famliyname = "info";
Map<String,String> datamap = JSONObject.parseObject(JSONObject.toJSONString(userInfo),Map.class);
datamap.put("database",database);
datamap.put("typebefore",HbaseUtil.getdata(tablename,rowkey,famliyname,"typecurrent"));
datamap.put("typecurrent",type);
HbaseUtil.put(tablename,rowkey,famliyname,datamap);
}
return ""; // must not return null: Flink cannot emit null records
});
// map.print();
env.execute("portrait test");
}
}
Querying in HBase then shows:
scan 'user_info'
ROW COLUMN+CELL
12 column=info:account, timestamp=1636105093607, value=abcd
12 column=info:age, timestamp=1636105093607, value=24
12 column=info:createTime, timestamp=1636105093607, value=1631203200000
12 column=info:database, timestamp=1636105093607, value=portrait
12 column=info:email, timestamp=1636105093607, value=981456@qq.com
12 column=info:id, timestamp=1636105093607, value=12
12 column=info:password, timestamp=1636105093607, value=1234
12 column=info:phone, timestamp=1636105093607, value=13873697762
12 column=info:sex, timestamp=1636105093607, value=\xE7\x94\xB7
12 column=info:status, timestamp=1636105093607, value=0
12 column=info:typebefore, timestamp=1636105093607, value=null
12 column=info:typecurrent, timestamp=1636105093607, value=INSERT
12 column=info:updateTime, timestamp=1636105093607, value=1633881600000
12 column=info:wechatAccount, timestamp=1636105093607, value=\xE7\x81\xAB\xE5\x90\x8D\xE4\xB9\x8B\xE6\x9C\x88
12 column=info:zhifubaoAccount, timestamp=1636105093607, value=abstart
1 row(s) in 0.0110 seconds
Now let's pass the data onward.
In the Kafka bin directory, create a new topic:
./kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic user_info
./kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic user_info
Add a Kafka utility class:
@Slf4j
public class KafkaUtil {
private static Properties getProps() {
Properties props = new Properties();
props.put("bootstrap.servers","127.0.0.1:9092");
props.put("acks","all");
props.put("retries",2);
props.put("linger.ms",1000);
props.put("client.id","producer-syn-1");
props.put("key.serializer","org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer","org.apache.kafka.common.serialization.StringSerializer");
return props;
}
public static void sendData(String topicName,String data) throws ExecutionException, InterruptedException {
KafkaProducer<String,String> producer = new KafkaProducer<>(getProps());
ProducerRecord<String,String> record = new ProducerRecord<>(topicName,data);
Future<RecordMetadata> metadataFuture = producer.send(record);
RecordMetadata recordMetadata = metadataFuture.get();
log.info("topic:" recordMetadata.topic());
log.info("partition:" recordMetadata.partition());
log.info("offset:" recordMetadata.offset());
}
}
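Note that sendData builds a brand-new KafkaProducer for every message and never closes it, which is fine for a demo but leaks resources under load. A sketch of the usual variant with one shared producer (reusing the getProps() from above):
@Slf4j
public class KafkaUtil {
    // the producer is thread-safe, so one static instance can serve all calls
    private static final KafkaProducer<String,String> PRODUCER = new KafkaProducer<>(getProps());

    public static void sendData(String topicName, String data) throws ExecutionException, InterruptedException {
        RecordMetadata meta = PRODUCER.send(new ProducerRecord<>(topicName, data)).get();
        log.info("topic:" + meta.topic() + " partition:" + meta.partition() + " offset:" + meta.offset());
    }
}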
Then send the messages out:
@Slf4j
public class TranferAnaly {
@SuppressWarnings("unchecked")
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
Properties properties = new Properties();
properties.setProperty("bootstrap.servers","127.0.0.1:9092");
properties.setProperty("group.id","portrait");
FlinkKafkaConsumer<String> myConsumer = new FlinkKafkaConsumer<>("test",
new SimpleStringSchema(),properties);
DataStreamSource<String> data = env.addSource(myConsumer);
env.enableCheckpointing(5000);
DataStream<String> map = data.map(s -> {
JSONObject jsonObject = JSONObject.parseObject(s);
String type = jsonObject.getString("type");
String table = jsonObject.getString("table");
String database = jsonObject.getString("database");
String data1 = jsonObject.getString("data");
List<UserInfo> list = JSONObject.parseArray(data1,UserInfo.class);
List<Map<String,String>> listdata = new ArrayList<>();
log.info(list.toString());
for (UserInfo userInfo : list) {
String tablename = table;
String rowkey = userInfo.getId() + "";
String famliyname = "info";
Map<String,String> datamap = JSONObject.parseObject(JSONObject.toJSONString(userInfo),Map.class);
datamap.put("database",database);
datamap.put("typebefore",HbaseUtil.getdata(tablename,rowkey,famliyname,"typecurrent"));
datamap.put("typecurrent",type);
datamap.put("tablename",table);
HbaseUtil.put(tablename,rowkey,famliyname,datamap);
listdata.add(datamap);
}
return JSONObject.toJSONString(listdata);
});
map.addSink(new SinkFunction<String>() {
@Override
public void invoke(String value, Context context) throws Exception {
List<Map> data = JSONObject.parseArray(value,Map.class);
for (Map<String,String> map : data) {
String tablename = map.get("tablename");
KafkaUtil.sendData(tablename,JSONObject.toJSONString(map));
}
}
});
env.execute("portrait test");
}
}
Checking the Kafka consumer side:
[2021-11-05 20:11:26,869] INFO [GroupCoordinator 0]: Assignment received from leader for group console-consumer-47692 for generation 1 (kafka.coordinator.group.GroupCoordinator)
{"wechatAccount":"火名之月","sex":"男","zhifubaoAccount":"abstart","updateTime":1633881600000,"password":"1234","database":"portrait","createTime":1631203200000,"phone":"13873697762","typecurrent":"INSERT","id":15,"tablename":"user_info","account":"abcd","age":24,"email":"981456@qq.com","status":0}
Creating the user-portrait decade (Years) label
Create a decade-label entity class:
@Data
public class Years {
private Long userid;
private String yearsFlag;
private Long numbers = 0L;
private String groupField;
}
Create a YearsUntil utility class:
public class YearsUntil {
public static String getYears(Integer age) {
Calendar calendar = Calendar.getInstance();
calendar.setTime(new Date());
calendar.add(Calendar.YEAR,-age);
Date newDate = calendar.getTime();
DateFormat dateFormat = new SimpleDateFormat("yyyy");
String newDateString = dateFormat.format(newDate);
Integer newDateInteger = Integer.parseInt(newDateString);
String yearBaseType = "未知";
if (newDateInteger >= 1940 && newDateInteger < 1950) {
yearBaseType = "40后";
}else if (newDateInteger >= 1950 && newDateInteger < 1960) {
yearBaseType = "50后";
}else if (newDateInteger >= 1960 && newDateInteger < 1970) {
yearBaseType = "60后";
}else if (newDateInteger >= 1970 && newDateInteger < 1980) {
yearBaseType = "70后";
}else if (newDateInteger >= 1980 && newDateInteger < 1990) {
yearBaseType = "80后";
}else if (newDateInteger >= 1990 && newDateInteger < 2000) {
yearBaseType = "90后";
}else if (newDateInteger >= 2000 && newDateInteger < 2010) {
yearBaseType = "00后";
}else if (newDateInteger >= 2010 && newDateInteger < 2020) {
yearBaseType = "10后";
}
return yearBaseType;
}
}
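A quick sanity check of the bucketing (a throwaway main of mine; the result assumes it runs in 2021, as in this article):
public class YearsUntilDemo {
    public static void main(String[] args) {
        // 2021 - 24 = birth year 1997, so this prints 90后
        System.out.println(YearsUntil.getYears(24));
    }
}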
Create a ClickUntil interface:
public interface ClickUntil {
void saveData(String tablename,Map<String,String> data);
}
And an implementation class:
public class DefaultClickUntil implements ClickUntil {
private static ClickUntil instance = new DefaultClickUntil();
public static ClickUntil createInstance() {
return instance;
}
private DefaultClickUntil() {
}
@Override
public void saveData(String tablename, Map<String, String> data) {
}
}
We leave the interface method unimplemented here; other implementations will take its place later.
A ClickUntilFactory factory class:
public class ClickUntilFactory {
public static ClickUntil createClickUntil() {
return DefaultClickUntil.createInstance();
}
}
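The factory currently hands out the no-op DefaultClickUntil. For local testing, one plausible stand-in is to back the interface with the HbaseUtil from above; this is only a sketch (the class name HbaseClickUntil, the info column family, and keying rows by a userid entry in the map are my assumptions, not the article's):
import java.io.IOException;
import java.util.Map;

public class HbaseClickUntil implements ClickUntil {
    @Override
    public void saveData(String tablename, Map<String, String> data) {
        // assumed convention: rows are keyed by userid; fall back to the current time
        String rowkey = data.getOrDefault("userid", System.currentTimeMillis() + "");
        try {
            HbaseUtil.put(tablename, rowkey, "info", data);
        } catch (IOException e) {
            throw new RuntimeException("failed to write to " + tablename, e);
        }
    }
}
ClickUntilFactory.createClickUntil() would then return an instance of this class instead of the default.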
A DateUntil utility class:
public class DateUntil {
public static String getByInterMinute(String timeInfo) {
Long timeMillons = Long.parseLong(timeInfo);
Date date = new Date(timeMillons);
DateFormat dateFormatMinute = new SimpleDateFormat("mm");
DateFormat dateFormatHour = new SimpleDateFormat("yyyyMMddHH");
String minute = dateFormatMinute.format(date);
String hour = dateFormatHour.format(date);
Long minuteLong = Long.parseLong(minute);
String replaceMinute = "";
if (minuteLong >= 0 && minuteLong < 5) {
replaceMinute = "05";
}else if (minuteLong >= 5 && minuteLong < 10) {
replaceMinute = "10";
}else if (minuteLong >= 10 && minuteLong < 15) {
replaceMinute = "15";
}else if (minuteLong >= 15 && minuteLong < 20) {
replaceMinute = "20";
}else if (minuteLong >= 20 && minuteLong < 25) {
replaceMinute = "25";
}else if (minuteLong >= 25 && minuteLong < 30) {
replaceMinute = "30";
}else if (minuteLong >= 30 && minuteLong < 35) {
replaceMinute = "35";
}else if (minuteLong >= 35 && minuteLong < 40) {
replaceMinute = "40";
}else if (minuteLong >= 40 && minuteLong < 45) {
replaceMinute = "45";
}else if (minuteLong >= 45 && minuteLong < 50) {
replaceMinute = "50";
}else if (minuteLong >= 50 && minuteLong < 55) {
replaceMinute = "55";
}else if (minuteLong >= 55 && minuteLong < 60) {
replaceMinute = "60";
}
return hour + replaceMinute;
}
public static Long getCurrentFiveMinuteInterStart(Long visitTime) throws ParseException {
String timeString = getByInterMinute(visitTime + "");
DateFormat dateFormat = new SimpleDateFormat("yyyyMMddHHmm");
Date date = dateFormat.parse(timeString);
return date.getTime();
}
}
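getByInterMinute rounds a millisecond timestamp up to the end of its 5-minute bucket in the local time zone (15:13 becomes ...1515), and getCurrentFiveMinuteInterStart turns that bucket boundary back into epoch millis. A throwaway check of mine:
import java.text.ParseException;

public class DateUntilDemo {
    public static void main(String[] args) throws ParseException {
        long now = System.currentTimeMillis();
        // e.g. at 15:13 this prints "yyyyMMddHH" + "15"
        System.out.println(DateUntil.getByInterMinute(now + ""));
        // the same bucket boundary as epoch millis
        System.out.println(DateUntil.getCurrentFiveMinuteInterStart(now));
    }
}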
A YearsAnalyMap transform class implementing MapFunction:
public class YearsAnalyMap implements MapFunction<String,Years> {
private ClickUntil clickUntil = ClickUntilFactory.createClickUntil();
@Override
@SuppressWarnings("unchecked")
public Years map(String s) throws Exception {
Map<String,String> datamap = JSONObject.parseObject(s,Map.class);
String typecurrent = datamap.get("typecurrent");
Years years = new Years();
if (typecurrent.equals("INSERT")) {
UserInfo userInfo = JSONObject.parseObject(s,UserInfo.class);
String yearLabel = YearsUntil.getYears(userInfo.getAge());
Map<String,String> mapdata = new HashMap<>();
mapdata.put("userid",userInfo.getId() "");
mapdata.put("yearlabel",yearLabel);
clickUntil.saveData("user_info",mapdata);
String fiveMinute = DateUntil.getByInterMinute(System.currentTimeMillis() "");
String groupField = "yearlable==" fiveMinute "==" yearLabel;
Long numbers = 1L;
years.setGroupField(groupField);
years.setNumbers(numbers);
}
return years;
}
}
Finally, the Flink stream job for the decade label:
public class YearsAnaly {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
Properties properties = new Properties();
properties.setProperty("bootstrap.servers","127.0.0.1:9092");
properties.setProperty("group.id","portrait");
FlinkKafkaConsumer<String> myConsumer = new FlinkKafkaConsumer<>("user_info",
new SimpleStringSchema(),properties);
DataStreamSource<String> data = env.addSource(myConsumer);
env.enableCheckpointing(5000);
data.map(new YearsAnalyMap());
// without execute() the job never actually runs
env.execute("portrait years");
}
}
Now we aggregate the decade labels into counts every 5 minutes and store them (the Sink step).
Add a YearsAnalyReduce aggregation class implementing ReduceFunction:
public class YearsAnalyReduce implements ReduceFunction<Years> {
@Override
public Years reduce(Years years, Years t1) throws Exception {
Long numbers1 = 0L;
String groupField = "";
if (years != null) {
numbers1 = years.getNumbers();
groupField = years.getGroupField();
}
Long numbers2 = 0L;
if (t1 != null) {
numbers2 = t1.getNumbers();
groupField = t1.getGroupField();
}
if (StringUtils.isNotBlank(groupField)) {
Years years1 = new Years();
years1.setGroupField(groupField);
years1.setNumbers(numbers1 + numbers2);
return years1;
}
return null;
}
}
A YearsAnalySink storage class implementing SinkFunction:
public class YearsAnalySink implements SinkFunction<Years> {
private ClickUntil clickUntil = ClickUntilFactory.createClickUntil();
@Override
public void invoke(Years value, Context context) throws Exception {
if (value != null) {
String groupField = value.getGroupField();
String[] groupFields = groupField.split("==");
String timeinfo = groupFields[1];
String yearlabel = groupFields[2];
Long numbers = value.getNumbers();
String tablename = "yearslabel_info";
Map<String, String> dataMap = new HashMap<>();
dataMap.put("timeinfo",timeinfo);
dataMap.put("yearslabel",yearlabel);
dataMap.put("numbers",numbers "");
clickUntil.saveData(tablename,dataMap);
}
}
}
Then the Flink stream job:
public class YearsAnaly {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
Properties properties = new Properties();
properties.setProperty("bootstrap.servers","127.0.0.1:9092");
properties.setProperty("group.id","portrait");
FlinkKafkaConsumer<String> myConsumer = new FlinkKafkaConsumer<>("user_info",
new SimpleStringSchema(),properties);
DataStreamSource<String> data = env.addSource(myConsumer);
env.enableCheckpointing(5000);
DataStream<Years> map = data.map(new YearsAnalyMap());
// non-INSERT events leave groupField null, so filter them out before keying;
// a keyed window (rather than timeWindowAll) keeps the counts separate per label
DataStream<Years> reduce = map.filter(y -> y.getGroupField() != null)
.keyBy(Years::getGroupField).timeWindow(Time.minutes(5))
.reduce(new YearsAnalyReduce());
reduce.addSink(new YearsAnalySink());
env.execute("portrait years");
}
}
The point is to watch how users' decade labels change from one time window to the next.
Creating the user-portrait mobile-carrier label
Create a mobile-carrier utility class, CarrierUntil:
public class CarrierUntil {
/**
* China Telecom number validation. Prefixes: 133,153,180,181,189,177,1700,173,199
**/
private static final String CHINA_TELECOM_PATTERN = "(^1(33|53|77|73|99|8[019])\\d{8}$)|(^1700\\d{7}$)";
/**
* China Unicom number validation. Prefixes: 130,131,132,155,156,185,186,145,176,1709
**/
private static final String CHINA_UNICOM_PATTERN = "(^1(3[0-2]|4[5]|5[56]|7[6]|8[56])\\d{8}$)|(^1709\\d{7}$)";
/**
* China Mobile number validation.
* Prefixes: 134,135,136,137,138,139,150,151,152,157,158,159,182,183,184,187,188,147,178,1705
**/
private static final String CHINA_MOBILE_PATTERN = "(^1(3[4-9]|4[7]|5[0-27-9]|7[8]|8[2-478])\\d{8}$)|(^1705\\d{7}$)";
/**
* 0 unknown, 1 China Mobile, 2 China Unicom, 3 China Telecom
* @param telphone the phone number to classify
* @return the carrier code
*/
public static Integer getCarrierByTel(String telphone) {
boolean b1 = StringUtils.isNotBlank(telphone) && match(CHINA_MOBILE_PATTERN, telphone);
if (b1) {
return 1;
}
b1 = StringUtils.isNotBlank(telphone) && match(CHINA_UNICOM_PATTERN, telphone);
if (b1) {
return 2;
}
b1 = StringUtils.isNotBlank(telphone) && match(CHINA_TELECOM_PATTERN, telphone);
if (b1) {
return 3;
}
return 0;
}
private static boolean match(String regex, String tel) {
return Pattern.matches(regex, tel);
}
}
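A quick check (the phone number used in the earlier insert starts with 138, a China Mobile prefix, so this prints 1):
public class CarrierUntilDemo {
    public static void main(String[] args) {
        System.out.println(CarrierUntil.getCarrierByTel("13873697762")); // 1 = China Mobile
    }
}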
A carrier-label entity class:
@Data
public class Carrier {
private Long userid;
private String carrierName;
private Long numbers = 0L;
private String groupField;
}
A CarrierAnalyMap transform class implementing MapFunction:
public class CarrierAnalyMap implements MapFunction<String,Carrier> {
private ClickUntil clickUntil = ClickUntilFactory.createClickUntil();
@Override
@SuppressWarnings("unchecked")
public Carrier map(String s) throws Exception {
Map<String,String> datamap = JSONObject.parseObject(s,Map.class);
String typecurrent = datamap.get("typecurrent");
Carrier carrier = new Carrier();
if (typecurrent.equals("INSERT")) {
UserInfo userInfo = JSONObject.parseObject(s,UserInfo.class);
String telphone = userInfo.getPhone();
Integer carrierInteger = CarrierUntil.getCarrierByTel(telphone);
String carrierLabel = "";
switch (carrierInteger) {
case 0:
carrierLabel = "未知";
break;
case 1:
carrierLabel = "移动";
break;
case 2:
carrierLabel = "联通";
break;
case 3:
carrierLabel = "电信";
break;
default:
break;
}
Map<String,String> mapdata = new HashMap<>();
mapdata.put("userid",userInfo.getId() "");
mapdata.put("carrierlabel",carrierLabel);
clickUntil.saveData("user_info",mapdata);
String fiveMinute = DateUntil.getByInterMinute(System.currentTimeMillis() "");
String groupField = "carrierlabel==" fiveMinute "==" carrierLabel;
Long numbers = 1L;
carrier.setGroupField(groupField);
carrier.setNumbers(numbers);
}
return carrier;
}
}
A CarrierAnalyReduce aggregation class implementing ReduceFunction:
public class CarrierAnalyReduce implements ReduceFunction<Carrier> {
@Override
public Carrier reduce(Carrier carrier, Carrier t1) throws Exception {
Long numbers1 = 0L;
String groupField = "";
if (carrier != null) {
numbers1 = carrier.getNumbers();
groupField = carrier.getGroupField();
}
Long numbers2 = 0L;
if (t1 != null) {
numbers2 = t1.getNumbers();
groupField = t1.getGroupField();
}
if (StringUtils.isNotBlank(groupField)) {
Carrier carrier1 = new Carrier();
carrier1.setGroupField(groupField);
carrier1.setNumbers(numbers1 + numbers2);
return carrier1;
}
return null;
}
}
A CarrierAnalySink storage class implementing SinkFunction:
public class CarrierAnalySink implements SinkFunction<Carrier> {
private ClickUntil clickUntil = ClickUntilFactory.createClickUntil();
@Override
public void invoke(Carrier value, Context context) throws Exception {
if (value != null) {
String groupField = value.getGroupField();
String[] groupFields = groupField.split("==");
String timeinfo = groupFields[1];
String carrierLabel = groupFields[2];
Long numbers = value.getNumbers();
String tablename = "carrierlabel_info";
Map<String, String> dataMap = new HashMap<>();
dataMap.put("timeinfo",timeinfo);
dataMap.put("carrierlabel",carrierLabel);
dataMap.put("numbers",numbers "");
clickUntil.saveData(tablename,dataMap);
}
}
}
Then the Flink stream job:
public class CarrierAnaly {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
Properties properties = new Properties();
properties.setProperty("bootstrap.servers","127.0.0.1:9092");
properties.setProperty("group.id","portrait");
FlinkKafkaConsumer<String> myConsumer = new FlinkKafkaConsumer<>("user_info",
new SimpleStringSchema(),properties);
DataStreamSource<String> data = env.addSource(myConsumer);
env.enableCheckpointing(5000);
DataStream<Carrier> map = data.map(new CarrierAnalyMap());
// filter out non-INSERT events (null groupField) and window per label key
DataStream<Carrier> reduce = map.filter(c -> c.getGroupField() != null)
.keyBy(Carrier::getGroupField).timeWindow(Time.minutes(5))
.reduce(new CarrierAnalyReduce());
reduce.addSink(new CarrierAnalySink());
env.execute("portrait carrier");
}
}
Creating the user-portrait membership label
The membership-label entity class:
@Data
public class Member {
private Long userid;
private String memberFlag;
private Long numbers = 0L;
private String groupField;
}
A MemberAnalyMap transform class implementing MapFunction:
public class MemberAnalyMap implements MapFunction<String,Member> {
private ClickUntil clickUntil = ClickUntilFactory.createClickUntil();
@Override
@SuppressWarnings("unchecked")
public Member map(String s) throws Exception {
Map<String,String> datamap = JSONObject.parseObject(s,Map.class);
String typecurrent = datamap.get("typecurrent");
Member member = new Member();
if (typecurrent.equals("INSERT")) {
UserInfo userInfo = JSONObject.parseObject(s,UserInfo.class);
Integer memberInteger = userInfo.getStatus();
String memberLabel = "";
switch (memberInteger) {
case 0:
memberLabel = "普通会员";
break;
case 1:
memberLabel = "白银会员";
break;
case 2:
memberLabel = "黄金会员";
break;
default:
break;
}
Map<String,String> mapdata = new HashMap<>();
mapdata.put("userid",userInfo.getId() "");
mapdata.put("memberlabel",memberLabel);
clickUntil.saveData("user_info",mapdata);
String fiveMinute = DateUntil.getByInterMinute(System.currentTimeMillis() "");
String groupField = "memberlable==" fiveMinute "==" memberLabel;
Long numbers = 1L;
member.setGroupField(groupField);
member.setNumbers(numbers);
}
return member;
}
}
A MemberAnalyReduce aggregation class implementing ReduceFunction:
public class MemberAnalyReduce implements ReduceFunction<Member> {
@Override
public Member reduce(Member member, Member t1) throws Exception {
Long numbers1 = 0L;
String groupField = "";
if (member != null) {
numbers1 = member.getNumbers();
groupField = member.getGroupField();
}
Long numbers2 = 0L;
if (t1 != null) {
numbers2 = t1.getNumbers();
groupField = t1.getGroupField();
}
if (StringUtils.isNotBlank(groupField)) {
Member member1 = new Member();
member1.setGroupField(groupField);
member1.setNumbers(numbers1 + numbers2);
return member1;
}
return null;
}
}
A MemberAnalySink storage class implementing SinkFunction:
public class MemberAnalySink implements SinkFunction<Member> {
private ClickUntil clickUntil = ClickUntilFactory.createClickUntil();
@Override
public void invoke(Member value, Context context) throws Exception {
if (value != null) {
String groupField = value.getGroupField();
String[] groupFields = groupField.split("==");
String timeinfo = groupFields[1];
String memberLabel = groupFields[2];
Long numbers = value.getNumbers();
String tablename = "memberlabel_info";
Map<String, String> dataMap = new HashMap<>();
dataMap.put("timeinfo",timeinfo);
dataMap.put("memberlabel",memberLabel);
dataMap.put("numbers",numbers "");
clickUntil.saveData(tablename,dataMap);
}
}
}
Then the Flink stream job:
public class MemberAnaly {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
Properties properties = new Properties();
properties.setProperty("bootstrap.servers","127.0.0.1:9092");
properties.setProperty("group.id","portrait");
FlinkKafkaConsumer<String> myConsumer = new FlinkKafkaConsumer<>("user_info",
new SimpleStringSchema(),properties);
DataStreamSource<String> data = env.addSource(myConsumer);
env.enableCheckpointing(5000);
DataStream<Member> map = data.map(new MemberAnalyMap());
// filter out non-INSERT events (null groupField) and window per label key
DataStream<Member> reduce = map.filter(m -> m.getGroupField() != null)
.keyBy(Member::getGroupField).timeWindow(Time.minutes(5))
.reduce(new MemberAnalyReduce());
reduce.addSink(new MemberAnalySink());
env.execute("portrait member");
}
}
User-portrait behavioral features
Here we analyze several user behaviors and build portrait features from them:
- Product browsing: channel id, product id, product category id, browse time, dwell time, user id, device type (1 PC, 2 WeChat mini program, 3 app), deviceId.
- Product favoriting: channel id, product id, product category id, operation time, operation type (favorite, cancel), user id, device type (1 PC, 2 WeChat mini program, 3 app).
- Cart operations: channel id, product id, product category id, operation time, operation type (add, remove), user id, device type (1 PC, 2 WeChat mini program, 3 app).
- Product following: channel id, product id, product category id, operation time, operation type (follow, unfollow), user id, device type (1 PC, 2 WeChat mini program, 3 app).
Define entity classes for the four behaviors:
/**
* Browse operation
*/
@Data
public class ScanOpertor {
/** Channel id */
private Long channelId;
/** Product type id */
private Long productTypeId;
/** Product id */
private Long productId;
/** Browse time */
private Long scanTime;
/** Dwell time */
private Long stayTime;
/** User id */
private Long userId;
/** Device type */
private Integer deviceType;
/** Device id */
private String deviceId;
}
/**
* Favorite operation
*/
@Data
public class CollectOpertor {
/** Channel id */
private Long channelId;
/** Product type id */
private Long productTypeId;
/** Product id */
private Long productId;
/** Operation time */
private Long opertorTime;
/** Operation type */
private Integer opertorType;
/** User id */
private Long userId;
/** Device type */
private Integer deviceType;
/** Device id */
private String deviceId;
}
/**
* Cart operation
*/
@Data
public class CartOpertor {
/** Channel id */
private Long channelId;
/** Product type id */
private Long productTypeId;
/** Product id */
private Long productId;
/** Operation time */
private Long opertorTime;
/** Operation type */
private Integer opertorType;
/** User id */
private Long userId;
/** Device type */
private Integer deviceType;
/** Device id */
private String deviceId;
}
/**
* Follow operation
*/
@Data
public class AttentionOpertor {
/** Channel id */
private Long channelId;
/** Product type id */
private Long productTypeId;
/** Product id */
private Long productId;
/** Operation time */
private Long opertorTime;
/** Operation type */
private Integer opertorType;
/** User id */
private Long userId;
/** Device type */
private Integer deviceType;
/** Device id */
private String deviceId;
}
In the Kafka bin directory run:
./kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic scan
./kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic collection
./kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic cart
./kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic attention
Create a product table:
DROP TABLE IF EXISTS `product`;
CREATE TABLE `product` (
`id` bigint(20) NOT NULL AUTO_INCREMENT,
`product_type_id` bigint(20) DEFAULT NULL,
`product_name` varchar(255) DEFAULT NULL,
`product_title` varchar(255) DEFAULT NULL,
`product_price` decimal(28,10) DEFAULT NULL,
`merchant_id` bigint(20) DEFAULT NULL,
`create_time` datetime DEFAULT NULL,
`update_time` datetime DEFAULT NULL,
`product_place` varchar(255) DEFAULT NULL,
`product_brand` varchar(255) DEFAULT NULL,
`product_weight` decimal(28,10) DEFAULT NULL,
`product_specification` varchar(255) DEFAULT NULL,
PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;
The corresponding entity class:
@Data
public class Product {
private Long id;
private Long productTypeId;
private String productName;
private String productTitle;
private BigDecimal productPrice;
private Long merchantId;
private Date createTime;
private Date updateTime;
private String productPlace;
private String productBrand;
private Double productWeight;
private String productSpecification;
}
A product type table:
DROP TABLE IF EXISTS `product_type`;
CREATE TABLE `product_type` (
`id` bigint(20) NOT NULL,
`product_type_name` varchar(255) DEFAULT NULL,
`product_type_desc` varchar(255) DEFAULT NULL,
`product_type_parent_id` bigint(20) DEFAULT NULL,
`product_type_level` int(11) DEFAULT NULL,
PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;
The corresponding entity class:
@Data
public class ProductType {
private Long id;
private String productTypeName;
private String productTypeDesc;
private Long productTypeParentId;
private Integer productTypeLevel;
}
An order table:
DROP TABLE IF EXISTS `order`;
CREATE TABLE `order` (
`id` bigint(20) NOT NULL,
`amount` decimal(28,10) DEFAULT NULL,
`user_id` bigint(20) DEFAULT NULL,
`product_id` bigint(20) DEFAULT NULL,
`merchant_id` bigint(20) DEFAULT NULL,
`create_time` datetime DEFAULT NULL,
`pay_time` datetime DEFAULT NULL,
`pay_status` int(11) DEFAULT NULL COMMENT 'payment status: 0 unpaid, 1 paid, 2 refunded',
`address` varchar(1000) DEFAULT NULL,
`telphone` varchar(255) DEFAULT NULL,
`username` varchar(255) DEFAULT NULL,
`trade_number` varchar(255) DEFAULT NULL,
`pay_type` int(255) DEFAULT NULL COMMENT 'payment type: 0 Alipay, 1 UnionPay, 2 WeChat',
`number` int(11) DEFAULT NULL,
`order_status` int(255) DEFAULT NULL COMMENT 'order status: 0 submitted, 1 paid, 2 cancelled, 3 deleted',
`update_time` datetime DEFAULT NULL,
PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;
The corresponding entity class:
@Data
public class Order {
private Long id;
private BigDecimal amount;
private Long userId;
private Long productId;
private Long merchantId;
private Date createTime;
private Date payTime;
private Integer payStatus;
private String address;
private String telphone;
private String username;
private String tradeNumber;
private Integer payType;
private Integer number;
private Integer orderStatus;
private Date updateTime;
}
In the HBase bin directory run:
./hbase shell
create "product","info"
create "product_type","info"
create "order","info"
In the Kafka bin directory run:
./kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic product
./kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic product_type
./kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic order
Since there are now multiple tables, TranferAnaly is modified as follows:
@Slf4j
public class TranferAnaly {
@SuppressWarnings("unchecked")
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
Properties properties = new Properties();
properties.setProperty("bootstrap.servers","127.0.0.1:9092");
properties.setProperty("group.id","portrait");
FlinkKafkaConsumer<String> myConsumer = new FlinkKafkaConsumer<>("test",
new SimpleStringSchema(),properties);
DataStreamSource<String> data = env.addSource(myConsumer);
env.enableCheckpointing(5000);
DataStream<String> map = data.map(s -> {
JSONObject jsonObject = JSONObject.parseObject(s);
String type = jsonObject.getString("type");
String table = jsonObject.getString("table");
String database = jsonObject.getString("database");
String data1 = jsonObject.getString("data");
JSONArray jsonArray = JSONObject.parseArray(data1);
List<Map<String,String>> listdata = new ArrayList<>();
for (int i = 0; i < jsonArray.size(); i++) {
JSONObject jsonObject1 = jsonArray.getJSONObject(i);
String tablename = table;
String rowkey = jsonObject1.getString("id");
String famliyname = "info";
Map<String,String> datamap = JSONObject.parseObject(JSONObject.toJSONString(jsonObject1),Map.class);
datamap.put("database",database);
datamap.put("typebefore",HbaseUtil.getdata(tablename,rowkey,famliyname,"typecurrent"));
datamap.put("typecurrent",type);
datamap.put("tablename",table);
HbaseUtil.put(tablename,rowkey,famliyname,datamap);
listdata.add(datamap);
}
return JSONObject.toJSONString(listdata);
});
map.addSink(new SinkFunction<String>() {
@Override
public void invoke(String value, Context context) throws Exception {
List<Map> data = JSONObject.parseArray(value,Map.class);
for (Map<String,String> map : data) {
String tablename = map.get("tablename");
KafkaUtil.sendData(tablename,JSONObject.toJSONString(map));
}
}
});
env.execute("portrait tranfer");
}
}
Create a Spring Boot project to collect the business-event data.
Dependencies:
<properties>
<java.version>1.8</java.version>
<fastjson.version>1.2.74</fastjson.version>
</properties>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>${fastjson.version}</version>
</dependency>
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<optional>true</optional>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
Configuration file:
spring:
kafka:
bootstrap-servers: 127.0.0.1:9092
producer:
retries: 0
batch-size: 16384
buffer-memory: 33554432
key-serializer: org.apache.kafka.common.serialization.StringSerializer
value-serializer: org.apache.kafka.common.serialization.StringSerializer
acks: -1
consumer:
group-id: portrait
auto-offset-reset: earliest
enable-auto-commit: false
auto-commit-interval: 100
key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
max-poll-records: 10
listener:
concurrency: 3
type: batch
ack-mode: manual
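The listener block (batch type, manual ack) only matters if this service also consumes; the project below only produces. For completeness, a minimal listener matching those settings might look like this (my sketch, not part of the original project):
import java.util.List;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.stereotype.Component;

@Component
@Slf4j
public class DemoBatchConsumer {
    // listener.type: batch -> the method receives a List; ack-mode: manual -> an Acknowledgment parameter
    @KafkaListener(topics = "test")
    public void onMessages(List<ConsumerRecord<String, String>> records, Acknowledgment ack) {
        records.forEach(r -> log.info("offset={} value={}", r.offset(), r.value()));
        ack.acknowledge(); // commit offsets only after the whole batch is handled
    }
}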
The Kafka producer:
@Component
@Slf4j
public class KafkaProducer {
@Autowired
private KafkaTemplate<String,String> kafkaTemplate;
@SuppressWarnings("unchecked")
public void produce(String topic,String message) {
try {
ListenableFuture future = kafkaTemplate.send(topic, message);
SuccessCallback<SendResult<String,String>> successCallback = new SuccessCallback<SendResult<String, String>>() {
@Override
public void onSuccess(@Nullable SendResult<String, String> result) {
log.info("发送消息成功");
}
};
FailureCallback failureCallback = new FailureCallback() {
@Override
public void onFailure(Throwable ex) {
log.error("发送消息失败",ex);
produce(topic,message);
}
};
future.addCallback(successCallback,failureCallback);
} catch (Exception e) {
log.error("发送消息异常",e);
}
}
}
The collection controller:
@RestController
public class DataController {
@Autowired
private KafkaProducer kafkaProducer;
@PostMapping("/revicedata")
public void reviceData(@RequestBody String data) {
JSONObject jsonObject = JSONObject.parseObject(data);
String type = jsonObject.getString("type");
String topic = "";
switch (type) {
case "0":
topic = "scan";
break;
case "1":
topic = "collection";
break;
case "2":
topic = "cart";
break;
case "3":
topic = "attention";
break;
default:
break;
}
kafkaProducer.produce(topic,data);
}
}
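A hypothetical client call, to show the payload shape the controller expects: the type field picks the topic, and the rest of the body is one of the operation entities above (here a ScanOpertor); RestTemplate ships with spring-boot-starter-web, and the port is assumed to be the default 8080:
import com.alibaba.fastjson.JSONObject;
import org.springframework.web.client.RestTemplate;

public class ReviceDataClientDemo {
    public static void main(String[] args) {
        JSONObject body = new JSONObject();
        body.put("type", "0"); // 0 -> the scan topic
        body.put("channelId", 1L);
        body.put("productTypeId", 2L);
        body.put("productId", 3L);
        body.put("scanTime", System.currentTimeMillis());
        body.put("stayTime", 5000L);
        body.put("userId", 8L);
        body.put("deviceType", 1);
        body.put("deviceId", "device-001");
        new RestTemplate().postForObject("http://127.0.0.1:8080/revicedata",
                body.toJSONString(), String.class);
    }
}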
Really this is just a simple simulation of user behavior; properly we should build a logging microservice that collects all user actions. See the AOP write-up (AOP原理与自实现) for a starting point that can be adapted by replacing its log class, which was
@Builder
@Data
@NoArgsConstructor
@AllArgsConstructor
public class Log implements Serializable {
private static final long serialVersionUID = -5398795297842978376L;
private Long id;
private String username;
/** module */
private String module;
/** parameter values */
private String params;
private String remark;
private Boolean flag;
private Date createTime;
private String ip;
private String area;
}
with the various operation classes above. Some other adjustments are needed too, which we will not make here; also, swap its RabbitMQ out for Kafka.
Creating the user-portrait product-category preference label
Create a product-type label entity class:
@Data
public class ProductTypeLabel {
private Long userid;
private String productTypeId;
private Long numbers = 0L;
private String groupField;
}
Add a method to the DateUntil utility class that truncates a timestamp to the start of its hour (e.g. 15:19:22 becomes 15:00:00, returned as epoch millis).
public static Long getCurrentHourStart(Long visitTime) throws ParseException {
Date date = new Date(visitTime);
DateFormat dateFormat = new SimpleDateFormat("yyyyMMdd HH");
Date filterTime = dateFormat.parse(dateFormat.format(date));
return filterTime.getTime();
}
Create a ProductTypeAnalyMap transform class implementing MapFunction:
public class ProductTypeAnalyMap implements MapFunction<String,ProductTypeLabel> {
@Override
public ProductTypeLabel map(String s) throws Exception {
ScanOpertor scanOpertor = JSONObject.parseObject(s, ScanOpertor.class);
Long userid = scanOpertor.getUserId();
Long productTypeId = scanOpertor.getProductTypeId();
String tablename = "user_info";
String rowkey = userid + "";
String famliyname = "info";
String colum = "producttypelist";
//fetch the user's historical product-type preferences
String productTypeListString = HbaseUtil.getdata(tablename, rowkey, famliyname, colum);
List<Map> temp = new ArrayList<>();
List<Map<String,Long>> result = new ArrayList<>();
if (StringUtils.isNotBlank(productTypeListString)) {
temp = JSONObject.parseArray(productTypeListString,Map.class);
}
boolean found = false;
for (Map map : temp) {
Long productTypeId1 = Long.parseLong(map.get("key").toString());
Long value = Long.parseLong(map.get("value").toString());
//if the new product type matches one already in the history, its preference goes up by 1
if (productTypeId.equals(productTypeId1)) {
value++;
found = true;
}
//re-store as Long so the sort below can rely on the type
map.put("value",value);
result.add(map);
}
//a type never seen before enters the list with preference 1
if (!found) {
Map<String,Long> newEntry = new HashMap<>();
newEntry.put("key",productTypeId);
newEntry.put("value",1L);
result.add(newEntry);
}
Collections.sort(result,(o1,o2) -> {
Long value1 = o1.get("value");
Long value2 = o2.get("value");
return value2.compareTo(value1);
});
if (result.size() > 5) {
result = result.subList(0,5);
}
String data = JSONObject.toJSONString(result);
HbaseUtil.putdata(tablename,rowkey,famliyname,colum,data);
ProductTypeLabel productTypeLabel = new ProductTypeLabel();
//format: productType==timehour==productTypeId
String groupField = "productType==" + DateUntil.getCurrentHourStart(System.currentTimeMillis())
+ "==" + productTypeId;
productTypeLabel.setUserid(userid);
productTypeLabel.setProductTypeId(productTypeId + "");
productTypeLabel.setNumbers(1L);
productTypeLabel.setGroupField(groupField);
return productTypeLabel;
}
}
A ProductTypeAnalyReduce aggregation class implementing ReduceFunction:
public class ProductTypeAnalyReduce implements ReduceFunction<ProductTypeLabel> {
@Override
public ProductTypeLabel reduce(ProductTypeLabel productTypeLabel, ProductTypeLabel t1) throws Exception {
Long numbers1 = 0L;
String groupField = "";
if (productTypeLabel != null) {
numbers1 = productTypeLabel.getNumbers();
groupField = productTypeLabel.getGroupField();
}
Long numbers2 = 0L;
if (t1 != null) {
numbers2 = t1.getNumbers();
groupField = t1.getGroupField();
}
if (StringUtils.isNotBlank(groupField)) {
ProductTypeLabel productTypeLabel1 = new ProductTypeLabel();
productTypeLabel1.setGroupField(groupField);
productTypeLabel1.setNumbers(numbers1 + numbers2);
return productTypeLabel1;
}
return null;
}
}
A ProductTypeAnalySink storage class implementing SinkFunction:
public class ProductTypeAnalySink implements SinkFunction<ProductTypeLabel> {
private ClickUntil clickUntil = ClickUntilFactory.createClickUntil();
@Override
public void invoke(ProductTypeLabel value, Context context) throws Exception {
if (value != null) {
String groupField = value.getGroupField();
String[] groupFields = groupField.split("==");
String timeinfo = groupFields[1];
String productTypeLabel = groupFields[2];
Long numbers = value.getNumbers();
String tablename = "producttypelabel_info";
Map<String, String> dataMap = new HashMap<>();
dataMap.put("timeinfo",timeinfo);
dataMap.put("producttypelabel",productTypeLabel);
dataMap.put("numbers",numbers "");
clickUntil.saveData(tablename,dataMap);
}
}
}
Then the Flink stream job:
public class ProductTypeAnaly {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
Properties properties = new Properties();
properties.setProperty("bootstrap.servers","127.0.0.1:9092");
properties.setProperty("group.id","portrait");
FlinkKafkaConsumer<String> myConsumer = new FlinkKafkaConsumer<>("scan",
new SimpleStringSchema(),properties);
DataStreamSource<String> data = env.addSource(myConsumer);
env.enableCheckpointing(5000);
DataStream<ProductTypeLabel> map = data.map(new ProductTypeAnalyMap());
DataStream<ProductTypeLabel> reduce = map.keyBy(ProductTypeLabel::getGroupField).timeWindow(Time.hours(1))
.reduce(new ProductTypeAnalyReduce());
reduce.addSink(new ProductTypeAnalySink());
env.execute("portrait scan");
}
}
Creating the user-portrait "tangled product" label
This label tracks products a user keeps going back and forth on in the cart. Create a tangled-product label entity class:
@Data
public class TangleProduct {
private Long userid;
private String productId;
private Long numbers = 0L;
private String groupField;
}
A TangleProductAnalyMap transform class implementing MapFunction:
public class TangleProductAnalyMap implements MapFunction<String,TangleProduct> {
@Override
public TangleProduct map(String s) throws Exception {
CartOpertor cartOpertor = JSONObject.parseObject(s, CartOpertor.class);
Long userid = cartOpertor.getUserId();
Long productId = cartOpertor.getProductId();
String tablename = "user_info";
String rowkey = userid + "";
String famliyname = "info";
String colum = "tangleproducts";
//fetch the products the user has historically hesitated over
String tangleProducts = HbaseUtil.getdata(tablename, rowkey, famliyname, colum);
List<Map> temp = new ArrayList<>();
List<Map<String,Long>> result = new ArrayList<>();
if (StringUtils.isNotBlank(tangleProducts)) {
temp = JSONObject.parseArray(tangleProducts,Map.class);
}
boolean found = false;
for (Map map : temp) {
Long productId1 = Long.parseLong(map.get("key").toString());
Long value = Long.parseLong(map.get("value").toString());
//if the product matches one already in the history, its value goes up by 1
if (productId.equals(productId1)) {
value++;
found = true;
}
//re-store as Long so the sort below can rely on the type
map.put("value",value);
result.add(map);
}
//a product never seen before enters the list with value 1
if (!found) {
Map<String,Long> newEntry = new HashMap<>();
newEntry.put("key",productId);
newEntry.put("value",1L);
result.add(newEntry);
}
Collections.sort(result,(o1, o2) -> {
Long value1 = o1.get("value");
Long value2 = o2.get("value");
return value2.compareTo(value1);
});
if (result.size() > 5) {
result = result.subList(0,5);
}
String data = JSONObject.toJSONString(result);
HbaseUtil.putdata(tablename,rowkey,famliyname,colum,data);
TangleProduct tangleProduct = new TangleProduct();
//format: tangleProduct==timehour==productId
String groupField = "tangleProduct==" + DateUntil.getCurrentHourStart(System.currentTimeMillis())
+ "==" + productId;
tangleProduct.setUserid(userid);
tangleProduct.setProductId(productId + "");
tangleProduct.setNumbers(1L);
tangleProduct.setGroupField(groupField);
return tangleProduct;
}
}
A TangleProductAnalyReduct aggregation class implementing ReduceFunction:
public class TangleProductAnalyReduct implements ReduceFunction<TangleProduct> {
@Override
public TangleProduct reduce(TangleProduct tangleProduct, TangleProduct t1) throws Exception {
Long numbers1 = 0L;
String groupField = "";
if (tangleProduct != null) {
numbers1 = tangleProduct.getNumbers();
groupField = tangleProduct.getGroupField();
}
Long numbers2 = 0L;
if (t1 != null) {
numbers2 = t1.getNumbers();
groupField = t1.getGroupField();
}
if (StringUtils.isNotBlank(groupField)) {
TangleProduct tangleProduct1 = new TangleProduct();
tangleProduct1.setGroupField(groupField);
tangleProduct1.setNumbers(numbers1 + numbers2);
return tangleProduct1;
}
return null;
}
}
A TangleProductAnalySink storage class implementing SinkFunction:
public class TangleProductAnalySink implements SinkFunction<TangleProduct> {
private ClickUntil clickUntil = ClickUntilFactory.createClickUntil();
@Override
public void invoke(TangleProduct value, Context context) throws Exception {
if (value != null) {
String groupField = value.getGroupField();
String[] groupFields = groupField.split("==");
String timeinfo = groupFields[1];
String tangleProductLabel = groupFields[2];
Long numbers = value.getNumbers();
String tablename = "tangleproductlabel_info";
Map<String, String> dataMap = new HashMap<>();
dataMap.put("timeinfo",timeinfo);
dataMap.put("tangleproductlabel",tangleProductLabel);
dataMap.put("numbers",numbers "");
clickUntil.saveData(tablename,dataMap);
}
}
}
Then the Flink stream job:
public class TangleProductAnaly {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
Properties properties = new Properties();
properties.setProperty("bootstrap.servers","127.0.0.1:9092");
properties.setProperty("group.id","portrait");
FlinkKafkaConsumer<String> myConsumer = new FlinkKafkaConsumer<>("cart",
new SimpleStringSchema(),properties);
DataStreamSource<String> data = env.addSource(myConsumer);
env.enableCheckpointing(5000);
DataStream<TangleProduct> map = data.map(new TangleProductAnalyMap());
DataStream<TangleProduct> reduce = map.keyBy(TangleProduct::getGroupField).timeWindow(Time.hours(1))
.reduce(new TangleProductAnalyReduct());
reduce.addSink(new TangleProductAnalySink());
env.execute("portrait cart");
}
}