Sunday, May 19, 2013

Extracting Storm Web UI Parameter values


The Storm Web UI has improved steadily over recent releases in terms of the parameters, and the corresponding values, that it exposes to users. These parameters are essential when you need to analyze the performance of a topology running on your Storm cluster.

If you have ever needed to read the values shown on the UI from inside your own application, this post is for you: it provides a ready-to-use class that retrieves almost all of the parameters on the Storm Web UI through the Nimbus Thrift client, so that you can use them wherever you need to.


import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;

import org.apache.thrift7.TException;
import org.apache.thrift7.protocol.TBinaryProtocol;
import org.apache.thrift7.transport.TFramedTransport;
import org.apache.thrift7.transport.TSocket;
import org.apache.thrift7.transport.TTransportException;

import backtype.storm.generated.BoltStats;
import backtype.storm.generated.ClusterSummary;
import backtype.storm.generated.ErrorInfo;
import backtype.storm.generated.ExecutorSpecificStats;
import backtype.storm.generated.ExecutorStats;
import backtype.storm.generated.ExecutorSummary;
import backtype.storm.generated.GlobalStreamId;
import backtype.storm.generated.NotAliveException;
import backtype.storm.generated.SpoutStats;
import backtype.storm.generated.SupervisorSummary;
import backtype.storm.generated.TopologyInfo;
import backtype.storm.generated.TopologySummary;
import backtype.storm.generated.Nimbus.Client;


/*
 * Library to extract Storm Web UI Parameter Values
 */
public class ClusterInformationExtractor {
public static void main(String[] args) {
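// The client talks to Nimbus over Thrift (6627 is the default Nimbus Thrift port), so use the Nimbus host here
TSocket socket = new TSocket("ip_of_your_nimbus_node", 6627);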
TFramedTransport transport = new TFramedTransport(socket);
TBinaryProtocol protocol = new TBinaryProtocol(transport);
Client client = new Client(protocol);
try {
transport.open();
ClusterSummary summary = client.getClusterInfo();

// Cluster Details

System.out.println("**** Storm UI Home Page ****");
System.out.println(" \n****Cluster Summary**** ");
int nimbusUpTime = summary.getNimbus_uptime_secs();
System.out.println("Nimbus Up Time: "  + nimbusUpTime);
System.out.println("Number of Supervisors: "  + summary.getSupervisorsSize());
System.out.println("Number of Topologies: "  + summary.getTopologiesSize());

// Topology stats

System.out.println(" ****Topology summary**** ");
Map<String, String> topologyConfigurationParamValues = new HashMap<String, String>();
List<TopologySummary> topologies = summary.getTopologies();
Iterator<TopologySummary> topologiesIterator = summary.getTopologiesIterator();
while(topologiesIterator.hasNext()) {
TopologySummary topology = topologiesIterator.next();
System.out.println("Topology ID: "  + topology.getId());
System.out.println("Topology Name: " + topology.getName());
System.out.println("Number of Executors: " + topology.getNum_executors());
System.out.println("Number of Tasks: " + topology.getNum_tasks());
System.out.println("Number of Workers: " + topology.getNum_workers());
System.out.println("Status : " + topology.getStatus());
System.out.println("UpTime in Seconds: " + topology.getUptime_secs());
}

// Supervisor stats

System.out.println("**** Supervisor summary ****");
List<SupervisorSummary> supervisors = summary.getSupervisors();
Iterator<SupervisorSummary> supervisorsIterator = summary.getSupervisorsIterator();
while(supervisorsIterator.hasNext()) {
SupervisorSummary supervisor = supervisorsIterator.next();
System.out.println("Supervisor ID: "  + supervisor.getSupervisor_id());
System.out.println("Host: " + supervisor.getHost());
System.out.println("Number of used workers: " + supervisor.getNum_used_workers());
System.out.println("Number of workers: " + supervisor.getNum_workers());
System.out.println("Supervisor uptime: " + supervisor.getUptime_secs());
}

// Nimbus config parameter-values

  System.out.println("****Nimbus Configuration****");
  Map<String, String> nimbusConfigurationParamValues = new HashMap<String, String>();
String nimbusConfigString = client.getNimbusConf();
nimbusConfigString = nimbusConfigString.substring(1, nimbusConfigString.length()-1);
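// Naive split of the JSON string returned by Nimbus: a value containing ," (such as a list of strings) is cut at that point, and the leftover fragment is skipped below
String [] nimbusConfParameters = nimbusConfigString.split(",\"");
for(String nimbusConfParamValue : nimbusConfParameters) {
// a limit of 2 keeps values that themselves contain ':' (paths, URLs) in one piece
String [] paramValue = nimbusConfParamValue.split(":", 2);
if(paramValue.length < 2 || paramValue[0].isEmpty()) {
continue;
}
String parameter = paramValue[0].substring(0, paramValue[0].length()-1);
// the first key still carries its opening quote
if(parameter.startsWith("\"")) {
parameter = parameter.substring(1);
}
String parameterValue = paramValue[1];
if(paramValue[1].startsWith("\"") && paramValue[1].length() > 1) {
parameterValue = paramValue[1].substring(1, paramValue[1].length()-1);
}
nimbusConfigurationParamValues.put(parameter, parameterValue);
}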

Set<String> nimbusConfigurationParameters = nimbusConfigurationParamValues.keySet();
Iterator<String> parameters = nimbusConfigurationParameters.iterator();
while(parameters.hasNext()) {
String key = parameters.next();
System.out.println("Parameter : " + key + " Value : " + nimbusConfigurationParamValues.get(key));
}

System.out.println(" \n**** End of Storm UI Home Page Details**** ");

// Topology stats

System.out.println(" **** Topology Home Page Details **** ");
topologiesIterator = summary.getTopologiesIterator();
while(topologiesIterator.hasNext()) {
TopologySummary topology = topologiesIterator.next();
System.out.println("\n**** Topology summary ****");
System.out.println("Topology Id: "  + topology.getId());
System.out.println("Topology Name: " + topology.getName());
System.out.println("Number of Executors: " + topology.getNum_executors());
System.out.println("Number of Tasks: " + topology.getNum_tasks());
System.out.println("Number of Workers: " + topology.getNum_workers());
System.out.println("Status: " + topology.getStatus());
System.out.println("UpTime in Seconds: " + topology.getUptime_secs());
                                

     // Spouts (All time)
System.out.println("**** Spouts (All time) ****");
TopologyInfo topology_info = client.getTopologyInfo(topology.getId());
Iterator<ExecutorSummary> executorStatusItr = topology_info.getExecutorsIterator();
while(executorStatusItr.hasNext()) {
// get the executor
ExecutorSummary executor_summary =  executorStatusItr.next();
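ExecutorStats execStats = executor_summary.getStats();
// stats is an optional Thrift field and can be null; skip such executors
if(execStats == null) {
continue;
}
ExecutorSpecificStats execSpecStats = execStats.getSpecific();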
String componentId = executor_summary.getComponent_id();
// if the executor is a spout
if(execSpecStats.isSetSpout()) {
SpoutStats spoutStats = execSpecStats.getSpout();
System.out.println("Spout Id: " + componentId);
System.out.println("Transferred: " + getStatValueFromMap(execStats.getTransferred(), ":all-time"));
System.out.println("Emitted: " + getStatValueFromMap(execStats.getEmitted(), ":all-time"));
System.out.println("Acked: " + getStatValueFromMap(spoutStats.getAcked(), ":all-time"));
System.out.println("Failed: " + getStatValueFromMap(spoutStats.getFailed(), ":all-time"));
}
}
                                

    // Bolts (All time)
System.out.println("\n****Bolts (All time)****");
executorStatusItr = topology_info.getExecutorsIterator();
while(executorStatusItr.hasNext()) {
// get the executor
ExecutorSummary executor_summary =  executorStatusItr.next();
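ExecutorStats execStats = executor_summary.getStats();
// stats is an optional Thrift field and can be null; skip such executors
if(execStats == null) {
continue;
}
ExecutorSpecificStats execSpecStats = execStats.getSpecific();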
String componentId = executor_summary.getComponent_id();
if(execSpecStats.isSetBolt()) {
BoltStats boltStats = execSpecStats.getBolt();
System.out.println("Bolt Id: " + componentId);
System.out.println("Transferred: " + getStatValueFromMap(execStats.getTransferred(), ":all-time"));
System.out.println("Emitted: " + getStatValueFromMap(execStats.getEmitted(), ":all-time"));
System.out.println("Acked: " + getBoltStatLongValueFromMap(boltStats.getAcked(), ":all-time"));
System.out.println("Failed: " + getBoltStatLongValueFromMap(boltStats.getFailed(), ":all-time"));
System.out.println("Executed : " + getBoltStatLongValueFromMap(boltStats.getExecuted(), ":all-time"));
System.out.println("Execute Latency (ms): " + getBoltStatDoubleValueFromMap(boltStats.getExecute_ms_avg(), ":all-time"));
System.out.println("Process Latency (ms): " + getBoltStatDoubleValueFromMap(boltStats.getProcess_ms_avg(), ":all-time"));
}
}
                                

    // Topology Configuration
System.out.println("\n**** Topology Configuration ****");
String topologyConfigString = client.getTopologyConf(topology.getId());
topologyConfigString = topologyConfigString.substring(1, topologyConfigString.length()-1);
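// Naive split of the JSON string: a value containing ," (such as a list of strings) is cut at that point, and the leftover fragment is skipped below
String [] topologyConfParameters = topologyConfigString.split(",\"");
// start from an empty map so values from a previous topology do not leak into this one
topologyConfigurationParamValues.clear();
for(String topologyConfParamValue : topologyConfParameters) {
// a limit of 2 keeps values that themselves contain ':' (paths, URLs) in one piece
String [] paramValue = topologyConfParamValue.split(":", 2);
if(paramValue.length < 2 || paramValue[0].isEmpty()) {
continue;
}
String parameter = paramValue[0].substring(0, paramValue[0].length()-1);
// the first key still carries its opening quote
if(parameter.startsWith("\"")) {
parameter = parameter.substring(1);
}
String parameterValue = paramValue[1];
if(paramValue[1].startsWith("\"") && paramValue[1].length() > 1) {
parameterValue = paramValue[1].substring(1, paramValue[1].length()-1);
}
topologyConfigurationParamValues.put(parameter, parameterValue);
}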
Set<String> topologyConfigurationParameters = topologyConfigurationParamValues.keySet();
Iterator<String> topologyParameters = topologyConfigurationParameters.iterator();
while(topologyParameters.hasNext()) {
String key = topologyParameters.next();
System.out.println("Parameter: " + key + " Value : " + topologyConfigurationParamValues.get(key));
}
}
System.out.println(" \n****  End of Topology Home Page Details ****");

// Spout Home Page Details
System.out.println(" \n**** Spout Home Page Details ****");
topologiesIterator = summary.getTopologiesIterator();
while(topologiesIterator.hasNext()) {
TopologySummary topology = topologiesIterator.next();
TopologyInfo topology_info = client.getTopologyInfo(topology.getId());
Iterator<ExecutorSummary> executorStatusItr = topology_info.getExecutorsIterator();
while(executorStatusItr.hasNext()) {
// get the executor
ExecutorSummary executor_summary =  executorStatusItr.next();
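ExecutorStats execStats = executor_summary.getStats();
// stats is an optional Thrift field and can be null; skip such executors
if(execStats == null) {
continue;
}
ExecutorSpecificStats execSpecStats = execStats.getSpecific();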
String componentId = executor_summary.getComponent_id();
// if the executor is a spout
if(execSpecStats.isSetSpout()) {
spoutSpecificStats(topology_info, topology, executor_summary, componentId);
}
}
}
System.out.println(" \n**** End of Spout Home Page Details**** ");

// Bolt Home Page Details
System.out.println(" \n**** Bolt Home Page Details ****");
topologiesIterator = summary.getTopologiesIterator();
while(topologiesIterator.hasNext()) {
TopologySummary topology = topologiesIterator.next();
TopologyInfo topology_info = client.getTopologyInfo(topology.getId());
Iterator<ExecutorSummary> executorStatusItr = topology_info.getExecutorsIterator();
while(executorStatusItr.hasNext()) {
// get the executor
ExecutorSummary executor_summary =  executorStatusItr.next();
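ExecutorStats execStats = executor_summary.getStats();
// stats is an optional Thrift field and can be null; skip such executors
if(execStats == null) {
continue;
}
ExecutorSpecificStats execSpecStats = execStats.getSpecific();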
String componentId = executor_summary.getComponent_id();
// if the executor is a bolt
if(execSpecStats.isSetBolt()) {
boltSpecificStats(topology_info, topology, executor_summary, componentId);
}
}
}
System.out.println(" \n**** End of Bolt Home Page Details **** ");
} catch (TTransportException e) {
e.printStackTrace();
} catch (TException e) {
e.printStackTrace();
} catch (NotAliveException e) {
e.printStackTrace();
} finally {
// close the Thrift connection even when a call above fails
transport.close();
}
}
 

/*
 * Calculate spout specific stats
 */  
private static void spoutSpecificStats(TopologyInfo topologyInfo, TopologySummary topology, ExecutorSummary executorSummary,
String componentId) {
ExecutorStats execStats = executorSummary.getStats();
ExecutorSpecificStats execSpecStats = execStats.getSpecific();
SpoutStats spoutStats = execSpecStats.getSpout();
System.out.println("\n**** Component summary ****");
System.out.println("Id : " + componentId);
System.out.println("Topology Name  : " + topology.getName());
System.out.println("Executors : " + "1");
System.out.println("Tasks : " + "1");
System.out.println("\n**** Spout stats ****");
System.out.println("\n**** Window Size ****  " + "600");
System.out.println("Transferred: " + getStatValueFromMap(execStats.getTransferred(), "600"));
System.out.println("Emitted: " + getStatValueFromMap(execStats.getEmitted(), "600"));
System.out.println("Acked: " + getStatValueFromMap(spoutStats.getAcked(), "600"));
System.out.println("Failed: " + getStatValueFromMap(spoutStats.getFailed(), "600"));
System.out.println("\n**** Window Size ****  " + "10800");
System.out.println("Transferred : " + getStatValueFromMap(execStats.getTransferred(), "10800"));
System.out.println("Emitted : " + getStatValueFromMap(execStats.getEmitted(), "10800"));
System.out.println("Acked : " + getStatValueFromMap(spoutStats.getAcked(), "10800"));
System.out.println("Failed : " + getStatValueFromMap(spoutStats.getFailed(), "10800"));
System.out.println("\n**** Window Size ****  " + "86400");
System.out.println("Transferred : " + getStatValueFromMap(execStats.getTransferred(), "86400"));
System.out.println("Emitted : " + getStatValueFromMap(execStats.getEmitted(), "86400"));
System.out.println("Acked : " + getStatValueFromMap(spoutStats.getAcked(), "86400"));
System.out.println("Failed : " + getStatValueFromMap(spoutStats.getFailed(), "86400"));
System.out.println("\n**** Window Size ****  " + "all-time");
System.out.println("Transferred : " + getStatValueFromMap(execStats.getTransferred(), ":all-time"));
System.out.println("Emitted : " + getStatValueFromMap(execStats.getEmitted(), ":all-time"));
System.out.println("Acked : " + getStatValueFromMap(spoutStats.getAcked(), ":all-time"));
System.out.println("Failed : " + getStatValueFromMap(spoutStats.getFailed(), ":all-time"));

System.out.println("\n**** Output stats (All time) ****");
System.out.println("Stream : " + "default");
System.out.println("Transferred : " + getStatValueFromMap(execStats.getTransferred(), ":all-time"));
System.out.println("Emitted : " + getStatValueFromMap(execStats.getEmitted(), ":all-time"));
System.out.println("Acked : " + getStatValueFromMap(spoutStats.getAcked(), ":all-time"));
System.out.println("Failed : " + getStatValueFromMap(spoutStats.getFailed(), ":all-time"));

System.out.println("\n**** Executors (All time) ****");
System.out.println("Host : " + executorSummary.getHost());
System.out.println("Port : " + executorSummary.getPort());
System.out.println("Up-Time : " + executorSummary.getUptime_secs());
System.out.println("Transferred : " + getStatValueFromMap(execStats.getTransferred(), ":all-time"));
System.out.println("Emitted : " + getStatValueFromMap(execStats.getEmitted(), ":all-time"));
System.out.println("Acked : " + getStatValueFromMap(spoutStats.getAcked(), ":all-time"));
System.out.println("Failed : " + getStatValueFromMap(spoutStats.getFailed(), ":all-time"));

System.out.println("\n**** Errors ****");
Map<String, List<ErrorInfo>> errors = topologyInfo.getErrors();
List<ErrorInfo> spoutErrors = errors.get(componentId);
// guard against a component that has no entry in the error map
if(spoutErrors != null) {
for(ErrorInfo errorInfo : spoutErrors) {
System.out.println("Spout Error : " + errorInfo.getError());
}
}
}


/*
 * Calculate bolt specific stats
 */
private static void boltSpecificStats(TopologyInfo topologyInfo, TopologySummary topology, ExecutorSummary executorSummary,
String componentId) {
ExecutorStats execStats = executorSummary.getStats();
ExecutorSpecificStats execSpecStats = execStats.getSpecific();
BoltStats boltStats = execSpecStats.getBolt();
System.out.println(":::::::::: Component summary ::::::::::");
System.out.println("Id : " + componentId);
System.out.println("Topology Name  : " + topology.getName());
System.out.println("Executors : " + "1");
System.out.println("Tasks : " + "1");
System.out.println(":::::::::: Bolt stats ::::::::::");
System.out.println("::::::::::: Window Size :::::::::::  " + "600");
System.out.println("Transferred : " + getStatValueFromMap(execStats.getTransferred(), "600"));
System.out.println("Emitted : " + getStatValueFromMap(execStats.getEmitted(), "600"));
System.out.println("Acked : " + getBoltStatLongValueFromMap(boltStats.getAcked(), "600"));
System.out.println("Failed : " + getBoltStatLongValueFromMap(boltStats.getFailed(), "600"));
System.out.println("Executed : " + getBoltStatLongValueFromMap(boltStats.getExecuted(), "600"));
System.out.println("Execute Latency (ms) : " + getBoltStatDoubleValueFromMap(boltStats.getExecute_ms_avg(), "600"));
System.out.println("Process Latency (ms) : " + getBoltStatDoubleValueFromMap(boltStats.getProcess_ms_avg(), "600"));
System.out.println("::::::::::: Window Size :::::::::::  " + "10800");
System.out.println("Transferred : " + getStatValueFromMap(execStats.getTransferred(), "10800"));
System.out.println("Emitted : " + getStatValueFromMap(execStats.getEmitted(), "10800"));
System.out.println("Acked : " + getBoltStatLongValueFromMap(boltStats.getAcked(), "10800"));
System.out.println("Failed : " + getBoltStatLongValueFromMap(boltStats.getFailed(), "10800"));
System.out.println("Executed : " + getBoltStatLongValueFromMap(boltStats.getExecuted(), "10800"));
System.out.println("Execute Latency (ms) : " + getBoltStatDoubleValueFromMap(boltStats.getExecute_ms_avg(), "10800"));
System.out.println("Process Latency (ms) : " + getBoltStatDoubleValueFromMap(boltStats.getProcess_ms_avg(), "10800"));
System.out.println("::::::::::: Window Size :::::::::::  " + "86400");
System.out.println("Transferred : " + getStatValueFromMap(execStats.getTransferred(), "86400"));
System.out.println("Emitted : " + getStatValueFromMap(execStats.getEmitted(), "86400"));
System.out.println("Acked : " + getBoltStatLongValueFromMap(boltStats.getAcked(), "86400"));
System.out.println("Failed : " + getBoltStatLongValueFromMap(boltStats.getFailed(), "86400"));
System.out.println("Executed : " + getBoltStatLongValueFromMap(boltStats.getExecuted(), "86400"));
System.out.println("Execute Latency (ms) : " + getBoltStatDoubleValueFromMap(boltStats.getExecute_ms_avg(), "86400"));
System.out.println("Process Latency (ms) : " + getBoltStatDoubleValueFromMap(boltStats.getProcess_ms_avg(), "86400"));
System.out.println("::::::::::: Window Size :::::::::::  " + "all-time");
System.out.println("Transferred : " + getStatValueFromMap(execStats.getTransferred(), ":all-time"));
System.out.println("Emitted : " + getStatValueFromMap(execStats.getEmitted(), ":all-time"));
System.out.println("Acked : " + getBoltStatLongValueFromMap(boltStats.getAcked(), ":all-time"));
System.out.println("Failed : " + getBoltStatLongValueFromMap(boltStats.getFailed(), ":all-time"));
System.out.println("Executed : " + getBoltStatLongValueFromMap(boltStats.getExecuted(), ":all-time"));
System.out.println("Execute Latency (ms) : " + getBoltStatDoubleValueFromMap(boltStats.getExecute_ms_avg(), ":all-time"));
System.out.println("Process Latency (ms) : " + getBoltStatDoubleValueFromMap(boltStats.getProcess_ms_avg(), ":all-time"));

System.out.println(":::::::::: Output stats (All time) ::::::::::");
System.out.println("Stream : " + "default");
System.out.println("Transferred : " + getStatValueFromMap(execStats.getTransferred(), ":all-time"));
System.out.println("Emitted : " + getStatValueFromMap(execStats.getEmitted(), ":all-time"));
System.out.println("Acked : " + getBoltStatLongValueFromMap(boltStats.getAcked(), ":all-time"));
System.out.println("Failed : " + getBoltStatLongValueFromMap(boltStats.getFailed(), ":all-time"));
System.out.println("Executed : " + getBoltStatLongValueFromMap(boltStats.getExecuted(), ":all-time"));
System.out.println("Execute Latency (ms) : " + getBoltStatDoubleValueFromMap(boltStats.getExecute_ms_avg(), ":all-time"));
System.out.println("Process Latency (ms) : " + getBoltStatDoubleValueFromMap(boltStats.getProcess_ms_avg(), ":all-time"));

System.out.println(":::::::::: Executors (All time) ::::::::::");
System.out.println("Host : " + executorSummary.getHost());
System.out.println("Port : " + executorSummary.getPort());
System.out.println("Up-Time : " + executorSummary.getUptime_secs());
System.out.println("Transferred : " + getStatValueFromMap(execStats.getTransferred(), ":all-time"));
System.out.println("Emitted : " + getStatValueFromMap(execStats.getEmitted(), ":all-time"));
System.out.println("Acked : " + getBoltStatLongValueFromMap(boltStats.getAcked(), ":all-time"));
System.out.println("Failed : " + getBoltStatLongValueFromMap(boltStats.getFailed(), ":all-time"));
System.out.println("Executed : " + getBoltStatLongValueFromMap(boltStats.getExecuted(), ":all-time"));
System.out.println("Execute Latency (ms) : " + getBoltStatDoubleValueFromMap(boltStats.getExecute_ms_avg(), ":all-time"));
System.out.println("Process Latency (ms) : " + getBoltStatDoubleValueFromMap(boltStats.getProcess_ms_avg(), ":all-time"));

System.out.println(":::::::::: Errors ::::::::::");
Map<String, List<ErrorInfo>> errors = topologyInfo.getErrors();
System.out.println(errors.keySet());
List<ErrorInfo> boltErrors = errors.get(componentId);
// guard against a component that has no entry in the error map
if(boltErrors != null) {
for(ErrorInfo errorInfo : boltErrors) {
System.out.println("Bolt Error : " + errorInfo.getError());
}
}
}
        
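/*
 * Utility method: returns the value recorded for the "default" stream in the given window, or null if the window is missing
 */
public static Long getStatValueFromMap(Map<String, Map<String, Long>> map, String statName) {
Long statValue = null;
Map<String, Long> intermediateMap = map.get(statName);
// the requested window may be absent from the map
if(intermediateMap != null) {
statValue = intermediateMap.get("default");
}
return statValue;
}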

/*
 * Utility method for Bolts: returns the Double value of the first stream found in the given window, or 0.0 if there is none
 */
public static Double getBoltStatDoubleValueFromMap(Map<String, Map<GlobalStreamId, Double>> map, String statName) {
Double statValue = 0.0;
Map<GlobalStreamId, Double> intermediateMap = map.get(statName);
// the requested window may be absent from the map
if(intermediateMap != null) {
Set<GlobalStreamId> key = intermediateMap.keySet();
if(key.size() > 0) {
Iterator<GlobalStreamId> itr = key.iterator();
statValue = intermediateMap.get(itr.next());
}
}
return statValue;
}

/*
 * Utility method for Bolts: returns the Long value of the first stream found in the given window, or null if there is none
 */
public static Long getBoltStatLongValueFromMap(Map<String, Map<GlobalStreamId, Long>> map, String statName) {
Long statValue = null;
Map<GlobalStreamId, Long> intermediateMap = map.get(statName);
// the requested window may be absent from the map
if(intermediateMap != null) {
Set<GlobalStreamId> key = intermediateMap.keySet();
if(key.size() > 0) {
Iterator<GlobalStreamId> itr = key.iterator();
statValue = intermediateMap.get(itr.next());
}
}
return statValue;
}
}
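
The three lookup helpers at the end of the listing read a single stream per window ("default" for spouts, the first key found for bolts). If you would rather total every stream in a window, a null-safe variant could look like the sketch below. WindowStatSketch and sumForWindow are hypothetical names, not part of Storm, and the sample counts are made up purely for illustration.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical helper, not part of the Storm API.
public class WindowStatSketch {

    // Sums every per-stream counter recorded for one time window.
    // Returns 0 when the stats map, the window, or a counter is missing.
    public static <K> long sumForWindow(Map<String, Map<K, Long>> stats, String window) {
        if (stats == null) {
            return 0L;
        }
        Map<K, Long> perStream = stats.get(window);
        if (perStream == null) {
            return 0L;
        }
        long total = 0L;
        for (Long value : perStream.values()) {
            if (value != null) {
                total += value;
            }
        }
        return total;
    }

    public static void main(String[] args) {
        // Made-up data shaped like the window -> stream -> count maps in the listing.
        Map<String, Map<String, Long>> emitted = new HashMap<String, Map<String, Long>>();
        Map<String, Long> allTime = new HashMap<String, Long>();
        allTime.put("default", 120L);
        allTime.put("other-stream", 40L);
        emitted.put(":all-time", allTime);

        System.out.println(sumForWindow(emitted, ":all-time")); // prints 160
        System.out.println(sumForWindow(emitted, "600"));       // prints 0 (window not present)
    }
}
```

Because the key type is generic, the same method accepts the String-keyed maps used for spouts and the GlobalStreamId-keyed maps used for bolts.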


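The configuration parsing in the listing splits the returned string on `,"` and `:`, which is fragile when a value contains either sequence. A JSON library is the more dependable choice; if adding one is not an option, a small scanner that tracks quotes and brackets is one possibility. The sketch below assumes the configuration is a one-level JSON object, keeps nested values (lists, maps) as raw text, and does not decode escape sequences. FlatJsonConfSketch is a hypothetical name, and the sample values are illustrative only.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical helper, not part of the Storm API.
public class FlatJsonConfSketch {

    // Parses a one-level JSON object into key -> raw value text.
    public static Map<String, String> parse(String json) {
        Map<String, String> result = new LinkedHashMap<String, String>();
        String body = json.trim();
        if (body.length() < 2) {
            return result;
        }
        body = body.substring(1, body.length() - 1); // drop the outer { }
        int depth = 0;            // nesting level of [ ] and { } outside quotes
        boolean inQuotes = false;
        int start = 0;
        for (int i = 0; i <= body.length(); i++) {
            boolean atEnd = (i == body.length());
            char c = atEnd ? ',' : body.charAt(i);
            if (!atEnd) {
                if (c == '"' && !isEscaped(body, i)) {
                    inQuotes = !inQuotes;
                } else if (!inQuotes && (c == '[' || c == '{')) {
                    depth++;
                } else if (!inQuotes && (c == ']' || c == '}')) {
                    depth--;
                }
            }
            // An entry ends at a top-level comma or at the end of the body.
            if (atEnd || (c == ',' && !inQuotes && depth == 0)) {
                addEntry(body.substring(start, i), result);
                start = i + 1;
            }
        }
        return result;
    }

    // A quote is escaped when preceded by an odd number of backslashes.
    private static boolean isEscaped(String s, int index) {
        int backslashes = 0;
        for (int i = index - 1; i >= 0 && s.charAt(i) == '\\'; i--) {
            backslashes++;
        }
        return backslashes % 2 == 1;
    }

    // Splits one entry at the first colon that is outside quotes.
    private static void addEntry(String entry, Map<String, String> out) {
        boolean inQuotes = false;
        for (int i = 0; i < entry.length(); i++) {
            char c = entry.charAt(i);
            if (c == '"' && !isEscaped(entry, i)) {
                inQuotes = !inQuotes;
            } else if (c == ':' && !inQuotes) {
                String key = unquote(entry.substring(0, i).trim());
                String value = unquote(entry.substring(i + 1).trim());
                out.put(key, value);
                return;
            }
        }
    }

    private static String unquote(String s) {
        if (s.length() >= 2 && s.startsWith("\"") && s.endsWith("\"")) {
            return s.substring(1, s.length() - 1);
        }
        return s;
    }

    public static void main(String[] args) {
        // Illustrative input only; real values depend on your cluster.
        String conf = "{\"nimbus.thrift.port\":6627,"
                + "\"java.library.path\":\"/usr/local/lib:/opt/local/lib\","
                + "\"storm.zookeeper.servers\":[\"zk1\",\"zk2\"]}";
        for (Map.Entry<String, String> e : parse(conf).entrySet()) {
            System.out.println("Parameter : " + e.getKey() + " Value : " + e.getValue());
        }
    }
}
```

In the listing, the string returned by client.getNimbusConf() or client.getTopologyConf(...) could be passed to parse(...) in place of the manual substring and split calls.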

Hope this post helps you. All the best !!

9 comments:

  1. Hello, thank you for your work.
    How can I include the Thrift and Storm packages when compiling?

  2. If you are creating an Eclipse project, you need to add the required jars to the build path; if you are compiling and running the class from the command line, you need to specify the jars with the "-cp" option.

  3. Thanks.
    I can add all the "org.apache.thrift7" packages with -cp from /storm/libs/, but where can I find the classes under "backtype.storm.generated"? Did you generate these classes yourself with the Thrift compiler, or are they located somewhere in Storm?

  4. Hi, thanks for the detailed, all-in-one utility. However, I am getting a NullPointerException at "executor_summary.getStats();". All other fields of the executor summary (such as host, port, config, etc.) are displayed properly; it is just get_stats that returns null.
    Can you provide any pointers?

  5. We developed a system using a Storm cluster, and the Storm UI is showing results, but the results are not satisfactory. The latency sometimes increases and sometimes decreases. Why does this happen?
