Abstract
This article scrapes the Douban Top 250 movie list using the requests and re libraries.
We first work through a case study from the book to learn how to locate the information we need on a page and extract it with regular expressions from the re module. We then speed the crawler up with the multiprocessing module. Finally we move on to the real site, where we run into a new problem: Douban's anti-scraping mechanism, which we have to work around.
Case Study from the Book
Scrape Center is a practice platform built by 崔庆才 (Cui Qingcai), the author of《Python3 网络爬虫开发实战》(2nd edition). It is worth a look if you are interested in web scraping.
We start with the first case, Scrape | Movie, and scrape its Top 250 movies.
Site Analysis
Before writing any code, we need to analyze how the site lays out its movie information.
We first practice on the demo site built by the author (once we have the technique down, we will scrape the real Douban site).
Open https://ssr1.scrape.center and press F12 to open the developer tools.
(Note: the red arrow in the figure below marks the element-picker tool. Click the picker, then click an element on the page on the left, and the corresponding code is highlighted on the right.)
Using the picker to select a movie, we find that every movie is a div node whose class attribute contains the value el-card.
Next, look at the detail page. Clicking a title takes us there, and we find that the title node is wrapped in an a node with an href attribute pointing to a relative detail path such as /detail/1. Joining this path with the prefix https://ssr1.scrape.center gives the full detail URL, as the short sketch below shows.
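A minimal sketch of that join, using the standard-library urljoin helper (the /detail/1 path is just the example seen above):

from urllib.parse import urljoin

BASE_URL = 'https://ssr1.scrape.center'
# Relative href taken from the title's <a> node, e.g. '/detail/1'.
detail_url = urljoin(BASE_URL, '/detail/1')
print(detail_url)  # -> https://ssr1.scrape.center/detail/1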
Now we know how to scrape one page of movies. Next we need to paginate and scrape the remaining pages.
Every list page follows the pattern https://ssr1.scrape.center/page/{page number}; only the trailing page number changes, as the sketch below shows.
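A minimal sketch of generating all the list-page URLs (assuming the site has 10 pages, as the demo does):

BASE_URL = 'https://ssr1.scrape.center'
TOTAL_PAGE = 10
# List pages differ only in the trailing number: /page/1 ... /page/10.
index_urls = [f'{BASE_URL}/page/{page}' for page in range(1, TOTAL_PAGE + 1)]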
Code Implementation:
From the analysis above, the plan is:
1. Visit each movie list page and collect the detail URL of every movie.
2. Repeat the same steps for all 10 list pages.
import re
import requests
import logging
from urllib.parse import urljoin

# logging is used to print progress information
logging.basicConfig(level=logging.INFO,
                    format='%(asctime)s - %(levelname)s : %(message)s')

BASE_URL = 'https://ssr1.scrape.center'
TOTAL_PAGE = 10


def scrape_page(url):
    # Fetch a page and return its HTML text; log and return None on failure.
    logging.info(f'scraping {url}...')
    try:
        response = requests.get(url)
        if response.status_code == 200:
            return response.text
        logging.error(f'got invalid status code {response.status_code} while scraping {url}')
    except requests.RequestException:
        logging.error(f'error occurred while scraping {url}', exc_info=True)


def scrape_index(page):
    # Scrape one list (index) page by its page number.
    index_url = f'{BASE_URL}/page/{page}'
    return scrape_page(index_url)


def parse_index(html):
    # Extract every movie's detail link from a list page.
    pattern = re.compile('<a.*?href="(.*?)".*?class="name">')
    items = re.findall(pattern, html)
    if not items:
        return []
    for item in items:
        detail_url = urljoin(BASE_URL, item)
        logging.info(f'get detail url {detail_url}')
        yield detail_url


def main():
    for page in range(1, TOTAL_PAGE + 1):
        index_html = scrape_index(page)
        detail_urls = parse_index(index_html)
        logging.info(f'detail urls {list(detail_urls)}')


if __name__ == '__main__':
    main()
We now have the detail-page URLs of all the movies. Next, we extract the fields we want from each detail page.
def parse_detail(html):
    # Regular expressions for each field on the detail page.
    cover_pattern = re.compile('class="item.*?<img.*?src="(.*?)".*?class="cover">', re.S)
    name_pattern = re.compile('<h2.*?>(.*?)</h2>')
    categories_pattern = re.compile('<button.*?category.*?<span>(.*?)</span>.*?</button>', re.S)
    published_at_pattern = re.compile('(\d{4}-\d{2}-\d{2}\s?上映)')
    drama_pattern = re.compile('<div.*?drama.*?>.*?<p.*?>(.*?)</p>', re.S)
    score_pattern = re.compile('<p.*?score.*?>(.*?)</p>', re.S)

    cover = re.search(cover_pattern, html).group(1).strip() \
        if re.search(cover_pattern, html) else None
    name = re.search(name_pattern, html).group(1).strip() \
        if re.search(name_pattern, html) else None
    categories = re.findall(categories_pattern, html) if re.findall(categories_pattern, html) else []
    published_at = re.search(published_at_pattern, html).group(1) \
        if re.search(published_at_pattern, html) else None
    drama = re.search(drama_pattern, html).group(1).strip() \
        if re.search(drama_pattern, html) else None
    score = re.search(score_pattern, html).group(1).strip() \
        if re.search(score_pattern, html) else None
    score = float(score) if score else None
    return {
        'cover': cover,
        'name': name,
        'categories': categories,
        'published_at': published_at,
        'drama': drama,
        'score': score
    }
Save each movie's details to a JSON file:
import json
from os import makedirs
from os.path import exists

RESULT_DIR = 'results'
exists(RESULT_DIR) or makedirs(RESULT_DIR)


def save_data(data):
    # Write one movie's data to results/<name>.json.
    name = data.get('name')
    data_path = f'{RESULT_DIR}/{name}.json'
    with open(data_path, 'w', encoding='utf-8') as f:
        json.dump(data, f, ensure_ascii=False, indent=2)
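A quick usage sketch (the record below is made up purely for illustration, it is not real scraped data):

# Hypothetical record, for illustration only.
save_data({'name': 'Example Movie', 'categories': ['Drama'], 'score': 9.0})
# Produces results/Example Movie.json containing the dict as pretty-printed UTF-8 JSON.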
To keep the logic clear, we define a function scrape_detail for fetching detail pages:
def scrape_detail(url):
    return scrape_page(url)
and call it in main to scrape and save each movie's details:
def main():
    for page in range(1, TOTAL_PAGE + 1):
        index_html = scrape_index(page)
        detail_urls = parse_index(index_html)
        for detail_url in detail_urls:
            logging.info(f'get detail_url {detail_url}')
            detail_html = scrape_detail(detail_url)
            data = parse_detail(detail_html)
            logging.info(f'get detail data {data}')
            save_data(data)
Finally, the complete code:
import re
import requests
import logging
from urllib.parse import urljoin

logging.basicConfig(level=logging.INFO,
                    format='%(asctime)s - %(levelname)s : %(message)s')

BASE_URL = 'https://ssr1.scrape.center'
TOTAL_PAGE = 10


def scrape_page(url):
    logging.info(f'scraping {url}...')
    try:
        response = requests.get(url)
        if response.status_code == 200:
            return response.text
        logging.error(f'invalid status code {response.status_code} while scraping {url}')
    except requests.RequestException:
        logging.error(f'error occurred while scraping {url}', exc_info=True)
    except Exception as e:
        print('unknown error', e)


def scrape_index(page):
    index_url = f'{BASE_URL}/page/{page}'
    return scrape_page(index_url)


def scrape_detail(url):
    return scrape_page(url)


def parse_index(html):
    pattern = re.compile('<a.*?href="(.*?)".*?class="name">')
    items = re.findall(pattern, html)
    if not items:
        return []
    for item in items:
        detail_url = urljoin(BASE_URL, item)
        logging.info(f'get detail url {detail_url}')
        yield detail_url


def parse_detail(html):
    cover_pattern = re.compile('class="item.*?<img.*?src="(.*?)".*?class="cover">', re.S)
    name_pattern = re.compile('<h2.*?>(.*?)</h2>')
    categories_pattern = re.compile('<button.*?category.*?<span>(.*?)</span>.*?</button>', re.S)
    published_at_pattern = re.compile('(\d{4}-\d{2}-\d{2}\s?上映)')
    drama_pattern = re.compile('<div.*?drama.*?>.*?<p.*?>(.*?)</p>', re.S)
    score_pattern = re.compile('<p.*?score.*?>(.*?)</p>', re.S)
    cover = re.search(cover_pattern, html).group(1).strip() \
        if re.search(cover_pattern, html) else None
    name = re.search(name_pattern, html).group(1).strip() \
        if re.search(name_pattern, html) else None
    categories = re.findall(categories_pattern, html) if re.findall(categories_pattern, html) else []
    published_at = re.search(published_at_pattern, html).group(1) \
        if re.search(published_at_pattern, html) else None
    drama = re.search(drama_pattern, html).group(1).strip() \
        if re.search(drama_pattern, html) else None
    score = re.search(score_pattern, html).group(1).strip() \
        if re.search(score_pattern, html) else None
    score = float(score) if score else None
    return {
        'cover': cover,
        'name': name,
        'categories': categories,
        'published_at': published_at,
        'drama': drama,
        'score': score
    }


import json
from os import makedirs
from os.path import exists

RESULT_DIR = 'results'
exists(RESULT_DIR) or makedirs(RESULT_DIR)


def save_data(data):
    name = data.get('name')
    data_path = f'{RESULT_DIR}/{name}.json'
    with open(data_path, 'w', encoding='utf-8') as f:
        json.dump(data, f, ensure_ascii=False, indent=2)


import multiprocessing


def main(page):
    index_html = scrape_index(page)
    detail_urls = parse_index(index_html)
    for detail_url in detail_urls:
        logging.info(f'get detail_url {detail_url}')
        detail_html = scrape_detail(detail_url)
        data = parse_detail(detail_html)
        logging.info(f'get detail data {data}')
        save_data(data)


if __name__ == '__main__':
    pool = multiprocessing.Pool()
    pages = range(1, TOTAL_PAGE + 1)
    pool.map(main, pages)
    pool.close()
    pool.join()
Finally, to speed things up, the program is switched to multiprocessing so that the 10 list pages are scraped in parallel.
However, when running it we find that some movies are missing and a "connection refused by the server" error appears. Presumably the author's server has limited capacity and rejects some of the requests; a possible mitigation is sketched below.
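One way to handle the occasional refused connection (not part of the original code) is to retry with a short pause; a minimal sketch, assuming the scrape_page function above and that waiting a couple of seconds between attempts is acceptable:

import time

def scrape_page_with_retry(url, retries=3, delay=2):
    # Hypothetical helper: retry a few times with a pause so an overloaded
    # server gets a chance to recover; returns None if every attempt fails.
    for attempt in range(retries):
        html = scrape_page(url)
        if html is not None:
            return html
        time.sleep(delay)
    logging.error(f'giving up on {url} after {retries} attempts')
    return None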
Douban Top 250
We apply the same approach to the Douban Top 250.
Starting page:
https://movie.douban.com/top250
Pagination:
https://movie.douban.com/top250?start=25&filter=
https://movie.douban.com/top250?start=50&filter=
Testing shows that changing only the start parameter is enough to move between pages, as the sketch below shows.
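A minimal sketch of the list-page URLs, assuming 10 pages of 25 movies each:

BASE_URL = 'https://movie.douban.com/top250'
TOTAL_PAGE = 10
# Page 0 starts at 0, page 1 at 25, page 2 at 50, ...
index_urls = [f'{BASE_URL}?start={25 * page}' for page in range(TOTAL_PAGE)]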
In addition, Douban has an anti-scraping mechanism, so we have to attach a browser User-Agent header to the request:
headers = {
    'User-Agent': 'Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/63.0.3239.132 Safari/537.36 QIHU 360SE'
}
response = requests.get(url, headers=headers)
The rest of the workflow is much the same; the main difference lies in writing the regular expressions that match the information we need on Douban's pages.
Complete code:
import re
import requests
import logging

logging.basicConfig(level=logging.INFO,
                    format='%(asctime)s - %(levelname)s : %(message)s')

BASE_URL = 'https://movie.douban.com/top250'
TOTAL_PAGE = 10


def scrape_page(url):
    logging.info(f'scraping {url}...')
    try:
        # Browser-style User-Agent so Douban does not reject the request.
        header = {'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/102.0.5005.63 Safari/537.36 Edg/102.0.1245.39'}
        response = requests.get(url, headers=header)
        if response.status_code == 200:
            return response.text
        logging.error(f'invalid status code {response.status_code} while scraping {url}')
    except requests.RequestException:
        logging.error(f'error occurred while scraping {url}', exc_info=True)
    except Exception as e:
        print('unknown error', e)


def scrape_index(page):
    index_url = f'{BASE_URL}?start={25 * page}'
    return scrape_page(index_url)


def scrape_detail(url):
    return scrape_page(url)


def parse_index(html):
    pattern = re.compile('<div class="item">.*?<a href="(.*?)"', re.S)
    items = re.findall(pattern, html)
    if not items:
        print('no matching links found')
        return []
    for item in items:
        # Douban's hrefs are already absolute, so no join is needed.
        detail_url = item
        logging.info(f'get detail url {detail_url}')
        yield detail_url


def parse_detail(html):
    name_pattern = re.compile('<span property="v:itemreviewed">(.*?)</span>', re.S)
    categories_pattern = re.compile('<span property="v:genre">(.*?)</span>', re.S)
    score_pattern = re.compile('<strong class="ll rating_num" property="v:average">(\d\.\d)</strong>', re.S)
    name = re.search(name_pattern, html).group(1).strip() \
        if re.search(name_pattern, html) else None
    categories = re.findall(categories_pattern, html) if re.findall(categories_pattern, html) else []
    score = re.search(score_pattern, html).group(1).strip() \
        if re.search(score_pattern, html) else None
    score = float(score) if score else 0
    return {
        'name': name,
        'categories': categories,
        'score': score
    }


import json
from os import makedirs
from os.path import exists

RESULT_DIR = 'results250'
exists(RESULT_DIR) or makedirs(RESULT_DIR)


def save_data(data):
    name = data.get('name')
    data_path = f'{RESULT_DIR}/{name}.json'
    with open(data_path, 'w', encoding='utf-8') as f:
        json.dump(data, f, ensure_ascii=False, indent=2)


import multiprocessing


def main(page):
    index_html = scrape_index(page)
    detail_urls = parse_index(index_html)
    for detail_url in detail_urls:
        logging.info(f'get detail_url {detail_url}')
        detail_html = scrape_detail(detail_url)
        data = parse_detail(detail_html)
        logging.info(f'get detail data {data}')
        save_data(data)


if __name__ == '__main__':
    pool = multiprocessing.Pool()
    pages = range(0, TOTAL_PAGE)
    pool.map(main, pages)
    pool.close()  # close the pool; no new tasks are accepted
    pool.join()   # wait for all worker processes to finish
Summary
A recap of the techniques used in this article:
Use requests.get to fetch the content of a URL:
def scrape_page(url):
    logging.info(f'scraping {url}...')
    try:
        header = {'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/102.0.5005.63 Safari/537.36 Edg/102.0.1245.39'}
        response = requests.get(url, headers=header)
        if response.status_code == 200:
            return response.text
        logging.error(f'invalid status code {response.status_code} while scraping {url}')
    except requests.RequestException:
        logging.error(f'error occurred while scraping {url}', exc_info=True)
    except Exception as e:
        print('unknown error', e)
Use re: re.compile('regular expression') builds a match pattern, and re.search(pattern, html) or re.findall(pattern, html) matches content against that pattern (a small sketch follows below).
Python 正则表达式 | 菜鸟教程 (runoob.com)
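A minimal sketch of search versus findall on a made-up HTML fragment (the fragment is purely illustrative):

import re

# Made-up HTML fragment, for illustration only.
html = '<h2>Movie A</h2><span class="genre">Drama</span><span class="genre">War</span>'

title = re.search('<h2>(.*?)</h2>', html).group(1)              # first match -> 'Movie A'
genres = re.findall('<span class="genre">(.*?)</span>', html)   # all matches -> ['Drama', 'War']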
Use json to save dict data:
def save_data(data):
    name = data.get('name')
    data_path = f'{RESULT_DIR}/{name}.json'
    with open(data_path, 'w', encoding='utf-8') as f:
        json.dump(data, f, ensure_ascii=False, indent=2)
Use the multiprocessing library to run the crawler in parallel and speed it up:
pool = multiprocessing.Pool()   # create a pool of worker processes
pages = range(0, TOTAL_PAGE)
pool.map(main, pages)           # run main on every page, distributed across the pool
pool.close()                    # close the pool; no new tasks are accepted
pool.join()                     # wait for all worker processes to finish