Python3实现MySQL数据增量更新同步到MongoDB

2022-01-05 21:50:30 浏览数 (1)

目录

一、MySQL工具类

二、MongoDB工具类

三、数据同步实现代码

一、MySQL工具类

代码语言:python
# -*- encoding: utf-8 -*-

import pymysql

class MySQLUtil:
    """Thin convenience wrapper around a single ``pymysql`` connection.

    The connection is opened in the constructor and closed when the object
    is finalized. Query helpers return whatever ``cursor.fetchall()``
    yields; none of them calls ``commit()`` on the connection.
    """

    def __init__(self, host="127.0.0.1", user=None, passwd=None, db=None, charset="utf8", *args, **kwargs):
        """Open the MySQL connection.

        Args:
            host: Server host name or address.
            user: Login user name.
            passwd: Login password.
            db: Database selected right after connecting.
            charset: Connection character set.
            *args: Extra positional arguments forwarded to ``pymysql.connect``.
            **kwargs: Extra keyword arguments forwarded to ``pymysql.connect``.
        """
        self.host = host
        self.user = user
        self.passwd = passwd
        self.db = db
        self.conn = pymysql.connect(host=host, user=user, passwd=passwd, db=db, charset=charset, *args, **kwargs)

    def __del__(self):
        """Close the connection when the wrapper is finalized."""
        # ``conn`` is missing when ``pymysql.connect`` raised inside __init__;
        # without this guard the finalizer itself would raise AttributeError.
        conn = getattr(self, "conn", None)
        if conn is not None:
            conn.close()

    def _query(self, sql, args=None):
        """Run *sql* on a fresh cursor, return all rows, then close the cursor."""
        cursor = self.conn.cursor()
        try:
            cursor.execute(sql, args)
            return cursor.fetchall()
        finally:
            cursor.close()

    def get_cursor(self):
        """Return a new cursor on the wrapped connection (caller closes it)."""
        return self.conn.cursor()

    def select_db(self, db):
        """Switch the connection to database *db*."""
        self.conn.select_db(db)

    def list_databases(self):
        """Return the rows produced by ``SHOW DATABASES``."""
        return self._query("SHOW DATABASES")

    def list_tables(self):
        """Return the rows produced by ``SHOW TABLES`` for the current database."""
        return self._query("SHOW TABLES")

    def execute(self, sql, args=None):
        """Execute *sql* and return all fetched rows.

        Args:
            sql: Statement text; use ``%s`` placeholders for values.
            args: Optional parameters bound to the placeholders by the driver.

        Returns:
            The result of ``cursor.fetchall()``.
        """
        return self._query(sql, args)

    def get_version(self):
        """Print and return the row produced by ``SELECT VERSION()``."""
        cursor = self.conn.cursor()
        try:
            cursor.execute("SELECT VERSION()")
            version = cursor.fetchone()
        finally:
            cursor.close()
        print("MySQL Version : %s" % version)
        return version

    def list_table_metadata(self):
        """Return ``information_schema.TABLES`` rows for non-system schemas."""
        sql = "SELECT * FROM information_schema.TABLES WHERE TABLE_TYPE !='SYSTEM VIEW' AND TABLE_SCHEMA NOT IN ('sys','mysql','information_schema','performance_schema')"
        return self._query(sql)

    def get_table_fields(self, db, table, args=None):
        """Return the column names of ``db.table`` in ordinal order.

        Args:
            db: Schema (database) name.
            table: Table name.
            args: Unused; kept so existing call sites keep working.

        Returns:
            A list of column-name strings (empty when the table is unknown).
        """
        # Names are bound as parameters instead of being spliced into the
        # SQL text, so quotes inside them cannot alter the statement.
        sql = (
            "SELECT COLUMN_NAME FROM information_schema.COLUMNS "
            "WHERE table_schema = %s AND table_name = %s "
            "ORDER BY ORDINAL_POSITION"
        )
        return [row[0] for row in self._query(sql, (db, table))]

    def table_metadata(self, db, table, args=None):
        """Return per-column metadata of ``db.table`` in ordinal order.

        Args:
            db: Schema (database) name.
            table: Table name.
            args: Unused; kept so existing call sites keep working.

        Returns:
            Rows of ``(column_name, column_type, ordinal_position,
            column_comment, column_default)``.
        """
        sql = """
        SELECT
            column_name,column_type,ordinal_position,column_comment,column_default
        FROM
            information_schema.COLUMNS
        WHERE
            table_schema = %s AND table_name = %s
        ORDER BY
            ordinal_position
        """
        return self._query(sql, (db, table))


