WordCount Example with Apache Spark

mkdir wordcount-spark
cd wordcount-spark

mkdir -p src/main/scala

cat <<EOF > build.sbt
name := "wordcount"
version := "1.0"
scalaVersion := "2.11.8"
libraryDependencies += "org.apache.spark" %% "spark-core" % "1.6.0"
EOF

vim src/main/scala/SparkWordCount.scala

import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark.SparkConf

object SparkWordCount {
  def main(args: Array[String]) {
    // create Spark context with Spark configuration
    val sc = new SparkContext(new SparkConf().setAppName("Spark Count"))

    // get threshold
    val threshold = args(1).toInt

    // read in text file and split each document into words
    val tokenized = sc.textFile(args(0)).flatMap(_.split(" "))

    // count the occurrence of each word
    val wordCounts = tokenized.map((_, 1)).reduceByKey(_ + _)

    // filter out words with fewer than threshold occurrences
    val filtered = wordCounts.filter(_._2 >= threshold)

    // count characters
    val charCounts = filtered.flatMap(_._1.toCharArray).map((_, 1)).reduceByKey(_ + _)

    System.out.println(charCounts.collect().mkString(", "))
  }
}

sbt package

cat <<EOF > /tmp/wordcount.txt
Hello world, Hello
EOF

cp target/scala-2.11/wordcount_2.11-1.0.jar /tmp/

cd $SPARK_HOME
./bin/spark-submit --master "local[*]" --class SparkWordCount /tmp/wordcount_2.11-1.0.jar /tmp/wordcount.txt 1
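
For a quick sanity check of the same logic without packaging a jar, the pipeline can also be run in local mode through Spark's Java API. This is only a sketch: it assumes spark-core 1.6 is on the classpath (e.g. as a Maven dependency) and Java 8 for the lambdas; class and file names are placeholders.

import java.util.Arrays;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;

import scala.Tuple2;

public class LocalWordCount {
    public static void main(String[] args) {
        // Local master, so no cluster or spark-submit is needed for the check.
        SparkConf conf = new SparkConf().setAppName("Local Count").setMaster("local[*]");
        JavaSparkContext sc = new JavaSparkContext(conf);

        // Same tokenize -> (word, 1) -> reduceByKey pipeline as the Scala job.
        // Note: in Spark 1.6 the Java flatMap lambda returns an Iterable.
        JavaRDD<String> words = sc.textFile("/tmp/wordcount.txt")
                .flatMap(line -> Arrays.asList(line.split(" ")));
        JavaPairRDD<String, Integer> counts = words
                .mapToPair(w -> new Tuple2<>(w, 1))
                .reduceByKey((a, b) -> a + b);

        System.out.println(counts.collect());
        sc.stop();
    }
}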

Apache Sqoop + MySQL + Hadoop


wget http://www-us.apache.org/dist/sqoop/1.99.6/sqoop-1.99.6-bin-hadoop200.tar.gz

tar -xvf sqoop-1.99.6-bin-hadoop200.tar.gz

cd sqoop-1.99.6-bin-hadoop200

vim server/conf/catalina.properties

common.loader=${catalina.base}/lib,${catalina.base}/lib/*.jar,${catalina.home}/lib,${catalina.home}/lib/*.jar,${catalina.home}/../lib/*.jar,/opt/hadoop-2.6.3/share/hadoop/common/*.jar,/opt/hadoop-2.6.3/share/hadoop/common/lib/*.jar,/opt/hadoop-2.6.3/share/hadoop/hdfs/*.jar,/opt/hadoop-2.6.3/share/hadoop/hdfs/lib/*.jar,/opt/hadoop-2.6.3/share/hadoop/mapreduce/*.jar,/opt/hadoop-2.6.3/share/hadoop/mapreduce/lib/*.jar,/opt/hadoop-2.6.3/share/hadoop/yarn/*.jar,/opt/hadoop-2.6.3/share/hadoop/yarn/lib/*.jar,/usr/lib/hive/lib/*.jar

mkdir lib

vim server/conf/sqoop.properties
org.apache.sqoop.submission.engine.mapreduce.configuration.directory=/opt/hadoop-2.6.3/etc/hadoop

./bin/sqoop2-tool verify
...
Verification was successful.

./bin/sqoop2-server start

$HADOOP_HOME/sbin/start-all.sh

sudo apt-get install mysql-server

mysql -p -uroot -e "create database testdb;"

mysql -p -uroot -e "CREATE TABLE testdb.exampletable (id INT,data VARCHAR(100));"

mysql -p -uroot -e "INSERT INTO testdb.exampletable(id,data) VALUES(1,'test data');"

./bin/sqoop2-shell

sqoop:000> create link -c 1
Creating link for connector with id 1
0 [main] WARN org.apache.hadoop.util.NativeCodeLoader - Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Please fill following values to create new link object
Name: mysql

Link configuration

JDBC Driver Class: com.mysql.jdbc.Driver
JDBC Connection String: jdbc:mysql://localhost/testdb
Username: root
Password: ****
JDBC Connection Properties:
There are currently 0 values in the map:
entry# protocol=tcp
There are currently 1 values in the map:
protocol = tcp
entry#
New link was successfully created with validation status OK and persistent id 1

sqoop:000> create link -c 3
Creating link for connector with id 3
Please fill following values to create new link object
Name: hdfs

Link configuration

HDFS URI: hdfs://localhost:8020/
Hadoop conf directory: /opt/hadoop-2.6.3/etc/hadoop
New link was successfully created with validation status OK and persistent id 2

sqoop:000> create job -f 1 -t 2
Creating job for links with from id 1 and to id 2
Please fill following values to create new job object
Name: sqoopy

From database configuration

Schema name: testdb
Table name: exampletable
Table SQL statement:
Table column names: id,data
Partition column name: id
Null value allowed for the partition column:
Boundary query:

Incremental read

Check column:
Last value:

To HDFS configuration

Override null value:
Null value:
Output format:
0 : TEXT_FILE
1 : SEQUENCE_FILE
Choose: 0
Compression format:
0 : NONE
1 : DEFAULT
2 : DEFLATE
3 : GZIP
4 : BZIP2
5 : LZO
6 : LZ4
7 : SNAPPY
8 : CUSTOM
Choose: 0
Custom compression format:
Output directory: sqoop
Append mode:

Throttling resources

Extractors:
Loaders:
New job was successfully created with validation status OK and persistent id 1

sqoop:000> start job -j 1

$HADOOP_HOME/bin/hadoop dfs -cat /user//sqoop/ca29e7ba-1155-4614-8529-4e11b0260170.txt

Run WordCount MapReduce Example with YARN

// download hadoop-2.6.3.tar.gz
tar -xvf ~/Downloads/hadoop-2.6.3.tar.gz -C ~/opt/
cd ~/opt/hadoop-2.6.3
ssh-keygen
cat ~/.ssh/id_rsa.pub >> ~/.ssh/authorized_keys

mkdir -p /disk1/hdfs/name
mkdir -p /remote/hdfs/name
mkdir -p /disk1/hdfs/data
mkdir -p /disk2/hdfs/data
mkdir -p /disk1/hdfs/namesecondary
mkdir -p /disk2/hdfs/namesecondary
mkdir -p /disk1/nm-local-dir
mkdir -p /disk2/nm-local-dir

vim etc/hadoop/core-site.xml

<configuration>
  <property>
    <name>fs.defaultFS</name>
    <value>hdfs://localhost/</value>
  </property>
</configuration>

vim etc/hadoop/hdfs-site.xml
<configuration>
  <property>
    <name>dfs.namenode.name.dir</name>
    <value>/disk1/hdfs/name,/remote/hdfs/name</value>
  </property>
  <property>
    <name>dfs.datanode.data.dir</name>
    <value>/disk1/hdfs/data,/disk2/hdfs/data</value>
  </property>
  <property>
    <name>dfs.namenode.checkpoint.dir</name>
    <value>/disk1/hdfs/namesecondary,/disk2/hdfs/namesecondary</value>
  </property>
</configuration>

vim etc/hadoop/yarn-site.xml

<configuration>
  <property>
    <name>yarn.resourcemanager.hostname</name>
    <value>localhost</value>
  </property>
  <property>
    <name>yarn.nodemanager.local-dirs</name>
    <value>/disk1/nm-local-dir,/disk2/nm-local-dir</value>
  </property>
  <property>
    <name>yarn.nodemanager.aux-services</name>
    <value>mapreduce_shuffle</value>
  </property>
  <property>
    <name>yarn.nodemanager.resource.memory-mb</name>
    <value>9830</value>
  </property>
  <property>
    <name>yarn.nodemanager.resource.cpu-vcores</name>
    <value>16</value>
  </property>
  <property>
    <name>yarn.scheduler.capacity.maximum-am-resource-percent</name>
    <value>100</value>
  </property>
  <property>
    <name>yarn.scheduler.minimum-allocation-mb</name>
    <value>1228</value>
  </property>
  <property>
    <name>yarn.scheduler.maximum-allocation-mb</name>
    <value>9830</value>
  </property>
</configuration>

cp etc/hadoop/mapred-site.xml.template etc/hadoop/mapred-site.xml

vim etc/hadoop/mapred-site.xml

<configuration>
  <property>
    <name>mapreduce.framework.name</name>
    <value>yarn</value>
  </property>
  <property>
    <name>yarn.app.mapreduce.am.resource.mb</name>
    <value>1228</value>
  </property>
  <property>
    <name>yarn.app.mapreduce.am.command-opts</name>
    <value>-Xmx983m</value>
  </property>
  <property>
    <name>mapreduce.map.memory.mb</name>
    <value>1228</value>
  </property>
  <property>
    <name>mapreduce.reduce.memory.mb</name>
    <value>1228</value>
  </property>
  <property>
    <name>mapreduce.map.java.opts</name>
    <value>-Xmx983m</value>
  </property>
  <property>
    <name>mapreduce.reduce.java.opts</name>
    <value>-Xmx983m</value>
  </property>
</configuration>

vim etc/hadoop/hadoop-env.sh

export JAVA_HOME=/opt/jdk

./bin/hdfs namenode -format mycluster
./sbin/start-dfs.sh
./sbin/start-yarn.sh
./bin/hadoop dfs -mkdir -p hdfs://localhost/wc/in
echo "Hello world, Hello" > /tmp/input.txt
./bin/hadoop dfs -copyFromLocal /tmp/input.txt hdfs://localhost/wc/in
./bin/hadoop jar ./share/hadoop/mapreduce/hadoop-mapreduce-examples-2.6.3.jar wordcount hdfs://localhost/wc/in hdfs://localhost/wc/out
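
The examples jar already contains the WordCount job, so nothing more is needed to run it. For reference, a driver wired up by hand would look roughly like the sketch below; it reuses the TokenizerMapper and IntSumReducer classes sketched in the unit-test section further down (the class names follow the Hadoop examples and are assumptions here).

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class WordCountDriver {
    public static void main(String[] args) throws Exception {
        // args[0] = input path, args[1] = output path (must not exist yet)
        Job job = Job.getInstance(new Configuration(), "word count");
        job.setJarByClass(WordCountDriver.class);
        job.setMapperClass(TokenizerMapper.class);
        job.setCombinerClass(IntSumReducer.class);
        job.setReducerClass(IntSumReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}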

Install Apache Tez 0.7.0

// install jdk 1.7
// install hadoop 2.7
// install maven 3.3
// install npm and bower

wget https://github.com/google/protobuf/releases/download/v2.5.0/protobuf-2.5.0.tar.gz

tar -xvf protobuf-2.5.0.tar.gz

cd protobuf-2.5.0/

sudo apt-get install g++
sudo ./configure
sudo make
sudo make check
sudo make install
sudo ldconfig
cd ../

wget http://apache.lauf-forum.at/tez/0.7.0/apache-tez-0.7.0-src.tar.gz

tar -xvf apache-tez-0.7.0-src.tar.gz

cd apache-tez-0.7.0-src

$HADOOP_HOME/bin/hadoop version
Hadoop 2.7.1

// update hadoop.version in pom.xml to 2.7.1 and protobuf.version to 2.5.0

mvn clean package -DskipTests=true -Dmaven.javadoc.skip=true

$HADOOP_HOME/sbin/start-dfs.sh

$HADOOP_HOME/bin/hadoop dfs -mkdir -p hdfs://localhost/apps/tez

$HADOOP_HOME/bin/hadoop dfs -copyFromLocal tez-dist/target/tez-0.7.0.tar.gz hdfs://localhost/apps/tez
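
Optionally, the upload can be double-checked from a small HDFS client program before tez.lib.uris is pointed at it in tez-site.xml below. A sketch, assuming the Hadoop client jars are on the classpath:

import java.net.URI;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class CheckTezTarball {
    public static void main(String[] args) throws Exception {
        // Connect to the local namenode and check the path tez.lib.uris will reference.
        Configuration conf = new Configuration();
        FileSystem fs = FileSystem.get(URI.create("hdfs://localhost/"), conf);
        Path tarball = new Path("/apps/tez/tez-0.7.0.tar.gz");
        System.out.println(tarball + " exists: " + fs.exists(tarball));
        fs.close();
    }
}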

mkdir conf
vim conf/tez-site.xml

<?xml version="1.0" encoding="UTF-8"?>
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
<configuration>
  <property>
    <name>tez.lib.uris</name>
    <value>${fs.defaultFS}/apps/tez/tez-0.7.0.tar.gz</value>
  </property>
</configuration>

export TEZ_HOME=$(pwd)
export TEZ_JARS=$TEZ_HOME/tez-dist/target/tez-0.7.0
export TEZ_CONF_DIR=$TEZ_HOME/conf
export HADOOP_CLASSPATH=$TEZ_CONF_DIR:$TEZ_JARS/*:$TEZ_JARS/lib/*:$HADOOP_CLASSPATH

mkdir in
mkdir out

vim in/test.txt
Hello World!

rm $TEZ_HOME/tez-dist/target/tez-0.7.0/lib/slf4j-log4j12-1.7.5.jar
$HADOOP_HOME/sbin/stop-dfs.sh
$HADOOP_HOME/sbin/start-dfs.sh
$HADOOP_HOME/sbin/stop-yarn.sh
$HADOOP_HOME/sbin/start-yarn.sh

$HADOOP_HOME/bin/hadoop dfs -mkdir -p hdfs://localhost/apps/in
$HADOOP_HOME/bin/hadoop dfs -copyFromLocal in/test.txt hdfs://localhost/apps/in

$HADOOP_HOME/bin/hadoop jar $TEZ_HOME/tez-examples/target/tez-examples-0.7.0.jar orderedwordcount hdfs://localhost/apps/in hdfs://localhost/apps/out

Unit Test for WordCount MapReduce


import java.io.IOException;
import java.util.Arrays;

import org.apache.hadoop.io.*;
import org.apache.hadoop.mrunit.mapreduce.MapDriver;
import org.apache.hadoop.mrunit.mapreduce.ReduceDriver;
import org.junit.*;

public class WordCountTest {

  @Test
  public void mapperTest() throws IOException, InterruptedException {
    Text value = new Text("Hello");
    new MapDriver().withMapper(new TokenizerMapper())
        .withInput(new IntWritable(), value)
        .withOutput(new Text("Hello"), new IntWritable(1))
        .runTest();
  }

  @Test
  public void reducerTest() throws IOException, InterruptedException {
    new ReduceDriver().withReducer(new IntSumReducer())
        .withInput(new Text("Hello"), Arrays.asList(new IntWritable(1), new IntWritable(1)))
        .withOutput(new Text("Hello"), new IntWritable(2))
        .runTest();
  }
}
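
The test refers to TokenizerMapper and IntSumReducer without showing them; they mirror the classic WordCount classes from the Hadoop examples. A sketch of the two, each in its own source file next to the test (names and packages are assumptions):

import java.io.IOException;
import java.util.StringTokenizer;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

// Splits each input line into tokens and emits (word, 1) pairs.
public class TokenizerMapper extends Mapper<Object, Text, Text, IntWritable> {
    private final static IntWritable one = new IntWritable(1);
    private final Text word = new Text();

    @Override
    public void map(Object key, Text value, Context context)
            throws IOException, InterruptedException {
        StringTokenizer itr = new StringTokenizer(value.toString());
        while (itr.hasMoreTokens()) {
            word.set(itr.nextToken());
            context.write(word, one);
        }
    }
}

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

// Sums the counts emitted for each word.
public class IntSumReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
    private final IntWritable result = new IntWritable();

    @Override
    public void reduce(Text key, Iterable<IntWritable> values, Context context)
            throws IOException, InterruptedException {
        int sum = 0;
        for (IntWritable val : values) {
            sum += val.get();
        }
        result.set(sum);
        context.write(key, result);
    }
}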

HCatalog + Pig


$DERBY_HOME/bin/startNetworkServer

$HADOOP_HOME/sbin/start-all.sh

$HIVE_HOME/bin/hiveserver2

$HIVE_HOME/bin/hive --service metastore

$HADOOP_HOME/bin/hadoop fs -mkdir hdfs://localhost/hcatalog-example

$HADOOP_HOME/bin/hadoop fs -put /tmp/test-dataset.csv hdfs://localhost/hcatalog-example

$HADOOP_HOME/bin/hadoop fs -cat hdfs://localhost/hcatalog-example/test-dataset.csv | head -n 4

playerID,yearID,gameNum,gameID,teamID,lgID,GP,startingPos
aaronha01,1955,0,NLS195507120,ML1,NL,1,
aaronha01,1956,0,ALS195607100,ML1,NL,1,
aaronha01,1957,0,NLS195707090,ML1,NL,1,9

./hcat -e "CREATE TABLE default.gamedataset (playerID STRING,yearID INT,gameNum INT ,gameID STRING ,teamID STRING ,lgID STRING ,GP INT,startingPosts INT) PARTITIONED BY (country STRING) ROW FORMAT DELIMITED FIELDS TERMINATED BY ',';"

./hcat -e "alter table default.gamedataset add partition (country='DE') location '/hcatalog-example/'"

vim $PIG_HOME/conf/pig.properties

pig.load.default.statements=/opt/pig-0.15.0/.pigbootup

vim $PIG_HOME/.pigbootup

REGISTER /opt/apache-hive-1.2.1-bin/hcatalog/share/hcatalog/hcatalog-core-1.2.1.jar;
REGISTER /opt/apache-hive-1.2.1-bin/lib/hive-exec-1.2.1.jar;
REGISTER /opt/apache-hive-1.2.1-bin/lib/hive-metastore-1.2.1.jar;

vim $HOME/.bashrc

export PIG_OPTS=-Dhive.metastore.uris=thrift://localhost:9083
export PIG_CLASSPATH=$HCAT_HOME/share/hcatalog/*:$HIVE_HOME/lib/*

$PIG_HOME/bin/pig -useHCatalog

A = load 'default.gamedataset' using org.apache.hive.hcatalog.pig.HCatLoader();
dump A;
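
The same load can also be driven from Java with Pig's embedded PigServer API, which is convenient for testing the HCatalog wiring outside the grunt shell. A rough sketch, assuming the Pig, HCatalog, and Hive jars from the REGISTER/PIG_CLASSPATH setup above are on the Java classpath:

import java.util.Iterator;
import java.util.Properties;

import org.apache.pig.ExecType;
import org.apache.pig.PigServer;
import org.apache.pig.data.Tuple;

public class HCatLoaderExample {
    public static void main(String[] args) throws Exception {
        // Point the embedded Pig instance at the running Hive metastore,
        // mirroring what PIG_OPTS does for the grunt shell.
        Properties props = new Properties();
        props.setProperty("hive.metastore.uris", "thrift://localhost:9083");

        PigServer pig = new PigServer(ExecType.MAPREDUCE, props);
        pig.registerQuery("A = load 'default.gamedataset' using org.apache.hive.hcatalog.pig.HCatLoader();");

        // Equivalent of "dump A;" in the shell.
        Iterator<Tuple> rows = pig.openIterator("A");
        while (rows.hasNext()) {
            System.out.println(rows.next());
        }
        pig.shutdown();
    }
}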

Install Apache Hive


tar -xvf db-derby-10.11.1.1-bin.tar.gz
tar -xvf apache-hive-1.2.1-bin.tar.gz
vim ~/.bashrc

export HADOOP_HOME=/opt/hadoop-2.6.2
export DERBY_HOME=/opt/db-derby-10.11.1.1-bin
export PATH=$PATH:$DERBY_HOME/bin
export CLASSPATH=$CLASSPATH:$DERBY_HOME/lib/derby.jar:$DERBY_HOME/lib/derbytools.jar
export HIVE_HOME=/opt/apache-hive-1.2.1-bin

mkdir $DERBY_HOME/data
cp $HIVE_HOME/conf/hive-default.xml.template $HIVE_HOME/conf/hive-site.xml
vim $HIVE_HOME/conf/hive-site.xml

<configuration>
  <property>
    <name>javax.jdo.option.ConnectionURL</name>
    <value>jdbc:derby://localhost:1527/metastore_db;create=true</value>
    <description>JDBC connect string for a JDBC metastore</description>
  </property>
</configuration>

vim $HIVE_HOME/conf/jpox.properties

javax.jdo.PersistenceManagerFactoryClass = org.jpox.PersistenceManagerFactoryImpl
org.jpox.autoCreateSchema = false
org.jpox.validateTables = false
org.jpox.validateColumns = false
org.jpox.validateConstraints = false
org.jpox.storeManagerType = rdbms
org.jpox.autoCreateSchema = true
org.jpox.autoStartMechanismMode = checked
org.jpox.transactionIsolation = read_committed
javax.jdo.option.DetachAllOnCommit = true
javax.jdo.option.NontransactionalRead = true
javax.jdo.option.ConnectionDriverName = org.apache.derby.jdbc.ClientDriver
javax.jdo.option.ConnectionURL = jdbc:derby://localhost:1527/metastore_db;create=true
javax.jdo.option.ConnectionUserName = APP
javax.jdo.option.ConnectionPassword = mine

$DERBY_HOME/bin/startNetworkServer

$HADOOP_HOME/sbin/start-all.sh

$HADOOP_HOME/bin/hadoop fs -mkdir -p /tmp
$HADOOP_HOME/bin/hadoop fs -mkdir -p /user/hive/warehouse
$HADOOP_HOME/bin/hadoop fs -chmod g+w /tmp
$HADOOP_HOME/bin/hadoop fs -chmod g+w /user/hive/warehouse

cp $DERBY_HOME/lib/derbyclient.jar $HIVE_HOME/lib/

$HIVE_HOME/bin/hiveserver2

$HIVE_HOME/bin/hive --service metastore

$HIVE_HOME/bin/beeline
Beeline version 1.2.1 by Apache Hive
beeline> !connect jdbc:hive2://localhost:10000/default "hive" ""
Connecting to jdbc:hive2://localhost:10000/default
Connected to: Apache Hive (version 1.2.1)
Driver: Hive JDBC (version 1.2.1)
Transaction isolation: TRANSACTION_REPEATABLE_READ
0: jdbc:hive2://localhost:10000/default>
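
The same endpoint beeline connects to can also be reached from Java through the Hive JDBC driver. A small sketch, assuming hive-jdbc and its dependencies are on the classpath:

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.Statement;

public class HiveJdbcCheck {
    public static void main(String[] args) throws Exception {
        // Same URL and credentials as the beeline session above.
        Class.forName("org.apache.hive.jdbc.HiveDriver");
        try (Connection con = DriverManager.getConnection(
                "jdbc:hive2://localhost:10000/default", "hive", "");
             Statement st = con.createStatement();
             ResultSet rs = st.executeQuery("SHOW TABLES")) {
            while (rs.next()) {
                System.out.println(rs.getString(1));
            }
        }
    }
}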