Getting started with Apache Spark

Earlier this year I attended GOTO Conference, which had a special track on distributed computing. One of the talks described the evolution of big data processing frameworks. I found it really interesting when the presenter described Hadoop’s MapReduce as a first-generation framework, Apache Storm and Apache Tez as second generation, and Apache Spark as generations beyond. Recently, I also got the opportunity to work with Spark as part of client work, building and running machine learning models on sensor data.

I will briefly go over Apache Spark, an open source cluster computing engine that has become really popular in the big data world over the past few years, and show how to get started with it using a simple example. Let’s take a peek at Apache Spark.

About Spark

Spark is a large-scale data processing and cluster computing framework built around speed and ease of use. It can run standalone, on premise or in the cloud, and it supports batch, iterative and near real-time stream processing. It was originally developed at UC Berkeley’s AMPLab, was open sourced in 2010, and later became an Apache project. It is written in Scala, runs on the JVM, and offers high-level APIs in Scala, Java, Python and R.

Spark’s ecosystem consists of the Spark Core API and four main libraries that run on top of it: Spark SQL, Spark Streaming, MLlib (machine learning) and GraphX (graph computation). A typical Spark application uses Spark Core plus one or more of these libraries, depending on its use case.

Figure-1

Spark is comparable to other big data and MapReduce technologies like Hadoop. Published benchmarks claim that it is 10 to 100 times faster than Hadoop’s MapReduce, because it writes less intermediate data to disk and caches data in memory between computations. For most workloads it can therefore be used as an alternative to Hadoop’s MapReduce.

The Spark API lets developers build complex, multi-stage data pipelines that are modeled as a Directed Acyclic Graph (DAG). It supports in-memory data sharing across DAGs, which allows different jobs to work with the same data.

Let us look at Spark’s architecture to see how that is achieved.

Architecture

Spark uses a master/slave architecture and is built to run in a cluster. The key components are:

  • Driver – a separate process that runs in a JVM. It runs the Spark application’s main program and holds the ‘SparkContext’, the entry point to the Spark execution environment. The driver also contains the schedulers that schedule tasks to run on nodes in the cluster.
  • Cluster Manager (Master) – manages resource allocation across applications running in cluster mode. Spark currently supports Standalone, Mesos and YARN as cluster managers.
  • Worker – a node in the cluster that runs the application’s tasks.
  • Executor – a process launched on a worker that runs multiple tasks belonging to a single Spark application.
  • Task – a bundle of work (data + computation) that is executed in an executor.
  • Job – a unit of work that Spark breaks down into stages, and each stage into multiple tasks, prior to execution.

Figure-2

One of the primary data structures built into Spark’s core is the Resilient Distributed Dataset (RDD). An RDD is a partitioned data set together with the computations that produce it. The term ‘Resilient’ refers to the ability to tolerate and recover from node failures. The term ‘Distributed’ refers to large data sets living on multiple nodes in a cluster.

The main properties of RDD are:

  • Immutable
  • Lazy evaluation
  • Cacheable

The operations that can be performed on RDDs are classified into:

  • Transformations – functions that accept an RDD as input, apply some computation and produce a new RDD as output (e.g. map, flatMap, filter, reduceByKey, join). The input RDD is never modified, as RDDs are immutable. These operations are lazy: they are not executed immediately, only when an ‘action’ operation is invoked.
  • Actions – operations that produce computed values as output rather than RDDs (e.g. collect, count, fold, foreach). An action triggers execution of the RDD transformations that precede it and can cause data to move from the executors to the driver.
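
The difference between lazy transformations and actions can be sketched in plain Python. The `LazyDataset` class below is a hypothetical toy model, not Spark’s API: `map` and `filter` only record a step and return a new object, and `collect` is the one call that actually runs the recorded steps.

```python
class LazyDataset:
    """Toy model of an RDD: immutable and lazily evaluated (not Spark's API)."""

    def __init__(self, source, steps=()):
        self._source = source   # the input data, never modified
        self._steps = steps     # recorded transformations, in order

    def map(self, fn):
        # Transformation: record the step and return a NEW dataset.
        return LazyDataset(self._source, self._steps + (("map", fn),))

    def filter(self, pred):
        # Transformation: also lazy, nothing is computed here.
        return LazyDataset(self._source, self._steps + (("filter", pred),))

    def collect(self):
        # Action: run every recorded step and return plain values.
        data = list(self._source)
        for kind, fn in self._steps:
            if kind == "map":
                data = [fn(x) for x in data]
            else:
                data = [x for x in data if fn(x)]
        return data


calls = []  # records when the mapped function actually runs


def double(x):
    calls.append(x)
    return x * 2


numbers = LazyDataset([1, 2, 3, 4])
doubled = numbers.map(double).filter(lambda x: x > 4)

print(calls)              # [] because no action has run yet
print(doubled.collect())  # [6, 8]
print(numbers.collect())  # [1, 2, 3, 4], the input is unchanged
```

The empty `calls` list before `collect()` is the point of the sketch: building the pipeline costs nothing until an action asks for a result.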

Figure-3

Typically a Spark application starts off by creating an RDD from an input data source such as a text or CSV file, S3, HDFS, Cassandra, Kafka and others. It is important to note that an RDD lineage graph (the chain of parent RDDs of an RDD) is built up as transformations are applied. Spark also lets you cache an RDD, or checkpoint it to cut its lineage short. As RDDs are immutable and Spark tracks their lineage, Spark is resilient: it can re-create lost RDD partitions from their parents after a node failure.
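
To make the idea of lineage concrete, here is a minimal plain-Python sketch. The `Node` class and its methods are hypothetical names invented for illustration and say nothing about how Spark implements this internally. Each node remembers its parent and the function that derived it, so a single lost partition can be rebuilt by replaying the chain from the source.

```python
class Node:
    """Toy lineage node: remembers its parent and how it was derived."""

    def __init__(self, name, parent=None, fn=None, partitions=None):
        self.name = name
        self.parent = parent          # parent node in the lineage graph
        self.fn = fn                  # per-partition function applied to the parent
        self.partitions = partitions  # only the source holds real data here

    def derive(self, name, fn):
        # A transformation creates a child that points back at this node.
        return Node(name, parent=self, fn=fn)

    def lineage(self):
        # Walk back to the source: this chain is the lineage.
        node, chain = self, []
        while node is not None:
            chain.append(node.name)
            node = node.parent
        return list(reversed(chain))

    def compute_partition(self, index):
        # Rebuild one partition by replaying the lineage from the source.
        if self.parent is None:
            return list(self.partitions[index])
        return self.fn(self.parent.compute_partition(index))


source = Node("lines", partitions=[["a b", "c"], ["d e f"]])
words = source.derive("words", lambda part: [w for line in part for w in line.split()])
upper = words.derive("upper", lambda part: [w.upper() for w in part])

print(upper.lineage())             # ['lines', 'words', 'upper']
print(upper.compute_partition(1))  # ['D', 'E', 'F']
```

In this model, losing partition 1 of `upper` only requires recomputing partition 1; partition 0 is never touched.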

Spark also breaks up the processing of RDD operations into multiple smaller tasks so that a job on a large data set executes in parallel. Each task is executed by an executor. Prior to execution, Spark computes the task’s closure: the variables and methods that must be visible to the executor for it to perform its computations on the RDD. This closure is serialized and sent to each executor.

Now that we have an understanding of Spark’s architecture, let us look at an actual example of how to create and submit a Spark job.
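
A rough way to picture closure shipping is the plain-Python model below. It uses `pickle` as a stand-in for Spark’s serializer and models only captured variables, not methods; `run_task` and the variable names are made up for this sketch. Each simulated executor works on its own deserialized copy, so changes made there never reach the driver’s copy.

```python
import pickle


def run_task(serialized_closure, partition):
    # Executor side: deserialize a private copy of the captured variables.
    closure = pickle.loads(serialized_closure)
    kept = [w for w in partition if w not in closure["stop_words"]]
    closure["seen"] += len(partition)  # changes only the executor's copy
    return kept, closure["seen"]


# Driver side: the variables the task needs in order to run.
closure = {"stop_words": {"the", "a"}, "seen": 0}
payload = pickle.dumps(closure)  # serialized once, handed to each executor

partitions = [["the", "cat"], ["a", "dog", "ran"]]
results = [run_task(payload, part) for part in partitions]

print(results)          # [(['cat'], 2), (['dog', 'ran'], 3)]
print(closure["seen"])  # 0, the driver's copy was never updated
```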

Example

Before we jump into the example, the first step is to make sure that Spark is downloaded and running locally. To download and install Spark, follow the instructions on the site (https://spark.apache.org/downloads.html). I downloaded spark-1.6.2-bin-hadoop2.4.tgz and extracted it locally.

Consider a use case wherein you want to group all words in a data set by their first letter and count their occurrences. A simple Spark program for this looks as follows: we read lines from a file, split them into words, group the words by first letter and then count them.

Figure-4

Note that the SparkContext is the entry point to any Spark program. It is configured through a SparkConf, which holds Spark-related configuration parameters. The local[2] on line-5 tells Spark to run the program locally with 2 worker threads (ideally one per core). When you run the same job in a standalone cluster, the configuration parameters can also be passed in as additional command line arguments, which I will show you in a bit.

The flatMap transformation on line-10 splits the lines of the source file into words, and map on line-11 pairs each word with its first letter as the key. The groupByKey() on line-12 is a special transformation: data gets re-organized (shuffled) across workers so that all values for a key are co-located prior to aggregation. The mapValues on line-13 maps each key, which is a first letter, to the number of words grouped under it. The final operation on line-14, collect(), is the action that triggers the execution of the transformations above it.
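
For readers who want to trace the steps without a Spark installation, here is a plain-Python equivalent of the pipeline described above. It is only a sketch of the logic, not the Scala program from Figure-4; the function name and the sample lines are made up for illustration.

```python
from collections import defaultdict


def first_letter_counts(lines):
    """Plain-Python equivalent of flatMap -> map -> groupByKey -> mapValues."""
    # flatMap: split every line into words.
    words = [word for line in lines for word in line.split()]
    # map: pair each word with its first letter as the key.
    pairs = [(word[0], word) for word in words]
    # groupByKey: gather all words that share a key.
    groups = defaultdict(list)
    for key, word in pairs:
        groups[key].append(word)
    # mapValues: replace each group with its size.
    return {key: len(group) for key, group in groups.items()}


sample = ["spark streams data", "scala and spark", "apache storm"]
print(sorted(first_letter_counts(sample).items()))
# [('a', 2), ('d', 1), ('s', 5)]
```

Spark’s Python RDD API offers methods with the same names (`flatMap`, `map`, `groupByKey`, `mapValues`, `collect`), so each comment in the sketch corresponds to one step of the real job.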

Note that for better performance we always need to think about how to reduce shuffles between worker nodes. Running the above program on a local standalone cluster gives you a peek at what is going on behind the scenes in Spark, and understanding the internals always helps in building better applications.

So let us set up a standalone cluster locally. The following steps create a cluster with one master and two worker nodes (1 core and 1G of memory each).

      • Go to ~/<spark-dir>/conf and create or update spark-env.sh.
      • Set the following properties in spark-env.sh:
        SPARK_WORKER_INSTANCES=2
        SPARK_WORKER_CORES=1
        SPARK_WORKER_MEMORY=1G
      • Go to ~/<spark-dir>/sbin and run ./start-master.sh to start the Spark master.
      • Open a browser and go to localhost:8080 to validate that the master started successfully.
      • From ~/<spark-dir>/sbin, run ./start-slave.sh <master-url>.
      • Refresh the browser at localhost:8080 and you should see the 2 workers.

Figure-5

Once the cluster is set up locally, you can submit the Spark job as a jar to the Spark master using the spark-submit program.

      • Remove the master config from line 5 in Figure-4 and package the compiled WordCount.scala into a jar file.
      • Go to ~/<spark-dir>/bin and run the following on the command line:
        ./spark-submit --verbose --conf spark.master=spark://ch-nb-1051:7077 --conf spark.driver.memory=1G --conf spark.executor.memory=1G --class WordCount ~/WordCount.jar

The application should be visible on the Spark UI under running applications. Within a few seconds it will be displayed under completed applications once its execution is complete.

Figure-6

When you click on the application id (see Figure-6) and then on Application UI on the following page, you can look at the details of the job (Figure-7).

Figure-7

Typically Spark prepares a Directed Acyclic Graph (DAG) with multiple stages prior to actual execution of the job. Every RDD operation that requires data to be re-organized (shuffled) between workers starts a new stage. So a job execution plan has multiple ‘stages’, and the Spark driver program manages and schedules the necessary tasks (data + computation) that run in every stage.

Going back to our application: since the groupByKey() operation in our example requires words with the same first letter to be co-located, Spark determines that the execution plan needs to redistribute (shuffle) data, and it splits our job into 2 stages.

So when you expand ‘DAG Visualization’ you will notice the 2 stages.

Figure-8

All tasks specific to a stage have to be complete before execution of the next stage begins. So Stage 1 containing groupByKey operation starts only after all tasks in Stage 0 are complete.
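
The stage split can be modeled with a few lines of plain Python. This is a simplification for illustration only: `WIDE_OPS` is an assumed, incomplete set of operations that typically need a shuffle, and `split_into_stages` is a hypothetical helper, not how Spark’s scheduler is written.

```python
# Operations that typically need a shuffle (assumed subset, for illustration only).
WIDE_OPS = {"groupByKey", "reduceByKey", "join"}


def split_into_stages(operations):
    """Start a new stage at every operation that requires a shuffle."""
    stages = [[]]
    for op in operations:
        if op in WIDE_OPS and stages[-1]:
            stages.append([])  # shuffle boundary: later work goes to a new stage
        stages[-1].append(op)
    return stages


pipeline = ["textFile", "flatMap", "map", "groupByKey", "mapValues"]
for number, stage in enumerate(split_into_stages(pipeline)):
    print(f"Stage {number}: {stage}")
# Stage 0: ['textFile', 'flatMap', 'map']
# Stage 1: ['groupByKey', 'mapValues']
```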

Also notice that 5 tasks are created for processing a file close to 150 MB in size, and as per Figure-6 and Figure-7 the entire job took about 37 secs to complete. The groupByKey operation is expensive and is not recommended for aggregations, because it forces the full data set to be shuffled between nodes in the cluster before any aggregation is performed. So I replaced groupByKey with reduceByKey, as the latter performs a pre-aggregation on every node, which results in less data being shuffled across nodes.
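
Why pre-aggregation shrinks the shuffle can be seen in a small plain-Python model. The two partitions below are made-up sample data and the function names are hypothetical; the sketch simply counts how many records each strategy would have to send across the network.

```python
from collections import Counter


def shuffle_with_group_by_key(partitions):
    """Every (key, word) pair leaves its partition unchanged."""
    return [[(word[0], word) for word in part] for part in partitions]


def shuffle_with_reduce_by_key(partitions):
    """Each partition first combines its pairs into one (key, count) per key."""
    return [sorted(Counter(word[0] for word in part).items()) for part in partitions]


def merge_counts(shuffled, count_of):
    """Reduce side: add up what arrives for each key."""
    totals = Counter()
    for part in shuffled:
        for key, value in part:
            totals[key] += count_of(value)
    return dict(totals)


partitions = [["spark", "scala", "storm", "apache"], ["sensor", "stream", "avro"]]

grouped = shuffle_with_group_by_key(partitions)
reduced = shuffle_with_reduce_by_key(partitions)

print(sum(len(p) for p in grouped))  # 7 records shuffled
print(sum(len(p) for p in reduced))  # 4 records shuffled
print(merge_counts(grouped, lambda word: 1) == merge_counts(reduced, lambda n: n))  # True
```

Both strategies end with the same counts; the pre-aggregating one just ships at most one record per key per partition instead of one record per word.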

Figure-9

When I re-ran the job with the reduceByKey operation, the job execution time came down to ~8 secs. Notice below in Figure-10 that Stage 1 only took 0.1 secs and the shuffle size is only about 4.8 KB, compared to 61.6 MB in Figure-7.
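
To put those numbers in perspective, here is the arithmetic as a short Python calculation. It uses only the figures quoted above and assumes decimal units (1 MB = 1000 KB), so the shuffle ratio is approximate.

```python
# Figures quoted above; decimal units (1 MB = 1000 KB) are an assumption.
group_by_key_secs, reduce_by_key_secs = 37, 8
group_by_key_shuffle_kb = 61.6 * 1000
reduce_by_key_shuffle_kb = 4.8

speedup = group_by_key_secs / reduce_by_key_secs
shuffle_ratio = group_by_key_shuffle_kb / reduce_by_key_shuffle_kb

print(f"job time: {speedup:.1f}x faster")                    # job time: 4.6x faster
print(f"shuffle size: about {shuffle_ratio:,.0f}x smaller")  # shuffle size: about 12,833x smaller
```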

Figure-10

 

Conclusion

I hope this gives a bit of insight into Spark.

Spark has been significant in the big data world for a while now. It is actively used to solve big data batch and stream analysis problems across different business verticals. It continues to grow and has already become a primary component of the big data ecosystem. So if you haven’t checked out Spark yet, I would definitely suggest that you take a look.

 
