python实现mysql数据同步到elasticsearch

2018-04-24 16:47:15 浏览数 (1)

环境: python3.5 支持包: pymysql elasticsearch_dsl

安装 pymysql elasticsearch_dsl

代码语言:bash
pip install elasticsearch_dsl
pip install pymysql

代码实现

代码语言:python
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# @Time    : 2018/4/18 下午6:30
# @Author  : lizhao
# @File    : mysql_data_to_elasticsearch.py
# @Version : 1.0
# 说明: 将mysql上的数据按规则放入elasticsearch中

# 引入es_type包
from tools.es_types import ZukerType
from w3lib.html import remove_tags
# 引入处理mysql的程序包
import pymysql

##############
# Database connection parameters (the real values are redacted in this copy).
zukerDB_ip = '********'
zukerDB_db = '********'
zukerDB_user = '********'
zukerDB_pw = '********'
# Name of the MySQL table to sync. get_mysql_data() reads this variable as
# zukerDB_58house_info; the value matches the table queried there.
zukerDB_58house_info = '58house_info'


##############
class MysqlMesToElastic():
    """Copy rows from the MySQL listing table into Elasticsearch.

    Connection settings and the table name are read from the module-level
    ``zukerDB_*`` variables. Each row is converted to a plain dict and then
    stored through ``ZukerType`` (imported from ``tools.es_types``).
    """

    # Row columns whose text is joined into the ``desc`` field, in order.
    _DESC_COLUMNS = ('leasingMethod', 'tags', 'houseType', 'company')

    # Fields copied one-to-one from the item dict onto the ZukerType object.
    _ITEM_FIELDS = ('title', 'price', 'create_date', 'desc', 'area',
                    'longitude', 'latitude', 'url')

    def __init__(self):
        pass

    def get_mysql_data(self):
        """Read every row with an id from 1 to the current maximum and index it.

        Rows are fetched one id at a time; ids with no row are skipped.
        Each row found is converted with ``_build_item`` and handed to
        ``process_item``. Returns ``None``. If the table is empty nothing
        is indexed.
        """
        # Keyword arguments: recent PyMySQL releases no longer accept the
        # host/user/password/database values positionally.
        db_zuker = pymysql.connect(host=zukerDB_ip, user=zukerDB_user,
                                   password=zukerDB_pw, database=zukerDB_db,
                                   charset="utf8",
                                   cursorclass=pymysql.cursors.DictCursor)
        try:
            cursor = db_zuker.cursor()
            try:
                # The highest id bounds the scan below.
                cursor.execute("select id from %s order by id desc limit 1;"
                               % zukerDB_58house_info)
                last_row = cursor.fetchone()
                if not last_row:
                    # Empty table: there is nothing to sync.
                    return
                last_id = int(last_row['id'])
                print(last_id)

                # A table name cannot be sent as a query parameter, so it is
                # formatted in; the id is passed as a real parameter (%%s
                # survives the formatting as the %s placeholder).
                select_by_id = ("select * from %s WHERE id = %%s;"
                                % zukerDB_58house_info)
                for row_id in range(1, last_id + 1):
                    cursor.execute(select_by_id, (row_id,))
                    row = cursor.fetchone()
                    # Ids may have gaps (e.g. deleted rows); skip those.
                    if row:
                        self.process_item(self._build_item(row))
            finally:
                cursor.close()
        finally:
            # Always release the connection, even if a query or save fails.
            db_zuker.close()

    @classmethod
    def _build_item(cls, row):
        """Convert one MySQL row dict into the item dict used by process_item."""
        # NULL columns come back as None, which str.join rejects; treat them
        # as empty text so one missing value does not abort the whole sync.
        desc_parts = ['' if row[column] is None else row[column]
                      for column in cls._DESC_COLUMNS]
        return {
            'title': row['name'],
            'price': row['price'],
            'create_date': row['sendTime'],
            'desc': '   '.join(desc_parts),
            'area': row['address'],
            'longitude': row['longitude'],
            'latitude': row['latitude'],
            'url': row['url'],
        }

    def process_item(self, item):
        """Store one item dict in Elasticsearch via ``ZukerType``.

        ``item`` must contain the keys title, price, create_date, desc,
        area, longitude, latitude and url. A failed save is reported on
        stdout and does not raise, so the remaining rows are still synced.
        """
        print(item)
        zuker = ZukerType()
        for field in self._ITEM_FIELDS:
            setattr(zuker, field, item[field])

        # Best effort: report the failure instead of silently discarding it,
        # and keep going with the next row.
        try:
            zuker.save()
        except Exception as exc:
            print('failed to save item %r to elasticsearch: %s'
                  % (item['url'], exc))

if __name__ == "__main__":
    # Script entry point: run one full MySQL -> Elasticsearch pass.
    MysqlMesToElastic().get_mysql_data()

0 人点赞