Why use a distributed crawler
I have been learning web scraping for a while, and my crawlers so far have all been single Python files, written without much thought for performance or efficiency. To become a competent spider author, it is time to learn about distributed crawling.
What is a distributed crawler? Simply put, multiple servers fetch the data together, coordinating with each other and each taking its own share of the tasks.
Distributed crawler design
The most common design is the master-slave distributed crawler; in this article a Redis server acts as the task queue.
As shown in the figure (master-slave architecture with Redis as the shared task queue):
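Concretely, the queue boils down to a handful of Redis keys that the master and the slaves agree on. The key names below are the ones used in master.py and salver.py further down; the host, port, and example task id are placeholders, not values from the original post.

import redis

# the shared queue lives in a plain Redis server (host/port are placeholders)
r = redis.Redis(host='127.0.0.1', port=6379)

# master: one string key per task, keyed by a 5-character task id
r.set('Task:id:00001', "{'id': '00001', 'file_name': '...', 'file_url': '...'}")

# slaves: two sets record progress, so a task is not downloaded twice
r.sadd('Task:begin', '00001')   # currently downloading
r.sadd('Task:finish', '00001')  # already downloaded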
Preparation
Install Python 3 and Redis
Install requests, pyquery, and the redis client library:
pip install requests
pip install pyquery
pip install redis
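Before writing any crawler code, it is worth checking that the redis library can actually reach your Redis server; a one-line ping is enough. The host and port below are assumptions, adjust them to your own setup.

import redis

r = redis.Redis(host='127.0.0.1', port=6379)
print(r.ping())  # True means Python can talk to the Redis server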
Code
Master script (master.py)
import os
import requests
from pyquery import PyQuery as pq
import re
import json
import config
from cache import RedisCache
from model import Task

def parse_link(div):
    """Extract the link from one list item."""
    e = pq(div)
    href = e.find('a').attr('href')
    return href

def get_from_url(url):
    """Fetch the list page and collect the links of all items."""
    page = get_page(url)
    e = pq(page)
    items = e('.epiItem.video')
    links = [parse_link(i) for i in items]
    print(len(links))
    links.reverse()
    return links

def get_page(url):
    """Fetch a page, going through the proxy configured in config.proxies."""
    proxies = config.proxies
    try:
        res = requests.get(url, proxies=proxies)
        # print(res.text)
    except requests.exceptions.ConnectionError as e:
        print('Error', e.args)
        return None
    page = res.content
    return page

def get_all_file(path, fileList=None):
    """Recursively collect every file under a directory."""
    if fileList is None:
        fileList = []
    get_dir = os.listdir(path)           # list the entries of the current directory
    for i in get_dir:
        sub_dir = os.path.join(path, i)  # join each entry with the parent path
        if os.path.isdir(sub_dir):       # still a directory: recurse into it
            get_all_file(sub_dir, fileList)
        else:
            ax = os.path.abspath(sub_dir)  # a regular file: record its absolute path
            fileList.append(ax)
    return fileList

def init_finish_task(path):
    """Mark every file already in the download folder as a finished task."""
    redis_cache = RedisCache()
    fileList = []
    fileList = get_all_file(path, fileList)
    # print(fileList)
    for file in fileList:
        file_name = os.path.basename(file)
        task_id = file_name[:5]
        # print(task_id)
        redis_cache.sadd('Task:finish', task_id)
    print('init_finish_task...end')

def init_task_url():
    """Build the task queue: one Redis entry per item on the list page."""
    redis_cache = RedisCache()
    url = config.list_url
    link_list = get_from_url(url)
    for link in link_list:
        task_url = config.task_url_head + link
        # print(task_url)
        task_id = task_url[-5:]
        # redis_cache.set('Task:id:{}:url'.format(task_id), task_url)
        t = task_from_url(task_url)
        # print('add task {}'.format(t.__dict__))
        print('add task_id {}'.format(task_id))
        redis_cache.set('Task:id:{}'.format(task_id), t.__dict__)
        # print(t)
    print('init_task_url...end')

def task_from_url(task_url):
    """Build a Task object from an item's detail page."""
    page = get_page(task_url)
    e = pq(page)
    task_id = task_url[-5:]
    title = e('.controlBar').find('.epi-title').text().replace('/', '-')
    file_url = e('.audioplayer').find('audio').attr('src')
    ext = file_url[-4:]
    file_name = task_id + '.' + title + ext
    # content = e('.epi-description').html()
    t = Task()
    t.id = task_id
    t.title = title
    t.url = task_url
    t.file_name = file_name
    t.file_url = file_url
    # t.content = content
    return t

def main():
    init_task_url()
    init_finish_task(config.down_folder)

if __name__ == '__main__':
    main()
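master.py (and salver.py below) imports three helper modules, config, cache, and model, which the post itself does not show. What follows is only a minimal sketch of what they would need to contain for the code above to run: every value in config.py is a placeholder for your own site and paths, and RedisCache is assumed to be a thin wrapper exposing exactly the redis-py calls the two scripts use.

# config.py -- every value here is a placeholder
proxies = None                               # or {'http': '...', 'https': '...'}
list_url = 'https://example.com/episodes'    # page listing all items to crawl
task_url_head = 'https://example.com'        # prefix joined with each parsed link
down_folder = '/path/to/downloads'           # where the slaves store files

# model.py -- Task is only ever used as a plain attribute container
class Task(object):
    pass

# cache.py -- thin wrapper around redis-py
import redis

class RedisCache(object):
    def __init__(self, host='127.0.0.1', port=6379):
        self._redis = redis.Redis(host=host, port=port)

    def set(self, key, value):
        # master stores t.__dict__, so stringify before writing
        return self._redis.set(key, str(value))

    def get(self, key):
        return self._redis.get(key)

    def keys(self, pattern):
        return self._redis.keys(pattern)

    def sadd(self, key, member):
        return self._redis.sadd(key, member)

    def srem(self, key, member):
        return self._redis.srem(key, member)

    def sismember(self, key, member):
        return self._redis.sismember(key, member)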
Slave script (salver.py)
import os
import requests
from pyquery import PyQuery as pq
import re
import json
import config
from cache import RedisCache

def get_page(url):
    """Fetch a page, going through the proxy configured in config.proxies."""
    proxies = config.proxies
    try:
        res = requests.get(url, proxies=proxies)
        # print(res.text)
    except requests.exceptions.ConnectionError as e:
        print('Error', e.args)
        return None
    page = res.content
    return page

def begin_task(task_id, file_name, file_url):
    print('begin task {}'.format(task_id))
    redis_cache = RedisCache()
    # add the task to the "currently downloading" set
    redis_cache.sadd('Task:begin', task_id)
    print('download...{}'.format(file_name))
    folder = config.down_folder
    path = os.path.join(folder, file_name)
    download(file_url, path)
    print('end task {}'.format(task_id))

def download(link, path):
    redis_cache = RedisCache()
    proxies = config.proxies
    if os.path.exists(path):
        print('file exist')
    else:
        try:
            r = requests.get(link, proxies=proxies, stream=True)
            total_length = int(r.headers['Content-Length'])
            with open(path, "wb") as code:
                code.write(r.content)
            # check whether the downloaded file is complete
            length = os.path.getsize(path)
            print('length={}'.format(length))
            print('total_length={}'.format(total_length))
            if total_length != length:
                # delete the incomplete file
                os.remove(path)
                # and download it again
                download(link, path)
            else:
                print('download success')
                # move the task from the "begin" set to the "finish" set
                file_name = os.path.basename(path)
                content_id = file_name[:5]
                redis_cache.srem('Task:begin', content_id)
                redis_cache.sadd('Task:finish', content_id)
            # print(r.text)
        except requests.exceptions.ConnectionError as e:
            print('Error', e.args)

def main():
    redis_cache = RedisCache()
    # walk the task list and skip anything already downloaded or in progress
    keys = redis_cache.keys('Task:id:[0-9]*')
    # print(keys)
    new_key = [key.decode() for key in keys]
    # print(new_key)
    # sort by task id
    new_key = sorted(new_key, key=lambda x: x.split(':')[2])
    # print(new_key)
    for key in new_key:
        task_id = key.split(':')[2]
        # print(task_id)
        is_finish = redis_cache.sismember('Task:finish', task_id)
        is_begin = redis_cache.sismember('Task:begin', task_id)
        if is_finish:
            print('Task {} is finish'.format(task_id))
        elif is_begin:
            print('Task {} is begin'.format(task_id))
        else:
            # the task dict was stored with single quotes, so fix it up before json.loads
            task = json.loads(redis_cache.get(key).decode('utf-8').replace("'", '"'))
            file_name = task['file_name']
            file_url = task['file_url']
            # print(file_url)
            begin_task(task_id, file_name, file_url)

if __name__ == '__main__':
    main()
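To actually distribute the work, copy salver.py (together with config.py, cache.py, and a download folder) to as many machines as you like and run it on each of them; as long as every RedisCache points at the same Redis server the master wrote to, the Task:begin and Task:finish sets are what keep the slaves from downloading the same file twice. Note that the check-then-add here is not atomic, so with many workers a safer variant would hand out tasks through an atomic Redis operation such as SPOP or a list with LPUSH/BRPOP.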