Getting started with Hadoop 2.3.0

Googling will get you instructions for the old version, so here are some notes for 2.3.0.

Note that there appear to be quite a few differences in version 2, although it is supposed to be mostly compatible.

You should read the whole post before charging off and trying any of this stuff, as you might not want to start at the beginning!

References
http://codesfusion.blogspot.co.uk/2013/10/setup-hadoop-2x-220-on-ubuntu.html
which has a script at: https://github.com/ericduq/hadoop-scripts – this is good, but it needs changes around the downloading of the Hadoop file, and be careful if you run it more than once

Changes from the blog (not necessary if using the script)

in ~/.bashrc
export JAVA_HOME=/usr/lib/jvm/java-7-openjdk-amd64

sudo ssh hduser@localhost -i /home/hduser/.ssh/id_rsa

If start-dfs.sh gives errors
try

hdfs getconf -namenodes

If you see the following:

OpenJDK 64-Bit Server VM warning: You have loaded library /usr/local/hadoop/lib/native/libhadoop.so.1.0.0 which might have disabled stack guard. The VM will try to fix the stack guard now.
It's highly recommended that you fix the library with 'execstack -c <libfile>', or link it with '-z noexecstack'.
14/03/13 15:27:49 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable

Try the following in /usr/local/hadoop/etc/hadoop/hadoop-env.sh

export HADOOP_COMMON_LIB_NATIVE_DIR=$HADOOP_INSTALL/lib/native
export HADOOP_OPTS="-Djava.library.path=$HADOOP_INSTALL/lib"
export JAVA_HOME=/usr/lib/jvm/java-7-openjdk-amd64

(Although this should work in ~/.bashrc it appears not to)
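
A quick extra check (my own suggestion, not from the references above): the stack guard warning often appears when a 32-bit libhadoop.so is loaded into a 64-bit JVM, so it is worth confirming the bundled native library matches your architecture

#Check the architecture of the bundled native library
file /usr/local/hadoop/lib/native/libhadoop.so.1.0.0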

Using files in HDFS

#Create a directory and copy a file to and fro
hadoop fs -mkdir -p /user/hduser
hadoop fs -copyFromLocal someFile.txt someFile.txt

hadoop fs -copyToLocal /user/hduser/someFile.txt someFile2.txt

#Get a directory listing of the user’s home directory in HDFS

hadoop fs -ls

#Display the contents of the HDFS file /user/hduser/someFile.txt
hadoop fs -cat /user/hduser/someFile.txt

#Delete the file
hadoop fs -rm someFile.txt

Doing something

Context is everything so what am I trying to do?

I am working with VCF (Variant Call Format) files which are used to hold genetic information – I won’t go into details as it’s not very relevant here.

VCF is a text file format. It contains meta-information lines, a header line, and then data lines each containing information about a position in the genome.

Hadoop itself is written in Java, so the natural choice for interacting with it is a Java client. While there is a VCF reader in GATK (see http://plindenbaum.blogspot.fr/2012/11/readingwriting-vcf-file-with-gatk-api.html), it is more common to use Python.

Tutorials in Data-Intensive Computing gives some great, if currently incomplete, advice on using Hadoop Streaming together with pyvcf (there is also some nice material on using Hadoop on a more traditional cluster, which is an alternative to the methods described above)

Pydoop provides an alternative to Streaming via hadoop pipes but seems not to have quite caught up with the current state of play.

Another possibility is to use Jython to translate the Python into Java (see here)

One nice thing about using Streaming is that it’s fairly easy to do a comparison between a Hadoop implementation and a traditional implementation.

So here are some numbers (using the parsevcf.py from the Data-Intensive Computing tutorial)

Create the header file

parsevcf.py -b data.vcf > header.txt

Pipes

date;$(which python) $PWD/parsevcf.py -m $PWD/header.txt,0.30 < data.vcf |
$(which python) $PWD/parsevcf.py -r > out;date

Hadoop (Single node on the same computer)

hadoop jar /usr/local/hadoop/share/hadoop/tools/lib/hadoop-streaming-2.3.0.jar -mapper "$(which python) $PWD/parsevcf.py -m $PWD/header.txt,0.30" -reducer "$(which python) $PWD/parsevcf.py -r" -input $PWD/vcfparse/data.vcf -output $PWD/vcfparse/output

The output files contain the same data; however, the rows are held in a different order.

When running on a cluster we’ll need to use the -combiner option and -file to ship the scripts to the cluster.
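
As a sketch of what that might look like (my own, untested, composition rather than something from this run – note that a combiner must read and write the same record format as the mapper output, so the reducer can only double as a combiner if its output can be fed back into itself):

hadoop jar /usr/local/hadoop/share/hadoop/tools/lib/hadoop-streaming-2.3.0.jar \
  -file $PWD/parsevcf.py -file $PWD/header.txt \
  -mapper "python parsevcf.py -m header.txt,0.30" \
  -reducer "python parsevcf.py -r" \
  -input $PWD/vcfparse/data.vcf -output $PWD/vcfparse/output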

The MapReduce framework orders the keys in the output; if you're doing this in Java you will get an iterator for each key but, obviously, not when you're streaming.

Running locally with a 1020M test input file seems to indicate a good speed-up (~2 mins vs ~6 mins). Now that I've tried it with a relatively small file, it's time to scale up a bit to a 12G file and move to an 8 processor VM (slower disk) – not an ideal test machine, but it's what I've got easily to hand and is better than using my desktop, where there are other things going on.

Results

You can look at some basic statistics via http://localhost:8088/

Note that it does take a while to copy the file to/from the Hadoop file system, which is not included in these timings

Number of splits: 93

method        Map Jobs   Reduce Jobs   Time
pipes         N/A        N/A           2 hours 46 mins 17 secs
Single Node   Default    Default       49 mins 29 secs
Single Node   4          4             1 hr 14 mins 51 secs
Single Node   6          2             1 hr 6 secs
Single Node   2          6             1 hr 13 mins 25 secs

An example using streaming Map/Reduce with a tab-based input file

Assuming you’ve got everything set up

Start your engines

If necessary

start-dfs.sh
start-yarn.sh

dfs is the file system

yarn is the job scheduler
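
A quick sanity check (not part of the original notes) is to run jps, which on a single node should list something like NameNode, DataNode, SecondaryNameNode, ResourceManager and NodeManager:

jps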

Copy your input file to the dfs


hadoop fs -mkdir -p /user/hduser
hadoop fs -copyFromLocal someFile.txt data
hadoop fs -ls -h

The task

The aim is to calculate the variant density using a particular window on the genome.

This is a slightly more complex version of the classic “hello world” of hadoop – the word count.

Input data

The input file is a tab-delimited file containing one line for each variant: 28G, with over 95,000,000 lines.

We are interested in the chromosome, position and whether the PASS filter has been applied.

The program

First we need to work out which column contains the PASS filter – awk is quite helpful to check this

head -1 data | awk -F'\t' '{print $18}'

(Remember awk counts from 1 not 0)

The mapper

For the mapper we will build a key/value pair for each line – the key is a combination of the chromosome and bucket (1kb window) and the value a count and whether it passes/fails (we don’t really need the count…)

#!/usr/bin/env python

import sys

# Read tab-delimited variant lines from stdin and emit "chrom-point<TAB>1-PassFlag"
for line in sys.stdin:
    cells = line.split('\t')
    chrom = cells[0]
    pos = cells[1]
    # Column 18 (index 17) holds the PASS filter as the text "True"/"False"
    pass_filter = None
    if (cells[17] == "True"):
      pass_filter = True
    if (cells[17] == "False"):
      pass_filter = False
    if (pass_filter is not None):
      # 1kb bucket; point is the position used to label the bucket in the key
      bucket = int(int(pos)/1000)
      point = (bucket + 1) * (1000 / 2)
      print ("%s-%d\t1-%s" % (chrom, point, str(pass_filter)))

You can easily test this on the command line using pipes, e.g.

head -5 data | python mapper.py

The reducer

The reducer takes the output from the mapper and merges it according to the key. Hadoop sorts the mapper output by key before it reaches the reducer, so all the lines for a given key arrive together.

Test again using pipes, adding a sort between the mapper and reducer to mimic that shuffle (the command is shown after the listing).

#!/usr/bin/env python

import sys

last_key = None
running_total = 0
passes = 0

# Keys arrive sorted, so totals can be accumulated until the key changes
for input_line in sys.stdin:
    input_line = input_line.strip()
    this_key, value = input_line.split("\t", 1)
    variant, pass_filter = value.split('-')
    if last_key == this_key:
        running_total += int(variant)
        if (pass_filter == "True"):
          passes = passes + 1
    else:
        if last_key:
            chrom, pos = last_key.split('-')
            print( "%s\t%s\t%d\t%d" % (chrom, pos, running_total, passes) )
        running_total = int(variant)
        if (pass_filter == "True"):
          passes = 1
        else:
          passes = 0
        last_key = this_key

# Don't forget to emit the final key
if last_key == this_key:
    chrom, pos = last_key.split('-')
    print( "%s\t%s\t%d\t%d" % (chrom, pos, running_total, passes) )

head -5 data | python mapper.py | sort | python reducer.py

Running

Note the mapper and reducer scripts are on the local file system and the input and output files are on HDFS

hadoop jar /usr/local/hadoop/share/hadoop/tools/lib/hadoop-streaming-2.3.0.jar -mapper "$(which python) $PWD/mapper.py" -reducer "$(which python) $PWD/reducer.py" -input data -output output

Copy the output back to the local file system

hadoop fs -copyToLocal output

If you want to sort the output then the following command does a nice job

sort -V output/part-00000

Don’t forget to clean up after yourself


hadoop fs -rm -r output
hadoop fs -rm data

Now we've got the job running, we can start to look at making it go faster. The first thing to try is to increase the number of tasks – I'm using an 8 processor VM, so I'll try 4 of each to start with (the property names for doing this have changed).

hadoop fs -rm -r output
hadoop jar /usr/local/hadoop/share/hadoop/tools/lib/hadoop-streaming-2.3.0.jar -D mapreduce.job.reduces=4 -D mapreduce.job.maps=4 -mapper "$(which python) $PWD/mapper.py" -reducer "$(which python) $PWD/reducer.py" -input data -output output

Looking at the output I can see

INFO mapreduce.JobSubmitter: number of splits:224

This seems to indicate that I could usefully go up to 224(*2) jobs if I had enough cores free

Results

method        Map Jobs   Reduce Jobs   Time
pipes         N/A        N/A           31 mins 31 secs
Single Node   Default    Default       39 mins 6 secs
Single Node   4          4             28 mins 18 secs
Single Node   5          3             33 mins 1 sec
Single Node   3          5             31 mins 8 secs
Single Node   5          5             28 mins 40 secs
Single Node   6          2             49 mins 55 secs

Conclusion

From these brief experiments it looks like there is no point in using the Map/Reduce framework for trivial tasks, even on large files.

A more positive result is that it looks like there may well be some advantage for more complex tasks, and this merits further investigation as I've only scratched the surface here.

Some things to look at are:

The output won't be in the same order as the input, so if this is important Hadoop Streaming has Comparator and Partitioner classes to help sort results from the map to the reduce (see the sketch after this list)
You can decide to split the map outputs based on certain key fields, not the whole keys – see the Hadoop Partitioner Class
See docs for 1.2.1 here
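
As a concrete sketch of the first two points (my own composition, based on the 1.2.1 streaming docs rather than something I have run): the mapper's key is "chrom-point", so you could partition on the chromosome part only and sort numerically on the position part. The property names below are the older mapred.* ones from those docs; 2.x has mapreduce.* equivalents.

hadoop jar /usr/local/hadoop/share/hadoop/tools/lib/hadoop-streaming-2.3.0.jar \
  -D map.output.key.field.separator=- \
  -D mapred.text.key.partitioner.options=-k1,1 \
  -D mapred.output.key.comparator.class=org.apache.hadoop.mapred.lib.KeyFieldBasedComparator \
  -D mapred.text.key.comparator.options="-k1,1 -k2,2n" \
  -partitioner org.apache.hadoop.mapred.lib.KeyFieldBasedPartitioner \
  -mapper "$(which python) $PWD/mapper.py" \
  -reducer "$(which python) $PWD/reducer.py" \
  -input data -output output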

How do I generate output files with gzip format?

Instead of plain text files, you can generate gzipped files as your output. Pass '-D mapred.output.compress=true -D mapred.output.compression.codec=org.apache.hadoop.io.compress.GzipCodec' as options to your streaming job.
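
For example, dropping those options into the earlier command (generic -D options have to come before the streaming options):

hadoop jar /usr/local/hadoop/share/hadoop/tools/lib/hadoop-streaming-2.3.0.jar \
  -D mapred.output.compress=true \
  -D mapred.output.compression.codec=org.apache.hadoop.io.compress.GzipCodec \
  -mapper "$(which python) $PWD/mapper.py" \
  -reducer "$(which python) $PWD/reducer.py" \
  -input data -output output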

How to use a compressed input file
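
As far as I can tell (I have not tried it on this data): if the input has a recognised extension such as .gz, TextInputFormat decompresses it automatically, so a compressed file can simply be copied to HDFS and passed to -input as usual. The catch is that gzip files are not splittable, so each file is processed by a single map task.

#A sketch assuming a gzipped copy of the input file
hadoop fs -copyFromLocal someFile.txt.gz data.gz
hadoop jar /usr/local/hadoop/share/hadoop/tools/lib/hadoop-streaming-2.3.0.jar -mapper "$(which python) $PWD/mapper.py" -reducer "$(which python) $PWD/reducer.py" -input data.gz -output output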
