大数据Presto(三):Presto Connector连接器

2022-11-22 08:26:59 浏览数 (1)

Presto Connector连接器

Presto Connector支持从多种数据源读取数据,例如:Hive、MySQL、Redis、Kudu、Kafka等。Presto Connector只支持从对应的Connector中查询数据,不支持建表及插入等非查询操作,这个使用Presto 主要应用于OLAP场景决定的。

一、Hive Connector

1、配置

Presto连接Hive在Presto搭建安装中已经讲解过,

可以参照:https://prestodb.io/docs/current/connector/hive.html

2、​​​​​​​案例

在Hive中创建两张表h1和h2,并向两张表中加载数据,操作如下:

代码语言:javascript复制
#创建表h1
create table h1 (id int,name string,age int) row format delimited fields terminated by 't';
#创建表h2
create table h2 (id int,name string,score int) row format delimited fields terminated by 't';

分别向两张表中加载如下数据:

代码语言:javascript复制
h1表h1.txt
1       zs      18
2       ls      19
3       ww      20
4       ml      21
5       tq      22
load data local inpath '/root/h1.txt' into table h1;

h2表h2.txt
1       zs      100
2       ls      200
3       ww      300
4       ml      400
5       tq      500
load data local inpath '/root/h2.txt' into table h2;

使用Presto查询以下sql:

代码语言:javascript复制
presto:default> select t1.id,t1.name,t1.age,t2.score from h1 t1 join h2 t2 on t1.id = t2.id;
 id | name | age | score 
---- ------ ----- -------
  2 | ls   |  19 |   200 
  1 | zs   |  18 |   100 
  3 | ww   |  20 |   300 
  4 | ml   |  21 |   400 
  5 | tq   |  22 |   500 

presto:default> select sum(score) as total_score from h2;
 total_score 
-------------
        1500 

二、MySQL Connector

1、​​​​​​​​​​​​​​配置

在node3“/software/presto-0.259/etc/catalog”目录下创建mysql.properties,内容如下:

代码语言:javascript复制
connector.name=mysql
connection-url=jdbc:mysql://node2:3306
connection-user=root
connection-password=123456

将以上配置好的文件发送到presto各个节点对应的路径下,这里发送到node4、node5对应的“/software/presto-0.259/etc/catalog”目录下,然后重启presto。

在node3上执行如下命令,presto连接MySQL

代码语言:javascript复制
#在node2 mysql节点上创建数据库
mysql> create database presto_db;

#在node3上执行连接mysql命令
[root@node3 ~]# cd /software/presto-0.259/
[root@node3 presto-0.259]# ./presto --server node3:8080 --catalog mysql --schema presto_db

#查询所有schema信息,就是mysql中所有的库信息
presto:presto_db> show schemas from mysql;

2、​​​​​​​​​​​​​​案例

在mysql库“presto”中导入以下两张表“machine_consume_detail”和“machine_local_info”数据,附件如下:

machine_consume_detail.sql

machine_local_info.sql

执行如下查询sql:

代码语言:javascript复制
#查询每个省份城市机器营收情况
presto:presto_db> select b.province,b.city,sum(a.amount) as total_amount from machine_consume_detail a join machine_local_info b on a.mid=b.mid group by b.province,b.city;

province |    city    | total_amount 
---------- ------------ --------------
 河南     | 洛阳       |        27450 
 江西     | 萍乡       |        20600 
 广东     | 汕头       |        39600 
 浙江     | 宁波       |        28600 
 湖南     | 张家界     |        11100 
 湖南     | 郴州       |        49800 
 江西     | 景德镇     |        39000 
 四川     | 德阳       |        30500 
 上海     | 金山区     |        23300 
 重庆     | 奉节县     |         7000
... ...

#或者直接针对表“machine_consume_detail”进行查询
presto:presto_db> select pkg_name,sum(amount)as total_amount from machine_consume_detail group by pkg_name;

  pkg_name  | total_amount 
------------ --------------
 15分钟包时 |      8536437 
 单曲       |      4718860 
 90分钟包时 |      1154470 
 45分钟包时 |      2735044 
 30分钟包时 |      9119362 
 618套餐    |         6180 
 60分钟包时 |      3630769 
 NULL       |       126706 

三、​​​​​​​​​​​​​​Kafka Connector

1、配置

Presto可以将Kafka中的数据映射成Presto中的表进行实时的OLAP数据分析,但是读取的Kafka版本有要求,Kafka版本需要在Kafka2.3.1版本以上。在node3“/software/presto-0.259/etc/catalog”目录下创建kafka.properties,内容如下:

代码语言:javascript复制
connector.name=kafka
#指定Kafka broker节点
kafka.nodes=node1:9092,node2:9092,node3:9092
#所有相关的表名,每一个都是kafka中的一个topic
kafka.table-names=mydb.person_infos,mydb.score_infos

