Python 多进程处理数据

2022-10-24 14:53:10 浏览数 (1)

文章目录
  • 1. multiprocessing.Process
  • 2. multiprocessing.Pool.starmap
  • 3. multiprocessing.Pool.starmap_async

1. multiprocessing.Process

代码语言:python
# _*_ coding: utf-8 _*_
# @Time : 2022/10/16 15:45
# @Author : Michael
# @File : multiprocess_demo.py
# @desc :
import multiprocessing
import os
import time


class compute_process(multiprocessing.Process):  # computation worker process
    """Child process that copies a text file, appending " out" to every line.

    The lowercase class name is kept (instead of PEP 8 ``ComputeProcess``)
    so existing callers keep working.

    Args:
        input_datafile: path of the text file to read.
        output_datafile: path of the file to (over)write with the result.
    """

    def __init__(self, input_datafile, output_datafile):
        super().__init__()
        self.input_datafile = input_datafile
        self.output_datafile = output_datafile

    def compute(self):
        """Write every stripped input line followed by " out" and a newline."""
        # Open the input first so a missing input file does not leave behind
        # an empty (truncated) output file that later looks like a success.
        with open(self.input_datafile, 'r') as fin:
            with open(self.output_datafile, 'w') as fout:
                for line in fin:
                    # The newline escape is required; without it all output
                    # lines are concatenated into one.
                    fout.write(f'{line.strip()} out\n')

    def run(self):
        """Body executed in the child process once ``start()`` is called."""
        self.compute()
        print(f'finish compute process with {self.input_datafile}')


if __name__ == '__main__':
    print('start compute')
    input_datafiles = ['./test/1.txt', './test/2.txt', './test/3.txt', './test/4.txt']
    output_datafiles = ['./test/out1.txt', './test/out2.txt', './test/out3.txt', './test/out4.txt']
    # One child process per input/output file pair.
    processes = [compute_process(i, o) for i, o in zip(input_datafiles, output_datafiles)]
    for p in processes:
        p.start()
    # Uncomment to make the main process wait for every child to finish.
    # Without it, the existence check below runs while the children may
    # still be working.
    # for p in processes:
    #     p.join()
    for o in output_datafiles:
        if os.path.exists(o):
            print(f'exists {o}')
        else:
            print(f'not exists {o}')
    print('congratulations finish')

如果没有 join,主进程不会等待子进程,而是直接执行后续代码:

输出:

代码语言:text
start compute
not exists ./test/out1.txt
not exists ./test/out2.txt
not exists ./test/out3.txt
not exists ./test/out4.txt
congratulations finish
finish compute process with ./test/1.txt
finish compute process with ./test/2.txt
finish compute process with ./test/3.txt
finish compute process with ./test/4.txt

如果打开 join(取消注释),主进程会等待所有子进程结束后才继续执行:

代码语言:text
start compute
finish compute process with ./test/2.txt
finish compute process with ./test/3.txt
finish compute process with ./test/1.txt
finish compute process with ./test/4.txt
exists ./test/out1.txt
exists ./test/out2.txt
exists ./test/out3.txt
exists ./test/out4.txt
congratulations finish

多进程也会相应地成倍消耗资源,可以根据资源情况设置进程数量加以限制。下面的代码把任务分批,每批最多同时启动 num_workers 个进程:

代码语言:python
# _*_ coding: utf-8 _*_
# @Time : 2022/10/16 15:45
# @Author : Michael
# @File : multiprocess_demo.py
# @desc :
import multiprocessing
import os
import time
import math


class compute_process(multiprocessing.Process):
    """Child process that copies a text file, appending " out" to every line.

    The lowercase class name is kept (instead of PEP 8 ``ComputeProcess``)
    so existing callers keep working.

    Args:
        input_datafile: path of the text file to read.
        output_datafile: path of the file to (over)write with the result.
    """

    def __init__(self, input_datafile, output_datafile):
        super().__init__()
        self.input_datafile = input_datafile
        self.output_datafile = output_datafile

    def compute(self):
        """Write every stripped input line followed by " out" and a newline."""
        # Open the input first so a missing input file does not leave behind
        # an empty (truncated) output file that later looks like a success.
        with open(self.input_datafile, 'r') as fin:
            with open(self.output_datafile, 'w') as fout:
                for line in fin:
                    # The newline escape is required; without it all output
                    # lines are concatenated into one.
                    fout.write(f'{line.strip()} out\n')

    def run(self):
        """Body executed in the child process once ``start()`` is called."""
        self.compute()
        print(f'finish compute process with {self.input_datafile}')


