Apr 5, 2017

Setting up Spark Streaming - Part I

Adam Činčura

Analysing customer usage data on a streaming basis promised Showmax a significant improvement compared to a cumbersome batch approach. Setting this up using Spark Streaming, however, has proven far more challenging than we expected.

At Showmax we’re always looking for ways to improve the customer experience. To do this, we need data on viewing behaviour: view duration, time, customer location, and devices used. Until recently, analysing this data at Showmax meant aggregating logs into one spot and then mapping these logs into useful reports.

The numbers are pretty big - we’re talking about 700 gigabytes of logs per day, growing by 100 gigabytes each month. This means the batch approach is cumbersome at best. Even just moving the data can take 4-12 hours, which is complicated further by massive variability in the size of the data load between weekday and weekend viewing. On top of this, the requests for reports from the analytics team were increasing in both scope and frequency.


The logical solution was to move from a batch to a streaming approach. According to the Spark Streaming introduction:
“Spark Streaming brings Apache Spark’s language-integrated API to stream processing, letting you write streaming jobs the same way you write batch jobs. It supports Java, Scala and Python.”

So far, so good. We already use Spark, and our data transformation logic is written in Spark using its SQL extension. From the article published on Databricks I learned that I could use all my current logic in the streaming version.

Given the code re-use, my assumption was this would take around three days to complete. In reality, it took me more than a month, and the point of this post is to give some insight into my somewhat painful learning process.


We were already using RabbitMQ to transport the logs, so this was the planned data source for the Spark Streaming application. Since there is no official support for RabbitMQ in Spark Streaming, I looked for a third-party library and came across Stratio/spark-rabbitmq (ver 0.3.0), which apparently had all the necessary features. I used the library in combination with Spark Streaming 1.6.2 (as the library was missing support for Spark 2.0 in October 2016). All credit to Stratio for their good work - I found it pretty easy to configure and the data was delivered to Spark with no issues.


But once it hit our production environment, things didn’t go to plan. The Spark Streaming application ran out of memory within 12 hours. My initial diagnosis was that I’d misconfigured Spark Streaming from my side. I spent almost two weeks tuning the parameters, which resulted in minor success (12 hours run-time was increased to three days), but clearly it was still not production-ready.

I then rewrote my code in case this was the source of the memory leak, unfortunately with no success. After doing more research on memory leakage and Spark Streaming, and finding very little commentary, I began to wonder if I was chasing the wrong issue.

After four weeks of work and still no solution in sight, I have to admit I was starting to regret taking this piece of work on. I had one last place to look, which was Stratio’s library for RabbitMQ connection.

There was no logged issue describing this problem on GitHub, and I was pretty desperate, so I decided to write my own implementation of the connector.

This was surprisingly simple (kudos to Spark for its built-in support for custom receivers). Just 100 lines of Java code were sufficient for our needs. The implementation took me four hours. Here is a snippet of the key receiver method. All the other instrumentation was done by Spark out of the box.

protected void receive() throws InterruptedException {
  try {
    // Create the connection to the queue here
    while (!isStopped() && !Thread.currentThread().isInterrupted()) {
      // Get the next message from the queue here

      // Store the received message in Spark
      store(new String(message.getBody()));

      // Ack the message in RabbitMQ here
    }
    // Close the connection to the queue here
  } catch (IOException | TimeoutException ex) {
    // Restart the receiver in case of an error
    restart("Error receiving data from RabbitMQ", ex);
  }
}
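
The order of the two steps inside the loop matters: the message is stored first and acknowledged only afterwards. The sketch below is a dependency-free model of that ordering, not the connector itself. `FakeQueue` and `FakeSink` are hypothetical stand-ins for the RabbitMQ channel and Spark's `store()`, made up purely for illustration.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

/** Dependency-free model of the receive loop; the queue and sink are stand-ins, not real APIs. */
final class StoreThenAckSketch {

    /** Hypothetical stand-in for a queue with manual acknowledgements. */
    static final class FakeQueue {
        private final Deque<String> pending = new ArrayDeque<>();

        FakeQueue(List<String> messages) { pending.addAll(messages); }

        String peek() { return pending.peekFirst(); } // deliver without removing
        void ack() { pending.pollFirst(); }           // remove only once acknowledged
        int size() { return pending.size(); }
    }

    /** Hypothetical stand-in for Spark's store(); fails on one configurable message. */
    static final class FakeSink {
        final List<String> stored = new ArrayList<>();
        private final String failOn;

        FakeSink(String failOn) { this.failOn = failOn; }

        void store(String message) {
            if (message.equals(failOn)) {
                throw new IllegalStateException("store failed: " + message);
            }
            stored.add(message);
        }
    }

    /** Drains the queue, acknowledging each message only after it has been stored. */
    static void receive(FakeQueue queue, FakeSink sink) {
        String message;
        while ((message = queue.peek()) != null) {
            sink.store(message); // may throw; the message then stays unacknowledged
            queue.ack();
        }
    }

    public static void main(String[] args) {
        FakeQueue queue = new FakeQueue(List.of("a", "b", "c"));
        FakeSink sink = new FakeSink("b");
        try {
            receive(queue, sink);
        } catch (IllegalStateException ex) {
            System.out.println("receiver would restart here: " + ex.getMessage());
        }
        System.out.println("stored=" + sink.stored + ", still queued=" + queue.size());
    }
}
```

In this model, a failure while storing "b" leaves both "b" and "c" on the queue, so nothing is lost when the receiver restarts. Acknowledging before storing would have dropped "b".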

In the end this was a frustratingly simple solution. My own implementation of the connector allowed us to update to the newest version of Spark and take advantage of its newest features, and the memory issues were solved.

Conclusion: Do look a gift horse in the mouth!

Our transition from regularly executed Spark batch jobs to this Spark Streaming solution, which promises to bring us near-real-time analysis of user behaviour, turned out to be far more challenging than originally expected. It took me two weeks just to solve the memory issues and the whole development took over a month. The reality was quite shocking given my rosy forecast of three days for complete implementation and deployment.

Of course, things didn’t quite end here. The memory issues were solved but performance issues showed up. During peak hours we were unable to process the batches fast enough, which resulted in an ever-growing queue of unprocessed batches. It still wasn’t a production-ready application.
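
Why the queue keeps growing can be shown with a toy model: a new batch arrives every batch interval, and as soon as processing one batch takes longer than that interval, the backlog can never drain. The sketch below is only an illustration. The 10-second interval and the 8- and 15-second processing times are made-up numbers, not measurements from our cluster.

```java
import java.util.ArrayList;
import java.util.List;

/** Toy model of a streaming job's batch queue; all numbers are hypothetical. */
final class BacklogSimulation {

    /**
     * Returns the number of batches waiting (or in progress) at the end of each interval.
     * One new batch arrives per interval; the job needs processingMillis per batch.
     */
    static List<Integer> queuedBatches(long intervalMillis, long processingMillis, int intervals) {
        List<Integer> backlog = new ArrayList<>();
        int queued = 0;
        long budget = 0; // processing time available but not yet spent
        for (int i = 0; i < intervals; i++) {
            queued++;                 // a new batch arrives every interval
            budget += intervalMillis; // the job gains one interval of processing time
            while (queued > 0 && budget >= processingMillis) {
                budget -= processingMillis;
                queued--;
            }
            if (queued == 0) {
                budget = 0; // idle time cannot be saved up for later
            }
            backlog.add(queued);
        }
        return backlog;
    }

    public static void main(String[] args) {
        System.out.println("off-peak: " + queuedBatches(10_000, 8_000, 6));
        System.out.println("peak:     " + queuedBatches(10_000, 15_000, 6));
    }
}
```

With these numbers the off-peak backlog stays at zero, while the peak backlog gains one batch every three intervals and never recovers until the load drops.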

The moral of the story? The software development sharing culture makes so much sense. Rather than individually solving the same problem over and over, borrowing and sharing is far more efficient. There is, however, a danger of becoming blinkered and forgetting that sometimes it’s faster to develop your own solution. Have a good look at what you borrow before you use it.

They say you shouldn’t look a gift horse in the mouth. But the truth is you’d better have a good look at the animal you’re riding. Make sure it’s able to take you where you want to go.

In my next blog post I’ll explain how we tuned the database, repartitioned streams, and made a few other simple changes which gave us huge performance improvements.

Note: Stratio added support for Spark 2.0 in December 2016 and fixed a whole bunch of bugs at the same time, which would have solved the issues we had. But by then we already had our own solution.
