python多线程

2020-05-08 15:41:18 浏览数 (1)

1 多进程

代码语言:javascript复制
# 多进程,
import os
import time
from multiprocessing import Process
# 启动时必须在 if __name__ 判断下,windows 必须,其他 无限制
# =================================================
# def func(args):
#     print("子进程:",os.getpid())
#     print("子进程的父进程:",os.getppid())
#     time.sleep(10)
#     print("子进程结束")
# if __name__ =="__main__":
#     p = Process(target=func,args=(1,))  # 注册 并传入元祖 元祖有一个参数要加逗号
#     # p是进程对象
#     p.start() # 开启子进程
#     print("主进程:", os.getpid())
#     print("主进程的父进程:",os.getppid()) # cmd 或者是 pycharm
# 生命周期
#   主进程长:自己执行完结束
#   子进程长:等待子进程结束
# =================================================
# 多进程中的方法
# join
# def fun(arg1,arg2):
#     print('*'*arg1)
#     # time.sleep(5)
#     print('*'*arg2)
# if __name__ == "__main__":
#     p = Process(target=fun,args=(10,20))
#     p.start()
#     # p.join() # 感知子进程结束
#     # time.sleep(1)
#     print("all is stop")
#
#     print("最后的语句")
#     os.walk(r"目录") # 返回 文件夹中文件名字
# =================================================
# def fun():
#     print("xxx")
# if __name__ == "__main__":
#     for i in range(10):
#         p = Process(target=fun)
#         p.start()
#         p.join()  # 停止for循环 进程结束后继续
#         print("for")
#     print("主进程")
# =================================================
# 第二种方法
# class MyProcess(Process):
#     def __init__(self,args):
#         super().__init__()  # 若要传递参数,需要调用父类init
#
#     def run(self):
#         print("子进程",self.__dict__)
#         print(self.pid)
# if __name__ == "__main__":
#     print("主进程:",os.getpid())
#     p1 = MyProcess()
#     p1.start()
# =================================================
# 进程之间数据是隔离,命名空间不通
# def fun():
#     global n
#     n= 0
#     print("pid:%s" %os.getpid(),n)
# if __name__ == "__main__":
#     n=100
#     p = Process(target=fun)
#     p.start()
#     p.join()
#     print(n) # -->100
# =================================================
# 多进程tcp连接
# import socket
# # 客户端
# sk = socket.socket()
# sk.connect(("127.0.0.1",8080))
# sk.send('N好'.encode("utf8"))
# msg = sk.recv(1024).decode("utf8")
# print(msg)
# sk.close()
#
# # 服务端
# def server(conn):
#     ret= "你好".encode("utf8")
#     conn.send(ret)
#     msg = conn.recv(1024).decode("utf8")
#     print(msg)
#     conn.close()
#
# sk = socket.socket()
# sk.bind(("127.0.0.1",8080))
# sk.listen()
# if __name__ == "__main__":
#     while True:
#         conn, addr = sk.accept()
#         p = Process(target=server,args=(conn,))
#         p.start()
# =================================================
# 守护进程
# 默认情况 父进程 等待子进程结束
# p.daemon = True 在start前,设置为守护进程,守护进程随父进程(代码执行完毕)结束
#   若父进程在等待 子进程(非守护进程时) ,若父进程代码完毕,守护进程应该结束
# p.is_alive() 判断进程是否存活
# p.terminate() 终止进程
# =================================================
# 锁
# 未加锁实例:
# 火车票
import json
import time
from multiprocessing import Process
# def show(i):
#     with open('ticket') as f:
#         dic = json.load(f)
#     print('余票: %s'%dic['ticket'])
​
def buy_ticket(i):
    with open('ticket') as f:
        dic = json.load(f)
        time.sleep(0.1)
    if dic['ticket'] > 0 :
        dic['ticket'] -= 1
        print('33[32m%s买到票了33[0m'%i)
    else:
        print('33[31m%s没买到票33[0m'%i)
    time.sleep(0.1)
    with open('ticket','w') as f:
        json.dump(dic,f)
if __name__ == '__main__':
    # for i in range(10):
    #     p = Process(target=show,args=(i,))
    #     p.start()
    for i in range(10):
        p = Process(target=buy_ticket, args=(i))
        p.start()
# =================================================
# 锁
# 加锁实例
# 火车票
import json
import time
from multiprocessing import Process
from multiprocessing import Lock
​
# def show(i):
#     with open('ticket') as f:
#         dic = json.load(f)
#     print('余票: %s'%dic['ticket'])
​
def buy_ticket(i,lock):
    lock.acquire() #拿钥匙进门
    with open('ticket') as f:
        dic = json.load(f)
        time.sleep(0.1)
    if dic['ticket'] > 0 :
        dic['ticket'] -= 1
        print('33[32m%s买到票了33[0m'%i)
    else:
        print('33[31m%s没买到票33[0m'%i)
    time.sleep(0.1)
    with open('ticket','w') as f:
        json.dump(dic,f)
    lock.release()      # 还钥匙
​
if __name__ == '__main__':
    # for i in range(10):
    #     p = Process(target=show,args=(i,))
    #     p.start()
    lock = Lock()
    for i in range(10):
        p = Process(target=buy_ticket, args=(i,lock))
        p.start()
# =================================================
# =================================================
# =================================================

2 信号量_事件

