Python批量编写DataX脚本

2024-08-05 11:41:31 浏览数 (1)

此脚本用于批量配置生成DataX的采集器而编写

主要作用是将MySQL数据全量采集到hdfs指定的路径

其中生成的json配置文件的write的path配置项可根据个人使用情况进行更改

脚本主体

脚本根目录创建一个名为Table_Names.txt的文件,文件内容是以英文逗号隔开的MySQL表名

注:数据以一行排列即可

编写获取MySQL表头的脚本,将列名存入一个列表里,结果返回包含表名及其列名的字典

结果返回值示例:

文件名:get_Table_ColumnsName.py

代码语言:python代码运行次数:0复制
import pymysql

def get_db_connection():
    """
    创建并返回一个数据库连接和游标
    """
    conn = pymysql.connect(
        host='hadoop102',     # 数据库主机地址
        database='gmall',     # 数据库名称
        user='root',          # 数据库用户名
        password='000000',    # 数据库密码
        port=3306             # 数据库端口号
    )
    curses = conn.cursor()  # 创建一个游标,用于执行 SQL 语句
    return conn, curses      # 返回数据库连接和游标

def close_conn(conn, cursor):
    """
    关闭数据库连接和游标
    """
    cursor.close()  # 关闭游标
    conn.close()    # 关闭数据库连接

def query(sql, *args):
    """
    执行 SQL 查询并返回结果
    :param sql: SQL 查询语句
    :param args: SQL 查询参数
    :return: 查询结果列表,如发生错误则返回空列表
    """
    conn, cursor = get_db_connection()  # 获取数据库连接和游标
    try:
        cursor.execute(sql, args)  # 执行 SQL 查询
        res = cursor.fetchall()     # 获取查询结果
        conn.commit()               # 提交事务
        return res if res else []   # 返回查询结果或空列表
    except Exception as e:         # 捕获异常
        print(f"Error executing query: {e}")  # 打印错误信息
        conn.rollback()             # 出现错误时回滚事务
        return []                   # 返回空列表
    finally:
        close_conn(conn, cursor)    # 无论如何均关闭连接和游标

def get_table_columns(table_name):
    """
    获取指定表的列名
    :param table_name: 表名
    :return: 列名列表
    """
    sql = """  
    SELECT COLUMN_NAME  
    FROM INFORMATION_SCHEMA.COLUMNS  
    WHERE TABLE_SCHEMA = 'gmall' AND TABLE_NAME = %s  
    """
    return query(sql, table_name)  # 执行查询并返回结果

def print_table_columns(file_path):
    """
    读取文件中的表名并返回每个表的列名
    :param file_path: 包含表名的文件路径
    :return: 一个字典,字典包含每个表名及其对应的列名列表
    """
    table_columns = {}  # 初始化一个空字典,用于存储表名及其列名

    # 读取文件内容
    with open(file_path, 'r') as file:
        line = file.readline().strip()  # 读取第一行并去除空白字符
        table_names = line.split(',')    # 用逗号分隔并存储表名

    # 查询每个表的列名并存储到字典中
    for table_name in table_names:
        columns = get_table_columns(table_name)  # 获取列名
        column_names = [column[0] for column in columns]  # 提取列名
        table_columns[table_name] = column_names  # 使用表名作为键,将列名列表存入字典

    return table_columns  # 返回包含表名及其列名的字典

编写获取MySQL表头及其对应类型的脚本,结果返回包含所有表的列名及类型的字典

结果返回值示例:

文件名:get_Table_InParameter.py

代码语言:python代码运行次数:0复制
import pymysql

def get_db_connection():
    """
    创建并返回一个数据库连接和游标
    """
    conn = pymysql.connect(
        host='hadoop102',     # 数据库主机地址
        database='gmall',     # 数据库名称
        user='root',          # 数据库用户名
        password='000000',    # 数据库密码
        port=3306             # 数据库端口号
    )
    cursor = conn.cursor()  # 创建一个游标,用于执行 SQL 语句
    return conn, cursor      # 返回数据库连接和游标

def close_conn(conn, cursor):
    """
    关闭数据库连接和游标
    """
    cursor.close()  # 关闭游标
    conn.close()    # 关闭数据库连接

def query(sql, *args):
    """
    执行 SQL 查询并返回结果
    :param sql: SQL 查询语句
    :param args: SQL 查询参数
    :return: 查询结果列表,如发生错误则返回空列表
    """
    conn, cursor = get_db_connection()  # 获取数据库连接和游标
    try:
        cursor.execute(sql, args)  # 执行 SQL 查询
        res = cursor.fetchall()     # 获取查询结果
        return res if res else []   # 返回查询结果或空列表
    except Exception as e:
        print(f"Error executing query: {e}")  # 打印错误信息
        return []                   # 返回空列表
    finally:
        close_conn(conn, cursor)    # 无论如何均关闭连接和游标

