Monday, November 14, 2011

Storm Installation

New to Storm ? My previous post could help you in finding your feet. In this post, we'll be going the extra mile in an attempt to install Storm. This has got two aspects to it:
    - Setting up Storm locally
    - Setting up a Storm cluster
Let's begin with setting up the storm cluster locally, which hardly is a two step procedure.

Setting up Storm locally

This is kind of mandatory !!!
That's because even if your aim is to get topologies working on a cluster, submitting topologies to that cluster requires a 'storm client', which requires the storm to be setup on your system locally.
Moreover it is always better to dry run topologies on your local system before deploying them as a jar on the cluster. It saves you from the exhaustive debugging on the cluster. So moving forth, we'll be undertaking the following two tasks under this heading:
  1. Setting up Storm for running topologies on the local machine
  2. Setting up the Storm client
As an obvious prerequisite you must be working on Linux with Java 6 installed on it.
So steps for accomplishing the first task :
  • Download a storm release from
    https://github.com/nathanmarz/storm/downloads
  • cd to the unzipped location of the storm setup to test if bin/storm is executable using any of these
    - bin/storm ui
    - bin/storm supervisor
    - bin/storm nimbus
Next to get the ball rolling on running topologies in Storm, you can best start with the 'storm-starter' project using Eclipse. Steps for this are :
  1. Obtain the storm-starter project from the following location :
  2. Add the storm-0.5.*.jar and other required jars present in the storm setup to the build path of your eclipse project.
  3. If you want to start with the simplest thing that could possibly work, the simplest part of this project i.e. the 'WordCountTopology.java' could do the trick.
  4. Since this topology uses the 'SplitSentence' bolt which has been implemented using python, here's a java substitute for the 'SplitSentence' class if your preference is java.

public static class SplitSentence implements IRichBolt {
    OutputCollector _collector;
    public void prepare(Map conf, TopologyContext context,        OutputCollector collector) {
       _collector = collector;
    }
     
    public void execute(Tuple tuple) {
        String sentence = tuple.getString(0);
        for(String word: sentence.split(" ")) {
           _collector.emit(tuple, new Values(word));
     }
    _collector.ack(tuple);
  }

    public void cleanup() {
}

    public void declareOutputFields(OutputFieldsDeclarer declarer) {
     declarer.declare(new Fields("word"));
   }
}

Successfully accomplishing this leaves you have with a checked environment setup for testing and running any storm topology locally.

Setting up the Storm client

Communicating with a remote cluster and submitting topologies to it requires a Storm client on your system.
For this, configure the 'storm.yaml' file located in your storm setup's conf folder by adding the following line to it and place a copy of it at the location '~/.storm/storm.yaml'
 
   nimbus.host: "ip_of_your_remote_cluster's_nimbus"
As an eg :
   nimbus.host: "195.168.78.78”
As an important note also check the permissions of this file so that it is accessible.
Now you should be able to deploy jars on any remote cluster(steps to setup a remote cluster have been listed later in the post) using :
    cd /path_to_your_storm_setup
    bin/storm jar location_of_jar_on_your_system/WordCount.jar     storm.starter.WordCountTopology
and kill running topologies using
    bin/storm kill wordcount

Setting up a Storm Cluster

Time to kick off with setting up a Storm cluster. Here I am assuming a cluster of 3 machines, of which one would be the master node i.e. nimbus and the other two are the worker nodes.

Prerequisites :
  1. Java 6 and Python 2.6
  2. JAVA_HOME should be set, if it is not set in bashrc
These should be installed on all the machines of the cluster.

Installation steps :
  • Setup the Zookeeper Cluster :
Zookeeper is the coordinator for a Storm cluster. The interaction between the nimbus and the worker nodes is done through the Zookeeper. So its compulsary to setup a Zookeeper cluster first. You can follow the instructions from here :
         http://zookeeper.apache.org/doc/r3.3.3/zookeeperAdmin.html
  • Install native dependencies
In the local mode, Storm uses a pure Java messaging system so that you don't need to install native dependencies on your development machine. But in case of a cluster, ZeroMQ and JZMQ are a prerequisite on all the nodes of the cluster including nimbus. 