代码语言:javascript复制
# 多进程中的组件
# 一个资源 同一时间 被n个人访问
import time
import random
from multiprocessing import Process,Event
# ==============================
# 未用信号量
# def ktv(i):
#     print('%s走进ktv'%i)
#     time.sleep(random.randint(1,5))
#     print('%s走出ktv'%i)
# if __name__ == '__main__' :
#     for i in range(20):
#         p = Process(target=ktv,args=(i))
#         p.start()
# ==============================
from multiprocessing import Semaphore
​
# sem = Semaphore(4)
# sem.acquire()
# print('拿到第一把钥匙')
# sem.acquire()
# print('拿到第二把钥匙')
# sem.acquire()
# print('拿到第三把钥匙')
# sem.acquire()
# print('拿到第四把钥匙')
# sem.acquire()
# print('拿到第五把钥匙')
# def ktv(i,sem):
#     sem.acquire()    #获取钥匙
#     print('%s走进ktv'%i)
#     time.sleep(random.randint(1,5))
#     print('%s走出ktv'%i)
#     sem.release()    # 释放钥匙
#
#
# if __name__ == '__main__' :
#     sem = Semaphore(4)
#     for i in range(20):
#         p = Process(target=ktv,args=(i,sem))
#         p.start()
​
# ==============================
# 事件
#   信号是控制进程阻塞与否
#   事件创建后,默认是阻塞状态
# e = Event() # 创建事件
# e.is_set()  # False 默认阻塞
# print("xx") # 可打印 e.set() 设置为True e.clear() 设置为False
# e.wait()
# print("xx") # 阻塞
# 遇到wait()会判断is_set() 为False 阻塞
# ==============================
# 红绿灯事件
def cars(e,i):
    if not e.is_set():
        print("car%i在等待" % i)
        e.wait()
    print("car%i通过" % i)
​
​
def light(e):
    while True:
        if e.is_set():
            e.clear()
            print("33[31m红灯33[0m")
        else:
            e.set()
            print("33[32m绿灯33[0m")
        time.sleep(2)
if __name__ == "__main__":
    e = Event()
    p =Process(target=light,args=(e,))
    p.start()
    for i in range(1,21):
        car = Process(target=cars,args=(e,i))
        car.start()
        time.sleep(random.random())
# ==============================
# ==============================
# ==============================
# ==============================
# def test(e):
#     e.set()
#     print("xxx")
# if __name__=="__main__":
#     e = Event()
#     print(e.is_set())
#     p = Process(target=test,args=(e,))
#     p.start()
#     e.wait()
#     print("注")
​

3 进程通信_队列管道

代码语言:javascript复制
# IPC 内部进程通信,不能使用普通queue
from multiprocessing import Queue,Process
# ===============================
# q = Queue(5)  # 队列大小
# q.put(1)
# q.put(1)
# q.full()  # 若队列满了,阻塞等待
# q.get()
# q.empty()  # 若为空,阻塞等待有数据 后取值
# q.get()
# q.get_nowait()  # 用于跳过等待,需要用try
# ===============================
# def produce(q):
#     q.put('hello')
# def consume(q):
#     print(q.get())
# if __name__ =="__main__":
#     q = Queue()
#     p = Process(target=produce,args=(q,))
#     p.start()
#     p2 = Process(target=consume, args=(q,))
#     p2.start()
# ===============================
# 生产者消费者模型
# 若生产者,生产有数量,消费者,不停消费,最后消费进程会处于等待状态
# 可在主进程后边join生产进程,消费进程判断为空,但不准确
#   需要在队列put(None) 子进程判断,由于数据之间不能共享,需要put 消费数量的None
# ===============================
from multiprocessing import JoinableQueue
# consume :
#     ....
#     q.task_done()
# produce :
#     ...
#     q.join()
# ===============================
#  循环通知,致使进程结束
# JoinableQueue
# 生产者生产,不停不停消费,若q为空 一直等待,
#   生产者完毕后,会join等待消费值消费完毕,因为是同一个q,一个生产者完毕后,其他还没有完毕q会处于,他会处于阻塞
#   等待 消费者 全部消费完毕,q.join()会感知,因此 生产进程会结束,主进程最后join生产进程,生产结束
#   主进程就结束,身为守护进程的子进程也结束
# import time
# import random
# from multiprocessing import Process,JoinableQueue
# def consumer(q,name):
#     while True:
#         food = q.get()
#         print('33[31m%s消费了%s33[0m' % (name,food))
#         time.sleep(random.randint(1,3))
#         q.task_done()     # count - 1
#
# def producer(name,food,q):
#     for i in range(4):
#         time.sleep(random.randint(1,3))
#         f = '%s生产了%s%s'%(name,food,i)
#         print(f)
#         q.put(f)
#     q.join()    # 阻塞  直到一个队列中的所有数据 全部被处理完毕
#
# if __name__  == '__main__':
#     q = JoinableQueue(20)
#     p1 = Process(target=producer,args=('Egon','包子',q))
#     p2 = Process(target=producer, args=('wusir','泔水', q))
#     c1 = Process(target=consumer, args=(q,'alex'))
#     c2 = Process(target=consumer, args=(q,'jinboss'))
#     p1.start()
#     p2.start()
#     c1.daemon = True   # 设置为守护进程 主进程中的代码执行完毕之后,子进程自动结束
#     c2.daemon = True
#     c1.start()
#     c2.start()
#     p1.join()
#     p2.join()      # 感知一个进程的结束
​
#  在消费者这一端:
    # 每次获取一个数据
    # 处理一个数据
    # 发送一个记号 : 标志一个数据被处理成功
