Sunday 30 October 2016

Control Structures : If Else Expressions : Day 6 Learnings

Scala's built-in control structures are if, while, for, try, and match expressions. They are sufficient to provide the features that their imperative equivalents provide, but because all of Scala’s control structures (except while loops) result in some value, they support a functional approach as well.

If..Else Expression Blocks 


The If..Else conditional expression is a classic programming construct for choosing a branch of code based on whether an expression resolves to true or false.


In many languages this takes the form of an “if .. else if .. else” block, which starts with an “if,” continues with zero to many “else if” sections, and ends with a final “else” catch-all statement.

As a matter of practice you can write these same “if .. else if .. else” blocks in Scala and they will work just as you have experienced them in Java and other languages. As a matter of formal syntax, however, Scala only supports a single “if ” and optional “else” block, and does not recognise the “else if ” block as a single construct.

So how do “else if ” blocks still work correctly in Scala? Because “if .. else” blocks are based on expression blocks, and expression blocks can be easily nested, an “if .. else if ..else” expression is equivalent to a nested “if .. else { if .. else }” expression. 

Logically this is exactly the same as an “if .. else if .. else” block, and as a matter of syntax Scala recognises the second “if else” as a nested expression of the outer “if .. else” block.
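
To make the nesting concrete, here is a minimal sketch with the same logic written both ways. The object and function names (ElseIfDemo, signChained, signNested) are made up for illustration and are not from any library.

```scala
object ElseIfDemo {
  // Written with the familiar "else if" chain.
  def signChained(n: Int): String =
    if (n < 0) "negative"
    else if (n == 0) "zero"
    else "positive"

  // The same logic with the second if..else written as an explicitly
  // nested expression block inside the outer else.
  def signNested(n: Int): String =
    if (n < 0) "negative"
    else {
      if (n == 0) "zero"
      else "positive"
    }

  def main(args: Array[String]): Unit =
    for (n <- List(-5, 0, 7))
      println(s"$n: ${signChained(n)} / ${signNested(n)}")
}
```

Both functions should return the same value for every input, which is the point: the chained form is just the nested form without the braces.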

Let’s start exploring actual “if ” and “if .. else” blocks by looking at the syntax for the simple “if ” block.

If Expressions 


Syntax: Using an If Expression

if (<Boolean expression>) <expression>


The term Boolean expression here indicates an expression that will return either true or false.

Here is a simple if block that prints a notice if the Boolean expression is true:

scala> if ( 47 % 3 > 0 ) println("Not a multiple of 3")
Not a multiple of 3 


Of course 47 isn’t a multiple of 3, so the Boolean expression was true and the println was triggered. 

Although an if block can act as an expression, it is better suited for statements like this one. The problem with using if blocks as expressions is that they only conditionally return a value. If the Boolean expression returns false, what do you expect the if block to return? 

scala> val result = if ( false ) "what does this return?" 
result: Any = ()

The type of the result value in this example is unspecified, so the compiler used type inference to determine the most appropriate type. Either a String or Unit could have been returned, so the compiler chose the root class Any. This is the one class common to both String (which extends AnyRef) and Unit (which extends AnyVal).

Unlike the solitary “if” block, the “if .. else” block is well suited to working with expressions.
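
As a small sketch of the difference, the two functions below contrast an if without an else against an if..else. The names (IfTypeDemo, withoutElse, withElse) are hypothetical. The first function is declared as Any to match the Scala 2 behaviour seen in the transcript above; other compiler versions may infer a different type for an else-less if.

```scala
object IfTypeDemo {
  // No else branch: when the condition is false there is nothing to
  // return except the unit value (), so the declared result type is Any.
  def withoutElse(flag: Boolean): Any =
    if (flag) "yes"

  // Both branches produce a String, so the result type is String.
  def withElse(flag: Boolean): String =
    if (flag) "yes" else "no"
}
```

With an else branch in place, callers get a String they can use directly instead of an Any they would have to inspect.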


If-Else Expressions 


Syntax: If .. Else Expressions 

if (<Boolean expression>) <expression>
else <expression>


Here is an example: 


scala> val x = 10; val y = 20
x: Int = 10
y: Int = 20

scala> val max = if (x > y) x else y
max: Int = 20


You can see that the x and y values make up the entirety of the if and else expressions. The resulting value is assigned to max, which we and the Scala compiler know will be an Int because both expressions have return values of type Int.

Note: Some wonder why Scala doesn’t have a ternary expression (popular in C and Java) where the punctuation characters ? and : act as a one-line if and else expression. It should be clear from this example that Scala doesn’t really need it because its if and else blocks can fit compactly on a single line (and, unlike in C and Java, they are already an expression).

Using a single expression without an expression block in if..else expressions works well if everything fits on one line. When your if..else expression doesn’t easily fit on a single line, however, consider using expression blocks to make your code more readable. if expressions without an else should always use curly braces, because they tend to be statements that create side effects.

if..else blocks are a simple and common way to write conditional logic. There are other, more elegant ways to do so in Scala, however, using match expressions.
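
For a rough idea of what that looks like, here is a sketch of the same conditional logic written once as an if..else chain and once as a match expression. The function names and the "zero / one / many" labels are invented for this illustration.

```scala
object MatchDemo {
  // Conditional logic as an if .. else if .. else chain.
  def describeIf(n: Int): String =
    if (n == 0) "zero"
    else if (n == 1) "one"
    else "many"

  // The same logic as a match expression; "case _" is the catch-all,
  // playing the role of the final else.
  def describeMatch(n: Int): String = n match {
    case 0 => "zero"
    case 1 => "one"
    case _ => "many"
  }
}
```

Like if..else, a match expression results in a value, so it can be assigned or returned directly.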


Note: Sample examples on if..else

If Expression in Scala
scala> if (exp) println("yes")

Multi-line If Expression

scala> if (exp) {
     |  println("Line one")
     |  println("Line two")
     | }

Multi-line Else Expression

scala> if (exp) {
     |  println("Hello")
     |  } else {
     |  println("Line one")
     |  println("Line two")
     | }

Multiple If-Else and Else-If Expression
scala> if (exp1) {
     |  println("Line one")
     |  println("Line two")
     | } else if (exp2) {
     |  println("Line one")
     |  println("Line two")
     | } else if (exp3) {
     |  println("Line one")
     |  println("Line two")
     | } else {
     |  println("Line one")
     |  println("Line two")
     | }



Thursday 27 October 2016

SPARK BASICS DAY 2 Practice on 28 Oct 2016

Spark Day 2 Practice:
==============================================


To work with Spark Sql + Hive :
----------------------------------------------------------
1. copy "hive-site.xml" from "$HIVE_HOME/conf" folder to "$SPARK_HOME/conf"

2. copy "mysql connector jar" from "$HIVE_HOME/lib" folder to "$SPARK_HOME/lib"

----------------------------------------------------------
hive (kalyan)> select course, count(*) from kalyan.student group by course;

OK
cse 7
mech 7
Time taken: 22.567 seconds, Fetched: 2 row(s)

----------------------------------------------------------
sqlContext.sql("select course, count(*) from kalyan.student group by course").show()



scala> sqlContext.sql("select course, count(*) from kalyan.student group by course").show()
+------+---+
|course|_c1|
+------+---+
|  mech|  7|
|   cse|  7|
+------+---+


scala> sqlContext.sql("select year, count(*) from kalyan.student group by year").show()
+----+---+
|year|_c1|
+----+---+
|   1|  7|
|   2|  7|
+----+---+


----------------------------------------------------------

scala> sqlContext.sql("select course, count(*) from kalyan.student group by course")
res7: org.apache.spark.sql.DataFrame = [course: string, _c1: bigint]


hive (kalyan)> describe kalyan.student;
OK
name                 string               student name        
id                   int                 student id          
course               string               student course      
year                 int                 student year        
Time taken: 0.349 seconds, Fetched: 4 row(s)


scala> sqlContext.sql("select * from kalyan.student")
res8: org.apache.spark.sql.DataFrame = [name: string, id: int, course: string, year: int]


scala> val df = sqlContext.sql("select * from kalyan.student")
df: org.apache.spark.sql.DataFrame = [name: string, id: int, course: string, year: int]

scala> df.show
+------+---+------+----+
|  name| id|course|year|
+------+---+------+----+
|  arun|  1|   cse|   1|
| sunil|  2|   cse|   1|
|   raj|  3|   cse|   1|
|naveen|  4|   cse|   1|
| venki|  5|   cse|   2|
|prasad|  6|   cse|   2|
| sudha|  7|   cse|   2|
|  ravi|  1|  mech|   1|
|  raju|  2|  mech|   1|
|  roja|  3|  mech|   1|
|  anil|  4|  mech|   2|
|  rani|  5|  mech|   2|
|anvith|  6|  mech|   2|
| madhu|  7|  mech|   2|
+------+---+------+----+


scala> df.select("name","id").show
+------+---+
|  name| id|
+------+---+
|  arun|  1|
| sunil|  2|
|   raj|  3|
|naveen|  4|
| venki|  5|
|prasad|  6|
| sudha|  7|
|  ravi|  1|
|  raju|  2|
|  roja|  3|
|  anil|  4|
|  rani|  5|
|anvith|  6|
| madhu|  7|
+------+---+


scala> df.filter(df("year") > 1).show
+------+---+------+----+
|  name| id|course|year|
+------+---+------+----+
| venki|  5|   cse|   2|
|prasad|  6|   cse|   2|
| sudha|  7|   cse|   2|
|  anil|  4|  mech|   2|
|  rani|  5|  mech|   2|
|anvith|  6|  mech|   2|
| madhu|  7|  mech|   2|
+------+---+------+----+


----------------------------------------------------------

scala> df.filter(df("year") > 1).filter(df("id") === 5).show
+-----+---+------+----+
| name| id|course|year|
+-----+---+------+----+
|venki|  5|   cse|   2|
| rani|  5|  mech|   2|
+-----+---+------+----+


scala> df.filter(df("year") > 1).where(df("id") === 5).show
+-----+---+------+----+
| name| id|course|year|
+-----+---+------+----+
|venki|  5|   cse|   2|
| rani|  5|  mech|   2|
+-----+---+------+----+


scala> df.filter(df("year") > 1).where(df("id") === 5).select("name","id").show
+-----+---+
| name| id|
+-----+---+
|venki|  5|
| rani|  5|
+-----+---+


scala> val gd = df.groupBy(df("year"), df("course"))
gd: org.apache.spark.sql.GroupedData = org.apache.spark.sql.GroupedData@cab0abd

scala> gd.count.show
+----+------+-----+
|year|course|count|
+----+------+-----+
|   1|  mech|    3|
|   1|   cse|    4|
|   2|  mech|    4|
|   2|   cse|    3|
+----+------+-----+
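
The filter, select and groupBy calls above behave much like their namesakes on ordinary Scala collections. The sketch below is not Spark code: it replays the same steps on a local List, with a hypothetical Student case class, using the rows printed by df.show. It only shows the logic. A real DataFrame is distributed and evaluated lazily.

```scala
object StudentCollectionsDemo {
  // Hypothetical case class mirroring the table columns.
  final case class Student(name: String, id: Int, course: String, year: Int)

  // Rows copied from the df.show output above.
  val students: List[Student] = List(
    Student("arun", 1, "cse", 1), Student("sunil", 2, "cse", 1),
    Student("raj", 3, "cse", 1), Student("naveen", 4, "cse", 1),
    Student("venki", 5, "cse", 2), Student("prasad", 6, "cse", 2),
    Student("sudha", 7, "cse", 2), Student("ravi", 1, "mech", 1),
    Student("raju", 2, "mech", 1), Student("roja", 3, "mech", 1),
    Student("anil", 4, "mech", 2), Student("rani", 5, "mech", 2),
    Student("anvith", 6, "mech", 2), Student("madhu", 7, "mech", 2)
  )

  // Local analogue of filter(year > 1).where(id === 5).select("name", "id").
  def secondYearWithId5: List[(String, Int)] =
    students.filter(_.year > 1).filter(_.id == 5).map(s => (s.name, s.id))

  // Local analogue of groupBy(year, course).count.
  def countsByYearAndCourse: Map[(Int, String), Int] =
    students.groupBy(s => (s.year, s.course)).map { case (key, rows) => key -> rows.size }
}
```
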




----------------------------------------------------------


scala> df.registerTempTable("student")

scala> sqlContext.sql("select * from student")
res36: org.apache.spark.sql.DataFrame = [name: string, id: int, course: string, year: int]

scala> sqlContext.sql("select * from student").show
+------+---+------+----+
|  name| id|course|year|
+------+---+------+----+
|  arun|  1|   cse|   1|
| sunil|  2|   cse|   1|
|   raj|  3|   cse|   1|
|naveen|  4|   cse|   1|
| venki|  5|   cse|   2|
|prasad|  6|   cse|   2|
| sudha|  7|   cse|   2|
|  ravi|  1|  mech|   1|
|  raju|  2|  mech|   1|
|  roja|  3|  mech|   1|
|  anil|  4|  mech|   2|
|  rani|  5|  mech|   2|
|anvith|  6|  mech|   2|
| madhu|  7|  mech|   2|
+------+---+------+----+


scala> sqlContext.sql("select year, course, count(*) from student group by year, course").show
+----+------+---+
|year|course|_c2|
+----+------+---+
|   1|  mech|  3|
|   1|   cse|  4|
|   2|  mech|  4|
|   2|   cse|  3|
+----+------+---+

----------------------------------------------------------

val prop = new java.util.Properties
prop.setProperty("driver","com.mysql.jdbc.Driver")
prop.setProperty("user","root")
prop.setProperty("password","hadoop")

val jdbcDF = sqlContext.read.jdbc("jdbc:mysql://localhost:3306/kalyan", "student", prop)

----------------------------------------------------------

scala> val jdbcDF = sqlContext.read.jdbc("jdbc:mysql://localhost:3306/kalyan", "student", prop)
jdbcDF: org.apache.spark.sql.DataFrame = [name: string, id: int, course: string, year: int]


val hiveDF = sqlContext.sql("select * from kalyan.student")



scala> val hiveDF = sqlContext.sql("select * from kalyan.student")
hiveDF: org.apache.spark.sql.DataFrame = [name: string, id: int, course: string, year: int]

scala> hiveDF.show
+------+---+------+----+
|  name| id|course|year|
+------+---+------+----+
|  arun|  1|   cse|   1|
| sunil|  2|   cse|   1|
|   raj|  3|   cse|   1|
|naveen|  4|   cse|   1|
| venki|  5|   cse|   2|
|prasad|  6|   cse|   2|
| sudha|  7|   cse|   2|
|  ravi|  1|  mech|   1|
|  raju|  2|  mech|   1|
|  roja|  3|  mech|   1|
|  anil|  4|  mech|   2|
|  rani|  5|  mech|   2|
|anvith|  6|  mech|   2|
| madhu|  7|  mech|   2|
+------+---+------+----+

----------------------------------------------------------

val jsonDF = sqlContext.read.json("file:///home/orienit/spark/input/student.json")

val parquetDF = sqlContext.read.parquet("file:///home/orienit/spark/input/student.parquet")


----------------------------------------------------------

scala> val jsonDF = sqlContext.read.json("file:///home/orienit/spark/input/student.json")
jsonDF: org.apache.spark.sql.DataFrame = [course: string, id: bigint, name: string, year: bigint]

scala> jsonDF.show
+------+---+------+----+
|course| id|  name|year|
+------+---+------+----+
| spark|  1|  anil|2016|
|hadoop|  5|anvith|2015|
|hadoop|  6|   dev|2015|
| spark|  3|   raj|2016|
|hadoop|  4| sunil|2015|
| spark|  2|venkat|2016|
+------+---+------+----+


----------------------------------------------------------


scala> val parquetDF = sqlContext.read.parquet("file:///home/orienit/spark/input/student.parquet")
SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
SLF4J: Defaulting to no-operation (NOP) logger implementation
SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.
parquetDF: org.apache.spark.sql.DataFrame = [name: string, id: int, course: string, year: int]

scala> parquetDF.show
+------+---+------+----+
|  name| id|course|year|
+------+---+------+----+
|  anil|  1| spark|2016|
|anvith|  5|hadoop|2015|
|   dev|  6|hadoop|2015|
|   raj|  3| spark|2016|
| sunil|  4|hadoop|2015|
|venkat|  2| spark|2016|
+------+---+------+----+



----------------------------------------------------------

From RDBMS => jdbcDF => create table => jdbcstudent
From HIVE => hiveDF => create table => hivestudent
From JSON => jsonDF => create table => jsonstudent
From PARQUET => parquetDF => create table => parquetstudent

jdbcDF.registerTempTable("jdbcstudent")
hiveDF.registerTempTable("hivestudent")
jsonDF.registerTempTable("jsonstudent")
parquetDF.registerTempTable("parquetstudent")


val query1 = "select jdbcstudent.*, hivestudent.* from jdbcstudent join hivestudent on jdbcstudent.name == hivestudent.name"

sqlContext.sql(query1).show


scala> val query1 = "select jdbcstudent.*, hivestudent.* from jdbcstudent join hivestudent on jdbcstudent.name == hivestudent.name"
query1: String = select jdbcstudent.*, hivestudent.* from jdbcstudent join hivestudent on jdbcstudent.name == hivestudent.name

scala> sqlContext.sql(query1).show
+------+---+------+----+------+---+------+----+
|  name| id|course|year|  name| id|course|year|
+------+---+------+----+------+---+------+----+
|   raj|  3| spark|2016|   raj|  3|   cse|   1|
|anvith|  5|hadoop|2015|anvith|  6|  mech|   2|
| sunil|  4|hadoop|2015| sunil|  2|   cse|   1|
|  anil|  1| spark|2016|  anil|  4|  mech|   2|
+------+---+------+----+------+---+------+----+


----------------------------------------------------------

val query2 = "select jsonstudent.*, parquetstudent.* from jsonstudent join parquetstudent on jsonstudent.name == parquetstudent.name"

sqlContext.sql(query2).show


----------------------------------------------------------

scala> val query2 = "select jsonstudent.*, parquetstudent.* from jsonstudent join parquetstudent on jsonstudent.name == parquetstudent.name"
query2: String = select jsonstudent.*, parquetstudent.* from jsonstudent join parquetstudent on jsonstudent.name == parquetstudent.name

scala> sqlContext.sql(query2).show
+------+---+------+----+------+---+------+----+
|course| id|  name|year|  name| id|course|year|
+------+---+------+----+------+---+------+----+
| spark|  1|  anil|2016|  anil|  1| spark|2016|
|hadoop|  5|anvith|2015|anvith|  5|hadoop|2015|
|hadoop|  6|   dev|2015|   dev|  6|hadoop|2015|
| spark|  3|   raj|2016|   raj|  3| spark|2016|
|hadoop|  4| sunil|2015| sunil|  4|hadoop|2015|
| spark|  2|venkat|2016|venkat|  2| spark|2016|
+------+---+------+----+------+---+------+----+




----------------------------------------------------------


SPARK BASICS DAY 1 Practice on 27 Oct 2016

Spark Day 1 Practice:
==============================================


SparkContext (sc) => Main point of contact in spark

RDD => Resilient Distributed Dataset

RDD Features:
------------------
-> Immutable
-> Lazily evaluated
-> Cacheable
-> Type inferred


list <- (1,2,3,4)


RDD operations:
--------------------
Transformation:
---------------
old rdd => new rdd

f(x) -> { x + 1 }
f(list) <- (2,3,4,5)


f(x) -> { x * x }
f(list) <- (1,4,9,16)


Action:
---------------
rdd => result

min(list) -> 1
max(list) -> 4
sum(list) -> 10
avg(list) -> 2.5
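
The pseudocode above can be tried out on a plain Scala List before touching Spark. This is only a local sketch (the object name TransformActionDemo is made up): a List evaluates map immediately, whereas an RDD transformation is lazy and only runs when an action is called.

```scala
object TransformActionDemo {
  val list: List[Int] = List(1, 2, 3, 4)

  // "Transformations": old collection => new collection.
  val plusOne: List[Int] = list.map(x => x + 1)
  val squared: List[Int] = list.map(x => x * x)

  // "Actions": collection => a single result.
  val minimum: Int = list.min
  val maximum: Int = list.max
  val total: Int = list.sum
  val average: Double = list.sum.toDouble / list.size
}
```
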



Spark Supports Scala, Java, Python and R 
-------------------------------------------
Scala + Spark => spark-shell
Python + Spark => pyspark
R + Spark => SparkR

Spark with Scala examples:
--------------------------------------------
Create RDD in 2 ways:
------------------------
1. collection ( list / set / seq / ..)

2. data sets (text / csv / tsv / json / ...)


scala> val rdd1 = sc.parallelize(List(1,2,3,4))
rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[0] at parallelize at <console>:27


Transformations on RDD:
----------------------------

scala> rdd1.foreach(println)
3
4
2
1

scala> rdd1.map(x =>  x + 1).foreach(println)
5
4
2
3

scala> rdd1.map(x =>  x + 1).foreach(println)
2
4
5
3


How to get the number of partitions:
---------------------------------------
scala> rdd1.getNumPartitions
res7: Int = 4

scala> rdd1.partitions.length
res8: Int = 4


Create an RDD with a specific number of partitions:
----------------------------------------------
scala> val rdd2 = sc.parallelize(List(1,2,3,4), 2)
rdd2: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[6] at parallelize at <console>:27

scala> rdd2.getNumPartitions
res9: Int = 2

scala> rdd2.partitions.length
res10: Int = 2



Use collect to preserve the order:
-----------------------------------------
Note: Don't use collect in Production Environment (Large Data Sets)

scala> rdd2.collect.foreach(println)
1
2
3
4

scala> rdd2.collect.map(x =>  x + 1).foreach(println)
2
3
4
5

scala> rdd2.collect.map(x =>  x * x).foreach(println)
1
4
9
16

scala> rdd2.collect.filter(x =>  x % 2 == 0).foreach(println)
2
4


Actions on RDD:
----------------------------
scala> rdd2.min
res25: Int = 1

scala> rdd2.max
res26: Int = 4

scala> rdd2.sum
res27: Double = 10.0

scala> rdd2.count
res28: Long = 4



Create an RDD from a Text File:
---------------------------------
scala> val fileRdd = sc.textFile("file:///home/orienit/work/input/demoinput")
fileRdd: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[13] at textFile at <console>:27

scala> fileRdd.getNumPartitions
res29: Int = 2

scala> fileRdd.partitions.length
res30: Int = 2


Create an RDD from a Text File with a specific number of partitions:
-------------------------------------------------------------
scala> val fileRdd = sc.textFile("file:///home/orienit/work/input/demoinput", 1)
fileRdd: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[15] at textFile at <console>:27

scala> fileRdd.getNumPartitions
res31: Int = 1

scala> fileRdd.partitions.length
res32: Int = 1


Word Count Problem in Spark
---------------------------------------
scala> val fileRdd = sc.textFile("file:///home/orienit/work/input/demoinput", 1)
fileRdd: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[17] at textFile at <console>:27

scala> fileRdd.collect.foreach(println)
I am going
to hyd
I am learning
hadoop course

scala> fileRdd.flatMap(line => line).collect.foreach(println)
scala> fileRdd.flatMap(line => line.split("")).collect.foreach(println)

I

a
m

g
o
i
n
g
.....



scala> fileRdd.flatMap(line => line.split(" ")).collect.foreach(println) 
I
am
going
to
hyd
I
am
learning
hadoop
course


scala> val wordsRdd = fileRdd.flatMap(line => line.split(" "))
wordsRdd: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[21] at flatMap at <console>:29

scala> wordsRdd.collect.foreach(println)
I
am
going
to
hyd
I
am
learning
hadoop
course



scala> val countsRdd = wordsRdd.map(word => (word,1))
countsRdd: org.apache.spark.rdd.RDD[(String, Int)] = MapPartitionsRDD[22] at map at <console>:31

scala> countsRdd.collect.foreach(println)
(I,1)
(am,1)
(going,1)
(to,1)
(hyd,1)
(I,1)
(am,1)
(learning,1)
(hadoop,1)
(course,1)


scala> val wordCountRdd = countsRdd.reduceByKey( _ + _)
wordCountRdd: org.apache.spark.rdd.RDD[(String, Int)] = ShuffledRDD[23] at reduceByKey at <console>:33

scala> val wordCountRdd = countsRdd.reduceByKey((a,b) => a + b)
wordCountRdd: org.apache.spark.rdd.RDD[(String, Int)] = ShuffledRDD[24] at reduceByKey at <console>:33

scala> wordCountRdd.collect.foreach(println)
(learning,1)
(hadoop,1)
(am,2)
(hyd,1)
(I,2)
(to,1)
(going,1)
(course,1)


Solutions:
-------------------

Solution1:
-------------
val fileRdd = sc.textFile("file:///home/orienit/work/input/demoinput", 1)

val wordsRdd = fileRdd.flatMap(line => line.split(" "))

val countsRdd = wordsRdd.map(word => (word,1))

val wordCountRdd = countsRdd.reduceByKey( _ + _)

wordCountRdd.saveAsTextFile("file:///home/orienit/work/output/wordcount-op")


Solution2:
-------------
val fileRdd = sc.textFile("file:///home/orienit/work/input/demoinput", 1)

val wordCountRdd = fileRdd.flatMap(line => line.split(" ")).map(word => (word,1)).reduceByKey( _ + _)

wordCountRdd.saveAsTextFile("file:///home/orienit/work/output/wordcount-op")



Solution3:
-------------

sc.textFile("file:///home/orienit/work/input/demoinput", 1).flatMap(line => line.split(" ")).map(word => (word,1)).reduceByKey( _ + _).saveAsTextFile("file:///home/orienit/work/output/wordcount-op")
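
To see what the flatMap / map / reduceByKey pipeline computes without starting Spark, here is a plain-Scala sketch of the same steps on a local Seq. It is not Spark code: reduceByKey does not exist on local collections, so groupBy followed by a sum stands in for it. The input lines are the contents of demoinput shown earlier, and the object name LocalWordCount is made up.

```scala
object LocalWordCount {
  // Lines copied from the demoinput file shown above.
  val lines: Seq[String] =
    Seq("I am going", "to hyd", "I am learning", "hadoop course")

  def wordCount(input: Seq[String]): Map[String, Int] =
    input
      .flatMap(line => line.split(" "))   // lines => words
      .map(word => (word, 1))             // words => (word, 1) pairs
      .groupBy { case (word, _) => word } // stand-in for the shuffle by key
      .map { case (word, pairs) => word -> pairs.map(_._2).sum } // add up the 1s per word

  def main(args: Array[String]): Unit =
    wordCount(lines).foreach(println)
}
```
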




Grep Job using Spark:
------------------------------------
val fileRdd = sc.textFile("file:///home/orienit/work/input/demoinput", 1)

val filterRdd = fileRdd.filter(line => line.contains("am"))

filterRdd.saveAsTextFile("file:///home/orienit/work/output/grep-op")



Sed Job using Spark:
------------------------------------
val fileRdd = sc.textFile("file:///home/orienit/work/input/demoinput", 1)

val mapRdd = fileRdd.map(line => line.replace("am", "at"))

mapRdd.saveAsTextFile("file:///home/orienit/work/output/sed-op")
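
The grep and sed jobs use the same filter and map logic that works on a local collection. Below is a small plain-Scala sketch (not Spark code, with made-up function names) using the demoinput lines. Note that replace works on substrings, so it would also change "am" inside a longer word.

```scala
object LocalGrepSed {
  // Lines copied from the demoinput file shown above.
  val lines: Seq[String] =
    Seq("I am going", "to hyd", "I am learning", "hadoop course")

  // grep: keep only the lines that contain the pattern.
  def grep(input: Seq[String], pattern: String): Seq[String] =
    input.filter(line => line.contains(pattern))

  // sed: replace every literal occurrence of `from` with `to` in each line.
  def sed(input: Seq[String], from: String, to: String): Seq[String] =
    input.map(line => line.replace(from, to))
}
```
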




Wednesday 26 October 2016

Twitter Data Sentiment Analysis Using Pig

Pre-Requisites of Twitter Data + Pig + Sentiment Analysis Project:

hadoop-2.6.0
pig-0.15.0
java-1.7

NOTE: Make sure that all of the above components are installed

Twitter Data + Pig + Sentiment Analysis Project Download Links:

`hadoop-2.6.0.tar.gz` ==> link
`pig-0.15.0.tar.gz` ==> link
`sentimentanalysis-pig.jar` ==> link
`tweets` ==> link

-----------------------------------------------------------------------------

1. Create `sentimentanalysis` folder in your machine

command: mkdir ~/sentimentanalysis




2. Download sample tweets or Download twitter data using flume to do Sentiment Analysis and copy to '~/sentimentanalysis' folder

Note: Download sample tweets link




Example: Sample Tweets

i am learning hadoop course
i am good in hadoop
i am learning hadoop
i am not feeling well
why we need bigdata
i am not happy with rdbms
ravi is not working today
india got the world cup
learn hadoop from kalyan blog
learn spark from kalyan blog


3. Verify the file using the cat command

command: cat ~/sentimentanalysis/tweets




4. Start Hadoop using the command below

command: start-all.sh





5. Verify whether Hadoop is running using the "jps" command




6. Open browser using below url

http://localhost:50070/dfshealth.jsp




7. Load the sample tweets into HDFS

hadoop fs -mkdir -p /kalyan/sentimentanalysis/pig/input








hadoop fs -put ~/sentimentanalysis/tweets /kalyan/sentimentanalysis/pig/input











8. Start Pig in either local mode or mapreduce mode

command: pig -x mapreduce




9. Load the sample tweets in pig `tweets` bag

tweets = load '/kalyan/sentimentanalysis/pig/input' AS (tweet : chararray);




10. Display the data in pig `tweets` bag

dump tweets;




11. Download `sentimentanalysis-pig.jar` file and copy to '~/sentimentanalysis' folder

Note: Download sentimentanalysis-pig.jar link





12. Load the `sentimentanalysis-pig.jar` into HDFS

hadoop fs -put ~/sentimentanalysis/sentimentanalysis-pig.jar /kalyan/sentimentanalysis/pig









13. Add the `sentimentanalysis-pig.jar` file to the pig class path using the command below

REGISTER <PATH OF THE JAR FILE>;

REGISTER hdfs://localhost:8020/kalyan/sentimentanalysis/pig/sentimentanalysis-pig.jar;






14. Define the sentiment function in pig

DEFINE <function name> 'UDF CLASS NAME WITH PACKAGE'

DEFINE sentiment com.orienit.kalyan.sentimentanalysis.pig.udf.SentimentUdf();




15. Analyse the tweets with the sentiment function using the command below

sentimenttweets1 = FOREACH tweets GENERATE tweet, sentiment(tweet) as sentiment;




16. Display the data in `sentimenttweets1` bag in pig

dump sentimenttweets1;





17. Store the `sentimenttweets1` result into hdfs folder

STORE sentimenttweets1 INTO '/kalyan/sentimentanalysis/pig/sentimenttweets1';







18. Analyse the `tweets` from `sentimenttweets1` bag using case statement

sentimenttweets2 = FOREACH sentimenttweets1 GENERATE tweet, (
CASE
WHEN sentiment == 1 THEN 'positive'
WHEN sentiment == 0 THEN 'neutral'
WHEN sentiment == -1 THEN 'negative'
END
);
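
For readers coming from the Scala posts, the CASE expression above maps roughly onto a match expression. The sketch below is only an illustration in Scala, not part of the Pig script. It assumes the sentiment score is an integer (the source only shows it compared with 1, 0 and -1), and the name SentimentLabel is made up. How Pig treats a score outside these three values is not covered here, so the sketch returns None for that case.

```scala
object SentimentLabel {
  // Mirrors the CASE branches: 1 => positive, 0 => neutral, -1 => negative.
  // Any other score has no branch in the CASE above; this sketch returns None.
  def label(score: Int): Option[String] = score match {
    case 1  => Some("positive")
    case 0  => Some("neutral")
    case -1 => Some("negative")
    case _  => None
  }
}
```
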




19. Store the `sentimenttweets2` result into hdfs folder

STORE sentimenttweets2 INTO '/kalyan/sentimentanalysis/pig/sentimenttweets2';








