pyppeteer如何使用隧道代理

2023-06-16 15:22:04 浏览数 (1)

之前的文章中我们分享了很多Selenium的使用知识,它功能的确非常强大,但Selenium 也不是完美的,实际使用中有些地方还是不方便,比如环境的配置,得安装好相关浏览器,比如 Chrome、Firefox 等等,然后还要到官方网站去下载对应的驱动,最重要的还需要安装对应的 Python Selenium 库,而且版本也得好好看看是否对应,确实不是很方便,另外如果要做大规模部署的话,环境配置的一些问题也是个头疼的事情。

所以今天我们就给大家介绍一个类似的替代品,叫作 Pyppeteer,那Pyppeteer 又是什么呢?它实际上是 Puppeteer 的 Python 版本的实现,但它不是 Google 开发的,是一位来自于日本的工程师依据 Puppeteer 的一些功能开发出来的非官方版本。

Pyppeteer 是依赖于 Chromium 这个浏览器来运行的。那么有了 Pyppeteer 之后,我们就可以免去那些烦琐的环境配置等问题。如果第一次运行的时候,Chromium 浏览器没有安装,那么程序会帮我们自动安装和配置,就免去了烦琐的环境配置等工作。另外 Pyppeteer 是基于 Python 的新特性 async 实现的,所以它的一些执行也支持异步操作,效率相对于 Selenium 来说也提高了。

那么下面就让我们来一起了解下 Pyppeteer 的相关用法吧。

代理写法,亿牛云代理隧道代码

代码语言:python复制
#! -*- encoding:utf-8 -*-
# Proxy configuration constants for the requests-based demo.
# (Original paste mixed indentation levels, which is an IndentationError
# when run as a standalone script; normalized to top level here.)

import requests
import random

# Target page to fetch (echoes the caller's public IP).
targetUrl = "http://httpbin.org/ip"

# HTTPS variant of the target page.
# targetUrl = "https://httpbin.org/ip"

# Proxy server endpoint (product site: www.16yun.cn).
proxyHost = "t.16yun.cn"
proxyPort = "31111"

# Proxy authentication credentials — replace with real account values.
proxyUser = "username"
proxyPass = "password"

pyppeteer使用隧道代理demo

