Overview

Storm is a framework for distributed real-time computation and querying. Rather than running declarative queries "in place", Storm emphasizes the simplicity of running small chunks of code in parallel.

Storm is the Hadoop of real-time.

As a flexible framework, Storm allows the use of any persistence layer. Choosing a persistence layer is an important decision in Storm, and in-memory stores like Redis and Memcache are very popular. As a database similar to those, but with clustering, high availability, and Flash/SSD support, Aerospike is an excellent choice for Storm applications.

If you're already familiar with Storm and persistence, you can jump immediately to Aerospike's Storm connectors and get started. Our Bolts provide examples of persisting, decorating, and validating through multiple paths. See the README.md file to jump right in.

https://github.com/aerospike/storm-aerospike

An overview of Storm

Terminology

  • Spouts, which import data from queues and other sources
  • Bolts, which perform arbitrary computation on messages
  • Tuples, the messages containing key-value pairs
  • Topologies, which describe a set of Spouts and Bolts and the flow of Tuples between them
  • Nimbus, the master daemon which distributes jobs and assigns work to Supervisors
  • Trident, the exactly-once processing framework
  • Supervisors, the daemons which launch the worker JVMs that run Bolts and Spouts

References

Storm 0.8.1 GitHub repository

Storm Google group

Storm 0.9.0+ Apache Incubator

Michael Noll's installation tips

Trending Words example

Background

Storm was written to provide a framework for high-performance, message-oriented programming. It is, fundamentally, a combination of a messaging layer (Thrift), a networking layer (0mq, then HTTP through Jetty), and a management and routing system (Storm itself). Performance levels of 1M messages per second per server have been claimed. With existing Spouts integrating with Twitter (GNIP), Java Message Service (JMS), and Apache log files, and example Bolts providing filtering of transient signals and computation of "trending words", you can get up and running on a production-grade computation system easily.

Storm was originally written at Twitter by Nathan Marz, and includes work he did at BackType. Up through version 0.8, Storm was a public GitHub repository, but as of the 0.9 release, Storm has moved to Apache Incubator status, so Apache's project governance can be used to move the project forward.

Although this framework is based in Java (and has a strong Clojure community), Bolts can be written in multiple languages. This allows your existing analytics packages, regardless of language, to integrate with Storm. If you are using proprietary packaging or data formats, you can use your existing libraries to convert them into Storm Tuples. The Topology framework allows configuration of an arbitrary tree of Spouts and Bolts, and Nimbus will assign multiple servers to the processing layer for running them. The ability to add and remove processing servers in an existing Topology is one of Storm's strengths.
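For instance, Storm's multilang support lets a Bolt delegate its work to an external process. The sketch below follows the well-known storm-starter pattern for a Python-backed Bolt; the script name splitsentence.py and the emitted "word" field are illustrative, not part of any Aerospike connector.

    import java.util.Map;

    import backtype.storm.task.ShellBolt;
    import backtype.storm.topology.IRichBolt;
    import backtype.storm.topology.OutputFieldsDeclarer;
    import backtype.storm.tuple.Fields;

    // A Bolt whose logic lives in an external Python script, spoken to over
    // Storm's multilang protocol (JSON over stdin/stdout).
    public class SplitSentenceBolt extends ShellBolt implements IRichBolt {

        public SplitSentenceBolt() {
            // Storm launches this command for each task of the Bolt.
            super("python", "splitsentence.py");
        }

        public void declareOutputFields(OutputFieldsDeclarer declarer) {
            declarer.declare(new Fields("word"));
        }

        public Map<String, Object> getComponentConfiguration() {
            return null;
        }
    }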

Persistence and Storm

Without stored data, Storm is a rather boring system: a message can only be transformed in flight, never filtered through time or correlated with other messages. By default, Storm can be configured to "shard" messages, so similar messages can be routed to the same instance of a Bolt and correlated in-memory. To do this, the author of a Topology needs to choose the equivalent of a "shard key", which is used to route messages between the Supervisors running Bolts. Tuples which are "similar" (have the same shard key) are then processed by the same Supervisor, so state about that group can be kept in-memory and in-process, improving speed.
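As a rough sketch of how that shard-key routing is expressed, the Storm 0.8/0.9-era API uses fieldsGrouping when wiring the Topology; WordSpout and WordCountBolt below are placeholders for your own components.

    import backtype.storm.Config;
    import backtype.storm.LocalCluster;
    import backtype.storm.topology.TopologyBuilder;
    import backtype.storm.tuple.Fields;

    public class ShardedTopology {
        public static void main(String[] args) throws Exception {
            TopologyBuilder builder = new TopologyBuilder();

            // Hypothetical spout emitting Tuples with a "word" field.
            builder.setSpout("words", new WordSpout(), 2);

            // fieldsGrouping acts as the "shard key": every Tuple with the same
            // value in "word" is routed to the same WordCountBolt task, so
            // per-word state can be kept in that task's memory.
            builder.setBolt("count", new WordCountBolt(), 4)
                   .fieldsGrouping("words", new Fields("word"));

            LocalCluster cluster = new LocalCluster();
            cluster.submitTopology("sharded-counts", new Config(), builder.createTopology());
        }
    }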

Imagine a "trending words" system where an individual word always gets routed to the same machine, allowing a simple counter to be maintained, in memory, on that machine. This creates obvious problems when restarting Storm Supervisors, or when changing the size of the Storm cluster: Storm does not migrate data from server to server (re-shard). If the historical data needs are light, or if a transient outage while, say, restarting a server means only a few minutes without accurate trend data while the in-memory counters catch up, this system can be ideal.
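A minimal sketch of such a counting Bolt, in the style of the storm-starter word count (and of the WordCountBolt referenced in the sketch above): it assumes an upstream fieldsGrouping on the "word" field, and its counts vanish if the worker restarts.

    import java.util.HashMap;
    import java.util.Map;

    import backtype.storm.topology.BasicOutputCollector;
    import backtype.storm.topology.OutputFieldsDeclarer;
    import backtype.storm.topology.base.BaseBasicBolt;
    import backtype.storm.tuple.Fields;
    import backtype.storm.tuple.Tuple;
    import backtype.storm.tuple.Values;

    public class WordCountBolt extends BaseBasicBolt {

        // Task-local state: complete only for the words routed to this task,
        // and lost whenever the worker JVM restarts.
        private final Map<String, Long> counts = new HashMap<String, Long>();

        @Override
        public void execute(Tuple tuple, BasicOutputCollector collector) {
            String word = tuple.getStringByField("word");
            Long count = counts.get(word);
            count = (count == null) ? 1L : count + 1L;
            counts.put(word, count);
            collector.emit(new Values(word, count));
        }

        @Override
        public void declareOutputFields(OutputFieldsDeclarer declarer) {
            declarer.declare(new Fields("word", "count"));
        }
    }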

If the data model or history requirements are more complex, a shared data store outside Storm can be the necessary solution. A system that continually stores recent behavior about a user, with a second Bolt using that behavior to provide predictions, is best built with stateless Storm Supervisors and a high-performance persistence layer.

Aerospike's NoSQL database can be the best solution when very low latency (sub-millisecond), high throughput (1M+ database requests per second per server), high availability (clusters operating for years with no downtime), or large data sets (10 TB per commodity server) are required. The Aerospike database supports extraordinarily high-speed key-value access - up to 10x faster than Cassandra or MongoDB on the same hardware, similar to Redis - while also providing ACID reliability, analytics operations, and rich functionality including fast MapReduce-style operations, expressive native types (lists, maps, and documents), and in-database operations through user-defined functions.
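To give a feel for that key-value access pattern, here is a minimal sketch using the Aerospike Java client; the host, namespace ("test"), set, and bin names are placeholders for your own configuration.

    import com.aerospike.client.AerospikeClient;
    import com.aerospike.client.Bin;
    import com.aerospike.client.Key;
    import com.aerospike.client.Record;

    public class AerospikeKeyValueExample {
        public static void main(String[] args) {
            // Connect to one seed node; the client discovers the rest of the cluster.
            AerospikeClient client = new AerospikeClient("127.0.0.1", 3000);
            try {
                Key key = new Key("test", "users", "user-1234");

                // Write two bins with the default write policy.
                client.put(null, key, new Bin("name", "alice"), new Bin("visits", 42));

                // Read the record back with a single key-value lookup.
                Record record = client.get(null, key);
                System.out.println(record.getString("name") + " " + record.getLong("visits"));
            } finally {
                client.close();
            }
        }
    }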

With its free Community Edition and enterprise support available through Aerospike, the Aerospike database is a good starting point for your project, even at lower levels of scale, and provides easy growth as your project or company becomes larger.

A guide to the Aerospike Bolts

Aerospike provides three example Bolts, and an example topology using those Bolts. The Decorate Bolt is an example of retrieving data from an Aerospike server. Configured with a "key" - a particular message field which is used to look up data - the Bolt does a single lookup, adds a set of configured fields to the message, and emits the resulting message.

For example, if one field is a user_id, and you wish to retrieve the current set of audience IDs from Aerospike (loaded and maintained through batch jobs from an analytics cluster), you would configure the Decorate Bolt to read based on the user_id field, request the field holding the list of audience_ids, and add it to the Tuple. Note that this example includes connection pooling to the Aerospike database: the Bolt's "Prepare" method starts by connecting to the cluster, learning the current data layout of the cluster, and creating the (initially empty) pool of file descriptors.
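The general shape of such a decorate-style Bolt is sketched below: an AerospikeClient created once in prepare, and a single get per Tuple in execute. This is an illustration of the pattern, not the connector's actual code; the field, namespace, and set names are assumptions.

    import java.util.Map;

    import backtype.storm.task.OutputCollector;
    import backtype.storm.task.TopologyContext;
    import backtype.storm.topology.OutputFieldsDeclarer;
    import backtype.storm.topology.base.BaseRichBolt;
    import backtype.storm.tuple.Fields;
    import backtype.storm.tuple.Tuple;
    import backtype.storm.tuple.Values;

    import com.aerospike.client.AerospikeClient;
    import com.aerospike.client.Key;
    import com.aerospike.client.Record;

    public class DecorateSketchBolt extends BaseRichBolt {

        // Created in prepare(), not in the constructor, because Bolts are
        // serialized and shipped to Supervisors before they run.
        private transient AerospikeClient client;
        private OutputCollector collector;

        @Override
        public void prepare(Map conf, TopologyContext context, OutputCollector collector) {
            this.collector = collector;
            // Connecting here learns the cluster layout and sets up the
            // client's connection pool, once per task.
            this.client = new AerospikeClient("127.0.0.1", 3000);
        }

        @Override
        public void execute(Tuple tuple) {
            String userId = tuple.getStringByField("user_id");
            Record record = client.get(null, new Key("test", "profiles", userId));
            Object audienceIds = (record == null) ? null : record.getValue("audience_ids");

            // Emit the original key plus the looked-up value, anchored to the input.
            collector.emit(tuple, new Values(userId, audienceIds));
            collector.ack(tuple);
        }

        @Override
        public void declareOutputFields(OutputFieldsDeclarer declarer) {
            declarer.declare(new Fields("user_id", "audience_ids"));
        }
    }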

The Persist Bolt does the opposite. It, too, must be configured with one of the fields as a key; it then stores a set of configured fields in Aerospike.
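A persist-style Bolt has the mirror-image shape, again sketched here rather than taken from the connector: the user_id field is used as the Aerospike key, and a couple of assumed Tuple fields are written as bins.

    import java.util.Map;

    import backtype.storm.task.OutputCollector;
    import backtype.storm.task.TopologyContext;
    import backtype.storm.topology.OutputFieldsDeclarer;
    import backtype.storm.topology.base.BaseRichBolt;
    import backtype.storm.tuple.Tuple;

    import com.aerospike.client.AerospikeClient;
    import com.aerospike.client.Bin;
    import com.aerospike.client.Key;

    public class PersistSketchBolt extends BaseRichBolt {

        private transient AerospikeClient client;
        private OutputCollector collector;

        @Override
        public void prepare(Map conf, TopologyContext context, OutputCollector collector) {
            this.collector = collector;
            this.client = new AerospikeClient("127.0.0.1", 3000);
        }

        @Override
        public void execute(Tuple tuple) {
            // Use one Tuple field as the record key...
            Key key = new Key("test", "profiles", tuple.getStringByField("user_id"));

            // ...and store the remaining configured fields as bins.
            client.put(null, key,
                       new Bin("last_page", tuple.getStringByField("last_page")),
                       new Bin("last_seen", tuple.getLongByField("last_seen")));
            collector.ack(tuple);
        }

        @Override
        public void declareOutputFields(OutputFieldsDeclarer declarer) {
            // A pure sink: nothing is emitted downstream.
        }
    }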