if __name__ == "__main__":
    # Demo: the constructor accepts positional as well as keyword arguments.
    mysqlUtil = MySQLUtil("127.0.0.1", "root", "123456", "test")
    mysqlUtil = MySQLUtil(host="127.0.0.1", user="root", passwd="123456", db="test")
    mysqlUtil.get_version()

    # Inspect the raw connection after selecting a database.
    conn = mysqlUtil.conn
    mysqlUtil.select_db("test")
    print(type(conn.db), conn.db)

    # List databases, then the tables of the selected database.
    databases = mysqlUtil.list_databases()
    print(type(databases), databases)
    tables = mysqlUtil.list_tables()
    print(type(tables), tables)

    # Dump every row of the sample table.
    for row in mysqlUtil.execute("SELECT * FROM t_user"):
        print(row)

    # Column metadata is fetched (result unused), then column names are printed.
    mysqlUtil.table_metadata("test", "t_user")
    for column_name in mysqlUtil.get_table_fields("test", "t_user"):
        print(column_name)

二、MongoDB工具类

代码语言:python
#-*- encoding: utf-8 -*-

import pymongo

class MongoDBUtil:
    """Convenience wrapper around a ``pymongo.MongoClient`` and one database.

    Collection-level helpers operate on the currently selected database
    (``self.database``), chosen in the constructor or via
    ``select_database``.
    """

    def __init__(self, ip="127.0.0.1", db_name=None, port="27017"):
        """Connect to ``mongodb://ip:port`` and optionally select a database.

        Args:
            ip: Server host name or address.
            db_name: Database to select; ``None`` selects nothing yet.
            port: Server port, as a string or an integer.
        """
        self.client = pymongo.MongoClient("mongodb://%s:%s" % (ip, port))
        # Indexing the client with None raises, so the default constructor
        # call would otherwise fail before a database could be selected.
        self.database = self.client[db_name] if db_name is not None else None

    def __del__(self):
        """Close the client when the wrapper is finalized."""
        # ``client`` is missing when MongoClient() raised inside __init__.
        client = getattr(self, "client", None)
        if client is not None:
            client.close()

    def _db(self):
        """Return the selected database or fail with a clear message."""
        if self.database is None:
            raise RuntimeError("no database selected; call select_database() first")
        return self.database

    def _collection(self, collect_name):
        """Return the collection *collect_name* of the selected database."""
        return self._db().get_collection(collect_name)

    def create_database(self, db_name):
        """Return a handle for *db_name*.

        MongoDB creates a database lazily on first write, so this only
        returns the handle.
        """
        return self.client.get_database(db_name)

    def drop_database(self, db_name):
        """Drop the database *db_name*."""
        return self.client.drop_database(db_name)

    def select_database(self, db_name):
        """Make *db_name* the current database and return its handle."""
        self.database = self.client[db_name]
        return self.database

    def get_database(self, db_name):
        """Return a handle for *db_name* without changing the current database."""
        return self.client.get_database(db_name)

    def list_database_names(self):
        """Return the names of all databases on the server."""
        return self.client.list_database_names()

    def create_collection(self, collect_name):
        """Create *collect_name*, or return the existing collection.

        Returns:
            The existing collection handle when the name is already present
            in the current database, otherwise the newly created collection.
        """
        database = self._db()
        # get_collection() always returns a handle, so existence has to be
        # checked against the server's list of collection names.
        if collect_name in database.list_collection_names():
            print("collection %s already exists" % collect_name)
            return database.get_collection(collect_name)
        return database.create_collection(collect_name)

    def drop_collection(self, collect_name):
        """Drop the collection *collect_name* from the current database."""
        return self._db().drop_collection(collect_name)

    def get_collection(self, collect_name):
        """Return a handle for the collection *collect_name*."""
        return self._collection(collect_name)

    def list_collection_names(self):
        """Return the names of all collections in the current database."""
        return self._db().list_collection_names()

    def insert(self, collect_name, documents):
        """Insert one document (a dict) or several (an iterable of dicts).

        ``Collection.insert`` was removed in PyMongo 4, so the call is routed
        to ``insert_one`` / ``insert_many``.

        Returns:
            The inserted ``_id`` for a single document, or the list of
            inserted ``_id`` values for several, as the legacy method did.
        """
        collection = self._collection(collect_name)
        if isinstance(documents, dict):
            return collection.insert_one(documents).inserted_id
        return collection.insert_many(documents).inserted_ids

    def insert_one(self, collect_name, document):
        """Insert a single document and return the driver's result object."""
        return self._collection(collect_name).insert_one(document)

    def insert_many(self, collect_name, documents):
        """Insert several documents and return the driver's result object."""
        return self._collection(collect_name).insert_many(documents)

    def delete_one(self, collect_name, filter, collation=None, hint=None, session=None):
        """Delete the first document matching *filter*."""
        return self._collection(collect_name).delete_one(
            filter, collation=collation, hint=hint, session=session)

    def delete_many(self, collect_name, filter, collation=None, hint=None, session=None):
        """Delete every document matching *filter*."""
        return self._collection(collect_name).delete_many(
            filter, collation=collation, hint=hint, session=session)

    def find_one_and_delete(self, collect_name, filter, projection=None, sort=None, hint=None, session=None, **kwargs):
        """Delete one document matching *filter* and return it."""
        return self._collection(collect_name).find_one_and_delete(
            filter, projection=projection, sort=sort, hint=hint, session=session, **kwargs)

    def count_documents(self, collect_name, filter, session=None, **kwargs):
        """Return the number of documents matching *filter*."""
        return self._collection(collect_name).count_documents(filter, session=session, **kwargs)

    def find_one(self, collect_name, filter=None, *args, **kwargs):
        """Return one document matching *filter*, or ``None``."""
        return self._collection(collect_name).find_one(filter, *args, **kwargs)

    def find(self, collect_name, *args, **kwargs):
        """Return a cursor over the documents selected by the given arguments."""
        return self._collection(collect_name).find(*args, **kwargs)

    def update(self, collect_name, spec, document, upsert=False, manipulate=False,
               multi=False, check_keys=True, **kwargs):
        """Legacy-style update: apply operators or replace a document.

        ``Collection.update`` was removed in PyMongo 4, so the call is routed
        by the shape of *document*: a document whose first key starts with
        ``$`` is applied with ``update_one`` (``update_many`` when *multi*),
        anything else replaces the first match via ``replace_one``.

        Args:
            collect_name: Target collection name.
            spec: Query selecting the document(s) to change.
            document: Update-operator document or full replacement document.
            upsert: Insert when nothing matches.
            manipulate: Accepted for compatibility; ignored.
            multi: Change every match (operator documents only).
            check_keys: Accepted for compatibility; ignored.
            **kwargs: Forwarded to the underlying driver call.

        Returns:
            The raw server response dict of the operation.

        Raises:
            ValueError: If *multi* is requested with a replacement document.
        """
        collection = self._collection(collect_name)
        first_key = next(iter(document), "") if document else ""
        if str(first_key).startswith("$"):
            operation = collection.update_many if multi else collection.update_one
        elif multi:
            raise ValueError("multi update only works with $ operators")
        else:
            operation = collection.replace_one
        return operation(spec, document, upsert=upsert, **kwargs).raw_result

    def update_one(self, collect_name, filter, update, upsert=False, bypass_document_validation=False,
                                collation=None, array_filters=None, hint=None, session=None):
        """Apply *update* to the first document matching *filter*."""
        return self._collection(collect_name).update_one(
            filter, update, upsert=upsert,
            bypass_document_validation=bypass_document_validation,
            collation=collation, array_filters=array_filters, hint=hint, session=session)

    def update_many(self, collect_name, filter, update, upsert=False, array_filters=None,
                                bypass_document_validation=False, collation=None, hint=None, session=None):
        """Apply *update* to every document matching *filter*."""
        return self._collection(collect_name).update_many(
            filter, update, upsert=upsert, array_filters=array_filters,
            bypass_document_validation=bypass_document_validation,
            collation=collation, hint=hint, session=session)

    def find_one_and_update(self, collect_name, filter, update, projection=None, sort=None, upsert=False,
                           return_document=False, array_filters=None, hint=None, session=None, **kwargs):
        """Update one document matching *filter* and return a document.

        *return_document* is forwarded unchanged to the driver, which uses
        it to choose between the document before or after the update.
        """
        return self._collection(collect_name).find_one_and_update(
            filter, update, projection=projection, sort=sort, upsert=upsert,
            return_document=return_document, array_filters=array_filters,
            hint=hint, session=session, **kwargs)

