Scrapy_Study02

2023-11-30 19:02:39 浏览数 (1)

案例 jd图书爬虫

jd图书网站爬取比较容易,主要是数据的提取

spider 代码:

代码语言:javascript复制
import scrapy
from jdbook.pipelines import JdbookPipeline
import re
from copy import deepcopy


class JdbookspiderSpider(scrapy.Spider):
    name = 'jdbookspider'
    allowed_domains = ['jd.com']
    start_urls = ['https://book.jd.com/booksort.html']
	# 处理分类页面的数据
    def parse(self, response):
        # 这里借助了selenium 先访问jd图书网,因为直接get请求jdbook 获取到只是一堆js代码,没有有用的html元素,通过selenium正常访问网页,将page_source(就是当前网页的页面内容,selenium提供的属性)返回给spider进行数据处理
        # 处理大分类的列表页
        response_data, driver = JdbookPipeline.gain_response_data(url='https://book.jd.com/booksort.html')
        driver.close()
        item = {}
        # 由于selenium返回的page_source是字符串,所以不能直接使用xpath,使用了正则(也可以借助bs4 再使用正则)
        middle_group_link = re.findall('<em>.*?<a href="(.*?)">.*?</a>.*?</em>', response_data, re.S)
        big_group_name = re.findall('<dt>.*?<a href=".*?">(.*?)</a>.*?<b>.*?</b>.*?</dt>', response_data, re.S)
        big_group_link = re.findall('<dt>.*?<a href=".*?channel.jd.com/(.*?).html">.*?</a>.*?<b>.*?</b>.*?</dt>', response_data, re.S)
        middle_group_name = re.findall('<em>.*?<a href=".*?">(.*?)</a>.*?</em>', response_data, re.S)
        for i in range(len(middle_group_link)):
            var = str(middle_group_link[i])
            var1 = var[:var.find("com")   4]
            var2 = var[var.find("com")   4:]
            var3 = var2.replace("-", ",").replace(".html", "")
            var_end = "https:"   var1   "list.html?cat="   var3
            for j in range(len(big_group_name)):
                temp_ = var_end.find(str(big_group_link[j]).replace("-", ","))
                if temp_ != -1:
                    item["big_group_name"] = big_group_name[j]
                    item["big_group_link"] = big_group_link[j]
            item["middle_group_link"] = var_end
            item["middle_group_name"] = middle_group_name[i]
            # 请求大分组下的小分组的详情页
            if var_end is not None:
                yield scrapy.Request(
                    var_end,
                    callback=self.parse_detail,
                    meta={"item": deepcopy(item)}
                )
	# 处理图书列表页的数据
    def parse_detail(self, response):
        print(response.url)
        item = response.meta["item"]
        detail_name_list = re.findall('<div class="gl-i-wrap">.*?<div class="p-name">.*?<a target="_blank" title=".*?".*?<em>(.*?)</em>', response.body.decode(), re.S)
        detail_content_list = re.findall(
            '<div class="gl-i-wrap">.*?<div class="p-name">.*?<a target="_blank" title="(.*?)"', response.body.decode(),
            re.S)
        detail_link_list = re.findall('<div class="gl-i-wrap">.*?<div class="p-name">.*?<a target="_blank" title=".*?" href="(.*?)"', response.body.decode(), re.S)
        detail_price_list = re.findall('<div class="p-price">.*?<strong class="J_.*?".*?data-done="1".*?>.*?<em>¥</em>.*?<i>(.*?)</i>', response.body.decode(), re.S)
        page_number_end = re.findall('<span class="fp-text">.*?<b>.*?</b>.*?<em>.*?</em>.*?<i>(.*?)</i>.*?</span>', response.body.decode(), re.S)[0]
        print(len(detail_price_list))
        print(len(detail_name_list))
        for i in range(len(detail_name_list)):
            detail_link = detail_link_list[i]
            item["detail_name"] = detail_name_list[i]
            item["detail_content"] = detail_content_list[i]
            item["detail_link"] = "https:"   detail_link
            item["detail_price"] = detail_price_list[i]
            yield item
        # 翻页
        for i in range(int(page_number_end)):

            next_url = item["middle_group_link"]   "&page="   str(2*(i 1)   1)   "&s="   str(60*(i 1))   "&click=0"
            yield scrapy.Request(
                next_url,
                callback=self.parse_detail,
                meta={"item": deepcopy(item)}
            )

