Flink User Profiling: Behavioral Features


The components we'll use are Hadoop 2.6, HBase 1.0.0, MySQL 8, ZooKeeper 3.4.5, Kafka 2.1.0, Flink 1.13, and Canal 1.1.5. For convenience, everything here runs as a pseudo-cluster or single-node installation.

A Simple Hadoop 2.6 Installation

hadoop-env.sh

export JAVA_HOME=/Library/Java/JavaVirtualMachines/jdk1.8.0_172.jdk/Contents/Home

core-site.xml

<configuration>
<property>
    <name>fs.defaultFS</name>
    <value>hdfs://127.0.0.1:9000</value>
</property>
<property>
    <name>hadoop.tmp.dir</name>
    <value>/Users/admin/Downloads/hadoop2</value>
</property>
</configuration>

hdfs-site.xml

<configuration>
	<property>
		<name>dfs.replication</name>
		<value>1</value>
	</property>
	<property>
		<name>dfs.permissions</name>
		<value>false</value>
	</property>
</configuration>

From the bin directory, format the NameNode:

./hdfs namenode -format

Then, from the sbin directory, start HDFS:

./start-dfs.sh

The web UI is then available at:

http://127.0.0.1:50070/

ZooKeeper 3.4.5 Installation

zoo.cfg

dataDir=/Users/admin/Downloads/zookeeper/data

From the bin directory, start ZooKeeper:

./zkServer.sh start

HBase 1.0.0 Installation

hbase-env.sh

export JAVA_HOME=/Library/Java/JavaVirtualMachines/jdk1.8.0_172.jdk/Contents/Home

hbase-site.xml

<configuration>
	<property>
		<name>hbase.rootdir</name>
		<value>hdfs://127.0.0.1:9000/hbase</value>
	</property>
	<property>
		<name>hbase.cluster.distributed</name>
		<value>true</value>
	</property>
	<property>
		<name>hbase.zookeeper.quorum</name>
		<value>localhost</value>
	</property>
	<property>
		<name>dfs.replication</name>
		<value>1</value>
	</property>
</configuration>

From the bin directory, start HBase:

./start-hbase.sh

The web UI is available at:

http://127.0.0.1:60010/

Kafka 2.1.0 Installation

server.properties

log.dirs=/Users/admin/Downloads/kafka-logs

From the bin directory, start the broker, create a test topic, and attach a console consumer:

./kafka-server-start.sh ../config/server.properties 
./kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test
./kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test

MySQL 8 Installation with Docker

Create a directory (mine is mysql-bin) and in it a my.cnf file with the following content:

[client]
socket = /var/sock/mysqld/mysqld.sock
[mysql]
socket = /var/sock/mysqld/mysqld.sock
[mysqld]
skip-host-cache
skip-name-resolve
datadir = /var/lib/mysql
user = mysql
port = 3306
bind-address = 0.0.0.0
socket = /var/sock/mysqld/mysqld.sock
pid-file = /var/run/mysqld/mysqld.pid
general_log_file = /var/log/mysql/query.log
slow_query_log_file = /var/log/mysql/slow.log
log-error = /var/log/mysql/error.log
log-bin=mysql-bin
binlog-format=ROW
server-id=1
!includedir /etc/my.cnf.d/
!includedir /etc/mysql/conf.d/
!includedir /etc/mysql/docker-default.d/

Start the container with:

docker run -d --name mysql -e MYSQL_ROOT_PASSWORD=abcd123 -p 3306:3306 -v /Users/admin/Downloads/mysql-bin/my.cnf:/etc/my.cnf docker.io/cytopia/mysql-8.0
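
Note that the log-bin, binlog-format=ROW, and server-id settings in my.cnf are what Canal depends on: it reads the MySQL binlog by posing as a replication slave, and it needs ROW-format events.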

Create a database named portrait.

Create the table:

DROP TABLE IF EXISTS `user_info`;
CREATE TABLE `user_info` (
  `id` bigint(20) NOT NULL AUTO_INCREMENT,
  `account` varchar(255) DEFAULT NULL,
  `password` varchar(255) DEFAULT NULL,
  `sex` varchar(255) DEFAULT NULL,
  `age` int(11) DEFAULT NULL,
  `phone` varchar(255) DEFAULT NULL,
  `status` int(255) DEFAULT NULL COMMENT 'membership status: 0 = regular, 1 = silver, 2 = gold',
  `wechat_account` varchar(255) DEFAULT NULL,
  `zhifubao_account` varchar(255) DEFAULT NULL,
  `email` varchar(255) DEFAULT NULL,
  `create_time` datetime DEFAULT NULL,
  `update_time` datetime DEFAULT NULL,
  PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=6 DEFAULT CHARSET=utf8mb4;

SET FOREIGN_KEY_CHECKS = 1;

Canal 1.1.5 Installation

canal.properties

canal.zkServers = 127.0.0.1:2181
canal.serverMode = kafka

In instance.properties under the conf/example directory:

canal.instance.master.address=127.0.0.1:3306
canal.instance.dbUsername=root
canal.instance.dbPassword=abcd123
canal.instance.defaultDatabaseName=portrait
canal.mq.topic=test

From the bin directory, start Canal:

./startup.sh

Now, when we insert a row into the database:

insert into user_info (account,password,sex,age,phone,status,wechat_account,zhifubao_account,email,create_time,update_time) 
values ('abcd','1234','男',24,'13873697762',0,'火名之月','abstart','981456@qq.com','2021-09-10','2021-10-11')

the Kafka console consumer shows:

[2021-11-05 15:13:05,173] INFO [GroupMetadataManager brokerId=0] Removed 0 expired offsets in 0 milliseconds. (kafka.coordinator.group.GroupMetadataManager)
{
"data":[
{
"id":"8",
"account":"abcd",
"password":"1234",
"sex":"男",
"age":"24",
"phone":"13873697762",
"status":"0",
"wechat_account":"火名之月",
"zhifubao_account":"abstart",
"email":"981456@qq.com",
"create_time":"2021-09-10 00:00:00",
"update_time":"2021-10-11 00:00:00"
}
],
"database":"portrait",
"es":1636096762000,
"id":11,
"isDdl":false,
"mysqlType":{
"id":"bigint(0)",
"account":"varchar(255)",
"password":"varchar(255)",
"sex":"varchar(255)",
"age":"int(0)",
"phone":"varchar(255)",
"status":"int(255)",
"wechat_account":"varchar(255)",
"zhifubao_account":"varchar(255)",
"email":"varchar(255)",
"create_time":"datetime(0)",
"update_time":"datetime(0)"
},
"old":null,
"pkNames":[
"id"
],
"sql":"",
"sqlType":{
"id":-5,
"account":12,
"password":12,
"sex":12,
"age":4,
"phone":12,
"status":4,
"wechat_account":12,
"zhifubao_account":12,
"email":12,
"create_time":93,
"update_time":93
},
"table":"user_info",
"ts":1636096762605,
"type":"INSERT"
}
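
In this message, es is the binlog event time and ts the time Canal processed it; the sqlType values are java.sql.Types constants (-5 = BIGINT, 12 = VARCHAR, 4 = INTEGER, 93 = TIMESTAMP).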

Processing the Messages with Flink Streaming

The Java dependencies are listed below; for more detail on Flink itself, see Flink技术整理. Since this article uses 1.13.0 while that one used 1.7.2, some of the older APIs are no longer available.

<properties>
   <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
   <flink.version>1.13.0</flink.version>
   <alink.version>1.4.0</alink.version>
   <fastjson.version>1.2.74</fastjson.version>
   <java.version>1.8</java.version>
   <scala.version>2.11.12</scala.version>
   <hadoop.version>2.6.0</hadoop.version>
   <hbase.version>1.0.0</hbase.version>
   <scala.binary.version>2.11</scala.binary.version>
   <maven.compiler.source>${java.version}</maven.compiler.source>
   <maven.compiler.target>${java.version}</maven.compiler.target>
</properties>
<dependencies>
   <!-- Apache Flink dependencies -->
   <!-- These dependencies are provided, because they should not be packaged into the JAR file. -->
   <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-java</artifactId>
      <version>${flink.version}</version>
      <scope>provided</scope>
   </dependency>
   <dependency>
      <groupId>com.alibaba.alink</groupId>
      <artifactId>alink_core_flink-1.13_2.11</artifactId>
      <version>${alink.version}</version>
   </dependency>
   <dependency>
      <groupId>com.alibaba</groupId>
      <artifactId>fastjson</artifactId>
      <version>${fastjson.version}</version>
   </dependency>
   <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
      <version>${flink.version}</version>
   </dependency>
   <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-clients_${scala.binary.version}</artifactId>
      <version>${flink.version}</version>
   </dependency>
   <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-scala_${scala.binary.version}</artifactId>
      <version>${flink.version}</version>
   </dependency>
   <dependency>
      <groupId>org.apache.hbase</groupId>
      <artifactId>hbase-client</artifactId>
      <version>${hbase.version}</version>
   </dependency>
   <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-streaming-scala_${scala.binary.version}</artifactId>
      <version>${flink.version}</version>
   </dependency>
   <dependency>
      <groupId>org.scala-lang</groupId>
      <artifactId>scala-library</artifactId>
      <version>${scala.version}</version>
   </dependency>
   <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-table-planner_${scala.binary.version}</artifactId>
      <version>${flink.version}</version>
   </dependency>
   <dependency>
      <groupId>org.apache.hadoop</groupId>
      <artifactId>hadoop-client</artifactId>
      <version>${hadoop.version}</version>
   </dependency>
   <dependency>
      <groupId>org.apache.hadoop</groupId>
      <artifactId>hadoop-common</artifactId>
      <version>${hadoop.version}</version>
   </dependency>
   <dependency>
      <groupId>org.apache.hadoop</groupId>
      <artifactId>hadoop-hdfs</artifactId>
      <version>${hadoop.version}</version>
   </dependency>
   <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-connector-kafka_2.11</artifactId>
      <version>${flink.version}</version>
   </dependency>
   <dependency>
      <groupId>org.apache.kafka</groupId>
      <artifactId>kafka-clients</artifactId>
      <version>1.1.1</version>
   </dependency>
   <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-statebackend-rocksdb_2.11</artifactId>
      <version>${flink.version}</version>
   </dependency>
   <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-connector-elasticsearch6_2.11</artifactId>
      <version>${flink.version}</version>
   </dependency>
   <dependency>
      <groupId>mysql</groupId>
      <artifactId>mysql-connector-java</artifactId>
      <version>8.0.11</version>
   </dependency>
   <dependency>
      <groupId>org.projectlombok</groupId>
      <artifactId>lombok</artifactId>
      <version>1.18.16</version>
      <optional>true</optional>
   </dependency>
   <dependency>
      <groupId>org.slf4j</groupId>
      <artifactId>slf4j-log4j12</artifactId>
      <version>1.7.7</version>
      <scope>runtime</scope>
   </dependency>
   <dependency>
      <groupId>log4j</groupId>
      <artifactId>log4j</artifactId>
      <version>1.2.17</version>
      <scope>runtime</scope>
   </dependency>
</dependencies>

First, let's read the Kafka messages with Flink:

public class Test {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers","127.0.0.1:9092");
        properties.setProperty("group.id","portrait");
        FlinkKafkaConsumer<String> myConsumer = new FlinkKafkaConsumer<>("test",
                new SimpleStringSchema(),properties);

        DataStreamSource<String> data = env.addSource(myConsumer);
        env.enableCheckpointing(5000);
        data.print();
        env.execute("portrait test");
    }
}

Running it prints:

16:39:42,070 INFO  org.apache.kafka.clients.consumer.internals.AbstractCoordinator  - [Consumer clientId=consumer-21, groupId=portrait] Discovered group coordinator admindembp.lan:9092 (id: 2147483647 rack: null)
15> {"data":[{"id":"8","account":"abcd","password":"1234","sex":"男","age":"24","phone":"13873697762","status":"0","wechat_account":"火名之月","zhifubao_account":"abstart","email":"981456@qq.com","create_time":"2021-09-10 00:00:00","update_time":"2021-10-11 00:00:00"}],"database":"portrait","es":1636096762000,"id":11,"isDdl":false,"mysqlType":{"id":"bigint(0)","account":"varchar(255)","password":"varchar(255)","sex":"varchar(255)","age":"int(0)","phone":"varchar(255)","status":"int(255)","wechat_account":"varchar(255)","zhifubao_account":"varchar(255)","email":"varchar(255)","create_time":"datetime(0)","update_time":"datetime(0)"},"old":null,"pkNames":["id"],"sql":"","sqlType":{"id":-5,"account":12,"password":12,"sex":12,"age":4,"phone":12,"status":4,"wechat_account":12,"zhifubao_account":12,"email":12,"create_time":93,"update_time":93},"table":"user_info","ts":1636096762605,"type":"INSERT"}

Now let's parse this data and store it in HBase.

Create a UserInfo entity class:

@Data
@ToString
public class UserInfo {
    private Long id;
    private String account;
    private String password;
    private String sex;
    private Integer age;
    private String phone;
    private Integer status;
    private String wechatAccount;
    private String zhifubaoAccount;
    private String email;
    private Date createTime;
    private Date updateTime;
}

And an HBase utility class:

@Slf4j
public class HbaseUtil {
    private static Admin admin = null;
    private static Connection conn = null;

    static {
        Configuration conf = HBaseConfiguration.create();
        conf.set("hbase.rootdir","hdfs://127.0.0.1:9000/hbase");
        conf.set("hbase.zookeeper.quorum","127.0.0.1");
        conf.set("hbase.client.scanner.timeout.period","600000");
        conf.set("hbase.rpc.timeout","600000");
        try {
            conn = ConnectionFactory.createConnection(conf);
            admin = conn.getAdmin();
        }catch (IOException e) {
            e.printStackTrace();
        }
    }

    public static void createTable(String tableName,String famliyname) throws IOException {
        HTableDescriptor tab = new HTableDescriptor(tableName);
        HColumnDescriptor colDesc = new HColumnDescriptor(famliyname);
        tab.addFamily(colDesc);
        admin.createTable(tab);
        log.info("over");
    }

    public static void put(String tablename, String rowkey, String famliyname, Map<String,String> datamap) throws IOException {
        Table table = conn.getTable(TableName.valueOf(tablename));
        byte[] rowkeybyte = Bytes.toBytes(rowkey);
        Put put = new Put(rowkeybyte);
        if (datamap != null) {
            Set<Map.Entry<String,String>> set = datamap.entrySet();
            for (Map.Entry<String,String> entry : set) {
                String key = entry.getKey();
                Object value = entry.getValue();
                put.addColumn(Bytes.toBytes(famliyname),Bytes.toBytes(key),
                        Bytes.toBytes(value + ""));
            }
        }
        table.put(put);
        table.close();
        log.info("OK");
    }

    public static String getdata(String tablename,String rowkey,
                                 String famliyname,String colmn) throws IOException {
        Table table = conn.getTable(TableName.valueOf(tablename));
        byte[] rowkeybyte = Bytes.toBytes(rowkey);
        Get get = new Get(rowkeybyte);
        Result result = table.get(get);
        byte[] resultbytes = result.getValue(famliyname.getBytes(),colmn.getBytes());
        if (resultbytes == null) {
            return null;
        }
        return new String(resultbytes);
    }

    public static void putdata(String tablename,String rowkey,
                               String famliyname,String colum,
                               String data) throws IOException {
        Table table = conn.getTable(TableName.valueOf(tablename));
        Put put = new Put(rowkey.getBytes());
        put.addColumn(famliyname.getBytes(),colum.getBytes(),data.getBytes());
        table.put(put);
    }

    public static void main(String[] args) throws IOException {
//        createTable("testinfo","time");
        putdata("testinfo","1","time","info","ty");
//        Map<String,String> datamap = new HashMap<>();
//        datamap.put("info1","ty1");
//        datamap.put("info2","ty2");
//        put("testinfo","2","time",datamap);
        String result = getdata("testinfo","1","time","info");
        log.info(result);
    }
}

From the HBase bin directory, open the shell and create the user_info table:

./hbase shell
create "user_info","info"
Then update TranferAnaly to parse each Canal message and write it into HBase:

@Slf4j
public class TranferAnaly {

    @SuppressWarnings("unchecked")
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers","127.0.0.1:9092");
        properties.setProperty("group.id","portrait");
        FlinkKafkaConsumer<String> myConsumer = new FlinkKafkaConsumer<>("test",
                new SimpleStringSchema(),properties);

        DataStreamSource<String> data = env.addSource(myConsumer);
        env.enableCheckpointing(5000);
        DataStream<String> map = data.map(s -> {
            JSONObject jsonObject = JSONObject.parseObject(s);
            String type = jsonObject.getString("type");
            String table = jsonObject.getString("table");
            String database = jsonObject.getString("database");
            String data1 = jsonObject.getString("data");
            List<UserInfo> list = JSONObject.parseArray(data1,UserInfo.class);
            log.info(list.toString());
            for (UserInfo userInfo : list) {
                String tablename = table;
                String rowkey = userInfo.getId() + "";
                String famliyname = "info";
                Map<String,String> datamap = JSONObject.parseObject(JSONObject.toJSONString(userInfo),Map.class);
                datamap.put("database",database);
                datamap.put("typebefore",HbaseUtil.getdata(tablename,rowkey,famliyname,"typecurrent"));
                datamap.put("typecurrent",type);
                HbaseUtil.put(tablename,rowkey,famliyname,datamap);
            }
            // a Flink map must not emit null, so pass the message through unchanged
            return s;
        });
//        map.print();
        env.execute("portrait test");
    }
}

Querying HBase then shows:

scan 'user_info'
ROW                                                          COLUMN+CELL
 12                                                          column=info:account, timestamp=1636105093607, value=abcd                                                                                                                         
 12                                                          column=info:age, timestamp=1636105093607, value=24                                                                                                                               
 12                                                          column=info:createTime, timestamp=1636105093607, value=1631203200000                                                                                                             
 12                                                          column=info:database, timestamp=1636105093607, value=portrait                                                                                                                    
 12                                                          column=info:email, timestamp=1636105093607, value=981456@qq.com                                                                                                                  
 12                                                          column=info:id, timestamp=1636105093607, value=12                                                                                                                                
 12                                                          column=info:password, timestamp=1636105093607, value=1234                                                                                                                        
 12                                                          column=info:phone, timestamp=1636105093607, value=13873697762                                                                                                                    
 12                                                          column=info:sex, timestamp=1636105093607, value=\xE7\x94\xB7
 12                                                          column=info:status, timestamp=1636105093607, value=0                                                                                                                             
 12                                                          column=info:typebefore, timestamp=1636105093607, value=null                                                                                                                      
 12                                                          column=info:typecurrent, timestamp=1636105093607, value=INSERT                                                                                                                   
 12                                                          column=info:updateTime, timestamp=1636105093607, value=1633881600000                                                                                                             
 12                                                          column=info:wechatAccount, timestamp=1636105093607, value=\xE7\x81\xAB\xE5\x90\x8D\xE4\xB9\x8B\xE6\x9C\x88
 12                                                          column=info:zhifubaoAccount, timestamp=1636105093607, value=abstart                                                                                                              
1 row(s) in 0.0110 seconds

Now let's pass the data along.

From the Kafka bin directory, create a new topic and attach a consumer:

./kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic user_info
./kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic user_info

Add a new Kafka utility class:

@Slf4j
public class KafkaUtil {
    private static Properties getProps() {
        Properties props = new Properties();
        props.put("bootstrap.servers","127.0.0.1:9092");
        props.put("acks","all");
        props.put("retries",2);
        props.put("linger.ms",1000);
        props.put("client.id","producer-syn-1");
        props.put("key.serializer","org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer","org.apache.kafka.common.serialization.StringSerializer");
        return props;
    }

    public static void sendData(String topicName,String data) throws ExecutionException, InterruptedException {
        KafkaProducer<String,String> producer = new KafkaProducer<>(getProps());
        ProducerRecord<String,String> record = new ProducerRecord<>(topicName,data);
        Future<RecordMetadata> metadataFuture = producer.send(record);
        RecordMetadata recordMetadata = metadataFuture.get();
        log.info("topic:"   recordMetadata.topic());
        log.info("partition:"   recordMetadata.partition());
        log.info("offset:"   recordMetadata.offset());
    }
}

Then send the messages on:

@Slf4j
public class TranferAnaly {

    @SuppressWarnings("unchecked")
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers","127.0.0.1:9092");
        properties.setProperty("group.id","portrait");
        FlinkKafkaConsumer<String> myConsumer = new FlinkKafkaConsumer<>("test",
                new SimpleStringSchema(),properties);

        DataStreamSource<String> data = env.addSource(myConsumer);
        env.enableCheckpointing(5000);
        DataStream<String> map = data.map(s -> {
            JSONObject jsonObject = JSONObject.parseObject(s);
            String type = jsonObject.getString("type");
            String table = jsonObject.getString("table");
            String database = jsonObject.getString("database");
            String data1 = jsonObject.getString("data");
            List<UserInfo> list = JSONObject.parseArray(data1,UserInfo.class);
            List<Map<String,String>> listdata = new ArrayList<>();
            log.info(list.toString());
            for (UserInfo userInfo : list) {
                String tablename = table;
                String rowkey = userInfo.getId() + "";
                String famliyname = "info";
                Map<String,String> datamap = JSONObject.parseObject(JSONObject.toJSONString(userInfo),Map.class);
                datamap.put("database",database);
                datamap.put("typebefore",HbaseUtil.getdata(tablename,rowkey,famliyname,"typecurrent"));
                datamap.put("typecurrent",type);
                datamap.put("tablename",table);
                HbaseUtil.put(tablename,rowkey,famliyname,datamap);
                listdata.add(datamap);
            }
            return JSONObject.toJSONString(listdata);
        });
        map.addSink(new SinkFunction<String>() {
            @Override
            public void invoke(String value, Context context) throws Exception {
                List<Map> data = JSONObject.parseArray(value,Map.class);
                for (Map<String,String> map : data) {
                    String tablename = map.get("tablename");
                    KafkaUtil.sendData(tablename,JSONObject.toJSONString(map));
                }
            }
        });
        env.execute("portrait test");
    }
}

The Kafka consumer shows:

[2021-11-05 20:11:26,869] INFO [GroupCoordinator 0]: Assignment received from leader for group console-consumer-47692 for generation 1 (kafka.coordinator.group.GroupCoordinator)
{"wechatAccount":"火名之月","sex":"男","zhifubaoAccount":"abstart","updateTime":1633881600000,"password":"1234","database":"portrait","createTime":1631203200000,"phone":"13873697762","typecurrent":"INSERT","id":15,"tablename":"user_info","account":"abcd","age":24,"email":"981456@qq.com","status":0}

Creating the Years Label for the User Profile

Create a decade-label entity class:

@Data
public class Years {
    private Long userid;
    private String yearsFlag;
    private Long numbers = 0L;
    private String groupField;
}

Create a YearsUntil utility class:

public class YearsUntil {
    public static String getYears(Integer age) {
        Calendar calendar = Calendar.getInstance();
        calendar.setTime(new Date());
        calendar.add(Calendar.YEAR,-age);
        Date newDate = calendar.getTime();
        DateFormat dateFormat = new SimpleDateFormat("yyyy");
        String newDateString = dateFormat.format(newDate);
        Integer newDateInteger = Integer.parseInt(newDateString);
        String yearBaseType = "未知";
        if (newDateInteger >= 1940 && newDateInteger < 1950) {
            yearBaseType = "40后";
        }else if (newDateInteger >= 1950 && newDateInteger < 1960) {
            yearBaseType = "50后";
        }else if (newDateInteger >= 1960 && newDateInteger < 1970) {
            yearBaseType = "60后";
        }else if (newDateInteger >= 1970 && newDateInteger < 1980) {
            yearBaseType = "70后";
        }else if (newDateInteger >= 1980 && newDateInteger < 1990) {
            yearBaseType = "80后";
        }else if (newDateInteger >= 1990 && newDateInteger < 2000) {
            yearBaseType = "90后";
        }else if (newDateInteger >= 2000 && newDateInteger < 2010) {
            yearBaseType = "00后";
        }else if (newDateInteger >= 2010 && newDateInteger < 2020) {
            yearBaseType = "10后";
        }
        return yearBaseType;
    }
}
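
A quick sanity check (the label depends on the year you run it; these results assume 2021, matching the article):

public class YearsUntilDemo {
    public static void main(String[] args) {
        // run in 2021: 2021 - 24 = 1997 -> "90后"; 2021 - 60 = 1961 -> "60后"
        System.out.println(YearsUntil.getYears(24)); // 90后
        System.out.println(YearsUntil.getYears(60)); // 60后
    }
}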

Create a ClickUntil interface:

public interface ClickUntil {
    void saveData(String tablename,Map<String,String> data);
}

And an implementation class:

public class DefaultClickUntil implements ClickUntil {
    private static ClickUntil instance = new DefaultClickUntil();

    public static ClickUntil createInstance() {
        return instance;
    }

    private DefaultClickUntil() {

    }

    @Override
    public void saveData(String tablename, Map<String, String> data) {

    }
}

We leave the interface method unimplemented here; another implementation will take its place later.
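
Until then, a minimal HBase-backed implementation might look like the following sketch. It reuses the HbaseUtil.put helper above; the class name and the choice of userid (falling back to timeinfo) as row key are assumptions, since the article never defines this class:

import java.io.IOException;
import java.util.Map;

public class HbaseClickUntil implements ClickUntil {
    @Override
    public void saveData(String tablename, Map<String, String> data) {
        try {
            // hypothetical row-key choice: userid for user tables, timeinfo for label tables
            String rowkey = data.getOrDefault("userid", data.get("timeinfo"));
            HbaseUtil.put(tablename, rowkey, "info", data);
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}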

A ClickUntilFactory factory class:

public class ClickUntilFactory {
    public static ClickUntil createClickUntil() {
        return DefaultClickUntil.createInstance();
    }
}

A DateUntil utility class:

public class DateUntil {
    public static String getByInterMinute(String timeInfo) {
        Long timeMillons = Long.parseLong(timeInfo);
        Date date = new Date(timeMillons);
        DateFormat dateFormatMinute = new SimpleDateFormat("mm");
        DateFormat dateFormatHour = new SimpleDateFormat("yyyyMMddHH");
        String minute = dateFormatMinute.format(date);
        String hour = dateFormatHour.format(date);
        Long minuteLong = Long.parseLong(minute);
        String replaceMinute = "";
        if (minuteLong >= 0 && minuteLong < 5) {
            replaceMinute = "05";
        }else if (minuteLong >= 5 && minuteLong < 10) {
            replaceMinute = "10";
        }else if (minuteLong >= 10 && minuteLong < 15) {
            replaceMinute = "15";
        }else if (minuteLong >= 15 && minuteLong < 20) {
            replaceMinute = "20";
        }else if (minuteLong >= 20 && minuteLong < 25) {
            replaceMinute = "25";
        }else if (minuteLong >= 25 && minuteLong < 30) {
            replaceMinute = "30";
        }else if (minuteLong >= 30 && minuteLong < 35) {
            replaceMinute = "35";
        }else if (minuteLong >= 35 && minuteLong < 40) {
            replaceMinute = "40";
        }else if (minuteLong >= 40 && minuteLong < 45) {
            replaceMinute = "45";
        }else if (minuteLong >= 45 && minuteLong < 50) {
            replaceMinute = "50";
        }else if (minuteLong >= 50 && minuteLong < 55) {
            replaceMinute = "55";
        }else if (minuteLong >= 55 && minuteLong < 60) {
            replaceMinute = "60";
        }
        return hour + replaceMinute;
    }

    public static Long getCurrentFiveMinuteInterStart(Long visitTime) throws ParseException {
        String timeString = getByInterMinute(visitTime + "");
        DateFormat dateFormat = new SimpleDateFormat("yyyyMMddHHmm");
        Date date = dateFormat.parse(timeString);
        return date.getTime();
    }
}
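
For example, a timestamp at 10:07 falls into the "10" bucket of its hour, so the two methods yield a 202111051010 key and the epoch millis of 2021-11-05 10:10 (a small check, assuming the class above):

import java.text.SimpleDateFormat;

public class DateUntilDemo {
    public static void main(String[] args) throws Exception {
        long ts = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss")
                .parse("2021-11-05 10:07:30").getTime();
        System.out.println(DateUntil.getByInterMinute(ts + ""));          // 202111051010
        System.out.println(DateUntil.getCurrentFiveMinuteInterStart(ts)); // millis of 2021-11-05 10:10:00
    }
}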

A YearsAnalyMap transformation class implementing the MapFunction interface:

public class YearsAnalyMap implements MapFunction<String,Years> {
    private ClickUntil clickUntil = ClickUntilFactory.createClickUntil();

    @Override
    @SuppressWarnings("unchecked")
    public Years map(String s) throws Exception {
        Map<String,String> datamap = JSONObject.parseObject(s,Map.class);
        String typecurrent = datamap.get("typecurrent");
        Years years = new Years();
        if (typecurrent.equals("INSERT")) {
            UserInfo userInfo = JSONObject.parseObject(s,UserInfo.class);
            String yearLabel = YearsUntil.getYears(userInfo.getAge());
            Map<String,String> mapdata = new HashMap<>();
            mapdata.put("userid",userInfo.getId()   "");
            mapdata.put("yearlabel",yearLabel);
            clickUntil.saveData("user_info",mapdata);
            String fiveMinute = DateUntil.getByInterMinute(System.currentTimeMillis()   "");
            String groupField = "yearlable=="   fiveMinute   "=="   yearLabel;
            Long numbers = 1L;
            years.setGroupField(groupField);
            years.setNumbers(numbers);
        }
        return years;
    }
}
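
The groupField key packs three parts together, e.g. yearlabel==202111051010==90后: the label family, the 5-minute time bucket, and the label value. The sink later splits on == to recover the bucket and the label.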

Finally, the Flink stream job for the year label of the user profile:

public class YearsAnaly {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers","127.0.0.1:9092");
        properties.setProperty("group.id","portrait");
        FlinkKafkaConsumer<String> myConsumer = new FlinkKafkaConsumer<>("user_info",
                new SimpleStringSchema(),properties);
        DataStreamSource<String> data = env.addSource(myConsumer);
        env.enableCheckpointing(5000);
        data.map(new YearsAnalyMap());
        env.execute("portrait years"); // without execute() the job never actually runs
    }
}

Now we aggregate the year-label counts every 5 minutes and store them with a sink.

Add a YearsAnalyReduce aggregation class implementing the ReduceFunction interface:

public class YearsAnalyReduce implements ReduceFunction<Years> {
    @Override
    public Years reduce(Years years, Years t1) throws Exception {
        Long numbers1 = 0L;
        String groupField = "";
        if (years != null) {
            numbers1 = years.getNumbers();
            groupField = years.getGroupField();
        }
        Long numbers2 = 0L;
        if (t1 != null) {
            numbers2 = t1.getNumbers();
            groupField = t1.getGroupField();
        }
        if (StringUtils.isNotBlank(groupField)) {
            Years years1 = new Years();
            years1.setGroupField(groupField);
            years1.setNumbers(numbers1 + numbers2);
            return years1;
        }
        return null;
    }
}

A YearsAnalySink storage class implementing the SinkFunction interface:

public class YearsAnalySink implements SinkFunction<Years> {
    private ClickUntil clickUntil = ClickUntilFactory.createClickUntil();

    @Override
    public void invoke(Years value, Context context) throws Exception {
        if (value != null) {
            String groupField = value.getGroupField();
            String[] groupFields = groupField.split("==");
            String timeinfo = groupFields[1];
            String yearlabel = groupFields[2];
            Long numbers = value.getNumbers();
            String tablename = "yearslabel_info";
            Map<String, String> dataMap = new HashMap<>();
            dataMap.put("timeinfo",timeinfo);
            dataMap.put("yearslabel",yearlabel);
            dataMap.put("numbers",numbers   "");
            clickUntil.saveData(tablename,dataMap);
        }
    }
}

Then the Flink stream job:

public class YearsAnaly {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers","127.0.0.1:9092");
        properties.setProperty("group.id","portrait");
        FlinkKafkaConsumer<String> myConsumer = new FlinkKafkaConsumer<>("user_info",
                new SimpleStringSchema(),properties);
        DataStreamSource<String> data = env.addSource(myConsumer);
        env.enableCheckpointing(5000);
        DataStream<Years> map = data.map(new YearsAnalyMap());
        DataStream<Years> reduce = map.keyBy(Years::getGroupField).timeWindowAll(Time.minutes(5))
                .reduce(new YearsAnalyReduce());
        reduce.addSink(new YearsAnalySink());
        env.execute("portrait years");
    }
}

The point of this is to see how users' decade labels shift across different time periods.

Creating the Mobile-Carrier Label for the User Profile

Create a mobile-carrier utility class, CarrierUntil:

public class CarrierUntil {
    /**
     * China Telecom number validation. Prefixes: 133, 153, 180, 181, 189, 177, 1700, 173, 199
     **/
    private static final String CHINA_TELECOM_PATTERN = "(^1(33|53|77|73|99|8[019])\\d{8}$)|(^1700\\d{7}$)";

    /**
     * China Unicom number validation. Prefixes: 130, 131, 132, 155, 156, 185, 186, 145, 176, 1709
     **/
    private static final String CHINA_UNICOM_PATTERN = "(^1(3[0-2]|4[5]|5[56]|7[6]|8[56])\\d{8}$)|(^1709\\d{7}$)";

    /**
     * China Mobile number validation.
     * Prefixes: 134, 135, 136, 137, 138, 139, 150, 151, 152, 157, 158, 159, 182, 183, 184, 187, 188, 147, 178, 1705
     **/
    private static final String CHINA_MOBILE_PATTERN = "(^1(3[4-9]|4[7]|5[0-27-9]|7[8]|8[2-478])\\d{8}$)|(^1705\\d{7}$)";

    /**
     * 0 = unknown, 1 = China Mobile, 2 = China Unicom, 3 = China Telecom
     * @param telphone
     * @return
     */
    public static Integer getCarrierByTel(String telphone) {
        boolean b1 = StringUtils.isNotBlank(telphone) && match(CHINA_MOBILE_PATTERN, telphone);
        if (b1) {
            return 1;
        }
        b1 = StringUtils.isNotBlank(telphone) && match(CHINA_UNICOM_PATTERN, telphone);
        if (b1) {
            return 2;
        }
        b1 = StringUtils.isNotBlank(telphone) && match(CHINA_TELECOM_PATTERN, telphone);
        if (b1) {
            return 3;
        }
        return 0;
    }

    private static boolean match(String regex, String tel) {
        return Pattern.matches(regex, tel);
    }
}
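
A quick check against the sample data (13873697762 starts with 138, a China Mobile prefix):

public class CarrierUntilDemo {
    public static void main(String[] args) {
        System.out.println(CarrierUntil.getCarrierByTel("13873697762")); // 1 (China Mobile)
        System.out.println(CarrierUntil.getCarrierByTel("13012345678")); // 2 (China Unicom, 130)
        System.out.println(CarrierUntil.getCarrierByTel("13312345678")); // 3 (China Telecom, 133)
        System.out.println(CarrierUntil.getCarrierByTel("12345"));       // 0 (unknown)
    }
}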

A carrier-label entity class:

@Data
public class Carrier {
    private Long userid;
    private String carrierName;
    private Long numbers = 0L;
    private String groupField;
}

A CarrierAnalyMap transformation class implementing the MapFunction interface:

public class CarrierAnalyMap implements MapFunction<String,Carrier> {
    private ClickUntil clickUntil = ClickUntilFactory.createClickUntil();

    @Override
    @SuppressWarnings("unchecked")
    public Carrier map(String s) throws Exception {
        Map<String,String> datamap = JSONObject.parseObject(s,Map.class);
        String typecurrent = datamap.get("typecurrent");
        Carrier carrier = new Carrier();
        if (typecurrent.equals("INSERT")) {
            UserInfo userInfo = JSONObject.parseObject(s,UserInfo.class);
            String telphone = userInfo.getPhone();
            Integer carrierInteger = CarrierUntil.getCarrierByTel(telphone);
            String carrierLabel = "";
            switch (carrierInteger) {
                case 0:
                    carrierLabel = "未知";
                    break;
                case 1:
                    carrierLabel = "移动";
                    break;
                case 2:
                    carrierLabel = "联通";
                    break;
                case 3:
                    carrierLabel = "电信";
                    break;
                default:
                    break;
            }
            Map<String,String> mapdata = new HashMap<>();
            mapdata.put("userid",userInfo.getId()   "");
            mapdata.put("carrierlabel",carrierLabel);
            clickUntil.saveData("user_info",mapdata);
            String fiveMinute = DateUntil.getByInterMinute(System.currentTimeMillis()   "");
            String groupField = "carrierlabel=="   fiveMinute   "=="   carrierLabel;
            Long numbers = 1L;
            carrier.setGroupField(groupField);
            carrier.setNumbers(numbers);
        }
        return carrier;
    }
}

A CarrierAnalyReduce aggregation class implementing the ReduceFunction interface:

public class CarrierAnalyReduce implements ReduceFunction<Carrier> {
    @Override
    public Carrier reduce(Carrier carrier, Carrier t1) throws Exception {
        Long numbers1 = 0L;
        String groupField = "";
        if (carrier != null) {
            numbers1 = carrier.getNumbers();
            groupField = carrier.getGroupField();
        }
        Long numbers2 = 0L;
        if (t1 != null) {
            numbers2 = t1.getNumbers();
            groupField = t1.getGroupField();
        }
        if (StringUtils.isNotBlank(groupField)) {
            Carrier carrier1 = new Carrier();
            carrier1.setGroupField(groupField);
            carrier1.setNumbers(numbers1 + numbers2);
            return carrier1;
        }
        return null;
    }
}

A CarrierAnalySink storage class implementing the SinkFunction interface:

public class CarrierAnalySink implements SinkFunction<Carrier> {
    private ClickUntil clickUntil = ClickUntilFactory.createClickUntil();

    @Override
    public void invoke(Carrier value, Context context) throws Exception {
        if (value != null) {
            String groupField = value.getGroupField();
            String[] groupFields = groupField.split("==");
            String timeinfo = groupFields[1];
            String carrierLabel = groupFields[2];
            Long numbers = value.getNumbers();
            String tablename = "carrierlabel_info";
            Map<String, String> dataMap = new HashMap<>();
            dataMap.put("timeinfo",timeinfo);
            dataMap.put("carrierlabel",carrierLabel);
            dataMap.put("numbers",numbers   "");
            clickUntil.saveData(tablename,dataMap);
        }
    }
}

Then the Flink stream job:

public class CarrierAnaly {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers","127.0.0.1:9092");
        properties.setProperty("group.id","portrait");
        FlinkKafkaConsumer<String> myConsumer = new FlinkKafkaConsumer<>("user_info",
                new SimpleStringSchema(),properties);
        DataStreamSource<String> data = env.addSource(myConsumer);
        env.enableCheckpointing(5000);
        DataStream<Carrier> map = data.map(new CarrierAnalyMap());
        DataStream<Carrier> reduce = map.keyBy(Carrier::getGroupField).timeWindowAll(Time.minutes(5))
                .reduce(new CarrierAnalyReduce());
        reduce.addSink(new CarrierAnalySink());
        env.execute("portrait carrier");
    }
}

Creating the Membership-Tier Label for the User Profile

The membership-label entity class:

@Data
public class Member {
    private Long userid;
    private String memberFlag;
    private Long numbers = 0L;
    private String groupField;
}

A MemberAnalyMap transformation class implementing the MapFunction interface:

public class MemberAnalyMap implements MapFunction<String,Member> {
    private ClickUntil clickUntil = ClickUntilFactory.createClickUntil();

    @Override
    @SuppressWarnings("unchecked")
    public Member map(String s) throws Exception {
        Map<String,String> datamap = JSONObject.parseObject(s,Map.class);
        String typecurrent = datamap.get("typecurrent");
        Member member = new Member();
        if (typecurrent.equals("INSERT")) {
            UserInfo userInfo = JSONObject.parseObject(s,UserInfo.class);
            Integer memberInteger = userInfo.getStatus();
            String memberLabel = "";
            switch (memberInteger) {
                case 0:
                    memberLabel = "普通会员";
                    break;
                case 1:
                    memberLabel = "白银会员";
                    break;
                case 2:
                    memberLabel = "黄金会员";
                    break;
                default:
                    break;
            }
            Map<String,String> mapdata = new HashMap<>();
            mapdata.put("userid",userInfo.getId()   "");
            mapdata.put("memberlabel",memberLabel);
            clickUntil.saveData("user_info",mapdata);
            String fiveMinute = DateUntil.getByInterMinute(System.currentTimeMillis()   "");
            String groupField = "memberlable=="   fiveMinute   "=="   memberLabel;
            Long numbers = 1L;
            member.setGroupField(groupField);
            member.setNumbers(numbers);
        }
        return member;
    }
}

A MemberAnalyReduce aggregation class implementing the ReduceFunction interface:

public class MemberAnalyReduce implements ReduceFunction<Member> {
    @Override
    public Member reduce(Member member, Member t1) throws Exception {
        Long numbers1 = 0L;
        String groupField = "";
        if (member != null) {
            numbers1 = member.getNumbers();
            groupField = member.getGroupField();
        }
        Long numbers2 = 0L;
        if (t1 != null) {
            numbers2 = t1.getNumbers();
            groupField = t1.getGroupField();
        }
        if (StringUtils.isNotBlank(groupField)) {
            Member member1 = new Member();
            member1.setGroupField(groupField);
            member1.setNumbers(numbers1 + numbers2);
            return member1;
        }
        return null;
    }
}

A MemberAnalySink storage class implementing the SinkFunction interface:

public class MemberAnalySink implements SinkFunction<Member> {
    private ClickUntil clickUntil = ClickUntilFactory.createClickUntil();
    
    @Override
    public void invoke(Member value, Context context) throws Exception {
        if (value != null) {
            String groupField = value.getGroupField();
            String[] groupFields = groupField.split("==");
            String timeinfo = groupFields[1];
            String memberLabel = groupFields[2];
            Long numbers = value.getNumbers();
            String tablename = "memberlabel_info";
            Map<String, String> dataMap = new HashMap<>();
            dataMap.put("timeinfo",timeinfo);
            dataMap.put("memberlabel",memberLabel);
            dataMap.put("numbers",numbers   "");
            clickUntil.saveData(tablename,dataMap);
        }
    }
}

Then the Flink stream job:

public class MemberAnaly {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers","127.0.0.1:9092");
        properties.setProperty("group.id","portrait");
        FlinkKafkaConsumer<String> myConsumer = new FlinkKafkaConsumer<>("user_info",
                new SimpleStringSchema(),properties);
        DataStreamSource<String> data = env.addSource(myConsumer);
        env.enableCheckpointing(5000);
        DataStream<Member> map = data.map(new MemberAnalyMap());
        DataStream<Member> reduce = map.keyBy(Member::getGroupField).timeWindowAll(Time.minutes(5))
                .reduce(new MemberAnalyReduce());
        reduce.addSink(new MemberAnalySink());
        env.execute("portrait member");
    }
}

User-Profile Behavioral Features

Here we analyze several user behaviors and add them to the profile:

  1. Browsing a product: channel id, product id, product category id, browse time, dwell time, user id, device type (1 = PC, 2 = WeChat mini program, 3 = app), deviceId.
  2. Favoriting a product: channel id, product id, product category id, operation time, operation type (favorite, cancel), user id, device type (1 = PC, 2 = WeChat mini program, 3 = app)
  3. Shopping-cart actions: channel id, product id, product category id, operation time, operation type (add, remove), user id, device type (1 = PC, 2 = WeChat mini program, 3 = app)
  4. Following a product: channel id, product id, product category id, operation time, operation type (follow, cancel), user id, device type (1 = PC, 2 = WeChat mini program, 3 = app)

Define entity classes for the four behaviors:

/**
 * Browse operation
 */
@Data
public class ScanOpertor {
    /**
     * channel id
     */
    private Long channelId;
    /**
     * product type id
     */
    private Long productTypeId;
    /**
     * product id
     */
    private Long productId;
    /**
     * browse time
     */
    private Long scanTime;
    /**
     * dwell time
     */
    private Long stayTime;
    /**
     * user id
     */
    private Long userId;
    /**
     * device type
     */
    private Integer deviceType;
    /**
     * device id
     */
    private String deviceId;
}
/**
 * Favorite operation
 */
@Data
public class CollectOpertor {
    /**
     * channel id
     */
    private Long channelId;
    /**
     * product type id
     */
    private Long productTypeId;
    /**
     * product id
     */
    private Long productId;
    /**
     * operation time
     */
    private Long opertorTime;
    /**
     * operation type
     */
    private Integer opertorType;
    /**
     * user id
     */
    private Long userId;
    /**
     * device type
     */
    private Integer deviceType;
    /**
     * device id
     */
    private String deviceId;
}
/**
 * Shopping-cart operation
 */
@Data
public class CartOpertor {
    /**
     * channel id
     */
    private Long channelId;
    /**
     * product type id
     */
    private Long productTypeId;
    /**
     * product id
     */
    private Long productId;
    /**
     * operation time
     */
    private Long opertorTime;
    /**
     * operation type
     */
    private Integer opertorType;
    /**
     * user id
     */
    private Long userId;
    /**
     * device type
     */
    private Integer deviceType;
    /**
     * device id
     */
    private String deviceId;
}
/**
 * Follow operation
 */
@Data
public class AttentionOpertor {
    /**
     * channel id
     */
    private Long channelId;
    /**
     * product type id
     */
    private Long productTypeId;
    /**
     * product id
     */
    private Long productId;
    /**
     * operation time
     */
    private Long opertorTime;
    /**
     * operation type
     */
    private Integer opertorType;
    /**
     * user id
     */
    private Long userId;
    /**
     * device type
     */
    private Integer deviceType;
    /**
     * device id
     */
    private String deviceId;
}

From the Kafka bin directory, create a topic per behavior:

./kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic scan
./kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic collection
./kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic cart
./kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic attention

Create a product table:

DROP TABLE IF EXISTS `product`;
CREATE TABLE `product` (
  `id` bigint(20) NOT NULL AUTO_INCREMENT,
  `product_type_id` bigint(20) DEFAULT NULL,
  `product_name` varchar(255) DEFAULT NULL,
  `product_title` varchar(255) DEFAULT NULL,
  `product_price` decimal(28,10) DEFAULT NULL,
  `merchant_id` bigint(20) DEFAULT NULL,
  `create_time` datetime DEFAULT NULL,
  `update_time` datetime DEFAULT NULL,
  `product_place` varchar(255) DEFAULT NULL,
  `product_brand` varchar(255) DEFAULT NULL,
  `product_weight` decimal(28,10) DEFAULT NULL,
  `product_specification` varchar(255) DEFAULT NULL,
  PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;

And its entity class:

@Data
public class Product {
    private Long id;
    private Long productTypeId;
    private String productName;
    private String productTitle;
    private BigDecimal productPrice;
    private Long merchantId;
    private Date createTime;
    private Date updateTime;
    private String productPlace;
    private String productBrand;
    private Double productWeight;
    private String productSpecification;
}

A product type table:

DROP TABLE IF EXISTS `product_type`;
CREATE TABLE `product_type` (
  `id` bigint(20) NOT NULL,
  `product_type_name` varchar(255) DEFAULT NULL,
  `product_type_desc` varchar(255) DEFAULT NULL,
  `product_type_parent_id` bigint(20) DEFAULT NULL,
  `product_type_level` int(11) DEFAULT NULL,
  PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;

Its entity class:

@Data
public class ProductType {
    private Long id;
    private String productTypeName;
    private String productTypeDesc;
    private Long productTypeParentId;
    private Integer productTypeLevel;
}

An order table:

DROP TABLE IF EXISTS `order`;
CREATE TABLE `order` (
  `id` bigint(20) NOT NULL,
  `amount` decimal(28,10) DEFAULT NULL,
  `user_id` bigint(20) DEFAULT NULL,
  `product_id` bigint(20) DEFAULT NULL,
  `merchant_id` bigint(20) DEFAULT NULL,
  `create_time` datetime DEFAULT NULL,
  `pay_time` datetime DEFAULT NULL,
  `pay_status` int(11) DEFAULT NULL COMMENT '0 = unpaid, 1 = paid, 2 = refunded',
  `address` varchar(1000) DEFAULT NULL,
  `telphone` varchar(255) DEFAULT NULL,
  `username` varchar(255) DEFAULT NULL,
  `trade_number` varchar(255) DEFAULT NULL,
  `pay_type` int(255) DEFAULT NULL COMMENT '0 = Alipay, 1 = UnionPay, 2 = WeChat',
  `number` int(11) DEFAULT NULL,
  `order_status` int(255) DEFAULT NULL COMMENT '0 = submitted, 1 = paid, 2 = cancelled, 3 = deleted',
  `update_time` datetime DEFAULT NULL,
  PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;

Its entity class:

@Data
public class Order {
    private Long id;
    private BigDecimal amount;
    private Long userId;
    private Long productId;
    private Long merchantId;
    private Date createTime;
    private Date payTime;
    private Integer payStatus;
    private String address;
    private String telphone;
    private String username;
    private String tradeNumber;
    private Integer payType;
    private Integer number;
    private Integer orderStatus;
    private Date updateTime;
}

From the HBase bin directory:

./hbase shell
create "product","info"
create "product_type","info"
create "order","info"

From the Kafka bin directory:

./kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic product
./kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic product_type
./kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic order

Since there are now multiple tables, TranferAnaly is revised as follows:

@Slf4j
public class TranferAnaly {

    @SuppressWarnings("unchecked")
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers","127.0.0.1:9092");
        properties.setProperty("group.id","portrait");
        FlinkKafkaConsumer<String> myConsumer = new FlinkKafkaConsumer<>("test",
                new SimpleStringSchema(),properties);

        DataStreamSource<String> data = env.addSource(myConsumer);
        env.enableCheckpointing(5000);
        DataStream<String> map = data.map(s -> {
            JSONObject jsonObject = JSONObject.parseObject(s);
            String type = jsonObject.getString("type");
            String table = jsonObject.getString("table");
            String database = jsonObject.getString("database");
            String data1 = jsonObject.getString("data");
            JSONArray jsonArray = JSONObject.parseArray(data1);
            List<Map<String,String>> listdata = new ArrayList<>();
            for (int i = 0; i < jsonArray.size(); i++) {
                JSONObject jsonObject1 = jsonArray.getJSONObject(i);
                String tablename = table;
                String rowkey = jsonObject1.getString("id");
                String famliyname = "info";
                Map<String,String> datamap = JSONObject.parseObject(JSONObject.toJSONString(jsonObject1),Map.class);
                datamap.put("database",database);
                datamap.put("typebefore",HbaseUtil.getdata(tablename,rowkey,famliyname,"typecurrent"));
                datamap.put("typecurrent",type);
                datamap.put("tablename",table);
                HbaseUtil.put(tablename,rowkey,famliyname,datamap);
                listdata.add(datamap);
            }
            return JSONObject.toJSONString(listdata);
        });
        map.addSink(new SinkFunction<String>() {
            @Override
            public void invoke(String value, Context context) throws Exception {
                List<Map> data = JSONObject.parseArray(value,Map.class);
                for (Map<String,String> map : data) {
                    String tablename = map.get("tablename");
                    KafkaUtil.sendData(tablename,JSONObject.toJSONString(map));
                }
            }
        });
        env.execute("portrait tranfer");
    }
}

Create a Spring Boot project to collect the business data.

Dependencies:

<properties>
   <java.version>1.8</java.version>
   <fastjson.version>1.2.74</fastjson.version>
</properties>
<dependencies>
   <dependency>
      <groupId>org.springframework.boot</groupId>
      <artifactId>spring-boot-starter-web</artifactId>
   </dependency>
   <dependency>
      <groupId>com.alibaba</groupId>
      <artifactId>fastjson</artifactId>
      <version>${fastjson.version}</version>
   </dependency>
   <dependency>
      <groupId>org.springframework.kafka</groupId>
      <artifactId>spring-kafka</artifactId>
   </dependency>
   <dependency>
      <groupId>org.projectlombok</groupId>
      <artifactId>lombok</artifactId>
      <optional>true</optional>
   </dependency>
   <dependency>
      <groupId>org.springframework.boot</groupId>
      <artifactId>spring-boot-starter-test</artifactId>
      <scope>test</scope>
   </dependency>
</dependencies>

Configuration:

spring:
  kafka:
    bootstrap-servers: 127.0.0.1:9092
    producer:
      retries: 0
      batch-size: 16384
      buffer-memory: 33554432
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
      acks: -1
    consumer:
      group-id: portrait
      auto-offset-reset: earliest
      enable-auto-commit: false
      auto-commit-interval: 100
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      max-poll-records: 10
    listener:
      concurrency: 3
      type: batch
      ack-mode: manual

The Kafka producer:

@Component
@Slf4j
public class KafkaProducer {
    @Autowired
    private KafkaTemplate<String,String> kafkaTemplate;

    @SuppressWarnings("unchecked")
    public void produce(String topic,String message) {
        try {
            ListenableFuture future = kafkaTemplate.send(topic, message);
            SuccessCallback<SendResult<String,String>> successCallback = new SuccessCallback<SendResult<String, String>>() {
                @Override
                public void onSuccess(@Nullable SendResult<String, String> result) {
                    log.info("发送消息成功");
                }
            };
            FailureCallback failureCallback = new FailureCallback() {
                @Override
                public void onFailure(Throwable ex) {
                    log.error("发送消息失败",ex);
                    produce(topic,message);
                }
            };
            future.addCallback(successCallback,failureCallback);
        } catch (Exception e) {
            log.error("发送消息异常",e);
        }
    }
}

The collection controller:

@RestController
public class DataController {
    @Autowired
    private KafkaProducer kafkaProducer;

    @PostMapping("/revicedata")
    public void reviceData(@RequestBody String data) {
        JSONObject jsonObject = JSONObject.parseObject(data);
        String type = jsonObject.getString("type");
        String topic = "";
        switch (type) {
            case "0":
                topic = "scan";
                break;
            case "1":
                topic = "collection";
                break;
            case "2":
                topic = "cart";
                break;
            case "3":
                topic = "attention";
                break;
            default:
                break;
        }
        if (topic.isEmpty()) {
            // unknown type: drop the event rather than produce to an empty topic name
            return;
        }
        kafkaProducer.produce(topic, data);
    }
}
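
To exercise the endpoint, a client can post a simulated browse event as in the sketch below. The payload fields (userId, productTypeId) are assumptions based on the ScanOpertor class the Flink job parses later, and 8080 is simply the Spring Boot default port.

Code language: java
import org.springframework.http.HttpEntity;
import org.springframework.http.HttpHeaders;
import org.springframework.http.MediaType;
import org.springframework.web.client.RestTemplate;

import com.alibaba.fastjson.JSONObject;

public class ReviceDataClient {
    public static void main(String[] args) {
        JSONObject event = new JSONObject();
        event.put("type", "0"); // "0" routes the event to the "scan" topic
        event.put("userId", 1L);
        event.put("productTypeId", 3L);
        HttpHeaders headers = new HttpHeaders();
        headers.setContentType(MediaType.APPLICATION_JSON);
        new RestTemplate().postForEntity("http://127.0.0.1:8080/revicedata",
                new HttpEntity<>(event.toJSONString(), headers), Void.class);
    }
}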

This is really just a simple simulation of user behavior; properly, we would build a logging microservice that collects every user action. See the article AOP原理与自实现 (AOP principles and a self-made implementation) for details; its approach can be adapted here by replacing the log class

Code language: java
@Builder
@Data
@NoArgsConstructor
@AllArgsConstructor
public class Log implements Serializable {

   private static final long serialVersionUID = -5398795297842978376L;

   private Long id;
   private String username;
   /** module */
   private String module;
   /** parameter values */
   private String params;
   private String remark;
   private Boolean flag;
   private Date createTime;
   private String ip;
   private String area;
}

with the various behavior-operation classes shown earlier. Some other changes would be needed as well, which we will not make here; also, swap RabbitMQ out for Kafka.

Creating the user-portrait product-category preference label

Create a product-type label entity class

Code language: java
@Data
public class ProductTypeLabel {
    private Long userid;
    private String productTypeId;
    private Long numbers = 0L;
    private String groupField;
}

Add a method to the DateUntil utility class that truncates a timestamp to the start of its hour.

Code language: java
public static Long getCurrentHourStart(Long visitTime) throws ParseException {
    Date date = new Date(visitTime);
    // format to "yyyyMMdd HH" and parse back, which drops minutes/seconds/millis
    DateFormat dateFormat = new SimpleDateFormat("yyyyMMdd HH");
    Date filterTime = dateFormat.parse(dateFormat.format(date));
    return filterTime.getTime();
}
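
A quick sanity check of the truncation, assuming the DateUntil class above is on the classpath (the result depends on the JVM's default time zone):

Code language: java
public class HourStartDemo {
    public static void main(String[] args) throws Exception {
        long t = 1636528537000L;                              // an arbitrary timestamp
        long h1 = DateUntil.getCurrentHourStart(t);
        long h2 = DateUntil.getCurrentHourStart(t + 60_000L); // one minute later
        System.out.println(h1 == h2);                         // true: same hour bucket
        System.out.println(t - h1 < 3_600_000L);              // true: h1 starts t's hour
    }
}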

Create a ProductTypeAnalyMap transformation class that implements the MapFunction interface

Code language: java
public class ProductTypeAnalyMap implements MapFunction<String,ProductTypeLabel> {
    @Override
    public ProductTypeLabel map(String s) throws Exception {
        ScanOpertor scanOpertor = JSONObject.parseObject(s, ScanOpertor.class);
        Long userid = scanOpertor.getUserId();
        Long productTypeId = scanOpertor.getProductTypeId();
        String tablename = "user_info";
        String rowkey = userid + "";
        String familyname = "info";
        String column = "producttypelist";
        // fetch the user's historical product-type preferences
        String productTypeListString = HbaseUtil.getdata(tablename, rowkey, familyname, column);
        List<Map> temp = new ArrayList<>();
        List<Map> result = new ArrayList<>();
        if (StringUtils.isNotBlank(productTypeListString)) {
            temp = JSONObject.parseArray(productTypeListString, Map.class);
        }
        boolean found = false;
        for (Map map : temp) {
            Long productTypeId1 = Long.parseLong(map.get("key").toString());
            long value = Long.parseLong(map.get("value").toString());
            // if the new product type matches a historical one, increase its preference value by 1
            if (productTypeId.equals(productTypeId1)) {
                value++;
                map.put("value", value);
                found = true;
            }
            result.add(map);
        }
        // a type seen for the first time starts with a preference value of 1
        if (!found) {
            Map<String, Object> entry = new HashMap<>();
            entry.put("key", productTypeId + "");
            entry.put("value", 1L);
            result.add(entry);
        }
        // sort descending by preference value; parse via toString because JSON
        // deserialization may yield Integer rather than Long values
        Collections.sort(result, (o1, o2) -> {
            Long value1 = Long.parseLong(o1.get("value").toString());
            Long value2 = Long.parseLong(o2.get("value").toString());
            return value2.compareTo(value1);
        });
        if (result.size() > 5) {
            result = result.subList(0, 5);
        }
        String data = JSONObject.toJSONString(result);
        HbaseUtil.putdata(tablename, rowkey, familyname, column, data);
        ProductTypeLabel productTypeLabel = new ProductTypeLabel();
        // format: productType==timehour==productTypeId
        String groupField = "productType==" + DateUntil.getCurrentHourStart(System.currentTimeMillis())
                + "==" + productTypeId;
        productTypeLabel.setUserid(userid);
        productTypeLabel.setProductTypeId(productTypeId + "");
        productTypeLabel.setNumbers(1L);
        productTypeLabel.setGroupField(groupField);
        return productTypeLabel;
    }
}
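
To make the stored HBase value concrete, here is a small self-contained run of the same merge step against a sample info:producttypelist value (the counts are made up):

Code language: java
import java.util.List;
import java.util.Map;

import com.alibaba.fastjson.JSONObject;

public class ProductTypeMergeDemo {
    public static void main(String[] args) {
        // value previously stored for this user: product types with view counts
        String stored = "[{\"key\":\"3\",\"value\":12},{\"key\":\"7\",\"value\":9}]";
        List<Map> history = JSONObject.parseArray(stored, Map.class);
        Long scannedTypeId = 3L; // the product type just viewed
        for (Map map : history) {
            if (scannedTypeId.equals(Long.parseLong(map.get("key").toString()))) {
                map.put("value", Long.parseLong(map.get("value").toString()) + 1);
            }
        }
        // after the merge, type 3's count is 13 and type 7 stays at 9
        System.out.println(JSONObject.toJSONString(history));
    }
}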

A ProductTypeAnalyReduce aggregation class that implements the ReduceFunction interface

Code language: java
public class ProductTypeAnalyReduce implements ReduceFunction<ProductTypeLabel> {
    @Override
    public ProductTypeLabel reduce(ProductTypeLabel productTypeLabel, ProductTypeLabel t1) throws Exception {
        Long numbers1 = 0L;
        String groupField = "";
        if (productTypeLabel != null) {
            numbers1 = productTypeLabel.getNumbers();
            groupField = productTypeLabel.getGroupField();
        }
        Long numbers2 = 0L;
        if (t1 != null) {
            numbers2 = t1.getNumbers();
            groupField = t1.getGroupField();
        }
        if (StringUtils.isNotBlank(groupField)) {
            ProductTypeLabel productTypeLabel1 = new ProductTypeLabel();
            productTypeLabel1.setGroupField(groupField);
            productTypeLabel1.setNumbers(numbers1 + numbers2);
            return productTypeLabel1;
        }
        return null;
    }
}
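
As a quick sanity check of the merge: two one-count labels that share a groupField collapse into a single label whose numbers is 2.

Code language: java
public class ReduceDemo {
    public static void main(String[] args) throws Exception {
        ProductTypeLabel a = new ProductTypeLabel();
        a.setGroupField("productType==1636527600000==3");
        a.setNumbers(1L);
        ProductTypeLabel b = new ProductTypeLabel();
        b.setGroupField("productType==1636527600000==3");
        b.setNumbers(1L);
        // Flink applies reduce pairwise to the elements of each keyed window
        ProductTypeLabel merged = new ProductTypeAnalyReduce().reduce(a, b);
        System.out.println(merged.getNumbers()); // 2
    }
}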

A ProductTypeAnalySink storage class that implements the SinkFunction interface

Code language: java
public class ProductTypeAnalySink implements SinkFunction<ProductTypeLabel> {
    private ClickUntil clickUntil = ClickUntilFactory.createClickUntil();

    @Override
    public void invoke(ProductTypeLabel value, Context context) throws Exception {
        if (value != null) {
            String groupField = value.getGroupField();
            String[] groupFields = groupField.split("==");
            String timeinfo = groupFields[1];
            String productTypeLabel = groupFields[2];
            Long numbers = value.getNumbers();
            String tablename = "producttypelabel_info";
            Map<String, String> dataMap = new HashMap<>();
            dataMap.put("timeinfo",timeinfo);
            dataMap.put("producttypelabel",productTypeLabel);
            dataMap.put("numbers",numbers   "");
            clickUntil.saveData(tablename,dataMap);
        }
    }
}

Then the Flink stream-processing job

Code language: java
public class ProductTypeAnaly {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers","127.0.0.1:9092");
        properties.setProperty("group.id","portrait");
        FlinkKafkaConsumer<String> myConsumer = new FlinkKafkaConsumer<>("scan",
                new SimpleStringSchema(),properties);
        DataStreamSource<String> data = env.addSource(myConsumer);
        env.enableCheckpointing(5000);
        DataStream<ProductTypeLabel> map = data.map(new ProductTypeAnalyMap());
        DataStream<ProductTypeLabel> reduce = map.keyBy(ProductTypeLabel::getGroupField)
                // keyed tumbling window; timeWindowAll here would discard the keying
                .window(TumblingProcessingTimeWindows.of(Time.hours(1)))
                .reduce(new ProductTypeAnalyReduce());
        reduce.addSink(new ProductTypeAnalySink());
        env.execute("portrait scan");
    }
}

Creating the user-portrait tangled-product (hesitation) label

Create a tangled-product label entity class

Code language: java
@Data
public class TangleProduct {
    private Long userid;
    private String productId;
    private Long numbers = 0L;
    private String groupField;
}

A TangleProductAnalyMap transformation class that implements the MapFunction interface

Code language: java
public class TangleProductAnalyMap implements MapFunction<String,TangleProduct> {
    @Override
    public TangleProduct map(String s) throws Exception {
        CartOpertor cartOpertor = JSONObject.parseObject(s, CartOpertor.class);
        Long userid = cartOpertor.getUserId();
        Long productId = cartOpertor.getProductId();
        String tablename = "user_info";
        String rowkey = userid + "";
        String familyname = "info";
        String column = "tangleproducts";
        // fetch the products the user has historically hesitated over
        String tangleProducts = HbaseUtil.getdata(tablename, rowkey, familyname, column);
        List<Map> temp = new ArrayList<>();
        List<Map> result = new ArrayList<>();
        if (StringUtils.isNotBlank(tangleProducts)) {
            temp = JSONObject.parseArray(tangleProducts, Map.class);
        }
        boolean found = false;
        for (Map map : temp) {
            Long productId1 = Long.parseLong(map.get("key").toString());
            long value = Long.parseLong(map.get("value").toString());
            // if the new product matches a historical one, increase its value by 1
            if (productId.equals(productId1)) {
                value++;
                map.put("value", value);
                found = true;
            }
            result.add(map);
        }
        // a product seen for the first time starts with a value of 1
        if (!found) {
            Map<String, Object> entry = new HashMap<>();
            entry.put("key", productId + "");
            entry.put("value", 1L);
            result.add(entry);
        }
        // sort descending by value; parse via toString because JSON
        // deserialization may yield Integer rather than Long values
        Collections.sort(result, (o1, o2) -> {
            Long value1 = Long.parseLong(o1.get("value").toString());
            Long value2 = Long.parseLong(o2.get("value").toString());
            return value2.compareTo(value1);
        });
        if (result.size() > 5) {
            result = result.subList(0, 5);
        }
        String data = JSONObject.toJSONString(result);
        HbaseUtil.putdata(tablename, rowkey, familyname, column, data);
        TangleProduct tangleProduct = new TangleProduct();
        // format: tangleProduct==timehour==productId
        String groupField = "tangleProduct==" + DateUntil.getCurrentHourStart(System.currentTimeMillis())
                + "==" + productId;
        tangleProduct.setUserid(userid);
        tangleProduct.setProductId(productId + "");
        tangleProduct.setNumbers(1L);
        tangleProduct.setGroupField(groupField);
        return tangleProduct;
    }
}

A TangleProductAnalyReduct aggregation class that implements the ReduceFunction interface

Code language: java
public class TangleProductAnalyReduct implements ReduceFunction<TangleProduct> {
    @Override
    public TangleProduct reduce(TangleProduct tangleProduct, TangleProduct t1) throws Exception {
        Long numbers1 = 0L;
        String groupField = "";
        if (tangleProduct != null) {
            numbers1 = tangleProduct.getNumbers();
            groupField = tangleProduct.getGroupField();
        }
        Long numbers2 = 0L;
        if (t1 != null) {
            numbers2 = t1.getNumbers();
            groupField = t1.getGroupField();
        }
        if (StringUtils.isNotBlank(groupField)) {
            TangleProduct tangleProduct1 = new TangleProduct();
            tangleProduct1.setGroupField(groupField);
            tangleProduct1.setNumbers(numbers1 + numbers2);
            return tangleProduct1;
        }
        return null;
    }
}

A TangleProductAnalySink storage class that implements the SinkFunction interface

Code language: java
public class TangleProductAnalySink implements SinkFunction<TangleProduct> {
    private ClickUntil clickUntil = ClickUntilFactory.createClickUntil();
    
    @Override
    public void invoke(TangleProduct value, Context context) throws Exception {
        if (value != null) {
            String groupField = value.getGroupField();
            String[] groupFields = groupField.split("==");
            String timeinfo = groupFields[1];
            String tangleProductLabel = groupFields[2];
            Long numbers = value.getNumbers();
            String tablename = "tangleproductlabel_info";
            Map<String, String> dataMap = new HashMap<>();
            dataMap.put("timeinfo",timeinfo);
            dataMap.put("tangleproductlabel",tangleProductLabel);
            dataMap.put("numbers",numbers   "");
            clickUntil.saveData(tablename,dataMap);
        }
    }
}

Then the Flink stream-processing job

Code language: java
public class TangleProductAnaly {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers","127.0.0.1:9092");
        properties.setProperty("group.id","portrait");
        FlinkKafkaConsumer<String> myConsumer = new FlinkKafkaConsumer<>("cart",
                new SimpleStringSchema(),properties);
        DataStreamSource<String> data = env.addSource(myConsumer);
        env.enableCheckpointing(5000);
        DataStream<TangleProduct> map = data.map(new TangleProductAnalyMap());
        DataStream<TangleProduct> reduce = map.keyBy(TangleProduct::getGroupField)
                // keyed tumbling window; timeWindowAll here would discard the keying
                .window(TumblingProcessingTimeWindows.of(Time.hours(1)))
                .reduce(new TangleProductAnalyReduct());
        reduce.addSink(new TangleProductAnalySink());
        env.execute("portrait cart");
    }
}
