Saturday, July 25, 2015

Mahout’s Naïve Bayes: Test Phase

This post is in continuation to my previous post where Mahout Naive Bayes "trainnb" command has been explained. This one would describe the internal execution steps of the "testnb" command, which is used to test the model we build using 'trainnb".

Command Line Options for "testnb"


Generic Options

 -archives <paths>: comma separated archives to be unarchived on the compute machines.                                                            
 -conf <configuration file>:  specify an application configuration file
 -D <property=value>: use value for given property
 -files <paths>: comma separated files to be copied to the map reduce cluster
 -fs<local|namenode:port>: specify a namenode
 -jt<local|jobtracker:port>: specify a job tracker
 -libjars<paths>: comma separated jar files to include in the classpath.
 -tokenCacheFile<tokensFile>:  name of the file with the tokens

Job-Specific Options
                                                       
  --input (-i) input: Path to job input directory.                                                                                                                                                    
  --output (-o) output: The directory pathname for output.        
  --testComplementary (-c): test complementary?
  --labelIndex (-li) labelIndex: The path to the location of the label index      
  --overwrite (-ow): If present, overwrite the output directory before running job                              
  --help (-h): Print out help                            
  --tempDirtempDir: Intermediate output directory              
  --startPhasestartPhase: First phase to run                        
  --endPhaseendPhase: Last phase to run
  --runSequential (-seq):  run sequential?
  --model (-m) model: The path to the model built during training              

Flow of execution of the "testnb" command



Hope it helped!


Mahout’s Naïve Bayes: Train Phase

Mahout’s Naïve Bayes Classification algorithm executes in two phases:
  1. Train Phase: Trains a model using pre-processed train data
  2. Test Phase: Classify documents (pre-processed) with the help of the model
 This blog post provides code-level understanding of the training process in the algorithm. And testing phase is covered in the next blog. The Mahout command "trainnb" is used to train a Naive Bayes model in Mahout.

"trainnb" command in Mahout



For reference, above is the structure of the train data directory specified as an input (similar to the one used in data pre-processing).

Command Line options for “trainnb”


Generic Options


 -archives <paths>: comma separated archives to be unarchived on the compute machines.                
 -conf <configuration file>:  specify an application configuration file
 -D <property=value>: use value for given property
 -files <paths>: comma separated files to be copied to the map reduce cluster
 -fs<local|namenode:port>: specify a namenode
 -jt<local|jobtracker:port>: specify a job tracker
 -libjars<paths>: comma separated jar files to include inthe classpath.
 -tokenCacheFile<tokensFile>:  name of the file with the tokens

Job-Specific Options
                                                          

  --input (-i) input: Path to job input directory.                 
  --output (-o) output: The directory pathname for output.                                                                     
  --labels (-l) labels:comma-separated list of labels to include in training                                     
  --extractLabels (-el):Extract the labels from the input            
  --alphaI (-a) alphaI: smoothing parameter                          
  --trainComplementary (-c):train complementary?                         
  --labelIndex (-li) labelIndex: The path to store the label index in         
  --overwrite (-ow): If present, overwrite the output directory before running job                           
  --help (-h):Print out help                               
  --tempDirtempDir: Intermediate output directory                
  --startPhasestartPhase:First phase to run                           
  --endPhaseendPhase: Last phase to run         

Flow of execution of the "trainnb" command



Hope it helped!

Query Storm Data Streams using Esper

As you might already be aware and as documented on the web, “Esper is a component for complex event processing (CEP) which enables rapid development of applications that process large volumes of incoming messages or events, regardless of whether incoming messages are historical or real-time in nature.” Since integration of Storm and Esper was visibly very beneficial for complex processing of tuples incoming in the real-time stream in Storm Bolts, a specialized bolt called the “Esper Bolt” is available.

Esper Bolt enables you to run SQL like queries on your real-time data in storm. Since Esper Bolt is already available, you would not need to write a bolt as described above, but only use it while defining the main Topology class. Below is an illustration of the Topology with a SampleSpout emitting tuples containing 3 fields, namely "deviceId", "deviceTemp" and "time". These field names are listed in the Spout class in a method called declareOutputFields() as below:

@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("deviceId", "deviceTemp", "time"));
}                                                                                                                                                                                                                                           

