0%

Python - Theme 9 Process and Thread

0. 背景知识

现代操作系统之所以可以执行多任务其实是轮流让各个任务交替执行,由于CPU速度足够快,所以看起来就像是所有任务在同时进行一样。
多任务的实现方式:

  • 多进程模式
  • 多线程模式
  • 多进程+多线程模式
    进程、线程之间的通信、协调、同步、数据共享等问题,增加了编程的复杂性。

1. 多进程

1.1. fork() - Unix

在Unix操作环境下,系统提供fork()系统调用,将当前进程(也称为父进程)复制一份(称为子进程)。父进程返回子进程的ID,子进程返回0,子进程可以通过getppid()来获取父进程的ID。
Apache服务器就是由父进程监听端口,有请求进来时,就会fork出子进程来处理。

1
2
3
4
5
6
7
8
9
10
11
12
13
import os

print('Process (%s) start...' % os.getpid())
pid = os.fork()
if pid == 0:
print('I am child process (%s) and my parent is %s.' % (os.getpid(), os.getppid()))
else:
print('I (%s) just created a child process (%s).' % (os.getpid(), pid))

#输出
Process (15807) start...
I (15807) just created a child process (15808).
I am child process (15808) and my parent is 15807.

1.2. multiprocessing - 跨平台多进程模块

multiprocessing提供一个Process类代表一个进程对象

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
from multiprocessing import Process
import os


# 子进程要执行的代码
def run_proc(name):
print('Run child process %s (%s)...' % (name, os.getpid()))


if __name__ == '__main__':
print('Parent process %s.' % os.getpid())
p = Process(target=run_proc, args=('test',)) #创建子进程时,只需要传入一个执行函数和函数的参数
print('Child process will start.')
p.start()
p.join() #等待子进程p结束后才向下进行;常用于进程间的同步
#p.join(5) 可以设置阻塞时间
print('Child process end.')

#输出
Parent process 54062.
Child process will start.
Run child process test (54063)...
Child process end.

1.3. Pool - 进程池

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
from multiprocessing import Pool
import os, time, random

def long_time_task(name): #运行时间
print('Run task %s (%s)...' % (name, os.getpid()))
start = time.time()
time.sleep(random.random())
end = time.time()
print('Task %s runs %0.2f seconds.' % (name, (end - start)))

if __name__ == '__main__':
print('Parent process %s.' % os.getpid())
p = Pool(4) #设置进程池为4; 进程池默认为CPU核数
for i in range(5):
p.apply_async(long_time_task, args=(i,)) #apply_async为异步非阻塞;apply为阻塞的,类似于单进程串行
print('Waiting for all subprocesses done...')
p.close() #关闭进程池后不再接收新的进程(已启动的子进程还会继续)
p.join() #等待进程池内的所有子进程执行完毕
print('All subprocesses done.')

#输出
Parent process 54349.
Waiting for all subprocesses done...
Run task 0 (54350)...
Run task 1 (54351)...
Run task 2 (54352)...
Run task 3 (54353)...
Task 3 runs 0.46 seconds.
Run task 4 (54353)...
Task 0 runs 0.59 seconds.
Task 1 runs 0.68 seconds.
Task 2 runs 0.75 seconds.
Task 4 runs 0.56 seconds.
All subprocesses done.

1.4. subprocess

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
#(1). subprocess.run()
>>> subprocess.run(['echo','hello'])
hello
CompletedProcess(args=['echo', 'hello'], returncode=0)
>>> subprocess.run('echo hello', shell = True)
hello
CompletedProcess(args='echo hello', returncode=0)

#(2). subprocess.call() - 返回值
>>> res = subprocess.call(['echo','hello'])
hello
>>> res
0

#(3). subprocess.Popen() - subprocess的使用方法都是对subprocess.Popen的封装
#(3.1).利用subprocess.PIPE可以将多个子进程的输入和输出连接在一起。subprocess.PIPE实际上为文本流提供了一个缓存区
import subprocess

child1 = subprocess.Popen(['echo', 'hello\nworld'], stdout=subprocess.PIPE) #将child1的stdout输出到缓存区
child2 = subprocess.Popen(["wc"], stdin=child1.stdout, stdout=subprocess.PIPE) #child2的stdin读取 PIPE
res = child2.communicate() #communicate()方法从PIPE中读取内容返回;communicate()会阻塞父进程
print(res)

#输出
(b' 2 2 12\n', None)

#(3.2). 可以利用communicate()方法来使用PIPE给子进程输入
import subprocess

