博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
RabbitMQ消息队列(七):适用于云计算集群的远程调用(RPC)
阅读量:2170 次
发布时间:2019-05-01

本文共 5427 字,大约阅读时间需要 18 分钟。

1. 客户端接口 Client interface

        为了展示一个RPC服务是如何使用的,我们将创建一段很简单的客户端class。 它将会向外提供名字为call的函数,这个call会发送RPC请求并且阻塞,直到收到RPC运算的结果。代码如下:

fibonacci_rpc = FibonacciRpcClient()result = fibonacci_rpc.call(4)print "fib(4) is %r" % (result,)

  

2. 回调函数队列 Callback queue

        总体来说,在RabbitMQ进行RPC远程调用是比较容易的。client端发送请求的Message,然后server端返回响应结果。为了收到server端响应client端的请求,我们在client端设置publish message时需要提供一个”callback“(回调)的queue地址。code如下:

result = channel.queue_declare(exclusive=True)callback_queue = result.method.queuechannel.basic_publish(exchange='',                      routing_key='rpc_queue',                      properties=pika.BasicProperties(                            reply_to = callback_queue,                            ),                      body=request)# ... and some code to read a response message from the callback_queue ...

2.1 Message properties

AMQP 预定义了14个属性。它们中的绝大多很少会用到。以下几个是平时用的比较多的:

  • delivery_mode: 持久化一个Message(通过设定值为2)。其他任意值都是非持久化。
  • content_type: 描述mime-type 的encoding。比如设置为JSON编码:设置该property为application/json。
  • reply_to: 一般用来指明用于回调的queue(Commonly used to name a callback queue)。
  • correlation_id: 在请求中关联处理RPC响应(correlate RPC responses with requests)。

  

3. 相关id Correlation id

一个客户端可能会发送多个请求给服务器,当服务器处理完后,客户端无法辨别在回调队列中的响应具体和那个请求时对应的。为了处理这种情况,客户端在发送每个请求时,同时会附带一个独有correlation_id属性,这样客户端在回调队列中根据correlation_id字段的值就可以分辨此响应属于哪个请求。

       在上个小节里,实现方法是对每个RPC请求都会创建一个callback queue。这是不高效的。幸运的是,在这里有一个解决方法:为每个client创建唯一的callback queue。

       这又有其他问题了:收到响应后它无法确定是否是它的,因为所有的响应都写到同一个queue了。上一小节的correlation_id在这种情况下就派上用场了:对于每个request,都设置唯一的一个值,在收到响应后,通过这个值就可以判断是否是自己的响应。如果不是自己的响应,就不去处理。

4. 总结

     工作流程:

  • 当客户端启动时,它创建了匿名的exclusive callback queue.
  • 客户端的RPC请求时将同时设置两个properties: reply_to设置为callback queue;correlation_id设置为每个request一个独一无二的值.
  • 请求将被发送到an rpc_queue queue.
  • RPC端或者说server一直在等待那个queue的请求。当请求到达时,它将通过在reply_to指定的queue回复一个message给client。
  • client一直等待callback queue的数据。当message到达时,它将检查correlation_id的值,如果值和它request发送时的一致那么就将返回响应。

5. 最终实现

The code for rpc_client.py:

root@ansible:~/workspace/rabbitmq/sixth_rpc# cat rpc_client.py # coding:utf-8import pikaimport sysimport uuid"""客户端主要作用是发送message,然后通过订阅得到server断返回的消息"""class FibonacciRpcClient(object):    def __init__(self):        """        客户端启动时,创建回调队列,并开启会话 用于发送RPC请求以及接受响应        """        self.connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))        # 建立一个会话,每个channel代表一个会话任务        self.channel = self.connection.channel()        # durable表示queue的持久化        # channel.queue_declare(queue='hello',durable=True)        # 声明回调队列,再次声明的原因,服务器和客户端不知道谁先启动,该声明是幂等性的,多次声明只生效一次        result = self.channel.queue_declare(exclusive=True)        # 回调队列名字        self.callback_queue_name = result.method.queue        # 客户端订阅回调队列,当回调队列中有相应时,调用'on_response'方法对响应进行处理        # 调用回调函数        self.channel.basic_consume(self.on_response,no_ack=True,queue=self.callback_queue_name)    # 定义回调函数,对回调队列中的响应进行处理的函数    def on_response(self,ch,method,props,body):        if self.corr_id == props.correlation_id:            self.reponse = body    def call(self,n):        # 初始化response        self.reponse = None        # 生成correlation_id        self.corr_id = str(uuid.uuid4())        # 发送RPC请求内容到RPC请求队列rpc_queue,同时发送reply_to 和 correlation_id        self.channel.basic_publish(exchange='',routing_key='rpc_queue',                                   properties=pika.BasicProperties(reply_to=self.callback_queue_name,                                                                   correlation_id=self.corr_id,),                                   body=str(n))        while self.reponse is None:            # 当没有收到consumer 消息时,循环等待consumer的消息然后处理数据,这个是阻塞的            self.connection.process_data_events()        return int(self.reponse)fibonacci_rpc = FibonacciRpcClient()# 发送rpc请求print "[x] requesting fib(30)"response = fibonacci_rpc.call(30)print "[.] got %r" %response

The code for rpc_server.py:

root@ansible:~/workspace/rabbitmq/sixth_rpc# cat rpc_server.py # coding:utf-8import pikaimport timeimport sysconnection = pika.BlockingConnection(pika.ConnectionParameters(host='127.0.0.1'))channel = connection.channel()channel.queue_declare(queue='rpc_queue')# 数据处理方法def fib(n):    if n == 0:        return 0    elif n == 1:        return 1    else:        return fib(n-1) + fib(n-2)# 对rpc请求队列中的请求进行处理def on_request(ch,method,props,body):    n = int(body)    print "[.] fib(%s)" %n    # 调用处理方法    response = fib(n)    # 将处理结果(响应) 发送到回调队列中    ch.basic_publish(exchange='',routing_key=props.reply_to,                     properties = pika.BasicProperties(correlation_id=props.correlation_id),                     body = str(response),                     )    ch.basic_ack(delivery_tag=method.delivery_tag)channel.basic_qos(prefetch_count=1)channel.basic_consume(on_request,queue='rpc_queue')# 循环监听print "[*] Waiting RPC request..."channel.start_consuming()

  

以下是我自己总结RPC代码流程:

  • 当客户端启动时,它创建了匿名的exclusive callback queue.
  • 客户端的RPC请求时将同时设置两个properties: reply_to设置为callback queue;correlation_id设置为每个request一个独一无二的值.这里的client会等待server端返回的响应(这里的client端会订阅server端的queue(callback queue))
  • 请求将被发送到an rpc_queue queue. server端会订阅rpc_queue,等待client端请求过来
  • RPC端或者说server一直在等待那个queue的请求。当请求到达时,它将通过在reply_to指定的queue响应一个message给client。
  • client一直等待callback queue的数据。当message到达时,它将检查correlation_id的值,如果值和它request发送时的一致那么就将返回响应

 执行测试一下吧:

root@ansible:~/workspace/rabbitmq/sixth_rpc# python rpc_server.pyroot@ansible:~/workspace/rabbitmq/sixth_rpc# python rpc_client.py

 

转载于:https://www.cnblogs.com/wanstack/p/8891053.html

你可能感兴趣的文章
composer安装YII
查看>>
Sublime text3快捷键演示
查看>>
sublime text3 快捷键修改
查看>>
关于PHP几点建议
查看>>
硬盘的接口、协议
查看>>
VLAN与子网划分区别
查看>>
Cisco Packet Tracer教程
查看>>
02. 交换机的基本配置和管理
查看>>
03. 交换机的Telnet远程登陆配置
查看>>
微信小程序-调用-腾讯视频-解决方案
查看>>
phpStudy安装yaf扩展
查看>>
密码 加密 加盐 常用操作记录
查看>>
TP 分页后,调用指定页。
查看>>
Oracle数据库中的(+)连接
查看>>
java-oracle中几十个实用的PL/SQL
查看>>
PLSQL常用方法汇总
查看>>
几个基本的 Sql Plus 命令 和 例子
查看>>
PLSQL单行函数和组函数详解
查看>>
Oracle PL/SQL语言初级教程之异常处理
查看>>
Oracle PL/SQL语言初级教程之游标
查看>>