Sunday, 19 February 2017

SPARK BASICS Practice on 05 Feb 2017

Spark:
--------------------
`Spark Context` => Entry point for Spark Operations

`RDD` => Resilient Distributed Dataset

`RDD` features:
----------------------
1. Immutability
2. Lazy Evaluation
3. Cacheable
4. Type Inference

`RDD` operations:
----------------------
1. Transformations

(input is RDD) => (output is RDD)


2. Actions

(input is RDD) => (output is Result)


Examples on RDD:
-----------------------
list <- {1,2,3,4}

Transformations:
----------------------
f(x) <- {x + 1}
f(list) <- {2,3,4,5}

f(x) <- {x * x}
f(list) <- {1,4,9,16}

Actions:
----------------------
sum(list) <- 10
min(list) <- 1
max(list) <- 4

Spark Libraries:
------------------
1. Spark SQL
2. Spark Streaming
3. Spark MLlib
4. Spark GraphX


Command Line approach:
---------------------------
Scala => spark-shell
Python => pyspark
R   => sparkR

Spark context available as sc
SQL context available as sqlContext


How to Create RDD from Spark Context:
----------------------------------------
An RDD can be created from the Spark Context in two ways:
1. From collections (List, Seq, Set, ...)
2. From data sets (text, csv, tsv, ...)


Creating RDD from Collections:
-------------------------------
// A small in-memory Scala collection to turn into an RDD.
val list: List[Int] = List(1, 2, 3, 4)
// Hand the collection to the Spark context to obtain an RDD over its elements.
val rdd = sc.parallelize(list)

-------------------------------------------

scala> val list = List(1,2,3,4)
list: List[Int] = List(1, 2, 3, 4)

scala> val rdd = sc.parallelize(list)
rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[0] at parallelize at <console>:29

scala> rdd.getNumPartitions
res1: Int = 4

scala> val rdd = sc.parallelize(list, 2)
rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[1] at parallelize at <console>:29

scala> rdd.getNumPartitions
res2: Int = 2

-------------------------------------------

scala> rdd.collect()
res3: Array[Int] = Array(1, 2, 3, 4)

scala> rdd.collect().foreach(println)
1
2
3
4

scala> rdd.foreach(println)
3
4
1
2

scala> rdd.foreach(println)
1
2
3
4

-------------------------------------------

scala> rdd.collect
res10: Array[Int] = Array(1, 2, 3, 4)

scala> rdd.map(x => x + 1)
res11: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[2] at map at <console>:32

scala> rdd.map(x => x + 1).collect
res12: Array[Int] = Array(2, 3, 4, 5)

scala> rdd.map(x => x * x).collect
res13: Array[Int] = Array(1, 4, 9, 16)


-------------------------------------------

scala> rdd.collect
res14: Array[Int] = Array(1, 2, 3, 4)

scala> rdd.sum
res15: Double = 10.0

scala> rdd.min
res16: Int = 1

scala> rdd.max
res17: Int = 4

scala> rdd.count
res18: Long = 4

-------------------------------------------

Creating RDD from DataSets:
-------------------------------
// Path of the input data set, given as a file:// URI.
val file: String = "file:///home/orienit/work/input/demoinput"

// Load the text file through the Spark context as an RDD of strings.
val rdd = sc.textFile(file)

-------------------------------------------

scala> val file = "file:///home/orienit/work/input/demoinput"
file: String = file:///home/orienit/work/input/demoinput

scala> val rdd = sc.textFile(file)
rdd: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[7] at textFile at <console>:29

scala> rdd.getNumPartitions
res19: Int = 2

scala> val rdd = sc.textFile(file, 1)
rdd: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[9] at textFile at <console>:29

scala> rdd.getNumPartitions
res20: Int = 1

-------------------------------------------
Word Count Job in Spark
-------------------------------------------
// Word count: read a text file, count how often each word occurs,
// and write the (word, count) pairs, ordered by word, to the output folder.
val file = "file:///home/orienit/work/input/demoinput"

val output = "file:///home/orienit/work/output/demooutput"

// creating a rdd from text file
val rdd = sc.textFile(file)

