案例 jd图书爬虫
jd图书网站爬取比较容易,主要是数据的提取
spider 代码:
代码语言:python
import scrapy
from jdbook.pipelines import JdbookPipeline
import re
from copy import deepcopy
class JdbookspiderSpider(scrapy.Spider):
    """Crawl JD book categories and the book list pages below them.

    ``parse`` reads the category index (rendered through Selenium) and
    schedules one request per middle-level category; ``parse_detail``
    extracts the books of a list page and schedules the following pages.
    Items are plain dicts whose keys match the CSV columns written by
    ``JdbookPipeline``.
    """

    name = 'jdbookspider'
    allowed_domains = ['jd.com']
    start_urls = ['https://book.jd.com/booksort.html']

    @staticmethod
    def _item_at(values, index):
        """Return ``values[index]``, or ``None`` when the list is too short."""
        return values[index] if index < len(values) else None

    # Handle the category index page.
    def parse(self, response):
        """Yield one list-page request per middle-level category.

        ``response`` itself is not parsed: a plain GET of the category page
        only returns JavaScript without usable HTML, so the page is loaded
        through Selenium and its ``page_source`` is used instead.
        """
        response_data, driver = JdbookPipeline.gain_response_data(
            url='https://book.jd.com/booksort.html')
        # The page source is already in hand; quit() ends the whole WebDriver
        # session, whereas close() only closes the current window.
        driver.quit()

        # page_source is a plain string, so XPath selectors are not available
        # here and regular expressions are used instead.
        middle_group_link = re.findall(
            '<em>.*?<a href="(.*?)">.*?</a>.*?</em>', response_data, re.S)
        big_group_name = re.findall(
            '<dt>.*?<a href=".*?">(.*?)</a>.*?<b>.*?</b>.*?</dt>',
            response_data, re.S)
        big_group_link = re.findall(
            '<dt>.*?<a href=".*?channel.jd.com/(.*?).html">.*?</a>.*?<b>.*?</b>.*?</dt>',
            response_data, re.S)
        middle_group_name = re.findall(
            '<em>.*?<a href=".*?">(.*?)</a>.*?</em>', response_data, re.S)

        # zip() keeps the paired lists in step and cannot run past the
        # shorter one, unlike indexing one list with the other's length.
        for link, middle_name in zip(middle_group_link, middle_group_name):
            link = str(link)
            if "com" not in link:
                # Without a host part no list URL can be built from the link.
                continue
            # Split just behind "com/": host part in front, category ids after.
            split_at = link.find("com") + 4
            host_part = link[:split_at]
            cat_ids = link[split_at:].replace("-", ",").replace(".html", "")
            list_url = "https:" + host_part + "list.html?cat=" + cat_ids

            item = {
                "big_group_name": None,
                "big_group_link": None,
                "middle_group_link": list_url,
                "middle_group_name": middle_name,
            }
            # A top-level category owns this middle category when its id
            # sequence occurs in the list URL; the last match wins.
            for big_name, big_link in zip(big_group_name, big_group_link):
                if str(big_link).replace("-", ",") in list_url:
                    item["big_group_name"] = big_name
                    item["big_group_link"] = big_link

            # Request the book list page of this middle category.
            yield scrapy.Request(
                list_url,
                callback=self.parse_detail,
                meta={"item": deepcopy(item)}
            )

    # Handle a book list page.
    def parse_detail(self, response):
        """Yield one item per book on the page, then the follow-up pages.

        The category fields are taken from ``response.meta["item"]``; every
        yielded book is a fresh dict so later books do not overwrite it.
        """
        self.logger.debug(response.url)
        category_item = response.meta["item"]
        # Decode the body once instead of once per pattern.
        html = response.text

        detail_name_list = re.findall(
            '<div class="gl-i-wrap">.*?<div class="p-name">.*?<a target="_blank" title=".*?".*?<em>(.*?)</em>',
            html, re.S)
        detail_content_list = re.findall(
            '<div class="gl-i-wrap">.*?<div class="p-name">.*?<a target="_blank" title="(.*?)"',
            html, re.S)
        detail_link_list = re.findall(
            '<div class="gl-i-wrap">.*?<div class="p-name">.*?<a target="_blank" title=".*?" href="(.*?)"',
            html, re.S)
        detail_price_list = re.findall(
            '<div class="p-price">.*?<strong class="J_.*?".*?data-done="1".*?>.*?<em>¥</em>.*?<i>(.*?)</i>',
            html, re.S)
        page_number_matches = re.findall(
            '<span class="fp-text">.*?<b>.*?</b>.*?<em>.*?</em>.*?<i>(.*?)</i>.*?</span>',
            html, re.S)
        self.logger.debug(
            "prices=%d names=%d", len(detail_price_list), len(detail_name_list))

        for index, detail_name in enumerate(detail_name_list):
            detail_link = self._item_at(detail_link_list, index)
            book = dict(category_item)
            book["detail_name"] = detail_name
            book["detail_content"] = self._item_at(detail_content_list, index)
            # Links are collected without a scheme, so "https:" is prepended.
            book["detail_link"] = (
                "https:" + detail_link if detail_link is not None else None)
            book["detail_price"] = self._item_at(detail_price_list, index)
            yield book

        # Pagination: a page without a (numeric) page counter schedules nothing.
        try:
            page_number_end = int(page_number_matches[0])
        except (IndexError, ValueError):
            page_number_end = 0
        for page in range(page_number_end):
            next_url = (
                category_item["middle_group_link"]
                + "&page=" + str(2 * (page + 1) + 1)
                + "&s=" + str(60 * (page + 1))
                + "&click=0"
            )
            yield scrapy.Request(
                next_url,
                callback=self.parse_detail,
                meta={"item": deepcopy(category_item)}
            )
