Python 编程 | 连载 25 - Python 多进程
一、进程与线程
进程的概念
对于操作系统来说,一个任务就是一个进程,进程就是程序执行的载体,如Python脚本中执行main函数就启动了一个进程,打开微信或者浏览器就是开启了一个进程,进程的运行需要资源支持,也就需要消耗CPU和内存
PID是各进程的代号,每个进程有唯一的PID编号
多进程就是操作系统同时运行多个进程,比如一边用Chrome上网一边听音乐一边在进行上传文件,这就是多进程(任务),至少同时有3个任务同时运行
线程的概念
线程(Thread)是操作系统的最小执行单元,进程至少由一个线程组成,如何调度进程和线程,完全有操作系统决定,程序不能自己决定什么时候执行,执行多久,有些进程还不止做一件事,比如微信可以同时语音、文字、视频聊天等
进程由线程组成,线程是执行任务逻辑的角色,进程提供线程执行程序的前置要求,线程在重组的资源配备下执行程序
打开一个浏览器就是启动一个进程,并且获取足够的资源,通过主进程中的主线程执行业务逻辑,主线程创建多个线程也就是对应了浏览器的多个tab页,线程之间互不影响,线程之间共享资源,显然常见多线程要比创建多进程节省资源
二、多进程
多进程的创建与使用
创建进程需要使用到multiprocessing模块,该模块中的常用函数有:
- Process:创建一个进程,返回一个对象
- start:执行进程,无参数、无返回值
- join:阻塞程序,无参数、无返回值
- kill:杀死进程,无参数、无返回值
- is_alive:判断今晨个是否存活,返回布尔值
import time
def alpha():
for i in range(10):
print(i, 'alpha')
time.sleep(1)
def bravo():
for i in range(10):
print(i, 'bravo')
time.sleep(1)
if __name__ == '__main__':
start = time.time()
alpha()
bravo()
end = time.time()
print("执行了{}".format(end - start))
导入os模块,分别在两个for循环以及main函数中打印出PID
print('PID:{}'.format(os.getpid()))
根据控制台的打印,可以说明两个for循环是在同一个进程中执行的,并且是先执行alpha中的for循环再执行bravo中的for循环,所以整个程序耗时较长。
使用多进程可以提高程序的执行效率,在程序中导入多进程模块multiprocessing,修改main函数,创建新的进程来执行alpha函数。
if __name__ == '__main__':
start = time.time()
# 使用多继承执行alpha
alpha_p = multiprocessing.Process(target=alpha)
alpha_p.start()
# alpha()
bravo()
end = time.time()
print("执行了{}".format(end - start))
print('主PID为:{}'.format(os.getpid()))
两个for循环几乎同时执行完成,bravo是在主进程上执行的,而alpha是在其他进程执行的,两个函数的PID是不同的,所以总的执行时间缩短了一半。
再创建一个进程来执行bravo函数,目前程序中存在三个进程,分别是执行alpha的进程、执行bravo的进程和执行main函数的进程。
if __name__ == '__main__':
start = time.time()
# 使用多继承执行alpha
alpha_p = multiprocessing.Process(target=alpha)
alpha_p.start()
# alpha()
# bravo()
bravo_p = multiprocessing.Process(target=bravo)
bravo_p.start()
end = time.time()
print("执行了{}".format(end - start))
print('主PID为:{}'.format(os.getpid()))
子进程和主进程之间互不影响,所以时间差非常短,但是我们希望这个时间差是从开始执行到执行结束所耗费的时间,并不是main进程启动后就执行。
修改main函数中的代码
if __name__ == '__main__':
start = time.time()
# 使用多继承执行alpha
alpha_p = multiprocessing.Process(target=alpha)
# alpha_p.start()
# alpha()
# bravo()
bravo_p = multiprocessing.Process(target=bravo)
# bravo_p.start()
for p in (alpha_p, bravo_p):
p.start()
for p in (alpha_p, bravo_p):
p.join()
end = time.time()
print("执行了{}".format(end - start))
print('主PID为:{}'.format(os.getpid()))
子进程结束之后再去执行主进程。
注释for循环,在a子进程执行完之后,调用join()函数,在调用b函数。
if __name__ == '__main__':
start = time.time()
# 使用多继承执行alpha
alpha_p = multiprocessing.Process(target=alpha)
alpha_p.start()
# 阻塞子进程
alpha_p.join()
# alpha()
# bravo()
bravo_p = multiprocessing.Process(target=bravo)
bravo_p.start()
# 执行两个函数
# for p in (alpha_p, bravo_p):
# p.start()
# # 阻塞两个子进程
# for p in (alpha_p, bravo_p):
# p.join()
end = time.time()
print("执行了{}".format(end - start))
print('主PID为:{}'.format(os.getpid()))
此时的时间差是alpha函数执行的耗时,alpha函数执行完成之后,bravo函数才开始执行
关闭alpha_p和bravo_p的执行,再增加一个for循环,打印出进程是否存活
if __name__ == '__main__':
start = time.time()
# 使用多继承执行alpha
alpha_p = multiprocessing.Process(target=alpha)
# alpha_p.start()
# 阻塞子进程
# alpha_p.join()
# alpha()
# bravo()
bravo_p = multiprocessing.Process(target=bravo)
# bravo_p.start()
# 执行两个函数
for p in (alpha_p, bravo_p):
p.start()
# 阻塞两个子进程
for p in (alpha_p, bravo_p):
p.join()
# 判断进程是否存活
for p in (alpha_p, bravo_p):
print('is alive:{}'.format(p.is_alive()))
end = time.time()
print("执行了{}".format(end - start))
print('主PID为:{}'.format(os.getpid()))
在alpha和bravo执行完成之后,两个进程都已经关闭。
多线程的优点是缩短脚本执行时间,提高执行效率。
多进程存在的问题有:
- 通过进程模块执行的函数无法获取返回值
- 多个今进程同时修改文件可能会出现错误
- 进程数量太多会造成资源不足、死机的情况
进程池
进程池的概念与数据库连接池的概念是类似的,都是为了提高效率,避免线程创建于关闭的消耗
多进程模块multiprocessing中进程池的相关函数:
- Pool:进程池的创建,参数为要创建的进程的个数,返回一个进程池对象
- applu_async:任务加入线程池(异步),参数函数名和函数的参数,无返回值
- close:关闭进程池,无参数、无返回值
- join:等待进程池任务结束,无参数、无返回值
import multiprocessing
import os
import time
def alpha(count):
print(count, os.getpid())
time.sleep(5)
if __name__ == '__main__':
pool = multiprocessing.Pool(5)
for i in range(20):
pool.apply_async(func=alpha, args=(i,))
time.sleep(20)
进程被重复利用了,这里调用了异步,异步就是非同步,导致前后使用的进程号顺序不一致。
进程池结束任务之前,主进程就已经结束了,程序结束,进程池就被关闭了。
pool.close()
pool.join()
在time.sleep()函数下添加代码,并注释time.sleep()函数。
20个任务全部完成,需要通过close()函数和join()函数,来保证在子线程执行结束之后,再结束主线程,在退出程序。
alpha()函数添加return, 异步是可以获取返回值的。
import multiprocessing
import os
import time
def alpha(count):
print(count, os.getpid())
time.sleep(5)
return 'result is %s, pid is %s' % (count, os.getpid())
if __name__ == '__main__':
pool = multiprocessing.Pool(5)
res_list = []
for i in range(20):
res = pool.apply_async(func=alpha, args=(i, ))
res_list.append(res)
for res in res_list:
print(res.get())
# time.sleep(20)
# pool.close()
# pool.join()
第一组先执行,执行完成之后打印出结果,同时第二组也开始执行。
进程锁
当一个进程开始执行任务的时候,为了避免进程被其他任务使用,需要通过锁开控制,只有解锁之后才能执行下一个任务
进程锁相关的函数:
acquire:上锁,无参数、无返回值 release:开锁,无参数、无返回值
import multiprocessing
import os
import time
def alpha(count, lock):
# 上锁
lock.acquire()
print(count, os.getpid())
time.sleep(5)
# 解锁
lock.release()
return 'result is %s, pid is %s' % (count, os.getpid())
if __name__ == '__main__':
pool = multiprocessing.Pool(5)
manager = multiprocessing.Manager()
lock = manager.Lock()
res_list = []
for i in range(20):
res = pool.apply_async(func=alpha, args=(i, lock))
# res_list.append(res)
pool.close()
pool.join()
每次只有一个进程在工作,锁不可以滥用,锁没有解开就会造成死锁现象。
三、进程之间的通信
两个进程之间需要相互配合工作,就需要通信的帮助。进程之间通过队列进行通信,队列可以解决进程模块执行的函数无法获取返回值的问题
队列是一种数据结构,队列中数据存储的特点是先入先出或者后入后出
多线程模块multipartprocessing中队列相关函数
- Queue:队列的创建,返回一个队列对象
- put:将信息放入队列,参数为放入队列的信息,无返回值
- get:获取队列中的信息,无参数,返回值为字符串既具体的消息
import json
import multiprocessing
class Work():
def __init__(self, queue):
self.queue = queue
def send(self, message):
if not isinstance(message, str):
message = json.dumps(message)
self.queue.put(message)
def reveive(self):
while True:
result = self.queue.get()
try:
res = json.loads(result)
except:
res = result
print('Message is {}'.format(res))
if __name__ == '__main__':
queue = multiprocessing.Queue()
work = Work(queue)
send = multiprocessing.Process(target=work.send, args=({'name': 'stark'},))
receive = multiprocessing.Process(target=work.reveive)
send.start()
receive.start()
send.join()
此时接收到数据之后,程序并不会停止,而是持续运行,需要通过调用函数来终止程序,在脚本末尾增加代码。
receive.terminate()
print(send.is_alive())
将批量消息放入队列中,增加send_list()函数。
import json
import multiprocessing
import time
class Work():
def __init__(self, queue):
self.queue = queue
# 其余代码不变
def send_list(self):
for i in range(10):
self.queue.put('Mark {}'.format(i))
time.sleep(1)
if __name__ == '__main__':
queue = multiprocessing.Queue()
work = Work(queue)
send = multiprocessing.Process(target=work.send, args=({'name': 'stark'},))
receive = multiprocessing.Process(target=work.reveive)
# 为send_list函数创建一个进程
send_list = multiprocessing.Process(target=work.send_list)
# 启动该进程
send_list.start()
send.start()
receive.start()
# print(send.is_alive())
send.join()
要想程序能够正常停止,只需阻塞最长的进程即可。
# send.join()
send_list.join()
receive.terminate()
print(send.is_alive())
相关文章
- Python 编程骚操作连载(二)- 类与对象
- vb编程入门_python编程入门
- Python Flask 编程 | 连载 08 - Jinja2 过滤器
- Python 编程 | 连载 17 - 高阶函数与装饰器
- Python 编程 | 连载 19 - Package 和 Module
- python滑动验证码_python编程是啥
- Python升级之路( Lv14 ) 并发编程初识
- Python 编程 | 连载 26 - Python 多线程
- Python Flask 编程 | 连载 05 - Jinja2 模板引擎
- Python 编程 | 连载 10 - 字典及操作
- Python 编程 | 连载 16 - 类的特性
- Python 编程 | 连载 04 - 字典与运算符
- Python Flask 编程 | 连载 09 - Jinja2 模板特性
- Python 编程 | 连载 25 - Python 多进程
- Python 编程 | 连载 06 - 格式化与转义字符
- Python Flask 编程 | 连载 04 - Flask 响应
- Python 编程 | 连载 02 - 数字与字符串
- Python基础21-网络编程
- Python 编程 | 连载 12 - Python 数据类型转换
- Python 编程 | 连载 21 -序列化与加密模块