This method is one of the methods to be overridden in both spouts and bolts, which declares names of all the fields that would be emitted per tuple.

Moving forward with the Topology definition, below is the Esper Bolt declaring a statement (query) that runs over every tuple in the incoming stream and emits "deviceId" (id of the device) and "deviceTemp” (temperature of the device)  if the value of the incoming tuple field “deviceTemp” lies between 2000.00 and 3000.00

{
TopologyBuilder builder = new TopologyBuilder();

// add the spout
builder.setSpout("deviceDataAbsorber", new SampleSpout(), 1);

        // define the Esper bolt
EsperBolt deviceTrackerBolt = new EsperBolt.Builder()
.inputs().aliasComponent("deviceDataAbsorber").withFields("deviceId", "deviceTemp", "time").ofTypes(String.class, Float.class, String.class).toEventType("Evaluate")
.outputs().onDefaultStream().emit("deviceId", "deviceTemp")
.statements().add("select evaluate.deviceId as deviceId, evaluate.deviceTemp as deviceTemp from Evaluate.win:time_batch(1 sec) as evaluate where evaluate.deviceTemp <= 3000.00f and evaluate.deviceTemp >= 2000.00f")
.build();
}

The topology can have any number of Esper Bolts or user-defined bolts feeding upon the output of the Esper Bolt. You can define similar query statements depending upon your problem statement.

Hope it helped!

Writing your first Storm Topology

This blog contains multiple posts on Storm, its installation, shell scripts for installation of a Storm cluster and integration of Storm with HBase and RDBMS. But if you're a newbie to Storm this one's for you. Here you can find how to write a simple topology in Storm just like the "Hello World" in Java. So let's get rolling!

Logical Components of a Storm Topology 


The task submitted to a Storm cluster is called a Topology. It is a workflow of linked individual and dependent processing units of work that compositely define the task and the flow in which it is to be executed. These “individual and dependent processing units” can be of the following two types:
  1. Spout: This component of the topology is responsible for fetching the data stream from the source and forwarding the stream data after filtering or pre-processing (if required) to the bolts where the actual processing of this data is supposed to occur. There can be multiple Spouts in a topology each of which might be consuming data from different sources.
  2. Bolt: As mentioned above, the bolts comprise of the computational part of the topology. The “streams” are typically an unbounded sequence of tuples. The data stream composing of smaller units of data each of which is called a “tuple”, arrive at the bolt, where they get processed. A topology is not restricted to a workflow composing just a single spout and a single bolt. It can be a chain of multiple bolts consuming the tuples generated by spouts or by preceding bolts in the workflow as shown below:

A simplistic view of a Storm Topology and its components


This post would walk you through basics of writing Storm topologies and their components in Java.

Creating Spouts


Although Bolts are the processing units where the actual computation occurs but Spouts are the connecting links between the Storm Topology and the external sources of data. For every different source (for example, Kafka, HBase, MongoDB, Cassandra etc.) of data, we need to have specialized Spouts which can connect to the respective source and start consuming data to be able to emit a stream of tuples.
The bolts are simpler to implement since they include the core processing logic very much as we would have in a simple java program and are also abstract from the source of the data.

Any spout in Java is a class implementing the IRichSpout interface. Out of the various overridden methods, following two are of prime focus for us:

public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
// Called only once, when the spout is initialized
/ Initialization stuff including variables initialization, creation of log files, creating connection to remote servers if required for example to a Kafka Broker, HBase Master etc.
}

public void nextTuple() {
// called iteratively until the topology is not killed or stopped.
// with every call to this method, a tuple is emitted using a method called “emit” defined in the SpoutOutputCollector class. So, it includes code that defines what value of tuple to emit.
}

To clarify more, here is an example of a simple spout that does not connect to any external source but keeps emitting strings as tuples chosen randomly from a list.

@Override
public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
_collector = collector;
_rand = new Random();
}

@Override
public void nextTuple() {
Utils.sleep(1000);
String[] words = new String[] {"hello", "india", "how", "are", "you"};                                                                                            
Integer[] numbers = new Integer[] {
11, 22, 33, 44, 55
};

if(count == numbers.length -1) {
count = 0;
}
count ++;
int number = numbers[count];
String word = words[count];
_collector.emit(new Values(word, number));
}

