Wednesday, November 21, 2012

Setting up a Zookeeper Cluster

ZooKeeper is a distributed, open-source, high-performance coordination service for distributed applications. A running ZooKeeper cluster is a prerequisite for many other installations.
This post walks you through setting up a ZooKeeper cluster.

 

Prerequisites

Java JDK installed on all nodes of the cluster.

 

Setting up the cluster

The following installation steps have been tested on Ubuntu 10.04, 10.10, 11.04 and 12.04.

1. Download the ZooKeeper release and extract it on every node of the cluster. It can be downloaded from:
http://hadoop.apache.org/zookeeper/releases.html

2. Create a file named zoo.cfg (the name bin/zkServer.sh looks for by default) in the conf folder of the extracted release, with the following contents:


tickTime=2000
dataDir=/var/zookeeper/
clientPort=2181
initLimit=5
syncLimit=2
server.1=zoo1:2888:3888
server.2=zoo2:2888:3888
server.3=zoo3:2888:3888

Here the number after "server." is the server id and must be an integer (1, 2, 3, ...), while zoo1, zoo2 and zoo3 are host names that can be changed by the user. Port 2888 is used by the followers to connect to the leader and port 3888 is used for leader election; these are the usual values and can be changed as long as every server uses the same configuration. tickTime is the basic time unit in milliseconds, and initLimit and syncLimit are counted in ticks.
Next, on each machine, a file named "myid" must be created in the directory specified in dataDir. It contains just one entry: that machine's server id.
To make it more clear, suppose we are using 3 systems with IPs 192.192.192.191, 192.192.192.192 and 192.192.192.193, where zoo1 designates 192.192.192.191, zoo2 designates 192.192.192.192 and zoo3 designates 192.192.192.193.
Then the machine 192.192.192.191 should contain a file called myid at /var/zookeeper/ (or the value of dataDir specified in zoo.cfg) containing the following entry:
1
Similarly, machines 192.192.192.192 and 192.192.192.193 should have the entries 2 and 3 respectively.
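ZooKeeper expects the same numeric id in two places: after "server." in zoo.cfg and inside that machine's myid file. As a small sketch under the assumption that ids are simply the 1-based positions of the hosts in an ordered list, the helper below renders the zoo.cfg text and tells you what each machine's myid must contain. The class and method names are hypothetical; the settings mirror the sample configuration.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical helper: renders zoo.cfg for an ordered list of host names and reports the
// id each host must write into its myid file. Ids are the 1-based positions in the list.
final class ZkEnsembleConfig {
    private final List<String> hosts;

    ZkEnsembleConfig(List<String> hosts) {
        this.hosts = hosts;
    }

    // Same settings as the sample zoo.cfg; tickTime is in milliseconds.
    String zooCfg(String dataDir) {
        StringBuilder sb = new StringBuilder();
        sb.append("tickTime=2000\n");
        sb.append("dataDir=").append(dataDir).append('\n');
        sb.append("clientPort=2181\n");
        sb.append("initLimit=5\n");
        sb.append("syncLimit=2\n");
        for (int i = 0; i < hosts.size(); i++) {
            // server.<id>=<host>:<follower-to-leader port>:<leader election port>
            sb.append("server.").append(i + 1).append('=')
              .append(hosts.get(i)).append(":2888:3888\n");
        }
        return sb.toString();
    }

    // Content of the myid file on the given host: only the numeric id.
    int myidFor(String host) {
        int index = hosts.indexOf(host);
        if (index < 0) {
            throw new IllegalArgumentException("unknown host " + host);
        }
        return index + 1;
    }

    public static void main(String[] args) {
        List<String> names = Arrays.asList("zoo1", "zoo2", "zoo3");
        ZkEnsembleConfig cfg = new ZkEnsembleConfig(names);
        System.out.print(cfg.zooCfg("/var/zookeeper/"));
        for (String host : names) {
            System.out.println(host + " -> myid " + cfg.myidFor(host));
        }
    }
}
```

Generating both files from one list keeps the ids in zoo.cfg and myid from drifting apart when a node is added.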

3. Update the /etc/hosts file on each machine to map the host names used in the ZooKeeper configuration to their IP addresses. This is needed so that every system knows which machines zoo1, zoo2 and zoo3 refer to.
After the update, the /etc/hosts file on each system in the cluster would contain a similar set of entries, like:

192.192.192.191   zoo1
192.192.192.192   zoo2
192.192.192.193   zoo3
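Before starting the servers, it can help to confirm that every name used in zoo.cfg actually resolves on each machine. A minimal sketch using InetAddress.getByName, which consults /etc/hosts among other name sources; the class name is hypothetical and zoo1..zoo3 are just this post's example names.

```java
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical pre-flight check: returns the host names this machine cannot resolve.
final class ZkHostCheck {
    static List<String> unresolved(List<String> hostNames) {
        List<String> missing = new ArrayList<String>();
        for (String name : hostNames) {
            try {
                InetAddress address = InetAddress.getByName(name);
                System.out.println(name + " -> " + address.getHostAddress());
            } catch (UnknownHostException e) {
                missing.add(name);
            }
        }
        return missing;
    }

    public static void main(String[] args) {
        List<String> missing = unresolved(Arrays.asList("zoo1", "zoo2", "zoo3"));
        if (missing.isEmpty()) {
            System.out.println("all ZooKeeper host names resolve");
        } else {
            System.out.println("add these to /etc/hosts: " + missing);
        }
    }
}
```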

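4. This completes the configuration part. Next, cd to the ZooKeeper home directory and start the server by running

bin/zkServer.sh start

on each system. Afterwards, running bin/zkServer.sh status on a node reports whether that node is the leader or a follower.
And you have a running ZooKeeper cluster at your disposal.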
Good Luck !!!
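To confirm from one place that the ensemble really formed, each server can be asked for its role. The sketch below assumes the servers listen on clientPort 2181 and answer ZooKeeper's "srvr" four-letter command, whose reply contains a "Mode:" line; newer releases restrict four-letter commands through 4lw.commands.whitelist, where, to my knowledge, srvr is allowed by default. The class name is hypothetical and the code needs Java 7 or later.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.OutputStream;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.nio.charset.StandardCharsets;

// Hypothetical checker: asks each server for "srvr" and prints its Mode (leader/follower/standalone).
final class ZkModeCheck {
    // Extracts the value of the "Mode:" line from a srvr reply, or null if there is none.
    static String parseMode(String srvrReply) {
        for (String line : srvrReply.split("\n")) {
            if (line.startsWith("Mode:")) {
                return line.substring("Mode:".length()).trim();
            }
        }
        return null;
    }

    // Sends the four-letter command and reads until the server closes the connection.
    static String srvr(String host, int port) throws IOException {
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(host, port), 3000);
            socket.setSoTimeout(3000);
            OutputStream out = socket.getOutputStream();
            out.write("srvr".getBytes(StandardCharsets.US_ASCII));
            out.flush();
            BufferedReader in = new BufferedReader(
                    new InputStreamReader(socket.getInputStream(), StandardCharsets.US_ASCII));
            StringBuilder reply = new StringBuilder();
            String line;
            while ((line = in.readLine()) != null) {
                reply.append(line).append('\n');
            }
            return reply.toString();
        }
    }

    public static void main(String[] args) {
        for (String host : new String[] {"zoo1", "zoo2", "zoo3"}) {
            try {
                System.out.println(host + ": " + parseMode(srvr(host, 2181)));
            } catch (IOException e) {
                System.out.println(host + ": not reachable (" + e.getMessage() + ")");
            }
        }
    }
}
```

In a healthy three-node cluster, one node should report leader and the other two follower.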

Monday, November 19, 2012

Building Java Action in Oozie


A Java application is one of the job types that can be run as part of an Oozie workflow. Here, I focus on how to create an Oozie workflow that executes a java action. The Oozie Java application folder has three components:

  1. lib folder
  2. workflow.xml file
  3. job.properties file

We shall take them one by one:

Steps to create the 'lib' folder:

The lib folder should contain all the jars/files required to compile your Java class, plus one more jar holding your own compiled class, which we create here.
  1. To start with, place your .java file in a directory structure mapping the package it belongs to (e.g. place Fetch.java at com/jayati/sampleapp/Fetch.java if Fetch.java belongs to package com.jayati.sampleapp;) and compile it.
  2. Create a folder with the desired application name (assuming appName) and create a lib folder in it. From the directory containing com/, package the compiled class into a jar, e.g. jar cf fetch.jar com/jayati/sampleapp/Fetch.class (the jar name is arbitrary), and move the jar into appName/lib. Oozie adds the JAR files in lib/ to the action's classpath, so packaging the class as a jar is the reliable way to get it picked up.
  3. Place all the jars/files that were required to compile your Java class in the same lib folder.
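The post doesn't show Fetch.java itself. As a hypothetical sketch of what such a main class might look like, the version below reads the two -D options that the workflow.xml passes through <java-opts> (env and PP); the method name and the "dev" fallback are assumptions. The package declaration is left out so the block compiles on its own; the real class would be declared public in com.jayati.sampleapp, since Oozie invokes its main method from outside the package.

```java
// Hypothetical sketch of com.jayati.sampleapp.Fetch; the real class is not shown in the post.
final class Fetch {
    // Reads the system properties passed as -Denv=stg -DPP=DB_PASSPHRASE in <java-opts>.
    static String describeSettings() {
        String env = System.getProperty("env", "dev"); // "dev" is an assumed fallback
        String passphraseKey = System.getProperty("PP");
        if (passphraseKey == null) {
            throw new IllegalStateException("-DPP was not passed through <java-opts>");
        }
        return "env=" + env + ", PP=" + passphraseKey;
    }

    public static void main(String[] args) {
        // The actual fetching work would go here. Report failures by throwing an exception
        // rather than calling System.exit(); older Oozie releases document that System.exit()
        // sends the java action to its error transition regardless of the exit code.
        System.out.println("Fetch running with " + describeSettings());
    }
}
```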
Workflow.xml

The workflow.xml defines the sequence of actions executed in the workflow. In this example, we have just one java action.
For a java action, we need to specify the job tracker, the name node and the main Java class. Assuming the action name is 'java-node', the .xml file would look like:


<workflow-app xmlns="uri:oozie:workflow:0.1" name="appName-wf">
    <start to="java-node"/>
    <action name="java-node">
        <java>
            <job-tracker>localhost:9001</job-tracker>
            <name-node>hdfs://localhost:9000</name-node>
            <configuration>
                <property>
                    <name>mapred.job.queue.name</name>
                    <value>default</value>
                </property>
            </configuration>
            <main-class>com.jayati.sampleapp.Fetch</main-class>
            <java-opts>-Denv=stg -DPP=DB_PASSPHRASE</java-opts>
        </java>
        <ok to="end"/>
        <error to="fail"/>
    </action>
    <kill name="fail">
        <message>Java failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</message>
    </kill>
    <end name="end"/>
</workflow-app>

Replace the job tracker and name node host:port values if they differ in your Hadoop configuration, and then place this workflow.xml in the appName/ folder.
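A typo in workflow.xml, such as a <start to> value that doesn't match any action name, usually only shows up once the job is submitted. As a hypothetical pre-flight check, the JDK's built-in DOM parser can confirm that the file is well-formed, that <start> points at an existing <action>, and report the <main-class>. It only knows about <action> nodes, which is enough for this single-action workflow; Oozie still performs its own schema validation.

```java
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import javax.xml.parsers.DocumentBuilderFactory;
import javax.xml.parsers.ParserConfigurationException;
import org.w3c.dom.Document;
import org.w3c.dom.Element;
import org.w3c.dom.NodeList;
import org.xml.sax.SAXException;

// Hypothetical local check for appName/workflow.xml before it is copied to HDFS.
final class WorkflowCheck {
    // Returns the <main-class> value; throws IllegalArgumentException on any problem.
    static String check(InputStream workflowXml) {
        Document doc;
        try {
            DocumentBuilderFactory factory = DocumentBuilderFactory.newInstance();
            factory.setNamespaceAware(true); // the file uses the uri:oozie:workflow namespace
            doc = factory.newDocumentBuilder().parse(workflowXml);
        } catch (ParserConfigurationException | SAXException | IOException e) {
            throw new IllegalArgumentException("workflow.xml could not be parsed: " + e.getMessage(), e);
        }
        NodeList starts = doc.getElementsByTagNameNS("*", "start");
        if (starts.getLength() == 0) {
            throw new IllegalArgumentException("no <start> element");
        }
        String target = ((Element) starts.item(0)).getAttribute("to");
        // <start to="..."/> must name one of the <action name="..."> nodes
        NodeList actions = doc.getElementsByTagNameNS("*", "action");
        boolean found = false;
        for (int i = 0; i < actions.getLength(); i++) {
            if (target.equals(((Element) actions.item(i)).getAttribute("name"))) {
                found = true;
            }
        }
        if (!found) {
            throw new IllegalArgumentException("<start> points to unknown action '" + target + "'");
        }
        NodeList mains = doc.getElementsByTagNameNS("*", "main-class");
        if (mains.getLength() == 0) {
            throw new IllegalArgumentException("no <main-class> element");
        }
        return mains.item(0).getTextContent().trim();
    }

    public static void main(String[] args) throws IOException {
        // usage: java WorkflowCheck appName/workflow.xml
        try (InputStream in = new FileInputStream(args[0])) {
            System.out.println("workflow.xml looks fine, main-class: " + check(in));
        }
    }
}
```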

job.properties

This file lists the values of all the variables used in workflow.xml, such as jobTracker, nameNode etc. Since we have used direct values in workflow.xml, our job.properties consists of a single line and looks like:


oozie.wf.application.path=hdfs://localhost:9000/hadoopfs_path/appName            

where hadoopfs_path is the HDFS path of the folder in which this application folder will be placed. Copy the above file to appName/ as well.
This finishes building a workflow containing one java action. To run this application on Oozie, you can refer to one of my previous posts, "Try On Oozie".


Monday, July 2, 2012

Storm-RDBMS Integration

"Wondering if your Storm topology's output could be dumped into an RDBMS?"
This blog post answers that. In spite of all the new big data technologies in heavy use, SQL databases are still an integral part of many of our applications in one way or another.
This post presents a Storm bolt that dumps the tuples of your stream into an SQL table, writing each tuple field to the column of the same name.

Requirements :
All you need is Storm set up locally (tested version 0.7.1), a MySQL server installed on your Linux machine, and the MySQL JDBC driver (Connector/J) on the topology's classpath.


RDBMS Dumper Bolt :
The bolt dumps the incoming stream of tuples into the specified SQL table. Its constructor expects the table name, the database URL, the database username and the database password.

import java.sql.Connection;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.BasicOutputCollector;
import backtype.storm.topology.IBasicBolt;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
/*
 * Bolt for dumping stream data into RDBMS.
 * The bolt object is serialized and shipped to the workers, and a JDBC Connection cannot travel
 * with it, so the constructor only stores the settings and the connection is opened in prepare().
 */
public class RDBMSDumperBolt implements IBasicBolt {
    private static final long serialVersionUID = 1L;
    private transient Connection con = null;
    private transient RDBMSCommunicator communicator = null;
    private final String tableName;
    private final String dBUrl;
    private final String username;
    private final String password;
    public RDBMSDumperBolt(String tableName, String dBUrl, String username, String password) {
        super();
        this.tableName = tableName;
        this.dBUrl = dBUrl;
        this.username = username;
        this.password = password;
    }
    @Override
    public void prepare(Map stormConf, TopologyContext context) {
        try {
            con = new RDBMSConnector().getConnection(dBUrl, username, password);
        } catch (ClassNotFoundException e) {
            throw new RuntimeException("JDBC driver not found on the classpath", e);
        } catch (SQLException e) {
            throw new RuntimeException("Could not connect to " + dBUrl, e);
        }
        communicator = new RDBMSCommunicator(con);
    }
    @Override
    public void execute(Tuple input, BasicOutputCollector collector) {
        // field names in declaration order, so that they line up with input.getValues()
        Fields fields = input.getFields();
        List<String> fieldNames = new ArrayList<String>();
        for (int i = 0; i < fields.size(); i++) {
            fieldNames.add(fields.get(i));
        }
        ArrayList<Object> fieldValues = new ArrayList<Object>(input.getValues());
        try {
            communicator.insertRow(this.tableName, fieldNames, fieldValues);
        } catch (SQLException e) {
            System.out.println("Exception occurred in adding a row ");
            e.printStackTrace();
        }
    }
    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {}
    @Override
    public Map<String, Object> getComponentConfiguration() {
        return null;
    }
    @Override
    public void cleanup() {
        try {
            if (con != null) {
                con.close();
            }
        } catch (SQLException e) {
            e.printStackTrace();
        }
    }
}

The supporting classes required for the RDBMS bolt to work are RDBMSCommunicator and RDBMSConnector.
RDBMS Communicator :
This class implements the methods required to communicate with the RDBMS.

import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
/*
 * This class implements methods for inserting data into RDBMS tables and reading a table's column types
 */
public class RDBMSCommunicator {
    private Connection con = null;
    public RDBMSCommunicator(Connection con) {
        super();
        this.con = con;
    }
    /*
     * Inserts one row. The values are bound through '?' placeholders; the table and column
     * names are concatenated into the SQL text (JDBC cannot bind identifiers), so they must
     * come from trusted code such as the topology definition.
     */
    public int insertRow(String tableName, List<String> fieldNames, ArrayList<Object> fieldValues) throws SQLException {
        int noOfColumns = fieldNames.size();
        StringBuilder columns = new StringBuilder();
        StringBuilder placeholders = new StringBuilder();
        for (int i = 0; i < noOfColumns; i++) {
            if (i > 0) {
                columns.append(", ");
                placeholders.append(", ");
            }
            columns.append(fieldNames.get(i));
            placeholders.append("?");
        }
        String queryStmt = "insert into " + tableName + " (" + columns + ") values (" + placeholders + ")";
        PreparedStatement prepstmt = con.prepareStatement(queryStmt);
        try {
            for (int j = 0; j < noOfColumns; j++) {
                prepstmt.setObject(j + 1, fieldValues.get(j));
            }
            int result = prepstmt.executeUpdate();
            if (result != 0) {
                System.out.println("Inserted data successfully ..");
            } else {
                System.out.println("Insertion failed ..");
            }
            return result;
        } finally {
            // always release the statement, even when the insert fails
            prepstmt.close();
        }
    }
    /*
     * Returns column name -> data type for the given table, read from information_schema
     */
    public Map<String, String> getTableInformation(String tableName) {
        Map<String, String> tableDetails = new HashMap<String, String>();
        String stmt = "select column_name, data_type from information_schema.columns where table_name = ?";
        try {
            PreparedStatement prepstmt = con.prepareStatement(stmt);
            try {
                // bind the table name instead of concatenating it into the query
                prepstmt.setString(1, tableName);
                ResultSet rs = prepstmt.executeQuery();
                while (rs.next()) {
                    tableDetails.put(rs.getString("column_name"), rs.getString("data_type"));
                }
                rs.close();
            } finally {
                prepstmt.close();
            }
        } catch (SQLException e) {
            e.printStackTrace();
        }
        return tableDetails;
    }
}
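insertRow binds the values through '?' placeholders, but the table and column names are pasted into the SQL text because JDBC cannot bind identifiers. A hypothetical helper (not part of the original classes) that builds the same kind of statement while first rejecting any name that is not a plain SQL identifier; the identifier pattern is a deliberately conservative assumption.

```java
import java.util.Arrays;
import java.util.List;
import java.util.regex.Pattern;

// Hypothetical helper: builds "insert into t (a, b) values (?, ?)" after validating identifiers.
final class InsertSql {
    private static final Pattern IDENTIFIER = Pattern.compile("[A-Za-z_][A-Za-z0-9_]*");

    static String build(String tableName, List<String> columns) {
        requireIdentifier(tableName);
        if (columns.isEmpty()) {
            throw new IllegalArgumentException("at least one column is required");
        }
        StringBuilder names = new StringBuilder();
        StringBuilder marks = new StringBuilder();
        for (int i = 0; i < columns.size(); i++) {
            requireIdentifier(columns.get(i));
            if (i > 0) {
                names.append(", ");
                marks.append(", ");
            }
            names.append(columns.get(i));
            marks.append('?'); // one placeholder per column; values are bound later
        }
        return "insert into " + tableName + " (" + names + ") values (" + marks + ")";
    }

    private static void requireIdentifier(String name) {
        if (name == null || !IDENTIFIER.matcher(name).matches()) {
            throw new IllegalArgumentException("not a plain SQL identifier: " + name);
        }
    }

    public static void main(String[] args) {
        // prints: insert into testTable (word, number) values (?, ?)
        System.out.println(build("testTable", Arrays.asList("word", "number")));
    }
}
```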

RDBMS Connector :
This class establishes a connection with RDBMS and returns a Connection object.

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;
/*
 * Class that establishes a connection with rdbms and returns a Connection object
 */
public class RDBMSConnector {
    private static final String DB_CLASS = "com.mysql.jdbc.Driver";
    public Connection getConnection(final String sqlDBUrl, final String sqlUser, final String sqlPassword) throws ClassNotFoundException, SQLException {
        // register the MySQL JDBC driver (Connector/J must be on the classpath)
        Class.forName(DB_CLASS);
        // hand the credentials to the driver separately instead of appending them to the URL,
        // so special characters in the password need no URL escaping
        return DriverManager.getConnection(sqlDBUrl, sqlUser, sqlPassword);
    }
}

Feeder Spout :
A sample spout emitting tuples of the form [String, Integer] to be dumped into the RDBMS table.

import java.util.Map;
import java.util.Random;                                             
import backtype.storm.spout.SpoutOutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.IRichSpout;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;
import backtype.storm.utils.Utils;
public class ExclIntegerSpout implements IRichSpout {
    SpoutOutputCollector _collector;
    Random _rand; 
    int count = 0;
    public boolean isDistributed() {
        return true;
    }
    @Override
    public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {                             
        _collector = collector;
        _rand = new Random();
    }
    @Override
    public void nextTuple() {
        Utils.sleep(1000);
        String[] words = new String[] { "hello", "today", "divine", "optimized", "canon"};
        Integer[] numbers = new Integer[] {
                1,2,3,4,5
        };
        // emit the current pair, then advance and wrap around so that all five pairs are used
        int number = numbers[count];
        String word = words[count];
        count = (count + 1) % numbers.length;
        System.out.println("ExclSpout emitting : " + word + ", " + number);
        _collector.emit(new Values(word, number));
    }
    @Override
    public void close() {       
    }
    @Override
    public void ack(Object id) {
    }

     @Override
    public void fail(Object id) {
    }
    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("word", "number"));
    }
    @Override
    public void activate() {}
    @Override
    public void deactivate() {}
    @Override
    public Map<String, Object> getComponentConfiguration() {
        return null;
    }
}


Here’s a sample topology showcasing the use of the bolt.
Topology implementing the RDBMS Bolt :

import java.sql.SQLException;
import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.topology.TopologyBuilder;
import backtype.storm.utils.Utils;
/*
 * Sample Topology using the RDBMSDumperBolt
 * It's important to note that the bolt does not create the table: the rdbms table must already exist,
 * and its column names must match the fields of the input stream tuples for the topology to work.
 * For e.g. the table used below should have word (varchar(100)) and number (int) as its columns,
 * matching the fields declared by ExclIntegerSpout.
 */
public class RDBMSDumperTopology {
    public static void main(String[] args) throws SQLException {
        String tableName = "testTable";
        String rdbmsUrl = "jdbc:mysql://localhost:3306/testDB";
        String rdbmsUserName = "root";
        String rdbmsPassword = "root";
        TopologyBuilder builder = new TopologyBuilder();
        // set the spout for the topology (the sample spout shown above)
        builder.setSpout("spout", new ExclIntegerSpout(), 10);
        // dump the stream data into the rdbms table
        RDBMSDumperBolt dumperBolt = new RDBMSDumperBolt(tableName, rdbmsUrl, rdbmsUserName, rdbmsPassword);
        builder.setBolt("dumperBolt", dumperBolt, 1).shuffleGrouping("spout");
        Config conf = new Config();
        conf.setDebug(true);
        LocalCluster cluster = new LocalCluster();
        cluster.submitTopology("rdbms-workflow", conf, builder.createTopology());
        Utils.sleep(10000);
        cluster.shutdown();
    }
}
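The bolt does not create its table, so the table has to exist before the topology is submitted, with columns named after the tuple fields (here word as varchar(100) and number as int). A hypothetical helper that builds such a MySQL CREATE TABLE statement from parallel lists of column names and types; you would run the printed statement once against testDB, for example in the mysql client, before starting the topology.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical helper: builds the DDL for the table RDBMSDumperBolt writes into.
final class CreateTableSql {
    // columnNames.get(i) gets the SQL type columnTypes.get(i), e.g. word -> varchar(100)
    static String build(String tableName, List<String> columnNames, List<String> columnTypes) {
        if (columnNames.isEmpty() || columnNames.size() != columnTypes.size()) {
            throw new IllegalArgumentException("need exactly one SQL type per column");
        }
        StringBuilder sql = new StringBuilder("create table if not exists ")
                .append(tableName).append(" (");
        for (int i = 0; i < columnNames.size(); i++) {
            if (i > 0) {
                sql.append(", ");
            }
            sql.append(columnNames.get(i)).append(' ').append(columnTypes.get(i));
        }
        return sql.append(")").toString();
    }

    public static void main(String[] args) {
        // prints: create table if not exists testTable (word varchar(100), number int)
        System.out.println(build("testTable", Arrays.asList("word", "number"),
                Arrays.asList("varchar(100)", "int")));
    }
}
```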

You can also find the entire code at :

Storm-HBase Integration : Pour the Data Stream into HBase

Storm is proclaimed to be the “Hadoop of Realtime Processing”, and it’s quite justified to call it so. Hadoop is meant for batch processing of enormous amounts of data, whereas Storm is built for processing realtime data. Where Hadoop runs MapReduce jobs for the batch processing of large sets of distributed data, Storm runs topologies, which are chains of heterogeneous Storm elements (spouts and bolts), for processing the realtime data stream.

The Hadoop ecosystem uses HBase for storing such large amounts of data in a somewhat structured way. And as far as I can tell, Storm needs an equally capable, high-capacity store into which a running topology can pour the processed data.

Why not let HBase be this “potential and high capacity storage” then?

I don’t claim to know everything about these complex systems, but yes!!! HBase can definitely be the pick, mainly due to the giant storage capacity it possesses.
In an attempt to justify my big “Yes!!!”, I tried out this integration of Storm and HBase, the outcome of which is a Storm bolt that can dump the stream into HBase with just a few obvious inputs.


Requirements :
  • Set up Hadoop locally on your machine (tested version 0.20.2-cdh3u2)
  • Set up HBase locally on your machine (tested version 0.90.4-cdh3u2)
  • Set up Storm locally on your machine (tested version 0.7.1)

Following is the source code of the bolt that delivers the stream data into HBase.

HBase Dumper Bolt :
This bolt expects the location of the hbase-site.xml file, the hbase table name, a variable "rowKeyCheck", the list of column family names and the respective lists of column names. "rowKeyCheck" should be set to the name of the column whose value is to be used as the row key, or to "timestamp" if the current time should be used as the row key instead.
Every time it receives a tuple, it builds the list of values for each column family, taking the tuple's fields in order, and calls a method of the HBaseCommunicator class which dumps them into the hbase table.

import java.util.ArrayList;
import java.util.Map;
import org.apache.hadoop.hbase.HBaseConfiguration;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.BasicOutputCollector;
import backtype.storm.topology.IBasicBolt;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.tuple.Tuple;
/*
 * Bolt for dumping stream data into hbase.
 * The tuple's values are read in order and mapped, family by family, onto colNames,
 * so the stream must declare its fields in the same order as the column names.
 */
public class HBaseDumperBolt implements IBasicBolt {
    private static final long serialVersionUID = 1L;
    // HBase client objects are not serializable, so they are created in prepare() on each worker
    private transient HBaseCommunicator communicator = null;
    private final String hbaseXmlLocation;
    private final String tableName;
    private final String rowKeyCheck;
    private final ArrayList<String> colFamilyNames;
    private final ArrayList<ArrayList<String>> colNames;

    public HBaseDumperBolt(final String hbaseXmlLocation, final String tableName, final String rowKeyCheck, final ArrayList<String> colFamilyNames, final ArrayList<ArrayList<String>> colNames) {
        this.hbaseXmlLocation = hbaseXmlLocation;
        this.tableName = tableName;
        this.colFamilyNames = colFamilyNames;
        this.colNames = colNames;
        this.rowKeyCheck = rowKeyCheck;
        // create the table, if it does not exist yet, while the topology is being built
        if (colFamilyNames.size() == colNames.size()) {
            HBaseConfiguration conf = new HBaseConnector().getHBaseConf(hbaseXmlLocation);
            HBaseCommunicator setup = new HBaseCommunicator(conf);
            if (!setup.tableExists(tableName)) {
                setup.createTable(tableName, colFamilyNames);
            }
        }
    }
    @Override
    public void prepare(Map stormConf, TopologyContext context) {
        HBaseConfiguration conf = new HBaseConnector().getHBaseConf(hbaseXmlLocation);
        communicator = new HBaseCommunicator(conf);
    }
    @Override
    public void execute(Tuple tuple, BasicOutputCollector collector) {
        String rowKey = null;
        int counter = 0;
        ArrayList<ArrayList<String>> colValues = new ArrayList<ArrayList<String>>();
        for (int i = 0; i < colFamilyNames.size(); i++) {
            // fresh list per family and per tuple, so values never leak into the next row
            ArrayList<String> colFamilyValues = new ArrayList<String>();
            for (int j = 0; j < colNames.get(i).size(); j++) {
                Object value = tuple.getValue(counter);
                String fieldValue = (value == null) ? null : value.toString();
                if (rowKeyCheck.equals(colNames.get(i).get(j))) {
                    rowKey = fieldValue;
                }
                colFamilyValues.add(fieldValue);
                counter++;
            }
            colValues.add(colFamilyValues);
        }
        if (rowKeyCheck.equals("timestamp") && rowKey == null) {
            // current time in milliseconds; tuples arriving within the same millisecond share a row key
            rowKey = String.valueOf(System.currentTimeMillis());
        }
        if (rowKey == null) {
            System.out.println("No row key for tuple, skipping it: " + tuple);
            return;
        }
        communicator.addRow(rowKey, tableName, colFamilyNames, colNames, colValues);
    }
    @Override
    public void cleanup() {
    }
    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
    }
    @Override
    public Map<String, Object> getComponentConfiguration() {
        return null;
    }
}
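The field-to-column mapping is the part of the bolt that is easiest to get wrong, so here is the same logic pulled out into plain Java, where it can be tried without Storm or HBase. The class name is hypothetical; a list stands in for the tuple's values in declaration order, and the current time is passed in as a parameter so that the timestamp case is predictable.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical, Storm-free sketch of what HBaseDumperBolt.execute does with one tuple.
final class RowMapping {
    final String rowKey;
    final List<List<String>> valuesPerFamily;

    private RowMapping(String rowKey, List<List<String>> valuesPerFamily) {
        this.rowKey = rowKey;
        this.valuesPerFamily = valuesPerFamily;
    }

    // tupleValues plays the role of tuple.getValue(0), getValue(1), ... in declaration order.
    static RowMapping map(List<?> tupleValues, List<List<String>> colNames, String rowKeyCheck, long nowMillis) {
        String rowKey = null;
        int counter = 0;
        List<List<String>> perFamily = new ArrayList<List<String>>();
        for (List<String> familyColumns : colNames) {
            List<String> values = new ArrayList<String>(); // fresh list for every family
            for (String column : familyColumns) {
                Object value = tupleValues.get(counter++);
                String text = (value == null) ? null : value.toString();
                if (column.equals(rowKeyCheck)) {
                    rowKey = text; // this column's value doubles as the row key
                }
                values.add(text);
            }
            perFamily.add(values);
        }
        if ("timestamp".equals(rowKeyCheck) && rowKey == null) {
            rowKey = String.valueOf(nowMillis);
        }
        return new RowMapping(rowKey, perFamily);
    }

    public static void main(String[] args) {
        List<List<String>> colNames = new ArrayList<List<String>>();
        colNames.add(Arrays.asList("word", "number"));
        RowMapping byNumber = map(Arrays.asList("hello", 1), colNames, "number", 0L);
        System.out.println(byNumber.rowKey + " -> " + byNumber.valuesPerFamily);
        RowMapping byTime = map(Arrays.asList("today", 2), colNames, "timestamp", System.currentTimeMillis());
        System.out.println(byTime.rowKey + " -> " + byTime.valuesPerFamily);
    }
}
```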

The supporting classes required for the bolt are HBaseCommunicator.java and HBaseConnector.java.

HBase Communicator :
This class defines all the functions needed to communicate with hbase database.

import java.io.IOException;
import java.util.ArrayList;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.util.Bytes;
/*
 * This class implements methods for checking an hbase table's existence, creating an hbase table,
 * inserting a row into the hbase table and reading back a single cell
 */
public class HBaseCommunicator {
    // per-instance configuration (a static field would be shared by every communicator in the JVM)
    private final HBaseConfiguration conf;
    public HBaseCommunicator(final HBaseConfiguration conf) {
        this.conf = conf;
    }
    // check if the table exists
    public final boolean tableExists(final String tableName) {
        try {
            HBaseAdmin admin = new HBaseAdmin(conf);
            return admin.tableExists(tableName);
        } catch (Exception e) {
            System.out.println("Exception occurred while checking table's existence");
            e.printStackTrace();
        }
        return false;
    }
    // creates a table with one column family per entry in colFamilies
    public final void createTable(final String tableName, final ArrayList<String> colFamilies) {
        try {
            HBaseAdmin hbase = new HBaseAdmin(conf);
            HTableDescriptor desc = new HTableDescriptor(tableName);
            for (int i = 0; i < colFamilies.size(); i++) {
                HColumnDescriptor meta = new HColumnDescriptor(Bytes.toBytes(colFamilies.get(i)));
                desc.addFamily(meta);
            }
            hbase.createTable(desc);
        } catch (Exception e) {
            System.out.println("Exception occurred creating table in hbase");
            e.printStackTrace();
        }
    }
    // add a row to a table: all columns of all families go into a single Put
    public final void addRow(final String rowKey, final String tableName, final ArrayList<String> colFamilies, final ArrayList<ArrayList<String>> colNames, final ArrayList<ArrayList<String>> data) {
        HTable table = null;
        try {
            table = new HTable(conf, tableName);
            Put putdata = new Put(Bytes.toBytes(rowKey));
            boolean hasColumns = false;
            for (int i = 0; i < colFamilies.size(); i++) {
                String colFamilyName = colFamilies.get(i);
                if (colNames.get(i).size() != data.get(i).size()) {
                    System.out.println("Column names and values differ in number for family " + colFamilyName + ", skipping it");
                    continue;
                }
                for (int j = 0; j < colNames.get(i).size(); j++) {
                    String colValue = data.get(i).get(j);
                    // store missing values as the string "null" (colValue.equals(null) could never detect them)
                    if (colValue == null) {
                        colValue = "null";
                    }
                    putdata.add(Bytes.toBytes(colFamilyName), Bytes.toBytes(colNames.get(i).get(j)),
                            Bytes.toBytes(colValue));
                    hasColumns = true;
                }
            }
            // HBase rejects a Put without any column, so only send it when something was added
            if (hasColumns) {
                table.put(putdata);
            }
        } catch (IOException e) {
            System.out.println("Exception occurred in adding data");
            e.printStackTrace();
        } finally {
            closeQuietly(table);
        }
    }
    // read back a single cell, or null if it does not exist
    public final String getColEntry(String tableName, String rowKey,
            String colFamilyName, String colName) {
        String result = null;
        HTable table = null;
        try {
            table = new HTable(conf, tableName);
            Result rowEntries = table.get(new Get(Bytes.toBytes(rowKey)));
            byte[] columnValue = rowEntries.getValue(Bytes.toBytes(colFamilyName),
                    Bytes.toBytes(colName));
            result = Bytes.toString(columnValue);
        } catch (IOException e) {
            System.out.println("Exception occurred in retrieving data");
            e.printStackTrace();
        } finally {
            closeQuietly(table);
        }
        return result;
    }
    private void closeQuietly(HTable table) {
        if (table != null) {
            try {
                table.close();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }
}

HBase Connector : 
This class loads the hbase configuration from the given hbase-site.xml file and returns an HBaseConfiguration object; the actual connections are made later by HBaseAdmin and HTable.

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
/*
 * Class that loads the hbase configuration from hbase-site.xml and returns an HBaseConfiguration object
 */
public class HBaseConnector {
    private HBaseConfiguration conf;
    public final HBaseConfiguration getHBaseConf(final String pathToHBaseXMLFile) {
        conf = new HBaseConfiguration();
        conf.addResource(new Path(pathToHBaseXMLFile));
        return conf;
    }
}

Feeder Spout :
A sample spout emitting tuples of the form [String, Integer] to be dumped into the hbase table.


import java.util.Map;
import java.util.Random;                                             
import backtype.storm.spout.SpoutOutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.IRichSpout;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;
import backtype.storm.utils.Utils;
public class ExclIntegerSpout implements IRichSpout {
    SpoutOutputCollector _collector;
    Random _rand; 
    int count = 0;
    public boolean isDistributed() {
        return true;
    }
    @Override
    public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {               
        _collector = collector;
        _rand = new Random();
    }
    @Override
    public void nextTuple() {
        Utils.sleep(1000);
        String[] words = new String[] { "hello", "today", "divine", "optimized", "canon"};
        Integer[] numbers = new Integer[] {
                1,2,3,4,5
        };
        // emit the current pair, then advance and wrap around so that all five pairs are used
        int number = numbers[count];
        String word = words[count];
        count = (count + 1) % numbers.length;
        System.out.println("ExclSpout emitting : " + word + ", " + number);
        _collector.emit(new Values(word, number));
    }
    @Override
    public void close() {       
    }
    @Override
    public void ack(Object id) {
    }

     @Override
    public void fail(Object id) {
    }
    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("word", "number"));
    }
    @Override
    public void activate() {}
    @Override
    public void deactivate() {}
    @Override
    public Map<String, Object> getComponentConfiguration() {
        return null;
    }
}


Here’s a sample topology showcasing the use of the bolt.
Topology implementing the HBase Bolt :

import java.util.ArrayList;
import java.util.HashMap;
import java.util.Map;
import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.topology.TopologyBuilder;
import backtype.storm.utils.Utils;
/*
 * Topology that creates an hbase table (if not already created) with a single column family, row key being the timestamp and dumps the stream data into it.
 */
public class HBaseTopology {
    public static void main(String[] args) {
        /*
         * List of column family names of the hbase table
         */
        ArrayList<String> columnFamilyNames = new ArrayList<String>();
        /*
         * List containing list of column names per column family of the hbase table
         */
        ArrayList<ArrayList<String>> columnNamesList  = new ArrayList<ArrayList<String>>();
        /*
         * Temporary list used for populating the @columnNamesList
         */
        ArrayList<String> columnNames = new ArrayList<String>();
        /*
         * row key setting for the hbase table: "timestamp" to use the insertion time as the row key, else the name of the column whose value is to be used as the row key
         */
        String rowKey = null;      
        /*
         * hbase table name
         */
        String tableName = "testTable";      
        /*
         * local fs location of the hbase-site.xml file
         */
        String hbaseXmlFileLocation = "/home/abc/hbase-0.20.6/conf/hbase-site.xml";
        TopologyBuilder builder = new TopologyBuilder();

         // add spout to the builder
        builder.setSpout("spout", new ExclIntegerSpout(), 2);
        // populate the column family name and column names list
        columnFamilyNames.add("colFamily1");
        columnNames.add("word");
        columnNames.add("number");
        columnNamesList.add(columnNames);
        columnNames = new ArrayList<String>();
        // set rowKey = "timestamp" if value of none of the columns is to be used as rowKey
        rowKey = "timestamp";
        // if some column's value is to be used as the row key, e.g. number, then use
        // rowKey = "number";
        // add dumper bolt to the builder
        HBaseDumperBolt dumperBolt = new HBaseDumperBolt(hbaseXmlFileLocation, tableName, rowKey, columnFamilyNames, columnNamesList);
        builder.setBolt("dumperBolt", dumperBolt, 1).shuffleGrouping("spout");
        Config conf = new Config();
        conf.setDebug(true);
        LocalCluster cluster = new LocalCluster();
        cluster.submitTopology("sample-workflow", conf, builder.createTopology());
        Utils.sleep(10000);
        cluster.shutdown();
    }
}
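The topology above uses a single column family. For a table with several families, the two lists handed to HBaseDumperBolt have to stay in step with the tuple's field order, because the bolt consumes the values family by family. A hypothetical helper that derives both lists from one ordered map; the family names colFamily1/colFamily2 and the word/number split are illustrative only. Its familyNames and columnNames fields could be passed where the topology passes columnFamilyNames and columnNamesList.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical helper: turns an ordered family -> columns map into the two lists that
// HBaseDumperBolt's constructor takes. LinkedHashMap keeps the insertion order, which must
// match the order in which the spout declares its fields.
final class HBaseColumnLayout {
    final ArrayList<String> familyNames = new ArrayList<String>();
    final ArrayList<ArrayList<String>> columnNames = new ArrayList<ArrayList<String>>();

    HBaseColumnLayout(LinkedHashMap<String, List<String>> layout) {
        for (Map.Entry<String, List<String>> entry : layout.entrySet()) {
            familyNames.add(entry.getKey());
            columnNames.add(new ArrayList<String>(entry.getValue()));
        }
    }

    // Number of tuple fields the stream has to declare for this layout.
    int fieldCount() {
        int total = 0;
        for (List<String> columns : columnNames) {
            total += columns.size();
        }
        return total;
    }

    public static void main(String[] args) {
        LinkedHashMap<String, List<String>> layout = new LinkedHashMap<String, List<String>>();
        layout.put("colFamily1", Arrays.asList("word"));   // first tuple field
        layout.put("colFamily2", Arrays.asList("number")); // second tuple field
        HBaseColumnLayout columns = new HBaseColumnLayout(layout);
        System.out.println(columns.familyNames + " " + columns.columnNames
                + " fields=" + columns.fieldCount());
    }
}
```

With ExclIntegerSpout, whose fields are declared as (word, number), this layout would store word under colFamily1 and number under colFamily2.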