数据统计在互联网平台中扮演着至关重要的角色,不仅能够深入分析用户在互联网平台上的行为,包括浏览、点击、搜索、购买等行为数据。通过这些数据企业还可以了解用户的喜好、需求和购买习惯,从而优化产品、服务和营销策略。典型的应用主要就是用户画像构建和用户行为路径分析。
那么一些典型的数据统计系统都是用哪些技术来实现的呢?
宏观的讲,单纯从数据库存储层面就有很多技术,比如关系型数据库、NoSQL、OLAP等存储系统,逻辑层面还有ETL、数据挖掘等,前端层面涉及的技术也有很多很多......,但是今天我想单就存储层面,使用Redis做一个简单的数据统计案例。
为什么可以使用Redis做数据统计?
使用Redis做数据统计具有一系列优势:
高速读写性能:Redis是一款基于内存的数据库,其读写速度远快于传统的磁盘存储数据库。这使得Redis在进行数据统计时能够迅速响应,提高数据处理效率。
丰富的数据结构:Redis支持多种数据结构,如字符串(String)、哈希(Hash)、列表(List)、集合(Set)、有序集合(ZSet)等。这些数据结构为数据统计提供了灵活的选择,可以根据不同的统计需求选择最合适的数据结构。
实时更新能力:Redis支持发布/订阅模式,可以实现数据的实时更新。在数据统计场景中,这意味着可以实时地获取最新的数据并进行统计,从而快速响应数据变化。
数据持久化和可扩展性:Redis提供了RDB和AOF两种持久化机制,确保在服务器重启或发生故障时数据不会丢失。这对于数据统计来说至关重要,因为统计结果需要长期保存以供后续分析和使用。并且Redis支持主从复制和集群模式,可以实现数据的水平扩展。在数据统计需求不断增长的情况下,可以通过增加Redis节点来扩展存储容量和处理能力。
但与此同时,Redis进行大规模数据统计也有很多风险:
内存消耗:由于Redis是基于内存的数据库,因此当数据量较大时,会占用大量的内存资源。这可能导致需要投入更多的成本来扩展内存容量或者进行性能优化。
单线程模型:Redis的命令处理是单线程的,这意味着在处理大量并发请求时可能存在性能瓶颈。虽然Redis通过IO多路复用技术优化了网络IO性能,但在极端情况下仍可能受到单线程模型的限制。
数据安全性:Redis的数据存储在内存中,如果服务器发生崩溃或断电等意外情况,可能会导致数据丢失。
需求场景
本次的场景很简单:用户给博客评分,只有用户、博客、分这三个概念。
于是我们定义业务模型
代码语言:go复制type Record struct {
UserId int `json:"user_id,omitempty"`
BlogId int `json:"blog_id,omitempty"`
Score int `json:"score,omitempty"`
}
- UserId:用户唯一标识
- BlogId:博客唯一标识
- Score:用户给博客评定的分数
数据统计其实核心无非就是两件事:存数据和查数据, 因此我们可以先定义一个抽象,QueryType表示查询的维度,有用户维度和博客维度,典型的应用场景:
用户维度:给哪些博客评分过
博客维度:有哪些用户给该博客评过分
抽象定义:
代码语言:go复制type QueryType int
const (
QueryTypeUser QueryType = 1
QueryTypeBlog QueryType = 2
)
type DataAnalyzer interface {
Vote(userId, blogId, score int) error // 存储评分数据
Query(t QueryType, id int) ([]Record, error) // 按不同维度查询数据
}
存数据和查数据的抽象我们都定义好了,但是还有一个非常关键的问题:使用哪一种数据结构来存储数据更有利于数据的高效存取?
关键问题分析
Redis常用的数据结构有五种,分别是String、List、Set、Hash和Zset, 其中有两种比较适合我们本次的业务场景,分别是最简单的String和ZSet,理由如下:
String :
- 适用于简单的键值对存储,如存储用户的访问次数、页面的点击量等。
- 可以使用INCR、DECR等命令实现原子性的自增自减操作,非常适合计数器场景。
ZSet :
- 是Set的一个扩展类型,每个元素都会关联一个分数(score),用于排序。
- 支持范围查询,可以根据分数范围获取元素列表。
- 在数据统计中,非常适合用于排行榜、实时数据分析等场景,可以根据分数进行排序和查询。
下面我们通过不同数据结构分别实现一遍
使用String数据结构实现业务
定义常量作为Key的前缀
代码语言:go复制const (
RedisVote = "redis-vote"
RedisVoteV2 = "redis-vote-v2"
)
type StringData struct {
}
存数据采用预定格式的Key和Int类型的Value实现,Key由前缀、userId、blogId组成,Value为分数
代码语言:go复制func (s *StringData) Vote(userId, blogId, score int) error {
genKey := func(userId, blogId int) string {
return fmt.Sprintf("%s-uid-%d-bid-%d", RedisVote, userId, blogId)
}
key := genKey(userId, blogId)
if _, err := redisClient.Set(redisClient.Context(), key, score, 0).Result(); err != nil {
return err
}
return nil
}
取数据的时候需要解析Key为userId、blogId,然后Value为分数
代码语言:go复制func (s *StringData) Query(queryType QueryType, id int) ([]Record, error) {
parseKey := func(key string) (userId int, blogId int) {
split := strings.Split(key, "-uid-")
if len(split) != 2 {
return -1, -1
}
str := strings.Split(split[1], "-bid-")
if len(str) != 2 {
return -1, -1
}
return cast.ToInt(str[0]), cast.ToInt(str[1])
}
var keyPattern string
switch queryType {
case QueryTypeUser:
keyPattern = fmt.Sprintf("%s-uid-%d-bid-*", RedisVote, id)
case QueryTypeBlog:
keyPattern = fmt.Sprintf("%s-uid-*-bid-%d", RedisVote, id)
}
keys, err := redisClient.Keys(redisClient.Context(), keyPattern).Result()
if err != nil {
return nil, err
}
records := make([]Record, 0)
for _, key := range keys {
result, err := redisClient.Get(redisClient.Context(), key).Result()
if err != nil {
continue
}
userId, blogId := parseKey(key)
records = append(records, Record{UserId: userId, BlogId: blogId, Score: cast.ToInt(result)})
}
return records, nil
}
使用ZSet类型实现业务
使用ZSet做出该功能
代码语言:go复制type ZSetData struct {
}
使用ZSet存数据的话为了方便查询需要存两份,按照Blog维度以blogId为Key和以User维度以userId为Key各存一份
代码语言:go复制func (z *ZSetData) Vote(userId, blogId, score int) error {
if _, err := redisClient.ZAdd(redisClient.Context(), fmt.Sprintf("%s-bid-%d", RedisVoteV2, blogId),
&redis.Z{Score: float64(score), Member: userId}).Result(); err != nil {
return err
}
if _, err := redisClient.ZAdd(redisClient.Context(), fmt.Sprintf("%s-uid-%d", RedisVoteV2, userId),
&redis.Z{Score: float64(score), Member: blogId}).Result(); err != nil {
return err
}
return nil
}
取数据的话只需要根据数据的查询维度取一份就可以了
代码语言:go复制func (z *ZSetData) Query(queryType QueryType, id int) ([]Record, error) {
var result []redis.Z
var err error
switch queryType {
case QueryTypeUser:
result, err = redisClient.ZRangeWithScores(redisClient.Context(), fmt.Sprintf("%s-uid-%d", RedisVoteV2, id), 0, -1).Result()
case QueryTypeBlog:
result, err = redisClient.ZRangeWithScores(redisClient.Context(), fmt.Sprintf("%s-bid-%d", RedisVoteV2, id), 0, -1).Result()
}
if err != nil {
return nil, err
}
records := make([]Record, 0)
for _, res := range result {
switch queryType {
case QueryTypeUser:
records = append(records, Record{UserId: id, BlogId: cast.ToInt(res.Member), Score: cast.ToInt(res.Score)})
case QueryTypeBlog:
records = append(records, Record{UserId: cast.ToInt(res.Member), BlogId: id, Score: cast.ToInt(res.Score)})
}
}
return records, nil
}
实现http服务
代码语言:go复制func initRedis() {
redisClient = redis.NewClient(&redis.Options{
Addr: "127.0.0.1:6379",
Password: "ibarryyan",
DB: 0,
})
if _, err := redisClient.Ping(context.Background()).Result(); err != nil {
panic(err)
}
}
func main() {
initRedis()
data := ZSetData{} // 可以切换到StringData
mux := http.DefaultServeMux
mux.HandleFunc("/vote", func(resp http.ResponseWriter, req *http.Request) {
userId := req.URL.Query().Get("userId")
blogId := req.URL.Query().Get("blogId")
score := req.URL.Query().Get("score")
fmt.Printf("%s vote, userId:%s, blogId:%s, score:%s n", time.Now().Format("2006-01-02 15:04:05"), userId, blogId, score)
if err := data.Vote(cast.ToInt(userId), cast.ToInt(blogId), cast.ToInt(score)); err != nil {
_, _ = resp.Write([]byte("err"))
} else {
_, _ = resp.Write([]byte("ok"))
}
return
})
mux.HandleFunc("/query", func(resp http.ResponseWriter, req *http.Request) {
queryType := req.URL.Query().Get("type")
id := req.URL.Query().Get("id")
fmt.Printf("%s query type %s, id:%s, n", time.Now().Format("2006-01-02 15:04:05"), queryType, id)
records, err := data.Query(QueryType(cast.ToInt(queryType)), cast.ToInt(id))
if err != nil {
_, _ = resp.Write([]byte("err"))
}
bytes, err := json.Marshal(records)
if err != nil {
_, _ = resp.Write([]byte("err"))
} else {
_, _ = resp.Write(bytes)
}
return
})
_ = http.ListenAndServe(":8080", mux)
}
两个接口的参数和返回值:
http://localhost:8080/vote:
代码语言:shell复制Request:
- userId
- blogId
- score
Response:
- ok
http://localhost:8080/query:
代码语言:shell复制Request:
- type
- id
Response:
- [{},{}...]
效果测试
存数据
代码语言:go复制func TestVote(t *testing.T) {
client := http.Client{}
resp, err := client.Get(fmt.Sprintf("http://localhost:8080/vote?userId=%d&blogId=%d&score=%d", 2, 2, 1))
if err != nil {
fmt.Println(err)
}
defer func() { _ = resp.Body.Close() }()
bytes, err := io.ReadAll(resp.Body)
if err != nil {
fmt.Println(err)
return
}
fmt.Println(string(bytes))
}
取数据
代码语言:go复制func TestQuery(t *testing.T) {
client := http.Client{}
resp, err := client.Get(fmt.Sprintf("http://localhost:8080/query?type=%d&id=%d", 1, 2))
if err != nil {
fmt.Println(err)
}
defer func() { _ = resp.Body.Close() }()
bytes, err := io.ReadAll(resp.Body)
if err != nil {
fmt.Println(err)
return
}
fmt.Println(string(bytes))
}
性能测试
存数据:
代码语言:go复制func TestVoteBatch(t *testing.T) {
var count int32
start := time.Now().UnixMilli()
client := http.Client{}
for i := 0; i < 100; i {
for j := 0; j < 100; j {
count = 1
resp, err := client.Get(fmt.Sprintf("http://localhost:8080/vote?userId=%d&blogId=%d&score=%d", i 100, j 100, 1))
if err != nil {
fmt.Println(err)
}
bytes, err := io.ReadAll(resp.Body)
if err != nil {
fmt.Println(err)
continue
}
fmt.Println(string(bytes))
}
}
end := time.Now().UnixMilli()
fmt.Printf("耗时:%d ,总计打分:%d", end-start, count)
}
测试结果:
代码语言:shell复制ZSet 耗时:101108 ,总计打分:10000--- PASS: TestVoteBatch (101.11s)
String 耗时:51354 , 总计打分:10000--- PASS: TestVoteBatch (51.36s)
取数据:
代码语言:go复制func TestQueryBatch(t *testing.T) {
var total int
start := time.Now().UnixMilli()
client := http.Client{}
for j := 0; j < 100; j {
resp, err := client.Get(fmt.Sprintf("http://localhost:8080/query?type=%d&id=%d", 1, j))
if err != nil {
fmt.Println(err)
}
bytes, err := io.ReadAll(resp.Body)
if err != nil {
fmt.Println(err)
continue
}
var res []Record
if err := json.Unmarshal(bytes, &res); err != nil {
fmt.Println(err)
}
fmt.Println(res)
total = len(res)
}
for j := 0; j < 100; j {
resp, err := client.Get(fmt.Sprintf("http://localhost:8080/query?type=%d&id=%d", 2, j))
if err != nil {
fmt.Println(err)
}
bytes, err := io.ReadAll(resp.Body)
if err != nil {
fmt.Println(err)
continue
}
var res []Record
if err := json.Unmarshal(bytes, &res); err != nil {
fmt.Println(err)
}
fmt.Println(res)
total = len(res)
}
end := time.Now().UnixMilli()
fmt.Printf("耗时:%d ,查询数据条目:%d", end-start, total)
}
测试结果:
代码语言:shell复制ZSet 耗时:6404 , 查询数据条目:20000--- PASS: TestQueryBatch (6.40s)
String 耗时:101378 ,查询数据条目:20000--- PASS: TestQueryBatch (101.38s)
结论:
String类型在存数据时性能优于ZSet类型,但是在取数据时要慢与ZSet类型,主要是由于String类型查询数据时每次都要按照Key进行模糊匹配查询,该操作比较耗时,ZSet类型存数据慢的原因主要是它的内部数据结构不如String简单,操作复杂度高于String,进而耗时较高。
小结
做数据统计时,通常会使用多种类型的数据库,具体选择取决于数据的性质、规模、查询需求以及性能要求等因素。 在大规模数据统计时,一般会使用Clickhouse和Doris等OLAP进行数据存储,而Redis更偏向于作为缓存层进行热点数据的存储。
在选择数据库进行数据统计时,需要根据具体的应用场景和需求来评估不同数据库的特点和优势。Redis、ClickHouse和Doris都是优秀的数据库系统,在数据统计领域有着广泛的应用。Redis适合用于缓存和加速数据查询过程;ClickHouse适合处理大规模的数据统计和分析任务;而Doris则在多表联合查询和复杂数据分析方面表现出色。