p = subprocess.Popen(['python'], stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
output, err = p.communicate(b'print("hello, python")')
print(output, err)
print(output.decode('utf-8'))
print('Exit code:', p.returncode)

#输出: 相当于打开python交互模式, 执行print("hello, python")
b'hello, python\n' b''
hello, python

Exit code: 0

1.5. 进程间通信

Python的multiprocessing提供了Queue,Pipes等多种方式来交换数据。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
from multiprocessing import Process, Queue
import os, time, random


# 写数据进程执行的代码:
def write(q):
print('Process to write: %s' % os.getpid())
for value in ['A', 'B', 'C']:
print('Put %s to queue...' % value)
q.put(value)
time.sleep(random.random())


# 读数据进程执行的代码:
def read(q):
print('Process to read: %s' % os.getpid())
while True:
value = q.get(True)
print('Get %s from queue.' % value)


if __name__=='__main__':
# 父进程创建Queue,并传给各个子进程:
q = Queue()
pw = Process(target=write, args=(q,))
pr = Process(target=read, args=(q,))
# 启动子进程pw,写入:
pw.start()
# 启动子进程pr,读取:
pr.start()
# 等待pw结束:
pw.join()
# pr进程里是死循环,无法等待其结束,只能强行终止:
pr.terminate()

#输出
Process to write: 5305
Put A to queue...
Process to read: 5306
Get A from queue.
Put B to queue...
Get B from queue.
Put C to queue...
Get C from queue.

2. 多线程

线程是操作系统直接支持的执行单元,一个进程至少有一个线程。

2.1. 高级模块-threading

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
import time, threading

# 新线程执行的代码:
def loop():
print('thread %s is running...' % threading.current_thread().name)
n = 0
while n < 3:
n = n + 1
print('thread %s >>> %s' % (threading.current_thread().name, n))
time.sleep(1)
print('thread %s ended.' % threading.current_thread().name)


print('thread %s is running...' % threading.current_thread().name) #任何进程默认会启动一个线程,称为主线程 MainThread
t = threading.Thread(target=loop, name='LoopThread') #这里传入子线程要执行的函数;并且可以自定义线程名。若不定义,系统会自动命名为:Thread-1,Thread-2
t.start()
t.join()
print('thread %s ended.' % threading.current_thread().name)

#输出
thread MainThread is running...
thread LoopThread is running...
thread LoopThread >>> 1
thread LoopThread >>> 2
thread LoopThread >>> 3
thread LoopThread ended.
thread MainThread ended.

2.2. Lock

多进程:同一个变量,各自拷贝一份存于各个进程中,互不影响。
多线程:所有的变量由所有线程共享,因此可能会造成多个线程同时修改一个变量,导致出错

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
import threading

# 假定这是银行存款:
balance = 0

def change_it(n):
# 先存后取,结果应该为0:
global balance
balance = balance + n
balance = balance - n

def run_thread(n):
for i in range(100000):
change_it(n)

t1 = threading.Thread(target=run_thread, args=(100,))
t2 = threading.Thread(target=run_thread, args=(50,))
t1.start()
t2.start()
t1.join()
t2.join()
print(balance)

#输出
0/50/-50/100 ... #由于两个线程在同时操作一个变量,因此出现了错误

threading.Lock() 创建锁: 由于创建了锁,因此其他线程不能同时执行change_it(n)函数;又由于锁只有一个,同一时刻只有一个线程持有该锁。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
import threading

# 假定这是银行存款:
balance = 0
lock = threading.Lock()

def change_it(n):
# 先存后取,结果应该为0:
global balance
balance = balance + n
balance = balance - n

def run_thread(n):
for i in range(100000):
lock.acquire() #获取锁
try:
change_it(n)
finally:
lock.release() #释放锁


t1 = threading.Thread(target=run_thread, args=(100,))
t2 = threading.Thread(target=run_thread, args=(50,))
t1.start()
t2.start()
t1.join()
t2.join()
print(balance)

2.3. 锁的优劣

好处:确保了某段关键代码同时只能由一个线程执行
坏处:

  • 阻止了多线程的并发能力,大大降低了效率
  • 当存在多个锁时,不同线程持有不同的锁,当试图获取对方持有的锁时,可能会造成死锁,导致多个线程全部挂起

2.4. 多核CPU

由于Python有GIL(Global Interpreter Lock)全局锁,因此不能利用多线程实现多核任务。只能使用多进程来实现。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
#多线程(尽管创建了和系统核数相同数量的线程,但是也只能跑满单核):
import threading, multiprocessing

def loop():
while True:
pass

for i in range(multiprocessing.cpu_count()):
t = threading.Thread(target=loop)
t.start()

#多进程(跑满CPU)
import multiprocessing, os

def loop():
print(os.getpid())
while True:
pass

numCpu = multiprocessing.cpu_count()
p = multiprocessing.Pool(numCpu)
for i in range(numCpu):
p.apply_async(loop)

p.close()
p.join()

3. ThreadLocal

多线程环境下,每个线程使用自己的局部变量比全局变量好,这样一来能减少各个线程间的相互影响。
ThreadLocal 解决了参数在一个线程中的各个函数之间相互传递的问题。ThreadLocal虽然是全局变量,但是每个线程只能读写自己的独立副本。
ThreadLocal 最常用的地方就是为每个线程绑定一个数据库连接、Http请求,用户身份信息等,这样一个线程所有用到的处理函数就可以非常方便的访问这些资源了。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
import threading

# 创建全局ThreadLocal对象:
local_school = threading.local()


def process_student():
# 获取当前线程关联的student:
std = local_school.student
print('Hello, %s (in %s)' % (std, threading.current_thread().name))


def process_thread(name):
# 绑定ThreadLocal的student:
local_school.student = name
process_student()


t1 = threading.Thread(target=process_thread, args=('Alice',), name='Thread-A')
t2 = threading.Thread(target=process_thread, args=('Bob',), name='Thread-B')
t1.start()
t2.start()
t1.join()
t2.join()

#输出
Hello, Alice (in Thread-A)
Hello, Bob (in Thread-B)

4. 进程Vs线程

4.1. 多任务

通常多任务都会设计为:Master-Worker 模式,Master来分配任务,worker来执行任务。
多进程:稳定性高,资源消耗大。(早期的Apache服务)
多线程:由于共享的模式,子线程可能会导致整个进程挂掉,但是通常比多进程快一些

4.2. 任务切换

无论是多进程还是多线程,只要数量一多,效率就会下降。因为切换作业是有代价的,需要保存现场环境(CPU寄存器状态、内存页等等),切换的速度虽然快,但如果作业一多,也是相当耗费时间。

4.3. 计算密集型 Vs IO密集型

计算密集型:这类任务通常需要大量计算,消耗CPU,如:视频解码。这种类型虽然也可以多任务完成,但是任务越多,花在切换任务的时间越多,CPU执行效率越低。如果要高效的利用CPU,就要使计算任务的数量等于CPU核心数。同时,代码的执行效率也至关重要,选择效率高的语言,如:C,比较合适。
IO密集型:这类任务通常是主要涉及网络、磁盘的任务。对CPU消耗少,大部分时间都用在等待IO操作完成,因此一般情况下,任务越多,效率越高,比如:Wed应用。但是,在这种情况下使用执行效率高的编程语言并不会产生很大作用,因此可以选择有利于开发效率的语言,例如:python

4.4. 异步IO

由于CPU和IO之间的速度差异,大多数时间都会花在IO等待上,因此我们才需要多进程和多线程来并发的执行任务。不过现代操作系统也做出了巨大改进,支持了异步IO。这种全新的模型称为事件驱动模型,如Nginx,在单核CPU上采用单进程模式就可以高效的支持多任务了。Python语言中的单线程异步模型称为: 协程,这样一来就可以基于事件驱动的来编写多任务程序了。

5. 分布式进程

Python的multiprocessing模块不但支持多进程,其中的managers子模块还支持把多进程分布到多个机器上。依靠网络通信,一个服务进程可以作为调度者,将任务分布到其他机器上。
将通过Queue通信的多进程分布到两台机器上: 通过managers模块把Queue通过网络暴露出去即可。

5.1. Master机器(进程)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
import random, time, queue
from multiprocessing.managers import BaseManager

# 发送任务的队列
task_queue = queue.Queue()
# 接收结果的队列
result_queue = queue.Queue()


# 继承BaseManager
class QueueManager(BaseManager):
pass


# 将两个Queue注册到网络上,callable参数关联了Queue对象
QueueManager.register('get_task_queue', callable=lambda: task_queue)
QueueManager.register('get_result_queue', callable=lambda: result_queue)
# 绑定端口8000,设置验证码: 'test'
manager = QueueManager(address=('', 8000), authkey=b'test')
# 启动Queue
manager.start()
# 通过网络访问的Queue对象
task = manager.get_task_queue()
result = manager.get_result_queue()
# 放入几个任务
for i in range(5):
n = random.randint(0, 1000)
print('Put task %d...' % n)
task.put(n)

# 从result队列读取结果
print('Try get results...')
for i in range(5):
r = result.get(timeout=10)
print('Result: %s' % r)

# 关闭
manager.shutdown()
print('Master Exit.')

5.2. Worker机器(进程) : 因为Queue对象存储在Master的进程中,因此worker中并没有创建Queue的代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
import time, sys, queue
from multiprocessing.managers import BaseManager

# 创建类似的QueueManger
class QueueManager(BaseManager):
pass


# 由于QueueManager只从网络上获取Queue, 因此注册时只提供名字
QueueManager.register('get_task_queue')
QueueManager.register('get_result_queue')

# 连接到Master服务器
server_addr = '127.0.0.1'
print('Connect to server %s...' % server_addr)

# 端口和验证码与Master保持一致
m = QueueManager(address=('', 8000), authkey=b'test')
# 通过网络连接
m.connect()
# 获取Queue的对象
task = m.get_task_queue()
result = m.get_result_queue()
# 从task队列读取任务,并把结果写入result队列
for i in range(5):
try:
n = task.get(timeout=1)
print('Run task %d * %d' % (n, n))
r = '%d * %d = %d' % (n, n, n*n)
time.sleep(1)
result.put(r)
except queue.Empty:
print('Task queue is empty.')
# 处理结果
print('Worker Exit.')

5.3. 输出

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
(1). Master
Try get results...
Result: 463 * 463 = 214369
Result: 322 * 322 = 103684
Result: 567 * 567 = 321489
Result: 372 * 372 = 138384
Result: 120 * 120 = 14400
Master Exit.

(2). Worker
Connect to server 127.0.0.1...
Run task 463 * 463
Run task 322 * 322
Run task 567 * 567
Run task 372 * 372
Run task 120 * 120
Worker Exit.