原文链接:http://www.juzicode.com/archives/1501
在《Python进阶教程m10–多线程》中我们介绍了多线程的基本编程模型,文中例子多线程之间独立运行,主线程也只有在创建子线程的时候传递过函数入参,想在主线程中查询子线程某个变量的值却无法实现,子线程之间也不能相互传值。多个线程各跑各的,你看我不顺眼,我也瞧不上你。
如果能使用某种“变量”被多个线程共享,像下图这样在线程1中s可以被赋值,在线程2中s可以被读取,不就可以做到线程间通信了?
Python正好提供了这种机制,比如可以Manager、Pipe、Event、Lock等方法实现多线程间的通信,或者说多线程间的同步。这里“通信”或者“同步”的概念都是指线程之间可以传递某种变量值,利用这个变量值就可以做到线程间相互联系,一般“同步”的概念多用来指示某个线程等待其他线程发来某种信号,信号到了就可以继续往下运行。
1、全局变量
全局变量可以用在多个函数间访问,同样也可以在多线程间访问,利用这个特性可以实现多线程通信,下面这个例子中inp是一个全局变量,在func_input()中接受键盘的输入,在func_output()中能被读取,打印inp的内容。
print('-----欢迎来到www.juzicode.com')
print('-----公众号: 桔子code/juzicode \n')
import time, threading
inp = ''
def func_input():
print('进入线程: ', threading.current_thread().name)
while True:
global inp
inp = input('\n func_input: 请输入:')
def func_output():
print('进入线程: ', threading.current_thread().name)
inp_temp = ''
while True:
global inp
if inp_temp != inp:
print('\n func_display: inp = ',inp)
inp_temp = inp
time.sleep(0.5)
if __name__ == '__main__':
print('进入主线程: ', threading.current_thread().name)
t1 = threading.Thread(target=func_output, name='func_output')
t2 = threading.Thread(target=func_input, name='func_input')
t1.start()
t2.start()
while True:pass
print('退出主线程: ' ,threading.current_thread().name)
========运行结果:
-----欢迎来到www.juzicode.com
-----公众号: 桔子code/juzicode
进入主线程: MainThread
进入线程: func_output
进入线程: func_input
func_input: 请输入: www.juzicode.com
func_input: 请输入:
func_display: inp = www.juzicode.com
func_display: inp =
func_input: 请输入:
微信公众号 桔子code
func_input: 请输入:
func_display: inp = 微信公众号 桔子code
2、Manager
使用Manager()需要导入multiprocessing模块,Manager()构造的对象可以被不同的线程或者进程读写,Manager对象支持dict,list,Queue等数据类型。下面是构造一个dict和list的例子,先用manager= Manager()构造manager实例,再用manager.dict()或者manager.list()构造对应的数据类型。
import time,threading,sys
from threading import Thread
from multiprocessing import Manager
if __name__ == '__main__':
print('-----欢迎来到 www.juzicode.com')
print('-----公众号: 桔子code/juzicode \n')
manager= Manager()
man_dict = manager.dict()
print(type(man_dict))
man_dict['thread_1'] = 0
man_dict['thread_2'] = 1
man_dict['send_to_sub_cmd']=None
print(man_dict)
man_list = manager.list()
print(type(man_list))
man_list = [1,2,3,4,5]
print(man_list)
==========运行结果:
<class 'multiprocessing.managers.DictProxy'>
{'thread_1': 0, 'thread_2': 1, 'send_to_sub_cmd': None}
<class 'multiprocessing.managers.ListProxy'>
[1, 2, 3, 4, 5]
下面看看用Manager字典的方法在2个子线程间传值,这个例子中设计了线程1和线程2两个子线程。在子线程的无限循环中使用不同的循环间隔,每次循环后对应的计数加1,同时打印出对方线程中的循环计数值:
import time,threading,sys
from threading import Thread
from multiprocessing import Manager
def thread_1(man_dict):
print('进入线程: thread_1')
man_dict['thread_1'] = 0
while True:
#子线程计数增加
time.sleep(1)
man_dict['thread_1'] += 1
#在线程1中打印线程2的计数
print('thread_1:线程thread_2中循环计数:',man_dict['thread_2'])
print('退出线程: thread_1' )
def thread_2(man_dict):
print('进入线程: thread_2')
man_dict['thread_2'] = 0
while True:
#子线程计数增加
time.sleep(0.5)
man_dict['thread_2'] += 1
#在线程2中打印线程1的计数
print('thread_2: 线程thread_1中循环计数:',man_dict['thread_1'])
print('退出线程: thread_2' )
if __name__ == '__main__':
print('-----欢迎来到 www.juzicode.com')
print('-----公众号: 桔子code/juzicode \n')
manager= Manager()
man_dict = manager.dict()
man_dict['thread_1'] = 0
man_dict['thread_2'] = 0
t1 = Thread(target=thread_1,args=(man_dict,),name='thread_1',daemon=True)
t2 = Thread(target=thread_2,args=(man_dict,),name='thread_2',daemon=True)
t1.start()
t2.start()
#进入主进程循环过程
while True:
inp = input('\n------>')
if inp.lower() == 'quit':
time.sleep(1)
break
print('退出主线程')
==========运行结果:
-----欢迎来到 www.juzicode.com
-----公众号: 桔子code/juzicode
进入线程: thread_1
进入线程: thread_2
------>thread_2: 线程thread_1中循环计数: 0
thread_1:线程thread_2中循环计数: 1
thread_2: 线程thread_1中循环计数: 1
thread_2: 线程thread_1中循环计数: 1
thread_1:线程thread_2中循环计数: 3
thread_2: 线程thread_1中循环计数: 2
thread_2: 线程thread_1中循环计数: 2
thread_1:线程thread_2中循环计数: 5
thread_2: 线程thread_1中循环计数: 3
thread_2: 线程thread_1中循环计数: 3
thread_1:线程thread_2中循环计数: 7
thread_2: 线程thread_1中循环计数: 4
thread_2: 线程thread_1中循环计数: 4
thread_1:线程thread_2中循环计数: 9
thread_2: 线程thread_1中循环计数: 5
thread_2: 线程thread_1中循环计数: 5
thread_1:线程thread_2中循环计数: 11
thread_2: 线程thread_1中循环计数: 6
quit
thread_2: 线程thread_1中循环计数: 6
thread_1:线程thread_2中循环计数: 13
thread_2: 线程thread_1中循环计数: 7
退出主线程
在介绍Manager的官方文档中有这么一段话:
从这段话可以看出Manager是一种进程共享型对象,所以在创建Manager对象时会启动一个新的进程。在下面这个例子中,主进程连续创建5个Manager实例,从Windows系统的任务管理器我们可以看到,除了程序自身需要启动1个python进程,另外还有5个python进程,这5个python进程都是因为创建Manager实例生成的:
import time,threading,sys
from threading import Thread
from multiprocessing import Manager
if __name__ == '__main__':
print('-----网址: www.juzicode.com')
print('-----公众号: 桔子code/juzicode \n')
m_list=[]
for i in range(5):
m_list.append(Manager())
time.sleep(1)
while True:time.sleep(10)
任务管理器看到的进程:
3、Pipe
使用Pipe()也需要导入multiprocessing模块,利用conn1,conn2=Pipe()创建一个管道实例,可以得到一对“管道终端” (conn1,conn2 ),这对终端就是这个管道的2个端点,如果Pipe()初始化的时候传入的是默认的True,表示是一个双向管道,conn1和conn2可以相互发送和接收,反之则是一个单向管道:
如果Pipe()初始化传入的是False,表示创建的是单向管道,只能从conn2发送,用conn1接收,如果错误使用会抛OSError异常:
下面的例子是一个双向管道的例子,在线程1和线程2内部分别有个循环计数,线程1和2每次自加后向对方发送自己的数据,并等待对方线程发来数据,接收成功后显示对方线程的数值。
import time,threading,sys
from threading import Thread
from multiprocessing import Pipe
def thread_1(conn1):
print('进入线程: thread_1')
loop_cout = 100
while True:
time.sleep(0.5)
#发送自身计数
loop_cout += 1
conn1.send(loop_cout)
#接收线程2的计数
msg = conn1.recv()
print('thread_1:线程thread_2中循环计数:',msg)
print('退出线程: thread_1' )
def thread_2(conn2):
print('进入线程: thread_2')
loop_cout = 200
while True:
time.sleep(0.5)
#接收线程1的计数
msg = conn2.recv()
print('thread_2: 线程thread_1中循环计数:',msg)
#发送自身计数
loop_cout += 1
conn2.send(loop_cout)
print('退出线程: thread_2' )
if __name__ == '__main__':
print('-----欢迎来到 www.juzicode.com')
print('-----公众号: 桔子code/juzicode \n')
#创建Pipe实例
conn1,conn2=Pipe()
#开启线程
t1 = Thread(target=thread_1,args=(conn1,),name='thread_1',daemon=True)
t2 = Thread(target=thread_2,args=(conn2,),name='thread_2',daemon=True)
t1.start()
t2.start()
#进入主进程循环过程
while True:
cmd = input('\n------>')
if cmd == 'quit':
time.sleep(1)
break
print('退出主线程')
-----欢迎来到 www.juzicode.com
-----公众号: 桔子code/juzicode
进入线程: thread_1
进入线程: thread_2
------>thread_2: 线程thread_1中循环计数: 101
thread_1:线程thread_2中循环计数: 201
thread_2: 线程thread_1中循环计数: 102
thread_1:线程thread_2中循环计数: 202
thread_2: 线程thread_1中循环计数: 103
thread_1:线程thread_2中循环计数: 203
thread_2: 线程thread_1中循环计数: 104
thread_1:线程thread_2中循环计数: 204
thread_2: 线程thread_1中循环计数: 105
thread_1:线程thread_2中循环计数: 205
4、Event
使用Event可以从threading模块导入,也可以从multiprocessing导入,后者中的Event实际是threading模块的一个克隆而已。
Event创建的对象只有是与否2种标志位,专门用在线程间的“同步”,不适合传递数值。下面的例子中,在thread-2中 event.set() 设置事件触发,在thread-1中event.wait()等待事件触发后循环计数加1,同时event.clear()清除事件标志位,注意事件响应后需要手动清除事件标志位。下面的例子中如果不手动清除标志位,在thread-1中进入下一次循环时,event.wait()等到的结果实际是已触发状态。
import time,threading,sys
from threading import Thread
from threading import Event
def thread_1(event):
print('进入线程: thread_1')
loop_cout = 100
while True:
time.sleep(0.5)
event.wait()#无入参表示无限等待,有入参时必须是float类型的时长
loop_cout += 1
print('接收到一次evnet.set(),loop_cout =',loop_cout)
event.clear()#清除event标志位,为下次触发做准备
print('退出线程: thread_1' )
def thread_2(event):
print('进入线程: thread_2')
loop_cout = 200
while True:
inp = input('\n输入"set"触发一次事件: ')
if inp.lower() == 'set':
event.set()
print('退出线程: thread_2' )
if __name__ == '__main__':
print('-----欢迎来到 www.juzicode.com')
print('-----公众号: 桔子code/juzicode \n')
#创建event实例
event=Event()
#开启线程
t1 = Thread(target=thread_1,args=(event,),name='thread_1',daemon=True)
t2 = Thread(target=thread_2,args=(event,),name='thread_2',daemon=True)
t1.start()
t2.start()
#进入主进程循环过程
while True:
time.sleep(3)
print('退出主线程')
=========运行结果:
-----欢迎来到 www.juzicode.com
-----公众号: 桔子code/juzicode
进入线程: thread_1
进入线程: thread_2
输入"set"触发一次事件: set
接收到一次evnet.set(),loop_cout = 101
输入"set"触发一次事件: set
接收到一次evnet.set(),loop_cout = 102
输入"set"触发一次事件: abc
输入"set"触发一次事件: set
接收到一次evnet.set(),loop_cout = 103
5、Lock
使用Lock可以从threading模块导入,也可以从multiprocessing导入。Lock字面意思就是锁,拿到锁钥匙的人就有权限进入房间,其他人如果要进入房间,必须等前面的人归还钥匙并且拿到钥匙之后才能进入。
下面这个例子在线程thread-1中lock.acquire()会阻塞等待取到钥匙,取到钥匙后相应的循环计数加一,然后再归还钥匙。在thread-2中通过输入不同的命令取钥匙或还钥匙,从运行结果看thread-2取到钥匙后,thread-1中的计数就会停止,释放后计数才会继续增加。
import time,threading,sys
from threading import Thread
from threading import Lock
def thread_1(lock):
print('进入线程: thread_1')
loop_cout = 100
while True:
time.sleep(0.5)
lock.acquire()#取钥匙
loop_cout += 1
print('thread_1: 取到一次钥匙lock.acquire(),loop_cout =',loop_cout)
lock.release()#还钥匙
def thread_2(lock):
print('进入线程: thread_2')
loop_cout = 200
while True:
inp = input('\n输入"acq"取钥匙或"rel"归还钥匙: \n')
if inp.lower() == 'acq':
lock.acquire()#取钥匙
print('thread_2: 取到钥匙')
if inp.lower() == 'rel':
lock.release()#取钥匙
print('thread_2: 归还钥匙')
print('退出线程: thread_2' )
if __name__ == '__main__':
print('-----欢迎来到 www.juzicode.com')
print('-----公众号: 桔子code/juzicode \n')
#创建Lock实例
lock=Lock()
#开启线程
t1 = Thread(target=thread_1,args=(lock,),name='thread_1',daemon=True)
t2 = Thread(target=thread_2,args=(lock,),name='thread_2',daemon=True)
t1.start()
t2.start()
#进入主进程循环过程
while True:
time.sleep(3)
print('退出主线程')
==========运行结果
-----欢迎来到 www.juzicode.com
-----公众号: 桔子code/juzicode
进入线程: thread_1
进入线程: thread_2
输入"acq"取钥匙或"rel"归还钥匙:
thread_1: 取到一次钥匙lock.acquire(),loop_cout = 101
thread_1: 取到一次钥匙lock.acquire(),loop_cout = 102
thread_1: 取到一次钥匙lock.acquire(),loop_cout = 103
thread_1: 取到一次钥匙lock.acquire(),loop_cout = 104
thread_1: 取到一次钥匙lock.acquire(),loop_cout = 105
acq
thread_2: 取到钥匙 #####这里thread-2取到钥匙,thread-1的计数就停止了。
输入"acq"取钥匙或"rel"归还钥匙:
输入"acq"取钥匙或"rel"归还钥匙:
rel #####这里thread-2归还钥匙后,thread-1的计数继续增加。
thread_2: 归还钥匙
thread_1: 取到一次钥匙lock.acquire(),loop_cout = 106
thread_1: 取到一次钥匙lock.acquire(),loop_cout = 107
thread_1: 取到一次钥匙lock.acquire(),loop_cout = 108
结语: 这里介绍的Manager、Pipe、Event、Lock等只是Python多线程通信工具包的一部分,还有Queue、Semaphore、Barrier、RLock等等工具,至于应该使用哪种工具,需要根据具体的业务需求来决定。我们知道Lock是只有一把钥匙的锁,房间进去了一个人,其他人就只能在外面等待,而Semaphore(信号量)则是有多个钥匙的锁,只有全部钥匙被取走了,最后来的人才会被拒绝进入。比如在访问sqlite数据库时,读数据库可以有多个线程同时进行,而写操作只能有一个线程,这时我们就可以使用Semaphore控制读数据库的线程数量,而用Lock保证任意时刻写操作的唯一性。