pipeline 代码:
代码语言:python
# Define your item pipelines here
#
# Don't forget to add your pipeline to the ITEM_PIPELINES setting
# See: https://docs.scrapy.org/en/latest/topics/item-pipeline.html
# useful for handling different item types with a single interface
import csv
from itemadapter import ItemAdapter
from selenium import webdriver
import time
class JdbookPipeline:
    """Item pipeline that stores the scraped JD book items in a CSV file.

    Also offers ``gain_response_data``, a Selenium helper for pages that
    need a real browser to render.
    """

    # Output file, relative to the working directory of the crawl.
    CSV_PATH = './jdbook.csv'
    # Column order of the CSV file; items are expected to use these keys.
    FIELDNAMES = ['big_group_name', 'big_group_link', 'middle_group_name',
                  'middle_group_link', 'detail_name', 'detail_content',
                  'detail_link', 'detail_price']
    # NOTE(review): this path looks like it lost its separators when the
    # code was published - confirm and point it at the real chromedriver.
    CHROMEDRIVER_PATH = "E:python_studyspiderdatachromedriver_win32chromedriver.exe"

    # Append the item to the CSV file.
    def process_item(self, item, spider):
        """Append ``item`` as one CSV row and return it unchanged."""
        # newline='' lets the csv module control the line endings itself.
        with open(self.CSV_PATH, 'a', encoding='utf-8', newline='') as file:
            writer = csv.DictWriter(file, fieldnames=self.FIELDNAMES)
            writer.writerow(item)
        return item

    def open_spider(self, spider):
        """Create (or truncate) the CSV file and write the header row."""
        with open(self.CSV_PATH, 'w', encoding='utf-8', newline='') as file:
            writer = csv.DictWriter(file, fieldnames=self.FIELDNAMES)
            writer.writeheader()

    # Regular browser access to a page, with the help of Selenium.
    @staticmethod
    def gain_response_data(url, driver_path=None):
        """Open ``url`` in Chrome and return ``(page_source, driver)``.

        ``driver_path`` is the chromedriver executable; it defaults to
        ``CHROMEDRIVER_PATH``. The driver is returned still open, so the
        caller is responsible for shutting it down.
        """
        if driver_path is None:
            driver_path = JdbookPipeline.CHROMEDRIVER_PATH
        drivers = webdriver.Chrome(driver_path)
        drivers.implicitly_wait(2)
        drivers.get(url)
        # Fixed pause before the page source is read.
        time.sleep(2)
        return drivers.page_source, drivers
