Python 实现批量查询IP并解析为归

2020-01-17 13:00:01 浏览数 (1)

代码语言:javascript复制
一、背景:
    最近工作中做了一个小功能,目的是为了分析注册用户区域分布和订单的区域分布情况。所以需要将其对应的IP信息解析为归属地,并同步每天同步更新。
线上跑起来效率还是有优化的空间,优化的方向:在调用IP查询API过程可以调整为多线程并行解析IP。后续会更新这方便的调整。
技术:    Pyhton3
         postgreSQL
         env配置文件

附加信息:iP地址查询(iP138官方企业版):https://market.aliyun.com/products/56928004/cmapi015606.html#sku=yuncode960600002
     .可提供免费的IP查询API.

二、实现思路: 1、 读取数据库IP信息
                2、 调用第三方IP解析API进行解析
                3、 将解析归属地信息存入数据库
三、几点说明: 1、环境信息等参数配置
                2、日志输出
                3、异常处理: 数据库连接异常
                            请求连接查询IP的URL异常:HTTP ERROR 503
                4、json,字典,数组等类型数据输入输出
                5、分页查询并批量解析
                5.功能实现很简单,所以就没有做详细的介绍了。详情可直接看完整代码,有详细的备注。

四、步骤简单介绍:
 针对实现思路的3个步骤写了3个函数,彼此调用执行。
      函数:
      def get_ip_info(table_name):
      def get_ip_area(table_name):
      def ip_write_db(table_name):
      调用:
      ip_write_db("h_user_stat")
      
五、关键代码说明:
代码语言:javascript复制
语法:urllib.request.urlopen(url, data=None, [timeout, ]*, cafile=None, capath=None, cadefault=False, context=None)
代码语言:javascript复制
 # 对从数据库表中出查询的IP进行解析
       querys = 'callback&datatype=jsonp&ip='   get_ip
       bodys = {}
       url = host   path   '?'   querys
       request = urllib.request.Request(url)
       request.add_header('Authorization', 'APPCODE '   appcode)

       # 连接url时可能会出现 ERROR: HTTP Error 503: Service Unavailable
       try: 
         response = urllib.request.urlopen(request)
       except Exception as e:
         logging.error(e) # 输出异常日志信息 
         time.sleep(5)
         response = urllib.request.urlopen(request)
       finally:
         content = response.read()
         ip_area = content.decode('utf8')
         ip_area = json.loads(ip_area)['data'] # json类型转字典类型并取'data'健值
         arr.append([get_ip, ip_area]) # 将结果集存于二元数组
