Python进阶教程m10b–多线程通信

原文链接:http://www.juzicode.com/archives/1501

《Python进阶教程m10–多线程》中我们介绍了多线程的基本编程模型,文中例子多线程之间独立运行,主线程也只有在创建子线程的时候传递过函数入参,想在主线程中查询子线程某个变量的值却无法实现,子线程之间也不能相互传值。多个线程各跑各的,你看我不顺眼,我也瞧不上你。

如果能使用某种“变量”被多个线程共享,像下图这样在线程1中s可以被赋值,在线程2中s可以被读取,不就可以做到线程间通信了?

Python正好提供了这种机制,比如可以Manager、Pipe、Event、Lock等方法实现多线程间的通信,或者说多线程间的同步。这里“通信”或者“同步”的概念都是指线程之间可以传递某种变量值,利用这个变量值就可以做到线程间相互联系,一般“同步”的概念多用来指示某个线程等待其他线程发来某种信号,信号到了就可以继续往下运行。

1、全局变量

全局变量可以用在多个函数间访问,同样也可以在多线程间访问,利用这个特性可以实现多线程通信,下面这个例子中inp是一个全局变量,在func_input()中接受键盘的输入,在func_output()中能被读取,打印inp的内容。

print('-----欢迎来到www.juzicode.com')
print('-----公众号: 桔子code/juzicode \n')   
 
import time, threading
inp = ''
def func_input():
    print('进入线程:  ', threading.current_thread().name)
    while True:
        global  inp
        inp = input('\n func_input: 请输入:')

def func_output():
    print('进入线程:  ', threading.current_thread().name)
    inp_temp = ''
    while True:
        global inp
        if inp_temp != inp:
            print('\n func_display: inp = ',inp)
            inp_temp = inp
        time.sleep(0.5)

if __name__ == '__main__':
    print('进入主线程: ', threading.current_thread().name)
    t1 = threading.Thread(target=func_output, name='func_output')
    t2 = threading.Thread(target=func_input, name='func_input')
    t1.start()    
    t2.start()
    while True:pass
    print('退出主线程: ' ,threading.current_thread().name)
========运行结果:

-----欢迎来到www.juzicode.com
-----公众号: 桔子code/juzicode

进入主线程:  MainThread
进入线程:   func_output
进入线程:   func_input

 func_input: 请输入: www.juzicode.com

 func_input: 请输入:
 func_display: inp =   www.juzicode.com

 func_display: inp =
 func_input: 请输入:
微信公众号 桔子code

 func_input: 请输入:
 func_display: inp =  微信公众号 桔子code

2、Manager

使用Manager()需要导入multiprocessing模块,Manager()构造的对象可以被不同的线程或者进程读写,Manager对象支持dict,list,Queue等数据类型。下面是构造一个dict和list的例子,先用manager= Manager()构造manager实例,再用manager.dict()或者manager.list()构造对应的数据类型。

import time,threading,sys
from threading import Thread
from multiprocessing  import Manager

if __name__ == '__main__':
    print('-----欢迎来到 www.juzicode.com')
    print('-----公众号: 桔子code/juzicode \n')  
    manager= Manager()

    man_dict = manager.dict()
    print(type(man_dict))    
    man_dict['thread_1'] = 0
    man_dict['thread_2'] = 1
    man_dict['send_to_sub_cmd']=None
    print(man_dict) 
    
    man_list = manager.list()
    print(type(man_list))
    man_list = [1,2,3,4,5]
    print(man_list)  
==========运行结果:

<class 'multiprocessing.managers.DictProxy'>
{'thread_1': 0, 'thread_2': 1, 'send_to_sub_cmd': None}
<class 'multiprocessing.managers.ListProxy'>
[1, 2, 3, 4, 5]

下面看看用Manager字典的方法在2个子线程间传值,这个例子中设计了线程1和线程2两个子线程。在子线程的无限循环中使用不同的循环间隔,每次循环后对应的计数加1,同时打印出对方线程中的循环计数值:

import time,threading,sys
from threading import Thread
from multiprocessing  import Manager