If you need to connect to some external source to obtain you stream data, you would need to define the code for connecting to it in the open() method and then keep fetching the data from this source in the nextTuple() method to emit it further.

Creating Bolts


A bolt is a Java class implementing the interface “IBasicBolt”. Just as we have two methods to focus in case of Spout amongst the set of overridden methods, we have two methods in case of Bolt as well which are as follows:

public void prepare(Map stormConf, TopologyContext context) {
// similar to the open() method of a Spout.
// Called only once, when the bolt is initialized and contains all the initialization stuff.
}

public void execute(Tuple input, BasicOutputCollector collector) {
// called iteratively until the topology is not killed or stopped.
// every time a tuple is received this method is called, and the tuple gets processed as per the definition of this method
}

Continuing with the sample spout we saw above which emits strings from a give list, here we would define a bolt that would append an “!!!” to the received string as a part of the tuple. Here’s the java code for the two methods:

public void prepare(Map stormConf, TopologyContext context) {
//create an output log file where the output results would be logged
try {
String logFileName = logFileLocation;
// “file” and “outputFile” have already been declared as class variables.                                                                                                                                                                    
file = new FileWriter(logFileName);
outputFile = new PrintWriter(file);
outputFile.println("In the prepare() method of bolt");
} catch (IOException e) {
System.out.println(“An exception has occurred”);
e.printStackTrace();
}
}

public void execute(Tuple input, BasicOutputCollector collector) {
// get the string that needs to be processed from the tuple
String inputMsg = input.getString(0);
inputMsg = inputMsg + “!!!”
outputFile.println("Received the message:" +  inputMsg);
outputFile.flush();
        collector.emit(tuple(inputMsg));
}

I am sure you must have already made out that, the simple string processing can be replaced by any complex computations that are required as per your problem statement.

Creating Topologies


After you are done with building the blocks of your topology, you also need to link them to form a workflow, which would define a sequence in which they would get executed, precisely you need to create a topology.

A java class that creates an instance of all the spouts and bolt classes and links them together using a TopologyBuilder class object is what we need to achieve here. Have a look at the code snippet below illustrating how to create a topology with the above spout and bolt as a part of it:

public static void main(String[] args) {

// Number of workers to be used for the topology
int numberOfWorkers = 2;

// Number of executors to be used for the spout
int numberOfExecutorsSpout = 1;

// Number of executors to be used for the bolt
int numberOfExecutorsBolt = 1;

// IP of the Storm cluster node on which Nimbus runs
String nimbusHost = "your_nimbus_host_ip";

TopologyBuilder builder = new TopologyBuilder();
Config conf = new Config();

//set the spout for the topology
builder.setSpout("spout",new TestSpout(false), numberOfExecutorsSpout);

//set the bolt for the topology
builder.setBolt("bolt",new TestBolt(), numberOfExecutorsBolt).shuffleGrouping("spout");

// job configuration for remote Storm cluster starts
conf.setNumWorkers(numberOfWorkers);
conf.put(Config.NIMBUS_HOST, nimbusHost);
conf.put(Config.NIMBUS_THRIFT_PORT, 6627L);

// job configuration for a remote Storm cluster
try {
StormSubmitter.submitTopology("test_topology", conf, builder.createTopology());
} catch (AlreadyAliveException e) {
System.out.println("Topology with the Same name is already running on the cluster.");                                                                                          
e.printStackTrace();
} catch (InvalidTopologyException e) {
System.out.println("Topology seems to be invalid.");
e.printStackTrace();
}
}

The above code would launch a topology with name “test_topology” on the Storm cluster.

Hope it helped. Cheers !

Friday, July 24, 2015

Understanding Data Pre-processing in Mahout–Part II

In continuation to my previous post where first one of the two commonly used commands for data pre-processing in Mahout is described, we shall continue with the second one i.e. “seq2sparse” in this post.

The command expects sequence files as an input, which have been formed using the “seqdirectory” command. During its processing, this command creates a couple of sub-directories in the output directory, like tokenized-documents, tf-vectors etc. The flow described below explains the execution of the command on the basis of the sequence of formation of these sub-directories. The input directory would be assumed to contain two sub-directories each of which has two files:


                                                           