if __name__ == "__main__":
    # Demo walk-through of MongoDBUtil against a local server.
    print("------------------start-------------------------")
    # mongoUtil = MongoDBUtil(ip="192.168.81.165", port="27017")
    mongoUtil = MongoDBUtil(ip="127.0.0.1", db_name="xl01", port="27017")

    # --- database operations ---
    stat = mongoUtil.create_database(db_name="xl01")
    # stat = mongoUtil.drop_database(db_name="xl01")
    stat = mongoUtil.list_database_names()
    stat = mongoUtil.get_database(db_name="xl01")

    # --- collection operations ---
    stat = mongoUtil.create_collection(collect_name="xl_collect_01")
    # stat = mongoUtil.drop_collection(collect_name="xl_collect_01")
    stat = mongoUtil.get_collection(collect_name="xl_collect_01")
    stat = mongoUtil.list_collection_names()

    # --- documents: insert ---
    document = {"name": "hao123", "type": "搜索引擎", "url": "http://www.hao123.com/"}
    stat = mongoUtil.insert_one(collect_name="xl_collect_01", document=document)
    # documents = [{'x': i} for i in range(2)]
    documents = [{"name": "hao123", "type": "搜索引擎"} for _ in range(2)]
    # stat = mongoUtil.insert(collect_name="xl_collect_01", documents=documents)
    stat = mongoUtil.insert_many(collect_name="xl_collect_01", documents=documents)

    # --- documents: query ---
    stat = mongoUtil.find_one(collect_name="xl_collect_01")
    print(type(stat), stat)
    rows = mongoUtil.find(collect_name="xl_collect_01")
    # for row in rows:
    #     print(row)
    # Named ``query`` so the builtin ``filter`` is not shadowed.
    query = {'name': 'hao123'}
    # query = {'x': 1}
    count = mongoUtil.count_documents(collect_name="xl_collect_01", filter=query)
    # Report the type of the value actually being printed (was type(stat)).
    print(type(count), count)

    # --- documents: delete ---
    stat = mongoUtil.delete_one(collect_name="xl_collect_01", filter=query)
    stat = mongoUtil.find_one_and_delete(collect_name="xl_collect_01", filter=query)
    # stat = mongoUtil.delete_many(collect_name="xl_collect_01", filter=query)
    print(type(stat), stat)

    # --- documents: update ---
    spec = {"url": "http://www.baidu.com/"}
    # spec = {"url": "http://www.hao123.com/"}
    stat = mongoUtil.update(collect_name="xl_collect_01", spec=spec, document=document)
    print(type(stat), stat)
    update = {"$set": spec}
    stat = mongoUtil.update_one(collect_name="xl_collect_01", filter=query, update=update)
    print(type(stat), stat.modified_count, stat)
    # stat = mongoUtil.update_many(collect_name="xl_collect_01", filter=query, update=update)
    # print(type(stat), stat.modified_count, stat)
    stat = mongoUtil.find_one_and_update(collect_name="xl_collect_01", filter=query, update=update)
    print(type(stat), stat)
    print("-------------------end--------------------------")

