Python, MPI and Sun Grid Engine

Really you need to go here.
Do not apt-get install openmpi-bin without reading it first.

To see whether your Open MPI installation has been configured to use Sun Grid Engine:

ompi_info | grep gridengine
MCA ras: gridengine (MCA v2.0, API v2.0, Component v1.3)

If the gridengine component is not listed, rebuild Open MPI from source with SGE support:

./configure --with-sge
make
make install

Do the straightforward virtualenv setup

sudo apt-get install python-virtualenv
virtualenv example
cd example
source bin/activate
pip install numpy
pip install cython

Installing hdf5 with mpi

Install hdf5 from source to ~/install if necessary – the distribution package should be OK.

wget http://www.hdfgroup.org/ftp/HDF5/current/src/hdf5-1.8.13.tar.gz
tar zxvf hdf5-1.8.13.tar.gz
cd hdf5-1.8.13
export CC=/usr/local/bin/mpicc
mkdir ~/install
./configure --prefix=/home/${USER}/install --enable-parallel --enable-shared
make
#make test
make install
#If you want to use this build directly:
export PATH=/home/${USER}/install/bin:${PATH}
export LD_LIBRARY_PATH=/home/${USER}/install/lib:${LD_LIBRARY_PATH}
export CC=/usr/local/bin/mpicc
pip install mpi4py
pip install h5py --install-option="--mpi"
#If hdf5 is installed in your home directory add --hdf5=/home/${USER}/install to the --install-option

SGE configuration

http://docs.oracle.com/cd/E19923-01/820-6793-10/ExecutingBatchPrograms.html is quite useful, but a number of the commands it gives are wrong.

Before you can run parallel jobs you must define the parallel environment and attach it to a queue.
To list the existing parallel environments (qconf -sql lists the queues)

qconf -spl

To define a new parallel environment

qconf -ap mpi_pe

To look at the configuration of a parallel environment

qconf -sp mpi_pe

The value of control_slaves must be TRUE; otherwise, qrsh exits with an error message.

The value of job_is_first_task must be FALSE or the job launcher consumes a slot. In other words, mpirun itself will count as one of the slots and the job will fail, because only n-1 processes will start.
The allocation_rule must be either $fill_up or $round_robin, or only one host will be used.
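Putting those requirements together, a parallel environment satisfying them might look like the following (the slot count and other values here are illustrative, not a prescription):

```
pe_name            mpi_pe
slots              16
user_lists         NONE
xuser_lists        NONE
start_proc_args    /bin/true
stop_proc_args     /bin/true
allocation_rule    $fill_up
control_slaves     TRUE
job_is_first_task  FALSE
urgency_slots      min
accounting_summary FALSE
```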

You can look at the remote execution parameters using

qconf -sconf

To add a queue and then attach the parallel environment to it:

qconf -aq mpi.q
qconf -mattr queue pe_list "mpi_pe" mpi.q

Checking and running jobs

The program demo.py – note the order of the imports. This tests the use of h5py in an MPI environment, so it may be more complex than you need.

from mpi4py import MPI
import h5py

rank = MPI.COMM_WORLD.rank  # The process ID (integer 0-3 for 4-process run)

f = h5py.File('parallel_test.hdf5', 'w', driver='mpio', comm=MPI.COMM_WORLD)
#f.atomic = True

dset = f.create_dataset('test', (MPI.COMM_WORLD.Get_size(),), dtype='i')
dset[rank] = rank

grp = f.create_group("subgroup")
dset2 = grp.create_dataset('host',(MPI.COMM_WORLD.Get_size(),), dtype='S10')
dset2[rank] = MPI.Get_processor_name()

f.close()

The command file (runq.sh)

source mpi/bin/activate
mpiexec --prefix /usr/local python demo.py

Submitting the job

qsub -cwd -S /bin/bash -pe mpi_pe 2 runq.sh 


Checking mpiexec

mpiexec --prefix /usr/local -n 4 -host oscar,november ~/temp/mpi4py-1.3.1/run.sh

where run.sh contains:

#!/bin/bash
(cd
source mpi/bin/activate
cd ~/temp/mpi4py-1.3.1/
python demo/helloworld.py
)

To save extracting it from the source tarball, here is mpi4py/demo/helloworld.py:

#!/usr/bin/env python
"""
Parallel Hello World
"""

from mpi4py import MPI
import sys

size = MPI.COMM_WORLD.Get_size()
rank = MPI.COMM_WORLD.Get_rank()
name = MPI.Get_processor_name()

sys.stdout.write(
    "Hello, World! I am process %d of %d on %s.\n"
    % (rank, size, name))

Troubleshooting

If you get "Host key verification failed." make sure that you can ssh to every node configured for the queue without a prompt (note that server1 is not the same known-hosts entry as server1.example.org).

Use NFSv4 – if you use v3 then you will get messages like the following:

File locking failed in ADIOI_Set_lock(fd 13,cmd F_SETLKW/7,type F_WRLCK/1,whence 0) with return value FFFFFFFF and errno 5.
- If the file system is NFS, you need to use NFS version 3, ensure that the lockd daemon is running on all the machines, and mount the directory with the 'noac' option (no attribute caching).
- If the file system is LUSTRE, ensure that the directory is mounted with the 'flock' option.
ADIOI_Set_lock:: Input/output error
ADIOI_Set_lock:offset 2164, length 4
File locking failed in ADIOI_Set_lock(fd 12,cmd F_SETLKW/7,type F_WRLCK/1,whence 0) with return value FFFFFFFF and errno 5.
- If the file system is NFS, you need to use NFS version 3, ensure that the lockd daemon is running on all the machines, and mount the directory with the 'noac' option (no attribute caching).
- If the file system is LUSTRE, ensure that the directory is mounted with the 'flock' option.
ADIOI_Set_lock:: Input/output error
ADIOI_Set_lock:offset 2160, length 4
[hostname][[54842,1],3][btl_tcp_endpoint.c:459:mca_btl_tcp_endpoint_recv_blocking] recv(17) failed: Connection reset by peer (104)
[hostname][[54842,1],2][btl_tcp_endpoint.c:459:mca_btl_tcp_endpoint_recv_blocking] recv(15) failed: Connection reset by peer (104)

Using MPI from Python

The basic idea is to split the work into chunks and then combine the results. You can see from the demo.py above that if you are using h5py then writing your results out is handled transparently, which is nice.

Scatter(v)/Gather(v)

https://www.cac.cornell.edu/ranger/MPIcc/scatterv.aspx
https://github.com/jbornschein/mpi4py-examples/blob/master/03-scatter-gather
http://stackoverflow.com/questions/12812422/how-can-i-send-part-of-an-array-with-scatter/12815841#12815841

The v variant is used if you cannot break the data into equally sized blocks.
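As a sketch of how the v variant handles uneven blocks: Scatterv takes a list of per-rank counts plus displacements into the send buffer. Computing them needs no MPI at all (the function name here is mine, not part of mpi4py):

```python
def scatterv_layout(total, nprocs):
    """Split `total` items across `nprocs` ranks as evenly as possible.

    Returns (counts, displs): rank r receives counts[r] items,
    starting at offset displs[r] in the send buffer.
    """
    base, extra = divmod(total, nprocs)
    # The first `extra` ranks get one item more than the rest.
    counts = [base + 1 if r < extra else base for r in range(nprocs)]
    displs = [sum(counts[:r]) for r in range(nprocs)]
    return counts, displs

counts, displs = scatterv_layout(10, 4)
# counts = [3, 3, 2, 2], displs = [0, 3, 6, 8]
```

These two lists are exactly what you pass as the second argument to comm.Scatterv alongside the send buffer.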

Barrier blocks until all processes have called it.

Getting results from all workers – on the root this returns a list [ worker_data from rank 0, worker_data from rank 1, … ]; on the other ranks it returns None.

worker_data = ....
comm.Barrier()

all_data = comm.gather(worker_data, root=0)
if rank == 0:
    pass  # all_data contains the results

Bcast/Reduce

Bcast sends data from one process to all others.
Reduce combines data from all processes onto one (for example, summing each element across ranks).
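The semantics of Reduce with MPI.SUM are easy to mimic without MPI: it is an element-wise sum over every rank's send buffer, delivered to the root. A plain-Python sketch (the function name is mine):

```python
def reduce_sum(buffers):
    """What comm.Reduce(..., op=MPI.SUM, root=0) computes:
    the element-wise sum over every rank's send buffer."""
    return [sum(vals) for vals in zip(*buffers)]

# One buffer per rank, as if from a 3-process run.
per_rank = [[0, 1, 2], [10, 11, 12], [20, 21, 22]]
print(reduce_sum(per_rank))  # [30, 33, 36]
```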

Send/Recv

This is probably easier to understand than scatter/gather, but you are doing extra work.

There are two obvious strategies available.

Create a results variable of the right dimensions and fill it in as each worker completes:

import numpy as np
from mpi4py import MPI

comm = MPI.COMM_WORLD
rank = comm.rank
size = comm.size

#Very crude - e.g. breaks if total_size is not a multiple of (size - 1)
total_size = 20
chunk_size = total_size // (size - 1)

if rank == 0:
    all_data = np.zeros((total_size, 4), dtype='i4')
    num_workers = size - 1
    closed_workers = 0
    while closed_workers < num_workers:
        data = np.zeros((chunk_size, 4), dtype='i4')
        status = MPI.Status()
        comm.Recv(data, source=MPI.ANY_SOURCE, tag=MPI.ANY_TAG, status=status)
        source = status.Get_source()
        tag = status.Get_tag()
        insert_point = (tag - 1) * chunk_size
        all_data[insert_point:insert_point + chunk_size] = data
        closed_workers += 1

Wait for each worker to complete in turn and append to the results:

AC_TAG = 99
if rank == 0:
    for i in range(size - 1):
        data = np.zeros((chunk_size, 4), dtype='i4')
        comm.Recv(data, source=i + 1, tag=AC_TAG)
        if i == 0:
            all_data = data
        else:
            all_data = np.concatenate((all_data, data))

Just as an example, here we are expecting the data to be a 2-D numpy array, but it could be anything; the receive buffer could also be created once with np.empty, as its contents are overwritten on each receive.

The key difference to notice is the value of the source and tag parameters to comm.Recv; these need to be matched by the corresponding parameters to comm.Send, i.e. tag=rank for the first example and tag=AC_TAG for the second, e.g.
comm.Send(ac, dest=0, tag=rank)
Your use of tag and source may vary…
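The matching rule itself can be illustrated without MPI: a Recv with an explicit source/tag only accepts a message whose envelope matches, while ANY_SOURCE/ANY_TAG act as wildcards. A toy model (all names here are mine, not mpi4py API):

```python
ANY = None  # stands in for MPI.ANY_SOURCE / MPI.ANY_TAG

def matches(msg_source, msg_tag, want_source, want_tag):
    """True if a message with envelope (msg_source, msg_tag) would be
    delivered to a Recv asking for (want_source, want_tag)."""
    source_ok = want_source is ANY or want_source == msg_source
    tag_ok = want_tag is ANY or want_tag == msg_tag
    return source_ok and tag_ok

# A Send(dest=0, tag=rank) from rank 3:
print(matches(3, 3, ANY, ANY))  # True  - first example's Recv(ANY_SOURCE, ANY_TAG)
print(matches(3, 3, 3, 99))     # False - second example expects tag=AC_TAG (99)
```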

Input data

There are again different ways to do this – either have rank 0 do all the reading and use Send/Recv to distribute the data to be processed, or let each worker read its own data.
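If each worker reads its own data, it only needs to know which slice of the input belongs to it. A minimal sketch (the function name is mine), matching the one-chunk-per-worker layout used above, where rank 0 is the master and ranks 1..n are workers:

```python
def worker_slice(total_size, n_workers, rank):
    """Half-open (start, stop) slice of the input owned by `rank`
    (1-based, since rank 0 is the master), assuming total_size
    divides evenly among the workers."""
    chunk = total_size // n_workers
    start = (rank - 1) * chunk
    return start, start + chunk

# 20 rows, 4 workers (ranks 1-4): rank 2 owns rows 5..9
print(worker_slice(20, 4, 2))  # (5, 10)
```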

Evaluation

MPI.Wtime() can be used to get the elapsed wall-clock time between two points in a program.
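The pattern is just two timestamps around the work. This sketch uses time.perf_counter as a stand-in so it runs without mpi4py; with mpi4py you would call MPI.Wtime() at the same two points:

```python
import time

start = time.perf_counter()    # with mpi4py: start = MPI.Wtime()
total = sum(range(1_000_000))  # the work being timed
elapsed = time.perf_counter() - start

print("elapsed: %.6f s" % elapsed)
```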
