python-并发编程

2022-07-26,,

  • 线程通信-队列

    • 队列的好处
      • 线程安全:python实现了内部锁
      • 解耦:用户只管输入,可以不知道后台多线程处理情况
      • 提高效率:后台多线程处理效率高
    • 举例:
    	from queue import Queue
    	mq = Queue()  # 默认先进先出队列,FIFO
    	# 存放数据
    	mq.put(1)
    	mq.put(2)
    	mq.put(3)
    	
    	# get() 队列是空时,会等待新数据
    	print(mq.get())  # 1
    	print(mq.get())  # 2
    	print(mq.get())  # 3
    	# print(mq.get())  # 这里已经没有数据了,默认保持等待;可以设置block=False不等待
    	
    	# get_nowait() 队列是空时,不等待,报空队列错误
    	print(mq.get_nowait())  # raise Empty 
    
  • 生产消费者模式

    • 生产者:生产数据的线程;消费者:消费数据的线程;两者平衡配合
    • 阻塞队列:生产者生产的数据存放在 阻塞队列中,消费者从阻塞队列中拿数据,两者解耦;队列满了,生产者不能存放数据;队列空了,消费者不能消费数据;故称为阻塞队列;
    from threading import Thread
    from queue import Queue
    def producer():
    	num = 1
    	while True:
    		if mq.qsize() < 10:  # 或者Queue(maxsize=10)
    			print("f生产了{num}号手机")
    			mq.put(f"{num}号手机")
    			num += 1
    		sleep(1)
    def consumer():
    	while True:
    		print("购买了{}".format(mq.get()))
    if __name__ == "__main__":
    	mq = Queue()
    	t1 = Thread(target=producer)
    	t2 = Thread(target=consumer)
    	t3 = Thread(target=consumer)
    	t1.start()
    	t2.start()
    	t3.start()  # 第二个购买者,由于Queue是线程安全的,因此队列为空消费者会等待;
    
  • 进程的实现

    • multiprocessing模块
    • 与线程类似:类创建和方法创建,start启动
    from multiprocessing import Process
    from time import sleep
    class  MyProcess(Process):
    	def __init__(self, name):
    		Process.__init__(self)
    		self.name = name
    	def run():
    		print(f"{name}开始。。。")
    		sleep(2)
    		print(f"{name}结束。。。")
    	
    def func1(name):
    	print(f"{name}开始。。。")
    	sleep(2)
    	print(f"{name}结束。。。")
    	
    if __name=="__main__":
    	p1 = Process(target=func1, args=('p1',))  # 方法创建
    	p1.start()
    
    	p2 = MyProcess("p1")  # 类创建
    	p2.start()
    	
    
  • 进程的通信

    • 优缺点:并发执行;进程独立,数据安全;创建和删除消耗资源多;
    • 2种主要通信方式
      • 使用multiprocessing模块下的Queue类;
        • 原理:操作系统开辟一个队列空间,各个进程可以存放数据到该队列,也可以拿走自己需要的信息;
        	from multiprocessing import Queue
        	from time import sleep
        	import os
        	def func(name, mq):
        		print("进程ID {} 获取了数据 {}".format(os.getpid(), mq.get()))  # 获取到abc
        		mq.put("abcde")
        	if __name__=="__main__":
        		mq = Queue()  # 注意不能使用普通的Queue类,该类不支持进程间的传递
        		mq.put("abc")
        		p1 = Process(target=func, args=("p1", mq))
        		pq.start()
        		p1.join()
        		print(mq.get())  # 获取到abcde
        
      • pipe,管道实现,常用于2个进程间通信,2个进程分别位于管道两端;
        • send(obj): obj必须可序列化,超过32M可能报错ValueError
        • recv: 接收send发送过来的数据
        • close:关闭管道
        • poll([timeout]):返回连接中是否还有可读取数据
        	from multiprocessing import Process, current_process, Pipe
        	from time import sleep
        	import os
        	def func(name, con):
        		print("进程ID {} 获取了数据 {}".format(os.getpid(), con.recv()))  # 获取到abc
        		con.put("abcde")
        	if __name__=="__main__":
        		con1, con2 = Pipe()  # 一个管道两端分别对应两个进程
        		p1 = Process(target=func, args=("p1", con1))  # con1给进程p1
        		con2.send("abc")  # con2有当前主进程持有,并发送数据abc
        		p1.join()  # 等待p1执行完毕
        		print(con2.recv())  # 获取到abcde
        
    • Manager管理器(另一种进程通信方法)
      • 提供了一种创建共享数据的方法,从而可以在不同进程中共享
      		from multiprocessing import Process, current_process
      		from time import sleep
      		import os
      		from multiprocessing import Manager
      		def func(name, m_list, m_dict):
      			print("进程ID {} 获取了数据 {}".format(os.getpid(), m_list))  # 获取
      			print("进程ID {} 获取了数据 {}".format(os.getpid(), m_dict))  # 获取
      			m_list.append("你好")
      			m_dict["name"] = "abc"
      		if __name__=="__main__":
      			with Manager() as mgr:  # 创建一个管理器,具有内建的list、dict供多进程使用
      				m_list = mgr.list()
      				m_dict = mgr.dict()
      				m_list.append("Hello")
      				p1 = Process(target=func, args=("p1", m_list, m_dict))  # 
      				p1.start()
      				p1.join()  # 等待p1执行完毕
      				print(m_list)
      				print(m_dict)
      
  • 进程池

    • 进程有自己独立的空间,多进程频繁切换耗时
    • 进程池节省开辟(逐一)进程及其内存空间的时间,以及销毁进程的时间;节省内存空间
    from multiprocessing import Pool
    import os
    from time import sleep
    def func1(name):
    	print(f"当前进程的ID:{os.getpid()}, {name}")
    	sleep(2)
    	return name  # 该行用于part2
    def func2(args):
    	print(args)
    if __name__ == "__main__":
    	# part1-普通模式
    	pool = Pool(5)  # 5个5个执行
    	pool.apply_async(func=func1, args=("abc1",)) # 异步,apply为同步,会逐个进程执行
    	pool.apply_async(func=func1, args=("abc2",))
    	pool.apply_async(func=func1, args=("abc3",))
    	pool.apply_async(func=func1, args=("abc4",))
    	pool.apply_async(func=func1, args=("abc5",))
    	pool.apply_async(func=func1, args=("abc6",))
    	pool.apply_async(func=func1, args=("abc7",))
    	pool.apply_async(func=func1, args=("abc8",), callback=func2)  # 此处callback用于与part2对照,func1的return会交给callback做处理;callback与part2-args结果集的好处是指开启一次IO
    	pool.close()  # 关闭进程池
    	pool.join()  # 回收进程池	
    
    	# part2-测试map函数
    	with Pool(5) as pool:
    		args = pool.map(func1, ("abc1", "abc2", "abc3", "abc4", "abc5", "abc6", "abc7", "abc8", ))  
    		for a in args:  # args为结果集,收集了func1的return
    			print(a)
    	
    
  • 协程

    • 微线程,轻量级线程
    • 可以自动切换执行多个任务;协程只有一个线程在执行,在该线程内部自己多任务切换,因此是用户或者说恒旭级别的;(图片来自尚学堂,下同)
    • 协程的标准
    • 协程的优缺点
    from time import sleep
    def productor():  # 生产消费者模式
    	while True:  # 循环-生成器
    		n = yield  # 注意这个写法
    		sleep(1)
    		print(f"生产了第{n}个产品")
    def consumer():
    	g = productor()  # 创建生成器
    	next(g)  # 触发生成器一次yield,注意此时yield为空,生成器停留在该状态
    	for i in range(5):
    		g.send(i)  # n=yield 接收i,并赋值于n,由此同样触发了生成器一次迭代
    		print(f"消费了第{i}个产品")
    if __name__ == "__main__":
    	consumer()
    

本文地址:https://blog.csdn.net/cyz0202/article/details/110848758

《python-并发编程.doc》

下载本文的Word格式文档,以方便收藏与打印。