​
# 在生产者这一端:
    # 每一次生产一个数据,
    # 且每一次生产的数据都放在队列中
    # 在队列中刻上一个记号
    # 当生产者全部生产完毕之后,
    # join信号 : 已经停止生产数据了
                # 且要等待之前被刻上的记号都被消费完
                # 当数据都被处理完时,join阻塞结束
​
# consumer 中把所有的任务消耗完
# producer 端 的 join感知到,停止阻塞
# 所有的producer进程结束
# 主进程中的p.join结束
# 主进程中代码结束
# 守护进程(消费者的进程)结束
# ===============================
# 管道 双向通信工具
from multiprocessing import Pipe
# conn,conn2 = Pipe()
# conn.send("123456")  # 不用字节
# print(conn2.recv())  # 不用指定大小
​
def fun(conn):
    conn.send("hello")
if __name__=="__main__":
    conn1,conn2  = Pipe()
    Process(target=fun,args=(conn1,)).start()
    print(conn1.recv())
# 管道返回2个连接
#    conn1, conn2
# P    发送 接受
# p2   接受 发送
# 若只发数据,可关闭一端,若取数据的时候,对面已关闭,则报错,看根据此 终止程序
from multiprocessing import Pipe,Process
# def func(conn1,conn2):
#     conn2.close()
#     while True:
#         try :
#             msg = conn1.recv()
#             print(msg)
#         except EOFError:
#             conn1.close()
#             break
#
# if __name__ == '__main__':
#     conn1, conn2 = Pipe()
#     Process(target=func,args = (conn1,conn2)).start()
#     conn1.close()
#     for i in range(20):
#         conn2.send('吃了么')
#     conn2.close()
# ===============================
# from multiprocessing import Lock,Pipe,Process
# def producer(con,pro,name,food):
#     con.close()
#     for i in range(100):
#         f = '%s生产%s%s'%(name,food,i)
#         print(f)
#         pro.send(f)
#     pro.send(None)
#     pro.send(None)
#     pro.send(None)
#     pro.close()
#
# def consumer(con,pro,name,lock):
#     pro.close()
#     while True:
#             lock.acquire()  # 不安全主要是recv的时候,因此两端加锁即可
#             food = con.recv()
#             lock.release()
#             if food is None:
#                 con.close()
#                 break
#             print('%s吃了%s' % (name, food))
# if __name__ == '__main__':
#     con,pro = Pipe()
#     lock= Lock()
#     p = Process(target=producer,args=(con,pro,'egon','泔水'))
#     c1 = Process(target=consumer, args=(con, pro, 'alex',lock))
#     c2 = Process(target=consumer, args=(con, pro, 'bossjin',lock))
#     c3 = Process(target=consumer, args=(con, pro, 'wusir',lock))
#     c1.start()
#     c2.start()
#     c3.start()
#     p.start()
#     con.close()
#     pro.close()
​
# from multiprocessing import Process,Pipe,Lock
#
# def consumer(produce, consume,name,lock):
#     produce.close()
#     while True:
#         lock.acquire()
#         baozi=consume.recv()
#         lock.release()
#         if baozi:
#             print('%s 收到包子:%s' %(name,baozi))
#         else:
#             consume.close()
#             break
#
# def producer(produce, consume,n):
#     consume.close()
#     for i in range(n):
#         produce.send(i)
#     produce.send(None)
#     produce.send(None)
#     produce.close()
#
# if __name__ == '__main__':
#     produce,consume=Pipe()
#     lock = Lock()
#     c1=Process(target=consumer,args=(produce,consume,'c1',lock))
#     c2=Process(target=consumer,args=(produce,consume,'c2',lock))
#     p1=Process(target=producer,args=(produce,consume,30))
#     c1.start()
#     c2.start()
#     p1.start()
#     produce.close()
#     consume.close()
​
# pipe 数据不安全性
# IPC
# 加锁来控制操作管道的行为 来避免进程之间争抢数据造成的数据不安全现象
​
# 队列 进程之间数据安全的
#   管道   锁

3_进程通信__数据共享

代码语言:javascript复制
from multiprocessing import Manager,Process,Lock
# 进程之间不能传递数据,通过方法的参数可以传过去,但修改后,无反应,不知道原因
def main(dict,lock):
    lock.acquire()
    dict['count']-=1
    lock.release()
if __name__ == "__main__":
    m = Manager()
    l = Lock()
    dict = m.dict({"count":100})
    p_list=[]
    for i in range(50):
        p = Process(target=main,args=(dict,l))
        p.start()
        p_list.append(p)
    for i in p_list:i.join()
    print(dict['count'])

4_进程池

