
Saturday, July 25, 2015

Query Storm Data Streams using Esper

As you might already be aware and as documented on the web, “Esper is a component for complex event processing (CEP) which enables rapid development of applications that process large volumes of incoming messages or events, regardless of whether incoming messages are historical or real-time in nature.” Because integrating Esper with Storm is clearly useful for complex processing of the tuples arriving in a real-time stream in Storm bolts, a specialized bolt called the “Esper Bolt” is available.

The Esper Bolt enables you to run SQL-like queries on your real-time data in Storm. Since the Esper Bolt is already available, you do not need to write a bolt yourself; you only use it while defining the main Topology class. Below is an illustration of a topology with a SampleSpout emitting tuples containing three fields, namely "deviceId", "deviceTemp" and "time". These field names are declared in the spout class in a method called declareOutputFields(), as below:

@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
    declarer.declare(new Fields("deviceId", "deviceTemp", "time"));
}

This method must be overridden in both spouts and bolts; it declares the names of all the fields that are emitted per tuple.

Moving forward with the topology definition, below is the Esper Bolt declaring a statement (query) that runs over every tuple in the incoming stream and emits "deviceId" (id of the device) and "deviceTemp" (temperature of the device) if the value of the incoming tuple field "deviceTemp" lies between 2000.00 and 3000.00:

TopologyBuilder builder = new TopologyBuilder();

// add the spout
builder.setSpout("deviceDataAbsorber", new SampleSpout(), 1);

// define the Esper bolt
EsperBolt deviceTrackerBolt = new EsperBolt.Builder()
        .inputs()
            .aliasComponent("deviceDataAbsorber")
            .withFields("deviceId", "deviceTemp", "time")
            .ofTypes(String.class, Float.class, String.class)
            .toEventType("Evaluate")
        .outputs()
            .onDefaultStream()
            .emit("deviceId", "deviceTemp")
        .statements()
            .add("select evaluate.deviceId as deviceId, evaluate.deviceTemp as deviceTemp "
                + "from Evaluate.win:time_batch(1 sec) as evaluate "
                + "where evaluate.deviceTemp <= 3000.00f and evaluate.deviceTemp >= 2000.00f")
        .build();

// wire the Esper bolt into the topology (the component name is illustrative)
builder.setBolt("deviceTracker", deviceTrackerBolt, 1).shuffleGrouping("deviceDataAbsorber");

The topology can have any number of Esper bolts, or user-defined bolts consuming the output of the Esper bolt. You can define similar query statements depending upon your problem statement.
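At its core, the EPL statement above is a range filter applied to each time batch of tuples. As a plain-Java sketch of that predicate (the DeviceReading class and the sample values here are made up for illustration; Esper evaluates the real thing inside the bolt):

```java
import java.util.ArrayList;
import java.util.List;

public class TemperatureFilter {
    // Hypothetical stand-in for one incoming tuple
    static class DeviceReading {
        final String deviceId;
        final float deviceTemp;
        DeviceReading(String deviceId, float deviceTemp) {
            this.deviceId = deviceId;
            this.deviceTemp = deviceTemp;
        }
    }

    // Mirrors the EPL where-clause: keep readings with 2000.00 <= temp <= 3000.00
    static List<DeviceReading> filterBatch(List<DeviceReading> batch) {
        List<DeviceReading> out = new ArrayList<>();
        for (DeviceReading r : batch) {
            if (r.deviceTemp >= 2000.00f && r.deviceTemp <= 3000.00f) {
                out.add(r);
            }
        }
        return out;
    }

    public static void main(String[] args) {
        List<DeviceReading> batch = new ArrayList<>();
        batch.add(new DeviceReading("d1", 1500.0f));
        batch.add(new DeviceReading("d2", 2500.0f));
        batch.add(new DeviceReading("d3", 3200.0f));
        for (DeviceReading r : filterBatch(batch)) {
            System.out.println(r.deviceId + " " + r.deviceTemp);
        }
    }
}
```

The EPL version additionally batches tuples into one-second windows (win:time_batch(1 sec)) before applying the predicate.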

Hope it helped!

Writing your first Storm Topology

This blog contains multiple posts on Storm: its installation, shell scripts for installing a Storm cluster, and integration of Storm with HBase and RDBMS. But if you're a newbie to Storm, this one's for you: it shows how to write a simple topology in Storm, the equivalent of "Hello World" in Java. So let's get rolling!

Logical Components of a Storm Topology 


The task submitted to a Storm cluster is called a Topology: a workflow of linked processing units that together define the task and the order in which it is executed. These processing units can be of the following two types:
  1. Spout: This component of the topology is responsible for fetching the data stream from the source and forwarding it, after filtering or pre-processing (if required), to the bolts, where the actual processing of the data occurs. A topology can have multiple spouts, each consuming data from a different source.
  2. Bolt: As mentioned above, the bolts form the computational part of the topology. The "streams" are typically unbounded sequences of "tuples", the smaller units of data, which arrive at the bolt to be processed. A topology is not restricted to a workflow of just a single spout and a single bolt; it can be a chain of multiple bolts consuming the tuples generated by spouts or by preceding bolts in the workflow, as shown below:

A simplistic view of a Storm Topology and its components


This post walks you through the basics of writing Storm topologies and their components in Java.

Creating Spouts


Although bolts are the processing units where the actual computation occurs, spouts are the links between the Storm topology and the external sources of data. For every different data source (for example, Kafka, HBase, MongoDB, Cassandra etc.), we need a specialized spout which can connect to that source and consume its data in order to emit a stream of tuples.
Bolts are simpler to implement, since they contain the core processing logic, much as we would have in a simple Java program, and are abstracted from the source of the data.

Any spout in Java is a class implementing the IRichSpout interface. Of the various methods to override, the following two are of prime interest:

public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
    // Called only once, when the spout is initialized.
    // Initialization stuff goes here: variable initialization, creation of log
    // files, connections to remote servers if required (e.g. a Kafka broker,
    // HBase master etc.)
}

public void nextTuple() {
    // Called repeatedly until the topology is killed or stopped.
    // Each call typically emits a tuple using the emit() method of
    // SpoutOutputCollector, so this is where you define what to emit.
}

To clarify further, here is an example of a simple spout that does not connect to any external source but keeps emitting strings as tuples, chosen randomly from a list.

@Override
public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
    _collector = collector;
    _rand = new Random();
}

@Override
public void nextTuple() {
    Utils.sleep(1000);
    String[] words = new String[] {"hello", "india", "how", "are", "you"};
    Integer[] numbers = new Integer[] {11, 22, 33, 44, 55};

    // pick a random word (and its corresponding number) to emit
    int index = _rand.nextInt(words.length);
    _collector.emit(new Values(words[index], numbers[index]));
}

If you need to connect to some external source to obtain your stream data, you would define the code for connecting to it in the open() method, and then keep fetching data from that source in the nextTuple() method to emit it further.
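As a stdlib-only analogy of that division of labour (this is not a real Storm spout; the class, the externalPush helper, and the BlockingQueue standing in for the external source are all made up for illustration):

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Analogy of a spout: open() establishes the "connection" once,
// nextTuple() fetches one item per call.
public class QueueBackedSource {
    private BlockingQueue<String> connection;

    // Analogue of open(): set up the connection to the source exactly once
    public void open() {
        connection = new LinkedBlockingQueue<>();
    }

    // Test hook standing in for the external system pushing data
    public void externalPush(String msg) {
        connection.offer(msg);
    }

    // Analogue of nextTuple(): fetch and "emit" the next available item
    public String nextTuple() {
        return connection.poll(); // null when the source has nothing new
    }

    public static void main(String[] args) {
        QueueBackedSource spout = new QueueBackedSource();
        spout.open();
        spout.externalPush("hello");
        System.out.println(spout.nextTuple());
    }
}
```

In a real spout, Storm calls nextTuple() in a loop for you, and the connection handle created in open() (a Kafka consumer, an HBase client, etc.) is reused across all those calls.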

Creating Bolts


A bolt is a Java class implementing the "IBasicBolt" interface. Just as two methods deserved our focus among the spout's overridden methods, two methods matter in a bolt as well:

public void prepare(Map stormConf, TopologyContext context) {
    // Similar to the open() method of a spout: called only once, when the
    // bolt is initialized, and contains all the initialization stuff.
}

public void execute(Tuple input, BasicOutputCollector collector) {
    // Called every time a tuple is received, until the topology is killed or
    // stopped; the tuple gets processed as per the definition of this method.
}

Continuing with the sample spout we saw above, which emits strings from a given list, here we define a bolt that appends "!!!" to the received string. Here's the Java code for the two methods:

public void prepare(Map stormConf, TopologyContext context) {
    // create an output log file where the output results would be logged
    try {
        String logFileName = logFileLocation;
        // "file" and "outputFile" have already been declared as class variables
        file = new FileWriter(logFileName);
        outputFile = new PrintWriter(file);
        outputFile.println("In the prepare() method of bolt");
    } catch (IOException e) {
        System.out.println("An exception has occurred");
        e.printStackTrace();
    }
}

public void execute(Tuple input, BasicOutputCollector collector) {
    // get the string that needs to be processed from the tuple
    String inputMsg = input.getString(0);
    inputMsg = inputMsg + "!!!";
    outputFile.println("Received the message: " + inputMsg);
    outputFile.flush();
    collector.emit(tuple(inputMsg));
}

I am sure you have already figured out that this simple string processing can be replaced by whatever complex computation your problem statement requires.

Creating Topologies


After you are done building the blocks of your topology, you need to link them into a workflow that defines the sequence in which they execute; in other words, you need to create a topology.

What we need here is a Java class that instantiates all the spouts and bolts and links them together using a TopologyBuilder object. Have a look at the code snippet below, illustrating how to create a topology with the above spout and bolt as part of it:

public static void main(String[] args) {

    // Number of workers to be used for the topology
    int numberOfWorkers = 2;

    // Number of executors to be used for the spout
    int numberOfExecutorsSpout = 1;

    // Number of executors to be used for the bolt
    int numberOfExecutorsBolt = 1;

    // IP of the Storm cluster node on which Nimbus runs
    String nimbusHost = "your_nimbus_host_ip";

    TopologyBuilder builder = new TopologyBuilder();
    Config conf = new Config();

    // set the spout for the topology
    builder.setSpout("spout", new TestSpout(false), numberOfExecutorsSpout);

    // set the bolt for the topology
    builder.setBolt("bolt", new TestBolt(), numberOfExecutorsBolt).shuffleGrouping("spout");

    // job configuration for a remote Storm cluster
    conf.setNumWorkers(numberOfWorkers);
    conf.put(Config.NIMBUS_HOST, nimbusHost);
    conf.put(Config.NIMBUS_THRIFT_PORT, 6627L);

    // submit the topology to the remote Storm cluster
    try {
        StormSubmitter.submitTopology("test_topology", conf, builder.createTopology());
    } catch (AlreadyAliveException e) {
        System.out.println("Topology with the same name is already running on the cluster.");
        e.printStackTrace();
    } catch (InvalidTopologyException e) {
        System.out.println("Topology seems to be invalid.");
        e.printStackTrace();
    }
}

The above code would launch a topology with name “test_topology” on the Storm cluster.

Hope it helped. Cheers !

Friday, June 26, 2015

Start-up script for an installed Apache Storm Cluster

If you have installed a Storm cluster using the shell scripts from my previous blogs, or even otherwise, this script will save you from manually visiting each node and starting the appropriate service (nimbus/supervisor/ui) there. All you have to do is grab a remote machine and run the script. The script will ask for the required information and your cluster will be up. It should work equally well for both Ubuntu and CentOS. (Note that it uses sshpass with a hard-coded password of "root"; adjust this to your environment.)


#!/bin/bash

# Directory Name
echo "Enter the relative path to the storm setup on the machines (For example, /opt/stormSetup/storm-0.9.0-wip4):"
read -e stormDir

# Read usernames and ips of all the storm cluster nodes
echo "Enter the IP of nimbus machine :"
read -e stormNimbus;
clusterMachineIPs[0]=$stormNimbus

echo "Enter the username of nimbus :"
read -e usernameNimbus
clusterMachineUsernames[0]=$usernameNimbus

# Read supervisor
echo "Enter the number of supervisor machines";
read -e n;
for ((  i = 1 ;  i <= n;  i++  ))
do
echo "Enter the IP of storm supervisor machine $i:"
read -e stormSupervisor;
clusterMachineIPs[i]=$stormSupervisor
echo "Enter the username of machine ${clusterMachineIPs[i]}"
read -e username
clusterMachineUsernames[i]=$username
done

# Start nimbus on the nimbus machine
sshpass -p root ssh -o StrictHostKeyChecking=no  $usernameNimbus@$stormNimbus $stormDir/bin/storm nimbus&

# Start the supervisor nodes
for ((  i = 1 ;  i <= n;  i++  ))
do
sshpass -p root ssh -o StrictHostKeyChecking=no  ${clusterMachineUsernames[i]}@${clusterMachineIPs[i]} $stormDir/bin/storm supervisor&
done

# Start the UI on the nimbus machine
sshpass -p root ssh -o StrictHostKeyChecking=no  $usernameNimbus@$stormNimbus $stormDir/bin/storm ui&


Visit your UI on the browser after a few minutes. Hope it shows up fine. Cheers!

Installation Script for Apache Storm on CentOS

CentOS and Ubuntu are two popular Linux distributions used pretty widely. My last post shared an installation script for a Storm cluster on Ubuntu machines; this one is for CentOS. The few usage rules are just the same as for Ubuntu, but I'll restate them here. Although the script should work for older versions of Apache Storm, it has been tested with storm-0.9.0-wip7. The script has embedded descriptive messages for each input it expects from you. The installation is done in the '/opt' folder of the machines, in a sub-directory of your choice. Make sure the user installing the cluster has admin rights on the /opt folder. The script also takes care of installing all the required dependencies. To use the script for versions other than the supported one, replace each "storm-0.9.0-wip7" occurrence in the script with your Storm version.


#!/bin/bash
# Local FS Setups location
echo "Enter the location of the setups folder. For example '/home/abc/storminstallation/setups'"
read -e setupsLocation
# Directory Name
echo "Enter the directory name"
read -e realTimePlatformDir
rtpLocalDir="\/opt\/$realTimePlatformDir\/storm\/storm_temp"
rtpLocalDirMake=/opt/$realTimePlatformDir/storm/storm_temp
echo $rtpLocalDir;
echo "Enter the IP of nimbus machine :"
read -e stormNimbus;
array[0]=$stormNimbus


# Read supervisor
echo "Enter the number of supervisor machines";
read -e n;
for ((  i = 1 ;  i <= n;  i++  ))
do
echo "Enter the IP of storm supervisor machine $i:"
read -e stormSupervisor;
array[i]=$stormSupervisor
done

# Read zookeeper
echo "Enter the number of machines in the zookeeper cluster";
read -e m;
for ((  i = 1 ;  i <= m;  i++  ))
do
echo "Enter the IP of zookeeper machine $i:"
read -e zkServer;
zkEntry="- \""$zkServer"\""
zKArray=$zKArray","$zkEntry
done

# Copy the required setups to all the storm machines
for ((  i = 1 ;  i <= n+1;  i++  ))
do
echo "Enter the username of machine ${array[i-1]}"
read -e username
echo "Username:"$username
if [ $username == 'root' ]; then
echo 'root';
yamlFilePath="/root/.storm";
else
echo $username;
yamlFilePath="/home/$username/.storm";
fi
echo "the storm.yaml file would be formed at : $yamlFilePath";
echo "Enter the value for JAVA_HOME to be set on the machine ${array[i-1]}"
read -e javaHome;
echo 'JAVA_HOME would be set to :'$javaHome;
ssh -t $username@${array[i-1]} "if [ ! -d /opt/$realTimePlatformDir ]; then
    sudo mkdir /opt/$realTimePlatformDir;
    sudo chown -R $username: /opt/$realTimePlatformDir;
    mkdir /opt/$realTimePlatformDir/storm;
    mkdir $rtpLocalDirMake;
    mkdir $yamlFilePath;
  fi"
     
scp -r -q $setupsLocation/storm-0.9.0-wip7 $username@${array[i-1]}:/opt/$realTimePlatformDir/storm/storm-0.9.0-wip7
     
ssh -t $username@${array[i-1]} "sed -i 's/ZOOKEEPER_IPS/$zKArray/g' /opt/$realTimePlatformDir/storm/storm-0.9.0-wip7/conf/storm.yaml;
sed -i 's/,/\n/g' /opt/$realTimePlatformDir/storm/storm-0.9.0-wip7/conf/storm.yaml;
sed -i 's/NIMBUS_IP/$stormNimbus/g' /opt/$realTimePlatformDir/storm/storm-0.9.0-wip7/conf/storm.yaml;
sed -i 's/LOCAL_DIR/$rtpLocalDir/g' /opt/$realTimePlatformDir/storm/storm-0.9.0-wip7/conf/storm.yaml;
cp /opt/$realTimePlatformDir/storm/storm-0.9.0-wip7/conf/storm.yaml $yamlFilePath;
sudo yum install git;
sudo yum install libuuid-devel;"

ssh -t $username@${array[i-1]} "cd /opt/$realTimePlatformDir/storm;
wget http://download.zeromq.org/zeromq-2.1.7.tar.gz;
tar -xzf zeromq-2.1.7.tar.gz
cd zeromq-2.1.7                    
./configure
make
sudo make install

cd ..
export JAVA_HOME=$javaHome;
echo $JAVA_HOME;
git clone https://github.com/nathanmarz/jzmq.git
cd jzmq
./autogen.sh
./configure
make
sudo make install"

done


Hope this helps. My next post shares a small start-up script for the installed Storm cluster.

Installation Script for Apache Storm on Ubuntu

One of my blogs here describes the steps for manual installation of a Storm cluster. To make things more convenient for you, here's an installation script that you can use to set up a Storm cluster on Linux machines. Although the script should work for older versions of Apache Storm, it has been tested with storm-0.9.0-wip4. The script has embedded descriptive messages for each input it expects from you. The installation is done in the '/opt' folder of the machines, in a sub-directory of your choice. Make sure the user installing the cluster has admin rights on the /opt folder. The script also takes care of installing all the required dependencies. To use the script for versions other than the supported one, replace each "storm-0.9.0-wip4" occurrence in the script with your Storm version.


#!/bin/bash
# Local FS Setups location
echo "Enter the location of the setups folder. For example '/home/abc/storminstallation/setups'"
read -e setupsLocation
# Directory Name
echo "Enter the directory name"
read -e realTimePlatformDir
rtpLocalDir="\/opt\/$realTimePlatformDir\/storm\/storm_temp"
rtpLocalDirMake=/opt/$realTimePlatformDir/storm/storm_temp
echo $rtpLocalDir;
echo "Enter the IP of nimbus machine :"
read -e stormNimbus;
array[0]=$stormNimbus


# Read supervisor
echo "Enter the number of supervisor machines";
read -e n;
for ((  i = 1 ;  i <= n;  i++  ))
do
echo "Enter the IP of storm supervisor machine $i:"
read -e stormSupervisor;
array[i]=$stormSupervisor
done

# Read zookeeper
echo "Enter the number of machines in the zookeeper cluster";
read -e m;
for ((  i = 1 ;  i <= m;  i++  ))
do
echo "Enter the IP of zookeeper machine $i:"
read -e zkServer;
zkEntry="- \""$zkServer"\""
zKArray=$zKArray","$zkEntry
done

# Copy the required setups to all the storm machines
for ((  i = 1 ;  i <= n+1;  i++  ))
do
echo "Enter the username of machine ${array[i-1]}"
read -e username
echo "Username:"$username
if [ $username == 'root' ]; then
echo 'root';
yamlFilePath="/root/.storm";
else
echo $username;
yamlFilePath="/home/$username/.storm";
fi
echo "the storm.yaml file would be formed at : $yamlFilePath";
echo "Enter the value for JAVA_HOME to be set on the machine ${array[i-1]}"
read -e javaHome;
echo 'JAVA_HOME would be set to :'$javaHome;
ssh -t $username@${array[i-1]} "if [ ! -d /opt/$realTimePlatformDir ]; then
           sudo mkdir /opt/$realTimePlatformDir;
           sudo chown -R $username: /opt/$realTimePlatformDir;
           mkdir /opt/$realTimePlatformDir/storm;
           mkdir $rtpLocalDirMake;
           mkdir $yamlFilePath;
        fi"
     
scp -r -q $setupsLocation/storm-0.9.0-wip4 $username@${array[i-1]}:/opt/$realTimePlatformDir/storm/storm-0.9.0-wip4
     
ssh -t $username@${array[i-1]} "sed -i 's/ZOOKEEPER_IPS/$zKArray/g' /opt/$realTimePlatformDir/storm/storm-0.9.0-wip4/conf/storm.yaml;
sed -i 's/,/\n/g' /opt/$realTimePlatformDir/storm/storm-0.9.0-wip4/conf/storm.yaml;
sed -i 's/NIMBUS_IP/$stormNimbus/g' /opt/$realTimePlatformDir/storm/storm-0.9.0-wip4/conf/storm.yaml;
sed -i 's/LOCAL_DIR/$rtpLocalDir/g' /opt/$realTimePlatformDir/storm/storm-0.9.0-wip4/conf/storm.yaml;
cp /opt/$realTimePlatformDir/storm/storm-0.9.0-wip4/conf/storm.yaml $yamlFilePath;
sudo apt-get install git;
sudo apt-get install uuid-dev;"

ssh -t $username@${array[i-1]} "cd /opt/$realTimePlatformDir/storm;
                        wget http://download.zeromq.org/zeromq-2.1.7.tar.gz;
                        tar -xzf zeromq-2.1.7.tar.gz
                        cd zeromq-2.1.7                    
                        ./configure
                        make
                        sudo make install

                        cd ..
                        export JAVA_HOME=$javaHome;
                        echo $JAVA_HOME;
                        git clone https://github.com/nathanmarz/jzmq.git
                        cd jzmq
                        ./autogen.sh
                        ./configure
                        make
                        sudo make install"

done


Yep, done! Hope it helped. My next post shares the installation script for CentOS, followed by a small start-up script for the installed Storm cluster.

Sunday, May 19, 2013

Storm Monitoring using JMX-JMXTrans-Ganglia

Though Storm ships with a full-fledged UI, in applications where Ganglia is used as a kind of universal tool for displaying the metrics of nodes across clusters of various technologies, it is essential that the Storm cluster nodes also report their metrics to Ganglia.

Since, as of yet, there is no built-in support in Storm (as there is in Hadoop, HBase etc.) to enable monitoring via Ganglia, we need to do it using JMXTrans. This post describes how to set up JMXTrans and configure the Storm cluster nodes so that this can be achieved.

Setting up JMXTrans: 

Follow the steps below to set up JMXTrans, which acts as a bridge between the Storm cluster nodes and Ganglia and forms the reporting channel to Ganglia.
  • Obtain the setup jmxtrans_20121016-175251-ab6cfd36e3-1_all.deb and extract it on all the machines of the Storm cluster
  • Copy /path_to_extracted_jmx_setup/jmxtrans_20121016-175251-ab6cfd36e3-01_all/data/usr/share/jmxtrans to /usr/share/jmxtrans 
  • Any .json file can then be run using the following command:
/usr/share/jmxtrans/jmxtrans.sh start /path_to_json/example.json
  • And jmxtrans can be stopped using:
/usr/share/jmxtrans/jmxtrans.sh stop

Next, configure the Storm daemons to report to JMXTrans by adding the entries below to the storm.yaml used by each daemon.


“storm.yaml”- Supervisor Nodes


Add the following to the "conf/storm.yaml" on all the supervisor nodes of the cluster


worker.childopts: " -verbose:gc -XX:+PrintGCTimeStamps -XX:+PrintGCDetails -Dcom.sun.management.jmxremote -Dcom.sun.management.jmxremote.ssl=false -Dcom.sun.management.jmxremote.authenticate=false -Dcom.sun.management.jmxremote.port=1%ID%"
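The 1%ID% setting above works because Storm substitutes %ID% with the worker's port, so the digit 1 is simply prefixed to it. A quick stdlib-only sanity check (the class name is made up for illustration):

```java
public class JmxPortCheck {
    // Mirrors the 1%ID% substitution: prefix the worker port with "1"
    static int jmxPort(int workerPort) {
        return Integer.parseInt("1" + workerPort);
    }

    public static void main(String[] args) {
        // Storm's default worker ports are 6700-6703
        for (int workerPort = 6700; workerPort <= 6703; workerPort++) {
            System.out.println(workerPort + " -> " + jmxPort(workerPort));
        }
    }
}
```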

With %ID% substituted by each worker's port, the JSON files can now read metrics from ports 16700, 16701, 16702 and 16703 (given the default worker ports 6700-6703). Also add the following to report the metrics of the JVM running the supervisor:


supervisor.childopts: " -verbose:gc -XX:+PrintGCTimeStamps -XX:+PrintGCDetails -Dcom.sun.management.jmxremote -Dcom.sun.management.jmxremote.ssl=false -Dcom.sun.management.jmxremote.authenticate=false -Dcom.sun.management.jmxremote.port=any_open_port_number"

“storm.yaml”- Nimbus


For Nimbus, specify just the following; however, the presence of the above entries will not affect its performance.


nimbus.childopts: " -verbose:gc -XX:+PrintGCTimeStamps -XX:+PrintGCDetails -Dcom.sun.management.jmxremote -Dcom.sun.management.jmxremote.ssl=false -Dcom.sun.management.jmxremote.authenticate=false -Dcom.sun.management.jmxremote.port=any_open_port_number"

The rest is done in the JSON files. The Storm cluster machines do not need Ganglia monitoring daemons (gmond) running on every node; they can also report to a remote gmond. Finally, run the JSON files, as described above, on each of the Storm cluster nodes you want to monitor.


Sample JSON- Storm Workers



{
  "servers" : [ {
    "port" : "16700",           <--- Defined Storm JMX port
    "host" : "127.0.0.1",       <--- Storm worker
    "queries" : [ {
      "outputWriters" : [ {
        "@class" : "com.googlecode.jmxtrans.model.output.GangliaWriter",
        "settings" : {
          "groupName" : "workerMemory",
          "host" : "ip_of_gmond_server",
          "port" : 8649,
          "v3.1" : false
        }
      } ],
      "obj" : "java.lang:type=ClassLoading",
      "attr" : [ "LoadedClassCount", "UnloadedClassCount" ]
    } ],
    "numQueryThreads" : 2
  } ]
}

Sample JSON- Storm Supervisors



{
  "servers" : [ {
    "port" : "assigned_port_no for eg. 10000",   <--- Defined Storm JMX port
    "host" : "127.0.0.1",                        <--- Storm supervisor
    "queries" : [ {
      "outputWriters" : [ {
        "@class" : "com.googlecode.jmxtrans.model.output.GangliaWriter",
        "settings" : {
          "groupName" : "SuperVisorMemory",
          "host" : "ip_of_gmond_server",
          "port" : 8649,
          "v3.1" : false
        }
      } ],
      "obj" : "java.lang:type=Memory",
      "resultAlias": "supervisor",
      "attr" : [ "HeapMemoryUsage", "NonHeapMemoryUsage" ]
    } ],
    "numQueryThreads" : 2
  } ]
}

Sample JSON- Storm Nimbus



{
  "servers" : [ {
    "port" : "assigned_port_no",   <--- Defined Storm JMX port
    "host" : "127.0.0.1",          <--- Storm Nimbus
    "queries" : [ {
      "outputWriters" : [ {
        "@class" : "com.googlecode.jmxtrans.model.output.GangliaWriter",
        "settings" : {
          "groupName" : "NimbusMemory",
          "host" : "ip_of_gmond_server",
          "port" : 8649,
          "v3.1" : false
        }
      } ],
      "obj" : "java.lang:type=Memory",
      "resultAlias": "nimbus",
      "attr" : [ "HeapMemoryUsage", "NonHeapMemoryUsage" ]
    } ],
    "numQueryThreads" : 2
  } ]
}

With the help of the sample JSON files above, your Storm cluster nodes can start reporting their metrics to the Ganglia web UI.
All the best!

Extracting Storm Web UI Parameter values


The Storm web UI has seen constant improvement in recent releases in terms of the parameters, and their corresponding values, that it exposes to users. These parameters are of paramount importance when a user needs to analyze the performance of a topology running on her/his Storm cluster.

This post will prove to be a boon if you have ever felt the need to extract the values of the parameters hosted on the UI inside your own application: it provides a ready-to-use library for calculating the values of almost all the parameters on the Storm web UI, so you can use them wherever you need to.


import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;

import org.apache.thrift7.TException;
import org.apache.thrift7.protocol.TBinaryProtocol;
import org.apache.thrift7.transport.TFramedTransport;
import org.apache.thrift7.transport.TSocket;
import org.apache.thrift7.transport.TTransportException;

import backtype.storm.generated.BoltStats;
import backtype.storm.generated.ClusterSummary;
import backtype.storm.generated.ErrorInfo;
import backtype.storm.generated.ExecutorSpecificStats;
import backtype.storm.generated.ExecutorStats;
import backtype.storm.generated.ExecutorSummary;
import backtype.storm.generated.GlobalStreamId;
import backtype.storm.generated.NotAliveException;
import backtype.storm.generated.SpoutStats;
import backtype.storm.generated.SupervisorSummary;
import backtype.storm.generated.TopologyInfo;
import backtype.storm.generated.TopologySummary;
import backtype.storm.generated.Nimbus.Client;


/*
 * Library to extract Storm Web UI parameter values
 */
public class ClusterInformationExtractor {
public static void main(String[] args) {
TSocket socket = new TSocket("ip_of_your_storm_ui_node", 6627);
TFramedTransport transport = new TFramedTransport(socket);
TBinaryProtocol protocol = new TBinaryProtocol(transport);
Client client = new Client(protocol);
try {
transport.open();
ClusterSummary summary = client.getClusterInfo();

// Cluster Details

System.out.println("**** Storm UI Home Page ****");
System.out.println("\n**** Cluster Summary ****");
int nimbusUpTime = summary.getNimbus_uptime_secs();
System.out.println("Nimbus Up Time: "  + nimbusUpTime);
System.out.println("Number of Supervisors: "  + summary.getSupervisorsSize());
System.out.println("Number of Topologies: "  + summary.getTopologiesSize());

// Topology stats

System.out.println(" ****Topology summary**** ");
Map<String, String> topologyConfigurationParamValues = new HashMap<String, String>();
List<TopologySummary> topologies = summary.getTopologies();
Iterator<TopologySummary> topologiesIterator = summary.getTopologiesIterator();
while(topologiesIterator.hasNext()) {
TopologySummary topology = topologiesIterator.next();
System.out.println("Topology ID: "  + topology.getId());
System.out.println("Topology Name: " + topology.getName());
System.out.println("Number of Executors: " + topology.getNum_executors());
System.out.println("Number of Tasks: " + topology.getNum_tasks());
System.out.println("Number of Workers: " + topology.getNum_workers());
System.out.println("Status : " + topology.getStatus());
System.out.println("UpTime in Seconds: " + topology.getUptime_secs());
}

// Supervisor stats

System.out.println("**** Supervisor summary ****");
List<SupervisorSummary> supervisors = summary.getSupervisors();
Iterator<SupervisorSummary> supervisorsIterator = summary.getSupervisorsIterator();
while(supervisorsIterator.hasNext()) {
SupervisorSummary supervisor = supervisorsIterator.next();
System.out.println("Supervisor ID: "  + supervisor.getSupervisor_id());
System.out.println("Host: " + supervisor.getHost());
System.out.println("Number of used workers: " + supervisor.getNum_used_workers());
System.out.println("Number of workers: " + supervisor.getNum_workers());
System.out.println("Supervisor uptime: " + supervisor.getUptime_secs());
}

// Nimbus config parameter-values

  System.out.println("****Nimbus Configuration****");
  Map<String, String> nimbusConfigurationParamValues = new HashMap<String, String>();
String nimbusConfigString = client.getNimbusConf();
nimbusConfigString = nimbusConfigString.substring(1, nimbusConfigString.length()-1);
String [] nimbusConfParameters = nimbusConfigString.split(",\"");
for(String nimbusConfParamValue : nimbusConfParameters) {
String [] paramValue = nimbusConfParamValue.split(":");
String parameter = paramValue[0].substring(0, paramValue[0].length()-1);
String parameterValue = paramValue[1];
if(paramValue[1].startsWith("\"")) {
parameterValue = paramValue[1].substring(1, paramValue[1].length()-1);
}
nimbusConfigurationParamValues.put(parameter, parameterValue);
}

Set<String> nimbusConfigurationParameters = nimbusConfigurationParamValues.keySet();
Iterator<String> parameters = nimbusConfigurationParameters.iterator();
while(parameters.hasNext()) {
String key = parameters.next();
System.out.println("Parameter : " + key + " Value : " + nimbusConfigurationParamValues.get(key));
}

System.out.println("\n**** End of Storm UI Home Page Details ****");

// Topology stats

System.out.println(" **** Topology Home Page Details **** ");
topologiesIterator = summary.getTopologiesIterator();
while(topologiesIterator.hasNext()) {
TopologySummary topology = topologiesIterator.next();
System.out.println("\n**** Topology summary ****");
System.out.println("Topology Id: "  + topology.getId());
System.out.println("Topology Name: " + topology.getName());
System.out.println("Number of Executors: " + topology.getNum_executors());
System.out.println("Number of Tasks: " + topology.getNum_tasks());
System.out.println("Number of Workers: " + topology.getNum_workers());
System.out.println("Status: " + topology.getStatus());
System.out.println("UpTime in Seconds: " + topology.getUptime_secs());
                                

    // Spouts (All time)
    System.out.println("\n**** Spouts (All time) ****");
    TopologyInfo topology_info = client.getTopologyInfo(topology.getId());
    Iterator<ExecutorSummary> executorStatusItr = topology_info.getExecutorsIterator();
    while (executorStatusItr.hasNext()) {
        // get the executor
        ExecutorSummary executor_summary = executorStatusItr.next();
        ExecutorStats execStats = executor_summary.getStats();
        ExecutorSpecificStats execSpecStats = execStats.getSpecific();
        String componentId = executor_summary.getComponent_id();
        // if the executor is a spout
        if (execSpecStats.isSetSpout()) {
            SpoutStats spoutStats = execSpecStats.getSpout();
            System.out.println("Spout Id: " + componentId);
            System.out.println("Transferred: " + getStatValueFromMap(execStats.getTransferred(), ":all-time"));
            System.out.println("Emitted: " + getStatValueFromMap(execStats.getEmitted(), ":all-time"));
            System.out.println("Acked: " + getStatValueFromMap(spoutStats.getAcked(), ":all-time"));
            System.out.println("Failed: " + getStatValueFromMap(spoutStats.getFailed(), ":all-time"));
        }
    }

    // Bolts (All time)
    System.out.println("\n**** Bolts (All time) ****");
    executorStatusItr = topology_info.getExecutorsIterator();
    while (executorStatusItr.hasNext()) {
        // get the executor
        ExecutorSummary executor_summary = executorStatusItr.next();
        ExecutorStats execStats = executor_summary.getStats();
        ExecutorSpecificStats execSpecStats = execStats.getSpecific();
        String componentId = executor_summary.getComponent_id();
        // if the executor is a bolt
        if (execSpecStats.isSetBolt()) {
            BoltStats boltStats = execSpecStats.getBolt();
            System.out.println("Bolt Id: " + componentId);
            System.out.println("Transferred: " + getStatValueFromMap(execStats.getTransferred(), ":all-time"));
            System.out.println("Emitted: " + getStatValueFromMap(execStats.getEmitted(), ":all-time"));
            System.out.println("Acked: " + getBoltStatLongValueFromMap(boltStats.getAcked(), ":all-time"));
            System.out.println("Failed: " + getBoltStatLongValueFromMap(boltStats.getFailed(), ":all-time"));
            System.out.println("Executed: " + getBoltStatLongValueFromMap(boltStats.getExecuted(), ":all-time"));
            System.out.println("Execute Latency (ms): " + getBoltStatDoubleValueFromMap(boltStats.getExecute_ms_avg(), ":all-time"));
            System.out.println("Process Latency (ms): " + getBoltStatDoubleValueFromMap(boltStats.getProcess_ms_avg(), ":all-time"));
        }
    }

    // Topology Configuration
    System.out.println("\n**** Topology Configuration ****");
    String topologyConfigString = client.getTopologyConf(topology.getId());
    topologyConfigString = topologyConfigString.substring(1, topologyConfigString.length() - 1);
    String[] topologyConfParameters = topologyConfigString.split(",\"");
    for (String topologyConfParamValue : topologyConfParameters) {
        String[] paramValue = topologyConfParamValue.split(":");
        String parameter = paramValue[0].substring(0, paramValue[0].length() - 1);
        String parameterValue = paramValue[1];
        if (paramValue[1].startsWith("\"")) {
            parameterValue = paramValue[1].substring(1, paramValue[1].length() - 1);
        }
        topologyConfigurationParamValues.put(parameter, parameterValue);
    }
    Set<String> topologyConfigurationParameters = topologyConfigurationParamValues.keySet();
    Iterator<String> topologyParameters = topologyConfigurationParameters.iterator();
    while (topologyParameters.hasNext()) {
        String key = topologyParameters.next();
        System.out.println("Parameter: " + key + " Value : " + topologyConfigurationParamValues.get(key));
    }
}
System.out.println("\n**** End of Topology Home Page Details ****");
                        
// Spout Home Page Details
System.out.println("\n**** Spout Home Page Details ****");
topologiesIterator = summary.getTopologiesIterator();
while (topologiesIterator.hasNext()) {
    TopologySummary topology = topologiesIterator.next();
    TopologyInfo topology_info = client.getTopologyInfo(topology.getId());
    Iterator<ExecutorSummary> executorStatusItr = topology_info.getExecutorsIterator();
    while (executorStatusItr.hasNext()) {
        // get the executor
        ExecutorSummary executor_summary = executorStatusItr.next();
        ExecutorStats execStats = executor_summary.getStats();
        ExecutorSpecificStats execSpecStats = execStats.getSpecific();
        String componentId = executor_summary.getComponent_id();
        // if the executor is a spout
        if (execSpecStats.isSetSpout()) {
            spoutSpecificStats(topology_info, topology, executor_summary, componentId);
        }
    }
}
System.out.println("\n**** End of Spout Home Page Details ****");
                        
// Bolt Home Page Details
System.out.println("\n**** Bolt Home Page Details ****");
topologiesIterator = summary.getTopologiesIterator();
while (topologiesIterator.hasNext()) {
    TopologySummary topology = topologiesIterator.next();
    TopologyInfo topology_info = client.getTopologyInfo(topology.getId());
    Iterator<ExecutorSummary> executorStatusItr = topology_info.getExecutorsIterator();
    while (executorStatusItr.hasNext()) {
        // get the executor
        ExecutorSummary executor_summary = executorStatusItr.next();
        ExecutorStats execStats = executor_summary.getStats();
        ExecutorSpecificStats execSpecStats = execStats.getSpecific();
        String componentId = executor_summary.getComponent_id();
        // if the executor is a bolt
        if (execSpecStats.isSetBolt()) {
            boltSpecificStats(topology_info, topology, executor_summary, componentId);
        }
    }
}
System.out.println("\n**** End of Bolt Home Page Details ****");
transport.close();
} catch (TTransportException e) {
    e.printStackTrace();
} catch (TException e) {
    e.printStackTrace();
} catch (NotAliveException e) {
    e.printStackTrace();
}
}
 

/*
 * Print spout-specific stats
 */
private static void spoutSpecificStats(TopologyInfo topologyInfo, TopologySummary topology,
        ExecutorSummary executorSummary, String componentId) {
    ExecutorStats execStats = executorSummary.getStats();
    ExecutorSpecificStats execSpecStats = execStats.getSpecific();
    SpoutStats spoutStats = execSpecStats.getSpout();
    System.out.println("\n**** Component summary ****");
    System.out.println("Id : " + componentId);
    System.out.println("Topology Name : " + topology.getName());
    System.out.println("Executors : " + "1");
    System.out.println("Tasks : " + "1");
    System.out.println("\n**** Spout stats ****");
    System.out.println("\n**** Window Size **** " + "600");
    System.out.println("Transferred : " + getStatValueFromMap(execStats.getTransferred(), "600"));
    System.out.println("Emitted : " + getStatValueFromMap(execStats.getEmitted(), "600"));
    System.out.println("Acked : " + getStatValueFromMap(spoutStats.getAcked(), "600"));
    System.out.println("Failed : " + getStatValueFromMap(spoutStats.getFailed(), "600"));
    System.out.println("\n**** Window Size **** " + "10800");
    System.out.println("Transferred : " + getStatValueFromMap(execStats.getTransferred(), "10800"));
    System.out.println("Emitted : " + getStatValueFromMap(execStats.getEmitted(), "10800"));
    System.out.println("Acked : " + getStatValueFromMap(spoutStats.getAcked(), "10800"));
    System.out.println("Failed : " + getStatValueFromMap(spoutStats.getFailed(), "10800"));
    System.out.println("\n**** Window Size **** " + "86400");
    System.out.println("Transferred : " + getStatValueFromMap(execStats.getTransferred(), "86400"));
    System.out.println("Emitted : " + getStatValueFromMap(execStats.getEmitted(), "86400"));
    System.out.println("Acked : " + getStatValueFromMap(spoutStats.getAcked(), "86400"));
    System.out.println("Failed : " + getStatValueFromMap(spoutStats.getFailed(), "86400"));
    System.out.println("\n**** Window Size **** " + "all-time");
    System.out.println("Transferred : " + getStatValueFromMap(execStats.getTransferred(), ":all-time"));
    System.out.println("Emitted : " + getStatValueFromMap(execStats.getEmitted(), ":all-time"));
    System.out.println("Acked : " + getStatValueFromMap(spoutStats.getAcked(), ":all-time"));
    System.out.println("Failed : " + getStatValueFromMap(spoutStats.getFailed(), ":all-time"));

    System.out.println("\n**** Output stats (All time) ****");
    System.out.println("Stream : " + "default");
    System.out.println("Transferred : " + getStatValueFromMap(execStats.getTransferred(), ":all-time"));
    System.out.println("Emitted : " + getStatValueFromMap(execStats.getEmitted(), ":all-time"));
    System.out.println("Acked : " + getStatValueFromMap(spoutStats.getAcked(), ":all-time"));
    System.out.println("Failed : " + getStatValueFromMap(spoutStats.getFailed(), ":all-time"));

    System.out.println("\n**** Executors (All time) ****");
    System.out.println("Host : " + executorSummary.getHost());
    System.out.println("Port : " + executorSummary.getPort());
    System.out.println("Up-Time : " + executorSummary.getUptime_secs());
    System.out.println("Transferred : " + getStatValueFromMap(execStats.getTransferred(), ":all-time"));
    System.out.println("Emitted : " + getStatValueFromMap(execStats.getEmitted(), ":all-time"));
    System.out.println("Acked : " + getStatValueFromMap(spoutStats.getAcked(), ":all-time"));
    System.out.println("Failed : " + getStatValueFromMap(spoutStats.getFailed(), ":all-time"));

    System.out.println("\n**** Errors ****");
    Map<String, List<ErrorInfo>> errors = topologyInfo.getErrors();
    List<ErrorInfo> spoutErrors = errors.get(componentId);
    // guard against components that have reported no errors
    if (spoutErrors != null) {
        for (ErrorInfo errorInfo : spoutErrors) {
            System.out.println("Spout Error : " + errorInfo.getError());
        }
    }
}


/*
 * Print bolt-specific stats
 */
private static void boltSpecificStats(TopologyInfo topologyInfo, TopologySummary topology,
        ExecutorSummary executorSummary, String componentId) {
    ExecutorStats execStats = executorSummary.getStats();
    ExecutorSpecificStats execSpecStats = execStats.getSpecific();
    BoltStats boltStats = execSpecStats.getBolt();
    System.out.println("\n**** Component summary ****");
    System.out.println("Id : " + componentId);
    System.out.println("Topology Name : " + topology.getName());
    System.out.println("Executors : " + "1");
    System.out.println("Tasks : " + "1");
    System.out.println("\n**** Bolt stats ****");
    System.out.println("\n**** Window Size **** " + "600");
    System.out.println("Transferred : " + getStatValueFromMap(execStats.getTransferred(), "600"));
    System.out.println("Emitted : " + getStatValueFromMap(execStats.getEmitted(), "600"));
    System.out.println("Acked : " + getBoltStatLongValueFromMap(boltStats.getAcked(), "600"));
    System.out.println("Failed : " + getBoltStatLongValueFromMap(boltStats.getFailed(), "600"));
    System.out.println("Executed : " + getBoltStatLongValueFromMap(boltStats.getExecuted(), "600"));
    System.out.println("Execute Latency (ms) : " + getBoltStatDoubleValueFromMap(boltStats.getExecute_ms_avg(), "600"));
    System.out.println("Process Latency (ms) : " + getBoltStatDoubleValueFromMap(boltStats.getProcess_ms_avg(), "600"));
    System.out.println("\n**** Window Size **** " + "10800");
    System.out.println("Transferred : " + getStatValueFromMap(execStats.getTransferred(), "10800"));
    System.out.println("Emitted : " + getStatValueFromMap(execStats.getEmitted(), "10800"));
    System.out.println("Acked : " + getBoltStatLongValueFromMap(boltStats.getAcked(), "10800"));
    System.out.println("Failed : " + getBoltStatLongValueFromMap(boltStats.getFailed(), "10800"));
    System.out.println("Executed : " + getBoltStatLongValueFromMap(boltStats.getExecuted(), "10800"));
    System.out.println("Execute Latency (ms) : " + getBoltStatDoubleValueFromMap(boltStats.getExecute_ms_avg(), "10800"));
    System.out.println("Process Latency (ms) : " + getBoltStatDoubleValueFromMap(boltStats.getProcess_ms_avg(), "10800"));
    System.out.println("\n**** Window Size **** " + "86400");
    System.out.println("Transferred : " + getStatValueFromMap(execStats.getTransferred(), "86400"));
    System.out.println("Emitted : " + getStatValueFromMap(execStats.getEmitted(), "86400"));
    System.out.println("Acked : " + getBoltStatLongValueFromMap(boltStats.getAcked(), "86400"));
    System.out.println("Failed : " + getBoltStatLongValueFromMap(boltStats.getFailed(), "86400"));
    System.out.println("Executed : " + getBoltStatLongValueFromMap(boltStats.getExecuted(), "86400"));
    System.out.println("Execute Latency (ms) : " + getBoltStatDoubleValueFromMap(boltStats.getExecute_ms_avg(), "86400"));
    System.out.println("Process Latency (ms) : " + getBoltStatDoubleValueFromMap(boltStats.getProcess_ms_avg(), "86400"));
    System.out.println("\n**** Window Size **** " + "all-time");
    System.out.println("Transferred : " + getStatValueFromMap(execStats.getTransferred(), ":all-time"));
    System.out.println("Emitted : " + getStatValueFromMap(execStats.getEmitted(), ":all-time"));
    System.out.println("Acked : " + getBoltStatLongValueFromMap(boltStats.getAcked(), ":all-time"));
    System.out.println("Failed : " + getBoltStatLongValueFromMap(boltStats.getFailed(), ":all-time"));
    System.out.println("Executed : " + getBoltStatLongValueFromMap(boltStats.getExecuted(), ":all-time"));
    System.out.println("Execute Latency (ms) : " + getBoltStatDoubleValueFromMap(boltStats.getExecute_ms_avg(), ":all-time"));
    System.out.println("Process Latency (ms) : " + getBoltStatDoubleValueFromMap(boltStats.getProcess_ms_avg(), ":all-time"));

    System.out.println("\n**** Output stats (All time) ****");
    System.out.println("Stream : " + "default");
    System.out.println("Transferred : " + getStatValueFromMap(execStats.getTransferred(), ":all-time"));
    System.out.println("Emitted : " + getStatValueFromMap(execStats.getEmitted(), ":all-time"));
    System.out.println("Acked : " + getBoltStatLongValueFromMap(boltStats.getAcked(), ":all-time"));
    System.out.println("Failed : " + getBoltStatLongValueFromMap(boltStats.getFailed(), ":all-time"));
    System.out.println("Executed : " + getBoltStatLongValueFromMap(boltStats.getExecuted(), ":all-time"));
    System.out.println("Execute Latency (ms) : " + getBoltStatDoubleValueFromMap(boltStats.getExecute_ms_avg(), ":all-time"));
    System.out.println("Process Latency (ms) : " + getBoltStatDoubleValueFromMap(boltStats.getProcess_ms_avg(), ":all-time"));

    System.out.println("\n**** Executors (All time) ****");
    System.out.println("Host : " + executorSummary.getHost());
    System.out.println("Port : " + executorSummary.getPort());
    System.out.println("Up-Time : " + executorSummary.getUptime_secs());
    System.out.println("Transferred : " + getStatValueFromMap(execStats.getTransferred(), ":all-time"));
    System.out.println("Emitted : " + getStatValueFromMap(execStats.getEmitted(), ":all-time"));
    System.out.println("Acked : " + getBoltStatLongValueFromMap(boltStats.getAcked(), ":all-time"));
    System.out.println("Failed : " + getBoltStatLongValueFromMap(boltStats.getFailed(), ":all-time"));
    System.out.println("Executed : " + getBoltStatLongValueFromMap(boltStats.getExecuted(), ":all-time"));
    System.out.println("Execute Latency (ms) : " + getBoltStatDoubleValueFromMap(boltStats.getExecute_ms_avg(), ":all-time"));
    System.out.println("Process Latency (ms) : " + getBoltStatDoubleValueFromMap(boltStats.getProcess_ms_avg(), ":all-time"));

    System.out.println("\n**** Errors ****");
    Map<String, List<ErrorInfo>> errors = topologyInfo.getErrors();
    List<ErrorInfo> boltErrors = errors.get(componentId);
    // guard against components that have reported no errors
    if (boltErrors != null) {
        for (ErrorInfo errorInfo : boltErrors) {
            System.out.println("Bolt Error : " + errorInfo.getError());
        }
    }
}
        
/*
 * Utility method to look up a stat value from the nested
 * (stat window -> stream -> value) map, for the "default" stream
 */
public static Long getStatValueFromMap(Map<String, Map<String, Long>> map, String statName) {
    Long statValue = null;
    Map<String, Long> intermediateMap = map.get(statName);
    // the window may be absent if no stats were recorded for it yet
    if (intermediateMap != null) {
        statValue = intermediateMap.get("default");
    }
    return statValue;
}

/*
 * Utility method to look up a Double stat value, as a special case for
 * Bolts where the inner map is keyed by GlobalStreamId
 */
public static Double getBoltStatDoubleValueFromMap(Map<String, Map<GlobalStreamId, Double>> map, String statName) {
    Double statValue = 0.0;
    Map<GlobalStreamId, Double> intermediateMap = map.get(statName);
    if (intermediateMap != null && !intermediateMap.isEmpty()) {
        Iterator<GlobalStreamId> itr = intermediateMap.keySet().iterator();
        statValue = intermediateMap.get(itr.next());
    }
    return statValue;
}

/*
 * Utility method to look up a Long stat value for Bolts
 */
public static Long getBoltStatLongValueFromMap(Map<String, Map<GlobalStreamId, Long>> map, String statName) {
    Long statValue = null;
    Map<GlobalStreamId, Long> intermediateMap = map.get(statName);
    if (intermediateMap != null && !intermediateMap.isEmpty()) {
        Iterator<GlobalStreamId> itr = intermediateMap.keySet().iterator();
        statValue = intermediateMap.get(itr.next());
    }
    return statValue;
}
}
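The nested-map lookup done by `getStatValueFromMap` can be checked in isolation against a hand-built map of the same shape (stat window, then stream id, then count). The sketch below is illustrative only: the class name `StatMapDemo` and the sample value `1024L` are not part of the program above, and the helper is reproduced here (with a null guard for missing windows) so the snippet compiles on its own.

```java
import java.util.HashMap;
import java.util.Map;

public class StatMapDemo {

    // Mirrors the lookup in getStatValueFromMap: outer key is the stat
    // window (e.g. ":all-time"), inner key is the stream id ("default").
    public static Long statValue(Map<String, Map<String, Long>> map, String statName) {
        Map<String, Long> intermediateMap = map.get(statName);
        return (intermediateMap == null) ? null : intermediateMap.get("default");
    }

    public static void main(String[] args) {
        Map<String, Map<String, Long>> transferred = new HashMap<>();
        Map<String, Long> byStream = new HashMap<>();
        byStream.put("default", 1024L);
        transferred.put(":all-time", byStream);

        System.out.println(statValue(transferred, ":all-time")); // prints 1024
        System.out.println(statValue(transferred, "600"));       // prints null (window absent)
    }
}
```

The two bolt-specific helpers differ from this only in keying the inner map by `GlobalStreamId` instead of the stream-name string, which is why they pull the first key from the key set rather than looking up `"default"` directly.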



Hope this post helps you. All the best!