Monday, July 2, 2012

Storm-HBase Integration : Pour the Data Stream into HBase

Storm has been proclaimed the “Hadoop of realtime processing”, and it’s quite justified to call it so. Hadoop is meant solely for batch processing of enormous amounts of data; Storm, on the other end, is built for processing data in real time. Where Hadoop uses MapReduce jobs for batch processing of large distributed data sets, Storm runs topologies, which are graphs of heterogeneous Storm components (spouts and bolts), to process the realtime data stream.

Within the Hadoop ecosystem, HBase (which runs on top of HDFS) is commonly used to store this large amount of data in a semi-structured way. As I see it, Storm needs an equally capable, high-capacity store into which a running topology can pour its processed data.

So why not let HBase be this “capable, high-capacity store”?

I don’t claim to know everything about these complex systems, but yes!!! HBase can definitely be the pick, mainly because of its huge storage capacity.
To back up my big “Yes!!!”, I tried integrating Storm and HBase. The outcome is a Storm bolt that dumps the stream into HBase given just a few obvious inputs.


Requirements :
  • Set up Hadoop locally on your machine (tested version 0.20.2-cdh3u2)
  • Set up HBase locally on your machine (tested version 0.90.4-cdh3u2)
  • Set up Storm locally on your machine (tested version 0.7.1)

Following is the source code of the bolt that delivers the stream data into HBase:

HBase Dumper Bolt :
This bolt expects the HBase table name, the list of column family names with the respective column names for each family, and a variable "rowKeyCheck", which should be set to the name of the column whose value is to be used as the row key, or to "timestamp" if the current time should be used as the row key instead.
Every time it receives a tuple, it reads the tuple's fields in order (all columns of the first family, then the second, and so on), builds one list of values per column family and calls the addRow method of the HBaseCommunicator class, which writes the row into the HBase table.
import java.util.ArrayList;
import java.util.Map;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.BasicOutputCollector;
import backtype.storm.topology.IBasicBolt;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.tuple.Tuple;
/*
 * Bolt for dumping stream data into hbase
 */
public class HBaseDumperBolt implements IBasicBolt {
    private static final long serialVersionUID = 1L;
    // HBase client objects are not serializable, so each bolt instance creates its own in prepare()
    private transient HBaseCommunicator communicator;
    private final String hbaseXmlLocation, tableName, rowKeyCheck;
    private final ArrayList<String> colFamilyNames;
    private final ArrayList<ArrayList<String>> colNames;

    public HBaseDumperBolt(final String hbaseXmlLocation, final String tableName, final String rowKeyCheck, final ArrayList<String> colFamilyNames, final ArrayList<ArrayList<String>> colNames) {
        this.hbaseXmlLocation = hbaseXmlLocation;
        this.tableName = tableName;
        this.colFamilyNames = colFamilyNames;
        this.colNames = colNames;
        this.rowKeyCheck = rowKeyCheck;
        //check if tableName already exists (runs once, while the topology is being built)
        if (colFamilyNames.size() == colNames.size()) {
            HBaseCommunicator setup = new HBaseCommunicator(new HBaseConnector().getHBaseConf(hbaseXmlLocation));
            if (!setup.tableExists(tableName)) {
                setup.createTable(tableName, colFamilyNames);
            }
        }
    }
    @Override
    public void prepare(Map stormConf, TopologyContext context) {
        // runs on the worker, so the communicator exists in the JVM where execute() is called
        communicator = new HBaseCommunicator(new HBaseConnector().getHBaseConf(hbaseXmlLocation));
    }
    @Override
    public void execute(Tuple tuple, BasicOutputCollector collector) {
        String rowKey = null;
        int position = 0; // index of the next tuple field to read
        ArrayList<ArrayList<String>> colValues = new ArrayList<ArrayList<String>>();
        // fields are read in order: all columns of family 0, then family 1, ...
        for (int i = 0; i < colFamilyNames.size(); i++) {
            // a fresh list per family and per tuple, so values never pile up across tuples
            ArrayList<String> colFamilyValues = new ArrayList<String>();
            for (int j = 0; j < colNames.get(i).size(); j++) {
                String fieldValue = String.valueOf(tuple.getValue(position++));
                if (rowKeyCheck.equals(colNames.get(i).get(j))) {
                    rowKey = fieldValue;
                }
                colFamilyValues.add(fieldValue);
            }
            colValues.add(colFamilyValues);
        }
        // no column matched: fall back to the current time in milliseconds
        if (rowKeyCheck.equals("timestamp") && rowKey == null) {
            rowKey = String.valueOf(System.currentTimeMillis());
        }
        communicator.addRow(rowKey, tableName, colFamilyNames, colNames, colValues);
    }
    @Override
    public void cleanup() {
    }
    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        // terminal bolt: it emits nothing
    }
    @Override
    public Map<String, Object> getComponentConfiguration() {
        return null;
    }
}
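To make the bolt's field mapping easier to follow, here is a small sketch using only the JDK (no Storm, no HBase). It assumes a tuple's values arrive as a plain list in the same order as the column names, family by family. TupleLayoutSketch, splitByFamily and chooseRowKey are hypothetical names, not part of the bolt; the row key rule mirrors the bolt's: the value of the column named by rowKeyCheck, or the current time in milliseconds when rowKeyCheck is "timestamp".

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical helper, NOT part of HBaseDumperBolt: mirrors how the bolt maps a
// flat tuple onto column families and picks the row key, using plain lists.
final class TupleLayoutSketch {

    // Reads the tuple values in order: all columns of family 0, then family 1, ...
    static List<List<String>> splitByFamily(List<?> tupleValues, List<List<String>> colNames) {
        List<List<String>> colValues = new ArrayList<>();
        int position = 0; // index of the next tuple value to read
        for (List<String> familyColumns : colNames) {
            List<String> familyValues = new ArrayList<>(); // fresh list for every family
            for (int j = 0; j < familyColumns.size(); j++) {
                familyValues.add(String.valueOf(tupleValues.get(position++)));
            }
            colValues.add(familyValues);
        }
        return colValues;
    }

    // Value of the column named rowKeyCheck; otherwise nowMillis when rowKeyCheck
    // is "timestamp"; otherwise null (no usable row key).
    static String chooseRowKey(String rowKeyCheck, List<List<String>> colNames,
                               List<List<String>> colValues, long nowMillis) {
        for (int i = 0; i < colNames.size(); i++) {
            int j = colNames.get(i).indexOf(rowKeyCheck);
            if (j >= 0) {
                return colValues.get(i).get(j);
            }
        }
        return "timestamp".equals(rowKeyCheck) ? String.valueOf(nowMillis) : null;
    }

    public static void main(String[] args) {
        // two hypothetical families: the first holds "word", the second "number"
        List<List<String>> colNames = Arrays.asList(
                Arrays.asList("word"), Arrays.asList("number"));
        List<Object> tuple = Arrays.<Object>asList("hello", 1);
        List<List<String>> colValues = splitByFamily(tuple, colNames);
        System.out.println(colValues); // prints [[hello], [1]]
        System.out.println(chooseRowKey("number", colNames, colValues, 0L)); // prints 1
        System.out.println(chooseRowKey("timestamp", colNames, colValues, 42L)); // prints 42
    }
}
```

The same positional mapping is why the spout's declared fields must appear in the order of the column names: the bolt reads fields by index, not by name.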

The supporting classes required for the bolt are HBaseCommunicator.java and HBaseConnector.java.

HBase Communicator :
This class defines the methods needed to communicate with the HBase database: checking whether a table exists, creating a table, inserting a row and reading a single cell back.

import java.io.IOException;
import java.util.ArrayList;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.util.Bytes;
/*
 * This class implements methods for checking an hbase table's existence, creating an hbase table,
 * inserting a row into an hbase table and reading a single cell back
 */
public class HBaseCommunicator {
    // one configuration per communicator (not static, so instances do not overwrite each other)
    private final HBaseConfiguration conf;
    public HBaseCommunicator(final HBaseConfiguration conf) {
        this.conf = conf;
    }
    // check if the table exists
    public final boolean tableExists(final String tableName) {
        try {
            HBaseAdmin admin = new HBaseAdmin(conf);
            return admin.tableExists(tableName);
        } catch (Exception e) {
            System.out.println("Exception occurred while checking table's existence");
            e.printStackTrace();
        }
        return false;
    }
    // create a table with the given column families
    public final void createTable(final String tableName, final ArrayList<String> colFamilies) {
        try {
            HBaseAdmin hbase = new HBaseAdmin(conf);
            HTableDescriptor desc = new HTableDescriptor(tableName);
            for (String colFamily : colFamilies) {
                desc.addFamily(new HColumnDescriptor(Bytes.toBytes(colFamily)));
            }
            hbase.createTable(desc);
        } catch (Exception e) {
            System.out.println("Exception occurred creating table in hbase");
            e.printStackTrace();
        }
    }
    // add one row to a table: every column of every family goes into a single Put
    public final void addRow(final String rowKey, final String tableName, final ArrayList<String> colFamilies, final ArrayList<ArrayList<String>> colNames, final ArrayList<ArrayList<String>> data) {
        try {
            HTable table = new HTable(conf, tableName);
            Put putdata = new Put(Bytes.toBytes(rowKey));
            int cellCount = 0;
            for (int i = 0; i < colFamilies.size(); i++) {
                // skip a family whose number of values does not match its number of columns
                if (colNames.get(i).size() != data.get(i).size()) {
                    continue;
                }
                byte[] family = Bytes.toBytes(colFamilies.get(i));
                for (int j = 0; j < colNames.get(i).size(); j++) {
                    String colValue = data.get(i).get(j);
                    if (colValue == null) {
                        colValue = "null";
                    }
                    putdata.add(family, Bytes.toBytes(colNames.get(i).get(j)), Bytes.toBytes(colValue));
                    cellCount++;
                }
            }
            // write the row once, after all families have been added
            if (cellCount > 0) {
                table.put(putdata);
            }
        } catch (IOException e) {
            System.out.println("Exception occurred in adding data");
            e.printStackTrace();
        }
    }
    // read a single cell; returns null if the cell is not found
    public final String getColEntry(final String tableName, final String rowKey,
            final String colFamilyName, final String colName) {
        try {
            HTable table = new HTable(conf, tableName);
            Result rowEntries = table.get(new Get(Bytes.toBytes(rowKey)));
            byte[] columnValue = rowEntries.getValue(Bytes.toBytes(colFamilyName),
                    Bytes.toBytes(colName));
            return Bytes.toString(columnValue);
        } catch (IOException e) {
            System.out.println("Exception occurred in retrieving data");
            e.printStackTrace();
        }
        return null;
    }
}
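HBase stores every cell under a row key, a column family and a column qualifier (the "column name" used above), and addRow writes one such cell per column. To visualize what a row written by addRow looks like, the sketch below models that layout as nested sorted maps. InMemoryTableSketch is a hypothetical stand-in, not the HBase client API, and it keeps only the latest value of a cell, whereas HBase can also retain older versions.

```java
import java.util.Map;
import java.util.TreeMap;

// Hypothetical in-memory stand-in for an HBase table (NOT the HBase client API):
// row key -> column family -> column qualifier -> value, all kept sorted.
final class InMemoryTableSketch {
    private final Map<String, Map<String, Map<String, String>>> rows = new TreeMap<>();

    // Like one cell of a Put: creates the row and family maps on first use and
    // overwrites an existing value for the same cell (only the latest is kept).
    void put(String rowKey, String family, String qualifier, String value) {
        rows.computeIfAbsent(rowKey, k -> new TreeMap<>())
            .computeIfAbsent(family, k -> new TreeMap<>())
            .put(qualifier, value);
    }

    // Like getColEntry: returns null when the row, family or qualifier is missing.
    String get(String rowKey, String family, String qualifier) {
        Map<String, Map<String, String>> families = rows.get(rowKey);
        if (families == null) {
            return null;
        }
        Map<String, String> cells = families.get(family);
        return cells == null ? null : cells.get(qualifier);
    }

    @Override
    public String toString() {
        return rows.toString();
    }

    public static void main(String[] args) {
        InMemoryTableSketch table = new InMemoryTableSketch();
        // one row shaped like the sample topology's output (the row key is an example timestamp)
        table.put("1341212345678", "colFamily1", "word", "hello");
        table.put("1341212345678", "colFamily1", "number", "1");
        System.out.println(table);
        // prints {1341212345678={colFamily1={number=1, word=hello}}}
    }
}
```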

HBase Connector : 
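This class loads the HBase client configuration from the given hbase-site.xml file and returns it as an HBaseConfiguration object. The actual connection to HBase is made later, when an HBaseAdmin or HTable is created with this configuration.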

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
/*
 * Class that loads the hbase client configuration from hbase-site.xml and returns it as an HBaseConfiguration object
 */
public class HBaseConnector {
    private HBaseConfiguration conf;
    public final HBaseConfiguration getHBaseConf(final String pathToHBaseXMLFile) {
        conf = new HBaseConfiguration();
        conf.addResource(new Path(pathToHBaseXMLFile));
        return conf;
    }
}
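Configuration.addResource registers hbase-site.xml as a configuration resource; its <property> entries (for example hbase.zookeeper.quorum, which tells the client where ZooKeeper runs) are what the HBase client reads afterwards. To show what such a file contains, here is a JDK-only sketch that lists its name/value pairs. SiteXmlSketch is a hypothetical name, and Hadoop's Configuration class uses its own parsing code, not this.

```java
import java.io.ByteArrayInputStream;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;
import java.util.LinkedHashMap;
import java.util.Map;
import javax.xml.parsers.DocumentBuilderFactory;
import org.w3c.dom.Document;
import org.w3c.dom.Element;
import org.w3c.dom.NodeList;

// Illustration only: lists the <property> name/value pairs of a Hadoop-style
// configuration file using the JDK's DOM parser. This is NOT Hadoop's own parser.
final class SiteXmlSketch {

    static Map<String, String> readProperties(InputStream xml) {
        try {
            Document doc = DocumentBuilderFactory.newInstance().newDocumentBuilder().parse(xml);
            Map<String, String> props = new LinkedHashMap<>(); // keeps file order
            NodeList properties = doc.getElementsByTagName("property");
            for (int i = 0; i < properties.getLength(); i++) {
                Element property = (Element) properties.item(i);
                NodeList names = property.getElementsByTagName("name");
                NodeList values = property.getElementsByTagName("value");
                if (names.getLength() == 0 || values.getLength() == 0) {
                    continue; // skip incomplete entries
                }
                props.put(names.item(0).getTextContent().trim(),
                          values.item(0).getTextContent().trim());
            }
            return props;
        } catch (Exception e) {
            throw new IllegalArgumentException("could not parse configuration XML", e);
        }
    }

    public static void main(String[] args) {
        String xml = "<configuration>"
                + "<property><name>hbase.zookeeper.quorum</name><value>localhost</value></property>"
                + "</configuration>";
        Map<String, String> props = readProperties(
                new ByteArrayInputStream(xml.getBytes(StandardCharsets.UTF_8)));
        System.out.println(props); // prints {hbase.zookeeper.quorum=localhost}
    }
}
```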

Feeder Spout :
A sample spout that emits tuples of the form [String, Integer] to be dumped into the HBase table.


import java.util.Map;
import java.util.Random;                                             
import backtype.storm.spout.SpoutOutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.IRichSpout;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;
import backtype.storm.utils.Utils;
public class ExclIntegerSpout implements IRichSpout {
    // the spout cycles through these (word, number) pairs in lockstep
    private static final String[] WORDS = new String[] { "hello", "today", "divine", "optimized", "canon" };
    private static final Integer[] NUMBERS = new Integer[] { 1, 2, 3, 4, 5 };
    SpoutOutputCollector _collector;
    Random _rand;
    int count = 0;
    public boolean isDistributed() {
        return true;
    }
    @Override
    public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
        _collector = collector;
        _rand = new Random();
    }
    @Override
    public void nextTuple() {
        Utils.sleep(1000);
        // take the current pair, then advance the index, wrapping back to the first pair
        String word = WORDS[count];
        int number = NUMBERS[count];
        count = (count + 1) % NUMBERS.length;
        int randomNum = _rand.nextInt(1000);
        System.out.println("Random Number: " + randomNum);
        System.out.println("ExclSpout emitting : " + number);
        _collector.emit(new Values(word, number));
    }
    @Override
    public void close() {       
    }
    @Override
    public void ack(Object id) {
    }

     @Override
    public void fail(Object id) {
    }
    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("word", "number"));
    }
    @Override
    public void activate() {}
    @Override
    public void deactivate() {}
    @Override
    public Map<String, Object> getComponentConfiguration() {
        return null;
    }
}
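The spout is meant to walk through its five (word, number) pairs in a loop. Stripped of Storm, that round-robin looks like the sketch below. RoundRobinSketch is a hypothetical name; advancing the index with a modulo is one simple way to make sure every pair, including the first one, is emitted before the cycle repeats.

```java
import java.util.ArrayList;
import java.util.List;

// Sketch of the spout's round-robin over two parallel arrays, without Storm.
final class RoundRobinSketch {
    private static final String[] WORDS = {"hello", "today", "divine", "optimized", "canon"};
    private static final int[] NUMBERS = {1, 2, 3, 4, 5};
    private int count = 0;

    // Returns the current pair as "word=number", then advances the index.
    String next() {
        String pair = WORDS[count] + "=" + NUMBERS[count];
        count = (count + 1) % WORDS.length; // wraps from the last pair back to index 0
        return pair;
    }

    public static void main(String[] args) {
        RoundRobinSketch spout = new RoundRobinSketch();
        List<String> emitted = new ArrayList<>();
        for (int i = 0; i < 6; i++) {
            emitted.add(spout.next());
        }
        System.out.println(emitted);
        // prints [hello=1, today=2, divine=3, optimized=4, canon=5, hello=1]
    }
}
```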


Here’s a sample topology showcasing the use of the bolt.
Topology implementing the HBase Bolt :

import java.util.ArrayList;
import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.topology.TopologyBuilder;
import backtype.storm.utils.Utils;
/*
 * Topology that creates an hbase table (if not already created) with a single column family, row key being the timestamp and dumps the stream data into it.
 */
public class HBaseTopology {
    public static void main(String[] args) {
        /*
         * List of column family names of the hbase table
         */
        ArrayList<String> columnFamilyNames = new ArrayList<String>();
        /*
         * List containing list of column names per column family of the hbase table
         */
        ArrayList<ArrayList<String>> columnNamesList  = new ArrayList<ArrayList<String>>();
        /*
         * Temporary list used for populating the @columnNamesList
         */
        ArrayList<String> columnNames = new ArrayList<String>();
        /*
         * value passed to the bolt as rowKeyCheck: the name of the column whose value is to be used as the row key, or "timestamp" to use the current time instead
         */
        String rowKey = null;      
        /*
         * hbase table name
         */
        String tableName = "testTable";      
        /*
         * local fs location of the hbase-site.xml file
         */
        String hbaseXmlFileLocation = "/home/abc/hbase-0.20.6/conf/hbase-site.xml";
        TopologyBuilder builder = new TopologyBuilder();

         // add spout to the builder
        builder.setSpout("spout", new ExclIntegerSpout(), 2);
        // populate the column family name and column names list
        columnFamilyNames.add("colFamily1");
        columnNames.add("word");
        columnNames.add("number");
        columnNamesList.add(columnNames);
        columnNames = new ArrayList<String>();
        // set rowKey = "timestamp" if value of none of the columns is to be used as rowKey
        rowKey = "timestamp";
        // if some column's value is to be used as the row key, e.g. number, then use instead:
        // rowKey = "number";
        // add dumper bolt to the builder
        HBaseDumperBolt dumperBolt = new HBaseDumperBolt(hbaseXmlFileLocation, tableName, rowKey, columnFamilyNames, columnNamesList);
        builder.setBolt("dumperBolt", dumperBolt, 1).shuffleGrouping("spout");
        Config conf = new Config();
        conf.setDebug(true);
        LocalCluster cluster = new LocalCluster();
        cluster.submitTopology("sample-workflow", conf, builder.createTopology());
        Utils.sleep(10000);
        cluster.shutdown();
    }
}
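With rowKey set to "timestamp", the bolt uses the current time in milliseconds as the row key, so two tuples processed in the same millisecond would land in the same row, and a normal Get would only show the later tuple's values. The sample spout emits about once per second per task, so this is unlikely here, but faster streams can hit it. One possible remedy, sketched below under my own assumptions, is to append a sequence number; TimestampRowKeySketch and its key format are hypothetical. If the bolt runs with parallelism above 1, each instance has its own counter, so the task id (TopologyContext.getThisTaskId()) would also need to be part of the key.

```java
import java.util.Locale;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical row key generator, NOT part of the original bolt: appends a
// per-instance sequence number to the millisecond timestamp so that tuples
// handled in the same millisecond by one bolt instance get distinct keys.
final class TimestampRowKeySketch {
    private final AtomicLong sequence = new AtomicLong();

    // Assumed key format: <millis>-<6-digit sequence>. Zero padding keeps the
    // lexicographic order HBase uses equal to arrival order within a millisecond;
    // the counter wraps after 999999, which only matters if a single
    // millisecond sees a million tuples.
    String nextKey(long nowMillis) {
        long seq = sequence.getAndIncrement() % 1_000_000L;
        return String.format(Locale.ROOT, "%d-%06d", nowMillis, seq);
    }

    public static void main(String[] args) {
        TimestampRowKeySketch keys = new TimestampRowKeySketch();
        long now = System.currentTimeMillis();
        String first = keys.nextKey(now);
        String second = keys.nextKey(now); // same millisecond, different key
        System.out.println(first + " " + second);
    }
}
```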




14 comments:

  1. Just wanted to thank you for the wonderful posts on Storm. It's neatly written. Many thanks for blogging a few times on this topic.

    Replies
    1. Thanks so much for the acknowledgement, Jay.

  2. Good post. Is there a way to expose a spout as a REST service?

  3. Thank you for the wonderful post.

  4. How does an HTable instance handle concurrent requests when I need to get random rows from HBase?

  5. Can you tell the required procedure...

    $ bin/storm jar /home/ubuntu/data/stormHbase.jar
    Traceback (most recent call last):
    File "bin/storm", line 455, in
    main()
    File "bin/storm", line 452, in main
    (COMMANDS.get(COMMAND, unknown_command))(*ARGS)
    TypeError: jar() takes at least 2 arguments (1 given)

    Replies
    1. You have to specify the name of the main class you want to run from the "stormHbase.jar" and also any arguments that this class needs to execute.

  6. What would the resulting HBase table look like?

    Replies
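    1. You'll use the method "createTable(final String tableName, final ArrayList<String> colFamilies)" to specify the structure of the HBase table. The "colFamilies" argument contains the column family names; the column names (qualifiers) inside each family are not part of an HBase table definition and are written per row by the bolt, from the colNames list passed to its constructor. That's how your table would look.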

  7. Can you shed some light on the process of looking up an HBase table in a Storm bolt?

  8. Thanks for your post. Is there a way to bulk load into HBase using Storm, I mean create HFiles and then bulk load them? I have used MapReduce to do that in the past.

  9. Hi

    While trying this sample, I got the following error:
    The table is created successfully in HBase, but the topology does not get submitted and fails with:

    6315 [SyncThread:0] INFO o.a.s.s.o.a.z.s.ZooKeeperServer - Established session 0x1542e3a3ad6000d with negotiated timeout 20000 for client /127.0.0.1:35076
    6315 [main-SendThread(localhost:2000)] INFO o.a.s.s.o.a.z.ClientCnxn - Session establishment complete on server localhost/127.0.0.1:2000, sessionid = 0x1542e3a3ad6000d, negotiated timeout = 20000
    6315 [main-EventThread] INFO o.a.s.s.o.a.c.f.s.ConnectionStateManager - State change: CONNECTED
    6316 [ProcessThread(sid:0 cport:-1):] INFO o.a.s.s.o.a.z.s.PrepRequestProcessor - Got user-level KeeperException when processing sessionid:0x1542e3a3ad6000d type:create cxid:0x2 zxid:0x26 txntype:-1 reqpath:n/a Error Path:/storm/blobstoremaxkeysequencenumber Error:KeeperErrorCode = NoNode for /storm/blobstoremaxkeysequencenumber
    6367 [Curator-Framework-0] INFO o.a.s.s.o.a.c.f.i.CuratorFrameworkImpl - backgroundOperationsLoop exiting
    6368 [ProcessThread(sid:0 cport:-1):] INFO o.a.s.s.o.a.z.s.PrepRequestProcessor - Processed session termination for sessionid: 0x1542e3a3ad6000d
    6379 [main] INFO o.a.s.s.o.a.z.ZooKeeper - Session: 0x1542e3a3ad6000d closed

    Replies
    1. What version of Storm are you using? You might want to try a Storm version below 1.0.0.
      I was facing this too. I tried Storm below 1.0.0 and it got resolved.