代码语言:javascript复制
# 进程池
# 上一个例子中50个进程很慢
#   寄存器 堆栈 文件
#   操作系统调度,cup切换
# 高级线程池有数量限定 ,有最低,任务变多的时候 逐步加到最高限制
import os,time
from multiprocessing import Pool,Manager
# ==================================
# def func2(i):
#     print(os.getpid(),os.getppid())
#     i 1
# def func(list):
#     list[1]['set'].add(os.getpid())
#     print(len(list[1]['set']))
# # 一般超过5个使用pool
# if __name__ == "__main__":
#     pid = Manager()
#     dict1 = pid.dict({"set":set()})
#     pool = Pool(5)
#     # 执行不同的任务,map 自带join方法
#     #pool.map(func2,range(100))
#     pool.map(func, [[i,dict1]for i in range(100)])
#     print(len(dict1['set']))
# """
# 等于
#     for i in range(100):
#         p = Process(target=func,args=(i,))
#         p.start()
# """
# ==================================
# def fun(n):
#     print("start fun%s" %n,os.getpid())
#     time.sleep(1)
#     print("end fun%s" % n, os.getpid())
# if __name__ == "__main__":
#     p = Pool() # 默认cup核心数量
#     for i in range(10):
#         #p.apply(fun,args=(i,)) # 同步提交的
#         p.apply_async(fun,args=(i,)) # 异步提交,真的异步,因此需要join
#     p.close()  # 不再接受新的任务
#     p.join()  # 感知进程池中任务结束 保持 主进程 与子进程同步
​
# ==================================
# import socket
# from multiprocessing import Pool
#
# def func(conn):
#     conn.send(b'hello')
#     print(conn.recv(1024).decode('utf-8'))
#     conn.close()
#
# if __name__ == '__main__':
#     p = Pool(5)
#     sk = socket.socket()
#     sk.bind(('127.0.0.1',8080))
#     sk.listen()
#     while True:
#         conn, addr = sk.accept()
#         p.apply_async(func,args=(conn,))
#     sk.close()
#     import socket
#
#     sk = socket.socket()
#     sk.connect(('127.0.0.1', 8080))
#
#     ret = sk.recv(1024).decode('utf-8')
#     print(ret)
#     msg = input('>>>').encode('utf-8')
#     sk.send(msg)
#     sk.close()
# ==================================
# 进程池的返回值
# p = Pool()
# p.map(funcname,iterable)     默认异步的执行任务,且自带close和join
# p.apply   同步调用的
# p.apply_async 异步调用 和主进程完全异步 需要手动close 和 join
# from multiprocessing import Pool
# def func(i):
#     return i*i
#
# if __name__ == '__main__':
#     p = Pool(5)
#     for i in range(10):
#         res = p.apply(func,args=(i,))   # apply的结果就是func的返回值
#         print(res) --> 直接就是返回值
# ==================================
# import time
# from multiprocessing import Pool
# def func(i):
#     time.sleep(0.5)
#     return i*i
#
# if __name__ == '__main__':
#     p = Pool(5)
#     res_l = []
#     for i in range(10):
#         res = p.apply_async(func,args=(i,))   # apply的结果就是func的返回值
#         res_l.append(res)
#   若在for 中直接获取res.get()会在成阻塞,程序变同步执行
#     for res in res_l:print(res.get())# 等着 func的计算结果
#    调用res.get时返回
# ==================================
# import time
# from multiprocessing import Pool
# def func(i):
#     time.sleep(0.5)
#     return i*i
#
# if __name__ == '__main__':
#     p = Pool(5)
#     ret = p.map(func,range(100))
#     print(ret)  #  -> 直接返回全部,列表返回
# 自带join,close 最后一起返回
# ====================================
# 回调函数 , 回调的函数在主进程调用
# 对于子进程中再起子进程问题,还不知道
# 每个进程的回调函数 交给主进程顺序执行
import os
from multiprocessing import Pool,Process
def func2(nn):
    print('in func2',os.getpid())
    print(nn)
def func3(n):
    print('in func3', os.getpid())
    return n*n
def func1(n):
    print('in func1',os.getpid())
    p = Pool(5)
    p.apply_async(func3, args=(10,), callback=func2)
    p.close()
    p.join()
    return n*n
if __name__ == '__main__':
    print('主进程 :',os.getpid())
    p = Pool(5)
    p.apply_async(func1,args=(10,),callback=func2)
    p.close()
    p.join()
# ===================================================
import requests
from urllib.request import urlopen
from multiprocessing import Pool
# 200 网页正常的返回
# 404 网页找不到
# 502 504
# 场景:callback 耗时段,远小于网络延时,此时使用,在主进程运行,
def get(url):
    response = requests.get(url)
    if response.status_code == 200:
        return url, response.content.decode('utf-8')
​
​
def get_urllib(url):
    ret = urlopen(url)
    return ret.read().decode('utf-8')
​
​
def call_back(args):
    url, content = args
    print(url, len(content))
​
​
if __name__ == '__main__':
    url_lst = [
        'https://www.cnblogs.com/',
        'http://www.baidu.com',
        'https://www.sogou.com/',
        'http://www.sohu.com/',
    ]
    p = Pool(5)
    for url in url_lst:
        p.apply_async(get, args=(url,), callback=call_back) # callback 中的参数为 get函数的返回值
    p.close()
    p.join()

4_线程

两者之间应该有对应关系1:1 1:n

linux 中的nptl 1对1 线程

