Rabbits and warrens.

Posted by Jason | Posted in Software Development, Technology | Posted on 01-13-2009

If you like Rabbits and Warrens, check out RabbitMQ in Action.

The goal was simple enough: decouple a particular type of analysis out-of-band from mainstream e-mail processing. We started down the MySQL road…put the things to be digested into a table…consume them in another daemon…bada bing bada boom. But pretty soon, complex ugliness crept into the design phase… You want to have multiple daemons servicing the queue?…no problem, we'll just hard-code node numbers…what? You want dynamic load re-assignment when daemons join and die?

You get the idea…what was supposed to be simple (decouple something) was spinning its own Gordian knot. It seemed like a good time to see if every problem was looking like a nail (table), because all we had were hammers (MySQL).

A short search later, and we entered the world of message queueing. No, no…we obviously know what a message queue is. Heck, we do e-mail for a living. We've implemented all sorts of specialized, high-speed, in-memory queues for e-mail processing. What we weren't aware of was the family of off-the-shelf, generalized message queueing (MQ) servers…a language-agnostic, no-assembly-required way to wire routing between applications over a network. A message queue we didn't have to write ourselves? Hold your tongue.

Open up your queue…

Cutting to the chase, over the last 4 years there has been no shortage of open-source message queueing servers. Most of them are one-offs by folks like LiveJournal to scratch a particular itch. Yeah, they don't really care what kind of messages they carry, but their design parameters are usually creator-specific (and message persistence after a crash usually isn't one of them). However, there are three in particular that are designed to be highly flexible message queues for their own sake:

  • Apache ActiveMQ
  • ZeroMQ
  • RabbitMQ

Apache ActiveMQ gets the most press, but it appears to have some issues with losing messages. Next.

ZeroMQ and RabbitMQ both support an open messaging protocol called AMQP (at the time of writing; ZeroMQ has since dropped AMQP support). The advantage to AMQP is that it's designed to be a highly robust and open alternative to the two commercial message queues out there (IBM and Tibco). Muy bueno. However, ZeroMQ doesn't support message persistence across crashes or reboots. No muy bueno. That leaves us with RabbitMQ. (That being said, if you don't need persistence, ZeroMQ is pretty darn interesting…incredibly low latency and flexible topologies.)

That leaves us with the carrot muncher…

RabbitMQ pretty much sold me the minute I read "written in Erlang". Erlang is a highly parallel programming language developed over at Ericsson for running telco switches…yeah, the kind with six bazillion 9s of uptime. In Erlang, it's supposedly trivial to spin off processes and then communicate between them using message passing. Seems like the ideal underpinning for a message queue, no?

Also, RabbitMQ supports persistence. Yes, Virginia, if your RabbitMQ dies, your messages don't have to die an unwitting death…they can be reborn in your queues on reboot. Oh…and as is always desired @ DigiTar, it plays nicely with Python. All that being said, RabbitMQ's documentation is, well…horrible. Lemme rephrase: if you already understand AMQP, the docs are fine. But how many folks know AMQP? It'd be like MySQL docs assuming you knew some form of SQL…er…never mind.

So, without further ado…here is a reduction of a week's worth of reading up on AMQP and how it works in RabbitMQ…and how to play with it in Python:

Playing telephone

There are four building blocks you really care about in AMQP: virtual hosts, exchanges, queues and bindings. A virtual host holds a bundle of exchanges, queues and bindings. Why would you want multiple virtual hosts? Easy. A username in RabbitMQ grants you access to a virtual host…in its entirety. So the only way to keep group A from accessing group B’s exchanges/queues/bindings/etc. is to create a virtual host for A and one for B. Every RabbitMQ server has a default virtual host named “/”. If that’s all you need, you’re ready to roll.

Exchanges, Queues and bindings…oh my!

Here’s where my railcar went off the tracks initially. How do all the parts thread together?

Queues are where your “messages” end up. They’re message buckets…and your messages sit there until a client (a.k.a. consumer) connects to the queue and siphons it off. However, you can configure a queue so that if there isn’t a consumer ready to accept the message when it hits the queue, the message goes poof. But we digress…

The important thing to remember is that queues are created programmatically by your consumers (not via a configuration file or command line program). That’s OK, because if a consumer app tries to “create” a queue that already exists, RabbitMQ pats it on the head, smiles gently and NOOPs the request. So you can keep your MQ configuration in-line with your app code…what a concept.

OK, so you’ve created and attached to your queue, and your consumer app is drumming its fingers waiting for a message…and drumming…and drumming…but alas no message. What happened? Well you gotta pump a message in first! But to do that you’ve got to have an exchange…

Exchanges are routers with routing tables. That’s it. End stop. Every message has what’s known as a “routing key”, which is simply a string. The exchange has a list of bindings (routes) that say, for example, messages with routing key “X” go to queue “timbuktu”. But we get slightly ahead of ourselves.

Your consumer application should create your exchanges (plural). Wait, what? You mean you can have more than one exchange? Yes, you can, but why? Easy. Each exchange operates in its own userland process, so adding exchanges adds processes, allowing you to scale message routing capacity with the number of cores in your server. As an example, on an 8-core server you could create 5 exchanges to maximize your utilization, leaving 3 cores open for handling the queues, etc. Similarly, in a RabbitMQ cluster, you can use the same principle to spread exchanges across the cluster members to add even more throughput.

OK, so you’ve created an exchange…but it doesn’t know what queues the messages go in. You need “routing rules” (bindings). A binding essentially says things like this: put messages that show up in exchange “desert” and have routing key “ali-baba” into the queue “hideout”. In other words, a binding is a routing rule that links an exchange to a queue based on a routing key. It is possible for two binding rules to use the same routing key. For example, maybe messages with the routing key “audit” need to go both to the “log-forever” queue and the “alert-the-big-dude” queue. To accomplish this, just create two binding rules (each one linking the exchange to one of the queues) that both trigger on routing key “audit”. In this case, the exchange duplicates the message and sends it to both queues. Exchanges are just routing tables containing bindings.
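To make the "exchange is just a routing table of bindings" idea concrete, here is a toy in-memory model. It is a sketch for illustration only, not how RabbitMQ is implemented: the `ToyDirectExchange` class and its `bind`/`publish` methods are hypothetical, and it only mimics exact-match (direct-style) routing. It does reproduce the "audit" case above, where two bindings share one routing key and the message is copied into both queues; in this toy, a message that matches no binding is simply dropped.

```python
from collections import defaultdict


class ToyDirectExchange:
    """Hypothetical in-memory model of an exchange as a table of bindings."""

    def __init__(self):
        # routing key -> names of the queues bound with exactly that key
        self.bindings = defaultdict(list)
        # queue name -> message bodies delivered to it
        self.queues = defaultdict(list)

    def bind(self, queue, routing_key):
        # A binding is just a "routing key -> queue" rule; ignore exact repeats.
        if queue not in self.bindings[routing_key]:
            self.bindings[routing_key].append(queue)

    def publish(self, body, routing_key):
        # Copy the message into every queue bound with this exact key.
        targets = self.bindings.get(routing_key, [])
        for queue in targets:
            self.queues[queue].append(body)
        return list(targets)


exchange = ToyDirectExchange()
exchange.bind("hideout", "ali-baba")
exchange.bind("log-forever", "audit")
exchange.bind("alert-the-big-dude", "audit")

print(exchange.publish("open sesame", "ali-baba"))  # ['hideout']
print(exchange.publish("expense report", "audit"))  # ['log-forever', 'alert-the-big-dude']
print(exchange.publish("lost", "nobody-home"))      # []
```

Because matching is a plain dictionary lookup on the exact key, "dog.puppy" would never reach a queue bound with "dog", which is the direct-exchange behavior described below.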

Now for the curveball: there are multiple types of exchanges. They all do routing, but they accept different styles of binding "rules". Why not just create one type of exchange for all styles of rules? Because each rule style has a different CPU cost for analyzing if a message matches the rule. For example, a "topic" exchange tries to match a message's routing key against a pattern like "dogs.*". Matching that wildcard on the end takes more CPU than simply seeing if the routing key is "dogs" or not (which is what a "direct" exchange does). If you don't need the extra flexibility of a "topic" exchange, you can get more messages/sec routed if you choose the "direct" exchange type. So what are the types and how do they route?

Fanout Exchange – No routing keys involved. You simply bind a queue to the exchange. Any message that is sent to the exchange is sent to all queues bound to that exchange. Think of it like a subnet broadcast. Any host on the subnet gets a copy of the packet. Fanout exchanges route messages the fastest.
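In toy form, fanout routing is a plain broadcast. The function below is a hypothetical illustration, not RabbitMQ code: it takes the names of the queues bound to the exchange and hands each of them the message, accepting a routing key only to show that it is ignored.

```python
def fanout_route(bound_queues, body, routing_key=None):
    """Toy fanout exchange: copy `body` to every bound queue.

    `routing_key` is accepted only to show that fanout never looks at it.
    """
    return {queue: body for queue in bound_queues}


print(fanout_route(["billing", "audit-log"], "user signed up", routing_key="dog"))
# {'billing': 'user signed up', 'audit-log': 'user signed up'}
```

There is no per-message matching work at all, which is why fanout is the cheapest exchange type to route through.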

Direct Exchange – Routing keys are involved. A queue binds to the exchange to request messages that match a particular routing key exactly. This is a straight match. If a queue binds to the exchange requesting messages with routing key "dog", only messages labelled "dog" get sent to that queue (not "dog.puppy", not "dog.guard"…only "dog").

Topic Exchange – Matches routing keys against a pattern. Instead of binding with a particular routing key, the queue binds with a pattern string. The symbol # matches zero or more words, and the symbol * matches exactly one word (no more, no less). So "audit.#" would match "audit.irs.corporate" (and even plain "audit"), but "audit.*" would only match "audit.irs". Our friends at RedHat have put together a great image to express how topic exchanges work:

AMQP Stack Diagram

Source: RabbitMQ in Action (by me and a very cool Uruguayan dude…Mr. Alvaro)
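The topic rules can be sketched as a small recursive matcher over dot-separated words. This is a toy written for illustration (`topic_matches` is a hypothetical name, not part of any AMQP library), assuming RabbitMQ's documented semantics where * matches exactly one word and # matches zero or more words.

```python
def topic_matches(pattern, routing_key):
    """Return True if routing_key matches an AMQP-style topic pattern."""
    pattern_words = pattern.split(".")
    key_words = routing_key.split(".")

    def match(p, k):
        # Pattern used up: it is a match only if the key is used up too.
        if p == len(pattern_words):
            return k == len(key_words)
        word = pattern_words[p]
        if word == "#":
            # '#' may swallow zero or more key words; try every split point.
            return any(match(p + 1, split) for split in range(k, len(key_words) + 1))
        if k == len(key_words):
            return False
        # '*' matches exactly one word; anything else must match literally.
        if word == "*" or word == key_words[k]:
            return match(p + 1, k + 1)
        return False

    return match(0, 0)


for pattern, key in [("audit.#", "audit.irs.corporate"),
                     ("audit.*", "audit.irs"),
                     ("audit.*", "audit.irs.corporate"),
                     ("dogs.*", "dogs")]:
    print(pattern, key, topic_matches(pattern, key))
# audit.# audit.irs.corporate True
# audit.* audit.irs True
# audit.* audit.irs.corporate False
# dogs.* dogs False
```

The backtracking on # is exactly the extra CPU work mentioned above that a direct exchange never has to do; a real broker would use a smarter data structure than this brute-force recursion.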

Persistent little bugger…

You spend all that time creating your queues, exchanges and bindings, and then BANG!…the server fries faster than the griddle at McDonald's. All your queues, exchanges and bindings are still there, right? Oh geez…what about the messages in the queues you hadn't serviced yet?

Relax, providing you created everything with the default arguments, it’s all gone…poof…whoosh…nada…nil. That’s right, RabbitMQ rebooted as empty as a baby’s noggin. You gotta redo everything kemosabe. How do you keep this from happening in the future?

On your queues and your exchanges there’s a creation-time flag called “durable”. There’s only one thing durable means in AMQP-land…the queue or exchange marked durable will be re-created automatically on reboot. It does not mean the messages in the queues will survive the reboot. They won’t. So how do we make not only our config but messages persist through a reboot?

Well the first question is, do you really want your messages to persist? For a message to last through a reboot, it has to be written to disk, and even a simple checkpoint to disk takes time. If you value message routing speed more than the contents of the message, don’t make your messages persistent. That being said, for our particular needs @ DigiTar, persistence is important.

When you publish your message to an exchange, there’s a flag called “Delivery Mode”. Depending on the AMQP library you’re using there will be different ways of setting it (we’ll cover the Python library later). But the long and the short of it is you want the “Delivery Mode” set to the value 2, which means “persistent”. “Delivery Mode” usually (depending on your AMQP library) defaults to a value of 1, which means “non-persistent”. So the steps for persistent messaging are:

  1. Mark the exchange “durable”.
  2. Mark the queue “durable”.
  3. Set the message's "delivery mode" to a value of 2.
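The three steps can be written down as a tiny checklist function. It is only an illustration of the rule as stated above: `persistence_gaps` is a hypothetical helper that never talks to a broker, and the constants simply name the delivery-mode values 1 (non-persistent) and 2 (persistent) from the text.

```python
PERSISTENT = 2       # AMQP delivery mode meaning "persistent"
NON_PERSISTENT = 1   # the usual library default


def persistence_gaps(exchange_durable, queue_durable, delivery_mode):
    """List which of the three persistence steps are missing (empty = all set)."""
    gaps = []
    if not exchange_durable:
        gaps.append("exchange is not durable")
    if not queue_durable:
        gaps.append("queue is not durable")
    if delivery_mode != PERSISTENT:
        gaps.append("delivery_mode is %r, not 2" % (delivery_mode,))
    return gaps


print(persistence_gaps(True, True, PERSISTENT))      # []
print(persistence_gaps(True, True, NON_PERSISTENT))  # ['delivery_mode is 1, not 2']
```

The second call is the classic mistake: durable config, but messages published with the default delivery mode, so they vanish on restart.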

That’s it. Not really rocket science, but enough moving parts to make a mistake and send little Sally’s dental records into cyber-Nirvana.

There may be one thing nagging you though…what about the binding? We didn’t mark the binding “durable” when we created it. It’s alright. If you bind a durable queue to a durable exchange, RabbitMQ will automatically preserve the binding. Similarly, if you delete any exchange/queue (durable or not) any bindings that depend on it get deleted automatically.

Two things to be aware of:

  • RabbitMQ will not allow you to bind a non-durable exchange to a durable queue, or vice-versa. Both the exchange and the queue must be durable for the binding operation to succeed.
  • You cannot change the creation flags on a queue or exchange after you’ve created it. For example, if you create a queue as “non-durable”, and want to change it to “durable”, the only way to do this is to destroy the queue and re-create it. It’s a good reason to double check your declarations.
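The declaration rules (re-declaring an identical queue is a no-op, but creation flags can't change afterwards) can be captured in a small in-memory model. This is a hypothetical sketch: the `ToyBroker` class, its default flag values and the `DeclarationMismatch` exception are invented for illustration. A real broker signals a mismatched re-declaration as an error on the channel, which is modeled here as a plain Python exception.

```python
class DeclarationMismatch(Exception):
    """Raised when a queue is re-declared with different creation flags."""


class ToyBroker:
    """Hypothetical model of queue-declaration rules; not RabbitMQ's actual code."""

    def __init__(self):
        self.queues = {}  # queue name -> dict of creation flags

    def queue_declare(self, queue, durable=False, exclusive=False, auto_delete=False):
        flags = {"durable": durable, "exclusive": exclusive, "auto_delete": auto_delete}
        existing = self.queues.get(queue)
        if existing is None:
            self.queues[queue] = flags
            return "created"
        if existing == flags:
            return "noop"  # identical re-declaration: pat on the head, nothing changes
        # Flags are fixed at creation time, so a mismatched re-declare is refused.
        raise DeclarationMismatch("%s already exists with %r" % (queue, existing))

    def queue_delete(self, queue):
        # Deleting is the only way to "change" flags: delete, then declare again.
        self.queues.pop(queue, None)


broker = ToyBroker()
print(broker.queue_declare("po_box", durable=True))  # created
print(broker.queue_declare("po_box", durable=True))  # noop
try:
    broker.queue_declare("po_box", durable=False)
except DeclarationMismatch as err:
    print("refused:", err)
broker.queue_delete("po_box")
print(broker.queue_declare("po_box", durable=False))  # created
```

The no-op path is what lets every consumer safely declare its queues on startup; the refusal path is why it pays to double-check your flags before the first declaration.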

Food for snakes

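A real gap in AMQP documentation is how to use it from Python programs. For other languages there are plenty of references:

  • Java – http://www.rabbitmq.com/java-client.html
  • .NET – http://www.rabbitmq.com/releases/rabbitmq-dotnet-client/v1.5.0/rabbitmq-dotnet-client-1.5.0-user-guide.pdf
  • Ruby – http://somic.org/2008/06/24/ruby-amqp-rabbitmq-example/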

But for little old Python, you need to dig it out yourself. So other folks don’t have to wander in the wilderness like I did, here’s a little primer on using Python to do the AMQP-tasks we’ve talked about:

First, you’ll need a Python AMQP library…and there are two:

  • py-amqplib – General AMQP library
  • txAMQP – An AMQP library that uses the Twisted framework, thereby allowing asynchronous I/O.

Depending on your needs, py-amqplib or txAMQP may be more to your liking. Being Twisted-based, txAMQP holds the promise of building super performing AMQP consumers that use async I/O. But Twisted programming is a topic all its own…so we’re going to use py-amqplib for clarity’s sake. UPDATE: Please check the comments for example code showing use of txAMQP from Esteve Fernandez.

AMQP supports multiplexing multiple MQ communication channels over one TCP connection, where each channel is an independent communication stream used by your program. Every AMQP program has at least one connection and one channel:

from amqplib import client_0_8 as amqp
conn = amqp.Connection(host="localhost:5672", userid="guest",
                       password="guest", virtual_host="/", insist=False)
chan = conn.channel()

Each channel is assigned an integer channel number automatically by the .channel() method of the Connection() class. Alternatively, you can specify the channel number yourself by calling .channel(x), where x is the channel number you want. More often than not, it's a good idea to just let the .channel() method auto-assign the channel number to avoid collisions.

Now we've got a connection and channel to talk over. At this point, our code is going to diverge into two applications that use the same bit we've created so far: a consumer and a publisher. Let's create the consumer app by creating a queue named "po_box" and an exchange named "sorting_room":

chan.queue_declare(queue="po_box", durable=True,
                   exclusive=False, auto_delete=False)
chan.exchange_declare(exchange="sorting_room", type="direct", durable=True,
                      auto_delete=False)

What did that do? First, it created a queue called "po_box" that is durable (will be re-created on reboot) and will not be automatically deleted when the last consumer detaches from it (auto_delete=False). It's important to set auto_delete to false when making a queue (or exchange) durable; otherwise the queue itself will disappear when the last consumer detaches (regardless of the durable flag). Setting both durable and auto_delete to true would make a queue that would be recreated only if RabbitMQ died unexpectedly with consumers still attached.

(You may have noticed there’s another flag specified called “exclusive”. If set to true, only the consumer that creates the queue will be allowed to attach to it. It’s a queue that is private to the creating consumer.)

There’s also the exchange declaration for the “sorting_room” exchange. auto_delete and durable mean the same things as they do in a queue declaration. However, .exchange_declare() introduces an argument called type that defines what type of exchange you’re making (as described earlier): fanout, direct or topic.

At this point, you’ve got a queue to receive messages and an exchange to publish them to initially…but we need a binding to link the two together:

chan.queue_bind(queue="po_box", exchange="sorting_room",
                routing_key="jason")

The binding is pretty straightforward. Any message arriving at the "sorting_room" exchange with the routing key "jason" gets routed to the "po_box" queue.

Now, there are two methods of getting messages out of the queue. The first is to call chan.basic_get() to pull the next message off the queue. If there are no messages waiting on the queue, chan.basic_get() returns None, so check for None before touching msg.body, or the print below will blow up on an empty queue:

msg = chan.basic_get("po_box")
if msg is not None:  # basic_get returns None when the queue is empty
    print(msg.body)
    chan.basic_ack(msg.delivery_tag)
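If you want to empty the queue rather than grab a single message, you can loop on chan.basic_get() until it returns None, acknowledging as you go. The sketch below assumes `chan` is an amqplib channel (it only uses the basic_get and basic_ack calls shown above); `drain` is a hypothetical helper, and the `FakeChannel`/`FakeMessage` classes are invented stand-ins so the example runs without a RabbitMQ server.

```python
def drain(chan, queue):
    """Pull messages with basic_get until the queue is empty, acking each one.

    Returns the message bodies in the order they were received.
    """
    bodies = []
    while True:
        msg = chan.basic_get(queue)
        if msg is None:
            break  # nothing waiting: basic_get returned None
        bodies.append(msg.body)
        chan.basic_ack(msg.delivery_tag)
    return bodies


class FakeMessage:
    # Hypothetical stand-in for amqp.Message, carrying only what drain() reads.
    def __init__(self, body, delivery_tag):
        self.body = body
        self.delivery_tag = delivery_tag


class FakeChannel:
    # Hypothetical in-memory channel so the sketch runs without a broker.
    def __init__(self, bodies):
        self.pending = [FakeMessage(b, tag) for tag, b in enumerate(bodies, 1)]
        self.acked = []

    def basic_get(self, queue):
        return self.pending.pop(0) if self.pending else None

    def basic_ack(self, delivery_tag):
        self.acked.append(delivery_tag)


fake = FakeChannel(["first", "second"])
print(drain(fake, "po_box"))  # ['first', 'second']
print(fake.acked)             # [1, 2]
```

Polling like this is fine for batch jobs; for "tell me when something arrives" behavior, a consumer callback is the better fit.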

But what if you want your application to be notified as soon as a message is available for it? To do that, instead of chan.basic_get(), you need to register a callback for new messages using chan.basic_consume():

def recv_callback(msg):
    print('Received: ' + msg.body)
chan.basic_consume(queue='po_box', no_ack=True,
                   callback=recv_callback, consumer_tag="testtag")
while True:
    chan.wait()
chan.basic_cancel("testtag")

chan.wait() is looped infinitely, which is what causes the channel to wait for the next message notification from the queue. chan.basic_cancel() is how you unregister your message notification callback. The argument specifies the consumer_tag you specified in the original chan.basic_consume() registration (that’s how it figures out which callback to unregister). In this case chan.basic_cancel() never gets called due to the infinite loop that precedes it…but you need to know about it, so it’s in the snippet.
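If you do want chan.basic_cancel() to run, give the loop an exit condition and cancel in a `finally` block. The sketch below assumes `chan` behaves like an amqplib channel (basic_consume, wait and basic_cancel as used above); the stop-after-N rule, the `consume_n` helper and the `FakeConsumingChannel` stand-in are all hypothetical, the last one existing only so the example runs without a broker.

```python
def consume_n(chan, queue, limit, tag="testtag"):
    """Collect `limit` message bodies via basic_consume, then cancel the consumer."""
    received = []

    def recv_callback(msg):
        received.append(msg.body)

    chan.basic_consume(queue=queue, no_ack=True,
                       callback=recv_callback, consumer_tag=tag)
    try:
        while len(received) < limit:
            chan.wait()  # blocks until the next message is pushed to the callback
    finally:
        chan.basic_cancel(tag)  # now reachable: unregister the callback
    return received


class FakeConsumedMessage:
    def __init__(self, body):
        self.body = body


class FakeConsumingChannel:
    # Hypothetical stand-in: wait() hands the next queued body to each callback.
    def __init__(self, bodies):
        self.bodies = list(bodies)
        self.callbacks = {}
        self.cancelled = []

    def basic_consume(self, queue, no_ack, callback, consumer_tag):
        self.callbacks[consumer_tag] = callback

    def wait(self):
        body = self.bodies.pop(0)
        for callback in list(self.callbacks.values()):
            callback(FakeConsumedMessage(body))

    def basic_cancel(self, consumer_tag):
        self.callbacks.pop(consumer_tag, None)
        self.cancelled.append(consumer_tag)


fake = FakeConsumingChannel(["one", "two", "three"])
print(consume_n(fake, "po_box", 2))  # ['one', 'two']
print(fake.cancelled)                # ['testtag']
```

The `finally` also means the consumer gets cancelled if something inside the loop raises.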

The one additional thing you should pay attention to in the consumer is the no_ack argument. It's accepted on both chan.basic_get() and chan.basic_consume() and defaults to false. When you grab a message off a queue, RabbitMQ needs you to explicitly acknowledge that you have it. If you don't, RabbitMQ will re-assign the message to another consumer on the queue after a timeout interval (or on disconnect by the consumer that initially received it without ack'ing it). If you set the no_ack argument to true, then py-amqplib will add a "no_ack" property to your AMQP request for the next message. That will instruct the AMQP server to not expect an acknowledgement for that get/consume. However, in most cases, you probably want to send the acknowledgement yourself (e.g. you need to put the message contents in a database before you acknowledge). Acknowledgements are done by calling the chan.basic_ack() method, using the delivery_tag property of the message you're acknowledging as the argument (see the chan.basic_get() code snippet above for an example).
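To see why acknowledgements matter, here is a toy model of the bookkeeping: a message handed out without no_ack stays "unacked" until it is acknowledged, and if the consumer holding it disconnects first, it goes back on the queue for someone else. This is a hypothetical illustration (`ToyAckQueue` and its methods are invented) and it models only the disconnect case, not any timeout.

```python
from collections import deque
from itertools import count


class ToyAckQueue:
    """Hypothetical model: unacked messages are requeued when their consumer leaves."""

    def __init__(self, bodies):
        self.ready = deque(bodies)
        self.unacked = {}      # delivery_tag -> (consumer, body)
        self._tags = count(1)  # delivery tags are handed out 1, 2, 3, ...

    def get(self, consumer, no_ack=False):
        if not self.ready:
            return None
        body = self.ready.popleft()
        tag = next(self._tags)
        if not no_ack:
            self.unacked[tag] = (consumer, body)  # must be acked later
        return tag, body

    def ack(self, delivery_tag):
        self.unacked.pop(delivery_tag)

    def disconnect(self, consumer):
        # Push this consumer's unacked bodies back to the front, oldest first.
        mine = sorted(t for t, (c, _) in self.unacked.items() if c == consumer)
        for tag in reversed(mine):
            self.ready.appendleft(self.unacked.pop(tag)[1])


q = ToyAckQueue(["dental records", "invoice"])
tag, body = q.get("consumer-a")
q.ack(tag)                  # stored safely, so acknowledge it
q.get("consumer-a")         # "invoice" handed out but never acknowledged...
q.disconnect("consumer-a")  # ...so it goes back on the queue
print(q.get("consumer-b"))  # (3, 'invoice')
```

With no_ack=True the second message would simply have been lost when consumer-a went away, which is the trade-off described above.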

That’s all she wrote for the consumer. (Download: amqp_consumer.py)

But what good is a consumer, if nobody is sending it messages? So you need a publisher. The code below will publish a simple message to the “sorting_room” exchange and mark it with the routing key “jason”:

msg = amqp.Message("Test message!")
msg.properties["delivery_mode"] = 2
chan.basic_publish(msg,exchange="sorting_room",routing_key="jason")

You may notice that we set the delivery_mode element of the message’s properties to “2”. Since the queue and exchange were marked durable, this will ensure the message is sent as persistent (i.e. will survive a reboot of RabbitMQ while it is in transit to the consumer).

The only other thing we need to do (and this needs to be done on both consumer and publisher apps) is close the channel and connection:

chan.close()
conn.close()

Pretty simple, no? (Download: amqp_publisher.py)

Giving it a shot…

Now we’ve written our consumer and publisher, so let’s give it a go. (This assumes you have RabbitMQ installed and running on localhost.)

Open up the first terminal, and run python ./amqp_consumer.py to get the consumer running and to create your queues, exchanges and bindings.

Then run python ./amqp_publisher.py "AMQP rocks." in a second terminal. If everything went well, you should see your message printed by the consumer on the first terminal.

Taking it all in

I realize this has been a really fast run through AMQP/RabbitMQ and using it from Python. Hopefully, it will fill in some of the holes of how all the concepts fit together and how they get used in a real Python program. If you find any errors in my write-up, I'd very much appreciate it if you'd please let me know (williamsjj@digitar.com). Similarly, I'd be happy to answer any questions that I can. Next up…clustering! But I've got to figure it out first. :-)

NB: Special thanks to Barry Pederson and Gordon Sims for correcting my understanding of no_ack’s operation and for catching syntactically incorrect Python code I missed.

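NB: My knowledge on the subject was distilled from these sources, which are excellent further reading:

  • zeromq: Message-oriented Middleware Analysis
  • RabbitMQ .NET Client Library User Guide
  • Advanced Message Queuing Protocol: Protocol Specification, version 0.8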

Comments (80)

The only way to uniquely tag a message end-to-end is to put the ID in the message payload. There is no way to retrieve a specific message by ID. You can only consume the next message, if its not the one you want, refuse it so it will be re-queued for someone else and try again.
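A minimal sketch of that advice in Python, assuming a JSON envelope (the `id`/`data` field names and the `wrap_payload`/`unwrap_payload` helpers are hypothetical): the producer generates a UUID and ships it inside the body, so any consumer can read it back end-to-end.

```python
import json
import uuid


def wrap_payload(data):
    """Embed a freshly generated unique ID alongside the data in the message body."""
    return json.dumps({"id": str(uuid.uuid4()), "data": data})


def unwrap_payload(body):
    """Return (message_id, data) from a body produced by wrap_payload."""
    envelope = json.loads(body)
    return envelope["id"], envelope["data"]


body = wrap_payload({"to": "po_box", "text": "AMQP rocks."})
msg_id, data = unwrap_payload(body)
print(len(msg_id), data["text"])  # 36 AMQP rocks.
```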

Hi,
First of all – Thanks for this great article.
I have a Django project with Orbited and stomp.py
It works just fine. But as far as I know, for production we should use RabbitMQ. I can't find any working example for stomp.py and RabbitMQ.
Can anyone suggest something?
Best regards
Miro

[...] Asynchronous task queuing and control. [...]

After a bit of googling, I think the broken link to “zeromq: Message-oriented Middleware Analysis” at the end of the article should maybe point to this whitepaper: http://www.zeromq.org/whitepapers:amqp-analysis

[...] for our use cases; therefore we investigated the AMQP servers much more heavily. Here’s a decent example of how to apply AMQP to a particular use case (with emphasis on [...]

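[...] RabbitMQ, an industrial-grade message queue server, recommends a blog post in the Python section of its client documentation list, and it could not be a better introduction to RabbitMQ + Python. However, as its title Rabbit and Warrens suggests, the English is written quite playfully, so for non-native readers like me it is not as easy to follow as ordinary technical documentation. So I translated it, in the hope that others can spend a little less time. My translation skills are limited and cannot match the playfulness of the original; some passages are translated freely, with ease of understanding as the goal. If you want to enjoy the author's humor, I recommend reading the original; it really isn't that hard to understand… [...]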

Hi Miroslav. Unfortunately, I don’t have any real experience with STOMP. We use AMQP libraries everywhere. Now mostly txAMQP and Pika.

Thanks for the great article.

I think that you might want to check out celeryproject.org. It binds RabbitMQ with Python in a nice, simple way. My company has been using it for some time and we are very happy with how it works.

It's a really good article about RabbitMQ. We are going to use it as our message bus.

@Jason: don't you think putting the ID in the message payload would be an overhead, especially when we are talking about a monstrous project?

Thank you very much for sharing this. It saved me days of going through the docs and figuring out the quirkiness myself.

I am still deciding between beanstalkd and RabbitMQ. For managing our email queues with rate control, I feel that RabbitMQ could be overkill. Hope you could shed some light on this. Also, I am looking at some way to aggregate logs from several application and web server instances. This [messaging] could be a great way to do so. I believe there must be an existing solution already out there which I haven't heard of…yet.

Thanks again, mate. Wherever you are on the planet, I owe you a big one.

[...] middleware. Jason Williams describes what it is and what it can be used [...]

[...] The English original is here: Rabbits and warrens. [...]

Have a look at: http://pypi.python.org/pypi/kombu/

It's another AMQP library for Python. It looks pretty decent, and the docs aren't half bad either.

I would suggest Celery as a good Python library to use with RabbitMQ (or even some other AMQP Broker): http://celeryproject.org/

Thank you for shedding light on AMQP and RabbitMQ. I have grasped the concepts well. I want to do asynchronous communication using RabbitMQ. Does Celery provide that? I am using Pika.

Again, thanks for your help.

Technically, all AMQP is asynchronous. When you publish, there is no synchronous response from the broker indicating acceptance or delivery (you can use publisher confirms to do this asynchronously). Once you publish you can move on to the next task in your program. Consumption is also asynchronous, though clients typically engineer consumption as a loop to wait for the next message. You can choose a client that uses an event-oriented programming pattern and will therefore allow your single-threaded program to do other things while it waits for messages to arrive. txAMQP and the async connection class in Pika both accomplish this. Celery is an entirely different animal. It is a way for your programs to create and ship tasks that can be executed remotely by celery clients (excellent for batch-oriented workloads).

Glad it helped! If you need more detail, our Rabbit book is now content complete: http://manning.com/videla

No. Unique IDs are used all the time in high volume applications. Particularly in our own.

Hi Jason, I have a RabbitMQ server running on my system and I want to connect to it.
I'm able to connect and create a channel, but I don't know how to find parameters like the queue and exchange for a specific process.
Is there a command line tool that displays all the information about the Rabbit server and the processes bound to it?

Hope you can help me.
Stefano

Could you provide a link for this update:

UPDATE: Please check the comments for example code showing use of txAMQP from Esteve Fernandez.

Tnx,

Jelle

“ZeroMQ and RabbitMQ both support an open messaging protocol called AMQP.”

ZeroMQ is NOT an AMQP implementation. It’s a transport agnostic socket interface, among other things, with a few overlapping functionalities with AMQP. The most significant difference is that ZeroMQ is a broker-less system. RabbitMQ and ZeroMQ are just different and ZeroMQ has little to nothing to do with AMQP.

See here: http://www.zeromq.org/docs:welcome-from-amqp

Please correct the article, it might confuse people who are just starting out. This is a deep rabbit hole (excuse the pun.)

Actually, when this was written ZeroMQ did support AMQP. I understand it’s been dropped from support since. I’ll look at marking it as a historical comment.

Ha, did not know that. Thanks!

Yeah, when iMatix first released ZMQ it was after they’d had a spitting match with the AMQP Working Group. iMatix wrote the first AMQP broker implementations, and there was a spat between iMatix on one-side and RedHat/JPMorgan on the other over what became the abomination that is AMQP 1.0. iMatix decided to pick up their marbles and go home…and ZMQ showed up. Honestly, I wish they’d stayed in the AMQP WG because they were right about 1.0…it’s a cluster. AMQP 0.9.1 is great and should have been labelled 1.0.

Use ‘rabbitmqctl list_connections’ to see who’s connected.
