Looking under the hood of the Eventbrite data pipeline!

Eventbrite’s mission is to bring the world together through live experiences. To achieve this goal, Eventbrite relies on data-driven decisions at every level. In this post, we explore Eventbrite’s treasure trove of data and how we leverage it to push the company forward. We also take a closer look at some of the data challenges we’ve faced and how we’re pushing forward with new improvements to address these challenges!

The Data Engineering team at Eventbrite ingests data from a number of different sources into a central repository.  This repository is the foundation for Eventbrite’s analytics platform. It empowers Eventbrite’s engineers, analysts, and data scientists to create data-driven solutions such as predictive analysis, algorithmic newsletters, tagging/clustering algorithms, high-value customer identification, and search/recommendations.

The Situation: Degrading performance and increasing cost of existing Data Warehouse running on Hadoop infrastructure (CDH5)

We use MySQL as our main production datastore, and it supported most of our data analytics/reporting until a few years ago. Then we implemented a Cloudera CDH ecosystem, starting with CDH3 and upgrading to CDH5 when it was released. Prior to phasing in our CDH environment, our main OLTP (online transaction processing) databases were also powering our reporting needs.

Our production MySQL topology consists of a Primary-Secondary setup, with the majority of the read-only traffic directed to the MySQL secondary instances. For many years, we met our reporting requirements by leveraging the read-only MySQL instances but it came at a steep price due to contention and performance issues caused by long-running SQL queries.

As a result, we moved much of our reporting to our new CDH environment, and we designed a new set of transformation tables to simplify data access for engineers, analysts, and business users. It’s served us well as the backbone for our Data Warehouse efforts, but the time had come to take the next step, as we’ve faced a number of challenges:

Cost

Our CDH5 cluster lives on Reserved Instances, and all of the data in the cluster is housed on local solid state drives.  As a result, the cluster is expensive to maintain.

A Reserved Instance is a reservation of resources for an agreed-upon period of time. Unlike on-demand, when you purchase an RI (Reserved Instance), you commit to paying for all the hours of the 1-year or 3-year term. The end result is a lower hourly rate, but the long-term costs can really add up.
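To make the trade-off concrete, here is a minimal sketch of the arithmetic. The hourly rates are hypothetical placeholders, not real AWS prices, and the function names are ours for illustration. The point is that a reservation only pays off when the instance is busy for more than a break-even fraction of the term.

```python
HOURS_PER_YEAR = 24 * 365

# Hypothetical hourly rates, for illustration only (not real AWS prices).
ON_DEMAND_RATE = 1.00
RESERVED_RATE = 0.60


def yearly_cost_on_demand(hourly_rate, utilization):
    """On-demand: pay only for the fraction of hours actually used."""
    return hourly_rate * HOURS_PER_YEAR * utilization


def yearly_cost_reserved(hourly_rate):
    """Reserved: pay for every hour of the term, used or not."""
    return hourly_rate * HOURS_PER_YEAR


def break_even_utilization(on_demand_rate, reserved_rate):
    """Utilization above which the reservation becomes the cheaper option."""
    return reserved_rate / on_demand_rate


def cheaper_option(utilization, on_demand_rate=ON_DEMAND_RATE,
                   reserved_rate=RESERVED_RATE):
    """Return which pricing model costs less at the given utilization."""
    on_demand = yearly_cost_on_demand(on_demand_rate, utilization)
    reserved = yearly_cost_reserved(reserved_rate)
    return "reserved" if reserved < on_demand else "on-demand"
```

With these placeholder rates, a cluster that sits idle half the time would be cheaper on demand, while one that is busy 90% of the time would be cheaper reserved.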

Analytics

We have a large collection of uncurated data, and we had not transformed the data into a single source-of-truth about our business. As a result, core business metrics (such as organizer, consumer, and event data) were reported differently in different places in the organization, and attributes such as currency, location and timezone were reported differently across business units.

Scheduling

Most jobs were scheduled via Oozie, there was little effective monitoring in place, and there was no method to track or enforce dependencies between coordinators. In addition, other analytics jobs that utilize Salesforce and MySQL data were scheduled through a local Windows machine that was prone to errors and regularly failed without warning or notification.

Processing/Elasticity

All ETL processing and ad-hoc queries executed on the same CDH5 cluster. Each process had its own load profile, so the cluster was configured to fit an aggregate of those loads. The end result was that jobs frequently conflicted with each other and competed for resources.

Our workload required burst capacity to support experimental development, ad-hoc queries, and routine ingestion scripts. In an ideal setup, we would scale up and scale down computing resources without any interruptions or data loss.

Ingestion

For MySQL ingestion, we used a home-grown wrapper called Sqoozie to integrate with our MySQL databases. Sqoozie combines Apache Sqoop – a command-line application for transferring data between relational databases and Hadoop – and Apache Oozie, a Hadoop workflow scheduler. It allows for writing MySQL tables directly to Hive tables. While this approach worked for smaller datasets, it became prohibitive as our data grew. Unfortunately, it was set up as a full ingestion of all tables each day and typically took most of a day to finish, putting high load on the shared resource cluster for an extended period of time.

For web analytics ingestion, we used a proprietary tool called Blammo-Kafka that pulled the web logs directly from Kafka daily and dumped them to Hive tables partitioned by day.

For Salesforce ingestion, we used the Salesforce Bulk API to ingest all objects daily and overwrite the previous day’s ingestion.

The Solution: EMR, Presto, Hive, and Luigi to the rescue!

In the past year, we’ve invested heavily in building a shiny new “data-foundry” ecosystem to alleviate many of the pain points from our previous CDH environment. It is the result of many whiteboard sessions, sleepless nights, and walks around the block at Eventbrite’s offices at our SOMA location in San Francisco and Cummins Station in Nashville.

We focused not only on improving stability and cost, but also on designing a new set of transformation tables that would become the canonical source-of-truth at the company level. This involved meeting with key stakeholders to understand business metrics and exploring new technologies. The following diagram depicts sample output from some of our working sessions. As you can tell, it was a tedious process.

The end result was the implementation of a new “data-foundry” infrastructure. The following diagram shows a general layout:

EMR (Elastic MapReduce) Clusters

Ingestion and ETL jobs run on daily and hourly scheduled EMR clusters with access to most Hadoop tools. Amazon’s EMR is a managed cluster platform that simplifies running big data frameworks such as Hadoop, Spark, Presto, and other applications in the Apache/Hadoop stack.

The EMR/S3 solution decouples storage from compute. You only pay for compute when you use it (high utilization). Multiple EMR clusters can access the data (S3, Hive Metastore), and interactive workloads (Hive, Presto, Spark) can be launched via on-demand clusters.

We’ve seen some benefits with Amazon EMR:

Intelligent resizing

  • Incrementally scale up (add nodes to EMR cluster) based on available capacity
  • Wait for work to complete before resizing down (removing nodes from EMR cluster)
  • Can scale core nodes and HDFS as well as task nodes

Cost Savings

By moving to EMR and S3, we’ve been able to considerably cut costs. With S3 we pay only for the storage that we use, not for total capacity. And with EMR, we’re able to take advantage of  “on-demand” pricing, paying low hourly rates for clusters only when we need the capacity. Also, we’ve reduced the cost even further by purchasing Reserved Instances and bidding on Spot instances.

  • Use Amazon EC2 spot instances to save > 80%
  • Use Amazon EC2 Reserved Instances for steady workloads

Reliability/Improved Operational Support

Amazon EMR monitors nodes in each cluster and automatically terminates and replaces an instance if there is a failure. Plus the new environment has been built from scratch, is configured via Terraform, and uses automated Ansible templates.

Job Scheduling

We use Luigi to orchestrate our Python jobs. Luigi enables us to easily define task workflows without having to know much about other workflows. It is an open source Python framework created by Spotify for managing data processing jobs, and it is really good at dependency management, which makes it a perfect tool for coalescing dependent data sources.
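The dependency management idea can be sketched in plain Python. This is not the Luigi API and not our production code: it uses the standard library's `graphlib` module, and the task names are hypothetical. Each task declares what it requires, tasks run only after their requirements, and tasks that are already complete are skipped, which mirrors how Luigi skips a task whose output already exists.

```python
from graphlib import TopologicalSorter

# Hypothetical task names; each task maps to the set of tasks it depends on.
DEPENDENCIES = {
    "ingest_mysql": set(),
    "ingest_web_analytics": set(),
    "build_transformations": {"ingest_mysql", "ingest_web_analytics"},
    "export_to_redis": {"build_transformations"},
}


def run_order(dependencies):
    """Return the tasks in an order that respects every dependency."""
    return list(TopologicalSorter(dependencies).static_order())


def run_all(dependencies, completed=None):
    """Run tasks in dependency order, skipping any already marked complete."""
    completed = set(completed or ())
    executed = []
    for task in run_order(dependencies):
        if task in completed:
            continue  # output already exists, nothing to do
        executed.append(task)  # a real scheduler would run the job here
        completed.add(task)
    return executed
```

Calling `run_all(DEPENDENCIES, completed={"ingest_mysql"})` would run the remaining three tasks, with `export_to_redis` last.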

Centralized Hive Metastore

We have a centralized Hive metastore that stores the structural metadata (tables, columns, and partitions) for our Hive tables. We chose Hive for most of our Hadoop jobs primarily because the SQL interface is simple. Querying the metastore is much cleaner than listing files in a directory to determine what output exists, and is also much faster and more consistent because it’s backed by MySQL/RDS. This is particularly important since we rely on S3, which is slow at listing files and is prone to “eventual” consistency issues.

Ingestion

We continue to ingest production data from MySQL tables on a daily basis using Apache Sqoop, but in the “data-foundry” ecosystem we ingest the tables incrementally using “changed” columns to allow for quicker updates.
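The incremental pattern looks roughly like the sketch below. It uses an in-memory SQLite database as a stand-in for MySQL so that it runs anywhere; the `events` table, its columns, and the function names are hypothetical, and our real jobs go through Sqoop rather than a Python database client. Each run selects only the rows whose `changed` value is newer than the watermark saved by the previous run, then advances the watermark.

```python
import sqlite3


def fetch_changed_rows(conn, table, last_watermark):
    """Return rows changed since last_watermark, plus the new watermark.

    The table name is interpolated into the SQL, so it must come from
    trusted configuration, never from user input.
    """
    query = (
        f"SELECT id, name, changed FROM {table} "
        "WHERE changed > ? ORDER BY changed"
    )
    rows = conn.execute(query, (last_watermark,)).fetchall()
    # Advance the watermark only if something actually changed.
    new_watermark = rows[-1][2] if rows else last_watermark
    return rows, new_watermark


def demo():
    """Build a tiny hypothetical table and ingest it incrementally."""
    conn = sqlite3.connect(":memory:")
    conn.execute(
        "CREATE TABLE events (id INTEGER PRIMARY KEY, name TEXT, changed TEXT)"
    )
    conn.executemany(
        "INSERT INTO events VALUES (?, ?, ?)",
        [
            (1, "concert", "2018-01-01 00:00:00"),
            (2, "festival", "2018-01-02 09:30:00"),
            (3, "meetup", "2018-01-03 12:00:00"),
        ],
    )
    # Pretend the previous run finished at the end of January 1st.
    return fetch_changed_rows(conn, "events", "2018-01-01 23:59:59")
```

Only rows 2 and 3 are picked up in the demo, and the next run would start from the `changed` value of row 3.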

We ingest web analytics data by using Pinterest Secor to dump data from Kafka to S3. We then process it from that S3 path using Spark, both hourly and daily. Hourly, we ingest the latest data for each web analytics table since the last time it was ingested and write it to Hive tables partitioned by day and hour. Daily, we also ingest the web analytics data to day-partitioned Hive tables.
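As a small illustration of the two partitioning schemes, the sketch below derives a partition spec from an event timestamp. The partition key names `day` and `hour`, the use of UTC, and the function names are assumptions made for this example rather than a description of our actual table layout.

```python
from datetime import datetime, timedelta, timezone


def hourly_partition(ts):
    """Partition spec for the hourly tables; ts must be timezone-aware."""
    utc = ts.astimezone(timezone.utc)
    return f"day={utc:%Y-%m-%d}/hour={utc:%H}"


def daily_partition(ts):
    """Partition spec for the daily tables; ts must be timezone-aware."""
    utc = ts.astimezone(timezone.utc)
    return f"day={utc:%Y-%m-%d}"


# A hypothetical event logged late in the evening, Pacific time (UTC-8).
PACIFIC = timezone(timedelta(hours=-8))
EXAMPLE_EVENT_TIME = datetime(2018, 3, 5, 23, 30, tzinfo=PACIFIC)
```

Normalizing to a single timezone before partitioning keeps an event from landing in different day partitions depending on where it was logged. Here the example event falls on March 6th in UTC even though it was logged on March 5th locally.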

We ingest Salesforce data using a combination of the Salesforce REST and Bulk APIs, with custom, internally built Python clients for both. Tables are ingested through Spark using the API that makes the most sense based on the size of the data. Also, where available, we use primary key chunking in the Bulk API to optimize ingestion of large tables.
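The selection logic can be sketched as follows. The row threshold, chunk size, and function names are hypothetical and do not reflect our actual client code. The `Sforce-Enable-PKChunking` header is, to our understanding, the request header the Bulk API uses to turn on primary key chunking; check the Salesforce documentation for the limits that apply to your objects.

```python
# Hypothetical cutoff: tables at or above this size go through the Bulk API.
BULK_API_ROW_THRESHOLD = 100_000


def choose_api(estimated_rows, threshold=BULK_API_ROW_THRESHOLD):
    """Pick the REST API for small tables and the Bulk API for large ones."""
    return "bulk" if estimated_rows >= threshold else "rest"


def bulk_job_headers(supports_pk_chunking, chunk_size=100_000):
    """Build the extra headers for a Bulk API job.

    When the object supports it, PK chunking asks Salesforce to split the
    query into ranges of record IDs so large tables are extracted in pieces.
    """
    headers = {}
    if supports_pk_chunking:
        headers["Sforce-Enable-PKChunking"] = f"chunkSize={chunk_size}"
    return headers
```

A small lookup object would go through `choose_api(2_000)` and use REST, while a multi-million-row object would use the Bulk API with chunking enabled where the object allows it.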

In addition to the ingestion processes that bring us to feature parity with the old CDH5 infrastructure, we also ingest data from a few other sources, including Google Analytics and several other 3rd party services.

We ingest Google Analytics data three times a day for the current day and once for the previous day based on SLAs provided by Google. We use Spark in addition to Google’s BigQuery and Cloud Storage clients to ingest Google Analytics data for our mobile app, organizer app, and web app to Hive tables partitioned by day.

Analytics

By separating analytics processing from visualization and queries, we’ve been able to explore more tooling options. Both Presto and Superset have proven to be useful.  

Presto is a distributed SQL query engine optimized for ad-hoc analysis. It supports the ANSI SQL standard, including complex queries, aggregations, and joins. Presto can run on multiple data sources, including Amazon S3. We’re using Presto with EC2 Auto Scaling Groups to dynamically scale based on usage patterns.

Presto’s execution framework is fundamentally different from that of Hive/MapReduce. It has a custom query and execution engine where the stages of execution are pipelined, similar to a directed acyclic graph (DAG), and all processing occurs in memory to reduce disk I/O. This pipelined execution model can run multiple stages in parallel, and it streams data from one stage to another as the data becomes available. This reduces end-to-end latency, and we’ve found Presto to be quite snappy for ad-hoc data exploration over large datasets.
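A toy analogy for pipelined execution, written with Python generators, is shown below. It is not how Presto is implemented: it runs in a single process with no parallelism, and the rows and stage names are made up. What it does show is the streaming behavior, where each row flows through the scan and filter stages as soon as it is available instead of each stage finishing and writing its full output before the next one starts.

```python
def scan(rows, trace):
    """Source stage: emit rows one at a time."""
    for row in rows:
        trace.append(("scan", row["id"]))
        yield row


def keep_paid(rows, trace):
    """Filter stage: pass along only rows with a non-zero price."""
    for row in rows:
        trace.append(("filter", row["id"]))
        if row["price"] > 0:
            yield row


def total_by_city(rows):
    """Aggregation stage: consume the stream and sum prices per city."""
    totals = {}
    for row in rows:
        totals[row["city"]] = totals.get(row["city"], 0) + row["price"]
    return totals


# Hypothetical sample rows.
ROWS = [
    {"id": 1, "city": "Nashville", "price": 20},
    {"id": 2, "city": "San Francisco", "price": 0},
    {"id": 3, "city": "Nashville", "price": 15},
]


def run_query(rows):
    """Chain the stages and return the result plus the order of events."""
    trace = []
    totals = total_by_city(keep_paid(scan(rows, trace), trace))
    return totals, trace
```

The trace returned by `run_query(ROWS)` alternates between scan and filter events, showing that the first row reaches the filter before the second row has even been scanned.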

An additional benefit is that Facebook and the open-source community are actively developing Presto, which has no vendor lock-in because it speaks ANSI-SQL.

Superset is a data exploration and visualization tool that was open sourced by Airbnb. It allows for fast and flexible data access and comes complete with a rich SQL IDE, which is used heavily by Eventbrite’s business analysts.

Transformations

We’ve introduced a new set of staging tables in our data warehouse that transform the raw data into dimension tables aligned specifically to meet business requirements.  These new tables enable analytics, data science, and reporting. The goal is to create a single “source-of-truth” for company metrics and company business concepts.

Data Exports

The Data Engineering team has developed a set of exporter jobs in Python to push data to targets such as Redis, Elasticsearch, Amazon S3, or MySQL. This allows us to cache the results of queries to power reports, so that the data is available to everyone, whenever it is needed.

What next?

We’re looking for new ways to decrease our ingestion times from MySQL using stream processing with products such as Maxwell (http://maxwells-daemon.io/), which has been well-documented by Zendesk. Maxwell reads MySQL binlogs and writes row updates to Kafka as JSON. We’re also using Spark SQL and are excited to use Apache Spark more broadly, especially Spark Streaming.
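To give a feel for what consuming those row updates might look like, here is a minimal sketch that applies Maxwell-style JSON messages to an in-memory copy of a table. The field names (`database`, `table`, `type`, `data`) follow Maxwell's documented message format; the sample database, the table, the assumption that every table has an `id` primary key, and the function name are hypothetical. A real consumer would read the messages from Kafka and write to a durable store.

```python
import json


def apply_maxwell_event(state, message):
    """Apply one Maxwell JSON message to an in-memory table snapshot.

    state maps (database, table) to a dict of rows keyed by primary key.
    Assumes every table has a primary key column named "id".
    """
    event = json.loads(message)
    table = state.setdefault((event["database"], event["table"]), {})
    row = event["data"]
    key = row["id"]
    if event["type"] in ("insert", "update"):
        table[key] = row  # "data" carries the full row after the change
    elif event["type"] == "delete":
        table.pop(key, None)
    return state


# Hypothetical messages in the shape Maxwell emits.
SAMPLE_MESSAGES = [
    '{"database": "shop", "table": "orders", "type": "insert",'
    ' "ts": 1514764800, "data": {"id": 1, "status": "pending"}}',
    '{"database": "shop", "table": "orders", "type": "update",'
    ' "ts": 1514764860, "data": {"id": 1, "status": "paid"},'
    ' "old": {"status": "pending"}}',
    '{"database": "shop", "table": "orders", "type": "insert",'
    ' "ts": 1514764900, "data": {"id": 2, "status": "pending"}}',
    '{"database": "shop", "table": "orders", "type": "delete",'
    ' "ts": 1514764960, "data": {"id": 2, "status": "pending"}}',
]


def replay(messages):
    """Apply a sequence of messages in order and return the final state."""
    state = {}
    for message in messages:
        apply_maxwell_event(state, message)
    return state
```

Because each message carries the full row, replaying the stream in order is enough to keep a downstream copy current without re-ingesting whole tables.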

We have a ton of enhancement requests to extend our Data Warehouse tables to meet the growing needs of the business and to provide better ways of visualizing the data via new Tableau dashboards.

As the Eventbrite family continues to grow with the acquisitions of Ticketscript, Ticketfly, and Ticketea, we continue to explore ways to migrate/combine data sources. This includes ingesting data from sources new to us, such as Amazon Redshift and Amazon DynamoDB.

It’s fun times here at Eventbrite!

Special thanks to Eventbrite’s Data Engineering team: (Brandon Hamric, Alex Meyer, Will Gaggioli, Beck Cronin-Dixon, Jasper Groot, Jeremy Bakker, and Paul Edwards) for their contributions to this blog post. This team rocks!
