Cut your run time from minutes to seconds with HBase and Algebird
[Note: Code for this demo is available here: https://github.com/MediaMath/hbase-coprocessor-example]
At MediaMath, our Hadoop data processing pipelines generate various semi-aggregated datasets from the many terabytes of data our systems produce daily. Those datasets are then imported into a set of relational SQL databases, where internal and external clients query them in real time. When a query requires extra levels of aggregation over an existing dataset at run time, it starts to hog server resources and slows the query down.
However, we have been able to reduce the query time on these terabyte-scale datasets from minutes to seconds by using a combination of HBase and Algebird. HBase is a distributed database that runs on top of the Hadoop Distributed File System (HDFS). Algebird is an open source library for abstract algebra, built in Scala and developed by Twitter.
With Algebird and HBase, we have added a layer of map-reduce computation inside the data store. This moves the aggregation from the client side to the servers, distributes the heavy computation across multiple nodes, and significantly shortens the query time.
Today, we are going to demonstrate how to take advantage of the HBase coprocessor and Algebird to build a simple aggregation system on a toy mobile device dataset in Scala.
Let’s take this simple mobile device semi-aggregated dataset as an example and see how it works.
Date | Campaign | Device | Device OS | Impressions | Uniques |
---|---|---|---|---|---|
2014-07-23 | 1 | Apple Iphone | iOS | 1 | AwznDwM= |
… | … | … | … | … | … |
The dataset has six fields: date, campaign, device, device OS, impressions (ads), and uniques (unique users). Impressions and uniques are the metrics, the values to be aggregated. The remaining fields are “dimensions,” and we can group by some or all of them during aggregation. A typical query against this dataset is: “in campaign 1, how many impressions, or how many unique users, have we served through an iPhone in the last seven days?”
First, let’s create a table for this dataset in HBase.
HBase table schema
HBase is a key-value store, so each record has a row key and a set of columns (each belonging to a column family). Let’s first create a table in HBase called “mobile-device.” It uses campaign, date, device, and device OS as the row key, and each row key is associated with two columns: impressions and uniques. The row key is stored as a string: the colon-separated concatenation of all its components. Impressions is a counter of type Long, and uniques is a serialized HyperLogLog object. We use Chill to serialize and deserialize the HyperLogLog object.
In this demo, only the column family “stats” is used.
    case class MobileDeviceRecord(campaign: String, date: String, device: String, deviceOS: String,
                                  impressions: Long, hllHash: String) {
      private val kryo = KryoEnv.kryo
      private val hll = HyperLogLog.fromBytes(Base64.decodeBase64(hllHash))

      private def makeRowKeyString(parts: String*) = parts.toList.mkString(":")

      def rowKey: Array[Byte] = Bytes.toBytes(makeRowKeyString(campaign, date, device, deviceOS))

      // returns a list of (family, column, value)
      def colVals: List[(Array[Byte], Array[Byte], Array[Byte])] = List(
        (Bytes.toBytes("stats"), Bytes.toBytes("impressions"), kryo.toBytesWithClass(impressions)),
        (Bytes.toBytes("stats"), Bytes.toBytes("uniques"), kryo.toBytesWithClass(hll))
      )
    }
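The order of the row key components matters because HBase keeps rows sorted by key. The following is a minimal standalone sketch, with no HBase dependency, of how the concatenated keys sort. The `makeRowKey` helper and the sample values are hypothetical; it only mirrors `makeRowKeyString` above.

```scala
// Standalone sketch (no HBase dependency): how the concatenated row keys sort.
object RowKeySketch {
  // Hypothetical helper mirroring makeRowKeyString in MobileDeviceRecord.
  def makeRowKey(parts: String*): String = parts.mkString(":")

  // Made-up sample rows: campaign, date, device, device OS.
  val keys: List[String] = List(
    makeRowKey("2", "2014-07-21", "Apple Iphone", "iOS"),
    makeRowKey("1", "2014-07-23", "Apple Iphone", "iOS"),
    makeRowKey("1", "2014-07-22", "Samsung Galaxy", "Android")
  )

  // HBase orders rows by the bytes of the key; for ASCII-only keys this
  // matches the natural String ordering used here.
  val sorted: List[String] = keys.sorted
}
```

With the campaign first and an ISO-formatted date second, all rows of one campaign sit next to each other in chronological order, which is what lets a query for "campaign 1, last seven days" be served by a single contiguous range scan.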
Table creation in HBase is fairly simple. All we need is to specify a table name with a column family. We keep only one version of each record, so new updates always overwrite the existing value. HBase also supports data expiration via the TTL (time to live) setting, so if you want data to expire after 24 hours, add TTL => 86400 to your table creation statement.

    create 'mobile-device', { NAME => 'stats', VERSIONS => 1 }

Let’s insert some data into our new HBase table. Assume the data is stored in a tab-separated (TSV) file, and the unique count is represented as a Base64-encoded string. We parse the file and create an HBase Put operation for each line. The rowKey and colVals methods in MobileDeviceRecord define how the key and column values are generated from each line. The resulting list of Puts can then be written to the table in one batch.
    def newRecordFromLine(line: String) = {
      val parts = line.split("\t")
      new MobileDeviceRecord(parts(1), parts(0), parts(2), parts(3), parts(4).toLong, parts(5))
    }

    val htable = connection.getTable("mobile-device")
    val lines = scala.io.Source.fromFile("data/mobile_device_samples.tsv").getLines()

    val puts = lines.map(line => {
      val record = newRecordFromLine(line)
      val put = new Put(record.rowKey)
      for ((q, c, v) <- record.colVals) put.add(q, c, v)
      put
    }).toList
HBase Coprocessor
Now we have our toy dataset in HBase. Let’s embed some aggregation operations using the coprocessor.
HBase introduced the coprocessor framework in version 0.92. It allows users to deploy their own operations to the region servers, so they can be performed where the data lives, reducing the amount of data sent over the network. The idea of running data aggregation on each region server independently is very similar to having a map-side combiner in the map-reduce paradigm. After each region server has aggregated its result, the final aggregation operation collects the outputs from the region servers and merges them together.
A custom coprocessor operation is required to extend CoprocessorProtocol. Let’s make a GroupByMonoidSumProtocol trait extending CoprocessorProtocol and define a groupByMonoidSum operation within it. groupByMonoidSum takes a query and an HBase Scan object as inputs, and outputs a nested HashMap with the aggregated results. The outer map takes a group key to an inner HashMap of column name/value pairs, e.g. “Apple Iphone” -> {“Impressions” -> 3, “Uniques” -> “AwznDwM=”}.
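To make the combiner analogy concrete, here is a small standalone sketch of the two phases using plain Scala collections. It does not use HBase at all; the region contents, the `aggregateRegion` and `merge` names, and the numbers are made up for illustration.

```scala
// Standalone sketch of the two-phase aggregation (plain Scala, no HBase).
object TwoPhaseSketch {
  type Partial = Map[String, Long] // group key -> summed impressions

  // Phase 1: each region aggregates its own rows locally (the "combiner" step).
  def aggregateRegion(rows: Seq[(String, Long)]): Partial =
    rows.groupBy(_._1).map { case (k, vs) => k -> vs.map(_._2).sum }

  // Phase 2: the client merges the small per-region results.
  def merge(a: Partial, b: Partial): Partial =
    b.foldLeft(a) { case (acc, (k, v)) => acc.updated(k, acc.getOrElse(k, 0L) + v) }

  // Hypothetical data: two regions holding (device, impressions) rows.
  val region1 = Seq("Apple Iphone" -> 1L, "Apple Iphone" -> 2L, "Nexus 5" -> 4L)
  val region2 = Seq("Apple Iphone" -> 5L)

  val total: Partial = Seq(region1, region2).map(aggregateRegion).reduce(merge)
}
```

Only one small map per region crosses the network, rather than every matching row, which is where the savings come from.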
    import java.util.{HashMap => JMap}

    trait GroupByMonoidSumProtocol extends CoprocessorProtocol {
      def groupByMonoidSum(query: GroupByQuery, scan: Scan): JMap[String, JMap[String, Any]]
    }
Next we make a GroupByMonoidSumCoprocessorEndpoint which extends BaseEndpointCoprocessor and implements GroupByMonoidSumProtocol. This class defines the operation to be called in each HBase region server. In groupByMonoidSum, we obtain a scanner from RegionCoprocessorEnvironment.
Combine this with the passed-in scan object specifying the scope of the data, and we can now iterate through only those records we are interested in. For each record, we convert its row key to a new group key based on the groupBy function in the query, and sum up the values with the help of a mutable HashMap. We also wrap GroupByMonoidSumCoprocessorEndpoint in class GroupByMonoidSumCall, which extends Batch.Call, so it can be passed to HTableInterface.coprocessorExec on the client side.
    class GroupByMonoidSumCoprocessorEndpoint extends BaseEndpointCoprocessor with GroupByMonoidSumProtocol {
      private val kryo = KryoEnv.kryo
      private val monoids = Monoids

      override def groupByMonoidSum(query: GroupByQuery, scan: Scan): JMap[String, JMap[String, Any]] = {
        val region = getEnvironment.asInstanceOf[RegionCoprocessorEnvironment].getRegion
        val scanner = region.getScanner(scan)
        val results = new java.util.ArrayList[KeyValue]
        val ret = new JMap[String, JMap[String, Any]]
        try {
          var hasMoreRows = false
          do {
            hasMoreRows = scanner.next(results)
            for (kv: KeyValue <- results) {
              val buffer = kv.getBuffer
              val row = Bytes.toString(kv.getBuffer, kv.getRowOffset, kv.getRowLength)
              val column = Bytes.toString(kv.getBuffer, kv.getQualifierOffset, kv.getQualifierLength)
              val gpKey = query.groupBy(row)
              val value = kryo.fromBytes(buffer.slice(kv.getValueOffset, kv.getValueOffset + kv.getValueLength)).asInstanceOf[Any]
              if (ret.containsKey(gpKey)) {
                val colVals = ret.get(gpKey)
                if (colVals.containsKey(column))
                  colVals.put(column, monoids.plus(value, colVals.get(column)))
                else
                  colVals.put(column, value)
              } else {
                val colVals = new JMap[String, Any]
                colVals.put(column, value)
                ret.put(gpKey, colVals)
              }
            }
            results.clear()
          } while (hasMoreRows)
        } finally {
          scanner.close()
        }
        ret
      }
    }

    case class GroupByMonoidSumCall(query: GroupByQuery, scan: Scan) extends Batch.Call[GroupByMonoidSumProtocol, JMap[String, JMap[String, Any]]] {
      override def call(instance: GroupByMonoidSumProtocol): JMap[String, JMap[String, Any]] =
        instance.groupByMonoidSum(query, scan)
    }
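The groupBy function called in the endpoint belongs to GroupByQuery, which is not shown in this post. As a rough illustration only, a function that turns a row key into a group key might look like the sketch below. The `fieldIndex` map and the `groupBy` signature are assumptions made for this example, not the demo's actual code, and the sketch assumes that no field value contains a colon.

```scala
// Hypothetical sketch: GroupByQuery is not shown in this post, so every name here is an assumption.
object GroupKeySketch {
  // Positions of the dimensions inside the "campaign:date:device:deviceOS" row key.
  val fieldIndex: Map[String, Int] =
    Map("campaign" -> 0, "date" -> 1, "device" -> 2, "deviceOS" -> 3)

  // Builds a function that keeps only the requested dimensions of a row key.
  def groupBy(dimensions: Seq[String]): String => String = { rowKey =>
    val parts = rowKey.split(":")
    dimensions.map(d => parts(fieldIndex(d))).mkString(":")
  }

  val byDevice: String => String = groupBy(Seq("device"))
  val byDeviceAndOs: String => String = groupBy(Seq("device", "deviceOS"))
}
```

Rows that map to the same group key end up in the same entry of the result map, so grouping by fewer dimensions yields fewer, more heavily aggregated entries.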
The aggregation in our demo is a monoid plus. Under the hood, the plus function picks the appropriate monoid from the Algebird library for each data type.
    object Monoids {
      val longMonoid = Monoid.longMonoid
      val hllMonoid = new HyperLogLogMonoid(12)
      val doubleMonoid = Monoid.doubleMonoid

      def plus(l: Any, r: Any) = {
        (l, r) match {
          case (x: Long, y: Long) => longMonoid.plus(x, y)
          case (x: Double, y: Double) => doubleMonoid.plus(x, y)
          case (x: HLL, y: HLL) => hllMonoid.plus(x, y)
        }
      }
    }
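For readers new to monoids, the sketch below shows the idea in plain Scala. It is not Algebird's API: `SimpleMonoid` is a hypothetical stand-in trait, and an exact set of user IDs stands in for HyperLogLog, which merges in the same union-like way but keeps a small fixed-size sketch and returns an approximate count.

```scala
// Standalone sketch of the monoid idea; this is NOT Algebird's API, only an illustration.
object MonoidSketch {
  trait SimpleMonoid[T] {
    def zero: T               // identity element
    def plus(l: T, r: T): T   // associative combine
  }

  val longSum: SimpleMonoid[Long] = new SimpleMonoid[Long] {
    val zero = 0L
    def plus(l: Long, r: Long): Long = l + r
  }

  // Exact stand-in for HyperLogLog: merging two sets of user ids is a union.
  val userSet: SimpleMonoid[Set[String]] = new SimpleMonoid[Set[String]] {
    val zero = Set.empty[String]
    def plus(l: Set[String], r: Set[String]): Set[String] = l union r
  }

  def sumAll[T](values: Seq[T])(m: SimpleMonoid[T]): T =
    values.foldLeft(m.zero)(m.plus)

  // Made-up sample values.
  val impressions: Long = sumAll(Seq(1L, 2L, 5L))(longSum)
  val uniques: Int = sumAll(Seq(Set("u1", "u2"), Set("u2", "u3")))(userSet).size
}
```

Adding the two per-row unique counts would give 4 and double-count user u2, while merging the underlying sets gives 3. This is why uniques are stored as a mergeable structure rather than as a plain number, and because plus is associative, partial results from different region servers can be combined in any grouping.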
In the end, we define a callback function in GroupByMonoidSumCallback, which will be triggered after groupByMonoidSum finishes its aggregation on the region server. It uses the output from each region server to update the final aggregation results.
    case class GroupByMonoidSumCallback() extends Batch.Callback[JMap[String, JMap[String, Any]]] {
      private val monoids = Monoids
      private val aggregator = new JMap[String, JMap[String, Any]]

      override def update(region: Array[Byte], row: Array[Byte], nextResult: JMap[String, JMap[String, Any]]): Unit = {
        for ((nxtKey, nxtColVals) <- nextResult) {
          if (aggregator.containsKey(nxtKey)) {
            val aggColVals = aggregator.get(nxtKey)
            for ((c, v) <- nxtColVals)
              aggColVals.put(c, monoids.plus(aggColVals.get(c), v))
          } else {
            aggregator.put(nxtKey, nxtColVals)
          }
        }
      }

      def result: JMap[String, JMap[String, Any]] = aggregator
    }
On the client side, we construct an HBase Scan object from the user query, and pass GroupByMonoidSumCall and GroupByMonoidSumCallback to HTableInterface.coprocessorExec to trigger the aggregation on the HBase servers.
    class CoprocessorClient(val hConf: Configuration) {
      private def makeScan(table: String, query: GroupByQuery) = {
        val campaign = query.campaign
        val startDate = query.startDate
        val endDate = query.endDate
        val (startRow, stopRow) = (startDate, endDate) match {
          case (Some(start), Some(end)) => (s"$campaign:$start", s"$campaign:$end")
          case (Some(start), _) => (s"$campaign:$start", s"${campaign.toLong+1}")
          case _ => (s"$campaign", s"${campaign.toLong+1}")
        }
        val tableInfo = TableController(table)
        val scan = new Scan().setMaxVersions(1)
        query.metrics.foreach { metric => scan.addColumn(Bytes.toBytes(tableInfo.family), Bytes.toBytes(metric)) }
        scan.setStartRow(Bytes.toBytes(startRow))
        scan.setStopRow(Bytes.toBytes(stopRow))
        scan.setFilter(new PrefixFilter(Bytes.toBytes(campaign)))
        scan
      }

      def groupBySum(query: GroupByQuery): JMap[String, JMap[String, Any]] = {
        val scan = makeScan(query.table, query)
        val call = GroupByMonoidSumCall(query, scan)
        val callback = GroupByMonoidSumCallback()
        var hTable: HTableInterface = null
        try {
          // get connection and get table
          hTable = new HTable(hConf, query.table)
          hTable.coprocessorExec(classOf[GroupByMonoidSumProtocol], null, null, call, callback)
        } finally {
          if (hTable != null) {
            hTable.close()
          }
        }
        callback.result
      }
    }
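The start and stop rows are compared as strings, not numbers, and the stop row of an HBase scan is exclusive. The standalone sketch below illustrates both points with plain strings. Note that it departs from makeScan above: instead of incrementing the campaign ID numerically (where, for example, "10" sorts before "9"), it assumes a stop row built by replacing the ":" delimiter with ";", the next ASCII character. The `rowRange` and `inRange` names are hypothetical.

```scala
// Standalone sketch of the start/stop row computation (plain strings, no HBase).
object ScanRangeSketch {
  // Returns (startRow, stopRow); HBase treats the stop row as exclusive.
  def rowRange(campaign: String, start: Option[String], end: Option[String]): (String, String) = {
    val prefix = s"$campaign:"
    // ';' directly follows ':' in ASCII, so "<campaign>;" sorts after every key
    // that begins with "<campaign>:". This is an assumption of this sketch and
    // differs from the campaign + 1 approach used in makeScan above.
    val afterPrefix = s"$campaign;"
    (start, end) match {
      case (Some(s), Some(e)) => (prefix + s, prefix + e)
      case (Some(s), None)    => (prefix + s, afterPrefix)
      case (None, Some(e))    => (prefix, prefix + e)
      case (None, None)       => (prefix, afterPrefix)
    }
  }

  // A key is inside the scan when startRow <= key < stopRow in lexicographic order.
  def inRange(key: String, range: (String, String)): Boolean =
    key.compareTo(range._1) >= 0 && key.compareTo(range._2) < 0
}
```

For instance, `rowRange("1", Some("2014-07-17"), Some("2014-07-24"))` covers rows dated 2014-07-17 through 2014-07-23: rows for the end date itself fall outside the range because every such key sorts after the exclusive stop row.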
Deploy the coprocessor code to HBase server
To make the coprocessor operation visible in HBase, we need to compile an assembly jar and copy it to the HBase classpath (I usually drop it in the lib directory). The hbase-site.xml config file also needs to be modified to include the GroupByMonoidSumCoprocessorEndpoint class.
    <property>
      <name>hbase.coprocessor.region.classes</name>
      <value>GroupByMonoidSumCoprocessorEndpoint</value>
    </property>
After restarting HBase, this simple aggregation system is ready to roll! Give it a try, and let me know the results in the comments. 🙂
Thanks for the great article. It was extremely useful, as there are almost no examples of HBase coprocessors on the web.
I do have a question though. I am using HBase 1.0.0, and I was wondering whether this code is compatible with newer versions of HBase.