跳转至

多进程之进程池间通信


使用Manager完成进程池内的进程间通信

import os
import time
from multiprocessing import Manager, Queue, Pool


def send_someting(queue: Queue, something):
    print(f'发送消息: {something}, 发送消息的进程id: {os.getpid()}')
    for _ in range(3):
        send_time = time.time()
        print(f'发送时间: {send_time}')
        queue.put((something, send_time))


def receive_something(queue: Queue):
    # queue.qsize() 是消息队列中的消息数量
    while queue.qsize():
        something, receive_time = queue.get()
        print(f'接受消息: {something}, 接收时间: {receive_time}, 接收消息的进程id: {os.getpid()}')


def main():
    print(f'主进程开始运行, pid为: {os.getpid()}')

    queue = Manager().Queue()
    pool = Pool()

    # 发送消息的进程
    pool.apply_async(send_someting, (queue, 'Hello World'))
    # 因为写入需要时间,谨防还没写完就开始读取
    time.sleep(1)
    # 接收消息的进程
    pool.apply_async(receive_something, kwds={'queue': queue})

    pool.close()
    pool.join()

    print('主进程运行完毕')


if __name__ == '__main__':
    main()

输出:

主进程开始运行, pid为: 93817
发送消息: Hello World, 发送消息的进程id: 93829
发送时间: 1659790972.757417
发送时间: 1659790972.75799
发送时间: 1659790972.758079
接受消息: Hello World, 接收时间: 1659790972.757417, 接收消息的进程id: 93826
接受消息: Hello World, 接收时间: 1659790972.75799, 接收消息的进程id: 93826
接受消息: Hello World, 接收时间: 1659790972.758079, 接收消息的进程id: 93826
主进程运行完毕