Friday, 7 April 2017

SPARK BASICS Practice on 02 Apr 2017

`Spark` is meant for `In-Memory Distributed Computing`

`Spark` provides 4 libraries:
1. Spark SQL
2. Spark Streaming
3. Spark MLlib
4. Spark GraphX

`Spark Context` is the Entry point for any `Spark Operations`

`Resilient Distributed Datasets` => RDD

RDD features:
-------------------
1. immutability
2. lazy evaluation
3. cacheable
4. type inference

RDD operations:
-----------------
1. Transformations
<old rdd> ----> <new rdd>

2. Actions
<rdd> ---> <result>


Examples on RDD:
-------------------------
list <- {1,2,3,4,5}

1. Transformations:
---------------------
Ex1:
-----
f(x) <- {x + 1}

f(list) <- {2,3,4,5,6}


Ex2:
-----
f(x) <- {x * x}

f(list) <- {1,4,9,16,25}


2. Actions:
---------------------

sum(list) -> 15
min(list) -> 1
max(list) -> 5



How to Start Spark:
-----------------------------
scala => spark-shell
python => pyspark
R => sparkR

Spark-1.x:
--------------------------------------
Spark context available as 'sc'
Spark SQL Context available as 'sqlContext'


Spark-2.x:
--------------------------------------
Spark context available as 'sc'
Spark session available as 'spark'


How to Create an RDD:
---------------------------------------
We can create an RDD in 2 ways:
1. from collections (List, Seq, Set, ....)
2. from data sets (text, csv, tsv, json, ...)


1. from collections:
---------------------------------------
val list = List(1,2,3,4,5)

val rdd = sc.parallelize(list)

val rdd = sc.parallelize(list, 2)

Syntax:
-----------------------
val rdd = sc.parallelize(<collection object>, <no.of partitions>)


2. from datasets:
---------------------------------------
val file = "file:///home/orienit/work/input/demoinput"

val rdd = sc.textFile(file)

val rdd = sc.textFile(file, 1)


Syntax:
-----------------------
val rdd = sc.textFile(<file path>, <no.of partitions>)

------------------------------------------------------

scala> val list = List(1,2,3,4,5)
list: List[Int] = List(1, 2, 3, 4, 5)

scala> val rdd = sc.parallelize(list)
rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[0] at parallelize at <console>:26

scala> rdd.getNumPartitions
res0: Int = 4

scala> val rdd = sc.parallelize(list, 2)
rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[1] at parallelize at <console>:26

scala> rdd.getNumPartitions
res1: Int = 2


------------------------------------------------------

scala> val file = "file:///home/orienit/work/input/demoinput"
file: String = file:///home/orienit/work/input/demoinput

scala> val rdd = sc.textFile(file)
rdd: org.apache.spark.rdd.RDD[String] = file:///home/orienit/work/input/demoinput MapPartitionsRDD[3] at textFile at <console>:26

scala> rdd.getNumPartitions
res2: Int = 2

scala> val rdd = sc.textFile(file, 1)
rdd: org.apache.spark.rdd.RDD[String] = file:///home/orienit/work/input/demoinput MapPartitionsRDD[5] at textFile at <console>:26

scala> rdd.getNumPartitions
res3: Int = 1

------------------------------------------------------
Examples on RDD
------------------------------------------------------
val list = List(1,2,3,4,5)

val rdd1 = sc.parallelize(list, 2)

------------------------------------------------------

val rdd2 = rdd1.map(x => x + 1)

val rdd3 = rdd1.map(x => x * x)

------------------------------------------------------

scala> val list = List(1,2,3,4,5)
list: List[Int] = List(1, 2, 3, 4, 5)

scala> val rdd1 = sc.parallelize(list, 2)
rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[6] at parallelize at <console>:26

------------------------------------------------------

scala> val rdd2 = rdd1.map(x => x + 1)
rdd2: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[7] at map at <console>:28

scala> rdd1.collect
res4: Array[Int] = Array(1, 2, 3, 4, 5)

scala> rdd2.collect
res5: Array[Int] = Array(2, 3, 4, 5, 6)


scala> val rdd3 = rdd1.map(x => x * x)
rdd3: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[8] at map at <console>:28

scala> rdd3.collect
res6: Array[Int] = Array(1, 4, 9, 16, 25)

------------------------------------------------------

rdd1.min