def thread_1(man_dict):
    print('进入线程:  thread_1')
    man_dict['thread_1'] = 0
    
    while True:
        #子线程计数增加
        time.sleep(1)
        man_dict['thread_1'] += 1 
        #在线程1中打印线程2的计数
        print('thread_1:线程thread_2中循环计数:',man_dict['thread_2'])
        
    print('退出线程:  thread_1' )
    
def thread_2(man_dict):
    print('进入线程:  thread_2')
    man_dict['thread_2'] = 0
    
    while True:
        #子线程计数增加
        time.sleep(0.5)
        man_dict['thread_2'] += 1
        #在线程2中打印线程1的计数
        print('thread_2: 线程thread_1中循环计数:',man_dict['thread_1'])
        
    print('退出线程:  thread_2' )
    
if __name__ == '__main__':
    print('-----欢迎来到 www.juzicode.com')
    print('-----公众号: 桔子code/juzicode \n')  
    manager= Manager()
    man_dict = manager.dict()
    man_dict['thread_1'] = 0
    man_dict['thread_2'] = 0

    t1 = Thread(target=thread_1,args=(man_dict,),name='thread_1',daemon=True)
    t2 = Thread(target=thread_2,args=(man_dict,),name='thread_2',daemon=True)
    t1.start()
    t2.start()
    
    #进入主进程循环过程
    while True:
        inp = input('\n------>')
        if inp.lower() == 'quit':
            time.sleep(1)
            break
    print('退出主线程')    
==========运行结果:
-----欢迎来到 www.juzicode.com
-----公众号: 桔子code/juzicode

进入线程:  thread_1
进入线程:  thread_2

------>thread_2: 线程thread_1中循环计数: 0
thread_1:线程thread_2中循环计数: 1
thread_2: 线程thread_1中循环计数: 1
thread_2: 线程thread_1中循环计数: 1
thread_1:线程thread_2中循环计数: 3
thread_2: 线程thread_1中循环计数: 2
thread_2: 线程thread_1中循环计数: 2
thread_1:线程thread_2中循环计数: 5
thread_2: 线程thread_1中循环计数: 3
thread_2: 线程thread_1中循环计数: 3
thread_1:线程thread_2中循环计数: 7
thread_2: 线程thread_1中循环计数: 4
thread_2: 线程thread_1中循环计数: 4
thread_1:线程thread_2中循环计数: 9
thread_2: 线程thread_1中循环计数: 5
thread_2: 线程thread_1中循环计数: 5
thread_1:线程thread_2中循环计数: 11
thread_2: 线程thread_1中循环计数: 6
quit
thread_2: 线程thread_1中循环计数: 6
thread_1:线程thread_2中循环计数: 13
thread_2: 线程thread_1中循环计数: 7
退出主线程

在介绍Manager的官方文档中有这么一段话:

从这段话可以看出Manager是一种进程共享型对象,所以在创建Manager对象时会启动一个新的进程。在下面这个例子中,主进程连续创建5个Manager实例,从Windows系统的任务管理器我们可以看到,除了程序自身需要启动1个python进程,另外还有5个python进程,这5个python进程都是因为创建Manager实例生成的:

import time,threading,sys
from threading import Thread
from multiprocessing  import Manager

if __name__ == '__main__':
    print('-----网址: www.juzicode.com')
    print('-----公众号: 桔子code/juzicode \n')  
    m_list=[]
    for i in range(5):
        m_list.append(Manager())
        time.sleep(1)
    while True:time.sleep(10)

任务管理器看到的进程:

3、Pipe

使用Pipe()也需要导入multiprocessing模块,利用conn1,conn2=Pipe()创建一个管道实例,可以得到一对“管道终端” (conn1,conn2 ),这对终端就是这个管道的2个端点,如果Pipe()初始化的时候传入的是默认的True,表示是一个双向管道,conn1和conn2可以相互发送和接收,反之则是一个单向管道:

如果Pipe()初始化传入的是False,表示创建的是单向管道,只能从conn2发送,用conn1接收,如果错误使用会抛OSError异常:

下面的例子是一个双向管道的例子,在线程1和线程2内部分别有个循环计数,线程1和2每次自加后向对方发送自己的数据,并等待对方线程发来数据,接收成功后显示对方线程的数值。

import time,threading,sys
from threading import Thread
from multiprocessing  import Pipe

def thread_1(conn1):
    print('进入线程:  thread_1')
    loop_cout = 100
    while True:
        time.sleep(0.5)
        #发送自身计数
        loop_cout += 1
        conn1.send(loop_cout)
        #接收线程2的计数
        msg = conn1.recv()
        print('thread_1:线程thread_2中循环计数:',msg)
        
    print('退出线程:  thread_1' )
    
def thread_2(conn2):
    print('进入线程:  thread_2')
    loop_cout = 200
    while True:     
        time.sleep(0.5)
        #接收线程1的计数
        msg = conn2.recv()
        print('thread_2: 线程thread_1中循环计数:',msg)
        #发送自身计数
        loop_cout += 1
        conn2.send(loop_cout)

    print('退出线程:  thread_2' )
    
if __name__ == '__main__':
    print('-----欢迎来到 www.juzicode.com')
    print('-----公众号: 桔子code/juzicode \n')  
    #创建Pipe实例
    conn1,conn2=Pipe()
    #开启线程
    t1 = Thread(target=thread_1,args=(conn1,),name='thread_1',daemon=True)
    t2 = Thread(target=thread_2,args=(conn2,),name='thread_2',daemon=True)
    t1.start()
    t2.start()  
    #进入主进程循环过程
    while True:
        cmd = input('\n------>')
        if cmd == 'quit':
            time.sleep(1)
            break
    print('退出主线程')    
-----欢迎来到 www.juzicode.com
-----公众号: 桔子code/juzicode

进入线程:  thread_1
进入线程:  thread_2

------>thread_2: 线程thread_1中循环计数: 101
thread_1:线程thread_2中循环计数: 201
thread_2: 线程thread_1中循环计数: 102
thread_1:线程thread_2中循环计数: 202
thread_2: 线程thread_1中循环计数: 103
thread_1:线程thread_2中循环计数: 203
thread_2: 线程thread_1中循环计数: 104
thread_1:线程thread_2中循环计数: 204
thread_2: 线程thread_1中循环计数: 105
thread_1:线程thread_2中循环计数: 205

4、Event

使用Event可以从threading模块导入,也可以从multiprocessing导入,后者中的Event实际是threading模块的一个克隆而已。

Event创建的对象只有是与否2种标志位,专门用在线程间的“同步”,不适合传递数值。下面的例子中,在thread-2中 event.set() 设置事件触发,在thread-1中event.wait()等待事件触发后循环计数加1,同时event.clear()清除事件标志位,注意事件响应后需要手动清除事件标志位。下面的例子中如果不手动清除标志位,在thread-1中进入下一次循环时,event.wait()等到的结果实际是已触发状态。

import time,threading,sys
from threading import Thread
from threading import Event

def thread_1(event):
    print('进入线程:  thread_1')
    loop_cout = 100
    while True:
        time.sleep(0.5)
        event.wait()#无入参表示无限等待,有入参时必须是float类型的时长
        loop_cout += 1
        print('接收到一次evnet.set(),loop_cout =',loop_cout)
        event.clear()#清除event标志位,为下次触发做准备
    print('退出线程:  thread_1' )
    
def thread_2(event):
    print('进入线程:  thread_2')
    loop_cout = 200
    while True:     
        inp = input('\n输入"set"触发一次事件: ')
        if inp.lower() == 'set':
            event.set()
        
    print('退出线程:  thread_2' )
    
if __name__ == '__main__':
    print('-----欢迎来到 www.juzicode.com')
    print('-----公众号: 桔子code/juzicode \n')  
    #创建event实例
    event=Event()
    #开启线程
    t1 = Thread(target=thread_1,args=(event,),name='thread_1',daemon=True)
    t2 = Thread(target=thread_2,args=(event,),name='thread_2',daemon=True)
    t1.start()
    t2.start()  
    #进入主进程循环过程
    while True:
        time.sleep(3)
        
    print('退出主线程')    
=========运行结果:
-----欢迎来到 www.juzicode.com
-----公众号: 桔子code/juzicode

进入线程:  thread_1
进入线程:  thread_2

输入"set"触发一次事件: set
接收到一次evnet.set(),loop_cout = 101

