Tuesday, April 21, 2015

Commonly Used Hive Queries

Hello Readers, 

I feel extremely fortunate to have received emails from many of you asking why I haven't blogged in so long. So I am returning to my blog after about two years, determined to add a lot more content (which has piled up over these two years) that might help anyone working on Big Data. Thanks for the motivation, friends. :) So here I go.

To start with, I chose to write about the most commonly used Hive queries. You can come back to this post any time you forget the syntax of any of them ;)

Just like in SQL, there is a common set of Hive queries that tend to be used the most. These include queries for operations like:

1. Sorting
2. Searching
3. Joins
4. Sampling
5. Calculating median
6. Calculating mean
7. Finding records from one table which don't exist in another

Assuming you already have a running Hadoop cluster with Hive set up on top of it, the following sections will help you understand how to perform these operations in Hive.

Data Set Preparation

Let's assume you have the following tables and data in Hive:



and another table 'my_join_table':


Creating tables in Hive

To create tables like above in Hive, follow the steps below:

1. Create two files 'my_table.csv' and 'my_join_table.csv' and copy the respective comma separated content shown above in each file.

2. Place these files in HDFS

hdfs dfs -mkdir hive_tutorial_my_table                                                                                            
hdfs dfs -put my_table.csv hive_tutorial_my_table
hdfs dfs -mkdir hive_tutorial_my_join_table
hdfs dfs -put my_join_table.csv hive_tutorial_my_join_table

Yes, we have intentionally placed the two files in two different folders, because when you load data into a Hive table using the create table statement, you need to specify a folder location. File paths do not work. And by default Hive adds the data of all the files in that folder to the Hive table. Strange, right?

3. Create it!

Remember to replace the 'your_hdfs_user_name' with a valid name else you'll get a depressing error statement. ;)

create external table my_table (id int, name string, address string) row format delimited fields terminated by ',' lines terminated by '\n' location '/user/your_hdfs_user_name/hive_tutorial_my_table' tblproperties ("skip.header.line.count"="1");

create external table my_join_table (id int, age int, phone string) row format delimited fields terminated by ',' lines terminated by '\n' location '/user/your_hdfs_user_name/hive_tutorial_my_join_table' tblproperties ("skip.header.line.count"="1");

Cool. So now you have all that's required to try the queries.

Running the Hive Queries

Sorting in Hive

Sorting by Integer: By default, Hive sorts an integer column in the Ascending order.

Query: select id, age, phone from my_join_table order by age;                                                    

Sorting by String: And in the lexicographical order for String columns.

Query: select id, name, address from my_table order by name;                                                    

If you have a very large data set, the ORDER BY clause might take a very long time because it pushes all the data through just one reducer, which is unacceptable for large datasets. To optimize, use CLUSTER BY, which is shorthand for DISTRIBUTE BY plus SORT BY on the same column: rows with the same value of the column always go to the same one of N reducers, and each reducer sorts the rows it receives. Every output file is therefore sorted, but there is no total order across files. You can go for this if you are okay with merging the multiple output files yourself.

Query: select id, name, address from my_table cluster by name;                                                      
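
To get a feel for what CLUSTER BY produces, here is a small Python sketch. It assumes CLUSTER BY behaves like DISTRIBUTE BY plus SORT BY; the rows, the reducer count and the crc32 hash are made up for illustration and are not what Hive uses internally.

```python
# Sketch of CLUSTER BY semantics: hash-distribute rows, then sort per reducer.
# The rows, the reducer count and the hash function are illustration-only assumptions.
import zlib

def cluster_by(rows, key, num_reducers):
    """Return one sorted list of rows per simulated reducer."""
    partitions = [[] for _ in range(num_reducers)]
    for row in rows:
        # Stable stand-in for the partitioner's hash (not Hive's actual hash).
        h = zlib.crc32(str(row[key]).encode("utf-8"))
        partitions[h % num_reducers].append(row)
    # Each reducer sorts only the rows it received.
    return [sorted(p, key=lambda r: r[key]) for p in partitions]

rows = [
    {"id": 1, "name": "delta"},
    {"id": 2, "name": "alpha"},
    {"id": 3, "name": "charlie"},
    {"id": 4, "name": "bravo"},
    {"id": 5, "name": "alpha"},
]

outputs = cluster_by(rows, "name", num_reducers=2)
for i, part in enumerate(outputs):
    print("reducer", i, [r["name"] for r in part])

# A total order only appears once the per-reducer outputs are merged.
merged = sorted((r for part in outputs for r in part), key=lambda r: r["name"])
print([r["name"] for r in merged])
```

Each simulated reducer's output is sorted on its own, and both "alpha" rows land in the same one, but you only get one globally sorted list after the final merge.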

Searching in Hive

Searching can be performed using the 'where' clause.

Query: select id, age, phone from my_join_table where age>18;                                                   

Joins in Hive

Inner Join: The INNER JOIN keyword selects all rows from both tables as long as there is a match between the columns in both tables.

Query: select a.id, a.name, a.address, b.age, b.phone from my_table a join my_join_table b on a.id=b.id;

Left Join: The LEFT OUTER JOIN keyword returns all rows from the first table, with the matching rows from the second table. For rows of the first table that have no match in the second table, the columns of the second table are returned as NULL.

Query: select a.id, a.name, a.address, b.age, b.phone from my_table a left join my_join_table b on a.id=b.id;

Right Join: The RIGHT JOIN keyword returns all rows from the second table, with the matching rows in the first table. The result is NULL in the left side when there is no match.

Query: select a.id, a.name, a.address, b.age, b.phone from my_table a right join my_join_table b on a.id=b.id;

Full Outer Join: The FULL OUTER JOIN keyword returns all rows from the first table and from the second table. The FULL OUTER JOIN keyword combines the result of both LEFT and RIGHT joins.

Query: select a.id, a.name, a.address, b.age, b.phone from my_table a full outer join my_join_table b on a.id=b.id;
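
If the four join types tend to blur together, the following Python sketch may help. It mimics them on two tiny in-memory tables; the rows are hypothetical (not the ones from the data set above) and None plays the role of Hive's NULL.

```python
# Hypothetical rows keyed by id; None stands in for Hive's NULL.
my_table = {1: ("name_1", "add_1"), 2: ("name_2", "add_2"), 3: ("name_3", "add_3")}
my_join_table = {2: (25, "phone_2"), 3: (17, "phone_3"), 4: (40, "phone_4")}

def join(left, right, keep_unmatched_left, keep_unmatched_right):
    """Return (a.id, a.name, a.address, b.age, b.phone) tuples."""
    rows = []
    for key in sorted(set(left) | set(right)):
        in_left, in_right = key in left, key in right
        matched = in_left and in_right
        if matched or (in_left and keep_unmatched_left) or (in_right and keep_unmatched_right):
            a_id = key if in_left else None  # a.id is NULL when only b has the row
            name, address = left.get(key, (None, None))
            age, phone = right.get(key, (None, None))
            rows.append((a_id, name, address, age, phone))
    return rows

inner = join(my_table, my_join_table, False, False)
left_join = join(my_table, my_join_table, True, False)
right_join = join(my_table, my_join_table, False, True)
full_outer = join(my_table, my_join_table, True, True)

for label, result in [("inner", inner), ("left", left_join),
                      ("right", right_join), ("full outer", full_outer)]:
    print(label, result)
```

The two boolean flags are the whole difference between the joins: they decide whether unmatched rows from the first table, the second table, both or neither are kept.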

Sampling in Hive

Hive supports sampling by different parameters as follows:

Sampling by percentage:

Query: select id, name, address from my_table TABLESAMPLE(0.1 PERCENT);                          

Sampling by size:

Query: select id, name, address from my_table TABLESAMPLE(10M) s;                                       

Sampling by number of records:

Query: select id, name, address from my_table TABLESAMPLE(2 ROWS) s;                               

Sampling by Bucketing:

Query: create table my_join_table_bucketed(id INT, age INT, phone STRING) comment 'A bucketed copy of my_join_table' clustered by(age) into 4 buckets;
Query: insert into table my_join_table_bucketed select id, age, phone from my_join_table;
Query: SELECT * FROM my_join_table_bucketed TABLESAMPLE(BUCKET 3 OUT OF 4 ON rand()) s;
Query: SELECT * FROM my_join_table_bucketed TABLESAMPLE(BUCKET 1 OUT OF 4 ON age);
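
Here is a rough Python sketch of what sampling one bucket on a column amounts to. It assumes that an INT column hashes to its own value and that the bucket number is that hash modulo the number of buckets; treat both as assumptions about Hive's default behaviour, and the ages as made-up data.

```python
# Sketch of TABLESAMPLE(BUCKET x OUT OF y ON age) on hypothetical rows.
INT_MAX = 2**31 - 1

def bucket_of(age, num_buckets):
    # Assumption: an INT column hashes to its own value.
    return (age & INT_MAX) % num_buckets

def tablesample(rows, bucket, out_of):
    """Keep the rows that fall into the 1-based bucket number."""
    return [r for r in rows if bucket_of(r["age"], out_of) == bucket - 1]

rows = [{"id": i, "age": a}
        for i, a in enumerate([16, 17, 20, 21, 24, 33, 42], start=1)]

sample = tablesample(rows, bucket=1, out_of=4)
print(sample)  # rows whose age is a multiple of 4
```

Because the table is already clustered by age into 4 buckets, a query like the last one above can read just the matching bucket instead of scanning the whole table.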

Calculating median

The function percentile with the following syntax can be used to calculate median in hive. It returns the exact pth percentile of a column in the group (does not work with floating point types). To calculate the median, p should be equal to 0.5. Otherwise it can be any value between 0 and 1.

percentile(BIGINT col, p)                                                                                                                  

We shall use this to calculate the median of the column 'age' in 'my_join_table'

Query: select percentile(cast(a.age as bigint), 0.5) AS age_median from my_join_table a;            
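
For intuition, here is a small Python sketch of an exact percentile over integer values. It assumes linear interpolation between the two nearest values when the position falls between rows, which is my understanding of how Hive's percentile behaves; the ages are made up.

```python
import math

def percentile(values, p):
    """Exact p-th percentile (0 <= p <= 1) with linear interpolation."""
    if not values:
        raise ValueError("percentile of an empty column is undefined")
    ordered = sorted(values)
    position = p * (len(ordered) - 1)   # fractional index into the sorted values
    lower = math.floor(position)
    upper = math.ceil(position)
    if lower == upper:
        return float(ordered[lower])
    fraction = position - lower
    return ordered[lower] + (ordered[upper] - ordered[lower]) * fraction

ages = [25, 17, 40, 31]                  # hypothetical values of the 'age' column
print(percentile(ages, 0.5))             # 28.0, halfway between 25 and 31
print(percentile([25, 17, 40], 0.5))     # 25.0, the middle value
```

With an even number of rows the median lands between the two middle values, which is why the result can be a fraction even though the column holds integers.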

Calculating mean

Calculating the mean of the values in a column is pretty straightforward with the help of the function 'avg(col)'.

Query: select avg(a.age) AS age_mean from my_join_table a;                                                      

Finding records from one table which don't exist in another

Now, this is the trickiest one. We intend to find those records of table A which do not exist in table B. Sort of A-B. For this, the schema of the tables need not always be the same. The concept is very simple: we left outer join the two tables and apply a check to include only those records where the join column of the second table IS NULL. Those are exactly the records of the first table that found no match in the second.

Add a row to my_table with id not in table my_join_table.

Query: insert into my_table values (6, 'Jayati', 'add_6');

Query: select a.id, a.name, a.address from my_table a left outer join my_join_table b on a.id=b.id where b.id IS NULL;
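
The A-B idea can be sketched in a few lines of Python: pair every row of the first table with its match in the second (or None when there is none, like a left outer join would), then keep only the unmatched rows. Apart from the (6, 'Jayati', 'add_6') row, the data here is hypothetical.

```python
# Rows of my_table as (id, name, address); only the last row comes from the post.
my_table = [
    (1, "name_1", "add_1"),
    (2, "name_2", "add_2"),
    (6, "Jayati", "add_6"),
]
# Rows of my_join_table as (id, age, phone), indexed by id for the lookup.
my_join_table = [(1, 25, "phone_1"), (2, 17, "phone_2"), (3, 40, "phone_3")]
join_rows_by_id = {row[0]: row for row in my_join_table}

def left_outer_join(left_rows, right_by_id):
    """Yield (left_row, right_row); right_row is None when nothing matches."""
    for row in left_rows:
        yield row, right_by_id.get(row[0])

# Keep only the left rows whose right side came back empty (NULL in Hive).
only_in_my_table = [left for left, right in
                    left_outer_join(my_table, join_rows_by_id) if right is None]
print(only_in_my_table)  # [(6, 'Jayati', 'add_6')]
```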

Hope this tutorial gave a good start with Hive!

Sunday, May 26, 2013

Installing RabbitMQ over Ubuntu/CentOS

This post’s gonna walk you through the steps to install RabbitMQ on a:
  • Ubuntu machine
  • CentOS machine
Installing RabbitMQ on Ubuntu

Step-I: Get the setup from
Version: rabbitmq-server_3.0.2-1_all.deb

Step-II: Run the .deb file using
sudo dpkg -i rabbitmq-server_3.0.2-1_all.deb                                                                           

Step-III: Start or Stop the rabbitmq server/broker using
/etc/init.d/rabbitmq-server start
/etc/init.d/rabbitmq-server stop

Step-IV: Check the status of the server using
rabbitmqctl status                                                                                                                  

Installing RabbitMQ on CentOS

Step-I: Get the setup from
Version: rabbitmq-server-3.0.2-1.noarch.rpm

If the CentOS version on your machine is EL5 (that is, a CentOS version of the 5 series; you can check using the command "lsb_release -a"), run the following commands:
su -c 'rpm -Uvh http://download.fedoraproject.org/pub/epel/5/i386/epel-release-5-4.noarch.rpm'
su -c 'yum install foo'                                                                          

Else, if it is EL6 (a CentOS version of the 6 series; again, check using the command "lsb_release -a"), run the following commands:
su -c 'rpm -Uvh http://download.fedoraproject.org/pub/epel/6/i386/epel-release-6-8.noarch.rpm'
su -c 'yum install foo'

Step-II: Get the Erlang repository using:
sudo wget -O /etc/yum.repos.d/epel-erlang.repo http://repos.fedorapeople.org/repos/peter/erlang/epel-erlang.repo

Step-III: Install Erlang using:
sudo yum install erlang                                                                                                              

Step-IV: You need to import a signing key for RabbitMQ, using the command:
sudo rpm --import http://www.rabbitmq.com/rabbitmq-signing-key-public.asc                                                                 

Step-V: Install the package downloaded in Step-I using:
sudo yum install rabbitmq-server-3.0.2-1.noarch.rpm                                                                   

Step-VI: Start/Stop rabbitmq server using
sudo /sbin/service rabbitmq-server start
sudo /sbin/service rabbitmq-server stop                                                                                           

Some Extra Notes
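  • If you ever feel the need to clear all messages from your RabbitMQ queues, run the following commands. Note that force_reset returns the node to its freshly installed state, so it wipes not only the messages but also all queues, exchanges, users and other configuration: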
rabbitmqctl stop_app
rabbitmqctl force_reset
/etc/init.d/rabbitmq-server stop
/etc/init.d/rabbitmq-server start                                                                                                  
  • If you need to configure some non-default RabbitMQ server parameters, for example "disk_free_limit", create a file called “rabbitmq.config” and place it in “/etc/rabbitmq” so that the server reads it at startup. Here’s a sample config file for your ready reference (note the enclosing square brackets and the trailing period, which the Erlang term format requires):
    [{rabbit, [{disk_free_limit, 1000}]}].

All the very best !!!

Running Weka's Logistic Regression using Command Line

Running Weka’s algorithms from the command line requires only a very simple Weka setup. All you need to do is download the latest release of WEKA. One of the useful links working at the time of writing this post is:

Next, you’ll need to unzip this setup, which would give you a directory with name “weka-3-6-9”. We would call it WEKA_HOME for reference in this blog post.

You might want to run Weka’s logistic regression algorithm on two types of input data.
  • One is the sample data files in ARFF format already available in “WEKA_HOME/data”
  • The other is data files you already have in CSV format. For example, the donut.csv file provided by Mahout for running its Logistic Regression.

Running LR over ARFF files

We would be using the file “WEKA_HOME/data/weather.nominal.arff” for running the algorithm. cd to WEKA_HOME and run the following command:

java -cp ./weka.jar weka.classifiers.functions.Logistic -t data/weather.nominal.arff -T data/weather.nominal.arff -d /some_location_on_your_machine/weather.nominal.model.arff

which should generate the trained model at “/some_location_on_your_machine/weather.nominal.model.arff” and the console output should look something like:

Logistic Regression with ridge parameter of 1.0E-8
Variable                              yes
outlook=sunny                    -45.2378
outlook=overcast                  57.5375
outlook=rainy                     -5.9067
temperature=hot                   -8.3327
temperature=mild                  44.8546
temperature=cool                 -45.4929
humidity                         118.1425
windy                             72.9648
Intercept                        -89.2032

Odds Ratios...
Variable                              yes
outlook=sunny                           0
outlook=overcast      9.73275593611619E24
outlook=rainy                      0.0027
temperature=hot                    0.0002
temperature=mild     3.020787521374072E19
temperature=cool                        0
humidity            2.0353933107400553E51
windy                4.877521304260806E31

Time taken to build model: 0.12 seconds
Time taken to test model on training data: 0.01 seconds

=== Error on training data ===

Correctly Classified Instances          14              100      %
Incorrectly Classified Instances         0                0      %
Kappa statistic                          1  
Mean absolute error                      0  
Root mean squared error                  0  
Relative absolute error                  0.0002 %
Root relative squared error              0.0008 %
Total Number of Instances               14  

=== Confusion Matrix ===

 a b   <-- classified as
 9 0 | a = yes
 0 5 | b = no

=== Error on test data ===

Correctly Classified Instances          14              100      %
Incorrectly Classified Instances         0                0      %
Kappa statistic                          1  
Mean absolute error                      0  
Root mean squared error                  0  
Relative absolute error                  0.0002 %
Root relative squared error              0.0008 %
Total Number of Instances               14  

=== Confusion Matrix ===

 a b   <-- classified as
 9 0 | a = yes
 0 5 | b = no                                                                                                                                            

Here the three arguments mean:

  • -t <name of training file> : Sets training file.
  • -T <name of test file> : Sets test file. If missing, a cross-validation will be performed on the training data.
  • -d <name of output file> : Sets model output file. In case the filename ends with '.xml', only the options are saved to the XML file, not the model.

For help on all available arguments, try running the following command from WEKA_HOME:

java -cp ./weka.jar weka.classifiers.functions.Logistic -h                                                              

Running LR over CSV files

For running Weka’s LR over a CSV file, you’ll need to convert it into ARFF format using a converter provided by WEKA. Using command line in linux, here are the steps:

Step-I: Convert the data from CSV to ARFF format by running the following command from WEKA_HOME:
java -cp ./weka.jar weka.core.converters.CSVLoader someCSVFile.csv > outputARFFFile.arff                                                                     

Step-II: Run the NumericToNominal filter over the arff file
java -cp ./weka.jar weka.filters.unsupervised.attribute.NumericToNominal -i outputARFFFile.arff -o outputARFFFile.nominal.arff                                                           

Step-III: Run the classifier over the outputARFFFile.nominal.arff
java -cp ./weka.jar weka.classifiers.functions.Logistic -t outputARFFFile.nominal.arff -T outputARFFFile.nominal.arff -d outputARFFFile.nominal.model.arff                                        

You might encounter an exception stating

"Cannot handle unary class!"                                                                                                               

To resolve this, apply the Remove attribute filter to eliminate the attribute which has the same value for all the records in the file:

java -cp ./weka.jar weka.filters.unsupervised.attribute.Remove -R 8 -i outputARFFFile.nominal.arff -o outputARFFFile.filtered.nominal.arff

where the value of “-R” would vary depending upon your input file and the id of attribute to be eliminated in the input arff file.

After this, try running the classifier on the obtained “outputARFFFile.filtered.nominal.arff” file as in:

java -cp ./weka.jar weka.classifiers.functions.Logistic -t outputARFFFile.filtered.nominal.arff -T outputARFFFile.filtered.nominal.arff -d outputARFFFile.nominal.model.arff                                 

The output should appear somewhat like we got when running the classifier over the provided sample data mentioned above.

With these steps, you are ready to play with WEKA. Go for it. Cheers !!!

Saturday, May 25, 2013

Running Mahout's Logistic Regression

Logistic Regression (SGD) is one of the algorithms available in Mahout. This blog post lists everything required to run it. To install Mahout on your machine, you can refer to my previous post.

Logistic Regression executes in two major phases:
  • Train the model: This step creates a model from some training data, which can then be used to classify any input data, or rather, test data.
  • Test the model: This step tests the model generated in step 1 by classifying the test data and measuring the accuracy, scores and confusion matrix.

Steps for running Mahout’s LR

Step-I: Get the input data file called donut.csv, which is present in the mahout setup. But for your ready reference I have also shared it. You can download it from here.

Step-II: Next, cd to MAHOUT_HOME. Here we will run the “org.apache.mahout.classifier.sgd.TrainLogistic” class, which trains the model for us using the “donut.csv” file that we provide as training data. Here’s the command to be run from within MAHOUT_HOME:

bin/mahout org.apache.mahout.classifier.sgd.TrainLogistic --passes 1 --rate 1 --lambda 0.5 --input loc_of_file/donut.csv --features 21 --output any_loc_on_your_machine/donut.model --target color --categories 2 --predictors x y xx xy yy a b c --types n n

If the Mahout version is 0.7 you are likely to face the error below:

Exception in thread "main" java.lang.NoClassDefFoundError: org/apache/hadoop/util/ProgramDriver

    at org.apache.mahout.driver.MahoutDriver.main(MahoutDriver.java:96)
Caused by: java.lang.ClassNotFoundException: org.apache.hadoop.util.ProgramDriver
    at java.net.URLClassLoader$1.run(URLClassLoader.java:366)
    at java.net.URLClassLoader$1.run(URLClassLoader.java:355)
    at java.security.AccessController.doPrivileged(Native Method)
    at java.net.URLClassLoader.findClass(URLClassLoader.java:354)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:423)
    at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:308)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:356)
    ... 1 more

Don’t worry, all you need to do is:

export CLASSPATH=${CLASSPATH}:your_MAHOUT_HOME/mahout-distribution-0.7/lib/hadoop/hadoop-core- 

After editing the CLASSPATH as mentioned above the command should run successfully and print something like:

color ~ -0.016*Intercept Term + -0.016*xy + -0.016*yy
      Intercept Term -0.01559
                  xy -0.01559
                  yy -0.01559
    0.000000000     0.000000000     0.000000000     0.000000000     0.000000000     0.000000000     0.000000000     0.000000000     0.000000000     0.000000000    -0.015590929     0.000000000     0.000000000     0.000000000     0.000000000     0.000000000     0.000000000     0.000000000     0.000000000     0.000000000     0.000000000 
13/05/26 02:14:02 INFO driver.MahoutDriver: Program took 588 ms (Minutes: 0.0098)

The most important parameters influencing the execution of the Training process are:

"--passes": the number of times to pass over the input data
"--lambda": the amount of coefficient decay to use
"--rate": the learning rate

You can vary the values of these 3 variables and see the change in performance of the algorithm. Also, you can now see that the model would have been created at the location which you had specified in the command.
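
To see how these three knobs interact, here is a bare-bones Python sketch of logistic regression trained with stochastic gradient descent. It is a generic textbook version, not Mahout's implementation; the function names, the tiny data set and the parameter values are all made up for illustration.

```python
import math

def sigmoid(z):
    return 1.0 / (1.0 + math.exp(-z))

def train_logistic(rows, labels, passes, rate, lam):
    """Plain SGD: 'passes' sweeps over the data, step size 'rate', decay 'lam'."""
    weights = [0.0] * (len(rows[0]) + 1)          # last entry is the intercept
    for _ in range(passes):
        for features, label in zip(rows, labels):
            x = list(features) + [1.0]
            prediction = sigmoid(sum(w * xi for w, xi in zip(weights, x)))
            error = label - prediction
            for j, xj in enumerate(x):
                # Gradient step, plus a decay term pulling the weight toward zero.
                weights[j] += rate * (error * xj - lam * weights[j])
    return weights

def predict(weights, features):
    x = list(features) + [1.0]
    return sigmoid(sum(w * xi for w, xi in zip(weights, x)))

# Hypothetical one-feature data: negative values are class 0, positive are class 1.
rows = [(-2.0,), (-1.0,), (1.0,), (2.0,)]
labels = [0, 0, 1, 1]

weights = train_logistic(rows, labels, passes=50, rate=0.5, lam=0.01)
print(weights)
print([round(predict(weights, r), 3) for r in rows])
```

More passes give the weights more time to settle, a larger rate makes each update bolder, and a larger lambda keeps the coefficients closer to zero.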

Step-III: Now it’s time to run the classifier using the model trained in Step-II. As test data we will use the same donut.csv file that we used for training. Alternatively, you can split the file in some ratio, e.g. 70-30, and use the 70% file for training the model and the 30% file for testing. Here’s the command for testing the model and running the classifier:

bin/mahout org.apache.mahout.classifier.sgd.RunLogistic --input loc_of_file/donut.csv  --model loc_of_model/donut.model --auc --scores --confusion

which should print an output something like:

AUC = 0.52
confusion: [[27.0, 13.0], [0.0, 0.0]]
entropy: [[-0.7, -0.4], [-0.7, -0.5]]
13/05/26 02:16:19 INFO driver.MahoutDriver: Program took 474 ms (Minutes: 0.0079)                                                                          
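
If you are wondering what the AUC and the confusion matrix actually measure, here is a small Python sketch that computes both from scratch. The labels, scores, the 0.5 threshold and the matrix orientation (rows are actual classes, columns are predicted classes) are my own choices for illustration and may not match how Mahout lays out its output.

```python
def confusion_matrix(labels, scores, threshold=0.5):
    """matrix[actual][predicted] counts for a two-class problem."""
    matrix = [[0, 0], [0, 0]]
    for label, score in zip(labels, scores):
        predicted = 1 if score >= threshold else 0
        matrix[label][predicted] += 1
    return matrix

def auc(labels, scores):
    """Fraction of (positive, negative) pairs ranked correctly; ties count half."""
    positives = [s for l, s in zip(labels, scores) if l == 1]
    negatives = [s for l, s in zip(labels, scores) if l == 0]
    wins = 0.0
    for p in positives:
        for n in negatives:
            if p > n:
                wins += 1.0
            elif p == n:
                wins += 0.5
    return wins / (len(positives) * len(negatives))

# Hypothetical true classes and classifier scores.
labels = [0, 0, 1, 1, 1, 0]
scores = [0.1, 0.6, 0.8, 0.4, 0.9, 0.3]

matrix = confusion_matrix(labels, scores)
area = auc(labels, scores)
print(matrix)
print(round(area, 3))
```

An AUC close to 0.5, like the one printed by the run above, means the model ranks positives above negatives about as often as a coin toss would.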

Similarly, you can try this on a variety of data sets that you might have. I have seen up to 93% classification accuracy on a different data set.
All the best !!!

Installing Mahout on Linux

Mahout is a collection of highly scalable machine learning algorithms for very large data sets. Although the real power of Mahout shows only on large HDFS data, Mahout also supports running algorithms on local filesystem data, which can help you get a feel for how to run Mahout algorithms.

Installing Mahout on Linux

Before you can run any Mahout algorithm you need a Mahout installation ready on your Linux machine which can be carried out in two ways as described below:

Method I- Extracting the tarball

Yes, it is that simple. Just download the latest Mahout release from
Extract the downloaded tarball using:

tar -xzvf /path_to_downloaded_tarball/mahout-distribution-0.x.tar.gz
This should result in a folder with name /path_to_downloaded_tarball/mahout-distribution-0.x
Now, you can run any of the algorithms using the script “bin/mahout” present in the extracted folder. For testing your installation, you can also run

bin/mahout

without any other arguments.

Method II- Building Mahout

1. Prerequisites for Building Mahout
 -   Java JDK 1.6
 -   Maven 2.2 or higher (http://maven.apache.org/)

Install Maven and Subversion using the following commands:
sudo apt-get install maven2                                                                

sudo apt-get install subversion                                                                                                    

2. Create a directory where you want to check out the Mahout code (we’ll call it MAHOUT_HOME here) and change into it:
mkdir -p MAHOUT_HOME
cd MAHOUT_HOME

3. Use Subversion to check out the code:
svn co http://svn.apache.org/repos/asf/mahout/trunk                                                                     

4. Compiling

mvn -DskipTests install                                                                                                           

5. Setting the environment variables

export MAHOUT_HOME=/location_of_checked_out_mahout        
export PATH=$PATH:$MAHOUT_HOME/bin

After following either of the above methods, you can now run any of the available mahout algorithms with appropriate arguments. Also, note that you can run the algorithm over HDFS data or local file system data. In order to run algorithms over data on your local file system set an environment variable with the name “MAHOUT_LOCAL” to anything other than an empty string. That would force mahout to run locally even if HADOOP_CONF_DIR and HADOOP_HOME are set.
To plunge into Mahout by trying out running an algorithm, you can refer to my next post. Hope this proved to be a good starter for you. 
All the best !!!

Wednesday, May 22, 2013

Kafka Monitoring using JMX-JMXTrans-Ganglia

Monitoring Kafka Clusters using Ganglia is a matter of a few steps. This blog post lists down those steps with an assumption that you have your Kafka Cluster ready.

Step-I: Setup JMXTrans on all the machines of the Kafka cluster as done on the Storm cluster in the previous post.

Step-II: In the kafka setup, edit “kafka-run-class.sh” script file by adding the following line to it:

KAFKA_JMX_OPTS="-Dcom.sun.management.jmxremote=true -Dcom.sun.management.jmxremote.authenticate=false -Dcom.sun.management.jmxremote.ssl=false "

Step-III: Also, edit the “kafka-server-start.sh” script file present in the kafka setup to set the JMX port to 9999 by adding the following line:

export JMX_PORT=${JMX_PORT:-9999}                                      
Now, on all the nodes of the cluster on which you have performed the above steps, you can run JMXTrans with the following JSON file, after which each node should start reporting its metrics to the Ganglia server.

Sample JSON file

Save the code below as a JSON file and run it using the following command:

/usr/share/jmxtrans/jmxtrans.sh start /path_to_sample_json/example.json       

Note: Please change the paths of output files in the code below to paths accessible on your cluster machines.

{
  "servers" : [ {
    "port" : "9999", <--- Defined Kafka JMX Port
    "host" : "",  <--- Kafka Server
    "queries" : [ {
      "outputWriters" : [ {
        "@class" :
        "settings" : {
          "outputFile" : "/home/jayati/JMXTrans/kafkaStats/bufferPool_direct_stats.txt",
          "v31" : false
        }
      } ],
      "obj" : "java.nio:type=BufferPool,name=direct",
      "resultAlias": "bufferPool.direct",
      "attr" : [ "Count", "MemoryUsed", "Name", "ObjectName", "TotalCapacity" ]
    }, {
      "outputWriters" : [ {
        "@class" :
        "settings" : {
          "outputFile" : "/home/jayati/JMXTrans/kafkaStats/bufferPool_mapped_stats.txt",
          "v31" : false
        }
      } ],
      "obj" : "java.nio:type=BufferPool,name=mapped",
      "resultAlias": "bufferPool.mapped",
      "attr" : [ "Count", "MemoryUsed", "Name", "ObjectName", "TotalCapacity" ]
    }, {
      "outputWriters" : [ {
        "@class" :
        "settings" : {
          "outputFile" : "/home/jayati/JMXTrans/kafkaStats/kafka_log4j_stats.txt",
          "v31" : false
        }
      } ],
      "obj" : "kafka:type=kafka.Log4jController",
      "resultAlias": "kafka.log4jController",
      "attr" : [ "Loggers" ]
    }, {
      "outputWriters" : [ {
        "@class" :
        "settings" : {
          "outputFile" : "/home/jayati/JMXTrans/kafkaStats/kafka_socketServer_stats.txt",
          "v31" : false
        }
      } ],
      "obj" : "kafka:type=kafka.SocketServerStats",
      "resultAlias": "kafka.socketServerStats",
      "attr" : [ "AvgFetchRequestMs", "AvgProduceRequestMs", "BytesReadPerSecond", "BytesWrittenPerSecond", "FetchRequestsPerSecond", "MaxFetchRequestMs", "MaxProduceRequestMs" , "NumFetchRequests" , "NumProduceRequests" , "ProduceRequestsPerSecond", "TotalBytesRead", "TotalBytesWritten", "TotalFetchRequestMs", "TotalProduceRequestMs" ]
    } ],
    "numQueryThreads" : 2
  } ]
}
Get high on the Ganglia graphs showing your Kafka Cluster metrics. :) 
All the best !!!