Big Data is a hot topic these days, and one aspect of that problem space is processing streams of data in near real time. One of the tools that can help you do this is Spark, which is developed at UC Berkeley’s AMP (Algorithms, Machines and People) Lab.
The first thing you need when you’re looking at data stream analysis techniques is a stream of data to analyse. I’m using a JSON/WebSocket representation of SIX Financial Information’s real time market data feed.
The next thing I need is a problem to solve using my data stream. Now, I’m no great financial wizard, but I’m going to suggest that there might be something useful to be gained from knowing what sectors are currently “trending” - where trending means showing an overall trend in a positive direction, through lots of price changes.
Spark - a quick introduction
For those of you who haven’t heard of Spark before, it’s a project written by the folks over at Berkeley, and is a key component of their Berkeley Data Analytics Stack. It is written mostly in Scala, and provides APIs for Scala, Java and Python. It is fully compatible with the Hadoop Distributed File System, but extends Hadoop’s core functionality by providing in-memory cluster computation and, most importantly for this blog post, a stream handling framework.
If you’re interested in finding out more about Spark, they offer a free online introductory course (that will set you back about $10 in Amazon EC2 fees). However, at the time of writing, the streaming exercise doesn’t work as the EC2 image is based on an old version of Spark that uses a decommissioned Twitter API.
Spark Streaming
To consume a stream of data in Spark you need to have a StreamingContext
in
which you register an InputDStream
that in turn can produce a Receiver
object. Spark provides a number of default implementations of these (e.g.
Twitter, Akka Actor, ZeroMQ, etc.) that are accessible from the context. As
there is no default implementation for a WebSocket, we're going to have to
define our own.
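In outline, the pieces fit together like this - a rough sketch against the streaming API, using placeholder names that we'll replace with real implementations over the rest of this post:
import spark.streaming._
import spark.streaming.dstream._

// Placeholder names - the real versions are built up below
class MyReceiver extends NetworkReceiver[String] {
  protected override def onStart() { /* connect to the source and push received data into Spark */ }
  protected override def onStop() { /* disconnect */ }
}

class MyInputDStream(ssc: StreamingContext) extends NetworkInputDStream[String](ssc) {
  // Spark calls this to obtain a receiver to run against the stream
  override def getReceiver(): NetworkReceiver[String] = new MyReceiver()
}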
The real work is done by the Receiver implementation, so we’ll start there.
I planned to use the scalawebsocket library to access the WebSocket, but
unfortunately it's only available for Scala 2.10, while Spark is only available
for Scala 2.9 - it's at times like this that you have to swallow your annoyance
at the lack of binary compatibility between Scala versions. Fortunately, all I
had to do was strip out the logging statements from scalawebsocket (they used
2.10 macros), and then I could recompile it for 2.9.
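If you want to do the same, the build side of the change is minimal - something along these lines in the fork's build file (the name and version here are illustrative), after which you can publish the jar locally or just drop it into your project's lib directory:
// build.sbt for the locally patched scalawebsocket (illustrative name and version)
name := "scalawebsocket"

version := "0.1.1-scala29"

scalaVersion := "2.9.3"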
With that done, we can now implement a simple trait for using our WebSocket (it
uses the Listings
object, which just produces a list of all the available
stock listings for which the sector is known, and a map of sector id to name
for later):
import scalawebsocket.WebSocket

trait PriceWebSocketClient {
  import Listings._

  def createSocket(handleMessage: String => Unit) = {
    websocket = WebSocket().open("ws://localhost:8080/1.0/marketDataWs").onTextMessage(m => {
      handleMessage(m)
    })
    subscriptions.foreach(listing => websocket.sendText("{\"subscribe\":{" + listing + "}}"))
  }

  var websocket: WebSocket = _
}
This is useful because, to start off with, we just want to check that we’re receiving messages from the WebSocket correctly. We can write a simple extension of this trait:
class PriceEcho extends PriceWebSocketClient {
  createSocket(println)
}
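To try that out, a throwaway runner is enough; PriceEchoApp is just a name I've made up here, and it assumes the market data WebSocket from earlier is available on localhost:8080:
// Quick smoke test: subscribe and echo raw JSON messages until enter is pressed
object PriceEchoApp extends App {
  val echo = new PriceEcho()
  println("Listening for price updates - press enter to quit")
  readLine()
  echo.websocket.shutdown
}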
So once we’ve got our Scala application hooked up to the WebSocket correctly,
we can implement a Receiver
to consume messages using Spark. Seeing as
we’re receiving our stream over a generic network protocol, we’re going to
extend the NetworkReceiver
. All we need to do then is create a block
generator and append our messages onto it:
import spark.streaming.dstream.NetworkReceiver
import spark.storage.StorageLevel

class PriceReceiver extends NetworkReceiver[String] with PriceWebSocketClient {

  lazy val blockGenerator = new BlockGenerator(StorageLevel.MEMORY_ONLY_SER)

  protected override def onStart() {
    blockGenerator.start
    createSocket(m => blockGenerator += m)
  }

  protected override def onStop() {
    blockGenerator.stop
    websocket.shutdown
  }
}
That was pretty simple, but all we’ve got here is a text string containing
JSON data - we can extract the salient bits of data into a case class that is
then going to be easier to manipulate. Let’s create a PriceUpdate
case
class:
import scala.util.parsing.json.JSON
import scala.collection.JavaConversions
import java.util.TreeMap

case class PriceUpdate(id: String, price: Double, lastPrice: Double)

object PriceUpdate {
  // No mutable Scala TreeMap as yet, so I'll borrow Java's
  val lastPrices = JavaConversions.asMap(new TreeMap[String,Double])

  def apply(text: String): PriceUpdate = {
    val (id, price) = getIdAndPriceFromJSON(text)
    val lastPrice: Double = lastPrices.getOrElse(id, price)
    lastPrices.put(id, price)
    PriceUpdate(id, price, lastPrice)
  }

  def getIdAndPriceFromJSON(text: String) = // snip - simple JSON processing
}
Unfortunately I couldn’t find a financial listing attribute that would give me the previous price for a listing, so we’ll just close our eyes and pretend there isn’t a threading problem in using a central map to hold onto the previous prices - obviously if we were writing a production application we couldn’t get away with this hack.
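As an aside, a version of this that at least can't corrupt the map under concurrent updates isn't much more code - here's a minimal sketch using a java.util.concurrent.ConcurrentHashMap, with the JSON parsing elided just as above. For the rest of the post I'll stick with the simple version.
import java.util.concurrent.ConcurrentHashMap

case class PriceUpdate(id: String, price: Double, lastPrice: Double)

object PriceUpdate {
  // java.lang.Double values so a missing entry comes back as null rather than 0.0
  private val lastPrices = new ConcurrentHashMap[String, java.lang.Double]()

  def apply(text: String): PriceUpdate = {
    val (id, price) = getIdAndPriceFromJSON(text)
    // put atomically returns the previous mapping, or null the first time we see this id
    val previous = lastPrices.put(id, price)
    val lastPrice = if (previous == null) price else previous.doubleValue
    PriceUpdate(id, price, lastPrice)
  }

  def getIdAndPriceFromJSON(text: String): (String, Double) =
    sys.error("snip - same simple JSON processing as before")
}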
Now our receiver can look like this:
import spark.streaming.dstream.NetworkReceiver
import spark.storage.StorageLevel

class PriceReceiver extends NetworkReceiver[PriceUpdate] with PriceWebSocketClient {

  lazy val blockGenerator = new BlockGenerator(StorageLevel.MEMORY_ONLY_SER)

  protected override def onStart() {
    blockGenerator.start
    createSocket(m => {
      val priceUpdate = PriceUpdate(m)
      blockGenerator += priceUpdate
    })
  }

  protected override def onStop() {
    blockGenerator.stop
    websocket.shutdown
  }
}
Much better. Now we need a corresponding InputDStream
. Seeing as we’re only
ever going to be returning new PriceReceiver
objects whenever the
getReceiver
function is called, we can just create our stream as an object:
object stream extends NetworkInputDStream[PriceUpdate](ssc) {
  override def getReceiver(): NetworkReceiver[PriceUpdate] = {
    new PriceReceiver()
  }
}
Right, let’s plug it into a Spark Streaming application and fire it up. If we follow the Spark Quick Start instructions and then the guide for using Spark Streaming, we need to wrap up the following basic outline into an application:
val ssc = new StreamingContext("local", "datastream", Seconds(15), "C:/software/spark-0.7.3", List("target/scala-2.9.3/spark-data-stream_2.9.3-1.0.jar"))
// create InputDStream
ssc.registerInputStream(stream)
// interact with stream
ssc.start()
This code is initialising the streaming context, providing the cluster
details (I’m just using a local single node), the name of the application,
how much time to gather data from the stream before processing it, the
location of the installed Spark software, and the jar file to run the
application from. For the latter we can use the output from sbt: all we need
to do is make sure we use sbt package run
from the command line, which will produce a jar file in the target directory,
and Spark then picks that up because we pass it in when creating the
StreamingContext.
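For reference, a build definition along these lines produces a jar at exactly that path when you run sbt package; the Spark dependency coordinates are what I'd expect for the 0.7.x releases, so treat them as an assumption to check against your own setup:
// build.sbt - produces target/scala-2.9.3/spark-data-stream_2.9.3-1.0.jar on sbt package
name := "spark-data-stream"

version := "1.0"

scalaVersion := "2.9.3"

// Spark 0.7.x was published under the org.spark-project group id
libraryDependencies ++= Seq(
  "org.spark-project" %% "spark-core" % "0.7.3",
  "org.spark-project" %% "spark-streaming" % "0.7.3"
)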
Then we just register our input stream, do some processing on it, and start the streaming context. We can start off by using the print function, which prints the first 10 elements of each batch of the stream:
override def main(args: Array[String]) {
  import Listings._

  val ssc = new StreamingContext("local", "datastream", Seconds(15), "C:/software/spark-0.7.3", List("target/scala-2.9.3/spark-data-stream_2.9.3-1.0.jar"))

  object stream extends NetworkInputDStream[PriceUpdate](ssc) {
    override def getReceiver(): NetworkReceiver[PriceUpdate] = {
      new PriceReceiver()
    }
  }

  ssc.registerInputStream(stream)
  stream.map(pu => listingNames(pu.id) + " - " + pu.lastPrice + " - " + pu.price).print()
  ssc.start()
}
Which gives us output like this:
-------------------------------------------
Time: 1375194945000 ms
-------------------------------------------
Croda International PLC - 24.82 - 24.82
ASOS PLC - 47.485 - 47.485
Arian Silver Corp - 0.0435 - 0.0435
Medicx Fund Ltd - 0.7975 - 0.7975
Supergroup PLC - 10.73 - 10.73
Diageo PLC - 20.07 - 20.075
Barclays PLC - 2.891 - 2.8925
QinetiQ Group PLC - 1.874 - 1.874
CSR PLC - 5.7 - 5.7
United Utilities Group PLC - 7.23 - 7.23
...
Processing the data
Now we just need to process the blocks of data. We want to turn them into a list of sector, price change and change frequency, so the first step is to map each item to its sector, its price change and a count of 1:
val sectorPriceChanges = stream.map(pu => (listingSectors(pu.id), (pu.price - pu.lastPrice, 1)))
The result of this line is that the stream has been transformed into tuples of
sector id paired with another tuple containing the price change and the count
of 1. Stream elements that are tuple pairs get an extra set of functions from
the PairDStreamFunctions class, for which there is an implicit conversion
available in the StreamingContext, so by importing StreamingContext._ we can
now use the reduceByKeyAndWindow function. This function allows us to reduce
over a moving window, using the first value of the pair as the key for the
reduction. We supply a reduce function and an inverse reduce function; then,
for each iteration within the window, Spark will reduce the new data and
“un-reduce” the old. Here’s a picture to try and illustrate this:
Here we’re looking at a sliding window in its old state (red) and new state (blue), with the Spark iterations marked by the dashed lines. As each iteration passes by, the purple area is staying the same, so all Spark needs to do is undo the reduction of the red section that has fallen off the end, and add on the reduction of the new blue section.
So now we need a reduce and an inverse reduce function to use. I want my reduction to sum all of the price changes (positive and negative), and then I want to know whether the changes were more often positive than negative, so if a price change is positive I’m going to increase my count, but if it’s negative I’m going to decrease it:
val reduce = (reduced: (Double, Int), pair: (Double, Int)) => {
  if (pair._1 > 0) (reduced._1 + pair._1, reduced._2 + pair._2)
  else (reduced._1 + pair._1, reduced._2 - pair._2)
}

// The inverse function removes the contribution of a value that has dropped out of
// the window, so the price change is subtracted and the count adjustment is undone
val invReduce = (reduced: (Double, Int), pair: (Double, Int)) => {
  if (pair._1 > 0) (reduced._1 - pair._1, reduced._2 - pair._2)
  else (reduced._1 - pair._1, reduced._2 + pair._2)
}

val windowedPriceChanges = sectorPriceChanges.reduceByKeyAndWindow(reduce, invReduce, Seconds(5*60), Seconds(15))
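As a quick sanity check, reducing a value into the window and then inverse-reducing the same value out again should leave the accumulated value untouched. With reduce and invReduce defined as above, a couple of lines with made-up numbers confirm this for both a positive and a negative change:
val acc = (1.5, 3)    // running (net price change, trend count) for a sector
val up = (0.25, 1)    // a positive price change entering then leaving the window
val down = (-0.25, 1) // a negative price change entering then leaving the window

assert(invReduce(reduce(acc, up), up) == acc)
assert(invReduce(reduce(acc, down), down) == acc)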
Now we’ve got a reduced stream of net price change and movement trend. We only want to display the biggest positive movers, so we can filter the stream for values that have a positive movement trend, and then switch the tuples around so that we have a key we can sort by. Now, my statistics knowledge isn’t that great, but I want my ordering to be weighted by both the net price change and the positive movement trend, so I’m just going to multiply the two values together. Finally, we can sort the data and print out the top 5. Put it all together and we’ve got our streaming application:
import scala.collection.immutable.List
import spark.SparkContext._
import spark.streaming._
import spark.streaming.StreamingContext._
import spark.streaming.dstream._

object DataStream extends App {

  val reportHeader = """|----------------------------------------------
                        |Positive Trending
                        |=================
                        |""".stripMargin

  override def main(args: Array[String]) {
    import Listings._
    import System._

    val ssc = new StreamingContext("local", "datastream", Seconds(15), "C:/software/spark-0.7.3", List("target/scala-2.9.3/spark-data-stream_2.9.3-1.0.jar"))

    object stream extends NetworkInputDStream[PriceUpdate](ssc) {
      override def getReceiver(): NetworkReceiver[PriceUpdate] = {
        new PriceReceiver()
      }
    }

    ssc.checkpoint("spark")
    ssc.registerInputStream(stream)

    val reduce = (reduced: (Double, Int), pair: (Double, Int)) => {
      if (pair._1 > 0) (reduced._1 + pair._1, reduced._2 + pair._2)
      else (reduced._1 + pair._1, reduced._2 - pair._2)
    }
    // Inverse reduce removes the contribution of values falling out of the window,
    // so the net price change is subtracted rather than added
    val invReduce = (reduced: (Double, Int), pair: (Double, Int)) => {
      if (pair._1 > 0) (reduced._1 - pair._1, reduced._2 - pair._2)
      else (reduced._1 - pair._1, reduced._2 + pair._2)
    }

    val sectorPriceChanges = stream.map(pu => (listingSectors(pu.id), (pu.price - pu.lastPrice, 1)))
    val windowedPriceChanges = sectorPriceChanges.reduceByKeyAndWindow(reduce, invReduce, Seconds(5*60), Seconds(15))
    val positivePriceChanges = windowedPriceChanges.filter { case (_, (_, count)) => count > 0 }
    val priceChangesToSector = positivePriceChanges.map { case (sector, (value, count)) => (value * count, sector) }
    val sortedSectors = priceChangesToSector.transform(rdd => rdd.sortByKey(false)).map(_._2)

    sortedSectors.foreach(rdd => {
      // format the timestamp into the header, then append the top five sector names
      println("""|----------------------------------------------
                 |Positive Trending (Time: %d ms)
                 |----------------------------------------------
                 |""".stripMargin.format(currentTimeMillis) + rdd.take(5).map(sectorCodes(_)).mkString("\n"))
    })

    ssc.start()
  }
}
Note that we’ve had to add a location for Spark to checkpoint the DStream
that is created when we use reduceByKeyAndWindow
with an inverse function
- it isn’t really explained why this is needed (it is just mentioned in passing in the Spark streaming guide), but my assumption is that Spark needs to store the data that is received in each interval so that it has it available when it comes to applying the inverse reduction function.
When we run this (you may run into PermGen space problems the first time you try running your application - I found these went away on a subsequent run), we get the trending sectors printed:
----------------------------------------------
Positive Trending (Time: 1375269240035 ms)
----------------------------------------------
Real estate
Telecommunication
Graphics, publishing & printing media
Environmental services & recycling
Agriculture & fishery
----------------------------------------------
Positive Trending (Time: 1375269255035 ms)
----------------------------------------------
Real estate
Graphics, publishing & printing media
Environmental services & recycling
Agriculture & fishery
Electrical appliances & components
----------------------------------------------
Positive Trending (Time: 1375269270034 ms)
----------------------------------------------
Environmental services & recycling
Agriculture & fishery
Electrical appliances & components
Vehicles
Precious metals & precious stones
It looks like Agriculture & fishery or Environmental services & recycling are worth investing in right now, but don’t take my word for it!
In this blog post we’ve looked at how stream processing can be achieved using Spark - obviously if we were developing a real application we’d use much more solid statistical analysis, and we might use a smaller sliding interval to do our reduction over. Spark is a powerful framework, and its future is definitely looking good - it has a sound footing at UC Berkeley, and has just been accepted as an Apache Incubator project, so expect to see more of it as it becomes a real alternative to Hadoop.