The general functional goal of the SEP is to listen to changes happening on HBase in order to do some processing. For example, the hbase-indexer listens to these changes in order to index new or updated content in Solr.
A reliable source of change events is the HBase write-ahead log, that is, the HLog files: these contain an entry if and only if an update has been applied to HBase. Producing such change events from the client in some other way, for example by feeding a message queue, would be error-prone (how do you guarantee that a message is on the queue if and only if the update was applied on HBase?) and would incur an extra performance cost (additional data being written). The HLog files are stored on HDFS, so they are replicated, and they are flushed after each update (depending on configuration).
So we could read those HLog files and process the entries in them ourselves, but there is no need: HBase already has all the infrastructure for this in the form of its replication system. HBase replication continuously polls the HLog files to get the latest changes, keeps HLogs around as long as they have not been processed, takes over HLogs from stopped regionservers, can manage multiple independently fed peer clusters (one peer cluster being down doesn’t affect replication towards other peer clusters), and can deal with peer clusters being down for a while without data loss (it will continue replicating from where it left off once the peer cluster is back online). It also delivers events in batches rather than one by one, which allows processing to be optimized.
If we want to write an application that receives change events from HBase, we need to masquerade as a peer HBase cluster. This happens to be quite feasible: you just need to implement a single method from HRegionServer, replicateLogEntries(), start an HBase RPC server, write some entries into ZooKeeper, and register a replication peer with HBase. This is what the SEP library takes care of.
The following diagram illustrates replication.
Again, in the SEP case, the RegionServers in the target cluster are not real region servers: they just pretend to be, so that replication ships events to them. The sizes of the source and target clusters are independent, so both can be scaled horizontally as needed.
It is the regionserver in the source cluster that decides which regionserver in the target cluster receives a batch of events. Target regionservers are randomly selected (if replication.source.ratio = 1.0) for each batch of events that is shipped, so normally load will be distributed evenly. One target region server might be called by two source region servers concurrently, but a single source regionserver doesn’t send out event batches concurrently: it processes the HLogs in order, only sending out the next batch once the processing of the previous one has been confirmed. Depending on the nature of the processing, you might want to multi-thread the processing of the received events, which is something the SEP framework can manage for you.
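The shipping behaviour described above can be illustrated with a small simulation. This is only a sketch of the idea, not HBase code: the host names, the ship_batches function and the fixed random seed are assumptions made for the example.

```python
import random
from collections import Counter

def ship_batches(batches, targets, rng):
    """Ship batches strictly one after another, choosing a random target each time.

    Returns the (target, batch) pairs in the order they were shipped.
    """
    shipped = []
    for batch in batches:
        target = rng.choice(targets)  # a fresh random pick for every batch
        # In real replication the source would wait here for the target to
        # confirm the batch before moving on; this simulation just records it.
        shipped.append((target, batch))
    return shipped

rng = random.Random(42)  # fixed seed so the demo is reproducible
targets = ["indexer-1", "indexer-2", "indexer-3"]  # hypothetical target hosts
batches = [[f"event-{i}"] for i in range(3000)]

shipped = ship_batches(batches, targets, rng)
load = Counter(target for target, _ in shipped)
print(load)  # roughly a third of the batches per target
```

Because every batch gets an independent random pick, the load evens out over many batches, while the order of the batches from one source stays unchanged.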
From replication to SEP
To find your way around the SEP, it is useful to know how some concepts have been rebranded to better fit with the mutation event processing theme:
HBase peer clusters become SEP subscriptions. Each subscription receives each message once, so registering multiple subscriptions allows for multiple processors of the event stream.
A particular HBase region server in the peer cluster which processes HLog entries becomes a SepConsumer. A SepConsumer is just the technical piece that receives events; the actual processing is delegated to EventListeners. EventListener is the interface you implement yourself to do whatever you want to do with the events.
WALEdits (= HLog entries) become SepEvents. These don’t always correspond one-to-one, see below.
WALEdit & SepEvents
An HLog file is a list of WALEdits. Each WALEdit contains a list of KeyValue pairs, containing anything that was in your row mutation (put, delete, …). In some cases, like write-only scenarios, this means you don’t need to perform random reads on HBase to get the row data, but instead you can just decode it from the WALEdit.
WALEdits can contain information on multiple rows (in case of multi-puts). The SEP takes care of splitting that per row before delivering it in the form of a SepEvent.
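The per-row split can be pictured as a simple grouping step. The sketch below is an illustration only: the KeyValue tuple and the split_by_row function are hypothetical stand-ins, not the SEP or HBase classes.

```python
from collections import namedtuple

# Hypothetical, simplified stand-in for an HBase KeyValue.
KeyValue = namedtuple("KeyValue", "row family qualifier value")

def split_by_row(key_values):
    """Group the key-values of one WAL edit by row, keeping first-seen row order.

    Each (row, key_values) pair plays the role of one per-row event.
    """
    events = {}
    for kv in key_values:
        events.setdefault(kv.row, []).append(kv)
    return list(events.items())

# One edit that touches two rows, as a multi-put would.
wal_edit = [
    KeyValue(b"row1", b"cf", b"a", b"1"),
    KeyValue(b"row2", b"cf", b"a", b"2"),
    KeyValue(b"row1", b"cf", b"b", b"3"),
]

events = split_by_row(wal_edit)
for row, kvs in events:
    print(row, len(kvs))
# b'row1' 2
# b'row2' 1
```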
It is useful if the system that delivers events can guarantee that it doesn’t deliver two events about the same row concurrently: this avoids the need for expensive locking in cases that are sensitive to race conditions. In addition, it is useful if the events for a row are delivered in the order in which they happened. For example, in hbase-indexer, we don’t want to overwrite newer data in Solr with older data.
Since a single HBase region (a set of rows) is hosted on one particular region server, the cases where these properties are broken are when a region moves from one region server to another, or when HBase servers are restarted so that recovered queues appear, which are processed concurrently with the normal queues.
While those conditions prove to be rare in real-world usage, the SEP doesn’t offer any additional guarantees against out-of-order event delivery, and this is an area we will be improving upon in the future.
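Within a single consumer, one common way to multi-thread processing without handling two events for the same row at once is to route events to workers by a hash of the row key. The sketch below illustrates that general technique under stated assumptions: RowPartitionedExecutor is a hypothetical class, it is not a description of how the SEP is implemented, and it does nothing about the region-move and recovered-queue cases mentioned above.

```python
import zlib
from concurrent.futures import ThreadPoolExecutor

class RowPartitionedExecutor:
    """Route work to single-threaded workers chosen by a hash of the row key."""

    def __init__(self, num_workers):
        # One single-threaded executor per partition: tasks submitted to the
        # same partition run one at a time, in submission order.
        self.workers = [ThreadPoolExecutor(max_workers=1) for _ in range(num_workers)]

    def submit(self, row, fn, *args):
        index = zlib.crc32(row) % len(self.workers)  # same row, same worker
        return self.workers[index].submit(fn, *args)

    def shutdown(self):
        for worker in self.workers:
            worker.shutdown(wait=True)

# Record the order in which each row sees its events.
seen = {b"row-%d" % i: [] for i in range(5)}

def handle(row, seq):
    seen[row].append(seq)

executor = RowPartitionedExecutor(num_workers=3)
for seq in range(100):
    for row in seen:
        executor.submit(row, handle, row, seq)
executor.shutdown()

print(all(order == list(range(100)) for order in seen.values()))  # True
```

Different rows can be processed in parallel on different workers, while all events for one row stay on one worker and therefore keep their order.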
Since there is just one HLog per regionserver, it contains the edits of all tables. HBase has a basic mechanism to filter which events (WALEdits) are replicated: on a per-column-family level you need to set REPLICATION_SCOPE to 1. This setting, however, applies to all peers (SEP subscriptions). The SEP extends this by providing an interface, WALEditFilter, that you can implement for custom filtering on the side of the sending region server. This WALEditFilter is subscription-specific.
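The two filtering levels can be sketched as follows. This is a conceptual illustration only: the table names, subscription names and functions are hypothetical, and the real WALEditFilter is a Java interface whose signature is not shown in this post.

```python
from collections import namedtuple

# Hypothetical, simplified key-value that also carries its table name.
KeyValue = namedtuple("KeyValue", "table row family qualifier value")

# Column families assumed to have REPLICATION_SCOPE set to 1 (made-up schema).
REPLICATED_FAMILIES = {("users", b"info"), ("orders", b"data")}

def scope_filter(key_values):
    """Coarse filter shared by all peers: keep only replicated column families."""
    return [kv for kv in key_values if (kv.table, kv.family) in REPLICATED_FAMILIES]

def users_only(key_values):
    """Hypothetical subscription-specific filter: keep only the users table."""
    return [kv for kv in key_values if kv.table == "users"]

# Only the "user-indexer" subscription registers a custom filter.
SUBSCRIPTION_FILTERS = {"user-indexer": users_only}

def edits_for(subscription, key_values):
    """Apply the shared scope filter, then the subscription's own filter, if any."""
    kept = scope_filter(key_values)
    custom = SUBSCRIPTION_FILTERS.get(subscription)
    return custom(kept) if custom else kept

edit = [
    KeyValue("users", b"r1", b"info", b"name", b"Ann"),
    KeyValue("users", b"r1", b"tmp", b"x", b"1"),        # family not replicated
    KeyValue("orders", b"r9", b"data", b"total", b"10"),
]

print(len(edits_for("user-indexer", edit)))  # 1
print(len(edits_for("audit", edit)))         # 2
```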
When processing happens asynchronously, it is important to be able to monitor how much delay (if any) there is in the processing. We created a rudimentary tool that has proved to be very helpful in this regard.
The tool gathers information from these sources:
ZooKeeper, to know about the various peer clusters defined, and the queues (and recovered queues) for each of them, and the file position reached in processing the HLog files
HDFS, to get the sizes of the queued HLog files. HLog files are stored uncompressed, so combined with the file position information read from ZooKeeper, this makes it possible to show progress on a single HLog file.
Regionserver JMX, to get the age of the last processed message, and information regarding sleep loops (the latter is experimental and only available when using the SepReplicationSource)
Since this tool can be used not only for the SEP but for any other HBase replication use as well, we’ve used the replication terminology.
Let’s run through an example scenario, on a 3-node hbase-indexer cluster. For this demo we are using a pre-split table with random row keys, in order to have a nice balanced load.
After downloading the hbase-indexer, the tool is started as follows:
./bin/hbase-indexer replication-status -z idxdemo1
where idxdemo1 is the ZooKeeper connection string.
The following screenshot shows a typical output:
At (1) the name of the peer cluster is shown, or in SEP terminology this is the name of the subscription. Here we have only one peer.
At (2) the regionservers are shown; there are three of them. HBase appends the port number and a timestamp to the hostname to uniquely identify a particular launch of a regionserver.
The second column, queue size, shows the number of HLog files. Here there is just 1, the current HLog file that is also being written to, so there are no additional HLog files queued. (To understand why there is a queue, you should know that HBase rolls its HLog files once they reach 64 MB.)
The third column shows the size of the HLog files. In this case, the 0 MB doesn’t necessarily mean the HLog is empty: HDFS isn’t able to report the size of files that are being appended to (or more correctly, only shows the size of finished blocks, but usually an HLog is rolled before it reaches the size of an HDFS block).
The fourth column shows the progress within the current HLog file. Since in this case we don’t know the size of the HLog, we can’t show progress.
So it might seem this doesn’t give much useful information, but actually it does: if the queue size doesn’t go above 1, you are in a good situation, since at most 64 MB worth of change events is waiting to be processed.
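The reasoning above amounts to a simple upper bound: the queue size multiplied by the roll size. A minimal sketch, assuming the 64 MB roll size quoted earlier (the function names are made up for the example):

```python
HLOG_ROLL_SIZE_MB = 64  # roll size quoted in the text; configurable in practice

def max_backlog_mb(queue_size, roll_size_mb=HLOG_ROLL_SIZE_MB):
    """Upper bound on unprocessed change data for one regionserver, in MB."""
    return queue_size * roll_size_mb

def is_keeping_up(queue_size):
    """A queue of at most one HLog means only the current file is pending."""
    return queue_size <= 1

print(max_backlog_mb(1), is_keeping_up(1))  # 64 True
print(max_backlog_mb(5), is_keeping_up(5))  # 320 False
```

This is only a bound: part of the first file in the queue may already have been processed, and the current file is usually not full yet.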
For larger clusters, the current output of the replication-status tool will quickly become impractical: there would simply be too many numbers to look at. This is something to improve in the future.
Queued up HLog files
The screenshot below shows what it looks like when there are multiple HLog files in the queue.
Since it is now working on an old (closed) HLog file, the size of the HLog is known and progress can be shown.
The progress should be interpreted as follows: once it reaches 100%, an HLog has been fully processed, so the queue size will drop by one. (The queue might have grown in the meantime, though, due to new writes on HBase.)
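The progress figure can be thought of as the file position from ZooKeeper divided by the file size from HDFS. The function below is a sketch of that calculation, not the tool's actual code; the name hlog_progress and the choice to return None for an unknown size are assumptions.

```python
MB = 1024 * 1024

def hlog_progress(position, file_size):
    """Return progress through one HLog as a percentage, or None if unknown.

    position  -- byte offset reached in the file (as tracked in ZooKeeper)
    file_size -- file size as reported by HDFS; 0 for a file still being written
    """
    if file_size <= 0:
        return None  # size unknown, so no progress can be shown
    return min(100.0, 100.0 * position / file_size)

print(hlog_progress(16 * MB, 64 * MB))  # 25.0
print(hlog_progress(64 * MB, 64 * MB))  # 100.0
print(hlog_progress(5 * MB, 0))         # None
```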
Enabling JMX – nothing to replicate
By default the replication-status tool doesn’t read information from JMX; you need to enable this using the --enable-jmx flag:
hbase-indexer replication-status --enable-jmx
We now see the following extra information:
the age of last shipped op(eration): this is a standard metric exposed by HBase. It shows how old the WALEdit is that is currently being shipped, which gives an indication of how far replication is behind. If no updates happen on HBase, so that nothing needs to be replicated, this age keeps increasing; in that case it indicates how long it has been since anything had to be replicated.
the timestamp last shipped op: this is basically the same as the age of last shipped op, but in the form of a timestamp instead of an age.
peer count: this shows the number of nodes in the target cluster, in our case it is the number of hbase-indexer processes running in the cluster, which happens to be 3.
for each regionserver, ‘last slept’ information is shown: this shows when the replication system last went into a sleep loop and for what reason. It is important to understand that this information is only relevant if it is recent: if, for example, it slept 5 minutes ago, that doesn’t mean it is still sleeping now. In the case above, the sleep is recent (about a second ago), and the reason for the sleep was that there was nothing to replicate: HBase polled the current HLog file and was at its end.
Some of the information retrieved using JMX is currently only available when using the SEP’s SepReplicationSource implementation (in fact, everything except the age of last shipped op).
Stopping an HBase Indexer
When we stop one of the hbase-indexer servers, the peer count will drop to 2, as shown below.
Replication failures: stopping Solr
When we stop Solr, the hbase-indexer servers can’t do their work anymore: they receive events from HBase but can’t do corresponding index operations on Solr.
In case the error is something that makes sense to retry later, as is the case here, the event listener can throw the exception back to HBase, which will retry delivery until it succeeds.
What we see is that it slept recently (1-2 seconds ago), and it was because the replication attempt failed. If you’re wondering what a particular sleep message means, it’s best to check the ReplicationSource code.
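The retry behaviour described in this section can be sketched as a loop that redelivers the same batch until the listener stops raising. This is an illustration only: FlakyListener and deliver_until_success are hypothetical names, and the real retry and sleep logic lives in HBase's ReplicationSource.

```python
class FlakyListener:
    """Hypothetical listener whose backend is down for the first few calls."""

    def __init__(self, failures_before_success):
        self.remaining_failures = failures_before_success
        self.processed = []

    def process_events(self, events):
        if self.remaining_failures > 0:
            self.remaining_failures -= 1
            # Throwing signals that the batch should be delivered again later.
            raise ConnectionError("backend unavailable")
        self.processed.extend(events)

def deliver_until_success(listener, batch, sleep=lambda seconds: None):
    """Redeliver the same batch until the listener accepts it; return attempt count."""
    attempts = 0
    while True:
        attempts += 1
        try:
            listener.process_events(batch)
            return attempts
        except ConnectionError:
            sleep(1)  # back off before retrying (a no-op in this demo)

listener = FlakyListener(failures_before_success=3)
attempts = deliver_until_success(listener, ["e1", "e2"])
print(attempts, listener.processed)  # 4 ['e1', 'e2']
```

Because the same batch is redelivered after a failure, a listener may see events it has partly handled before, so processing should tolerate repeats.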
Each time an HBase region server is stopped, the HLog files that were still in its replication queue (including the current HLog) will be taken over by another region server in the form of a recovered queue.
Such a situation occurs, for example, when you restart your entire HBase cluster. The screenshot below shows how the replication-status tool displays this.
Once a recovered queue is fully processed, it simply disappears.
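The life cycle of a recovered queue can be modelled with a plain dictionary. This is a simplified sketch with made-up server names and a made-up naming scheme for recovered queues; in HBase the queues are tracked in ZooKeeper.

```python
def take_over(queues, stopped, survivor):
    """Hand the stopped server's pending HLogs to the survivor as a recovered queue."""
    pending = queues.pop(stopped)["normal"]
    recovered_name = "recovered:" + stopped  # hypothetical naming scheme
    queues[survivor][recovered_name] = pending
    return recovered_name

def drop_finished_recovered(queues, server):
    """Remove recovered queues whose HLogs have all been processed."""
    for name in list(queues[server]):
        if name != "normal" and not queues[server][name]:
            del queues[server][name]

# Each regionserver has a normal queue of HLog files still to be processed.
queues = {
    "rs1": {"normal": ["hlog.1", "hlog.2"]},
    "rs2": {"normal": ["hlog.7"]},
}

name = take_over(queues, stopped="rs1", survivor="rs2")
print(sorted(queues["rs2"]))  # ['normal', 'recovered:rs1']

queues["rs2"][name].clear()   # pretend both recovered HLogs were processed
drop_finished_recovered(queues, "rs2")
print(sorted(queues["rs2"]))  # ['normal']
```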
In this post, we introduced you to the SEP library, the HBase Side Effect Processor, which makes it easy to do asynchronous processing in response to the HBase change events. The SEP uses the existing HBase replication infrastructure to achieve this goal.
We also introduced the replication monitoring tool which allows you to keep an eye on the progress of replication and hence SEP event processing.
Both tools are part of the Lily HBase Indexer, which makes any content stored in HBase searchable by indexing HBase rows into Solr, without writing a line of code. Lily HBase Indexer is an open source project available stand-alone, or as part of Cloudera Search.