【Python自动化】多线程BFS站点结构爬虫代码,支持中断恢复,带注释

2023-10-13 09:26:21 浏览数 (1)

代码语言:javascript复制
from collections import deque
from urllib.parse import urljoin, urlparse
import requests
from pyquery import PyQuery as pq
import re
from EpubCrawler.util import request_retry
import traceback
from functools import reduce
from concurrent.futures import ThreadPoolExecutor

# 子线程入口,包装`get_next`并添加异常处理和结果传递
def tr_get_next_safe(i, url, res, args):
    try:
        print(url)
        ns = get_next(url, args)
        res[i] = ns
    except:
        traceback.print_exc()

def get_next(url, args):
    # 请求该网页
    # 这里是带重试的 GET
    # 可以看我其它项目源码,也可以自己写一个。
    html = request_retry(
        'GET', url, 
        retry=args.retry,
        proxies=args.proxy,
    ).text
    if not html: return []
    # 解析其中所有链接的`href`属性
    rt = pq(html)
    el_links = rt('a')
    links = [
        urljoin(url, pq(el).attr('href').strip()) 
        for el in el_links
        if pq(el).attr('href')
    ]
    # 过滤掉其它网站的链接
    hostname = urlparse(url).hostname
    links = [
        l for l in links 
        if urlparse(l).hostname == hostname
    ]
    # print(f'url: {url}nnext: {links}n')
    return links


def whole_site(args):
    # `args.site`:`str`,站点网址
    # `args.proxy`:`str`,代理地址,默认`None`
    # `args.retry`:`int`,重试次数
    # `args.threads`:`int`,线程数
    site = args.site
    if args.proxy: 
        args.proxy = {'http': args.proxy, 'https': args.proxy}
    pref = re.sub(r'[^w-.]', '-', site)
    # 结果保存文件和历史记录文件
    res_fname = f'{pref}.txt'
    rec_fname = f'{pref}_rec.txt'
    
    ofile = open(res_fname, 'a', encoding='utf8')
    rec_file = open(rec_fname, 'a ', encoding='utf8')
    # 判断记录文件是否有记录
    if rec_file.tell() != 0:
        # 读取所有行,过滤空行
        rec_file.seek(0, 0)
        rec = rec_file.read().split('n')
        rec = [l for l in rec if l.strip()]
        # -1 的数量是弹出操作的数量,从队列前方移除指定数量的条目
        pop_count = rec.count('-1')
        q = deque([l for l in rec if l != "-1"][pop_count:])
        vis = set(rec)
    else:
        # 初始化队列和已访问集
        q = deque([site])
        vis = set([site])
        rec_file.write(site   'n')
    pool = ThreadPoolExecutor(args.threads)

    while q:
        # 取出指定数量的链接
        pop_cnt = min(len(q), args.threads)
        urls = [q.popleft() for _ in range(pop_cnt)]
        # 调用子线程获取引用
        nexts = [[] for _ in range(pop_cnt)]
        hdls = []
        for i, url in enumerate(urls):
            h = pool.submit(tr_get_next_safe, i, url, nexts, args)
            hdls.append(h)
        for h in hdls: h.result()
        # 过滤空项、合并、去重
        nexts = [n for n in nexts if n]
        nexts = set(reduce(lambda x, y: x   y, nexts, []))
        # 这里可以过滤常见文件后缀,比如 PDF、DOC 等等
        # nexts = (u for u in nexts if not u.endswith('.xml'))
        # 将本次迭代结果更新到磁盘
        # -1 表示弹出元素
        for url in urls:
            ofile.write(url   'n')
            rec_file.write('-1n')
        # 将没有访问的引用标记访问,并更新到队列
        for n in nexts:
            if n not in vis:
                vis.add(n)
                q.append(n)
                rec_file.write(n   'n')
    
    ofile.close()
    rec_file.close()

0 人点赞