最近有盆友需要帮忙写个爬虫脚本,爬取雪球网一些上市公司的财务数据。盆友希望可以根据他自己的选择进行自由的抓取,所以简单给一份脚本交给盆友,盆友还需要自己搭建python环境,更需要去熟悉一些参数修改的操作,想来也是太麻烦了。
于是,结合之前做过的汇率计算器小工具,我这边决定使用PyQt5给朋友制作一个爬虫小工具,方便他的操作可视化。
一、效果演示
二、功能说明
- 可以自由选择证券市场类型:A股、美股和港股
- 可以自由选择上市公司:单选或全选
- 可以自由选择财务数据类型:单选或全选(主要指标、利润表、资产负债表、现金流表)
- 可以导出数据存储为excel表格文件
- 支持同一家上市公司同类型财务数据追加
三、制作过程
首先引入需要的库
代码语言:javascript复制import sys
from PyQt5 import QtCore, QtGui, QtWidgets
from PyQt5.QtWidgets import QApplication, QMainWindow,QFileDialog
import os
import requests
from fake_useragent import UserAgent
import json
import logging
import time
import pandas as pd
from openpyxl import load_workbook
雪球网页拆解
这一步的目的是获取需要爬取的数据的真正URL地址规律。
当我选中某只股票查看财务数据某类型数据报告时,点击下一页,网站地址没有变化,基本可以知道这是动态加载的数据,对于这类数据可以使用F12打开开发者模式。
在开发者模式下,选到Network—>XHR可以查看到真正的数据获取地址URL及请求方式(General里是请求URL和请求方式说明,Request Headers有请求头信息,如cookie,Query String Parameters就是可变参数项,一般来说数据源URL就是由基础URL和这里的可变参数组合而成)
我们分析这段URL,可以发现其基本结构如下:
基于上述结构,我们拆分最终的组合URL地址如下
代码语言:javascript复制#基础网站
base_url = f'https://stock.xueqiu.com/v5/stock/finance/{ABtype}'
#组合url地址
url = f'{base_url}/{data_type}.json?symbol={ipo_code}&type=all&is_detail=true&count={count_num}×tamp={start_time}'
操作界面设计
操作界面设计使用的是PyQt5,这里不做更详细的介绍,我们在后续中对PyQt5的使用再专题讲解。
使用QT designer对操作界面进行可视化设计,参考如下:
雪球网数据提取.ui中各个组件的相关设置,参考如下:
.ui文件可以使用pyuic5指令进行编译生成对应的.py文件,或者我们也可以在vscode里直接转译(这里也不做更详细的介绍,具体见后续专题讲解)。
本文没有将操作界面定义文件单独使用,而是将全部代码集中在同一个.py文件,因此其转译后的代码备用即可。
获取cookie及基础参数
获取cookie
为了便于小工具拿来即可使用,我们需要自动获取cookie地址并附加在请求头中,而不是人为打开网页在开发者模式下获取cookie后填入。
自动获取cookie,这里使用到的requests库的session会话对象。
requests库的session会话对象可以跨请求保持某些参数,简单来说,就是比如你使用session成功的登录了某个网站,则在再次使用该session对象请求该网站的其他网页都会默认使用该session之前使用的cookie等参数
代码语言:javascript复制import requests
from fake_useragent import UserAgent
url = 'https://xueqiu.com'
session = requests.Session()
headers = {"User-Agent": UserAgent(verify_ssl=False).random}
session.get(url, headers=headers)
#获取当前的Cookie
Cookie= dict(session.cookies)
基础参数
基础参数是用于财务数据请求时原始网址构成参数选择,我们在可视化操作工具中需要对财务数据类型进行选择,因此这里需要构建财务数据类型字典。
代码语言:javascript复制#原始网址
original_url = 'https://xueqiu.com'
#财务数据类型字典
dataType = {'全选':'all',
'主要指标':'indicator',
'利润表':'income',
'资产负债表':'balance',
'现金流量表':'cash_flow'}
获取获取各证券市场上市名录
因为我们在可视化操作工具上是选定股票代码后抓取相关数据并导出,对导出的文件名称希望是以股票代码 公司名称的形式(SH600000 浦发银行)存储,所以我们需要获取股票代码及名称对应关系的字典表。
这其实就是一个简单的网络爬虫及数据格式调整的过程,实现代码如下:
代码语言:javascript复制 1import requests
2import pandas as pd
3import json
4from fake_useragent import UserAgent
5#请求头设置
6headers = {"User-Agent": UserAgent(verify_ssl=False).random}
7#股票清单列表地址解析(通过设置参数size为9999可以只使用1个静态地址,全部股票数量不足5000)
8url = 'https://xueqiu.com/service/v5/stock/screener/quote/list?page=1&size=9999&order=desc&orderby=percent&order_by=percent&market=CN&type=sh_sz'
9#请求原始数据
10response = requests.get(url,headers = headers)
11#获取股票列表数据
12df = response.text
13#数据格式转化
14data = json.loads(df)
15#获取所需要的股票代码及股票名称数据
16data = data['data']['list']
17#将数据转化为dataframe格式,并进行相关调整
18data = pd.DataFrame(data)
19data = data[['symbol','name']]
20data['name'] = data['symbol'] ' ' data['name']
21data.sort_values(by = ['symbol'],inplace=True)
22data = data.set_index(data['symbol'])['name']
23#将股票列表转化为字典,键为股票代码,值为股票代码和股票名称的组合
24ipoCodecn = data.to_dict()
A股股票代码及公司名称字典如下:
获取上市公司财务数据并导出
根据在可视化操作界面选择的 财务报告时间区间、财务报告数据类型、所选证券市场类型以及所输入的股票代码后,需要先根据这些参数组成我们需要进行数据请求的网址,然后进行数据请求。
由于请求后的数据是json格式,因此可以直接进行转化为dataframe类型,然后进行导出。在数据导出的时候,我们需要判断该数据文件是否存在,如果存在则追加,如果不存在则新建。
获取上市公司财务数据
通过选定的参数生成财务数据网址,然后根据是否全选决定后续数据请求的操作,因此可以拆分为获取数据网址和请求详情数据两部分。
获取数据网址
数据网址是根据证券市场类型、财务数据类型、股票代码、单页数量及起始时间戳决定,而这些参数都是通过可视化操作界面进行设置。
证券市场类型 控件 是radioButton,可以通过你 ischecked() 方法判断是否选中,然后用if-else进行参数设定;
财务数据类型 和 股票代码 因为支持 全选,需要先进行全选判定(全选条件下是需要循环获取数据网址,否则是单一获取即可),因此这部分需要再做拆分;
单页数量 考虑到每年有4份财务报告,因此这里默认为年份差*4;
时间戳 是 根据起始时间中的 结束时间 计算得出,由于可视化界面输入的 是 整数年份,我们可以通过 mktime() 方法获取时间戳。
代码语言:javascript复制 1def Get_url(self,name,ipo_code):
2 #获取开始结束时间戳(开始和结束时间手动输入)
3 inputstartTime = str(self.start_dateEdit.date().toPyDate().year)
4 inputendTime = str(self.end_dateEdit.date().toPyDate().year)
5 endTime = f'{inputendTime}-12-31 00:00:00'
6 timeArray = time.strptime(endTime, "%Y-%m-%d %H:%M:%S")
7
8 #获取指定的数据类型及股票代码
9 filename = ipo_code
10 data_type =dataType[name]
11 #计算需要采集的数据量(一年以四个算)
12 count_num = (int(inputendTime) - int(inputstartTime) 1) * 4
13 start_time = f'{int(time.mktime(timeArray))}001'
14
15 #证券市场类型
16 if (self.radioButtonCN.isChecked()):
17 ABtype = 'cn'
18 num = 3
19 elif (self.radioButtonUS.isChecked()):
20 ABtype = 'us'
21 num = 6
22 elif (self.radioButtonHK.isChecked()):
23 ABtype = 'hk'
24 num = 6
25 else:
26 ABtype = 'cn'
27 num = 3
28
29 #基础网站
30 base_url = f'https://stock.xueqiu.com/v5/stock/finance/{ABtype}'
31
32 #组合url地址
33 url = f'{base_url}/{data_type}.json?symbol={ipo_code}&type=all&is_detail=true&count={count_num}×tamp={start_time}'
34
35 return url,num
请求详情数据
需要根据用户输入决定数据采集方式,代码中主要是根据用户输入做判断然后再进行详情数据请求。
代码语言:javascript复制 1#根据用户输入决定数据采集方式
2def Get_data(self):
3 #name为财务报告数据类型(全选或单个)
4 name = self.Typelist_comboBox.currentText()
5 #股票代码(全选或单个)
6 ipo_code = self.lineEditCode.text()
7 #判断证券市场类型
8 if (self.radioButtonCN.isChecked()):
9 ipoCodex=ipoCodecn
10 elif (self.radioButtonUS.isChecked()):
11 ipoCodex=ipoCodeus
12 elif (self.radioButtonHK.isChecked()):
13 ipoCodex=ipoCodehk
14 else:
15 ipoCodex=ipoCodecn
16#根据财务报告数据类型和股票代码类型决定数据采集的方式
17 if name == '全选' and ipo_code == '全选':
18 for ipo_code in list(ipoCodex.keys()):
19 for name in list(dataType.keys())[1:]:
20 self.re_data(name,ipo_code)
21 elif name == '全选' and ipo_code != '全选':
22 for name in list(dataType.keys())[1:]:
23 self.re_data(name,ipo_code)
24 elif ipo_code == '全选' and name != '全选':
25 for ipo_code in list(ipoCodex.keys()):
26 self.re_data(name,ipo_code)
27 else:
28 self.re_data(name,ipo_code)
29
30#数据采集,需要调用数据网址(Get.url(name,ipo_code)
31def re_data(self,name,ipo_code):
32 name = name
33 #获取url和num(url为详情数据网址,num是详情数据中根据不同证券市场类型决定的需要提取的数据起始位置)
34 url,num = self.Get_url(name,ipo_code)
35 #请求头
36 headers = {"User-Agent": UserAgent(verify_ssl=False).random}
37 #请求数据
38 df = requests.get(url,headers = headers,cookies = cookies)
39
40 df = df.text
41try:
42 data = json.loads(df)
43 pd_df = pd.DataFrame(data['data']['list'])
44 to_xlsx(num,pd_df)
45 except KeyError:
46 log = '<font color="#FF0000">该股票此类型报告不存在,请重新选择股票代码或数据类型</font>'
47 self.rizhi_textBrowser.append(log)
财务数据处理并导出
单纯的数据导出是比较简单的操作,直接to_excel() 即可。但是考虑到同一个上市公司的财务数据类型有四种,我们希望都保存在同一个文件下,且对于同类型的数据可能存在分批导出的情况希望能追加。因此,需要进行特殊的处理,用pd.ExcelWriter()方法操作。
代码语言:javascript复制 1#数据处理并导出
2def to_xlsx(self,num,data):
3 pd_df = data
4 #获取可视化操作界面输入的导出文件保存文件夹目录
5 filepath = self.filepath_lineEdit.text()
6 #获取文件名
7 filename = ipoCode[ipo_code]
8 #组合成文件详情(地址 文件名 文件类型)
9 path = f'{filepath}{filename}.xlsx'
10 #获取原始数据列字段
11 cols = pd_df.columns.tolist()
12 #创建空dataframe类型用于存储
13 data = pd.DataFrame()
14 #创建报告名称字段
15 data['报告名称'] = pd_df['report_name']
16 #由于不同证券市场类型下各股票财务报告详情页数据从不同的列才是需要的数据,因此需要用num作为起点
17 for i in range(num,len(cols)):
18 col = cols[i]
19 try:
20 #每列数据中是列表形式,第一个是值,第二个是同比
21 data[col] = pd_df[col].apply(lambda x:x[0])
22 # data[f'{col}_同比'] = pd_df[col].apply(lambda x:x[1])
23 except TypeError:
24 pass
25 data = data.set_index('报告名称')
26 log = f'{filename}的{name}数据已经爬取成功'
27 self.rizhi_textBrowser.append(log)
28 #由于存储的数据行索引为数据指标,所以需要对采集的数据进行转T处理
29 dataT = data.T
30 dataT.rename(index = eval(f'_{name}'),inplace=True)
31 #以下为判断数据报告文件是否存在,若存在则追加,不存在则重新创建
32 try:
33 if os.path.exists(path):
34 #读取文件全部页签
35 df_dic = pd.read_excel(path,None)
36 if name not in list(df_dic.keys()):
37 log = f'{filename}的{name}数据页签不存在,创建新页签'
38 self.rizhi_textBrowser.append(log)
39 #追加新的页签
40 with pd.ExcelWriter(path,mode='a') as writer:
41 book = load_workbook(path)
42 writer.book = book
43 dataT.to_excel(writer,sheet_name=name)
44 writer.save()
45 else:
46 log = f'{filename}的{name}数据页签已存在,合并中'
47 self.rizhi_textBrowser.append(log)
48 df = pd.read_excel(path,sheet_name = name,index_col=0)
49 d_ = list(set(list(dataT.columns)) - set(list(df.columns)))
50#使用merge()进行数据合并
51 dataT = pd.merge(df,dataT[d_],how='outer',left_index=True,right_index=True)
52 dataT.sort_index(axis=1,ascending=False,inplace=True)
53 #页签中追加数据不影响其他页签
54 with pd.ExcelWriter(path,engine='openpyxl') as writer:
55 book = load_workbook(path)
56 writer.book = book
57 idx = writer.book.sheetnames.index(name)
58 #删除同名的,然后重新创建一个同名的
59 writer.book.remove(writer.book.worksheets[idx])
60 writer.book.create_sheet(name, idx)
61 writer.sheets = {ws.title:ws for ws in writer.book.worksheets}
62
63 dataT.to_excel(writer,sheet_name=name,startcol=0)
64 writer.save()
65 else:
66 dataT.to_excel(path,sheet_name=name)
67
68 log = f'<font color="#00CD00">{filename}的{name}数据已经保存成功</font>'
69 self.rizhi_textBrowser.append(log)
70
71 except FileNotFoundError:
72 log = '<font color="#FF0000">未设置存储目录或存储目录不存在,请重新选择文件夹</font>'
73 self.rizhi_textBrowser.append(log)
上面就是制作过程讲解与关键代码,由于源代码内容较多,就不全量展示了,可在后台回复“XQ”可获取完整源代码文件!
-END-