Mr. Kalyan, Apache Contributor, Cloudera CCA175 Certified Consultant, 8+ years of Big Data experience, IIT Kharagpur, Gold Medalist.
This blog is mainly meant for learning Big Data from the basics:
1. Development practices
2. Administration practices
3. Interview Questions
4. Big Data integrations
5. Advanced Technologies in Big Data
6. Become stronger in Big Data
Call for Spark & Hadoop Training in Hyderabad, ORIENIT @ 040 65142345 , 9703202345
Pre-Requisites of Flume Project: hadoop-2.6.0, flume-1.6.0, mongodb-3.2.7, java-1.7
NOTE: Make sure that all the above components are installed.
Flume Project Download Links:
`hadoop-2.6.0.tar.gz` ==> link
`apache-flume-1.6.0-bin.tar.gz` ==> link
`mongodb-linux-x86_64-ubuntu1404-3.2.7.tgz` ==> link
`kalyan-twitter-hdfs-mongo-agent.conf` ==> link
`kalyan-flume-project-0.1.jar` ==> link
`mongodb-driver-core-3.3.0.jar` ==> link
`mongo-java-driver-3.3.0.jar` ==> link
-----------------------------------------------------------------------------
1. create "kalyan-twitter-hdfs-mongo-agent.conf" file with below content
2. Copy "kalyan-twitter-hdfs-mongo-agent.conf" file into "$FUME_HOME/conf" folder
3. Copy "kalyan-flume-project-0.1.jar, mongodb-driver-core-3.3.0.jar and mongo-java-driver-3.3.0.jar" files into"$FLUME_HOME/lib" folder 4. Execute the below command to `Extract data from Twitter into HDFS & MongoDB using Flume`
8. Start the MongoDB client using the below command (mongo).
9. Verify the list of databases in MongoDB using the below command (show dbs).
10. Verify the list of operations in MongoDB using the below commands:
// list of databases
show dbs
// use flume database
use flume
// list of collections
show collections
// find the count of documents in 'twitter' collection
db.twitter.count()
// display list of documents in 'twitter' collection
db.twitter.find()
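The exact agent launch command for step 4 is not reproduced above. A typical Flume 1.6 invocation would look like the sketch below; the agent name (KalyanTwitterHDFSMongoAgent) is an assumption and must match the agent name used inside the .conf file.

$FLUME_HOME/bin/flume-ng agent \
  --conf $FLUME_HOME/conf \
  --conf-file $FLUME_HOME/conf/kalyan-twitter-hdfs-mongo-agent.conf \
  --name KalyanTwitterHDFSMongoAgent \
  -Dflume.root.logger=INFO,console

The same pattern applies to the other agents below; only the --conf-file and --name values change.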
Pre-Requisites of Flume Project: hadoop-2.6.0, flume-1.6.0, mongodb-3.2.7, java-1.7
NOTE: Make sure that all the above components are installed.
Flume Project Download Links:
`hadoop-2.6.0.tar.gz` ==> link
`apache-flume-1.6.0-bin.tar.gz` ==> link
`mongodb-linux-x86_64-ubuntu1404-3.2.7.tgz` ==> link
`kalyan-twitter-mongo-agent.conf` ==> link
`kalyan-flume-project-0.1.jar` ==> link
`mongodb-driver-core-3.3.0.jar` ==> link
`mongo-java-driver-3.3.0.jar` ==> link
-----------------------------------------------------------------------------
1. create "kalyan-twitter-mongo-agent.conf" file with below content
2. Copy "kalyan-twitter-mongo-agent.conf" file into "$FUME_HOME/conf" folder
3. Copy "kalyan-flume-project-0.1.jar, mongodb-driver-core-3.3.0.jar and mongo-java-driver-3.3.0.jar" files into"$FLUME_HOME/lib" folder 4. Execute the below command to `Extract data from Twitter into MongoDB using Flume`
6. Verify the data in MongoDB.
7. Start the MongoDB server using the below command (mongod).
8. Start the MongoDB client using the below command (mongo).
9. Verify the list of databases in MongoDB using the below command (show dbs).
10. Verify the list of operations in MongoDB using the below commands:
// list of databases
show dbs
// use flume database
use flume
// list of collections
show collections
// find the count of documents in 'twitter' collection
db.twitter.count()
// display list of documents in 'twitter' collection
db.twitter.find()
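As an alternative to the mongo shell checks above, the same verification can be scripted against mongo-java-driver-3.3.0.jar (already listed in the prerequisites). This is a minimal Scala sketch, not part of the original post, assuming a local mongod on the default port 27017 and the flume database / twitter collection used above:

import com.mongodb.MongoClient

object VerifyFlumeTwitterData {
  def main(args: Array[String]): Unit = {
    // assumes mongod is running locally on the default port
    val client = new MongoClient("localhost", 27017)
    val collection = client.getDatabase("flume").getCollection("twitter")

    // same checks as the shell commands: document count plus a few sample documents
    println("document count: " + collection.count())
    val cursor = collection.find().limit(3).iterator()
    while (cursor.hasNext) println(cursor.next().toJson())

    client.close()
  }
}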
NOTE: Make sure that all the above components are installed.
Flume Project Download Links:
`hadoop-2.6.0.tar.gz` ==> link
`apache-flume-1.6.0-bin.tar.gz` ==> link
`kalyan-twitter-hdfs-agent.conf` ==> link
`kalyan-flume-project-0.1.jar` ==> link
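The content of "kalyan-twitter-hdfs-agent.conf" is not reproduced here. As an illustrative sketch only (assuming the standard components that ship with Flume 1.6, not the classes inside kalyan-flume-project-0.1.jar), a Twitter-to-HDFS agent typically looks like this; the agent name, the four Twitter API credentials and the HDFS path are placeholders:

# kalyan-twitter-hdfs-agent.conf (illustrative sketch, not the original file)
TwitterHDFSAgent.sources  = Twitter
TwitterHDFSAgent.channels = MemChannel
TwitterHDFSAgent.sinks    = HDFS

# Twitter source bundled with Flume 1.6; credentials come from your Twitter app
TwitterHDFSAgent.sources.Twitter.type = org.apache.flume.source.twitter.TwitterSource
TwitterHDFSAgent.sources.Twitter.channels = MemChannel
TwitterHDFSAgent.sources.Twitter.consumerKey = <consumerKey>
TwitterHDFSAgent.sources.Twitter.consumerSecret = <consumerSecret>
TwitterHDFSAgent.sources.Twitter.accessToken = <accessToken>
TwitterHDFSAgent.sources.Twitter.accessTokenSecret = <accessTokenSecret>

# in-memory channel between source and sink
TwitterHDFSAgent.channels.MemChannel.type = memory
TwitterHDFSAgent.channels.MemChannel.capacity = 10000
TwitterHDFSAgent.channels.MemChannel.transactionCapacity = 100

# built-in HDFS sink; adjust the path to your cluster
TwitterHDFSAgent.sinks.HDFS.type = hdfs
TwitterHDFSAgent.sinks.HDFS.channel = MemChannel
TwitterHDFSAgent.sinks.HDFS.hdfs.path = hdfs://localhost:8020/flume/twitter/%Y/%m/%d
TwitterHDFSAgent.sinks.HDFS.hdfs.fileType = DataStream
TwitterHDFSAgent.sinks.HDFS.hdfs.rollSize = 0
TwitterHDFSAgent.sinks.HDFS.hdfs.rollCount = 1000
TwitterHDFSAgent.sinks.HDFS.hdfs.useLocalTimeStamp = true

Launch it with the same flume-ng agent pattern shown earlier, pointing --conf-file at this file and --name at TwitterHDFSAgent.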
Spark RDD operations
----------------------------------------
Command line options in Spark:
Scala  => spark-shell
Python => pyspark
R      => SparkR
----------------------------------------
// create an RDD from a Scala collection
scala> val rdd = sc.parallelize(List(1,2,3,4,5,6))
rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[1] at parallelize at <console>:27

// get the number of partitions of the RDD
scala> rdd.getNumPartitions
res1: Int = 4

// create an RDD from a Scala collection with a given number of partitions
scala> val rdd = sc.parallelize(List(1,2,3,4,5,6), 2)
rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[2] at parallelize at <console>:27

// get the number of partitions of the RDD
scala> rdd.getNumPartitions
res2: Int = 2

// create an RDD from a Scala collection with a given number of partitions
scala> val rdd = sc.parallelize(List(1,2,3,4,5,6), 1)
rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[3] at parallelize at <console>:27

// get the number of partitions of the RDD
scala> rdd.getNumPartitions
res3: Int = 1
----------------------------------------
// create an RDD from a Scala collection
scala> val rdd = sc.parallelize(List(1,2,3,4,5,6))
rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[4] at parallelize at <console>:27

// sum of the RDD data
scala> rdd.sum
res4: Double = 21.0

// min of the RDD data
scala> rdd.min
res5: Int = 1

// max of the RDD data
scala> rdd.max
res6: Int = 6

// transform the RDD data (x => x + 1)
scala> val rdd1 = rdd.map(x => x + 1)
rdd1: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[6] at map at <console>:29

scala> rdd.foreach(println)
1
2
3
5
6
4

scala> rdd1.foreach(println)
2
5
3
4
6
7

// filter the RDD data (x => x > 3)
scala> val rdd2 = rdd.filter(x => x > 3)
rdd2: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[7] at filter at <console>:29

scala> rdd2.foreach(println)
4
5
6
----------------------------------------
We can prepare an RDD with 2 approaches:

1. from a collection
   val rdd = sc.parallelize(collection, number of partitions)

2. from a data source (text / csv / tsv / json files ...)
   val rdd = sc.textFile(path of the file, number of partitions)

Note: the file can be from the Local File System / HDFS / S3 / FTP ...
----------------------------------------
scala> val rdd = sc.textFile("file:///home/hadoop/work/input/demoinput")
rdd: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[9] at textFile at <console>:27

scala> rdd.getNumPartitions
res10: Int = 2

scala> val rdd = sc.textFile("file:///home/hadoop/work/input/demoinput", 1)
rdd: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[11] at textFile at <console>:27

scala> rdd.getNumPartitions
res11: Int = 1

scala> rdd.foreach(println)
I am going to hyd
I am learning hadoop course
----------------------------------------
scala> val fileRDD = sc.textFile("file:///home/hadoop/work/input/demoinput", 1)
fileRDD: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[13] at textFile at <console>:27

scala> fileRDD.foreach(println)
I am going to hyd
I am learning hadoop course

scala> fileRDD.flatMap(line => line.split(" ")).foreach(println)
I
am
going
to
hyd
I
am
learning
hadoop
course

scala> val wordsRDD = fileRDD.flatMap(line => line.split(" "))
wordsRDD: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[16] at flatMap at <console>:29

scala> val tupleRDD = wordsRDD.map( word => (word, 1))
tupleRDD: org.apache.spark.rdd.RDD[(String, Int)] = MapPartitionsRDD[17] at map at <console>:31

scala> tupleRDD.foreach(println)
(I,1)
(am,1)
(going,1)
(to,1)
(hyd,1)
(I,1)
(am,1)
(learning,1)
(hadoop,1)
(course,1)

scala> val wordcountRDD = tupleRDD.reduceByKey( (a,b) => a + b )
wordcountRDD: org.apache.spark.rdd.RDD[(String, Int)] = ShuffledRDD[18] at reduceByKey at <console>:33

scala> wordcountRDD.foreach(println)
(learning,1)
(hadoop,1)
(am,2)
(hyd,1)
(I,2)
(to,1)
(going,1)
(course,1)

scala> val wordcountRDD = tupleRDD.reduceByKey( _ + _ )
wordcountRDD: org.apache.spark.rdd.RDD[(String, Int)] = ShuffledRDD[19] at reduceByKey at <console>:33

scala> wordcountRDD.foreach(println)
(learning,1)
(hadoop,1)
(am,2)
(hyd,1)
(I,2)
(to,1)
(going,1)
(course,1)

scala> wordcountRDD.sortBy( t => t._1).foreach(println)
(I,2)
(am,2)
(course,1)
(going,1)
(hadoop,1)
(hyd,1)
(learning,1)
(to,1)

scala> wordcountRDD.sortBy( t => t._2).foreach(println)
(learning,1)
(hadoop,1)
(hyd,1)
(to,1)
(going,1)
(course,1)
(am,2)
(I,2)

scala> wordcountRDD.sortBy( t => t._1, false).foreach(println)
(to,1)
(learning,1)
(hyd,1)
(hadoop,1)
(going,1)
(course,1)
(am,2)
(I,2)

scala> wordcountRDD.sortBy( t => t._1, true).foreach(println)
(I,2)
(am,2)
(course,1)
(going,1)
(hadoop,1)
(hyd,1)
(learning,1)
(to,1)

scala> wordcountRDD.sortBy( t => t._1, true).getNumPartitions
res24: Int = 1

scala> wordcountRDD.sortBy( t => t._1, true, 2).getNumPartitions
res25: Int = 2
----------------------------------------------
Word Count in Spark using Scala
----------------------------------------------
val fileRDD = sc.textFile("file:///home/hadoop/work/input/demoinput", 1)
val wordsRDD = fileRDD.flatMap(line => line.split(" "))
val tupleRDD = wordsRDD.map( word => (word, 1))
val wordcountRDD = tupleRDD.reduceByKey( _ + _ )

(or)

val wordcountRDD = fileRDD.flatMap(line => line.split(" ")).map( word => (word, 1)).reduceByKey( _ + _ )

wordcountRDD.saveAsTextFile("file:///home/hadoop/work/output/spark-op")
wordcountRDD.saveAsTextFile("hdfs://localhost:8020/output/spark-op")

val sortedRDD = wordcountRDD.sortBy( t => t._1, true)
sortedRDD.saveAsTextFile("file:///home/hadoop/work/output/spark-sorted-op")
sortedRDD.saveAsTextFile("hdfs://localhost:8020/output/spark-sorted-op")
----------------------------------------------
Word Count in Spark using Python
----------------------------------------------
fileRDD = sc.textFile("file:///home/hadoop/work/input/demoinput", 1)
wordsRDD = fileRDD.flatMap(lambda line : line.split(" "))
tupleRDD = wordsRDD.map(lambda word : (word, 1))
wordcountRDD = tupleRDD.reduceByKey(lambda a, b : a + b)

(or)

wordcountRDD = fileRDD.flatMap(lambda line : line.split(" ")).map(lambda word : (word, 1)).reduceByKey(lambda a, b : a + b)

wordcountRDD.saveAsTextFile("file:///home/hadoop/work/output/spark-python-op")
wordcountRDD.saveAsTextFile("hdfs://localhost:8020/output/spark-python-op")
----------------------------------------------
Grep Job in Spark using Scala
----------------------------------------------
scala> val fileRDD = sc.textFile("file:///home/hadoop/work/input/demoinput", 1)
fileRDD: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[41] at textFile at <console>:27

scala> fileRDD.foreach(println)
I am going to hyd
I am learning hadoop course

scala> val grepRDD = fileRDD.filter(line => line.contains("am"))
grepRDD: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[42] at filter at <console>:29

scala> grepRDD.foreach(println)
I am going to hyd
I am learning hadoop course