Friday, June 13, 2014

Getting started with HBase

Introduction
HBase indexes data based on 4D coordinates: row key, column family (a collection of columns), column qualifier and version. As a result, HBase can be considered a key-value store with the 4D coordinates as the key and the cell as the value. Depending on how many of these coordinates are specified in a query, the value may be a single cell, a map or a map of maps.
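To make the "map of maps" idea concrete, here is a minimal sketch in plain Java. It is not the HBase API: the class CellModel and its methods are hypothetical names, and the nested TreeMap is only an in-memory stand-in for the way the four coordinates address a value.

```java
import java.util.Comparator;
import java.util.NavigableMap;
import java.util.TreeMap;

/** Toy in-memory model of the 4D coordinates (hypothetical, not the HBase API). */
public class CellModel {
    // row key -> column family -> column qualifier -> version -> value
    private final NavigableMap<String, NavigableMap<String, NavigableMap<String, NavigableMap<Long, String>>>> rows =
            new TreeMap<>();

    public void put(String row, String family, String qualifier, long version, String value) {
        rows.computeIfAbsent(row, k -> new TreeMap<>())
            .computeIfAbsent(family, k -> new TreeMap<>())
            // versions are kept newest first
            .computeIfAbsent(qualifier, k -> new TreeMap<>(Comparator.<Long>reverseOrder()))
            .put(version, value);
    }

    /** All four coordinates given: a single value (or null). */
    public String get(String row, String family, String qualifier, long version) {
        return get(row, family, qualifier).get(version);
    }

    /** Version omitted: a map of version -> value. */
    public NavigableMap<Long, String> get(String row, String family, String qualifier) {
        return get(row, family).getOrDefault(qualifier, new TreeMap<>());
    }

    /** Qualifier and version omitted: a map of maps (qualifier -> version -> value). */
    public NavigableMap<String, NavigableMap<Long, String>> get(String row, String family) {
        return rows.getOrDefault(row, new TreeMap<>()).getOrDefault(family, new TreeMap<>());
    }

    public static void main(String[] args) {
        CellModel table = new CellModel();
        table.put("TheRealJD", "info", "password", 1L, "pass00");
        table.put("TheRealJD", "info", "password", 2L, "securepass");
        table.put("TheRealJD", "info", "email", 1L, "john.doe@acme.inc");

        System.out.println(table.get("TheRealJD", "info", "password", 1L)); // a single value
        System.out.println(table.get("TheRealJD", "info", "password"));     // a map
        System.out.println(table.get("TheRealJD", "info"));                 // a map of maps
    }
}
```

The fewer coordinates a lookup specifies, the deeper the structure it gets back, which is the behavior described above.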

Installation

Installing the latest stable version of HBase:
$ mkdir hbase-install
$ cd hbase-install
$ wget http://apache.claz.org/hbase/stable/hbase-0.98.3-hadoop2-bin.tar.gz
$ tar xvfz hbase-0.98.3-hadoop2-bin.tar.gz
$ export HBASE_HOME=`pwd`/hbase-0.98.3-hadoop2

Adding the HBase programs to the PATH:
$ export PATH=$PATH:$HBASE_HOME/bin/

# the JAVA_HOME variable must already be set; if you are using OpenJDK, you can set it to:
$ export JAVA_HOME=/usr/lib/jvm/default-java

Running a standalone version
$ start-hbase.sh

Once the master is launched, you can access the web admin interface at http://localhost:60010/

By default, HBase writes its data under the /tmp directory. You can change this by editing $HBASE_HOME/conf/hbase-site.xml and setting the following property (the complete list of properties can be found in the official documentation):
<property>
   <name>hbase.rootdir</name>
   <value>file:///path/to/hbase/directory</value>
</property>

The $HBASE_HOME/conf/hbase-env.sh script is read by the HBase launch scripts and can be used to set up the HBase environment, for instance by setting environment variables such as JAVA_HOME. For further information on configuring HBase, check the official documentation.

Shell-based interaction
Alongside the installation binaries, there is a JRuby-based shell that wraps a Java client. It can be used to interact with HBase interactively (sending commands and receiving responses directly in the terminal) or via scripts.

To validate the installation, let's run the HBase shell and manipulate some data:
$ hbase shell
# check existing tables
hbase(main):001:0> list
# create a table with the column family 'cf'
hbase(main):002:0> create 'mytable', 'cf'
# write 'hello HBase' to column 'cf:message' of row 'first' in table 'mytable'
hbase(main):003:0> put 'mytable', 'first', 'cf:message', 'hello HBase'
# create a 'users' table with the column family 'info'
hbase(main):004:0> create 'users', 'info'
hbase(main):005:0> put 'mytable', 'second', 'cf:foo', 3.14159
# write a user name into the 'info' family of table 'users'
hbase(main):006:0> put 'users', 'first', 'info:username', "John Doe"
# read the row 'first' from a table
hbase(main):007:0> get 'mytable', 'first'
# read all the rows of a table
hbase(main):008:0> scan 'mytable'

Java-based interaction

// define a custom configuration (by default the content of hbase-site.xml is used)
Configuration myConf = HBaseConfiguration.create();
myConf.set("param_name", "param_value");

// e.g. to connect to a remote HBase instance you need to set Zookeeper quorum address and port number
myConf.set("hbase.zookeeper.quorum", "serverip");
myConf.set("hbase.zookeeper.property.clientPort", "2181");

// establish a connection to a table
HTableInterface directTable = new HTable(myConf, "users");

// alternatively, use a pool to reuse connections, which are expensive resources
// (HTablePool is deprecated in recent releases in favor of HConnection.getTable)
int maxConnections = 10; // illustrative value
HTablePool pool = new HTablePool(myConf, maxConnections);
HTableInterface myTable = pool.getTable("mytable");
...
// close the table, which returns it to the pool
myTable.close();

In HBase, data is manipulated as bytes, so Java types must be converted into raw bytes with the help of the utility class Bytes. The HBase API for manipulating data is divided into commands: Get, Put, Delete, Scan and Increment. For example, data can be stored as follows:
// create a Put command with row key TheRealJD
Put p = new Put(Bytes.toBytes("TheRealJD"));

// add information about user
p.add(Bytes.toBytes("info"), Bytes.toBytes("name"), Bytes.toBytes("John Doe"));
p.add(Bytes.toBytes("info"), Bytes.toBytes("email"), Bytes.toBytes("john.doe@acme.inc"));
p.add(Bytes.toBytes("info"), Bytes.toBytes("password"), Bytes.toBytes("pass00"));
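To give an idea of what "raw bytes" means here, the sketch below reproduces with the standard library only the two conversions used most in this post: strings as UTF-8 and longs as 8 big-endian bytes, which is how the Bytes utility encodes them. The class RawBytes and its method names are hypothetical and only serve the illustration.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

/** Standard-library sketch of string/long to byte[] conversions (hypothetical helper). */
public class RawBytes {
    /** Encodes a string as UTF-8 bytes. */
    public static byte[] toBytes(String s) {
        return s.getBytes(StandardCharsets.UTF_8);
    }

    /** Encodes a long as 8 bytes, most significant byte first. */
    public static byte[] toBytes(long v) {
        return ByteBuffer.allocate(Long.BYTES).putLong(v).array();
    }

    /** Decodes UTF-8 bytes back into a string. */
    public static String asString(byte[] b) {
        return new String(b, StandardCharsets.UTF_8);
    }

    /** Decodes 8 big-endian bytes back into a long. */
    public static long toLong(byte[] b) {
        return ByteBuffer.wrap(b).getLong();
    }

    public static void main(String[] args) {
        byte[] name = toBytes("John Doe");
        byte[] one = toBytes(1L);
        System.out.println(Arrays.toString(name));
        System.out.println(Arrays.toString(one));
        System.out.println(asString(name) + " / " + toLong(one));
    }
}
```

Because everything is stored as bytes, the reader of a cell has to know which type was written in order to decode it correctly.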

Once the entry is ready, we can send it to HBase for persistence:
HTableInterface usersTable = pool.getTable("users");
Put p = new Put(Bytes.toBytes("TheRealJD"));
p.add(...);
usersTable.put(p);
usersTable.close();

The Put command can also be used to update the user information:
Put p = new Put(Bytes.toBytes("TheRealJD"));

p.add(Bytes.toBytes("info"), Bytes.toBytes("password"), Bytes.toBytes("securepass"));

usersTable.put(p);

The HBase client does not interact directly with the storage layer, which is made of HFiles. Instead, every write is first recorded in a Write-Ahead Log (WAL) for durability and failure recovery, and the data itself is stored in an in-memory area called the MemStore. Once the MemStore is full, its entire content is flushed to a new immutable file called an HFile (existing HFiles are never modified).
This behavior can be customized. For instance, the size at which the MemStore is flushed can be set via the hbase.hregion.memstore.flush.size parameter. The WAL can also be disabled for a given write, at the risk of losing that write if the server fails before the MemStore is flushed:
Put p = new Put(Bytes.toBytes("TheRealJD"));
p.setWriteToWAL(false);
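
The write path described above can be sketched with a toy model in plain Java. Nothing here is HBase code: WritePathModel is a hypothetical class, the threshold counts entries rather than bytes, and each flushed map merely stands in for an immutable HFile.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/** Toy model of WAL + MemStore + flush to immutable files (hypothetical, not HBase code). */
public class WritePathModel {
    private final int flushThreshold;                 // stand-in for the flush size (entries, not bytes)
    private final List<String> wal = new ArrayList<>();
    private TreeMap<String, String> memStore = new TreeMap<>();
    private final List<Map<String, String>> hFiles = new ArrayList<>();

    public WritePathModel(int flushThreshold) {
        this.flushThreshold = flushThreshold;
    }

    public void put(String key, String value, boolean writeToWal) {
        if (writeToWal) {
            wal.add(key + "=" + value);               // logged first, for recovery
        }
        memStore.put(key, value);                     // then buffered in memory
        if (memStore.size() >= flushThreshold) {
            flush();
        }
    }

    /** Freezes the current MemStore as a new read-only file and starts a fresh one. */
    private void flush() {
        hFiles.add(Collections.unmodifiableMap(memStore));
        memStore = new TreeMap<>();
    }

    /** Reads the MemStore first, then the files from newest to oldest. */
    public String get(String key) {
        if (memStore.containsKey(key)) {
            return memStore.get(key);
        }
        for (int i = hFiles.size() - 1; i >= 0; i--) {
            String value = hFiles.get(i).get(key);
            if (value != null) {
                return value;
            }
        }
        return null;
    }

    public int walSize() { return wal.size(); }
    public int memStoreSize() { return memStore.size(); }
    public int hFileCount() { return hFiles.size(); }

    public static void main(String[] args) {
        WritePathModel region = new WritePathModel(2);
        region.put("TheRealJD/password", "pass00", true);
        region.put("TheRealJD/email", "john.doe@acme.inc", false); // skipped in the WAL
        System.out.println("files=" + region.hFileCount() + " wal=" + region.walSize());
    }
}
```

In this model a write that skips the WAL exists only in memory until the next flush, which is why disabling the WAL trades durability for speed.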

The Get command is used to query data from a given set of columns:
Get g = new Get(Bytes.toBytes("TheRealJD"));
// either request a whole family with g.addFamily(Bytes.toBytes("info")), or list the columns needed
g.addColumn(Bytes.toBytes("info"), Bytes.toBytes("email"));
g.addColumn(Bytes.toBytes("info"), Bytes.toBytes("password"));
// ask for up to two versions per column (only the latest one is returned by default;
// the column family must also be configured to keep at least two versions)
g.setMaxVersions(2);
Result r = usersTable.get(g);
byte[] b = r.getValue(Bytes.toBytes("info"), Bytes.toBytes("email"));
String email = Bytes.toString(b);

As HBase is versioned, we can look at particular values in the history:
List<KeyValue> passwords = r.getColumn(Bytes.toBytes("info"), Bytes.toBytes("password"));
b = passwords.get(0).getValue();
String currentPassword = Bytes.toString(b);
b = passwords.get(1).getValue();
String previousPassword = Bytes.toString(b);

// by default, the version is the time in milliseconds at which the operation was performed
long version = passwords.get(0).getTimestamp();

The Delete command is used to delete data from HBase:
// with nothing else specified, this Delete removes the entire row with all its columns
Delete d = new Delete(Bytes.toBytes("TheRealJD"));

// remove only the latest version of one column
d.deleteColumn(Bytes.toBytes("info"), Bytes.toBytes("email"));

// remove all the versions of one column
d.deleteColumns(Bytes.toBytes("info"), Bytes.toBytes("email"));

usersTable.delete(d);

The delete operation is logical: the record is flagged as deleted and will no longer be returned by a get or a scan. It is only during a major compaction (merging HFiles into a single bigger one) that the record is physically removed. More details on the compaction operation can be found in this article.
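
As a rough illustration of logical deletion, the following plain-Java sketch keeps a list of small maps standing in for HFiles and uses a null value as the delete marker. TombstoneModel is a hypothetical class, and writing every change to its own "file" is a simplification chosen to keep the example short.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;

/** Toy model of delete markers and compaction (hypothetical, not HBase code). */
public class TombstoneModel {
    // Each map models one immutable file; a null value is a delete marker.
    private final List<Map<String, String>> files = new ArrayList<>();

    public void write(String key, String value) {
        Map<String, String> file = new HashMap<>();
        file.put(key, value);
        files.add(file);
    }

    /** Deleting only records a marker; older values stay on "disk". */
    public void delete(String key) {
        write(key, null);
    }

    /** The newest file containing the key wins; a marker hides older values. */
    public String get(String key) {
        for (int i = files.size() - 1; i >= 0; i--) {
            Map<String, String> file = files.get(i);
            if (file.containsKey(key)) {
                return file.get(key);
            }
        }
        return null;
    }

    /** Merges all files into one and drops deleted entries for good. */
    public void compact() {
        Map<String, String> merged = new HashMap<>();
        for (Map<String, String> file : files) {
            merged.putAll(file);                      // newer files overwrite older ones
        }
        merged.values().removeIf(Objects::isNull);    // physically remove deleted entries
        files.clear();
        files.add(merged);
    }

    public int storedEntries() {
        int count = 0;
        for (Map<String, String> file : files) {
            count += file.size();
        }
        return count;
    }

    public static void main(String[] args) {
        TombstoneModel store = new TombstoneModel();
        store.write("TheRealJD/email", "john.doe@acme.inc");
        store.delete("TheRealJD/email");
        System.out.println(store.get("TheRealJD/email") + ", entries=" + store.storedEntries());
        store.compact();
        System.out.println("entries after compaction=" + store.storedEntries());
    }
}
```

Before compaction the deleted value still occupies space even though reads no longer see it; only the merge reclaims it.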

Creating a table programmatically
Configuration conf = HBaseConfiguration.create();
HBaseAdmin admin = new HBaseAdmin(conf);
HTableDescriptor desc = new HTableDescriptor("UserFeed");
// create a column family
HColumnDescriptor c = new HColumnDescriptor("stream");
c.setMaxVersions(1);
desc.addFamily(c);
admin.createTable(desc);

Once the table is created, we can insert data into it. We may hash the row key used for users (i.e. TheRealJD) to avoid variable-length row keys and to get better performance:
// prepare the value of the row key
// (Md5Utils is assumed to be an application helper rather than an HBase class: md5sum returns the 16-byte MD5 of a string)
int longLength = Long.SIZE / 8;
byte[] userHash = Md5Utils.md5sum("TheRealJD");
// negate the timestamp so that the most recent feeds sort first
byte[] timestamp = Bytes.toBytes(-1 * System.currentTimeMillis());
byte[] rowKey = new byte[Md5Utils.MD5_LENGTH + longLength];
int offset = 0;
offset = Bytes.putBytes(rowKey, offset, userHash, 0, userHash.length);
Bytes.putBytes(rowKey, offset, timestamp, 0, timestamp.length);
// prepare the put command
Put put = new Put(rowKey);
// store the real user id as well, to be able to find the associated user when scanning the feeds table
// (the column family is "stream", as declared when the table was created)
put.add(Bytes.toBytes("stream"), Bytes.toBytes("user"), Bytes.toBytes("TheRealJD"));
put.add(Bytes.toBytes("stream"), Bytes.toBytes("feed"), Bytes.toBytes("Hello world!"));
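
The row key layout above (16-byte hash followed by a negated timestamp) can be reproduced with the standard library alone, which makes it easy to check that newer entries really sort first. This is a sketch under assumptions: FeedRowKey is a hypothetical class, and its compare method mimics the unsigned byte-by-byte ordering that HBase applies to row keys.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;

/** Standard-library sketch of a "hash + reversed timestamp" row key (hypothetical helper). */
public class FeedRowKey {
    public static final int MD5_LENGTH = 16;

    public static byte[] md5(String s) {
        try {
            return MessageDigest.getInstance("MD5").digest(s.getBytes(StandardCharsets.UTF_8));
        } catch (NoSuchAlgorithmException e) {
            throw new IllegalStateException("MD5 is not available", e);
        }
    }

    /** Fixed-length key: 16 bytes of user hash, then 8 bytes of negated timestamp. */
    public static byte[] rowKey(String userId, long timestampMillis) {
        return ByteBuffer.allocate(MD5_LENGTH + Long.BYTES)
                .put(md5(userId))
                .putLong(-1 * timestampMillis)
                .array();
    }

    /** Unsigned lexicographic comparison of two byte arrays. */
    public static int compare(byte[] a, byte[] b) {
        int n = Math.min(a.length, b.length);
        for (int i = 0; i < n; i++) {
            int diff = (a[i] & 0xff) - (b[i] & 0xff);
            if (diff != 0) {
                return diff;
            }
        }
        return a.length - b.length;
    }

    public static void main(String[] args) {
        byte[] older = rowKey("TheRealJD", 1_000L);
        byte[] newer = rowKey("TheRealJD", 2_000L);
        System.out.println("key length: " + older.length);
        System.out.println("newer sorts before older: " + (compare(newer, older) < 0));
    }
}
```

All keys of one user share the same 16-byte prefix, so they are stored next to each other, ordered from the most recent feed to the oldest.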

When it comes to scanning the feeds table, things get easy because the row key starts with a hash of the user id:
byte[] userHash = Md5Utils.md5sum(user);
// start key: the user hash followed by zero bytes
byte[] startRow = Bytes.padTail(userHash, longLength);
// stop key: the same, with the last byte of the user hash incremented
byte[] stopRow = Bytes.padTail(userHash, longLength);
stopRow[Md5Utils.MD5_LENGTH-1]++;
Scan s = new Scan(startRow, stopRow);
ResultScanner rs = feedsTable.getScanner(s);
// extract the columns (as created previously) from each result
for(Result r: rs) {
  // extract the username
  byte[] b = r.getValue(Bytes.toBytes("stream"), Bytes.toBytes("user"));
  String userName = Bytes.toString(b);
  // extract the feed
  b = r.getValue(Bytes.toBytes("stream"), Bytes.toBytes("feed"));
  String feed = Bytes.toString(b);
  // extract the timestamp
  b = Arrays.copyOfRange(r.getRow(), Md5Utils.MD5_LENGTH, Md5Utils.MD5_LENGTH+longLength);
  DateTime dt = new DateTime(-1 * Bytes.toLong(b));
}
// release the scanner resources
rs.close();
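
The start/stop key trick can be tried out without a cluster by using a sorted map as a stand-in for the table. In this sketch PrefixScan is a hypothetical class, the user hash is replaced by an arbitrary byte prefix, and the stop key is computed with a carry so that a prefix ending in 0xFF is handled too (a prefix made only of 0xFF bytes is not covered).

```java
import java.util.Arrays;
import java.util.Comparator;
import java.util.NavigableMap;
import java.util.TreeMap;

/** Sketch of a prefix range scan over a sorted map (hypothetical, not HBase code). */
public class PrefixScan {
    /** Unsigned lexicographic order on byte arrays. */
    public static final Comparator<byte[]> UNSIGNED = (a, b) -> {
        int n = Math.min(a.length, b.length);
        for (int i = 0; i < n; i++) {
            int diff = (a[i] & 0xff) - (b[i] & 0xff);
            if (diff != 0) {
                return diff;
            }
        }
        return a.length - b.length;
    };

    /** Prefix followed by tailLength zero bytes: the smallest key with this prefix. */
    public static byte[] startRow(byte[] prefix, int tailLength) {
        return Arrays.copyOf(prefix, prefix.length + tailLength);
    }

    /** Prefix incremented by one, then zero padded: the first key after the prefix. */
    public static byte[] stopRow(byte[] prefix, int tailLength) {
        byte[] stop = Arrays.copyOf(prefix, prefix.length + tailLength);
        for (int i = prefix.length - 1; i >= 0; i--) {
            if (++stop[i] != 0) {
                break;                                // no wrap-around, no carry needed
            }
        }
        return stop;
    }

    /** Returns every entry whose key starts with the prefix (start inclusive, stop exclusive). */
    public static <V> NavigableMap<byte[], V> scan(NavigableMap<byte[], V> table, byte[] prefix, int tailLength) {
        return table.subMap(startRow(prefix, tailLength), true, stopRow(prefix, tailLength), false);
    }

    public static void main(String[] args) {
        NavigableMap<byte[], String> table = new TreeMap<>(UNSIGNED);
        table.put(new byte[] {1, 2, 0, 5}, "first feed of user A");
        table.put(new byte[] {1, 2, 0, 9}, "second feed of user A");
        table.put(new byte[] {1, 3, 0, 1}, "feed of user B");

        byte[] userA = {1, 2};
        for (String feed : scan(table, userA, 2).values()) {
            System.out.println(feed);
        }
    }
}
```

Only the keys between the two bounds are visited, which is what makes a hashed prefix an efficient way to fetch one user's feeds.
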
Depending on the HBase version, each RPC call from the client may return as little as 1 row by default (i.e. no caching), which is inefficient when scanning a whole table. We can make each call return n rows by setting the property hbase.client.scanner.caching or by calling Scan.setCaching(int).
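
A back-of-the-envelope calculation shows why this setting matters. The sketch below is only arithmetic, not a measurement: ScannerCachingModel is a hypothetical class, and it counts data-fetching round trips only, ignoring the calls that open and close the scanner.

```java
/** Rough estimate of scan round trips for a given caching value (hypothetical helper). */
public class ScannerCachingModel {
    /** Round trips needed to stream `rows` rows when each call returns up to `caching` rows. */
    public static long roundTrips(long rows, int caching) {
        if (caching <= 0) {
            throw new IllegalArgumentException("caching must be positive");
        }
        return (rows + caching - 1) / caching;        // ceiling division
    }

    public static void main(String[] args) {
        long rows = 1_000_000L;
        for (int caching : new int[] {1, 100, 1000}) {
            System.out.println("caching=" + caching + " -> " + roundTrips(rows, caching) + " round trips");
        }
    }
}
```

A larger value reduces the number of round trips but makes each response bigger, so it has to be balanced against row size and client memory.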