代码语言:javascript复制
说明:从数据库分页查询固定数量的IP存入数组,并遍历该数组并将解析后的地区信息data健值存于二元数组中。
代码语言:javascript复制
代码语言:javascript复制
代码语言:javascript复制
六、Python代码实现如下:
代码语言:javascript复制
  1 # 导入psycopg2包
  2 import psycopg2, time,datetime,sys
  3 import json
  4 import urllib, urllib.request
  5 import os
  6 import configparser
  7 import logging
  8                      # purpose: 连接数据库读取表IP信息
  9 def get_ip_info(table_name):
 10     # 全局变量作用局部作用域
 11     global pagesize # 每页查询数据条数
 12     global rows_count
 13 
 14     # 测试1
 15     starttime_1 = time.time()
 16     # 建立游标,用来执行数据库操作
 17     cur = conn.cursor()
 18     # 执行SQL命令
 19     cur.execute("SELECT remote_ip FROM (select remote_ip,min(created_at) from "   table_name   " group by remote_ip) h1 where remote_ip is not null and remote_ip <> '' and  not exists (select 1 from d_ip_area_mapping h2 where h1.remote_ip = h2.remote_ip) limit "   str(pagesize)   ";")
 20 
 21 
 22     # 获取结果集条数
 23     rows_count = cur.rowcount
 24    
 25     # print('解析用户IP的总数:'   str(rows_count))
 26 
 27      # 当有未解析的用户的IP,返回元组,否则退出程序
 28     if rows_count > 0:
 29       # 获取SELECT返回的元组
 30       rows =  cur.fetchall()        # all rows in table
 31 
 32       for row in rows:
 33           tuple = rows
 34 
 35       conn.commit()
 36       # 关闭游标
 37       cur.close()
 38 
 39     else:
 40         tuple = []
 41     logging.info('每页查询秒数:'   str(time.time() - starttime_1))
 42     return tuple
 43     # 调用解析函数
 44 
 45 
 46 def get_ip_area(table_name):
 47   # 内包含用户ID和IP的数组的元组
 48   tuple = get_ip_info(table_name)  
 49 
 50   # 测试2
 51   starttime_2 = time.time()
 52   host = 'http://ali.ip138.com'
 53   path = '/ip/'
 54   method = 'GET'
 55   appcode = '917058e6d7c84104b7cab9819de54b6e'
 56   arr = []
 57   for row in tuple:
 58 
 59        get_ip = row[0]
 60        #get_user = "".join(str(row))
 61        #get_user = row[0]
 62 
 63             # 对从数据库表中出查询的IP进行解析
 64        querys = 'callback&datatype=jsonp&ip='   get_ip
 65        bodys = {}
 66        url = host   path   '?'   querys
 67        request = urllib.request.Request(url)
 68        request.add_header('Authorization', 'APPCODE '   appcode)
 69 
 70        # 连接url时可能会出现 ERROR: HTTP Error 503: Service Unavailable
 71        try: 
 72          response = urllib.request.urlopen(request)
 73        except Exception as e:
 74          logging.error(e) # 输出异常日志信息 
 75          time.sleep(5)
 76          response = urllib.request.urlopen(request)
 77        finally:
 78          content = response.read()
 79          ip_area = content.decode('utf8')
 80          ip_area = json.loads(ip_area)['data'] # json类型转字典类型并取'data'健值
 81          arr.append([get_ip, ip_area]) # 将结果集存于二元数组
 82   logging.info('每页解析秒数:'   str(time.time() - starttime_2))
 83   return  arr
 84 
 85 
 86 def ip_write_db(table_name):
 87    
 88     write_ip = get_ip_area(table_name)  # 内包含用户ID和IP的数组的元组
 89 
 90 
 91     # 测试1
 92     starttime_3 = time.time()
 93 
 94      # 建立游标,用来执行数据库操作
 95     cur = conn.cursor()
 96     for row in write_ip:
 97         # get_user = row[0]  # 获取用户ID
 98         get_ip = row[0]  # 获取用户对应的IP
 99         country = row[1][0]  # 获取IP解析后的地区:国家
