python 调用 rpc 实现分布式系统

本贴最后更新于 2445 天前,其中的信息可能已经时移世改

今个来通俗的讲 rpc 是什么?

rpc 一般俗称,远程过程调用,把本地的函数,放到远端去调用。

通常我们调用一个方法,譬如: sumadd(10, 20),sumadd 方法的具体实现要么是用户自己定义,要么存在于该语言的库函数中,也就说在 sumadd 方法的代码实现在本地,它是一个本地调用!

“远程调用”意思就是:被调用方法的具体实现不在程序运行本地,而是在别的某个地方(分布到各个服务器),但是用起来像是在本地。

rpc 远程调用原理 :

比如 A 调用 B 提供的 remoteAdd 方法:

首先 A 与 B 之间建立一个 TCP 连接;

然后 A 把需要调用的方法名(这里是 remoteAdd)以及方法参数(10, 20)序列化成字节流发送出去;

B 接受 A 发送过来的字节流,然后反序列化得到目标方法名,方法参数,接着执行相应的方法调用(可能是 localAdd)并把结果 30 返回;

A 接受远程调用结果,然后 do()。

RPC 框架也就是把上线说的具体的细节封装起来,给用户好用的 API 使用(提示:有些远程调用选择比较底层的 socket 协议,有些远程调用选择比较上层的 HTTP 协议);

