作为一个易上手的高性能爬虫框架,Scrapy 使用 Twisted 异步网络框架处理并发请求。
但是,在日常工作和面试过程中,经常发现有些同学会笃定地认为 Scrapy 采用的是多线程并发模型。实际上,虽然 Twisted 框架提供了线程池支持,但是其核心网络部分处理逻辑依赖的是「单线程 IO 多路复用」技术,在 Linux 平台上,是围绕 epoll()
系统调用实现的 Reactor 模式。
为了利用好 Scrapy 的异步任务能力,避免写出 “使用 urllib 和 requests 库完成 HTTP 请求” 这样的错误代码,本文将 Scrapy 各个组件的异步能力及可以使用什么样的异步技术进行一些总结。
可扩展组件:
Spider Middleware - 它是处于 Engine
和 Spider
之间的组件,可以用于处理 Spider
的输入 (response)和输出(item 和 request)。它一般可以用于:处理 Spider 回调函数的输出,可以用于修改、增加和删除 request 或者 item;处理 Spider.start_requests()
函数生成的 request;捕捉 Spider 回调函数抛出的异常等等。用户自己实现的 Spider Middleware 可以定义一个或多个如下方法:
process_spider_input(response, spider)
- 每个响应repsonse
进入 Spider 回调函数之前可由该 方法处理。process_spider_output(response, result, spider)
- Spider 处理完响应response
产生的结果result
可经该方法处理。process_spider_exception(response, exception, spider)
- Spider 回调函数、其它Spider Middleware
的process_spider_input
方法抛出的异常可由该方法处理。process_start_requests(start_requests, spider)
- Spider 启动后由start_requests()
方法产生 的Request
可经方法处理。
Downloader Middleware - 它是处于 Engine
和 Downloader
之间的组件,可以用于处理从 Engine
传递 给 Downloader
的 request 和从 Downloader
传递给 Engine
的 response。它一般可用于:处理即将发到网络上的请求;修改传递即将给 Spider 的响应数据;丢掉响应数据,然后生成一个新的请求;根据请求凭空构造一个响 应(并不发出实际的请求);丢弃某些请求等等。用户自己实现的 Downloader Middleware 可以定义一个或多个如下 方法:
process_request(request, spider)
- 这个方法可以处理每一个经过该中件间的 request。它可以返回None
、Response
实例、Request
实例或者抛出IgnoreRequest
异常。process_responsee(response, spider)
-这个方法可以处理每一个经过该中件间的 response。它可以返回Response
实例、Request
实例或者抛出IgnoreRequest
异常。process_exception(request, exception, spider)
- 这个方法可以处理下载器或者Downloader Middleware
的process_request
抛出的包括IgnoreRequest
在内的所有异常。它可以返回None
、Response
实例 或者Request
实例。
Item Pipeline - 它用于处理 Spider 生成的 item,对其进行清理、验证、持久化等处理。用户自己实现的Item Pipeline 可以定义一个或多个如下方法:
process_item(item, spider)
- 它用来处理 Spider 生成的 item。它可以返回字段类型的数据、Item
实例、Deferred
实例或者抛出DropItem
异常。open_spider(spider)
- Spider 打开时调用。close_spider(spider)
- Spider 关闭时调用。from_crawler(cls, crawler)
Scheduler - Scheduler接收来自engine的请求,并在engine请求它们时将它们排入队列以便稍后(也引导到engine)。
Extension - 提供了向 Scrapy 中插入自定义功能的机制。Extension 是普通的类,它们在 Scrapy 启动时实例化。 通常,Extension 实现向 Scrapy 注册信号处理函数,由信号触发完成相应工作。
Spider - Spiders是由Scrapy用户编写的自定义类,用于解析响应并从中提取items(也称为下载的items)或其他要跟进的requests。
异步手段
Twisted Deferred
我们本节主要汇总一下 Scrapy 中哪些可扩展组件支持返回 Deferred
对象。
Item Pipeline
对于 Item Pipeline,我们从文档中已经得知,用户自定义 Item Pipeline 的 process_item
可以返回 Deferred
实例。Item
在 pipeline 的处理本身就是由 Deferred
驱动的,作为其回调函数使用的 process_item
返回的 Deferred
便会插入到原始 Deferred
的处理流程中。
# scrapy.core.scraper.Scraper
def _process_spidermw_output(self, output, request, response, spider):
"""Process each Request/Item (given in the output parameter) returned
from the given spider
"""
if isinstance(output, Request):
self.crawler.engine.crawl(request=output, spider=spider)
elif isinstance(output, (BaseItem, dict)):
self.slot.itemproc_size = 1
dfd = self.itemproc.process_item(output, spider)
dfd.addBoth(self._itemproc_finished, output, response, spider)
return dfd
elif output is None:
pass
else:
###
Spider Middleware
对于 Spider Middleware,我们从文档得知,process_spider_input
和 process_spider_output
也均不能返回 Deferred
实例,这点我们从代码中也得到了印证:
# scrapy.core.spidermw.SpiderMiddlewareManager
def scrape_response(self, scrape_func, response, request, spider):
# 此处 scrape_func 实际引用 scrapy.core.scraper.Scrapyer.call_spider 函数
...
def process_spider_input(response):
for method in self.methods["process_spider_input"]:
...
result = method(response=response, spider=spider)
assert ###
...
return scrape_func(response, request, spider)
def process_spider_output(response):
for method in self.methods["process_spider_output")]:
result = method(response=response, exception=exception, spider=spider)
assert ###
...
# scrape_func 也就是 Scraper.call_spider 函数,会将 response 包装成 0.1 秒后触发的 `Deferred`
# 实例。这个 `Deferred` 实例由下面的 `mustbe_deferred` 函数直接返回。
dfd = mustbe_deferred(process_spider_input, response)
dfd.addErrback(process_spider_exception)
dfd.addCallback(process_spider_output)
return dfd
# scrapy.core.scraper.Scraper
def _scrape(self, response, request, spider):
# Engine 将 Downloader 的下载结果 response 交给 Scraper 后,传递到该函数
assert isinstance(response, (Response, Failure))
# 此处的 `Deferred` 实例依然是由 `call_spider` 创建的那个
dfd = self._scrape2(response, request, spider)
dfd.addErrback(self.handle_spider_error, request, response, spider)
dfd.addCallback(self.handle_spider_output, request, response, spider)
return dfd
def _scrape2(self, request_result, request, spider):
if not isinstance(request_result, Failure):
return self.spidermw.scrape_response(
self.call_spider, request_request, request, spider)
...
def call_spider(self, result, request, spider):
result.request = request
dfd = defer_result(result)
dfd.addCallbacks(request.callback or spider.parse, request.errback)
return dfd.addCallback(iterate_spider_output)
上述代码一直使用同一个 Deferred
实例,该实例由 call_spider
创建,延迟 0.1 秒后由 reactor 激活。 _scrape
函数返回后,在该 Deferred
实例上注册的 callback 和 errback 有:
callback errback
-------------------------------------------------------
request.callback or spider.parse request.errback
iterate_spider_output
scrape_repsonse.process_spider_exception
scrape_repsonse.process_spider_output
Scraper.handle_spider_error
Scraper.handle_spider_output
根据上面的代码摘录回调函数链,Spider Middleware 的 process_spider_input
的返回值必须是 None
值 或者抛出异常,这个结论是明确的。同时,它的 process_spider_output
的输出要交由 Scraper.handle_spider_output
函数处理,这个函数的逻辑如下:
# scrapy.core.scraper.Scraper
def handle_spider_output(self, result, request, response, spider):
...
it = iter_errback(result, self.handle_spider_error, request, response, spider)
dfd = parallel(it, self.concurrent_items,
self._process_spiderwm_output, request, response, spider)
return dfd
def _process_spidermw_output(self, output, request, response, spider):
"""Process each Request/Item (given in the output parameter) returned
from the given spider
"""
if isinstance(output, Request):
self.crawler.engine.crawl(request=output, spider=spider)
elif isinstance(output, (BaseItem, dict)):
self.slot.itemproc_size = 1
dfd = self.itemproc.process_item(output, spider)
dfd.addBoth(self._itemproc_finished, output, response, spider)
return dfd
elif output is None:
pass
else:
###
由 _process_spidermw_output
函数的逻辑可以看出,process_spider_output
如果返回 Request
实例、 BaseItem
实例 和 dict
实例以外的对象时,Scrapy 都当成错误并打错误日志。
Downloader Middleware
对 Downloader Middleware 来说,和 Spider Middleware 类似,文档也约定了用户实现的 process_request
和 process_response
函数不能返回 Deferred
实例。它的运行模式也和 Spider Middlerware 类似,但是 实现细节上却存在很大区别。
实际上,Downloader Middlerware 的 process_request
方法和 process_response
方法,是可以返回 Deferred
实例的。Scrapy 提供的一个下载中间件 scrapy.downloadermiddlewares.robotstxt
就利用了这种用 法,在发出实际请求之前,根据需求先去请求了网站的 robots.txt 文件。
接下来,我们从 Scrapy 这部分实现代码的角度证实一下这个结论。
首先,Engine
的 _download
方法调用 Downloader
开始请求下载。这个方法返回 Deferred
实例。
# scrapy.core.engine.ExecutionEngine
def _download(self, request, spider):
...
dwld = self.downloader.fetch(request, spider)
dwld.addCallbacks(_on_success)
dwld.addBoth(_on_complete)
return dwld
然后,Downloader
的 fetch
方法调用 DownloaderMiddlewareManager
的 download
方法构造用于处理当 前请求的 Deferred
实例及回调函数链。
# scrapy.core.downloader.__init__.Downloader
def fetch(self, request, spider):
...
dfd = self.middleware.download(self._enqueue_request, request, spider)
return dfd.addBoth(_deactivate)
# scrapy.core.downloader.middleware.DownloaderMiddlewareManager
def download(self, download_func, request, spider):
@defer.inlineCallbacks
def process_request(request):
for method in self.methods['process_request']:
response = yield method(request=request, spider=spider)
assert ###
(six.get_method_self(method).__class__.__name__, response.__class__.__name__)
if response:
defer.returnValue(response)
defer.returnValue((yield download_func(request=request,spider=spider)))
@defer.inlineCallbacks
def process_response(response):
assert response is not None, 'Received None in process_response'
if isinstance(response, Request):
defer.returnValue(response)
for method in self.methods['process_response']:
response = yield method(request=request, response=response,
spider=spider)
assert ###
(six.get_method_self(method).__class__.__name__, type(response))
if isinstance(response, Request):
defer.returnValue(response)
defer.returnValue(response)
@defer.inlineCallbacks
def process_exception(_failure):
exception = _failure.value
for method in self.methods['process_exception']:
response = yield method(request=request, exception=exception,
spider=spider)
assert ###
if response:
defer.returnValue(response)
defer.returnValue(_failure)
deferred = mustbe_deferred(process_request, request)
deferred.addErrback(process_exception)
deferred.addCallback(process_response)
return deferred
理清上面代码的关键是理解装饰器 twisted.internet.defer.inlineCallbacks
的用法。inlineCallbacks
装饰 的生成器函数被调用时,会返回一个生成器函数产生返回值时被激活的 Deferred
实例。
Your inlineCallbacks-enabled generator will return a
Deferred
object, which will result in the return value of the genrator (or will fail with a failure object if your generator raises an unhandled exception).
生成器函数中产生的 Deferred
实例使用 yield
等待求值,也就是说,inlineCallbacks
等待这些 Deferred
被激活后,将它的回调链产生的结果作为 yield
表达式的值返回。
When you call anything that results in a
Deferred
, you can simply yield it; your generator will automatically be resumed when the Deferred's result is availabe. The generator will be send the result of theDeferred
with thesend
method on generators, or if the result was a failure, "throw".
非 Deferred
类型的值也可以被 yield
处理,此时,inlineCallbacks
仅仅把它直接作为 yield
表达式的值。
Things that are not
Deferred
s may also be yielded, and your generator will be resumed with the same object sent back.
回到上面的 download
函数,mustbe_deferred(process_request, request)
返回的 Deferred
实例由装饰器inlineCallbacks
生成,并且在其装饰的生成器 process_request
调用 defer.returnValue
返回值或抛出异常 时被激动,继续执行后面的 callback 和 errback 链。而被 inlineCallbacks
装饰的生成器函数里被 yield
的 Deferred
实例由 inlineCallbacks
等待并求值。
这其中包括由 download_func
函数,也即,scrapy.core.downloader.Downloader._enqueue_request
函数生成 的 Deferred
实例。这个 Deferred
实例在对应请求被 Downloader
真正下载完成后,才被激活。
# scrapy.core.downloader.__init__.Downloader
def _process_queue(self, spider, slot):
...
while slot.queue and slot.free_transfer_slots() > 0:
...
request, deferred = slot.queue.popleft()
dfd = self._download(slot, request, spider)
dfd.chainDeferred(deferred)
...
综上,虽然 Downloader Middleware 的文档虽然并没有明确说明 process_request
、process_response
和process_exception
的返回值可以是 Deferred
类型,但是从上面对代码分析和 Scrapy 提供的一些下载中件间的 代码可以看出,这三个函数返回 Deferred
实例也是完全合法的。但是有点一点需要注意的时,这个 Deferred
实例 的最终返回值类型必须是 None
、Request
或 Response
的其中之一。
其它组件
Scrapy 框架上剩下的几个可扩展组件,Scheduler, Extension 和 Spider 也均不支持直接使用 Deferred
完成异步操作。
汇总
下面是 Scrapy 可扩展组件的方法返回 Deferred
实例的汇总表:
Twisted ThreadPool
Twisted 的 reactor 提供了线程池,用于执行那些无法使用非阻塞模式(本质上不支持非阻塞或者未能找到适合 Twisted 的非阻塞函数库)的操作。
Therefore, internally, Twisted makes very little use of threads. This is not to say that is makes no use of threads; there are plenty of APIs which have no non-blocking equivalent, so when Twisted needs to call those, it calls them in a thread. One prominent example of this is system host name resolution: unless you have configured Twisted to use its own DNS client in
twisted.names
, it will have to use your operating system's blocking APIs to map host names to IP addresses, in the reactor's thread pool. …Twisted does most things in one thread.
由上一节对 Twisted 的介绍我们知道,使用 Twisted 框架的程序基本上都是通过 reactor 循环驱动回调函数,完成业务逻辑。reactor 循环一般运行于主线程中,由 reactor.run()
函数启动, reactor.stop()
函数退出循环。
如果某它线程需要在 reactor 循环/线程中执行某函数时,这个线程需要使用 reactor.callFromThread
将此函数转 交给 reactor 线程:
代码语言:javascript复制
def callFromThread(callable, *args, **kw):
Cause a function to be executed by the reactor thread. Use this method when you want to run a function in the reactor's thread from another thread. … If you want to call a function in the next mainloop iteration, but you're in the same thread, use callLater
with a delay of 0.
如果在某个 reactor 循环的回调函数中需要执行某个阻塞操作时,可以使用 reactor.callInThread
函数将此阻塞操 作委托给独立线程:
代码语言:javascript复制
def callInThread(callable, *args, **kw):
Run the given callable object in a separate thread, with the given arguments and keyword arguments.
如果上面的场景下,需要在回调函数中获取阻塞操作的结果的话,这时可以使用 threads.deferToThread
函数。调用者 可以通过这个函数返回的 Deferred
实例获取阻塞操作的结果:
代码语言:javascript复制
def deferToThread(f, *args, **kwargs):
Run a function in a thread and return the result as a Deferred
. 另外,需要注意的是,这个函数使用 reactor 提供的线程池。
介绍完 Twisted 框架提供的线程接口后,我们回到 Scrapy 代码树。目前版本(1.4.0)的 Scrapy 核心代码中,只有 DNS 解析的功能使用了线程池:
代码语言:javascript复制# scrapy.crawler.CrawlProcess
def start(self, stop_after_crawl=True):
...
reactor.installResolver(self._get_dns_resolver())
def _get_dns_resolver(self):
...
return CachingThreadedResolver(
reactor=reactor,
cache_size=cache_size,
timeout=self.settings.getfloat("DNS_TIMEOUT")
)
# scrapy.resolver
from twisted.internet.base import ThreadedResolver
...
class CachingThreadedResolver(ThreadedResolver):
...
Scrapy 代码的非核心部分中,scrapy.pipelines.files
模块中的几个文件存储中间件也大量使用了线程池来处理阻塞 任务。使用线程可以简单地使用阻塞版本的各种客户端库和存储服务通信。
我们在业务中,经常开发 Pipeline 向 MySQL 数据库中写入数据。此时一般会使用 twisted.enterprise.dbapi
提供的非阻塞数据库操作。这个模块底层维护了一个独立于 reactor 线程池的线程池,并通过 threads.deferToThreadPool
将阻塞的数据库操作,也就是 Pipeline 中的数据库操作,委托给这个线程池处理。数据库的操作结果通过 Deferred
实 例告知调用者。
异步 Request
使用 Scrapy 开发针对业务开发爬取逻辑时,我们通过 Spider 向 Scrapy 提供初始的下载 URL 以驱动整个框架开始运转。获取到响应数据后,要从其中分析出新的 URL,然后构造 Request
实例,指定响应回调函数(callback 和errback),并交给 Scrapy 继续爬取。Scrapy 拿到 URL 的响应数据后,会调用回调函数,执行业务逻辑。
在这个过程中,我们不需要了解 Scrapy 的异步原理,就可以通过 Request
完成异步网络请求,使得整个过程非常高效。那么在 Scrapy 提供的可扩展组件中能否利用 Request
发起异步的网络请求呢?
首先,对于约定方法可以返回 Request
实例的扩展组件,我们只需要像开发 Spider 代码一样,为 Request
指定实现了业务逻辑的回调函数,然后将该 Request
作为方法返回值返回给 Scrapy 框架即可。
其次,对于约定方法不支持 Request
类型返回值的扩展组件,比如 Item Pipeline 或 Downloader Middleware, 我们可以利用这些约定方法支持 Deferred
类型返回值的特性,将网络请求和 Deferred
结合起来。网络请求完成后, 才激活该 Deferred
,这样原来的处理流程就可以继续向下进行了。
从 Scrapy 框架的代码中,我们可以找到这样的用法。
比如,scrapy.downloadermiddleware.robotstxt.RobotsTxtMiddleware
中间件就使用了这种方式。这个中间件的 主要任务是根据网站的 robots.txt 规则,判断当前即将发出的请求是否合法。robots.txt 文件由该中间件创建新 HTTP 请求下载。文件下载完成后,根据其中规则对原始请求进行检查,然后根据规则决定丢弃或继续原始请求的处理流程。
# scrapy.downloadermiddlewares.robotstxt.RobotsTxtMiddleware
def process_request(self, request, spider):
...
d = maybeDeferred(self, robot_parser, request, spider)
d.addCallback(self.process_request_2, request, spider)
return d
def robot_parser(self, request, spider)
...
if netloc not in self._parsers:
# 还未下载 netloc 对应的 robots.txt 文件时,创建请求该文件的 HTTP 请求。该请求使用
# Engine.download 函数处理,请求完成后,该函数返回的 Deferred 实例被激活。
self._parsers[netloc] = Deferred()
robotsurl = "%s://%s/robots.txt" % (url.scheme, url.netloc)
robotsreq = Request(robotsurl, priority=self.DOWNLOAD_PRIORITY, meta={'dont_obey_robotstxt': True})
dfd = self.crawler.engine.download(robotsreq, spider)
dfd.addCallback(self._parse_robots, netloc)
dfd.addErrback(self._logerror, robotsreq, spider)
dfd.addErrback(self._robots_error, netloc)
if isinstance(self._parsers[netloc], Deferred):
# 在 robots.txt 下载成功之前,Engine 发来的请求都会通过 Deferred 实例暂缓执行。这个
# Deferred 实例在 robots.txt 下载完成并在 _parse_robots 构建完成 RobotFileParser 对象
# 后被激活。
d = Deferred()
def cb(result):
d.callback(result)
return result
self._parsers[netloc].addCallback(cb)
return d
else:
# netloc 对应的 robots.txt 下载完成后,直接返回对应的 RobotFileParser 对象
return self._parsers[netloc]
def _parse_robots(self, response, netloc):
# robots.txt 下载完成后,使用该文件内容构造 RobotFileParser 对象
...
rp_dfd = self._parser[netloc]
self._parsers[netloc] = rp
rp_dfd.callback(rp)
def process_request_2(self, rp, request, spider):
...
raise IgnoreRequest()
最后,我们还可以在任何可扩展组件中构造请求 Request
对象,在其回调函数中实现业务逻辑。然后使用scrapy.core.engine.ExecutionEngine.crawl
函数将该请求交给 Scrapy 重新调度处理。Scrapy 使用和普通 Request
相同的逻辑处理该请求。实际上,在 scrapy.core.engine.ExecutionEngine
和 scrapy.core.scraper.Scraper
内部,都是使用该方法调度由 Spider Middleware 或 Downloader Middleware 生成的 Request
对象的。
另外,从上面的分析我们可以看到,scrapy.core.engine.ExecutionEngine
提供了两种提交 Request
并异步下载 该请求的方法。我们将其用法描述如下:
crawl(request, spider)
- 用户通过该方法向 Scrapy 提交请求,该请求和其它普通请求一样,由 Scrapy 框架统 一调度,由 Downloader 控制并发和发起频率等。并由 Downloader Middleware、Spider Middleware 等组件处 理。该方法无返回值,业务处理需要通过请求的回调函数完成。download(request, spider)
- 用户通过该方法向 Scrapy 提交的请求,直接交由 Downloader 处理,由其控制 并发和发起频率。该请求不会被 Spider Middleware 和 Scraper 处理,也就是说请求的回调函数不会被调用。该 方法返回Deferred
实例,请求的响应数据需要从该Deferred
实例中获取。