Posted by Jason | Posted in Software Development, Technology | Posted on 01-13-2009
Tags: amqp, py-amqplib, rabbitmq
The goal was simple enough: decouple a particular type of analysis out-of-band from mainstream e-mail processing. We started down the MySQL road…put the things to be digested into a table…consume them in another daemon…bada bing bada boom. But pretty soon, complex ugliness crept into the design phase… You want to have multiple daemons servicing the queue?…no problem we’ll just hard code node numbers…what? you want dynamic load re-assignment when daemons join and die?
You get the idea…what was supposed to be simple (decouple something) was spinning its own Gordian knot. It seemed like a good time to see if every problem was looking like a nail (table), because all we had were hammers (MySQL).
A short search later, and we entered the world of message queueing. No, no…we know obviously what a message queue is. Heck, we do e-mail for a living. We’ve implemented all sorts of specialized, high-speed, in-memory queues for e-mail processing. What we weren’t aware of was the family of off-the-shelf, generalized, message queueing (MQ) servers…a language-agnostic, no-assembly required way to wire routing between applications over a network. A message queue we didn’t have to write ourselves? Hold your tongue.
Open up your queue…
Cutting to the chase, over the last 4 years there has been no shortage of open-source message queueing servers written. Most of them are one-offs by folks like LiveJournal to scratch a particular itch. Yeah, they don’t really care what kind of messages they carry, but their design parameters are usually creator-specific (and message persistence after a crash usually isn’t one of them). However, there are three in particular that are designed to be highly flexible message queues for their own sake:
Apache ActiveMQ gets the most press, but it appears to have some issues not losing messages. Next.
ZeroMQ and RabbitMQ both support an open messaging protocol called AMQP. The advantage to AMQP is that it’s designed to be a highly-robust and open alternative to the two commercial message queues out there (IBM and Tibco). Muy bueno. However, ZeroMQ doesn’t support message persistence across crashes or reboots. No muy bueno. That leaves us with RabbitMQ. (That being said, if you don’t need persistence, ZeroMQ is pretty darn interesting…incredibly low latency and flexible topologies).
That leaves us with the carrot muncher…
RabbitMQ pretty much sold me the minute I read “written in Erlang”. Erlang is a highly parallel programming language developed over at Ericsson for running telco switches…yeah the kind with six bazillion 9s of uptime. In Erlang, it’s supposedly trivial to spin off processes and then communicate between them using message passing. Seems like the ideal underpinning for a message queue, no?
Also, RabbitMQ supports persistence. Yes Virginia, if your RabbitMQ dies, your messages don’t have to die an unwitting death…they can be reborn in your queues on reboot. Oh…and as is always desired @ DigiTar, it plays nicely with Python. All that being said, RabbitMQ’s documentation is, well…horrible. Lemme rephrase, if you already understand AMQP, the docs are fine. But how many folks know AMQP? It’d be like MySQL docs assuming you knew some form of SQL…er…nevermind.
So, without further ado…here is a reduction of a week’s worth of reading up on AMQP and how it works in RabbitMQ…and how to play with it in Python:
There are four building blocks you really care about in AMQP: virtual hosts, exchanges, queues and bindings. A virtual host holds a bundle of exchanges, queues and bindings. Why would you want multiple virtual hosts? Easy. A username in RabbitMQ grants you access to a virtual host…in its entirety. So the only way to keep group A from accessing group B’s exchanges/queues/bindings/etc. is to create a virtual host for A and one for B. Every RabbitMQ server has a default virtual host named “/”. If that’s all you need, you’re ready to roll.
Exchanges, Queues and bindings…oh my!
Here’s where my railcar went off the tracks initially. How do all the parts thread together?
Queues are where your “messages” end up. They’re message buckets…and your messages sit there until a client (a.k.a. consumer) connects to the queue and siphons them off. However, you can configure a queue so that if there isn’t a consumer ready to accept the message when it hits the queue, the message goes poof. But we digress…
The important thing to remember is that queues are created programmatically by your consumers (not via a configuration file or command line program). That’s OK, because if a consumer app tries to “create” a queue that already exists, RabbitMQ pats it on the head, smiles gently and NOOPs the request. So you can keep your MQ configuration in-line with your app code…what a concept.
OK, so you’ve created and attached to your queue, and your consumer app is drumming its fingers waiting for a message…and drumming…and drumming…but alas no message. What happened? Well you gotta pump a message in first! But to do that you’ve got to have an exchange…
Exchanges are routers with routing tables. That’s it. End stop. Every message has what’s known as a “routing key”, which is simply a string. The exchange has a list of bindings (routes) that say, for example, messages with routing key “X” go to queue “timbuktu”. But we get slightly ahead of ourselves.
Your consumer application should create your exchanges (plural). Wait? You mean you can have more than one exchange? Yes, you can, but why? Easy. Different exchanges can be different types and carry different sets of bindings, so you can keep unrelated message flows (and their routing rules) apart. One thing extra exchanges won’t buy you is CPU parallelism: in RabbitMQ an exchange is just a name and a routing table, not a process of its own. It’s the queues that each run as their own Erlang process, so routing capacity scales with your queues and channels rather than with the number of exchanges. Similarly, in a RabbitMQ cluster, it’s where the queues live (not the exchanges) that spreads the work across the cluster members.
OK, so you’ve created an exchange…but it doesn’t know what queues the messages go in. You need “routing rules” (bindings). A binding essentially says things like this: put messages that show up in exchange “desert” and have routing key “ali-baba” into the queue “hideout”. In other words, a binding is a routing rule that links an exchange to a queue based on a routing key. It is possible for two binding rules to use the same routing key. For example, maybe messages with the routing key “audit” need to go both to the “log-forever” queue and the “alert-the-big-dude” queue. To accomplish this, just create two binding rules (each one linking the exchange to one of the queues) that both trigger on routing key “audit”. In this case, the exchange duplicates the message and sends it to both queues. Exchanges are just routing tables containing bindings.
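To make the "routing table" idea concrete, here is a minimal sketch of a direct-style exchange as plain Python data structures. It is a toy model, not RabbitMQ or py-amqplib code: the DirectExchange class and its bind/publish methods are made up for illustration, and the queue and routing key names are borrowed from the examples above.

```python
from collections import defaultdict


class DirectExchange(object):
    """Toy in-memory stand-in for an exchange: a routing table of bindings."""

    def __init__(self, name):
        self.name = name
        # routing key -> list of queue names bound with that key
        self.bindings = defaultdict(list)

    def bind(self, queue_name, routing_key):
        # Adding the same binding twice changes nothing
        if queue_name not in self.bindings[routing_key]:
            self.bindings[routing_key].append(queue_name)

    def publish(self, queues, routing_key, body):
        """Copy body into every queue bound with exactly this routing key."""
        matched = self.bindings.get(routing_key, [])
        for queue_name in matched:
            queues[queue_name].append(body)
        return list(matched)


queues = {"log-forever": [], "alert-the-big-dude": [], "hideout": []}
desert = DirectExchange("desert")
desert.bind("hideout", "ali-baba")
desert.bind("log-forever", "audit")
desert.bind("alert-the-big-dude", "audit")

# Two bindings share the key "audit", so the message is duplicated
delivered_to = desert.publish(queues, "audit", "ledger mismatch")
# No binding uses this key, so the message is routed nowhere
unrouted = desert.publish(queues, "open-sesame", "nobody bound this key")
```

The point of the sketch is that the exchange never stores messages itself. It only looks up which queues are bound with the key and hands each one a copy.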
Now for the curveball: there are multiple types of exchanges. They all do routing, but they accept different styles of binding “rules”. Why not just create one type of exchange for all styles of rules? Because each rule style has a different CPU cost for analyzing if a message matches the rule. For example, a “topic” exchange tries to match a message’s routing key against a pattern like “dogs.*”. Matching that wildcard on the end takes more CPU than simply seeing if the routing key is “dogs” or not (e.g. a “direct” exchange). If you don’t need the extra flexibility of a “topic” exchange, you can get more messages/sec routed if you choose the “direct” exchange type. So what are the types and how do they route?
Fanout Exchange – No routing keys involved. You simply bind a queue to the exchange. Any message that is sent to the exchange is sent to all queues bound to that exchange. Think of it like a subnet broadcast. Any host on the subnet gets a copy of the packet. Fanout exchanges route messages the fastest.
Direct Exchange – Routing keys are involved. A queue binds to the exchange to request messages that match a particular routing key exactly. This is a straight match. If a queue binds to the exchange requesting messages with routing key “dog”, only messages labelled “dog” get sent to that queue (not “dog.puppy”, not “dog.guard”…only “dog”).
Topic Exchange – Matches routing keys against a pattern. Instead of binding with a particular routing key, the queue binds with a pattern string. Routing keys are treated as words separated by dots. The symbol # matches zero or more words, and the symbol * matches any single word (no more, no less). So “audit.#” would match “audit.irs.corporate”, but “audit.*” would only match “audit.irs”. Our friends at RedHat have put together a great image to express how topic exchanges work:
Source: RabbitMQ in Action (by me and a very cool Uruguayan dude…Mr. Alvaro)
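If the picture doesn't do it for you, the topic matching rules can be sketched in a few lines of Python. This is a hypothetical helper (topic_matches is not part of RabbitMQ or py-amqplib), and it assumes the AMQP 0-9-1 wording of the rules: words are separated by dots, * stands for exactly one word, and # stands for zero or more words.

```python
def topic_matches(pattern, routing_key):
    """Return True if a dot-delimited routing key matches a topic pattern."""
    p_words = pattern.split(".")
    k_words = routing_key.split(".")

    def match(pi, ki):
        if pi == len(p_words):
            # Pattern used up: it's a match only if the key is used up too
            return ki == len(k_words)
        if p_words[pi] == "#":
            # '#' may swallow zero words, or one word and then try again
            return match(pi + 1, ki) or (ki < len(k_words) and match(pi, ki + 1))
        if ki == len(k_words):
            return False
        if p_words[pi] == "*" or p_words[pi] == k_words[ki]:
            return match(pi + 1, ki + 1)
        return False

    return match(0, 0)


matches_hash = topic_matches("audit.#", "audit.irs.corporate")
matches_star = topic_matches("audit.*", "audit.irs")
star_too_long = topic_matches("audit.*", "audit.irs.corporate")
```

A real broker presumably does something smarter than naive recursion, but the extra work per message compared to a plain string comparison is the reason a topic exchange costs more CPU than a direct one.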
Persistent little bugger…
You spend all that time creating your queues, exchanges and bindings, and then BANG!…the server fries faster than the griddle at McDonald’s. All your queues, exchanges and bindings are there right? Oh geez…what about the messages in the queues you hadn’t serviced yet?
Relax, providing you created everything with the default arguments, it’s all gone…poof…whoosh…nada…nil. That’s right, RabbitMQ rebooted as empty as a baby’s noggin. You gotta redo everything kemosabe. How do you keep this from happening in the future?
On your queues and your exchanges there’s a creation-time flag called “durable”. There’s only one thing durable means in AMQP-land…the queue or exchange marked durable will be re-created automatically on reboot. It does not mean the messages in the queues will survive the reboot. They won’t. So how do we make not only our config but messages persist through a reboot?
Well the first question is, do you really want your messages to persist? For a message to last through a reboot, it has to be written to disk, and even a simple checkpoint to disk takes time. If you value message routing speed more than the contents of the message, don’t make your messages persistent. That being said, for our particular needs @ DigiTar, persistence is important.
When you publish your message to an exchange, there’s a flag called “Delivery Mode”. Depending on the AMQP library you’re using there will be different ways of setting it (we’ll cover the Python library later). But the long and the short of it is you want the “Delivery Mode” set to the value 2, which means “persistent”. “Delivery Mode” usually (depending on your AMQP library) defaults to a value of 1, which means “non-persistent”. So the steps for persistent messaging are:
- Mark the exchange “durable”.
- Mark the queue “durable”.
- Set the message’s “delivery mode” to a value of 2
That’s it. Not really rocket science, but enough moving parts to make a mistake and send little Sally’s dental records into cyber-Nirvana.
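As a sanity check on those rules, here is a toy simulation of what comes back after a restart. It is only a model of the behavior described above, with a made-up restart() function and plain dictionaries instead of a broker. Exchanges are left out to keep it short; the same durable flag applies to them.

```python
def restart(queues):
    """Simulate a broker restart on a toy in-memory model.

    queues maps queue name -> {"durable": bool,
                               "messages": [(body, delivery_mode), ...]}
    Returns the state that would come back after the restart.
    """
    survivors = {}
    for name, queue in queues.items():
        if not queue["durable"]:
            continue  # non-durable queue: gone, along with everything in it
        # Durable queue is re-created, but only persistent messages return
        kept = [(body, mode) for body, mode in queue["messages"] if mode == 2]
        survivors[name] = {"durable": True, "messages": kept}
    return survivors


before = {
    "po_box": {"durable": True,
               "messages": [("invoice", 2), ("newsletter", 1)]},
    "scratch": {"durable": False,
                "messages": [("dental records", 2)]},
}
after = restart(before)
```

Note how little Sally's dental records vanish even though the message itself was marked persistent: delivery mode 2 doesn't help if the queue holding the message isn't durable.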
There may be one thing nagging you though…what about the binding? We didn’t mark the binding “durable” when we created it. It’s alright. If you bind a durable queue to a durable exchange, RabbitMQ will automatically preserve the binding. Similarly, if you delete any exchange/queue (durable or not) any bindings that depend on it get deleted automatically.
Two things to be aware of:
- RabbitMQ will not allow you to bind a non-durable exchange to a durable queue, or vice-versa. Both the exchange and the queue must be durable for the binding operation to succeed.
- You cannot change the creation flags on a queue or exchange after you’ve created it. For example, if you create a queue as “non-durable”, and want to change it to “durable”, the only way to do this is to destroy the queue and re-create it. It’s a good reason to double check your declarations.
Food for snakes
One real gap in AMQP coverage is how to use it from Python programs. For other languages there are plenty of references:
- Java – http://www.rabbitmq.com/java-client.html
- .NET – http://www.rabbitmq.com/releases/rabbitmq-dotnet-client/v1.5.0/rabbitmq-dotnet-client-1.5.0-user-guide.pdf
- Ruby – http://somic.org/2008/06/24/ruby-amqp-rabbitmq-example/
But for little old Python, you need to dig it out yourself. So other folks don’t have to wander in the wilderness like I did, here’s a little primer on using Python to do the AMQP-tasks we’ve talked about:
First, you’ll need a Python AMQP library…and there are two:
- py-amqplib – General AMQP library
- txAMQP – An AMQP library that uses the Twisted framework, thereby allowing asynchronous I/O.
Depending on your needs, py-amqplib or txAMQP may be more to your liking. Being Twisted-based, txAMQP holds the promise of building super performing AMQP consumers that use async I/O. But Twisted programming is a topic all its own…so we’re going to use py-amqplib for clarity’s sake. UPDATE: Please check the comments for example code showing use of txAMQP from Esteve Fernandez.
AMQP supports pipelining multiple MQ communication channels over one TCP connection, where each channel is a communication stream used by your program. Every AMQP program has at least one connection and one channel:
    from amqplib import client_0_8 as amqp

    conn = amqp.Connection(host="localhost:5672", userid="guest", password="guest",
                           virtual_host="/", insist=False)
    chan = conn.channel()

Each channel is assigned an integer channel number automatically by the .channel() method of the Connection() class. Alternately, you can specify the channel number yourself by calling .channel(x), where x is the channel number you want. More often than not, it’s a good idea to just let the .channel() method auto-assign the channel number to avoid collisions.
Now we’ve got a connection and channel to talk over. At this point, our code is going to diverge into two applications that use that same bit we’ve created so far: a consumer and the publisher. Let’s create the consumer app by creating a queue named “po_box” and an exchange named “sorting_room”:
    chan.queue_declare(queue="po_box", durable=True, exclusive=False, auto_delete=False)
    chan.exchange_declare(exchange="sorting_room", type="direct", durable=True, auto_delete=False)

What did that do? First, it created a queue called “po_box” that is durable (will be re-created on reboot) and will not be automatically deleted when the last consumer detaches from it (auto_delete=False). It’s important to set auto_delete to false when making a queue (or exchange) durable, otherwise the queue itself will disappear when the last consumer detaches (regardless of the durable flag). Setting both durable and auto_delete to true would make a queue that would be recreated only if RabbitMQ died unexpectedly with consumers still attached.
(You may have noticed there’s another flag specified called “exclusive”. If set to true, only the consumer that creates the queue will be allowed to attach to it. It’s a queue that is private to the creating consumer.)
There’s also the exchange declaration for the “sorting_room” exchange. auto_delete and durable mean the same things as they do in a queue declaration. However, .exchange_declare() introduces an argument called type that defines what type of exchange you’re making (as described earlier): fanout, direct or topic.
At this point, you’ve got a queue to receive messages and an exchange to publish them to initially…but we need a binding to link the two together:
chan.queue_bind(queue="po_box", exchange="sorting_room", routing_key="jason")
The binding is pretty straightforward. Any messages arriving at the “sorting_room” exchange with the routing key “jason” get routed to the “po_box” queue.
Now, there are two methods of getting messages out of the queue. The first is to call chan.basic_get() to pull the next message off the queue (if there are no messages waiting on the queue, chan.basic_get() will return a None object…thereby blowing up any code that touches msg.body, like the print below, if not trapped):
    msg = chan.basic_get("po_box")
    if msg is not None:  # basic_get() returns None when the queue is empty
        print(msg.body)
        chan.basic_ack(msg.delivery_tag)
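If you want to empty the queue rather than grab a single message, the same calls can be wrapped in a loop. The sketch below is one way to do it, not something from py-amqplib itself: drain_queue is a made-up helper, and FakeChannel and FakeMessage are hypothetical stand-ins so the snippet can be tried without a broker. With a real connection you would pass in the chan object created earlier.

```python
def drain_queue(chan, queue_name, handle):
    """Pull messages with basic_get() until the queue is empty.

    Returns the number of messages handled and acknowledged.
    """
    handled = 0
    while True:
        msg = chan.basic_get(queue_name)
        if msg is None:  # queue is empty, nothing to handle or ack
            return handled
        handle(msg.body)
        chan.basic_ack(msg.delivery_tag)  # ack only after handling succeeded
        handled += 1


class FakeMessage(object):
    """Hypothetical stand-in exposing the two attributes used above."""

    def __init__(self, body, delivery_tag):
        self.body = body
        self.delivery_tag = delivery_tag


class FakeChannel(object):
    """Hypothetical stand-in for a channel, for trying this offline."""

    def __init__(self, bodies):
        self.pending = [FakeMessage(b, i + 1) for i, b in enumerate(bodies)]
        self.acked = []

    def basic_get(self, queue):
        return self.pending.pop(0) if self.pending else None

    def basic_ack(self, delivery_tag):
        self.acked.append(delivery_tag)


seen = []
fake = FakeChannel(["first letter", "second letter"])
count = drain_queue(fake, "po_box", seen.append)
```

Acknowledging after handle() returns means a crash in the handler leaves the message unacknowledged instead of silently losing it.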
But what if you want your application to be notified as soon as a message is available for it? To do that, instead of chan.basic_get(), you need to register a callback for new messages using chan.basic_consume():
    def recv_callback(msg):
        print('Received: ' + msg.body)

    chan.basic_consume(queue='po_box', no_ack=True, callback=recv_callback, consumer_tag="testtag")
    while True:
        chan.wait()
    chan.basic_cancel("testtag")
chan.wait() is looped infinitely, which is what causes the channel to wait for the next message notification from the queue. chan.basic_cancel() is how you unregister your message notification callback. The argument specifies the consumer_tag you specified in the original chan.basic_consume() registration (that’s how it figures out which callback to unregister). In this case chan.basic_cancel() never gets called due to the infinite loop that precedes it…but you need to know about it, so it’s in the snippet.
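If you'd rather have a consumer that actually reaches chan.basic_cancel(), one option is to bound the loop and put the cancel in a finally block. The sketch below assumes only the three channel calls shown above (basic_consume, wait, basic_cancel). The consume_some helper and the FakeChannel and FakeMessage classes are hypothetical, there so the example runs without a broker.

```python
def consume_some(chan, queue_name, consumer_tag, on_body, max_messages):
    """Register a callback, wait for max_messages deliveries, then unregister."""
    received = []

    def recv_callback(msg):
        received.append(msg.body)
        on_body(msg.body)

    chan.basic_consume(queue=queue_name, no_ack=True,
                       callback=recv_callback, consumer_tag=consumer_tag)
    try:
        while len(received) < max_messages:
            chan.wait()  # blocks until the next delivery arrives
    finally:
        chan.basic_cancel(consumer_tag)  # always unregister the callback
    return received


class FakeMessage(object):
    def __init__(self, body):
        self.body = body


class FakeChannel(object):
    """Hypothetical stand-in: each wait() delivers one pending message."""

    def __init__(self, bodies):
        self.pending = list(bodies)
        self.callbacks = {}
        self.cancelled = []

    def basic_consume(self, queue, no_ack, callback, consumer_tag):
        self.callbacks[consumer_tag] = callback

    def wait(self):
        body = self.pending.pop(0)
        for callback in list(self.callbacks.values()):
            callback(FakeMessage(body))

    def basic_cancel(self, consumer_tag):
        self.callbacks.pop(consumer_tag, None)
        self.cancelled.append(consumer_tag)


printed = []
fake = FakeChannel(["a", "b", "c"])
got = consume_some(fake, "po_box", "testtag", printed.append, 2)
```

A long-running daemon would more likely loop until a shutdown signal than until a message count, but the try/finally shape stays the same.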
The one additional thing you should pay attention to in the consumer is the no_ack argument. It’s accepted on both chan.basic_get() and chan.basic_consume() and defaults to false. When you grab a message off a queue, RabbitMQ needs you to explicitly acknowledge that you have it. If you don’t, RabbitMQ will re-assign the message to another consumer on the queue after a timeout interval (or on disconnect by the consumer that initially received it without ack’ing it). If you set the no_ack argument to true, then py-amqplib will add a “no_ack” property to your AMQP request for the next message. That will instruct the AMQP server to not expect an acknowledgement for that get/consume. However, in most cases, you probably want to send the acknowledgement yourself (e.g. you need to put the message contents in a database before you acknowledge). Acknowledgements are done by calling the chan.basic_ack() method, using the delivery_tag property of the message you’re acknowledging as the argument (see the chan.basic_get() code snippet above for an example).
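To see why the acknowledgement matters, here is a toy model of the bookkeeping a broker has to do. It is an illustration only: ToyQueue and its methods are invented for this sketch and simplify things (for instance, real delivery tags are scoped to a channel). It covers the disconnect case described above.

```python
class ToyQueue(object):
    """Toy model of how a broker tracks unacknowledged deliveries."""

    def __init__(self, bodies):
        self.ready = list(bodies)  # waiting to be delivered
        self.unacked = {}          # delivery_tag -> body, delivered but not ack'd
        self.next_tag = 1

    def get(self, no_ack=False):
        if not self.ready:
            return None
        body = self.ready.pop(0)
        tag = self.next_tag
        self.next_tag += 1
        if not no_ack:
            self.unacked[tag] = body  # broker keeps it until the ack arrives
        return tag, body

    def ack(self, tag):
        del self.unacked[tag]  # now the broker can forget the message

    def consumer_disconnected(self):
        # Anything delivered but never ack'd goes back on the queue
        requeued = [self.unacked[tag] for tag in sorted(self.unacked)]
        self.ready = requeued + self.ready
        self.unacked = {}


queue = ToyQueue(["m1", "m2", "m3"])
tag1, body1 = queue.get()             # delivered, awaiting ack
queue.ack(tag1)                       # m1 is done for good
tag2, body2 = queue.get()             # delivered, never ack'd
tag3, body3 = queue.get(no_ack=True)  # broker forgets m3 immediately
queue.consumer_disconnected()         # m2 comes back, m3 does not
```

With no_ack set, a consumer that dies halfway through processing m3 takes the message down with it, which is why acknowledging manually is the safer default when the message contents matter.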
That’s all she wrote for the consumer. (Download: amqp_consumer.py)
But what good is a consumer, if nobody is sending it messages? So you need a publisher. The code below will publish a simple message to the “sorting_room” exchange and mark it with the routing key “jason”:
    msg = amqp.Message("Test message!")
    msg.properties["delivery_mode"] = 2
    chan.basic_publish(msg, exchange="sorting_room", routing_key="jason")
You may notice that we set the delivery_mode element of the message’s properties to “2”. Since the queue and exchange were marked durable, this will ensure the message is sent as persistent (i.e. will survive a reboot of RabbitMQ while it is in transit to the consumer).
The only other thing we need to do (and this needs to be done on both consumer and publisher apps), is close the channel and connection:

    chan.close()
    conn.close()
Pretty simple, no? (Download: amqp_publisher.py)
Giving it a shot…
Now we’ve written our consumer and publisher, so let’s give it a go. (This assumes you have RabbitMQ installed and running on localhost.)
Open up the first terminal, and run python ./amqp_consumer.py to get the consumer running and to create your queues, exchanges and bindings.
Then run python ./amqp_publisher.py "AMQP rocks." in a second terminal. If everything went well, you should see your message printed by the consumer on the first terminal.
Taking it all in
I realize this has been a really fast run through AMQP/RabbitMQ and using it from Python. Hopefully, it will fill in some of the holes of how all the concepts fit together and how they get used in a real Python program. If you find any errors in my write-up, I’d very much appreciate it if you’d please let me know (firstname.lastname@example.org). Similarly, I’d be happy to answer any questions that I can. Next up….clustering! But I’ve got to figure it out first.
NB: Special thanks to Barry Pederson and Gordon Sims for correcting my understanding of no_ack’s operation and for catching syntactically incorrect Python code I missed.
NB: My knowledge on the subject was distilled from these sources, which are excellent further reading: