RabbitMQ RPC 交互模式

rpc_client2(RPC 客户端)

#  Author: Diedline
import pika
import uuid
import time

class FibonacciRpcClient(object):
    """Blocking RPC client that sends Fibonacci requests to ``rpc_queue``.

    Each instance opens its own connection to the broker on ``localhost``
    and declares an exclusive reply queue; replies are matched to the
    outstanding request through a per-call correlation id.
    """

    def __init__(self):
        """Connect to the broker and subscribe to a private reply queue."""
        # State shared between call() and on_response(). Initialised up
        # front so on_response() never reads a missing attribute.
        self.response = None
        self.corr_id = None

        self.connection = pika.BlockingConnection(
            pika.ConnectionParameters(host="localhost"))
        self.channel = self.connection.channel()

        # No queue name is supplied; the name actually assigned is read
        # back from the declare result and used as the reply address.
        result = self.channel.queue_declare(exclusive=True)
        self.callback_queue = result.method.queue

        # Every delivery on the reply queue is handed to on_response().
        # NOTE(review): a positional callback plus ``no_ack`` looks like the
        # pika 0.x calling convention -- confirm against the installed pika.
        self.channel.basic_consume(self.on_response,
                                   no_ack=True,
                                   queue=self.callback_queue)

    def on_response(self, ch, method, props, body):
        """Keep ``body`` as the reply if it answers the outstanding request.

        Deliveries whose ``props.correlation_id`` differs from the id
        generated by the most recent call() are ignored.
        """
        if self.corr_id == props.correlation_id:
            self.response = body

    def call(self, n):
        """Send ``n`` to ``rpc_queue`` and block until its reply arrives.

        Args:
            n: value to send; it is published as ``str(n)``.

        Returns:
            The reply body converted with ``int``.
        """
        self.response = None
        # A fresh random id per request. The server echoes it back, so a
        # reply carrying this id is known to answer this very request.
        self.corr_id = str(uuid.uuid4())
        self.channel.basic_publish(
            exchange="",
            routing_key="rpc_queue",
            properties=pika.BasicProperties(
                # Tell the server which queue to publish the result to.
                reply_to=self.callback_queue,
                correlation_id=self.corr_id,
            ),
            body=str(n))

        while True:
            # Non-blocking counterpart of start_consuming(): dispatches any
            # pending deliveries to on_response() and then returns.
            self.connection.process_data_events()
            if self.response is not None:
                # Reply received: return right away instead of printing a
                # misleading "no message..." and sleeping once more.
                break
            print("no message...")
            time.sleep(0.5)
        return int(self.response)





# Demo driver: perform a single RPC round trip and print the result.
fibonacci_rpc = FibonacciRpcClient()

print(" [x] Requesting fib(6)")
response = fibonacci_rpc.call(6)  # blocks until the server has replied
print(" [.] got %r" % (response,))

rpc_server2(RPC 服务端)

#  Author: Diedline
import pika
import time
# Open a blocking connection to the broker on localhost and get a channel.
connection = pika.BlockingConnection(pika.ConnectionParameters(host="localhost"))
channel = connection.channel()

# Declare the queue that clients publish their RPC requests to.
channel.queue_declare(queue="rpc_queue")
def fib(n):
    """Return the n-th Fibonacci number, with fib(0) == 0 and fib(1) == 1.

    Computed iteratively in O(n) additions, so a large ``n`` neither takes
    exponential time nor exhausts the recursion limit.

    Args:
        n: non-negative integer index into the sequence.

    Raises:
        ValueError: if ``n`` is negative.
    """
    if n < 0:
        raise ValueError("fib() is not defined for negative n: %r" % (n,))
    previous, current = 0, 1
    for _ in range(n):
        previous, current = current, previous + current
    return previous


def on_request(ch, method, props, body):
    """Serve one RPC request: compute fib of the payload and reply.

    The payload is parsed with ``int``. The result is published through the
    default exchange to the queue named in ``props.reply_to``, tagged with
    the request's ``correlation_id`` so the client can match it.

    The request is acknowledged only after the reply has been published;
    if ``int(body)`` or ``fib`` raises, no ack is sent.
    """
    number = int(body)
    print(" [.]fib(%s)" % number)

    result = fib(number)
    reply_properties = pika.BasicProperties(
        correlation_id=props.correlation_id)
    ch.basic_publish(
        exchange="",
        routing_key=props.reply_to,
        properties=reply_properties,
        body=str(result),
    )

    # Ack last: the task counts as done once the result has been sent back.
    ch.basic_ack(delivery_tag=method.delivery_tag)

# Ask for at most one unacknowledged request at a time on this channel.
channel.basic_qos(prefetch_count=1)

# Route every message arriving on rpc_queue to on_request().
channel.basic_consume(on_request, queue="rpc_queue")

print("[x] Awaiting RPC requests")
channel.start_consuming()  # enters the consume loop