Download and installation commands for ZeroMQ 2.1.7 :

  • Obtain ZeroMQ using
wget  http://download.zeromq.org/zeromq-2.1.7.tar.gz               
  •  tar -xzf zeromq-2.1.7.tar.gz
  •  cd zeromq-2.1.7
  •  ./configure
  •  make
  •  sudo make install

Download and installation commands for JZMQ :

  •  Obtain JZMQ using 
git clone https://github.com/nathanmarz/jzmq.git                                   
  •  cd jzmq
  •  ./autogen.sh
  •  ./configure
  •  make 
  •  sudo make install


- Copy storm setup to all the machines in the cluster . Assuming the following IP for clarity : nimbus IP : A.B.C.Nimbus supervisor node Ips : A.B.C.Sup1 and A.B.C.Sup2 Edit the conf/storm.yaml file as follows: 

storm.yaml” file for master node/nimbus :
storm.zookeeper.servers:
     - "A.B.C.Sup1"
     - "A.B.C.Sup2"

storm.local.dir: "path_to_any_dir_for_temp_storage"                               
java.library.path: "/usr/local/lib/"

nimbus.host: "127.0.0.1"
nimbus.task.launch.secs: 240
supervisor.worker.start.timeout.secs: 240
supervisor.worker.timeout.secs: 240  

storm.yaml” file for all worker nodes :
storm.zookeeper.servers: 
      - "A.B.C.Sup1"
      - "A.B.C.Sup2" 
storm.local.dir: "path_to_any_dir_for_temp_storage"                               
java.library.path: "/usr/local/lib/"
nimbus.host: "A.B.C.Nimbus"
supervisor.slots.ports:
      - 6700
      - 6701
      - 6702
      - 6703

Note : Also copy this storm.yaml file to “~/.storm/” folder on the respective systems.
This completes the cluster setup and you can now submit topologies from your system to it after creating a jar. For further assistance in this follow :

That's all from my end . . .  Hope it was helpful !!!

