【代码】Python多线程执行并且按原本顺序返回[详细注释]

2023-03-01 14:08:09 浏览数 (1)

简单记录一下,免得下次找不到,还得重写。

先看一下效果,然后直接上代码,并且每行都配注释。

代码语言:javascript复制
# 导入线程池的包
from concurrent.futures import ThreadPoolExecutor, as_completed


# 整个的任务函数,方便调用。也可以拆开写
def extract_append_audio_features(extract_type='age', max_workers=16):
    # feature_csv任务队列,可以理解为数组
    datas_num = len(feature_csv)
    # 不知道为什么tqdm没显示,所以这里搞个锁
    compute_cnt_lock = threading.Lock()
    # 与compute_cnt_lock锁配合,用来计当前已执行的总数
    compute_cnt = 0
    
    # 这是真正要被线程池调用的函数,id用来表示不同的线程
    def func(id, start_index, end_index):
        # 说明变量是从外部获得的
        nonlocal compute_cnt
        # 用来存处理的结果
        feature_data = []
        # 根据参数确定当前线程中执行数组中的哪部分
        for path in feature_csv[start_index:end_index]:
            t1 = time.time()
            # 执行一些操作
            data = do_feature(y)
            # 将操作结果存下来
            feature_data.append(data)
            # 获取线程锁,并将计数 1
            with compute_cnt_lock:
                compute_cnt  = 1
            print(f'r完成进度[{id}=>{round(time.time()-t1, 3)}s]: {compute_cnt}/{datas_num}', end='')
        # 全部完成后,返回本次处理的内容
        return id, feature_data
    
    # 这个函数用来根据指定的线程数,自动分配线程资源
    def allocate_threads(thread_num=5):
        # 计算每个线程会包含数组中的多少个元素
        gap = (datas_num // thread_num) 1
        # 创建线程池对象
        pool = ThreadPoolExecutor(max_workers=max_workers)
        threads_pool = []
        futures = []
        results = {}
        # 开始根据指定的线程数,为不同线程分配资源
        for id in range(thread_num):
            # 第id个线程在数组中元素的起始位置和结束位置
            start_index = id * gap
            end_index = (id 1) * gap
            # 如果数组越界,就让他等于数组最大值
            start_index = start_index if start_index < datas_num else datas_num
            end_index = end_index if end_index < datas_num else datas_num
            # print(f'线程{id}分配内容: {start_index} - {end_index}')
            # 分配完成,提交任务,并启动线程池
            future = pool.submit(func, id, start_index, end_index)
            futures.append(future)
        print('>> 线程全部启动')
    
        # 等待线程池完成,并获取返回值
        for future in as_completed(futures):
            id, feature_data = future.result()
            results[id] = feature_data
        print('n>> 线程全部完成')
        
        # 根据分配的id顺序,重组结果
        new_feature_data = []
        for i in range(len(results)):
            new_feature_data.extend(results[i])

        # 返回最终的排完序的结果
        return new_feature_data
        

    # 调用资源分配函数,并获取结果
    new_feature_data = allocate_threads(max_workers)
    # 根据结果,可以直接return,也可以执行进一步的操作。
    # xxxx 
    # xxxx 


# 调用演示
extract_append_audio_features(extract_type='age', max_workers=16)

0 人点赞