python进程vs线程

2022-04-27 15:39:31 浏览数 (1)

进程

程序运行在操作系统上的一个实例,就称之为进程。进程需要相应的系统资源:内存、时间片、pid。创建进程:首先要导入multiprocessing中的Process:创建一个Process对象; 创建Process对象时,可以传递参数;

多进程

相关知识

Unix/Linux操作系统提供了一个fork()系统调用,它非常特殊。普通的函数调用,调用一次,返回一次,但是fork()调用一次,返回两次,因为操作系统自动把当前进程(称为父进程)复制了一份(称为子进程),然后,分别在父进程和子进程内返回。 子进程永远返回0,而父进程返回子进程的ID。这样做的理由是,一个父进程可以fork出很多子进程,所以,父进程要记下每个子进程的ID,而子进程只需要调用getppid()就可以拿到父进程的ID

创建一个线程

创建一个线程

代码语言:javascript复制
from multiprocessing import Process
# 导入多进程

def do_work(thread_id):
    print('current thread {} start '.format(thread_id))
    time.sleep(3)
    print('current thread {} over'.format(thread_id))


p = Process(target=do_work,args=[1])
p.start()
print("main thread over")

输出:
发现主进程先结束,子进程后结束

main thread over
current thread 1 start 
current thread 1 over

进程阻塞

对进程加一行join方法,表示阻塞其他进程的操作直到当前子进程执行完成

代码语言:javascript复制
def do_work(thread_id):
    print('current thread {} start '.format(thread_id))
    time.sleep(3)
    print('current thread {} over'.format(thread_id))


p = Process(target=do_work,args=[1])
p.start()
p.join()
print("main thread over")

进程

创建多个进程

代码语言:javascript复制
def do_work(thread_id):
    print('current thread {} start '.format(thread_id))
    time.sleep(3)
    print('current thread {} over'.format(thread_id))


    for i in range(10):
        p = Process(target=do_work, args=[i])
        print(p.name)
        p.start()

    for i in range(10):
        p.join()

    print("main thread over")
    print('我是主进程,PID是%s' % os.getpid())

进程池

如果要启动大量的子进程,可以用进程池的方式批量创建子进程

代码语言:javascript复制
p = Pool(4)
for i in range(5):
    p.apply_async(request_baidu, args=(i,))
print('Waiting for all subprocesses done...')
p.close()
p.join()
print('All subprocesses done.')

print(time.clock() - start_time)

进程锁

代码语言:javascript复制
def do_work(thread_id):
    print('current thread {} start '.format(thread_id))
    print('开始撸代码,PID是%s' % os.getpid())
    time.sleep(3)
    print('current thread {} over'.format(thread_id))


p = Process(target=do_work,args=[1])
p.run()
print("main thread over")
print('我是主进程,PID是%s' % os.getpid())

输出:

使用run方法后,其实也是调用主线程的方法去执行.
current thread 1 start 
开始撸代码,PID是66113
current thread 1 over
main thread over
我是主进程,PID是66113

进程间通信

代码语言:javascript复制
from multiprocessing import Process, Queue
import os, time, random

# 写数据进程执行的代码:
def write(q):
    print('Process to write: %s' % os.getpid())
    for value in ['A', 'B', 'C']:
        print('Put %s to queue...' % value)
        q.put(value)
        time.sleep(random.random())

# 读数据进程执行的代码:
def read(q):
    print('Process to read: %s' % os.getpid())
    while True:
        value = q.get(True)
        print('Get %s from queue.' % value)

if __name__=='__main__':
    # 父进程创建Queue,并传给各个子进程:
    q = Queue()
    pw = Process(target=write, args=(q,))
    pr = Process(target=read, args=(q,))
    # 启动子进程pw,写入:
    pw.start()
    # 启动子进程pr,读取:
    pr.start()
    # 等待pw结束:
    pw.join()
    # pr进程里是死循环,无法等待其结束,只能强行终止:
    pr.terminate()

进程锁

多进程抢占资源 当多个进程使用同一份数据资源的时候,就会引发数据安全或顺序混乱问题

代码语言:javascript复制
import time
from multiprocessing import Process,Lock

def work(lock,n):
    lock.acquire()
    print('%s: %s is runing' % (n,os.getpid()))
    time.sleep(random.random())
    print('%s: %s is down' % (n, os.getpid()))
    lock.release()

