matinal:Python 控制函数运行时间

2023-10-14 15:35:16 浏览数 (1)

在某个Flask项目在做后端接口时需要设置超时响应,因为接口中使用爬虫请求了多个网站,响应时间时长时短。

我需要设置一个最大响应时间,时间内如果接口爬虫没跑完,直接返回请求超时。

从网上了解到有两种方法,废话不多说直接上代码。

方法1:使用线程控制

代码语言:javascript复制
import requests, datetime, time
import threading


class MyThread(threading.Thread):
    def __init__(self, target, args=()):
        """
        why: 因为threading类没有返回值,因此在此处重新定义MyThread类,使线程拥有返回值
        此方法来源 https://www.cnblogs.com/hujq1029/p/7219163.html?utm_source=itdadao&utm_medium=referral
        """
        super(MyThread, self).__init__()
        self.func = target
        self.args = args

    def run(self):
        # 接受返回值
        self.result = self.func(*self.args)

    def get_result(self):
        # 线程不结束,返回值为None
        try:
            return self.result
        except Exception:
            return None


# 为了限制真实请求时间或函数执行时间的装饰器
def limit_decor(limit_time):
    """
    :param limit_time: 设置最大允许执行时长,单位:秒
    :return: 未超时返回被装饰函数返回值,超时则返回 None
    """

    def functions(func):
        # 执行操作
        def run(*params):
            thre_func = MyThread(target=func, args=params)
            # 主线程结束(超出时长),则线程方法结束
            thre_func.setDaemon(True)
            thre_func.start()
            # 计算分段沉睡次数
            sleep_num = int(limit_time // 1)
            sleep_nums = round(limit_time % 1, 1)
            # 多次短暂沉睡并尝试获取返回值
            for i in range(sleep_num):
                time.sleep(1)
                infor = thre_func.get_result()
                if infor:
                    return infor
            time.sleep(sleep_nums)
            # 最终返回值(不论线程是否已结束)
            if thre_func.get_result():
                return thre_func.get_result()
            else:
                return"请求超时"  #超时返回  可以自定义

        return run

    return functions

#接口函数
def a1():
    print("开始请求接口")

    #这里把逻辑封装成一个函数,使用线程调用
    a_theadiing = MyThread(target=a2)
    a_theadiing.start()
    a_theadiing.join()

    #返回结果
    a = a_theadiing.get_result()

    print("请求完成")
    return a
@limit_decor(3)   #超时设置为3s   2s逻辑未执行完毕返回接口超时
def a2():
    print("开始执行")
    time.sleep(2)
    print("执行完成")
    a=2
    return a

# 程序入口     未超时返回a的值   超时返回请求超时
if __name__ == '__main__':
    a = a1()  #调用接口(这里把函数a1看做一个接口)
    print(a)

超时设置3s,线程调用函数运行2s,这里返回a的值2。

方法2:使用信号模块signal(只能在unix系统使用)

signal负责在Python程序内部处理信号,典型的操作包括预设信号处理函数,暂停并等待信号,以及定时发出SIGALRM等。

要注意,signal包主要是针对UNIX平台(比如Linux, MAC OS),而Windows内核中由于对信号机制的支持不充分,所以在Windows上的Python不能发挥信号系统的功能。

信号是进程之间通讯的方式,是一种软件中断。一个进程一旦接收到信号就会打断原来的程序执行流程来处理信号。

代码语言:javascript复制
def set_timeout(num):
    def wrap(func):
        def handle(signum, frame):  # 收到信号 SIGALRM 后的回调函数,第一个参数是信号的数字,第二个参数是the interrupted stack frame.
            raise RuntimeError

        def to_do(*args):
            try:
                signal.signal(signal.SIGALRM, handle)  # 设置信号和回调函数
                signal.alarm(num)  # 设置 num 秒的闹钟
                print('start alarm signal.')
                r = func(*args)
                print('close alarm signal.')
                signal.alarm(0)  # 关闭闹钟
                return r
            except RuntimeError as e:
                return "超时啦"
        return to_do

    return wrap


@set_timeout(2)  # 限时 2 秒超时
def connect():  # 要执行的函数
    time.sleep(3)  # 函数执行时间,写大于2的值,可测试超时
    return "完成"


if __name__ == '__main__':
    a = connect()

讲述了Python实现可设置持续运行时间、线程数及时间间隔的多线程异步post请求功能。分享给大家供大家参考,具体如下:

代码语言:javascript复制
#coding=utf8
'''
random.randint(a, b):用于生成一个指定范围内的整数。
其中参数a是下限,参数b是上限,生成的随机数n: a <= n <= b
random.choice(sequence):从序列中获取一个随机元素
参数sequence表示一个有序类型(列表,元组,字符串)
'''
import httplib,json
import time
import threading
from random import randint,choice
#创建请求函数
def postRequest(threadNum):
  postJson={
        }
  #定义需要进行发送的数据
  postData=json.dumps(postJson)
  #定义一些文件头
  headerdata = {
    "content-type":"application/json",
     }
  #接口
  requrl ="/v1/query"
  #请求服务,例如:www.baidu.com
  hostServer=""
  #连接服务器
  conn = httplib.HTTPConnection(hostServer)
  #发送请求
  conn.request(method="POST",url=requrl,body=postData,headers=headerdata)
  #获取请求响应
  response=conn.getresponse()
  #打印请求状态
  if response.status in range(200,300):
    print u"线程" str(threadNum) u"状态码:" str(response.status)
  conn.close()
def run(threadNum,internTime,duration):
  #创建数组存放线程
  threads=[]
  try:
    #创建线程
    for i in range(1,threadNum):
      #针对函数创建线程
      t=threading.Thread(target=postRequest,args=(i,))
      #把创建的线程加入线程组
      threads.append(t)
  except Exception,e:
    print e
  try:
    #启动线程
    for thread in threads:
        thread.setDaemon(True)
        thread.start()
        time.sleep(internTime)
    #等待所有线程结束
    for thread in threads:
        thread.join(duration)
  except Exception,e:
      print e
if __name__ == '__main__':
  startime=time.strftime("%Y%m%d%H%M%S")
  now=time.strftime("%Y%m%d%H%M%S")
  duratiion=raw_input(u"输入持续运行时间:")
  while (startime str(duratiion))!=now:
    run(10,1,int(duratiion))
    now=time.strftime("%Y%m%d%H%M%S")

0 人点赞