ipc-worker:进程间通信,多进程Woker通过共享内存或MQ工作。
项目描述
ipc-worker:进程间通信,多进程Woker通过共享内存或MQ工作。
support share memory (py>=3.8 and linux) and mq process worker (py >=3.6)
# -*- coding: utf-8 -*-
# @Time : 2021/11/23 9:35
'''
demo share memrory
recommended system linux and python >= 3.8
recommended linux
python 3.8
Do not recommended run in windows , it will report an error as follow
RuntimeError:
An attempt has been made to start a new process before the
current process has finished its bootstrapping phase.
'''
import multiprocessing
import time,os
from ipc_worker.ipc_shm_loader import IPC_shm,SHM_process_worker
class My_worker(SHM_process_worker):
def __init__(self,config,*args,**kwargs):
super(My_worker,self).__init__(*args,**kwargs)
#config info , use by yourself
self._logger.info('Process id {}, group name {} ,shm name {}'.format(self._idx,self._group_name,self._shm_name))
self._logger.info(config)
self.config = config
#Process begin trigger this func
def run_begin(self):
self._logger.info('worker pid {}...'.format(os.getpid()))
self.handle = None
pass
# Process end trigger this func
def run_end(self):
if self.handle is not None:
pass
#any data put will trigger this func
def run_once(self,request_data):
#process request_data
if isinstance(request_data,dict):
request_data['b'] = 200
if self.handle is not None:
#do some thing
pass
return request_data
if __name__ == '__main__':
config = {
"anything" : "anything",
"aa": 100
}
evt_quit = multiprocessing.Manager().Event()
# group_name 为共享内存组名,需唯一
# manager is an agent and act as a load balancing
# worker is real doing your work
instance = IPC_shm(
CLS_worker=My_worker,
worker_args=(config,), # must be tuple
worker_num=10, # number of worker Process
manager_num=2, # number of agent Process
group_name='serving_shm', # share memory name
shm_size=1 * 1024 * 1024, # share memory size
queue_size=20, # recv queue size
is_log_time=True, # whether log compute time
)
instance.start()
#demo produce and consume message , you can process by http
for i in range(10):
print('produce message')
data = {"a" : 100}
request_id = instance.put(data)
data = instance.get(request_id)
print('get process result',data)
try:
instance.join()
except Exception as e:
evt_quit.set()
instance.terminate()
del evt_quit
# -*- coding: utf-8 -*-
# @Time : 2021/11/29 15:06
# @Author : wyw
import multiprocessing
import os
from ipc_worker.ipc_zmq_loader import IPC_zmq,ZMQ_process_worker
'''
demo ZMQ depend zmq
pip install pyzmq
test pass >= python3.6
'''
tmp_dir = './tmp'
if not os.path.exists(tmp_dir):
os.mkdir(tmp_dir)
os.environ['ZEROMQ_SOCK_TMP_DIR'] = tmp_dir
class My_worker(ZMQ_process_worker):
def __init__(self,config,*args,**kwargs):
super(My_worker,self).__init__(*args,**kwargs)
#config info , use by yourself
self._logger.info('Process id {}, group name {} , identity {}'.format(self._idx,self._group_name,self._identity))
self._logger.info(config)
self.config = config
#Process begin trigger this func
def run_begin(self):
self._logger.info('worker pid {}...'.format(os.getpid()))
self.handle = None
pass
# Process end trigger this func
def run_end(self):
if self.handle is not None:
pass
#any data put will trigger this func
def run_once(self,request_data):
#process request_data
if isinstance(request_data,dict):
request_data['b'] = 200
if self.handle is not None:
#do some thing
pass
return request_data
if __name__ == '__main__':
config = {
"anything" : "anything",
"aa": 100
}
evt_quit = multiprocessing.Manager().Event()
# group_name 为共享内存组名,需唯一
# manager is an agent and act as a load balancing
# worker is real doing your work
instance = IPC_zmq(
CLS_worker=My_worker,
worker_args=(config,), # must be tuple
worker_num=10, # number of worker Process
group_name='serving_zmq', # share memory name
evt_quit=evt_quit,
queue_size=20, # recv queue size
is_log_time=True, # whether log compute time
)
instance.start()
#demo produce and consume message , you can process by http
for i in range(10):
data = {"a" : 100}
request_id = instance.put(data)
data = instance.get(request_id)
print('get process result',request_id,data)
try:
instance.join()
except Exception as e:
evt_quit.set()
instance.terminate()
del evt_quit
项目详情
关
ipc_worker -0.0.6-py3-none-any.whl 的哈希值
| 算法 | 哈希摘要 | |
|---|---|---|
| SHA256 | 749dbf6b9269d35500311d9fc328ff93b0fd2bd7c8c5c29ed13e5a60785ed8f2 |
|
| MD5 | 21b780b10f1dcc9b9af887fbb0ccdfdf |
|
| 布莱克2-256 | edcceb73d855f79872aec7e6d55d5aba284a8b7fc674662bb6a5cb964affb5dd |