1. 引言
- 进程是操作系统资源分配的基本单位,而线程是任务调度的基本单位。
- 一个应用程序至少包括一个进程,而一个进程至少包括一个线程,线程的尺度更小
- 每个进程在执行过程中都拥有独立的内存单元,而每个进程的多个子线程共享同一个内存单元
2. 多进程
2.1 Python中的多进程与multiprocess
模块
Python中的多进程编程主要依靠的是multiprocess
模块,现在我们对比两个任务,一个采用多进程,另一个不采用多进程,输出运行时间。
import time
import os
def use_time_task():
print('当前[子]进程为:{}'.format(os.getpid()))
time.sleep(2)
print('结果:{}'.format(10 ** 24))
if __name__ == '__main__':
print('当前[母]进程为:{}'.format(os.getpid()))
start = time.perf_counter()
for i in range(2):
use_time_task()
end = time.perf_counter()
print('耗时为 {} 秒'.format(end-start))
# 输出
当前[母]进程为:37016
当前[子]进程为:37016
结果:1000000000000000000000000
当前[子]进程为:37016
结果:1000000000000000000000000
耗时为 4.007984525 秒
可以看出,计算时间为4秒,子进程和母进程相同,此时只有一个进程参与计算。
下面我们采用2个进程计算该任务。
import time
import os
import multiprocessing
def use_time_task(i):
print('当前[子]进程为:{}--任务{}'.format(os.getpid(),i))
time.sleep(2)
print('结果:{}'.format(10 ** 24))
if __name__ == '__main__':
print('当前[母]进程为:{}'.format(os.getpid()))
start = time.perf_counter()
task1 = multiprocessing.Process(target=use_time_task,args=(1,))
task2 = multiprocessing.Process(target=use_time_task,args=(2,))
print('所有子进程完成')
task1.start()
task2.start()
task1.join()
task2.join()
end = time.perf_counter()
print('耗时为 {} 秒'.format(end-start))
# 输出
当前[母]进程为:37055
所有子进程完成
当前[子]进程为:37056--任务1
当前[子]进程为:37057--任务2
结果:1000000000000000000000000
结果:1000000000000000000000000
耗时为 2.0143263890000003 秒
可以看到,多进程的耗时仅为单个进程的一半,并发执行的时间确实要少很多。虽然我们就创建了一个进程,但是1个母进程中却包含2个子进程,我们在代码中采用了join()
就是为了让母进程阻塞,等所有的子进程完成后才打印总耗时。
2.1.2 总结
- 新的进程创建和切换都是要消耗资源的,且进程数受制于CPU的核心数,一般不能太大
- 进程之间的内存空间是独立的,不方便互相通信(其实我们可以使用
Queue([maxsize])
命令还构建进程通信) - 除了
Process
方法外,我们还可以通过Pool
类创建多进程
那么,我们来看看如何用Pool
类如何创建多线程吧。
2.2 利用multiprocess
模块的Pool
类创建多进程
很多时候,系统需要创建多个进程以提高CPU的利用率,当数量少时,我们可以手动创建,但是进程数量很多的时候,线程池Pool就发挥作用了,我们可以传递参数限制并发的数量,默认值为CPU的核心数。
Pool
类会提供指定数量的进程供用户调用,当新的请求提交到Pool
中时,如果进程池没有满,就会创建一个新的进程来执行请求,如果池满就会告知先等待,直到线程池中有进程结束,才会创建新的进程来执行这些请求。
2.2.1 multiprocess.pool
的几个方法:
1.apply.async
函数原型:apply_async(self, func, args=(), kwds={}, callback=None,error_callback=None):
作用:向进程池提交需要执行的函数及参数,各个进程采用非阻塞(异步)的调用方式,每个进程制只管运行自己的,不管其他进程是否完成(默认方式)
2.map()
函数原型:map(self, func, iterable, chunksize=None)
作用:Pool
类的map方法,与内置的map函数用法基本一致,它会使得进程阻塞(同步)知道返回结果。
注意:虽然第二个参数是迭代器,但在实际应用中,必须在整个队列就绪后,程序才会返回运行的子进程。
3.map_async()
函数原型:map_async(self, func, iterable, chunksize=None, callback=None,error_callback=None):
作用:与map()
函数作用一致,但是它是非阻塞(异步)的。
4.close()
作用:关闭线程池,不再接受新的任务。
5.terminate()
作用:结束工作进程,不再处理未处理的任务。
6.join()
作用:主进程阻塞等待子进程的退出,join()
方法要在close()
或terminate()
之后使用
2.2.2举例
from multiprocessing import Pool
import os
import time
def use_time_task(i):
print('当前[子]进程为:{}--任务{}'.format(os.getpid(),i))
time.sleep(2)
print('结果:{}\n'.format(10 ** 24))
if __name__ == '__main__':
print('cpu内核数:{}'.format(os.cpu_count()))
print('当前[母]进程为:{}'.format(os.getpid()))
start = time.perf_counter()
p = Pool(4)
for i in range(5):
p.apply_async(use_time_task,args=(i,))
print('等待所有子进程完成')
p.close()
p.join()# 主进程阻塞等待子进程退出
end = time.perf_counter()
print('耗时为 {} 秒'.format(end-start))
# 输出
cpu内核数:4
当前[母]进程为:37299
等待所有子进程完成
当前[子]进程为:37300--任务0
当前[子]进程为:37301--任务1
当前[子]进程为:37302--任务2
当前[子]进程为:37303--任务3
结果:1000000000000000000000000
结果:1000000000000000000000000
结果:1000000000000000000000000
结果:1000000000000000000000000
当前[子]进程为:37302--任务4
结果:1000000000000000000000000
耗时为 4.158054153 秒
2.2.3 总结
Pool
对象调用join()
方法会等待所有进程执行完毕,调用join()
之前必须先调用close()
或terminate()
方法,让它不再接受其他Process。
由于我的MacBook Pro13 仅有4核心,所以一次只能调用一个容量为4的进程池,4个进程计算5次,必须等待所有进程计算结束才能分配剩下的一次任务给一个进程。
在Python中,由于有GIL(全局解释器锁)的存在,它的作用是保证同一时间只有一个线程可以执行代码,很多人,包括我之前也认为Python中的多线程其实不是真正的多线程,如果想要充分的利用多核CPU仍然需要使用多进程,其余细节我会在博客的后续中续继续讨论。
2.3 多进程之间的数据共享
通常来说,进程之间是相互独立的,每个进程都有独立的内存。多进程共享必然会导致进程间的相互竞争,所以要尽最大可能防止使用共享状态。
还有一种方法是使用队列queue来实现进程之间的通信和数据共享,下面这个例子中,我们创建2个进程,一个负责写,一个负责读,实现共享一个队列queue。
from multiprocessing import Process,Queue
import os,time,random
# 写进程数据
def write_process(q):
print('进程{}正在写入'.format(os.getpid()))
for value in ['A','B','C']:
print('把[{}]放入队列'.format(value))
q.put(value)
time.sleep(random.random())
# 读进程数据
def read_process(q):
print('进程{}正在读取'.format(os.getpid()))
while True:
value = q.get(True)
print('在队列中得到进程[{}]'.format(value))
if __name__ == '__main__':
# 父进程创建Queue,并传递给各个子进程
q = Queue()
pw = Process(target=write_process,args=(q,))
pr = Process(target=read_process,args=(q,))
# 启动子进程pw
pw.start()
# 启动子进程or
pr.start()
# 等待pw结束
pw.join()
# pr里是死循环,直接强行终止
pr.terminate()
# 输出
进程37664正在写入
把[A]放入队列
进程37665正在读取
在队列中得到进程[A]
把[B]放入队列
在队列中得到进程[B]
把[C]放入队列
在队列中得到进程[C]
3. 多线程
Python中的多线程其实是在同一时刻执行多个不同程序,多线程的优点很多:
- 使用线程可以把占时程序的任务放到后台去处理
- 程序的运行速度加快
- 在一些等待服务,例如:用户注册、短信验证码和网络收发数据等等,使用线程可以明显的改善内存占用的问题
每个独立的线程都有一个程序入口、顺序执行序列和程序的出入口,但是线程不能够独立的运行,必须依赖于应用程序,由应用程序提供的多个线程执行控制。
3.1 Python中的多线程与threading
模块
Python3中的多线程一般使用threading
模块,在Python2中使用的是thread
模块,该模块在Python3中已经被弃用,为了兼容性,Python3中把thread
模块重命名为_thread
。
threading.Thread
模块可以接收两个参数,一个是target
,一般指向函数名,另一个是args
,用来传入函数需要的参数。调用start()
方法启动多线程,还可以使用current_thread().name
打印当前线程的名字,下面我们采用之前多进程中采用的例子,把它用线程来实现。
import threading
import time
def use_time_task(i):
print('当前[子]线程为:{}--任务{}'.format(threading.current_thread().name,i))
time.sleep(2)
print('结果:{}\n'.format(10 ** 24))
if __name__ == '__main__':
start = time.perf_counter()
print('当前[主]线程为:{}'.format(threading.current_thread().name))
t1 = threading.Thread(target=use_time_task,args=(1,))
t2 = threading.Thread(target=use_time_task,args=(2,))
t1.start()
t2.start()
end = time.perf_counter()
print('总耗时{}秒'.format(end-start))
# 输出
当前[主]线程为:MainThread
当前[子]线程为:Thread-1--任务1
当前[子]线程为:Thread-2--任务2
总耗时0.0028649530000000034秒
结果:1000000000000000000000000
结果:1000000000000000000000000
很明显,发生了和多进程中一样的问题,总耗时并没有真正计算到子线程运算的时间,直接把主线程运算时间给输出了,我们必须要使用join()
方法,代码如下:
import threading
import time
def use_time_task(i):
print('当前[子]线程为:{}--任务{}'.format(threading.current_thread().name,i))
time.sleep(2)
print('结果:{}\n'.format(10 ** 24))
if __name__ == '__main__':
start = time.perf_counter()
print('当前[主]线程为:{}'.format(threading.current_thread().name))
t1 = threading.Thread(target=use_time_task,args=(1,))
t2 = threading.Thread(target=use_time_task,args=(2,))
t1.start()
t2.start()
t1.join()
t2.join()# 等待t2完成后再计算时间
end = time.perf_counter()
print('总耗时{}秒'.format(end-start))
# 输出
当前[主]线程为:MainThread
当前[子]线程为:Thread-1--任务1
当前[子]线程为:Thread-2--任务2
结果:1000000000000000000000000
结果:1000000000000000000000000
总耗时2.00421429秒
我们再让代码简化一些,不要手动添加join()
import threading
import time
def use_time_task(i):
print('当前[子]线程为:{}--任务{}\n'.format(threading.current_thread().name,i))
time.sleep(2)
print('结果:{}\n'.format(10 ** 24))
if __name__ == '__main__':
start = time.perf_counter()
print('当前[主]线程为:{}\n'.format(threading.current_thread().name))
thread_list = []
for i in range(1,3):
t = threading.Thread(target=use_time_task,args=(i,))
thread_list.append(t)
print('thread_list中含有的线程为:{}'.format(thread_list))
for t in thread_list:
t.start()
for t in thread_list:
t.join()
end = time.perf_counter()
print('总耗时{}秒'.format(end-start))
# 输出
当前[主]线程为:MainThread
thread_list中含有的线程为:[<Thread(Thread-1, initial)>, <Thread(Thread-2, initial)>]
当前[子]线程为:Thread-1--任务1
当前[子]线程为:Thread-2--任务2
结果:1000000000000000000000000
结果:1000000000000000000000000
总耗时2.005684626秒
当我们设置多线程时,主线程会创建多个子线程,如果希望主线程等待子线程实现线程同步,那么我们就需要使用join()
方法,如果我们希望一个主线程结束后就不再执行子线程,那么我们就需要使用setDaemon(True)
(守护进程)来实现。
import threading
import time
def use_time_task(i):
print('当前[子]线程为:{}--任务{}\n'.format(threading.current_thread().name,i))
time.sleep(2)
print('结果:{}\n'.format(10 ** 24))
if __name__ == '__main__':
start = time.perf_counter()
print('当前[主]线程为:{}\n'.format(threading.current_thread().name))
for i in range(5):
t = threading.Thread(target=use_time_task,args=(i,))
t.setDaemon(True)
t.start()
end = time.perf_counter()
print('总耗时{}秒'.format(end-start))
# 输出
当前[主]线程为:MainThread
当前[子]线程为:Thread-1--任务0
当前[子]线程为:Thread-2--任务1
当前[子]线程为:Thread-3--任务2
当前[子]线程为:Thread-4--任务3
当前[子]线程为:Thread-5--任务4
总耗时0.0010438910000000051秒
3.2 继承Thread
类来重写run
方法创建新进程
除了使用threading.Thread()
方法来创建新的线程外,还可以通过继承Thread
类重写run方法来创建新的线程。
import threading
import time
def use_time_task(i):
time.sleep(2)
return 10 ** 24
class MyThread(threading.Thread):
def __init__(self, func, args, name=' '):
threading.Thread.__init__(self)
self.func = func
self.args = args
self.name = name
self.result = None
def run(self):
print('开始[子]进程{}\n'.format(self.name))
self.result = self.func(self.args[0],)
print('结果为:{}\n'.format(self.result))
print('{}子进程结束'.format(self.name))
if __name__ == '__main__':
start = time.time()
threads = []
for i in range(1,3):
t = MyThread(use_time_task,args=(i,),name=str(i))
threads.append(t)
for t in threads:
t.start()
for t in threads:
t.join()
end = time.time()
print('总耗时{}秒'.format(end-start))
# 输出
开始[子]进程1
开始[子]进程2
结果为:1000000000000000000000000
1子进程结束
结果为:1000000000000000000000000
2子进程结束
总耗时2.0061709880828857秒
通过继承threading.Thread
类,我们定义了一个新的MyThread().run()
方法,通过该类的实例化创建了2个子线程。
3.3 不同线程之间的数据共享
之前我们提到,线程之间共享同一部分内存空间,那么意味着任何一个线程都可以改变摸一个变量,因此线程之间的共享数据最大危险在于多个线程同时修改一个变量。针对这种情况,我们可以使用threading.lock()
方法对一个共享变量进行锁定,修改完后供其他线程使用。
import threading
import time
class Account:
def __init__(self):
self.balance = 0
def add(self,lock):
# 获得锁
lock.acquire()
for i in range(0,100000):
self.balance += 1
# 释放锁
lock.release()
def delete(self,lock):
# 获得锁
lock.acquire()
for i in range(0,100000):
self.balance -= 1
# 释放锁
lock.release()
if __name__ == '__main__':
account = Account()
lock = threading.Lock()
# 创建线程
thread_add = threading.Thread(target=account.add,args=(lock,),name='Add')
thread_delete = threading.Thread(target=account.delete,args=(lock,),name='Delete')
# 启动线程
thread_add.start()
thread_delete.start()
# 线程等待
thread_add.join()
thread_delete.join()
print('最终的balance为:{}'.format(account.balance))
# 输出
最终的balance为:0
显然,添加的线程和删除的线程相互抵消,最终的balance为0。
3.4 使用queue队列通信
下面的例子中,创建了2个线程,一个用于生成,另一个用于消费,生成的产品放在queue里,实现了不通的线程沟通。
from queue import Queue
import random,threading,time
# 生产者类
class Producer(threading.Thread):
def __init__(self, name ,queue):
threading.Thread.__init__(self,name=name)
self.queue = queue
def run(self):
for i in range(1,5):
print('{} is producing {} to queue!'.format(self.getName(),i))
self.queue.put(i)
time.sleep(random.randrange(10)/5)
print('{} finished! '.format(self.getName()))
# 消费者类
class Customer(threading.Thread):
def __init__(self, name, queue):
threading.Thread.__init__(self,name=name)
self.queue = queue
def run(self):
for i in range(1,5):
val = self.queue.get()
print('{} is consuming {} in queue'.format(self.getName(),val))
time.sleep(random.randrange(10))
print('{} finished !'.format(self.getName()))
def main():
queue = Queue()
producer = Producer('Producer',queue)
customer = Customer('customer',queue)
producer.start()
customer.start()
producer.join()
customer.join()
print('全部线程 完成!')
if __name__ == '__main__':
main()
# 输出
Producer is producing 1 to queue!
customer is consuming 1 in queue
Producer is producing 2 to queue!
Producer is producing 3 to queue!
Producer is producing 4 to queue!
Producer finished!
customer is consuming 2 in queue
customer is consuming 3 in queue
customer is consuming 4 in queue
customer finished !
全部线程 完成!
上面的例子中,队列queue
的put
方法可以将一个对象obj放入队列中,如果队列满了,则将阻塞直到队列有空间可用为止,queue
的get
方法则会一次返回队列中的一个成员,如果队列为空,则将阻塞直到队列有种成员为止。
同时,queue
还有empty()
和full()
方法,用来查看队列的空、满状态。
4.总结
- 对于CPU密集型代码——多进程效率更高
- 对于IO密集型代码(文件操作、爬虫等)——多线程效率高
对于IO密集型应用,大部分时间消耗是用于等待,在文件或爬虫中,即使是多CPU也不能完全被利用起来,多进程效率必然不高。
当Python遇到IO密集型代码时,会释放GIL供新的线程使用,实现了线程之间的切换。
Q.E.D.