def get_table_columns_with_types(table_name):
    """
    获取指定表的列名及其类型
    :param table_name: 表名
    :return: 列名及其类型的列表
    """
    sql = """  
    SELECT COLUMN_NAME, DATA_TYPE  
    FROM INFORMATION_SCHEMA.COLUMNS  
    WHERE TABLE_SCHEMA = 'gmall' AND TABLE_NAME = %s  
    """
    columns_info = query(sql, table_name)  # 执行查询以获取列信息

    result = []
    for column_name, data_type in columns_info:
        # 根据数据类型映射到所需的类型
        if 'int' in data_type:
            type_name = 'bigint'
        elif 'varchar' in data_type or 'text' in data_type:
            type_name = 'string'
        else:
            type_name = 'string'  # 默认类型

        result.append({
            "name": column_name,
            "type": type_name
        })

    return result  # 返回包含列名及其类型的列表

def print_table_columns(file_path):
    """
    读取文件中的表名并返回每个表的列名及类型
    :param file_path: 包含表名的文件路径
    :return: 一个字典,字典包含每个表名及其对应的列名及类型
    """
    all_columns = {}  # 初始化一个空字典,用于存储所有表的列名

    # 读取文件内容
    with open(file_path, 'r') as file:
        line = file.readline().strip()  # 读取第一行并去除空白字符
        table_names = line.split(',')    # 用逗号分隔并存储表名

    # 查询每个表的列名及类型并存储到字典中
    for table_name in table_names:
        columns = get_table_columns_with_types(table_name)  # 获取列名和类型
        all_columns[table_name] = columns  # 将每个表的列名及类型存入字典中

    return all_columns  # 返回包含所有表的列名及类型的字典

编写主要程序,把配置文件生成到源代码根目录的import文件夹

文件名:DataX_Configuration_Builder.py

代码语言:python代码运行次数:0复制
import json
import os
import get_Table_ColumnsName
import get_Table_InParameter

#TODO 指定配置
HOST_NAME = "hadoop102"
DATABASE_NAME = "gmall"
MYSQL_USER_NAME = "root"
MYSQL_PASSWORD = "000000"
FILE_PATH = r".Table_Names.txt"
SAVE_PATH = r".import"


with open(FILE_PATH, 'r', encoding='utf-8') as file:
    content = file.read()

tables = content.split(',')

TABLE_NUMS = len(tables)

GET_TABLE_NAMES = get_Table_ColumnsName.print_table_columns(FILE_PATH)
GET_PARAMETER = get_Table_InParameter.print_table_columns(FILE_PATH)

for NUM in range(TABLE_NUMS):
    tablename = list(GET_TABLE_NAMES.keys())[NUM]
    print("--------------------"   tablename   "--------------------")
    reader_parameter = GET_TABLE_NAMES[tablename]
    writer_parameter = GET_PARAMETER[tablename]

    # 构建JSON数据
    data = {
        "job": {
            "content": [
                {
                    "reader": {
                        "name": "mysqlreader",
                        "parameter": {
                            "column": reader_parameter,
                            "connection": [
                                {
                                    "jdbcUrl": [
                                        f"jdbc:mysql://{HOST_NAME}:3306/{DATABASE_NAME}?useUnicode=true&allowPublicKeyRetrieval=true&characterEncoding=utf-8"
                                    ],
                                    "table": [
                                        f"{tablename}"
                                    ]
                                }
                            ],
                            "password": f"{MYSQL_PASSWORD}",
                            "splitPk": "",
                            "username": f"{MYSQL_USER_NAME}"
                        }
                    },
                    "writer": {
                        "name": "hdfswriter",
                        "parameter": {
                            "column": writer_parameter,
                            "compress": "gzip",
                            "defaultFS": f"hdfs://{HOST_NAME}:8020",
                            "fieldDelimiter": "t",
                            "fileName": f"{tablename}",
                            "fileType": "text",
                            "path": "${targetdir}",
                            "writeMode": "truncate"
                        }
                    }
                }
            ],
            "setting": {
                "speed": {
                    "channel": 1
                }
            }
        }
    }

    json_data = json.dumps(data , indent=4)

    file_path = os.path.join(SAVE_PATH , f"{tablename}.json")

    os.makedirs(os.path.dirname(file_path) , exist_ok=True)

    with open(file_path, 'w', encoding='utf-8') as json_file:
        json_file.write(json_data)

    print(f"{tablename} 数据已保存 import 文件夹")

脚本运行

代码相关配置项更改后创建一个import文件夹,运行DataX_Configuration_Builder.py,运行结束即可在import文件夹得到相应的配置文件

0 人点赞