[890]scrapy之pipeline的使用

2020-08-13 11:19:07 浏览数 (1)

scrapy的pipeline是一个非常重要的模块,主要作用是将return的items写入到数据库、文件等持久化模块,下面我们就简单的了解一下pipelines的用法。

pipeline核心方法

open_spider(self,spider) open_spider()方法是在Spider开启的时候被自动调用的。在这里我们可以做一些初始化操作,如开启数据库连接等。其中,参数spider就是被开启的Spider对象。

close_spider(self,spider) close_spider()方法是在Spider关闭的时候自动调用的。在这里我们可以做一些收尾工作,如关闭数据库连接等。其中,参数spider就是被关闭的Spider对象。

from_crawler(cls,crawler) 这个和我们在前面说spider的时候的用法是一样的,可以用于获取settings配置文件中的信息,需要注意的这个是一个类方法

from_crawler()方法是一个类方法,用@classmethod标识,是一种依赖注入的方式。它的参数是crawler,通过crawler对象,我们可以拿到Scrapy的所有核心组件,如全局配置的每个信息,然后创建一个Pipeline实例。参数cls就是Class,最后返回一个Class实例。

process_item(self,item,spider)

  • 每个item piple组件是一个独立的pyhton类,必须实现以process_item(self,item,spider)方法
  • 每个item pipeline组件都需要调用该方法,这个方法必须返回一个具有数据的dict,或者item对象,或者抛出DropItem异常,被丢弃的item将不会被之后的pipeline组件所处理
启用一个item Pipeline组件
代码语言:javascript复制
#配置MongoDB数据库的连接信息
MONGO_URL = '192.168.8.30'
MONGO_PORT = 27017
MONGO_DB = 'news'

ITEM_PIPELINES = {
    'myproject.pipelines.PricePipeline': 300,
    'myproject.pipelines.JsonWriterPipeline': 800,
}

每个pipeline后面有一个数值,这个数组的范围是0-1000,这个数值确定了他们的运行顺序,数字越小越优先

从pipeline的字典形式可以看出来,pipeline可以有多个,而且确实pipeline能够定义多个。

为什么需要多个pipeline:

1、一个spider的内容可能要做不同的操作,比如存入不同的数据库中 2、可能会有多个spider,不同的pipeline处理不同的item的内容

注意:

1、使用pipeline需要在setting.py中进行配置 2、pipeline的权重值越小优先级越高 3、pipeline中process_item不能修改为其他名称

例1:将item写入到MongoDB,并使用了from_crawler的用法

pipelines.py: 1、首先我们要从settings文件中读取数据的地址、端口、数据库名称(没有会自动创建)。 2、拿到数据库的基本信息后进行连接。 3、将数据写入数据库 4、关闭数据库 注意:只有打开和关闭是只执行一次,而写入操作会根据具体的写入次数而定。

代码语言:javascript复制
import pymongo
 
class MongoDBPipeline(object):
    """
    1、连接数据库操作
    """
    def __init__(self,mongourl,mongoport,mongodb):
        '''
        初始化mongodb数据的url、端口号、数据库名称
        :param mongourl:
        :param mongoport:
        :param mongodb:
        '''
        self.mongourl = mongourl
        self.mongoport = mongoport
        self.mongodb = mongodb
 
    @classmethod
    def from_crawler(cls,crawler):
        """
        1、读取settings里面的mongodb数据的url、port、DB。
        :param crawler:
        :return:
        """
        return cls(
            mongourl = crawler.settings.get("MONGO_URL"),
            mongoport = crawler.settings.get("MONGO_PORT"),
            mongodb = crawler.settings.get("MONGO_DB")
        )
 
    def open_spider(self,spider):
        '''
        1、连接mongodb数据
        :param spider:
        :return:
        '''
        self.client = pymongo.MongoClient(self.mongourl,self.mongoport)
        self.db = self.client[self.mongodb]
 
    def process_item(self,item,spider):
        '''
        1、将数据写入数据库
        :param item:
        :param spider:
        :return:
        '''
        name = item.__class__.__name__
        # self.db[name].insert(dict(item))
        self.db['user'].update({'url_token':item['url_token']},{'$set':item},True)
        return item
 
    def close_spider(self,spider):
        '''
        1、关闭数据库连接
        :param spider:
        :return:
        '''
        self.client.close()
例2:DropItem

这个例子实现的是判断item中是否包含price以及price_excludes_vat,如果存在则调整了price属性,都让item[‘price’] = item[‘price’] * self.vat_factor,如果不存在则返回DropItem

代码语言:javascript复制
from scrapy.exceptions import DropItem

class PricePipeline(object):

    vat_factor = 1.15

    def process_item(self, item, spider):
        if item['price']:
            if item['price_excludes_vat']:
                item['price'] = item['price'] * self.vat_factor
            return item
        else:
            raise DropItem("Missing price in %s" % item)
例3:将item写入到json文件中
代码语言:javascript复制
import json

class JsonWriterPipeline(object):

    def __init__(self):
        self.file = open('items.jl', 'wb')

    def process_item(self, item, spider):
        line = json.dumps(dict(item))   "n"
        self.file.write(line)
        return item
例4:使用 Scrapy 提供的 exporter 存储 Json 数据
代码语言:javascript复制
from scrapy.exporters import JsonItemExporter

class JsonExporterPipeline:
    # 调用 scrapy 提供的 json exporter 导出 json 文件
    def __init__(self):
        self.file = open('questions_exporter.json', 'wb')
        # 初始化 exporter 实例,执行输出的文件和编码
        self.exporter = JsonItemExporter(self.file,encoding='utf-8',ensure_ascii=False)
        # 开启倒数
        self.exporter.start_exporting()

    def close_spider(self, spider):
        self.exporter.finish_exporting()
        self.file.close()

    # 将 Item 实例导出到 json 文件
    def process_item(self, item, spider):
        self.exporter.export_item(item)
        return item

参考:https://www.cnblogs.com/lei0213/p/7899709.html https://blog.csdn.net/weixin_30664539/article/details/99345752 https://blog.csdn.net/Zhihua_W/article/details/103615741 https://blog.csdn.net/Ahri_J/article/details/72472170

0 人点赞