Web Scraping in Practice: Douban Top 250 Movies

2022-12-06 09:29:07

Summary

This article uses the requests and re libraries to scrape the Douban Top 250 movies.

We start with the example from the book to learn how to locate the information we need on a page and how to match it with regular expressions from the re module. Then, to speed up the crawler, we use the multiprocessing module. Finally, we move on to the real site, where we run into a new problem: its anti-scraping measures.

The Example from the Book

Scrape Center is a practice platform built by Cui Qingcai, the author of 《Python3 网络爬虫开发实战》 (2nd edition). It is worth a look if you are interested in web scraping.

We go to the first case, Scrape | Movie, and scrape its top-movie list.

Site Analysis

Before writing the scraping code, we need to analyse how the site lays out the movie information:

Here we first scrape the site built by the author (once we have learned the approach, we move on to the real Douban site):

Open https://ssr1.scrape.center and press F12 to open the developer tools:

(Note: in the screenshot, the red arrow marks the element picker; click the picker, then click an element on the left-hand page, and the corresponding markup is shown on the right.)

Click the element picker and select a movie; we find that every movie is a div node whose class attribute contains el-card.

Next, the detail page. Clicking a title opens the detail page, and the title node is wrapped in an a node whose href attribute points to the detail path (e.g. /detail/1). Joining it with the prefix https://ssr1.scrape.center gives the full detail-page URL.
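
urljoin from urllib.parse does exactly this joining for us; a quick check (the /detail/1 path is just the example seen above):

from urllib.parse import urljoin

# Resolve the relative href from the list page against the site root.
print(urljoin('https://ssr1.scrape.center', '/detail/1'))
# -> https://ssr1.scrape.center/detail/1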

We now know how to scrape one page of movies. Next we need to turn the page and scrape the rest:

Each list page follows the pattern https://ssr1.scrape.center/page/<page number>.

Only the trailing page number changes from page to page.
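
So, for example, all the list-page URLs can be generated up front (TOTAL_PAGE = 10, matching the demo site):

BASE_URL = 'https://ssr1.scrape.center'
TOTAL_PAGE = 10

# Only the trailing page number changes between list pages.
page_urls = [f'{BASE_URL}/page/{page}' for page in range(1, TOTAL_PAGE + 1)]
# ['https://ssr1.scrape.center/page/1', ..., 'https://ssr1.scrape.center/page/10']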

Code implementation:

From the analysis above, the plan is as follows:

1. Open a movie list page and collect each movie's detail URL.

2. Repeat the same operation for all 10 pages of the movie list.

import re

import requests
import logging
from urllib.parse import urljoin
# logging is used to report progress
logging.basicConfig(level=logging.INFO,
                    format='%(asctime)s - %(levelname)s : %(message)s')

BASE_URL = 'https://ssr1.scrape.center'
TOTAL_PAGE = 10

def scrape_page(url):
    logging.info(f'scraping {url}...')
    try:
        response = requests.get(url)
        if response.status_code == 200:
            return response.text
        logging.error(f'got invalid status code {response.status_code} while scraping {url}')
    except requests.RequestException:
        logging.error(f"error_orrurred while scraping{url}",exc_info=True)

def scrape_index(page):
    index_url = f'{BASE_URL}/page/{page}'
    return scrape_page(index_url)

def parse_index(html):
    pattern = re.compile('<a.*?href="(.*?)".*?class="name">')
    items = re.findall(pattern,html)
    if not items:
        return []
    for item in items:
        detail_url = urljoin(BASE_URL,item)
        logging.info(f"get detail url {detail_url}")
        yield  detail_url


def main():
    for page in range(1, TOTAL_PAGE + 1):
        index_html = scrape_index(page)
        detail_urls = parse_index(index_html)
        logging.info(f"detail urls {list(detail_urls)}" )

if __name__ == '__main__':
    main()

We can now obtain the detail-page URL of every movie. Next, we extract the fields we need from each detail page.

def parse_detail(html):
    cover_pattern = re.compile('class="item.*?<img.*?src="(.*?)".*?class="cover">',re.S)
    name_pattern = re.compile('<h2.*?>(.*?)</h2>')
    categories_pattern = re.compile('<button.*?category.*?<span>(.*?)</span>.*?</button>',re.S)
    published_at_pattern = re.compile(r'(\d{4}-\d{2}-\d{2}\s?上映)')
    drama_pattern = re.compile('<div.*?drama.*?>.*?<p.*?>(.*?)</p>',re.S)
    score_pattern = re.compile('<p.*?score.*?>(.*?)</p>',re.S)
    cover = re.search(cover_pattern, html).group(1).strip() \
        if re.search(cover_pattern, html) else None
    name = re.search(name_pattern, html).group(1).strip() \
        if re.search(name_pattern, html) else None
    categories = re.findall(categories_pattern, html) if re.findall(categories_pattern, html) else []
    published_at = re.search(published_at_pattern, html).group(1) \
        if re.search(published_at_pattern, html) else None
    drama = re.search(drama_pattern, html).group(1).strip() \
        if re.search(drama_pattern, html) else None
    score = re.search(score_pattern, html).group(1).strip() \
        if re.search(score_pattern, html) else None
    score = float(score) if score else None

    return {
        'cover': cover,
        'name': name,
        'categories': categories,
        'published_at': published_at,
        'drama': drama,
        'score': score
    }

Save the movie details to a JSON file:

import json
from os import makedirs
from os.path import exists
RESULT_DIR = 'results'
exists(RESULT_DIR) or makedirs(RESULT_DIR)

def save_data(data):
    name = data.get('name')
    data_path = f'{RESULT_DIR}/{name}.json'
    with open(data_path,'w',encoding='utf-8') as f:
        json.dump(data,f, ensure_ascii=False, indent=2)

To keep the logic clear, we define a function scrape_detail for fetching detail pages.

def scrape_detail(url):
    return scrape_page(url)

Then we call it from the main function to scrape the details:

def main():
    for page in range(1, TOTAL_PAGE + 1):
        index_html = scrape_index(page)
        detail_urls = parse_index(index_html)

        for detail_url in detail_urls:
            logging.info(f"get detail_url {detail_url}")
            detail_html = scrape_detail(detail_url)
            data = parse_detail(detail_html)
            logging.info(f"get detail data {data}")
            save_data(data)

Finally, the complete code:

import re
import time

import requests
import logging
from urllib.parse import urljoin

logging.basicConfig(level=logging.INFO,
                    format='%(asctime)s - %(levelname)s : %(message)s')

BASE_URL = 'https://ssr1.scrape.center'
TOTAL_PAGE = 10

def scrape_page(url):
    logging.info(f'scraping {url}...')
    try:
        response = requests.get(url)
        if response.status_code == 200:
            return response.text
        logging.error(f'got invalid status code {response.status_code} while scraping {url}')
    except requests.RequestException:
        logging.error(f'error occurred while scraping {url}', exc_info=True)
    except Exception as e:
        print('unexpected error:', e)

def scrape_index(page):
    index_url = f'{BASE_URL}/page/{page}'
    return scrape_page(index_url)

def scrape_detail(url):
    return scrape_page(url)

def parse_index(html):
    pattern = re.compile('<a.*?href="(.*?)".*?class="name">')
    items = re.findall(pattern,html)
    if not items:
        return []
    for item in items:
        detail_url = urljoin(BASE_URL,item)
        logging.info(f"get detail url {detail_url}")
        yield  detail_url




def parse_detail(html):
    cover_pattern = re.compile('class="item.*?<img.*?src="(.*?)".*?class="cover">',re.S)
    name_pattern = re.compile('<h2.*?>(.*?)</h2>')
    categories_pattern = re.compile('<button.*?category.*?<span>(.*?)</span>.*?</button>',re.S)
    published_at_pattern = re.compile(r'(\d{4}-\d{2}-\d{2}\s?上映)')
    drama_pattern = re.compile('<div.*?drama.*?>.*?<p.*?>(.*?)</p>',re.S)
    score_pattern = re.compile('<p.*?score.*?>(.*?)</p>',re.S)
    cover = re.search(cover_pattern, html).group(1).strip() \
        if re.search(cover_pattern, html) else None
    name = re.search(name_pattern, html).group(1).strip() \
        if re.search(name_pattern, html) else None
    categories = re.findall(categories_pattern, html) if re.findall(categories_pattern, html) else []
    published_at = re.search(published_at_pattern, html).group(1) \
        if re.search(published_at_pattern, html) else None
    drama = re.search(drama_pattern, html).group(1).strip() \
        if re.search(drama_pattern, html) else None
    score = re.search(score_pattern, html).group(1).strip() \
        if re.search(score_pattern, html) else None
    score = float(score) if score else None

    return {
        'cover': cover,
        'name': name,
        'categories': categories,
        'published_at': published_at,
        'drama': drama,
        'score': score
    }

import json
from os import makedirs
from os.path import exists
RESULT_DIR = 'results'
exists(RESULT_DIR) or makedirs(RESULT_DIR)

def save_data(data):
    name = data.get('name')
    data_path = f'{RESULT_DIR}/{name}.json'
    with open(data_path,'w',encoding='utf-8') as f:
        json.dump(data,f, ensure_ascii=False, indent=2)

import multiprocessing

def main(page):
    index_html = scrape_index(page)
    detail_urls = parse_index(index_html)

    for detail_url in detail_urls:
        logging.info(f"get detail_url {detail_url}")
        detail_html = scrape_detail(detail_url)
        data = parse_detail(detail_html)
        logging.info(f"get detail data {data}")
        save_data(data)

if __name__ == '__main__':
    pool = multiprocessing.Pool()
    pages = range(1, TOTAL_PAGE + 1)
    pool.map(main, pages)
    pool.close()
    pool.join()

Finally, to speed things up, we switched the program to multiprocessing so the 10 list pages are crawled in parallel.

However, when running it we found that some movies were not scraped and connection-refused errors appeared; our guess is that the author's server has limited capacity and rejected some of the concurrent requests.
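
One way to soften this (not part of the original code) is to retry a failed request a few times with a short pause in between. A minimal sketch, where the helper name scrape_page_with_retry and the retry count and delay are assumptions to be tuned:

import time
import logging
import requests

def scrape_page_with_retry(url, retries=3, delay=2):
    # Try the request a few times; sleep between attempts so the server
    # is not hit again immediately after refusing a connection.
    for attempt in range(1, retries + 1):
        try:
            response = requests.get(url)
            if response.status_code == 200:
                return response.text
            logging.warning(f'status {response.status_code} on attempt {attempt} for {url}')
        except requests.RequestException:
            logging.warning(f'request failed on attempt {attempt} for {url}', exc_info=True)
        time.sleep(delay)
    return None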

Douban Top 250

We use the same approach to scrape the Douban Top 250.

Start page:

https://movie.douban.com/top250

Subsequent pages:

https://movie.douban.com/top250?start=25&filter=

https://movie.douban.com/top250?start=50&filter=

Testing shows that changing only the start parameter is enough to move between pages.
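
The list-page URLs can therefore be built from the start offset, mirroring what scrape_index does in the full code below:

BASE_URL = 'https://movie.douban.com/top250'
TOTAL_PAGE = 10  # 250 movies, 25 per page

# Page 0 starts at item 0, page 1 at item 25, and so on.
page_urls = [f'{BASE_URL}?start={25 * page}' for page in range(TOTAL_PAGE)]
# ['https://movie.douban.com/top250?start=0', ..., 'https://movie.douban.com/top250?start=225']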

In addition, Douban has anti-scraping measures, so the request needs a browser User-Agent header:

headers = {
    'User-Agent':'Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/63.0.3239.132 Safari/537.36 QIHU 360SE'
}
response = requests.get(url, headers=headers)

The rest of the flow is much the same; the main difference lies in writing the regular expressions that match the information we need.
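
For example, the score pattern used below can be tried on a small hand-written fragment (shaped like Douban's rating markup, not copied from a live page) before running the full crawl:

import re

# A hand-written sample in the same shape as Douban's rating markup.
sample = '<strong class="ll rating_num" property="v:average">9.7</strong>'

score_pattern = re.compile(r'<strong class="ll rating_num" property="v:average">(\d\.\d)</strong>', re.S)
match = re.search(score_pattern, sample)
print(match.group(1) if match else None)  # -> 9.7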

Complete code:

import re
import time

import requests
import logging
from urllib.parse import urljoin

logging.basicConfig(level=logging.INFO,
                    format='%(asctime)s - %(levelname)s : %(message)s')

BASE_URL = 'https://movie.douban.com/top250'
TOTAL_PAGE = 10


def scrape_page(url):
    logging.info(f'scraping {url}...')
    try:
        header = {'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) '
                                'Chrome/102.0.5005.63 Safari/537.36 Edg/102.0.1245.39'}
        response = requests.get(url, headers=header)
        if response.status_code == 200:
            return response.text
        logging.error(f'got invalid status code {response.status_code} while scraping {url}')
    except requests.RequestException:
        logging.error(f'error occurred while scraping {url}', exc_info=True)
    except Exception as e:
        print('unexpected error:', e)


def scrape_index(page):
    index_url = f'{BASE_URL}?start={25 * page}'
    return scrape_page(index_url)


def scrape_detail(url):
    return scrape_page(url)


def parse_index(html):
    pattern = re.compile('<div class="item">.*?<a href="(.*?)"', re.S)
    items = re.findall(pattern, html)
    if not items:
        print("没找到匹配的连接")
        return []
    for item in items:
        detail_url = item
        logging.info(f"get detail url {detail_url}")
        yield detail_url


def parse_detail(html):

    name_pattern = re.compile('<span property="v:itemreviewed">(.*?)</span>', re.S)
    categories_pattern = re.compile('<span property="v:genre">(.*?)</span>', re.S)

    score_pattern = re.compile(r'<strong class="ll rating_num" property="v:average">(\d\.\d)</strong>', re.S)


    name = re.search(name_pattern, html).group(1).strip() \
        if re.search(name_pattern, html) else None
    categories = re.findall(categories_pattern, html) if re.findall(categories_pattern, html) else []

    score = re.search(score_pattern, html).group(1).strip() \
        if re.search(score_pattern, html) else None
    score = float(score) if score else 0

    return {   
        'name': name,
        'categories': categories,
        'score': score
    }


import json
from os import makedirs
from os.path import exists

RESULT_DIR = 'results250'
exists(RESULT_DIR) or makedirs(RESULT_DIR)


def save_data(data):
    name = data.get('name')
    data_path = f'{RESULT_DIR}/{name}.json'
    with open(data_path, 'w', encoding='utf-8') as f:
        json.dump(data, f, ensure_ascii=False, indent=2)


import multiprocessing


def main(page):
    index_html = scrape_index(page)
    detail_urls = parse_index(index_html)

    for detail_url in detail_urls:
        logging.info(f"get detail_url {detail_url}")
        detail_html = scrape_detail(detail_url)
        data = parse_detail(detail_html)
        logging.info(f"get detail data {data}")
        save_data(data)


if __name__ == '__main__':
    pool = multiprocessing.Pool()
    pages = range(0, TOTAL_PAGE)

    pool.map(main, pages)
    pool.close() # close the pool so that no new tasks are accepted
    pool.join()

Recap

A recap of the techniques used in this article:

Use requests.get to fetch the content of a URL:

def scrape_page(url):
    logging.info(f'scraping {url}...')
    try:
        header = {'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) '
                                'Chrome/102.0.5005.63 Safari/537.36 Edg/102.0.1245.39'}
        response = requests.get(url, headers=header)
        if response.status_code == 200:
            return response.text
        logging.error(f'got invalid status code {response.status_code} while scraping {url}')
    except requests.RequestException:
        logging.error(f'error occurred while scraping {url}', exc_info=True)
    except Exception as e:
        print('unexpected error:', e)

Use re: re.compile('<regex>') builds a matching pattern, and re.search(pattern, html) or re.findall(pattern, html) matches the content that fits the pattern.

Python 正则表达式 | 菜鸟教程 (runoob.com)
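
A minimal illustration of these calls on a toy string:

import re

text = 'name: The Shawshank Redemption, score: 9.7'

pattern = re.compile(r'score: (\d\.\d)')   # build the pattern once
print(re.search(pattern, text).group(1))   # first match of the group -> '9.7'
print(re.findall(r'(\w+):', text))         # all matches -> ['name', 'score']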

Use json to save the dict data:

def save_data(data):
    name = data.get('name')
    data_path = f'{RESULT_DIR}/{name}.json'
    with open(data_path, 'w', encoding='utf-8') as f:
        json.dump(data, f, ensure_ascii=False, indent=2)

Use the multiprocessing library to run the crawler in parallel across processes and speed it up:

    pool = multiprocessing.Pool() # create a process pool
    pages = range(0, TOTAL_PAGE) 

    pool.map(main, pages) # map the worker function over the page numbers and run them in parallel
    pool.close() # close the pool so that no new tasks are accepted
    pool.join() # wait for all worker processes to finish
