Replayable Pub/Sub Queues with Cassandra and ZooKeeper

When you first play around with Cassandra and discover how fast it is at handing you the columns for a row, it looks like an excellent choice for implementing a distributed queue. In reality, though, queues bring out two of Cassandra's thorniest areas, tombstones and consistency levels, and are widely regarded as an antipattern.

Row-Based vs Column-Based

To implement a queue in Cassandra, you must choose between a row-based and a column-based design. In the row-based approach, each item to be processed is stored as a row key. In the column-based approach, each item to be processed is stored as a column in a specific row.

With the items to be processed stored as row keys, consistency becomes a bottleneck. Since the items to process aren't known in advance, range slices across row keys are the only way to fetch data, and when all keys are needed that operation ends up querying every node, because the location and number of keys are unknown ahead of time. Since not every node is available at any given moment, this is less than ideal.
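To make the contrast concrete, here is a rough CQL sketch using the DataStax Python driver. The keyspace, table, and column names are purely illustrative, and the original Thrift-era range slices are approximated with CQL queries.

# Illustrative sketch: contrasting the two designs with the DataStax Python
# driver. The schema names here are hypothetical.
from cassandra.cluster import Cluster

cluster = Cluster(["127.0.0.1"])
session = cluster.connect("queues")

# Row-based: each queue item is its own row key. Finding pending work means
# scanning across row keys, which can end up touching every node.
pending = session.execute("SELECT item_id, payload FROM row_based_queue LIMIT 100")

# Column-based: all items for one queue live in a single wide row, so a
# consumer reads a single partition and gets items back in clustering order.
items = session.execute(
    "SELECT item_id, payload FROM column_based_queue "
    "WHERE queue_name = %s LIMIT 100",
    ("email_notifications",),
)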

Continue reading

Tracking Method Calls During Testing

Our automated testing is broken into two broad areas: unit tests and integration tests. Unit tests are where we test the domain logic for our models, with few dependencies. The tests may hit a MySQL database to test Django ORM-related logic, but the test runner can't access external services or things like Cassandra. (We're using Django and the Django test runner, which creates test databases during setup. You may object that hitting the database means these aren't "unit" tests. I agree. Nonetheless, we call them unit tests.) Our integration tests, on the other hand, are run against full builds of the site, and have access to all of the services that our site normally does. Continue reading

Whirlwind Week: OSCON and PyOhio

I spent last week attending two very different conferences. In both cases I was honored to have the opportunity to present the work I’ve been doing at Eventbrite. It was exciting to me that even though the conferences were different in just about every way — size, venue, focus, geography, cost — they were filled with people working on interesting technologies and ideas.

Continue reading

Optimizing a table with composite primary keys

To scale our data storage, Eventbrite's strategy has been a combination of approaches: moving data to NoSQL solutions, aggressively moving queries to slave databases, buying better database hardware, maintaining different indexes on database slaves that receive different queries, and finally, designing the most efficient tables possible for large, heavily used data sets.

This is the story of optimizing the design of a single MySQL table that stores multiple email addresses per user (needed by some forward-looking infrastructure we are building). We'll discuss the Django implementation in a future post.

Multiple Email Address Table

To support multiple email addresses per user in MySQL, we need a one-to-many table. A typical access pattern is a lookup by email address, followed by a join to the users table.

Here is the basic design, followed by our improvements.

The Naïve Implementation

The basic design's one-to-many table has an auto-increment primary key, a column for the email address, and an index on that column. Lookups by email address go through that index.

DROP TABLE IF EXISTS `user_emails`;
CREATE TABLE `user_emails` (
  `id` int NOT NULL AUTO_INCREMENT,
  `email_address` varchar(255) NOT NULL,
  -- … other columns about the user
  `user_id` int, -- foreign key to users
  PRIMARY KEY (`id`),
  KEY (`email_address`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;

Continue reading

Smarter Unit Testing with nose-knows

No one likes to break unit tests. You get all stressed about it, feel like you’ve let your peers down, and sometimes even have to get everyone donuts the next day. Our production Python codebase is complex, and the smallest changes can have an unexpectedly large impact; this is only complicated by the fact that Python is a dynamic language, making it hard to figure out what code touches what.

Enter nose-knows, a plugin for the nose unit test runner (and py.test, experimentally). It traces your code while unit tests are running and figures out which files have been touched by which tests. Running your full test suite with code tracing turned on is expensive, so we have a daily Jenkins job that does it and creates an output file. The plugin can also do the converse: it knows how to leverage this output file to run specific tests.
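Setting the plugin's internals aside, the core idea of recording which files each test touches can be sketched with sys.settrace. This is a simplified illustration of the technique, not nose-knows' actual implementation.

# Simplified illustration of the tracing idea (not nose-knows' real code):
# record which source files get executed while each test runs.
import sys
from collections import defaultdict

files_touched_by_test = defaultdict(set)

def run_with_tracing(test_name, test_func):
    def tracer(frame, event, arg):
        if event == "call":
            files_touched_by_test[test_name].add(frame.f_code.co_filename)
        return tracer

    sys.settrace(tracer)
    try:
        test_func()
    finally:
        sys.settrace(None)

# After a full run, the mapping can be inverted: given a changed file, look up
# which tests exercised it and run only those.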

Continue reading

JavaScript “code race”

In a periodic JavaScript “code race,” our engineering team competes for pride on challenges designed to sharpen our JavaScript abilities. We pick a topic, create a problem to solve, and try to make the code run as fast as possible; our most recent topic was jQuery's Deferred API.

Here are a few performance tips that have helped me with these challenges.
Continue reading

Bonjour Gatekeeper: How to implement Bonjour service in an iOS or Android app

Gatekeeper is Eventbrite's onsite entry-management technology, delivering a high-speed, secure scanning solution for organizers to use on the day of their events.

Ideal for events with more than 5,000 attendees, Gatekeeper consists of Linux-based gate servers and multiple scan devices at each gate. We use iPhones and Android phones with barcode-scanner add-ons as scan terminals, which not only improves check-in speed but also makes the system easier to set up and use.

Smartphone as Gatekeeper scan terminal

Technically, to connect the scan terminal to the Gatekeeper server, staff members must enter the server’s IP address in the scanner—potentially adding complexity for event organizers. We opted to eliminate this step so staff members could simply turn on the scanner and launch the app, which then automatically finds the nearby Gatekeeper servers.

How, you ask? By leveraging Bonjour, Apple’s implementation of zero configuration networking, which establishes a connection for the scanner to the server by locating nearby services on a local network using mDNS.
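The app itself uses the platform Bonjour/NSD APIs on iOS and Android, but the discovery idea can be illustrated in Python with the python-zeroconf library. The "_gatekeeper._tcp.local." service type below is a made-up example, not the real service name.

# Rough sketch of mDNS service discovery using python-zeroconf; the service
# type is hypothetical and the real app uses the platform Bonjour/NSD APIs.
from zeroconf import ServiceBrowser, Zeroconf

class GatekeeperListener:
    def add_service(self, zc, type_, name):
        info = zc.get_service_info(type_, name)
        if info:
            print("Found Gatekeeper server %s at %s:%d" % (name, info.server, info.port))

    def remove_service(self, zc, type_, name):
        print("Gatekeeper server %s went away" % name)

    def update_service(self, zc, type_, name):
        pass

zeroconf = Zeroconf()
browser = ServiceBrowser(zeroconf, "_gatekeeper._tcp.local.", GatekeeperListener())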

Continue reading

Multi-Index Locality Sensitive Hashing for Fun and Profit

Keeping our organizers and attendees safe from malicious and unwanted spam is a challenge, especially given the high volume of messages routed through our system every day.

One way we deal with this volume of data is to cluster similar messages together to find patterns in sender behavior. For example, if someone is contacting thousands of different organizers with similar messages, that behavior is suspect and will be examined.

The big question is, how can we compare every single message we see with every other message efficiently and accurately? In this article, we’ll be exploring a technique known as Multi-Index Locality Sensitive Hashing.

To perform the comparison efficiently, we pre-process the data in a series of steps:

  1. Message Tokenization
  2. Locality Sensitive Hashing
  3. Multi-Index Optimization

Message Tokenization

Let's first define what similar messages are. Here is an example of two similar messages, A and B:

A = "Is there a dress code for this event? Thanks!"
B = "Hi, is there a DRESS CODE to this event"

To our human eyes they're obviously similar, but we want to determine this similarity quantitatively. The solution is to break each message up into tokens and then treat it as a bag of tokens. The simplest, naive way to tokenize is to split a message on spaces and punctuation and convert every character to lowercase. The result of tokenizing the messages above would be:

tokenize(A) -> tA = "is","there","a","dress","code","for","this","event","thanks"
tokenize(B) -> tB = "hi","is","there","a","dress","code","to","this","event"

I'll leave it as an exercise for the reader to come up with more interesting tokenization schemes that handle contractions, plurals, foreign languages, etc.

To calculate the similarity between these two bags of tokens, we'll use a measure known as the Jaccard similarity coefficient. This is defined as "the ratio of the sizes of the intersection and union of A and B". Therefore, in our example:

Similarity = Jaccard(A, B) = |A ∩ B| / |A ∪ B|
    = size(intersection(tA, tB)) / size(union(tA, tB))
    = size("is","there","a","dress","code","this","event") /
      size("hi","is","there","a","dress","code","for","to","this","event","thanks")
    = 7 / 11
    ~ .64

We then set a threshold above which we consider two messages to be similar. Given a set of M messages, we simply compute the similarity of every message to every other message. This works in theory, but in practice the metric can be unreliable (e.g., if one message is significantly longer than the other), and the approach is horribly inefficient: O(N²M²), where N is the number of tokens per message and M is the number of messages. We need to do things smarter!
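For reference, here is a minimal Python sketch of the naive approach described above. The regex tokenizer and the 0.5 threshold are illustrative choices, not our production values.

# Minimal sketch of the naive approach: tokenize, compute Jaccard similarity,
# and brute-force compare every pair of messages against a threshold.
import re
from itertools import combinations

def tokenize(message):
    # Lowercase and split on anything that isn't a letter or digit.
    return set(re.findall(r"[a-z0-9]+", message.lower()))

def jaccard(tokens_a, tokens_b):
    return len(tokens_a & tokens_b) / float(len(tokens_a | tokens_b))

A = "Is there a dress code for this event? Thanks!"
B = "Hi, is there a DRESS CODE to this event"
print(jaccard(tokenize(A), tokenize(B)))  # ~0.64

def similar_pairs(messages, threshold=0.5):
    token_sets = [tokenize(m) for m in messages]
    return [
        (i, j)
        for (i, ta), (j, tb) in combinations(enumerate(token_sets), 2)
        if jaccard(ta, tb) >= threshold
    ]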

Continue reading

Watching Metadata Changes in a Distributed Application Using ZooKeeper

We created a distributed ETL system we affectionately call Mandoline. It is configurable, distributed, scalable, and easy to manage – here’s how we did it.

One of the hardest parts of building a distributed system is ensuring proper coordination between nodes across a cluster, and we decided to do it using Apache ZooKeeper. ZooKeeper can be imagined as a remote file system, where every file is also a folder (these are referred to as “znodes”). For example, let’s say we have the znode /mandoline where we store the system version, "1". Under /mandoline we may also store items like the configuration, so /mandoline/load_config stores our load configuration (in our case, a json dictionary for every source). The magic sauce of ZooKeeper is that it guarantees “strictly ordered access” to znodes, allowing synchronization and coordination to be built on top of it.
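The post doesn't show the client code for this layout; assuming the kazoo Python client (the zk_client calls later in the post match its API), creating and reading these znodes looks roughly like the sketch below, with the configuration contents made up for illustration.

# Sketch of the znode layout described above, using the kazoo client.
# Both the assumption of kazoo and the config contents are illustrative.
import json
from kazoo.client import KazooClient

zk = KazooClient(hosts="127.0.0.1:2181")
zk.start()

# Every znode is both a "file" (it holds data) and a "folder" (it can have children).
zk.ensure_path("/mandoline")
zk.set("/mandoline", b"1")  # the system version

load_config = {"orders": {"table": "Orders", "primary_key": "order_id"}}
if not zk.exists("/mandoline/load_config"):
    zk.create("/mandoline/load_config", json.dumps(load_config).encode("utf-8"))

data, stat = zk.get("/mandoline/load_config")
print(json.loads(data.decode("utf-8")))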

Mandoline coordinates its tasks using watchers, a neat ZooKeeper feature. You can specify a znode, and whether you’d like to watch changes to its data or its children, and ZooKeeper will notify you once those have happened. In the Mandoline architecture, processes choose specific znodes and use those to communicate changes in the data they are responsible for.

For example, we have a component that loads orders from our Orders table in the database, and we have two other components that need to track (1) the purchase history of a given user and (2) the total sales for the event. This is how the order-loading component does it:

latest_timestamp = 0
for datum in query_data:
    key = datum.pop(primary_key)

    # Track the newest checkpoint seen in this batch of rows.
    timestamp = datum.pop(MANDOLINE_TIME_CHECKPOINT, 0)
    if timestamp > latest_timestamp:
        latest_timestamp = timestamp

    # The actual data goes into the Cassandra batch, not ZooKeeper.
    main_batch.insert(key, datum)

# Notify watchers by writing only the latest checkpoint to the znode;
# retry() re-runs the set() if the ZooKeeper connection hiccups.
self.zk_client.retry(
    self.zk_client.set,
    self.load_notification_node,
    str(latest_timestamp),
)

Notice that many operations are performed for a given query, yet only a small value (a timestamp, in this case) is written to ZooKeeper. Znodes are restricted in that they cannot hold large values, so the queue of items to actually work on is stored in Cassandra, while ZooKeeper handles the notification part.
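The consuming side isn't shown in the excerpt, but a watcher on that notification znode might look roughly like this, sketched with kazoo's DataWatch recipe. The znode path and the processing function are hypothetical.

# Hypothetical consumer sketch using kazoo's DataWatch: react when the
# producer bumps the notification znode, then pull the real work from
# Cassandra. Paths and function names are illustrative.
from kazoo.client import KazooClient

zk = KazooClient(hosts="127.0.0.1:2181")
zk.start()

last_seen = 0

def process_new_orders(since, until):
    # Placeholder: in Mandoline this would read the queued items from Cassandra.
    print("processing orders loaded between %s and %s" % (since, until))

@zk.DataWatch("/mandoline/orders/load_notification")
def on_orders_loaded(data, stat):
    global last_seen
    if data is None:  # the znode hasn't been created yet
        return
    latest_timestamp = int(data.decode("utf-8"))
    if latest_timestamp > last_seen:
        # Only the checkpoint lives in ZooKeeper; the actual rows are in Cassandra.
        process_new_orders(since=last_seen, until=latest_timestamp)
        last_seen = latest_timestamp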

Continue reading