Thursday 22 September 2016

SPARK BASICS DAY 1 Practice on 23 Sept 2016















Spark RDD operations
----------------------------------------
Command-line shells in Spark:

Scala => spark-shell
Python => pyspark
R         => sparkR

----------------------------------------
// create a RDD from scala collection

scala> val rdd = sc.parallelize(List(1,2,3,4,5,6))
rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[1] at parallelize at <console>:27


// get the number of partitions from RDD

scala> rdd.getNumPartitions
res1: Int = 4


// create a RDD from scala collection using number of partitions

scala> val rdd = sc.parallelize(List(1,2,3,4,5,6), 2)
rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[2] at parallelize at <console>:27


// get the number of partitions from RDD

scala> rdd.getNumPartitions
res2: Int = 2

// create a RDD from scala collection using number of partitions

scala> val rdd = sc.parallelize(List(1,2,3,4,5,6), 1)
rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[3] at parallelize at <console>:27

// get the number of partitions from RDD

scala> rdd.getNumPartitions
res3: Int = 1

----------------------------------------
// create a RDD from scala collection

scala> val rdd = sc.parallelize(List(1,2,3,4,5,6))
rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[4] at parallelize at <console>:27

// sum the RDD data

scala> rdd.sum
res4: Double = 21.0                                                             


// min the RDD data

scala> rdd.min
res5: Int = 1


// max the RDD data

scala> rdd.max
res6: Int = 6


// transform the RDD data (x => x + 1)

scala> val rdd1 = rdd.map(x => x + 1)
rdd1: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[6] at map at <console>:29

scala> rdd.foreach(println)
1
2
3
5
6
4

scala> rdd1.foreach(println)
2
5
3
4
6
7


// filter the RDD data (x => x > 3)

scala> val rdd2 = rdd.filter(x => x > 3)
rdd2: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[7] at filter at <console>:29

scala> rdd2.foreach(println)
4
5
6

----------------------------------------

We can create an RDD using two approaches:

1. from collection

val rdd = sc.parallelize(collection, number of partitions)


2. from a data source (text / csv / tsv / json files ...)

val rdd = sc.textFile(path of the file, minimum number of partitions)

Note: file can be from Local File System / HDFS / S3 / FTP ....

----------------------------------------

scala> val rdd = sc.textFile("file:///home/hadoop/work/input/demoinput")
rdd: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[9] at textFile at <console>:27

scala> rdd.getNumPartitions
res10: Int = 2

scala> val rdd = sc.textFile("file:///home/hadoop/work/input/demoinput", 1)
rdd: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[11] at textFile at <console>:27

scala> rdd.getNumPartitions
res11: Int = 1


scala> rdd.foreach(println)
I am going
to hyd
I am learning
hadoop course

----------------------------------------

scala> val fileRDD = sc.textFile("file:///home/hadoop/work/input/demoinput", 1)
fileRDD: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[13] at textFile at <console>:27

scala> fileRDD.foreach(println)
I am going
to hyd
I am learning
hadoop course


scala> fileRDD.flatMap(line => line.split(" ")).foreach(println)
I
am
going
to
hyd
I
am
learning
hadoop
course


scala> val wordsRDD = fileRDD.flatMap(line => line.split(" "))
wordsRDD: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[16] at flatMap at <console>:29

scala> val tupleRDD = wordsRDD.map( word => (word, 1))
tupleRDD: org.apache.spark.rdd.RDD[(String, Int)] = MapPartitionsRDD[17] at map at <console>:31


scala> tupleRDD.foreach(println)
(I,1)
(am,1)
(going,1)
(to,1)
(hyd,1)
(I,1)
(am,1)
(learning,1)
(hadoop,1)
(course,1)


scala> val wordcountRDD = tupleRDD.reduceByKey( (a,b) => a + b )
wordcountRDD: org.apache.spark.rdd.RDD[(String, Int)] = ShuffledRDD[18] at reduceByKey at <console>:33