// get all the words from each line: split on runs of whitespace and drop
// empty tokens, so repeated spaces, tabs or leading blanks are not counted
// as "words" (splitting on a single " " would emit empty strings for them)
val words = rdd.flatMap(line => line.split("\\s+")).filter(word => word.nonEmpty)

// create tuple for each word, with an initial count of 1
val tuple = words.map(word => (word, 1))

// final word & count from tuples: add up the counts of identical words
val wordcount = tuple.reduceByKey((a, b) => a + b)

// sort the data based on word (the first element of each (word, count) pair)
val sorted = wordcount.sortBy(pair => pair._1)

// save the data into output folder
sorted.saveAsTextFile(output)


-------------------------------------------

scala> rdd.collect
res21: Array[String] = Array(I am going, to hyd, I am learning, hadoop course)

scala> rdd.flatMap(line => line.split(" ")).collect
res22: Array[String] = Array(I, am, going, to, hyd, I, am, learning, hadoop, course)

scala> rdd.flatMap(line => line.split(" ")).map(word => (word, 1)).collect
res23: Array[(String, Int)] = Array((I,1), (am,1), (going,1), (to,1), (hyd,1), (I,1), (am,1), (learning,1), (hadoop,1), (course,1))

scala> rdd.flatMap(line => line.split(" ")).map(word => (word, 1)).reduceByKey((a,b) => a + b).collect
res24: Array[(String, Int)] = Array((learning,1), (hadoop,1), (am,2), (hyd,1), (I,2), (to,1), (going,1), (course,1))

scala> rdd.flatMap(line => line.split(" ")).map(word => (word, 1)).reduceByKey((a,b) => a + b).sortBy(word => word)
res25: org.apache.spark.rdd.RDD[(String, Int)] = MapPartitionsRDD[21] at sortBy at <console>:32

scala> rdd.flatMap(line => line.split(" ")).map(word => (word, 1)).reduceByKey((a,b) => a + b).sortBy(word => word).collect
res26: Array[(String, Int)] = Array((I,2), (am,2), (course,1), (going,1), (hadoop,1), (hyd,1), (learning,1), (to,1))


scala> rdd.flatMap(line => line.split(" ")).map(word => (word, 1)).reduceByKey((a,b) => a + b).sortBy(word => word).collect.foreach(println)
(I,2)
(am,2)
(course,1)
(going,1)
(hadoop,1)
(hyd,1)
(learning,1)
(to,1)

-------------------------------------------


-------------------------------------------
Grep Job in Spark
-------------------------------------------
// Grep: keep only the lines of a text file that contain a given pattern
// and write those lines to the output folder.
val file: String = "file:///home/orienit/work/input/demoinput"

val output: String = "file:///home/orienit/work/output/demooutput-1"

// load the input text file as an RDD of strings
val rdd = sc.textFile(file)

// retain the lines in which the pattern occurs
val filterData = rdd.filter(_.contains("am"))

// write the matching lines to the output folder
filterData.saveAsTextFile(output)

-------------------------------------------

// Run one query per table through the SQL context; each result is kept
// in its own value (df1, df2) for later use.
val df1 = sqlContext.sql("SELECT * from kalyan.src")

val df2 = sqlContext.sql("SELECT * from student")

// Print the contents of both results to the console.
df1.show()

df2.show()

-------------------------------------------

// Register df1 and df2 under temporary table names so they can be
// referenced from SQL text.
// `registerTempTable` is used instead of `createOrReplaceTempView` because the
// latter only exists from Spark 2.0 onwards, whereas these notes use the
// `sqlContext` provided by the Spark 1.x shell. `registerTempTable` is still
// accepted (as deprecated) on Spark 2.x, so the snippet runs on both.
df1.registerTempTable("tbl1")

df2.registerTempTable("tbl2")


// Query each temporary table on its own.
sqlContext.sql("SELECT * from tbl1").show()

sqlContext.sql("SELECT * from tbl2").show()

// Join the two temporary tables on tbl1.key = tbl2.id and show all columns of both.
sqlContext.sql("SELECT tbl1.*, tbl2.* from tbl1 join tbl2 on tbl1.key = tbl2.id").show()


-------------------------------------------

Share this article with your friends.
Related Posts Plugin for WordPress, Blogger...