案例 当当图书爬虫
当当网的爬取也是比较容易, 但是这里需要结合scrapy-redis来实现分布式爬取数据
代码语言:python
import urllib
from copy import deepcopy
import scrapy
from scrapy_redis.spiders import RedisSpider
import re
# Inherits from scrapy_redis' RedisSpider instead of scrapy.Spider.
class DangdangspiderSpider(RedisSpider):
    """Crawl Dangdang book categories and their book list pages.

    ``parse`` walks the category tree of the start page and schedules one
    request per small category; ``parse_book_list`` extracts the books of
    a list page and follows the "next" link. Items are plain dicts.
    """

    name = 'dangdangspider'
    allowed_domains = ['dangdang.com']
    # http://book.dangdang.com/
    # start_urls is not used any more. Instead a redis_key is defined: the
    # URLs to crawl are stored in Redis under this key and the spider
    # fetches them from there.
    redis_key = "dangdang"

    # Handle the book category data.
    def parse(self, response):
        """Yield one list-page request per small category link."""
        for div in response.xpath("//div[@class='con flq_body']/div"):
            b_cate = [
                text.strip()
                for text in div.xpath("./dl/dt//text()").extract()
                if text.strip()
            ]
            # Blocks without a top-level category title are skipped.
            if not b_cate:
                continue

            div_data = str(div.extract())
            # Small categories: link and name come from the same match, so
            # the two values always belong together.
            small_cates = re.findall(
                r'''<a class=".*?" href="(.*?)" target="_blank" title=".*?" nname=".*?" ddt-src=".*?">(.*?)</a>''',
                div_data, re.S)
            # Middle categories.
            # NOTE(review): assumes ddt-area holds digits only - confirm
            # against the live markup.
            dl_list = re.findall(
                r'''<dl class="inner_dl" ddt-area="\d+" dd_name=".*?">.*?<dt>(.*?)</dt>''',
                div_data, re.S)

            for dl in dl_list:
                # A long <dt> body is treated as markup; its title
                # attributes are used instead of the raw content.
                if len(str(dl)) > 100:
                    dl = re.findall('''.*?title="(.*?)".*?''', dl, re.S)
                m_cate = str(dl).replace(" ", "").replace("\r\n", "")

                for s_href, s_cate in small_cates:
                    if not s_href:
                        # An empty href cannot be requested.
                        continue
                    item = {
                        "b_cate": b_cate,
                        "m_cate": m_cate,
                        "s_href": s_href,
                        "s_cate": s_cate,
                    }
                    yield scrapy.Request(
                        # urljoin leaves absolute URLs untouched and
                        # resolves relative ones against the page URL.
                        response.urljoin(s_href),
                        callback=self.parse_book_list,
                        meta={"item": deepcopy(item)}
                    )

    # Handle the book list page data.
    def parse_book_list(self, response):
        """Yield one item per book on the page, then follow the next page.

        The category fields come from ``response.meta["item"]``; every
        yielded book is a fresh dict so later books do not overwrite it.
        """
        category_item = response.meta["item"]
        # TODO: improve - handle the different kinds of book list pages.
        for li in response.xpath("//ul[@class='bigimg']/li"):
            book = dict(category_item)
            book["book_img"] = li.xpath('./a[1]/img/@src').extract_first()
            if book["book_img"] is None:
                book["book_img"] = li.xpath("//ul[@class='list_aa ']/li").extract_first()
            book["book_name"] = li.xpath("./p[@class='name']/a/@title").extract_first()
            book["book_desc"] = li.xpath("./p[@class='detail']/text()").extract_first()
            book["book_price"] = li.xpath(".//span[@class='search_now_price']/text()").extract_first()
            book["book_author"] = li.xpath("./p[@class='search_book_author']/span[1]/a/text()").extract_first()
            book["book_publish_date"] = li.xpath("./p[@class='search_book_author']/span[2]/text()").extract_first()
            book["book_press"] = li.xpath("./p[@class='search_book_author']/span[3]/a/text()").extract_first()
            yield book

        # Looked up once per page and outside the loop, so the name is
        # bound even when the page lists no books.
        next_url = response.xpath("//li[@class='next']/a/@href").extract_first()
        if next_url is not None:
            yield scrapy.Request(
                response.urljoin(next_url),
                callback=self.parse_book_list,
                meta={"item": deepcopy(category_item)}
            )
