Hive-1.2.1_04_DML操作 5.1. Join8.1. 使用案例8.2. Transform实现

2020-10-15 11:33:21 浏览数 (1)

Hive官方文档:Home-UserDocumentation

Hive DML官方文档:LanguageManual DML

参考文章:Hive 用户指南

1. Loading files into tables

当我们做Load操作是,hive不会做任何数据转换,只是纯复制/移动操作,将数据文件移动到与Hive表对应的位置。

语法

代码语言:javascript复制
1 LOAD DATA [LOCAL] INPATH 'filepath' [OVERWRITE] INTO TABLE tablename [PARTITION (partcol1=val1, partcol2=val2 ...)]

实例

代码语言:javascript复制
1 # 将本地的数据导入到表中
2 # 参见 Hive-1.2.1_03_DDL操作
3 load data local inpath '/app/software/hive/t_sz05_buck.dat' into table t_sz05; # 导入数据
4 load data local inpath '/app/software/hive/t_sz03_part.dat' into table t_sz03_part partition (dt='20180711', country='CN');

2. Inserting data into Hive Tables from queries

可以使用insert子句将查询结果插入到表中。

语法

代码语言:javascript复制
 1 # 标准语法:
 2 INSERT OVERWRITE TABLE tablename1 [PARTITION (partcol1=val1, partcol2=val2 ...) [IF NOT EXISTS]] select_statement1 FROM from_statement;
 3 INSERT INTO TABLE tablename1 [PARTITION (partcol1=val1, partcol2=val2 ...)] select_statement1 FROM from_statement;
 4 
 5 # Hive extension (multiple inserts):
 6 FROM from_statement
 7 INSERT OVERWRITE TABLE tablename1 [PARTITION (partcol1=val1, partcol2=val2 ...) [IF NOT EXISTS]] select_statement1
 8 [INSERT OVERWRITE TABLE tablename2 [PARTITION ... [IF NOT EXISTS]] select_statement2]
 9 [INSERT INTO TABLE tablename2 [PARTITION ...] select_statement2] ...;
10 FROM from_statement
11 INSERT INTO TABLE tablename1 [PARTITION (partcol1=val1, partcol2=val2 ...)] select_statement1
12 [INSERT INTO TABLE tablename2 [PARTITION ...] select_statement2]
13 [INSERT OVERWRITE TABLE tablename2 [PARTITION ... [IF NOT EXISTS]] select_statement2] ...;
14 
15 # Hive extension (dynamic partition inserts):
16 INSERT OVERWRITE TABLE tablename PARTITION (partcol1[=val1], partcol2[=val2] ...) select_statement FROM from_statement;
17 INSERT INTO TABLE tablename PARTITION (partcol1[=val1], partcol2[=val2] ...) select_statement FROM from_statement;

实例

代码语言:javascript复制
 1 # 建表
 2 create table t_sz10 (id int, name string)
 3  row format delimited fields terminated by ',';
 4 
 5 # 操作步骤
 6 0: jdbc:hive2://mini01:10000> select * from t_sz02_ext; # 要查询的表 
 7  ---------------- ------------------ -- 
 8 | t_sz02_ext.id  | t_sz02_ext.name  |
 9  ---------------- ------------------ -- 
10 | 1              | 刘晨               |
11 | 2              | 王敏               |
12 | 3              | 张立               |
13 | 4              | 刘刚               |
14 | 5              | 孙庆               |
15 | 6              | 易思玲             |
16 | 7              | 李娜               |
17 | 8              | 梦圆圆             |
18 | NULL           | NULL              |
19  ---------------- ------------------ -- 
20 9 rows selected (0.099 seconds)
21 0: jdbc:hive2://mini01:10000> insert into table t_sz10 select id, name from t_sz02_ext where id < 5; 
22 ……………… # MapReduce
23 No rows affected (16.029 seconds)
24 0: jdbc:hive2://mini01:10000> select * from t_sz10; # 数据已经插入 
25  ------------ -------------- -- 
26 | t_sz10.id  | t_sz10.name  |
27  ------------ -------------- -- 
28 | 1          | 刘晨           |
29 | 2          | 王敏           |
30 | 3          | 张立           |
31 | 4          | 刘刚           |
32  ------------ -------------- -- 
33 4 rows selected (0.092 seconds)

