Our data matching service processes ~10 petabytes of data and generates ~1 petabyte of compressed output every day. It continuously utilizes ~25k CPU cores and ~50 terabytes of RAM on our Hadoop clusters. These numbers are growing as more data flows through our platform.
How can we efficiently process the massive volume of data we see today, and how can we prepare for the growth in scale we are experiencing? To answer that we first need to dig into what the matching service does and discuss nuances that make this challenge trickier than it seems.
What does the matching service do?
The matching service joins customer data with LiveRamp Identity data to determine what to send to end destinations. This provides the backbone of LiveRamp’s core product: matching data between different consumer platforms. Due to its centralized role and the combinatorics of the work being done, it is the most expensive single application at LiveRamp. We’re going to explore some of the strategies and optimizations we’ve made to scale the enormous work this service does.
At a high level, what the matching service does is pretty simple. It joins customer data keyed on anonymous identifiers with a mapping from those anonymous identifiers to new ones. It then sorts and deduplicates the data on the output identifier key for downstream processing.
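The join-sort-deduplicate pipeline can be sketched in a few lines of Python. This is a minimal, single-process illustration of the logic (the real service does this in MapReduce at petabyte scale), and the record shapes here are assumptions for the example:

```python
from collections import defaultdict

def match(customer_records, id_mapping):
    """Join customer data to an identifier mapping, then sort and
    deduplicate on the output identifier.

    customer_records: iterable of (anonymous_id, payload) pairs
    id_mapping: dict of anonymous_id -> list of output_ids (many-to-many)
    """
    joined = defaultdict(set)
    for anon_id, payload in customer_records:
        for out_id in id_mapping.get(anon_id, []):
            joined[out_id].add(payload)  # set membership deduplicates
    # emit sorted by output identifier for downstream processing
    return [(out_id, sorted(joined[out_id])) for out_id in sorted(joined)]

records = [("a1", "seg1"), ("a2", "seg2"), ("a1", "seg1")]  # note the duplicate
mapping = {"a1": ["x9"], "a2": ["x9", "x7"]}
print(match(records, mapping))
# → [('x7', ['seg2']), ('x9', ['seg1', 'seg2'])]
```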
Beyond the raw volume of data being processed, additional factors complicate scaling.
Tens of thousands of distinct requests are made daily. This means we need to scale both data throughput and request throughput. If there were a single job processing 1 petabyte of data, it would be similarly expensive in compute costs, but much simpler to manage. We need to run thousands of requests concurrently, track all their statuses, and support recovery for failed requests.
The size of each request varies by many orders of magnitude. Requests can process anywhere from less than a gigabyte to more than one hundred terabytes. This makes it difficult to share tuning logic and to hit similar runtimes for all jobs. Ideally, we want to scale resource assignments with the job sizes. Unfortunately, it’s difficult to predict output size of the data based on the input size because we don’t know how much overlap there will be between the customer data and the identifier mapping. It’s possible that a 10 terabyte input will generate only 100 gigabytes of output. It’s also possible that a 100 gigabyte input will generate 10 terabytes of output. This makes it difficult to assign appropriate resources to each request.
We currently use a MapReduce workflow to perform this massive join. The workflow first joins the two input data sets then sorts the resulting data on the output identifier. MapReduce is a simple and effective solution to process individual requests, but scaling to thousands of parallel requests has led to new challenges with scheduling, batching, tuning and optimizing throughput.
Here are several of the strategies we’ve used.
Caching is a simple and effective way to improve performance when receiving multiple identical requests. Around 20% of requests share duplicate work, so implementing a request caching strategy was a simple way to cut 20% of processing.
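One way to implement such a cache is to fingerprint each request by everything that determines its output. The sketch below is a hypothetical in-memory version; the class and parameter names are illustrative, not the real service’s API:

```python
import hashlib

class RequestCache:
    """Cache results of identical matching requests.

    The cache key is a fingerprint of the inputs that determine the
    output: here, assumed to be the customer dataset version and the
    identifier mapping version.
    """
    def __init__(self):
        self._results = {}
        self.hits = 0

    def _fingerprint(self, dataset_version, mapping_version):
        raw = f"{dataset_version}|{mapping_version}".encode()
        return hashlib.sha256(raw).hexdigest()

    def get_or_compute(self, dataset_version, mapping_version, compute):
        key = self._fingerprint(dataset_version, mapping_version)
        if key in self._results:
            self.hits += 1           # duplicate request: skip the work
            return self._results[key]
        result = compute()           # expensive matching job
        self._results[key] = result
        return result

cache = RequestCache()
first = cache.get_or_compute("dataset-v1", "mapping-v3", lambda: "matched-output")
second = cache.get_or_compute("dataset-v1", "mapping-v3", lambda: "recomputed")
# the second identical request is served from the cache
```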
Map-side Join (MSJ)
The first part of the MapReduce workflow involves joining our two input datasets. The typical join operation in a MapReduce application is a sorted-merge join. This involves a costly sort and shuffle phase.
In our case, we require both inputs to be sorted. This allows us to perform a merge join without the need to sort and shuffle the data first. By doing this we eliminate an entire MapReduce job from our workflow. We can now join the inputs in the map phase using a map-side join (MSJ), then sort and reduce the output of that join in a single MapReduce job. This more than halves the cost of computation in most cases.
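The merge join itself is a linear walk over the two pre-sorted inputs. A minimal in-memory sketch (the real MSJ operates on sorted files in the map phase):

```python
def merge_join(sorted_left, sorted_right):
    """Linear merge join of two inputs pre-sorted by key.

    Each input is a list of (key, value) pairs sorted by key.
    Yields (key, left_value, right_value) for every matching pair.
    No sort/shuffle phase is needed because both sides arrive sorted.
    """
    i = j = 0
    while i < len(sorted_left) and j < len(sorted_right):
        lk, rk = sorted_left[i][0], sorted_right[j][0]
        if lk < rk:
            i += 1
        elif lk > rk:
            j += 1
        else:
            # emit the cross product of right values sharing this key
            j2 = j
            while j2 < len(sorted_right) and sorted_right[j2][0] == lk:
                yield (lk, sorted_left[i][1], sorted_right[j2][1])
                j2 += 1
            i += 1

left = [("a", 1), ("b", 2), ("b", 3)]
right = [("b", "x"), ("b", "y"), ("c", "z")]
result = list(merge_join(left, right))
# keys present on both sides ("b") join; "a" and "c" are dropped
```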
For MSJ to work, both inputs must already be sorted. This is a tradeoff that adds cost to the applications that feed data into the matching service. It is still significantly cheaper overall, because on average each data source is reused multiple times throughout its lifecycle. We sort it once and reap the benefits every time we process it after.
As in a typical database, we can further improve the performance of our join by performing an indexed sorted-merge join. Essentially, instead of performing a linear merge on both inputs, we seek only relevant keys in the larger input. This enables us to limit read complexity to twice the size of the smaller store, instead of the sum of both stores. You can learn more about this cool technology in our blog post, Seeking Map-Side Join.
In our case, the mapping files contain billions of records because they represent a specific subset of our graph. The customer data inputs typically only have a few million records. Using our Seeking MSJ implementation allows us to limit the records processed in these cases to millions instead of billions and results in massive performance gains.
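A simplified in-memory model of the seeking join: for each key in the small input, seek directly to that key in the large sorted store instead of scanning everything in between. Here a binary search stands in for the on-disk seek:

```python
import bisect

def seeking_merge_join(small_sorted, large_keys, large_values):
    """Indexed sorted-merge join: for each key from the small input,
    seek to that key in the large sorted store.

    large_keys/large_values model a sorted, seekable store (on disk,
    a file seek replaces the binary search). Read cost scales with the
    small input, not with the billions of records in the mapping.
    """
    out = []
    pos = 0  # never seek backwards; both inputs are sorted
    for key, value in small_sorted:
        i = bisect.bisect_left(large_keys, key, pos)
        pos = i  # monotone progress through the large store
        while i < len(large_keys) and large_keys[i] == key:
            out.append((key, value, large_values[i]))
            i += 1
    return out

# billions of mapping records in practice; four here for illustration
mapping_keys = ["a", "b", "b", "d"]
mapping_vals = [1, 2, 3, 4]
joined = seeking_merge_join([("b", "p"), ("d", "q")], mapping_keys, mapping_vals)
```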
In some cases the join creates many output records from the map phase with the same key. This happens because the identity mappings represent many-to-many relationships. Depending on that relationship, we’ve had cases with hundreds of duplicates for each key. These are deduplicated during the reduce phase, but must be sorted and shuffled first.
We use MapReduce Combiners to reduce the output of each map task so there are no duplicate identifiers from the same map task. The reduce tasks then deduplicate the output of all the map tasks.
In extreme cases we’ve seen examples of a request with 10 terabytes of input and an output that had expanded to 200 terabytes of mapper output data being sorted and shuffled. Using combiners cuts the bytes being shuffled back down to 10 terabytes, saving an enormous amount of cluster resources.
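The combiner’s effect can be illustrated with a toy dedup pipeline. This is a plain-Python sketch of the idea, not the Cascading implementation:

```python
def combine(map_output):
    """Combiner: collapse duplicate (key, value) pairs emitted by a
    single map task before they are shuffled to the reducers."""
    return sorted(set(map_output))

def reduce_dedup(shuffled):
    """Reducer: final deduplication across all map tasks' outputs."""
    return sorted(set(shuffled))

# Two map tasks emit heavily duplicated identifiers (illustrative data).
task1 = [("x9", "seg1")] * 100 + [("x7", "seg2")] * 50
task2 = [("x9", "seg1")] * 80

combined1, combined2 = combine(task1), combine(task2)
# 230 mapper records shrink to 3 before the shuffle
shuffled_records = len(combined1) + len(combined2)
final = reduce_dedup(combined1 + combined2)
```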
LiveRamp actually implemented our own version of Combiners in Cascading (the framework we use to write MapReduce jobs), which you can read about here.
There are many ways to tune MapReduce to be more performant. Poorly tuned MapReduce jobs can cause a variety of problems like extremely long job times, killing the YARN scheduler, overloading nodes in the cluster, or wasting resources. Tuning can be the most challenging and most important part of developing a MapReduce workflow.
One difficulty in tuning the matching MapReduce job is that it is hard to predict the output size based on the input size. This means it is hard to tell how many map tasks and reduce tasks should be used for optimal performance. Small tasks exacerbate startup overhead and scheduling costs. Large tasks aren’t affected by startup overhead as strongly, but take a long time to run and lose significant progress when there is a task failure. If the data per task is too large, the job might not even be able to run at all! The goal is to have all medium-sized tasks.
We use a few optimizations to achieve this goal.
- The first is to have a flexible storage structure for our sorted inputs. In order to perform an MSJ, both input data sets must be sorted and partitioned in the same way. MapReduce typically determines partitions by file splits, which requires both input data sets to have the same number of files. Instead of partitioning by file, we developed a virtual partitioning scheme to use in our MSJ jobs. It allows us to flexibly tune file size and map task counts while still performing efficient partitioned joins using MSJ.
- Tuning the number of map tasks and reduce tasks is difficult. Both the map and reduce tasks can have drastically different input and output sizes. It’s impossible for us to tell exactly what the output sizes will be without processing the data. We currently have a simple algorithm in place to predict output sizes and tune accordingly.
- In some cases the amount of data is too small to effectively use and tune MapReduce. To both optimize these cases and increase parallelism we implemented an extensive batching solution.
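A task-count heuristic in this spirit might look like the sketch below. This is a hypothetical simplification: the target size, bounds, and the expansion estimate are all illustrative, and the real prediction algorithm is not shown here:

```python
def plan_tasks(input_bytes, expansion_estimate,
               target_task_bytes=512 * 1024**2,  # aim for ~512 MB per task
               min_tasks=1, max_tasks=5000):
    """Pick map and reduce task counts aiming for medium-sized tasks.

    expansion_estimate: predicted output-bytes / input-bytes ratio.
    This is the hard-to-know quantity; a 0.01x or 100x expansion leads
    to very different reducer counts for the same input size.
    """
    def clamp(n):
        return max(min_tasks, min(max_tasks, n))

    map_tasks = clamp(round(input_bytes / target_task_bytes))
    predicted_output_bytes = input_bytes * expansion_estimate
    reduce_tasks = clamp(round(predicted_output_bytes / target_task_bytes))
    return map_tasks, reduce_tasks

# A 100 GB input predicted to expand 20x needs far more reducers than mappers.
plan = plan_tasks(100 * 1024**3, 20.0)
```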
The matching service needs to schedule and process thousands of parallel requests in MapReduce. Scheduling this many parallel MapReduce jobs pushes the limits of YARN and offers opportunities to find shared work between jobs. Intelligently batching work together allows us to optimize in both of these ways.
To handle the volume of requests, we need to be able to process 2-3k requests in parallel. In our Hadoop cluster we’ve found that the YARN scheduler hits scaling limits over 1k concurrent applications. In addition to managing the concurrent applications, the scheduler must also schedule many tasks for each application. This means it is trying to optimize scheduling for hundreds of thousands of tasks at any time. The scheduler is a single point of failure that runs on a single node and simply can’t keep up. Knowing this, we had to find a way to process requests using fewer jobs and tasks.
Batching requests allows us both to run more requests in fewer jobs and to group many small requests into medium-sized jobs that are easier to tune. We also batch requests that use the same mapping (have the same input and output IDs). Even if the customer data being joined is entirely different, we only need to read the mapping once. This cuts out a reasonable amount of decompression, deserialization, I/O, and computation.
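A simplified batching scheme along these lines: group requests by mapping, then pack each group into medium-sized batches. The size threshold and the request tuple shape are assumptions for the sketch:

```python
from collections import defaultdict

def batch_requests(requests, max_batch_bytes=256 * 1024**3):
    """Group requests that use the same identifier mapping into batches,
    so the mapping only needs to be read once per batch.

    requests: iterable of (request_id, mapping_id, input_bytes)
    Returns a list of batches; each batch is a list of request_ids that
    share a mapping and together stay under max_batch_bytes.
    """
    by_mapping = defaultdict(list)
    for req_id, mapping_id, size in requests:
        by_mapping[mapping_id].append((req_id, size))

    batches = []
    for mapping_id, reqs in by_mapping.items():
        batch, batch_bytes = [], 0
        # pack small requests first so batches fill up evenly
        for req_id, size in sorted(reqs, key=lambda r: r[1]):
            if batch and batch_bytes + size > max_batch_bytes:
                batches.append(batch)
                batch, batch_bytes = [], 0
            batch.append(req_id)
            batch_bytes += size
        if batch:
            batches.append(batch)
    return batches

# r1 and r2 share mapping m1, so they run as one job reading m1 once
batches = batch_requests([("r1", "m1", 1), ("r2", "m1", 2), ("r3", "m2", 3)])
```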
Where do we go from here?
These strategies have scaled us from terabytes processed per day 5 years ago to well over a petabyte per day now. Here are a few quick ideas about how to scale to the next orders of magnitude.
- Use different technologies
- Small requests don’t need the full weight of MapReduce. Maybe we can find a more efficient solution for them.
- Streaming approaches for smaller scale requests could make sense in some cases.
- Run over multiple clusters
- We’re already pushing the limits of what a single Hadoop cluster can manage. If we continue with our same processing model we will need to expand into multiple clusters eventually.
- We’ve already begun experimenting with this after moving our infrastructure into GCP.
- Improve batching
- Similarly to how we can remove duplicate work reading mapping files, we could implement a way to remove duplicate work reading the customer data sets.
- Use a machine learning model to predict batch sizes more accurately.
- Represent the data more efficiently during internal processing.
- Our data representation is already very sparse, but additional metadata could allow us to devise better compression strategies.
- Make large architectural changes to this and surrounding data pipelines so they process less data and run more efficiently
We have many ideas about how we can continue to scale, and there are lots of other directions to explore. Scaling this service has been a massive undertaking, and the scope of the problem will only get bigger. We’re excited about what’s next!