Command options


Usage:                                                   
                       
 [--minSupport <minSupport> --analyzerName <analyzerName> --chunkSize        
<chunkSize> --output <output> --input <input> --minDF <minDF> --maxDFSigma    
<maxDFSigma> --maxDFPercent <maxDFPercent> --weight <weight> --norm <norm>    
--minLLR <minLLR> --numReducers <numReducers> --maxNGramSize <ngramSize>      
--overwrite --help --sequentialAccessVector --namedVector --logNormalize]      

Options:                                                                        

--minSupport (-s) minSupport : (Optional) Minimum Support. Default Value: 2                              
--analyzerName (-a)  analyzerName: The class name of the analyzer          
--chunkSize (-chunk) chunkSize: The chunkSize in MegaBytes. 100-10000 MB
--output (-o) output: The directory pathname for output.      
--input (-i) input:  Path to job input directory.            
--minDF (-md) minDF: The minimum document frequency.  Default is 1                                    
--maxDFSigma (-xs) maxDFSigma:  What portion of the tf (tf-idf) vectors to be used, expressed in times the standard deviation (sigma) of the document frequencies of these vectors.             Can be used to remove really high frequency terms. Expressed as a double value. Good value to be specified is 3.0. In case the value is less then 0 no vectors will be filtered out. Default is   -1.0.  Overrides maxDFPercent          
--maxDFPercent (-x) maxDFPercent: The max percentage of docs for the DF. Can be used to remove really high frequency terms. Expressed as an integer between 0 and 100. Default is 99.  If  maxDFSigma is also set, it will override this value.
--weight (-wt) weight:  The kind of weight to use. Currently TF or TFIDF
--norm (-n) norm: The norm to use, expressed as either a float or "INF" if you want to use the  Infinite norm.  Must be greater or equal to 0.  The default is not to normalize  
--minLLR (-ml) minLLR: (Optional)The minimum Log Likelihood Ratio(Float)  Default is 1.0
--numReducers (-nr) numReducers: (Optional) Number of reduce tasks. Default Value: 1                
--maxNGramSize (-ng) ngramSize: (Optional) The maximum size of ngrams create (2 = bigrams, 3 = trigrams, etc) Default Value:1                        
--overwrite (-ow): If set, overwrite the output directory  
--help (-h): Print out help
--sequentialAccessVector(-seq):(Optional)Whether output vectors should be SequentialAccessVectors. If set else false
--namedVector (-nv): (Optional) Whether output vectors should be NamedVectors. If set true else false  
--logNormalize (-lnorm): (Optional) Whether output vectors should be logNormalize. If set true else false  

Code-level Explanation


Code-level understanding of "seq2sparse" command
Hope this helped!

Understanding Data Pre-processing in Mahout – Part I

Two most common commands used for pre-processing of train or test data when running Mahout algorithms are:
  • seqdirectory:  Turns raw text in a directory into mahout sequence file.
  • seq2sparse: Creates tfidf weighted vector from the sequence files created in “seqdirectory”.
This blog post describes the first command “seqdirectory” to the code level and the next blog post shall focus on the second one.

Let’s start then!

Command Options


Firstly have a look at the Command-line options that can be used with “seqdirectory”.

 -archives <paths>: comma separated archives to be unarchived on the compute machines.
 -conf <configuration file>:  specify an application configuration file
 -D <property=value>: use value for given property
 -files <paths>: comma separated files to be copied to the map reduce cluster
 -fs <local|namenode:port>: specify a namenode
 -jt <local|jobtracker:port>: specify a job tracker
 -libjars <paths>: comma separated jar files to include in the classpath.
 -tokenCacheFile <tokensFile>: name of the file with the tokens

Job-specific Options


  --input (-i) input: Path to job input directory.
  --output (-o) output:  The directory pathname for output.                      
  --overwrite (-ow): If present, overwrite the output directory before running job                  
  --chunkSize (-chunk) chunkSize: The chunkSize in MegaBytes. Defaults to 64              
  --fileFilterClass (-filter) fileFilterClass: The name of the class to use for file parsing.
  --keyPrefix (-prefix) keyPrefix: The prefix to be prepended to the key                      
  --charset (-c) charset: The name of the character encoding of the input files.
  --tempDir tempDir: Intermediate output directory
  --startPhase startPhase: First phase to run          
  --endPhase endPhase: Last phase to run          