代码语言:javascript复制
# 同一进程的线程间的数据共享的,共享的 共享的
#   可通过直接访问全局变量 global,还需要进程同步
#   创建,切换,撤销 相比进程 消耗小,轻量级
#   进程:资源分配单位,每个进程 至少一个线程
#   线程:cup调度单位
# thread 基本模块,避免使用,可能与threading 冲突
# threading thread的高级版本
# Queue 多线程之间共享数据的数据结构
# 与进程类似,好多方法相同
import time
from threading import Thread
import threading
# def func(n):
#     time.sleep(1)
#     print(n)
# t = Thread(target=func,args=(12,))
# t.daemon = True # 成为"守护线程"
# t.start()
# print("主线程") # 默认情况等待子线程结束
# ===================================
# class MyThread(Thread):
#     def __init__(self,name):
#         super().__init__()
#         self.name = name
#     def run(self):
#         # time.sleep(1)
#         print(self.name)
# MyThread("段志方").start()
# ================================
# GIL 锁的是线程,同一时间 只有一个线程 ,cpython解释器的问题,jpython 就不会
# 对于io密集型 没什么区别,只要io时会切换即可
# 但对于多核cup python 同时只能运行一个cup ,其他语言的会运行多个,因此...
# 即不能通过物理核心数增加速度,不能实现(并行)
# ============================================
# 多线程socket 可以input
# import socket
# from threading import Thread
# def chat(conn):
#     conn.send(b'hello')
#     msg = conn.recv(1024).decode('utf-8')
#     print(msg)
#     conn.close()
# sk = socket.socket()
# sk.bind(('127.0.0.1',8080))
# sk.listen()
# while True:
#     conn,addr = sk.accept()
#     Thread(target=chat,args = (conn,)).start()
# sk.close()
#
# import socket
# sk = socket.socket()
# sk.connect(('127.0.0.1',8080))
# msg = sk.recv(1024)
# print(msg)
# inp = input('>>> ').encode('utf-8')
# sk.send(inp)
# sk.close()
# =========================
# print(threading.current_thread()) # 当前线程
# print(threading.active_count()) # 全部线程,包括主线程
# print(threading.enumerate())  # 列表返回全部线程对象
# ==========================================
# 守护线程
# import time
# from threading import Thread
# def func1():
#     while True:
#         print('*'*10)
#         time.sleep(1)
# def func2():
#     print('in func2')
#     time.sleep(5)
#
# t = Thread(target=func1,)
# t.daemon = True
# t.start()
# t2 = Thread(target=func2,)
# t2.start()
# t2.join()
# print('主线程')
​
# (守护进程)随着(主进程代码)的执行结束而结束
# 守护(线程)会在主线程结束之后等待(其他非守护子线程)的结束才结束
​
# 主进程在执行完自己的代码之后不会立即结束 而是等待子进程结束之后 回收子进程的资源
# import time
# from multiprocessing import Process
# def func():
#     time.sleep(5)
#
# if __name__ == '__main__':
#         Process(target=func).start()
# =========================================
# 线程锁 ,与gil无关
import time
from threading import Lock,Thread
# Lock 互斥锁
# def func(lock):
#     global n
#     lock.acquire()
#     temp = n
#     time.sleep(0.2)
#     n = temp - 1
#     lock.release()
#
# n = 10
# t_lst = []
# lock = Lock()
# for i in range(10):
#     t = Thread(target=func,args=(lock,))
#     t.start()
#     t_lst.append(t)
​
# for t in  t_lst: t.join()
# print(n)
​
# 科学家吃面   还会死锁
​
# noodle_lock  = Lock()
# fork_lock = Lock()
# def eat1(name):
#     noodle_lock.acquire()
#     print('%s拿到面条啦'%name)
#     fork_lock.acquire()
#     print('%s拿到叉子了'%name)
#     print('%s吃面'%name)
#     fork_lock.release()
#     noodle_lock.release()
#
# def eat2(name):
#     fork_lock.acquire()
#     print('%s拿到叉子了'%name)
#     time.sleep(1)
#     noodle_lock.acquire()
#     print('%s拿到面条啦'%name)
#     print('%s吃面'%name)
#     noodle_lock.release()
#     fork_lock.release()
#
# Thread(target=eat1,args=('alex',)).start()
# Thread(target=eat2,args=('Egon',)).start()
# Thread(target=eat1,args=('bossjin',)).start()
# Thread(target=eat2,args=('nezha',)).start()
# ===============================================
from threading import RLock   # 递归锁
fork_lock = noodle_lock  = RLock()
# 一个钥匙串上的两把钥匙,同一个lock 在一个线程中可又多次acquire
# 传给其他线程时 不能被acquire
# def eat1(name):
#     print(name)
#     noodle_lock.acquire()            # 一把钥匙
#     print('%s拿到面条啦'%name)
#     fork_lock.acquire()
#     print('%s拿到叉子了'%name)
#     print('%s吃面'%name)
#     fork_lock.release()
#     noodle_lock.release()
#
# def eat2(name):
#     print(name)
#     fork_lock.acquire()
#     print('%s拿到叉子了'%name)
#     time.sleep(1)
#     noodle_lock.acquire()
#     print('%s拿到面条啦'%name)
#     print('%s吃面'%name)
#     noodle_lock.release()
#     fork_lock.release()
# Thread(target=eat1,args=('alex',)).start()
# Thread(target=eat2,args=('Egon',)).start()
# Thread(target=eat1,args=('bossjin',)).start()
# Thread(target=eat2,args=('nezha',)).start()
# =================================================

5_线程_信号量_事件_条件_定时器_列队_线程池

代码语言:javascript复制
import time
from threading import Semaphore,Thread
# ====================================
# def func(sem,a,b):
#     sem.acquire()
#     time.sleep(1)
#     print(a b)
#     sem.release()
# sem = Semaphore(4)
# for i in range(10):
#     t = Thread(target=func,args=(sem,i,i 5))
#     t.start()
# ====================================
# 事件被创建的时候
# False状态
    # wait() 阻塞