rdd1.max

rdd1.sum


------------------------------------------------------
scala> rdd1.min
res7: Int = 1

scala> rdd1.max
res8: Int = 5

scala> rdd1.sum
res9: Double = 15.0

scala> rdd1.collect
res10: Array[Int] = Array(1, 2, 3, 4, 5)

scala> rdd1.count
res11: Long = 5

------------------------------------------------------
Word Count in Spark using Scala
------------------------------------------------------

val input = "file:///home/orienit/work/input/demoinput"

val output = "file:///home/orienit/work/output/spark-op"

val fileRdd = sc.textFile(input, 1)

val wordsRdd = fileRdd.flatMap(line => line.split(" "))

val tuplesRdd = wordsRdd.map(word => (word, 1))

val wordCountRdd = tuplesRdd.reduceByKey((a,b) => a + b)

wordCountRdd.saveAsTextFile(output)

------------------------------------------------------
Optimized Code:
------------------------------------------------------

val input = "file:///home/orienit/work/input/demoinput"

val output = "file:///home/orienit/work/output/spark-op"

val fileRdd = sc.textFile(input, 1)

val wordCountRdd = fileRdd.flatMap(line => line.split(" ")).map(word => (word, 1)).reduceByKey((a,b) => a + b)

wordCountRdd.saveAsTextFile(output)


------------------------------------------------------

scala> val input = "file:///home/orienit/work/input/demoinput"
input: String = file:///home/orienit/work/input/demoinput

scala> val output = "file:///home/orienit/work/output/spark-op"
output: String = file:///home/orienit/work/output/spark-op

scala> val fileRdd = sc.textFile(input, 1)
fileRdd: org.apache.spark.rdd.RDD[String] = file:///home/orienit/work/input/demoinput MapPartitionsRDD[11] at textFile at <console>:26

scala> fileRdd.collect
res12: Array[String] = Array(I am going, to hyd, I am learning, hadoop course)

scala> val wordsRdd = fileRdd.flatMap(line => line.split(" "))
wordsRdd: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[12] at flatMap at <console>:28

scala> wordsRdd.collect
res13: Array[String] = Array(I, am, going, to, hyd, I, am, learning, hadoop, course)

scala> val tuplesRdd = wordsRdd.map(word => (word, 1))
tuplesRdd: org.apache.spark.rdd.RDD[(String, Int)] = MapPartitionsRDD[13] at map at <console>:30

scala> tuplesRdd.collect
res14: Array[(String, Int)] = Array((I,1), (am,1), (going,1), (to,1), (hyd,1), (I,1), (am,1), (learning,1), (hadoop,1), (course,1))

scala> val wordCountRdd = tuplesRdd.reduceByKey((a,b) => a + b)
wordCountRdd: org.apache.spark.rdd.RDD[(String, Int)] = ShuffledRDD[14] at reduceByKey at <console>:32

scala> wordCountRdd.collect
res15: Array[(String, Int)] = Array((learning,1), (hadoop,1), (am,2), (hyd,1), (I,2), (to,1), (going,1), (course,1))

scala> wordCountRdd.saveAsTextFile(output)




------------------------------------------------------
Grep Job in Spark using Scala:
------------------------------------------------------

val input = "file:///home/orienit/work/input/demoinput"

val output = "file:///home/orienit/work/output/spark-grep-op"

val fileRdd = sc.textFile(input, 1)

val grepRdd = fileRdd.filter(line => line.contains("am"))

grepRdd.saveAsTextFile(output)



------------------------------------------------------
Sed Job in Spark using Scala:
------------------------------------------------------

val input = "file:///home/orienit/work/input/demoinput"

val output = "file:///home/orienit/work/output/spark-sed-op"

val fileRdd = sc.textFile(input, 1)

val sedRdd = fileRdd.map(line => line.replaceAll("am", "xyz"))

sedRdd.saveAsTextFile(output)

------------------------------------------------------
Spark SQL:
------------------------------------------------------

hive> select * from kalyan.student;
scala> spark.sql("select * from kalyan.student").show

------------------------------------------------------

hive> select year, count(*) from kalyan.student group by year;
scala> spark.sql("select year, count(*) from kalyan.student group by year").show

------------------------------------------------------
Data Frames in Spark
------------------------------------------------------
val hiveDf = spark.sql("select * from kalyan.student")

hiveDf.show


