Spark RDD operations
----------------------------------------
Command-line shells in Spark (see the launch example after this list):
Scala => spark-shell
Python => pyspark
R => SparkR
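Each shell accepts the usual launch options; for example (a sketch, assuming a local installation, with the master URL chosen only for illustration):
spark-shell --master local[2]
pyspark --master local[2]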
----------------------------------------
// create an RDD from a Scala collection
scala> val rdd = sc.parallelize(List(1,2,3,4,5,6))
rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[1] at parallelize at <console>:27
// get the number of partitions of the RDD
scala> rdd.getNumPartitions
res1: Int = 4
// create an RDD from a Scala collection, specifying the number of partitions
scala> val rdd = sc.parallelize(List(1,2,3,4,5,6), 2)
rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[2] at parallelize at <console>:27
// get the number of partitions of the RDD
scala> rdd.getNumPartitions
res2: Int = 2
// create an RDD from a Scala collection, specifying the number of partitions
scala> val rdd = sc.parallelize(List(1,2,3,4,5,6), 1)
rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[3] at parallelize at <console>:27
// get the number of partitions of the RDD
scala> rdd.getNumPartitions
res3: Int = 1
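// when no partition count is given, parallelize falls back to sc.defaultParallelism,
// which is why the first example above shows 4 (a sketch; the actual value depends
// on your master / core settings)
scala> sc.defaultParallelism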
----------------------------------------
// create an RDD from a Scala collection
scala> val rdd = sc.parallelize(List(1,2,3,4,5,6))
rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[4] at parallelize at <console>:27
// sum the RDD data
scala> rdd.sum
res4: Double = 21.0
// find the minimum of the RDD data
scala> rdd.min
res5: Int = 1
// find the maximum of the RDD data
scala> rdd.max
res6: Int = 6
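// a couple of related actions (a sketch; outputs omitted): count returns the
// number of elements, and reduce gives the same total as sum but as an Int
scala> rdd.count
scala> rdd.reduce(_ + _)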
// transform the RDD data (x => x + 1)
scala> val rdd1 = rdd.map(x => x + 1)
rdd1: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[6] at map at <console>:29
scala> rdd.foreach(println)
1
2
3
5
6
4
scala> rdd1.foreach(println)
2
5
3
4
6
7
// filter the RDD data (x => x > 3)
scala> val rdd2 = rdd.filter(x => x > 3)
rdd2: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[7] at filter at <console>:29
scala> rdd2.foreach(println)
4
5
6
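// note: foreach prints from the executor tasks, so the print order above is not
// guaranteed; collect (a sketch, suitable for small data only) brings the result
// back to the driver as an Array in partition order
scala> rdd1.collect
scala> rdd2.collect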
----------------------------------------
We can create an RDD in two ways
1. from a collection
val rdd = sc.parallelize(collection, number of partitions)
2. from a data source (text / csv / tsv / json files, ...)
val rdd = sc.textFile(path of the file, number of partitions)
Note: the file can live on the local file system / HDFS / S3 / FTP, etc. (see the sketch below)
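// a sketch of reading from the other sources mentioned above; the paths are
// placeholders (the S3 form also needs the hadoop-aws connector on the classpath)
val hdfsRDD = sc.textFile("hdfs://localhost:8020/input/demoinput")
val s3RDD = sc.textFile("s3a://my-bucket/input/demoinput")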
----------------------------------------
scala> val rdd = sc.textFile("file:///home/hadoop/work/input/demoinput")
rdd: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[9] at textFile at <console>:27
scala> rdd.getNumPartitions
res10: Int = 2
scala> val rdd = sc.textFile("file:///home/hadoop/work/input/demoinput", 1)
rdd: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[11] at textFile at <console>:27
scala> rdd.getNumPartitions
res11: Int = 1
scala> rdd.foreach(println)
I am going
to hyd
I am learning
hadoop course
----------------------------------------
scala> val fileRDD = sc.textFile("file:///home/hadoop/work/input/demoinput", 1)
fileRDD: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[13] at textFile at <console>:27
scala> fileRDD.foreach(println)
I am going
to hyd
I am learning
hadoop course
scala> fileRDD.flatMap(line => line.split(" ")).foreach(println)
I
am
going
to
hyd
I
am
learning
hadoop
course
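// for contrast (a sketch): map keeps one output element per input line, so it
// yields an RDD[Array[String]], while flatMap above flattens those arrays into
// a single RDD[String]
scala> fileRDD.map(line => line.split(" "))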
scala> val wordsRDD = fileRDD.flatMap(line => line.split(" "))
wordsRDD: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[16] at flatMap at <console>:29
scala> val tupleRDD = wordsRDD.map( word => (word, 1))
tupleRDD: org.apache.spark.rdd.RDD[(String, Int)] = MapPartitionsRDD[17] at map at <console>:31
scala> tupleRDD.foreach(println)
(I,1)
(am,1)
(going,1)
(to,1)
(hyd,1)
(I,1)
(am,1)
(learning,1)
(hadoop,1)
(course,1)
scala> val wordcountRDD = tupleRDD.reduceByKey( (a,b) => a + b )
wordcountRDD: org.apache.spark.rdd.RDD[(String, Int)] = ShuffledRDD[18] at reduceByKey at <console>:33
scala> wordcountRDD.foreach(println)
(learning,1)
(hadoop,1)
(am,2)
(hyd,1)
(I,2)
(to,1)
(going,1)
(course,1)
scala> val wordcountRDD = tupleRDD.reduceByKey( _ + _)
wordcountRDD: org.apache.spark.rdd.RDD[(String, Int)] = ShuffledRDD[19] at reduceByKey at <console>:33
scala> wordcountRDD.foreach(println)
(learning,1)
(hadoop,1)
(am,2)
(hyd,1)
(I,2)
(to,1)
(going,1)
(course,1)
scala> wordcountRDD.sortBy( t => t._1).foreach(println)
(I,2)
(am,2)
(course,1)
(going,1)
(hadoop,1)
(hyd,1)
(learning,1)
(to,1)
scala> wordcountRDD.sortBy( t => t._2).foreach(println)
(learning,1)
(hadoop,1)
(hyd,1)
(to,1)
(going,1)
(course,1)
(am,2)
(I,2)
scala> wordcountRDD.sortBy( t => t._1, false).foreach(println)
(to,1)
(learning,1)
(hyd,1)
(hadoop,1)
(going,1)
(course,1)
(am,2)
(I,2)
scala> wordcountRDD.sortBy( t => t._1, true).foreach(println)
(I,2)
(am,2)
(course,1)
(going,1)
(hadoop,1)
(hyd,1)
(learning,1)
(to,1)
scala> wordcountRDD.sortBy( t => t._1, true).getNumPartitions
res24: Int = 1
scala> wordcountRDD.sortBy( t => t._1, true, 2).getNumPartitions
res25: Int = 2
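// since wordcountRDD is a pair RDD, sortByKey is an equivalent way to sort by
// word (a sketch; the boolean toggles ascending order just like in sortBy)
scala> wordcountRDD.sortByKey().foreach(println)
scala> wordcountRDD.sortByKey(false).foreach(println)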
----------------------------------------------
Word Count in Spark using Scala
----------------------------------------------
val fileRDD = sc.textFile("file:///home/hadoop/work/input/demoinput", 1)
val wordsRDD = fileRDD.flatMap(line => line.split(" "))
val tupleRDD = wordsRDD.map( word => (word, 1))
val wordcountRDD = tupleRDD.reduceByKey( _ + _)
(or)
val wordcountRDD = fileRDD.flatMap(line => line.split(" ")).map( word => (word, 1)).reduceByKey( _ + _)
wordcountRDD.saveAsTextFile("file:///home/hadoop/work/output/spark-op")
wordcountRDD.saveAsTextFile("hdfs://localhost:8020/output/spark-op")
val sortedRDD = wordcountRDD.sortBy( t => t._1, true)
sortedRDD.saveAsTextFile("file:///home/hadoop/work/output/spark-sorted-op")
sortedRDD.saveAsTextFile("hdfs://localhost:8020/output/spark-sorted-op")
----------------------------------------------
Word Count in Spark using Python
----------------------------------------------
fileRDD = sc.textFile("file:///home/hadoop/work/input/demoinput", 1)
wordsRDD = fileRDD.flatMap(lambda line: line.split(" "))
tupleRDD = wordsRDD.map(lambda word: (word, 1))
wordcountRDD = tupleRDD.reduceByKey(lambda a, b: a + b)
(or)
wordcountRDD = fileRDD.flatMap(lambda line: line.split(" ")).map(lambda word: (word, 1)).reduceByKey(lambda a, b: a + b)
wordcountRDD.saveAsTextFile("file:///home/hadoop/work/output/spark-python-op")
wordcountRDD.saveAsTextFile("hdfs://localhost:8020/output/spark-python-op")
----------------------------------------------
Grep Job in Spark using Scala
----------------------------------------------
scala> val fileRDD = sc.textFile("file:///home/hadoop/work/input/demoinput", 1)
fileRDD: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[41] at textFile at <console>:27
scala> fileRDD.foreach(println)
I am going
to hyd
I am learning
hadoop course
scala> val grepRDD = fileRDD.filter(line => line.contains("am"))
grepRDD: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[42] at filter at <console>:29
scala> grepRDD.foreach(println)
I am going
I am learning
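// to count or persist the matched lines (a sketch; the output path is an
// assumption, adjust it to your environment)
scala> grepRDD.count
scala> grepRDD.saveAsTextFile("file:///home/hadoop/work/output/spark-grep-op")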