# True状态
    # wait() 非阻塞
# clear 设置状态为False
# set  设置状态为True
#  数据库 - 文件夹
#  文件夹里有好多excel表格
    # 1.能够更方便的对数据进行增删改查
    # 2.安全访问的机制
#  起两个线程
#  第一个线程 : 连接数据库
        # 等待一个信号 告诉我我们之间的网络是通的
        # 连接数据库
#  第二个线程 : 检测与数据库之间的网络是否连通
        # time.sleep(0,2) 2
        # 将事件的状态设置为True
# import time
# import random
# from threading import Thread,Event
# def connect_db(e):
#     count = 0
#     while count < 3:
#         e.wait(0.5)   # 状态为False的时候,我只等待1s就结束
#         if e.is_set() == True:
#             print('连接数据库')
#             break
#         else:
#             count  = 1
#             print('第%s次连接失败'%count)
#     else:
#         raise TimeoutError('数据库连接超时')
# def check_web(e):
#     time.sleep(random.randint(0,3))
#     e.set()
# e = Event()
# t1 = Thread(target=connect_db,args=(e,))
# t2 = Thread(target=check_web,args=(e,))
# t1.start()
# t2.start()
# ====================================
# 条件 复杂的锁
# 条件
from threading import Condition
# 条件
# 锁
# acquire release
# 一个条件被创建之初 默认有一个(False)状态
# False状态 会影响wait一直处于等待状态
# notify(int数据类型)  造钥匙
# from threading import Thread,Condition
# def func(con,i):
#     con.acquire()
#     con.wait() # 等钥匙
#     print('在第%s个循环里'%i)
#     con.release()
# con = Condition()
# for i in range(10):
#     Thread(target=func,args = (con,i)).start()
# while True:
#     num = int(input('>>>'))
#     con.acquire()
#     con.notify(num)  # 造钥匙
#     con.release()
# ====================================
#定时器
# import time
# from threading import Timer
# def func():
#     print('时间同步')   #1-3
# while True:
#     t = Timer(5,func).start()   # 非阻塞的 ,异步的 ,会把所有的5s在一起
#     time.sleep(5) # 睡5s 每5s进行意思时间同步
# ====================================
# 加锁 麻烦 所以使用队列
#线程通信
# queue
# import queue #直接导入普通queue 是线程安全的
# q = queue.Queue()  # 队列 先进先出
# q.put()
# q.get()
# q.put_nowait()
# q.get_nowait()
# q = queue.LifoQueue()  # 栈 先进后出
# q.put(1)
# q.put(2)
# q.put(3)
# print(q.get())
# print(q.get())
# q = queue.PriorityQueue()  # 优先级队列
# q.put((1,'a'))
# q.put((10,'b'))
# q.put((30,'c'))
# q.put((1,'d'))
# q.put((1,'f'))
# print(q.get())
# 元祖中的元素按顺序比较,数字越小优先级大,祖父按照ascii越小优先级越大
# ====================================
# 线程池
import time
# 以前没有线程池
from concurrent.futures import ThreadPoolExecutor
# ProcessPoolExecutor 该模块下还有一个进程池,与multi 功能相同
# submit(fn,*args,**kwargs) 异步提交任务
# map(fun,*iterables,timeout=None,chunksize - 1) 循环的submit
# shutdown(wait=True) # 等于原来的 close join 合并
# result(time=None) 取得结果
# add_done_callback(fn) 回调函数
def func(n):
    time.sleep(2)
    print(n)
    return n*n
def call_back(m):
    print('结果是 %s'%m.result())
# 若使用进程池 只换ThreadPoolExecutor->ProcessPoolExecutor
tpool = ThreadPoolExecutor(max_workers=5)   #  默认 不要超过cpu个数*5
for i in range(20):
    tpool.submit(func,i).add_done_callback(call_back)
tpool.shutdown()
# tpool.map(func,range(20))  # 拿不到返回值
​
# t_lst = []
# for i in  range(20):
#     t = tpool.submit(func,i)
#     t_lst.append(t)
# tpool.shutdown()  # close join    #
# print('主线程')
# for t in t_lst:print('***',t.result()) # 拿返回值

6_协程