三、数据同步实现代码

代码语言:python
#-*- encoding: utf-8 -*-

import sys, uuid
sys.path.append(r'..')
from MongoDB.MongoDBUtil import MongoDBUtil
from MySQL.MySQLUtil import MySQLUtil

class SyncMysqlMongo:
    """Copy the rows of a MySQL table into a MongoDB collection."""

    def __init__(self, mysql_ip, mysql_user, mysql_passwd, mysql_db, mongo_ip, mongo_db):
        """Open the MySQL and MongoDB helpers used for the copy.

        Args:
            mysql_ip: MySQL server host.
            mysql_user: MySQL login user.
            mysql_passwd: MySQL login password.
            mysql_db: MySQL database selected on connect.
            mongo_ip: MongoDB server host.
            mongo_db: MongoDB database that receives the documents.
        """
        self.mysqlUtil = MySQLUtil(mysql_ip, mysql_user, mysql_passwd, mysql_db)
        self.mongoUtil = MongoDBUtil(mongo_ip, mongo_db)

    @staticmethod
    def _quote_identifier(name):
        """Return *name* as a backtick-quoted MySQL identifier."""
        return "`" + str(name).replace("`", "``") + "`"

    def mysqlToMongo(self, mysql_database, mysql_table, mongo_collect_name):
        """Insert every row of one MySQL table into a MongoDB collection.

        Steps:
            1. Read the table's column names.
            2. Select exactly those columns and turn each row into a dict
               keyed by column name.
            3. Insert all documents into the collection in one batch.

        Args:
            mysql_database: Schema that owns the table.
            mysql_table: Table to copy.
            mongo_collect_name: Destination collection name.

        Returns:
            The number of documents handed to MongoDB.

        Raises:
            ValueError: If no columns are found for the table.
        """
        fields = self.mysqlUtil.get_table_fields(mysql_database, mysql_table)
        if not fields:
            raise ValueError("no columns found for table %s.%s" % (mysql_database, mysql_table))

        # Selecting the columns by name (instead of ``*``) guarantees that
        # values line up with ``fields``; identifiers cannot be bound as
        # parameters, so they are backtick-quoted instead.
        quote = self._quote_identifier
        sql = "SELECT %s FROM %s.%s" % (
            ", ".join(quote(field) for field in fields),
            quote(mysql_database),
            quote(mysql_table),
        )
        rows = self.mysqlUtil.execute(sql)

        # Print the column header using the full names.
        print(" ".join(fields))

        # One document per row, keyed by the complete column name.
        documents = [dict(zip(fields, row)) for row in rows]
        for document in documents:
            print(document)

        # insert_many rejects an empty list, so skip the call for empty tables.
        if documents:
            self.mongoUtil.insert_many(mongo_collect_name, documents)
        return len(documents)

if __name__ == "__main__":
    # Connection settings for the demo run.
    mysql_ip = "127.0.0.1"
    mysql_user = "root"
    mysql_passwd = "123456"
    mysql_db = "test"
    mongo_ip = "127.0.0.1"
    mongo_db = "xl01"

    # Build the synchronizer (opens both connections).
    syncsql = SyncMysqlMongo(mysql_ip, mysql_user, mysql_passwd, mysql_db, mongo_ip, mongo_db)

    # Copy the whole MySQL table into the MongoDB collection.
    mysql_database = "test"
    mysql_table = "t_user"
    mongo_collect_name = "t_user"
    syncsql.mysqlToMongo(mysql_database, mysql_table, mongo_collect_name)

0 人点赞