一般 rpc 配合 http 协议的多点,也就是走 http 的多。 当然还是看应用,我曾经一共的 rpc 框架是基于 zeromq 的 zerorpc。速度是挺快,server 和 client 都有 python 的 gevent 支持,速度没道理慢。(有兴趣的,可以看看有关 zerorpc 的文章 http://rfyiamcool.blog.51cto.com/1030776/1254000 )最少要比 python 本身的 xml-rpc 要快。 rpc over http(基于 http 的 rpc)有两种协议,一种是 xml-rpc ,还有一个是 json-rpc。

XML-RPC:XML Remote Procedure Call,即 XML 远程方法调用,利用 http+xml 封装进行 RPC 调用。基于 http 协议传输、XML 作为信息编码格式。一个 xml-rpc 消息就是一个请求体为 xml 的 http-post 请求,服务端执行后也以 xml 格式编码返回。这个标准面前已经演变为下面的 SOAP 协议。可以理解 SOAP 是 XML-RPC 的高级版本。

注释下,原文地址,blog.xiaorui.cc

JSON-RPC:JSON Remote Procedure Call,即 JSON 远程方法调用 。类似于 XML-RPC,不同之处是使用 JSON 作为信息交换格式

下面是一个例子,很简单。我们是用 python 的 rpc 库 SimpleXMLRPCServer 做的测试,创建 rpc server,然后注册一些函数,供应别的客户端去调用。

|

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

|

from SimpleXMLRPCServer import SimpleXMLRPCServer

原文:xiaorui.cc

def add(x,y):

return  x+y

def subtract(x, y):

return  x-y

def multiply(x, y):

return  x*y

def divide(x, y):

return  x/y

server = SimpleXMLRPCServer(("localhost", 8000))

print "Listening on port 8000..."

server.register_multicall_functions()

server.register_function(add, 'add')

server.register_function(subtract, 'subtract')

server.register_function(multiply, 'multiply')

server.register_function(divide, 'divide')

server.serve_forever()

|

这个是连接的 Client,python 下的 xmlrpclib 本身就提供了 server 和 client 类,当然咱们也可以自己写,看源码就知道他只是把 func,args 做了 xml 格式化而已。

|

1

2

3

4

5

6

7

8

9

10

11

|

import xmlrpclib

proxy = xmlrpclib.ServerProxy("http://localhost:8000/")

multicall = xmlrpclib.MultiCall(proxy)

multicall.add(7,3)

multicall.subtract(7,3)

multicall.multiply(7,3)

multicall.divide(7,3)

result = multicall()

print "7+3=%d, 7-3=%d, 7*3=%d, 7/3=%d" % tuple(result)

|

RPC 本来是单任务的,如果任务相对频繁,可以设置成多线程的默认,你不用在调用 threading 模块什么的,直接引用 。

|

1

|

class AsyncXMLRPCServer(SocketServer.ThreadingMixIn,SimpleXMLRPCServer): pass

|

然后 rpc 初始化的方法换成。

|

1

|

server = AsyncXMLRPCServer(('', 1111), SimpleXMLRPCRequestHandler)

|

这里再说下,和 xmlrpc 相似的 jsonrpc,貌似现在用 xmlrpc 的,要比 jsonrpc 的多点。 有时候到国外的 it 论坛看帖子,xmlrpc 用的交多点。其实现在较大的公司,一般干脆直接自己实现了 rpc 框架,像淘宝 Dubbo(朋友有搞过,搞了半天,没有对接成接口,说是有难度,不明觉厉!),百度的 xxx(忘名字了)。

|

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

|

import jsonrpc

server = jsonrpc.Server(jsonrpc.JsonRpc20(), jsonrpc.TransportTcpIp(addr=("127.0.0.1", 31415), logfunc=jsonrpc.log_file("myrpc.log")))

#原文:xiaorui.cc

注册一个函数方法

def echo(s):

return  s

def search(number=None, last_name=None, first_name=None):

sql_where  =  []

sql_vars  =  []

if  number is  not  None:

    sql_where.append("number=%s")

    sql_vars.append(number)

if  last_name is  not  None:

    sql_where.append("last_name=%s")

    sql_vars.append(last_name)

if  first_name is  not  None:

    sql_where.append("first_name=%s")

    sql_vars.append(first_name)

sql_query  =  "SELECT id, last_name, first_name, number FROM mytable"

if  sql_where:

    sql_query  +=  " WHERE"  +  " AND ".join(sql_where)

cursor  =  ...

cursor.execute(sql_query,  *sql_vars)

return  cursor.fetchall()

server.register_function( echo )

server.register_function( search )

start server

server.serve()

|

|

1

2

3

4

5

6

7

8

|

创建 jsonrpc 客户端

import jsonrpc

server = jsonrpc.ServerProxy(jsonrpc.JsonRpc20(), jsonrpc.TransportTcpIp(addr=("127.0.0.1", 31415)))

#调用远端的一个函数

result = server.echo("hello world")

found = server.search(last_name='Python')

|

我做过一些个压力的测试,XMLRPCSERVER 的开了 async 之后,每个连接特意堵塞 5 秒,他的并发在 40 个左右 。也就是每秒成功 40 个左右,剩下的还是在堵塞等待中。 其实他的瓶颈不是在于 rpc 的本身,是承载 rpc 的那个 basehttpserver,太弱爆了。

|

1

|

注释下,原文地址,blog.xiaorui.cc

|

接收请求,调用方法 !

现在开源社区这么发达,有不少人都根据 rpc 的协议,重写了承载 rpc 的 web 服务。 比如用 flask,tornado,配合 uwsgi,你猜咋招了。。。。如果不堵塞连接,那还可以,如果堵塞连接,uwsgi 的废材特色就显出来了,以前有文章说过,uwsgi 是 prework,他会预先启动进程,官方都推荐要根据你的 cpu 核数或者超线程来开启进程,如果开的太多,你会发现,uwsgi 他是驾驭不了那么多进程的。还是看我大 tornado,用了 @gen.engine 之后。轻易飙到 500 的并发连接。

(以上是我的吃饱又蛋疼测试,没听过谁会重复调用那么多的堵塞方法,自评 sx 行为)

不多说了,看 flask 实现 xmlrpc 服务端的代码,看了下 flask xmlrpc 的源码,实现的不难。

|

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

|

from flask import Flask

from flaskext.xmlrpc import XMLRPCHandler, Fault

app = Flask(name)

handler = XMLRPCHandler('api')

handler.connect(app, '/api')

@handler.register

def woca(name="world"):

if  not  name:

    raise  Fault("fuck...fuck",  "fuck shencan!")

return  "Hello, %s!"  %  name

原文:xiaorui.cc

app.run()

|

对于每个连接的超时,有多种的方法,如果你用的是 flask,tornado 做 web server,那就写个装饰器 single 起来,只是性能不好。 或者是前面挂一个 nginx,然后做个 client_header_timeout,client_body_timeout,proxy_connect_timeout(你懂的。),如果用的 python 自带的 xml-rpc 的话,需要引入 socket。

|

1

2

|

import socket

socket.setdefaulttimeout()

|

再说下 rpc 安全的问题。

至于安全方面,有兴趣就开个 ssl,或者是在程序里面判断下 client ip,反正配置都是统一下发的,你重载 daemon 的时候,也就知道该判断什么 ip 了。

我个人对于 rpc 的应用,更加的倾向于基本资源的获取和调用,毕竟单纯的用 socket 或者是 mq,你在程序里面还要做一个解析过来的数据,然后根据过来的数据在做调用。 (alert: 我想触发 add() ,如果是 rpc 的话,我不用管,只是传过去就行了,到那时 mq 和 socket 就需要 eval 调用函数了),一些复杂的应用还是喜欢用面向资源的 rest,也推荐大家用这个,靠谱的。

  • gRpc
    11 引用 • 9 回帖 • 60 关注

相关帖子

欢迎来到这里!

我们正在构建一个小众社区,大家在这里相互信任,以平等 • 自由 • 奔放的价值观进行分享交流。最终,希望大家能够找到与自己志同道合的伙伴,共同成长。

注册 关于
请输入回帖内容 ...