42 comments:

  1. I just came across Storm and started playing with it...!

    nimbus.task.launch.secs: 240
    supervisor.worker.start.timeout.secs: 240
    supervisor.worker.timeout.secs: 240

    Can you explain me what does these do when we run storm in cluster..??
    I mean what happens if we not mentions these lines...?

    ReplyDelete
  2. will this setup work equally well on a windows environment

    ReplyDelete
    Replies
    1. Not sure ... I haven't tested it on Windows.

      Delete
    2. Storm works on windows only for "local" mode. It is not possible to run a storm cluster on windows because of its dependent libraries

      Delete
  3. I used storm-0.8.1.
    to run one process, i run into the storm folder, example : ./bin/storm nimbus
    but it has a exception:
    Exception in thread "main" java.lang.NoClassDefFoundError: clojure.core.protocols$seq_reduce
    at java.lang.Class.initializeClass(libgcj.so.10)
    at clojure.core.protocols__init.load(Unknown Source:19)
    at clojure.core.protocols__init.(Unknown Source)
    at java.lang.Class.initializeClass(libgcj.so.10)
    at java.lang.Class.forName(libgcj.so.10)
    at clojure.lang.RT.loadClassForName(RT.java:2056)
    at clojure.lang.RT.load(RT.java:419)
    at clojure.lang.RT.load(RT.java:400)
    at clojure.core$load$fn__4890.invoke(core.clj:5415)
    at clojure.core$load.doInvoke(core.clj:5414)
    at clojure.lang.RestFn.invoke(RestFn.java:408)
    at clojure.core__init.load(Unknown Source:6010)
    at clojure.core__init.(Unknown Source)
    at java.lang.Class.initializeClass(libgcj.so.10)
    at java.lang.Class.forName(libgcj.so.10)
    at clojure.lang.RT.loadClassForName(RT.java:2056)
    at clojure.lang.RT.load(RT.java:419)
    at clojure.lang.RT.load(RT.java:400)
    at clojure.lang.RT.doInit(RT.java:436)
    at clojure.lang.RT.(RT.java:318)
    at java.lang.Class.initializeClass(libgcj.so.10)
    at clojure.lang.Namespace.(Namespace.java:34)
    at clojure.lang.Namespace.findOrCreate(Namespace.java:176)
    at clojure.lang.Var.internPrivate(Var.java:163)
    at backtype.storm.command.config_value.(Unknown Source)
    at java.lang.Class.initializeClass(libgcj.so.10)
    Caused by: java.lang.VerifyError: verification failed at PC 78 in clojure.core.protocols$seq_reduce:invoke((Ljava.lang.Object;Ljava.lang.Object;Ljava.lang.Object;)Ljava.lang.Object;): incompatible type on stack
    at java.lang.Class.initializeClass(libgcj.so.10)
    ...25 more
    What is this problem? How does this problem solve?
    Thanks for your helps.

    ReplyDelete
  4. Haven't come across such error but this somewhere means that the JDK rejects the bytecode produced probably bcoz you are not using Oracle JDK 7 but GJC (GNU's JVM) that Clojure or any non-trivial project does not support.
    Make sure you have installed all the prerequisites without any failure msgs.
    Further please make sure you don't have gcj (for example, uninstall it) and use
    java -version
    Refer to : https://github.com/technomancy/leiningen/issues/676

    ReplyDelete
  5. hi jayati,

    thank ..for this post.
    i am new to storm.. can u send me the link to twitter stream java api example using storm

    ReplyDelete
    Replies
    1. Have a look at this link:

      https://github.com/nathanmarz/storm-starter/blob/master/src/jvm/storm/starter/spout/TwitterSampleSpout.java

      Delete
  6. i am trying to integrate storm and esper for complex event processing..if you have done it can you share your thoughts?

    It would be really great if you can share your ideas regarding the scalablity and performance of Storm. As it is meant for online stream processing but does it have anything out of the box approach which supports sliding and tumbling window concept

    ReplyDelete
  7. I have Zookeeper running on 2181 (ports are opened):

    hduser@hduser-laptop:~/.storm$ netstat -a | grep -e "2181"
    tcp 0 0 *:2181 *:* LISTEN

    But when I tried to kick start nimbus, then I got the below exception in the nimbus log file:

    2013-05-17 19:17:12 ZooKeeper [INFO] Initiating client connection, connectString=localhost:2181 sessionTimeout=20000 watcher=com.netflix.curator.ConnectionState@53e6978d
    2013-05-17 19:17:12 ClientCnxn [INFO] Opening socket connection to server localhost/127.0.0.1:2181
    2013-05-17 19:17:12 ClientCnxn [INFO] Socket connection established to localhost/127.0.0.1:2181, initiating session
    2013-05-17 19:17:13 ClientCnxn [INFO] Session establishment complete on server localhost/127.0.0.1:2181, sessionid = 0x13eb48fd6420000, negotiated timeout = 20000
    2013-05-17 19:17:13 zookeeper [INFO] Zookeeper state update: :connected:none
    2013-05-17 19:17:13 ZooKeeper [INFO] Session: 0x13eb48fd6420000 closed
    2013-05-17 19:17:13 CuratorFrameworkImpl [INFO] Starting
    2013-05-17 19:17:13 ZooKeeper [INFO] Initiating client connection, connectString=localhost:2181/storm sessionTimeout=20000 watcher=com.netflix.curator.ConnectionState@11c0b8a0
    2013-05-17 19:17:13 ClientCnxn [INFO] Opening socket connection to server localhost/127.0.0.1:2181
    2013-05-17 19:17:13 ClientCnxn [INFO] EventThread shut down
    2013-05-17 19:17:13 ClientCnxn [INFO] Socket connection established to localhost/127.0.0.1:2181, initiating session
    2013-05-17 19:17:13 ClientCnxn [INFO] Session establishment complete on server localhost/127.0.0.1:2181, sessionid = 0x13eb48fd6420001, negotiated timeout = 20000
    2013-05-17 19:17:13 nimbus [INFO] Starting Nimbus server...
    2013-05-17 19:22:04 TNonblockingServer [WARN] Got an IOException in internalRead!
    java.io.IOException: Connection reset by peer
    at sun.nio.ch.FileDispatcher.read0(Native Method)
    at sun.nio.ch.SocketDispatcher.read(SocketDispatcher.java:21)
    at sun.nio.ch.IOUtil.readIntoNativeBuffer(IOUtil.java:202)
    at sun.nio.ch.IOUtil.read(IOUtil.java:175)
    at sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:243)
    at org.apache.thrift7.transport.TNonblockingSocket.read(TNonblockingSocket.java:141)
    at org.apache.thrift7.server.TNonblockingServer$FrameBuffer.internalRead(TNonblockingServer.java:669)
    at org.apache.thrift7.server.TNonblockingServer$FrameBuffer.read(TNonblockingServer.java:458)
    at org.apache.thrift7.server.TNonblockingServer$SelectThread.handleRead(TNonblockingServer.java:359)
    at org.apache.thrift7.server.TNonblockingServer$SelectThread.select(TNonblockingServer.java:304)
    at org.apache.thrift7.server.TNonblockingServer$SelectThread.run(TNonblockingServer.java:243)

    I have ensured that even the port 6627 is left unblocked. I have placed the storm.yaml file inside storm/conf folder as well inside .storm folder in my home root folder with all permissions.

    So what do you think is the prob with this ?

    Cheers

    ReplyDelete
  8. I new to twitter storm.,
    i installing same thing on windows and linux ., but it is not working.

    ReplyDelete
    Replies
    1. Hi,

      The steps should be working fine on Linux unless there is some issue with the prerequisites not being satisfied.

      This blog is not for windows, as setting up storm cluster on windows requires some modifications.

      Jayati

      Delete
  9. when i run storm ui it will give me error as address already in use what it means?

    ReplyDelete
    Replies
    1. It means the port on which the UI runs is already occupied on the machine.

      Please make sure there is no other process running on the same port.

      You could use "sudo netstat -nlp | grep " to determine.

      Jayati

      Delete
  10. Exception in thread "main" expected '', but found Scalar
    in "", line 5, column 1:
    nimbus.host:"localhost"
    this error occur when example runs.what it means

    ReplyDelete
  11. is there need to install redis server with storm???

    ReplyDelete
  12. Hi i could not install storm on linux Ubunto 12.10.
    Can someone please help me and detail me the installation procedure?

    ReplyDelete
    Replies
    1. Hi,

      Please make sure that you carefully look at the output of all the commands which are a part of the installation.

      Setting variables like JAVA_HOME etc. matter a lot, due to which many commands like "configure" might fail.

      Jayati

      Delete
  13. Hi I am trying to run storm-starter using eclipse but it will show error main class not found.
    I am also trying to run storm topology example by using storm jar command but i cant get clear idea how to run it.

    ReplyDelete
  14. Hey this is good information for installing storm cluster in local mode. I have also wriiten a blog related to this. to see my blog click on this link.
    link

    ReplyDelete
  15. Hi i have not been able to start nimbus after following the steps u had outlined. It is showing the following error:
    Exception in thread "main" expected '', but found BlockMappingStart
    in "", line 29, column 1:
    storm.zookeeper.port: 2181
    ^

    at org.yaml.snakeyaml.parser.ParserImpl$ParseDocumentStart.produce(ParserImpl.java:231)
    at org.yaml.snakeyaml.parser.ParserImpl.peekEvent(ParserImpl.java:161)
    at org.yaml.snakeyaml.parser.ParserImpl.checkEvent(ParserImpl.java:146)
    at org.yaml.snakeyaml.composer.Composer.getSingleNode(Composer.java:109)
    at org.yaml.snakeyaml.constructor.BaseConstructor.getSingleData(BaseConstructor.java:121)
    at org.yaml.snakeyaml.Yaml.loadFromReader(Yaml.java:480)
    at org.yaml.snakeyaml.Yaml.load(Yaml.java:423)
    at backtype.storm.utils.Utils.findAndReadConfigFile(Utils.java:110)
    at backtype.storm.utils.Utils.readStormConfig(Utils.java:147)
    at backtype.storm.config$read_storm_config.invoke(config.clj:66)
    at backtype.storm.command.config_value$_main.invoke(config_value.clj:7)
    at clojure.lang.AFn.applyToHelper(AFn.java:161)
    at clojure.lang.AFn.applyTo(AFn.java:151)
    at backtype.storm.command.config_value.main(Unknown Source)

    ReplyDelete
    Replies
    1. Can you please make sure your config file is free from any errors like redundant spaces between key and the value or some small mis-spell or something.

      This is because the errror is clearly when loading the yaml file.

      Jayati

      Delete
  16. While following the above steps to install JZMQ I got an error saying classdist_noinst.stamp is not found. Checked and found that the following steps work:

    git clone https://github.com/nathanmarz/jzmq.git
    cd jzmq
    cd src
    touch classdist_noinst.stamp
    javac -d . org/zeromq/*.java
    cd ..

    ./autogen.sh
    ./configure
    sudo make
    sudo make install

    Hope this helps someone

    ReplyDelete
  17. Hi..I setup storm on local cluster but there is difficulty in distributed cluster...Nimbus machine get connected to only one zookeeper not more than one.As per your instruction I set up machines.Can you provide me host file??

    ReplyDelete
  18. Can you plz elaborate a bit .. Nimbus gets connected to only a single zookeeper node means ?

    ReplyDelete
    Replies
    1. i want to create storm cluster on 3 to 4 machine...I already install it on 2 machine..One machine is zookeeper and other is nimbus machine..Nimbus machine get connected to zookeeper machine..And i get cluster summary also...And data distribution is done between these two machnes..Now I want to increase no of zookeeper machine and same nimbus get connected with that zookeper..How it will possible??

      Delete
  19. Ok ... for that you need to install a zookeeper cluster and specify all of their ips in the yaml file like:

    storm.zookeeper.servers:
    - "A.B.C.Sup1"
    - "A.B.C.Sup2"

    In that case all the nodes of your zk cluster will be used.

    ReplyDelete
    Replies
    1. As you described in your blog worker node and their yaml file...That worker node act as zookeeper??That yaml file used for developing zookeeper cluster..?

      Delete
  20. No no ... that is a way to specify to the Storm cluster nodes about the zk cluster nodes they should be using. Prior to that you need to setup the cluster of zookeeper which is independent of the Storm setup. Here's the link that can be used for that :

    http://zookeeper.apache.org/doc/r3.3.3/zookeeperAdmin.html#sc_zkMulitServerSetup

    ReplyDelete
  21. Is worker node similar to zookeeper?Can i connect no. of storm machine to single zookeeper??And how we can connect it?

    ReplyDelete
  22. hi, I'm running Storm on a Windows dev machine, topologies are running fine from eclipse environment, however when I submitted a Topology to Storm using "Storm jar...", it shows up in the storm UI -> Topology summary as 'Active' topology, however it does not run. It shows num workers, num executors etc...all '0'. I submitted the sample that comes with Storm 'ExclamationTopology'.

    Thanks
    AJ

    ReplyDelete
    Replies
    1. Hi Anuj,
      I am trying to setup Storm on my Windows workstation. It would be great if you could point me to a tutorial which explains how to set it up on Windows.
      Any help is much appreciated.
      Thanks,
      Jainam

      Delete
  23. Hey , for learning i have setup a ubuntu machine for kafka and which internally uses zookeeper now at the same time i want to configure storm on that machine . now i want to know in that case do i need to install zookeeper separately for storm?

    ReplyDelete
  24. following method need to be added in class SplitSentence
    @Override
    public Map getComponentConfiguration() {
    return null;
    }

    ReplyDelete
  25. https://storm.apache.org/documentation/Setting-up-a-Storm-cluster.html

    ReplyDelete
  26. Hey Jayanti,Nice bog.Helped alot while configuring cluster. Now I want to Sort Streams of huge data using storm. Can you help me with it if you have any reference documentation or example. Thank you in advance.

    ReplyDelete
  27. Hi Jayanti ,
    while doing make command on jzmq iTs throughing error

    "Making all in src
    make[1]: Entering directory '/home/everestbox/platfrom/jzmq/src'
    make[1]: *** No rule to make target 'classdist_noinst.stamp', needed by 'org/zeromq/ZMQ.class'. Stop.
    make[1]: Leaving directory '/home/everestbox/platfrom/jzmq/src'
    Makefile:399: recipe for target 'all-recursive' failed
    make: *** [all-recursive] Error 1 "

    Any idea ?

    ReplyDelete