100         province = row[1][1]  # 获取IP解析后的地区:省
101         city = row[1][2]  # 获取IP解析后的地区:市
102         isp = row[1][3]  # 获取IP解析后的服务提供商
103 
104         # 执行SQL命令
105         sql = "insert into d_ip_area_mapping(remote_ip,country,province,city,isp,created_at,updated_at,job_id) values (%s,%s,%s,%s,%s,%s,%s,%s);"
106         val = [get_ip, country, province, city, isp, time.strftime("%Y-%m-%d %H:%M:%S", time.localtime()),
107                time.strftime("%Y-%m-%d %H:%M:%S", time.localtime()),time.strftime("%Y-%m-%d",time.localtime())]
108         
109         cur.execute(sql, val)
110         conn.commit()    
111     # 关闭游标
112     cur.close()
113     logging.info('每页插入秒数:'   str(time.time() - starttime_3))
114 
115 
116 # 1.程序开始执行计时
117 starttime = time.time()
118 
119 
120      # 读取配置文件环境信息 
121 
122 # 项目路径
123 rootDir = os.path.split(os.path.realpath(__file__))[0]
124 
125 
126 ############################### config.env文件路径  #############################################################
127 
128 configFilePath = os.path.join(rootDir, 'db_udw.env')
129 config = configparser.ConfigParser()
130 config.read(configFilePath)
131 
132 # 读取数据库环境信息
133 db_database = config.get('postgresql','database')
134 db_user = config.get('postgresql','user')
135 db_password = config.get('postgresql','password')
136 db_host = config.get('postgresql','host')
137 db_port = config.get('postgresql','port')
138 
139 # 读取输出日志路径
140 log = config.get('log','log_path')
141 
142 # 每页查询数据条数
143 pagesize = config.get('page','pagesize') 
144 
145 # 读取解析IP条数限制
146 ip_num_limit = config.get('ip_num','ip_num_limit') 
147 
148 # 配置输出日志格式
149 logging.basicConfig(level=logging.DEBUG,#控制台打印的日志级别
150                       filename='{my_log_path}/ip_analyzer.log'.format(my_log_path=log),  # 指定日志文件及路径
151                       filemode='a',##模式,有w和a,w就是写模式,每次都会重新写日志,覆盖之前的日志 #a是追加模式,默认如果不写的话,就是追加模式
152                       format='%(asctime)s - %(pathname)s[line:%(lineno)d] - %(levelname)s: %(message)s'#日志格式
153                       )
154 
155 ###############################   程序开始执行  #############################################################
156 try:
157 
158   # 连接到一个给定的数据库
159   conn = psycopg2.connect(database=db_database, user=db_user, password=db_password, host=db_host, port=db_port)
160 except Exception as e:
161   logging.error(e) # 输出连接异常日志信息
162 
163 # 返回查询行数 默认为0
164 rows_count = 0
165  # 用户表IP解析总数
166 user_ip_num = 0 
167  # 订单表IP解析总数
168 order_ip_num = 0 
169 
170 
171 
172 try:
173 
174   # 解析用户表注册IP信息
175   while user_ip_num <= eval(ip_num_limit):
176      i = 1  # 循环次数
177      ip_write_db("h_user_stat")
178      user_ip_num = user_ip_num   rows_count*i
179      i  = i   1
180      if rows_count == 0 :
181          break
182 
183   # 解析订单表下单IP信息 
184   while user_ip_num <= eval(ip_num_limit):
185       # 解析用户表注册IP信息
186       i = 1  # 循环次数
187       ip_write_db("h_order")
188       order_ip_num = order_ip_num   rows_count*i
189       i = i   1
190       if rows_count == 0 :
191          break
192 except Exception as e:
193   logging.error(e) # 输出异常日志信息
194 finally:
195   # 关闭数据库连接
196   conn.close()
197 
198 # 2 程序结束执行计时
199   endtime = time.time()
200   
201   # print('解析用户IP的总数:'   str(user_ip_num))
202   # print('解析订单IP的总数:'   str(order_ip_num))
203   # # 打印程序执行总耗时
204   # print('解析总耗时秒数:'   str(endtime - starttime))
205   logging.info('解析用户IP的总数:'   str(user_ip_num))
206   logging.info('解析订单IP的总数:'   str(order_ip_num))
207   logging.info('解析总耗时秒数:'   str(endtime - starttime))
代码语言:javascript复制

 环境配置db_udw.envdb_udw.env 如下:

代码语言:javascript复制
# 数据库环境信息
[postgresql]
database = ahaschool_udw
user = admin
password = 123456
host = 127.0.0.0
port = 5432

# 设置日志文件路径
[log]
log_path = /home/hjmrunning/bi_etl_product/scripts/log

# 每页查询数据条数
[page]
pagesize = 1000  

# IP解析条数限制
[ip_num]
ip_num_limit = 150000

最后

    我接触Python时间也不是很久,实现方法可能会有疏漏。如果有什么疑问和见解,欢迎评论区交流。

代码语言:javascript复制

0 人点赞