Code-level Explanation


The following image segregates the internal execution of the command into steps, and then further breaks down each step into tasks to be able to give a clear picture of what happens behind the scenes.

Code-level understanding of "seqdirectory" command


The next post describes the "seq2sparse" command in a similar fashion. Hope this helped.

















Thursday, July 16, 2015

Installing Hadoop-1.x.x in Pseudo-Distributed Mode

Disclaimer: The installation steps shared in this blog post are typically for the hadoop-1.x.x series. If you are looking for hadoop-2.x.x series installation steps i.e. with YARN, this post isn’t the right place.

Hadoop installation can be done in the following three modes. This post elaborates the Pseudo-Distributed Mode.
  • Standalone Mode
In this mode Hadoop is configured to run in a non-distributed mode, as a single Java process. This is useful for debugging. This is the default mode for Hadoop.
  • Pseudo-Distributed Mode
Hadoop can also be run on a single-node in a pseudo-distributed mode where each Hadoop daemon runs in a separate Java process. Such a mode is called Pseudo-Distributed mode.
  • Fully-Distributed Mode
In this mode we install, configure and manage non-trivial Hadoop clusters ranging from a few nodes to extremely large clusters with thousands of nodes.

Supported Platforms


GNU/Linux is supported as a development and production platform.
Win32 is supported as a development platform. Distributed operation has not been well tested on Win32, so it is not supported as a production platform.

Required Software


Required software for Linux and Windows include:
  • JavaTM 1.6.x, preferably from Sun, must be installed.
  • ssh must be installed and sshd must be running to use the Hadoop scripts that manage remote Hadoop daemons.
  • Additional requirements for Windows includes Cygwin. Its required for shell support in addition to the required software above.

Installing Software

If your cluster doesn't have the requisite software you will need to install it.

          For example on Ubuntu Linux:


$ sudo apt-get install ssh
$ sudo apt-get install rsync                      

On Windows, if you did not install the required software when you installed cygwin, start the cygwin installer and select the packages:
          openssh - the Net category

Download Hadoop


Obtain a Hadoop-1.x.x stable release from http://hadoop.apache.org/releases.html
  • Unpack the downloaded Hadoop distribution. In the distribution, edit the file conf/hadoop-env.sh to define at least JAVA_HOME to be the root of your Java installation.
  • Try the following command:

$ bin/hadoop                                                      

        This will display the usage documentation for the hadoop script.
  • Now you are ready to start your Hadoop cluster in one of the three supported modes:
  1. Local (Standalone) Mode
  2. Pseudo-Distributed Mode
  3. Fully-Distributed Mode
For the Pseudo-Distributed Mode we need to configure 3 files namely: 


  • conf/core-site.xml
<configuration>  
      <property>     
            <name>fs.default.name</name>
            <value>hdfs://localhost:9000</value>                                                                     
      </property>
 </configuration>
  • conf/hdfs-site.xml
<configuration>
      <property>
            <name>dfs.replication</name>    
             <value>1</value>   
      </property>
</configuration>
  • conf/mapred-site.xml
    <configuration>   
         <property>     
              <name>mapred.job.tracker</name>     
              <value>localhost:9001</value>   
          </property>
    </configuration>


Setup Passphraseless SSH


          We need to ssh to the localhost without a passphrase:
             $ ssh localhost 
          If you cannot ssh to localhost without a passphrase, execute the following commands:
$ ssh-keygen -t dsa -P '' -f ~/.ssh/id_dsa
(This command generates a public key on the system and stores it on ~/.ssh/id_dsa)
$ cat ~/.ssh/id_dsa.pub >> ~/.ssh/authorized_keys
(This command copies the public key to the set of authorized keys of the system)
This completes the ssh passphraseless setup process.


Start n Stop the Cluster

  • Format a new distributed-file system:
$ bin/hadoop namenode -format
  • Start the hadoop daemons:
$ bin/start-all.sh
  •  Accessible UI
NameNode UI Port - 50070
JobTracker UI Port – 50030
  • Stop the hadoop daemons:
$bin/stop-all.sh