代码语言:javascript复制
# 进程 多个进程,操作系统负责
# 线程 不能同一时间多个cup 其他语言可以,但不影响高io
#   开启线程 创建线程 寄存器 堆栈
#   关闭一个线程
# 协程
#   本质是一个线程
#   能够在多个任务间切换,不需要寄存器,堆栈切换
#   任务之间切换时间开销 远小于线程
#   计算任务之间切换消耗也很大,一般都是遇到io的时候切换
#   进程(cup数 1) 线程(cup数*5) 协程(500)  = 50000
#   适合爬虫
# 实现并发的手段
# import time
# 实现在 con,pro之间来回切换
# def consumer():
#     while True:
#         x = yield
#         time.sleep(1)
#         print('处理了数据 :',x)
#
# def producer():
#     c = consumer()
#     next(c)
#     for i in range(10):
#         time.sleep(1)
#         print('生产了数据 :',i)
#         c.send(i)
#
# producer()
# =============================================
# 真正的协程模块就是使用greenlet完成的切换
from greenlet import greenlet
# def eat():
#     print('eating start')
#     g2.switch()
#     print('eating end')
#     g2.switch()
#
# def play():
#     print('playing start')
#     g1.switch()
#     print('playing end')
# g1 = greenlet(eat)  # 必须先有g1 ,g2 函数中才能使用g
# g2 = greenlet(play) # 不会自动切换
# g1.switch()
# ======================================
#  不能感知time.sleep(1)
# 可以感知gevent.sleep(1),在第一行引入 如下from...
# 后边的time 都会经过特殊处理,time.sleep() 就可以被识别
# from gevent import monkey;monkey.patch_all()
# import time
# import gevent
# import threading
# def eat():
#     DummyThread-1 虚拟的线程
#     print(threading.current_thread().getName())
#     print(threading.current_thread())
#     print('eating start')
#     time.sleep(1)
#     print('eating end')
#
# def play():
#     DummyThread-2 虚拟的线程
#     print(threading.current_thread().getName())
#     print(threading.current_thread())
#     print('playing start')
#     time.sleep(1)
#     print('playing end')
#
# g1 = gevent.spawn(eat) # 注册进入,会自动切换,不是操作系统调度
# g2 = gevent.spawn(play) # gevent 负责协程的调度 通过封装的greenlet switch
# g1.join()  gevent 是完全异步的  join等待协程结束
# g2.join()
# 进程和线程的任务切换右操作系统完成
# 协程任务之间的切换由程序(代码)完成,只有遇到协程模块能识别的IO操作,(时间片等不识别)的时候,程序才会进行任务切换,实现并发的效果
# ========================================
# 同步 和 异步
# from gevent import monkey;monkey.patch_all() # 放最前面
# import time
# import gevent
# def task(n):
#     time.sleep(1)
#     print(n)
# def sync(): # 同步
#     for i in range(10):
#         task(i)
# def async(): # 异步
#     g_lst = []
#     for i in range(10):
#         g = gevent.spawn(task,i)
#         g_lst.append(g)
#     gevent.joinall(g_lst)   #两种方法都可
#     for g in g_lst:g.join()
# ======================================
# 协程 : 能够在一个线程中实现并发效果的概念
    #    能够规避一些任务中的IO操作
    #    在任务的执行过程中,检测到IO就切换到其他任务
​
# 多线程 被弱化了
# 协程 在一个线程上 提高CPU 的利用率
# 协程相比于多线程的优势 切换的效率更快
# ==========================================
# 爬虫的例子
# 请求过程中的IO等待
# from gevent import monkey;monkey.patch_all()
# import gevent
# from urllib.request import urlopen    # 内置的模块
# urlopen html时有个格式的 reguests 无格式
# def get_url(url):
#     response = urlopen(url)
#     content = response.read().decode('utf-8')
#     return len(content)
#
# g1 = gevent.spawn(get_url,'http://www.baidu.com')
# g2 = gevent.spawn(get_url,'http://www.sogou.com')
# g3 = gevent.spawn(get_url,'http://www.taobao.com')
# g4 = gevent.spawn(get_url,'http://www.hao123.com')
# g5 = gevent.spawn(get_url,'http://www.cnblogs.com')
# gevent.joinall([g1,g2,g3,g4,g5])
# print(g1.value)
# print(g2.value)
# print(g3.value)
# print(g4.value)
# print(g5.value)
​
# ret = get_url('http://www.baidu.com')
# print(ret)
# ======================================
from gevent import monkey;monkey.patch_all()
import socket
import gevent
def talk(conn):
    conn.send(b'hello')
    print(conn.recv(1024).decode('utf-8'))
    conn.close()
​
sk = socket.socket()
sk.bind(('127.0.0.1',8080))
sk.listen()
while True:
    conn,addr = sk.accept()
    gevent.spawn(talk,conn)
sk.close()
​
import socket
sk = socket.socket()
sk.connect(('127.0.0.1',8080))
print(sk.recv(1024))
msg = input('>>>').encode('utf-8')
sk.send(msg)
sk.close()

7_io模型

阻塞模型

非阻塞模型

io多路复用

代码语言:javascript复制
# 同步 提交一个任务之后要等待这个任务执行完毕
# 异步 只管提交任务,不等待这个任务执行完毕就可以做其他事情
# 阻塞 recv recvfrom accept
# 非阻塞

# 阻塞   线程   运行状态 --> 阻塞状态 --> 就绪
# 非阻塞

# IO多路复用
    # select机制  Windows  linux  都是操作系统轮询每一个被监听的项,看是否有读操作
    # poll机制    linux          它可以监听的对象比select机制可以监听的数量多
                                 # 随着监听项的增多,导致效率降低
    # epoll机制   linux           更高级,绑定回调函数,
# =================================
# 以前的都是阻塞io
# =================================
# 非阻塞io实例
# import socket
# sk = socket.socket()
# sk.bind(('127.0.0.1',9000))
# sk.setblocking(False)  # 设置不阻塞
# sk.listen()
# conn_l = []
# del_conn = []
# while True:
#     try:
#         conn,addr = sk.accept()  #不阻塞,但是没人连我会报错
#         print('建立连接了:',addr)
#         conn_l.append(conn)
#     except BlockingIOError:
#         for con in conn_l:
#             try:
#                 msg = con.recv(1024)  # 非阻塞,如果没有数据就报错
#                 if msg == b'':   # 若客户端关闭 会发送空消息
#                     del_conn.append(con)
#                     continue
#                 print(msg)
#                 con.send(b'byebye')
#             except BlockingIOError:pass
#         for con in del_conn:
#             con.close()
#             conn_l.remove(con)
#         del_conn.clear()
# # while True : 10000   500  501
#
# import time
# import socket
# import threading
# def func():
#     sk = socket.socket()
#     sk.connect(('127.0.0.1',9000))
#     sk.send(b'hello')
#     time.sleep(1)
#     print(sk.recv(1024))
#     sk.close()
#
# for i in range(2):
#     threading.Thread(target=func).start()
# =================================
# io 多路复用, 监听列表的循环 变为有操作系统执行
import select
import socket

