Apache Spark
SQL DBs
Logs
Spark produces a lot of logs by default. Often this makes it impossible to read the whole grader feedback. Consequently, we suggest tuning Spark's log level like so:
import org.apache.log4j.{Level, Logger}
Logger.getLogger("org.apache.spark").setLevel(Level.WARN)
Apache Bigtop
- bigtop.apache.org Bigtop is an Apache Foundation project for Infrastructure Engineers and Data Scientists looking for comprehensive packaging, testing, and configuration of the leading open source big data components.
Building Spark
SBT
- launch included sbt and package
$ ./build/sbt
(wait for sbt to load, then at the sbt prompt)
> package
Note: Maven profiles and variables can be set to control the SBT build, e.g. ./build/sbt -Pyarn -Phadoop-2.3 package
Maven
- configure Maven
export MAVEN_OPTS="-Xmx2g -XX:MaxPermSize=512M -XX:ReservedCodeCacheSize=512m"
Note: For Java 8 and above this step is not required. If using build/mvn with no MAVEN_OPTS set, the script will automate this for you.
- make runnable distribution
./dev/make-distribution.sh
in the project root directory
Deploy Environments
Apache Mesos
Hadoop NextGen (YARN)
Amazon AWS EMR and EC2 for Spark and Zeppelin
- Amazon EMR Developer Guide: Apache Spark
- outdated: github: amplab: spark-ec2
- Amazon EMR Release Guide: Create a Cluster With Spark
Configuration
General Configuration
- Cluster name
spark-cluster-1
- Launch mode
Cluster
Software configuration
- Vendor
Amazon
- Log URI
s3://aws-logs-058644585154-eu-central-1/elasticmapreduce/
- Release
emr-5.0.0
- Applications
Spark: Spark 2.0.0 on Hadoop 2.7.2 YARN with Ganglia 3.7.2 and Zeppelin 0.6.1
Hardware configuration
- Instance type
m3.xlarge
- Number of instances
3
(1 master and 2 core nodes)
Security and access
- EC2 key pair
ec2-eu-central-1
- Permissions
Default
- EMR role
EMR_DefaultRole
- EC2 instance profile
EMR_EC2_DefaultRole
Cluster Information
- Subnet ID
subnet-eb9e5783
- After successful start, log in to cluster
ssh -i [path-to-keypair-file]/ec2-eu-central-1.pem hadoop@ec2-52-59-141-166.eu-central-1.compute.amazonaws.com
Data Sources
- Apache Avro is a data serialization system
XML
Apache Hive
The Apache Hive data warehouse software facilitates reading, writing, and managing large datasets residing in distributed storage using SQL. Structure can be projected onto data already in storage. A command-line tool and a JDBC driver are provided to connect users to Hive.
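A minimal sketch of querying a Hive table from Spark SQL, assuming a Spark 2.x build with Hive support and a hypothetical table logs already registered in the metastore:
import org.apache.spark.sql.SparkSession
// Enable Hive support so Spark talks to the Hive metastore.
val spark = SparkSession.builder()
  .appName("hive-example")
  .enableHiveSupport()
  .getOrCreate()
// The structure projected onto the stored data can then be queried with plain SQL.
val notFound = spark.sql("SELECT * FROM logs WHERE status = 404")
notFound.show()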
Apache HBase
Use Apache HBase™ when you need random, realtime read/write access to your Big Data. This project’s goal is the hosting of very large tables – billions of rows X millions of columns – atop clusters of commodity hardware. Apache HBase is an open-source, distributed, versioned, non-relational database modeled after Google’s Bigtable: A Distributed Storage System for Structured Data by Chang et al. Just as Bigtable leverages the distributed data storage provided by the Google File System, Apache HBase provides Bigtable-like capabilities on top of Hadoop and HDFS.
JSON
flattening
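A sketch of what flattening nested JSON can look like with the DataFrame API; the file people.json, its nested address struct and its emails array are made-up examples:
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{col, explode}

val spark = SparkSession.builder().appName("json-flatten").master("local[*]").getOrCreate()
// Spark infers the (possibly nested) schema from the JSON records.
val people = spark.read.json("people.json")
// Pull nested struct fields up to top-level columns and explode the array into one row per element.
val flat = people.select(
  col("name"),
  col("address.city").as("city"),
  explode(col("emails")).as("email")
)
flat.show()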
Kafka
Cassandra
The Apache Cassandra database is the right choice when you need scalability and high availability without compromising performance. Linear scalability and proven fault-tolerance on commodity hardware or cloud infrastructure make it the perfect platform for mission-critical data. Cassandra’s support for replicating across multiple datacenters is best-in-class, providing lower latency for your users and the peace of mind of knowing that you can survive regional outages.
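As a rough sketch, reading a Cassandra table into a DataFrame usually goes through the DataStax spark-cassandra-connector; the keyspace and table names below are hypothetical, and an existing SparkSession spark is assumed:
// Requires the spark-cassandra-connector package on the classpath and
// spark.cassandra.connection.host pointing at a Cassandra node.
val users = spark.read
  .format("org.apache.spark.sql.cassandra")
  .options(Map("keyspace" -> "store", "table" -> "users"))
  .load()
users.show()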
Amazon S3
Amazon Redshift
- github: databricks: spark-redshift
- github: databricks: spark-redshift: tutorial
- databricks.com: Introducing Redshift Data Source for Spark
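Roughly, the spark-redshift data source is used as in the following sketch; the JDBC URL, table name and S3 tempdir are placeholders, and an existing SparkSession spark is assumed:
// Redshift unloads the data to S3 and Spark reads it from there, hence the tempdir option.
val events = spark.read
  .format("com.databricks.spark.redshift")
  .option("url", "jdbc:redshift://redshifthost:5439/database?user=...&password=...")
  .option("dbtable", "events")
  .option("tempdir", "s3n://path/for/temp/data")
  .load()
events.show()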
Tachyon
Tachyon is a memory-centric distributed storage system enabling reliable data sharing at memory-speed across cluster frameworks.
Applications
Flintrock
A command-line tool for launching Apache Spark clusters
Livy
An Open Source REST Service for Apache Spark
Sparkling Water
- use Spark 1.6; will be coming out for Spark 2.0
- use spark-shell or sparkling-shell, start with
--class water.SparklingWaterDriver
import org.apache.spark.h2o._
val hc = H2OContext.getOrCreate(sc)
Elasticsearch
Apache Lucene is a high performance, full-featured Information Retrieval library, written in Java. Elasticsearch uses Lucene internally to build its state of the art distributed search and analytics capabilities.
APIs
SparkR
R interface to Apache Spark
Examples
- spark.apache.org: SparkR (R on Spark)
- rpubs.com: tcosta: SparkR 1.5 MLlib Logistic Regression Example
- zeppelinhub: SparkR, MLlib and SparkSQL for predictions over the NYC flight data
Many data scientists love R
- Open source
- highly dynamic
- interactive environment
- rich ecosystem of packages
- powerful visualization infrastructure
- data frames make data manipulation convenient
- taught by many schools to stats and computing students
Performance Limitations of R
- R language: R’s dynamic design imposes restrictions on optimization
- R runtime: single threaded, everything has to fit in memory
What is SparkR?
- convenient interoperability between R and Spark DataFrames
- Spark: distributed robust processing
SQL Context
interface to Spark functionality in R
- SparkR DataFrames are implemented on top of Spark SQL tables
- all DataFrame operations go through the SQL optimizer (Catalyst)
sc <- sparkR.init()
sqlContext <- sparkRSQL.init(sc)
- from now on, no “Spark Context (sc)” needed any more
IO
Reading and writing to storage (JVM <> Storage)
sparkDF <- read.df(sqlContext, path = "...", source = "csv") # json, parquet
write.df(sparkDF, path = "...", source = "json") # or parquet
- no data moves between R and the JVM (no materialization in R)
Moving data between R and the JVM
collect()
: transports the data from the JVM to R
createDataFrame()
: ships the data from R out to the cluster, ending up with a distributed object
Caching
persist(sparkDF, storageLevel)
cache(sparkDF)
equivalent to persist(sparkDF, "MEMORY_ONLY")
cacheTable(sqlContext, "table_name")
subsequent actions will be executed on the representation of data in memory
DataFrame API
similar behaviour of SparkR DataFrames and R data.frames
- sparkDF$newCol <- sparkDF$col + 1
- subsetDF <- sparkDF[, c("date", "type")]
- recentData <- subset(sparkDF, sparkDF$date == "2015-10-24")
- firstRow <- sparkDF[[1, ]]
- names(subsetDF) <- c("Date", "Type")
- dim(recentData)
- head(collect(count(group_by(subsetDF, “Date”))))
Community
IBM
Configuration
- add $SPARK_HOME to the environment variables in ~/.profile
export SPARK_HOME="/home/xps13/spark/spark-1.4.1-bin-hadoop2.6"
Folder contents
- README.md
- Contains short instructions for getting started with Spark.
- bin
- Contains executable files that can be used to interact with Spark in various ways (e.g., the Spark shell, which we will cover later in this chapter).
- core, streaming, python, …
- Contains the source code of major components of the Spark project.
- examples
- Contains some helpful Spark standalone jobs that you can look at and run to learn about the Spark API.
Databricks
- Databricks documentation docs.databricks.com
Learning Resources
Quick Start
- navigate to folder
$ cd /home/xps13/Dropbox/Programming/Scala/Spark/quick-start
use spark-submit to run your application
sbt clean package
$SPARK_HOME/bin/spark-submit \
--class "SimpleApp" \
--master local[4] \
target/scala-2.10/simple-project_2.10-1.0.jar
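For reference, a sketch of what the SimpleApp class submitted above might contain, based on the official Spark quick-start example (the actual file in the local project may differ):
import org.apache.spark.{SparkConf, SparkContext}

object SimpleApp {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("Simple Application")
    val sc = new SparkContext(conf)
    // Count lines containing "a" and "b" in the README.
    val logData = sc.textFile("README.md").cache()
    val numAs = logData.filter(line => line.contains("a")).count()
    val numBs = logData.filter(line => line.contains("b")).count()
    println(s"Lines with a: $numAs, Lines with b: $numBs")
    sc.stop()
  }
}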
Learning Spark: Lightning-Fast Big Data Analysis
using spark shell (p.9f.)
- start scala shell
$ cd /home/xps13/spark/spark-1.4.1-bin-hadoop2.6
or $ cd $SPARK_HOME
$ bin/spark-shell
- to exit the shell, press Ctrl-D
- to shut down Spark, you can either call the stop() method on your SparkContext, or simply exit the application (e.g., with System.exit(0) or sys.exit())
Example 2-2. Scala line count
val lines = sc.textFile("README.md") // Create an RDD called lines
// lines: spark.RDD[String] = MappedRDD[...]
lines.count() // Count the number of items in this RDD
// Long = 127
lines.first() // First item in this RDD, i.e. first line of README.md
// String = # Apache Spark
Example 2-5. Scala filtering example
val lines = sc.textFile("README.md") // textFile() method creates an RDD called lines
// lines: spark.RDD[String] = MappedRDD[...]
val pythonLines = lines.filter(line => line.contains("Python"))
// pythonLines: spark.RDD[String] = FilteredRDD[...]
pythonLines.first()
// String = ## Interactive Python Shell
Example 2-8. Initializing Spark in Scala
import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
val conf = new SparkConf().setMaster("local").setAppName("My App")
val sc = new SparkContext(conf)
2-11 to 2-14. Building Stand-alone applications
2-11. Word count Scala application
// Imports needed when building this as a stand-alone application.
import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
// Create a Scala Spark Context.
val conf = new SparkConf().setAppName("wordCount")
val sc = new SparkContext(conf)
// Load our input data (in the full example, inputFile and outputFile come from the application arguments).
val input = sc.textFile(inputFile)
// Split it up into words.
val words = input.flatMap(line => line.split(" "))
// Transform into pairs and count.
val counts = words.map(word => (word, 1)).reduceByKey{case (x, y) => x + y}
// Save the word count back out to a text file, causing evaluation.
counts.saveAsTextFile(outputFile)
2-12. sbt build file
name := "learning-spark-mini-example"
version := "0.0.1"
scalaVersion := "2.10.4"
// additional libraries
libraryDependencies ++= Seq(
"org.apache.spark" %% "spark-core" % "1.2.0" % "provided"
)
2-14. Scala build and run
- navigate to the folder and remove "wordcounts" (results of an application launched using spark-submit)
$ cd /home/xps13/Dropbox/Programming/Scala/Spark/learning-spark/mini-complete-example
$ rm -r wordcounts
sbt clean package
$SPARK_HOME/bin/spark-submit \
--class com.oreilly.learningsparkexamples.mini.scala.WordCount \
./target/scala-2.10/learning-spark-mini-example_2.10-0.0.1.jar \
./README.md ./wordcounts
RDDs
transformed RDDs are computed lazily, i.e. only when used in an action
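A tiny illustration of this laziness, assuming an existing SparkContext sc (e.g. in the spark-shell):
val nums = sc.parallelize(1 to 1000)
val squares = nums.map(x => x * x) // nothing is computed yet: map() only records the transformation
val total = squares.reduce(_ + _)  // reduce() is an action, so the whole lineage is executed here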
Example 3-4. Persisting an RDD in memory
// copied from example 2-5.
val lines = sc.textFile("README.md") // textFile() method creates an RDD called lines
// lines: spark.RDD[String] = MappedRDD[...]
val pythonLines = lines.filter(line => line.contains("Python"))
// pythonLines: spark.RDD[String] = FilteredRDD[...]
pythonLines.persist
// pythonLines.type = MapPartitionsRDD[2] at filter at <console>:23
pythonLines.count()
// Long = 3
pythonLines.first()
// String = high-level APIs in Scala, Java, and Python, and an optimized engine that
Example 3-6. parallelize() method in Scala
val lines = sc.parallelize(List("pandas", "i like pandas"))
Example 3-14. union() transformation
// val inputRDD = sc.textFile("log.txt")
val inputRDD = sc.textFile("/home/xps13/Dropbox/Programming/Scala/Spark/learning-spark/files/fake_logs/log1.log")
val errorsRDD = inputRDD.filter(line => line.contains("error"))
val warningsRDD = inputRDD.filter(line => line.contains("warning"))
val badLinesRDD = errorsRDD.union(warningsRDD)
// badLinesRDD: org.apache.spark.rdd.RDD[String] = UnionRDD[8] at union at <console>:27
Example 3-16. Scala error count using actions
println("Input had " + badLinesRDD.count() + " concerning lines")
// Input had 1 concerning lines
println("Here are 10 examples:")
badLinesRDD.take(10).foreach(println)
// 71.19.157.174 - - [24/Sep/2014:22:26:12 +0000] "GET /error HTTP/1.1" 404 505 "-" "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/37.0.2062.94 Safari/537.36"
Example: Scala collect() to retrieve the entire RDD
val getRDD = inputRDD.filter(line => line.contains("GET"))
getRDD.collect()
// Array[String] = Array(66.249.69.97 - - [24/Sep/2014:22:25:44 +0000] "GET /071300/242153 HTTP/1.1" 404 514 "-" "Mozilla/5.0 (compatible; Googlebot/2.1; +http://www.google.com/bot.html)", 71.19.157.174 - - [24/Sep/2014:22:26:12 +0000] "GET /error HTTP/1.1" 404 505 "-" "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/37.0.2062.94 Safari/537.36", 71.19.157.174 - - [24/Sep/2014:22:26:12 +0000] "GET /favicon.ico HTTP/1.1" 200 1713 "-" "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/37.0.2062.94 Safari/537.36", 71.19.157.174 - - [24/Sep/2014:22:26:37 +0000] "GET / HTTP/1.1" 200 18785 "-" "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/37.0.2062.94 Safari/537.36", 71.19.157.174 - - [24/Sep/2014:22...
Although transformations are lazy, you can force Spark to execute them at any time by running an action, such as count(). This is an easy way to test out just part of your program.
Example 3-21. Scala function passing
// The RDD class is not automatically imported, so we have to import it explicitly
import org.apache.spark.rdd.RDD
class SearchFunctions(val query: String) {
def isMatch(s: String): Boolean = {
s.contains(query)
}
def getMatchesNoReference(rdd: RDD[String]): RDD[Array[String]] = {
// Safe: extract just the field we need into a local variable
val query_ = this.query
rdd.map(x => x.split(query_))
}
}
// defined class SearchFunctions
// val test1 = new SearchFunctions("a test").isMatch("a test")
// val lines = sc.parallelize(List("pandas", "i like pandas"))
// val test2 = new SearchFunctions("pandas").getMatchesNoReference(lines)
// test2.collect()
Example 3-27. Scala squaring the values in an RDD
val input = sc.parallelize(List(1, 2, 3, 4))
val result = input.map(x => x * x)
println(result.collect().mkString(","))
Example 3-30. flatMap() in Scala, splitting lines into multiple words
val lines = sc.parallelize(List("hello world", "hi"))
val words = lines.flatMap(line => line.split(" "))
words.first() // returns "hello"
Basic RDD transformations and actions (see tables p. 41f.; a quick sketch follows this list)
map()
flatMap()
filter()
distinct()
sample()
reduce()
: returns a new element from two existing elements, e.g. to perform aggregations
fold()
: requires an initial zero value of the type to be returned
aggregate()
: the return type can differ from the RDD's element type; requires an initial zero value
collect()
: returns the whole RDD
take(n)
: returns n elements from the RDD (minimizes the number of accessed partitions, therefore biased)
top(n)
: extracts the top n elements (if order is important)
takeSample(withReplacement, num, seed)
: takes a sample of the data either with or without replacement
foreach()
: performs computations on each element in the RDD without bringing it back locally
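A quick sketch of a few of the actions above, assuming an existing SparkContext sc (e.g. in the spark-shell); the values are only illustrative:
val rdd = sc.parallelize(List(5, 1, 4, 2, 3))
rdd.fold(0)((x, y) => x + y)                                 // 15; fold() needs a zero value of the element type
rdd.top(2)                                                   // Array(5, 4): top n elements by natural ordering
rdd.takeSample(withReplacement = false, num = 2, seed = 42L) // random sample of 2 elements
rdd.foreach(x => println(x))                                 // runs on the executors; output stays there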
Two-RDD transformations (see the sketch after this list)
distinct()
union()
: may contain duplicates
intersection()
: performance is worse than union() because it checks for uniqueness
subtract()
: removes elements contained in the other RDD; like intersection(), it performs a shuffle over the network
cartesian(other)
: very expensive for large RDDs
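A sketch of the set-like operations above, again assuming an existing SparkContext sc:
val a = sc.parallelize(List(1, 2, 3, 3))
val b = sc.parallelize(List(3, 4, 5))
a.union(b).collect()        // Array(1, 2, 3, 3, 3, 4, 5): duplicates are kept
a.intersection(b).collect() // Array(3): deduplicates, requires a shuffle
a.subtract(b).collect()     // Array(1, 2): elements of a not in b, also shuffles
a.cartesian(b).count()      // 12: every (a, b) pair, very expensive for large RDDs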
Example 3-33. reduce() in Scala
val rdd = sc.parallelize(List(1, 2, 3, 3))
val sum = rdd.reduce((x, y) => x + y)
// println(sum)
// sum: Int = 9
Example 3-36. aggregate() in Scala
val input = sc.parallelize(List(1, 2, 3, 4))
val result = input.aggregate((0, 0))( // set sum and counter to zero
(acc, value) => (acc._1 + value, acc._2 + 1), // add value to sum, add one to counter
(acc1, acc2) => (acc1._1 + acc2._1, acc1._2 + acc2._2)) // combine the partial (sum, counter) results coming from different partitions
val avg = result._1 / result._2.toDouble // calculate the average as sum divided by count
Implicit Conversions
- Scaladocs for the RDD class http://bit.ly/1KiC7uO
- SparkContext object’s ScalaDoc http://bit.ly/1Bc4fNt
These implicits turn an RDD into various wrapper classes, such as DoubleRDDFunctions (for RDDs of numeric data) and PairRDDFunctions (for key/value pairs), to expose additional functions such as mean() and variance().
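For example, in the spark-shell these implicits give an RDD of doubles its statistics methods directly (a minimal sketch; assumes an existing SparkContext sc):
val nums = sc.parallelize(List(1.0, 2.0, 3.0, 4.0))
nums.mean()     // 2.5
nums.variance() // 1.25
nums.stdev()    // ~1.118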
Persistence (Caching)
Example 3-39. Double execution in Scala
import org.apache.spark.storage.StorageLevel
val input = sc.parallelize(List(1, 2, 3, 4))
val result = input.map(x => x*x)
result.persist(StorageLevel.DISK_ONLY)
println(result.count())
println(result.collect().mkString(","))
unpersist()
manually remove RDD from the cache
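Continuing example 3-39, a one-line sketch of releasing the cached data:
result.unpersist() // drop the cached partitions once they are no longer needed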
Key/value RDDs
Example 4-2. Creating a pair RDD using the first word as the key in Scala
val lines = sc.textFile("README.md")
val pairs = lines.map(x => (x.split(" ")(0), x))
// val rdd = sc.parallelize(List((1, 2), (3, 4), (5, 6)))
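A small sketch of common pair-RDD operations, using a list similar to the one in the comment above but with a repeated key so that reduceByKey has something to combine (assumes an existing SparkContext sc):
val rdd = sc.parallelize(List((1, 2), (3, 4), (3, 6)))
rdd.reduceByKey((x, y) => x + y).collect() // Array((1,2), (3,10))
rdd.mapValues(v => v * 10).collect()       // Array((1,20), (3,40), (3,60))
rdd.keys.collect()                         // Array(1, 3, 3)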
Spark Packages
- github: drubbo: SparkGIS: adds GIS functionalities to SparkSQL
- github: databricks: spark-redshift: Spark and Redshift integration
- github: databricks: sbt-spark-package: SBT plugin that simplifies the development process of Spark Packages and their use in applications
- github: zinniasystems: spark-ml-class
- github: harsha2010: magellan: Geospatial Analytics Using Spark
Apache
- github: apache: Spark SQL
- spark.apache.org: Spark SQL, DataFrames and Datasets Guide
Java 8
- databricks: spark-with-java-8
Spark Summit
- 27-29 October 2015
- Spark Summit Europe 2015
- Conference Agenda
- Conference Agenda: PDF version
- Conference Location
- Beurs Van Berlage
Damrak 243
Beursplein 1-3
1012 ZJ Amsterdam
- Wifi
- SSID: SparkSummit
Password: europe15
Tutorial: Data Science
- Wikipedia: ETL
- Wikipedia: EDA (Scala)
- Data Science with Spark
offline copy at
~/Dropbox/GitHub/revel.js/ml-slides/index.html
Talks
- Michael Armbrust (Databricks): Spark DataFrames: Simple and Fast Analysis of Structured Data
- Wednesday, October 28
11:00 - 11:30
Effectenbeurszaal
- Ram Sriharsha (Hortonworks): Magellan: Geospatial Analytics on Spark
- Wednesday, October 28
11:35 - 12:05
Effectenbeurszaal
- Joseph Bradley (Databricks): Combining the Strengths of MLlib, scikit-learn, and R
- Wednesday, October 28
12:10 - 12:40
Graanbeurszaal
- Hossein Falaki (Databricks): Enabling exploratory data science with Spark and R
- Thursday, October 29
11:35 - 12:05
Graanbeurszaal
- Gene Pang (Tachyon Nexus, Inc): Production Spark and Tachyon Use Cases
- Thursday, October 29
16:35 - 17:05
Effectenbeurszaal
Networking Events
- Spark Meetup
- Tuesday, October 27th, 6pm at the Beurs
- Attendee Reception
- Wednesday, October 28th, from 6-8pm
- Spark R Meetup
- Wednesday, October 28th 6:30pm at the Beurs
databricks
Libraries
Official
Spark SQL: module for working with structured data
Spark Streaming: makes it easy to build scalable fault-tolerant streaming applications
MLlib: scalable machine learning library
GraphX: API for graphs and graph-parallel computation
Parquet files
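A minimal sketch of writing and reading Parquet through the DataFrame API, assuming an existing SparkSession spark (as in the Spark 2.x shell); the path animals.parquet is made up:
import spark.implicits._
// Parquet stores the data column-wise and keeps the schema with the files.
val df = Seq(("pandas", 3), ("bears", 1)).toDF("animal", "count")
df.write.parquet("animals.parquet")
val back = spark.read.parquet("animals.parquet")
back.show()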
Hadoop Installation
- download from apache.org: hadoop
NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
[xps13@xps13 sparkDemo]$ hadoop
bash: hadoop: command not found...
Install package 'hadoop-common' to provide command 'hadoop'? [N/y] y
...
Documentation
Tutorials
Local
Amazon AWS
Links
- Awesome List: Spark
- Spark Notebook
- cloudera: Why Apache Spark is a Crossover Hit for Data Scientists
- databricks.com: ML Pipelines: A New High-Level API for MLlib
- gitbook.com: Databricks Spark Reference Applications
- Spark RDD API examples, Zhen He, La Trobe University
edX
- BerkeleyX: CS190.1x: Scalable Machine Learning
- BerkeleyX: CS100.1x Introduction to Big Data with Apache Spark
Books
- Sandy Ryza et al. - Advanced Analytics with Spark: Patterns for Learning from Data at Scale
- Nick Pentreath - Machine Learning with Spark - Tackle Big Data with Powerful Spark Machine Learning Algorithms (2015)
- github: robertzk: Machine Learning with Spark
- Holden Karau - Fast Data Processing With Spark (2013)
- Spark in Action: GitHub Repo
- Databricks Apache Spark Publications