前言 最近在做扫描器的研究,不得不接触到多线程多进程的技术,不然总不能单个线程等它跑半天吧。一说到python学习就想到廖雪峰,就去他的网站上学习了一下,讲的还挺清楚。于是就有了本文,本文也是大多参考他的文章而做的简洁的技术总结,并在此基础上做了一些自己的补充。
首先说一说为什么我们需要多进程。我们现在运行的操作系统,支持多任务同时执行,可以一边听歌一边敲代码。这是两个不同的程序,但在我们眼里就是在同时执行。在单核CPU中,一个任务执行完了才能执行下面一个任务,同一时间只能执行一个程序。那我们就只能听完歌再敲代码,总之不能同时进行。为了解决这个问题,CPU将执行时间分为了许多很小的时间段。比如以0.1秒作为时间间隔。CPU执行0.1秒的听歌程序,在下一个0.1秒执行写代码程序。再无限循环这个过程。就可以模拟实现多任务同时运行的画面了。真正的并行执行任务只能在多核CPU上实现,但操作系统中并行的任务远远超出CPU的核心数,所以即使在多核CPU上,也使用上面的任务调度模型。
在操作系统中,一个程序就是一个进程。以浏览器为例,我们打开了两个浏览器就是打开了两个进程,而我们在一个浏览器页面中打开了两个标签页,就是打开了两个线程,线程是系统执行任务的最小单位。一个进程内可以分出多个线程。
于是我们执行多任务有三种模式
多线程 多进程 多进程+多线程 多进程 前置知识 要让Python程序实现多进程(multiprocessing),我们先了解操作系统的相关知识。
Unix/Linux操作系统提供了一个fork()
系统调用,它非常特殊。普通的函数调用,调用一次,返回一次,但是fork()
调用一次,返回两次,因为操作系统自动把当前进程(称为父进程)复制了一份(称为子进程),然后,分别在父进程和子进程内返回。
子进程永远返回0
,而父进程返回子进程的ID。这样做的理由是,一个父进程可以fork出很多子进程,所以,父进程要记下每个子进程的ID,而子进程只需要调用getppid()
就可以拿到父进程的ID。
Python的os
模块封装了常见的系统调用,其中就包括fork
,可以在Python程序中轻松创建子进程:
1 2 3 4 5 6 7 8 9 import osprint ('Process (%s) start...' % os.getpid()) pid = os.fork()if pid == 0 : print ('I am child process (%s) and my parent is %s.' % (os.getpid(), os.getppid()))else : print ('I (%s) just created a child process (%s).' % (os.getpid(), pid))
注:由于Windows没有fork
调用,上面的代码在Windows上无法运行。
multiprocessing 那在win下就没有办法实现多进程了吗。答案是否定的,python提供了一个名叫multiprocessing
的库来跨平台实现多进程。
基本用法:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 from multiprocessing import Processimport osdef run_proc (name ): print ('Run child process %s (%s)...' % (name, os.getpid()))if __name__=='__main__' : print ('Parent process %s.' % os.getpid()) p = Process(target=run_proc, args=('test' ,)) print ('Child process will start.' ) p.start() p.join() print ('Child process end.' )
创建子进程时,只需要传入一个执行函数和函数的参数,创建一个Process
实例,用start()
方法启动,这样创建进程比fork()
还要简单。
join()
方法可以等待子进程结束后再继续往下运行,通常用于进程间的同步。
进程池 如果要启动大量的子进程,可以用进程池的方式批量创建子进程:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 from multiprocessing import Poolimport os, time, randomdef long_time_task (name ): print ('Run task %s (%s)...' % (name, os.getpid())) start = time.time() time.sleep(random.random() * 3 ) end = time.time() print ('Task %s runs %0.2f seconds.' % (name, (end - start)))if __name__=='__main__' : print ('Parent process %s.' % os.getpid()) p = Pool(4 ) for i in range (5 ): p.apply_async(long_time_task, args=(i,)) print ('Waiting for all subprocesses done...' ) p.close() p.join() print ('All subprocesses done.' )
对Pool
对象调用join()
方法会等待所有子进程执行完毕,调用join()
之前必须先调用close()
,调用close()
之后就不能继续添加新的Process
了。
pool的默认大小是CPU的核心数,这段程序创建了容量为4的进程池,第5个程序必须等前4个中任意一个进程空出来时才开始执行
注意第15行,在向线程池中添加任务时用到的是apply_async()
,他是非阻塞异步的,不会等待子进程执行完毕,主进程会继续执行,会根据系统调度来进行进程切换。
与之对应的是apply()
,他是阻塞主进程,并且一个一个按顺序地执行子进程,等到全部子进程都执行完毕后,继续执行apply()
后面主进程的代码。
使用apply_async()
后面一定要调用close()
和join()
,否则由于系统调度太快,父进程会直接执行完毕,导致子进程的代码还来不及被执行就结束了。
进程间通信 在编写爬虫程序时会用到进程间通信。比如父进程不断给空闲的子进程分配要爬取的url,子进程不断将爬取完的结果送给父进程。python的multiprocessing
就提供了Queue
和Pipe
等多种方式来交换数据。这里分别介绍这两种工具的实现方式。
管道Pipe 管道顾名思义就是在两个线程间建立一个通道进行传输数据,学过计算机网络的同学应该很容易理解,这里可以把管道比喻成那根网线。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 #创建管道的类:Pipe ([duplex]) :在进程之间创建一条管道,并返回元组(conn1,conn2),其中conn1,conn2表示管道两端的连接对象,强调一点:必须在产生Process对象之前产生管道 #参数介绍: dumplex:默认管道是全双工的,如果将duplex射成False,conn1只能用于接收,conn2只能用于发送。 #主要方法: conn1.recv ():接收conn2.send (obj)发送的对象。如果没有消息可接收,recv方法会一直阻塞。如果连接的另外一端已经关闭,那么recv方法会抛出EOFError。 conn1.send (obj):通过连接发送对象。obj是与序列化兼容的任意对象 #其他方法: conn1.close ():关闭连接。如果conn1被垃圾回收,将自动调用此方法 conn1.fileno ():返回连接使用的整数文件描述符 conn1.poll ([timeout] ):如果连接上的数据可用,返回True。timeout指定等待的最长时限。如果省略此参数,方法将立即返回结果。如果将timeout射成None,操作将无限期地等待数据到达。 conn1.recv_bytes ([maxlength] ):接收c.send_bytes ()方法发送的一条完整的字节消息。maxlength指定要接收的最大字节数。如果进入的消息,超过了这个最大值,将引发IOError异常,并且在连接上无法进行进一步读取。如果连接的另外一端已经关闭,再也不存在任何数据,将引发EOFError异常。 conn.send_bytes (buffer [, offset [, size] ]):通过连接发送字节数据缓冲区,buffer是支持缓冲区接口的任意对象,offset是缓冲区中的字节偏移量,而size是要发送字节数。结果数据以单条消息的形式发出,然后调用c.recv_bytes ()函数进行接收 conn1.recv_bytes_into (buffer [, offset] ):接收一条完整的字节消息,并把它保存在buffer对象中,该对象支持可写入的缓冲区接口(即bytearray对象或类似的对象)。offset指定缓冲区中放置消息处的字节位移。返回值是收到的字节数。如果消息长度大于可用的缓冲区空间,将引发BufferTooShort异常。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 from multiprocessing import Process, Pipeimport os, time, randomdef write (c1 ): """docstring for write""" print ('Process to write: %s' % os.getpid()) for value in ['A' ,'B' ,'C' ]: print ('send %s to pipe...' % value) c1.send(value) time.sleep(random.random())def read (c2 ): """docstring for read""" print ('Process to read: %s' % os.getpid()) while True : value = c2.recv() print ('recv %s from pipe' % value)if __name__ == '__main__' : c1, c2 = Pipe() pw = Process(target=write, args=(c1,)) pr = Process(target=read, args=(c2,)) pw.start() pr.start() pw.join() pr.terminate()
管道的EOFError是怎么报出来的:管道空了,且一端关闭了 管道是由操作系统进行引用计数的,必须在所有进程中关闭管道后才能生成EOFError异常。
由于管道在数据管理上是不安全的,没有锁机制 队列的实现机制 就是 管道+锁
关于锁机制的内容会在后面讲解,简单来说就是多个线程对同一数据修改时会造成结果与预期不符
队列Queue 在父进程中创建两个子进程,一个往Queue
里写数据,一个从Queue
里读数据,这是典型的生产者消费者模型,write负责生产资源并加入队列,由read进行消费
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 from multiprocessing import Process, Queueimport os, time, randomdef write (q ): print ('Process to write: %s' % os.getpid()) for value in ['A' , 'B' , 'C' ]: print ('Put %s to queue...' % value) q.put(value) time.sleep(random.random())def read (q ): print ('Process to read: %s' % os.getpid()) while True : value = q.get(True ) print ('Get %s from queue.' % value)if __name__=='__main__' : q = Queue() pw = Process(target=write, args=(q,)) pr = Process(target=read, args=(q,)) pw.start() pr.start() pw.join() pr.terminate()
多线程 多任务可以由多进程完成,也可以由一个进程内的多线程完成。
python标准库提供了两个模块让我们来操作线程,分别是_thread
和threading
,前者是低级模块,后者是高级模块,对前者进行了封装,一般来说我们使用后者即可。
threading 基本使用:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 import time, threadingdef loop (): print ('thread %s is running...' % threading.current_thread().name) n = 0 while n < 5 : n = n + 1 print ('thread %s >>> %s' % (threading.current_thread().name, n)) time.sleep(1 ) print ('thread %s ended.' % threading.current_thread().name)print ('thread %s is running...' % threading.current_thread().name) t = threading.Thread(target=loop, name='LoopThread' ) t.start() t.join()print ('thread %s ended.' % threading.current_thread().name)
threading.Thread
启用了一个名叫LoopThread的线程,执行loop()
这个函数,而主线程不负责执行具体函数,只负责线程的调度。
线程池 既然有进程池,那么对应也会有线程池,用于处理大量并发任务。常用的有两个库提供了线程池,一个是multiprocessing.dummy
,另一个是concurrent.futures
,concurrent.futures
是python3.2中新添加的库。这两个库的区别是concurrent.futures
写法简单些,但性能稍微弱一些,使用map时,future是逐个提交,multiprocessing是批量提交,因此对于大批量jobs时,multiprocessing性能更好一些。而future则提供了更多的功能,如callback、check status、cancel等,对于对需要长时间运行的任务更好一些。
concurrent.futures的例子 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 import concurrent.futuresimport urllib.request URLS = ['https://www.baidu.com/' , 'https://www.sina.com/' , 'https://python.org/' ]def load_url (url, timeout ): with urllib.request.urlopen(url, timeout=timeout) as conn: return conn.read()with concurrent.futures.ThreadPoolExecutor(max_workers=5 ) as executor: future_to_url = {executor.submit(load_url, url, 60 ): url for url in URLS} for future in concurrent.futures.as_completed(future_to_url): url = future_to_url[future] try : data = future.result() except Exception as exc: print ('%r generated an exception: %s' % (url, exc)) else : print ('%r page is %d bytes' % (url, len (data)))
例子来源于python官方文档 ,更详细的内容查看文档即可。
multiprocessing.dummy的例子 用法与进程池类似,只要在引入包时将from multiprocessing import Pool
改为from multiprocessing.dummy import Pool
即可
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 from multiprocessing.dummy import Poolimport os, time, randomdef long_time_task (name ): print ('Run task %s (%s)...' % (name, os.getpid())) start = time.time() time.sleep(random.random() * 3 ) end = time.time() print ('Task %s runs %0.2f seconds.' % (name, (end - start)))if __name__=='__main__' : print ('Parent process %s.' % os.getpid()) p = Pool(4 ) for i in range (5 ): p.apply_async(long_time_task, args=(i,)) print ('Waiting for all subprocesses done...' ) p.close() p.join() print ('All subprocesses done.' )
再来一个爬虫的例子
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 from multiprocessing.dummy import Pool as ThreadPoolimport urllib.request URLS = ['https://www.baidu.com/' , 'https://www.sina.com/' , 'https://python.org/' ]def load_url (url, timeout ): with urllib.request.urlopen(url, timeout=timeout) as conn: return conn.read() pool = ThreadPool(processes = 10 ) results = pool.map (load_url,URLS) pool.close() pool.join()
pool的8个任务分配函数 这里顺便说一下对于一个pool的map和apply的异同
多参数 并行 阻塞(同步) 结果有序 map no yes yes yes apply yes no yes no map_async no yes no yes apply_async yes yes no no
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 def fn (msg ): print ("msg:" , msg) time.sleep(2 ) print ("end" ) pool.map (fn,range (2 ))print ("Mark~ Mark~ Mark~~~~~~~~~~~~~~~~~~~~~~" ) pool.close() pool.join()""" 输出结果 msg: 0 msg: 1 end end Mark~ Mark~ Mark~~~~~~~~~~~~~~~~~~~~~ """ for i in range (2 ): pool.apply(fn, (i, ))print ("Mark~ Mark~ Mark~~~~~~~~~~~~~~~~~~~~~~" ) pool.close() pool.join()""" 输出结果 msg: 0 end msg: 1 end Mark~ Mark~ Mark~~~~~~~~~~~~~~~~~~~~~~ """ pool.map_async(fn,range (2 ))print ("Mark~ Mark~ Mark~~~~~~~~~~~~~~~~~~~~~~" ) pool.close() pool.join()""" 输出结果 Mark~ Mark~ Mark~~~~~~~~~~~~~~~~~~~~~ msg: 0 msg: 1 end end """ for i in range (2 ): pool.apply_async(fn, (i, ))print ("Mark~ Mark~ Mark~~~~~~~~~~~~~~~~~~~~~~" ) pool.close() pool.join()""" 输出结果 Mark~ Mark~ Mark~~~~~~~~~~~~~~~~~~~~~~ msg: 0 msg: 1 end end """
另外还有4个函数,简要说一下差别,imap、imap_unordered、starmap 和 starmap_async
map_async生成子进程时使用的是list,而imap和 imap_unordered则是Iterable,map_async效率略高,而imap和 imap_unordered内存消耗显著的小。
在处理结果上,imap 和 imap_unordered 可以尽快返回一个Iterable的结果,而map_async则需要等待全部Task执行完毕,返回list。
而imap 和 imap_unordered 的区别是:imap 和 map_async一样,都按顺序等待Task的执行结果,而imap_unordered则不必。 imap_unordered返回的Iterable,会优先迭代到先执行完成的Task。
starmap 和 starmap_async就是可以传入多个参数的map和map_async,如
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 def fn (msg1, msg2 ): print ("msg1:" , msg1, "msg2:" , msg2) time.sleep(2 ) print ("end" ) pool.starmap(fn,[(1 ,1 ),(2 ,2 )])print ("Mark~ Mark~ Mark~~~~~~~~~~~~~~~~~~~~~~" ) pool.close() pool.join()""" 输出结果 msg1: 1 msg2: 1 msg1: 2 msg2: 2 end end Mark~ Mark~ Mark~~~~~~~~~~~~~~~~~~~~~~ """
锁机制 多线程和多进程最大的不同在于,多进程中,同一个变量,各自有一份拷贝存在于每个进程中,互不影响,而多线程中,所有变量都由所有线程共享,所以,任何一个变量都可以被任何一个线程修改,因此,线程之间共享数据最大的危险在于多个线程同时改一个变量,把内容给改乱了。举个例子
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 import time, threading balance = 0 def change_it (n ): global balance balance = balance + n balance = balance - ndef run_thread (n ): for i in range (2000000 ): change_it(n) t1 = threading.Thread(target=run_thread, args=(5 ,)) t2 = threading.Thread(target=run_thread, args=(8 ,)) t1.start() t2.start() t1.join() t2.join()print (balance)
t1和t2这两个线程同时对balance
操作,预期结果应该是0,但实际上最终结果却通常不为0
如果我们要确保balance
计算正确,就要给change_it()
上一把锁,当某个线程开始执行change_it()
时,我们说,该线程因为获得了锁,因此其他线程不能同时执行change_it()
,只能等待,直到锁被释放后,获得该锁以后才能改。由于锁只有一个,无论多少线程,同一时刻最多只有一个线程持有该锁,所以,不会造成修改的冲突。创建一个锁就是通过threading.Lock()
来实现:
1 2 3 4 5 6 7 8 9 10 11 12 13 balance = 0 lock = threading.Lock()def run_thread (n ): for i in range (100000 ): lock.acquire() try : change_it(n) finally : lock.release()
当多个线程同时执行lock.acquire()
时,只有一个线程能成功地获取锁,然后继续执行代码,其他线程就继续等待直到获得锁为止。
获得锁的线程用完后一定要释放锁,否则那些苦苦等待锁的线程将永远等待下去,成为死线程。所以我们用try...finally
来确保锁一定会被释放。
锁的好处就是确保了某段关键代码只能由一个线程从头到尾完整地执行,坏处当然也很多,首先是阻止了多线程并发执行,包含锁的某段代码实际上只能以单线程模式执行,效率就大大地下降了。其次,由于可以存在多个锁,不同的线程持有不同的锁,并试图获取对方持有的锁时,可能会造成死锁,导致多个线程全部挂起,既不能执行,也无法结束,只能靠操作系统强制终止。
我们也可以使用with
来简化代码,这个模块中所有带有acquire()和release()方法的对象,都可以使用with语句。当进入with语句块时,acquire()方法被自动调用,当离开with语句块时,release()语句块被自动调用。包括Lock、RLock、Condition、Semaphore。
相当于
1 2 3 4 5 6 some_lock.acquire()try : pass finally : some_lock.release()
ThreadLocal 上面说到,多线程访问同一个共享变量的时候容易出现并发问题,特别是多个线程对一个变量进行写入的时候,为了保证线程安全,一般使用者在访问共享变量的时候需要进行额外的同步措施才能保证线程安全性。加锁就是一个额外的同步措施,ThreadLocal是除了加锁这种同步方式之外的一种保证一种规避多线程访问出现线程不安全的方法,当我们在创建一个变量后,如果每个线程对其进行访问的时候访问的都是线程自己的变量这样就不会存在线程不安全问题。创建一个全局dict,每个线程把自己的name作为key,再存入值,每个线程只对自己的变量进行修改。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 import threading local_school = threading.local()def process_student (): std = local_school.student print ('Hello, %s (in %s)' % (std, threading.current_thread().name))def process_thread (name ): local_school.student = name process_student() t1 = threading.Thread(target= process_thread, args=('Alice' ,), name='Thread-A' ) t2 = threading.Thread(target= process_thread, args=('Bob' ,), name='Thread-B' ) t1.start() t2.start() t1.join() t2.join()""" 运行结果 Hello, Alice (in Thread-A) Hello, Bob (in Thread-B) """
全局变量local_school
就是一个ThreadLocal
对象,每个Thread
对它都可以读写student
属性,但互不影响。你可以把local_school
看成全局变量,但每个属性如local_school.student
都是线程的局部变量,可以任意读写而互不干扰,也不用管理锁的问题,ThreadLocal
内部会处理。
可以理解为全局变量local_school
是一个dict
,不但可以用local_school.student
,还可以绑定其他变量,如local_school.teacher
等等。
ThreadLocal
最常用的地方就是为每个线程绑定一个数据库连接,HTTP请求,用户身份信息等,这样一个线程的所有调用到的处理函数都可以非常方便地访问这些资源。
一个ThreadLocal
变量虽然是全局变量,但每个线程都只能读写自己线程的独立副本,互不干扰。ThreadLocal
解决了参数在一个线程中各个函数之间互相传递的问题。
多进程 vs 多线程 多进程模式最大的优点就是稳定性高,因为一个子进程崩溃了,不会影响主进程和其他子进程。(当然主进程挂了所有进程就全挂了,但是Master进程只负责分配任务,挂掉的概率低)著名的Apache最早就是采用多进程模式。
多进程模式的缺点是创建进程的代价大,在Unix/Linux系统下,用fork
调用还行,在Windows下创建进程开销巨大。另外,操作系统能同时运行的进程数也是有限的,在内存和CPU的限制下,如果有几千个进程同时运行,操作系统连调度都会成问题。
多线程模式通常比多进程快一点,但是也快不到哪去,而且,多线程模式致命的缺点就是任何一个线程挂掉都可能直接造成整个进程崩溃,因为所有线程共享进程的内存。
在Windows下,多线程的效率比多进程要高,所以微软的IIS服务器默认采用多线程模式。由于多线程存在稳定性的问题,IIS的稳定性就不如Apache。为了缓解这个问题,IIS和Apache现在又有多进程+多线程的混合模式,真是把问题越搞越复杂。
无论是多进程还是多线程,数量多肯定执行速度越快,但也不是绝对。当数量达到一定限度时,速度也会衰减。这是因为操作系统忙于切换任务,没有时间去执行任务,平白无故消耗很多系统资源却没有干成事。
计算密集型与IO密集型 Python的线程虽然是真正的线程,但解释器执行代码时,有一个GIL锁:Global Interpreter Lock,任何Python线程执行前,必须先获得GIL锁,然后,每执行100条字节码,解释器就自动释放GIL锁,让别的线程有机会执行。这个GIL全局锁实际上把所有线程的执行代码都给上了锁,所以,多线程在Python中只能交替执行,即使100个线程跑在100核CPU上,也只能用到1个核。
不过,也不用过于担心,Python虽然不能利用多线程实现多核任务,但可以通过多进程实现多核任务。多个Python进程有各自独立的GIL锁,互不影响。
计算密集型需要大量计算,主要用到CPU,如圆周率计算、高清视频解码。这种应用场景适合多进程。由于python中存在GIL锁,无法发挥出多核优势。对于计算密集型任务,最好采用C语言编写。
IO密集型需要对存储器进行大量操作,CPU用到得很少,大部分时间都在等待IO操作。涉及到网络、磁盘IO的任务都属于IO密集型,因此这种应用场景适合python的多线程。
小结: 进程:
优点:多核CPU中能充分利用多个CPU,速度快、稳定性高
缺点:耗费资源
线程:
优点:共享内存,资源占用低,速度快
缺点:稳定性差、加锁后性能差且有可能会造成死锁
计算密集型:CPU资源占用多,多进程,适合C语言
IO密集型:CPU资源占用少,时间都花在IO等待上,多线程,适合python