下面我们将使用多进程解决多输入情况下的斐波那契数列问题,而不是之前我们使用的多线程的方法。
multiprocessing_fibonacci.py程序使用multiprocessing模块,为了顺利执行,还导入了如下模块:
import sys, time, random, re, requests
import concurrent.futures
from multiprocessing import, cpu_count, current_process, Manager
上面一些导入模块在之前的章节中提及过,然而,下面的这些模块我们需要特别注意:
- cpu_count: 该方法允许获得机器cpu的数量
- current_process: 该方法可以获得当前进程的信息,比如进程名称
- Manager: 该类通过对象的方式允许功在多进程之间共享python对象
下面的代码中我们可以注意到第一个方法有点不同,它将产生15个1到20之间的整数,这些整数将被当作fibo_dict的key使用。
接下来让我们一起来看producer_task方法,如下:
def producer_task(q, fibo_dict):
for i in range(15):
value = random.randint(1, 20)
fibo_dict[value] = None
print("Producer [%s] putting value [%d] into queue.." % (current_process().name, value))
q.put(value)
下面将定义一个函数来计算fibo_dict中key对应的斐波那契数列值,和之前章节介绍计算斐波那契序列值不同的是,这里把fibo_dict当作参数传入不同的processes。
下面是consumer_task方法,如下:
def consumer_task(q, fibo_dict):
while not q.empty():
value = q.get(True, 0.05)
a, b = 0, 1
for item in range(value):
a, b = b, a+b
fibo_dict[value] = a
print("consumer [%s] getting value [%d] from queue..." % (current_process().name, value))
更进一步,我们来看main函数中的代码,main函数中下面几个变量被定义:
- data_queue: 该参数由multiprocessing.Queueu来创建,是进程安全的
- number_of_cpus: 该参数由multiprocessing.cpu_count方法获得,获得机器cpu的个数
- fibo_dict: 这个字典类型变量从Manager实例获得,保存多进程计算结果
然后,我们将创建producer进程,并传入data_queue队列,data_queue队列值由producer_task方法获得:
producer = Process(target=producer_task, args=(data_queue, fibo_dict))
producer.start()
producer.join()
我们可以注意到Process实例的初始化过程和我们之前的Thread实例初始化过程类似。初始化函数接收target参数作为进程中要执行的函数,和args参数作为target传入的函数的参数。接下来我们通过start方式开始进程,然后使用join方法,等待producer进程执行完毕。
下面一块代码中,我们将定义consumer_list队列,存入初始化过的consumer进程。使用list存储consumer对象的原因是在所有进程结束开始后调用join方法。循环中的每一个worker被调用后,下一个worker将等待上一个worker执行完毕后才开始执行,下面代码将描述这一过程:
consumer_list = []
cpu = cpu_count()
print(cpu)
for i in range(cpu):
consumer = Process(target=consumer_task, args=(data_queue, fibo_dict))
consumer.start()
consumer_list.append(consumer)
[consumer.join() for consumer in consumer_list]
最终我们将迭代输出fibo_dict中的结果,如下面截图所示: