环境: Python 3.5；依赖包: pymysql、elasticsearch_dsl、w3lib（另需项目内的 tools.es_types 模块）
安装 pymysql、elasticsearch_dsl 和 w3lib:
pip install elasticsearch_dsl
pip install pymysql
pip install w3lib
代码实现（Python）:
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# @Time : 2018/4/18 下午6:30
# @Author : lizhao
# @File : mysql_data_to_elasticsearch.py
# @Version : 1.0
# 说明: 将mysql上的数据按规则放入elasticsearch中
# 引入es_type包
from tools.es_types import ZukerType
from w3lib.html import remove_tags
# 引入处理mysql的程序包
import pymysql
##############
# MySQL connection parameters (values redacted in this listing).
zukerDB_ip = '********'
zukerDB_db = '********'
zukerDB_user = '********'
zukerDB_pw = '********'
# Name of the MySQL table whose rows are exported to Elasticsearch.
zukerDB_58house_info = '58house_info'
##############
class MysqlMesToElastic():
    """Copy rows from a MySQL table into Elasticsearch.

    Connection settings and the table name come from the module-level
    ``zukerDB_*`` constants. Each row is mapped to an item dict by
    ``_row_to_item`` and written as a ``ZukerType`` document by
    ``process_item``.
    """

    # Item keys copied one-to-one onto the ZukerType document.
    _ES_FIELDS = ('title', 'price', 'create_date', 'desc', 'area',
                  'longitude', 'latitude', 'url')

    def __init__(self):
        # Stateless: all configuration is read from module-level constants.
        pass

    @staticmethod
    def _row_to_item(row):
        """Map one MySQL row (a dict, as returned by DictCursor) to an item.

        ``desc`` is leasingMethod, tags, houseType and company joined by a
        single space; NULL (None) columns are skipped rather than making
        ``str.join`` raise ``TypeError``.

        :param row: dict keyed by the table's column names.
        :returns: dict with exactly the keys in ``_ES_FIELDS``.
        """
        desc_parts = (row['leasingMethod'], row['tags'],
                      row['houseType'], row['company'])
        return {
            'title': row['name'],
            'price': row['price'],
            'create_date': row['sendTime'],
            'desc': ' '.join(part for part in desc_parts if part is not None),
            'area': row['address'],
            'longitude': row['longitude'],
            'latitude': row['latitude'],
            'url': row['url'],
        }

    def get_mysql_data(self, batch_size=500):
        """Export every row with 1 <= id <= (max id at start) to Elasticsearch.

        Rows are read in ascending id order, at most ``batch_size`` per
        query, continuing after the last id seen (keyset pagination). Gaps
        in the id sequence therefore cost nothing, and rows inserted with a
        higher id after the export starts are not included.

        :param batch_size: maximum number of rows fetched per SELECT.
        :raises ValueError: if ``batch_size`` is less than 1.
        """
        if batch_size < 1:
            raise ValueError('batch_size must be >= 1')
        table = zukerDB_58house_info
        db_zuker = pymysql.connect(host=zukerDB_ip, user=zukerDB_user,
                                   password=zukerDB_pw, database=zukerDB_db,
                                   charset="utf8",
                                   cursorclass=pymysql.cursors.DictCursor)
        try:
            with db_zuker.cursor() as cursor:
                # Highest id currently in the table bounds the export.
                cursor.execute(
                    "SELECT id FROM `{}` ORDER BY id DESC LIMIT 1;".format(table))
                last_row = cursor.fetchone()
                if last_row is None:
                    # Empty table: nothing to export.
                    return
                last_id = int(last_row['id'])
                print(last_id)

                # Only the (constant) table name is formatted in; ids and the
                # batch size are bound as query parameters.
                sql = ("SELECT * FROM `{}` WHERE id > %s AND id <= %s "
                       "ORDER BY id LIMIT %s;").format(table)
                after_id = 0  # export starts from id 1
                while after_id < last_id:
                    cursor.execute(sql, (after_id, last_id, batch_size))
                    rows = cursor.fetchall()
                    if not rows:
                        break
                    for row in rows:
                        self.process_item(self._row_to_item(row))
                    after_id = int(rows[-1]['id'])
        finally:
            db_zuker.close()

    def process_item(self, item):
        """Write one item to Elasticsearch as a ``ZukerType`` document.

        Best-effort: if saving fails, the error is printed and the item is
        skipped so one bad document does not abort the whole export.

        :param item: dict containing every key in ``_ES_FIELDS``.
        """
        print(item)
        zuker = ZukerType()
        # title, price, create_date (time), desc (description), area
        # (location), longitude, latitude, url
        for field in self._ES_FIELDS:
            setattr(zuker, field, item[field])
        try:
            zuker.save()
        except Exception as exc:  # report and continue with the next item
            print('failed to save item {!r} to Elasticsearch: {!r}'.format(
                item.get('url'), exc))
if __name__ == "__main__":
    # Run the MySQL -> Elasticsearch export when executed as a script.
    exporter = MysqlMesToElastic()
    exporter.get_mysql_data()