scala> wordcountRDD.foreach(println)
(learning,1)
(hadoop,1)
(am,2)
(hyd,1)
(I,2)
(to,1)
(going,1)
(course,1)


scala> val wordcountRDD = tupleRDD.reduceByKey( _ + _)
wordcountRDD: org.apache.spark.rdd.RDD[(String, Int)] = ShuffledRDD[19] at reduceByKey at <console>:33

scala> wordcountRDD.foreach(println)
(learning,1)
(hadoop,1)
(am,2)
(hyd,1)
(I,2)
(to,1)
(going,1)
(course,1)


scala> wordcountRDD.sortBy( t => t._1).foreach(println)
(I,2)
(am,2)
(course,1)
(going,1)
(hadoop,1)
(hyd,1)
(learning,1)
(to,1)

scala> wordcountRDD.sortBy( t => t._2).foreach(println)
(learning,1)
(hadoop,1)
(hyd,1)
(to,1)
(going,1)
(course,1)
(am,2)
(I,2)



scala> wordcountRDD.sortBy( t => t._1, false).foreach(println)
(to,1)
(learning,1)
(hyd,1)
(hadoop,1)
(going,1)
(course,1)
(am,2)
(I,2)

scala> wordcountRDD.sortBy( t => t._1, true).foreach(println)
(I,2)
(am,2)
(course,1)
(going,1)
(hadoop,1)
(hyd,1)
(learning,1)
(to,1)


scala> wordcountRDD.sortBy( t => t._1, true).getNumPartitions
res24: Int = 1

scala> wordcountRDD.sortBy( t => t._1, true, 2).getNumPartitions
res25: Int = 2

----------------------------------------------
Word Count in Spark using Scala
----------------------------------------------

val fileRDD = sc.textFile("file:///home/hadoop/work/input/demoinput", 1)
val wordsRDD = fileRDD.flatMap(line => line.split(" "))
val tupleRDD = wordsRDD.map( word => (word, 1))
val wordcountRDD = tupleRDD.reduceByKey( _ + _)


(or)

val wordcountRDD = fileRDD.flatMap(line => line.split(" ")).map( word => (word, 1)).reduceByKey( _ + _)


wordcountRDD.saveAsTextFile("file:///home/hadoop/work/output/spark-op")
wordcountRDD.saveAsTextFile("hdfs://localhost:8020/output/spark-op")


val sortedRDD = wordcountRDD.sortBy( t => t._1, true)
sortedRDD.saveAsTextFile("file:///home/hadoop/work/output/spark-sorted-op")
sortedRDD.saveAsTextFile("hdfs://localhost:8020/output/spark-sorted-op")


----------------------------------------------
Word Count in Spark using Python
----------------------------------------------

fileRDD = sc.textFile("file:///home/hadoop/work/input/demoinput", 1)
wordsRDD = fileRDD.flatMap(lambda line : line.split(" "))
tupleRDD = wordsRDD.map(lambda word : (word, 1))
wordcountRDD = tupleRDD.reduceByKey( lambda a, b : a + b)


(or)

wordcountRDD = fileRDD.flatMap(lambda line : line.split(" ")).map( lambda word : (word, 1)).reduceByKey( lambda a, b : a + b )


wordcountRDD.saveAsTextFile("file:///home/hadoop/work/output/spark-python-op")
wordcountRDD.saveAsTextFile("hdfs://localhost:8020/output/spark-python-op")



----------------------------------------------
Grep Job in Spark using Scala
----------------------------------------------

scala> val fileRDD = sc.textFile("file:///home/hadoop/work/input/demoinput", 1)
fileRDD: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[41] at textFile at <console>:27

scala> fileRDD.foreach(println)
I am going
to hyd
I am learning
hadoop course

scala> val grepRDD = fileRDD.filter(line => line.contains("am"))
grepRDD: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[42] at filter at <console>:29

scala> grepRDD.foreach(println)
I am going
I am learning



Share this article with your friends.

No comments :

Post a Comment

Related Posts Plugin for WordPress, Blogger...