3. Writing data into the filesystem from queries

根据查询结果导出数据。如果有local 那么导出到本地,如果没有local那么导出到HDFS。

代码语言:javascript复制
 1 Standard syntax:
 2 INSERT OVERWRITE [LOCAL] DIRECTORY directory1
 3   [ROW FORMAT row_format] [STORED AS file_format] (Note: Only available starting with Hive 0.11.0)
 4   SELECT ... FROM ...
 5  
 6 Hive extension (multiple inserts):
 7 FROM from_statement
 8 INSERT OVERWRITE [LOCAL] DIRECTORY directory1 select_statement1
 9 [INSERT OVERWRITE [LOCAL] DIRECTORY directory2 select_statement2] ...
10  
11   
12 row_format
13   : DELIMITED [FIELDS TERMINATED BY char [ESCAPED BY char]] [COLLECTION ITEMS TERMINATED BY char]
14         [MAP KEYS TERMINATED BY char] [LINES TERMINATED BY char]
15         [NULL DEFINED AS char] (Note: Only available starting with Hive 0.13)

实例1

代码语言:javascript复制
 1 ### 这是一个分区表
 2 0: jdbc:hive2://mini01:10000> select * from t_sz03_part;
 3  ----------------- ------------------- ----------------- ---------------------- -- 
 4 | t_sz03_part.id  | t_sz03_part.name  | t_sz03_part.dt  | t_sz03_part.country  |
 5  ----------------- ------------------- ----------------- ---------------------- -- 
 6 | 1               | 张三_20180711     | 20180711        | CN                   |
 7 | 2               | lisi_20180711     | 20180711        | CN                   |
 8 | 3               | Wangwu_20180711   | 20180711        | CN                   |
 9 | 11              | Tom_20180711      | 20180711        | US                   |
10 | 12              | Dvid_20180711     | 20180711        | US                   |
11 | 13              | cherry_20180711   | 20180711        | US                   |
12 | 1               | 张三_20180712     | 20180712        | CN                   |
13 | 2               | lisi_20180712     | 20180712        | CN                   |
14 | 3               | Wangwu_20180712   | 20180712        | CN                   |
15 | 11              | Tom_20180712      | 20180712        | US                   |
16 | 12              | Dvid_20180712     | 20180712        | US                   |
17 | 13              | cherry_20180712   | 20180712        | US                   |
18  ----------------- ------------------- ----------------- ---------------------- -- 
19 12 rows selected (0.543 seconds)

导出1

代码语言:javascript复制
 1 ###  导出1,如果导出的目录不存在,那么创建对应目录 
 2 0: jdbc:hive2://mini01:10000> insert overwrite local directory '/app/software/hive/export/t_sz03_part_exp.dat'
 3 0: jdbc:hive2://mini01:10000>   select a.* from t_sz03_part a;   
 4 INFO  : Number of reduce tasks is set to 0 since there's no reduce operator
 5 INFO  : number of splits:2
 6 INFO  : Submitting tokens for job: job_1531701073794_0001
 7 INFO  : The url to track the job: http://mini02:8088/proxy/application_1531701073794_0001/
 8 INFO  : Starting Job = job_1531701073794_0001, Tracking URL = http://mini02:8088/proxy/application_1531701073794_0001/
 9 INFO  : Kill Command = /app/hadoop/bin/hadoop job  -kill job_1531701073794_0001
