RabbitMQ广播模式的三种类型

广播模式是实时的发信息,发消息的时候如果有设备未启动则该设备无法收到消息

1.fanout 广播模式
广播server

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
#  Author: Diedline
import pika
import sys

connection = pika.BlockingConnection(
pika.ConnectionParameters(
host="localhost"
)
)
channel = connection.channel()
channel.exchange_declare(
exchange="logs",
exchange_type="fanout"

)
# " ".join(sys.argv[1:]) or
message = "info: Hello World!" #我要发的消息
channel.basic_publish(
exchange="logs",
routing_key="", #写队列名
body=message
)
print("[x] send %r"%message)
connection.close()

广播 client

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
#  Author: Diedline
import pika
connection = pika.BlockingConnection(
pika.ConnectionParameters(host="localhost"
)
)
channel = connection.channel()
channel.exchange_declare(
exchange ="logs",
exchange_type="fanout"

)
result = channel.queue_declare(exclusive=True) #exclusive 唯一的 不指定queue名字,
# rabbit会随机生成一个名字 exclusive=True会在使用此queue的消费者断开后,自动将queue删除
queue_name = result.method.queue
print("random queue name:",queue_name)
channel.queue_bind(exchange="logs",
queue=queue_name) #绑定一个转发器
print("[*] waiting for logs. to exit press ctrl c")
def callback(ch, method, properties, body): #回调函数
"""
ch 管道内存对象的地址
method包含你要把消息发给谁信息
"""
# time.sleep(30)
print(" [x] receive %r" %body)

# channel.basic_qos(prefetch_count=1)
channel.basic_consume(
callback, #如果收到消息就调用callback 函数来处理消息
queue=queue_name, #从hello队列中收取消息
no_ack=True #代表不确认消息对方是否收到都结束了
#当socket断了的时候自动回把消息给下一个消费者,就代表它还是一个新消息
# properties=pika.BasicProperties(
# # delivery_mode=2,
# # )
)
channel.start_consuming() #开始收消息 一启动就一直运行了

direct 广播模式:
能选择性的发送接收消息
在终端运行并输入参数

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
#  Author: Diedline
import pika
import sys
connection = pika.BlockingConnection(pika.ConnectionParameters(
host="localhost"
))
channel = connection.channel()
channel.exchange_declare(exchange="direct_logs",
exchange_type="direct")

severity = sys.argv[1] if len(sys.argv)>1 else "info" #severity 重要程度什么级别的日志
"""
默认执行取你执行这个脚本所传的参数 如果取不到就 info
"""
message = "".join(sys.argv[2:]) or "hello world!" #消息内容你可以发很多
channel.basic_publish(exchange="direct_logs",
routing_key=severity,
body=message )
print("[x] send %r: %r"%(severity,message))
connection.close()

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
#  Author: Diedline
import pika
import sys

connection = pika.BlockingConnection(
pika.ConnectionParameters(host="localhost"
)
)
channel = connection.channel()
channel.exchange_declare(
exchange ="direct_logs",
exchange_type="direct"
)
result = channel.queue_declare(exclusive=True) #exclusive 唯一的 不指定queue名字,
# rabbit会随机生成一个名字 exclusive=True会在使用此queue的消费者断开后,自动将queue删除
queue_name = result.method.queue
severities = sys.argv[1:] #获取到一个列表
print(severities)
if not severities:
sys.stderr.write("Usage: %s [info] [warning] [error]\n"%sys.argv[0])
sys.exit(1)
"""
usage 必须加上级别不然就退出了
"""
for severity in severities: #循环这个列表,把每个参数都绑定这个转发器
channel.queue_bind(exchange="direct_logs",
queue=queue_name,
routing_key=severity)
print("[*] waiting for logs. to exit press ctrl c")
def callback(ch, method, properties, body): #回调函数
"""
ch 管道内存对象的地址
method包含你要把消息发给谁信息
"""
# time.sleep(30)
print(" [x] receive %r" %body)

# channel.basic_qos(prefetch_count=1)
channel.basic_consume(
callback, #如果收到消息就调用callback 函数来处理消息
queue=queue_name, #从hello队列中收取消息
no_ack=True #代表不确认消息对方是否收到都结束了
#当socket断了的时候自动回把消息给下一个消费者,就代表它还是一个新消息
# properties=pika.BasicProperties(
# # delivery_mode=2,
# # )
)
channel.start_consuming() #开始收消息 一启动就一直运行了

Topic 细致的消息过滤广播模式
Topic_publisher

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
#  Author: Diedline

import pika
import sys
connection = pika.BlockingConnection(pika.ConnectionParameters(
host="localhost"
))
channel = connection.channel()
channel.exchange_declare(exchange="topic_logs",
exchange_type="topic")

routing_key = sys.argv[1] if len(sys.argv)>1 else "anonymous.info" #severity 重要程度什么级别的日志
"""
默认执行取你执行这个脚本所传的参数 如果取不到就 info
"""
message = "".join(sys.argv[2:]) or "hello world!" #消息内容你可以发很多
channel.basic_publish(exchange="topic_logs",
routing_key=routing_key,
body=message )
print("[x] send %r: %r"%(routing_key,message))
connection.close()

Topic_consumer

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
#  Author: Diedline
import pika
import sys

connection = pika.BlockingConnection(
pika.ConnectionParameters(host="localhost"
)
)
channel = connection.channel()
channel.exchange_declare(
exchange ="topic_logs",
exchange_type="topic"
)
result = channel.queue_declare(exclusive=True) #exclusive 唯一的 不指定queue名字,
# rabbit会随机生成一个名字 exclusive=True会在使用此queue的消费者断开后,自动将queue删除
queue_name = result.method.queue
binding_keys = sys.argv[1:] #获取到一个列表

if not binding_keys :
sys.stderr.write("Usage: %s [info] [warning] [error]\n"%sys.argv[0])
sys.exit(1)
"""
usage 必须加上级别不然就退出了
"""
for binding_key in binding_keys: #循环这个列表,把每个参数都绑定这个转发器
channel.queue_bind(exchange="topic_logs",
queue=queue_name,
routing_key=binding_key)
print("[*] waiting for logs. to exit press ctrl c")
def callback(ch, method, properties, body): #回调函数
"""
ch 管道内存对象的地址
method包含你要把消息发给谁信息
"""
# time.sleep(30)
print(" [x] receive %r" %body)

# channel.basic_qos(prefetch_count=1)
channel.basic_consume(
callback, #如果收到消息就调用callback 函数来处理消息
queue=queue_name, #从hello队列中收取消息
no_ack=True #代表不确认消息对方是否收到都结束了
#当socket断了的时候自动回把消息给下一个消费者,就代表它还是一个新消息
# properties=pika.BasicProperties(
# # delivery_mode=2,
# # )
)
channel.start_consuming() #开始收消息 一启动就一直运行了