sk = socket.socket()
sk.bind(('127.0.0.1',8000))
sk.setblocking(False)
sk.listen()

read_lst = [sk] # 监听列表
while True:   # [sk,conn]
    # 等待读列表,写列表,修改列表 都必传
    # 返回元祖中3个列表,对应三个list,一般只用第一个
    # r_lst里面就是sk对象
    r_lst,w_lst,x_lst = select.select(read_lst,[],[])
    for i in r_lst:
        if i is sk:
            conn,addr = i.accept()
            read_lst.append(conn)
        else:
            ret = i.recv(1024)
            if ret == b'':
                i.close()
                read_lst.remove(i)
                continue
            print(ret)
            i.send(b'goodbye!')
            import time
            import socket
            import threading


            def func():
                sk = socket.socket()
                sk.connect(('127.0.0.1', 8000))
                sk.send(b'hello')
                time.sleep(3)
                print(sk.recv(1024))
                sk.close()
            for i in range(20):
                threading.Thread(target=func).start()

# =================================
import selectors # 选择合适的多路复用机制
from socket import *

def accept(sk,mask):
    conn,addr=sk.accept()
    sel.register(conn,selectors.EVENT_READ,read)

def read(conn,mask):
    try:
        data=conn.recv(1024)
        if not data:
            print('closing',conn)
            sel.unregister(conn)
            conn.close()
            return
        conn.send(data.upper() b'_SB')
    except Exception:
        print('closing', conn)
        sel.unregister(conn)
        conn.close()

sk=socket()
sk.setsockopt(SOL_SOCKET,SO_REUSEADDR,1)
sk.bind(('127.0.0.1',8088))
sk.listen(5)
sk.setblocking(False) #设置socket的接口为非阻塞

sel = selectors.DefaultSelector()   # 自动选择一个适合我的IO多路复用的机制
sel.register(sk,selectors.EVENT_READ,accept)
#相当于网select的读列表里append了一个sk对象,并且绑定了一个回调函数accept
# 说白了就是 如果有人请求连接sk,就调用accrpt方法

while True:
    events=sel.select() #检测所有的sk,conn,是否有完成wait data阶段
    for sel_obj,mask in events:  # [conn]
        callback=sel_obj.data #callback=read
        callback(sel_obj.fileobj,mask) #read(conn,1)

pymysql

代码语言:javascript复制
import pymysql
# 连接
conn = pymysql.connect(
    host="106.15.39.74",
    port=3306,
    database="test",
    user="root",
    password="dzf123,.",
    charset="utf8" # 没有"-" 没有
)
cursor = conn.cursor()
sql = "select*from student"
name = "dzf"
password = "123456"
sql = "select * from student where name = %s and password = %s"
ret = cursor.execute(sql,[name,password])
# 自己拼接需要加引号,使用防注入sql不用加引号,参数不能少,多
#print(cursor.lastrowid) # 获取刚插入数据的id 应该就是主键 自增的那个,与名字无关
print(ret) # 返回受影响行数
ret = cursor.fetchall() # 元祖 大元组里边小元祖
print(ret,"a")
ret = cursor.fetchone() # 取一条数据
print(ret,"a")
ret = cursor.fetchone()
print(ret,"a")
# 直接返回一条元素,格式是 小元祖,或只有list中的一个小字典,外边没有元祖或list
# 若连续fetchone() 第一次第一条,第二次第二条,一次向下取
# 若取完后 再次 fetchone() 取不到
# -->(('dzf','1234'),('dzf','1234'))
# 在执行语句前 修改cursor格式
cursor.fetchmany(3) # 在cursor位置接下取3条,大元组中小元祖
# 移动光标
cursor.scroll(1,mode="absolute") # 绝对移动 移到1位置,从2开始 ,
cursor.scroll(1,mode="relative") # 相对移动 原来在3 位置,从4 开始读,现在 移动到4 从5开始读
# 向上移可以使用负的
cursor = conn.cursor(cursor=pymysql.cursors.DictCursor) # 指定为字典格式
"""
    [
    {'id':1,'name':'dzf'},
    {'id':1,'name':'dzf'},
    ]
"""

cursor.close()
conn.close()
# ====================================
# 插入数据还是用cursor.execute(),注意提交后conn.commit()
# 若多语句,可能错误,conn.rollback()
# sql2 = "insert into student (name,password) values(%s, %s)"
# ret = cursor.execute(sql2,['123','123'])
# conn.commit()
# 或insert into student (name,password) values(%(name)s, %(pwd)s)
# 下边传入字典excute(sql,{"name":xxx..})
# ====================================
# 批量执行
data = (['12','12'],['23','32'],['32','23']) # 格式必须固定
cursor.executemany(sql,data)  # 内部的for循环
# try 防止异常,要回滚, 会取消以前正确的插入语句
# =================================
# 删除,同理,也要提交
# ================================
# 修改 记得提交

0 人点赞