10 INFO  : Hadoop job information for Stage-1: number of mappers: 2; number of reducers: 0
11 INFO  : 2018-07-16 09:42:32,888 Stage-1 map = 0%,  reduce = 0%
12 INFO  : 2018-07-16 09:42:43,496 Stage-1 map = 50%,  reduce = 0%, Cumulative CPU 2.87 sec
13 INFO  : 2018-07-16 09:42:44,575 Stage-1 map = 100%,  reduce = 0%, Cumulative CPU 6.58 sec
14 INFO  : MapReduce Total cumulative CPU time: 6 seconds 580 msec
15 INFO  : Ended Job = job_1531701073794_0001
16 INFO  : Copying data to local directory /app/software/hive/export/t_sz03_part_exp.dat from hdfs://mini01:9000/tmp/hive/yun/38de38d6-11fc-4090-957d-21d983d9df04/hive_2018-07-16_09-42-16_386_323439967845595583-1/-mr-10000
17 INFO  : Copying data to local directory /app/software/hive/export/t_sz03_part_exp.dat from hdfs://mini01:9000/tmp/hive/yun/38de38d6-11fc-4090-957d-21d983d9df04/hive_2018-07-16_09-42-16_386_323439967845595583-1/-mr-10000
18 No rows affected (29.35 seconds)
19 
20 # 本地系统的导出数据, 没有任何分隔符
21 [yun@mini01 t_sz03_part_exp.dat]$ pwd
22 /app/software/hive/export/t_sz03_part_exp.dat
23 [yun@mini01 t_sz03_part_exp.dat]$ ll
24 total 8
25 -rw-r--r-- 1 yun yun 176 Jul 16 09:42 000000_0
26 -rw-r--r-- 1 yun yun 176 Jul 16 09:42 000001_0
27 [yun@mini01 t_sz03_part_exp.dat]$ cat 000000_0
28 1张三_2018071120180711CN
29 2lisi_2018071120180711CN
30 3Wangwu_2018071120180711CN
31 11Tom_2018071220180712US
32 12Dvid_2018071220180712US
33 13cherry_2018071220180712US
34 [yun@mini01 t_sz03_part_exp.dat]$ cat 000001_0
35 11Tom_2018071120180711US
36 12Dvid_2018071120180711US
37 13cherry_2018071120180711US
38 1张三_2018071220180712CN
39 2lisi_2018071220180712CN
40 3Wangwu_2018071220180712CN

导出2

代码语言:javascript复制
 1 # 导出2 # 有分隔符
 2 0: jdbc:hive2://mini01:10000> insert overwrite local directory '/app/software/hive/export/t_sz03_part_exp2.dat'
 3 0: jdbc:hive2://mini01:10000>   row format delimited fields terminated by ','
 4 0: jdbc:hive2://mini01:10000>   select a.* from t_sz03_part a; 
 5 INFO  : Number of reduce tasks is set to 0 since there's no reduce operator
 6 INFO  : number of splits:2
 7 INFO  : Submitting tokens for job: job_1531701073794_0002
 8 INFO  : The url to track the job: http://mini02:8088/proxy/application_1531701073794_0002/
 9 INFO  : Starting Job = job_1531701073794_0002, Tracking URL = http://mini02:8088/proxy/application_1531701073794_0002/
10 INFO  : Kill Command = /app/hadoop/bin/hadoop job  -kill job_1531701073794_0002
11 INFO  : Hadoop job information for Stage-1: number of mappers: 2; number of reducers: 0
12 INFO  : 2018-07-16 09:49:23,516 Stage-1 map = 0%,  reduce = 0%
13 INFO  : 2018-07-16 09:49:34,985 Stage-1 map = 50%,  reduce = 0%, Cumulative CPU 3.2 sec
14 INFO  : 2018-07-16 09:49:36,245 Stage-1 map = 100%,  reduce = 0%, Cumulative CPU 6.49 sec
15 INFO  : MapReduce Total cumulative CPU time: 6 seconds 490 msec
16 INFO  : Ended Job = job_1531701073794_0002
17 INFO  : Copying data to local directory /app/software/hive/export/t_sz03_part_exp2.dat from hdfs://mini01:9000/tmp/hive/yun/38de38d6-11fc-4090-957d-21d983d9df04/hive_2018-07-16_09-49-09_419_2948346934380749234-1/-mr-10000
18 INFO  : Copying data to local directory /app/software/hive/export/t_sz03_part_exp2.dat from hdfs://mini01:9000/tmp/hive/yun/38de38d6-11fc-4090-957d-21d983d9df04/hive_2018-07-16_09-49-09_419_2948346934380749234-1/-mr-10000
19 No rows affected (27.983 seconds)
20 
21 # 本地导出数据,根据 逗号(,) 分隔
22 [yun@mini01 t_sz03_part_exp2.dat]$ pwd
23 /app/software/hive/export/t_sz03_part_exp2.dat
24 [yun@mini01 t_sz03_part_exp2.dat]$ ll
25 total 8
26 -rw-r--r-- 1 yun yun 176 Jul 16 09:49 000000_0
27 -rw-r--r-- 1 yun yun 176 Jul 16 09:49 000001_0
28 [yun@mini01 t_sz03_part_exp2.dat]$ cat 000000_0
29 1,张三_20180711,20180711,CN
30 2,lisi_20180711,20180711,CN
31 3,Wangwu_20180711,20180711,CN
32 11,Tom_20180712,20180712,US
33 12,Dvid_20180712,20180712,US
34 13,cherry_20180712,20180712,US
35 [yun@mini01 t_sz03_part_exp2.dat]$ cat 000001_0
36 11,Tom_20180711,20180711,US
37 12,Dvid_20180711,20180711,US
38 13,cherry_20180711,20180711,US
39 1,张三_20180712,20180712,CN
40 2,lisi_20180712,20180712,CN
41 3,Wangwu_20180712,20180712,CN