以上kafka.properties文件需要在每台presto节点对应路径都要有。

注意:以上mydb是指定的schema信息,需要配置读取Kafka数据的json配置文件,在Kafka中的数据一般是json格式,producer向Kafka中生产的数据有可能含有key,有可能没有key,使用presto查询Kafka中的数据,需要将Kafka中的数据映射到表字段上,那么presto读取Kafka数据时就需要有一个配置文件来配置这些内容。

首先这个配置文件需要默认放在“$PRESTO_HOME/etc/kafka”目录下,配置文件是json类型,文件名称自己随意定义,但是必须以“*.json”结尾。如果读取的Kafka topic中的数据没有key,那么文件配置内容如下:

代码语言:javascript复制
{
	"tableName": "person_infos",
	"schemaName": "mydb",
	"topicName": "presto-topic1",
	"message": {
		"dataFormat": "json",
		"fields": [{
				"name": "person_id",
				"mapping": "personId",
				"type": "BIGINT"
			},
			{
				"name": "person_name",
				"mapping": "personName",
				"type": "VARCHAR"
			},
			{
				"name": "person_age",
				"mapping": "personAge",
				"type": "BIGINT"
			}
		]
	}
}

如果读取的Kafka topic数据有key,那么文件配置内容如下:

代码语言:javascript复制
{
    "tableName": "person_infos",
    "schemaName": "mydb",
    "topicName": "presto-topic1",
    "key": {
        "dataFormat": "raw",
        "fields": [
            {
                "name": "kafka_key",
                "dataFormat": "BYTE",
                "type": "VARCHAR",
                "hidden": "false"
            }
        ]
    },
    "message": {
        "dataFormat": "json",
        "fields": [
            {
                "name": "person_id",
                "mapping": "personId",
                "type": "BIGINT"
            },
            {
                "name": "person_name",
                "mapping": "personName",
                "type": "VARCHAR"
            },
            {
                "name": "person_age",
                "mapping": "personAge",
                "type": "BIGINT"
            } 
		]
    }
}

对上面配置文件的解释如下:

  • “tableName”:读取指定topic数据映射成presto中的表名称。
  • “schemaName”:指定Presto连接的库名,不指定默认就是default,后期在Presto中查询时,可以指定--schmea来进入指定的库,也可以跨库关联表查询。
  • “topicName”:指定Presto读取Kafka 的topic名称
  • “key”:配置Kafka对应topic中key对应的信息,dataFormat指定为row,代表当前key就是正常行数据;fields指定key映射到Presto表中的字段信息;name指的是当前key值映射到Presto表中的字段名;dataFormat:指定topic key的类型,这里选择byte;type:指定当前key在Presto表中的类型,这里指定为VARCHAR,其他还可以转变的类型参照https://prestodb.io/docs/current/connector/kafka.html#raw-decoder;hidden表示在查询对应的presto表时,是否隐藏该列。
  • “message”:配合Kafka topic value相关信息。
  • “dataFormat”:指定value的类型,这里是json,除此之外,还可以指定为Row,csv,avro格式。
  • “fields”:配置Presto对应表中字段信息。
  • “name”:映射的presto表中对应的字段名称
  • “mapping”:topic json value中对应的json属性值。
  • “type”:指定presto中字段的类型,更多类型参照:Kafka Connector — Presto 0.277 Documentation

以上Presto映射读取Kafka数据的配置文件需要在每台Presto对应的路径中都要有。并且需要重启Presto才能正常访问Kafka中的数据。

2、​​​​​​​​​​​​​​案例

2.1、配置读取Kafka的配置文件

根据前面“kafka.properties”文件中的配置,在presto中我们将:wq要创建两个表“person_infos”和“score_infos”,在“$PRESTO_HOME/etc/kafka”路径下分别创建t1.properties与t2.properties两个配置文件,配置表映射的topic信息及字段信息如下:

t1.json,映射“presto-topic1”中的用户数据,生产数据有key。

代码语言:javascript复制
{
	"tableName": "person_infos",
	"schemaName": "mydb",
	"topicName": "presto-topic1",
	"key": {
		"dataFormat": "raw",
		"fields": [{
			"name": "kafka_key",
			"dataFormat": "BYTE",
			"type": "VARCHAR",
			"hidden": "false"
		}]
	},
	"message": {
		"dataFormat": "json",
		"fields": [{
				"name": "person_id",
				"mapping": "personId",
				"type": "BIGINT"
			},
			{
				"name": "person_name",
				"mapping": "personName",
				"type": "VARCHAR"
			},
			{
				"name": "person_age",
				"mapping": "personAge",
				"type": "BIGINT"
			}
		]
	}
}

