SQL DBs

Logs

Spark produces a lot of log output by default, which often makes it impossible to read the whole grader feedback. We therefore suggest tuning Spark's log level like so:

import org.apache.log4j.{Level, Logger}
Logger.getLogger("org.apache.spark").setLevel(Level.WARN)

Apache Bigtop

  • bigtop.apache.org: Bigtop is an Apache Foundation project for Infrastructure Engineers and Data Scientists looking for comprehensive packaging, testing, and configuration of the leading open source big data components.

Building Spark

SBT

launch the included sbt and run package
$ ./build/sbt
(wait for sbt to load)
> package
Note: Maven profiles and variables can be set to control the SBT build, e.g. ./build/sbt -Pyarn -Phadoop-2.3 package

Maven

configure Maven
export MAVEN_OPTS="-Xmx2g -XX:MaxPermSize=512M -XX:ReservedCodeCacheSize=512m"
Note: For Java 8 and above this step is not required. If using build/mvn with no MAVEN_OPTS set, the script will automate this for you
make runnable distribution
./dev/make-distribution.sh in the project root directory

Deploy Environments

Apache Mesos

Hadoop NextGen (YARN)

Amazon AWS EMR and EC2 for Spark and Zeppelin

Configuration

General Configuration
  • Cluster name: spark-cluster-1
  • Launch mode: Cluster
Software configuration
  • Vendor: Amazon
  • Log URI: s3://aws-logs-058644585154-eu-central-1/elasticmapreduce/
  • Release: emr-5.0.0
  • Applications: Spark 2.0.0 on Hadoop 2.7.2 YARN with Ganglia 3.7.2 and Zeppelin 0.6.1
Hardware configuration
  • Instance type: m3.xlarge
  • Number of instances: 3 (1 master and 2 core nodes)
Security and access
  • EC2 key pair: ec2-eu-central-1
  • Permissions: Default
  • EMR role: EMR_DefaultRole
  • EC2 instance profile: EMR_EC2_DefaultRole

Cluster Information

  • Subnet ID: subnet-eb9e5783
After a successful start, log in to the cluster:
ssh -i [path-to-keypair-file]/ec2-eu-central-1.pem hadoop@ec2-52-59-141-166.eu-central-1.compute.amazonaws.com

Data Sources

XML

Apache Hive

The Apache Hive data warehouse software facilitates reading, writing, and managing large datasets residing in distributed storage using SQL. Structure can be projected onto data already in storage. A command line tool and JDBC driver are provided to connect users to Hive.
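
A minimal sketch of querying Hive from Scala (assuming Spark 2.x built with Hive support and a hive-site.xml on the classpath; the database and table names below are hypothetical):

import org.apache.spark.sql.SparkSession

// Build a session with Hive support (reads hive-site.xml from the classpath)
val spark = SparkSession.builder()
  .appName("HiveExample")
  .enableHiveSupport()
  .getOrCreate()

// some_db.some_table is a hypothetical Hive table
val df = spark.sql("SELECT * FROM some_db.some_table LIMIT 10")
df.show()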

Apache HBase

Use Apache HBase™ when you need random, realtime read/write access to your Big Data. This project’s goal is the hosting of very large tables – billions of rows X millions of columns – atop clusters of commodity hardware. Apache HBase is an open-source, distributed, versioned, non-relational database modeled after Google’s Bigtable: A Distributed Storage System for Structured Data by Chang et al. Just as Bigtable leverages the distributed data storage provided by the Google File System, Apache HBase provides Bigtable-like capabilities on top of Hadoop and HDFS.
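
A minimal sketch of reading an HBase table into an RDD via TableInputFormat (assuming the HBase client jars are on the classpath; the table name is hypothetical):

import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.client.Result
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.mapreduce.TableInputFormat

val hbaseConf = HBaseConfiguration.create()
hbaseConf.set(TableInputFormat.INPUT_TABLE, "some_table") // hypothetical table name

// Each record is a (row key, Result) pair
val hbaseRDD = sc.newAPIHadoopRDD(hbaseConf,
  classOf[TableInputFormat],
  classOf[ImmutableBytesWritable],
  classOf[Result])

println(hbaseRDD.count())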

JSON

flattening
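
A minimal sketch of flattening nested JSON with Spark SQL (the file name and schema, a nested address struct plus an orders array, are hypothetical):

import org.apache.spark.sql.functions.explode

// Hypothetical file with records like {"name": ..., "address": {"city": ...}, "orders": [...]}
val df = sqlContext.read.json("people.json")

// Flatten a nested struct by selecting its fields with dot notation
val flat = df.select("name", "address.city")

// Flatten an array column into one row per element
val exploded = df.select(df("name"), explode(df("orders")).as("order"))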

Kafka

Cassandra

The Apache Cassandra database is the right choice when you need scalability and high availability without compromising performance. Linear scalability and proven fault-tolerance on commodity hardware or cloud infrastructure make it the perfect platform for mission-critical data. Cassandra’s support for replicating across multiple datacenters is best-in-class, providing lower latency for your users and the peace of mind of knowing that you can survive regional outages.
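
A minimal sketch using the DataStax spark-cassandra-connector (assuming the connector package is on the classpath and spark.cassandra.connection.host is set; keyspace, table, and column names are hypothetical):

import com.datastax.spark.connector._

// Read a table as an RDD of CassandraRow
val users = sc.cassandraTable("test_keyspace", "users")
println(users.count())

// Write a pair RDD back to a (hypothetical) table with columns name, value
val counts = sc.parallelize(Seq(("alice", 1), ("bob", 2)))
counts.saveToCassandra("test_keyspace", "counts", SomeColumns("name", "value"))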

Amazon S3

(screenshot: databricks-s3-load-comext-comtrade.png)
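
A minimal sketch of reading from S3 with the s3a filesystem (bucket and key names are hypothetical; credentials can also come from IAM roles, as on EMR):

// Credentials for the s3a filesystem (skip when an IAM role provides them, e.g. on EMR)
sc.hadoopConfiguration.set("fs.s3a.access.key", sys.env("AWS_ACCESS_KEY_ID"))
sc.hadoopConfiguration.set("fs.s3a.secret.key", sys.env("AWS_SECRET_ACCESS_KEY"))

// Hypothetical bucket and prefix
val logs = sc.textFile("s3a://my-bucket/some/prefix/*.csv")
println(logs.count())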

Amazon Redshift

Tachyon

Tachyon is a memory-centric distributed storage system enabling reliable data sharing at memory-speed across cluster frameworks.
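
A minimal sketch of writing an RDD to Tachyon and reading it back (assuming the Tachyon client is on the classpath and the filesystem is configured; host, port, and path are hypothetical):

// Hypothetical Tachyon master and path
val data = sc.parallelize(1 to 1000)
data.saveAsTextFile("tachyon://tachyon-master:19998/tmp/numbers")

val back = sc.textFile("tachyon://tachyon-master:19998/tmp/numbers")
println(back.count())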

Applications

Flintrock

A command-line tool for launching Apache Spark clusters

Livy

An Open Source REST Service for Apache Spark

Sparkling Water

  • use Spark 1.6; will be coming out for Spark 2.0
  • use spark-shell or sparkling-shell, start with --class water.SparklingWaterDriver
import org.apache.spark.h2o._
val hc = H2OContext.getOrCreate(sc)

Elasticsearch

Apache Lucene is a high performance, full-featured Information Retrieval library, written in Java. Elasticsearch uses Lucene internally to build its state of the art distributed search and analytics capabilities.
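
A minimal sketch of indexing an RDD with the elasticsearch-hadoop (elasticsearch-spark) integration (assuming the package is on the classpath and es.nodes points to the cluster; the index/type name is hypothetical):

import org.elasticsearch.spark._

// Each Map becomes one document in the (hypothetical) index spark/docs
val docs = sc.makeRDD(Seq(
  Map("title" -> "Spark notes", "views" -> 10),
  Map("title" -> "Elasticsearch notes", "views" -> 3)))

docs.saveToEs("spark/docs")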

APIs

SparkR

R interface to Apache Spark

Examples

Many data scientists love R

  • Open source
  • highly dynamic
  • interactive environment
  • rich ecosystem of packages
  • powerful visualization infrastructure
  • data frames make data manipulation convenient
  • taught by many schools to stats and computing students

Performance Limitations of R

  • R language: R’s dynamic design imposes restrictions on optimization
  • R runtime: single threaded, everything has to fit in memory

What is SparkR?

  • convenient interoperability between R and Spark DataFrames
  • Spark: distributed robust processing

SQL Context

interface to Spark functionality in R

  • SparkR DataFrames are implemented on top of Spark SQL tables
  • all DataFrame operations go through a SQL optimizer (Catalyst)
sc <- sparkR.init()
sqlContext <- sparkRSQL.init(sc)
  • from here on, the SparkContext (sc) is no longer needed directly

IO

Reading and writing to storage (JVM <> Storage)

sparkDF <- read.df(sqlContext, path = "...", source = "csv") # json, parquet
write.df(sparkDF, path = "...", source = "json") # parquet
  • no data is moved between R and the JVM (no materialization in R)

Moving data between R and the JVM

  • collect(): brings a distributed DataFrame back to R as a local data.frame
  • createDataFrame(): ships a local R data.frame to the cluster, ending up with a distributed object

Caching

  • persist(sparkDF, storageLevel)
  • cache(sparkDF) equivalent to persist(sparkDF, "MEMORY_ONLY")
  • cacheTable(sqlContext, "table_name")

subsequent actions are executed on the in-memory representation of the data

DataFrame API

similar behaviour of SparkR DataFrames and R data.frames

  • sparkDF$newCol <- sparkDF$col + 1
  • subsetDF <- sparkDF[, c("date", "type")]
  • recentData <- subset(sparkDF, sparkDF$date == "2015-10-24")
  • firstRow <- sparkDF[[1, ]]
  • names(subsetDF) <- c("Date", "Type")
  • dim(recentData)
  • head(collect(count(group_by(subsetDF, "Date"))))

Community

IBM

Configuration

add $SPARK_HOME to environment variables in ~/.profile
export SPARK_HOME="/home/xps13/spark/spark-1.4.1-bin-hadoop2.6"

Folder contents

README.md
Contains short instructions for getting started with Spark.
bin
Contains executable files that can be used to interact with Spark in various ways (e.g., the Spark shell, which we will cover later in this chapter).
core, streaming, python, …
Contains the source code of major components of the Spark project.
examples
Contains some helpful Spark standalone jobs that you can look at and run to learn about the Spark API.

Databricks

Learning Resources

Quick Start

navigate to folder
$ cd /home/xps13/Dropbox/Programming/Scala/Spark/quick-start

use spark-submit to run your application

sbt clean package

$SPARK_HOME/bin/spark-submit \
--class "SimpleApp" \
--master local[4] \
target/scala-2.10/simple-project_2.10-1.0.jar

Learning Spark: Lightning-Fast Big Data Analysis

using spark shell (p.9f.)

start scala shell
$ cd /home/xps13/spark/spark-1.4.1-bin-hadoop2.6 (or $ cd $SPARK_HOME)
$ bin/spark-shell
  • to exit the shell, press Ctrl-D
  • to shut down Spark, you can either call the stop() method on your SparkContext
  • or simply exit the application (e.g., with System.exit(0) or sys.exit())

Example 2-2. Scala line count

val lines = sc.textFile("README.md") // Create an RDD called lines
// lines: spark.RDD[String] = MappedRDD[...]

lines.count() // Count the number of items in this RDD
// Long = 127

lines.first() // First item in this RDD, i.e. first line of README.md
// String = # Apache Spark

Example 2-5. Scala filtering example

val lines = sc.textFile("README.md") // textFile() method creates an RDD called lines
// lines: spark.RDD[String] = MappedRDD[...]

val pythonLines = lines.filter(line => line.contains("Python"))
// pythonLines: spark.RDD[String] = FilteredRDD[...]

pythonLines.first()
//  String = ## Interactive Python Shell

Example 2-8. Initializing Spark in Scala

import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._

val conf = new SparkConf().setMaster("local").setAppName("My App")
val sc = new SparkContext(conf)

2-11 to 2-14. Building Stand-alone applications

2-11. Word count Scala application

// Create a Scala Spark Context.
val conf = new SparkConf().setAppName("wordCount")
val sc = new SparkContext(conf)

// Load our input data.
val input = sc.textFile(inputFile)

// Split it up into words.
val words = input.flatMap(line => line.split(" "))

// Transform into pairs and count.
val counts = words.map(word => (word, 1)).reduceByKey{case (x, y) => x + y}

// Save the word count back out to a text file, causing evaluation.
counts.saveAsTextFile(outputFile)

2-12. sbt build file

name := "learning-spark-mini-example"

version := "0.0.1"

scalaVersion := "2.10.4"

// additional libraries
libraryDependencies ++= Seq(
"org.apache.spark" %% "spark-core" % "1.2.0" % "provided"
)

2-14. Scala build and run

navigate to folder and remove “wordcounts” (results of application launched using spark-submit)
$ cd /home/xps13/Dropbox/Programming/Scala/Spark/learning-spark/mini-complete-example
$ rm -r wordcounts
sbt clean package
$SPARK_HOME/bin/spark-submit \
--class com.oreilly.learningsparkexamples.mini.scala.WordCount \
./target/scala-2.10/learning-spark-mini-example_2.10-0.0.1.jar \
./README.md ./wordcounts

RDDs

transformed RDDs are computed lazily, i.e. only when used in an action

Example 3-4. Persisting an RDD in memory

// copied from example 2-5.
val lines = sc.textFile("README.md") // textFile() method creates an RDD called lines
// lines: spark.RDD[String] = MappedRDD[...]

val pythonLines = lines.filter(line => line.contains("Python"))
// pythonLines: spark.RDD[String] = FilteredRDD[...]
pythonLines.persist
// pythonLines.type = MapPartitionsRDD[2] at filter at <console>:23

pythonLines.count()
// Long = 3

pythonLines.first()
// String = high-level APIs in Scala, Java, and Python, and an optimized engine that

Example 3-6. parallelize() method in Scala

val lines = sc.parallelize(List("pandas", "i like pandas"))

Example 3-14. union() transformation

// val inputRDD = sc.textFile("log.txt")
val inputRDD = sc.textFile("/home/xps13/Dropbox/Programming/Scala/Spark/learning-spark/files/fake_logs/log1.log")
val errorsRDD = inputRDD.filter(line => line.contains("error"))
val warningsRDD = inputRDD.filter(line => line.contains("warning"))
val badLinesRDD = errorsRDD.union(warningsRDD)
//  badLinesRDD: org.apache.spark.rdd.RDD[String] = UnionRDD[8] at union at <console>:27

Example 3-16. Scala error count using actions

println("Input had " + badLinesRDD.count() + " concerning lines")
//  Input had 1 concerning lines

println("Here are 10 examples:")
badLinesRDD.take(10).foreach(println)
//  71.19.157.174 - - [24/Sep/2014:22:26:12 +0000] "GET /error HTTP/1.1" 404 505 "-" "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/37.0.2062.94 Safari/537.36"

Example Scala collect() to retrieve the entire RDD

val getRDD = inputRDD.filter(line => line.contains("GET"))
getRDD.collect()
// Array[String] = Array(66.249.69.97 - - [24/Sep/2014:22:25:44 +0000] "GET /071300/242153 HTTP/1.1" 404 514 "-" "Mozilla/5.0 (compatible; Googlebot/2.1; +http://www.google.com/bot.html)", 71.19.157.174 - - [24/Sep/2014:22:26:12 +0000] "GET /error HTTP/1.1" 404 505 "-" "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/37.0.2062.94 Safari/537.36", 71.19.157.174 - - [24/Sep/2014:22:26:12 +0000] "GET /favicon.ico HTTP/1.1" 200 1713 "-" "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/37.0.2062.94 Safari/537.36", 71.19.157.174 - - [24/Sep/2014:22:26:37 +0000] "GET / HTTP/1.1" 200 18785 "-" "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/37.0.2062.94 Safari/537.36", 71.19.157.174 - - [24/Sep/2014:22...

Although transformations are lazy, you can force Spark to execute them at any time by running an action, such as count(). This is an easy way to test out just part of your program.

Example 3-21. Scala function passing

// The RDD class is not automatically imported, so we have to import it explicitly
import org.apache.spark.rdd.RDD 

class SearchFunctions(val query: String) {
  def isMatch(s: String): Boolean = {
    s.contains(query)
  }
  def getMatchesNoReference(rdd: RDD[String]): RDD[Array[String]] = {
  // Safe: extract just the field we need into a local variable
    val query_ = this.query
    rdd.map(x => x.split(query_))
  }
}
// defined class SearchFunctions
// val test1 = new SearchFunctions("a test").isMatch("a test")
// val lines = sc.parallelize(List("pandas", "i like pandas"))
// val test2 = new SearchFunctions("pandas").getMatchesNoReference(lines)
// test2.collect()

Example 3-27. Scala squaring the values in an RDD

val input = sc.parallelize(List(1, 2, 3, 4))
val result = input.map(x => x * x)
println(result.collect().mkString(","))

Example 3-30. flatMap() in Scala, splitting lines into multiple words

val lines = sc.parallelize(List("hello world", "hi"))
val words = lines.flatMap(line => line.split(" "))
words.first() // returns "hello"

Basic RDD transformations and actions (see table p41f.)

  • map()
  • flatMap()
  • filter()
  • distinct()
  • sample()
  • reduce(): return a new element from two existing elements, e.g. to perform aggregations
  • fold(): requires an initial zero value of the type to be returned (see the sketch after this list)
  • aggregate(): the return type can differ from the type of the RDD being worked on; requires an initial zero value
  • collect(): return the whole RDD
  • take(n): return n elements from the RDD (minimizes the number of accessed partitions, therefore biased)
  • top(n): extract the top n elements (if order is important)
  • takeSample(withReplacement, num, seed): take a sample of the data either with or without replacement
  • foreach(): perform computations on each element in the RDD without bringing it back locally
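
A quick sketch of fold() and top() on a small RDD (illustrative values):

val nums = sc.parallelize(List(1, 2, 3, 3))

val total = nums.fold(0)((x, y) => x + y) // zero value 0 for Int addition
// total: Int = 9

val top2 = nums.top(2) // highest elements by the implicit ordering
// top2: Array[Int] = Array(3, 3)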

Two-RDD transformations

  • distinct()
  • union(): result may contain duplicates
  • intersection(): worse performance than union() because it checks for uniqueness
  • subtract(): like intersection(), performs a shuffle over the network
  • cartesian(other): very expensive for large RDDs (see the sketch after this list)
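
A quick sketch of the set-like operations above on two small RDDs (illustrative values; element order in the results may vary):

val a = sc.parallelize(List(1, 2, 3, 3))
val b = sc.parallelize(List(3, 4, 5))

a.union(b).collect()        // duplicates are kept: 1, 2, 3, 3, 3, 4, 5
a.intersection(b).collect() // deduplicated, requires a shuffle: 3
a.subtract(b).collect()     // elements of a not in b: 1, 2
a.cartesian(b).count()      // all pairs: 4 * 3 = 12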

Example 3-33. reduce() in Scala

val rdd = sc.parallelize(List(1, 2, 3, 3))
val sum = rdd.reduce((x, y) => x + y)
// println(sum)
// sum: Int = 9

Example 3-36. aggregate() in Scala

val input = sc.parallelize(List(1, 2, 3, 4))
val result = input.aggregate((0, 0))( // initialize (sum, count) to zero
  (acc, value) => (acc._1 + value, acc._2 + 1), // add the value to the sum, increment the count
  (acc1, acc2) => (acc1._1 + acc2._1, acc1._2 + acc2._2)) // merge the (sum, count) accumulators from two partitions
val avg = result._1 / result._2.toDouble // average = sum divided by count

Implicit Conversions

These implicits turn an RDD into various wrapper classes, such as DoubleRDDFunctions (for RDDs of numeric data) and PairRDDFunctions (for key/value pairs), to expose additional functions such as mean() and variance().
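
A quick sketch of functions exposed through these implicit conversions (illustrative values):

val nums = sc.parallelize(List(1.0, 2.0, 3.0, 4.0))
nums.mean()     // 2.5, via DoubleRDDFunctions
nums.variance() // 1.25, via DoubleRDDFunctions

val pairs = sc.parallelize(List(("a", 1), ("b", 2), ("a", 3)))
pairs.reduceByKey(_ + _).collect() // (a,4), (b,2), via PairRDDFunctions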

Persistence (Caching)

Example 3-39. Double execution in Scala

import org.apache.spark.storage.StorageLevel
val input = sc.parallelize(List(1, 2, 3, 4))
val result = input.map(x => x*x)
result.persist(StorageLevel.DISK_ONLY)
println(result.count())
println(result.collect().mkString(","))
  • unpersist(): manually removes the RDD from the cache

Key/value RDDs

Example 4-2. Creating a pair RDD using the first word as the key in Scala

val lines = sc.textFile("README.md")
val pairs = lines.map(x => (x.split(" ")(0), x))
// val rdd = sc.parallelize(List((1, 2), (3, 4), (5, 6)))

Spark Packages

Apache

Spark Summit

Conference Location
Beurs Van Berlage
Damrak 243
Beursplein 1-3
1012 ZJ Amsterdam
Wifi
SSID: SparkSummit
Password: europe15

Tutorial: Data Science

Talks

Michael Armbrust (Databricks): Spark DataFrames: Simple and Fast Analysis of Structured Data
Wednesday, October 28
11:00 - 11:30
Effectenbeurszaal
Ram Sriharsha (Hortonworks): Magellan: Geospatial Analytics on Spark
Wednesday, October 28
11:35 - 12:05
Effectenbeurszaal
Joseph Bradley (Databricks): Combining the Strengths of MLlib, scikit-learn, and R
Wednesday, October 28
12:10 - 12:40
Graanbeurszaal
Hossein Falaki (Databricks): Enabling exploratory data science with Spark and R
Thursday, October 29
11:35 - 12:05
Graanbeurszaal
Gene Pang (Tachyon Nexus, Inc): Production Spark and Tachyon Use Cases
Thursday, October 29
16:35 - 17:05
Effectenbeurszaal

Networking Events

Spark Meetup
Tuesday, October 27th, 6pm at the Beurs
Attendee Reception
Wednesday, October 28th, from 6-8pm
Spark R Meetup
Wednesday, October 28th 6:30pm at the Beurs

databricks

Libraries

Official

Spark SQL: module for working with structured data

Spark Streaming: makes it easy to build scalable fault-tolerant streaming applications

MLlib: scalable machine learning library

GraphX: API for graphs and graph-parallel computation

Parquet files
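
A minimal sketch of writing and reading Parquet files with Spark SQL (Spark 1.x sqlContext API; path and column names are hypothetical):

import sqlContext.implicits._

// Hypothetical column names and output path
val df = Seq(("alice", 1), ("bob", 2)).toDF("name", "count")
df.write.parquet("people.parquet")

val back = sqlContext.read.parquet("people.parquet")
back.show()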

Hadoop Installation

NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable

[xps13@xps13 sparkDemo]$ hadoop
bash: hadoop: command not found...
Install package 'hadoop-common' to provide command 'hadoop'? [N/y] y
...

Documentation

Tutorials

Local

Amazon AWS

edX

Books

Certification



Published: 04 August 2015
Category: datascience