Sunday 27 November 2016

SPARK BASICS Practice on 27 Nov 2016



---------------------------------------------------------------
Spark Basics
---------------------------------------------------------------

1. SparkContext (sc) is the main entry point for Spark operations

2. From the SparkContext, we create an RDD

3. An RDD can be created in 2 ways (see the sketch below):
  -> from collections (List, Seq, Set, ....)
  -> from datasets (csv, tsv, text, json, hive, hdfs, cassandra, ....)
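
Both ways go through the SparkContext; a minimal sketch (the file path below is only a placeholder):

val fromCollection = sc.parallelize(List(1, 2, 3, 4))            // from a Scala collection
val fromDataset    = sc.textFile("file:///path/to/input.txt")    // from a text file / dataset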

---------------------------------------------------------------
Start the Spark shell in any of 3 ways:

1. scala command => spark-shell

2. python command => pyspark

3. R command => sparkR


Spark-1.6.x is compatible with Scala-2.10 & Scala-2.11

Spark-2.0.x is compatible with Scala-2.11

Spark context available as sc.

SQL context available as sqlContext.

------------------------------------------------------------------
RDD => Resilient Distributed Dataset

RDD features:
--------------
1. Immutable
2. Lazily evaluated (bottom-to-top approach)
3. Cacheable
4. Type inferred
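
A quick sketch of how these features show up in practice (the names are illustrative):

val nums   = sc.parallelize(1 to 4)     // type inferred as RDD[Int]; immutable
val plus1  = nums.map(x => x + 1)       // lazily evaluated: nothing runs yet
plus1.cache()                           // cacheable: kept in memory after the first action
val result = plus1.collect()            // action: triggers the actual computation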


RDD Operations:
--------------------
1. Transformations

old rdd ====> new rdd

2. Actions

rdd ====> result
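
In code, the same split looks like this (a small illustrative sketch):

val rdd    = sc.parallelize(List(1, 2, 3, 4))
val newRdd = rdd.filter(x => x > 2)     // transformation: old rdd ====> new rdd
val result = newRdd.count()             // action: rdd ====> result (2 here)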


---------------------------------------------
RDD Examples:
--------------------
list <-- {1, 2, 3, 4}

Ex1: Transformations
-------------------------
f(x) => x + 1

f(list) <-- {2, 3, 4, 5}


f(x) => x * x

f(list) <-- {1, 4, 9, 16}



Ex2: Actions
-------------------------
min(list) <--  1

max(list) <--  4

sum(list) <--  10

avg(list) <--  2.5


---------------------------------------------


val list = List(1,2,3,4)

val rdd = sc.parallelize(list)

---------------------------------------------
scala> val list = List(1,2,3,4)
list: List[Int] = List(1, 2, 3, 4)

scala> val rdd = sc.parallelize(list)
rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[0] at parallelize at <console>:29

scala> rdd.getNumPartitions
res0: Int = 4

scala> val rdd = sc.parallelize(list, 2)
rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[1] at parallelize at <console>:29

scala> rdd.getNumPartitions
res1: Int = 2
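
Note: the default of 4 partitions above typically reflects sc.defaultParallelism (the number of local cores in local[*] mode). It can be checked directly (not part of the original session):

sc.defaultParallelism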


---------------------------------------------

scala> rdd.collect
res3: Array[Int] = Array(1, 2, 3, 4)

scala> rdd.map(x => x + 1).collect
res4: Array[Int] = Array(2, 3, 4, 5)

scala> rdd.map(x => x + 1)
res5: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[4] at map at <console>:32

scala> rdd.map(x => x + 1).collect
res6: Array[Int] = Array(2, 3, 4, 5)

scala> rdd.map(x => x * x).collect
res7: Array[Int] = Array(1, 4, 9, 16)


scala> rdd.min
res8: Int = 1

scala> rdd.max
res9: Int = 4

scala> rdd.sum
res10: Double = 10.0

---------------------------------------------
1. create an RDD with the numbers 1 to 10

val rdd = sc.parallelize(1 to 10)

2. add 1 to each element

val addRdd = rdd.map(x => x + 1)

3. filter the odd numbers from the above data

val filterRdd = addRdd.filter(x => x % 2 == 1)

4. sum all the numbers in the above data

val sum = filterRdd.sum

5. print all the numbers in the above data

filterRdd.foreach(x => println(x))
filterRdd.foreach(println)


6. save this data into local file system / hdfs

val output = "file:///home/orienit/work/output/data"
val output = "hdfs://localhost:8020/output/data"

filterRdd.saveAsTextFile(output)


7. repartition the data

val repartitionRdd = filterRdd.repartition(1)


8. save this data into local file system / hdfs

val output = "file:///home/orienit/work/output/data1"
val output = "hdfs://localhost:8020/output/data1"

repartitionRdd.saveAsTextFile(output)

---------------------------------------------

scala> val rdd = sc.parallelize(1 to 10)
rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[8] at parallelize at <console>:27

scala> rdd.collect
res11: Array[Int] = Array(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)


scala> val addRdd = rdd.map(x => x + 1)
addRdd: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[9] at map at <console>:29

scala> addRdd.collect
res12: Array[Int] = Array(2, 3, 4, 5, 6, 7, 8, 9, 10, 11)


scala> val filterRdd = addRdd.filter(x => x % 2 == 1)
filterRdd: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[10] at filter at <console>:31

scala> filterRdd.collect
res13: Array[Int] = Array(3, 5, 7, 9, 11)


scala> val sum = filterRdd.sum
sum: Double = 35.0


scala> filterRdd.foreach(x => println(x))
3
5
9
11
7

scala> filterRdd.foreach(println)
7
3
5
9
11



scala> val output = "file:///home/orienit/work/output/data"
output: String = file:///home/orienit/work/output/data

scala> filterRdd.saveAsTextFile(output)


scala> val repartitionRdd = filterRdd.repartition(1)
repartitionRdd: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[16] at repartition at <console>:33

scala> val output = "file:///home/orienit/work/output/data1"
output: String = file:///home/orienit/work/output/data1

scala> repartitionRdd.saveAsTextFile(output)

scala> val output = "hdfs://localhost:8020/output/data1"
output: String = hdfs://localhost:8020/output/data1

scala> repartitionRdd.saveAsTextFile(output)

---------------------------------------------

Creating an RDD from datasets:
--------------------------------
val input = "file:///home/orienit/work/input/demoinput"
val rdd = sc.textFile(input)


scala> val input = "file:///home/orienit/work/input/demoinput"
input: String = file:///home/orienit/work/input/demoinput

scala> val rdd = sc.textFile(input)
rdd: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[21] at textFile at <console>:29

scala> rdd.getNumPartitions
res20: Int = 2

scala> val rdd = sc.textFile(input, 1)
rdd: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[23] at textFile at <console>:29

scala> rdd.getNumPartitions
res21: Int = 1
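
Note: the second argument to sc.textFile is a minimum number of partitions (a hint to the input splits). For example (not part of the original session):

val rdd4 = sc.textFile(input, 4)   // ask for at least 4 partitions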


scala> rdd.collect
res22: Array[String] = Array(I am going, to hyd, I am learning, hadoop course)

scala> rdd.collect.foreach(println)
I am going
to hyd
I am learning
hadoop course

---------------------------------------------
WordCount in Spark using Scala:
-------------------------------------
// input path
val input = "file:///home/orienit/work/input/demoinput"

// creating a rdd from input
val fileRdd = sc.textFile(input, 1)

// read the file line by line
// split the line into words
val words = fileRdd.flatMap(line => line.split(" "))

// assign a count of 1 to each word
val counts = words.map(word => (word, 1))


// sum the counts for each word
val wordcount = counts.reduceByKey((a,b) => a + b)

// save the output into local file system / hdfs
val output = "file:///home/orienit/work/output/wordcount"
val output = "hdfs://localhost:8020/output/wordcount"
wordcount.saveAsTextFile(output)





scala> val input = "file:///home/orienit/work/input/demoinput"
input: String = file:///home/orienit/work/input/demoinput

scala> val fileRdd = sc.textFile(input, 1)
fileRdd: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[25] at textFile at <console>:29

scala> val words = fileRdd.flatMap(line => line.split(" "))
words: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[26] at flatMap at <console>:31

scala> val counts = words.map(word => (word, 1))
counts: org.apache.spark.rdd.RDD[(String, Int)] = MapPartitionsRDD[27] at map at <console>:33

scala> val wordcount = counts.reduceByKey((a,b) => a + b)
wordcount: org.apache.spark.rdd.RDD[(String, Int)] = ShuffledRDD[28] at reduceByKey at <console>:35

scala> val output = "file:///home/orienit/work/output/wordcount"
output: String = file:///home/orienit/work/output/wordcount

scala> wordcount.saveAsTextFile(output)

scala> val output = "hdfs://localhost:8020/output/wordcount"
output: String = hdfs://localhost:8020/output/wordcount

scala> wordcount.saveAsTextFile(output)


------------------------------------------

val input = "file:///home/orienit/work/input/demoinput"
val output = "file:///home/orienit/work/output/wordcount"

val fileRdd = sc.textFile(input, 1)
val words = fileRdd.flatMap(line => line.split(" "))
val counts = words.map(word => (word, 1))
val wordcount = counts.reduceByKey((a,b) => a + b)
wordcount.saveAsTextFile(output)

------------------------------------------

val input = "file:///home/orienit/work/input/demoinput"
val output = "file:///home/orienit/work/output/wordcount1"

val fileRdd = sc.textFile(input, 1)
val wordcount = fileRdd.flatMap(line => line.split(" ")).map(word => (word, 1)).reduceByKey((a,b) => a + b)
wordcount.saveAsTextFile(output)





Grep in Spark using Scala:
-------------------------------------

val input = "file:///home/orienit/work/input/demoinput"
val output = "file:///home/orienit/work/output/grepjob"

val fileRdd = sc.textFile(input, 1)
val filterRdd = fileRdd.filter(line => line.contains("am"))
filterRdd.saveAsTextFile(output)



WordCount in Spark using Python:
-------------------------------------
input = "file:///home/orienit/work/input/demoinput"
output = "file:///home/orienit/work/output/wordcount-py"

fileRdd = sc.textFile(input, 1)
words = fileRdd.flatMap(lambda line : line.split(" "))
counts = words.map(lambda word : (word, 1))
wordcount = counts.reduceByKey(lambda a,b : a + b)
wordcount.saveAsTextFile(output)


>>> input = "file:///home/orienit/work/input/demoinput"
>>> output = "file:///home/orienit/work/output/wordcount-py"
>>> fileRdd = sc.textFile(input, 1)
16/11/27 15:08:16 INFO MemoryStore: Block broadcast_0 stored as values in memory (estimated size 127.4 KB, free 127.4 KB)
16/11/27 15:08:17 INFO MemoryStore: Block broadcast_0_piece0 stored as bytes in memory (estimated size 13.9 KB, free 141.3 KB)
16/11/27 15:08:17 INFO BlockManagerInfo: Added broadcast_0_piece0 in memory on localhost:42322 (size: 13.9 KB, free: 3.4 GB)
16/11/27 15:08:17 INFO SparkContext: Created broadcast 0 from textFile at NativeMethodAccessorImpl.java:-2
>>> words = fileRdd.flatMap(lambda line : line.split(" "))
>>> counts = words.map(lambda word : (word, 1))
>>> wordcount = counts.reduceByKey(lambda a,b : a + b)
>>> wordcount.saveAsTextFile(output)


---------------------------------------------
Spark SQL:
---------------------
DataFrame <=> Table


orienit@kalyan:~$ cat /home/orienit/spark/input/student.json

{"name":"anil","id":1,"course":"spark","year":2016}
{"name":"anvith","id":5,"course":"hadoop","year":2015}
{"name":"dev","id":6,"course":"hadoop","year":2015}
{"name":"raj","id":3,"course":"spark","year":2016}
{"name":"sunil","id":4,"course":"hadoop","year":2015}
{"name":"venkat","id":2,"course":"spark","year":2016}


val df = sqlContext.read.json("file:///home/orienit/spark/input/student.json")
val df = sqlContext.read.parquet("file:///home/orienit/spark/input/student.parquet")


scala> val df = sqlContext.read.json("file:///home/orienit/spark/input/student.json")
df: org.apache.spark.sql.DataFrame = [course: string, id: bigint, name: string, year: bigint]

scala> df.show()
+------+---+------+----+
|course| id|  name|year|
+------+---+------+----+
| spark|  1|  anil|2016|
|hadoop|  5|anvith|2015|
|hadoop|  6|   dev|2015|
| spark|  3|   raj|2016|
|hadoop|  4| sunil|2015|
| spark|  2|venkat|2016|
+------+---+------+----+


scala> val df = sqlContext.read.parquet("file:///home/orienit/spark/input/student.parquet")
df: org.apache.spark.sql.DataFrame = [name: string, id: int, course: string, year: int]

scala> df.show()
+------+---+------+----+
|  name| id|course|year|
+------+---+------+----+
|  anil|  1| spark|2016|
|anvith|  5|hadoop|2015|
|   dev|  6|hadoop|2015|
|   raj|  3| spark|2016|
| sunil|  4|hadoop|2015|
|venkat|  2| spark|2016|
+------+---+------+----+


---------------------------------------------

DataFrame supports 2 kinds of operations:
1. DSL - Domain Specific Language
2. SQL - Structured Query Language

---------------------------------------------

DSL Examples:
------------------

scala> df.show()
+------+---+------+----+
|  name| id|course|year|
+------+---+------+----+
|  anil|  1| spark|2016|
|anvith|  5|hadoop|2015|
|   dev|  6|hadoop|2015|
|   raj|  3| spark|2016|
| sunil|  4|hadoop|2015|
|venkat|  2| spark|2016|
+------+---+------+----+


scala> df.show(3)
+------+---+------+----+
|  name| id|course|year|
+------+---+------+----+
|  anil|  1| spark|2016|
|anvith|  5|hadoop|2015|
|   dev|  6|hadoop|2015|
+------+---+------+----+
only showing top 3 rows


scala> df.limit(3)
res32: org.apache.spark.sql.DataFrame = [name: string, id: int, course: string, year: int]

scala> df.limit(3).show()
+------+---+------+----+
|  name| id|course|year|
+------+---+------+----+
|  anil|  1| spark|2016|
|anvith|  5|hadoop|2015|
|   dev|  6|hadoop|2015|
+------+---+------+----+


scala> df.select("name", "id")
res34: org.apache.spark.sql.DataFrame = [name: string, id: int]

scala> 

scala> df.select("name", "id").show()
+------+---+
|  name| id|
+------+---+
|  anil|  1|
|anvith|  5|
|   dev|  6|
|   raj|  3|
| sunil|  4|
|venkat|  2|
+------+---+


scala> df.select("name", "id").limit(2).show()
+------+---+
|  name| id|
+------+---+
|  anil|  1|
|anvith|  5|
+------+---+


scala> df.filter("id > 4").show()
+------+---+------+----+
|  name| id|course|year|
+------+---+------+----+
|anvith|  5|hadoop|2015|
|   dev|  6|hadoop|2015|
+------+---+------+----+


scala> df.where("id > 4").show()
+------+---+------+----+
|  name| id|course|year|
+------+---+------+----+
|anvith|  5|hadoop|2015|
|   dev|  6|hadoop|2015|
+------+---+------+----+


---------------------------------------------
SQL Examples:
---------------------
Register DataFrame as Temp Table

scala> df.registerTempTable("student")


scala> sqlContext.sql("select * from student")
res42: org.apache.spark.sql.DataFrame = [name: string, id: int, course: string, year: int]

scala> sqlContext.sql("select * from student").show()
+------+---+------+----+
|  name| id|course|year|
+------+---+------+----+
|  anil|  1| spark|2016|
|anvith|  5|hadoop|2015|
|   dev|  6|hadoop|2015|
|   raj|  3| spark|2016|
| sunil|  4|hadoop|2015|
|venkat|  2| spark|2016|
+------+---+------+----+


scala> sqlContext.sql("select name, id from student").show()
+------+---+
|  name| id|
+------+---+
|  anil|  1|
|anvith|  5|
|   dev|  6|
|   raj|  3|
| sunil|  4|
|venkat|  2|
+------+---+


scala> sqlContext.sql("select * from student where id > 4").show()
+------+---+------+----+
|  name| id|course|year|
+------+---+------+----+
|anvith|  5|hadoop|2015|
|   dev|  6|hadoop|2015|
+------+---+------+----+


scala> sqlContext.sql("select year, count(*) from student group by year").show()
+----+---+
|year|_c1|
+----+---+
|2015|  3|
|2016|  3|
+----+---+


scala> sqlContext.sql("select year, count(*) as cnt from student group by year").show()
+----+---+
|year|cnt|
+----+---+
|2015|  3|
|2016|  3|
+----+---+
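
The same aggregation can also be written in the DSL style; a small sketch (not part of the original session):

val byYear = df.groupBy("year").count().withColumnRenamed("count", "cnt")
byYear.show()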




---------------------------------------------

// case classes describe the schema of each dataset
case class Contact(cid: Int, name: String, loc: String, pincode: Int)
case class Orders(oid: Int, cid: Int, status: String)

// read the csv file, split each line on ',', map it to a Contact, then convert to a DataFrame
// (outside spark-shell, `import sqlContext.implicits._` is typically needed for toDF())
val contact = sc.textFile("file:///home/orienit/spark/input/contact.csv").map(_.split(","))
val cdf = contact.map(c => Contact(c(0).trim.toInt, c(1), c(2), c(3).trim.toInt)).toDF()

// read the tsv file, split each line on tab, map it to an Orders row, then convert to a DataFrame
val orders = sc.textFile("file:///home/orienit/spark/input/orders.tsv").map(_.split("\t"))
val odf = orders.map(x => Orders(x(0).trim.toInt, x(1).trim.toInt, x(2))).toDF()

---------------------------------------------

scala> case class Contact(cid: Int, name: String, loc: String, pincode:Int)
defined class Contact

scala> case class Orders(oid: Int, cid: Int, status: String)
defined class Orders

scala> 

scala> val contact = sc.textFile("file:///home/orienit/spark/input/contact.csv").map(_.split(","))
contact: org.apache.spark.rdd.RDD[Array[String]] = MapPartitionsRDD[113] at map at <console>:27

scala> val cdf = contact.map(c => Contact(c(0).trim.toInt, c(1), c(2), c(3).trim.toInt)).toDF()
cdf: org.apache.spark.sql.DataFrame = [cid: int, name: string, loc: string, pincode: int]

scala> 

scala> val orders = sc.textFile("file:///home/orienit/spark/input/orders.tsv").map(_.split("\t"))
orders: org.apache.spark.rdd.RDD[Array[String]] = MapPartitionsRDD[118] at map at <console>:27

scala> val odf = orders.map(x => Orders(x(0).trim.toInt, x(1).trim.toInt, x(2))).toDF()
odf: org.apache.spark.sql.DataFrame = [oid: int, cid: int, status: string]


---------------------------------------------

scala> cdf.show()
+---+------+----+-------+
|cid|  name| loc|pincode|
+---+------+----+-------+
|  1|kalyan| hyd| 500072|
|  2|venkat| hyd| 500073|
|  3|  anil| hyd| 500071|
|  4|   raj| hyd| 500075|
|  5|  arun| hyd| 500074|
|  6|  vani|bang| 600072|
|  7| vamsi|bang| 600073|
|  8|prasad|bang| 600076|
|  9|anvith|bang| 600075|
| 10| swamy|bang| 600071|
+---+------+----+-------+


scala> odf.show()
+---+---+-------+
|oid|cid| status|
+---+---+-------+
|111|  1|success|
|112|  1|failure|
|113|  2|success|
|114|  3|success|
|115|  2|failure|
|116|  3|failure|
|117|  2|success|
|118|  5|success|
|119|  6|failure|
|120|  2|success|
|121|  3|failure|
|122|  7|success|
|123|  3|failure|
|124|  2|success|
|125|  1|failure|
|126|  5|success|
+---+---+-------+



---------------------------------------------

scala> cdf.registerTempTable("customers")

scala> odf.registerTempTable("orders")

scala> sqlContext.sql("select customers.*, orders.* from customers join orders on customers.cid == orders.cid").show()
+---+------+----+-------+---+---+-------+
|cid|  name| loc|pincode|oid|cid| status|
+---+------+----+-------+---+---+-------+
|  1|kalyan| hyd| 500072|111|  1|success|
|  1|kalyan| hyd| 500072|112|  1|failure|
|  1|kalyan| hyd| 500072|125|  1|failure|
|  2|venkat| hyd| 500073|113|  2|success|
|  2|venkat| hyd| 500073|115|  2|failure|
|  2|venkat| hyd| 500073|117|  2|success|
|  2|venkat| hyd| 500073|120|  2|success|
|  2|venkat| hyd| 500073|124|  2|success|
|  3|  anil| hyd| 500071|114|  3|success|
|  3|  anil| hyd| 500071|116|  3|failure|
|  3|  anil| hyd| 500071|121|  3|failure|
|  3|  anil| hyd| 500071|123|  3|failure|
|  5|  arun| hyd| 500074|118|  5|success|
|  5|  arun| hyd| 500074|126|  5|success|
|  6|  vani|bang| 600072|119|  6|failure|
|  7| vamsi|bang| 600073|122|  7|success|
+---+------+----+-------+---+---+-------+
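
The same join can also be expressed in the DSL style; a small sketch (not part of the original session):

val joined = cdf.join(odf, cdf("cid") === odf("cid"))
joined.show()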


---------------------------------------------

scala> val jdf = sqlContext.sql("select customers.cid, customers.name, customers.loc, customers.pincode, orders.oid, orders.status from customers join orders on customers.cid == orders.cid")
jdf: org.apache.spark.sql.DataFrame = [cid: int, name: string, loc: string, pincode: int, oid: int, status: string]


// jdbc connection properties (the MySQL JDBC driver jar must be on the spark-shell classpath)
val prop = new java.util.Properties
prop.setProperty("driver", "com.mysql.jdbc.Driver")
prop.setProperty("user", "root")
prop.setProperty("password", "hadoop")

// target database url
val url = "jdbc:mysql://localhost:3306/kalyan"

// write the joined DataFrame into the 'jointable' table
jdf.write.jdbc(url, "jointable", prop)
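
To verify the write, the table can be read back as a DataFrame, reusing the same url and prop; a minimal sketch (not part of the original session):

val jointable = sqlContext.read.jdbc(url, "jointable", prop)
jointable.show()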


---------------------------------------------
