if __name__ == '__main__':
    num_workers = 3  # maximum number of child processes running at once
    print('start compute')
    all_input_datafiles = ['./test/1.txt', './test/2.txt', './test/3.txt', './test/4.txt']
    all_output_datafiles = ['./test/out1.txt', './test/out2.txt', './test/out3.txt', './test/out4.txt']
    num_batches = math.ceil(len(all_input_datafiles) / num_workers)
    for idx in range(num_batches):
        # Batch idx covers items [idx * num_workers, (idx + 1) * num_workers).
        start, stop = idx * num_workers, (idx + 1) * num_workers
        input_datafiles = all_input_datafiles[start:stop]
        output_datafiles = all_output_datafiles[start:stop]
        processes = [compute_process(i, o) for i, o in zip(input_datafiles, output_datafiles)]
        print(f'idx: {idx}, len of sub processes: {len(processes)}')
        for p in processes:
            p.start()
        # Wait for the whole batch to finish before starting the next one.
        for p in processes:
            p.join()
    for o in all_output_datafiles:
        if os.path.exists(o):
            print(f'exists {o}')
        else:
            print(f'not exists {o}')
    print('congratulations finish')

输出:

代码语言:text
start compute
idx: 0, len of sub processes: 3
finish compute process with ./test/1.txt
finish compute process with ./test/2.txt
finish compute process with ./test/3.txt
idx: 1, len of sub processes: 1
finish compute process with ./test/4.txt
exists ./test/out1.txt
exists ./test/out2.txt
exists ./test/out3.txt
exists ./test/out4.txt
congratulations finish

但是上面的 for 循环有个问题:每一批都要等耗时最长的子进程结束,才能开始下一批;先完成的进程即使空闲,也无法接手新的任务。进程池(Pool)可以解决这个问题。

2. multiprocessing.Pool.starmap

为了观察效果,我把 1.txt 换成一个上百万行的大文件:

代码语言:python
# _*_ coding: utf-8 _*_
# @Time : 2022/10/16 18:13
# @Author : Michael
# @File : multiprocess_pool_demo.py
# @desc :
import multiprocessing
import os

def compute_func(input_datafile, output_datafile, sql):
    """Copy *input_datafile* to *output_datafile*, appending " out" to each line.

    Args:
        input_datafile: path of the text file to read.
        output_datafile: path of the file to (over)write.
        sql: label that is only used in the completion message.

    Returns:
        ``output_datafile``, so the pool collects the produced paths.
    """
    # Open the input first so a missing input file does not leave behind
    # an empty (truncated) output file.
    with open(input_datafile, 'r') as fin:
        with open(output_datafile, 'w') as fout:
            for line in fin:
                # The newline escape is required; without it all output
                # lines are concatenated into one.
                fout.write(f'{line.strip()} out\n')
    print(f'finish compute process with {input_datafile} and sql {sql}')
    return output_datafile


if __name__ == '__main__':
    num_workers = 3
    print('start compute')
    all_input_datafiles = ['./test/1.txt', './test/2.txt', './test/3.txt', './test/4.txt']
    all_output_datafiles = ['./test/out1.txt', './test/out2.txt', './test/out3.txt', './test/out4.txt']
    sql = ['sql1', 'sql2', 'sql3', 'sql4']
    # One (input, output, sql) tuple per task; starmap unpacks each tuple
    # into compute_func's positional parameters.
    tasks = list(zip(all_input_datafiles, all_output_datafiles, sql))

    # starmap blocks until every task has finished. Leaving the `with` block
    # then shuts the worker processes down, so the pool is not leaked.
    with multiprocessing.Pool(processes=num_workers) as pool:
        outputs = pool.starmap(compute_func, tasks)
    # Results are returned in input order, regardless of completion order.
    print('outputs', outputs)

    print('pool finish')

    for o in all_output_datafiles:
        if os.path.exists(o):
            print(f'exists {o}')
        else:
            print(f'not exists {o}')
    print('congratulations finish')

