[Spark Data Warehouse Project] Requirement 8: Full and Incremental Import from MySQL into Hive with DataX



I. Full import from MySQL into Hive (partitioned table)

Requirement overview:

This requirement simulates importing data from MySQL into the Hive data warehouse, with the data partitioned by time. We test two import scenarios: a full import, which loads everything across all time partitions; and a daily scheduled run, which imports only the user data belonging to the current day's partition.


  • MySQL table DDL:
create table t_order(
    id          int primary key auto_increment,
    amt         decimal(10,2),
    `status`    int default 0,
    user_id     int,
    create_time timestamp DEFAULT CURRENT_TIMESTAMP,
    modify_time timestamp DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP
);
  • Hive table DDL:
create table t_order(
    id          int,
    amt         decimal(10,2),
    `status`    int,
    user_id     int,
    create_time date,
    modify_time date
)partitioned by (dt string)
row format delimited
fields terminated by '\t';

Note the timestamp columns; we will import data from the MySQL table above into Hive.

  • Write the DataX JSON job:
{
    "job": {
        "content": [
            {
                "reader": {
                    "name": "mysqlreader",
                    "parameter": {
                        "connection": [
                            {
                                "jdbcUrl": ["jdbc:mysql://hadoop10:3306/spark-dw"],
                                "querySql": [
                                    "select id,amt,status,user_id,create_time,modify_time from t_order"
                                ]
                            }
                        ],
                        "password": "0000",
                        "username": "root",
                    }
                },
                "writer": {
                    "name": "hdfswriter",
                    "parameter": {
                        "column": [
							{"name": "id","type": "int"},
							{"name": "amt","type": "double"},
							{"name": "status","type": "int"},
							{"name": "user_id","type": "int"},
							{"name": "create_time","type": "string"},
							{"name": "modify_time","type": "string"}
                     		],
                        "defaultFS": "hdfs://hadoop10:8020",
                        "fieldDelimiter": "t",
                        "fileName": "t_order",
                        "fileType": "text",
                        "path": "/user/hive/warehouse/test_hive.db/t_order/dt=$dt",
                        "writeMode": "append"
                    }
                }
            }
        ],
        "setting": {
            "speed": {
                "channel": "1"
            }
        }
    }
}
  • Run the import

Add test data in MySQL, then import the July 11 rows from MySQL into Hive's dt=2023-07-11 partition.

insert into t_order(amt,user_id) values(100,1001);
insert into t_order values(null,100,0,1001,'2023-07-11 10:18:39','2023-07-11 10:18:39');
insert into t_order values(null,120,0,1001,'2023-07-11 10:18:39','2023-07-11 10:18:39');

Create the partition in Hive:

alter table t_order add partition(dt='2023-07-11')

Run the DataX job:

python /opt/installs/datax/bin/datax.py -p "-Ddt=2023-07-11" /opt/installs/datax/job/mysql2hive.json

This run imports the three rows just inserted into MySQL into Hive.
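To verify the load, you can query the new partition directly. A minimal sketch, assuming the table lives in the test_hive database that the job's HDFS path points at:

hive -e "select * from test_hive.t_order where dt='2023-07-11';"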


Add more test data in MySQL, then import the July 12 rows from MySQL into Hive's dt=2023-07-12 partition.

insert into t_order values(null,200,0,1001,'2023-07-12 10:18:39','2023-07-12 10:18:39');
insert into t_order values(null,220,0,1001,'2023-07-12 10:18:39','2023-07-12 10:18:39');

Create the partition in Hive:

alter table t_order add partition(dt='2023-07-12')

Run the DataX job:

python /opt/installs/datax/bin/datax.py -p "-Ddt=2023-07-12" /opt/installs/datax/job/mysql2hive.json

This run imports both the three rows inserted earlier and the rows inserted this time into Hive, because the querySql selects the whole table. As the query results show, the first batch of data has now been imported a second time; that is what full import means.
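A quick way to see the duplication is to count rows per partition; the July 11 rows now appear in both partitions. A sketch, again assuming the test_hive database:

hive -e "select dt, count(*) from test_hive.t_order group by dt;"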

II. Incremental import from MySQL into Hive

General rule: fact tables use incremental import (e.g. the order table); dimension tables use full import (e.g. the product table).

The scheme most companies adopt: full import as the mainstay, incremental import as a supplement.

One further prerequisite for incremental import: the business table must actually be able to support it.

1. First way to implement incremental import

Using the id primary key: query the maximum id in the Hive table, then fetch from MySQL only the rows whose id is greater than that value. This does not work for tables that use UUIDs instead of auto-increment ids, and it cannot synchronize rows that were modified rather than inserted.
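A minimal sketch of the idea, assuming the test_hive database from part I (the max_id variable and the querySql fragment are illustrative, not one of this project's jobs):

# 1. find the largest id already loaded into Hive (-S silences Hive's log output)
max_id=$(hive -S -e "select nvl(max(id), 0) from test_hive.t_order")

# 2. pull only newer rows from MySQL, e.g. by templating the reader's querySql as:
#    select id,amt,status,user_id,create_time,modify_time from t_order where id > $max_id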

2. Another way: a timestamp column

Add a modify_time column to the table. Whenever a row is inserted or modified, this column is updated, so the changed rows can be selected on it and extracted into Hive.
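Before wiring the predicate into DataX, you can preview which rows a given day's run would pick up; the query below uses the same filter as the querySql in the job that follows (the mysql client invocation and credentials mirror the job's settings and are assumptions):

mysql -uroot -p0000 spark-dw -e "select * from t_order where date_format(modify_time,'%Y-%m-%d') = '2023-07-12'"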

3. DataX job

{
    "job": {
        "content": [
            {
                "reader": {
                    "name": "mysqlreader",
                    "parameter": {
                        "connection": [
                            {
                                "jdbcUrl": ["jdbc:mysql://hadoop10:3306/spark-dw"],
                                "querySql": [
                                    "select id,amt,status,user_id,create_time,modify_time from t_order where date_format(modify_time,'%Y-%m-%d') = '$dt'"
                                ]
                            }
                        ],
                        "password": "0000",
                        "username": "root",
                    }
                },
                "writer": {
                    "name": "hdfswriter",
                    "parameter": {
                        "column": [
							{"name": "id","type": "int"},
							{"name": "amt","type": "double"},
							{"name": "status","type": "int"},
							{"name": "user_id","type": "int"},
							{"name": "create_time","type": "string"},
							{"name": "modify_time","type": "string"}
                     		],
                        "defaultFS": "hdfs://hadoop10:8020",
                        "fieldDelimiter": "t",
                        "fileName": "t_order",
                        "fileType": "text",
                        "path": "/user/hive/warehouse/test_hive.db/t_order/dt=$dt",
                        "writeMode": "append"
                    }
                }
            }
        ],
        "setting": {
            "speed": {
                "channel": "1"
            }
        }
    }
}

Running this incremental job imports, for the given partition date, only the MySQL rows that are needed into Hive.
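In practice such daily runs are scheduled rather than launched by hand. A crontab sketch, assuming the incremental job above is saved as /opt/installs/datax/job/mysql2hive_inc.json (a hypothetical name); note that % must be escaped as \% inside crontab entries:

# run every day at 01:00 and load yesterday's partition
0 1 * * * python /opt/installs/datax/bin/datax.py -p "-Ddt=$(date -d yesterday +\%Y-\%m-\%d)" /opt/installs/datax/job/mysql2hive_inc.json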

III. Generating DataX JSON jobs automatically with Python

1. Create the MySQL and Hive tables

MySQL table DDL:
create table t_student(
    id      int PRIMARY key,
    name    varchar(50),
    `age`   int
);

create table t_person(
    id        int PRIMARY key,
    name      varchar(50),
    parentid  int
);

INSERT into t_student values
(1,'zhanmusi',15),
(2,'lisi',55),
(3,'lisi',66);

INSERT into t_person values
(1,'miky',06),
(2,'tom',16),
(3,'jakcon',26);
Hive table DDL:
create table ods_t_student(
    id      int,
    name    string,
    `age`   int
)partitioned by (dt string)
row format delimited
fields terminated by '\t';

create table ods_t_person(
    id        int,
    name      string,
    parentid  int
)partitioned by (dt string)
row format delimited
fields terminated by '\t';

2. In the Python script, change the password (two places) and the HDFS port

import json
import sys
import pymysql


def gen_json(dbname, tablename):
    # Assemble the DataX job as a Python dict, then dump it to a JSON file
    s1 = {
        "job": {
            "content": [
                {
                    "reader": {
                        "name": "mysqlreader",
                        "parameter": {
                            "connection": [
                                {
                                    "jdbcUrl": ["jdbc:mysql://hadoop10:3306/" + dbname + "?useSSL=false"],
                                    "table": [tablename]
                                }
                            ],
                            "password": "0000",  # MySQL password
                            "username": "root",
                            "column": getColumn(dbname, tablename)
                        }
                    },
                    "writer": {
                        "name": "hdfswriter",
                        "parameter": {
                            "column": getColumnAndType(dbname, tablename),
                            "defaultFS": "hdfs://hadoop10:8020",  # HDFS port
                            "fileType": "text",
                            "path": "/user/hive/warehouse/ods_" + tablename + "/dt=$dt",
                            "fieldDelimiter": "\t",
                            "fileName": tablename,
                            "writeMode": "append"
                        }
                    }
                }
            ],
            "setting": {
                "speed": {
                    "channel": "1"
                }
            }
        }
    }

    with open('d:/test/' + tablename + '.json', 'w') as f:
        json.dump(s1, f)


def queryDataBase(dbname, tablename):
    # Read column names and data types from information_schema
    conn = pymysql.connect(user='root', password='0000', host='hadoop10')  # MySQL password
    cursor = conn.cursor()
    cursor.execute(
        "select column_name, data_type from information_schema.`COLUMNS` where TABLE_SCHEMA = %s and table_name = %s order by ordinal_position",
        [dbname, tablename])
    fetchall = cursor.fetchall()
    cursor.close()
    conn.close()
    return fetchall


def getColumn(dbname, tablename):
    # Column names only, for the mysqlreader "column" list
    k1 = queryDataBase(dbname, tablename)
    k2 = list(map(lambda x: x[0], k1))
    return k2


def getColumnAndType(dbname, tablename):
    # Map MySQL column types to hdfswriter column types
    k1 = queryDataBase(dbname, tablename)
    mappings = {
        'bigint': 'bigint',
        'varchar': 'string',
        'int': 'int',
        'datetime': 'string',
        'text': 'string'
    }
    k2 = list(map(lambda x: {"name": x[0], "type": mappings[x[1].lower()]}, k1))
    return k2


if __name__ == '__main__':
    l = sys.argv[1:]
    dbname = l[0]     # MySQL database name
    tablename = l[1]  # table name
    gen_json(dbname, tablename)

3. Run the Python script

(untitled0606) C:\Users\Lenovo\PycharmProjects\untitled0606>python .\test0606\test_gen.py spark-dw t_student

(untitled0606) C:\Users\Lenovo\PycharmProjects\untitled0606>python .\test0606\test_gen.py spark-dw t_person

4. Upload the generated JSON files to Linux


5. Write the shell script b.sh

#! /bin/bash

dt=$1

# default to yesterday if no date argument is given
if [ "$1" == "" ]
then
  dt=$(date -d yesterday +%Y-%m-%d)
fi

echo $dt

# "show partitions ... partition(...)" prints only the header line
# "partition" when the partition does not exist yet
s=$(hive -e "show partitions ods_t_student partition(dt='$dt')")

echo === $s ====

if [ "$s" == "partition" ]
then
 hive -e "alter table ods_t_student add partition(dt='$dt')"
else
 echo "partition $dt already exists"
fi

python /opt/installs/datax/bin/datax.py -p "-Ddt=$dt" /opt/installs/datax/job/t_student.json



s=$(hive -e "show partitions ods_t_person partition(dt='$dt')")

echo === $s ====

if [ "$s" == "partition" ]
then
 hive -e "alter table ods_t_person add partition(dt='$dt')"
else
 echo "partition $dt already exists"
fi

python /opt/installs/datax/bin/datax.py -p "-Ddt=$dt" /opt/installs/datax/job/t_person.json

6. Run the shell script

[root@hadoop10 app]# sh b.sh 2023-07-13

Job start time                  : 2023-07-13 02:31:38
Job end time                    : 2023-07-13 02:31:50
Total elapsed time              :                 12s
Average throughput              :                2B/s
Record write speed              :              0rec/s
Total records read              :                   3
Total read/write failures       :                   0
  • Hive query result:
id|name    |age|dt        |
--|--------|---|----------|
 1|zhanmusi| 15|2023-07-13|
 2|lisi    | 55|2023-07-13|
 3|lisi    | 66|2023-07-13|
