
Spark produces a lot of logs by default. Often, this makes it impossible to read the whole grader feedback. We therefore suggest tuning Spark's log level like so:

import org.apache.log4j.{Level, Logger}
Logger.getLogger("org.apache.spark").setLevel(Level.WARN) // only show warnings and errors from Spark

  • bigtop.apache.org Bigtop is an Apache Foundation project for Infrastructure Engineers and Data Scientists looking for comprehensive packaging, testing, and configuration of the leading open source big data components.

Building Spark


launch the included sbt and package
$ ./build/sbt
(wait for sbt to load)
> package
Note: Maven profiles and variables can be set to control the SBT build, e.g. ./build/sbt -Pyarn -Phadoop-2.3 package


configure Maven
export MAVEN_OPTS="-Xmx2g -XX:MaxPermSize=512M -XX:ReservedCodeCacheSize=512m"
Note: For Java 8 and above this step is not required. If you use build/mvn with no MAVEN_OPTS set, the script will automate this for you.

make a runnable distribution
run ./dev/make-distribution.sh in the project root directory

Deploy Environments

Apache Mesos

Hadoop NextGen (YARN)

Amazon AWS EMR and EC2 for Spark and Zeppelin


General Configuration
Cluster name
Launch mode
Software configuration
Spark: Spark 2.0.0 on Hadoop 2.7.2 YARN with Ganglia 3.7.2 and Zeppelin 0.6.1
Hardware configuration
Instance type
Number of instances
3 (1 master and 2 core nodes)
Security and access
EC2 key pair
EMR role
EC2 instance profile

Cluster Information

Subnet ID
After successful start, log in to cluster
ssh -i [path-to-keypair-file]/ec2-eu-central-1.pem hadoop@ec2-52-59-141-166.eu-central-1.compute.amazonaws.com

Data Sources


Apache Hive

The Apache Hive data warehouse software facilitates reading, writing, and managing large datasets residing in distributed storage using SQL. Structure can be projected onto data already in storage. A command line tool and JDBC driver are provided to connect users to Hive.

Apache HBase

Use Apache HBase™ when you need random, realtime read/write access to your Big Data. This project’s goal is the hosting of very large tables – billions of rows X millions of columns – atop clusters of commodity hardware. Apache HBase is an open-source, distributed, versioned, non-relational database modeled after Google’s Bigtable: A Distributed Storage System for Structured Data by Chang et al. Just as Bigtable leverages the distributed data storage provided by the Google File System, Apache HBase provides Bigtable-like capabilities on top of Hadoop and HDFS.





Apache Cassandra

The Apache Cassandra database is the right choice when you need scalability and high availability without compromising performance. Linear scalability and proven fault-tolerance on commodity hardware or cloud infrastructure make it the perfect platform for mission-critical data. Cassandra’s support for replicating across multiple datacenters is best-in-class, providing lower latency for your users and the peace of mind of knowing that you can survive regional outages.

Amazon S3


Amazon Redshift


Tachyon is a memory-centric distributed storage system enabling reliable data sharing at memory-speed across cluster frameworks.



A command-line tool for launching Apache Spark clusters


An Open Source REST Service for Apache Spark

Sparkling Water

  • use Spark 1.6; will be coming out for Spark 2.0
  • use spark-shell or sparkling-shell, start with --class water.SparklingWaterDriver
import org.apache.spark.h2o._
val hc = H2OContext.getOrCreate(sc)


Apache Lucene is a high performance, full-featured Information Retrieval library, written in Java. Elasticsearch uses Lucene internally to build its state of the art distributed search and analytics capabilities.



R interface to Apache Spark


Many data scientists love R

  • Open source
  • highly dynamic
  • interactive environment
  • rich ecosystem of packages
  • powerful visualization infrastructure
  • data frames make data manipulation convenient
  • taught by many schools to stats and computing students

Performance Limitations of R

  • R language: R’s dynamic design imposes restriction on optimization
  • R runtime: single threaded, everything has to fit in memory

What is SparkR?

  • convenient interoperability between R and Spark DataFrames
  • Spark: distributed robust processing

SQL Context

interface to Spark functionality in R

  • sparkR DataFrames are implemented on top of SparkSQL tables
  • all DataFrame operations go through a SQL optimizer (catalyst)
sc <- sparkR.init()
sqlContext <- sparkRSQL.init(sc)
  • from now on, no “Spark Context (sc)” needed any more


Reading and writing to storage (JVM <> Storage)

sparkDF <- read.df(sqlContext, path = "...", source = "csv") # json, parquet
write.df(sparkDF, path = "...", source = "json") # parquet
  • no data moves between R and the JVM in these operations (nothing is materialized in R)

moving data between R and the JVM

  • collect(): transports data
  • createDataFrame(): ships a local R data.frame to the cluster, ending up with a distributed object


  • persist(sparkDF, storageLevel)
  • cache(sparkDF) equivalent to persist(sparkDF, "MEMORY_ONLY")
  • cacheTable(sqlContext, "table_name")

subsequent actions will be executed on the representation of data in memory

DataFrame API

similar behaviour of SparkR DataFrames and R data.frames

  • sparkDF$newCol <- sparkDF$col + 1
  • subsetDF <- sparkDF[, c("date", "type")]
  • recentData <- subset(sparkDF, sparkDF$date == "2015-10-24")
  • firstRow <- sparkDF[[1, ]]
  • names(subsetDF) <- c("Date", "Type")
  • dim(recentData)
  • head(collect(count(group_by(subsetDF, "Date"))))




add $SPARK_HOME to environment variables in ~/.profile
export SPARK_HOME="/home/xps13/spark/spark-1.4.1-bin-hadoop2.6"

Folder contents

README.md
Contains short instructions for getting started with Spark.
bin
Contains executable files that can be used to interact with Spark in various ways (e.g., the Spark shell, which we will cover later in this chapter).
core, streaming, python, …
Contains the source code of major components of the Spark project.
examples
Contains some helpful Spark standalone jobs that you can look at and run to learn about the Spark API.


Learning Resources

Quick Start

navigate to folder
$ cd /home/xps13/Dropbox/Programming/Scala/Spark/quick-start

use spark-submit to run your application

sbt clean package

$SPARK_HOME/bin/spark-submit \
--class "SimpleApp" \
--master local[4] \

Learning Spark: Lightning-Fast Big Data Analysis

using spark shell (p.9f.)

start scala shell
$ cd /home/xps13/spark/spark-1.4.1-bin-hadoop2.6
or
$ cd $SPARK_HOME
$ bin/spark-shell
  • to exit the shell, press Ctrl-D
  • to shut down Spark, either call the stop() method on your SparkContext
  • or simply exit the application (e.g., with System.exit(0) or sys.exit())

Example 2-2. Scala line count

val lines = sc.textFile("README.md") // Create an RDD called lines
// lines: spark.RDD[String] = MappedRDD[...]

lines.count() // Count the number of items in this RDD
// Long = 127

lines.first() // First item in this RDD, i.e. first line of README.md
// String = # Apache Spark

Example 2-5. Scala filtering example

val lines = sc.textFile("README.md") // textFile() method creates an RDD called lines
// lines: spark.RDD[String] = MappedRDD[...]

val pythonLines = lines.filter(line => line.contains("Python"))
// pythonLines: spark.RDD[String] = FilteredRDD[...]

pythonLines.first() // First line that contains "Python"
// String = ## Interactive Python Shell

Example 2-8. Initializing Spark in Scala

import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._

val conf = new SparkConf().setMaster("local").setAppName("My App")
val sc = new SparkContext(conf)

2-11 to 2-14. Building Stand-alone applications

2-11. Word count Scala application

// Create a Scala Spark Context.
val conf = new SparkConf().setAppName("wordCount")
val sc = new SparkContext(conf)

// Load our input data.
val input = sc.textFile(inputFile)

// Split it up into words.
val words = input.flatMap(line => line.split(" "))

// Transform into pairs and count.
val counts = words.map(word => (word, 1)).reduceByKey{case (x, y) => x + y}

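// Save the word count back out to a text file, causing evaluation.
// outputFile, like inputFile above, is assumed to be defined elsewhere (e.g. from the program arguments).
counts.saveAsTextFile(outputFile)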

2-12. sbt build file

name := "learning-spark-mini-example"

version := "0.0.1"

scalaVersion := "2.10.4"

// additional libraries
libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-core" % "1.2.0" % "provided"
)

2-14. Scala build and run

navigate to folder and remove “wordcounts” (results of application launched using spark-submit)
$ cd /home/xps13/Dropbox/Programming/Scala/Spark/learning-spark/mini-complete-example
$ rm -r wordcounts
sbt clean package
$SPARK_HOME/bin/spark-submit \
--class com.oreilly.learningsparkexamples.mini.scala.WordCount \
./target/scala-2.10/learning-spark-mini-example_2.10-0.0.1.jar \
./README.md ./wordcounts


transformed RDDs are computed lazily, i.e. only when used in an action
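
The laziness can be made visible with a small sketch that counts how often the function passed to map() really runs. It is meant for pasting into spark-shell and assumes a Spark 2.x classpath (longAccumulator is the 2.x accumulator API); the names evaluations, squares, callsBeforeAction and callsAfterAction are made up for this illustration.

```scala
import org.apache.spark.{SparkConf, SparkContext}

// Reuses the shell's context if one exists, otherwise starts a local one.
val sc = SparkContext.getOrCreate(
  new SparkConf().setMaster("local[2]").setAppName("lazy-evaluation-sketch"))

// Hypothetical counter: incremented every time the map function is executed.
val evaluations = sc.longAccumulator("evaluations")

val squares = sc.parallelize(List(1, 2, 3, 4)).map { x =>
  evaluations.add(1)
  x * x
}
val callsBeforeAction = evaluations.sum // map() has only recorded the lineage so far

val total = squares.reduce(_ + _)       // the action triggers the computation
val callsAfterAction = evaluations.sum  // one call per element once the job has run
```

Accumulators updated inside transformations can be incremented more than once if a task is retried, so outside of a local experiment like this one the counter is only a rough indicator.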

Example 3-4. Persisting an RDD in memory

// copied from example 2-5.
val lines = sc.textFile("README.md") // textFile() method creates an RDD called lines
// lines: spark.RDD[String] = MappedRDD[...]

val pythonLines = lines.filter(line => line.contains("Python"))
// pythonLines: spark.RDD[String] = FilteredRDD[...]
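
pythonLines.persist // keep the filtered RDD in memory once it has been computed
// pythonLines.type = MapPartitionsRDD[2] at filter at <console>:23

pythonLines.count() // first action: computes and caches pythonLines
// Long = 3

pythonLines.first() // second action: served from the cached data
// String = high-level APIs in Scala, Java, and Python, and an optimized engine that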

Example 3-6. parallelize() method in Scala

val lines = sc.parallelize(List("pandas", "i like pandas"))

Example 3-14. union() transformation

// val inputRDD = sc.textFile("log.txt")
val inputRDD = sc.textFile("/home/xps13/Dropbox/Programming/Scala/Spark/learning-spark/files/fake_logs/log1.log")
val errorsRDD = inputRDD.filter(line => line.contains("error"))
val warningsRDD = inputRDD.filter(line => line.contains("warning"))
val badLinesRDD = errorsRDD.union(warningsRDD)
//  badLinesRDD: org.apache.spark.rdd.RDD[String] = UnionRDD[8] at union at <console>:27

Example 3-16. Scala error count using actions

println("Input had " + badLinesRDD.count() + " concerning lines")
//  Input had 1 concerning lines

println("Here are 10 examples:")
badLinesRDD.take(10).foreach(println) // take() brings at most 10 lines back to the driver
// - - [24/Sep/2014:22:26:12 +0000] "GET /error HTTP/1.1" 404 505 "-" "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/37.0.2062.94 Safari/537.36"

Example Scala collect() to retrieve the entire RDD

val getRDD = inputRDD.filter(line => line.contains("GET"))
getRDD.collect() // brings the entire RDD back to the driver, so it must fit in memory there
// Array[String] = Array( - - [24/Sep/2014:22:25:44 +0000] "GET /071300/242153 HTTP/1.1" 404 514 "-" "Mozilla/5.0 (compatible; Googlebot/2.1; +http://www.google.com/bot.html)", - - [24/Sep/2014:22:26:12 +0000] "GET /error HTTP/1.1" 404 505 "-" "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/37.0.2062.94 Safari/537.36", - - [24/Sep/2014:22:26:12 +0000] "GET /favicon.ico HTTP/1.1" 200 1713 "-" "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/37.0.2062.94 Safari/537.36", - - [24/Sep/2014:22:26:37 +0000] "GET / HTTP/1.1" 200 18785 "-" "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/37.0.2062.94 Safari/537.36", - - [24/Sep/2014:22...

Although transformations are lazy, you can force Spark to execute them at any time by running an action, such as count(). This is an easy way to test out just part of your program.

Example 3-21. Scala function passing

// The RDD class is not imported automatically, so we import it explicitly
import org.apache.spark.rdd.RDD

class SearchFunctions(val query: String) {
  def isMatch(s: String): Boolean = {
    s.contains(query)
  }
  def getMatchesNoReference(rdd: RDD[String]): RDD[Array[String]] = {
    // Safe: extract just the field we need into a local variable
    val query_ = this.query
    rdd.map(x => x.split(query_))
  }
}
// defined class SearchFunctions
// val test1 = new SearchFunctions("a test").isMatch("a test")
// val lines = sc.parallelize(List("pandas", "i like pandas"))
// val test2 = new SearchFunctions("pandas").getMatchesNoReference(lines)
// test2.collect()

Example 3-27. Scala squaring the values in an RDD

val input = sc.parallelize(List(1, 2, 3, 4))
val result = input.map(x => x * x)

Example 3-30. flatMap() in Scala, splitting lines into multiple words

val lines = sc.parallelize(List("hello world", "hi"))
val words = lines.flatMap(line => line.split(" "))
words.first() // returns "hello"
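
To see what flatMap() adds over map(), the same input can be run through both. This is a minimal sketch for spark-shell (or any setup with spark-core on the classpath); nested, flat, nestedCount and flatWords are names chosen here, not taken from the book.

```scala
import org.apache.spark.{SparkConf, SparkContext}

// Reuses the shell's context if one exists, otherwise starts a local one.
val sc = SparkContext.getOrCreate(
  new SparkConf().setMaster("local[2]").setAppName("flatmap-sketch"))

val lines = sc.parallelize(List("hello world", "hi"))

// map() keeps one output element per input line: an RDD[Array[String]].
val nested = lines.map(line => line.split(" "))
val nestedCount = nested.count()

// flatMap() flattens the per-line arrays into a single RDD[String].
val flat = lines.flatMap(line => line.split(" "))
val flatWords = flat.collect().toList
```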

Basic RDD transformations (see table p41f.)

  • map()
  • flatMap()
  • filter()
  • distinct()
  • sample()

Basic RDD actions

  • reduce(): returns a new element from two existing elements, e.g. to perform aggregations
  • fold(): like reduce(), but requires an initial zero value of the type to be returned
  • aggregate(): the return type may differ from the element type of the RDD; requires an initial zero value
  • collect(): returns the whole RDD
  • take(n): returns n elements from the RDD (minimizes the number of accessed partitions, so the result is biased)
  • top(n): extracts the top n elements (if order is important)
  • takeSample(withReplacement, num, seed): takes a sample of the data, with or without replacement
  • foreach(): performs computations on each element in the RDD without bringing it back locally
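
A few of the operations listed above, applied to a tiny RDD. This is only a sketch under the assumption that spark-core is on the classpath (for example inside spark-shell); the data and the value names are invented for the illustration.

```scala
import org.apache.spark.{SparkConf, SparkContext}

// Reuses the shell's context if one exists, otherwise starts a local one.
val sc = SparkContext.getOrCreate(
  new SparkConf().setMaster("local[2]").setAppName("basic-operations-sketch"))

val rdd = sc.parallelize(List(3, 1, 2, 3))

// distinct() is a transformation; collect() is the action that returns the result.
val distinctSorted = rdd.distinct().collect().sorted.toList

// fold() works like reduce() but starts from the zero value 0.
val sumWithFold = rdd.fold(0)(_ + _)

// top(n) uses the ordering of the elements, take(n) just reads the first partitions.
val topTwo = rdd.top(2).toList
val firstTwo = rdd.take(2).toList

// takeSample() returns a local array; the seed makes the draw repeatable.
val sample = rdd.takeSample(withReplacement = false, num = 2, seed = 42L)
```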

Two-RDD transformations

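  • distinct(): a single-RDD transformation, but expensive like the set operations below because it requires a shuffle
  • union(): the result may contain duplicates
  • intersection(): performs worse than union() because it removes duplicates, which requires a shuffle over the network
  • subtract(): like intersection(), performs a shuffle over the network
  • cartesian(other): very expensive for large RDDs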
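
The set-like operations can be tried out on two small RDDs. The following is a sketch, assuming spark-core on the classpath; a, b and the result names are hypothetical. The results are sorted after collect() because the element order after a shuffle is not guaranteed.

```scala
import org.apache.spark.{SparkConf, SparkContext}

// Reuses the shell's context if one exists, otherwise starts a local one.
val sc = SparkContext.getOrCreate(
  new SparkConf().setMaster("local[2]").setAppName("two-rdd-sketch"))

val a = sc.parallelize(List(1, 2, 3))
val b = sc.parallelize(List(3, 4, 5))

val unionAll = a.union(b).collect().sorted.toList     // keeps the duplicate 3
val common = a.intersection(b).collect().toList       // elements present in both
val onlyInA = a.subtract(b).collect().sorted.toList   // elements of a that are not in b
val pairCount = a.cartesian(b).count()                // every element of a paired with every element of b
```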

Example 3-33. reduce() in Scala

val rdd = sc.parallelize(List(1, 2, 3, 3))
val sum = rdd.reduce((x, y) => x + y)
// println(sum)
// sum: Int = 9

Example 3-36. aggregate() in Scala

val input = sc.parallelize(List(1, 2, 3, 4))
val result = input.aggregate((0, 0))( // set sum and counter to zero
  (acc, value) => (acc._1 + value, acc._2 + 1), // add value to sum, add one to counter
  (acc1, acc2) => (acc1._1 + acc2._1, acc1._2 + acc2._2)) // combine the partial (sum, counter) results from two partitions
val avg = result._1 / result._2.toDouble // calculate average as sum over numeric count

Implicit Conversions

Importing org.apache.spark.SparkContext._ (as in Example 2-8) brings implicit conversions into scope. These implicits turn an RDD into various wrapper classes, such as DoubleRDDFunctions (for RDDs of numeric data) and PairRDDFunctions (for key/value pairs), to expose additional functions such as mean() and variance().
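
A short sketch of what the implicit conversions provide: mean() and variance() only exist on RDDs of doubles, reduceByKey() only on RDDs of pairs. It assumes spark-core on the classpath. As far as I know, Spark 1.3 and later find these implicits without the SparkContext._ import, while older versions need it. The data and value names are made up for the example.

```scala
import org.apache.spark.{SparkConf, SparkContext}

// Reuses the shell's context if one exists, otherwise starts a local one.
val sc = SparkContext.getOrCreate(
  new SparkConf().setMaster("local[2]").setAppName("implicit-conversions-sketch"))

// RDD[Double] is implicitly wrapped in DoubleRDDFunctions.
val numbers = sc.parallelize(List(1.0, 2.0, 3.0, 4.0))
val mean = numbers.mean()
val variance = numbers.variance() // population variance, i.e. divided by n

// RDD[(String, Int)] is implicitly wrapped in PairRDDFunctions.
val pairs = sc.parallelize(List(("a", 1), ("b", 2), ("a", 3)))
val sums = pairs.reduceByKey(_ + _).collect().toMap
```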

Persistence (Caching)

Example 3-39. Double execution in Scala

import org.apache.spark.storage.StorageLevel
val input = sc.parallelize(List(1, 2, 3, 4))
val result = input.map(x => x*x)
  • unpersist(): manually removes the RDD from the cache
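
Without persistence, each of two actions on result would recompute the map(). A possible way to avoid that is sketched below: persist before the first action, then unpersist when done. It assumes spark-core on the classpath; the choice of StorageLevel.DISK_ONLY and the names n, csv and levelWhilePersisted are illustrative only.

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.storage.StorageLevel

// Reuses the shell's context if one exists, otherwise starts a local one.
val sc = SparkContext.getOrCreate(
  new SparkConf().setMaster("local[2]").setAppName("persist-sketch"))

val input = sc.parallelize(List(1, 2, 3, 4))
val result = input.map(x => x * x)

// Mark the RDD for persistence; nothing is computed or stored yet.
result.persist(StorageLevel.DISK_ONLY)

val n = result.count()                   // first action: computes and stores the squares
val csv = result.collect().mkString(",") // second action: reads the stored data
val levelWhilePersisted = result.getStorageLevel

// Remove the RDD from the cache again once it is no longer needed.
result.unpersist()
```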

Key/value RDDs

Example 4-2. Creating a pair RDD using the first word as the key in Scala

val lines = sc.textFile("README.md")
val pairs = lines.map(x => (x.split(" ")(0), x))
// val rdd = sc.parallelize(List((1, 2), (3, 4), (5, 6)))
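
Once the lines are keyed by their first word, the pair-specific operations become available. The sketch below uses three invented lines instead of README.md so that the outcome is easy to check by hand; it assumes spark-core on the classpath, and linesPerKey and keys are hypothetical names.

```scala
import org.apache.spark.{SparkConf, SparkContext}

// Reuses the shell's context if one exists, otherwise starts a local one.
val sc = SparkContext.getOrCreate(
  new SparkConf().setMaster("local[2]").setAppName("pair-rdd-sketch"))

// Stand-in for sc.textFile("README.md").
val lines = sc.parallelize(List("spark is fast", "spark is lazy", "rdd is resilient"))

// Same keying as in Example 4-2: the first word becomes the key.
val pairs = lines.map(x => (x.split(" ")(0), x))

// Count the lines per key: replace each value by 1, then sum per key.
val linesPerKey = pairs.mapValues(_ => 1).reduceByKey(_ + _).collect().toMap

// keys drops the values; distinct() removes repeated keys.
val keys = pairs.keys.distinct().collect().sorted.toList
```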

Spark Packages


Spark SQL: module for working with structured data

Spark Streaming: makes it easy to build scalable fault-tolerant streaming applications

MLlib: scalable machine learning library

GraphX: API for graphs and graph-parallel computation

Parquet files

Hadoop Installation

NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable

[xps13@xps13 sparkDemo]$ hadoop
bash: hadoop: command not found...
Install package 'hadoop-common' to provide command 'hadoop'? [N/y] y




Amazon AWS