t2.json,映射presto-topic2中的用户分数数据,生产数据没有key:

代码语言:javascript复制
{
	"tableName": "score_infos",
	"schemaName": "mydb",
	"topicName": "presto-topic2",
	"message": {
		"dataFormat": "json",
		"fields": [{
				"name": "person_id",
				"mapping": "personId",
				"type": "BIGINT"
			},
			{
				"name": "person_name",
				"mapping": "personName",
				"type": "VARCHAR"
			},
			{
				"name": "person_score",
				"mapping": "personScore",
				"type": "BIGINT"
			}
		]
	}
}

将配置好的“$PRESTO_HOME/etc/catalog/kafka.properties”文件与“$PRESTO_HOME/catalog/kafka”下的两个json文件发送到node4、node5节点上。

2.2、启动Kafka集群,在Kafka中创建对应的两个topic

代码语言:javascript复制
./kafka-topics.sh  --zookeeper node3:2181,node4:2181,node5:2181  --create  --topic presto-topic1  --partitions 3 --replication-factor 3
./kafka-topics.sh  --zookeeper node3:2181,node4:2181,node5:2181  --create  --topic presto-topic2  --partitions 3 --replication-factor 3

2.3、启动Presto集群,启动Presto客户端

代码语言:javascript复制
#启动Presto集群
[root@node3 ~]# /software/presto-0.259/bin/launcher run
[root@node4 ~]# /software/presto-0.259/bin/launcher run
[root@node5 ~]# /software/presto-0.259/bin/launcher run

#启动Presto客户端
[root@node3 presto-0.259]# cd /software/presto-0.259
[root@node3 presto-0.259]# ./presto --server node3:8080 --catalog kafka --schema mydb

2.4、执行如下查询命令,进行OLAP分析

代码语言:javascript复制
#查询对应的schema信息
presto:mydb> show tables;
    Table     
--------------
 person_infos 
 score_infos  
(2 rows)

#查询表 person_infos数据,目前还没有数据

presto:mydb> select * from person_infos;
 kafka_key | person_id | person_name | person_age 
----------- ----------- ------------- ------------
(0 rows)

#查询表score_infos数据,目前还没有数据
presto:mydb> select * from score_infos;
 person_id | person_name | person_score 
----------- ------------- --------------
(0 rows)

使用Kafka 客户端分别向以上两个topic中生产数据:

代码语言:javascript复制
#向topic presto-topic1中生产数据
[root@node1 bin]# ./kafka-console-producer.sh --bootstrap-server node1:9092,node2:9092,node3:9092 --topic presto-topic1 --property  parse.key=true --property key.separator='|'
#生产数据如下:
key1|{"personId":1,"personName":"zhangsan","personAge":18}
key2|{"personId":2,"personName":"lisi","personAge":19}
key3|{"personId":3,"personName":"wangwu","personAge":20}
key4|{"personId":4,"personName":"maliu","personAge":21}
key5|{"personId":5,"personName":"tianqi","personAge":22}

#向topic presto-topic2中生产数据
[root@node1 bin]# ./kafka-console-producer.sh --bootstrap-server node1:9092,node2:9092,node3:9092 --topic presto-topic2
#生产数据如下
{"personId":1,"personName":"zhangsan","personScore":100}
{"personId":2,"personName":"lisi","personScore":200}
{"personId":3,"personName":"wangwu","personScore":300}
{"personId":4,"personName":"maliu","personScore":400}
{"personId":5,"personName":"tianqi","personScore":500}

再次在presto中执行如下命令进行OLAP查询:

代码语言:javascript复制
#查询person_infos
presto:mydb> select * from person_infos;
 kafka_key | person_id | person_name | person_age 
----------- ----------- ------------- ------------
 key3      |         3 | wangwu      |         20 
 key5      |         5 | tianqi      |         22 
 key4      |         4 | maliu       |         21 
 key1      |         1 | zhangsan    |         18 
 key2      |         2 | lisi        |         19 
(5 rows)

#查询score_infos
presto:mydb> select * from score_infos;

 person_id | person_name | person_score 
----------- ------------- --------------
         1 | zhangsan    |          100 
         2 | lisi        |          200 
         3 | wangwu      |          300 
         4 | maliu       |          400 
         5 | tianqi      |          500 
(5 rows)

#进行关联OLAP查询
presto:mydb> select a.person_id,a.person_name,a.person_age,b.person_score from mydb.person_infos a join mydb.score_infos b on a.person_id = b.person_id;

person_id | person_name | person_age | person_score 
----------- ------------- ------------ --------------
         4 | maliu       |         21 |          400 
         3 | wangwu      |         20 |          300 
         5 | tianqi      |         22 |          500 
         1 | zhangsan    |         18 |          100 
         2 | lisi        |         19 |          200 
(5 rows)

0 人点赞