前言
如何通过 multiprocessing 模块实现Python的RPC
实现方法
首先, 我们需要实现一个RPC的注册处理类
改类实现了方法注册和保存功能, 通过connect对象,去循环接收执行函数并返回
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22
| import pickle
class RPCHandler: def __init__(self): self._functions = {}
def register_function(self, func): self._functions[func.__name__] = func
def handle_connection(self, connection): try: while True: func_name, args, kwargs = pickle.loads(connection.recv()) try: r = self._functions[func_name](*args, **kwargs) connection.send(pickle.dumps(r)) except Exception as e: connection.send(pickle.dumps(e))
|
我们还需要一个服务端的代码, 负责监听客户端的连接
1 2 3 4 5 6 7 8 9 10 11
| from multiprocessing.connection import Listener from threading import Thread
def rpc_server(handler, address, authkey): sock = Listener(address, authkey=authkey) while True: client = sock.accept() t = Thread(target=handler.handle_connection, args=(client,)) t.daemon = True t.start()
|
创建函数
1 2 3 4 5 6 7 8 9 10 11
| # functions.py
from RPCHandler import RPCHandler
def add(x, y): return x + y
# 注册函数 handler = RPCHandler() handler.register_function(add)
|
启动函数
1 2 3 4 5 6 7 8 9 10
| from server import rpc_server from functions import handler
def main(): rpc_server(handler, ('localhost', 17000), authkey=b'HKx2A7r@h3&z@#G2r7jT#w6ziqhnPzZ#')
if __name__ == '__main__': main()
|
通过上述方法, 实现了服务端的功能, 但是我们还需要一个客户端, 并且为了调用方便, 我们需要写一个Proxy的类
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22
| import pickle
class RPCProxy: def __init__(self, connection): self._connection = connection
def __getattr__(self, name): def do_rpc(*args, **kwargs): self._connection.send(pickle.dumps((name, args, kwargs))) result = pickle.loads(self._connection.recv()) if isinstance(result, Exception): raise result return result return do_rpc
from multiprocessing.connection import Client c = Client(('localhost', 17000), authkey=b'HKx2A7r@h3&z@#G2r7jT#w6ziqhnPzZ#') proxy = RPCProxy(c)
print(proxy.add(2, 3))
|