RabbitMQ轮询接受消息和消息的持久化

RabbitMQ消息队列
Py
Threading QUEUE 不同线程之间进行交互。(不能跨进程)
进程 QUEUE (仅用于父进程与子进程进行交互,或者同属于同一父进程下的多个子进程进行交互。)

RabbitMQ是用语言erlang开发的
Linux上用rabbitmq-server start 来启动

使用rabbitmq pika轮询接受消息
no_ack=True
代表不管对方是否收到消息都结束了
默认不加这个no_ack
就能使当消息传个一个消费者的的时候那个消费者宕机了自动传给下一个人。

Producer

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
#  Author: Diedline
import pika

connection = pika.BlockingConnection(
pika.ConnectionParameters("localhost"))
"""
连接一个socket 可以在里面设置参数
"""
channel = connection.channel() #声明一个管道
channel.queue_declare(queue="hello") #在管道里单独声明一个队列
channel.basic_publish(
exchange="",
routing_key="hello", #routring_key 就是你的queue名字
body="hello,world!" #body你的消息内容

)
print("[x] Sent 'hello world!'")
connection.close()

consumer

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
#  Author: Diedline
import pika
"""
所有的socket类似的数据传输都是用byte格式的
"""
connection = pika.BlockingConnection(
pika.ConnectionParameters("localhost")) #创立连接和参数
channel = connection.channel() #创建管道

channel.queue_declare(queue="hello") #声明hello队列
"""
producer 已经声明过hello队列了
但是为什么还要声明?
你要确认这个queue是否被声明过了,
因为你无法确认是生产者还是消费者先运行这个queue,不然会报错
所以就要声明这个queue
"""
def callback(ch, method, properties, body):
print("---->",ch, method, properties)
"""
ch 管道内存对象的地址
method包含你要把消息发给谁信息
"""
print(" [x] receive %r" %body)


channel.basic_consume(
callback, #如果收到消息就调用callback 函数来处理消息
queue="hello", #从hello队列中收取消息
no_ack=True #代表不确认消息对方是否收到都结束了

)
print(" [*] Waiting for messages. TO exit press CTRL+C")
channel.start_consuming() #开始收消息 一启动就一直运行了


可以通过 rabbitmqctl.bat list_queues 来看当前有多少队列每个队列的消息有多少

RabbitMQ实现消息持久化
即使服务器宕机队列中的数据也不会丢失
加一个durable= True参数和
在channel.basic_publish中增加一个
properties = pike.BasicProperties(
delivery_mode =2,
)
producer

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
#  Author: Diedline
import pika

connection = pika.BlockingConnection(
pika.ConnectionParameters("localhost"))
"""
连接一个socket 可以在里面设置参数
"""
channel = connection.channel() #声明一个管道
channel.queue_declare(queue="hello",durable=True) #在管道里单独声明一个队列
channel.basic_publish(
exchange="",
routing_key="hello", #routring_key 就是你的queue名字
body="hello,world!" , #body你的消息内容
properties = pika.BasicProperties(
delivery_mode=2,
)
)
print("[x] Sent 'hello world!'")
connection.close()

consumer

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
#  Author: Diedline
import pika
import time
"""
所有的socket类似的数据传输都是用byte格式的
"""
connection = pika.BlockingConnection(
pika.ConnectionParameters("localhost")) #创立连接和参数
channel = connection.channel() #创建管道

channel.queue_declare(queue="hello",durable=True) #声明hello队列
# durable=True 只把队列持久化了
"""
producer 已经声明过hello队列了
但是为什么还要声明?
你要确认这个queue是否被声明过了,
因为你无法确认是生产者还是消费者先运行这个queue,不然会报错
所以就要声明这个queue
"""
def callback(ch, method, properties, body): #回调函数
print("---->",ch, method, properties)
"""
ch 管道内存对象的地址
method包含你要把消息发给谁信息
"""
time.sleep(30)
print(" [x] receive %r" %body)
ch.basic_ack(delivery_tag=method.delivery_tag)
"""
必须手动确认才能将消息从队列中移除
"""
channel.basic_consume(
callback, #如果收到消息就调用callback 函数来处理消息
queue="hello", #从hello队列中收取消息
# no_ack=True #代表不确认消息对方是否收到都结束了
#当socket断了的时候自动回把消息给下一个消费者,就代表它还是一个新消息
properties=pika.BasicProperties(
delivery_mode=2,
)
)
print(" [*] Waiting for messages. TO exit press CTRL+C")
channel.start_consuming() #开始收消息 一启动就一直运行了

在客户端增加这一条可以使原本的消息轮询机制变成按性能分配,处理快分配的多
channel.basic_qos(prefetch_count=1)


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
#  Author: Diedline

import pika
import time
"""
所有的socket类似的数据传输都是用byte格式的
"""
connection = pika.BlockingConnection(
pika.ConnectionParameters("localhost")) #创立连接和参数
channel = connection.channel() #创建管道

channel.queue_declare(queue="hello",durable=True) #声明hello队列
# durable=True 只把队列持久化了
"""
producer 已经声明过hello队列了
但是为什么还要声明?
你要确认这个queue是否被声明过了,
因为你无法确认是生产者还是消费者先运行这个queue,不然会报错
所以就要声明这个queue
"""
def callback(ch, method, properties, body): #回调函数
print("---->",ch, method, properties)
"""
ch 管道内存对象的地址
method包含你要把消息发给谁信息
"""
time.sleep(30)
print(" [x] receive %r" %body)
ch.basic_ack(delivery_tag=method.delivery_tag)
"""
必须手动确认才能将消息从队列中移除
"""
channel.basic_qos(prefetch_count=1)
channel.basic_consume(
callback, #如果收到消息就调用callback 函数来处理消息
queue="hello", #从hello队列中收取消息
# no_ack=True #代表不确认消息对方是否收到都结束了
#当socket断了的时候自动回把消息给下一个消费者,就代表它还是一个新消息
# properties=pika.BasicProperties(
# delivery_mode=2,
# )
)
print(" [*] Waiting for messages. TO exit press CTRL+C")
channel.start_consuming() #开始收消息 一启动就一直运行了