pipeline 代码:
代码语言:python
import csv
from itemadapter import ItemAdapter
class DangdangbookPipeline:
    """Item pipeline that stores the scraped Dangdang books in a CSV file."""

    # Output file, relative to the working directory of the crawl.
    CSV_PATH = './dangdangbook.csv'
    # Column order of the CSV file; items are expected to use these keys.
    FIELDNAMES = ['b_cate', 'm_cate', 's_cate', 's_href', 'book_img',
                  'book_name', 'book_desc', 'book_price', 'book_author',
                  'book_publish_date', 'book_press']

    # Write the data to the CSV file.
    def process_item(self, item, spider):
        """Append ``item`` as one CSV row and return it unchanged."""
        # newline='' lets the csv module control the line endings itself.
        with open(self.CSV_PATH, 'a', encoding='utf-8', newline='') as file:
            writer = csv.DictWriter(file, fieldnames=self.FIELDNAMES)
            writer.writerow(item)
        return item

    def open_spider(self, spider):
        """Create (or truncate) the CSV file and write the header row."""
        with open(self.CSV_PATH, 'w', encoding='utf-8', newline='') as file:
            writer = csv.DictWriter(file, fieldnames=self.FIELDNAMES)
            writer.writeheader()
settings 代码:
代码语言:python
BOT_NAME = 'dangdangbook'
# Where Scrapy looks for spiders and where new ones are generated.
NEWSPIDER_MODULE = "dangdangbook.spiders"
SPIDER_MODULES = ["dangdangbook.spiders"]

# scrapy-redis components: its request de-duplication filter ...
DUPEFILTER_CLASS = "scrapy_redis.dupefilter.RFPDupeFilter"
# ... and its scheduler.
SCHEDULER = "scrapy_redis.scheduler.Scheduler"
SCHEDULER_PERSIST = True

# LOG_LEVEL = 'WARNING'

# Address of the Redis server.
REDIS_URL = "redis://127.0.0.1:6379"

# User-Agent header sent with the requests (a desktop Chrome string).
USER_AGENT = (
    "Mozilla/5.0 (Windows NT 10.0; Win64; x64) "
    "AppleWebKit/537.36 (KHTML, like Gecko) "
    "Chrome/89.0.4389.82 Safari/537.36"
)

# robots.txt rules are NOT obeyed by this project.
ROBOTSTXT_OBEY = False

# Enabled item pipelines, keyed by import path, with their order value.
ITEM_PIPELINES = {
    "dangdangbook.pipelines.DangdangbookPipeline": 300,
}
crontab 定时执行
以上操作都是在 Linux 平台上直接使用 crontab 完成的。
在 Python 环境下,我们可以借助 pycrontab 来操作 crontab,从而设置定时任务。
补充
自定义的 Excel 导出文件格式代码:
代码语言:python
from scrapy.exporters import BaseItemExporter
import xlwt
class ExcelItemExporter(BaseItemExporter):
    """Item exporter that writes every item as one row of an xlwt sheet.

    The workbook is built in memory and only saved to ``file`` when
    ``finish_exporting`` is called.
    """

    def __init__(self, file, **kwargs):
        """Store the output target and create an empty sheet named 'scrapy'.

        ``file`` is handed to ``Workbook.save`` unchanged; ``kwargs`` are
        passed to the base class configuration.
        """
        self._configure(kwargs)
        self.file = file
        self.wbook = xlwt.Workbook()
        self.wsheet = self.wbook.add_sheet('scrapy')
        # Index of the next row to write.
        self.row = 0

    def finish_exporting(self):
        """Save the workbook to the target given at construction time."""
        self.wbook.save(self.file)

    def export_item(self, item):
        """Write the serialized field values of ``item`` into the next row."""
        fields = self._get_serialized_fields(item)
        # Only the values are written; the field names are dropped.
        for col, v in enumerate(x for _, x in fields):
            self.wsheet.write(self.row, col, v)
        # Advance once per item so each item lands in its own row.
        self.row += 1