if __name__ == '__main__':

    lock = Lock()

    for i in  range(3):
        p = Process(target=work, args=(lock,i,))
        p.start()

多进程保存数据

代码语言:javascript复制
from multiprocessing import Process
from multiprocessing import Manager

    manager = Manager()
    return_dict = manager.dict()

    for group in apk_list_group:

        logger.info(group)
        # 分组的apk列表
        for i in group:
            logger.info('待上传文件 ==> {}'.format(i))
            p = Process(target=upload_oss, args=(i,return_dict))
            logger.info('当前进程ID ==> {}'.format(p.name))
            p.start()
        for i in group:
            p.join()

        logger.info('构建主进程运行完成')


def upload_oss(file_path,return_dict):
    """
    上传到阿里云oss储存服务
    :return:
    """
    results = {}
    try:
        ....

        return_dict[file_path] = oss_url
        refresh_cdn(oss_url)
        # 刷新cdn地址

    except Exception as e:
        logger.error(results)
        logger.eror(e)
        results = {}
        results['code'] = '10000'
        results['message'] = '保存oss失败,请检查后再试。'

    finally:
        return results


return_dict.values()
# 拿到多线程的字典值,但是不能报错对象数据

多线程

多任务可以由多进程完成,也可以由一个进程内的多线程完成。 我们前面提到了进程是由若干线程组成的,一个进程至少有一个线程

创建线程

代码语言:javascript复制
import os
import time
import threading

def task():

    print(t.getName())
    print(threading.current_thread().name)
    print('task')



t = threading.Thread(target=task, name='LoopThread')
t.start()
print(threading.current_thread().name)

线程阻塞

调用join阻塞方法

代码语言:javascript复制
import os
import time
import threading

def task():

    print(t.getName())
    print(threading.current_thread().name)
    print('task')
    time.sleep(3)



t = threading.Thread(target=task, name='LoopThread')

t.start()

t.join()

print(threading.current_thread().name)

线程守护

当定义子线程为守护线程的话,当主线程结束了,不管子线程是否执行完,都会被直接给暂停掉。默认daemon为False

代码语言:javascript复制
def task():

    print(t.getName())
    print(threading.current_thread().name)
    print('task')
    time.sleep(3)



t = threading.Thread(target=task, name='LoopThread')

t.daemon = True

t.start()

run方法

run() 方法并不启动一个新线程,就是在主线程中调用了一个普通函数而已

线程池

代码语言:javascript复制
pip instll threadpool
代码语言:javascript复制
import threadpool
import time

def sayhello (a):
    print("hello: " a)
    time.sleep(2)

def main():
    global result
    seed=["a","b","c"]
    start = time.time()

    task_pool = threadpool.ThreadPool(5)
    requests = threadpool.makeRequests(sayhello,seed)

    for req in requests:
        task_pool.putRequest(req)
    task_pool.wait()
    end = time.time()
    time_m = end-start
    print("time: " str(time_m))

    start1 = time.time()
    for each in seed:
        sayhello(each)
    end1 = time.time()
    print("time1: " str(end1-start1))

if __name__ == '__main__':
    main()

线程锁

通过上面的结果比较可以知道,当多线程中需要“独占资源”的时候,要使用锁来控制,防止多个线程同时占用资源而出现其他异常 使用锁的时候就调用acquire()方法,以此告诉其他线程,我正在占用该资源,你们要等会;待使用资源后需要释放资源的时候就调用release()方法, 告诉其他线程,我已经完成使用该资源了,其他人可以过来使用了

代码语言:javascript复制
import threading
import time

lock = threading.Lock()
l = []


def test1(n):
    lock.acquire()
    l.append(n)
    print(l)
    lock.release()

def main():
    for i in xrange(0, 10):
        th = threading.Thread(target=test1, args=(i,))
        th.start()    

队列

队列一般是先进先出

代码语言:javascript复制
import queue

# q = queue.Queue() # 先进先出
q = queue.LifoQueue() # 先进后出

q = queue.PriorityQueue() # 优先级队列,

# q.put(1)
# q.put(2)
# q.put(3)


q.put((1,'11')) # 优先级队列添加的参数是一个元组,元组元素为优先
q.put((2,'22'))
q.put((3,'33'))
q.put((4,'44'))
print(q.get())

线程队列