4. Insert

语法

代码语言:javascript复制
1 INSERT INTO TABLE tablename [PARTITION (partcol1[=val1], partcol2[=val2] ...)] VALUES values_row [, values_row ...]
2   
3 Where values_row is:
4 ( value [, value ...] )

就是一个正常的insert语句

实例1

代码语言:javascript复制
 1 # 建表语句
 2 CREATE TABLE students (name VARCHAR(64), age INT, gpa DECIMAL(3, 2))
 3   CLUSTERED BY (age) INTO 2 BUCKETS STORED AS ORC;
 4 
 5 # insert语句   其中insert 会走map reduce
 6 INSERT INTO TABLE students
 7   VALUES ('fred flintstone', 35, 1.28), ('barney rubble', 32, 2.32);
 8 
 9 # 查询结果
10 0: jdbc:hive2://mini01:10000> select * from students;
11  ------------------ --------------- --------------- -- 
12 |  students.name   | students.age  | students.gpa  |
13  ------------------ --------------- --------------- -- 
14 | fred flintstone  | 35            | 1.28          |
15 | barney rubble    | 32            | 2.32          |
16  ------------------ --------------- --------------- -- 
17 2 rows selected (0.241 seconds)

实例2

代码语言:javascript复制
 1 CREATE TABLE pageviews (userid VARCHAR(64), link STRING, came_from STRING)
 2   PARTITIONED BY (datestamp STRING) CLUSTERED BY (userid) INTO 256 BUCKETS STORED AS ORC;
 3  
 4 INSERT INTO TABLE pageviews PARTITION (datestamp = '2014-09-23')
 5   VALUES ('jsmith', 'mail.com', 'sports.com'), ('jdoe', 'mail.com', null);
 6 
 7 
 8 # 查询结果
 9 0: jdbc:hive2://mini01:10000> select * from pageviews ;
10  ------------------- ----------------- ---------------------- ---------------------- -- 
11 | pageviews.userid  | pageviews.link  | pageviews.came_from  | pageviews.datestamp  |
12  ------------------- ----------------- ---------------------- ---------------------- -- 
13 | jsmith            | mail.com        | sports.com           | 2014-09-23           |
14 | jdoe              | mail.com        | NULL                 | 2014-09-23           |
15  ------------------- ----------------- ---------------------- ---------------------- -- 
16 2 rows selected (0.123 seconds)

5. Select

代码语言:javascript复制
1 SELECT [ALL | DISTINCT] select_expr, select_expr, ...
2   FROM table_reference
3   [WHERE where_condition]
4   [GROUP BY col_list]
5   [ORDER BY col_list]
6   [CLUSTER BY col_list
7     | [DISTRIBUTE BY col_list] [SORT BY col_list]
8   ]
9  [LIMIT [offset,] rows]

注意:

1、order by 会对输入做全局排序,因此只有一个reducer,会导致当输入规模较大时,需要较长的计算时间。

2、sort by不是全局排序,其在数据进入reducer前完成排序。因此,如果用sort by进行排序,并且设置mapred.reduce.tasks>1,则sort by只保证每个reducer的输出有序,不保证全局有序。

3、distribute by(字段)根据指定的字段将数据分到不同的reducer,且分发算法是hash散列。