hiveDf.registerTempTable("hivetbl")


val prop = new java.util.Properties
prop.setProperty("driver","com.mysql.jdbc.Driver")
prop.setProperty("user","root")
prop.setProperty("password","hadoop")

val jdbcDf = spark.read.jdbc("jdbc:mysql://localhost:3306/kalyan", "student", prop)

jdbcDf.show

jdbcDf.registerTempTable("jdbctbl")

------------------------------------------------------
scala> val hiveDf = spark.sql("select * from kalyan.student")
hiveDf: org.apache.spark.sql.DataFrame = [name: string, id: int ... 2 more fields]

scala> hiveDf.show
+------+---+------+----+
|  name| id|course|year|
+------+---+------+----+
|  arun|  1|   cse|   1|
| sunil|  2|   cse|   1|
|   raj|  3|   cse|   1|
|naveen|  4|   cse|   1|
| venki|  5|   cse|   2|
|prasad|  6|   cse|   2|
| sudha|  7|   cse|   2|
|  ravi|  1|  mech|   1|
|  raju|  2|  mech|   1|
|  roja|  3|  mech|   1|
|  anil|  4|  mech|   2|
|  rani|  5|  mech|   2|
|anvith|  6|  mech|   2|
| madhu|  7|  mech|   2|
|  arun|  1|    it|   3|
| sunil|  2|    it|   3|
|   raj|  3|    it|   3|
|naveen|  4|    it|   3|
| venki|  5|    it|   4|
|prasad|  6|    it|   4|
+------+---+------+----+
only showing top 20 rows

------------------------------------------------------

scala> val jdbcDf = spark.read.jdbc("jdbc:mysql://localhost:3306/kalyan", "student", prop)
jdbcDf: org.apache.spark.sql.DataFrame = [name: string, id: int ... 2 more fields]

scala> jdbcDf.show
+------+---+------+----+
|  name| id|course|year|
+------+---+------+----+
|  anil|  1| spark|2016|
|anvith|  5|hadoop|2015|
|   dev|  6|hadoop|2015|
|   raj|  3| spark|2016|
| sunil|  4|hadoop|2015|
|venkat|  2| spark|2016|
+------+---+------+----+

------------------------------------------------------

scala> hiveDf.registerTempTable("hivetbl")
warning: there was one deprecation warning; re-run with -deprecation for details

scala> jdbcDf.registerTempTable("jdbctbl")
warning: there was one deprecation warning; re-run with -deprecation for details

------------------------------------------------------

spark.sql("select hivetbl.*, jdbctbl.* from hivetbl join jdbctbl on hivetbl.name = jdbctbl.name").show

------------------------------------------------------

scala> spark.sql("select hivetbl.*, jdbctbl.* from hivetbl join jdbctbl on hivetbl.name = jdbctbl.name").show
+------+---+------+----+------+---+------+----+
|  name| id|course|year|  name| id|course|year|
+------+---+------+----+------+---+------+----+
|  anil|  4|   ece|   4|  anil|  1| spark|2016|
|  anil|  4|  mech|   2|  anil|  1| spark|2016|
|anvith|  6|   ece|   4|anvith|  5|hadoop|2015|
|anvith|  6|  mech|   2|anvith|  5|hadoop|2015|
|   raj|  3|    it|   3|   raj|  3| spark|2016|
|   raj|  3|   cse|   1|   raj|  3| spark|2016|
| sunil|  2|    it|   3| sunil|  4|hadoop|2015|
| sunil|  2|   cse|   1| sunil|  4|hadoop|2015|
+------+---+------+----+------+---+------+----+



------------------------------------------------------

val casDf = sqlContext.read.cassandraFormat("student", "kalyan").load()

casDf.show

------------------------------------------------------


scala> val casDf = sqlContext.read.cassandraFormat("student", "kalyan").load()
casDf: org.apache.spark.sql.DataFrame = [name: string, course: string ... 2 more fields]

scala> casDf.show
+------+------+---+----+
|  name|course| id|year|
+------+------+---+----+
|  anil| spark|  1|2016|
|   raj| spark|  3|2016|
|anvith|hadoop|  5|2015|
|   dev|hadoop|  6|2015|
| sunil|hadoop|  4|2015|
|venkat| spark|  2|2016|
|    kk|hadoop|  7|2015|
+------+------+---+----+