输入"set"触发一次事件: set
接收到一次evnet.set(),loop_cout = 102

输入"set"触发一次事件: abc

输入"set"触发一次事件: set
接收到一次evnet.set(),loop_cout = 103

5、Lock

使用Lock可以从threading模块导入,也可以从multiprocessing导入。Lock字面意思就是锁,拿到锁钥匙的人就有权限进入房间,其他人如果要进入房间,必须等前面的人归还钥匙并且拿到钥匙之后才能进入。

下面这个例子在线程thread-1中lock.acquire()会阻塞等待取到钥匙,取到钥匙后相应的循环计数加一,然后再归还钥匙。在thread-2中通过输入不同的命令取钥匙或还钥匙,从运行结果看thread-2取到钥匙后,thread-1中的计数就会停止,释放后计数才会继续增加。

import time,threading,sys
from threading import Thread
from threading import Lock

def thread_1(lock):
    print('进入线程:  thread_1')
    loop_cout = 100
    while True:
        time.sleep(0.5)
        lock.acquire()#取钥匙
        loop_cout += 1
        print('thread_1: 取到一次钥匙lock.acquire(),loop_cout =',loop_cout)
        lock.release()#还钥匙
    
def thread_2(lock):
    print('进入线程:  thread_2')
    loop_cout = 200
    while True:     
        inp = input('\n输入"acq"取钥匙或"rel"归还钥匙: \n')
        if inp.lower() == 'acq':
            lock.acquire()#取钥匙
            print('thread_2: 取到钥匙')
        if inp.lower() == 'rel':
            lock.release()#取钥匙
            print('thread_2: 归还钥匙')
            
    print('退出线程:  thread_2' )
    
if __name__ == '__main__':
    print('-----欢迎来到 www.juzicode.com')
    print('-----公众号: 桔子code/juzicode \n')  
    #创建Lock实例
    lock=Lock()
    #开启线程
    t1 = Thread(target=thread_1,args=(lock,),name='thread_1',daemon=True)
    t2 = Thread(target=thread_2,args=(lock,),name='thread_2',daemon=True)
    t1.start()
    t2.start()  
    #进入主进程循环过程
    while True:
        time.sleep(3)
        
    print('退出主线程')    
==========运行结果
-----欢迎来到 www.juzicode.com
-----公众号: 桔子code/juzicode

进入线程:  thread_1
进入线程:  thread_2

输入"acq"取钥匙或"rel"归还钥匙:
thread_1: 取到一次钥匙lock.acquire(),loop_cout = 101
thread_1: 取到一次钥匙lock.acquire(),loop_cout = 102
thread_1: 取到一次钥匙lock.acquire(),loop_cout = 103
thread_1: 取到一次钥匙lock.acquire(),loop_cout = 104
thread_1: 取到一次钥匙lock.acquire(),loop_cout = 105
acq
thread_2: 取到钥匙     #####这里thread-2取到钥匙,thread-1的计数就停止了。

输入"acq"取钥匙或"rel"归还钥匙:

输入"acq"取钥匙或"rel"归还钥匙:
rel                    #####这里thread-2归还钥匙后,thread-1的计数继续增加。
thread_2: 归还钥匙
thread_1: 取到一次钥匙lock.acquire(),loop_cout = 106
thread_1: 取到一次钥匙lock.acquire(),loop_cout = 107
thread_1: 取到一次钥匙lock.acquire(),loop_cout = 108

结语: 这里介绍的Manager、Pipe、Event、Lock等只是Python多线程通信工具包的一部分,还有Queue、Semaphore、Barrier、RLock等等工具,至于应该使用哪种工具,需要根据具体的业务需求来决定。我们知道Lock是只有一把钥匙的锁,房间进去了一个人,其他人就只能在外面等待,而Semaphore(信号量)则是有多个钥匙的锁,只有全部钥匙被取走了,最后来的人才会被拒绝进入。比如在访问sqlite数据库时,读数据库可以有多个线程同时进行,而写操作只能有一个线程,这时我们就可以使用Semaphore控制读数据库的线程数量,而用Lock保证任意时刻写操作的唯一性。

发表评论

您的电子邮箱地址不会被公开。 必填项已用*标注