Apache Storm


Apache Storm – Introduction

What is Apache Storm?

Apache Storm is a distributed real-time big data-processing system. Storm is designed to process vast amounts of data in a fault-tolerant and horizontally scalable manner. It is a streaming data framework that has the capability of very high ingestion rates. Though Storm is stateless, it manages the distributed environment and cluster state via Apache ZooKeeper. It is simple, and you can execute all kinds of manipulations on real-time data in parallel.

Apache Storm continues to be a leader in real-time data analytics. Storm is easy to set up and operate, and it guarantees that every message will be processed through the topology at least once.

Apache Storm vs Hadoop

Basically, the Hadoop and Storm frameworks are both used for analyzing big data. They complement each other and differ in a few aspects. Apache Storm does all the operations except persistence, while Hadoop is good at everything but lags in real-time computation. The following points compare the attributes of Storm and Hadoop.

  • Storm: real-time stream processing. Hadoop: batch processing.

  • Storm: stateless. Hadoop: stateful.

  • Storm: master/slave architecture with ZooKeeper-based coordination; the master node is called nimbus and the slaves are supervisors. Hadoop: master/slave architecture with or without ZooKeeper-based coordination; the master node is the job tracker and the slave node is the task tracker.

  • A Storm streaming process can access tens of thousands of messages per second on a cluster. The Hadoop Distributed File System (HDFS) uses the MapReduce framework to process vast amounts of data, which takes minutes or hours.

  • A Storm topology runs until it is shut down by the user or hits an unexpected, unrecoverable failure. MapReduce jobs are executed in sequential order and complete eventually.

  • Both are distributed and fault-tolerant.

  • If the nimbus or a supervisor dies, restarting it makes it continue from where it stopped, so nothing gets affected. If the JobTracker dies, all the running jobs are lost.

Use-Cases of Apache Storm

Apache Storm is very famous for real-time big data stream processing. For this reason, many companies use Storm as an integral part of their system. Some notable examples are as follows −

Twitter − Twitter uses Apache Storm for its range of “Publisher Analytics products”. These products process each and every tweet and click on the Twitter platform. Apache Storm is deeply integrated with the Twitter infrastructure.

NaviSite − NaviSite uses Storm for its event log monitoring/auditing system. Every log generated in the system passes through Storm. Storm checks the message against the configured set of regular expressions, and if there is a match, that particular message is saved to the database.

Wego − Wego is a travel metasearch engine located in Singapore. Travel-related data comes from many sources all over the world with different timings. Storm helps Wego search real-time data, resolve concurrency issues, and find the best match for the end-user.

Benefits of Apache Storm

Here is a list of the benefits that Apache Storm offers −

  • Storm is open source, robust, and user friendly. It can be used in small companies as well as large corporations.

  • Storm is fault tolerant, flexible, reliable, and supports any programming language.

  • Allows real-time stream processing.

  • Storm is unbelievably fast because it has enormous power for processing the data.

  • Storm can keep up its performance even under increasing load by adding resources linearly. It is highly scalable.

  • Storm performs data refresh and end-to-end delivery within seconds or minutes, depending on the problem. It has very low latency.

  • Storm has operational intelligence.

  • Storm provides guaranteed data processing even if any of the connected nodes in the cluster die or messages are lost.

Apache Storm – Core Concepts

Apache Storm reads a raw stream of real-time data from one end, passes it through a sequence of small processing units, and outputs the processed / useful information at the other end.

The following diagram depicts the core concepts of Apache Storm.

Core Concept

Let us now have a closer look at the components of Apache Storm −

  • Tuple − Tuple is the main data structure in Storm. It is a list of ordered elements. By default, a Tuple supports all data types. Generally, it is modeled as a set of comma-separated values and passed to a Storm cluster.

  • Stream − Stream is an unordered sequence of tuples.

  • Spouts − Source of a stream. Generally, Storm accepts input data from raw data sources like the Twitter Streaming API, an Apache Kafka queue, a Kestrel queue, etc. Otherwise you can write spouts to read data from data sources. “ISpout" is the core interface for implementing spouts. Some of the specific interfaces are IRichSpout, BaseRichSpout, KafkaSpout, etc.

  • Bolts − Bolts are logical processing units. Spouts pass data to bolts, and bolts process it and produce a new output stream. Bolts can perform the operations of filtering, aggregation, joining, and interacting with data sources and databases. A bolt receives data and emits to one or more bolts. “IBolt” is the core interface for implementing bolts. Some of the common interfaces are IRichBolt, IBasicBolt, etc.

Let’s consider a real-time example of “Twitter Analysis” and see how it can be modeled in Apache Storm. The following diagram depicts the structure.

Twitter Analysis

The input for the “Twitter Analysis” comes from the Twitter Streaming API. A spout will read the tweets of users using the Twitter Streaming API and output them as a stream of tuples. A single tuple from the spout will have a Twitter username and a single tweet as comma-separated values. Then, this stream of tuples will be forwarded to a bolt, and the bolt will split the tweet into individual words, calculate the word count, and persist the information to a configured data source. Now, we can easily get the result by querying the data source.

Topology

Spouts and bolts are connected together and they form a topology. Real-time application logic is specified inside the Storm topology. In simple words, a topology is a directed graph where vertices are computations and edges are streams of data.

A simple topology starts with spouts. A spout emits the data to one or more bolts. A bolt represents a node in the topology having the smallest processing logic, and the output of a bolt can be emitted into another bolt as input.

Storm keeps the topology always running until you kill the topology. Apache Storm’s main job is to run the topology, and it will run any number of topologies at a given time.

Tasks

Now you have a basic idea of spouts and bolts. They are the smallest logical units of the topology, and a topology is built using a single spout and an array of bolts. They need to be executed properly in a particular order for the topology to run successfully. The execution of each and every spout and bolt by Storm is called a “task”. In simple words, a task is either the execution of a spout or a bolt. At a given time, each spout and bolt can have multiple instances running in multiple separate threads.
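
The number of threads (executors) and tasks for a spout or bolt is chosen when the topology is built. The following is a minimal sketch, reusing the FakeCallLogReaderSpout and CallLogCounterBolt classes coded later in this tutorial; the component names, counts, and wiring are illustrative only −

import backtype.storm.topology.TopologyBuilder;
import backtype.storm.tuple.Fields;

TopologyBuilder builder = new TopologyBuilder();

// 2 executors (threads) for the spout
builder.setSpout("call-log-reader-spout", new FakeCallLogReaderSpout(), 2);

// 2 executors running a total of 4 tasks of the counter bolt
builder.setBolt("call-log-counter-bolt", new CallLogCounterBolt(), 2)
   .setNumTasks(4)
   .fieldsGrouping("call-log-reader-spout", new Fields("from"));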

Workers

A topology runs in a distributed manner on multiple worker nodes. Storm spreads the tasks evenly across all the worker nodes. A worker node’s role is to listen for jobs and start or stop processes whenever a new job arrives.

Stream Grouping

Streams of data flow from spouts to bolts or from one bolt to another bolt. Stream grouping controls how the tuples are routed in the topology and helps us understand the flow of tuples in the topology. There are four built-in groupings, as explained below.

Shuffle Grouping

In shuffle grouping, an equal number of tuples is distributed randomly across all of the workers executing the bolts. The following diagram depicts the structure.

Shuffle Grouping

Field Grouping

The fields with the same values in tuples are grouped together and the remaining tuples are kept outside. Then, the tuples with the same field values are sent forward to the same worker executing the bolts. For example, if the stream is grouped by the field “word”, then the tuples with the same string, “Hello”, will move to the same worker. The following diagram shows how Field Grouping works.

Field Grouping

Global Grouping

All the streams can be grouped and forwarded to one bolt. This grouping sends tuples generated by all instances of the source to a single target instance (specifically, the worker with the lowest ID is picked).

Global Grouping

All Grouping

All Grouping sends a single copy of each tuple to all instances of the receiving bolt. This kind of grouping is used to send signals to bolts. All grouping is useful for join operations. A combined example of all four groupings follows the diagram below.

All Grouping
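
The grouping for each bolt is chosen when the topology is wired together. Here is a minimal sketch; the spout and bolt classes (MySpout, MyBolt, SignalBolt) are hypothetical placeholders used purely to show the grouping calls −

import backtype.storm.topology.TopologyBuilder;
import backtype.storm.tuple.Fields;

TopologyBuilder builder = new TopologyBuilder();
builder.setSpout("spout", new MySpout());

// shuffle grouping: tuples are distributed randomly and evenly
builder.setBolt("shuffle-bolt", new MyBolt()).shuffleGrouping("spout");

// fields grouping: tuples with the same "word" value go to the same task
builder.setBolt("word-bolt", new MyBolt()).fieldsGrouping("spout", new Fields("word"));

// global grouping: all tuples go to a single task of the bolt
builder.setBolt("global-bolt", new MyBolt()).globalGrouping("spout");

// all grouping: every tuple is replicated to every task of the bolt
builder.setBolt("signal-bolt", new SignalBolt()).allGrouping("spout");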

Apache Storm – Cluster Architecture

One of the main highlights of Apache Storm is that it is a fault-tolerant, fast, distributed application with no “Single Point of Failure” (SPOF). We can install Apache Storm on as many systems as needed to increase the capacity of the application.

Let us have a look at how the Apache Storm cluster is designed and its internal architecture. The following diagram depicts the cluster design.

ZooKeeper Framework

Apache Storm has two types of nodes, Nimbus (master node) and Supervisor (worker node). Nimbus is the central component of Apache Storm. The main job of Nimbus is to run the Storm topology. Nimbus analyzes the topology and gathers the tasks to be executed. Then, it distributes the tasks to the available supervisors.

A supervisor will have one or more worker processes. A supervisor delegates the tasks to worker processes. A worker process spawns as many executors as needed and runs the tasks. Apache Storm uses an internal distributed messaging system for the communication between nimbus and the supervisors.

The components of an Apache Storm cluster are as follows −

  • Nimbus − Nimbus is the master node of a Storm cluster. All the other nodes in the cluster are called worker nodes. The master node is responsible for distributing data among all the worker nodes, assigning tasks to worker nodes, and monitoring failures.

  • Supervisor − The nodes that follow the instructions given by the nimbus are called supervisors. A supervisor has multiple worker processes and it governs the worker processes to complete the tasks assigned by the nimbus.

  • Worker process − A worker process executes tasks related to a specific topology. A worker process does not run a task by itself; instead, it creates executors and asks them to perform a particular task. A worker process will have multiple executors.

  • Executor − An executor is nothing but a single thread spawned by a worker process. An executor runs one or more tasks, but only for a specific spout or bolt.

  • Task − A task performs the actual data processing. So, it is either a spout or a bolt.
ZooKeeper Framework

Apache ZooKeeper is a service used by a cluster (group of nodes) to coordinate among themselves and maintain shared data with robust synchronization techniques. Nimbus is stateless, so it depends on ZooKeeper to monitor the status of the working nodes.

ZooKeeper helps the supervisors interact with the nimbus. It is responsible for maintaining the state of nimbus and the supervisors.

Storm is stateless in nature. Even though a stateless nature has its own disadvantages, it actually helps Storm process real-time data in the best possible and quickest way.

Storm is not entirely stateless, though. It stores its state in Apache ZooKeeper. Since the state is available in Apache ZooKeeper, a failed nimbus can be restarted and made to work from where it left off. Usually, service monitoring tools like monit will monitor Nimbus and restart it if there is any failure.

Apache Storm also has an advanced topology called Trident Topology with state maintenance, and it also provides a high-level API like Pig. We will discuss all these features in the coming chapters.

Apache Storm – Workflow

A working Storm cluster should have one nimbus and one or more supervisors. Another important node is Apache ZooKeeper, which will be used for the coordination between the nimbus and the supervisors.

Let us now take a close look at the workflow of Apache Storm −

  • Initially, the nimbus will wait for the “Storm Topology” to be submitted to it.

  • Once a topology is submitted, it will process the topology and gather all the tasks that are to be carried out and the order in which the tasks are to be executed.

  • Then, the nimbus will evenly distribute the tasks to all the available supervisors.

  • At a particular time interval, all supervisors will send heartbeats to the nimbus to inform it that they are still alive.

  • When a supervisor dies and doesn’t send a heartbeat to the nimbus, then the nimbus assigns its tasks to another supervisor.

  • When the nimbus itself dies, the supervisors will work on the already assigned tasks without any issue.

  • Once all the tasks are completed, the supervisor will wait for a new task to come in.

  • In the meantime, the dead nimbus will be restarted automatically by service monitoring tools.

  • The restarted nimbus will continue from where it stopped. Similarly, a dead supervisor can also be restarted automatically. Since both the nimbus and the supervisors can be restarted automatically and both will continue as before, Storm is guaranteed to process all the tasks at least once.

  • Once all the topologies are processed, the nimbus waits for a new topology to arrive, and similarly the supervisors wait for new tasks.

By default, there are two modes in a Storm cluster −

  • Local mode − This mode is used for development, testing, and debugging because it is the easiest way to see all the topology components working together. In this mode, we can adjust parameters that enable us to see how our topology runs in different Storm configuration environments. In Local mode, Storm topologies run on the local machine in a single JVM.

  • Production mode − In this mode, we submit our topology to a working Storm cluster, which is composed of many processes, usually running on different machines. As discussed in the workflow of Storm, a working cluster will run indefinitely until it is shut down. A sketch of how a topology is submitted in each mode follows this list.
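
A minimal sketch of the difference between the two modes, assuming a TopologyBuilder named builder already wired with spouts and bolts as shown later in this tutorial; the topology name and worker count are illustrative −

import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.StormSubmitter;

Config config = new Config();
config.setDebug(true);

// Local mode: run the topology inside a single in-process JVM cluster
LocalCluster cluster = new LocalCluster();
cluster.submitTopology("my-topology", config, builder.createTopology());

// Production mode: submit the same topology to a running Storm cluster
config.setNumWorkers(3);
StormSubmitter.submitTopology("my-topology", config, builder.createTopology());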

Storm – Distributed Messaging System

Apache Storm processes real-time data, and the input normally comes from a message queuing system. An external distributed messaging system provides the input necessary for the real-time computation. A spout will read the data from the messaging system, convert it into tuples, and input it into Apache Storm. The interesting fact is that Apache Storm uses its own distributed messaging system internally for the communication between its nimbus and supervisors.

What is a Distributed Messaging System?

Distributed messaging is based on the concept of reliable message queuing. Messages are queued asynchronously between client applications and messaging systems. A distributed messaging system provides the benefits of reliability, scalability, and persistence.

Most of the messaging patterns follow the publish-subscribe model (simply Pub-Sub), where the senders of the messages are called publishers and those who want to receive the messages are called subscribers.

Once a message has been published by the sender, the subscribers can receive the selected message with the help of a filtering option. Usually we have two types of filtering, one is topic-based filtering and the other is content-based filtering.

Note that the pub-sub model can communicate only via messages. It is a very loosely coupled architecture; even the senders don’t know who their subscribers are. Many of the message patterns enable the message broker to exchange published messages for timely access by many subscribers. A real-life example is Dish TV, which publishes different channels like sports, movies, music, etc., and anyone can subscribe to their own set of channels and get them whenever their subscribed channels are available.

Messaging System

The following list describes some of the popular high-throughput messaging systems −

  • Apache Kafka − Kafka was developed at LinkedIn and later became a sub-project of Apache. Apache Kafka is based on a broker-enabled, persistent, distributed publish-subscribe model. Kafka is fast, scalable, and highly efficient.
  • RabbitMQ − RabbitMQ is an open source, distributed, robust messaging application. It is easy to use and runs on all platforms.
  • JMS (Java Message Service) − JMS is an open source API that supports creating, reading, and sending messages from one application to another. It provides guaranteed message delivery and follows the publish-subscribe model.
  • ActiveMQ − The ActiveMQ messaging system is an open source implementation of the JMS API.
  • ZeroMQ − ZeroMQ is broker-less peer-to-peer message processing. It provides push-pull and router-dealer message patterns.
  • Kestrel − Kestrel is a fast, reliable, and simple distributed message queue.

Thrift Protocol

Thrift was built at Facebook for cross-language services development and remote procedure calls (RPC). Later, it became an open source Apache project. Apache Thrift is an Interface Definition Language that allows you to define new data types and service implementations on top of the defined data types in an easy manner.

Apache Thrift is also a communication framework that supports embedded systems, mobile applications, web applications, and many other programming languages. Some of the key features associated with Apache Thrift are its modularity, flexibility, and high performance. In addition, it can perform streaming, messaging, and RPC in distributed applications.

Storm extensively uses the Thrift protocol for its internal communication and data definition. A Storm topology is simply Thrift structs. Storm Nimbus, which runs the topology in Apache Storm, is a Thrift service.

Apache Storm – Installation

Let us now see how to install the Apache Storm framework on your machine. There are three major steps here −

  • Install Java on your system, if you don’t have it already.
  • Install the ZooKeeper framework.
  • Install the Apache Storm framework.

Step 1 − Verifying Java Installation

Use the following command to check whether you already have Java installed on your system.

$ java -version

If Java is already there, then you will see its version number. Otherwise, download the latest version of JDK.

Step 1.1 − Download JDK

Download the latest version of JDK by using the following link − www.oracle.com

The latest version is JDK 8u60 and the file is “jdk-8u60-linux-x64.tar.gz”. Download the file onto your machine.

Step 1.2 − Extract files

Generally, files get downloaded into the downloads folder. Extract the tar setup using the following commands.

$ cd /go/to/download/path
$ tar -zxf jdk-8u60-linux-x64.tar.gz

Step 1.3 − Move to opt directory

To make Java available to all users, move the extracted Java content to the “/opt/jdk” folder.

$ su
password: (type the root password)
$ mkdir /opt/jdk
$ mv jdk1.8.0_60 /opt/jdk/

Step 1.4 − Set path

To set the PATH and JAVA_HOME variables, add the following commands to the ~/.bashrc file.

export JAVA_HOME=/opt/jdk/jdk1.8.0_60
export PATH=$PATH:$JAVA_HOME/bin

Now apply all the changes to the currently running system.

$ source ~/.bashrc

Step 1.5 − Java Alternatives

Use the following command to change the Java alternatives.

update-alternatives --install /usr/bin/java java /opt/jdk/jdk1.8.0_60/bin/java 100

Step 1.6

Now verify the Java installation using the verification command (java -version) explained in Step 1.

Step 2 − ZooKeeper Framework Installation

Step 2.1 − Download ZooKeeper

To install the ZooKeeper framework on your machine, visit the following link and download the latest version of ZooKeeper − http://zookeeper.apache.org/releases.html

As of now, the latest version of ZooKeeper is 3.4.6 (ZooKeeper-3.4.6.tar.gz).

Step 2.2 − Extract the tar file

Extract the tar file using the following commands −

$ cd opt/
$ tar -zxf zookeeper-3.4.6.tar.gz
$ cd zookeeper-3.4.6
$ mkdir data

Step 2.3 − Create configuration file

Open the configuration file named “conf/zoo.cfg” using the command "vi conf/zoo.cfg" and set all the following parameters as a starting point.

$ vi conf/zoo.cfg
tickTime=2000
dataDir=/path/to/zookeeper/data
clientPort=2181
initLimit=5
syncLimit=2

Once the configuration file has been saved successfully, you can start the ZooKeeper server.

Step 2.4 − Start ZooKeeper Server

Use the following command to start the ZooKeeper server.

$ bin/zkServer.sh start

After executing this command, you will get a response as follows −

$ JMX enabled by default
$ Using config: /Users/../zookeeper-3.4.6/bin/../conf/zoo.cfg
$ Starting zookeeper ... STARTED

Step 2.5 − Start CLI

Use the following command to start the CLI.

$ bin/zkCli.sh

After executing the above command, you will be connected to the ZooKeeper server and get the following response.

Connecting to localhost:2181
................
................
................
Welcome to ZooKeeper!
................
................
WATCHER::
WatchedEvent state:SyncConnected type: None path:null
[zk: localhost:2181(CONNECTED) 0]

Step 2.6 − Stop ZooKeeper Server

After connecting to the server and performing all the operations, you can stop the ZooKeeper server using the following command.

bin/zkServer.sh stop

You have successfully installed Java and ZooKeeper on your machine. Let us now see the steps to install the Apache Storm framework.

Step 3 − Apache Storm Framework Installation

Step 3.1 − Download Storm

To install the Storm framework on your machine, visit the following link and download the latest version of Storm − http://storm.apache.org/downloads.html

As of now, the latest version of Storm is “apache-storm-0.9.5.tar.gz”.

Step 3.2 − Extract the tar file

Extract the tar file using the following commands −

$ cd opt/
$ tar -zxf apache-storm-0.9.5.tar.gz
$ cd apache-storm-0.9.5
$ mkdir data

Step 3.3 − Open configuration file

The current release of Storm contains a file at “conf/storm.yaml” that configures the Storm daemons. Add the following information to that file.

$ vi conf/storm.yaml
storm.zookeeper.servers:
 - "localhost"
storm.local.dir: "/path/to/storm/data"   # any path
nimbus.host: "localhost"
supervisor.slots.ports:
 - 6700
 - 6701
 - 6702
 - 6703

After applying all the changes, save the file and return to the terminal.

Step 3.4 − Start the Nimbus

$ bin/storm nimbus

Step 3.5 − Start the Supervisor

$ bin/storm supervisor

Step 3.6 − Start the UI

$ bin/storm ui

After starting the Storm user interface application, type the URL http://localhost:8080 in your favourite browser and you will see the Storm cluster information and its running topologies. The page should look similar to the following screenshot.

Storm UI

Apache Storm – Working Example

We have gone through the core technical details of Apache Storm, and now it is time to code a few simple scenarios.

Scenario – Mobile Call Log Analyzer

Mobile call records and their durations will be given as input to Apache Storm, and Storm will process and group the calls between the same caller and receiver and count their total number of calls.

Spout Creation

A spout is a component that is used for data generation. Basically, a spout will implement an IRichSpout interface. The “IRichSpout” interface has the following important methods −

  • open − Provides the spout with an environment to execute. The executors will run this method to initialize the spout.

  • nextTuple − Emits the generated data through the collector.

  • close − This method is called when a spout is going to shut down.

  • declareOutputFields − Declares the output schema of the tuple.

  • ack − Acknowledges that a specific tuple has been processed.

  • fail − Specifies that a specific tuple was not processed successfully.

Open

The signature of the open method is as follows −

open(Map conf, TopologyContext context, SpoutOutputCollector collector)

  • conf − Provides the Storm configuration for this spout.

  • context − Provides complete information about the spout's place within the topology, its task id, and input and output information.

  • collector − Enables us to emit the tuples that will be processed by the bolts.

nextTuple

The signature of the nextTuple method is as follows −

nextTuple()

nextTuple() is called periodically from the same loop as the ack() and fail() methods. It must release control of the thread when there is no work to do, so that the other methods have a chance to be called. So the first line of nextTuple checks whether processing has finished. If so, it should sleep for at least one millisecond to reduce load on the processor before returning.
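
A minimal sketch of that pattern; the completed flag and the readNextRecord helper are hypothetical placeholders, and Utils.sleep is Storm's helper for sleeping without checked exceptions −

import backtype.storm.utils.Utils;
import backtype.storm.tuple.Values;

@Override
public void nextTuple() {
   if (completed) {
      // nothing left to emit: yield the thread briefly so ack()/fail() can run
      Utils.sleep(1);
      return;
   }
   // otherwise read the next record and emit it
   collector.emit(new Values(readNextRecord()));
}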

close

The signature of the close method is as follows −

close()

declareOutputFields

The signature of the declareOutputFields method is as follows −

declareOutputFields(OutputFieldsDeclarer declarer)

declarer − It is used to declare output stream ids, output fields, etc.

This method is used to specify the output schema of the tuple.

ack

The signature of the ack method is as follows −

ack(Object msgId)

This method acknowledges that a specific tuple has been processed.

fail

The signature of the fail method is as follows −

fail(Object msgId)

This method informs Storm that a specific tuple has not been completely processed. Storm will reprocess the specific tuple.
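As a hedged illustration of how ack and fail are typically used together in a reliable spout, here is a minimal sketch; the pending map, message ids, and readNextRecord helper are hypothetical and not part of this tutorial's example −

import java.util.HashMap;
import java.util.Map;
import java.util.UUID;
import backtype.storm.tuple.Values;

// fields and methods inside an IRichSpout implementation
private Map<String, Values> pending = new HashMap<String, Values>();

private void emitReliably(Values record) {
   String msgId = UUID.randomUUID().toString();
   pending.put(msgId, record);           // remember the tuple until it is acked
   collector.emit(record, msgId);        // anchor the tuple with a message id
}

@Override
public void ack(Object msgId) {
   pending.remove(msgId);                // fully processed: forget it
}

@Override
public void fail(Object msgId) {
   collector.emit(pending.get(msgId), msgId);   // replay the failed tuple
}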

FakeCallLogReaderSpout

In our scenario, we need to collect the call log details. The information in a call log contains −

  • caller number
  • receiver number
  • duration

Since we don’t have real-time information of call logs, we will generate fake call logs. The fake information will be created using the Random class. The complete program code is given below.

Coding − FakeCallLogReaderSpout.java

import java.util.*;
//import storm tuple packages
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;

//import Spout interface packages
import backtype.storm.topology.IRichSpout;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.spout.SpoutOutputCollector;
import backtype.storm.task.TopologyContext;

//Create a class FakeCallLogReaderSpout which implements the IRichSpout interface
public class FakeCallLogReaderSpout implements IRichSpout {
   //Create instance for SpoutOutputCollector which passes tuples to the bolts.
   private SpoutOutputCollector collector;
   private boolean completed = false;

   //Create instance for TopologyContext which contains topology data.
   private TopologyContext context;

   //Create instance for Random class.
   private Random randomGenerator = new Random();
   private Integer idx = 0;

   @Override
   public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
      this.context = context;
      this.collector = collector;
   }

   @Override
   public void nextTuple() {
      if(this.idx <= 1000) {
         List<String> mobileNumbers = new ArrayList<String>();
         mobileNumbers.add("1234123401");
         mobileNumbers.add("1234123402");
         mobileNumbers.add("1234123403");
         mobileNumbers.add("1234123404");

         Integer localIdx = 0;
         while(localIdx++ < 100 && this.idx++ < 1000) {
            String fromMobileNumber = mobileNumbers.get(randomGenerator.nextInt(4));
            String toMobileNumber = mobileNumbers.get(randomGenerator.nextInt(4));

            //Re-pick the receiver until it differs from the caller
            while(fromMobileNumber.equals(toMobileNumber)) {
               toMobileNumber = mobileNumbers.get(randomGenerator.nextInt(4));
            }

            Integer duration = randomGenerator.nextInt(60);
            this.collector.emit(new Values(fromMobileNumber, toMobileNumber, duration));
         }
      }
   }

   @Override
   public void declareOutputFields(OutputFieldsDeclarer declarer) {
      declarer.declare(new Fields("from", "to", "duration"));
   }

   //Override the remaining interface methods
   @Override
   public void close() {}

   public boolean isDistributed() {
      return false;
   }

   @Override
   public void activate() {}

   @Override 
   public void deactivate() {}

   @Override
   public void ack(Object msgId) {}

   @Override
   public void fail(Object msgId) {}

   @Override
   public Map<String, Object> getComponentConfiguration() {
      return null;
   }
}

Bolt Creation

A bolt is a component that takes tuples as input, processes the tuples, and produces new tuples as output. Bolts implement the IRichBolt interface. In this program, two bolt classes, CallLogCreatorBolt and CallLogCounterBolt, are used to perform the operations.

The IRichBolt interface has the following methods −

  • prepare − Provides the bolt with an environment to execute. The executors will run this method to initialize the bolt.

  • execute − Processes a single tuple of input.

  • cleanup − Called when a bolt is going to shut down.

  • declareOutputFields − Declares the output schema of the tuple.

Prepare

The signature of the prepare method is as follows −

prepare(Map conf, TopologyContext context, OutputCollector collector)

  • conf − Provides the Storm configuration for this bolt.

  • context − Provides complete information about the bolt's place within the topology, its task id, input and output information, etc.

  • collector − Enables us to emit the processed tuples.

execute

The signature of the execute method is as follows −

execute(Tuple tuple)

Here tuple is the input tuple to be processed.

The execute method processes a single tuple at a time. The tuple data can be accessed by the getValue method of the Tuple class. It is not necessary to process the input tuple immediately. Multiple tuples can be processed and output as a single output tuple. The processed tuples can be emitted by using the OutputCollector class.
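
A small illustrative sketch of accessing tuple values inside execute; the field names match the call-log example below, and both the positional and the by-field accessors shown are standard Tuple methods −

@Override
public void execute(Tuple tuple) {
   // access values by position ...
   String from = tuple.getString(0);
   // ... or by the field name declared by the upstream component
   Integer duration = tuple.getIntegerByField("duration");

   // emit a new tuple and acknowledge the input
   collector.emit(new Values(from, duration));
   collector.ack(tuple);
}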

cleanup

The signature of the cleanup method is as follows −

cleanup()

declareOutputFields

The signature of the declareOutputFields method is as follows −

declareOutputFields(OutputFieldsDeclarer declarer)

Here the parameter declarer is used to declare output stream ids, output fields, etc.

This method is used to specify the output schema of the tuple.

Call log Creator Bolt

The call log creator bolt receives the call log tuple. The call log tuple has the caller number, the receiver number, and the call duration. This bolt simply creates a new value by combining the caller number and the receiver number. The format of the new value is "Caller number - Receiver number", and it is named as a new field, "call". The complete code is given below.

Coding − CallLogCreatorBolt.java

//import util packages
import java.util.HashMap;
import java.util.Map;

import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;
import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;

//import Storm IRichBolt package
import backtype.storm.topology.IRichBolt;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.tuple.Tuple;

//Create a class CallLogCreatorBolt which implements the IRichBolt interface
public class CallLogCreatorBolt implements IRichBolt {
   //Create instance for OutputCollector which collects and emits tuples to produce output
   private OutputCollector collector;

   @Override
   public void prepare(Map conf, TopologyContext context, OutputCollector collector) {
      this.collector = collector;
   }

   @Override
   public void execute(Tuple tuple) {
      String from = tuple.getString(0);
      String to = tuple.getString(1);
      Integer duration = tuple.getInteger(2);
      collector.emit(new Values(from + " - " + to, duration));
   }

   @Override
   public void cleanup() {}

   @Override
   public void declareOutputFields(OutputFieldsDeclarer declarer) {
      declarer.declare(new Fields("call", "duration"));
   }

   @Override
   public Map<String, Object> getComponentConfiguration() {
      return null;
   }
}

Call log Counter Bolt

The call log counter bolt receives a call and its duration as a tuple. This bolt initializes a dictionary (Map) object in the prepare method. In the execute method, it checks the tuple and creates a new entry in the dictionary object for every new “call” value in the tuple and sets a value of 1 in the dictionary object. For an entry already in the dictionary, it just increments its value. In simple terms, this bolt saves the call and its count in the dictionary object. Instead of saving the call and its count in the dictionary, we can also save it to a data source. The complete program code is as follows −

Coding − CallLogCounterBolt.java

import java.util.HashMap;
import java.util.Map;

import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;
import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.IRichBolt;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.tuple.Tuple;

public class CallLogCounterBolt implements IRichBolt {
   Map<String, Integer> counterMap;
   private OutputCollector collector;

   @Override
   public void prepare(Map conf, TopologyContext context, OutputCollector collector) {
      this.counterMap = new HashMap<String, Integer>();
      this.collector = collector;
   }

   @Override
   public void execute(Tuple tuple) {
      String call = tuple.getString(0);
      Integer duration = tuple.getInteger(1);

      if(!counterMap.containsKey(call)){
         counterMap.put(call, 1);
      }else{
         Integer c = counterMap.get(call) + 1;
         counterMap.put(call, c);
      }

      collector.ack(tuple);
   }

   @Override
   public void cleanup() {
      for(Map.Entry<String, Integer> entry:counterMap.entrySet()){
         System.out.println(entry.getKey()+" : " + entry.getValue());
      }
   }

   @Override
   public void declareOutputFields(OutputFieldsDeclarer declarer) {
      declarer.declare(new Fields("call"));
   }

   @Override
   public Map<String, Object> getComponentConfiguration() {
      return null;
   }

}

Creating Topology

The Storm topology is basically a Thrift structure. The TopologyBuilder class provides simple and easy methods to create complex topologies. The TopologyBuilder class has methods to set a spout (setSpout) and to set a bolt (setBolt). Finally, TopologyBuilder has createTopology to create the topology. Use the following code snippet to create a topology −

TopologyBuilder builder = new TopologyBuilder();

builder.setSpout("call-log-reader-spout", new FakeCallLogReaderSpout());

builder.setBolt("call-log-creator-bolt", new CallLogCreatorBolt())
   .shuffleGrouping("call-log-reader-spout");

builder.setBolt("call-log-counter-bolt", new CallLogCounterBolt())
   .fieldsGrouping("call-log-creator-bolt", new Fields("call"));

The shuffleGrouping and fieldsGrouping methods help set the stream grouping for the spout and bolts.

Local Cluster

For development purposes, we can create a local cluster using the "LocalCluster" object and then submit the topology using the "submitTopology" method of the "LocalCluster" class. One of the arguments of "submitTopology" is an instance of the "Config" class. The "Config" class is used to set configuration options before submitting the topology. This configuration option will be merged with the cluster configuration at run time and sent to all tasks (spout and bolt) with the prepare method. Once the topology is submitted to the cluster, we will wait 10 seconds for the cluster to compute the submitted topology and then shut down the cluster using the "shutdown" method of "LocalCluster". The complete program code is as follows −

Coding − LogAnalyserStorm.java

import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;

//import storm configuration packages
import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.topology.TopologyBuilder;

//Create the main class LogAnalyserStorm to submit the topology.
public class LogAnalyserStorm {
   public static void main(String[] args) throws Exception{
      //Create Config instance for cluster configuration
      Config config = new Config();
      config.setDebug(true);

      //Wire the spout and bolts into a topology
      TopologyBuilder builder = new TopologyBuilder();
      builder.setSpout("call-log-reader-spout", new FakeCallLogReaderSpout());

      builder.setBolt("call-log-creator-bolt", new CallLogCreatorBolt())
         .shuffleGrouping("call-log-reader-spout");

      builder.setBolt("call-log-counter-bolt", new CallLogCounterBolt())
         .fieldsGrouping("call-log-creator-bolt", new Fields("call"));

      LocalCluster cluster = new LocalCluster();
      cluster.submitTopology("LogAnalyserStorm", config, builder.createTopology());
      Thread.sleep(10000);

      //Stop the topology
      cluster.shutdown();
   }
}

Building and Running the Application

The complete application has four Java files. They are −

  • FakeCallLogReaderSpout.java
  • CallLogCreatorBolt.java
  • CallLogCounterBolt.java
  • LogAnalyserStorm.java

The application can be built using the following command −

javac -cp "/path/to/storm/apache-storm-0.9.5/lib/*" *.java

The application can be run using the following command −

java -cp "/path/to/storm/apache-storm-0.9.5/lib/*":. LogAnalyserStorm

Output

Once the application is started, it will output the complete details about the cluster startup process, the spout and bolt processing, and finally, the cluster shutdown process. In "CallLogCounterBolt", we have printed the calls and their count details. This information will be displayed on the console as follows −

1234123402 - 1234123401 : 78
1234123402 - 1234123404 : 88
1234123402 - 1234123403 : 105
1234123401 - 1234123404 : 74
1234123401 - 1234123403 : 81
1234123401 - 1234123402 : 81
1234123403 - 1234123404 : 86
1234123404 - 1234123401 : 63
1234123404 - 1234123402 : 82
1234123403 - 1234123402 : 83
1234123404 - 1234123403 : 86
1234123403 - 1234123401 : 93

Non-JVM languages

Storm topologies are implemented by Thrift interfaces, which makes it easy to submit topologies in any language. Storm supports Ruby, Python, and many other languages. Let’s take a look at the Python binding.

Python Binding

Python is a general-purpose, interpreted, interactive, object-oriented, and high-level programming language. Storm supports Python for implementing its topologies. Python supports emitting, anchoring, acking, and logging operations.

As you know, bolts can be defined in any language. Bolts written in another language are executed as sub-processes, and Storm communicates with those sub-processes via JSON messages over stdin/stdout. First, consider a sample bolt WordCount that uses the Python binding.

public static class WordCount extends ShellBolt implements IRichBolt {
   public WordCount() {
      //run the bolt logic as a Python sub-process
      super("python", "splitword.py");
   }

   public void declareOutputFields(OutputFieldsDeclarer declarer) {
      declarer.declare(new Fields("word"));
   }

   public Map<String, Object> getComponentConfiguration() {
      return null;
   }
}

Here the class WordCount extends ShellBolt (so the bolt logic runs as a sub-process) and implements the IRichBolt interface, with the super arguments pointing to the Python implementation "splitword.py". Now create a Python implementation named "splitword.py".

import storm

class WordCountBolt(storm.BasicBolt):
   def process(self, tup):
      words = tup.values[0].split(" ")
      for word in words:
         storm.emit([word])

WordCountBolt().run()

This is a sample Python implementation that splits out the words in a given sentence. Similarly, you can bind with other supported languages as well.

Apache Storm – Trident

Trident is an extension of Storm. Like Storm, Trident was also developed by Twitter. The main reason behind developing Trident was to provide a high-level abstraction on top of Storm, along with stateful stream processing and low-latency distributed querying.

Trident uses spouts and bolts, but these low-level components are auto-generated by Trident before execution. Trident has functions, filters, joins, grouping, and aggregation.

Trident processes streams as a series of batches, which are referred to as transactions. Generally the size of those small batches will be on the order of thousands or millions of tuples, depending on the input stream. This way, Trident is different from Storm, which performs tuple-by-tuple processing.

The batch processing concept is very similar to database transactions. Every transaction is assigned a transaction ID. A transaction is considered successful once all of its processing completes. However, a failure in processing one of the transaction's tuples will cause the entire transaction to be retransmitted. For each batch, Trident will call beginCommit at the beginning of the transaction and commit at the end of it.

Trident Topology

The Trident API exposes an easy option to create a Trident topology using the “TridentTopology” class. Basically, a Trident topology receives an input stream from a spout and does an ordered sequence of operations (filter, aggregation, grouping, etc.) on the stream. Storm Tuples are replaced by Trident Tuples and Bolts are replaced by operations. A simple Trident topology can be created as follows −

TridentTopology topology = new TridentTopology();

Trident Tuples

A Trident tuple is a named list of values. The TridentTuple interface is the data model of a Trident topology. The TridentTuple interface is the basic unit of data that can be processed by a Trident topology.

Trident Spout

A Trident spout is similar to a Storm spout, with additional options to use the features of Trident. Actually, we can still use the IRichSpout, which we have used in the Storm topology, but it will be non-transactional in nature and we won’t be able to use the advantages provided by Trident.

The basic spout having all the functionality to use the features of Trident is "ITridentSpout". It supports both transactional and opaque transactional semantics. The other spouts are IBatchSpout, IPartitionedTridentSpout, and IOpaquePartitionedTridentSpout.

In addition to these generic spouts, Trident has many sample implementations of Trident spouts. One of them is the FeederBatchSpout, which we can use to send a named list of Trident tuples easily without worrying about batch processing, parallelism, etc.

FeederBatchSpout creation and data feeding can be done as shown below −

TridentTopology topology = new TridentTopology();
FeederBatchSpout testSpout = new FeederBatchSpout(
   ImmutableList.of("fromMobileNumber", "toMobileNumber", "duration"));
topology.newStream("fixed-batch-spout", testSpout);
testSpout.feed(ImmutableList.of(new Values("1234123401", "1234123402", 20)));

Trident Operations

Trident relies on the “Trident Operation” to process the input stream of Trident tuples. The Trident API has a number of built-in operations to handle simple-to-complex stream processing. These operations range from simple validation to complex grouping and aggregation of Trident tuples. Let us go through the most important and frequently used operations.

Filter

A filter is an object used to perform the task of input validation. A Trident filter gets a subset of Trident tuple fields as input and returns either true or false depending on whether certain conditions are satisfied or not. If true is returned, then the tuple is kept in the output stream; otherwise, the tuple is removed from the stream. A filter will basically inherit from the BaseFilter class and implement the isKeep method. Here is a sample implementation of the filter operation −

public class MyFilter extends BaseFilter {
   public boolean isKeep(TridentTuple tuple) {
      return tuple.getInteger(1) % 2 == 0;
   }
}

input

[1, 2]
[1, 3]
[1, 4]

output

[1, 2]
[1, 4]

The filter function can be called in the topology using the “each” method. The “Fields” class can be used to specify the input (a subset of the Trident tuple). The sample code is as follows −

TridentTopology topology = new TridentTopology();
topology.newStream("spout", spout)
   .each(new Fields("a", "b"), new MyFilter());

Function

A function is an object used to perform a simple operation on a single Trident tuple. It takes a subset of Trident tuple fields and emits zero or more new Trident tuple fields.

A function basically inherits from the BaseFunction class and implements the execute method. A sample implementation is given below −

public class MyFunction extends BaseFunction {
   public void execute(TridentTuple tuple, TridentCollector collector) {
      int a = tuple.getInteger(0);
      int b = tuple.getInteger(1);
      collector.emit(new Values(a + b));
   }
}

input

[1, 2]
[1, 3]
[1, 4]

output

[1, 2, 3]
[1, 3, 4]
[1, 4, 5]

Just like the filter operation, the function operation can be called in a topology using the each method. The sample code is as follows −

TridentTopology topology = new TridentTopology();
topology.newStream("spout", spout)
   .each(new Fields("a", "b"), new MyFunction(), new Fields("d"));

Aggregation

Aggregation is an object used to perform aggregation operations on an input batch, partition, or stream. Trident has three types of aggregation. They are as follows −

  • aggregate − Aggregates each batch of Trident tuples in isolation. During the aggregate process, the tuples are initially repartitioned using global grouping to combine all partitions of the same batch into a single partition.

  • partitionAggregate − Aggregates each partition instead of the entire batch of Trident tuples. The output of the partition aggregate completely replaces the input tuple. The output of the partition aggregate contains a single field tuple.

  • persistentAggregate − Aggregates over all Trident tuples across all batches and stores the result in either memory or a database.

TridentTopology topology = new TridentTopology();

// aggregate operation
topology.newStream("spout", spout)
   .each(new Fields("a", "b"), new MyFunction(), new Fields("d"))
   .aggregate(new Count(), new Fields("count"));

// partitionAggregate operation
topology.newStream("spout", spout)
   .each(new Fields("a", "b"), new MyFunction(), new Fields("d"))
   .partitionAggregate(new Count(), new Fields("count"));

// persistentAggregate - saving the count to memory
topology.newStream("spout", spout)
   .each(new Fields("a", "b"), new MyFunction(), new Fields("d"))
   .persistentAggregate(new MemoryMapState.Factory(), new Count(), new Fields("count"));

An aggregation operation can be created using either the CombinerAggregator, the ReducerAggregator, or the generic Aggregator interface. The "Count" aggregator used in the above example is one of the built-in aggregators. It is implemented using "CombinerAggregator". The implementation is as follows −

public class Count implements CombinerAggregator<Long> {
   @Override
   public Long init(TridentTuple tuple) {
      return 1L;
   }
	
   @Override
   public Long combine(Long val1, Long val2) {
      return val1 + val2;
   }
	
   @Override
   public Long zero() {
      return 0L;
   }
}

Grouping

The grouping operation is an inbuilt operation and can be called by the groupBy method. The groupBy method repartitions the stream by doing a partitionBy on the specified fields, and then within each partition, it groups together tuples whose group fields are equal. Normally, we use “groupBy” along with “persistentAggregate” to get a grouped aggregation. The sample code is as follows −

TridentTopology topology = new TridentTopology();

// persistentAggregate - saving the count to memory
topology.newStream("spout", spout)
   .each(new Fields("a", "b"), new MyFunction(), new Fields("d"))
   .groupBy(new Fields("d"))
   .persistentAggregate(new MemoryMapState.Factory(), new Count(), new Fields("count"));

Merging and Joining

Merging and joining can be done by using the “merge” and “join” methods respectively. Merging combines one or more streams. Joining is similar to merging, except for the fact that joining uses a Trident tuple field from both sides to check and join the two streams. Moreover, joining works at the batch level only. The sample code is as follows −

TridentTopology topology = new TridentTopology();
topology.merge(stream1, stream2, stream3);
topology.join(stream1, new Fields("key"), stream2, new Fields("x"), 
   new Fields("key", "a", "b", "c"));

State Maintenance

Trident provides a mechanism for state maintenance. State information can be stored in the topology itself, or you can store it in a separate database as well. The reason is to maintain state so that if any tuple fails during processing, the failed tuple is retried. This creates a problem while updating the state, because you are not sure whether the state for this tuple has been updated previously or not. If the tuple failed before updating the state, then retrying the tuple will make the state stable. However, if the tuple failed after updating the state, then retrying the same tuple will again increase the count in the database and make the state unstable. One needs to perform the following steps to ensure a message is processed only once (a minimal sketch follows the list) −

  • Process the tuples in small batches.

  • Assign a unique ID to each batch. If the batch is retried, it is given the same unique ID.

  • The state updates are ordered among batches. For example, the state update of the second batch will not be possible until the state update for the first batch has completed.
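
The following is a minimal sketch of the idea only, not Trident's actual State API: each stored value remembers the transaction ID of the batch that last updated it, so a retried batch carrying the same ID is applied only once. The class and method names are purely illustrative −

import java.util.HashMap;
import java.util.Map;

// Illustrative only: counts keyed by "call", each remembering the txid that last wrote it.
public class TxAwareCountStore {
   private static class Entry { long count; long txid = -1; }
   private final Map<String, Entry> store = new HashMap<String, Entry>();

   // Apply a batch's partial count exactly once per transaction ID.
   public void applyBatch(long txid, String key, long partialCount) {
      Entry e = store.get(key);
      if (e == null) {
         e = new Entry();
         store.put(key, e);
      }
      if (e.txid != txid) {        // a retried batch carries the same txid and is skipped
         e.count += partialCount;
         e.txid = txid;
      }
   }
}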

Distributed RPC

Distributed RPC is used to query and retrieve results from the Trident topology. Storm has an inbuilt distributed RPC server. The distributed RPC server receives the RPC request from the client and passes it to the topology. The topology processes the request and sends the result to the distributed RPC server, which redirects it to the client. Trident's distributed RPC queries execute like normal RPC queries, except for the fact that these queries are run in parallel.
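
On a production cluster, a remote DRPC client would be used instead of the LocalDRPC object shown later in this chapter. A hedged sketch, assuming a DRPC server is reachable at the named host and port and that the topology has registered a DRPC stream called "call_count" −

import backtype.storm.utils.DRPCClient;

// connect to the DRPC server (host and port are illustrative)
DRPCClient client = new DRPCClient("drpc-server-host", 3772);

// run the "call_count" query registered by the topology and print the result
String result = client.execute("call_count", "1234123401 - 1234123402");
System.out.println(result);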

When to Use Trident?

As in many use-cases, if the requirement is to process a query only once, we can achieve it by writing a topology in Trident. On the other hand, it will be difficult to achieve exactly-once processing in the case of plain Storm. Hence Trident will be useful for those use-cases where you require exactly-once processing. Trident is not for all use cases, especially high-performance use-cases, because it adds complexity to Storm and manages state.

Working Example of Trident

We are going to convert the call log analyzer application worked out in the previous section to the Trident framework. A Trident application will be relatively simple compared to plain Storm, thanks to its high-level API. Storm will basically be required to perform any one of the Function, Filter, Aggregate, GroupBy, Join, and Merge operations in Trident. Finally, we will start the DRPC server using the LocalDRPC class and search for some keywords using the execute method of the LocalDRPC class.

Formatting the call information

The purpose of the FormatCall class is to format the call information comprising the “caller number” and the “receiver number”. The complete program code is as follows −

Coding: FormatCall.java

import backtype.storm.tuple.Values;

import storm.trident.operation.BaseFunction;
import storm.trident.operation.TridentCollector;
import storm.trident.tuple.TridentTuple;

public class FormatCall extends BaseFunction {
   @Override
   public void execute(TridentTuple tuple, TridentCollector collector) {
      String fromMobileNumber = tuple.getString(0);
      String toMobileNumber = tuple.getString(1);
      collector.emit(new Values(fromMobileNumber + " - " + toMobileNumber));
   }
}

CSVSplit

The purpose of the CSVSplit class is to split the input string based on “comma (,)” and emit every word in the string. This function is used to parse the input argument of the distributed querying. The complete code is as follows −

Coding: CSVSplit.java

import backtype.storm.tuple.Values;

import storm.trident.operation.BaseFunction;
import storm.trident.operation.TridentCollector;
import storm.trident.tuple.TridentTuple;

public class CSVSplit extends BaseFunction {
   @Override
   public void execute(TridentTuple tuple, TridentCollector collector) {
      for(String word: tuple.getString(0).split(",")) {
         if(word.length() > 0) {
            collector.emit(new Values(word));
         }
      }
   }
}

Log Analyzer

This is the main application. Initially, the application will initialize the TridentTopology and feed caller information using FeederBatchSpout. A Trident topology stream can be created using the newStream method of the TridentTopology class. Similarly, a Trident topology DRPC stream can be created using the newDRPCStream method of the TridentTopology class. A simple DRPC server can be created using the LocalDRPC class. LocalDRPC has an execute method to search for some keywords. The complete code is given below.

Coding: LogAnalyserTrident.java

import java.util.*;

import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.LocalDRPC;
import backtype.storm.utils.DRPCClient;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;

import storm.trident.TridentState;
import storm.trident.TridentTopology;
import storm.trident.tuple.TridentTuple;

import storm.trident.operation.builtin.FilterNull;
import storm.trident.operation.builtin.Count;
import storm.trident.operation.builtin.Sum;
import storm.trident.operation.builtin.MapGet;
import storm.trident.operation.builtin.Debug;
import storm.trident.operation.BaseFilter;

import storm.trident.testing.FixedBatchSpout;
import storm.trident.testing.FeederBatchSpout;
import storm.trident.testing.Split;
import storm.trident.testing.MemoryMapState;

import com.google.common.collect.ImmutableList;

public class LogAnalyserTrident {
   public static void main(String[] args) throws Exception {
      System.out.println("Log Analyser Trident");
      TridentTopology topology = new TridentTopology();

      FeederBatchSpout testSpout = new FeederBatchSpout(ImmutableList.of("fromMobileNumber",
         "toMobileNumber", "duration"));

      TridentState callCounts = topology
         .newStream("fixed-batch-spout", testSpout)
         .each(new Fields("fromMobileNumber", "toMobileNumber"),
         new FormatCall(), new Fields("call"))
         .groupBy(new Fields("call"))
         .persistentAggregate(new MemoryMapState.Factory(), new Count(),
         new Fields("count"));

      LocalDRPC drpc = new LocalDRPC();

      topology.newDRPCStream("call_count", drpc)
         .stateQuery(callCounts, new Fields("args"), new MapGet(), new Fields("count"));

      topology.newDRPCStream("multiple_call_count", drpc)
         .each(new Fields("args"), new CSVSplit(), new Fields("call"))
         .groupBy(new Fields("call"))
         .stateQuery(callCounts, new Fields("call"), new MapGet(),
         new Fields("count"))
         .each(new Fields("call", "count"), new Debug())
         .each(new Fields("count"), new FilterNull())
         .aggregate(new Fields("count"), new Sum(), new Fields("sum"));

      Config conf = new Config();
      LocalCluster cluster = new LocalCluster();
      cluster.submitTopology("trident", conf, topology.build());
      Random randomGenerator = new Random();
      int idx = 0;

      while(idx < 10) {
         testSpout.feed(ImmutableList.of(new Values("1234123401",
            "1234123402", randomGenerator.nextInt(60))));

         testSpout.feed(ImmutableList.of(new Values("1234123401",
            "1234123403", randomGenerator.nextInt(60))));

         testSpout.feed(ImmutableList.of(new Values("1234123401",
            "1234123404", randomGenerator.nextInt(60))));

         testSpout.feed(ImmutableList.of(new Values("1234123402",
            "1234123403", randomGenerator.nextInt(60))));

         idx = idx + 1;
      }

      System.out.println("DRPC : Query starts");
      System.out.println(drpc.execute("call_count", "1234123401 - 1234123402"));
      System.out.println(drpc.execute("multiple_call_count",
         "1234123401 - 1234123402,1234123401 - 1234123403"));
      System.out.println("DRPC : Query ends");

      cluster.shutdown();
      drpc.shutdown();

      // DRPCClient client = new DRPCClient("drpc.server.location", 3772);
   }
}

Building and Running the Application

The complete application has three Java codes. They are as follows −

  • FormatCall.java
  • CSVSplit.java
  • LogAnalyserTrident.java

The application can be built by using the following command −

javac -cp "/path/to/storm/apache-storm-0.9.5/lib/*" *.java

The application can be run by using the following command −

java -cp "/path/to/storm/apache-storm-0.9.5/lib/*":. LogAnalyserTrident

Output

Once the application is started, it will output the complete details about the cluster startup process, operations processing, DRPC server and client information, and finally, the cluster shutdown process. This output will be displayed on the console as shown below.

DRPC : Query starts
[["1234123401 - 1234123402",10]]
DEBUG: [1234123401 - 1234123402, 10]
DEBUG: [1234123401 - 1234123403, 10]
[[20]]
DRPC : Query ends

Apache Storm in Twitter

In this chapter, we will discuss a real-time application of Apache Storm. We will see how Storm is used in Twitter.

Twitter

Twitter is an online social networking service that provides a platform to send and receive user tweets. Registered users can read and post tweets, but unregistered users can only read tweets. A hashtag is used to categorize tweets by keyword by appending # before the relevant keyword. Now let us consider a real-time scenario of finding the most used hashtags per topic.

Spout Creation

The purpose of the spout is to get the tweets submitted by people as soon as possible. Twitter provides the “Twitter Streaming API”, a web service based tool to retrieve the tweets submitted by people in real time. The Twitter Streaming API can be accessed in any programming language.

twitter4j is an open source, unofficial Java library, which provides a Java based module to easily access the Twitter Streaming API. twitter4j provides a listener-based framework to access the tweets. To access the Twitter Streaming API, we need to sign up for a Twitter developer account and obtain the following OAuth authentication details.

  • Consumer key
  • Consumer secret
  • Access token
  • Access token secret

Storm provides a Twitter spout, TwitterSampleSpout, in its starter kit. We will be using it to retrieve the tweets. The spout needs the OAuth authentication details and at least one keyword. The spout will emit real-time tweets based on the keywords. The complete program code is given below.

Coding: TwitterSampleSpout.java

import java.util.Map;
import java.util.concurrent.LinkedBlockingQueue;

import twitter4j.FilterQuery;
import twitter4j.StallWarning;
import twitter4j.Status;
import twitter4j.StatusDeletionNotice;
import twitter4j.StatusListener;

import twitter4j.TwitterStream;
import twitter4j.TwitterStreamFactory;
import twitter4j.auth.AccessToken;
import twitter4j.conf.ConfigurationBuilder;

import backtype.storm.Config;
import backtype.storm.spout.SpoutOutputCollector;

import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichSpout;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;

import backtype.storm.utils.Utils;

@SuppressWarnings("serial")
public class TwitterSampleSpout extends BaseRichSpout {
   SpoutOutputCollector _collector;
   LinkedBlockingQueue<Status> queue = null;
   TwitterStream _twitterStream;

   String consumerKey;
   String consumerSecret;
   String accessToken;
   String accessTokenSecret;
   String[] keyWords;

   public TwitterSampleSpout(String consumerKey, String consumerSecret,
      String accessToken, String accessTokenSecret, String[] keyWords) {
         this.consumerKey = consumerKey;
         this.consumerSecret = consumerSecret;
         this.accessToken = accessToken;
         this.accessTokenSecret = accessTokenSecret;
         this.keyWords = keyWords;
   }

   public TwitterSampleSpout() {
      // TODO Auto-generated constructor stub
   }

   @Override
   public void open(Map conf, TopologyContext context,
      SpoutOutputCollector collector) {
         queue = new LinkedBlockingQueue<Status>(1000);
         _collector = collector;
         StatusListener listener = new StatusListener() {
            @Override
            public void onStatus(Status status) {
               queue.offer(status);
            }

            @Override
            public void onDeletionNotice(StatusDeletionNotice sdn) {}

            @Override
            public void onTrackLimitationNotice(int i) {}

            @Override
            public void onScrubGeo(long l, long l1) {}

            @Override
            public void onException(Exception ex) {}

            @Override
            public void onStallWarning(StallWarning arg0) {
               // TODO Auto-generated method stub
            }
         };

         ConfigurationBuilder cb = new ConfigurationBuilder();

         cb.setDebugEnabled(true)
            .setOAuthConsumerKey(consumerKey)
            .setOAuthConsumerSecret(consumerSecret)
            .setOAuthAccessToken(accessToken)
            .setOAuthAccessTokenSecret(accessTokenSecret);

         _twitterStream = new TwitterStreamFactory(cb.build()).getInstance();
         _twitterStream.addListener(listener);

         if (keyWords.length == 0) {
            _twitterStream.sample();
         } else {
            FilterQuery query = new FilterQuery().track(keyWords);
            _twitterStream.filter(query);
         }
   }

   @Override
   public void nextTuple() {
      Status ret = queue.poll();

      if (ret == null) {
         Utils.sleep(50);
      } else {
         _collector.emit(new Values(ret));
      }
   }

   @Override
   public void close() {
      _twitterStream.shutdown();
   }

   @Override
   public Map<String, Object> getComponentConfiguration() {
      Config ret = new Config();
      ret.setMaxTaskParallelism(1);
      return ret;
   }

   @Override
   public void ack(Object id) {}

   @Override
   public void fail(Object id) {}

   @Override
   public void declareOutputFields(OutputFieldsDeclarer declarer) {
      declarer.declare(new Fields("tweet"));
   }
}

Hashtag Reader Bolt

The tweet emitted by the spout will be forwarded to HashtagReaderBolt, which will process the tweet and emit all the available hashtags. HashtagReaderBolt uses the getHashtagEntities method provided by twitter4j, which reads the tweet and returns the list of hashtags. The complete program code is as follows −

Coding: HashtagReaderBolt.java

import java.util.HashMap;
import java.util.Map;

import twitter4j.*;
import twitter4j.conf.*;

import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;

import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.IRichBolt;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.tuple.Tuple;

public class HashtagReaderBolt implements IRichBolt {
   private OutputCollector collector;

   @Override
   public void prepare(Map conf, TopologyContext context, OutputCollector collector) {
      this.collector = collector;
   }

   @Override
   public void execute(Tuple tuple) {
      Status tweet = (Status) tuple.getValueByField("tweet");
      for(HashtagEntity hashtage : tweet.getHashtagEntities()) {
         System.out.println("Hashtag: " + hashtage.getText());
         this.collector.emit(new Values(hashtage.getText()));
      }
   }

   @Override
   public void cleanup() {}

   @Override
   public void declareOutputFields(OutputFieldsDeclarer declarer) {
      declarer.declare(new Fields("hashtag"));
   }

   @Override
   public Map<String, Object> getComponentConfiguration() {
      return null;
   }

}

Hashtag Counter Bolt

The emitted hashtags will be forwarded to HashtagCounterBolt. This bolt will process all the hashtags and save each hashtag and its count in memory using a Java Map object. The complete program code is given below.

Coding: HashtagCounterBolt.java

import java.util.HashMap;
import java.util.Map;

import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;

import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;

import backtype.storm.topology.IRichBolt;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.tuple.Tuple;

public class HashtagCounterBolt implements IRichBolt {
   Map<String, Integer> counterMap;
   private OutputCollector collector;

   @Override
   public void prepare(Map conf, TopologyContext context, OutputCollector collector) {
      this.counterMap = new HashMap<String, Integer>();
      this.collector = collector;
   }

   @Override
   public void execute(Tuple tuple) {
      String key = tuple.getString(0);

      if(!counterMap.containsKey(key)){
         counterMap.put(key, 1);
      }else{
         Integer c = counterMap.get(key) + 1;
         counterMap.put(key, c);
      }

      collector.ack(tuple);
   }

   @Override
   public void cleanup() {
      for(Map.Entry<String, Integer> entry:counterMap.entrySet()){
         System.out.println("Result: " + entry.getKey()+" : " + entry.getValue());
      }
   }

   @Override
   public void declareOutputFields(OutputFieldsDeclarer declarer) {
      declarer.declare(new Fields("hashtag"));
   }

   @Override
   public Map<String, Object> getComponentConfiguration() {
      return null;
   }

}

Submitting a Topology

Submitting a topology is the main application. The Twitter topology consists of TwitterSampleSpout, HashtagReaderBolt, and HashtagCounterBolt. The following program code shows how to submit a topology.

Coding: TwitterHashtagStorm.java

import java.util.*;

import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;
import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.topology.TopologyBuilder;

public class TwitterHashtagStorm {
   public static void main(String[] args) throws Exception{
      String consumerKey = args[0];
      String consumerSecret = args[1];

      String accessToken = args[2];
      String accessTokenSecret = args[3];

      String[] arguments = args.clone();
      String[] keyWords = Arrays.copyOfRange(arguments, 4, arguments.length);

      Config config = new Config();
      config.setDebug(true);

      TopologyBuilder builder = new TopologyBuilder();
      builder.setSpout("twitter-spout", new TwitterSampleSpout(consumerKey,
         consumerSecret, accessToken, accessTokenSecret, keyWords));

      builder.setBolt("twitter-hashtag-reader-bolt", new HashtagReaderBolt())
         .shuffleGrouping("twitter-spout");

      builder.setBolt("twitter-hashtag-counter-bolt", new HashtagCounterBolt())
         .fieldsGrouping("twitter-hashtag-reader-bolt", new Fields("hashtag"));

      LocalCluster cluster = new LocalCluster();
      cluster.submitTopology("TwitterHashtagStorm", config,
         builder.createTopology());
      Thread.sleep(10000);
      cluster.shutdown();
   }
}

Building and Running the Application

The complete application has four Java codes. They are as follows −

  • TwitterSampleSpout.java
  • HashtagReaderBolt.java
  • HashtagCounterBolt.java
  • TwitterHashtagStorm.java

You can compile the application using the following command −

javac -cp "/path/to/storm/apache-storm-0.9.5/lib/*":"/path/to/twitter4j/lib/*" *.java

Execute the application using the following command −

java -cp "/path/to/storm/apache-storm-0.9.5/lib/*":"/path/to/twitter4j/lib/*":.
TwitterHashtagStorm <consumerkey> <consumersecret> <accesstoken> <accesstokensecret>
<keyword1> <keyword2> … <keywordN>

Output

The application will print the currently available hashtags and their counts. The output should be similar to the following −

Result: jazztastic : 1
Result: foodie : 1
Result: Redskins : 1
Result: Recipe : 1
Result: cook : 1
Result: android : 1
Result: food : 2
Result: NoToxicHorseMeat : 1
Result: Purrs4Peace : 1
Result: livesongs : 1
Result: VIPremium : 1
Result: Frome : 1
Result: SundayRoast : 1
Result: Millennials : 1
Result: HealthWithKier : 1
Result: LPs30DaysofGratitude : 1
Result: cooking : 1
Result: gameinsight : 1
Result: Countryfile : 1
Result: androidgames : 1

Apache Storm in Yahoo! Finance

Yahoo! Finance is the Internet's leading business news and financial data website. It is a part of Yahoo! and gives information about financial news, market statistics, international market data and other information about financial resources that anyone can access.

If you are a registered Yahoo! user, then you can customize Yahoo! Finance to take advantage of its particular offerings. The Yahoo! Finance API is used to query financial data from Yahoo!

This API displays data that is delayed by 15 minutes from real time, and updates its database every 1 minute, to provide access to current stock-related information. Now let us consider a real-time scenario of a company and see how to raise an alert when its stock value goes below 100.

Spout Creation

The purpose of the spout is to get the details of the companies and emit the prices to the bolts. You can use the following program code to create the spout.

Coding: YahooFinanceSpout.java

import java.util.*;
import java.io.*;
import java.math.BigDecimal;

//import yahoofinance packages
import yahoofinance.YahooFinance;
import yahoofinance.Stock;

import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;

import backtype.storm.topology.IRichSpout;
import backtype.storm.topology.OutputFieldsDeclarer;

import backtype.storm.spout.SpoutOutputCollector;
import backtype.storm.task.TopologyContext;

public class YahooFinanceSpout implements IRichSpout {
   private SpoutOutputCollector collector;
   private boolean completed = false;
   private TopologyContext context;

   @Override
   public void open(Map conf, TopologyContext context, SpoutOutputCollector collector){
      this.context = context;
      this.collector = collector;
   }

   @Override
   public void nextTuple() {
      try {
         Stock stock = YahooFinance.get("INTC");
         BigDecimal price = stock.getQuote().getPrice();

         this.collector.emit(new Values("INTC", price.doubleValue()));
         stock = YahooFinance.get("GOOGL");
         price = stock.getQuote().getPrice();

         this.collector.emit(new Values("GOOGL", price.doubleValue()));
         stock = YahooFinance.get("AAPL");
         price = stock.getQuote().getPrice();

         this.collector.emit(new Values("AAPL", price.doubleValue()));
      } catch(Exception e) {}
   }

   @Override
   public void declareOutputFields(OutputFieldsDeclarer declarer) {
      declarer.declare(new Fields("company", "price"));
   }

   @Override
   public void close() {}

   public boolean isDistributed() {
      return false;
   }

   @Override
   public void activate() {}

   @Override
   public void deactivate() {}

   @Override
   public void ack(Object msgId) {}

   @Override
   public void fail(Object msgId) {}

   @Override
   public Map<String, Object> getComponentConfiguration() {
      return null;
   }

}

Bolt Creation

Here the purpose of the bolt is to process the given company's prices. It uses a Java Map object to set the cutoff price alert to true when the stock price falls below 100, and to false otherwise. The complete program code is as follows −

Coding: PriceCutOffBolt.java

import java.util.HashMap;
import java.util.Map;

import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;

import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;

import backtype.storm.topology.IRichBolt;
import backtype.storm.topology.OutputFieldsDeclarer;

import backtype.storm.tuple.Tuple;

public class PriceCutOffBolt implements IRichBolt {
   Map<String, Integer> cutOffMap;
   Map<String, Boolean> resultMap;

   private OutputCollector collector;

   @Override
   public void prepare(Map conf, TopologyContext context, OutputCollector collector) {
      this.cutOffMap = new HashMap<String, Integer>();
      this.cutOffMap.put("INTC", 100);
      this.cutOffMap.put("AAPL", 100);
      this.cutOffMap.put("GOOGL", 100);

      this.resultMap = new HashMap<String, Boolean>();
      this.collector = collector;
   }

   @Override
   public void execute(Tuple tuple) {
      String company = tuple.getString(0);
      Double price = tuple.getDouble(1);

      if(this.cutOffMap.containsKey(company)){
         Integer cutOffPrice = this.cutOffMap.get(company);

         if(price < cutOffPrice) {
            this.resultMap.put(company, true);
         } else {
            this.resultMap.put(company, false);
         }
      }

      collector.ack(tuple);
   }

   @Override
   public void cleanup() {
      for(Map.Entry<String, Boolean> entry:resultMap.entrySet()){
         System.out.println(entry.getKey()+" : " + entry.getValue());
      }
   }

   @Override
   public void declareOutputFields(OutputFieldsDeclarer declarer) {
      declarer.declare(new Fields("cut_off_price"));
   }

   @Override
   public Map<String, Object> getComponentConfiguration() {
      return null;
   }

}

Submitting a Topology

This is the main application, where YahooFinanceSpout.java and PriceCutOffBolt.java are joined together to produce a topology. The following program code shows how you can submit a topology.

Coding: YahooFinanceStorm.java

import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;

import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.topology.TopologyBuilder;

public class YahooFinanceStorm {
   public static void main(String[] args) throws Exception{
      Config config = new Config();
      config.setDebug(true);

      TopologyBuilder builder = new TopologyBuilder();
      builder.setSpout("yahoo-finance-spout", new YahooFinanceSpout());

      builder.setBolt("price-cutoff-bolt", new PriceCutOffBolt())
         .fieldsGrouping("yahoo-finance-spout", new Fields("company"));

      LocalCluster cluster = new LocalCluster();
      cluster.submitTopology("YahooFinanceStorm", config, builder.createTopology());
      Thread.sleep(10000);
      cluster.shutdown();
   }
}

Building and Running the Application

The complete application has three Java codes. They are as follows −

  • YahooFinanceSpout.java
  • PriceCutOffBolt.java
  • YahooFinanceStorm.java

The application can be built using the following command −

javac -cp "/path/to/storm/apache-storm-0.9.5/lib/*":"/path/to/yahoofinance/lib/*" *.java

The application can be run using the following command −

java -cp "/path/to/storm/apache-storm-0.9.5/lib/*":"/path/to/yahoofinance/lib/*":.
YahooFinanceStorm

Output

The output will be similar to the following −

GOOGL : false
AAPL : false
INTC : true

Apache Storm – Applications

The Apache Storm framework supports many of today's best industrial applications. We will provide a very brief overview of a few of the most notable applications of Storm in this chapter.

Klout

Klout is an application that uses social media analytics to rank its users based on online social influence through the Klout Score, which is a numerical value between 1 and 100. Klout uses Apache Storm's inbuilt Trident abstraction to create complex topologies that stream data.

The Weather Channel

The Weather Channel uses Storm topologies to ingest weather data. It has tied up with Twitter to enable weather-informed advertising on Twitter and mobile applications. OpenSignal is a company that specializes in wireless coverage mapping. StormTag and WeatherSignal are weather-based projects created by OpenSignal. StormTag is a Bluetooth weather station that attaches to a keychain. The weather data collected by the device is sent to the WeatherSignal app and the OpenSignal servers.

Telecom Industry

Telecommunication providers process millions of phone calls per second. They perform forensics on dropped calls and poor sound quality. Call detail records flow in at a rate of millions per second, and Apache Storm processes those in real time and identifies any troubling patterns. Storm analysis can be used to continuously improve call quality.