------------------------------------------------------
INSERT INTO kalyan.student(name, id, course, year) VALUES ('rajesh', 8, 'hadoop', 2017);

------------------------------------------------------

scala> casDf.show
+------+------+---+----+
|  name|course| id|year|
+------+------+---+----+
|  anil| spark|  1|2016|
|   raj| spark|  3|2016|
|anvith|hadoop|  5|2015|
|   dev|hadoop|  6|2015|
| sunil|hadoop|  4|2015|
|venkat| spark|  2|2016|
|    kk|hadoop|  7|2015|
|rajesh|hadoop|  8|2017|
+------+------+---+----+

------------------------------------------------------

cqlsh:kalyan> select year, count(*) from kalyan.student group by year;
SyntaxException: line 1:42 missing EOF at 'group' (...(*) from kalyan.student [group] by...)

------------------------------------------------------

casDf.registerTempTable("castbl")


spark.sql("select year, count(*) from castbl group by year").show

------------------------------------------------------

scala> casDf.registerTempTable("castbl")
warning: there was one deprecation warning; re-run with -deprecation for details


scala> spark.sql("select year, count(*) from castbl group by year").show 
+----+--------+
|year|count(1)|
+----+--------+
|2015|       4|
|2016|       3|
|2017|       1|
+----+--------+



------------------------------------------------------
spark.sql("select castbl.*, jdbctbl.* from castbl join jdbctbl on castbl.name = jdbctbl.name").show
------------------------------------------------------

scala> spark.sql("select castbl.*, jdbctbl.* from castbl join jdbctbl on castbl.name = jdbctbl.name").show
+------+------+---+----+------+---+------+----+
|  name|course| id|year|  name| id|course|year|
+------+------+---+----+------+---+------+----+
|  anil| spark|  1|2016|  anil|  1| spark|2016|
|anvith|hadoop|  5|2015|anvith|  5|hadoop|2015|
|   dev|hadoop|  6|2015|   dev|  6|hadoop|2015|
|   raj| spark|  3|2016|   raj|  3| spark|2016|
| sunil|hadoop|  4|2015| sunil|  4|hadoop|2015|
|venkat| spark|  2|2016|venkat|  2| spark|2016|
+------+------+---+----+------+---+------+----+


------------------------------------------------------
scala> casDf.toJSON.collect.foreach(println)
{"name":"anil","course":"spark","id":1,"year":2016}
{"name":"raj","course":"spark","id":3,"year":2016}
{"name":"anvith","course":"hadoop","id":5,"year":2015}
{"name":"dev","course":"hadoop","id":6,"year":2015}
{"name":"sunil","course":"hadoop","id":4,"year":2015}
{"name":"venkat","course":"spark","id":2,"year":2016}
{"name":"kk","course":"hadoop","id":7,"year":2015}
{"name":"rajesh","course":"hadoop","id":8,"year":2017}


scala> hiveDf.toJSON.collect.foreach(println)
{"name":"arun","id":1,"course":"cse","year":1}
{"name":"sunil","id":2,"course":"cse","year":1}
{"name":"raj","id":3,"course":"cse","year":1}
{"name":"naveen","id":4,"course":"cse","year":1}
{"name":"venki","id":5,"course":"cse","year":2}
{"name":"prasad","id":6,"course":"cse","year":2}
{"name":"sudha","id":7,"course":"cse","year":2}
{"name":"ravi","id":1,"course":"mech","year":1}
{"name":"raju","id":2,"course":"mech","year":1}
{"name":"roja","id":3,"course":"mech","year":1}
{"name":"anil","id":4,"course":"mech","year":2}
{"name":"rani","id":5,"course":"mech","year":2}
{"name":"anvith","id":6,"course":"mech","year":2}
{"name":"madhu","id":7,"course":"mech","year":2}


scala> jdbcDf.toJSON.collect.foreach(println)
{"name":"anil","id":1,"course":"spark","year":2016}
{"name":"anvith","id":5,"course":"hadoop","year":2015}
{"name":"dev","id":6,"course":"hadoop","year":2015}
{"name":"raj","id":3,"course":"spark","year":2016}
{"name":"sunil","id":4,"course":"hadoop","year":2015}
{"name":"venkat","id":2,"course":"spark","year":2016}

------------------------------------------------------

Share this article with your friends.
Related Posts Plugin for WordPress, Blogger...