Cut your run time from minutes to seconds with HBase and Algebird

// 02.04.2015 // Data

[Note: Code for this demo is available here: https://github.com/MediaMath/hbase-coprocessor-example]

At MediaMath, our Hadoop data processing pipelines generate various semi-aggregated datasets based on the many terabytes of data our systems generate daily. Those datasets are then imported to a set of relational SQL databases, where internal and external clients query them in real time. When a query involves extra levels of aggregation on an existing dataset at run time, it starts to hog server resources, slowing down runtime.

However, we have been able to reduce the query time on these terabyte-scale datasets from minutes to seconds by using a combination of HBase and Algebird. HBase is a distributed database that runs on top of the Hadoop Distributed File System (HDFS). Algebird is an open source library for abstract algebra built in Scala and developed by Twitter.

With Algebird and HBase, we have added an additional layer of map-reduce computation inside the data store. This moves the aggregation from the client side to the servers, distributing the heavy computation across multiple nodes and significantly shortening the query time.

Today, we are going to demonstrate how to take advantage of the HBase coprocessor and Algebird to build a simple aggregation system on a toy mobile device dataset in Scala.

Let’s take this simple mobile device semi-aggregated dataset as an example and see how it works.

 

Date        Campaign  Device        Device OS  Impressions  Uniques
2014-07-23  1         Apple Iphone  iOS        1            AwznDwM=

The dataset has six fields: date, campaign, device, device OS, impressions (ads), and uniques (unique users). Impressions and uniques are the metrics to be aggregated. The rest of the fields are “dimensions,” and we can group by some or all of them for the aggregation. A typical query against this dataset is: “In campaign 1, how many impressions, or how many unique users, have we served through an iPhone in the last seven days?”

First, let’s create a table for this dataset in HBase.

HBase table schema

HBase is a key-value store, so each record has a row key and a set of columns (grouped into column families). Let’s first create a table in HBase called “mobile-device.” It uses campaign, date, device, and device OS as the row key, and each row key is associated with two columns: impressions and uniques. The row key is stored as a string that concatenates all of its components. Impressions is a counter of type long, and uniques is a serialized HyperLogLog object. We use Chill to serialize/deserialize the HyperLogLog object.

In this demo, only the column family “stats” is used.

Table creation in HBase is fairly simple. All we need is to specify a table name with a column family. We only keep one version for each record, so new updates always overwrite the existing value. HBase also supports data expiration via the TTL (time to live) setting, which is given in seconds, so if you want data to expire after 24 hours, add TTL => 86400 to your table creation statement.

create 'mobile-device', { NAME => 'stats', VERSIONS => 1 }

Let’s insert some data into our new HBase table. Assume the data is stored in a tab-separated (TSV) file, and the unique count is represented as a Base64-encoded string. We parse the file and create an HBase Put operation for each line. The rowKey and colVals methods in MobileDeviceRecord define how the key and column values are generated from each line.
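To make the parsing step concrete, here is a minimal sketch of what such a record class could look like. It is not the MobileDeviceRecord from the demo repository: the "|" separator in the row key, the lower-case column names, and the 8-byte big-endian encoding of the counter are all assumptions made for illustration, and the HBase Put itself is left out so the snippet runs without any dependencies.

```scala
import java.nio.ByteBuffer
import java.util.Base64

// Hypothetical stand-in for the demo's MobileDeviceRecord. The "|" separator,
// the column names, and the byte encodings are assumptions, not the repo's code.
final case class MobileDeviceRecord(
    date: String,
    campaign: String,
    device: String,
    deviceOs: String,
    impressions: Long,
    uniquesBase64: String
) {
  // Row key: campaign, date, device, and device OS concatenated into one string.
  def rowKey: String = Seq(campaign, date, device, deviceOs).mkString("|")

  // Column name -> raw bytes, in the shape they would be handed to a Put.
  def colVals: Map[String, Array[Byte]] = Map(
    "impressions" -> ByteBuffer.allocate(8).putLong(impressions).array(),
    "uniques"     -> Base64.getDecoder.decode(uniquesBase64)
  )
}

object MobileDeviceRecord {
  // Parse one tab-separated line; returns None when the line is malformed.
  def parse(line: String): Option[MobileDeviceRecord] =
    line.split("\t", -1) match {
      case Array(date, campaign, device, os, imps, uniques) =>
        scala.util.Try(imps.trim.toLong).toOption
          .map(n => MobileDeviceRecord(date, campaign, device, os, n, uniques))
      case _ => None
    }
}
```

Returning an Option from parse keeps a single bad line from aborting the whole import; whether to skip or fail on malformed input is a choice this sketch makes, not one the demo prescribes.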

HBase Coprocessor

Now we have our toy dataset in HBase. Let’s embed some aggregation operations using the coprocessor.

HBase introduced the coprocessor framework in version 0.92. It allows users to deploy their own operations to the region servers, so the work is performed where the data lives and less data is sent over the network. Running data aggregation on each region server independently is very similar to having a map-side combiner in the map-reduce paradigm. After each region server has aggregated its result, the final aggregation operation collects the outputs from the region servers and merges them together.

A custom coprocessor operation is required to extend CoprocessorProtocol. Let’s make a GroupByMonoidSumProtocol trait extending CoprocessorProtocol and define a groupByMonoidSum operation within it. groupByMonoidSum takes a query and an HBase scan object as inputs, then outputs a nested HashMap of aggregated results. The outer map takes a grouped key to an inner HashMap of column name/value pairs, e.g. “Apple Iphone” -> {“Impressions” -> 3, “Uniques” -> “AwznDwM=”}.

Next we make a GroupByMonoidSumCoprocessorEndpoint which extends BaseEndpointCoprocessor and implements GroupByMonoidSumProtocol. This class defines the operation to be called in each HBase region server. In groupByMonoidSum, we obtain a scanner from RegionCoprocessorEnvironment.

Combine this with the passed-in scan object specifying the scope of the data, and we can now iterate through only those records we are interested in. For each record, we convert its row key to a new group key based on the groupBy function in the query, and sum up the values with the help of a mutable HashMap. We also wrap GroupByMonoidSumCoprocessorEndpoint in class GroupByMonoidSumCall, which extends Batch.Call, so it can be passed to HTableInterface.coprocessorExec on the client side.
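The per-region loop described above can be sketched in plain Scala. This is only an illustration of the idea: the rows are an in-memory iterator rather than an HBase region scanner, only the impressions counter is summed, the "|" row key layout is an assumption, and the names RegionSideGroupBy, byDevice, and groupBySum are made up for this example.

```scala
import scala.collection.mutable

object RegionSideGroupBy {
  // Hypothetical row shape: (rowKey, impressions). The row key layout
  // "campaign|date|device|deviceOs" is an assumption for this sketch.
  type Row = (String, Long)

  // Example groupBy function: keep only the device component of the row key.
  def byDevice(rowKey: String): String = rowKey.split('|')(2)

  // Fold the scanned rows into a mutable map keyed by the new group key.
  def groupBySum(rows: Iterator[Row], groupBy: String => String): Map[String, Long] = {
    val acc = mutable.HashMap.empty[String, Long]
    rows.foreach { case (rowKey, impressions) =>
      val key = groupBy(rowKey)
      acc.update(key, acc.getOrElse(key, 0L) + impressions)
    }
    acc.toMap
  }
}
```

Because the accumulator only ever holds one entry per group key, the region server sends back a result proportional to the number of groups, not the number of rows it scanned.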

The aggregation in our demo is a monoid plus. Under the hood, the plus function picks the appropriate monoid from the Algebird library for each data type.
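If monoids are new to you, the following dependency-free sketch shows the mechanism. The Monoid trait here is a simplified stand-in written for this post, not Algebird's own type class, and a plain Set union stands in for merging HyperLogLog objects (exact rather than approximate, but with the same "combine two partial results" shape).

```scala
// Minimal stand-in for a monoid type class; Algebird's own Monoid is richer.
trait Monoid[T] {
  def zero: T
  def plus(a: T, b: T): T
}

object Monoid {
  // Counters such as impressions add up as plain longs.
  implicit val longMonoid: Monoid[Long] = new Monoid[Long] {
    val zero: Long = 0L
    def plus(a: Long, b: Long): Long = a + b
  }

  // Set union is an exact stand-in for merging HyperLogLog sketches.
  implicit def setMonoid[A]: Monoid[Set[A]] = new Monoid[Set[A]] {
    val zero: Set[A] = Set.empty[A]
    def plus(a: Set[A], b: Set[A]): Set[A] = a union b
  }

  // Maps merge key by key, delegating to the monoid of the value type.
  implicit def mapMonoid[K, V](implicit m: Monoid[V]): Monoid[Map[K, V]] =
    new Monoid[Map[K, V]] {
      val zero: Map[K, V] = Map.empty[K, V]
      def plus(a: Map[K, V], b: Map[K, V]): Map[K, V] =
        b.foldLeft(a) { case (acc, (k, v)) =>
          acc.updated(k, acc.get(k).fold(v)(m.plus(_, v)))
        }
    }

  // One plus for every type: the compiler selects the matching instance.
  def plus[T](a: T, b: T)(implicit m: Monoid[T]): T = m.plus(a, b)
}
```

The call site never names a monoid. Monoid.plus(1L, 2L) resolves to long addition, while Monoid.plus on two maps resolves to the key-by-key merge, which is what lets one aggregation routine handle counters and unique-user sketches alike.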

Finally, we define a callback function in GroupByMonoidSumCallback, which is triggered after groupByMonoidSum finishes its aggregation on a region server. It uses the output from each region server to update the final aggregation results.
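Here is a rough sketch of that merge step, again without HBase on the classpath. MergeCallback is a hypothetical name, it handles only long-valued columns, and the synchronized blocks are a defensive assumption in case results from several regions arrive on different threads.

```scala
import scala.collection.mutable

// Hypothetical stand-in for the demo's callback: update is invoked once per
// region and folds that region's partial result into a running total.
final class MergeCallback {
  private val total =
    mutable.HashMap.empty[String, mutable.HashMap[String, Long]]

  // One region's output: group key -> (column name -> partial sum).
  def update(regionResult: Map[String, Map[String, Long]]): Unit = synchronized {
    for ((group, cols) <- regionResult; (col, value) <- cols) {
      val row = total.getOrElseUpdate(group, mutable.HashMap.empty[String, Long])
      row.update(col, row.getOrElse(col, 0L) + value)
    }
  }

  // Immutable snapshot of everything merged so far.
  def result: Map[String, Map[String, Long]] = synchronized {
    total.map { case (group, cols) => group -> cols.toMap }.toMap
  }
}
```

The merge uses the same plus as the region-side aggregation, so it does not matter in which order the regions report back.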

On the client side, we construct an HBase scan object from the user query, and pass GroupByMonoidSumCall and GroupByMonoidSumCallback to HTableInterface.coprocessorExec to trigger the aggregation on the HBase servers.

Deploy the coprocessor code to the HBase server

To make the coprocessor operation visible in HBase, we need to compile an assembly jar and copy it to the HBase classpath (I usually drop it in the lib directory). The hbase-site.xml config file also needs to be modified to include the GroupByMonoidSumCoprocessorEndpoint class.
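As a rough guide, the entry in hbase-site.xml would look something like the fragment below. hbase.coprocessor.region.classes is the standard property for loading region coprocessors on every table; the com.example package is a placeholder, so substitute the fully qualified class name used in your build.

```xml
<property>
  <name>hbase.coprocessor.region.classes</name>
  <!-- Placeholder package: replace with the real fully qualified class name. -->
  <value>com.example.GroupByMonoidSumCoprocessorEndpoint</value>
</property>
```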

After HBase restarts, this simple aggregation system is ready to roll! Give it a try, and let me know the results in the comments. 🙂


KESHI DAI

Senior Data Engineer

Keshi Dai is a senior data engineer at MediaMath. He builds big data tools and platforms that power MediaMath’s reporting and analytics products. Before MediaMath he worked at eBay, where he built a collaborative filtering recommendation system for eBay.com. He received his BS in Computer Science from Zhejiang Sci-Tech University in China and his PhD, specializing in information retrieval, from Northeastern University.

One response to “Cut your run time from minutes to seconds with HBase and Algebird”

  1. Thanks for the great article. It was extremely useful, as there are almost no examples of HBase coprocessors on the web.
    I do have a question though. I am using HBase 1.0.0. I was wondering whether this code is compatible with newer versions of HBase or not?
