Spark:
--------------------
`Spark Context` => Entry point for Spark Operations
`RDD` => Resilient Distributed DataSets
`RDD` features:
----------------------
1. Immutability
2. Lazy Evaluation
3. Cacheable
4. Type Infer
`RDD` operations:
----------------------
1. Transformations
(input is RDD) => (ouput is RDD)
2. Actions
(input is RDD) => (output is Result)
Examples on RDD:
-----------------------
list <- {1,2,3,4}
Transformations:
----------------------
f(x) <- {x + 1}
f(list) <- {2,3,4,5}
f(x) <- {x * x}
f(list) <- {1,4,9,16}
Actions:
----------------------
sum(list) <- 10
min(list) <- 1
max(list) <- 4
Spark Libraries:
------------------
1. Spark SQL
2. Spark Streaming
3. Spark MLLib
4. Spark GraphX
Command Line approach:
---------------------------
Scala => spark-shell
Python => pyspark
R => sparkR
Spark context available as sc
SQL context available as sqlContext
How to Create RDD from Spark Context:
----------------------------------------
We can create RDD from Spark Context, using
1. From Collections (List, Seq, Set, ..)
2. From Data Sets (text, csv, tsv, ...)
Creating RDD from Collections:
-------------------------------
val list = List(1,2,3,4)
val rdd = sc.parallelize(list)
-------------------------------------------
scala> val list = List(1,2,3,4)
list: List[Int] = List(1, 2, 3, 4)
scala> val rdd = sc.parallelize(list)
rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[0] at parallelize at <console>:29
scala> rdd.getNumPartitions
res1: Int = 4
scala> val rdd = sc.parallelize(list, 2)
rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[1] at parallelize at <console>:29
scala> rdd.getNumPartitions
res2: Int = 2
-------------------------------------------
scala> rdd.collect()
res3: Array[Int] = Array(1, 2, 3, 4)
scala> rdd.collect().foreach(println)
1
2
3
4
scala> rdd.foreach(println)
3
4
1
2
scala> rdd.foreach(println)
1
2
3
4
-------------------------------------------
scala> rdd.collect
res10: Array[Int] = Array(1, 2, 3, 4)
scala> rdd.map(x => x + 1)
res11: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[2] at map at <console>:32
scala> rdd.map(x => x + 1).collect
res12: Array[Int] = Array(2, 3, 4, 5)
scala> rdd.map(x => x * x).collect
res13: Array[Int] = Array(1, 4, 9, 16)
-------------------------------------------
scala> rdd.collect
res14: Array[Int] = Array(1, 2, 3, 4)
scala> rdd.sum
res15: Double = 10.0
scala> rdd.min
res16: Int = 1
scala> rdd.max
res17: Int = 4
scala> rdd.count
res18: Long = 4
-------------------------------------------
Creating RDD from DataSets:
-------------------------------
val file = "file:///home/orienit/work/input/demoinput"
val rdd = sc.textFile(file)
-------------------------------------------
scala> val file = "file:///home/orienit/work/input/demoinput"
file: String = file:///home/orienit/work/input/demoinput
scala> val rdd = sc.textFile(file)
rdd: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[7] at textFile at <console>:29
scala> rdd.getNumPartitions
res19: Int = 2
scala> val rdd = sc.textFile(file, 1)
rdd: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[9] at textFile at <console>:29
scala> rdd.getNumPartitions
res20: Int = 1
-------------------------------------------
Word Count Job in Spark
-------------------------------------------
val file = "file:///home/orienit/work/input/demoinput"
val output = "file:///home/orienit/work/output/demooutput"
// creating a rdd from text file
val rdd = sc.textFile(file)
// get all the words from each line
val words = rdd.flatMap(line => line.split(" "))
// create tuple for each word
val tuple = words.map(word => (word, 1))
// final word & count from tuples
val wordcount = tuple.reduceByKey((a,b) => a + b)
// sort the data based on word
val sorted = wordcount.sortBy(word => word)
// save the data into output folder
sorted.saveAsTextFile(output)
-------------------------------------------
scala> rdd.collect
res21: Array[String] = Array(I am going, to hyd, I am learning, hadoop course)
scala> rdd.flatMap(line => line.split(" ")).collect
res22: Array[String] = Array(I, am, going, to, hyd, I, am, learning, hadoop, course)
scala> rdd.flatMap(line => line.split(" ")).map(word => (word, 1)).collect
res23: Array[(String, Int)] = Array((I,1), (am,1), (going,1), (to,1), (hyd,1), (I,1), (am,1), (learning,1), (hadoop,1), (course,1))
scala> rdd.flatMap(line => line.split(" ")).map(word => (word, 1)).reduceByKey((a,b) => a + b).collect
res24: Array[(String, Int)] = Array((learning,1), (hadoop,1), (am,2), (hyd,1), (I,2), (to,1), (going,1), (course,1))
scala> rdd.flatMap(line => line.split(" ")).map(word => (word, 1)).reduceByKey((a,b) => a + b).sortBy(word => word)
res25: org.apache.spark.rdd.RDD[(String, Int)] = MapPartitionsRDD[21] at sortBy at <console>:32
scala> rdd.flatMap(line => line.split(" ")).map(word => (word, 1)).reduceByKey((a,b) => a + b).sortBy(word => word).collect
res26: Array[(String, Int)] = Array((I,2), (am,2), (course,1), (going,1), (hadoop,1), (hyd,1), (learning,1), (to,1))
scala> rdd.flatMap(line => line.split(" ")).map(word => (word, 1)).reduceByKey((a,b) => a + b).sortBy(word => word).collect.foreach(println)
(I,2)
(am,2)
(course,1)
(going,1)
(hadoop,1)
(hyd,1)
(learning,1)
(to,1)
-------------------------------------------
-------------------------------------------
Grep Job in Spark
-------------------------------------------
val file = "file:///home/orienit/work/input/demoinput"
val output = "file:///home/orienit/work/output/demooutput-1"
// creating a rdd from text file
val rdd = sc.textFile(file)
// filter the data based on pattern
val filterData = rdd.filter(line => line.contains("am"))
// save the data into output folder
filterData.saveAsTextFile(output)
-------------------------------------------
val df1 = sqlContext.sql("SELECT * from kalyan.src");
val df2 = sqlContext.sql("SELECT * from student")
df1.show
df2.show
-------------------------------------------
df1.createOrReplaceTempView("tbl1")
df2.createOrReplaceTempView("tbl2")
sqlContext.sql("SELECT * from tbl1").show()
sqlContext.sql("SELECT * from tbl2").show()
sqlContext.sql("SELECT tbl1.*, tbl2.* from tbl1 join tbl2 on tbl1.key = tbl2.id").show()
-------------------------------------------