Python, MPI and Sun Grid Engine

Really, you need to go here first.
Do not apt-get install openmpi-bin without reading that first.

To see whether your Open MPI installation has been configured to use Sun Grid Engine:

ompi_info | grep gridengine
MCA ras: gridengine (MCA v2.0, API v2.0, Component v1.3)

If the gridengine component is not listed, build Open MPI from source with SGE support enabled:

./configure --with-sge
make
make install

Do the straightforward virtualenv setup

sudo apt-get install python-virtualenv
virtualenv example
cd example
source bin/activate
pip install numpy
pip install cython

Installing hdf5 with mpi

Install hdf5 from source into ~/install if necessary – the distribution package should be OK

wget http://www.hdfgroup.org/ftp/HDF5/current/src/hdf5-1.8.13.tar.gz
tar zxvf hdf5-1.8.13.tar.gz
cd hdf5-1.8.13
export CC=/usr/local/bin/mpicc
mkdir ~/install
./configure --prefix=/home/${USER}/install --enable-parallel --enable-shared
make
#make test
make install
#If you want to...
export PATH=/home/${USER}/install/bin:${PATH}
export LD_LIBRARY_PATH=/home/${USER}/install/lib:${LD_LIBRARY_PATH}
export CC=/usr/local/bin/mpicc
pip install mpi4py
pip install h5py --install-option="--mpi"
#If hdf5 is installed in your home directory add --hdf5=/home/${USER}/install to the --install-option
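
As a quick sanity check, a small script along these lines should confirm that both mpi4py and the parallel h5py build are usable (a minimal sketch; check_parallel.py is just an example name):

# check_parallel.py - confirm mpi4py and parallel h5py are available
from mpi4py import MPI
import h5py

print("MPI version:", MPI.Get_version())
print("h5py version:", h5py.version.version)
print("h5py compiled with MPI support:", h5py.get_config().mpi)

Run it with mpiexec -n 2 python check_parallel.py; the last line should say True.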

SGE configuration

http://docs.oracle.com/cd/E19923-01/820-6793-10/ExecutingBatchPrograms.html is quite useful but a number of the commands are wrong…

Before you can run parallel jobs, make sure that you have defined the parallel environment and the queue.
To list the existing parallel environments (qconf -sql lists the queues)

qconf -spl

To define a new parallel environment

qconf -ap mpi_pe

To look at the configuration of a parallel environment

qconf -sp mpi_pe

The value of control_slaves must be TRUE; otherwise, qrsh exits with an error message.

The value of job_is_first_task must be FALSE or the job launcher consumes a slot. In other words, mpirun itself will count as one of the slots and the job will fail, because only n-1 processes will start.
The allocation_rule must be either $fill_up or $round_robin or only one host will be used.
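
For reference, qconf -sp mpi_pe for a parallel environment set up as above should show something roughly like this (the slot count is just an example):

pe_name            mpi_pe
slots              16
user_lists         NONE
xuser_lists        NONE
start_proc_args    /bin/true
stop_proc_args     /bin/true
allocation_rule    $fill_up
control_slaves     TRUE
job_is_first_task  FALSE
urgency_slots      min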

You can look at the remote execution parameters using

qconf -sconf

To add the queue and attach the parallel environment to it:

qconf -aq mpi.q
qconf -mattr queue pe_list "mpi_pe" mpi.q

Checking and running jobs

The program demo.py – note the order of the imports. It tests the use of h5py in an MPI environment, so it may be more complex than you need.

from mpi4py import MPI
import h5py

rank = MPI.COMM_WORLD.rank  # The process ID (integer 0-3 for 4-process run)

f = h5py.File('parallel_test.hdf5', 'w', driver='mpio', comm=MPI.COMM_WORLD)
#f.atomic = True

dset = f.create_dataset('test', (MPI.COMM_WORLD.Get_size(),), dtype='i')
dset[rank] = rank

grp = f.create_group("subgroup")
dset2 = grp.create_dataset('host',(MPI.COMM_WORLD.Get_size(),), dtype='S10')
dset2[rank] = MPI.Get_processor_name()

f.close()

The command file (runq.sh)

source mpi/bin/activate
mpiexec --prefix /usr/local python demo.py

Submitting the job

qsub -cwd -S /bin/bash -pe mpi_pe 2 runq.sh 
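
You can keep an eye on the job (and see which queue slots it has been given) with:

qstat -f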


Checking mpiexec

mpiexec --prefix /usr/local -n 4 -host oscar,november ~/temp/mpi4py-1.3.1/run.sh

where run.sh contains:

#!/bin/bash
(cd
source mpi/bin/activate
cd ~/temp/mpi4py-1.3.1/
python demo/helloworld.py
)

To save you extracting mpi4py/demo/helloworld.py from the source tarball, here is its content:

#!/usr/bin/env python
"""
Parallel Hello World
"""

from mpi4py import MPI
import sys

size = MPI.COMM_WORLD.Get_size()
rank = MPI.COMM_WORLD.Get_rank()
name = MPI.Get_processor_name()

sys.stdout.write(
    "Hello, World! I am process %d of %d on %s.\n"
    % (rank, size, name))

Troubleshooting

If you get 'Host key verification failed.', make sure that you can ssh to all of the nodes configured for the queue (server1 is not the same as server1.example.org).

Use NFSv4 – if you use v3 then you will get the following message:

File locking failed in ADIOI_Set_lock(fd 13,cmd F_SETLKW/7,type F_WRLCK/1,whence 0) with return value FFFFFFFF and errno 5.
- If the file system is NFS, you need to use NFS version 3, ensure that the lockd daemon is running on all the machines, and mount the directory with the 'noac' option (no attribute caching).
- If the file system is LUSTRE, ensure that the directory is mounted with the 'flock' option.
ADIOI_Set_lock:: Input/output error
ADIOI_Set_lock:offset 2164, length 4
File locking failed in ADIOI_Set_lock(fd 12,cmd F_SETLKW/7,type F_WRLCK/1,whence 0) with return value FFFFFFFF and errno 5.
- If the file system is NFS, you need to use NFS version 3, ensure that the lockd daemon is running on all the machines, and mount the directory with the 'noac' option (no attribute caching).
- If the file system is LUSTRE, ensure that the directory is mounted with the 'flock' option.
ADIOI_Set_lock:: Input/output error
ADIOI_Set_lock:offset 2160, length 4
[hostname][[54842,1],3][btl_tcp_endpoint.c:459:mca_btl_tcp_endpoint_recv_blocking] recv(17) failed: Connection reset by peer (104)
[hostname][[54842,1],2][btl_tcp_endpoint.c:459:mca_btl_tcp_endpoint_recv_blocking] recv(15) failed: Connection reset by peer (104)

Using MPI from Python

The basic idea is to split the work into chunks and then combine the results. You can see from the demo.py above that if you are using h5py then writing your results out is handled transparently, which is nice.

Scatter(v)/Gather(v)

https://www.cac.cornell.edu/ranger/MPIcc/scatterv.aspx
https://github.com/jbornschein/mpi4py-examples/blob/master/03-scatter-gather
http://stackoverflow.com/questions/12812422/how-can-i-send-part-of-an-array-with-scatter/12815841#12815841

The v variant is used if you cannot break the data into equally sized blocks.
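
A rough Scatterv sketch, assuming 4-byte integer data (the total size and dtype are made-up examples):

from mpi4py import MPI
import numpy as np

comm = MPI.COMM_WORLD
rank = comm.Get_rank()
size = comm.Get_size()

total = 10                          # example size that need not divide evenly
counts = [total // size + (1 if r < total % size else 0) for r in range(size)]
displs = [sum(counts[:r]) for r in range(size)]

sendbuf = np.arange(total, dtype='i4') if rank == 0 else None
recvbuf = np.empty(counts[rank], dtype='i4')

# each rank receives its own, possibly unequal, slice of the data
comm.Scatterv([sendbuf, counts, displs, MPI.INT], recvbuf, root=0)
print(rank, recvbuf)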

Barrier blocks until all processes have called it.

Getting results from all workers – on the root this returns a list [ worker_data from rank 0, worker_data from rank 1, … ]

comm = MPI.COMM_WORLD
rank = comm.rank

worker_data = ...   # each rank computes its own piece of the work
comm.Barrier()

all_data = comm.gather(worker_data, root=0)
if rank == 0:
    # all_data is a list of the results, indexed by rank
    print(all_data)

Bcast/Reduce

Bcast sends data from one process to all of the others.
Reduce combines the data from all processes (for example by summing it) on the root.
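
A minimal sketch of both, assuming numpy buffers:

from mpi4py import MPI
import numpy as np

comm = MPI.COMM_WORLD
rank = comm.Get_rank()

# Bcast: rank 0's parameters end up on every rank
params = np.array([0.5, 1.5]) if rank == 0 else np.empty(2)
comm.Bcast(params, root=0)

# Reduce: sum one value from each rank onto rank 0
local = np.array([rank], dtype='i4')
total = np.zeros(1, dtype='i4')
comm.Reduce(local, total, op=MPI.SUM, root=0)
if rank == 0:
    print("sum of ranks:", total[0])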

Send/Recv

This is probably easier to understand than scatter/gather but you are doing extra work.

There are two obvious strategies available.

Create a results variable of the right dimensions and fill it in as each worker completes:

from mpi4py import MPI
import numpy as np

comm = MPI.COMM_WORLD
rank = comm.rank
size = comm.size

# Very crude: assumes total_size is a multiple of (size - 1)
total_size = 20
chunk_size = total_size // (size - 1)

if rank == 0:
    all_data = np.zeros((total_size, 4), dtype='i4')
    num_workers = size - 1
    closed_workers = 0
    while closed_workers < num_workers:
        data = np.zeros((chunk_size, 4), dtype='i4')
        status = MPI.Status()
        comm.Recv(data, source=MPI.ANY_SOURCE, tag=MPI.ANY_TAG, status=status)
        source = status.Get_source()
        tag = status.Get_tag()
        # the worker used its rank as the tag, so we know where this chunk goes
        insert_point = (tag - 1) * chunk_size
        all_data[insert_point:insert_point + chunk_size] = data
        closed_workers += 1

Wait for each worker to complete in turn and append to the results

AC_TAG = 99
if rank == 0:
    for i in range(size - 1):
        data = np.zeros((chunk_size, 4), dtype='i4')
        comm.Recv(data, source=i + 1, tag=AC_TAG)
        if i == 0:
            all_data = data
        else:
            all_data = np.concatenate((all_data, data))

Just as an example, here we are expecting the data to be a 2-D numpy array, but it could be anything; the receive buffer could also be created once with np.empty, since its contents are overwritten on each receive.

The key difference to notice is the value of the source and tag parameters to comm.Recv; these need to be matched by the corresponding parameters to comm.Send, i.e. tag=rank for the first example and tag=AC_TAG for the second,
e.g. comm.Send(ac, dest=0, tag=rank)
Your use of tag and source may vary…
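
For completeness, the worker side of the first strategy would look something like this (ac and the work itself are placeholders):

if rank != 0:
    ac = np.zeros((chunk_size, 4), dtype='i4')   # fill this with the worker's results
    # ... do the actual work for this rank's chunk here ...
    comm.Send(ac, dest=0, tag=rank)              # tag=rank so rank 0 knows where it goes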

Input data

There are again different ways to do this – either have rank 0 do all of the reading and use Send/Recv to distribute the data to be processed, or let each worker read its own data.

Evaluation

MPI.Wtime() can be used to get the elapsed time between two points in a program
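
For example (do_work is a placeholder for the real computation):

t0 = MPI.Wtime()
do_work()                                   # placeholder for whatever this rank computes
print("rank %d took %.3f seconds" % (rank, MPI.Wtime() - t0))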

Getting started with Hadoop 2.3.0

Googling will get you instructions for the old version, so here are some notes for 2.3.0.

Note that there appears to be quite a difference with version 2, although it is supposed to be mostly compatible.

You should read the whole post before charging off and trying any of this stuff as you might not want to start at the beginning!

References
http://codesfusion.blogspot.co.uk/2013/10/setup-hadoop-2x-220-on-ubuntu.html
which has a script at: https://github.com/ericduq/hadoop-scripts – this is good but needs changes around the downloading of the hadoop file – be careful if you run it more than once

Changes from the blog (not necessary if using the script)

in ~/.bashrc
export JAVA_HOME=/usr/lib/jvm/java-7-openjdk-amd64

sudo ssh hduser@localhost -i /home/hduser/.ssh/id_rsa

If start-dfs.sh gives errors, try

hdfs getconf -namenodes

If you see the following:

OpenJDK 64-Bit Server VM warning: You have loaded library /usr/local/hadoop/lib/native/libhadoop.so.1.0.0 which might have disabled stack guard. The VM will try to fix the stack guard now.
It's highly recommended that you fix the library with 'execstack -c ', or link it with '-z noexecstack'.
14/03/13 15:27:49 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable

Try the following in /usr/local/hadoop/etc/hadoop/hadoop-env.sh

export HADOOP_COMMON_LIB_NATIVE_DIR=$HADOOP_INSTALL/lib/native
export HADOOP_OPTS="-Djava.library.path=$HADOOP_INSTALL/lib"
export JAVA_HOME=/usr/lib/jvm/java-7-openjdk-amd64

(Although this should work in ~/.bashrc it appears not to)

Using files in HDFS

#Create a directory and copy a file to and fro
hadoop fs -mkdir -p /user/hduser
hadoop fs -copyFromLocal someFile.txt someFile.txt

hadoop fs -copyToLocal /user/hduser/someFile.txt someFile2.txt

#Get a directory listing of the user’s home directory in HDFS

hadoop fs -ls

#Display the contents of the HDFS file /user/hduser/someFile.txt
hadoop fs -cat /user/hduser/someFile.txt

#Delete the file
hadoop fs -rm someFile.txt

Doing something

Context is everything so what am I trying to do?

I am working with VCF (Variant Call Format) files which are used to hold genetic information – I won’t go into details as it’s not very relevant here.

VCF is a text file format. It contains meta-information lines, a header line, and then data lines each containing information about a position in the genome.

Hadoop itself is written in Java, so the natural choice for interacting with it is a Java client; while there is a VCF reader in GATK (see http://plindenbaum.blogspot.fr/2012/11/readingwriting-vcf-file-with-gatk-api.html), it is more common to use Python.

Tutorials in Data-Intensive Computing gives some great, if currently incomplete, advice on using Hadoop Streaming together with pyvcf (there’s also some nice material on using Hadoop on a more traditional cluster, which is an alternative to the methods described above).

Pydoop provides an alternative to Streaming via hadoop pipes but seems not to have quite caught up with the current state of play.

Another possibility is to use Jython to translate the Python into Java (see here).

One nice thing about using Streaming is that it’s fairly easy to do a comparison between a Hadoop implementation and a traditional implementation.

So here are some numbers (using the parsevcf.py from the Data-Intensive Computing tutorial)

Create the header file

parsevcf.py -b data.vcf > header.txt

Pipes

date;$(which python) $PWD/parsevcf.py -m $PWD/header.txt,0.30 < data.vcf |
$(which python) $PWD/parsevcf.py -r > out;date

Hadoop (Single node on the same computer)

hadoop jar /usr/local/hadoop/share/hadoop/tools/lib/hadoop-streaming-2.3.0.jar -mapper "$(which python) $PWD/parsevcf.py -m $PWD/header.txt,0.30" -reducer "$(which python) $PWD/parsevcf.py -r" -input $PWD/vcfparse/data.vcf -output $PWD/vcfparse/output

The output files contain the same data; however, the rows are in a different order.

When running on a cluster we’ll need to use the -combiner option and -file to ship the scripts to the cluster.
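
Untested sketch of what that might look like – note that the reducer can only double as the combiner if its operation is associative, which would need checking for parsevcf.py, and it assumes python is on the path of the task nodes:

hadoop jar /usr/local/hadoop/share/hadoop/tools/lib/hadoop-streaming-2.3.0.jar \
  -file $PWD/parsevcf.py -file $PWD/header.txt \
  -mapper "python parsevcf.py -m header.txt,0.30" \
  -combiner "python parsevcf.py -r" \
  -reducer "python parsevcf.py -r" \
  -input vcfparse/data.vcf -output vcfparse/output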

The MapReduce framework orders the keys in the output. If you’re doing this in Java you get an iterator for each key but, obviously, not when you’re streaming.

Running locally with a 1020M test input file seems to indicate a good speed-up (~2 mins vs ~6 mins). Now that I’ve tried it with a relatively small file it’s time to scale up a bit, to a 12G file and an 8-processor VM (with slower disk) – not an ideal test machine, but it’s what I’ve got easily to hand and is better than using my desktop, where there are other things going on.

Results

You can look at some basic statistics via http://localhost:8088/

Note that it does take a while to copy the file to/from the Hadoop file system, which is not included in the times here.

Number of splits: 93

Method         Map Jobs   Reduce Jobs   Time
pipes          N/A        N/A           2 hours 46 mins 17 secs
Single Node    Default    Default       49 mins 29 secs
Single Node    4          4             1 hr 14 mins 51 secs
Single Node    6          2             1 hr 6 secs
Single Node    2          6             1 hr 13 mins 25 secs

An example using streaming Map/Reduce with a tab-based input file

Assuming you’ve got everything set up

Start your engines

If necessary

start-dfs.sh
start-yarn.sh

dfs is the file system

yarn is the job scheduler

Copy your input file to the dfs


hadoop fs -mkdir -p /user/hduser
hadoop fs -copyFromLocal someFile.txt data
hadoop fs -ls -h

The task

The aim is to calculate the variant density using a particular window on the genome.

This is a slightly more complex version of the classic “hello world” of hadoop – the word count.

Input data

The input file is a tab-delimited file containing one line for each variant: 28G in size, with over 95,000,000 lines.

We are interested in the chromosome, position and whether the PASS filter has been applied.

The program

First we need to work out which column contains the PASS filter – awk is quite helpful to check this

head -1 data | awk -F'\t' '{print $18}'

(Remember awk counts from 1 not 0)

The mapper

For the mapper we will build a key/value pair for each line – the key is a combination of the chromosome and bucket (1kb window) and the value a count and whether it passes/fails (we don’t really need the count…)

#!/usr/bin/env python

import sys

for line in sys.stdin:
    cells = line.split('\t')
    chrom = cells[0]
    pos = cells[1]
    pass_filter = None
    if cells[17] == "True":
        pass_filter = True
    if cells[17] == "False":
        pass_filter = False
    if pass_filter is not None:
        bucket = int(int(pos) / 1000)          # 1kb window
        point = (bucket + 1) * (1000 / 2)      # representative point for the window
        print("%s-%d\t1-%s" % (chrom, point, str(pass_filter)))

You can easily test this on the command line using pipes e.g.

head -5 data | python mapper.py

The reducer

The reducer takes the output from the mapper and merges it according to the key

Test again using pipes (the command is shown after the script).

#!/usr/bin/env python

import sys

last_key = None
running_total = 0
passes = 0

for input_line in sys.stdin:
    input_line = input_line.strip()
    this_key, value = input_line.split("\t", 1)
    variant, pass_filter = value.split('-')
    if last_key == this_key:
        running_total += int(variant)
        if pass_filter == "True":
            passes = passes + 1
    else:
        if last_key:
            chrom, pos = last_key.split('-')
            print("%s\t%s\t%d\t%d" % (chrom, pos, running_total, passes))
        running_total = int(variant)
        if pass_filter == "True":
            passes = 1
        else:
            passes = 0
        last_key = this_key

# output the final key
if last_key == this_key:
    chrom, pos = last_key.split('-')
    print("%s\t%s\t%d\t%d" % (chrom, pos, running_total, passes))

head -5 data | python mapper.py | sort | python reducer.py

(the sort stands in for the sort/shuffle step that Hadoop performs between the map and the reduce)

Running

Note the mapper and reducer scripts are on the local file system and the input and output files are on HDFS

hadoop jar /usr/local/hadoop/share/hadoop/tools/lib/hadoop-streaming-2.3.0.jar -mapper "$(which python) $PWD/mapper.py" -reducer "$(which python) $PWD/reducer.py" -input data -output output

Copy the output back to the local file system

hadoop fs -copyToLocal output

If you want to sort the output then the following command does a nice job

sort -V output/part-00000

Don’t forget to clean up after yourself


hadoop fs -rm -r output
hadoop fs -rm data

Now we’ve got the job running we can start to make it go faster. The first thing to try is to increase the number of tasks – I’m using an 8-processor VM so I’ll try 4 of each to start with (the property names for doing this have changed)

hadoop fs -rm -r output
hadoop jar /usr/local/hadoop/share/hadoop/tools/lib/hadoop-streaming-2.3.0.jar -D mapreduce.job.reduces=4 -D mapreduce.job.maps=4 -mapper "$(which python) $PWD/mapper.py" -reducer "$(which python) $PWD/reducer.py" -input data -output output

Looking at the output I can see

INFO mapreduce.JobSubmitter: number of splits:224

This seems to indicate that I could usefully go up to 224(*2) jobs if I had enough cores free

Results

Method         Map Jobs   Reduce Jobs   Time
pipes          N/A        N/A           31 mins 31 secs
Single Node    Default    Default       39 mins 6 secs
Single Node    4          4             28 mins 18 secs
Single Node    5          3             33 mins 1 secs
Single Node    3          5             31 mins 8 secs
Single Node    5          5             28 mins 40 secs
Single Node    6          2             49 mins 55 secs

Conclusion

From these brief experiments it looks like there is no point using the Map/Reduce framework for trivial tasks even on large files.

A more positive result is that it looks like there may well be some advantage for more complex tasks and this merits some further investigation as I’ve only scratched the surface here.

Some things to look at are:

The output won’t be in the same order as the input, so if this is important Hadoop streaming has Comparator and Partitioner classes to help control how results are sorted between the map and the reduce.
You can decide to split the map outputs based on certain key fields, not the whole keys – see the Hadoop Partitioner class.
See docs for 1.2.1 here

How do I generate output files with gzip format?

Instead of plain text files, you can generate gzip files as your output. Pass ‘-D mapred.output.compress=true -D mapred.output.compression.codec=org.apache.hadoop.io.compress.GzipCodec’ as options to your streaming job.

How to use a compressed input file

Rookie python

When running in a virtualenv don’t forget to source bin/activate

Install packages using pip e.g. pip install beautifulsoup4

Create a REQUIREMENTS file using pip freeze > REQUIREMENTS and load it using pip install -r REQUIREMENTS

When creating a module (a package directory) you need an __init__.py in the directory in order to import it successfully, otherwise you’ll get AttributeError: 'module' object has no attribute 'xxxx'
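
For example, a minimal layout (the names are placeholders):

mypackage/
    __init__.py
    xxxx.py

after which import mypackage.xxxx works.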