Saturday, July 25, 2015

Query Storm Data Streams using Esper

As you might already be aware and as documented on the web, “Esper is a component for complex event processing (CEP) which enables rapid development of applications that process large volumes of incoming messages or events, regardless of whether incoming messages are historical or real-time in nature.” Since integration of Storm and Esper was visibly very beneficial for complex processing of tuples incoming in the real-time stream in Storm Bolts, a specialized bolt called the “Esper Bolt” is available.

Esper Bolt enables you to run SQL like queries on your real-time data in storm. Since Esper Bolt is already available, you would not need to write a bolt as described above, but only use it while defining the main Topology class. Below is an illustration of the Topology with a SampleSpout emitting tuples containing 3 fields, namely "deviceId", "deviceTemp" and "time". These field names are listed in the Spout class in a method called declareOutputFields() as below:

@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("deviceId", "deviceTemp", "time"));
}                                                                                                                                                                                                                                           

This method is one of the methods to be overridden in both spouts and bolts, which declares names of all the fields that would be emitted per tuple.

Moving forward with the Topology definition, below is the Esper Bolt declaring a statement (query) that runs over every tuple in the incoming stream and emits "deviceId" (id of the device) and "deviceTemp” (temperature of the device)  if the value of the incoming tuple field “deviceTemp” lies between 2000.00 and 3000.00

{
TopologyBuilder builder = new TopologyBuilder();

// add the spout
builder.setSpout("deviceDataAbsorber", new SampleSpout(), 1);

        // define the Esper bolt
EsperBolt deviceTrackerBolt = new EsperBolt.Builder()
.inputs().aliasComponent("deviceDataAbsorber").withFields("deviceId", "deviceTemp", "time").ofTypes(String.class, Float.class, String.class).toEventType("Evaluate")
.outputs().onDefaultStream().emit("deviceId", "deviceTemp")
.statements().add("select evaluate.deviceId as deviceId, evaluate.deviceTemp as deviceTemp from Evaluate.win:time_batch(1 sec) as evaluate where evaluate.deviceTemp <= 3000.00f and evaluate.deviceTemp >= 2000.00f")
.build();
}

The topology can have any number of Esper Bolts or user-defined bolts feeding upon the output of the Esper Bolt. You can define similar query statements depending upon your problem statement.

Hope it helped!

No comments:

Post a Comment