Pilosa简介
Pilosa是一款开源的分布式索引,主要是为了查询速度和水平伸缩性而设计的。如果数据规模在数十亿,并且有上百万的属性值,那么就可以考虑使用Pilosa解决这些问题:哪些属性最常见?哪些数据对象拥有特定的某些属性?哪些属性组会经常一起出现?等等类似的问题。
Pilosa数据模型
Pilosa的核心结构是一个boolean矩阵。如下所示,是一个基本的数据模型图,从图中我们可以看到有index、field、shard等entity:
为了方便理解,我们将传统数据库的相关概念与Pilosa的进行了映射对比,详细内容如表格所示:
我们可以结合这两个图来看下,Pilosa中各个entity的含义:
- Index,表示一组数据的命名空间,类似关系表的Table,但是具体含义也不太一样,下面会有例子进行说明。不同index之间的数据不能进行交叉查询;
- Column,列id是一个连续递增的整数,对于index中的所有字段都是公用的,一个列通常对应于关系表中的一行记录,例如数据模型中的第二列,就代表某行记录在字段A的row0和row10、字段B的row3都有值,而其他的情况无值;
- Row,行id是一个连续的整数,在每个字段内公用。每一行就表示字段的一种属性值,例如性别字段,通常就是有男/女两行;
- Field,对应关系表的一个字段,上面也说过了,字段的每一行都代表关系字段的一种属性值。
- Ranked,可以在创建字段的时候设置rank字段,这样在生成bitmap的时候时候就会按行id维护对列计数的排序缓存,有助于提升TopN的查询速度,如图所示:
- LRU,LRU缓存会记录最近被访问的行,如下所示:
- Time Quantum,如果在字段上设置了时间量选项,Pilosa就会创建额外的视图,这些视图允许对指定范围的行进行向下查询,下面会在具体的字段类型中再详细说明;
- Shard,索引的数据会被分成若干分片(之前叫slices),每个分片都保存固定数量的列,对应关系表中即为固定数量的记录数据(从基本模型图可以看到,0~1023列位于分片0、1024~2047位于分片1,以此类推)。
字段类型
- Set,Pilosa的默认字段类型,表示一个行和列的标准二进制矩阵,矩阵中的每一行代表一个字段的值,下面的例子就是在repository下创建了一个“info”字段,并且带有100000条记录的排序:
curl localhost:10101/index/repository/field/info
-X POST
-d
'{"options": {"type": "set", "cacheType": "ranked", "cacheSize":100000}}'
- Int,用于存储整数值,与索引中的其他字符共享相同的列,但是该字段的值必须位于创建字符时指定的最小值和最大值之间。下面的例子就是创建了一个quantity字段,并且范围是从-1000到2000:
curl localhost:10101/index/repository/field/quantity
-X POST
-d '{"options": {"type": "int", "min": -1000, “max”:2000}}'
- Time,该字段类似于set,但是除了行和列的信息之外,还会额外存储每个位的时间值到定义的粒度。下面的例子就是创建了一个event字段,并且定了时间量为YMD,即支持查询范围到天级别:
curl localhost:10101/index/repository/field/event
-X POST
-d '{"options": {"type": "time", "timeQuantum": "YMD"}}'
使用了time类型的字段之后,会为每个定义的时间段产生数据视图,对于以下的Set()就会产生对应的数据视图:
代码语言:javascript复制Set(3, A=8, 2017-05-18T00:00)
Set(3, A=8, 2017-05-19T00:00)
- Mutex,该字段与set字符类似,区别在于要求每列的行值必须互斥,即每一列只能对该字段设置一个值,如果更新了互斥字段上的列值,则先前的字段值会被清除。
- Boolean,该字段与mutex字段类似,只识别true和false。
使用方式
pilosa import
我们以官方的一个例子来介绍如何使用Pilosa的import命令将普通的数据导入到Pilosa中进行查询,这个名为“Star Trace”的示例,统计了1000个github上最流行的包含“go”关键字的项目,如下所示是两个测试表的原始结构:
表名 | 列名 | 类型 |
---|---|---|
stargazer | user_id | INT |
stargazer | repo_id | INT |
stargazer | ts | STRING |
language | language_id | INT |
language | repo_id | INT |
可以通过如下两条命令下载stargazer.csv和language.csv:
代码语言:javascript复制curl -O https://raw.githubusercontent.com/pilosa/getting-started/master/stargazer.csv
curl -O https://raw.githubusercontent.com/pilosa/getting-started/master/language.csv
要将这些数据导入到pilosa中,并生成相应的bitmap,我们首先需要在pilosa中新建相应的index以及field,具体的操作可以参考官方文档,相应的成员变量如下所示:
变量名称 | 变量类型 | 数据类型 |
---|---|---|
repository | index | / |
stargazer | field | set |
language | field | time |
然后我们就可以通过如下命令,将数据导入到pilosa中: 导入完成之后,我们就可以直接使用pilosa的语法进行查询,下面我们列举了一些pilosa和impala在某些查询场景下的SQL对比:
代码语言:javascript复制pilosa import -i repository -f stargazer stargazer.csv
pilosa import -i repository -f language language.csv
1.编程语言使用Top5 Impala SQL:
代码语言:javascript复制select lang_id,count(1) c from language
group by lang_id order by c desc limit 5;
Pilosa SQL:
代码语言:javascript复制TopN(language, n=5)
在两个系统中执行的结果分别如下所示,我们可以看到,两个查询的结果是一致的:
2. user_id为14和19的用户,同时标记了的项目,且项目使用的编程语言id为1 Impala SQL:
代码语言:javascript复制select a.repo_id from stargazer a,stargazer b,language c
where a.user_id=14 and b.user_id=19 and c.lang_id=1
and a.repo_id=b.repo_id and a.repo_id=c.repo_id;
Pilosa SQL:
代码语言:javascript复制Intersect(
Row(stargazer=14),
Row(stargazer=19),
Row(language=1))
在两个系统中执行的结果分别如下所示,我们可以看到,两个查询的结果是一致的:
pdk kafka
Pilosa提供了一个PDK(Pilosa Dev Kit),通过该工具我们可以直接从kafka中读取数据,存入Pilosa中进行查询,该工具还提供了其他的一些用例,后续我们也会提到。这里我们就需要用到kafkagen和kafka这两个命令,主要工作流程如下图所示:pdk kafkagen向kafka中写入测试数据,pdk kafka从kafka中读入数据,按照相应的映射存储到Pilosa中。相关流程图如下所示,图片来自PDK源码:
测试环境准备:
- 下载confluent全家桶,然后依次部署zookeeper、kafka、schema registry、kafka rest proxy、console center;
- 安装并启动pilosa服务,编译安装pdk; 准备完成之后,我们就可以通过如下命令向kafka写入测试数据,然后将数据写入Pilosa中:
pdk kafkagen -t pilosa_test
pdk kafka -t pilosa_test -n pilosa_test -p localhost:10101
我们将数据写入了测试kafka名为pilosa_test的topic,然后将读取到Pilosa中名为pilosa_test的index,执行成功后,我们就可以看到Pilosa中已经生成了相应的index和filed。我们通过kafka的control center可以看到测试数据对应的schema:
截图中对应kafka数据的字段在Pilosa中的对应的filed为:
Field | Type | CACHE SIZE | MIN/MAX |
---|---|---|---|
timestamp | set | 100000 | / |
geoip-time_zone | set | 100000 | / |
geoip-longitude | int | / | -9223372036854776000,9223372036854776000 |
我们可以看到,kafka中的每个字段,Pilosa都会根据类型构建相应的filed,如果碰到多层嵌套的情况,就会按照路径往下搜索,直到达到该字段为止,以这个全路径的名称来作为Pilosa的字段名称。然后我们就可以通过Pilosa的SQL进行相应的查询分析了。
Pilosa的多表关联计算
多表关联计算是一种非常常见的SQL计算,在进行数据分析的时候时候,常常会需要将订单表与其他几个货品表、用户表等进行关联,筛选出符合条件的数据,Pilosa中同样也提供了一个类似的示例,即Star Schema Benchmark(以下简称SSB),这是一个数据库查询性能的测试,涉及到的都是零售相关的表,是一个典型的星型模式,各个表之间的关系如下所示:
图中一共有5张表,我们在关系表中可以直接使用JOIN关键字来进行关联查询,例如:
代码语言:javascript复制select sum(lo_extendedprice*lo_discount) as revenue
from lineorder, dwdate
where lo_orderdate = d_datekey
and d_year = 1993
and lo_discount between 1 and 3
and lo_quantity < 25;
在Pilosa中,可以通过pdk ssb来构造SSB测试的Pilosa版本,详细过程这里就不再赘述,可以参考官网介绍:Pilosa的零售分析和SSB基准测试。构造完成之后,我们就可以通过如下的SQL来实现上述关系表的SQL结果:
代码语言:javascript复制Sum(
Intersect(
Row(lo_year = 1993),
Row(lo_discount >= 1),
Row(lo_discount <= 3),
Row(lo_quantity < 25)
),field=lo_revenue_computed)
关于Pilosa重写的SSB的Query,可以参考官方提供的ssb-demo,里面的query.go文件中包含所有的Query。我们通过对比两个以上两个SQL可以看到,loextendedpricelodiscount在Pilosa中被替换成了lorevenuecomputed这个字段。我们在使用pdk ssb导入SSB数据到pilosa的时候,pdk在内部做了这样一个转换,Pilosa对于这种表达式计算需要预先定义好,这点跟kylin有点类似。关于从关系表到Pilosa的字段映射可以参考pdk的源码,以下是部分字段的转换代码:
代码语言:javascript复制m.index.AddValue("lo_quantity", col, int64(rec.lo_quantity))
m.index.AddValue("lo_extendedprice", col, int64(rec.lo_extendedprice))
m.index.AddValue("lo_discount", col, int64(rec.lo_discount))
m.index.AddValue("lo_revenue", col, int64(rec.lo_revenue))
m.index.AddValue("lo_supplycost", col, int64(rec.lo_supplycost))
revenueComputed := int64(float64(rec.lo_extendedprice) * float64(rec.lo_discount) * 0.01)
m.index.AddValue("lo_revenue_computed", col, revenueComputed)
profitComputed := uint32(rec.lo_revenue) - rec.lo_supplycost
m.index.AddValue("lo_profit", col, int64(profitComputed))
我们通过上述代码可以看到,lo_revenue_computed和lo_profit这种都是多个字段经过表达式计算之后的结果,在进行Query重写的时候,将SSB原生Query中的表达式替换为对应的字段即可。 由于Pilosa无法跨index进行交叉查询,因此我们将SSB数据导入的时候,会将相关的字段都放到一个index中,所以在最开始的时候,就提到了,index跟传统关系表的含义也不完全一样。事实证明,Pilosa对于这种星型模式还是非常适合的,但是我们通过上述例子可以看到,对于某个实际使用场景,我们需要设计相应的模型,并定义好相应的字段,然后将数据导入到Pilosa中,才能进行正常的查询分析。所以,在一般业务场景下,可能使用kafka导入的方式相对比较合理。