简单记录一下,免得下次找不到,还得重写。
先看一下效果,然后直接上代码,并且每行都配注释。
代码语言:javascript复制# 导入线程池的包
from concurrent.futures import ThreadPoolExecutor, as_completed
# 整个的任务函数,方便调用。也可以拆开写
def extract_append_audio_features(extract_type='age', max_workers=16):
# feature_csv任务队列,可以理解为数组
datas_num = len(feature_csv)
# 不知道为什么tqdm没显示,所以这里搞个锁
compute_cnt_lock = threading.Lock()
# 与compute_cnt_lock锁配合,用来计当前已执行的总数
compute_cnt = 0
# 这是真正要被线程池调用的函数,id用来表示不同的线程
def func(id, start_index, end_index):
# 说明变量是从外部获得的
nonlocal compute_cnt
# 用来存处理的结果
feature_data = []
# 根据参数确定当前线程中执行数组中的哪部分
for path in feature_csv[start_index:end_index]:
t1 = time.time()
# 执行一些操作
data = do_feature(y)
# 将操作结果存下来
feature_data.append(data)
# 获取线程锁,并将计数 1
with compute_cnt_lock:
compute_cnt = 1
print(f'r完成进度[{id}=>{round(time.time()-t1, 3)}s]: {compute_cnt}/{datas_num}', end='')
# 全部完成后,返回本次处理的内容
return id, feature_data
# 这个函数用来根据指定的线程数,自动分配线程资源
def allocate_threads(thread_num=5):
# 计算每个线程会包含数组中的多少个元素
gap = (datas_num // thread_num) 1
# 创建线程池对象
pool = ThreadPoolExecutor(max_workers=max_workers)
threads_pool = []
futures = []
results = {}
# 开始根据指定的线程数,为不同线程分配资源
for id in range(thread_num):
# 第id个线程在数组中元素的起始位置和结束位置
start_index = id * gap
end_index = (id 1) * gap
# 如果数组越界,就让他等于数组最大值
start_index = start_index if start_index < datas_num else datas_num
end_index = end_index if end_index < datas_num else datas_num
# print(f'线程{id}分配内容: {start_index} - {end_index}')
# 分配完成,提交任务,并启动线程池
future = pool.submit(func, id, start_index, end_index)
futures.append(future)
print('>> 线程全部启动')
# 等待线程池完成,并获取返回值
for future in as_completed(futures):
id, feature_data = future.result()
results[id] = feature_data
print('n>> 线程全部完成')
# 根据分配的id顺序,重组结果
new_feature_data = []
for i in range(len(results)):
new_feature_data.extend(results[i])
# 返回最终的排完序的结果
return new_feature_data
# 调用资源分配函数,并获取结果
new_feature_data = allocate_threads(max_workers)
# 根据结果,可以直接return,也可以执行进一步的操作。
# xxxx
# xxxx
# 调用演示
extract_append_audio_features(extract_type='age', max_workers=16)