此脚本用于批量配置生成DataX的采集器而编写
主要作用是将MySQL数据全量采集到hdfs指定的路径
其中生成的json配置文件的write的path配置项可根据个人使用情况进行更改
脚本主体
脚本根目录创建一个名为Table_Names.txt的文件,文件内容是以英文逗号隔开的MySQL表名
注:数据以一行排列即可
编写获取MySQL表头的脚本,将列名存入一个列表里,结果返回包含表名及其列名的字典
结果返回值示例:
文件名:get_Table_ColumnsName.py
代码语言:python代码运行次数:0复制import pymysql
def get_db_connection():
"""
创建并返回一个数据库连接和游标
"""
conn = pymysql.connect(
host='hadoop102', # 数据库主机地址
database='gmall', # 数据库名称
user='root', # 数据库用户名
password='000000', # 数据库密码
port=3306 # 数据库端口号
)
curses = conn.cursor() # 创建一个游标,用于执行 SQL 语句
return conn, curses # 返回数据库连接和游标
def close_conn(conn, cursor):
"""
关闭数据库连接和游标
"""
cursor.close() # 关闭游标
conn.close() # 关闭数据库连接
def query(sql, *args):
"""
执行 SQL 查询并返回结果
:param sql: SQL 查询语句
:param args: SQL 查询参数
:return: 查询结果列表,如发生错误则返回空列表
"""
conn, cursor = get_db_connection() # 获取数据库连接和游标
try:
cursor.execute(sql, args) # 执行 SQL 查询
res = cursor.fetchall() # 获取查询结果
conn.commit() # 提交事务
return res if res else [] # 返回查询结果或空列表
except Exception as e: # 捕获异常
print(f"Error executing query: {e}") # 打印错误信息
conn.rollback() # 出现错误时回滚事务
return [] # 返回空列表
finally:
close_conn(conn, cursor) # 无论如何均关闭连接和游标
def get_table_columns(table_name):
"""
获取指定表的列名
:param table_name: 表名
:return: 列名列表
"""
sql = """
SELECT COLUMN_NAME
FROM INFORMATION_SCHEMA.COLUMNS
WHERE TABLE_SCHEMA = 'gmall' AND TABLE_NAME = %s
"""
return query(sql, table_name) # 执行查询并返回结果
def print_table_columns(file_path):
"""
读取文件中的表名并返回每个表的列名
:param file_path: 包含表名的文件路径
:return: 一个字典,字典包含每个表名及其对应的列名列表
"""
table_columns = {} # 初始化一个空字典,用于存储表名及其列名
# 读取文件内容
with open(file_path, 'r') as file:
line = file.readline().strip() # 读取第一行并去除空白字符
table_names = line.split(',') # 用逗号分隔并存储表名
# 查询每个表的列名并存储到字典中
for table_name in table_names:
columns = get_table_columns(table_name) # 获取列名
column_names = [column[0] for column in columns] # 提取列名
table_columns[table_name] = column_names # 使用表名作为键,将列名列表存入字典
return table_columns # 返回包含表名及其列名的字典
编写获取MySQL表头及其对应类型的脚本,结果返回包含所有表的列名及类型的字典
结果返回值示例:
文件名:get_Table_InParameter.py
代码语言:python代码运行次数:0复制import pymysql
def get_db_connection():
"""
创建并返回一个数据库连接和游标
"""
conn = pymysql.connect(
host='hadoop102', # 数据库主机地址
database='gmall', # 数据库名称
user='root', # 数据库用户名
password='000000', # 数据库密码
port=3306 # 数据库端口号
)
cursor = conn.cursor() # 创建一个游标,用于执行 SQL 语句
return conn, cursor # 返回数据库连接和游标
def close_conn(conn, cursor):
"""
关闭数据库连接和游标
"""
cursor.close() # 关闭游标
conn.close() # 关闭数据库连接
def query(sql, *args):
"""
执行 SQL 查询并返回结果
:param sql: SQL 查询语句
:param args: SQL 查询参数
:return: 查询结果列表,如发生错误则返回空列表
"""
conn, cursor = get_db_connection() # 获取数据库连接和游标
try:
cursor.execute(sql, args) # 执行 SQL 查询
res = cursor.fetchall() # 获取查询结果
return res if res else [] # 返回查询结果或空列表
except Exception as e:
print(f"Error executing query: {e}") # 打印错误信息
return [] # 返回空列表
finally:
close_conn(conn, cursor) # 无论如何均关闭连接和游标
def get_table_columns_with_types(table_name):
"""
获取指定表的列名及其类型
:param table_name: 表名
:return: 列名及其类型的列表
"""
sql = """
SELECT COLUMN_NAME, DATA_TYPE
FROM INFORMATION_SCHEMA.COLUMNS
WHERE TABLE_SCHEMA = 'gmall' AND TABLE_NAME = %s
"""
columns_info = query(sql, table_name) # 执行查询以获取列信息
result = []
for column_name, data_type in columns_info:
# 根据数据类型映射到所需的类型
if 'int' in data_type:
type_name = 'bigint'
elif 'varchar' in data_type or 'text' in data_type:
type_name = 'string'
else:
type_name = 'string' # 默认类型
result.append({
"name": column_name,
"type": type_name
})
return result # 返回包含列名及其类型的列表
def print_table_columns(file_path):
"""
读取文件中的表名并返回每个表的列名及类型
:param file_path: 包含表名的文件路径
:return: 一个字典,字典包含每个表名及其对应的列名及类型
"""
all_columns = {} # 初始化一个空字典,用于存储所有表的列名
# 读取文件内容
with open(file_path, 'r') as file:
line = file.readline().strip() # 读取第一行并去除空白字符
table_names = line.split(',') # 用逗号分隔并存储表名
# 查询每个表的列名及类型并存储到字典中
for table_name in table_names:
columns = get_table_columns_with_types(table_name) # 获取列名和类型
all_columns[table_name] = columns # 将每个表的列名及类型存入字典中
return all_columns # 返回包含所有表的列名及类型的字典
编写主要程序,把配置文件生成到源代码根目录的import文件夹
文件名:DataX_Configuration_Builder.py
代码语言:python代码运行次数:0复制import json
import os
import get_Table_ColumnsName
import get_Table_InParameter
#TODO 指定配置
HOST_NAME = "hadoop102"
DATABASE_NAME = "gmall"
MYSQL_USER_NAME = "root"
MYSQL_PASSWORD = "000000"
FILE_PATH = r".Table_Names.txt"
SAVE_PATH = r".import"
with open(FILE_PATH, 'r', encoding='utf-8') as file:
content = file.read()
tables = content.split(',')
TABLE_NUMS = len(tables)
GET_TABLE_NAMES = get_Table_ColumnsName.print_table_columns(FILE_PATH)
GET_PARAMETER = get_Table_InParameter.print_table_columns(FILE_PATH)
for NUM in range(TABLE_NUMS):
tablename = list(GET_TABLE_NAMES.keys())[NUM]
print("--------------------" tablename "--------------------")
reader_parameter = GET_TABLE_NAMES[tablename]
writer_parameter = GET_PARAMETER[tablename]
# 构建JSON数据
data = {
"job": {
"content": [
{
"reader": {
"name": "mysqlreader",
"parameter": {
"column": reader_parameter,
"connection": [
{
"jdbcUrl": [
f"jdbc:mysql://{HOST_NAME}:3306/{DATABASE_NAME}?useUnicode=true&allowPublicKeyRetrieval=true&characterEncoding=utf-8"
],
"table": [
f"{tablename}"
]
}
],
"password": f"{MYSQL_PASSWORD}",
"splitPk": "",
"username": f"{MYSQL_USER_NAME}"
}
},
"writer": {
"name": "hdfswriter",
"parameter": {
"column": writer_parameter,
"compress": "gzip",
"defaultFS": f"hdfs://{HOST_NAME}:8020",
"fieldDelimiter": "t",
"fileName": f"{tablename}",
"fileType": "text",
"path": "${targetdir}",
"writeMode": "truncate"
}
}
}
],
"setting": {
"speed": {
"channel": 1
}
}
}
}
json_data = json.dumps(data , indent=4)
file_path = os.path.join(SAVE_PATH , f"{tablename}.json")
os.makedirs(os.path.dirname(file_path) , exist_ok=True)
with open(file_path, 'w', encoding='utf-8') as json_file:
json_file.write(json_data)
print(f"{tablename} 数据已保存 import 文件夹")
脚本运行
代码相关配置项更改后创建一个import文件夹,运行DataX_Configuration_Builder.py,运行结束即可在import文件夹得到相应的配置文件