Monday, July 2, 2012

Storm-RDBMS Integration

"" Wondering if your Storm topology's output could be dumped into RDBMS ? ""
This blog post would answer this.  Inspite of all the upcoming big data technologies intensively in use, SQL databases are an integral part of many of our applications in some way or the other.
This post presents a Storm Bolt that would dump your stream or a specific set of fields of the stream into SQL table.

Requirements :
All you need is a local Storm setup (tested with version 0.7.1) and a MySQL server installed on your Linux machine.


RDBMS Dumper Bolt :
The bolt dumps the incoming stream of tuples into the specified SQL table. Its constructor takes the table name, database URL, database username and database password. Since a JDBC Connection is not serializable, the bolt only stores these parameters at construction time and opens the actual connection in prepare(), which runs on the worker after the topology is deployed.

import java.sql.Connection;
import java.sql.SQLException;
import java.util.List;
import java.util.Map;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.BasicOutputCollector;
import backtype.storm.topology.IBasicBolt;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.tuple.Tuple;
/*
 * Bolt for dumping stream data into RDBMS.
 * A JDBC Connection is not serializable, so only the connection parameters
 * are stored at construction time; the connection itself is opened in
 * prepare(), which runs on the worker after deserialization.
 */
public class RDBMSDumperBolt implements IBasicBolt {
    private static final long serialVersionUID = 1L;
    private final String tableName;
    private final String dBUrl;
    private final String username;
    private final String password;
    // transient: re-created on each worker in prepare(), never serialized
    private transient RDBMSCommunicator communicator = null;

    public RDBMSDumperBolt(String tableName, String dBUrl, String username, String password) {
        this.tableName = tableName;
        this.dBUrl = dBUrl;
        this.username = username;
        this.password = password;
    }

    @Override
    public void prepare(Map stormConf, TopologyContext context) {
        try {
            Connection con = new RDBMSConnector().getConnection(dBUrl, username, password);
            communicator = new RDBMSCommunicator(con);
        } catch (ClassNotFoundException e) {
            throw new RuntimeException("JDBC driver class not found", e);
        } catch (SQLException e) {
            throw new RuntimeException("Could not connect to " + dBUrl, e);
        }
    }

    @Override
    public void execute(Tuple input, BasicOutputCollector collector) {
        // the tuple's declared field names double as the table's column names
        List<String> fieldNames = input.getFields().toList();
        List<Object> fieldValues = input.getValues();
        try {
            communicator.insertRow(this.tableName, fieldNames, fieldValues);
        } catch (SQLException e) {
            System.out.println("Exception occurred in adding a row ");
            e.printStackTrace();
        }
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {}

    @Override
    public Map<String, Object> getComponentConfiguration() {
        return null;
    }

    @Override
    public void cleanup() {}
}

The supporting classes required for the RDBMS bolt to work are RDBMSCommunicator and RDBMSConnector.
RDBMS Communicator :
This class implements the methods required to communicate with the RDBMS.

import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
/*
 * This class implements the methods for inserting data into RDBMS tables
 */
public class RDBMSCommunicator {
    private final Connection con;

    public RDBMSCommunicator(Connection con) {
        this.con = con;
    }

    public int insertRow(String tableName, List<String> fieldNames, List<Object> fieldValues) throws SQLException {
        // build "insert into <table> (c1, c2, ...) values (?, ?, ...)"
        StringBuilder columns = new StringBuilder();
        StringBuilder placeholders = new StringBuilder();
        for (int i = 0; i < fieldNames.size(); i++) {
            if (i > 0) {
                columns.append(", ");
                placeholders.append(", ");
            }
            columns.append(fieldNames.get(i));
            placeholders.append("?");
        }
        String queryStmt = "insert into " + tableName + " (" + columns + ") values (" + placeholders + ")";
        PreparedStatement prepstmt = con.prepareStatement(queryStmt);
        // bind the tuple's values to the placeholders (JDBC indices start at 1)
        for (int j = 0; j < fieldValues.size(); j++) {
            prepstmt.setObject(j + 1, fieldValues.get(j));
        }
        int result = prepstmt.executeUpdate();
        prepstmt.close();
        if (result != 0) {
            System.out.println("Inserted data successfully ..");
        } else {
            System.out.println("Insertion failed ..");
        }
        return result;
    }

    public Map<String, String> getTableInformation(String tableName) throws SQLException {
        Map<String, String> tableDetails = new HashMap<String, String>();
        // parameterised query avoids SQL injection through the table name
        String stmt = "select column_name, data_type, character_maximum_length "
                + "from information_schema.columns where table_name = ?";
        PreparedStatement prepstmt = con.prepareStatement(stmt);
        prepstmt.setString(1, tableName);
        ResultSet rs = prepstmt.executeQuery();
        while (rs.next()) {
            tableDetails.put(rs.getString("column_name"), rs.getString("data_type"));
        }
        rs.close();
        prepstmt.close();
        return tableDetails;
    }
}

RDBMS Connector :
This class establishes a connection with RDBMS and returns a Connection object.

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;
/*
 * Class that establishes a connection with the RDBMS and returns a Connection object
 */
public class RDBMSConnector {
    public Connection getConnection(final String sqlDBUrl, final String sqlUser, final String sqlPassword)
            throws ClassNotFoundException, SQLException {
        // register the MySQL JDBC driver
        Class.forName("com.mysql.jdbc.Driver");
        // pass the credentials separately instead of embedding them in the URL
        return DriverManager.getConnection(sqlDBUrl, sqlUser, sqlPassword);
    }
}
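
For a quick standalone sanity check outside Storm, the two supporting classes can be exercised directly. Below is a minimal sketch; the class name RDBMSQuickCheck is hypothetical, and it assumes a local MySQL database testDB containing a table testTable (word varchar(100), number int) with root/root credentials, as used later in the topology.

import java.sql.Connection;
import java.util.Arrays;

public class RDBMSQuickCheck {
    public static void main(String[] args) throws Exception {
        // assumes MySQL running locally with database 'testDB' and
        // table 'testTable (word varchar(100), number int)' already created
        Connection con = new RDBMSConnector().getConnection(
                "jdbc:mysql://localhost:3306/testDB", "root", "root");
        RDBMSCommunicator communicator = new RDBMSCommunicator(con);
        // prints the column name -> data type mapping of the table
        System.out.println(communicator.getTableInformation("testTable"));
        // issues: insert into testTable (word, number) values (?, ?)
        communicator.insertRow("testTable",
                Arrays.asList("word", "number"),
                Arrays.<Object>asList("hello", 1));
        con.close();
    }
}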

Feeder Spout :
A sample spout that emits tuples of the form [String, Integer] to be dumped into the RDBMS table.

import java.util.Map;
import backtype.storm.spout.SpoutOutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.IRichSpout;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;
import backtype.storm.utils.Utils;

public class ExclIntegerSpout implements IRichSpout {
    SpoutOutputCollector _collector;
    int count = 0;

    public boolean isDistributed() {
        return true;
    }

    @Override
    public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
        _collector = collector;
    }

    @Override
    public void nextTuple() {
        Utils.sleep(1000);
        String[] words = new String[] { "hello", "today", "divine", "optimized", "canon" };
        Integer[] numbers = new Integer[] { 1, 2, 3, 4, 5 };
        String word = words[count];
        int number = numbers[count];
        // cycle through the sample values so every pair gets emitted
        count = (count + 1) % numbers.length;
        System.out.println("ExclIntegerSpout emitting : " + word + ", " + number);
        _collector.emit(new Values(word, number));
    }

    @Override
    public void close() {}

    @Override
    public void ack(Object id) {}

    @Override
    public void fail(Object id) {}

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        // field names must match the column names of the target table
        declarer.declare(new Fields("word", "number"));
    }

    @Override
    public void activate() {}

    @Override
    public void deactivate() {}

    @Override
    public Map<String, Object> getComponentConfiguration() {
        return null;
    }
}


Here’s a sample topology showcasing the use of the bolt.
Topology implementing the RDBMS Bolt :

import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.topology.TopologyBuilder;
import backtype.storm.utils.Utils;
/*
 * Sample topology using the RDBMSDumperBolt.
 * It is important to note that the RDBMS table's column names should match
 * the fields of the input stream's tuples for the topology to work.
 * E.g. the table used below, if already created, should have 'word' and
 * 'number' as its columns, with the respective data types
 * (varchar and int, matching what the spout emits); see the sketch after
 * the topology for creating it.
 */
public class RDBMSDumperTopology {
    public static void main(String[] args) {
        String tableName = "testTable";
        String rdbmsUrl = "jdbc:mysql://localhost:3306/testDB";
        String rdbmsUserName = "root";
        String rdbmsPassword = "root";

        TopologyBuilder builder = new TopologyBuilder();
        // set the spout for the topology
        builder.setSpout("spout", new ExclIntegerSpout(), 10);
        // dump the stream data into the rdbms table
        RDBMSDumperBolt dumperBolt =
                new RDBMSDumperBolt(tableName, rdbmsUrl, rdbmsUserName, rdbmsPassword);
        builder.setBolt("dumperBolt", dumperBolt, 1).shuffleGrouping("spout");

        Config conf = new Config();
        conf.setDebug(true);
        LocalCluster cluster = new LocalCluster();
        cluster.submitTopology("rdbms-workflow", conf, builder.createTopology());
        Utils.sleep(10000);
        cluster.shutdown();
    }
}
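
If the target table does not exist yet, it can be created up front. Below is a minimal sketch; the class name CreateTestTable is hypothetical, and it assumes the same local MySQL instance, database and credentials as the topology above.

import java.sql.Connection;
import java.sql.Statement;

public class CreateTestTable {
    public static void main(String[] args) throws Exception {
        // same connection parameters as in the topology above
        Connection con = new RDBMSConnector().getConnection(
                "jdbc:mysql://localhost:3306/testDB", "root", "root");
        Statement stmt = con.createStatement();
        // column names and types must match the fields the spout emits
        stmt.executeUpdate("create table if not exists testTable "
                + "(word varchar(100), number int)");
        stmt.close();
        con.close();
    }
}

To run on a real cluster rather than a LocalCluster, package these classes into a jar and submit with StormSubmitter.submitTopology in place of LocalCluster.submitTopology.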

You can also find the entire code at :

14 comments:

  1. How do I run this topology, and what's the procedure? A little help will be very useful.
    Thanks in advance!

  2. Set up Storm locally or on a cluster using http://jayatiatblogs.blogspot.in/2011/11/storm-installation.html

    After this, if you are running Storm locally, run the main class on your local machine; else create a jar containing all these files and deploy it on your cluster

    Refer https://github.com/nathanmarz/storm/wiki/Running-topologies-on-a-production-cluster

  3. Hi Jayati,

    Have you tried using Storm to process data from an RDBMS as a source?

  4. Hi Jayati,

    I am able to run the rdbms bolt without issue on a local cluster.
    But when I try to run this bolt on a real storm cluster I get the following exception:
    java.lang.RuntimeException: java.lang.NullPointerException at backtype.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:87) at backtype.storm.utils.DisruptorQueue.consumeBatchWhen

    Any idea how to solve this?

    Thanks in advance.

    Replies
    1. Hi Stavros, I had the same problem. The issue here is that in order to run on a distributed cluster, you need to use a serializable Spout. As the Spout is not serializable and the Connection class isn't either, it is not possible to do it this way. All the variables which are set as transient will be null after you send the Spout to the Storm cluster.
      There are two possible options: one is setting up a connection every time you have to write to the database. This will be enough as long as you don't have heavy usage (for example, 10 times per minute would be acceptable). The other option would be setting up the connection in the prepare method. If you are able to achieve the second option (which is the best one), please tell me.
      Hope this will help you, although it's a bit late.

    2. Hi Maximo,

      I got the same problem too.
      I've tried both of your options, but the error appeared when the bolt tried to open a connection (in the prepare method or the execute method).
      Any idea?

  5. With the given information, it would be a bit difficult to suggest a solution. But you could consider tracing the logs, which should indicate the origin of this error, i.e. the Java file in which the error occurs and the line number.

  6. Jayati, would you be interested in trying scaledb as the engine behind MySQL in order to ingest the data much faster (3M inserts/second)? If so I'm at mike [at] scaledb [dot] com

    Replies
    1. Sure Mike, 3M inserts/sec sounds very exciting. I will contact you.

  7. Hi Jayati,

    I have tried a simple program with Storm for inserting data into Vertica tables,
    but am facing the below error:

    ERROR org.apache.zookeeper.server.NIOServerCnxnFactory - Thread Thread[main,5,main] died
    java.lang.RuntimeException: java.io.NotSerializableException: com.vertica.jdbc.VerticaConnectionImpl
    at backtype.storm.utils.Utils.serialize(Utils.java:81) ~[storm-core-0.9.2-incubating.jar:0.9.2-incubating]
    at backtype.storm.topology.TopologyBuilder.createTopology(TopologyBuilder.java:106) ~[storm-core-0.9.2-incubating.jar:0.9.2-incubating]
    at com.wordcount.example.HelloStorm.main(HelloStorm.java:21) ~[bin/:na]
    Caused by: java.io.NotSerializableException: com.vertica.jdbc.VerticaConnectionImpl
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1183) ~[na:1.7.0_51]
    at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547) ~[na:1.7.0_51]
    at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508) ~[na:1.7.0_51]
    at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431) ~[na:1.7.0_51]
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177) ~[na:1.7.0_51]
    at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:347) ~[na:1.7.0_51]
    at backtype.storm.utils.Utils.serialize(Utils.java:77) ~[storm-core-0.9.2-incubating.jar:0.9.2-incubating]
    ... 2 common frames omitted

    com.vertica.jdbc.VerticaConnectionImpl is from the Vertica JDBC driver.

    Is there any other way to avoid this problem and proceed without errors?

  8. I need to check a row's value before inserting it into the table. Can this be done in Storm?

  9. This comment has been removed by the author.

  10. hey Jayati,
    Does Storm integrate with MySQL using PHP?
    If yes, could you please provide a link or tutorial for this.

    Thanks
    Deepak Chattha
