The Storm Web UI has undergone
constant improvement in recent releases in terms of the
parameters, and their corresponding values, that it exposes to users. These
parameters are of paramount importance when a user needs to analyze the
performance of a topology running on her/his Storm cluster.
This post should prove to be a
boon if you have ever felt the need to extract the values of the
parameters shown on the UI from inside your own application, since it provides you
with a ready-to-use library that retrieves the values of almost all the
parameters on the Storm Web UI so that you can use them wherever you need to.
Hope this post helps you. All the best !!
import java.util.HashMap;
import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.Set; import org.apache.thrift7.TException; import org.apache.thrift7.protocol.TBinaryProtocol; import org.apache.thrift7.transport.TFramedTransport; import org.apache.thrift7.transport.TSocket; import org.apache.thrift7.transport.TTransportException; import backtype.storm.generated.BoltStats; import backtype.storm.generated.ClusterSummary; import backtype.storm.generated.ErrorInfo; import backtype.storm.generated.ExecutorSpecificStats; import backtype.storm.generated.ExecutorStats; import backtype.storm.generated.ExecutorSummary; import backtype.storm.generated.GlobalStreamId; import backtype.storm.generated.NotAliveException; import backtype.storm.generated.SpoutStats; import backtype.storm.generated.SupervisorSummary; import backtype.storm.generated.TopologyInfo; import backtype.storm.generated.TopologySummary; import backtype.storm.generated.Nimbus.Client; /* * Library to extract Storm Web UI Parameter Values */ public class ClusterInformationExtractor { public static void main(String[] args) { TSocket socket = new TSocket("ip_of_your_storm_ui_node", 6627); TFramedTransport transport = new TFramedTransport(socket); TBinaryProtocol protocol = new TBinaryProtocol(transport); Client client = new Client(protocol); try { transport.open(); ClusterSummary summary = client.getClusterInfo(); // Cluster Details System.out.println("**** Storm UI Home Page ****"); System.out.println(" ****Cluster Summary**** "); int nimbusUpTime = summary.getNimbus_uptime_secs(); System.out.println("Nimbus Up Time: " + nimbusUpTime); System.out.println("Number of Supervisors: " + summary.getSupervisorsSize()); System.out.println("Number of Topologies: " + summary.getTopologiesSize()); // Topology stats System.out.println(" ****Topology summary**** "); Map<String, String> topologyConfigurationParamValues = new HashMap<String, String>(); List<TopologySummary> topologies = summary.getTopologies(); 
Iterator<TopologySummary> topologiesIterator = summary.getTopologiesIterator(); while(topologiesIterator.hasNext()) { TopologySummary topology = topologiesIterator.next(); System.out.println("Topology ID: " + topology.getId()); System.out.println("Topology Name: " + topology.getName()); System.out.println("Number of Executors: " + topology.getNum_executors()); System.out.println("Number of Tasks: " + topology.getNum_tasks()); System.out.println("Number of Workers: " + topology.getNum_workers()); System.out.println("Status : " + topology.getStatus()); System.out.println("UpTime in Seconds: " + topology.getUptime_secs()); } // Supervisor stats System.out.println("**** Supervisor summary ****"); List<SupervisorSummary> supervisors = summary.getSupervisors(); Iterator<SupervisorSummary> supervisorsIterator = summary.getSupervisorsIterator(); while(supervisorsIterator.hasNext()) { SupervisorSummary supervisor = supervisorsIterator.next(); System.out.println("Supervisor ID: " + supervisor.getSupervisor_id()); System.out.println("Host: " + supervisor.getHost()); System.out.println("Number of used workers: " + supervisor.getNum_used_workers()); System.out.println("Number of workers: " + supervisor.getNum_workers()); System.out.println("Supervisor uptime: " + supervisor.getUptime_secs()); } // Nimbus config parameter-values System.out.println("****Nimbus Configuration****"); Map<String, String> nimbusConfigurationParamValues = new HashMap<String, String>(); String nimbusConfigString = client.getNimbusConf(); nimbusConfigString = nimbusConfigString.substring(1, nimbusConfigString.length()-1); String [] nimbusConfParameters = nimbusConfigString.split(",\""); for(String nimbusConfParamValue : nimbusConfParameters) { String [] paramValue = nimbusConfParamValue.split(":"); String parameter = paramValue[0].substring(0, paramValue[0].length()-1); String parameterValue = paramValue[1]; if(paramValue[1].startsWith("\"")) { parameterValue = paramValue[1].substring(1, 
paramValue[1].length()-1); } nimbusConfigurationParamValues.put(parameter, parameterValue); } Set<String> nimbusConfigurationParameters = nimbusConfigurationParamValues.keySet(); Iterator<String> parameters = nimbusConfigurationParameters.iterator(); while(parameters.hasNext()) { String key = parameters.next(); System.out.println("Parameter : " + key + " Value : " + nimbusConfigurationParamValues.get(key)); } System.out.println(" **** End of Storm UI Home Page Details**** "); // Topology stats System.out.println(" **** Topology Home Page Details **** "); topologiesIterator = summary.getTopologiesIterator(); while(topologiesIterator.hasNext()) { TopologySummary topology = topologiesIterator.next(); System.out.println("**** Topology summary ****"); System.out.println("Topology Id: " + topology.getId()); System.out.println("Topology Name: " + topology.getName()); System.out.println("Number of Executors: " + topology.getNum_executors()); System.out.println("Number of Tasks: " + topology.getNum_tasks()); System.out.println("Number of Workers: " + topology.getNum_workers()); System.out.println("Status: " + topology.getStatus()); System.out.println("UpTime in Seconds: " + topology.getUptime_secs()); // Spouts (All time) System.out.println("**** Spouts (All time) ****"); TopologyInfo topology_info = client.getTopologyInfo(topology.getId()); Iterator<ExecutorSummary> executorStatusItr = topology_info.getExecutorsIterator(); while(executorStatusItr.hasNext()) { // get the executor ExecutorSummary executor_summary = executorStatusItr.next(); ExecutorStats execStats = executor_summary.getStats(); ExecutorSpecificStats execSpecStats = execStats.getSpecific(); String componentId = executor_summary.getComponent_id(); // if the executor is a spout if(execSpecStats.isSetSpout()) { SpoutStats spoutStats = execSpecStats.getSpout(); System.out.println("Spout Id: " + componentId); System.out.println("Transferred: " + getStatValueFromMap(execStats.getTransferred(), ":all-time")); 
System.out.println("Emitted: " + getStatValueFromMap(execStats.getEmitted(), ":all-time")); System.out.println("Acked: " + getStatValueFromMap(spoutStats.getAcked(), ":all-time")); System.out.println("Failed: " + getStatValueFromMap(spoutStats.getFailed(), ":all-time")); } } // Bolts (All time) System.out.println("****Bolts (All time)****"); executorStatusItr = topology_info.getExecutorsIterator(); while(executorStatusItr.hasNext()) { // get the executor ExecutorSummary executor_summary = executorStatusItr.next(); ExecutorStats execStats = executor_summary.getStats(); ExecutorSpecificStats execSpecStats = execStats.getSpecific(); String componentId = executor_summary.getComponent_id(); if(execSpecStats.isSetBolt()) { BoltStats boltStats = execSpecStats.getBolt(); System.out.println("Bolt Id: " + componentId); System.out.println("Transferred: " + getStatValueFromMap(execStats.getTransferred(), ":all-time")); System.out.println("Emitted: " + getStatValueFromMap(execStats.getEmitted(), ":all-time")); System.out.println("Acked: " + getBoltStatLongValueFromMap(boltStats.getAcked(), ":all-time")); System.out.println("Failed: " + getBoltStatLongValueFromMap(boltStats.getFailed(), ":all-time")); System.out.println("Executed : " + getBoltStatLongValueFromMap(boltStats.getExecuted(), ":all-time")); System.out.println("Execute Latency (ms): " + getBoltStatDoubleValueFromMap(boltStats.getExecute_ms_avg(), ":all-time")); System.out.println("Process Latency (ms): " + getBoltStatDoubleValueFromMap(boltStats.getProcess_ms_avg(), ":all-time")); } } // Topology Configuration System.out.println("**** Topology Configuration ****"); String topologyConfigString = client.getTopologyConf(topology.getId()); topologyConfigString = topologyConfigString.substring(1, topologyConfigString.length()-1); String [] topologyConfParameters = topologyConfigString.split(",\""); for(String topologyConfParamValue : topologyConfParameters) { String [] paramValue = topologyConfParamValue.split(":"); String 
parameter = paramValue[0].substring(0, paramValue[0].length()-1); String parameterValue = paramValue[1]; if(paramValue[1].startsWith("\"")) { parameterValue = paramValue[1].substring(1, paramValue[1].length()-1); } topologyConfigurationParamValues.put(parameter, parameterValue); } Set<String> topologyConfigurationParameters = topologyConfigurationParamValues.keySet(); Iterator<String> topologyParameters = topologyConfigurationParameters.iterator(); while(topologyParameters.hasNext()) { String key = topologyParameters.next(); System.out.println("Parameter: " + key + " Value : " + topologyConfigurationParamValues.get(key)); } } System.out.println(" **** End of Topology Home Page Details ****"); // Spout Home Page Details System.out.println(" **** Spout Home Page Details ****"); topologiesIterator = summary.getTopologiesIterator(); while(topologiesIterator.hasNext()) { TopologySummary topology = topologiesIterator.next(); TopologyInfo topology_info = client.getTopologyInfo(topology.getId()); Iterator<ExecutorSummary> executorStatusItr = topology_info.getExecutorsIterator(); while(executorStatusItr.hasNext()) { // get the executor ExecutorSummary executor_summary = executorStatusItr.next(); ExecutorStats execStats = executor_summary.getStats(); ExecutorSpecificStats execSpecStats = execStats.getSpecific(); String componentId = executor_summary.getComponent_id(); // if the executor is a spout if(execSpecStats.isSetSpout()) { spoutSpecificStats(topology_info, topology, executor_summary, componentId); } } } System.out.println(" **** End of Spout Home Page Details**** "); // Bolt Home Page Details System.out.println(" **** Bolt Home Page Details ****"); topologiesIterator = summary.getTopologiesIterator(); while(topologiesIterator.hasNext()) { TopologySummary topology = topologiesIterator.next(); TopologyInfo topology_info = client.getTopologyInfo(topology.getId()); Iterator<ExecutorSummary> executorStatusItr = topology_info.getExecutorsIterator(); 
while(executorStatusItr.hasNext()) { // get the executor ExecutorSummary executor_summary = executorStatusItr.next(); ExecutorStats execStats = executor_summary.getStats(); ExecutorSpecificStats execSpecStats = execStats.getSpecific(); String componentId = executor_summary.getComponent_id(); // if the executor is a bolt if(execSpecStats.isSetBolt()) { boltSpecificStats(topology_info, topology, executor_summary, componentId); } } } System.out.println(" **** End of Bolt Home Page Details **** "); transport.close(); } catch (TTransportException e) { e.printStackTrace(); } catch (TException e) { e.printStackTrace(); } catch (NotAliveException e) { e.printStackTrace(); } } /* * Calculate spout specific stats */ private static void spoutSpecificStats(TopologyInfo topologyInfo, TopologySummary topology, ExecutorSummary executorSummary, String componentId) { ExecutorStats execStats = executorSummary.getStats(); ExecutorSpecificStats execSpecStats = execStats.getSpecific(); SpoutStats spoutStats = execSpecStats.getSpout(); System.out.println("**** Component summary ****"); System.out.println("Id : " + componentId); System.out.println("Topology Name : " + topology.getName()); System.out.println("Executors : " + "1"); System.out.println("Tasks : " + "1"); System.out.println("**** Spout stats ****"); System.out.println("**** Window Size **** " + "600"); System.out.println("Transferred: " + getStatValueFromMap(execStats.getTransferred(), "600")); System.out.println("Emitted: " + getStatValueFromMap(execStats.getEmitted(), "600")); System.out.println("Acked: " + getStatValueFromMap(spoutStats.getAcked(), "600")); System.out.println("Failed: " + getStatValueFromMap(spoutStats.getFailed(), "600")); System.out.println("**** Window Size **** " + "10800"); System.out.println("Transferred : " + getStatValueFromMap(execStats.getTransferred(), "10800")); System.out.println("Emitted : " + getStatValueFromMap(execStats.getEmitted(), "10800")); System.out.println("Acked : " + 
getStatValueFromMap(spoutStats.getAcked(), "10800")); System.out.println("Failed : " + getStatValueFromMap(spoutStats.getFailed(), "10800")); System.out.println("**** Window Size **** " + "86400"); System.out.println("Transferred : " + getStatValueFromMap(execStats.getTransferred(), "86400")); System.out.println("Emitted : " + getStatValueFromMap(execStats.getEmitted(), "86400")); System.out.println("Acked : " + getStatValueFromMap(spoutStats.getAcked(), "86400")); System.out.println("Failed : " + getStatValueFromMap(spoutStats.getFailed(), "86400")); System.out.println("**** Window Size **** " + "all-time"); System.out.println("Transferred : " + getStatValueFromMap(execStats.getTransferred(), ":all-time")); System.out.println("Emitted : " + getStatValueFromMap(execStats.getEmitted(), ":all-time")); System.out.println("Acked : " + getStatValueFromMap(spoutStats.getAcked(), ":all-time")); System.out.println("Failed : " + getStatValueFromMap(spoutStats.getFailed(), ":all-time")); System.out.println("**** Output stats (All time) ****"); System.out.println("Stream : " + "default"); System.out.println("Transferred : " + getStatValueFromMap(execStats.getTransferred(), ":all-time")); System.out.println("Emitted : " + getStatValueFromMap(execStats.getEmitted(), ":all-time")); System.out.println("Acked : " + getStatValueFromMap(spoutStats.getAcked(), ":all-time")); System.out.println("Failed : " + getStatValueFromMap(spoutStats.getFailed(), ":all-time")); System.out.println("**** Executors (All time) ****"); System.out.println("Host : " + executorSummary.getHost()); System.out.println("Port : " + executorSummary.getPort()); System.out.println("Up-Time : " + executorSummary.getUptime_secs()); System.out.println("Transferred : " + getStatValueFromMap(execStats.getTransferred(), ":all-time")); System.out.println("Emitted : " + getStatValueFromMap(execStats.getEmitted(), ":all-time")); System.out.println("Acked : " + getStatValueFromMap(spoutStats.getAcked(), ":all-time")); 
System.out.println("Failed : " + getStatValueFromMap(spoutStats.getFailed(), ":all-time")); System.out.println("**** Errors ****"); Map<String, List<ErrorInfo>> errors = topologyInfo.getErrors(); List<ErrorInfo> spoutErrors = errors.get(componentId); for(ErrorInfo errorInfo : spoutErrors) { System.out.println("Spout Error : " + errorInfo.getError()); } } /* * Calculate bolt specific stats */ private static void boltSpecificStats(TopologyInfo topologyInfo, TopologySummary topology, ExecutorSummary executorSummary, String componentId) { ExecutorStats execStats = executorSummary.getStats(); ExecutorSpecificStats execSpecStats = execStats.getSpecific(); BoltStats boltStats = execSpecStats.getBolt(); System.out.println(":::::::::: Component summary ::::::::::"); System.out.println("Id : " + componentId); System.out.println("Topology Name : " + topology.getName()); System.out.println("Executors : " + "1"); System.out.println("Tasks : " + "1"); System.out.println(":::::::::: Bolt stats ::::::::::"); System.out.println("::::::::::: Window Size ::::::::::: " + "600"); System.out.println("Transferred : " + getStatValueFromMap(execStats.getTransferred(), "600")); System.out.println("Emitted : " + getStatValueFromMap(execStats.getEmitted(), "600")); System.out.println("Acked : " + getBoltStatLongValueFromMap(boltStats.getAcked(), "600")); System.out.println("Failed : " + getBoltStatLongValueFromMap(boltStats.getFailed(), "600")); System.out.println("Executed : " + getBoltStatLongValueFromMap(boltStats.getExecuted(), "600")); System.out.println("Execute Latency (ms) : " + getBoltStatDoubleValueFromMap(boltStats.getExecute_ms_avg(), "600")); System.out.println("Process Latency (ms) : " + getBoltStatDoubleValueFromMap(boltStats.getProcess_ms_avg(), "600")); System.out.println("::::::::::: Window Size ::::::::::: " + "10800"); System.out.println("Transferred : " + getStatValueFromMap(execStats.getTransferred(), "10800")); System.out.println("Emitted : " + 
getStatValueFromMap(execStats.getEmitted(), "10800")); System.out.println("Acked : " + getBoltStatLongValueFromMap(boltStats.getAcked(), "10800")); System.out.println("Failed : " + getBoltStatLongValueFromMap(boltStats.getFailed(), "10800")); System.out.println("Executed : " + getBoltStatLongValueFromMap(boltStats.getExecuted(), "10800")); System.out.println("Execute Latency (ms) : " + getBoltStatDoubleValueFromMap(boltStats.getExecute_ms_avg(), "10800")); System.out.println("Process Latency (ms) : " + getBoltStatDoubleValueFromMap(boltStats.getProcess_ms_avg(), "10800")); System.out.println("::::::::::: Window Size ::::::::::: " + "86400"); System.out.println("Transferred : " + getStatValueFromMap(execStats.getTransferred(), "86400")); System.out.println("Emitted : " + getStatValueFromMap(execStats.getEmitted(), "86400")); System.out.println("Acked : " + getBoltStatLongValueFromMap(boltStats.getAcked(), "86400")); System.out.println("Failed : " + getBoltStatLongValueFromMap(boltStats.getFailed(), "86400")); System.out.println("Executed : " + getBoltStatLongValueFromMap(boltStats.getExecuted(), "86400")); System.out.println("Execute Latency (ms) : " + getBoltStatDoubleValueFromMap(boltStats.getExecute_ms_avg(), "86400")); System.out.println("Process Latency (ms) : " + getBoltStatDoubleValueFromMap(boltStats.getProcess_ms_avg(), "86400")); System.out.println("::::::::::: Window Size ::::::::::: " + "all-time"); System.out.println("Transferred : " + getStatValueFromMap(execStats.getTransferred(), ":all-time")); System.out.println("Emitted : " + getStatValueFromMap(execStats.getEmitted(), ":all-time")); System.out.println("Acked : " + getBoltStatLongValueFromMap(boltStats.getAcked(), ":all-time")); System.out.println("Failed : " + getBoltStatLongValueFromMap(boltStats.getFailed(), ":all-time")); System.out.println("Executed : " + getBoltStatLongValueFromMap(boltStats.getExecuted(), ":all-time")); System.out.println("Execute Latency (ms) : " + 
getBoltStatDoubleValueFromMap(boltStats.getExecute_ms_avg(), ":all-time")); System.out.println("Process Latency (ms) : " + getBoltStatDoubleValueFromMap(boltStats.getProcess_ms_avg(), ":all-time")); System.out.println(":::::::::: Output stats (All time) ::::::::::"); System.out.println("Stream : " + "default"); System.out.println("Transferred : " + getStatValueFromMap(execStats.getTransferred(), ":all-time")); System.out.println("Emitted : " + getStatValueFromMap(execStats.getEmitted(), ":all-time")); System.out.println("Acked : " + getBoltStatLongValueFromMap(boltStats.getAcked(), ":all-time")); System.out.println("Failed : " + getBoltStatLongValueFromMap(boltStats.getFailed(), ":all-time")); System.out.println("Executed : " + getBoltStatLongValueFromMap(boltStats.getExecuted(), ":all-time")); System.out.println("Execute Latency (ms) : " + getBoltStatDoubleValueFromMap(boltStats.getExecute_ms_avg(), ":all-time")); System.out.println("Process Latency (ms) : " + getBoltStatDoubleValueFromMap(boltStats.getProcess_ms_avg(), ":all-time")); System.out.println(":::::::::: Executors (All time) ::::::::::"); System.out.println("Host : " + executorSummary.getHost()); System.out.println("Port : " + executorSummary.getPort()); System.out.println("Up-Time : " + executorSummary.getUptime_secs()); System.out.println("Transferred : " + getStatValueFromMap(execStats.getTransferred(), ":all-time")); System.out.println("Emitted : " + getStatValueFromMap(execStats.getEmitted(), ":all-time")); System.out.println("Acked : " + getBoltStatLongValueFromMap(boltStats.getAcked(), ":all-time")); System.out.println("Failed : " + getBoltStatLongValueFromMap(boltStats.getFailed(), ":all-time")); System.out.println("Executed : " + getBoltStatLongValueFromMap(boltStats.getExecuted(), ":all-time")); System.out.println("Execute Latency (ms) : " + getBoltStatDoubleValueFromMap(boltStats.getExecute_ms_avg(), ":all-time")); System.out.println("Process Latency (ms) : " + 
getBoltStatDoubleValueFromMap(boltStats.getProcess_ms_avg(), ":all-time")); System.out.println(":::::::::: Errors ::::::::::"); Map<String, List<ErrorInfo>> errors = topologyInfo.getErrors(); System.out.println(errors.keySet()); List<ErrorInfo> boltErrors = errors.get(componentId); for(ErrorInfo errorInfo : boltErrors) { System.out.println("Bolt Error : " + errorInfo.getError()); } } /* * Utility method to parse a Map<> */ public static Long getStatValueFromMap(Map<String, Map<String, Long>> map, String statName) { Long statValue = null; Map<String, Long> intermediateMap = map.get(statName); statValue = intermediateMap.get("default"); return statValue; } /* * Utility method to parse a Map<> as a special case for Bolts */ public static Double getBoltStatDoubleValueFromMap(Map<String, Map<GlobalStreamId, Double>> map, String statName) { Double statValue = 0.0; Map<GlobalStreamId, Double> intermediateMap = map.get(statName); Set<GlobalStreamId> key = intermediateMap.keySet(); if(key.size() > 0) { Iterator<GlobalStreamId> itr = key.iterator(); statValue = intermediateMap.get(itr.next()); } return statValue; } /* * Utility method for Bolts */ public static Long getBoltStatLongValueFromMap(Map<String, Map<GlobalStreamId, Long>> map, String statName) { Long statValue = null; Map<GlobalStreamId, Long> intermediateMap = map.get(statName); Set<GlobalStreamId> key = intermediateMap.keySet(); if(key.size() > 0) { Iterator<GlobalStreamId> itr = key.iterator(); statValue = intermediateMap.get(itr.next()); } return statValue; } } |
Hope this post helps you. All the best !!
Thanks a lot!
ReplyDeletehello, thank you for your work
ReplyDeletehow can I include thrift and storm package when I compiling?
If you are creating an Eclipse project, you need to add the required jars to the build path; if you are instead compiling and running the class from the command line, you need to specify the jars with the "-cp" (classpath) option of javac and java.
ReplyDeleteThx
ReplyDeleteAll "org.apache.thrift7" package I can use -cp from /storm/libs/, where can I find the class under "backtype.storm.generated"? Are you generated these class by yourself from thrift compiler, or it located somewhere in storm?
Hi,thanks for the detailed,all-in-one utility.However,I am getting NullPointer exception at "executor_summary.getStats();"All other fields of executor-summary (such as host,port,config,etc.) are displaying properly,but its just the get_stats thats returning null.
ReplyDeleteCan you provide any pointers?
same here!
DeleteGreat contribution jayanti.
ReplyDeleteThankyou !
ReplyDeleteWe developed a system using Storm cluster.And we are getting Storm UI with results. But results are not satisfiable.Latency time may get increased sometime and sometime decreased.Why it happened?
ReplyDelete