代码语言:javascript复制
import queue
import threading
import time

exitFlag = 0

count = 1

class myThread (threading.Thread):
    def __init__(self, threadID, name, q):
        threading.Thread.__init__(self)
        self.threadID = threadID
        self.name = name
        self.q = q

    def run(self):
        print ("开启线程:"   self.name)
        process_data(self.name, self.q)
        print ("退出线程:"   self.name)

def process_data(threadName, q):
    while not exitFlag:
        queueLock.acquire()
        global count
        count = count   1
        print(count)
        if not workQueue.empty():
            data = q.get()
            queueLock.release()
            print ("%s processing %s" % (threadName, data))
        else:
            queueLock.release()
        time.sleep(1)

threadList = ["Thread-1", "Thread-2", "Thread-3"]
nameList = ["One", "Two", "Three", "Four", "Five"]
queueLock = threading.Lock()
workQueue = queue.Queue(10)
threads = []
threadID = 1

# 创建新线程
for tName in threadList:
    thread = myThread(threadID, tName, workQueue)
    thread.start()
    threads.append(thread)
    threadID  = 1

# 填充队列
queueLock.acquire()
for word in nameList:
    workQueue.put(word)
queueLock.release()

# 等待队列清空
while not workQueue.empty():
    pass

# 通知线程是时候退出
exitFlag = 1

# 等待所有线程完成
for t in threads:
    t.join()
print ("退出主线程")
        threading.Thread.__init__(self)

全局线程对象

一个线程的数据可以全局共享,local_school是一个字典,可以通过set和get方法操作数据

代码语言:javascript复制
import threading
# 创建全局ThreadLocal对象:
local_school = threading.local()

def process_student():
    # 获取当前线程关联的student:
    std = local_school.student
    print('Hello, %s (in %s)' % (std, threading.current_thread().name))


def process_thread(name):
    # 绑定ThreadLocal的student:
    local_school.student = name
    process_student()

t1 = threading.Thread(target= process_thread, args=('Alice',), name='Thread-A')
t2 = threading.Thread(target= process_thread, args=('Bob',), name='Thread-B')
t1.start()
t2.start()
t1.join()
t2.join()

输出:
Hello, Alice (in Thread-A)
Hello, Bob (in Thread-B)

进程 vs 线程

  • 多进程模式最大的优点就是稳定性高,因为一个子进程崩溃了,不会影响主进程和其他子进程
  • 多进程模式的缺点是创建进程的代价大,在Unix/Linux系统下
  • 多线程模式通常比多进程快一点,但是也快不到哪去,而且,多线程模式致命的缺点就是任何一个线程挂掉都可能直接造成整个进程崩溃,因为所有线程共享进程的内存

线程切换

线程切换是有代价的,多任务一旦多到一个限度,就会消耗掉系统所有的资源,结果效率急剧下降,所有任务都做不好

计算密集型 vs IO密集型

是否采用多任务的第二个考虑是任务的类型。我们可以把任务分为计算密集型和IO密集型。 计算密集型任务的特点是要进行大量的计算,消耗CPU资源,比如计算圆周率、对视频进行高清解码等等,全靠CPU的运算能力。这种计算密集型任务虽然也可以用多任务完成,但是任务越多,花在任务切换的时间就越多,CPU执行任务的效率就越低,所以,要最高效地利用CPU,计算密集型任务同时进行的数量应当等于CPU的核心数。 计算密集型任务由于主要消耗CPU资源,因此,代码运行效率至关重要。Python这样的脚本语言运行效率很低,完全不适合计算密集型任务。对于计算密集型任务,最好用C语言编写。 第二种任务的类型是IO密集型,涉及到网络、磁盘IO的任务都是IO密集型任务,这类任务的特点是CPU消耗很少,任务的大部分时间都在等待IO操作完成(因为IO的速度远远低于CPU和内存的速度)。对于IO密集型任务,任务越多,CPU效率越高,但也有一个限度。常见的大部分任务都是IO密集型任务,比如Web应用。 IO密集型任务执行期间,99%的时间都花在IO上,花在CPU上的时间很少,因此,用运行速度极快的C语言替换用Python这样运行速度极低的脚本语言,完全无法提升运行效率。对于IO密集型任务,最合适的语言就是开发效率最高(代码量最少)的语言,脚本语言是首选,C语言最差。

0 人点赞