Skip to main content

ipc-worker:进程间通信,多进程 Worker 通过共享内存或 MQ 工作。

项目描述

ipc-worker:进程间通信,多进程 Worker 通过共享内存或 MQ 工作。

Supports shared-memory workers (Python >= 3.8, Linux only) and MQ process workers (Python >= 3.6).
# -*- coding: utf-8 -*-
# @Time    : 2021/11/23 9:35

'''
Shared-memory demo.

Recommended environment:
    Linux
    Python >= 3.8

Running on Windows is not recommended; it reports an error like the following:
    RuntimeError:
            An attempt has been made to start a new process before the
            current process has finished its bootstrapping phase.

'''

import multiprocessing
import time,os
from ipc_worker.ipc_shm_loader import IPC_shm,SHM_process_worker



class My_worker(SHM_process_worker):
    """Demo shared-memory worker that tags every dict request with ``b = 200``.

    The first positional argument is the user config; all remaining
    arguments are forwarded untouched to ``SHM_process_worker``.
    """

    def __init__(self, config, *args, **kwargs):
        super().__init__(*args, **kwargs)
        # User-defined configuration, kept for use inside the hooks below.
        self.config = config

        summary = 'Process id {}, group name {} ,shm name {}'.format(
            self._idx, self._group_name, self._shm_name
        )
        self._logger.info(summary)
        self._logger.info(config)

    def run_begin(self):
        """Hook called when the worker process begins."""
        self._logger.info('worker pid {}...'.format(os.getpid()))
        # Placeholder for a per-process resource; nothing is opened in this demo.
        self.handle = None

    def run_end(self):
        """Hook called when the worker process ends."""
        if self.handle is None:
            return
        # A real worker would release self.handle here.

    def run_once(self, request_data):
        """Hook called for each item put on the queue; returns the response.

        Dict requests get ``'b'`` set to 200 in place; any other payload is
        returned unchanged.
        """
        if isinstance(request_data, dict):
            request_data['b'] = 200

        if self.handle is None:
            return request_data

        # A real worker would use self.handle here before responding.
        return request_data


if __name__ == '__main__':
    # Demo entry point: start the shared-memory worker pool, push ten
    # requests through it, then block until the pool exits.

    # Arbitrary user configuration; passed to My_worker as its first
    # constructor argument through worker_args.
    config = {
        "anything" : "anything",
        "aa": 100
    }

    # Manager-backed event that is set on the shutdown path below.
    # NOTE(review): in this demo the event is not handed to IPC_shm - confirm
    # whether the library is expected to receive it.
    evt_quit = multiprocessing.Manager().Event()

    # group_name is the shared-memory group name; it must be unique.
    # manager is an agent and acts as a load balancer
    # worker is what really does your work
    instance = IPC_shm(
        CLS_worker=My_worker,
        worker_args=(config,),  # must be tuple
        worker_num=10,  # number of worker Process
        manager_num=2,  # number of agent Process
        group_name='serving_shm',  # share memory name
        shm_size=1 * 1024 * 1024,  # share memory size
        queue_size=20,  # recv queue size
        is_log_time=True,  # whether log compute time
    )

    instance.start()

    # demo produce and consume message , you can process by http
    for i in range(10):
        print('produce message')
        data = {"a" : 100}
        request_id = instance.put(data)
        data = instance.get(request_id)
        print('get process result',data)
    try:
        instance.join()
    except (KeyboardInterrupt, Exception) as e:
        # KeyboardInterrupt derives from BaseException, not Exception, so it
        # has to be listed explicitly; otherwise Ctrl+C during the blocking
        # join() would skip the shutdown below.
        print('stopping workers:', repr(e))
        evt_quit.set()
        instance.terminate()

    del evt_quit
# -*- coding: utf-8 -*-
# @Time    : 2021/11/29 15:06
# @Author  : wyw


import multiprocessing
import os
from ipc_worker.ipc_zmq_loader import IPC_zmq,ZMQ_process_worker
'''
    ZMQ demo; depends on ZeroMQ:
    pip install pyzmq

    Tested on Python >= 3.6
'''

# Directory exported through ZEROMQ_SOCK_TMP_DIR.
# NOTE(review): presumably where ipc_worker places its ZeroMQ socket files -
# confirm against the library.
tmp_dir = './tmp'
# exist_ok=True removes the check-then-create race: when several processes
# execute this module-level code at the same time, a separate exists()/mkdir()
# pair can raise FileExistsError in the loser.
os.makedirs(tmp_dir, exist_ok=True)

os.environ['ZEROMQ_SOCK_TMP_DIR'] = tmp_dir

class My_worker(ZMQ_process_worker):
    """Demo ZeroMQ worker that tags every dict request with ``b = 200``.

    The first positional argument is the user config; all remaining
    arguments are forwarded untouched to ``ZMQ_process_worker``.
    """

    def __init__(self, config, *args, **kwargs):
        super().__init__(*args, **kwargs)
        # User-defined configuration, kept for use inside the hooks below.
        self.config = config

        banner = 'Process id {}, group name {} , identity {}'.format(
            self._idx, self._group_name, self._identity
        )
        self._logger.info(banner)
        self._logger.info(config)

    def run_begin(self):
        """Hook called when the worker process begins."""
        self._logger.info('worker pid {}...'.format(os.getpid()))
        # Placeholder for a per-process resource; nothing is opened in this demo.
        self.handle = None

    def run_end(self):
        """Hook called when the worker process ends."""
        if self.handle is None:
            return
        # A real worker would release self.handle here.

    def run_once(self, request_data):
        """Hook called for each item put on the queue; returns the response.

        Dict requests get ``'b'`` set to 200 in place; any other payload is
        returned unchanged.
        """
        if isinstance(request_data, dict):
            request_data['b'] = 200

        if self.handle is None:
            return request_data

        # A real worker would use self.handle here before responding.
        return request_data


if __name__ == '__main__':
    # Demo entry point: start the ZeroMQ worker pool, push ten requests
    # through it, then block until the pool exits.

    # Arbitrary user configuration; passed to My_worker as its first
    # constructor argument through worker_args.
    config = {
        "anything" : "anything",
        "aa": 100
    }

    # Manager-backed event passed to IPC_zmq as evt_quit and set on the
    # shutdown path below.
    evt_quit = multiprocessing.Manager().Event()

    # group_name is the group name; it must be unique.
    # manager is an agent and acts as a load balancer
    # worker is what really does your work
    instance = IPC_zmq(
        CLS_worker=My_worker,
        worker_args=(config,),  # must be tuple
        worker_num=10,  # number of worker Process
        group_name='serving_zmq',  # group name
        evt_quit=evt_quit,
        queue_size=20,  # recv queue size
        is_log_time=True,  # whether log compute time
    )
    instance.start()

    # demo produce and consume message , you can process by http
    for i in range(10):
        data = {"a" : 100}
        request_id = instance.put(data)

        data = instance.get(request_id)
        print('get process result',request_id,data)
    try:
        instance.join()
    except (KeyboardInterrupt, Exception) as e:
        # KeyboardInterrupt derives from BaseException, not Exception, so it
        # has to be listed explicitly; otherwise Ctrl+C during the blocking
        # join() would skip the shutdown below.
        print('stopping workers:', repr(e))
        evt_quit.set()
        instance.terminate()
    del evt_quit

项目详情


下载文件

下载适用于您平台的文件。如果您不确定要选择哪个,请了解有关安装包的更多信息。

内置分布

ipc_worker-0.0.6-py3-none-any.whl (26.7 kB 查看哈希)

已上传 py3