本篇是咸鱼日常撸视频的时候记录的一些代码实例,可以直接运用到项目中,但是有些代码的可用性没有那么好,旨在分享思路,不喜勿喷~
搭建ip代理池(简易版)
推荐两个scrapy代理的项目
第一个是免费的代理插件,无需付费
https://github.com/aivarsk/scrapy-proxies
第二个是需要付费的代理插件
https://github.com/scrapy-plugins/scrapy-crawlera
撸视频的时候学到的代理池实例
获取西刺代理的代理列表并存入mysql数据库:
def _parse_proxy_row(tr):
    """Turn one row of the proxy table into ``(ip, port, proxy_type, speed)``.

    Returns ``None`` when the row cannot be used: no ``.bar`` title, a
    title whose part before "秒" is not a number, or fewer than six text
    cells.
    """
    titles = tr.css(".bar::attr(title)").extract()
    if not titles or not titles[0]:
        return None
    try:
        # The number before "秒" (seconds) is taken as the speed.
        speed = float(titles[0].split("秒")[0])
    except ValueError:
        return None
    all_texts = tr.css("td::text").extract()
    if len(all_texts) < 6:
        return None
    return all_texts[0], all_texts[1], all_texts[5], speed


def crawl_xici():
    """Crawl the xicidaili proxy listing and upsert every proxy into MySQL.

    Relies on module-level ``requests``, ``Selector``, ``cursor`` and
    ``conn``. Pages that cannot be fetched and rows that cannot be parsed
    are skipped so that a single bad page or row does not abort the crawl.
    """
    headers = {"User-Agent":"Mozilla/5.0 (Windows NT 6.1; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/68.0.3440.106 Safari/537.36"}
    # Parameterized upsert: the driver does the quoting, and re-crawling an
    # already-known proxy refreshes its row instead of failing.
    insert_sql = """
        insert into proxy_ip(ip, port, speed, proxy_type)
        VALUES (%s, %s, %s, %s) ON DUPLICATE KEY UPDATE ip=VALUES(ip), port=VALUES(port), speed=VALUES(speed), proxy_type=VALUES(proxy_type)
    """
    for i in range(3411):
        try:
            # Without a timeout a stalled connection would hang the crawl.
            res = requests.get('http://www.xicidaili.com/nn/{}'.format(i), headers=headers, timeout=10)
        except requests.RequestException as e:
            print("failed to fetch page {}: {}".format(i, e))
            continue
        selector = Selector(text=res.text)
        all_trs = selector.css("#ip_list tr")
        ip_list = []
        # The first <tr> is the table header.
        for tr in all_trs[1:]:
            row = _parse_proxy_row(tr)
            if row is not None:
                ip_list.append(row)
        for ip, port, proxy_type, speed in ip_list:
            cursor.execute(insert_sql, (ip, port, speed, proxy_type))
        # One commit per page instead of one per row.
        if ip_list:
            conn.commit()
定义随机获取ip的类方法(包括删除无效代理)
class GetIP(object):
    """Pick a random working proxy from the ``proxy_ip`` table.

    Relies on module-level ``cursor``, ``conn`` and ``requests``. Proxies
    that fail validation are deleted from the table as a side effect.
    """

    def delete_ip(self, ip):
        """Delete the row(s) for ``ip`` from ``proxy_ip``; always returns True."""
        # Parameterized: the original interpolated the bare IP into the SQL
        # text, which is neither valid SQL (unquoted string) nor safe.
        delete_sql = "delete from proxy_ip WHERE ip=%s"
        cursor.execute(delete_sql, (ip,))
        conn.commit()
        print("删除成功")
        return True

    def judge_ip(self, ip, port):
        """Return True if a request to baidu through ``ip:port`` gets a 2xx.

        An unusable proxy is removed from the table before returning False.
        """
        http_url = "http://www.baidu.com"
        proxy_url = "http://{0}:{1}".format(ip, port)
        proxy_dict = {
            "http": proxy_url
        }
        try:
            # Dead proxies often just hang, so the timeout is what makes
            # this check terminate.
            res = requests.get(http_url, proxies=proxy_dict, timeout=10)
        except Exception:
            # Deliberately broad: whatever goes wrong, the proxy is unusable.
            print("invalid ip and port")
            self.delete_ip(ip)
            return False
        if 200 <= res.status_code < 300:
            print("effective ip")
            return True
        print("invalid ip and port")
        self.delete_ip(ip)
        return False

    def get_random_ip(self):
        """Return ``"http://ip:port"`` for a validated proxy, or None if the table is empty.

        Iterates instead of recursing so that a long run of dead proxies
        cannot hit the recursion limit. Each failed proxy is deleted by
        ``judge_ip``, so the loop ends once the table is empty.
        """
        select_sql = """
            SELECT ip,port from proxy_ip ORDER BY RAND() LIMIT 1
        """
        while True:
            cursor.execute(select_sql)
            rows = cursor.fetchall()
            if not rows:
                return None
            ip, port = rows[0][0], rows[0][1]
            if self.judge_ip(ip, port):
                return "http://{0}:{1}".format(ip, port)


