0. 背景知识 现代操作系统之所以可以执行多任务其实是轮流让各个任务交替执行,由于CPU速度足够快,所以看起来就像是所有任务在同时进行一样。 多任务的实现方式:
多进程模式
多线程模式
多进程+多线程模式 进程、线程之间的通信、协调、同步、数据共享等问题,增加了编程的复杂性。
1. 多进程 1.1. fork() - Unix 在Unix操作环境下,系统提供fork()系统调用,将当前进程(也称为父进程)复制一份(称为子进程)。父进程返回子进程的ID,子进程返回0,子进程可以通过getppid()来获取父进程的ID。 Apache服务器就是由父进程监听端口,有请求进来时,就会fork出子进程来处理。
1 2 3 4 5 6 7 8 9 10 11 12 13 import osprint ('Process (%s) start...' % os.getpid())pid = os.fork() if pid == 0 : print ('I am child process (%s) and my parent is %s.' % (os.getpid(), os.getppid())) else : print ('I (%s) just created a child process (%s).' % (os.getpid(), pid)) Process (15807 ) start... I (15807 ) just created a child process (15808 ). I am child process (15808 ) and my parent is 15807.
1.2. multiprocessing - 跨平台多进程模块
multiprocessing提供一个Process类代表一个进程对象
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 from multiprocessing import Processimport osdef run_proc (name ): print ('Run child process %s (%s)...' % (name, os.getpid())) if __name__ == '__main__' : print ('Parent process %s.' % os.getpid()) p = Process(target=run_proc, args=('test' ,)) print ('Child process will start.' ) p.start() p.join() print ('Child process end.' ) Parent process 54062. Child process will start. Run child process test (54063 )... Child process end.
1.3. Pool - 进程池 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 from multiprocessing import Poolimport os, time, randomdef long_time_task (name ): print ('Run task %s (%s)...' % (name, os.getpid())) start = time.time() time.sleep(random.random()) end = time.time() print ('Task %s runs %0.2f seconds.' % (name, (end - start))) if __name__ == '__main__' : print ('Parent process %s.' % os.getpid()) p = Pool(4 ) for i in range (5 ): p.apply_async(long_time_task, args=(i,)) print ('Waiting for all subprocesses done...' ) p.close() p.join() print ('All subprocesses done.' ) Parent process 54349. Waiting for all subprocesses done... Run task 0 (54350 )... Run task 1 (54351 )... Run task 2 (54352 )... Run task 3 (54353 )... Task 3 runs 0.46 seconds. Run task 4 (54353 )... Task 0 runs 0.59 seconds. Task 1 runs 0.68 seconds. Task 2 runs 0.75 seconds. Task 4 runs 0.56 seconds. All subprocesses done.
1.4. subprocess 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 >>> subprocess.run(['echo' ,'hello' ])hello CompletedProcess(args=['echo' , 'hello' ], returncode=0 ) >>> subprocess.run('echo hello' , shell = True )hello CompletedProcess(args='echo hello' , returncode=0 ) >>> res = subprocess.call(['echo' ,'hello' ])hello >>> res0 import subprocesschild1 = subprocess.Popen(['echo' , 'hello\nworld' ], stdout=subprocess.PIPE) child2 = subprocess.Popen(["wc" ], stdin=child1.stdout, stdout=subprocess.PIPE) res = child2.communicate() print (res)(b' 2 2 12\n' , None ) import subprocessp = subprocess.Popen(['python' ], stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE) output, err = p.communicate(b'print("hello, python")' ) print (output, err)print (output.decode('utf-8' ))print ('Exit code:' , p.returncode)b'hello, python\n' b'' hello, python Exit code: 0
1.5. 进程间通信
Python的multiprocessing提供了Queue,Pipes等多种方式来交换数据。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 from multiprocessing import Process, Queueimport os, time, randomdef write (q ): print ('Process to write: %s' % os.getpid()) for value in ['A' , 'B' , 'C' ]: print ('Put %s to queue...' % value) q.put(value) time.sleep(random.random()) def read (q ): print ('Process to read: %s' % os.getpid()) while True : value = q.get(True ) print ('Get %s from queue.' % value) if __name__=='__main__' : q = Queue() pw = Process(target=write, args=(q,)) pr = Process(target=read, args=(q,)) pw.start() pr.start() pw.join() pr.terminate() Process to write: 5305 Put A to queue... Process to read: 5306 Get A from queue. Put B to queue... Get B from queue. Put C to queue... Get C from queue.
2. 多线程
线程是操作系统直接支持的执行单元,一个进程至少有一个线程。
2.1. 高级模块-threading 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 import time, threadingdef loop (): print ('thread %s is running...' % threading.current_thread().name) n = 0 while n < 3 : n = n + 1 print ('thread %s >>> %s' % (threading.current_thread().name, n)) time.sleep(1 ) print ('thread %s ended.' % threading.current_thread().name) print ('thread %s is running...' % threading.current_thread().name) t = threading.Thread(target=loop, name='LoopThread' ) t.start() t.join() print ('thread %s ended.' % threading.current_thread().name)thread MainThread is running... thread LoopThread is running... thread LoopThread >>> 1 thread LoopThread >>> 2 thread LoopThread >>> 3 thread LoopThread ended. thread MainThread ended.
2.2. Lock 多进程:同一个变量,各自拷贝一份存于各个进程中,互不影响。 多线程:所有的变量由所有线程共享,因此可能会造成多个线程同时修改一个变量,导致出错
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 import threadingbalance = 0 def change_it (n ): global balance balance = balance + n balance = balance - n def run_thread (n ): for i in range (100000 ): change_it(n) t1 = threading.Thread(target=run_thread, args=(100 ,)) t2 = threading.Thread(target=run_thread, args=(50 ,)) t1.start() t2.start() t1.join() t2.join() print (balance)0 /50 /-50 /100 ...
threading.Lock() 创建锁: 由于创建了锁,因此其他线程不能同时执行change_it(n)函数;又由于锁只有一个,同一时刻只有一个线程持有该锁。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 import threadingbalance = 0 lock = threading.Lock() def change_it (n ): global balance balance = balance + n balance = balance - n def run_thread (n ): for i in range (100000 ): lock.acquire() try : change_it(n) finally : lock.release() t1 = threading.Thread(target=run_thread, args=(100 ,)) t2 = threading.Thread(target=run_thread, args=(50 ,)) t1.start() t2.start() t1.join() t2.join() print (balance)
2.3. 锁的优劣 好处:确保了某段关键代码同时只能由一个线程执行 坏处:
阻止了多线程的并发能力,大大降低了效率
当存在多个锁时,不同线程持有不同的锁,当试图获取对方持有的锁时,可能会造成死锁,导致多个线程全部挂起
2.4. 多核CPU 由于Python有GIL(Global Interpreter Lock)全局锁,因此不能利用多线程实现多核任务。只能使用多进程来实现。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 import threading, multiprocessingdef loop (): while True : pass for i in range (multiprocessing.cpu_count()): t = threading.Thread(target=loop) t.start() import multiprocessing, osdef loop (): print (os.getpid()) while True : pass numCpu = multiprocessing.cpu_count() p = multiprocessing.Pool(numCpu) for i in range (numCpu): p.apply_async(loop) p.close() p.join()
3. ThreadLocal 多线程环境下,每个线程使用自己的局部变量比全局变量好,这样一来能减少各个线程间的相互影响。 ThreadLocal 解决了参数在一个线程中的各个函数之间相互传递的问题。ThreadLocal虽然是全局变量,但是每个线程只能读写自己的独立副本。 ThreadLocal 最常用的地方就是为每个线程绑定一个数据库连接、Http请求,用户身份信息等,这样一个线程所有用到的处理函数就可以非常方便的访问这些资源了。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 import threadinglocal_school = threading.local() def process_student (): std = local_school.student print ('Hello, %s (in %s)' % (std, threading.current_thread().name)) def process_thread (name ): local_school.student = name process_student() t1 = threading.Thread(target=process_thread, args=('Alice' ,), name='Thread-A' ) t2 = threading.Thread(target=process_thread, args=('Bob' ,), name='Thread-B' ) t1.start() t2.start() t1.join() t2.join() Hello, Alice (in Thread-A) Hello, Bob (in Thread-B)
4. 进程Vs线程 4.1. 多任务 通常多任务都会设计为:Master-Worker 模式,Master来分配任务,worker来执行任务。 多进程:稳定性高,资源消耗大。(早期的Apache服务) 多线程:由于共享的模式,子线程可能会导致整个进程挂掉,但是通常比多进程快一些
4.2. 任务切换 无论是多进程还是多线程,只要数量一多,效率就会下降。因为切换作业是有代价的,需要保存现场环境(CPU寄存器状态、内存页等等),切换的速度虽然快,但如果作业一多,也是相当耗费时间。
4.3. 计算密集型 Vs IO密集型 计算密集型:这类任务通常需要大量计算,消耗CPU,如:视频解码。这种类型虽然也可以多任务完成,但是任务越多,花在切换任务的时间越多,CPU执行效率越低。如果要高效的利用CPU,就要使计算任务的数量等于CPU核心数。同时,代码的执行效率也至关重要,选择效率高的语言,如:C,比较合适。 IO密集型:这类任务通常是主要涉及网络、磁盘的任务。对CPU消耗少,大部分时间都用在等待IO操作完成,因此一般情况下,任务越多,效率越高,比如:Wed应用。但是,在这种情况下使用执行效率高的编程语言并不会产生很大作用,因此可以选择有利于开发效率的语言,例如:python
4.4. 异步IO 由于CPU和IO之间的速度差异,大多数时间都会花在IO等待上,因此我们才需要多进程和多线程来并发的执行任务。不过现代操作系统也做出了巨大改进,支持了异步IO。这种全新的模型称为事件驱动模型,如Nginx,在单核CPU上采用单进程模式就可以高效的支持多任务了。Python语言中的单线程异步模型称为: 协程,这样一来就可以基于事件驱动的来编写多任务程序了。
5. 分布式进程 Python的multiprocessing模块不但支持多进程,其中的managers子模块还支持把多进程分布到多个机器上。依靠网络通信,一个服务进程可以作为调度者,将任务分布到其他机器上。 将通过Queue通信的多进程分布到两台机器上: 通过managers模块把Queue通过网络暴露出去即可。
5.1. Master机器(进程) 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 import random, time, queuefrom multiprocessing.managers import BaseManagertask_queue = queue.Queue() result_queue = queue.Queue() class QueueManager (BaseManager ): pass QueueManager.register('get_task_queue' , callable =lambda : task_queue) QueueManager.register('get_result_queue' , callable =lambda : result_queue) manager = QueueManager(address=('' , 8000 ), authkey=b'test' ) manager.start() task = manager.get_task_queue() result = manager.get_result_queue() for i in range (5 ): n = random.randint(0 , 1000 ) print ('Put task %d...' % n) task.put(n) print ('Try get results...' )for i in range (5 ): r = result.get(timeout=10 ) print ('Result: %s' % r) manager.shutdown() print ('Master Exit.' )
5.2. Worker机器(进程) : 因为Queue对象存储在Master的进程中,因此worker中并没有创建Queue的代码 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 import time, sys, queuefrom multiprocessing.managers import BaseManagerclass QueueManager (BaseManager ): pass QueueManager.register('get_task_queue' ) QueueManager.register('get_result_queue' ) server_addr = '127.0.0.1' print ('Connect to server %s...' % server_addr)m = QueueManager(address=('' , 8000 ), authkey=b'test' ) m.connect() task = m.get_task_queue() result = m.get_result_queue() for i in range (5 ): try : n = task.get(timeout=1 ) print ('Run task %d * %d' % (n, n)) r = '%d * %d = %d' % (n, n, n*n) time.sleep(1 ) result.put(r) except queue.Empty: print ('Task queue is empty.' ) print ('Worker Exit.' )
5.3. 输出 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 (1 ). Master Try get results... Result: 463 * 463 = 214369 Result: 322 * 322 = 103684 Result: 567 * 567 = 321489 Result: 372 * 372 = 138384 Result: 120 * 120 = 14400 Master Exit. (2 ). Worker Connect to server 127.0 .0 .1 ... Run task 463 * 463 Run task 322 * 322 Run task 567 * 567 Run task 372 * 372 Run task 120 * 120 Worker Exit.