Replayable Pub/Sub Queues with Cassandra and ZooKeeper

When you first experiment with Cassandra and discover how quickly it returns the columns of a row, it appears to be an excellent choice for implementing a distributed queue. In practice, however, queues expose two of Cassandra’s thorniest areas, tombstones and consistency levels, and are therefore widely regarded as an antipattern.

Row-Based vs Column-Based

To implement a queue in Cassandra, you must choose between a row-based and a column-based design. In a row-based queue, each item to be processed is stored as a row key. In a column-based queue, each item to be processed is stored as a column in a specific row.

With the item to be processed stored as a row key, consistency becomes a bottleneck. Since the items to process are unknown, getting range slices across row keys is the only way to fetch data. When all keys are needed, this operation ends up querying every node, because the location and number of keys are unknown ahead of time. Since some node may be unavailable at any given time, this is less than ideal.

Column-based queues in Cassandra are much more streamlined because, on the read path, fetching a set of columns is what Cassandra excels at. Dequeuing remains a problem, however: to mark an item as processed, its column must be deleted. Cassandra handles deletions by marking columns with tombstones that get cleaned up later, which requires running repairs and compactions and tolerating the possibility that data is reprocessed.

Given these considerations, Cassandra has been viewed with skepticism as a platform to run a queue. What if it is paired with another system?

ZooKeeper

ZooKeeper is a distributed coordination service whose interface resembles a file system. Paths in ZooKeeper, called znodes, are both a directory and a file in one, i.e. they contain both data and links to child znodes. ZooKeeper is built for high throughput and low latency, and it guarantees strictly ordered access to znodes. Strictly ordered access makes it possible to build distributed coordination primitives, such as locks. In addition, ZooKeeper allows consumers to create a watch on any znode and be notified when the data stored on that znode changes. Both features will be leveraged to create a specific kind of queue with Cassandra, the pub/sub queue.

Pub/Sub

In a pub/sub queue, multiple subscribers may process data from a single publisher. Since more than one entity may need to fetch the items from the queue, it is hard to determine at what point all subscribers are done and the queue can be cleared. This brings up an interesting point: could one build a pub/sub queue that is never cleared?

This could be implemented as a column-based queue that does not perform deletes. Instead, each new set of items that needs processing is written to its own row, so that queue consumers can retrieve it easily.

To support this, the rows in the queue need to be:

  • Unique – if two rows somehow have the same key, then the older row would need to be deleted before inserting the new one. Due to how Cassandra works, this could cause issues with some consumers reading the older data instead of the new.
  • Immutable – the contents of a row do not change. This allows for the queue to be replayed, which is great if data inconsistencies are encountered or if a new consumer is added that needs to process data from the beginning of time.
  • Known – Cassandra, as mentioned previously, does not do well with range slice queries. The row key should somehow be known by consumers before grabbing items to process.
  • Linear – if a consumer has last processed update X, and the consumer checks in to see that the latest update has row key X + 2d (with known interval d), then the consumer should expect there to be another row key at X + d.

Batches

To ensure all these conditions are met, a batch ID is assigned to every enqueueing operation. ZooKeeper is designated as the source of truth – the latest batch ID is stored there. The batch ID is a tuple of queue name and an integer. For example, for a queue named orders, the znode at /queues/orders would contain the value orders-5 (if this were the fifth batch).
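
As a minimal sketch of the batch ID format described above, the helpers below build, parse, and increment IDs such as orders-5. The function names are hypothetical and chosen for illustration only. Splitting on the last hyphen is an assumption made here so that a queue name may itself contain hyphens.

```python
from typing import Tuple


def format_batch_id(queue: str, number: int) -> str:
    """Join a queue name and a batch number, e.g. ('orders', 5) -> 'orders-5'."""
    return f"{queue}-{number}"


def parse_batch_id(batch_id: str) -> Tuple[str, int]:
    """Split a batch ID into (queue name, batch number).

    Splits on the LAST hyphen, so a queue called 'eu-orders' still parses.
    """
    queue, _, number = batch_id.rpartition("-")
    if not queue or not number.isdigit():
        raise ValueError(f"not a batch ID: {batch_id!r}")
    return queue, int(number)


def next_batch_id(batch_id: str) -> str:
    """Return the ID that follows batch_id in the same queue."""
    queue, number = parse_batch_id(batch_id)
    return format_batch_id(queue, number + 1)
```

With these helpers, next_batch_id("orders-5") yields "orders-6", which is the ID the next enqueueing operation would use.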

Publishing

When a new set of items is enqueued, the publisher acquires a lock (in ZooKeeper) for that queue, grabs the latest advertised batch ID, and adds one to the integer value to create a new batch ID. This is done to ensure uniqueness and linearity for batch IDs. It then inserts a row into Cassandra whose row key is that new batch ID. For example, if the orders 11, 13, and 17 need to be enqueued, and the previous batch ID was orders-5, then the following dictionary will get stored in Cassandra under row key orders-6: {11: '', 13: '', 17: ''}.

For a given batch ID, the columns (items to be processed) never change, making the row immutable. Once the write is successful, the latest batch ID is published to ZooKeeper. From the previous example, orders-6 will be stored in /queues/orders.
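
The publishing steps can be sketched roughly as follows. This is not a real client: plain dictionaries stand in for ZooKeeper and Cassandra, and a threading.Lock stands in for the per-queue ZooKeeper lock, so that the ordering of the steps can be seen (and run) without a cluster. The names zookeeper, cassandra, queue_locks, and publish are all assumptions made for this illustration.

```python
import threading

# In-memory stand-ins, for illustration only (assumptions, not real clients):
zookeeper = {"/queues/orders": "orders-5"}   # znode path -> stored data
cassandra = {}                                # row key -> {column name: value}
queue_locks = {"orders": threading.Lock()}    # stands in for a ZooKeeper lock


def publish(queue, items):
    """Enqueue items as one new batch and return the new batch ID."""
    znode = f"/queues/{queue}"
    with queue_locks[queue]:                      # 1. take the per-queue lock
        latest = zookeeper[znode]                 # 2. read the advertised ID
        number = int(latest.rpartition("-")[2])
        batch_id = f"{queue}-{number + 1}"        # 3. add one for the new ID
        # 4. Write the row first: each item becomes a column with an empty value.
        cassandra[batch_id] = {item: "" for item in items}
        # 5. Advertise the new batch only after the write has succeeded.
        zookeeper[znode] = batch_id
    return batch_id
```

Calling publish("orders", [11, 13, 17]) against the state above stores the row orders-6 and then updates /queues/orders. Writing the row before advertising its ID is what lets a subscriber assume that every advertised batch is already readable.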

Subscribing

Changing the data in ZooKeeper wakes up downstream consumers that are watching for updates to that znode. If a subscriber’s latest processed batch is orders-4, it will see that orders-6 is the latest batch ID and can safely assume that orders-5 is also available, so items from both unprocessed batches can be worked on.
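
Because batch IDs are linear, a subscriber can derive every row key it still needs from just two values: its own last processed batch and the latest advertised one. A small sketch of that calculation, with the hypothetical function name pending_batches:

```python
def pending_batches(last_processed, latest):
    """List the batch IDs after last_processed, up to and including latest."""
    queue, _, last = last_processed.rpartition("-")
    latest_queue, _, newest = latest.rpartition("-")
    if queue != latest_queue:
        raise ValueError("batch IDs belong to different queues")
    # Consecutive integers, so no range query against Cassandra is needed.
    return [f"{queue}-{n}" for n in range(int(last) + 1, int(newest) + 1)]
```

For the example above, pending_batches("orders-4", "orders-6") returns ["orders-5", "orders-6"], and a subscriber that is already up to date gets an empty list.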

To make sure consumers process the data they need reliably, each consumer is required to independently track how far along it is in the queue. This is done through ZooKeeper: each consumer has a znode under the queue’s znode which stores that consumer’s latest processed batch ID. Because the batch IDs are linear, this is an easy way to monitor how far behind each consumer is. For example, if OrderProcessor is consuming data from the orders queue, it would have a znode /queues/orders/processors/OrderProcessor which would contain the value orders-4 (if it were two batches behind). The OrderProcessor would update the value of that znode to orders-5 and then to orders-6 as each batch is successfully processed.
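
The per-consumer bookkeeping might look like the sketch below. As before, dictionaries stand in for ZooKeeper and Cassandra; the contents of orders-5 are made up for the example, and lag, catch_up, and handle are hypothetical names. The consumer's znode is advanced one batch at a time, only after that batch has been handled.

```python
# In-memory stand-ins, for illustration only (assumptions, not real clients):
zookeeper = {
    "/queues/orders": "orders-6",
    "/queues/orders/processors/OrderProcessor": "orders-4",
}
cassandra = {
    "orders-5": {7: ""},                   # made-up contents for the example
    "orders-6": {11: "", 13: "", 17: ""},
}


def batch_number(batch_id):
    """Extract the integer part of a batch ID such as 'orders-6'."""
    return int(batch_id.rpartition("-")[2])


def lag(queue, consumer):
    """Number of batches the consumer is behind the latest advertised batch."""
    latest = zookeeper[f"/queues/{queue}"]
    done = zookeeper[f"/queues/{queue}/processors/{consumer}"]
    return batch_number(latest) - batch_number(done)


def catch_up(queue, consumer, handle):
    """Process every pending batch in order, checkpointing after each one."""
    consumer_znode = f"/queues/{queue}/processors/{consumer}"
    latest = batch_number(zookeeper[f"/queues/{queue}"])
    done = batch_number(zookeeper[consumer_znode])
    for n in range(done + 1, latest + 1):
        batch_id = f"{queue}-{n}"
        for item in cassandra[batch_id]:   # column names are the items
            handle(item)
        zookeeper[consumer_znode] = batch_id   # record progress for this batch
```

With this layout, a consumer that fails partway through resumes from its last recorded batch, so a handler written this way should tolerate seeing the items of one batch again.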

Scaling

The remaining issue is scalability. Because items are never deleted from this queue, data size could conceivably become a problem. However, Cassandra distributes the rows of a column family across the nodes of the cluster, so if the cluster becomes too small to hold the queue’s rows, adding more nodes is all the scaling that is required.