# Uncomment to (re)fill the proxy table first:
# crawl_xici()
if __name__ == '__main__':
    get_ip = GetIP()
    get_ip.get_random_ip()
在middleware文件中应用我们写的代理
# 使用前要记得在 settings 的 DOWNLOADER_MIDDLEWARES 中添加 RadomProxyMiddleware
from tools.crawl_xici_ip import GetIP
# 随机ip代理
class RadomProxyMiddleware(object):
    """Downloader middleware that assigns a random proxy to each request."""

    def process_request(self, request, spider):
        """Store the URL returned by ``GetIP.get_random_ip()`` in ``request.meta['proxy']``."""
        # A fresh GetIP is built for every request, as in the original.
        proxy_url = GetIP().get_random_ip()
        request.meta['proxy'] = proxy_url
自定义pipeline的使用实例
pipeline存储json(自定义json存储)
import codecs
class JsonWithEncodingPipeline(object):
    """Custom JSON export: writes each item as one UTF-8 JSON line to ``article.json``."""

    def __init__(self):
        # codecs.open gives a text handle that encodes to UTF-8 on write.
        self.file = codecs.open('article.json', 'w', encoding="utf-8")

    def process_item(self, item, spider):
        """Append ``item`` to the file as a JSON line and return it unchanged."""
        # Imported here because the snippet's import block only has codecs.
        import json

        # ensure_ascii=False keeps non-ASCII text readable instead of \uXXXX.
        lines = json.dumps(dict(item), ensure_ascii=False) + "\n"
        self.file.write(lines)
        return item

    def close_spider(self, spider):
        """Close the output file; this is the hook Scrapy calls on pipelines."""
        self.file.close()

    def spider_closed(self, spider):
        """Backward-compatible alias for ``close_spider``."""
        self.close_spider(spider)
pipeline存储json(使用scrapy自带的组件)
from scrapy.exporters import JsonItemExporter
class JsonExporterPipleline(object):
    """Export items to ``export.json`` through Scrapy's ``JsonItemExporter``."""

    def __init__(self):
        # The exporter is handed a file opened in binary mode.
        output = open('export.json', 'wb')
        self.file = output
        self.exporter = JsonItemExporter(output, encoding="utf-8", ensure_ascii=False)
        self.exporter.start_exporting()

    def process_item(self, item, spider):
        """Pass ``item`` to the exporter and return it unchanged."""
        self.exporter.export_item(item)
        return item

    def close_spider(self, spider):
        """Finish the export, then close the underlying file."""
        self.exporter.finish_exporting()
        self.file.close()
pipeline中的存储mysql(阻塞)
import MySQLdb
import MySQLdb.cursors
class MysqlPipeline(object):
    """Write items to MySQL synchronously (each insert blocks the crawl)."""

    def __init__(self):
        self.conn = MySQLdb.connect('127.0.0.1', 'root', 'root', 'spider', charset="utf8", use_unicode=True)
        self.cursor = self.conn.cursor()

    def process_item(self, item, spider):
        """Insert ``item`` into ``jobbole_article``, commit, and return the item.

        The item is returned so that pipelines running after this one
        still receive it instead of ``None``.
        """
        insert_sql = """
            insert into jobbole_article(title, url, create_date, fav_nums)
            VALUES (%s, %s, %s, %s)
        """
        self.cursor.execute(insert_sql, (item["title"], item["url"], item["create_date"], item["fav_nums"]))
        self.conn.commit()
        return item

    def close_spider(self, spider):
        """Release the cursor and the connection when the spider finishes."""
        self.cursor.close()
        self.conn.close()
pipeline中存储mysql(异步)
class MysqlTwistedPipline(object):
    """Write items to MySQL asynchronously through a Twisted ``adbapi`` pool.

    Expects ``adbapi`` (``from twisted.enterprise import adbapi``) and
    ``MySQLdb`` / ``MySQLdb.cursors`` to be imported at module level.
    """

    def __init__(self, dbpool):
        self.dbpool = dbpool

    @classmethod
    def from_settings(cls, settings):
        """Build the pipeline from the ``MYSQL_*`` entries in the Scrapy settings."""
        dbparms = dict(
            host=settings["MYSQL_HOST"],
            db=settings["MYSQL_DBNAME"],
            user=settings["MYSQL_USER"],
            passwd=settings["MYSQL_PASSWORD"],
            charset='utf8',
            cursorclass=MySQLdb.cursors.DictCursor,
            use_unicode=True,
        )
        dbpool = adbapi.ConnectionPool("MySQLdb", **dbparms)
        return cls(dbpool)

    def process_item(self, item, spider):
        """Schedule the insert on the pool and return the item immediately.

        The item is returned so that pipelines running after this one
        still receive it instead of ``None``.
        """
        # runInteraction hands do_insert a cursor and returns a Deferred.
        query = self.dbpool.runInteraction(self.do_insert, item)
        # Failures of the asynchronous insert are routed to handle_error.
        query.addErrback(self.handle_error, item, spider)
        return item

    def handle_error(self, failure, item, spider):
        """Report a failed asynchronous insert."""
        print(failure)

    def do_insert(self, cursor, item):
        """Run the actual insert with the cursor supplied by the pool."""
        insert_sql = """
            insert into jobbole_article(title, url, create_date, fav_nums)
            VALUES (%s, %s, %s, %s)
        """
        cursor.execute(insert_sql, (item["title"], item["url"], item["create_date"], item["fav_nums"]))

    # Alternative do_insert: requires each item class to define
    # get_insert_sql(), so that different items build their own statement.
    # def do_insert(self, cursor, item):
    #     insert_sql, params = item.get_insert_sql()
    #     print(insert_sql, params)
    #     cursor.execute(insert_sql, params)
如何在scrapy中随机切换UA?
随机UA下载中间件(初始版)
settings文件:
DOWNLOADER_MIDDLEWARES = {
    # Disable Scrapy's built-in User-Agent middleware. It runs earlier
    # (order 500) and already fills in the User-Agent header, which would
    # turn RandomUserAgentMiddleware's setdefault() into a no-op.
    'scrapy.downloadermiddlewares.useragent.UserAgentMiddleware': None,
    'ArticleSpider.middlewares.RandomUserAgentMiddleware': 543,
}
middlewares文件:
from fake_useragent import UserAgent
class RandomUserAgentMiddleware(object):
    """Downloader middleware that supplies a User-Agent from ``fake_useragent``.

    The ``RANDOM_UA_TYPE`` setting (default ``'random'``) names the
    ``UserAgent`` attribute that is read for every request.
    """

    def __init__(self, crawler):
        super(RandomUserAgentMiddleware, self).__init__()
        self.ua = UserAgent()
        self.ua_type = crawler.settings.get('RANDOM_UA_TYPE', 'random')

    @classmethod
    def from_crawler(cls, crawler):
        """Build the middleware from the crawler."""
        return cls(crawler)

    def process_request(self, request, spider):
        """Set ``User-Agent`` on the request unless the header already exists."""
        # Look up the attribute named by ua_type on the UserAgent object.
        user_agent = getattr(self.ua, self.ua_type)
        # setdefault leaves a header that is already present untouched.
        request.headers.setdefault('User-Agent', user_agent)
数据存错怎么办?
将redis数据库导入mongodb数据库
import json, redis, pymongo
def main():
    """Move items from the Redis list into MongoDB, forever.

    Pops JSON payloads from ``sinainfospider_redis:items`` and inserts
    each decoded document into ``sina.sina_items``. Payloads that cannot
    be decoded are reported and skipped.
    """
    # Redis connection
    rediscli = redis.StrictRedis(host='127.0.0.1', port=6379, db=0)
    # MongoDB connection
    mongocli = pymongo.MongoClient(host='localhost', port=27017)
    # Database, then collection
    db = mongocli['sina']
    sheet = db['sina_items']
    offset = 0
    while True:
        # blpop takes from the head of the list (FIFO); brpop would be LIFO.
        source, data = rediscli.blpop(["sinainfospider_redis:items"])
        try:
            item = json.loads(data.decode("utf-8"))
        except ValueError:
            # Covers both invalid UTF-8 and invalid JSON.
            print("Error procesing: %s" % data)
            continue
        # Collection.insert was removed in PyMongo 4; insert_one replaces it.
        sheet.insert_one(item)
        # Running count of imported documents.
        offset += 1
        print(offset)
        print("Processing: %s " % item)


if __name__ == '__main__':
    main()
将redis数据存入mysql数据库
import redis, json, time
from pymysql import connect
# Redis connection
redis_client = redis.StrictRedis(host="127.0.0.1", port=6379, db=0)
# MySQL connection
mysql_client = connect(host="127.0.0.1", user="root", password="mysql",
                       database="sina", port=3306, charset='utf8')
cursor = mysql_client.cursor()

# The statement never changes, so it is built once outside the loop.
sql = "insert into sina_items(parent_url,parent_title,sub_title,sub_url,sub_file_name,son_url,head,content,crawled,spider) values(%s,%s,%s,%s,%s,%s,%s,%s,%s,%s)"
# Item keys, in the same order as the columns in the statement above.
fields = ("parent_url", "parent_title", "sub_title", "sub_url", "sub_file_name",
          "son_url", "head", "content", "crawled", "spider")

i = 1
try:
    while True:
        # i is the 1-based number of the item about to be processed.
        print(i)
        time.sleep(1)
        # Blocks until an item is available on the Redis list.
        source, data = redis_client.blpop(["sinainfospider_redis:items"])
        item = json.loads(data.decode())
        print("source===========", source)
        print("item===========", item)
        params = [item[name] for name in fields]
        cursor.execute(sql, params)
        mysql_client.commit()
        # The original reset i to 1 here, so the counter never advanced.
        i += 1
finally:
    # Release the MySQL resources when the loop is interrupted or fails.
    cursor.close()
    mysql_client.close()