Friday 7 April 2017

SPARK BASICS Practice on 02 Apr 2017

`Spark` is a framework for `In-Memory Distributed Computing`

`Spark` provides 4 libraries on top of Spark Core:
1. Spark SQL
2. Spark Streaming
3. Spark MLlib
4. Spark GraphX

`SparkContext` is the entry point for all `Spark Operations`

`Resilient Distributed Dataset` => RDD

RDD features (see the sketch after this list):
-------------------
1. immutability
2. lazy evaluation
3. cacheable
4. type inference
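
A minimal spark-shell sketch of these features (the values here are illustrative, not from the course data): transformations build new RDDs instead of modifying old ones, nothing runs until an action is called, and `cache` keeps a computed RDD in memory for reuse.

val nums = sc.parallelize(1 to 5)   // base RDD; element type Int is inferred; never modified in place (immutability)
val doubled = nums.map(_ * 2)       // transformation only: no job runs yet (lazy evaluation)
doubled.cache                       // keep the RDD in memory once it is computed (cacheable)
doubled.count                       // action: triggers the computation, result = 5
doubled.collect                     // served from the cache: Array(2, 4, 6, 8, 10)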

RDD operations:
-----------------
1. Transformations (lazy: they only define a new RDD from an existing one)
<old rdd> ----> <new rdd>

2. Actions (trigger the actual computation and return a result to the driver)
<rdd> ---> <result>


Examples on RDD:
-------------------------
list <- {1,2,3,4,5}

1. Transformations:
---------------------
Ex1:
-----
f(x) <- {x + 1}

f(list) <- {2,3,4,5,6}


Ex2:
-----
f(x) <- {x * x}

f(list) <- {1,4,9,16,25}


2. Actions:
---------------------

sum(list) -> 15
min(list) -> 1
max(list) -> 5



How to Start the Spark Shell:
-----------------------------
scala => spark-shell
python => pyspark
R => sparkR
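
To run locally for practice, you can also pass a master URL when launching any of these shells (the `local[2]` value below is just an illustration):

$ spark-shell --master local[2]
$ pyspark --master local[2]
$ sparkR --master local[2]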

Spark-1.x:
--------------------------------------
Spark context available as 'sc'
Spark SQL context available as 'sqlContext'


Spark-2.x:
--------------------------------------
Spark context available as 'sc'
Spark session available as 'spark'


How to Create an RDD:
---------------------------------------
We can create an RDD in 2 ways:
1. from collections (List, Seq, Set, ....)
2. from data sets (text, csv, tsv, json, ...)


1. from collections:
---------------------------------------
val list = List(1,2,3,4,5)

val rdd = sc.parallelize(list)

val rdd = sc.parallelize(list, 2)

Syntax:
-----------------------
val rdd = sc.parallelize(<collection object>, <no.of partitions>)


2. from datasets:
---------------------------------------
val file = "file:///home/orienit/work/input/demoinput"

val rdd = sc.textFile(file)

val rdd = sc.textFile(file, 1)


Syntax:
-----------------------
val rdd = sc.textFile(<file path>, <min no.of partitions>)

------------------------------------------------------

scala> val list = List(1,2,3,4,5)
list: List[Int] = List(1, 2, 3, 4, 5)

scala> val rdd = sc.parallelize(list)
rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[0] at parallelize at <console>:26

scala> rdd.getNumPartitions
res0: Int = 4

scala> val rdd = sc.parallelize(list, 2)
rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[1] at parallelize at <console>:26

scala> rdd.getNumPartitions
res1: Int = 2


------------------------------------------------------

scala> val file = "file:///home/orienit/work/input/demoinput"
file: String = file:///home/orienit/work/input/demoinput

scala> val rdd = sc.textFile(file)
rdd: org.apache.spark.rdd.RDD[String] = file:///home/orienit/work/input/demoinput MapPartitionsRDD[3] at textFile at <console>:26

scala> rdd.getNumPartitions
res2: Int = 2

scala> val rdd = sc.textFile(file, 1)
rdd: org.apache.spark.rdd.RDD[String] = file:///home/orienit/work/input/demoinput MapPartitionsRDD[5] at textFile at <console>:26

scala> rdd.getNumPartitions
res3: Int = 1

------------------------------------------------------
Examples on RDD
------------------------------------------------------
val list = List(1,2,3,4,5)

val rdd1 = sc.parallelize(list, 2)

------------------------------------------------------

val rdd2 = rdd1.map(x => x + 1)

val rdd3 = rdd1.map(x => x * x)

------------------------------------------------------

scala> val list = List(1,2,3,4,5)
list: List[Int] = List(1, 2, 3, 4, 5)

scala> val rdd1 = sc.parallelize(list, 2)
rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[6] at parallelize at <console>:26

------------------------------------------------------

scala> val rdd2 = rdd1.map(x => x + 1)
rdd2: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[7] at map at <console>:28

scala> rdd1.collect
res4: Array[Int] = Array(1, 2, 3, 4, 5)

scala> rdd2.collect
res5: Array[Int] = Array(2, 3, 4, 5, 6)


scala> val rdd3 = rdd1.map(x => x * x)
rdd3: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[8] at map at <console>:28

scala> rdd3.collect
res6: Array[Int] = Array(1, 4, 9, 16, 25)

------------------------------------------------------

rdd1.min

rdd1.max

rdd1.sum


------------------------------------------------------
scala> rdd1.min
res7: Int = 1

scala> rdd1.max
res8: Int = 5

scala> rdd1.sum
res9: Double = 15.0

scala> rdd1.collect
res10: Array[Int] = Array(1, 2, 3, 4, 5)

scala> rdd1.count
res11: Long = 5

------------------------------------------------------
Word Count in Spark using Scala
------------------------------------------------------

val input = "file:///home/orienit/work/input/demoinput"

val output = "file:///home/orienit/work/output/spark-op"

val fileRdd = sc.textFile(input, 1)

val wordsRdd = fileRdd.flatMap(line => line.split(" "))

val tuplesRdd = wordsRdd.map(word => (word, 1))

val wordCountRdd = tuplesRdd.reduceByKey((a,b) => a + b)

wordCountRdd.saveAsTextFile(output)

------------------------------------------------------
Optimized Code (chaining the transformations):
------------------------------------------------------

val input = "file:///home/orienit/work/input/demoinput"

val output = "file:///home/orienit/work/output/spark-op"

val fileRdd = sc.textFile(input, 1)

val wordCountRdd = fileRdd.flatMap(line => line.split(" ")).map(word => (word, 1)).reduceByKey((a,b) => a + b)

wordCountRdd.saveAsTextFile(output)


------------------------------------------------------

scala> val input = "file:///home/orienit/work/input/demoinput"
input: String = file:///home/orienit/work/input/demoinput

scala> val output = "file:///home/orienit/work/output/spark-op"
output: String = file:///home/orienit/work/output/spark-op

scala> val fileRdd = sc.textFile(input, 1)
fileRdd: org.apache.spark.rdd.RDD[String] = file:///home/orienit/work/input/demoinput MapPartitionsRDD[11] at textFile at <console>:26

scala> fileRdd.collect
res12: Array[String] = Array(I am going, to hyd, I am learning, hadoop course)

scala> val wordsRdd = fileRdd.flatMap(line => line.split(" "))
wordsRdd: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[12] at flatMap at <console>:28

scala> wordsRdd.collect
res13: Array[String] = Array(I, am, going, to, hyd, I, am, learning, hadoop, course)

scala> val tuplesRdd = wordsRdd.map(word => (word, 1))
tuplesRdd: org.apache.spark.rdd.RDD[(String, Int)] = MapPartitionsRDD[13] at map at <console>:30

scala> tuplesRdd.collect
res14: Array[(String, Int)] = Array((I,1), (am,1), (going,1), (to,1), (hyd,1), (I,1), (am,1), (learning,1), (hadoop,1), (course,1))

scala> val wordCountRdd = tuplesRdd.reduceByKey((a,b) => a + b)
wordCountRdd: org.apache.spark.rdd.RDD[(String, Int)] = ShuffledRDD[14] at reduceByKey at <console>:32

scala> wordCountRdd.collect
res15: Array[(String, Int)] = Array((learning,1), (hadoop,1), (am,2), (hyd,1), (I,2), (to,1), (going,1), (course,1))

scala> wordCountRdd.saveAsTextFile(output)




------------------------------------------------------
Grep Job in Spark using Scala:
------------------------------------------------------

val input = "file:///home/orienit/work/input/demoinput"

val output = "file:///home/orienit/work/output/spark-grep-op"

val fileRdd = sc.textFile(input, 1)

val grepRdd = fileRdd.filter(line => line.contains("am"))

grepRdd.saveAsTextFile(output)
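
A quick check before saving, assuming the same demoinput file used in the word count example (only lines containing "am" are kept):

grepRdd.collect
// expected with that input: Array(I am going, I am learning)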



------------------------------------------------------
Sed Job in Spark using Scala:
------------------------------------------------------

val input = "file:///home/orienit/work/input/demoinput"

val output = "file:///home/orienit/work/output/spark-sed-op"

val fileRdd = sc.textFile(input, 1)

val sedRdd = fileRdd.map(line => line.replaceAll("am", "xyz"))

sedRdd.saveAsTextFile(output)
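
A quick check, again assuming the same demoinput file (every "am" is replaced by "xyz"):

sedRdd.collect
// expected with that input: Array(I xyz going, to hyd, I xyz learning, hadoop course)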

------------------------------------------------------
Spark SQL:
------------------------------------------------------

hive> select * from kalyan.student;
scala> spark.sql("select * from kalyan.student").show

------------------------------------------------------

hive> select year, count(*) from kalyan.student group by year;
scala> spark.sql("select year, count(*) from kalyan.student group by year").show

------------------------------------------------------
Data Frames in Spark
------------------------------------------------------
val hiveDf = spark.sql("select * from kalyan.student")

hiveDf.show


hiveDf.registerTempTable("hivetbl")


val prop = new java.util.Properties
prop.setProperty("driver","com.mysql.jdbc.Driver")
prop.setProperty("user","root")
prop.setProperty("password","hadoop")

val jdbcDf = spark.read.jdbc("jdbc:mysql://localhost:3306/kalyan", "student", prop)

jdbcDf.show

jdbcDf.registerTempTable("jdbctbl")

------------------------------------------------------
scala> val hiveDf = spark.sql("select * from kalyan.student")
hiveDf: org.apache.spark.sql.DataFrame = [name: string, id: int ... 2 more fields]

scala> hiveDf.show
+------+---+------+----+
|  name| id|course|year|
+------+---+------+----+
|  arun|  1|   cse|   1|
| sunil|  2|   cse|   1|
|   raj|  3|   cse|   1|
|naveen|  4|   cse|   1|
| venki|  5|   cse|   2|
|prasad|  6|   cse|   2|
| sudha|  7|   cse|   2|
|  ravi|  1|  mech|   1|
|  raju|  2|  mech|   1|
|  roja|  3|  mech|   1|
|  anil|  4|  mech|   2|
|  rani|  5|  mech|   2|
|anvith|  6|  mech|   2|
| madhu|  7|  mech|   2|
|  arun|  1|    it|   3|
| sunil|  2|    it|   3|
|   raj|  3|    it|   3|
|naveen|  4|    it|   3|
| venki|  5|    it|   4|
|prasad|  6|    it|   4|
+------+---+------+----+
only showing top 20 rows

------------------------------------------------------

scala> val jdbcDf = spark.read.jdbc("jdbc:mysql://localhost:3306/kalyan", "student", prop)
jdbcDf: org.apache.spark.sql.DataFrame = [name: string, id: int ... 2 more fields]

scala> jdbcDf.show
+------+---+------+----+
|  name| id|course|year|
+------+---+------+----+
|  anil|  1| spark|2016|
|anvith|  5|hadoop|2015|
|   dev|  6|hadoop|2015|
|   raj|  3| spark|2016|
| sunil|  4|hadoop|2015|
|venkat|  2| spark|2016|
+------+---+------+----+

------------------------------------------------------

scala> hiveDf.registerTempTable("hivetbl")
warning: there was one deprecation warning; re-run with -deprecation for details

scala> jdbcDf.registerTempTable("jdbctbl")
warning: there was one deprecation warning; re-run with -deprecation for details

------------------------------------------------------

spark.sql("select hivetbl.*, jdbctbl.* from hivetbl join jdbctbl on hivetbl.name = jdbctbl.name").show

------------------------------------------------------

scala> spark.sql("select hivetbl.*, jdbctbl.* from hivetbl join jdbctbl on hivetbl.name = jdbctbl.name").show
+------+---+------+----+------+---+------+----+
|  name| id|course|year|  name| id|course|year|
+------+---+------+----+------+---+------+----+
|  anil|  4|   ece|   4|  anil|  1| spark|2016|
|  anil|  4|  mech|   2|  anil|  1| spark|2016|
|anvith|  6|   ece|   4|anvith|  5|hadoop|2015|
|anvith|  6|  mech|   2|anvith|  5|hadoop|2015|
|   raj|  3|    it|   3|   raj|  3| spark|2016|
|   raj|  3|   cse|   1|   raj|  3| spark|2016|
| sunil|  2|    it|   3| sunil|  4|hadoop|2015|
| sunil|  2|   cse|   1| sunil|  4|hadoop|2015|
+------+---+------+----+------+---+------+----+



------------------------------------------------------

val casDf = sqlContext.read.cassandraFormat("student", "kalyan").load()

casDf.show
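
The cassandraFormat reader comes from the DataStax spark-cassandra-connector, not from Spark itself, so the shell must be started with the connector on the classpath and a Cassandra contact point configured. A sketch, assuming connector coordinates that match your Spark/Scala build (both the coordinates and the host below are assumptions):

$ spark-shell --packages datastax:spark-cassandra-connector:2.0.1-s_2.11 \
              --conf spark.cassandra.connection.host=localhost

import org.apache.spark.sql.cassandra._   // adds cassandraFormat to the DataFrameReader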

------------------------------------------------------


scala> val casDf = sqlContext.read.cassandraFormat("student", "kalyan").load()
casDf: org.apache.spark.sql.DataFrame = [name: string, course: string ... 2 more fields]

scala> casDf.show
+------+------+---+----+
|  name|course| id|year|
+------+------+---+----+
|  anil| spark|  1|2016|
|   raj| spark|  3|2016|
|anvith|hadoop|  5|2015|
|   dev|hadoop|  6|2015|
| sunil|hadoop|  4|2015|
|venkat| spark|  2|2016|
|    kk|hadoop|  7|2015|
+------+------+---+----+

------------------------------------------------------
Insert a new row from cqlsh and re-check the DataFrame:

cqlsh:kalyan> INSERT INTO kalyan.student(name, id, course, year) VALUES ('rajesh', 8, 'hadoop', 2017);

------------------------------------------------------

scala> casDf.show
+------+------+---+----+
|  name|course| id|year|
+------+------+---+----+
|  anil| spark|  1|2016|
|   raj| spark|  3|2016|
|anvith|hadoop|  5|2015|
|   dev|hadoop|  6|2015|
| sunil|hadoop|  4|2015|
|venkat| spark|  2|2016|
|    kk|hadoop|  7|2015|
|rajesh|hadoop|  8|2017|
+------+------+---+----+

------------------------------------------------------

cqlsh:kalyan> select year, count(*) from kalyan.student group by year;
SyntaxException: line 1:42 missing EOF at 'group' (...(*) from kalyan.student [group] by...)

CQL rejects the GROUP BY here, so the same aggregation is done through Spark SQL instead:

------------------------------------------------------

casDf.registerTempTable("castbl")


spark.sql("select year, count(*) from castbl group by year").show

------------------------------------------------------

scala> casDf.registerTempTable("castbl")
warning: there was one deprecation warning; re-run with -deprecation for details


scala> spark.sql("select year, count(*) from castbl group by year").show 
+----+--------+
|year|count(1)|
+----+--------+
|2015|       4|
|2016|       3|
|2017|       1|
+----+--------+



------------------------------------------------------
spark.sql("select castbl.*, jdbctbl.* from castbl join jdbctbl on castbl.name = jdbctbl.name").show
------------------------------------------------------

scala> spark.sql("select castbl.*, jdbctbl.* from castbl join jdbctbl on castbl.name = jdbctbl.name").show
+------+------+---+----+------+---+------+----+
|  name|course| id|year|  name| id|course|year|
+------+------+---+----+------+---+------+----+
|  anil| spark|  1|2016|  anil|  1| spark|2016|
|anvith|hadoop|  5|2015|anvith|  5|hadoop|2015|
|   dev|hadoop|  6|2015|   dev|  6|hadoop|2015|
|   raj| spark|  3|2016|   raj|  3| spark|2016|
| sunil|hadoop|  4|2015| sunil|  4|hadoop|2015|
|venkat| spark|  2|2016|venkat|  2| spark|2016|
+------+------+---+----+------+---+------+----+


------------------------------------------------------
scala> casDf.toJSON.collect.foreach(println)
{"name":"anil","course":"spark","id":1,"year":2016}
{"name":"raj","course":"spark","id":3,"year":2016}
{"name":"anvith","course":"hadoop","id":5,"year":2015}
{"name":"dev","course":"hadoop","id":6,"year":2015}
{"name":"sunil","course":"hadoop","id":4,"year":2015}
{"name":"venkat","course":"spark","id":2,"year":2016}
{"name":"kk","course":"hadoop","id":7,"year":2015}
{"name":"rajesh","course":"hadoop","id":8,"year":2017}


scala> hiveDf.toJSON.collect.foreach(println)
{"name":"arun","id":1,"course":"cse","year":1}
{"name":"sunil","id":2,"course":"cse","year":1}
{"name":"raj","id":3,"course":"cse","year":1}
{"name":"naveen","id":4,"course":"cse","year":1}
{"name":"venki","id":5,"course":"cse","year":2}
{"name":"prasad","id":6,"course":"cse","year":2}
{"name":"sudha","id":7,"course":"cse","year":2}
{"name":"ravi","id":1,"course":"mech","year":1}
{"name":"raju","id":2,"course":"mech","year":1}
{"name":"roja","id":3,"course":"mech","year":1}
{"name":"anil","id":4,"course":"mech","year":2}
{"name":"rani","id":5,"course":"mech","year":2}
{"name":"anvith","id":6,"course":"mech","year":2}
{"name":"madhu","id":7,"course":"mech","year":2}


scala> jdbcDf.toJSON.collect.foreach(println)
{"name":"anil","id":1,"course":"spark","year":2016}
{"name":"anvith","id":5,"course":"hadoop","year":2015}
{"name":"dev","id":6,"course":"hadoop","year":2015}
{"name":"raj","id":3,"course":"spark","year":2016}
{"name":"sunil","id":4,"course":"hadoop","year":2015}
{"name":"venkat","id":2,"course":"spark","year":2016}

------------------------------------------------------
