Monday, September 7, 2015

Deploying the Spark-0.7.x Cluster in Standalone Mode


To deploy the Spark cluster in Standalone mode, run the following script, which ships with the Spark setup, on the cluster's Master node:

bin/start-all.sh

If everything is fine, the Spark Master UI should be accessible on port 8083 of the Master node (or on whichever port is specified in "spark-env.sh"; if no port is specified explicitly, the default is 8080).

Run a Spark Application


Simple Spark applications such as Word Count are available in "spark-0.7.x/examples/src/main/java/spark/examples".

Most of them expect the "master" on which to run the application as an argument. This needs to be "local" when running locally on a single system, and "spark://MASTER_IP:PORT" (for example, spark://192.10.0.3:7077) when running in Standalone mode.
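
As a rough illustration of the "master" argument, here is a minimal Java sketch that checks the value before an application uses it. The class MasterArgument and its methods are hypothetical helpers made up for this post, not part of Spark, and the accepted forms are limited to the ones mentioned in this post ("local", "spark://MASTER_IP:PORT" and, for Mesos, "mesos://MASTER_IP:PORT").

```java
import java.util.regex.Pattern;

// Hypothetical helper (not part of Spark): checks the "master" argument
// before it is handed to an example application.
class MasterArgument {
    // Accepts "local", "spark://HOST:PORT" or "mesos://HOST:PORT".
    private static final Pattern VALID = Pattern.compile(
            "local|(spark|mesos)://[^:/\\s]+:\\d+");

    static boolean isValid(String master) {
        return master != null && VALID.matcher(master).matches();
    }

    // Builds the Standalone-mode URL from the Master's IP and port.
    static String standalone(String masterIp, int port) {
        return "spark://" + masterIp + ":" + port;
    }

    public static void main(String[] args) {
        // Fall back to "local" when no argument is given (an assumption of this sketch).
        String master = args.length > 0 ? args[0] : "local";
        if (!isValid(master)) {
            System.err.println("Unrecognised master: " + master);
            System.exit(1);
        }
        System.out.println("Using master " + master);
    }
}
```

A check like this only catches typos early; whether the Master is actually reachable is still something only the cluster can tell you.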

The job submitted should appear on the Spark Master's Web UI as well.

Deploy the Mesos Cluster


If Spark is to be run over a cluster managed by Mesos, a Mesos cluster needs to be deployed first. The steps for this are available in the "Setting up a Mesos-0.9.0 Cluster" post.

To start a Mesos Cluster, execute the command below on the Mesos Master:
/usr/local/sbin/mesos-start-cluster.sh

If all goes well, Mesos's web UI should be visible on port 8080 of the master machine.

Run a Spark Application over Mesos


To run the Spark applications over the Mesos Cluster, specify "mesos://MASTER_IP:PORT" as the value of the "master" argument. A sample of the URL could be "mesos://192.10.0.3:5050".

The job submitted should appear on the Mesos Master's Web UI as well.

Friday, September 4, 2015

Setting up a Mesos-0.9.0 Cluster

Apart from running in Standalone mode, Spark can also run on clusters managed by Apache Mesos. "Apache Mesos is a cluster manager that provides efficient resource isolation and sharing across distributed applications, or frameworks." 

In addition to Spark, Hadoop, MPI and Hypertable also support running on clusters managed by Apache Mesos. 

Listed below are the steps for deploying a Mesos Cluster. These should be run on the node that is to be the Master node in the Mesos Cluster.

1. Mesos 0.9.0-incubating can be downloaded from:
http://archive.apache.org/dist/incubator/mesos/mesos-0.9.0-incubating/                                                                                                   

2. Extract Mesos setup
tar -xvzf  mesos-0.9.0-incubating.tar.gz                                                                                                                                                                                                    

3. Change the current working directory to the extracted Mesos setup for its compilation.
cd mesos-0.9.0                                                                                                                                                                                                                                                         

The JAVA_HOME to be used needs to be specified while configuring Mesos. This can be done by passing the command line option "--with-java-home" to the configure command as shown below:
./configure --with-java-home=/usr/lib/jvm/jre-1.6.0-openjdk.x86_64                                                                                                  

After configuring, run the following two commands:
make 
sudo make install                                                                                                                                                                                        

The following files and directories should have been created, if the above steps have been executed successfully:
/usr/local/lib/libmesos.so 
/usr/local/sbin/mesos-daemon.sh  
/usr/local/sbin/mesos-slave             
/usr/local/sbin/mesos-start-masters.sh  
/usr/local/sbin/mesos-stop-cluster.sh 
/usr/local/sbin/mesos-stop-slaves.sh
/usr/local/sbin/mesos-master     
/usr/local/sbin/mesos-start-cluster.sh  
/usr/local/sbin/mesos-start-slaves.sh   
/usr/local/sbin/mesos-stop-masters.sh
/usr/local/var/mesos/conf/mesos.conf.template
/usr/local/var/mesos/deploy                                                                                                                                                                                                    

4. Add the MESOS_NATIVE_LIBRARY variable declaration to "spark-env.sh" in Spark's "conf" directory as shown below:
export MESOS_NATIVE_LIBRARY=/usr/local/lib/libmesos.so                                                                                                                                           

5. Copy the Mesos setup to the same location on all the nodes to be included in the Mesos cluster, or simply set up Mesos on each of them by running:
cd mesos-0.9.0
sudo make install                                                                                                                                                                                                                                           

This completes the process of setting up a Mesos Cluster.

Configure Mesos for deployment


1. On the Mesos Cluster's master node, edit the file "/usr/local/var/mesos/deploy/masters" to list the IP of the Master node and the file "/usr/local/var/mesos/deploy/slaves" to list the IPs of the slaves.

2. On all nodes of the Mesos Cluster, edit "/usr/local/var/mesos/conf/mesos.conf" and add the line master=HOST:5050, where HOST is the IP of the Mesos Cluster's master node.

This is the end of the Configuration Phase.

Good luck.

Setting up Spark-0.7.x in Standalone Mode


A Spark Cluster in Standalone Mode comprises one Master and multiple Spark Worker processes. Standalone mode can be used either on a single local machine or on a cluster. This mode does not require any external resource manager such as Mesos.



To deploy a Spark Cluster in Standalone mode, the following steps need to be executed on any one of the nodes.

1. Download the spark-0.7.x setup from: 
http://spark.apache.org/downloads.html

2. Extract the Spark setup
tar -xzvf spark-0.7.x-sources.tgz

3. Spark requires Scala's bin directory to be present in the PATH variable of the Linux machine. Scala 2.9.3 for Linux can be downloaded from:
http://www.scala-lang.org/downloads

4. Extract the Scala setup  
tar -xzvf scala-2.9.3.tgz

5. Export the Scala home by appending the following line into "~/.bashrc" (for CentOS) or "/etc/environment" (for Ubuntu)
export SCALA_HOME=/location_of_extracted_scala_setup/scala-2.9.3

6. Spark can be compiled using "sbt" or built using Maven. This post follows the former method because of its simplicity of execution. To compile, change directory to the extracted Spark setup and execute the following command:
sbt/sbt package

7. Create a file (if not already present) called "spark-env.sh" in Spark’s "conf" directory, by copying "conf/spark-env.sh.template", and add the SCALA_HOME variable declaration to it as described below:
export SCALA_HOME=<path to Scala directory>

The Web UI ports for the Spark Master and Worker can also be optionally specified by appending the following to "spark-env.sh":
export SPARK_MASTER_WEBUI_PORT=8083
export SPARK_WORKER_WEBUI_PORT=8084

8. To specify the nodes which would behave as the Workers, the IPs of the nodes are to be mentioned in "conf/slaves". For a cluster containing two worker nodes with IPs 192.10.0.1 and 192.10.0.2, "conf/slaves" would contain:
192.10.0.1
192.10.0.2

This completes the setup process on one node. 

For setting up Spark on the other nodes of the cluster, the Spark and Scala setups should be copied to the same locations on the rest of the nodes of the cluster.

Lastly, edit the /etc/hosts file on all the nodes to add the "IP HostName" entries of all the other nodes in the cluster.

Hope that helps !!

Spark Overview

Spark is a cluster computing framework i.e. a framework which uses multiple workstations, multiple storage devices, and redundant interconnections, to form an abstract single highly available system. 

Spark offers the following features:

- open-source under BSD licence 
- in-memory processing
- multi-language APIs in Scala, Java and Python
- rich array of parallel operators
- runnable on Apache Mesos, YARN, Amazon EC2 or in standalone mode
- best suited for highly iterative jobs
- efficient for interactive data mining jobs

Need of Spark


Spark is a result of the rapid ongoing developments in the Big-Data world. "Big-Data" is a term used to describe data characterised by three V's: Volume, Variety and Velocity. Storing and processing Big-Data called for specialized frameworks. Hadoop is the predominant established software framework, composed of a file system called the Hadoop Distributed File System (HDFS) and Map-Reduce. Spark is a strong contender to Map-Reduce: thanks to its in-memory processing capability, Spark offers lightning-fast results compared to Map-Reduce.

When to use Spark?


Apart from common data processing applications, Spark should be used specifically for applications where in-memory operations account for a major share of the processing. The following are the types of applications in which Spark specializes:

1. Iterative algorithms
2. Interactive data mining

Who is using Spark?


UC Berkeley AMPLab is the developer of Spark. Apart from Berkeley, which runs large-scale applications such as spam filtering and traffic prediction, 14 other companies, including Conviva and Quantifind, have contributed to Spark.

Saturday, July 25, 2015

Mahout’s Naïve Bayes: Test Phase

This post continues from my previous post, where Mahout's Naive Bayes "trainnb" command was explained. This one describes the internal execution steps of the "testnb" command, which is used to test the model built using "trainnb".

Command Line Options for "testnb"


Generic Options

 -archives <paths>: comma separated archives to be unarchived on the compute machines.                                                            
 -conf <configuration file>:  specify an application configuration file
 -D <property=value>: use value for given property
 -files <paths>: comma separated files to be copied to the map reduce cluster
 -fs <local|namenode:port>: specify a namenode
 -jt <local|jobtracker:port>: specify a job tracker
 -libjars <paths>: comma separated jar files to include in the classpath.
 -tokenCacheFile <tokensFile>: name of the file with the tokens

Job-Specific Options
                                                       
  --input (-i) input: Path to job input directory.                                                                                                                                                    
  --output (-o) output: The directory pathname for output.        
  --testComplementary (-c): test complementary?
  --labelIndex (-li) labelIndex: The path to the location of the label index      
  --overwrite (-ow): If present, overwrite the output directory before running job                              
  --help (-h): Print out help                            
  --tempDir tempDir: Intermediate output directory
  --startPhase startPhase: First phase to run
  --endPhase endPhase: Last phase to run
  --runSequential (-seq):  run sequential?
  --model (-m) model: The path to the model built during training              

Flow of execution of the "testnb" command



Hope it helped!


Mahout’s Naïve Bayes: Train Phase

Mahout’s Naïve Bayes Classification algorithm executes in two phases:
  1. Train Phase: Trains a model using pre-processed train data
  2. Test Phase: Classify documents (pre-processed) with the help of the model
This blog post provides a code-level understanding of the training process in the algorithm; the test phase is covered in the next post. The Mahout command "trainnb" is used to train a Naive Bayes model in Mahout.
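
To make the two phases concrete, here is a small textbook-style multinomial Naive Bayes in plain Java. It is only a sketch under simplifying assumptions: the class TinyNaiveBayes, its methods and the toy documents are invented for illustration, the documents are assumed to be already tokenized, and Mahout's own trainer and classifier run over vectors on Map-Reduce and differ in detail. The constructor argument plays the role of a smoothing parameter.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Textbook multinomial Naive Bayes; an illustration, not Mahout's implementation.
class TinyNaiveBayes {
    private final double alpha; // smoothing parameter
    private final Map<String, Map<String, Integer>> termCounts = new HashMap<>();
    private final Map<String, Integer> totalTerms = new HashMap<>();
    private final Map<String, Integer> docCounts = new HashMap<>();
    private final Set<String> vocabulary = new HashSet<>();
    private int totalDocs = 0;

    TinyNaiveBayes(double alpha) {
        this.alpha = alpha;
    }

    // Train phase: accumulate per-label term counts from one tokenized document.
    void train(String label, String... tokens) {
        Map<String, Integer> counts = termCounts.computeIfAbsent(label, k -> new HashMap<>());
        for (String token : tokens) {
            counts.merge(token, 1, Integer::sum);
            totalTerms.merge(label, 1, Integer::sum);
            vocabulary.add(token);
        }
        docCounts.merge(label, 1, Integer::sum);
        totalDocs++;
    }

    // Test phase: return the label with the highest log-posterior score.
    String classify(String... tokens) {
        String best = null;
        double bestScore = Double.NEGATIVE_INFINITY;
        for (String label : termCounts.keySet()) {
            // log prior of the label
            double score = Math.log(docCounts.get(label) / (double) totalDocs);
            double denominator = totalTerms.getOrDefault(label, 0) + alpha * vocabulary.size();
            for (String token : tokens) {
                int count = termCounts.get(label).getOrDefault(token, 0);
                // smoothed log likelihood of the token given the label
                score += Math.log((count + alpha) / denominator);
            }
            if (score > bestScore) {
                bestScore = score;
                best = label;
            }
        }
        return best;
    }

    // Toy data set made up for this sketch.
    static String demo() {
        TinyNaiveBayes nb = new TinyNaiveBayes(1.0);
        nb.train("sports", "match", "goal", "team");
        nb.train("sports", "team", "score");
        nb.train("tech", "code", "bug", "compile");
        nb.train("tech", "code", "release");
        return nb.classify("team", "goal");
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints "sports"
    }
}
```

The train() calls correspond to the Train Phase and classify() to the Test Phase; in Mahout the trained model is written out and later read back by the test command rather than being kept in memory like this.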

"trainnb" command in Mahout



For reference, above is the structure of the train data directory specified as an input (similar to the one used in data pre-processing).

Command Line options for “trainnb”


Generic Options


 -archives <paths>: comma separated archives to be unarchived on the compute machines.                
 -conf <configuration file>:  specify an application configuration file
 -D <property=value>: use value for given property
 -files <paths>: comma separated files to be copied to the map reduce cluster
 -fs <local|namenode:port>: specify a namenode
 -jt <local|jobtracker:port>: specify a job tracker
 -libjars <paths>: comma separated jar files to include in the classpath.
 -tokenCacheFile <tokensFile>: name of the file with the tokens

Job-Specific Options
                                                          

  --input (-i) input: Path to job input directory.                 
  --output (-o) output: The directory pathname for output.                                                                     
  --labels (-l) labels: comma-separated list of labels to include in training
  --extractLabels (-el): Extract the labels from the input
  --alphaI (-a) alphaI: smoothing parameter
  --trainComplementary (-c): train complementary?
  --labelIndex (-li) labelIndex: The path to store the label index in
  --overwrite (-ow): If present, overwrite the output directory before running job
  --help (-h): Print out help
  --tempDir tempDir: Intermediate output directory
  --startPhase startPhase: First phase to run
  --endPhase endPhase: Last phase to run

Flow of execution of the "trainnb" command



Hope it helped!

Query Storm Data Streams using Esper

As you might already be aware and as documented on the web, “Esper is a component for complex event processing (CEP) which enables rapid development of applications that process large volumes of incoming messages or events, regardless of whether incoming messages are historical or real-time in nature.” Since integration of Storm and Esper was visibly very beneficial for complex processing of tuples incoming in the real-time stream in Storm Bolts, a specialized bolt called the “Esper Bolt” is available.

Esper Bolt enables you to run SQL-like queries on your real-time data in Storm. Since Esper Bolt is already available, you do not need to write a bolt yourself; you only need to use it while defining the main Topology class. Below is an illustration of the Topology with a SampleSpout emitting tuples containing 3 fields, namely "deviceId", "deviceTemp" and "time". These field names are listed in the Spout class in a method called declareOutputFields() as below:

@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("deviceId", "deviceTemp", "time"));
}                                                                                                                                                                                                                                           

This method is one of the methods to be overridden in both spouts and bolts, which declares names of all the fields that would be emitted per tuple.

Moving forward with the Topology definition, below is the Esper Bolt declaring a statement (query) that runs over every tuple in the incoming stream and emits "deviceId" (id of the device) and "deviceTemp" (temperature of the device) if the value of the incoming tuple field "deviceTemp" lies between 2000.00 and 3000.00:

{
TopologyBuilder builder = new TopologyBuilder();

// add the spout
builder.setSpout("deviceDataAbsorber", new SampleSpout(), 1);

        // define the Esper bolt
EsperBolt deviceTrackerBolt = new EsperBolt.Builder()
.inputs().aliasComponent("deviceDataAbsorber").withFields("deviceId", "deviceTemp", "time").ofTypes(String.class, Float.class, String.class).toEventType("Evaluate")
.outputs().onDefaultStream().emit("deviceId", "deviceTemp")
.statements().add("select evaluate.deviceId as deviceId, evaluate.deviceTemp as deviceTemp from Evaluate.win:time_batch(1 sec) as evaluate where evaluate.deviceTemp <= 3000.00f and evaluate.deviceTemp >= 2000.00f")
.build();
}

The topology can have any number of Esper Bolts or user-defined bolts feeding upon the output of the Esper Bolt. You can define similar query statements depending upon your problem statement.

Hope it helped!

Writing your first Storm Topology

This blog contains multiple posts on Storm, its installation, shell scripts for installation of a Storm cluster and integration of Storm with HBase and RDBMS. But if you're a newbie to Storm this one's for you. Here you can find how to write a simple topology in Storm just like the "Hello World" in Java. So let's get rolling!

Logical Components of a Storm Topology 


The task submitted to a Storm cluster is called a Topology. It is a workflow of linked individual and dependent processing units of work that compositely define the task and the flow in which it is to be executed. These “individual and dependent processing units” can be of the following two types:
  1. Spout: This component of the topology is responsible for fetching the data stream from the source and forwarding the stream data after filtering or pre-processing (if required) to the bolts where the actual processing of this data is supposed to occur. There can be multiple Spouts in a topology each of which might be consuming data from different sources.
  2. Bolt: As mentioned above, the bolts make up the computational part of the topology. A “stream” is typically an unbounded sequence of tuples. The data stream, composed of smaller units of data each of which is called a “tuple”, arrives at the bolt, where the tuples get processed. A topology is not restricted to a workflow comprising just a single spout and a single bolt. It can be a chain of multiple bolts consuming the tuples generated by spouts or by preceding bolts in the workflow as shown below:

A simplistic view of a Storm Topology and its components
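
The flow can also be mimicked in a few lines of plain Java. The sketch below does not use the Storm API at all: MiniSpout, MiniBolt and MiniTopology are made-up stand-ins, and the source is a short finite list rather than an unbounded stream, so treat it purely as a mental model of tuples travelling from a spout through a chain of bolts.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

// Plain-Java stand-ins for the two component types; these are NOT Storm classes.
class MiniTopology {
    // A "spout" pulls the next tuple from its source, or returns null when drained.
    interface MiniSpout {
        String nextTuple();
    }

    // A "bolt" turns one incoming tuple into one outgoing tuple.
    interface MiniBolt {
        String execute(String tuple);
    }

    // Pushes every tuple from the spout through the bolts, in order.
    static List<String> run(MiniSpout spout, List<MiniBolt> bolts) {
        List<String> results = new ArrayList<>();
        for (String tuple = spout.nextTuple(); tuple != null; tuple = spout.nextTuple()) {
            for (MiniBolt bolt : bolts) {
                tuple = bolt.execute(tuple);
            }
            results.add(tuple);
        }
        return results;
    }

    static List<String> demo() {
        Iterator<String> source = Arrays.asList("hello", "storm").iterator();
        MiniSpout spout = () -> source.hasNext() ? source.next() : null;
        MiniBolt upperCase = String::toUpperCase;
        MiniBolt exclaim = t -> t + "!!!";
        // One spout feeding a chain of two bolts.
        return run(spout, Arrays.asList(upperCase, exclaim));
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints [HELLO!!!, STORM!!!]
    }
}
```

In a real topology the components run in parallel across the cluster and Storm moves the tuples between them; the single loop here only shows the order in which a tuple meets each component.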


This post walks you through the basics of writing Storm topologies and their components in Java.

Creating Spouts


Although Bolts are the processing units where the actual computation occurs, Spouts are the connecting links between the Storm Topology and the external sources of data. For every different source of data (for example, Kafka, HBase, MongoDB, Cassandra etc.), we need a specialized Spout which can connect to the respective source and start consuming data so that it can emit a stream of tuples.
The bolts are simpler to implement, since they contain the core processing logic very much as we would have in a simple Java program, and are also abstracted from the source of the data.

Any spout in Java is a class implementing the IRichSpout interface. Out of the various methods to be overridden, the following two are of prime focus for us:

public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
// Called only once, when the spout is initialized.
// Initialization stuff goes here: initializing variables, creating log files, and creating connections to remote servers if required, for example to a Kafka Broker, HBase Master etc.
}

public void nextTuple() {
// Called repeatedly until the topology is killed or stopped.
// With every call to this method, a tuple is emitted using a method called "emit" defined in the SpoutOutputCollector class. So, it includes code that defines what value of tuple to emit.
}

To clarify more, here is an example of a simple spout that does not connect to any external source but keeps emitting strings as tuples chosen randomly from a list.

// _collector and _rand are instance fields of the spout class.
@Override
public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
    _collector = collector;
    _rand = new Random();
}

@Override
public void nextTuple() {
    // Emit one tuple per second.
    Utils.sleep(1000);
    String[] words = new String[] {"hello", "india", "how", "are", "you"};
    Integer[] numbers = new Integer[] {11, 22, 33, 44, 55};

    // Pick a random position; the word and the number at that position form the tuple.
    int index = _rand.nextInt(words.length);
    _collector.emit(new Values(words[index], numbers[index]));
}

If you need to connect to some external source to obtain your stream data, you would need to define the code for connecting to it in the open() method and then keep fetching the data from this source in the nextTuple() method to emit it further.

Creating Bolts


A bolt is a Java class implementing the “IBasicBolt” interface. Just as we focused on two of the overridden methods in the case of a Spout, there are two methods to focus on in the case of a Bolt as well, which are as follows:

public void prepare(Map stormConf, TopologyContext context) {
// Similar to the open() method of a Spout.
// Called only once, when the bolt is initialized, and contains all the initialization stuff.
}

public void execute(Tuple input, BasicOutputCollector collector) {
// Called repeatedly until the topology is killed or stopped.
// Every time a tuple is received this method is called, and the tuple gets processed as per the definition of this method.
}

Continuing with the sample spout we saw above, which emits strings from a given list, here we define a bolt that appends “!!!” to the string received as a part of the tuple. Here’s the Java code for the two methods:

public void prepare(Map stormConf, TopologyContext context) {
    // create an output log file where the output results will be logged
    try {
        // "file" and "outputFile" have already been declared as class variables;
        // "logFileLocation" is assumed to be a class variable as well.
        String logFileName = logFileLocation;
        file = new FileWriter(logFileName);
        outputFile = new PrintWriter(file);
        outputFile.println("In the prepare() method of bolt");
    } catch (IOException e) {
        System.out.println("An exception has occurred");
        e.printStackTrace();
    }
}

public void execute(Tuple input, BasicOutputCollector collector) {
    // get the string that needs to be processed from the tuple
    String inputMsg = input.getString(0);
    inputMsg = inputMsg + "!!!";
    outputFile.println("Received the message:" + inputMsg);
    outputFile.flush();
    // emit the processed string as a new tuple
    collector.emit(new Values(inputMsg));
}

I am sure you have already worked out that the simple string processing can be replaced by any complex computation required by your problem statement.

Creating Topologies


After you are done building the blocks of your topology, you also need to link them to form a workflow that defines the sequence in which they get executed. In other words, you need to create a topology.

What we need here is a Java class that creates an instance of each of the spout and bolt classes and links them together using a TopologyBuilder object. Have a look at the code snippet below illustrating how to create a topology with the above spout and bolt as a part of it:

public static void main(String[] args) {

// Number of workers to be used for the topology
int numberOfWorkers = 2;

// Number of executors to be used for the spout
int numberOfExecutorsSpout = 1;

// Number of executors to be used for the bolt
int numberOfExecutorsBolt = 1;

// IP of the Storm cluster node on which Nimbus runs
String nimbusHost = "your_nimbus_host_ip";

TopologyBuilder builder = new TopologyBuilder();
Config conf = new Config();

//set the spout for the topology
builder.setSpout("spout",new TestSpout(false), numberOfExecutorsSpout);

//set the bolt for the topology
builder.setBolt("bolt",new TestBolt(), numberOfExecutorsBolt).shuffleGrouping("spout");

// job configuration for the remote Storm cluster
conf.setNumWorkers(numberOfWorkers);
conf.put(Config.NIMBUS_HOST, nimbusHost);
conf.put(Config.NIMBUS_THRIFT_PORT, 6627L);

// submit the topology to the remote Storm cluster
try {
StormSubmitter.submitTopology("test_topology", conf, builder.createTopology());
} catch (AlreadyAliveException e) {
System.out.println("Topology with the Same name is already running on the cluster.");                                                                                          
e.printStackTrace();
} catch (InvalidTopologyException e) {
System.out.println("Topology seems to be invalid.");
e.printStackTrace();
}
}

The above code would launch a topology with name “test_topology” on the Storm cluster.

Hope it helped. Cheers !

Friday, July 24, 2015

Understanding Data Pre-processing in Mahout–Part II

In continuation of my previous post, which described the first of the two commonly used commands for data pre-processing in Mahout, this post covers the second one, i.e. “seq2sparse”.

The command expects sequence files as an input, which have been formed using the “seqdirectory” command. During its processing, this command creates a couple of sub-directories in the output directory, like tokenized-documents, tf-vectors etc. The flow described below explains the execution of the command on the basis of the sequence of formation of these sub-directories. The input directory would be assumed to contain two sub-directories each of which has two files:


                                                           

Command options


Usage:                                                   
                       
 [--minSupport <minSupport> --analyzerName <analyzerName> --chunkSize        
<chunkSize> --output <output> --input <input> --minDF <minDF> --maxDFSigma    
<maxDFSigma> --maxDFPercent <maxDFPercent> --weight <weight> --norm <norm>    
--minLLR <minLLR> --numReducers <numReducers> --maxNGramSize <ngramSize>      
--overwrite --help --sequentialAccessVector --namedVector --logNormalize]      

Options:                                                                        

--minSupport (-s) minSupport : (Optional) Minimum Support. Default Value: 2                              
--analyzerName (-a)  analyzerName: The class name of the analyzer          
--chunkSize (-chunk) chunkSize: The chunkSize in MegaBytes. 100-10000 MB
--output (-o) output: The directory pathname for output.      
--input (-i) input:  Path to job input directory.            
--minDF (-md) minDF: The minimum document frequency.  Default is 1                                    
--maxDFSigma (-xs) maxDFSigma: What portion of the tf (tf-idf) vectors to be used, expressed in times the standard deviation (sigma) of the document frequencies of these vectors. Can be used to remove really high frequency terms. Expressed as a double value. Good value to be specified is 3.0. In case the value is less than 0 no vectors will be filtered out. Default is -1.0. Overrides maxDFPercent
--maxDFPercent (-x) maxDFPercent: The max percentage of docs for the DF. Can be used to remove really high frequency terms. Expressed as an integer between 0 and 100. Default is 99.  If  maxDFSigma is also set, it will override this value.
--weight (-wt) weight:  The kind of weight to use. Currently TF or TFIDF
--norm (-n) norm: The norm to use, expressed as either a float or "INF" if you want to use the  Infinite norm.  Must be greater or equal to 0.  The default is not to normalize  
--minLLR (-ml) minLLR: (Optional)The minimum Log Likelihood Ratio(Float)  Default is 1.0
--numReducers (-nr) numReducers: (Optional) Number of reduce tasks. Default Value: 1                
--maxNGramSize (-ng) ngramSize: (Optional) The maximum size of ngrams to create (2 = bigrams, 3 = trigrams, etc) Default Value: 1
--overwrite (-ow): If set, overwrite the output directory  
--help (-h): Print out help
--sequentialAccessVector (-seq): (Optional) Whether output vectors should be SequentialAccessVectors. If set true else false
--namedVector (-nv): (Optional) Whether output vectors should be NamedVectors. If set true else false  
--logNormalize (-lnorm): (Optional) Whether output vectors should be logNormalize. If set true else false  
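
To get a feel for what TFIDF weighting means for the --weight option, here is a minimal plain-Java sketch of the textbook formula, weight = tf * ln(N / df), where tf is the count of the term in the document, N is the number of documents and df is the number of documents containing the term. The class TfIdfSketch and the two toy documents are hypothetical, and Mahout's own weighting formula may differ in detail, so the numbers it produces are not expected to match Mahout's output.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Textbook tf-idf weighting; Mahout's own weighting formula may differ in detail.
class TfIdfSketch {
    // Returns one term -> weight map per document, using tf * ln(N / df).
    static List<Map<String, Double>> weigh(List<List<String>> docs) {
        // Document frequency: in how many documents does each term occur?
        Map<String, Integer> docFreq = new HashMap<>();
        for (List<String> doc : docs) {
            for (String term : new HashSet<>(doc)) {
                docFreq.merge(term, 1, Integer::sum);
            }
        }
        List<Map<String, Double>> vectors = new ArrayList<>();
        for (List<String> doc : docs) {
            Map<String, Double> vector = new TreeMap<>();
            for (String term : doc) {
                vector.merge(term, 1.0, Double::sum); // raw term frequency
            }
            for (Map.Entry<String, Double> e : vector.entrySet()) {
                double idf = Math.log((double) docs.size() / docFreq.get(e.getKey()));
                e.setValue(e.getValue() * idf);
            }
            vectors.add(vector);
        }
        return vectors;
    }

    public static void main(String[] args) {
        List<List<String>> docs = Arrays.asList(
                Arrays.asList("spark", "is", "fast"),
                Arrays.asList("storm", "is", "fast", "fast"));
        for (Map<String, Double> vector : weigh(docs)) {
            System.out.println(vector);
        }
    }
}
```

Terms that occur in every document ("is" and "fast" here) end up with weight 0 under this formula, which is the intuition behind options such as maxDFPercent that drop very frequent terms.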

Code-level Explanation


Code-level understanding of "seq2sparse" command
Hope this helped!

Understanding Data Pre-processing in Mahout – Part I

The two most common commands used for pre-processing of train or test data when running Mahout algorithms are:
  • seqdirectory: Turns raw text in a directory into Mahout sequence files.
  • seq2sparse: Creates tfidf weighted vector from the sequence files created in “seqdirectory”.
This blog post describes the first command “seqdirectory” to the code level and the next blog post shall focus on the second one.

Let’s start then!

Command Options


Firstly have a look at the Command-line options that can be used with “seqdirectory”.

 -archives <paths>: comma separated archives to be unarchived on the compute machines.
 -conf <configuration file>:  specify an application configuration file
 -D <property=value>: use value for given property
 -files <paths>: comma separated files to be copied to the map reduce cluster
 -fs <local|namenode:port>: specify a namenode
 -jt <local|jobtracker:port>: specify a job tracker
 -libjars <paths>: comma separated jar files to include in the classpath.
 -tokenCacheFile <tokensFile>: name of the file with the tokens

Job-specific Options


  --input (-i) input: Path to job input directory.
  --output (-o) output:  The directory pathname for output.                      
  --overwrite (-ow): If present, overwrite the output directory before running job                  
  --chunkSize (-chunk) chunkSize: The chunkSize in MegaBytes. Defaults to 64              
  --fileFilterClass (-filter) fileFilterClass: The name of the class to use for file parsing.
  --keyPrefix (-prefix) keyPrefix: The prefix to be prepended to the key                      
  --charset (-c) charset: The name of the character encoding of the input files.
  --tempDir tempDir: Intermediate output directory
  --startPhase startPhase: First phase to run          
  --endPhase endPhase: Last phase to run          

Code-level Explanation


The following image segregates the internal execution of the command into steps, and then further breaks down each step into tasks to be able to give a clear picture of what happens behind the scenes.

Code-level understanding of "seqdirectory" command


The next post describes the "seq2sparse" command in a similar fashion. Hope this helped.

Thursday, July 16, 2015

Installing Hadoop-1.x.x in Pseudo-Distributed Mode

Disclaimer: The installation steps shared in this blog post are typically for the hadoop-1.x.x series. If you are looking for hadoop-2.x.x series installation steps i.e. with YARN, this post isn’t the right place.

Hadoop installation can be done in the following three modes. This post elaborates the Pseudo-Distributed Mode.
  • Standalone Mode
In this mode Hadoop is configured to run in a non-distributed mode, as a single Java process. This is useful for debugging. This is the default mode for Hadoop.
  • Pseudo-Distributed Mode
Hadoop can also be run on a single-node in a pseudo-distributed mode where each Hadoop daemon runs in a separate Java process. Such a mode is called Pseudo-Distributed mode.
  • Fully-Distributed Mode
In this mode we install, configure and manage non-trivial Hadoop clusters ranging from a few nodes to extremely large clusters with thousands of nodes.

Supported Platforms


GNU/Linux is supported as a development and production platform.
Win32 is supported as a development platform. Distributed operation has not been well tested on Win32, so it is not supported as a production platform.

Required Software


Required software for Linux and Windows includes:
  • Java™ 1.6.x, preferably from Sun, must be installed.
  • ssh must be installed and sshd must be running to use the Hadoop scripts that manage remote Hadoop daemons.
  • An additional requirement for Windows is Cygwin. It is required for shell support in addition to the required software above.

Installing Software

If your cluster doesn't have the requisite software you will need to install it.

          For example on Ubuntu Linux:


$ sudo apt-get install ssh
$ sudo apt-get install rsync                      

On Windows, if you did not install the required software when you installed Cygwin, start the Cygwin installer and select the following package:
          openssh (in the Net category)
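
Before moving on, it can help to confirm that the required tools are actually on the PATH. The helper below is a minimal sketch, not part of Hadoop; the function name missing_commands is made up for illustration.

```shell
# Print every command from the argument list that is NOT found on PATH.
# Prints nothing when all of them are available.
# (missing_commands is a hypothetical helper name, not a Hadoop tool.)
missing_commands() {
    for cmd in "$@"; do
        if ! command -v "$cmd" >/dev/null 2>&1; then
            echo "$cmd"
        fi
    done
}

# Example: check the tools this post relies on.
#   missing_commands java ssh sshd rsync
```

An empty result means everything was found. Note that sshd is often installed outside a regular user's PATH (for example under /usr/sbin), so a "missing" sshd is worth double-checking by hand.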

Download Hadoop


Obtain a Hadoop-1.x.x stable release from http://hadoop.apache.org/releases.html
  • Unpack the downloaded Hadoop distribution. In the distribution, edit the file conf/hadoop-env.sh to define at least JAVA_HOME to be the root of your Java installation.
  • Try the following command:

$ bin/hadoop                                                      

        This will display the usage documentation for the hadoop script.
  • Now you are ready to start your Hadoop cluster in one of the three supported modes:
  1. Local (Standalone) Mode
  2. Pseudo-Distributed Mode
  3. Fully-Distributed Mode
For the Pseudo-Distributed Mode we need to configure 3 files namely: 


  • conf/core-site.xml
<configuration>
  <property>
    <name>fs.default.name</name>
    <value>hdfs://localhost:9000</value>
  </property>
</configuration>
  • conf/hdfs-site.xml
<configuration>
  <property>
    <name>dfs.replication</name>
    <value>1</value>
  </property>
</configuration>
  • conf/mapred-site.xml
<configuration>
  <property>
    <name>mapred.job.tracker</name>
    <value>localhost:9001</value>
  </property>
</configuration>
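
If you would rather script this step than edit the three files by hand, something along these lines should work. It is only a sketch: the function name write_pseudo_conf is invented here, and it overwrites any existing files in the target directory, so back them up first if they contain other settings.

```shell
# Write the three pseudo-distributed config files into the given conf directory.
# WARNING: overwrites existing core-site.xml, hdfs-site.xml and mapred-site.xml.
# (write_pseudo_conf is a hypothetical helper name.)
write_pseudo_conf() {
    conf_dir="$1"
    mkdir -p "$conf_dir" || return 1

    # NameNode address
    cat > "$conf_dir/core-site.xml" <<'EOF'
<configuration>
  <property>
    <name>fs.default.name</name>
    <value>hdfs://localhost:9000</value>
  </property>
</configuration>
EOF

    # Single node, so keep only one replica of each block
    cat > "$conf_dir/hdfs-site.xml" <<'EOF'
<configuration>
  <property>
    <name>dfs.replication</name>
    <value>1</value>
  </property>
</configuration>
EOF

    # JobTracker address
    cat > "$conf_dir/mapred-site.xml" <<'EOF'
<configuration>
  <property>
    <name>mapred.job.tracker</name>
    <value>localhost:9001</value>
  </property>
</configuration>
EOF
}

# Example, run from the root of the unpacked Hadoop distribution:
#   write_pseudo_conf conf
```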


Setup Passphraseless SSH


          We need to be able to ssh to localhost without a passphrase:
             $ ssh localhost
          If you cannot ssh to localhost without a passphrase, execute the following commands:
$ ssh-keygen -t dsa -P '' -f ~/.ssh/id_dsa
(This command generates a key pair with an empty passphrase: the private key is stored in ~/.ssh/id_dsa and the public key in ~/.ssh/id_dsa.pub)
$ cat ~/.ssh/id_dsa.pub >> ~/.ssh/authorized_keys
(This command appends the public key to the system's set of authorized keys)
This completes the passphraseless ssh setup.
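
Running the cat command more than once appends the same key again each time. A small guard avoids that. The sketch below is one possible way to do it; authorize_key is a made-up name, and the chmod is there because sshd commonly refuses an authorized_keys file whose permissions are too open.

```shell
# Append a public key to an authorized_keys file only if it is not already there.
# (authorize_key is a hypothetical helper name.)
authorize_key() {
    pub_file="$1"
    auth_file="$2"
    [ -r "$pub_file" ] || { echo "cannot read $pub_file" >&2; return 1; }
    touch "$auth_file" || return 1
    # -F: fixed strings, -x: match whole lines, -q: quiet, -f: read patterns from file
    if ! grep -qxF -f "$pub_file" "$auth_file"; then
        cat "$pub_file" >> "$auth_file"
    fi
    # Restrict the file to its owner
    chmod 600 "$auth_file"
}

# Example:
#   authorize_key ~/.ssh/id_dsa.pub ~/.ssh/authorized_keys
```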


Start and Stop the Cluster

  • Format a new distributed-file system:
$ bin/hadoop namenode -format
  • Start the hadoop daemons:
$ bin/start-all.sh
  • Access the web UIs:
NameNode UI Port - 50070
JobTracker UI Port - 50030
  • Stop the hadoop daemons:
$ bin/stop-all.sh
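
Besides the web UIs, the JDK's jps tool lists the running Java processes, so it can confirm that all five Hadoop 1.x daemons came up after start-all.sh. The filter below is a sketch; missing_daemons is an invented name, and it only inspects whatever jps output is piped into it.

```shell
# Read `jps` output on stdin and print the Hadoop 1.x daemons that are absent.
# (missing_daemons is a hypothetical helper name.)
missing_daemons() {
    jps_output="$(cat)"
    for daemon in NameNode DataNode SecondaryNameNode JobTracker TaskTracker; do
        # jps prints "<pid> <class name>"; compare the second field exactly so
        # that SecondaryNameNode is not mistaken for NameNode.
        if ! printf '%s\n' "$jps_output" |
            awk -v d="$daemon" '$2 == d { found = 1 } END { exit !found }'; then
            echo "$daemon"
        fi
    done
}

# Example:
#   jps | missing_daemons
```

No output means all five daemons are running. If a name is printed, the log files of that daemon are the first place to look.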


Friday, June 26, 2015

Start-up script for an installed Apache Storm Cluster

If you have installed a Storm cluster using the shell scripts from my previous posts, or in any other way, this script will save you from manually visiting each node and starting the appropriate service (nimbus/supervisor/ui) there. All you have to do is pick any machine that can reach the cluster over ssh and run the script. The script will ask for the required information and bring your cluster up. It should work equally well on both Ubuntu and CentOS.


#!/bin/bash

# Directory Name
echo "Enter the path to the storm setup on the machines (For example, /opt/stormSetup/storm-0.9.0-wip4):"
read -e stormDir

# Read usernames and ips of all the storm cluster nodes
echo "Enter the IP of nimbus machine :"
read -e stormNimbus;
clusterMachineIPs[0]=$stormNimbus

echo "Enter the username of nimbus :"
read -e usernameNimbus
clusterMachineUsernames[0]=$usernameNimbus

# Read supervisor
echo "Enter the number of supervisor machines";
read -e n;
for ((  i = 1 ;  i <= n;  i++  ))
do
echo "Enter the IP of storm supervisor machine $i:"
read -e stormSupervisor;
clusterMachineIPs[i]=$stormSupervisor
echo "Enter the username of machine ${clusterMachineIPs[i]}"
read -e username
clusterMachineUsernames[i]=$username
done

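# Start nimbus (note: the ssh password is hard-coded as 'root' in the sshpass calls; change it to match your machines)
sshpass -p root ssh -o StrictHostKeyChecking=no  $usernameNimbus@$stormNimbus $stormDir/bin/storm nimbus&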

# Start the supervisor nodes
for ((  i = 1 ;  i <= n;  i++  ))
do
sshpass -p root ssh -o StrictHostKeyChecking=no  ${clusterMachineUsernames[i]}@${clusterMachineIPs[i]} $stormDir/bin/storm supervisor&
done

# Start the UI on the nimbus machine
sshpass -p root ssh -o StrictHostKeyChecking=no  $usernameNimbus@$stormNimbus $stormDir/bin/storm ui&


Visit the Storm UI in your browser after a few minutes. Hope it shows up fine. Cheers!
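
The script above passes a fixed password ('root') to sshpass on the command line. If your machines use a different password, one option is to let sshpass read it from the SSHPASS environment variable via its -e flag. The wrapper below is a sketch under that assumption; start_storm_service and the DRY_RUN switch are made up for illustration, and the IP and path in the usage comment are placeholders.

```shell
# Start one Storm service on a remote machine, reading the ssh password from
# the SSHPASS environment variable (sshpass -e) instead of hard-coding it.
# Set DRY_RUN=1 to print the command instead of running it.
# (start_storm_service and DRY_RUN are hypothetical names.)
start_storm_service() {
    user="$1"; host="$2"; storm_dir="$3"; service="$4"
    if [ "${DRY_RUN:-0}" = "1" ]; then
        echo "sshpass -e ssh -o StrictHostKeyChecking=no $user@$host $storm_dir/bin/storm $service"
    else
        sshpass -e ssh -o StrictHostKeyChecking=no "$user@$host" "$storm_dir/bin/storm" "$service" &
    fi
}

# Usage (placeholder values):
#   read -r -s -p "SSH password: " SSHPASS; export SSHPASS
#   start_storm_service root 10.0.0.1 /opt/stormSetup/storm-0.9.0-wip4 nimbus
```

This assumes the same password on every node, just like the original script. Key-based ssh login would remove the need for sshpass altogether.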

Installation Script for Apache Storm on CentOS

CentOS and Ubuntu are two popular, widely used Linux distributions. My last post shares an installation script for a Storm cluster on Ubuntu machines, and this one is for CentOS. The few usage rules are the same as for Ubuntu; I'll repeat them here. Although the script should work for older versions of Apache Storm, it has been tested with storm-0.9.0-wip7. The script prints a descriptive message for each input it expects from you. The installation is done in the '/opt' folder of the machines, in a sub-directory of your choice. Make sure the user installing the cluster has admin rights on the /opt folder. The script also takes care of installing all the required dependencies. To use the script for a version other than the tested one, replace every occurrence of "storm-0.9.0-wip7" in the script with your storm version.


#!/bin/bash
# Local FS Setups location
echo "Enter the location of the setups folder. For example '/home/abc/storminstallation/setups'"
read -e setupsLocation
# Directory Name
echo "Enter the directory name"
read -e realTimePlatformDir
rtpLocalDir="\/opt\/$realTimePlatformDir\/storm\/storm_temp"
rtpLocalDirMake=/opt/$realTimePlatformDir/storm/storm_temp
echo $rtpLocalDir;
echo "Enter the IP of nimbus machine :"
read -e stormNimbus;
array[0]=$stormNimbus


# Read supervisor
echo "Enter the number of supervisor machines";
read -e n;
for ((  i = 1 ;  i <= n;  i++  ))
do
echo "Enter the IP of storm supervisor machine $i:"
read -e stormSupervisor;
array[i]=$stormSupervisor
done

# Read zookeeper
echo "Enter the number of machines in the zookeeper cluster";
read -e m;
for ((  i = 1 ;  i <= m;  i++  ))
do
echo "Enter the IP of zookeeper machine $i:"
read -e zkServer;
zkEntry="- \""$zkServer"\""
zKArray=$zKArray","$zkEntry
done

# Copy the required setups to all the storm machines
for ((  i = 1 ;  i <= n+1;  i++  ))
do
echo "Enter the username of machine ${array[i-1]}"
read -e username
echo "Username:"$username
if [ "$username" == 'root' ]; then
echo 'root';
yamlFilePath="/root/.storm";
else
echo $username;
yamlFilePath="/home/$username/.storm";
fi
echo "the storm.yaml file would be formed at : $yamlFilePath";
echo "Enter the value for JAVA_HOME to be set on the machine ${array[i-1]}"
read -e javaHome;
echo 'JAVA_HOME would be set to :'$javaHome;
ssh -t $username@${array[i-1]} "if [ ! -d /opt/$realTimePlatformDir ]; then
    sudo mkdir /opt/$realTimePlatformDir;
    sudo chown -R $username: /opt/$realTimePlatformDir;
    mkdir /opt/$realTimePlatformDir/storm;
    mkdir $rtpLocalDirMake;
    mkdir $yamlFilePath;
  fi"
     
scp -r -q $setupsLocation/storm-0.9.0-wip7 $username@${array[i-1]}:/opt/$realTimePlatformDir/storm/storm-0.9.0-wip7
     
ssh -t $username@${array[i-1]} "sed -i 's/ZOOKEEPER_IPS/$zKArray/g' /opt/$realTimePlatformDir/storm/storm-0.9.0-wip7/conf/storm.yaml;
sed -i 's/,/\n/g' /opt/$realTimePlatformDir/storm/storm-0.9.0-wip7/conf/storm.yaml;
sed -i 's/NIMBUS_IP/$stormNimbus/g' /opt/$realTimePlatformDir/storm/storm-0.9.0-wip7/conf/storm.yaml;
sed -i 's/LOCAL_DIR/$rtpLocalDir/g' /opt/$realTimePlatformDir/storm/storm-0.9.0-wip7/conf/storm.yaml;
cp /opt/$realTimePlatformDir/storm/storm-0.9.0-wip7/conf/storm.yaml $yamlFilePath;
sudo yum install git;
sudo yum install libuuid-devel;"

ssh -t $username@${array[i-1]} "cd /opt/$realTimePlatformDir/storm;
wget http://download.zeromq.org/zeromq-2.1.7.tar.gz;
tar -xzf zeromq-2.1.7.tar.gz
cd zeromq-2.1.7                    
./configure
make
sudo make install

cd ..
export JAVA_HOME=$javaHome;
echo \$JAVA_HOME;
git clone https://github.com/nathanmarz/jzmq.git
cd jzmq
./autogen.sh
./configure
make
sudo make install"

done
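
The script relies on a template storm.yaml containing the placeholders ZOOKEEPER_IPS, NIMBUS_IP and LOCAL_DIR, and its second sed call turns every comma in that file into a newline. An alternative is to generate the relevant settings directly. The function below is a sketch: storm_yaml is an invented name, the layout of the author's template is not shown in this post, and the three keys used are the standard Storm 0.9.x settings for these values.

```shell
# Print a storm.yaml fragment for a nimbus host, a local dir and ZooKeeper IPs.
# Usage: storm_yaml <nimbus-ip> <local-dir> <zk-ip> [<zk-ip> ...]
# (storm_yaml is a hypothetical helper name.)
storm_yaml() {
    nimbus="$1"; local_dir="$2"
    shift 2
    echo "storm.zookeeper.servers:"
    # One YAML list entry per ZooKeeper machine
    for zk in "$@"; do
        printf '  - "%s"\n' "$zk"
    done
    printf 'nimbus.host: "%s"\n' "$nimbus"
    printf 'storm.local.dir: "%s"\n' "$local_dir"
}

# Example (placeholder IPs):
#   storm_yaml 10.0.0.1 /opt/rtp/storm/storm_temp 10.0.0.5 10.0.0.6 > storm.yaml
```

The generated file could then be copied to each machine with scp, in place of the sed substitutions.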


Hope this helps. My next post shares a small start-up script for the installed Storm cluster.