代码语言:python复制
    #! -*- encoding:utf-8 -*-
    import websockets
    from scrapy.http import HtmlResponse
    from logging import getLogger
    import asyncio
    import pyppeteer
    import logging
    from concurrent.futures._base import TimeoutError
    import base64
    import sys
    import random

    # Quiet the chatty websocket/pyppeteer loggers down to warnings only.
    pyppeteer_level = logging.WARNING
    for _noisy_logger in ('websockets.protocol', 'pyppeteer'):
        logging.getLogger(_noisy_logger).setLevel(pyppeteer_level)

    # True on any Python 3 interpreter (pyppeteer itself requires Python 3).
    PY3 = sys.version_info[0] >= 3


    def base64ify(bytes_or_str):
        """
        Return the URL-safe base64 encoding of *bytes_or_str* as an ASCII str.

        str input is UTF-8-encoded first; bytes input is used as-is.
        The former Python 2 branch (returning bytes) was dead code: the file
        uses `async def`, so it can only run on Python 3 where PY3 is always
        True. Behavior on Python 3 is unchanged.
        """
        if isinstance(bytes_or_str, str):
            input_bytes = bytes_or_str.encode('utf8')
        else:
            input_bytes = bytes_or_str
        return base64.urlsafe_b64encode(input_bytes).decode('ascii')


    class ProxyMiddleware(object):
        """Scrapy downloader middleware that routes every request through the
        16yun tunnel proxy, adding Basic auth and an IP-rotation header."""

        # Uncomment to load a pool of User-Agent strings for rotation.
        # USER_AGENT = open('useragents.txt').readlines()

        def process_request(self, request, spider):
            """
            Attach proxy address, Proxy-Authorization and Proxy-Tunnel headers.
            :param request: scrapy Request being scheduled
            :param spider: spider issuing the request (unused)
            :return: None — the request is modified in place
            """
            # Proxy server endpoint.
            proxyHost = "t.16yun.cn"
            proxyPort = "31111"

            # Proxy authentication credentials.
            proxyUser = "username"
            proxyPass = "password"

            request.meta['proxy'] = "http://{0}:{1}".format(proxyHost, proxyPort)

            # Bug fix: the original paste dropped the '+' operators
            # ("proxyUser   \":\"   proxyPass"), which is a SyntaxError.
            encoded_user_pass = base64ify(proxyUser + ":" + proxyPass)
            request.headers['Proxy-Authorization'] = 'Basic ' + encoded_user_pass

            # A new random Proxy-Tunnel value asks the provider to switch the
            # exit IP for this request.
            tunnel = random.randint(1, 10000)
            request.headers['Proxy-Tunnel'] = str(tunnel)

            # Optionally randomize the User-Agent per request.
            # request.headers['User-Agent'] = random.choice(self.USER_AGENT)

    class PyppeteerMiddleware(object):
        """Scrapy downloader middleware that renders pages marked with
        request.meta['render'] through a shared headless pyppeteer browser."""

        def __init__(self, **args):
            """
            Initialise logger, event loop and the shared headless browser.
            :param args: keyword settings forwarded from PYPPETEER_ARGS
            """
            self.logger = getLogger(__name__)
            self.loop = asyncio.get_event_loop()
            # One Chromium instance is shared by every rendered request.
            self.browser = self.loop.run_until_complete(
                pyppeteer.launch(headless=True))
            self.args = args

        def __del__(self):
            """
            Close the event loop when the middleware is garbage-collected.
            NOTE(review): the browser itself is never closed here, so the
            Chromium process may outlive the middleware — confirm whether a
            browser.close() call is wanted before loop.close().
            :return:
            """
            self.loop.close()

        def render(self, url, retries=1, script=None, wait=0.3, scrolldown=False, sleep=0,
                   timeout=8.0, keep_page=False):
            """
            Render a page with pyppeteer and return its HTML.
            :param url: page url
            :param retries: max retry times
            :param script: js script to evaluate on the page, if any
            :param wait: seconds to wait before loading the page, preventing timeouts
            :param scrolldown: how many times to press PageDown (falsy = no scrolling)
            :param sleep: seconds to sleep after the initial render / between scrolls
            :param timeout: longest wait for navigation, otherwise treated as a timeout
            :param keep_page: if True, leave the page open instead of closing it
            :return: tuple (content, result, status) — any element may be None
            """

            # Inner coroutine doing the actual browser work.
            async def async_render(url, script, scrolldown, sleep, wait, timeout, keep_page):
                # Bug fix: bind page before the try block — otherwise the
                # finally clause raises UnboundLocalError whenever newPage()
                # itself fails, masking the original error.
                page = None
                try:
                    # Open a fresh tab and navigate to the target URL.
                    page = await self.browser.newPage()
                    await asyncio.sleep(wait)
                    response = await page.goto(url, options={'timeout': int(timeout * 1000)})
                    if response.status != 200:
                        return None, None, response.status
                    result = None
                    # Optionally evaluate a caller-supplied JS snippet.
                    if script:
                        result = await page.evaluate(script)

                    # Press PageDown {scrolldown} times to trigger lazy loading.
                    # NOTE(review): page._keyboard is a private pyppeteer API and
                    # may break across versions — confirm against the pinned version.
                    if scrolldown:
                        for _ in range(scrolldown):
                            await page._keyboard.down('PageDown')
                            await asyncio.sleep(sleep)
                    else:
                        await asyncio.sleep(sleep)
                    if scrolldown:
                        await page._keyboard.up('PageDown')

                    # Grab the rendered HTML.
                    content = await page.content()

                    return content, result, response.status
                except TimeoutError:
                    # Surface a render timeout to the caller as an HTTP 500.
                    return None, None, 500
                finally:
                    # Close the tab unless the caller asked to keep it open.
                    if not keep_page and page is not None:
                        await page.close()

            content, result, status = [None] * 3

            # Retry until content is obtained or {retries} attempts are used.
            for i in range(retries):
                if not content:
                    content, result, status = self.loop.run_until_complete(
                        async_render(url=url, script=script, sleep=sleep, wait=wait,
                                     scrolldown=scrolldown, timeout=timeout, keep_page=keep_page))
                else:
                    break

            return content, result, status

        def process_request(self, request, spider):
            """
            Render the request with pyppeteer when request.meta['render'] is set.
            :param request: request object
            :param spider: spider object
            :return: HtmlResponse for rendered requests, otherwise None so the
                     request falls through to the default downloader
            """
            if request.meta.get('render'):
                try:
                    self.logger.debug('rendering %s', request.url)
                    html, result, status = self.render(request.url)
                    return HtmlResponse(url=request.url, body=html, request=request, encoding='utf-8',
                                        status=status)
                except websockets.exceptions.ConnectionClosed:
                    # Browser connection dropped; return None so Scrapy handles
                    # the request through its normal path.
                    pass

        @classmethod
        def from_crawler(cls, crawler):
            """Build the middleware from the crawler's PYPPETEER_ARGS setting."""
            return cls(**crawler.settings.get('PYPPETEER_ARGS', {}))

0 人点赞