Riak vs. CouchDB for storing 100,000+ coupons…


Posted by Jason | Posted in DigiTar, NoSQL, Software Development | Posted on 03-07-2011


We’ve been using both CouchDB and Riak for some time now on a number of our APIs and user-facing services. When CouchDB wins out over Riak, it’s usually for two reasons:

  • secondary indexes
  • multi-data center replication (would be great if Basho would open source this)

Both Riak and CouchDB excel at storing records under a primary key, but sometimes you need to query the data along a different axis. For example, all of our records are stored in JSON dictionaries, and at times we want to know all the records that match a particular field in those dictionaries. That’s the situation we’re in for a new service we’ll be standing up soon. We wanted to generate coupon codes that customers could redeem for service as an alternative to providing a credit card. The coupon codes can be sold through affiliates, so in addition to looking up a coupon by its code we’ll need to answer another question: what are all the coupon codes belonging to affiliate ID X?

One approach (without using secondary indexes) would be to try to encode both the code and the affiliate ID in the record’s key (e.g. prefix_code_affiliate_id). The main issue with that approach is that some of our access patterns don’t have access to the affiliate ID (e.g. a user signing up for service); they only know the coupon code. So we need fast lookup of the record based on the code alone. That pretty much eliminated map/reducing for the coupon code, and firmly established the code alone as the right choice for the data key. The perfect solution would be a secondary index on the “affiliate_id” field of the JSON dictionary…in other words, a map of affiliate ID to the data. Normally, this is where we’d turn to CouchDB’s views and call it a day. But we’re planning on having millions of coupons in the system with thousands of parallel accesses that need to be evenly loaded across the datastore…not a scenario where CouchDB excels. Riak would be the perfect choice…except it has no native secondary indexes.

Doing indexes in Riak

There are a couple of ways you can do your own indexes in Riak. The simplest approach is to create an index key of the form idx_<field_name>_<field_value> and shove in a JSON list of the keys containing matching records. What you’ll run into very quickly is multiple clients trying to update that index key with new records and overwriting each other. Since Riak keeps multiple versions in the event of a conflict, you can code your clients to auto-merge conflicted versions into one master list and re-post the index. But…that puts a lot of maintenance logic in your clients, and if one of those updates is deleting a key from the index, the merge process can put the deleted key right back into the index.

Since we don’t want index management to have that many moving parts, we came up with a different approach:

  • For every field being indexed on a particular record, create a new separate index key in a dedicated index bucket.
  • Store the indexed field’s name in the index key’s bucket name, and store the data key’s name and the indexed value in the index key’s name.
  • MapReduce to get a list of matching index keys for any particular question by iterating over the keys of an index’s bucket and splitting the index key name apart to analyze the value.

The format of an index key becomes (we use key prefixes to namespace different data key types in the same bucket):

Bucket Name: idx=<field_name>=<data_key_prefix>
Key Name: <data_key_name>/<field_value>
Key Value: empty
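
To make that concrete, here’s a rough sketch of what writing a coupon and its index keys could look like against Riak’s HTTP interface (the real service talks to Riak via Twisted/txRiak, and the bucket names, coupon fields and key values below are invented for illustration):

import json
import urllib2

RIAK_URL = "http://localhost:8098/riak"

def put_key(bucket, key, body, content_type="application/json"):
    # PUT a single key into Riak over the HTTP interface
    req = urllib2.Request("%s/%s/%s" % (RIAK_URL, bucket, key), body,
                          {"Content-Type": content_type})
    req.get_method = lambda: "PUT"
    return urllib2.urlopen(req)

coupon = {"affiliate_id": "aff42", "redeemed_cnt": 0}
data_key = "coupon_XJ7QJ2"    # <data_key_prefix>_<coupon code>

# Store the data key itself...
put_key("coupons", data_key, json.dumps(coupon))

# ...and one index key per indexed field, with an empty value:
#   Bucket: idx=<field_name>=<data_key_prefix>   Key: <data_key_name>/<field_value>
put_key("idx=affiliate_id=coupon",
        "%s/%s" % (data_key, coupon["affiliate_id"]), "", "text/plain")
put_key("idx=redeemed_cnt=coupon",
        "%s/%d" % (data_key, coupon["redeemed_cnt"]), "", "text/plain")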

The immediate advantage of having an index key for every indexed field of a data key is the reduced chance of write conflicts when maintaining the index (you pretty much eliminate the chance a deleted index key is going to get resurrected). Asking the question “How many coupons have a redeemed count < 50?” becomes a simple MapReduce job that iterates over the idx=redeemed_cnt=coupon index bucket to find index keys where the field_value is < 50.

You might have noticed that we don’t store any data in the value of the index key… That’s on purpose, because it allows us to leverage a new feature of Riak 0.14…key filters for MapReduce jobs.

Key filters

The index system described so far would work fine on any key/value store that supports MapReduce. The problem is one of query performance: every key in the index bucket has to be fed through the Javascript map and reduce phases to determine whether it matches the question (i.e. is this indexed value < 50?), and it takes Riak more time to run a user-supplied Javascript function against each key than it would take Riak to analyze the index key name itself.

Luckily, the smart folks at Basho gave us a new tool to do just that: key filters. Because the indexed value is encoded in the key name, we can tell Riak via a key filter to:

  1. Tokenize the index key name using “/” as the separator.
  2. Look at the second token after the split (i.e. the indexed value).
  3. Treat that token as an integer.
  4. Only give the index key to the MapReduce job if the integer value is < 50.

In fact, with key filters we actually don’t have to write our own MapReduce phases to answer this question anymore. All we have to do is construct the key filter, and tell Riak to use the “identity reduce” reduce phase that’s built in (skip the map phase entirely). What we’ll get back is a list of index keys whose indexed value is < 50. We can then split those index key names in our client to get the key names of the data keys they map to.
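
If you’re curious what that looks like on the wire, here’s a sketch of the job we’d POST to Riak’s /mapred HTTP interface. The key filter steps above map one-to-one onto the filter list, and the built-in Erlang reduce_identity phase plays the part of the “identity reduce” (treat this as illustrative rather than a verbatim copy of our test code):

import json
import urllib2

job = {
    "inputs": {
        "bucket": "idx=redeemed_cnt=coupon",
        "key_filters": [
            ["tokenize", "/", 2],    # 1. split the key name on "/" and keep the 2nd token
            ["string_to_int"],       # 2. treat that token as an integer
            ["less_than", 50]        # 3. only pass keys whose indexed value is < 50
        ]
    },
    "query": [
        # the built-in "identity reduce"...no Javascript phases at all
        {"reduce": {"language": "erlang",
                    "module": "riak_kv_mapreduce",
                    "function": "reduce_identity"}}
    ]
}

req = urllib2.Request("http://localhost:8098/mapred", json.dumps(job),
                      {"Content-Type": "application/json"})
results = json.loads(urllib2.urlopen(req).read())

# Each result should come back as a [bucket, index_key] pair; the data key name
# is the part of the index key before the "/".
data_keys = [index_key.split("/", 1)[0] for bucket, index_key in results]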

Rubber meets the road…

So what does the performance look like with all of this? We wrote a couple of tests using Twisted Python to benchmark loading 100,000 coupons into CouchDB and Riak and then asking both how many of those coupons had a redeemed count < 50. Here’s a legend for what the different test titles mean:

  • CouchDB 1.0.2 (cold view): The amount of time it takes the view (index) to answer the question the first time the view is queried. This number is important because CouchDB doesn’t build the view until you query it the first time. From then on it just incrementally updates the view with changed values.
  • CouchDB 1.0.2 (computed/warm view): Amount of time it takes the view to answer the question on subsequent queries after the view has been computed initially.
  • Riak 0.14.1 (Raw MapReduce) 1-node: No indexes used as described above. A brute force MapReduce job that iterates over the data keys and examines the redeemed_count field in the JSON. 1-node Riak “cluster”.
  • Riak 0.14.1 (Indexed w/ Raw MapReduce) 1-node: Using index keys as described above, but using Javascript MapReduce phases on the index bucket to produce the matching key list…no key filters used. 1-node Riak “cluster”.
  • Riak 0.14.1 (Indexed w/ Key Filter MR) 1-node: Using index keys as described, but with key filters to reduce the input and a simple Javascript map phase to reformat the output (this would be a JS reduce phase except Riak has a bug right now with MapReduce jobs that have only a JS reduce phase). 1-node Riak “cluster”.
  • Riak 0.14.1 (Indexed w/ Raw MapReduce) 4-node: Same as “Indexed w/ Raw MapReduce” above except done on a 4-node Riak cluster.
  • Riak 0.14.1 (Indexed w/ Key Filter MR) 4-node: Same as “Indexed w/ Key Filter MR” above except done on a 4-node Riak cluster.

Before I show the numbers, you’d probably like to know what the test setup looked like. Each node was a SoftLayer Cloudlayer server with these specs (if you haven’t tried them, SoftLayer is really a phenomenal provider):

  • 1x 2.0GHz Xeon CPU
  • 2GB RAM
  • 25GB HDD
  • Gigabit NICs
  • Ubuntu 10.04.1 64-bit
  • Dallas 05 Datacenter
  • CouchDB 1.0.2 was built from source.
  • Riak 0.14.1 was installed from the .deb available from Basho.
  • Before each type of test the servers were rebooted to clear the filesystem cache.
  • Tests were run from a 5th node not running Riak or CouchDB. For the 4-node tests, the 5th node ran HAProxy 1.4.8 to round-robin client connections amongst the Riak nodes.

So without further ado…the numbers:

Test                                            Generate Keys (secs)    Show Keys w/ redeemed_count < 50 (secs)
CouchDB 1.0.2 (cold view)                       495                     74
CouchDB 1.0.2 (computed/warm view)              495                     11
Riak 0.14.1 (Raw MapReduce) 1-node              358                     82
Riak 0.14.1 (Indexed w/ Raw MapReduce) 1-node   692                     65
Riak 0.14.1 (Indexed w/ Key Filter MR) 1-node   692                     56
Riak 0.14.1 (Indexed w/ Raw MapReduce) 4-node   1025                    40
Riak 0.14.1 (Indexed w/ Key Filter MR) 4-node   1025                    34

 

Or if you’re more visual like me:

 

(If you’d like to run the tests yourself, we’ve put the code up: riak_perf_test.py, couchdb_perf_test.py).

 

Analyzing the outcome

One thing that’s very clear is how fast computed views can be in CouchDB (11 seconds flat is nothing to shake a stick at). However, what did we learn from the Riak numbers?

  • Indexed insertion is 91% slower than storing just the key data.
  • MapReduce with indexes is 20% faster than MR on the data keys alone.
  • MapReduce with indexes and key filters is 32% faster than MR on the data keys alone.
  • Adding Riak nodes substantially reduces query time. Adding 3 more nodes speeds up queries by 40%.

It’s not surprising that insertion time roughly doubles with indexes, since you’ve just doubled the number of keys you’re inserting. However, the gains you get at query time can be dramatic. Once the bug with Javascript reduce phases is ironed out, I expect the performance on this test to improve even further (since it will run a single reduce phase instead of running the map code multiple times).

What’s a little puzzling is why insertion of keys on a 4-node cluster is nearly 50% slower than on a 1-node cluster. I had expected insertion to be the same speed or even 25% faster. The reason I’d expected this is that Riak was set to use a write n-value of 3…meaning for every key inserted, 3 copies were stored throughout the cluster. Accounting for coordination latency on a 3-node cluster, I’d expect almost the same insertion speed as a 1-node Riak instance. With an extra node in the cluster, I’d expect slightly faster performance since only 3 nodes out of the 4 are engaged in any given write.

Regardless, the query performance proves Riak is a good choice for our use case. While 34 seconds to answer the question is slower than the 11 seconds it took CouchDB, it’s clear that as we scale the cluster, our query performance will keep pace with the size of our dataset. Provided we can find a solution for the ~50% slower insertion speed, Riak will definitely be our datastore of choice for this project. Once again, Riak is incredibly impressive in how well it handles large data sets and how adept its simple toolset is at answering complex questions.

 

Where we go from here…and nagging questions

The indexed key approach has worked so well for us that we’re currently writing a library to layer on top of txRiak to transparently handle writing/updating indexes. I’ll put up another blog entry on that once it’s posted to Github (there are a few issues we have to handle, like escaping the separators, and we intend to use Riak’s Links to provide an alternate connection between index and data keys). Even more exciting is the news that Basho is currently working on adding native secondary index support to Riak. No news on what shape that will take, but I expect it will blow the performance of our homegrown indexes out of the water. I think built-in indexes are a cleaner, more maintainable approach. Maintaining indexes in the client puts a lot of pressure on the clients not to screw up the index accidentally…especially if you’ve got code written in multiple languages accessing that data.

The only real nagging question right now is an issue we saw when we attempted to add a 5th node to the Riak cluster. I had originally intended to do an analysis of how much query performance improved with each node added. However, when the 5th node was added to the cluster, it took close to 1 hour for Riak to fully redistribute the keys…and even then 3 of the 5 nodes showed that they were still waiting to transfer one partition each to another node. When we attempted to run the MapReduce index query against the newly expanded cluster, we received MapReduce errors that Riak couldn’t find random index keys as it attempted to pass these “missing” keys into the map phase. I suspect the culprit may be some “failed node” testing we did before adding the 5th node.

Overall, the takeaway for me is that Riak is a phenomenally flexible data store, and that just because it’s missing a feature doesn’t mean you should shun it for that workload. More often than not, a little thought and chaining together of Riak’s other very powerful tools will give you the same result. To me, Riak vs CouchDB (or vs. SQL) is like a RISC chip vs. a CISC chip. It may not have one complex instruction you need, but you can build that instruction out of much simpler ones that you can run and scale at twice the speed.

Cloud-scale DBs in the cloud…just a quickie


Posted by Jason | Posted in DigiTar, Software Development | Posted on 03-17-2010


Just a quick set of thoughts…do cloud-scale DBs save money because they’re based on commodity/cheap servers? Tonight I did some rough back-of-the-pad calculations, and was kind of surprised…

Let’s assume we’ve got an 11TB working set of data. How could we store it redundantly?

(cloud servers in these examples are dedicated servers at a cloud provider)

Option 1: Two beefy storage servers running MySQL in a master/slave config

  • CPU: 4-cores of your favorite CPU vendor
  • RAM: 16GB
  • HDDs: 48x 250 GB SATA
    • Lose 2 for mirrored boot, and 2 for RAID-6 parity
  • Cost:
    • Buy Your Own Hardware (Sun X4500): $50,000 for the pair
    • Host It in the Cloud (SoftLayer): $4,700/month for the pair

Option 2: 28 commodity servers (2 replica copies for each piece of data) running HBase or Cassandra

  • CPU: 4-cores of your favorite CPU vendor
  • RAM: 4GB
  • HDDs: 4x 250 GB SATA
    • Lose 1 for RAID-5 parity (we’ll mingle boot data and data data on the same drive pool)
  • Cost:
    • Buy Your Own Hardware (Dell R410): $43,300 for set of 28
    • Host It in the Cloud (SoftLayer): $12,000/month for the set of 28

Option 3: 42 commodity servers (3 replica copies for each piece of data) running HBase or Cassandra

  • CPU: 4-cores of your favorite CPU vendor
  • RAM: 4GB
  • HDDs: 4x 250 GB SATA
    • Lose 1 for RAID-5 parity (we’ll mingle boot data and data data on the same drive pool)
  • Cost:
    • Buy Your Own Hardware (Dell R410): $64,900 for set of 42
    • Host It in the Cloud (SoftLayer): $18,000/month for the set of 42

Now, the thing here that surprised me isn’t the raw cost differential between stuffing your own hardware in your colo and using a cloud provider. And I’m not picking on SoftLayer…Rackspace and Voxel both work out to the same cost scaling as SoftLayer (and in the case of those two vendors, worse).

What surprised me:

  • When you buy your own hardware, “cloud-scale” databases do cost you less (~$7K) than buying beefy storage servers and running MySQL for the same data set.
  • However, when you are at a cloud provider, using cloud-scale databases on “cheap” hardware costs you 3x more than using beefy storage cloud servers running MySQL.

As I said, I’m not comparing the cost of running Option 1 on your own hardware vs. Option 1 at a cloud provider. Yes those costs are more at the cloud provider, but it’s to be expected (they’re bundling in bandwidth, colo, power, and most importantly people to manage the hardware and network).

What’s stunning is that beefy servers at a cloud provider are much more cost efficient. Beefy cloud servers cost you roughly 1/10 of the cost of the hardware every month, whereas “cheap” commodity cloud servers cost you roughly 1/3 of the cost of the hardware every month. That’s a much higher markup on the cheaper volume servers.
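
If you want to check the math, the ratios fall straight out of the numbers listed above (a quick sketch…your own quotes will obviously vary):

# Monthly cloud cost as a fraction of buying the equivalent hardware outright,
# using the rough figures quoted above
options = [
    ("Option 1: 2x beefy MySQL boxes",       4700.0,  50000.0),
    ("Option 2: 28x commodity (2 replicas)", 12000.0, 43300.0),
    ("Option 3: 42x commodity (3 replicas)", 18000.0, 64900.0),
]

for name, monthly, hardware in options:
    print "%s: %.0f%% of the hardware cost per month" % (name,
                                                         100 * monthly / hardware)

# Prints roughly 9% for the beefy pair vs. ~28% for the commodity sets...
# i.e. about 1/10 vs. about 1/3.5 of the hardware cost every month.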

Please comment and correct me if I’m wrong in my analysis…I would actually like to be.

Rabbits and warrens.


Posted by Jason | Posted in Software Development, Technology | Posted on 01-13-2009



 

If you like Rabbits and Warrens, check out RabbitMQ in Action in the sidebar.

 

The goal was simple enough: decouple a particular type of analysis out-of-band from mainstream e-mail processing. We started down the MySQL road…put the things to be digested into a table…consume them in another daemon…bada bing bada boom. But pretty soon, complex ugliness crept into the design phase… You want to have multiple daemons servicing the queue?…no problem we’ll just hard code node numbers…what? you want dynamic load re-assignment when daemons join and die?

You get the idea…what was supposed to be simple (decouple something) was spinning its own Gordian knot. It seemed like a good time to see if every problem was looking like a nail (table), because all we had were hammers (MySQL).

A short search later, and we entered the world of message queueing. No, no…we know obviously what a message queue is. Heck, we do e-mail for a living. We’ve implemented all sorts of specialized, high-speed, in-memory queues for e-mail processing. What we weren’t aware of was the family of off-the-shelf, generalized, message queueing (MQ) servers…a language-agnostic, no-assembly required way to wire routing between applications over a network. A message queue we didn’t have to write ourselves? Hold your tongue.

Open up your queue…

Cutting to the chase, over the last 4 years there has been no shortage of open-source message queueing servers written. Most of them are one-offs by folks like LiveJournal to scratch a particular itch. Yeah, they don’t really care what kind of messages they carry, but their design parameters are usually creator-specific (and message persistence after a crash usually isn’t one of them). However, there are three in particular that are designed to be highly flexible message queues for their own sake:

Apache ActiveMQ gets the most press, but it appears to have some issues not losing messages. Next.

ZeroMQ and RabbitMQ both support an open messaging protocol called AMQP. The advantage of AMQP is that it’s designed to be a highly-robust and open alternative to the two commercial message queues out there (IBM and Tibco). Muy bueno. However, ZeroMQ doesn’t support message persistence across crashes/reboots. No muy bueno. That leaves us with RabbitMQ. (That being said, if you don’t need persistence, ZeroMQ is pretty darn interesting…incredibly low latency and flexible topologies.)

That leaves us with the carrot muncher…

RabbitMQ pretty much sold me the minute I read “written in Erlang”. Erlang is a highly parallel programming language developed over at Ericsson for running telco switches…yeah, the kind with six bazillion 9s of uptime. In Erlang, it’s supposedly trivial to spin off processes and then communicate between them using message passing. Seems like the ideal underpinning for a message queue, no?

Also, RabbitMQ supports persistence. Yes Virginia, if your RabbitMQ dies, your messages don’t have to die an unwitting death…they can be reborn in your queues on reboot. Oh…and as is always desired @ DigiTar, it plays nicely with Python. All that being said, RabbitMQ’s documentation is, well…horrible. Lemme rephrase: if you already understand AMQP, the docs are fine. But how many folks know AMQP? It’d be like MySQL docs assuming you knew some form of SQL…er…nevermind.

So, without further ado…here is a reduction of a week’s worth of reading up on AMQP and how it works in RabbitMQ…and how to play with it in Python:

Playing telephone

There are four building blocks you really care about in AMQP: virtual hosts, exchanges, queues and bindings. A virtual host holds a bundle of exchanges, queues and bindings. Why would you want multiple virtual hosts? Easy. A username in RabbitMQ grants you access to a virtual host…in its entirety. So the only way to keep group A from accessing group B’s exchanges/queues/bindings/etc. is to create a virtual host for A and one for B. Every RabbitMQ server has a default virtual host named “/”. If that’s all you need, you’re ready to roll.

Exchanges, Queues and bindings…oh my!

Here’s where my railcar went off the tracks initially. How do all the parts thread together?

Queues are where your “messages” end up. They’re message buckets…and your messages sit there until a client (a.k.a. consumer) connects to the queue and siphons them off. However, you can configure a queue so that if there isn’t a consumer ready to accept a message when it hits the queue, the message goes poof. But we digress…

The important thing to remember is that queues are created programmatically by your consumers (not via a configuration file or command line program). That’s OK, because if a consumer app tries to “create” a queue that already exists, RabbitMQ pats it on the head, smiles gently and NOOPs the request. So you can keep your MQ configuration in-line with your app code…what a concept.

OK, so you’ve created and attached to your queue, and your consumer app is drumming its fingers waiting for a message…and drumming…and drumming…but alas no message. What happened? Well you gotta pump a message in first! But to do that you’ve got to have an exchange…

Exchanges are routers with routing tables. That’s it. End stop. Every message has what’s known as a “routing key”, which is simply a string. The exchange has a list of bindings (routes) that say, for example, messages with routing key “X” go to queue “timbuktu”. But we get slightly ahead of ourselves.

Your consumer application should create your exchanges (plural). Wait…you mean you can have more than one exchange? Yes, you can, but why? Easy. Each exchange operates in its own userland process, so adding exchanges adds processes, allowing you to scale message routing capacity with the number of cores in your server. As an example, on an 8-core server you could create 5 exchanges to maximize your utilization, leaving 3 cores open for handling the queues, etc. Similarly, in a RabbitMQ cluster, you can use the same principle to spread exchanges across the cluster members to add even more throughput.

OK, so you’ve created an exchange…but it doesn’t know what queues the messages go in. You need “routing rules” (bindings). A binding essentially says things like this: put messages that show up in exchange “desert” and have routing key “ali-baba” into the queue “hideout”. In other words, a binding is a routing rule that links an exchange to a queue based on a routing key. It is possible for two binding rules to use the same routing key. For example, maybe messages with the routing key “audit” need to go both to the “log-forever” queue and the “alert-the-big-dude” queue. To accomplish this, just create two binding rules (each one linking the exchange to one of the queues) that both trigger on routing key “audit”. In this case, the exchange duplicates the message and sends it to both queues. Exchanges are just routing tables containing bindings.

Now for the curveball: there are multiple types of exchanges. They all do routing, but they accept different styles of binding “rules”. Why not just create one type of exchange for all styles of rules? Because each rule style has a different CPU cost for analyzing whether a message matches the rule. For example, a “topic” exchange tries to match a message’s routing key against a pattern like “dogs.*”. Matching that wildcard on the end takes more CPU than simply seeing if the routing key is “dogs” or not (e.g. a “direct” exchange). If you don’t need the extra flexibility of a “topic” exchange, you can get more messages/sec routed if you choose the “direct” exchange type. So what are the types and how do they route?

Fanout Exchange – No routing keys involved. You simply bind a queue to the exchange. Any message that is sent to the exchange is sent to all queues bound to that exchange. Think of it like a subnet broadcast. Any host on the subnet gets a copy of the packet. Fanout exchanges route messages the fastest.

Direct Exchange – Routing keys are involved. A queue binds to the exchange to request messages that match a particular routing key exactly. This is a straight match. If a queue binds to the exchange requesting messages with routing key “dog”, only messages labelled “dog” get sent to that queue (not “dog.puppy”, not “dog.guard”…only “dog”).

Topic Exchange – Matches routing keys against a pattern. Instead of binding with a particular routing key, the queue binds with a pattern string. The symbol # matches zero or more words, and the symbol * matches exactly one word (no more, no less). So “audit.#” would match “audit.irs.corporate”, but “audit.*” would only match “audit.irs”. Our friends at RedHat have put together a great image to express how topic exchanges work:

AMQP Stack Diagram

Source: RabbitMQ in Action (by me and a very cool Uruguayan dude…Mr. Alvaro)
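
To make the pattern matching concrete, here’s a tiny sketch using the py-amqplib calls we’ll meet in a minute (the exchange and queue names are invented for the example):

# One topic exchange, two queues bound with different patterns
chan.exchange_declare(exchange="audit_sorter", type="topic",
                      durable=True, auto_delete=False)
chan.queue_declare(queue="log-forever", durable=True,
                   exclusive=False, auto_delete=False)
chan.queue_declare(queue="irs-watcher", durable=True,
                   exclusive=False, auto_delete=False)

# "audit.#" matches "audit.irs", "audit.irs.corporate", etc.
chan.queue_bind(queue="log-forever", exchange="audit_sorter",
                routing_key="audit.#")
# "audit.*" matches "audit.irs" but not "audit.irs.corporate"
chan.queue_bind(queue="irs-watcher", exchange="audit_sorter",
                routing_key="audit.*")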

Persistent little bugger…

You spend all that time creating your queues, exchanges and bindings, and then BANG!…the server fries faster than the griddle at McDonald’s. All your queues, exchanges and bindings are there right? Oh geez…what about the messages in the queues you hadn’t serviced yet?

Relax, providing you created everything with the default arguments, it’s all gone…poof…whoosh…nada…nil. That’s right, RabbitMQ rebooted as empty as a baby’s noggin. You gotta redo everything kemosabe. How do you keep this from happening in the future?

On your queues and your exchanges there’s a creation-time flag called “durable”. There’s only one thing durable means in AMQP-land…the queue or exchange marked durable will be re-created automatically on reboot. It does not mean the messages in the queues will survive the reboot. They won’t. So how do we make not only our config but messages persist through a reboot?

Well the first question is, do you really want your messages to persist? For a message to last through a reboot, it has to be written to disk, and even a simple checkpoint to disk takes time. If you value message routing speed more than the contents of the message, don’t make your messages persistent. That being said, for our particular needs @ DigiTar, persistence is important.

When you publish your message to an exchange, there’s a flag called “Delivery Mode”. Depending on the AMQP library you’re using there will be different ways of setting it (we’ll cover the Python library later). But the long and the short of it is you want the “Delivery Mode” set to the value 2, which means “persistent”. “Delivery Mode” usually (depending on your AMQP library) defaults to a value of 1, which means “non-persistent”. So the steps for persistent messaging are:

  1. Mark the exchange “durable”.
  2. Mark the queue “durable”.
  3. Set the message’s “delivery mode” to a value of 2.

That’s it. Not really rocket science, but enough moving parts to make a mistake and send little Sally’s dental records into cyber-Nirvana.

There may be one thing nagging you though…what about the binding? We didn’t mark the binding “durable” when we created it. It’s alright. If you bind a durable queue to a durable exchange, RabbitMQ will automatically preserve the binding. Similarly, if you delete any exchange/queue (durable or not) any bindings that depend on it get deleted automatically.

Two things to be aware of:

  • RabbitMQ will not allow you to bind a non-durable exchange to a durable queue, or vice-versa. Both the exchange and the queue must be durable for the binding operation to succeed.
  • You cannot change the creation flags on a queue or exchange after you’ve created it. For example, if you create a queue as “non-durable”, and want to change it to “durable”, the only way to do this is to destroy the queue and re-create it. It’s a good reason to double check your declarations.

Food for snakes

A real empty area for AMQP usage is Python programs. For other languages there are plenty of references and examples.

But for little old Python, you need to dig it out yourself. So other folks don’t have to wander in the wilderness like I did, here’s a little primer on using Python to do the AMQP-tasks we’ve talked about:

First, you’ll need a Python AMQP library…and there are two:

  • py-amqplib – General AMQP library
  • txAMQP – An AMQP library that uses the Twisted framework, thereby allowing asynchronous I/O.

Depending on your needs, py-amqplib or txAMQP may be more to your liking. Being Twisted-based, txAMQP holds the promise of building super performing AMQP consumers that use async I/O. But Twisted programming is a topic all its own…so we’re going to use py-amqplib for clarity’s sake. UPDATE: Please check the comments for example code showing use of txAMQP from Esteve Fernandez.

AMQP supports pipelining multiple MQ communication channels over one TCP connection, where each channel is a communication stream used by your program. Every AMQP program has at least one connection and one channel:

from amqplib import client_0_8 as amqp

# One TCP connection to the broker, and one channel to talk over
conn = amqp.Connection(host="localhost:5672", userid="guest",
                       password="guest", virtual_host="/", insist=False)
chan = conn.channel()

Each channel is assigned an integer channel number automatically by the .channel() method of the Connection() class. Alternately, you can specify the channel number yourself by calling .channel(x), where x is the channel number you want. More often than not, it’s a good idea to just let the .channel() method auto-assign the channel number to avoid collisions.

Now we’ve got a connection and channel to talk over. At this point, our code is going to diverge into two applications that use that same bit we’ve created so far: a consumer and the publisher. Let’s create the consumer app by creating a queue named “po_box” and an exchange named “sorting_room”:

# A durable queue that survives reboots and isn't auto-deleted...
chan.queue_declare(queue="po_box", durable=True,
                   exclusive=False, auto_delete=False)
# ...and a durable direct exchange to publish into
chan.exchange_declare(exchange="sorting_room", type="direct",
                      durable=True, auto_delete=False)

What did that do? First, it created a queue called “po_box” that is durable (will be re-created on reboot) and will not be automatically deleted when the last consumer detaches from it (auto_delete=False). It’s important to set auto_delete to false when making a queue (or exchange) durable, otherwise the queue itself will disappear when the last consumer detaches (regardless of the durable flag). Setting both durable and auto_delete to true would make a queue that gets re-created only if RabbitMQ dies unexpectedly with consumers still attached.

(You may have noticed there’s another flag specified called “exclusive”. If set to true, only the consumer that creates the queue will be allowed to attach to it. It’s a queue that is private to the creating consumer.)

There’s also the exchange declaration for the “sorting_room” exchange. auto_delete and durable mean the same things as they do in a queue declaration. However, .exchange_declare() introduces an argument called type that defines what type of exchange you’re making (as described earlier): fanout, direct or topic.

At this point, you’ve got a queue to receive messages and an exchange to publish them to initially…but we need a binding to link the two together:

chan.queue_bind(queue="po_box", exchange="sorting_room",
                routing_key="jason")

The binding is pretty straightforward. Any message arriving at the “sorting_room” exchange with the routing key “jason” gets routed to the “po_box” queue.

Now, there are two methods of getting messages out of the queue. The first is to call chan.basic_get() to pull the next message off the queue (if there are no messages waiting on the queue, chan.basic_get() will return a None object…so trap for that before touching msg.body, as the snippet below does):

msg = chan.basic_get("po_box")
if msg is not None:    # basic_get() returns None when the queue is empty
    print msg.body
    chan.basic_ack(msg.delivery_tag)    # tell RabbitMQ we've got the message

But what if you want your application to be notified as soon as a message is available for it? To do that, instead of chan.basic_get(), you need to register a callback for new messages using chan.basic_consume():

def recv_callback(msg):
    print 'Received: ' + msg.body

chan.basic_consume(queue='po_box', no_ack=True,
                   callback=recv_callback, consumer_tag="testtag")
while True:
    chan.wait()
chan.basic_cancel("testtag")

chan.wait() is looped infinitely, which is what causes the channel to wait for the next message notification from the queue. chan.basic_cancel() is how you unregister your message notification callback. The argument specifies the consumer_tag you specified in the original chan.basic_consume() registration (that’s how it figures out which callback to unregister). In this case chan.basic_cancel() never gets called due to the infinite loop that precedes it…but you need to know about it, so it’s in the snippet.

The one additional thing you should pay attention to in the consumer is the no_ack argument. It’s accepted by both chan.basic_get() and chan.basic_consume(), and it defaults to false. When you grab a message off a queue, RabbitMQ needs you to explicitly acknowledge that you have it. If you don’t, RabbitMQ will re-assign the message to another consumer on the queue after a timeout interval (or on disconnect by the consumer that initially received it without ack’ing it). If you set the no_ack argument to true, then py-amqplib will add a “no_ack” property to your AMQP request for the next message. That instructs the AMQP server not to expect an acknowledgement for that get/consume. However, in most cases you probably want to send the acknowledgement yourself (e.g. you need to put the message contents in a database before you acknowledge). Acknowledgements are done by calling the chan.basic_ack() method, using the delivery_tag property of the message you’re acknowledging as the argument (see the chan.basic_get() code snippet above for an example).
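
For example, a consumer that wants to ack only after it has finished its work might register a callback like this (a variation on the earlier snippet, using the same msg.delivery_tag property as the chan.basic_get() example; the callback and consumer_tag names are made up):

def recv_and_ack_callback(msg):
    print 'Received: ' + msg.body
    # ...do the real work here (e.g. store msg.body in the database)...
    chan.basic_ack(msg.delivery_tag)    # only now tell RabbitMQ we're done with it

chan.basic_consume(queue='po_box', no_ack=False,
                   callback=recv_and_ack_callback, consumer_tag="acktag")
while True:
    chan.wait()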

That’s all she wrote for the consumer. (Download: amqp_consumer.py)

But what good is a consumer, if nobody is sending it messages? So you need a publisher. The code below will publish a simple message to the “sorting_room” exchange and mark it with the routing key “jason”:

msg = amqp.Message("Test message!")
msg.properties["delivery_mode"] = 2    # 2 = "persistent"
chan.basic_publish(msg, exchange="sorting_room", routing_key="jason")

You may notice that we set the delivery_mode element of the message’s properties to “2”. Since the queue and exchange were marked durable, this will ensure the message is sent as persistent (i.e. will survive a reboot of RabbitMQ while it is in transit to the consumer).

The only other thing we need to do (and this needs to be done on both consumer and publisher apps), is close the channel and connection:

chan.close()
conn.close()

Pretty simple, no? (Download: amqp_publisher.py)
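
One small difference from the snippet above: the downloadable amqp_publisher.py takes the message text from the command line (that’s how we’ll call it in the next section). You’d get the same effect in the snippet with something along these lines:

import sys

# Publish whatever text was passed on the command line instead of a hard-coded string
msg = amqp.Message(sys.argv[1])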

Giving it a shot…

Now we’ve written our consumer and publisher, so let’s give it a go. (This assumes you have RabbitMQ installed and running on localhost.)

Open up the first terminal, and run python ./amqp_consumer.py to get the consumer running and to create your queues, exchanges and bindings.

Then run python ./amqp_publisher.py “AMQP rocks.” in a second terminal. If everything went well, you should see your message printed by the consumer on the first terminal.

Taking it all in

I realize this has been a really fast run through AMQP/RabbitMQ and using it from Python. Hopefully, it will fill in some of the holes of how all the concepts fit together and how they get used in a real Python program. If you find any errors in my write-up, I’d very much appreciate it if you’d please let me know (williamsjj@digitar.com). Similarly, I’d be happy to answer any questions that I can. Next up….clustering! But I’ve got to figure it out first. :-)

NB: Special thanks to Barry Pederson and Gordon Sims for correcting my understanding of no_ack’s operation and for catching syntactically incorrect Python code I missed.

NB: My knowledge on the subject was distilled from a number of sources that are excellent further reading.

Carpenters vs. general contractors


Posted by | Posted in Software Development | Posted on 09-07-2005

The past few years at DigiTar have been a heck of a learning curve for me as a developer…er…manager of developers. You graduate college with a CS degree and at the world you go…happy as a pig in stuff. Think you know everything you need. What you don't realize is that you don't know anything. Sure you know data structures, and you can program in 5 different languages. You might even have built some pretty complex programs (or so it seemed at the time). But you don't know what you're talking about. Trust me.

You've been trained as a carpenter essentially. A really basic carpenter. Sure you know what a hammer looks like and you've framed up a door here and a wall there. The problems with what you don't know come to the surface when you go to build your first house. Or better yet, your first room. Up go the four walls, a door and a window…but the doors don't quite hang right and darn…you forgot to put in conduit for the electrical.

Nobody at “carpenter school” ever took you aside and showed you how to think about building a room. Best practices for hanging a door…laying out a window…nope…you haven't got any of that. The root issue of all the toe stubbing you're doing at your first job is that nobody ever taught you to build a room on paper first. So what do you do? You start building the room with wood and nails, and you quickly find that mistakes made in that medium are really expensive. It takes a lot of work to recover from the crooked doors. As for the bad framing…well you just gotta rip it out. The cruelest irony is that you don't know any better, so you probably spend a lot of years before somebody shows you a better way.

So what am I getting at with all the blathering? Computer Science programs don't teach their students how to build software (software engineering programs are a different story). They teach them how to be glorified hobbyist programmers, and then throw them into the buzz saw that is experience. For me, it was amazing how much more effective we became in a 6 month period by learning four best practices that school never taught me:

  • Use version control.
  • Track your bugs formally.
  • Fix bugs before writing new code (you'll never hit a schedule otherwise).
  • Don't write a lick of code until you've written a functional spec.

There are a few other important lessons (i.e. unit tests) but those are the four that made the biggest impact. To paraphrase Steve McConnell, CS programs turn out scientists not engineers. Scientists build stuff that works in the lab, and is too darn frail to live in the real world. The software development world expects CS students to be engineers, but they're not. They don't receive any of the “thinking” training or apprenticing that would make them engineers. I personally owe a debt of gratitude to Joel Spolsky and Steve McConnell for writing books that filled in the gaping sink holes in my education.

Another thing to keep in mind with this whole carpenter metaphor is that it's actually a lot worse than that! As a software developer, you can't just be a carpenter. You're also the general contractor. Everything from the framing to the electrical and the plumbing…you're up, slugger!

It seems to me that the CS program I went through (no I won't tell you where) needs at least two new courses. The first needs to be taught right after they teach you the first programming language, and simultaneous with your data structures course. Let's call it “Coding Habits” for total originality. Coding Habits would teach you how to lay out your program's functions, classes, etc. in English pseudocode before you write real code, and how to translate your “English” into real code. It would basically need to cover a slew of “best practices”. A course like that would have saved me a lot of hours in the rest of my CS courses…and it's absolutely essential for doing professional software development. As for a course text, I'd suggest Code Complete. Take a look at Tom's Developer's Nightmare blog for more on this topic. Tom is one of our very own, and has some very definite opinions on the subject.

The second course you need is the one I'm more qualified to pontificate on…let's use the moniker “Software Projects”. Software Projects would work best as a second-semester Junior-level course. Its basic premise is how to go from an idea to a full-blown project. Topics would include writing, crystallizing your team's ideas, dealing with stakeholders (such as users) to winnow features, and the whole functional spec process. People and time management would need to feature prominently. Personally, I'd probably teach off the Joel Test. That's just me. There are certain things that only experience can teach, but let's give CS students a fighting chance.

All universities hope their graduates reach some sort of management responsibility. They need tools to do that. Heck, if a student's first interviewer is worth their salt, they'll ask a few interesting questions regarding how the student thinks about software development. The answers will be drastically improved after a course like Software Projects. At minimum the answers will be more interesting than “Uh…object-oriented programming rules!”

Any CS program that does a better job of training “engineers” instead of scientists does their students a great favor. They'll get better jobs, and the school will get more recognition…which feeds back into getting better jobs. I'll make this offer to any school amenable: DigiTar will teach an abridged version of both these courses over the spring break of your choice. You pick the students for each course. We'll teach them. In fact, we'll offer the top students jobs. Personally, I'd rather see them learn what they need to know while they're still in school, and before we get 'em!