输出

代码语言:text
start compute
finish compute process with ./test/2.txt and sql sql2
finish compute process with ./test/3.txt and sql sql3
finish compute process with ./test/4.txt and sql sql4
finish compute process with ./test/1.txt and sql sql1
outputs ['./test/out1.txt', './test/out2.txt', './test/out3.txt', './test/out4.txt']
pool finish
exists ./test/out1.txt
exists ./test/out2.txt
exists ./test/out3.txt
exists ./test/out4.txt
congratulations finish

可以看出 1.txt(大文件)是最后才处理完的,但 starmap 返回的 outputs 顺序仍与输入顺序一致。

3. multiprocessing.Pool.starmap_async

代码语言:python
# _*_ coding: utf-8 _*_
# @Time : 2022/10/16 18:13
# @Author : Michael
# @File : multiprocess_pool_demo.py
# @desc :
import multiprocessing
import os

def compute_func(input_datafile, output_datafile, sql):
    """Copy *input_datafile* to *output_datafile*, appending " out" to each line.

    Args:
        input_datafile: path of the text file to read.
        output_datafile: path of the file to (over)write.
        sql: label that is only used in the completion message.

    Returns:
        ``output_datafile``, so the pool collects the produced paths.
    """
    # Open the input first so a missing input file does not leave behind
    # an empty (truncated) output file.
    with open(input_datafile, 'r') as fin:
        with open(output_datafile, 'w') as fout:
            for line in fin:
                # The newline escape is required; without it all output
                # lines are concatenated into one.
                fout.write(f'{line.strip()} out\n')
    print(f'finish compute process with {input_datafile} and sql {sql}')
    return output_datafile


if __name__ == '__main__':
    num_workers = 3
    print('start compute')
    all_input_datafiles = ['./test/1.txt', './test/2.txt', './test/3.txt', './test/4.txt']
    all_output_datafiles = ['./test/out1.txt', './test/out2.txt', './test/out3.txt', './test/out4.txt']
    sql = ['sql1', 'sql2', 'sql3', 'sql4']
    # One (input, output, sql) tuple per task for starmap_async to unpack.
    tasks = list(zip(all_input_datafiles, all_output_datafiles, sql))

    pool = multiprocessing.Pool(processes=num_workers)
    try:
        # starmap_async returns immediately with a MapResult handle.
        async_result = pool.starmap_async(compute_func, tasks)
        print('outputs_async', async_result)
        # get() blocks until all tasks are done; results are in input order.
        outputs_async = async_result.get()
        print('outputs_async', outputs_async)

        print('pool finish')
    finally:
        # close() must be called before join(); join() waits for the workers
        # to exit. Without the get() call above, this join is what keeps the
        # main process from running ahead of the workers.
        pool.close()
        pool.join()

    for o in all_output_datafiles:
        if os.path.exists(o):
            print(f'exists {o}')
        else:
            print(f'not exists {o}')
    print('congratulations finish')

输出:

代码语言:text
start compute
outputs_async <multiprocessing.pool.MapResult object at 0x000002D15FC86BE0>
finish compute process with ./test/2.txt and sql sql2
finish compute process with ./test/3.txt and sql sql3
finish compute process with ./test/4.txt and sql sql4
finish compute process with ./test/1.txt and sql sql1
outputs_async ['./test/out1.txt', './test/out2.txt', './test/out3.txt', './test/out4.txt']
pool finish
exists ./test/out1.txt
exists ./test/out2.txt
exists ./test/out3.txt
exists ./test/out4.txt
congratulations finish

如果把 get 结果那一行删掉,且后面没有调用 join 等待,主进程会直接执行后续代码,最后检查时不会有输出文件生成。注意:调用 pool.join() 之前必须先调用 pool.close()(或 pool.terminate()),否则会抛出 ValueError。

0 人点赞