pipeline 代码:

代码语言:javascript复制
# Define your item pipelines here
#
# Don't forget to add your pipeline to the ITEM_PIPELINES setting
# See: https://docs.scrapy.org/en/latest/topics/item-pipeline.html


# useful for handling different item types with a single interface
import csv

from itemadapter import ItemAdapter
from selenium import webdriver
import time


class JdbookPipeline:
 	# 将数据写入csv文件
    def process_item(self, item, spider):
        with open('./jdbook.csv', 'a ', encoding='utf-8') as file:
            fieldnames = ['big_group_name', 'big_group_link', 'middle_group_name', 'middle_group_link', 'detail_name',
                          'detail_content', 'detail_link', 'detail_price']
            writer = csv.DictWriter(file, fieldnames=fieldnames)
            writer.writerow(item)
        return item

    def open_spider(self, spider):
        with open('./jdbook.csv', 'w ', encoding='utf-8') as file:
            fieldnames = ['big_group_name', 'big_group_link', 'middle_group_name', 'middle_group_link', 'detail_name',
                          'detail_content', 'detail_link', 'detail_price']
            writer = csv.DictWriter(file, fieldnames=fieldnames)
            writer.writeheader()
	# 提供的正常访问jdbook 方法,借助selenium
    @staticmethod
    def gain_response_data(url):
        drivers = webdriver.Chrome("E:python_studyspiderdatachromedriver_win32chromedriver.exe")
        drivers.implicitly_wait(2)
        drivers.get(url)
        drivers.implicitly_wait(2)
        time.sleep(2)
        # print(tb_cookie)
        return drivers.page_source, drivers
案例 当当图书爬虫

当当网的爬取也是比较容易, 但是这里需要结合scrapy-redis来实现分布式爬取数据

代码语言:javascript复制
import urllib
from copy import deepcopy
import scrapy
from scrapy_redis.spiders import RedisSpider
import re


# 不再是继承Spider类,而是继承自scrapy_redis的RedisSpider类
class DangdangspiderSpider(RedisSpider):
    name = 'dangdangspider'
    allowed_domains = ['dangdang.com']
    # http://book.dangdang.com/
    # 同时,start_urls 也不在使用, 而是定义一个redis_key, spider要爬取的request对象就以该值为key, url为值存储在redis中,spider爬取时就从redis 中获取
    redis_key = "dangdang"
	# 处理图书分类数据
    def parse(self, response):
        div_list = response.xpath("//div[@class='con flq_body']/div")
        for div in div_list:
            item = {}
            item["b_cate"] = div.xpath("./dl/dt//text()").extract()
            item["b_cate"] = [i.strip() for i in item["b_cate"] if len(i.strip()) > 0]
            # 中间分类分组
            if len(item["b_cate"]) > 0:
                div_data = str(div.extract())
                dl_list = re.findall('''<dl class="inner_dl" ddt-area="d " dd_name=".*?">.*?<dt>(.*?)</dt>''',
                                     div_data, re.S)
                for dl in dl_list:
                    if len(str(dl)) > 100:
                        dl = re.findall('''.*?title="(.*?)".*?''', dl, re.S)
                    item["m_cate"] = str(dl).replace(" ", "").replace("rn", "")
                    # 小分类分组
                    a_link_list = re.findall(
                        '''<a class=".*?" href="(.*?)" target="_blank" title=".*?" nname=".*?" ddt-src=".*?">.*?</a>''',
                        div_data, re.S)
                    a_cate_list = re.findall(
                        '''<a class=".*?" href=".*?" target="_blank" title=".*?" nname=".*?" ddt-src=".*?">(.*?)</a>''',
                        div_data, re.S)
                    print(a_cate_list)
                    print(a_link_list)
                    for a in range(len(a_link_list)):
                        item["s_href"] = a_link_list[a]
                        item["s_cate"] = a_cate_list[a]

                        if item["s_href"] is not None:
                            yield scrapy.Request(
                                item["s_href"],
                                callback=self.parse_book_list,
                                meta={"item": deepcopy(item)}
                            )
	# 处理图书列表页数据
    def parse_book_list(self, response):
        item = response.meta["item"]
        li_list = response.xpath("//ul[@class='bigimg']/li")
        # todo 改进,对不同的图书列表页做不同的处理
        # if li_list is None:
        #     print(True)

        for li in li_list:
            item["book_img"] = li.xpath('./a[1]/img/@src').extract_first()
            if item["book_img"] is None:
                item["book_img"] = li.xpath("//ul[@class='list_aa ']/li").extract_first()
            item["book_name"] = li.xpath("./p[@class='name']/a/@title").extract_first()
            item["book_desc"] = li.xpath("./p[@class='detail']/text()").extract_first()
            item["book_price"] = li.xpath(".//span[@class='search_now_price']/text()").extract_first()
            item["book_author"] = li.xpath("./p[@class='search_book_author']/span[1]/a/text()").extract_first()
            item["book_publish_date"] = li.xpath("./p[@class='search_book_author']/span[2]/text()").extract_first()
            item["book_press"] = li.xpath("./p[@class='search_book_author']/span[3]/a/text()").extract_first()
            next_url = response.xpath("//li[@class='next']/a/@href").extract_first()
            yield item
            if next_url is not None:
                next_url = urllib.parse.urljoin(response.url, next_url)
                yield scrapy.Request(
                    next_url,
                    callback=self.parse_book_list,
                    meta={"item": item}
                )

