Sorry, your browser cannot access this site
This page requires browser support (enable) JavaScript
Learn more >

Gray-Ice

个人博客兼个人网站

参考文档: 官方文档 hello world - python。本来我是想将整个文档都翻译下来的,后来考虑到这样做会耽误我学习,就只能粗略的记录一下使用方法了。本文中用到的图片是直接复制了RabbitMQ教程上的链接,并不是保存到了本地再加载,所以如果你发现图片加载不出来而且确认自己的网络没问题,请联系博主更新图片。本篇博文中的”message”统一翻译成消息。

博主用的是docker容器,直接执行该命令会自动安装好并运行:

1
docker run -it --rm --name rabbitmq -p 5672:5672 -p 15672:15672 rabbitmq:3.9-management 

这里我加了rm选项,容器一旦停止就会自动删除。

开始前的准备

你应该已经安装了RabbitMQ并且它正运行在localhost:5672上。

Hello World!

(使用Pika Python客户端)

在这部分教程里我们将会用Python写两个小程序: 一个发送一条信息的生产者(发送者),和一个接受消息并打印出消息的消费者(接受者)。这是一个消息传递的”Hello World”。

在下图中,”P”是我们的生产者,”C”是我们的消费者。这个中间的盒子是一个队列 - 一个RabbitMQ代表消费者保留的消息缓冲区。

总体设计看起来像下图一样:

生产者发送消息到”hello”队列。消费者从这个队列接收消息。

RabbitMQ库

RabbitMQ使用多种协议,这个教程使用了AMQP 0-9-1,这是一种消息传递的开放式通用协议。这里有一些RabbitMQ的不同语言的客户端。在这一系列教程里我们将会使用RabbitMQ团队推荐的Python客户端Pika 1.0.0。你可以使用pip包管理工具安装它:

1
python -m pip install pika --upgrade

现在我们已经安装好了Pika,可以开始写代码了。

发送

我们的第一个程序send.py将会发送一个信息到队列。我们要做的第一件事情是与RabbitMQ服务建立一个连接。

1
2
3
4
5
#!/usr/bin/env python
import pika

connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()

我们连接到本地代理上——所以是localhost。如果我们想要连接到其他机器上,我们应该在这里指定它的名字或者IP地址。[博主注: “这里”指的是’localhost’字符串这里,我不想翻译的画蛇添足,于是便没有在译文中说明]

接下来,在发送消息之前我们需要确保接受队列是存在的。如果我们发送一条消息到不存在的位置,RabbitMQ只会丢弃掉那条消息。让我们创建一个hello队列,消息将会发送到这个队列。

1
channel.queue_declare(queue='hello')

此时我们已经准备好发送消息了。我们的第一个消息将会只包含一个字符串”Hello World!”并且我们会将它发送到我们的hello队列。

在RabbitMQ中一条消息永远不会被直接发送给队列,它总是需要交换[博主注: 我也不知道需要交换什么,原文这里写的是 it always needs to go through an exchange]。但我们不要在意这些细节 - 你可以从第三部分教程获取更多关于交换的内容。现在我们需要知道的是如何使用由空字符串标识的默认交换[博主注: 原文: All we need to know now is how to use a default exchange identified by an empty string]、这个交换是特别的 - 它允许我们准确的指定消息应该去的队列。队列名需要在routing_key参数中被指定:

1
2
3
4
channel.basic_publish(exchange='',
routing_key='hello',
body='Hello World!')
print(" [x] Sent 'Hello World!'")

在退出程序之前我们需要确认网络缓冲区已经刷新过了并且我们的消息已经真正的发到了RabbitMQ。我们通过关闭连接来做到它。

1
connection.close()

发送无效!

如果这是你第一次使用RabbitMQ并且你没有看到”已发送消息”你可能会感到摸不着头脑并想知道什么地方出了问题。也许代理启动时没有足够的可用磁盘空间(默认至少需要200MB空闲空间)并且因此拒绝消息。检查代理日志文件确认或在必要情况下减少限制。配置文件文档将会告诉你如何设置*磁盘空闲空间限制(disk_free_limit)*。

接收

