Presto Connector连接器
Presto Connector支持从多种数据源读取数据,例如:Hive、MySQL、Redis、Kudu、Kafka等。Presto Connector只支持从对应的Connector中查询数据,不支持建表及插入等非查询操作,这个使用Presto 主要应用于OLAP场景决定的。
一、Hive Connector
1、配置
Presto连接Hive在Presto搭建安装中已经讲解过,
可以参照:https://prestodb.io/docs/current/connector/hive.html
2、案例
在Hive中创建两张表h1和h2,并向两张表中加载数据,操作如下:
代码语言:javascript复制#创建表h1
create table h1 (id int,name string,age int) row format delimited fields terminated by 't';
#创建表h2
create table h2 (id int,name string,score int) row format delimited fields terminated by 't';
分别向两张表中加载如下数据:
代码语言:javascript复制h1表h1.txt
1 zs 18
2 ls 19
3 ww 20
4 ml 21
5 tq 22
load data local inpath '/root/h1.txt' into table h1;
h2表h2.txt
1 zs 100
2 ls 200
3 ww 300
4 ml 400
5 tq 500
load data local inpath '/root/h2.txt' into table h2;
使用Presto查询以下sql:
代码语言:javascript复制presto:default> select t1.id,t1.name,t1.age,t2.score from h1 t1 join h2 t2 on t1.id = t2.id;
id | name | age | score
---- ------ ----- -------
2 | ls | 19 | 200
1 | zs | 18 | 100
3 | ww | 20 | 300
4 | ml | 21 | 400
5 | tq | 22 | 500
presto:default> select sum(score) as total_score from h2;
total_score
-------------
1500
二、MySQL Connector
1、配置
在node3“/software/presto-0.259/etc/catalog”目录下创建mysql.properties,内容如下:
代码语言:javascript复制connector.name=mysql
connection-url=jdbc:mysql://node2:3306
connection-user=root
connection-password=123456
将以上配置好的文件发送到presto各个节点对应的路径下,这里发送到node4、node5对应的“/software/presto-0.259/etc/catalog”目录下,然后重启presto。
在node3上执行如下命令,presto连接MySQL
代码语言:javascript复制#在node2 mysql节点上创建数据库
mysql> create database presto_db;
#在node3上执行连接mysql命令
[root@node3 ~]# cd /software/presto-0.259/
[root@node3 presto-0.259]# ./presto --server node3:8080 --catalog mysql --schema presto_db
#查询所有schema信息,就是mysql中所有的库信息
presto:presto_db> show schemas from mysql;
2、案例
在mysql库“presto”中导入以下两张表“machine_consume_detail”和“machine_local_info”数据,附件如下:
machine_consume_detail.sql
machine_local_info.sql
执行如下查询sql:
代码语言:javascript复制#查询每个省份城市机器营收情况
presto:presto_db> select b.province,b.city,sum(a.amount) as total_amount from machine_consume_detail a join machine_local_info b on a.mid=b.mid group by b.province,b.city;
province | city | total_amount
---------- ------------ --------------
河南 | 洛阳 | 27450
江西 | 萍乡 | 20600
广东 | 汕头 | 39600
浙江 | 宁波 | 28600
湖南 | 张家界 | 11100
湖南 | 郴州 | 49800
江西 | 景德镇 | 39000
四川 | 德阳 | 30500
上海 | 金山区 | 23300
重庆 | 奉节县 | 7000
... ...
#或者直接针对表“machine_consume_detail”进行查询
presto:presto_db> select pkg_name,sum(amount)as total_amount from machine_consume_detail group by pkg_name;
pkg_name | total_amount
------------ --------------
15分钟包时 | 8536437
单曲 | 4718860
90分钟包时 | 1154470
45分钟包时 | 2735044
30分钟包时 | 9119362
618套餐 | 6180
60分钟包时 | 3630769
NULL | 126706
三、Kafka Connector
1、配置
Presto可以将Kafka中的数据映射成Presto中的表进行实时的OLAP数据分析,但是读取的Kafka版本有要求,Kafka版本需要在Kafka2.3.1版本以上。在node3“/software/presto-0.259/etc/catalog”目录下创建kafka.properties,内容如下:
代码语言:javascript复制connector.name=kafka
#指定Kafka broker节点
kafka.nodes=node1:9092,node2:9092,node3:9092
#所有相关的表名,每一个都是kafka中的一个topic
kafka.table-names=mydb.person_infos,mydb.score_infos
以上kafka.properties文件需要在每台presto节点对应路径都要有。
注意:以上mydb是指定的schema信息,需要配置读取Kafka数据的json配置文件,在Kafka中的数据一般是json格式,producer向Kafka中生产的数据有可能含有key,有可能没有key,使用presto查询Kafka中的数据,需要将Kafka中的数据映射到表字段上,那么presto读取Kafka数据时就需要有一个配置文件来配置这些内容。
首先这个配置文件需要默认放在“$PRESTO_HOME/etc/kafka”目录下,配置文件是json类型,文件名称自己随意定义,但是必须以“*.json”结尾。如果读取的Kafka topic中的数据没有key,那么文件配置内容如下:
代码语言:javascript复制{
"tableName": "person_infos",
"schemaName": "mydb",
"topicName": "presto-topic1",
"message": {
"dataFormat": "json",
"fields": [{
"name": "person_id",
"mapping": "personId",
"type": "BIGINT"
},
{
"name": "person_name",
"mapping": "personName",
"type": "VARCHAR"
},
{
"name": "person_age",
"mapping": "personAge",
"type": "BIGINT"
}
]
}
}
如果读取的Kafka topic数据有key,那么文件配置内容如下:
代码语言:javascript复制{
"tableName": "person_infos",
"schemaName": "mydb",
"topicName": "presto-topic1",
"key": {
"dataFormat": "raw",
"fields": [
{
"name": "kafka_key",
"dataFormat": "BYTE",
"type": "VARCHAR",
"hidden": "false"
}
]
},
"message": {
"dataFormat": "json",
"fields": [
{
"name": "person_id",
"mapping": "personId",
"type": "BIGINT"
},
{
"name": "person_name",
"mapping": "personName",
"type": "VARCHAR"
},
{
"name": "person_age",
"mapping": "personAge",
"type": "BIGINT"
}
]
}
}
对上面配置文件的解释如下:
- “tableName”:读取指定topic数据映射成presto中的表名称。
- “schemaName”:指定Presto连接的库名,不指定默认就是default,后期在Presto中查询时,可以指定--schmea来进入指定的库,也可以跨库关联表查询。
- “topicName”:指定Presto读取Kafka 的topic名称
- “key”:配置Kafka对应topic中key对应的信息,dataFormat指定为row,代表当前key就是正常行数据;fields指定key映射到Presto表中的字段信息;name指的是当前key值映射到Presto表中的字段名;dataFormat:指定topic key的类型,这里选择byte;type:指定当前key在Presto表中的类型,这里指定为VARCHAR,其他还可以转变的类型参照https://prestodb.io/docs/current/connector/kafka.html#raw-decoder;hidden表示在查询对应的presto表时,是否隐藏该列。
- “message”:配合Kafka topic value相关信息。
- “dataFormat”:指定value的类型,这里是json,除此之外,还可以指定为Row,csv,avro格式。
- “fields”:配置Presto对应表中字段信息。
- “name”:映射的presto表中对应的字段名称
- “mapping”:topic json value中对应的json属性值。
- “type”:指定presto中字段的类型,更多类型参照:Kafka Connector — Presto 0.277 Documentation
以上Presto映射读取Kafka数据的配置文件需要在每台Presto对应的路径中都要有。并且需要重启Presto才能正常访问Kafka中的数据。
2、案例
2.1、配置读取Kafka的配置文件
根据前面“kafka.properties”文件中的配置,在presto中我们将:wq要创建两个表“person_infos”和“score_infos”,在“$PRESTO_HOME/etc/kafka”路径下分别创建t1.properties与t2.properties两个配置文件,配置表映射的topic信息及字段信息如下:
t1.json,映射“presto-topic1”中的用户数据,生产数据有key。
代码语言:javascript复制{
"tableName": "person_infos",
"schemaName": "mydb",
"topicName": "presto-topic1",
"key": {
"dataFormat": "raw",
"fields": [{
"name": "kafka_key",
"dataFormat": "BYTE",
"type": "VARCHAR",
"hidden": "false"
}]
},
"message": {
"dataFormat": "json",
"fields": [{
"name": "person_id",
"mapping": "personId",
"type": "BIGINT"
},
{
"name": "person_name",
"mapping": "personName",
"type": "VARCHAR"
},
{
"name": "person_age",
"mapping": "personAge",
"type": "BIGINT"
}
]
}
}
t2.json,映射presto-topic2中的用户分数数据,生产数据没有key:
代码语言:javascript复制{
"tableName": "score_infos",
"schemaName": "mydb",
"topicName": "presto-topic2",
"message": {
"dataFormat": "json",
"fields": [{
"name": "person_id",
"mapping": "personId",
"type": "BIGINT"
},
{
"name": "person_name",
"mapping": "personName",
"type": "VARCHAR"
},
{
"name": "person_score",
"mapping": "personScore",
"type": "BIGINT"
}
]
}
}
将配置好的“$PRESTO_HOME/etc/catalog/kafka.properties”文件与“$PRESTO_HOME/catalog/kafka”下的两个json文件发送到node4、node5节点上。
2.2、启动Kafka集群,在Kafka中创建对应的两个topic
代码语言:javascript复制./kafka-topics.sh --zookeeper node3:2181,node4:2181,node5:2181 --create --topic presto-topic1 --partitions 3 --replication-factor 3
./kafka-topics.sh --zookeeper node3:2181,node4:2181,node5:2181 --create --topic presto-topic2 --partitions 3 --replication-factor 3
2.3、启动Presto集群,启动Presto客户端
代码语言:javascript复制#启动Presto集群
[root@node3 ~]# /software/presto-0.259/bin/launcher run
[root@node4 ~]# /software/presto-0.259/bin/launcher run
[root@node5 ~]# /software/presto-0.259/bin/launcher run
#启动Presto客户端
[root@node3 presto-0.259]# cd /software/presto-0.259
[root@node3 presto-0.259]# ./presto --server node3:8080 --catalog kafka --schema mydb
2.4、执行如下查询命令,进行OLAP分析
代码语言:javascript复制#查询对应的schema信息
presto:mydb> show tables;
Table
--------------
person_infos
score_infos
(2 rows)
#查询表 person_infos数据,目前还没有数据
presto:mydb> select * from person_infos;
kafka_key | person_id | person_name | person_age
----------- ----------- ------------- ------------
(0 rows)
#查询表score_infos数据,目前还没有数据
presto:mydb> select * from score_infos;
person_id | person_name | person_score
----------- ------------- --------------
(0 rows)
使用Kafka 客户端分别向以上两个topic中生产数据:
代码语言:javascript复制#向topic presto-topic1中生产数据
[root@node1 bin]# ./kafka-console-producer.sh --bootstrap-server node1:9092,node2:9092,node3:9092 --topic presto-topic1 --property parse.key=true --property key.separator='|'
#生产数据如下:
key1|{"personId":1,"personName":"zhangsan","personAge":18}
key2|{"personId":2,"personName":"lisi","personAge":19}
key3|{"personId":3,"personName":"wangwu","personAge":20}
key4|{"personId":4,"personName":"maliu","personAge":21}
key5|{"personId":5,"personName":"tianqi","personAge":22}
#向topic presto-topic2中生产数据
[root@node1 bin]# ./kafka-console-producer.sh --bootstrap-server node1:9092,node2:9092,node3:9092 --topic presto-topic2
#生产数据如下
{"personId":1,"personName":"zhangsan","personScore":100}
{"personId":2,"personName":"lisi","personScore":200}
{"personId":3,"personName":"wangwu","personScore":300}
{"personId":4,"personName":"maliu","personScore":400}
{"personId":5,"personName":"tianqi","personScore":500}
再次在presto中执行如下命令进行OLAP查询:
代码语言:javascript复制#查询person_infos
presto:mydb> select * from person_infos;
kafka_key | person_id | person_name | person_age
----------- ----------- ------------- ------------
key3 | 3 | wangwu | 20
key5 | 5 | tianqi | 22
key4 | 4 | maliu | 21
key1 | 1 | zhangsan | 18
key2 | 2 | lisi | 19
(5 rows)
#查询score_infos
presto:mydb> select * from score_infos;
person_id | person_name | person_score
----------- ------------- --------------
1 | zhangsan | 100
2 | lisi | 200
3 | wangwu | 300
4 | maliu | 400
5 | tianqi | 500
(5 rows)
#进行关联OLAP查询
presto:mydb> select a.person_id,a.person_name,a.person_age,b.person_score from mydb.person_infos a join mydb.score_infos b on a.person_id = b.person_id;
person_id | person_name | person_age | person_score
----------- ------------- ------------ --------------
4 | maliu | 21 | 400
3 | wangwu | 20 | 300
5 | tianqi | 22 | 500
1 | zhangsan | 18 | 100
2 | lisi | 19 | 200
(5 rows)