pipeline 代码:

代码语言:javascript复制
import csv

from itemadapter import ItemAdapter


class DangdangbookPipeline:
	# 将数据写入到csv文件中
    def process_item(self, item, spider):
        with open('./dangdangbook.csv', 'a ', encoding='utf-8', newline='') as file:
            fieldnames = ['b_cate', 'm_cate', 's_cate', 's_href', 'book_img', 'book_name', 'book_desc', 'book_price', 'book_author', 'book_publish_date', 'book_press']
            writer = csv.DictWriter(file, fieldnames=fieldnames)
            writer.writerow(item)
        return item

    def open_spider(self, spider):
        with open('./dangdangbook.csv', 'w ', encoding='utf-8', newline='') as file:
            fieldnames = ['b_cate', 'm_cate', 's_cate', 's_href', 'book_img', 'book_name', 'book_desc', 'book_price',
                          'book_author', 'book_publish_date', 'book_press']
            writer = csv.DictWriter(file, fieldnames=fieldnames)
            writer.writeheader()

settings 代码:

代码语言:javascript复制
BOT_NAME = 'dangdangbook'

SPIDER_MODULES = ['dangdangbook.spiders']
NEWSPIDER_MODULE = 'dangdangbook.spiders'
# 需要scrapy-redis 的去重功能,这里引用
DUPEFILTER_CLASS = "scrapy_redis.dupefilter.RFPDupeFilter"
# 以及调度器
SCHEDULER = "scrapy_redis.scheduler.Scheduler"
SCHEDULER_PERSIST = True
# LOG_LEVEL = 'WARNING'
# 设置redis 的服务地址
REDIS_URL = 'redis://127.0.0.1:6379'
# Crawl responsibly by identifying yourself (and your website) on the user-agent
USER_AGENT = 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/89.0.4389.82 Safari/537.36'

# Obey robots.txt rules
ROBOTSTXT_OBEY = False

ITEM_PIPELINES = {
   'dangdangbook.pipelines.DangdangbookPipeline': 300,
}

crontab 定时执行

以上都在Linux平台的直接操作crontab。

在python环境下我们可以借助pycrontab 来操作crontab 来设置定时任务。

补充

自定义的excel 到导出文件格式代码:

代码语言:javascript复制
from scrapy.exporters import BaseItemExporter
import xlwt
class ExcelItemExporter(BaseItemExporter):
	def __init__(self, file, **kwargs):
		self._configure(kwargs)
		self.file = file
		self.wbook = xlwt.Workbook()
		self.wsheet = self.wbook.add_sheet('scrapy')
		self.row = 0
	def finish_exporting(self):
		self.wbook.save(self.file)
	def export_item(self, item):
		fields = self._get_serialized_fields(item)
		for col, v in enumerate(x for _, x in fields):
			self.wsheet.write(self.row, col, v)
		self.row  = 1

0 人点赞