-
线程通信-队列
- 队列的好处
- 线程安全:python实现了内部锁
- 解耦:用户只管输入,可以不知道后台多线程处理情况
- 提高效率:后台多线程处理效率高
- 举例:
from queue import Queue mq = Queue() # 默认先进先出队列,FIFO # 存放数据 mq.put(1) mq.put(2) mq.put(3) # get() 队列是空时,会等待新数据 print(mq.get()) # 1 print(mq.get()) # 2 print(mq.get()) # 3 # print(mq.get()) # 这里已经没有数据了,默认保持等待;可以设置block=False不等待 # get_nowait() 队列是空时,不等待,报空队列错误 print(mq.get_nowait()) # raise Empty
- 队列的好处
-
生产消费者模式
- 生产者:生产数据的线程;消费者:消费数据的线程;两者平衡配合
- 阻塞队列:生产者生产的数据存放在 阻塞队列中,消费者从阻塞队列中拿数据,两者解耦;队列满了,生产者不能存放数据;队列空了,消费者不能消费数据;故称为阻塞队列;
from threading import Thread from queue import Queue def producer(): num = 1 while True: if mq.qsize() < 10: # 或者Queue(maxsize=10) print("f生产了{num}号手机") mq.put(f"{num}号手机") num += 1 sleep(1) def consumer(): while True: print("购买了{}".format(mq.get())) if __name__ == "__main__": mq = Queue() t1 = Thread(target=producer) t2 = Thread(target=consumer) t3 = Thread(target=consumer) t1.start() t2.start() t3.start() # 第二个购买者,由于Queue是线程安全的,因此队列为空消费者会等待;
-
进程的实现
- multiprocessing模块
- 与线程类似:类创建和方法创建,start启动
from multiprocessing import Process from time import sleep class MyProcess(Process): def __init__(self, name): Process.__init__(self) self.name = name def run(): print(f"{name}开始。。。") sleep(2) print(f"{name}结束。。。") def func1(name): print(f"{name}开始。。。") sleep(2) print(f"{name}结束。。。") if __name=="__main__": p1 = Process(target=func1, args=('p1',)) # 方法创建 p1.start() p2 = MyProcess("p1") # 类创建 p2.start()
-
进程的通信
- 优缺点:并发执行;进程独立,数据安全;创建和删除消耗资源多;
- 2种主要通信方式
- 使用multiprocessing模块下的Queue类;
- 原理:操作系统开辟一个队列空间,各个进程可以存放数据到该队列,也可以拿走自己需要的信息;
from multiprocessing import Queue from time import sleep import os def func(name, mq): print("进程ID {} 获取了数据 {}".format(os.getpid(), mq.get())) # 获取到abc mq.put("abcde") if __name__=="__main__": mq = Queue() # 注意不能使用普通的Queue类,该类不支持进程间的传递 mq.put("abc") p1 = Process(target=func, args=("p1", mq)) pq.start() p1.join() print(mq.get()) # 获取到abcde
- pipe,管道实现,常用于2个进程间通信,2个进程分别位于管道两端;
- send(obj): obj必须可序列化,超过32M可能报错ValueError
- recv: 接收send发送过来的数据
- close:关闭管道
- poll([timeout]):返回连接中是否还有可读取数据
from multiprocessing import Process, current_process, Pipe from time import sleep import os def func(name, con): print("进程ID {} 获取了数据 {}".format(os.getpid(), con.recv())) # 获取到abc con.put("abcde") if __name__=="__main__": con1, con2 = Pipe() # 一个管道两端分别对应两个进程 p1 = Process(target=func, args=("p1", con1)) # con1给进程p1 con2.send("abc") # con2有当前主进程持有,并发送数据abc p1.join() # 等待p1执行完毕 print(con2.recv()) # 获取到abcde
- 使用multiprocessing模块下的Queue类;
- Manager管理器(另一种进程通信方法)
- 提供了一种创建共享数据的方法,从而可以在不同进程中共享
from multiprocessing import Process, current_process from time import sleep import os from multiprocessing import Manager def func(name, m_list, m_dict): print("进程ID {} 获取了数据 {}".format(os.getpid(), m_list)) # 获取 print("进程ID {} 获取了数据 {}".format(os.getpid(), m_dict)) # 获取 m_list.append("你好") m_dict["name"] = "abc" if __name__=="__main__": with Manager() as mgr: # 创建一个管理器,具有内建的list、dict供多进程使用 m_list = mgr.list() m_dict = mgr.dict() m_list.append("Hello") p1 = Process(target=func, args=("p1", m_list, m_dict)) # p1.start() p1.join() # 等待p1执行完毕 print(m_list) print(m_dict)
-
进程池
- 进程有自己独立的空间,多进程频繁切换耗时
- 进程池节省开辟(逐一)进程及其内存空间的时间,以及销毁进程的时间;节省内存空间
from multiprocessing import Pool import os from time import sleep def func1(name): print(f"当前进程的ID:{os.getpid()}, {name}") sleep(2) return name # 该行用于part2 def func2(args): print(args) if __name__ == "__main__": # part1-普通模式 pool = Pool(5) # 5个5个执行 pool.apply_async(func=func1, args=("abc1",)) # 异步,apply为同步,会逐个进程执行 pool.apply_async(func=func1, args=("abc2",)) pool.apply_async(func=func1, args=("abc3",)) pool.apply_async(func=func1, args=("abc4",)) pool.apply_async(func=func1, args=("abc5",)) pool.apply_async(func=func1, args=("abc6",)) pool.apply_async(func=func1, args=("abc7",)) pool.apply_async(func=func1, args=("abc8",), callback=func2) # 此处callback用于与part2对照,func1的return会交给callback做处理;callback与part2-args结果集的好处是指开启一次IO pool.close() # 关闭进程池 pool.join() # 回收进程池 # part2-测试map函数 with Pool(5) as pool: args = pool.map(func1, ("abc1", "abc2", "abc3", "abc4", "abc5", "abc6", "abc7", "abc8", )) for a in args: # args为结果集,收集了func1的return print(a)
-
协程
- 微线程,轻量级线程
- 可以自动切换执行多个任务;协程只有一个线程在执行,在该线程内部自己多任务切换,因此是用户或者说恒旭级别的;(图片来自尚学堂,下同)
- 协程的标准
- 协程的优缺点
from time import sleep def productor(): # 生产消费者模式 while True: # 循环-生成器 n = yield # 注意这个写法 sleep(1) print(f"生产了第{n}个产品") def consumer(): g = productor() # 创建生成器 next(g) # 触发生成器一次yield,注意此时yield为空,生成器停留在该状态 for i in range(5): g.send(i) # n=yield 接收i,并赋值于n,由此同样触发了生成器一次迭代 print(f"消费了第{i}个产品") if __name__ == "__main__": consumer()
本文地址:https://blog.csdn.net/cyz0202/article/details/110848758