我们的第二个程序receive.py将会从队列接收并打印消息到屏幕。

再次的,我们需要连接到RabbitMQ服务器。负责连接Rabbit的代码和之前一样。

第二步,就像之前一样,是确认队列是否存在。使用queue_declared创建队列是幂等的 - 我们可以运行这个命令许多次,但只有一次会创建。

1
channel.queue_declare(queue='hello')

你也许会问为什么我们要再次声明这个队列 - 我们已经在我们的上一个代码里声明过了。如果我们确认队列已经存在,我们就可以避免这种情况。比如send.py已经运行过了,但我们不确定哪个程序先运行。在这种情况下,最好在两个程序中重复声明队列。

查看队列

你也许想看到RabbitMQ有哪些队列并且它们之中有哪些消息,你可以通过在特权用户下使用rabbitmqctl工具做到这一点:

1
sudo rabbitmqctl list_queues

在Windows下,忽略sudo:

1
rabbitmqctl.bat list_queues

从队列接收消息是更复杂的。它通过将回调函数订阅到队列来工作。当我们接收到一条消息时,回调函数将会通过Pika库被调用。在我们的例子中,这个函数将在屏幕上打印消息的内容。

1
2
def callback(ch, method, properties, body):
print(" [x] Received %r" % body)

接下来,我们需要告诉 RabbitMQ 这个特定的回调函数应该从我们的 hello 队列接收消息:

1
2
3
channel.basic_consume(queue='hello',
auto_ack=True,
on_message_callback=callback)

为了使命令成功运行,我们必须确保我们要订阅的队列存在。幸运的是,我们对此充满信心——我们已经在上面使用 queue_declare创建了一个队列。

auto_ack参数将会在后面描述。

最终,我们进入了一个等待数据的死循环并且在需要的时候运行回调函数。并在程序关闭期间捕获 KeyboardInterrupt异常。

1
2
print(' [*] Waiting for messages. To exit press CTRL+C')
channel.start_consuming()
1
2
3
4
5
6
7
8
9
if __name__ == '__main__':
try:
main()
except KeyboardInterrupt:
print('Interrupted')
try:
sys.exit(0)
except SystemExit:
os._exit(0)

代码全部放在这里:

send.py(源文件)

1
2
3
4
5
6
7
8
9
10
11
12
#!/usr/bin/env python
import pika

connection = pika.BlockingConnection(
pika.ConnectionParameters(host='localhost'))
channel = connection.channel()

channel.queue_declare(queue='hello')

channel.basic_publish(exchange='', routing_key='hello', body='Hello World!')
print(" [x] Sent 'Hello World!'")
connection.close()

receive.py (源文件)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
#!/usr/bin/env python
import pika, sys, os

def main():
connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()

channel.queue_declare(queue='hello')

def callback(ch, method, properties, body):
print(" [x] Received %r" % body)

channel.basic_consume(queue='hello', on_message_callback=callback, auto_ack=True)

print(' [*] Waiting for messages. To exit press CTRL+C')
channel.start_consuming()

if __name__ == '__main__':
try:
main()
except KeyboardInterrupt:
print('Interrupted')
try:
sys.exit(0)
except SystemExit:
os._exit(0)

现在我们可以在终端尝试我们的程序了。第一步,让我们启动一个消费者,它将不断的等待交付(等待生产者发送消息):

1
2
3
python receive.py
# => [*] Waiting for messages. To exit press CTRL+C
# => [x] Received 'Hello World!'

接下来启动生产者,生产者程序每一次运行后都将停止:

1
2
python send.py
# => [x] Sent 'Hello World!'

欢呼吧!我们已经能够通过RabbitMQ发送第一条消息了。你可能已经注意到了,receive.py 程序不会退出。 它将随时准备接收更多消息,并且它可能会被 Ctrl-C 中断[博主注: 这是文档翻译的翻译: 意思是可以通过Ctrl + C中断这个程序]。

尝试在新的终端中运行send.py。[博主注: 不一定要在新终端,只要你别关闭消费者的那个终端就行。其实关闭了问题也不大,如果消息少的话,打开消费者还是能照样一字不差接收到消息]

本篇完。

评论



愿火焰指引你