4、Cluster by(字段)除了具有Distribute by的功能外,还会对该字段进行排序。

因此,如果分桶和sort字段是同一个时,此时,cluster by = distribute by sort by

分桶表的作用:最大的作用是用来提高join操作的效率;

(思考这个问题:

select a.id,a.name,b.addr from a join b on a.id = b.id;

如果a表和b表已经是分桶表,而且分桶的字段是id字段

做这个join操作时,还需要全表做笛卡尔积吗?)

5.1. Join

两张表

代码语言:javascript复制
1 SELECT a.* FROM a JOIN b ON (a.id = b.id);
2 SELECT a.* FROM a JOIN b ON (a.id = b.id AND a.department = b.department);
3 SELECT a.* FROM a LEFT OUTER JOIN b ON (a.id <> b.id);
4 
5 示例:
6 select * from t_sz01 a join t_sz05 b on a.id = b.id;

三张表

代码语言:javascript复制
1 SELECT a.val, b.val, c.val FROM a JOIN b ON (a.key = b.key1) JOIN c ON (c.key = b.key2);
2 
3 示例:
4 select * from t_sz01 a join t_sz05 b on a.id = b.id join t_sz03_part c on a.id = c.id;

6. Update

语法

代码语言:javascript复制
1 UPDATE tablename SET column = value [, column = value ...] [WHERE expression]

7. Delete

语法

代码语言:javascript复制
1 DELETE FROM tablename [WHERE expression]

8. User-Defined Functions (UDFs)

官方文档:LanguageManual UDF

8.1. 使用案例

代码语言:javascript复制
 1 hive (test_db)> select current_database();  
 2 OK
 3 test_db
 4 Time taken: 0.324 seconds, Fetched: 1 row(s)
 5 hive (test_db)> create table dual (id string); # 建表
 6 OK
 7 Time taken: 0.082 seconds
 8 
 9 # 本地文件上传
10 [yun@mini01 hive]$ ll /app/software/hive/dual.dat 
11 -rw-rw-r-- 1 yun yun 2 Jul 16 20:54 /app/software/hive/dual.dat
12 [yun@mini01 hive]$ cat /app/software/hive/dual.dat 
13  # 只有一个空格 【必须要有一个字符,不能为空】
14 hive (test_db)> load data local inpath '/app/software/hive/dual.dat' overwrite into table dual; # 导入数据
15 Loading data to table test_db.dual
16 Table test_db.dual stats: [numFiles=1, numRows=0, totalSize=2, rawDataSize=0]
17 OK
18 Time taken: 0.421 seconds
19 
20 # 函数测试
21 hive (test_db)> select substr('zhangtest', 2, 3) from dual; # 测试 substr
22 OK
23 han
24 Time taken: 0.081 seconds, Fetched: 1 row(s)
25 hive (test_db)> select concat('zha', '---', 'kkk') from dual; # 测试concat
26 OK
27 zha---kkk
28 Time taken: 0.118 seconds, Fetched: 1 row(s)

8.2. Transform实现

Hive的 TRANSFORM 关键字提供了在SQL中调用自写脚本的功能

适合实现Hive中没有的功能又不想写UDF的情况

使用示例1:下面这句sql就是借用了weekday_mapper.py对数据进行了处理.

代码语言:javascript复制
 1 CREATE TABLE u_data_new (
 2   movieid INT,
 3   rating INT,
 4   weekday INT,
 5   userid INT)
 6 ROW FORMAT DELIMITED
 7 FIELDS TERMINATED BY 't';
 8 
 9 add FILE weekday_mapper.py;
10 
11 INSERT OVERWRITE TABLE u_data_new
12 SELECT
13   TRANSFORM (movieid , rate, timestring,uid)
14   USING 'python weekday_mapper.py'
15   AS (movieid, rating, weekday,userid)
16 FROM t_rating;

其中weekday_mapper.py内容如下

代码语言:javascript复制
1 #!/bin/python
2 import sys
3 import datetime
4 
5 for line in sys.stdin:
6   line = line.strip()
7   movieid, rating, unixtime,userid = line.split('t')
8   weekday = datetime.datetime.fromtimestamp(float(unixtime)).isoweekday()
9   print 't'.join([movieid, rating, str(weekday),userid])

0 人点赞