DO NOT ADD CONTENT ABOVE HERE


Rowlog: A message queue system on top of HBase

The need for an MQ

In our Lily content repository project (see www.lilyproject.org), each time a record is changed (created, updated or deleted), we want to update a SOLR-based index. To accomplish this, we need a message queue system (MQ) that delivers messages about record changes to an indexer component, which in turn updates the SOLR index.
The MQ needs to comply with a few requirements:

  • Messages describe events that happen on Lily records
  • Messages can be delivered asynchronously to the indexing component
  • Messages about a specific record should be delivered in order
  • No two messages about the same record can be consumed by the indexer component at the same time
  • A message consumer should be able to retrieve all outstanding messages related to a record
  • There can be no message loss

Why build our own?

MQ systems already exist in many flavors, for instance RabbitMQ and ActiveMQ. Still, we decided to build our own system on top of HBase.
First, we want to keep the number of different components and storage technologies (e.g. MySQL) co-existing in the Lily space as low as possible, since each new technology adds complexity to configuration, administration, deployment, etc.
Second, we want all components of Lily to behave the same with respect to scalability (partitioning), failover and reliability (replication). Since we were already using HBase, which has the scalability properties we want, we decided to use the same technology to build an MQ system.

An MQ on HBase named ‘Rowlog’

So far this post has talked about updates and events that apply to Lily records. In fact, however, we have built an MQ system that is usable outside the context of Lily: its messages describe events that happen on an HBase row. That is also where the name “Rowlog” comes from.

Subscriptions

Each client interested in processing or consuming messages should register a subscription on the rowlog. In the case of Lily this is the indexer that updates the SOLR index, but it could just as well be an e-mail notification client or anything else.
A message will then be offered to each registered subscription for consumption by one of its listeners.
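To make the subscription model concrete, here is a minimal in-memory sketch. It is not the Rowlog API: the `Subscriptions` class, its `register`/`offer` methods and the round-robin choice of listener are hypothetical, chosen only to show that every subscription sees every message while only one listener within each subscription consumes it.

```python
from itertools import cycle


class Subscriptions:
    """Hypothetical sketch: every message is offered to every subscription,
    and within a subscription exactly one listener consumes it."""

    def __init__(self):
        self._listeners = {}  # subscription id -> list of listener callables
        self._next = {}       # subscription id -> round-robin iterator over listeners

    def register(self, subscription_id, listeners):
        listeners = list(listeners)
        if not listeners:
            raise ValueError("a subscription needs at least one listener")
        self._listeners[subscription_id] = listeners
        self._next[subscription_id] = cycle(listeners)  # round-robin is an assumption

    def offer(self, message):
        """Offer the message to each subscription; returns what each
        subscription's chosen listener returned."""
        return {sub_id: next(self._next[sub_id])(message) for sub_id in self._listeners}


if __name__ == "__main__":
    subs = Subscriptions()
    subs.register("indexer", [lambda m: "indexer-1: " + m, lambda m: "indexer-2: " + m])
    subs.register("mailer", [lambda m: "mailer: " + m])
    print(subs.offer("row-1 updated"))
    # {'indexer': 'indexer-1: row-1 updated', 'mailer': 'mailer: row-1 updated'}
    print(subs.offer("row-2 updated"))
    # {'indexer': 'indexer-2: row-2 updated', 'mailer': 'mailer: row-2 updated'}
```

An e-mail notifier registered next to the indexer thus receives every message, without ever competing with the indexer's own listeners.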

Message Queue Table

Each message is stored in a message queue table, which is a regular HBase table. The key of a message contains a subscription id, a timestamp of when the event happened, and the row key of the row the message is about.
A Rowlog processor regularly scans the message queue table and processes the messages. The messages are sorted by their timestamp, so they are processed in the order in which the events happened, even across different rows.

[Figure: message queue table]
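The key layout can be illustrated with a small sketch. The byte encoding is an assumption (a one-byte length prefix for the subscription id, then an 8-byte big-endian timestamp, then the row key, in the order the fields are listed above). What it shows is that HBase compares row keys byte by byte, so a fixed-width big-endian timestamp makes a plain sorted scan return each subscription's messages in timestamp order. `key_prefix`, `message_key`, `decode_key` and `scan_subscription` are hypothetical helpers.

```python
import struct


def key_prefix(subscription_id: str) -> bytes:
    """One length byte followed by the UTF-8 subscription id (assumed < 256 bytes)."""
    sub = subscription_id.encode("utf-8")
    if len(sub) > 255:
        raise ValueError("subscription id too long for a one-byte length prefix")
    return bytes([len(sub)]) + sub


def message_key(subscription_id: str, timestamp_ms: int, row_key: bytes) -> bytes:
    # An unsigned 8-byte big-endian timestamp sorts numerically when keys
    # are compared byte by byte, as HBase does with row keys.
    return key_prefix(subscription_id) + struct.pack(">Q", timestamp_ms) + row_key


def decode_key(key: bytes):
    n = key[0]
    subscription_id = key[1:1 + n].decode("utf-8")
    (timestamp_ms,) = struct.unpack(">Q", key[1 + n:9 + n])
    return subscription_id, timestamp_ms, key[9 + n:]


def scan_subscription(keys, subscription_id):
    """Simulate a prefix scan: the sorted keys of one subscription, decoded."""
    prefix = key_prefix(subscription_id)
    return [decode_key(k) for k in sorted(keys) if k.startswith(prefix)]


if __name__ == "__main__":
    keys = [message_key("indexer", 1000, b"row-b"),
            message_key("indexer", 20, b"row-a"),
            message_key("mailer", 5, b"row-c")]
    for entry in scan_subscription(keys, "indexer"):
        print(entry)
    # ('indexer', 20, b'row-a')
    # ('indexer', 1000, b'row-b')
```

Note that a decimal timestamp written as text would not have this property ("1000" sorts before "20"), which is why a fixed-width binary encoding is used in the sketch.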

Row-local Queue

Besides the message queue table, there is also the concept of a row-local queue. A row-local queue stores messages about events on a row in that same row, in a column family separate from the actual data.
Doing this gives us a few advantages:

  • Adding a message can happen in the same atomic put operation as updating the data itself (thereby avoiding the need for transactions).
  • All outstanding messages of a row can easily be retrieved. In the message queue table, by contrast, the messages of one row are interleaved with those of other rows.
  • All message information is available next to the data. If the message queue table were lost, it could be rebuilt from the row-local queues. (This is a rather hypothetical advantage, since we don’t expect to lose the message queue table.)
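A minimal sketch of the first two advantages, assuming an in-memory stand-in for an HBase table. HBase guarantees that a put on a single row is atomic; here a lock plays that role so readers never see a half-applied put. The column family names `data` and `rowlog`, the qualifier layout and the helpers `update_record`/`outstanding_messages` are hypothetical.

```python
import threading
from collections import defaultdict

DATA, ROWLOG = "data", "rowlog"  # hypothetical column family names


class InMemoryTable:
    """Stand-in for an HBase table: row key -> family -> qualifier -> value.
    The lock models HBase's guarantee that a put on one row is atomic."""

    def __init__(self):
        self._rows = defaultdict(lambda: defaultdict(dict))
        self._lock = threading.Lock()

    def put(self, row_key, mutation):
        # mutation: {family: {qualifier: value}}, applied under one lock
        with self._lock:
            row = self._rows[row_key]
            for family, cells in mutation.items():
                row[family].update(cells)

    def get(self, row_key, family):
        with self._lock:
            return dict(self._rows[row_key][family])


def update_record(table, row_key, fields, seq, payload, subscriptions):
    """Write the record fields AND its row-local message in a single put."""
    message = {("payload", seq): payload}
    for sub in subscriptions:
        message[("state", seq, sub)] = False  # not yet executed for this subscription
    table.put(row_key, {DATA: fields, ROWLOG: message})


def outstanding_messages(table, row_key):
    """All (seq, subscription) pairs of this row that have not been executed yet."""
    cells = table.get(row_key, ROWLOG)
    return sorted((q[1], q[2]) for q, done in cells.items() if q[0] == "state" and not done)
```

Retrieving the outstanding messages of one record is a single-row read here, whereas in the message queue table it would require scanning past the messages of other rows.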

The information stored in the row-local queue consists of an execution state for each subscription (executed or not) and a payload (extra information about the event that happened). Because this information is kept here, the messages stored in the message queue table can be very lightweight.

[Figure: row-local queue]
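The following sketch shows why the queue entries can stay so small: an entry carries only its key fields, and the processor looks up the payload and the per-subscription execution state in the row-local queue. It assumes, hypothetically, that the timestamp identifies the message within its row; `QueueEntry` and `process` are illustrative names, not the Rowlog API.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class QueueEntry:
    """Lightweight message queue table entry: key fields only, no payload."""
    subscription_id: str
    timestamp: int
    row_key: str


def process(entry, rows, listener):
    """Hypothetical processing step for one queue entry.

    rows: row key -> row-local queue, i.e. a dict mapping timestamp ->
          {"payload": ..., "state": {subscription_id: executed?}}.
    Returns True if the listener ran, False if it had already been executed.
    """
    message = rows[entry.row_key][entry.timestamp]
    if message["state"][entry.subscription_id]:
        return False  # already executed for this subscription: skip
    listener(entry.row_key, message["payload"])
    # Mark as executed only after the listener returned.
    message["state"][entry.subscription_id] = True
    return True
```

In this sketch, marking the message as executed only after the listener returns means a crash in between leads to redelivery rather than message loss, and the state check turns an entry that is delivered again after completion into a no-op.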

Write-Ahead Log (WAL)

The message queue is not the only reason why we came up with the rowlog solution.
At the same time we also needed a way to execute actions as a consequence of an update on a record, i.e. ‘secondary actions’. The requirements for these actions were:

  • Guaranteed, eventual execution (cf. no message loss)
  • Preferably synchronous execution (which can’t be guaranteed in case of failures)
  • Ability to resume in case of failures (e.g. a node crash after an update but before a secondary action got executed)
  • Secondary actions should be able to see the state of the record corresponding to the update. Therefore, secondary actions should be executed before the next update on the record happens.
  • Ordered execution of multiple secondary actions following the same update
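A minimal sketch of how these requirements can fit together, assuming an in-memory record object instead of an HBase row. Before each update, the previous update's pending actions are finished; the new data and the ordered list of pending actions are stored together (in HBase this would be one row put); the actions then run synchronously, and each is removed only after it succeeds, so a failure leaves it pending for the next `resume`. `Record`, `resume`, `update` and the action registry are hypothetical names.

```python
class Record:
    """Hypothetical stand-in for a record row: data plus a row-local,
    ordered list of secondary actions that still have to run."""

    def __init__(self):
        self.data = {}
        self.pending = []  # names of secondary actions, in execution order


def resume(record, registry):
    """Run outstanding secondary actions in order. An action is removed from
    the pending list only after it returned, so if it raises (e.g. a simulated
    node crash) it stays pending and is retried by the next resume()."""
    while record.pending:
        action = registry[record.pending[0]]
        action(dict(record.data))  # the action sees a snapshot of the record
        record.pending.pop(0)


def update(record, fields, action_names, registry):
    # Finish the previous update's actions first, so they still see the
    # record state they belong to.
    resume(record, registry)
    # In HBase, the data change and the pending-action list would be
    # written together in a single (atomic) row put.
    record.data.update(fields)
    record.pending = list(action_names)
    # Preferably synchronous execution; on failure the actions stay pending.
    resume(record, registry)
```

If the second of two actions fails, the first has run and the second stays pending; the next update on the record first runs it against the old state before applying its own change.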

These requirements don’t fully match what is usually understood by a write-ahead log. For instance, we don’t need transactions across multiple updates on multiple rows. Still, we stick with the name.

Rowlog as a solution for the WAL

While designing a solution for the WAL, we noticed that we had ended up with something very similar to our MQ solution, and that we could in fact distill a single solution for both use cases: the “Rowlog”.

For a more technical explanation of the Rowlog, its architecture and how to use it, take a look here.