Really, you need to go here first.
Do not apt-get install openmpi-bin without reading it first.
To see whether your Open MPI installation has been configured to use Sun Grid Engine:
ompi_info | grep gridengine
  MCA ras: gridengine (MCA v2.0, API v2.0, Component v1.3)
If not, rebuild Open MPI from source with SGE support:
./configure --with-sge
make
make install
Do the straightforward virtualenv setup
sudo apt-get install python-virtualenv
virtualenv example
cd example
source bin/activate
pip install numpy
pip install cython
Installing HDF5 with MPI
Install hdf5 from source to ~/install if necessary – the package should be OK
wget http://www.hdfgroup.org/ftp/HDF5/current/src/hdf5-1.8.13.tar.gz
tar zxvf hdf5-1.8.13.tar.gz
cd hdf5-1.8.13
export CC=/usr/local/bin/mpicc
mkdir ~/install
./configure --prefix=/home/${USER}/install --enable-parallel --enable-shared
make
#make test
make install
#If you want to...
export PATH=/home/${USER}/install/bin:${PATH}
export LD_LIBRARY_PATH=/home/${USER}/install/lib:${LD_LIBRARY_PATH}
Then install mpi4py and a parallel build of h5py into the virtualenv:
export CC=/usr/local/bin/mpicc
pip install mpi4py
pip install h5py --install-option="--mpi"
#If hdf5 is installed in your home directory add --hdf5=/home/${USER}/install to the --install-option
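A quick way to confirm that h5py was built against parallel HDF5 is to check h5py.get_config().mpi, which should be True for an MPI-enabled build:

# Sanity check that h5py was compiled with MPI support
import h5py
print(h5py.get_config().mpi)  # should print True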
SGE configuration
http://docs.oracle.com/cd/E19923-01/820-6793-10/ExecutingBatchPrograms.html is quite useful but a number of the commands are wrong…
Before you can run parallel jobs, make sure that you have defined the parallel environment and the queue.
To list queues
qconf -sql
To list parallel environments
qconf -spl
To define a new parallel environment
qconf -ap mpi_pe
To look at the configuration of a parallel environment
qconf -sp mpi_pe
The value of control_slaves must be TRUE; otherwise, qrsh exits with an error message.
The value of job_is_first_task must be FALSE or the job launcher consumes a slot. In other words, mpirun itself will count as one of the slots and the job will fail, because only n-1 processes will start.
The allocation_rule must be either $fill_up or $round_robin or only one host will be used.
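For reference, a parallel environment configuration that satisfies the points above (as shown by qconf -sp mpi_pe) might look something like this; the slot count is arbitrary and the exact fields can vary between SGE versions:

pe_name            mpi_pe
slots              999
user_lists         NONE
xuser_lists        NONE
start_proc_args    /bin/true
stop_proc_args     /bin/true
allocation_rule    $fill_up
control_slaves     TRUE
job_is_first_task  FALSE
urgency_slots      min
accounting_summary FALSE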
You can look at the remote execution parameters using
qconf -sconf
To create a queue and attach the parallel environment to it
qconf -aq mpi.q
qconf -mattr queue pe_list "mpi_pe" mpi.q
Checking and running jobs
The program demo.py – note the order of imports. This tests the use of h5py in an MPI environment, so it may be more complex than you need.
from mpi4py import MPI
import h5py

rank = MPI.COMM_WORLD.rank  # The process ID (integer 0-3 for 4-process run)

f = h5py.File('parallel_test.hdf5', 'w', driver='mpio', comm=MPI.COMM_WORLD)
#f.atomic = True

dset = f.create_dataset('test', (MPI.COMM_WORLD.Get_size(),), dtype='i')
dset[rank] = rank

grp = f.create_group("subgroup")
dset2 = grp.create_dataset('host', (MPI.COMM_WORLD.Get_size(),), dtype='S10')
dset2[rank] = MPI.Get_processor_name()

f.close()
The command file (runq.sh)
source mpi/bin/activate
mpiexec --prefix /usr/local python demo.py
Submitting the job
qsub -cwd -S /bin/bash -pe mpi_pe 2 runq.sh
Checking mpiexec
mpiexec --prefix /usr/local -n 4 -host oscar,november ~/temp/mpi4py-1.3.1/run.sh
where run.sh is
#!/bin/bash
(
cd
source mpi/bin/activate
cd ~/temp/mpi4py-1.3.1/
python demo/helloworld.py
)
To save you extracting mpi4py/demo/helloworld.py, here it is:
#!/usr/bin/env python
"""
Parallel Hello World
"""

from mpi4py import MPI
import sys

size = MPI.COMM_WORLD.Get_size()
rank = MPI.COMM_WORLD.Get_rank()
name = MPI.Get_processor_name()

sys.stdout.write(
    "Hello, World! I am process %d of %d on %s.\n"
    % (rank, size, name))
Troubleshooting
If you get "Host key verification failed.", make sure that you can ssh to all the nodes configured for the queue (server1 is not the same as server1.example.org).
Use NFSv4 – if you use v3 then you will get the following message:
File locking failed in ADIOI_Set_lock(fd 13,cmd F_SETLKW/7,type F_WRLCK/1,whence 0) with return value FFFFFFFF and errno 5.
- If the file system is NFS, you need to use NFS version 3, ensure that the lockd daemon is running on all the machines, and mount the directory with the 'noac' option (no attribute caching).
- If the file system is LUSTRE, ensure that the directory is mounted with the 'flock' option.
ADIOI_Set_lock:: Input/output error
ADIOI_Set_lock:offset 2164, length 4
File locking failed in ADIOI_Set_lock(fd 12,cmd F_SETLKW/7,type F_WRLCK/1,whence 0) with return value FFFFFFFF and errno 5.
- If the file system is NFS, you need to use NFS version 3, ensure that the lockd daemon is running on all the machines, and mount the directory with the 'noac' option (no attribute caching).
- If the file system is LUSTRE, ensure that the directory is mounted with the 'flock' option.
ADIOI_Set_lock:: Input/output error
ADIOI_Set_lock:offset 2160, length 4
[hostname][[54842,1],3][btl_tcp_endpoint.c:459:mca_btl_tcp_endpoint_recv_blocking] recv(17) failed: Connection reset by peer (104)
[hostname][[54842,1],2][btl_tcp_endpoint.c:459:mca_btl_tcp_endpoint_recv_blocking] recv(15) failed: Connection reset by peer (104)
Using mpi4py
The basic idea is to split the work into chunks and then combine the results. You can see from the demo.py above that if you are using h5py then writing your results out is handled transparently, which is nice.
Scatter(v)/Gather(v)
https://www.cac.cornell.edu/ranger/MPIcc/scatterv.aspx
https://github.com/jbornschein/mpi4py-examples/blob/master/03-scatter-gather
http://stackoverflow.com/questions/12812422/how-can-i-send-part-of-an-array-with-scatter/12815841#12815841
The v variant is used if you cannot break the data into equally sized blocks.
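As a concrete illustration, here is a sketch of my own (not from the references above); it assumes the data divides evenly across the processes – for uneven chunks you would use Scatterv/Gatherv with explicit counts:

import numpy as np
from mpi4py import MPI

comm = MPI.COMM_WORLD
rank = comm.Get_rank()
size = comm.Get_size()

chunk = 4  # number of elements handled by each process

send_data = None
if rank == 0:
    # Only the root needs to hold the full array.
    send_data = np.arange(size * chunk, dtype='i')

# Each process receives its own chunk.
recv_data = np.empty(chunk, dtype='i')
comm.Scatter(send_data, recv_data, root=0)

# Do the per-process work - here we just square the values.
recv_data = recv_data ** 2

# Gather the processed chunks back onto the root.
result = None
if rank == 0:
    result = np.empty(size * chunk, dtype='i')
comm.Gather(recv_data, result, root=0)

if rank == 0:
    print(result)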
Barrier blocks until all processes have called it.
Getting results from all workers – comm.gather will return a list [ worker_data from rank 0, worker_data from rank 1, … ]
worker_data = ....
comm.Barrier()
all_data = comm.gather(worker_data, root=0)
if rank == 0:
    # all_data contains the results
Bcast/Reduce
Bcast sends data from one process to all the others.
Reduce combines data from all processes (e.g. summing a value from every rank onto the root).
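A minimal Bcast/Reduce sketch (my own example, not from the original notes; the parameter values are arbitrary):

import numpy as np
from mpi4py import MPI

comm = MPI.COMM_WORLD
rank = comm.Get_rank()

# Broadcast an array of parameters from rank 0 to every process.
params = np.zeros(3, dtype='f8')
if rank == 0:
    params[:] = [0.1, 0.2, 0.3]
comm.Bcast(params, root=0)

# Each process computes a partial result...
partial = np.array([rank * params.sum()], dtype='f8')

# ...and Reduce sums the partial results onto rank 0.
total = np.zeros(1, dtype='f8') if rank == 0 else None
comm.Reduce(partial, total, op=MPI.SUM, root=0)

if rank == 0:
    print(total[0])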
Send/Recv
This is probably easier to understand than scatter/gather but you are doing extra work.
There are two obvious strategies available.
Create a results variable of the right dimensions and fill it in as each worker completes:
import numpy as np
from mpi4py import MPI

comm = MPI.COMM_WORLD
rank = comm.rank
size = comm.size

# Very crude - e.g. this breaks if total_size is not a multiple of (size - 1)
total_size = 20
chunk_size = total_size // (size - 1)

if rank == 0:
    all_data = np.zeros((total_size, 4), dtype='i4')
    num_workers = size - 1
    closed_workers = 0
    while closed_workers < num_workers:
        data = np.zeros((chunk_size, 4), dtype='i4')
        x = MPI.Status()
        comm.Recv(data, source=MPI.ANY_SOURCE, tag=MPI.ANY_TAG, status=x)
        source = x.Get_source()
        tag = x.Get_tag()
        insert_point = (tag - 1) * chunk_size
        all_data[insert_point:insert_point + chunk_size] = data
        closed_workers += 1
Wait for each worker to complete in turn and append to the results
AC_TAG = 99

if rank == 0:
    for i in range(size - 1):
        data = np.zeros((chunk_size, 4), dtype='i4')
        comm.Recv(data, source=i + 1, tag=AC_TAG)
        if i == 0:
            all_data = data
        else:
            all_data = np.concatenate((all_data, data))
Just as an example, here we expect the data to be a 2-D numpy array, but it could be anything; the receive buffer could also be created once with np.empty, since its contents are overwritten.
The key difference to notice is the value of the source and tag parameters to comm.Recv; these need to be matched by the corresponding parameters to comm.Send, i.e. tag=rank for the first example and tag=AC_TAG for the second.
e.g. comm.Send(ac, dest=0, tag=rank)
Your use of tag and source may vary…
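For completeness, here is a self-contained sketch of the worker side that pairs with the first Recv example above; worker_chunk is a hypothetical stand-in for the real per-worker results:

import numpy as np
from mpi4py import MPI

comm = MPI.COMM_WORLD
rank = comm.rank
size = comm.size

total_size = 20
chunk_size = total_size // (size - 1)

if rank != 0:
    # Stand-in for real work: fill the chunk with this worker's rank.
    worker_chunk = np.full((chunk_size, 4), rank, dtype='i4')
    # tag=rank tells the root where to place this chunk in all_data.
    comm.Send(worker_chunk, dest=0, tag=rank)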
Input data
There are again different ways to do this – either have rank 0 do all the reading and use Send/Recv to distribute the data to be processed, or let each worker read its own data.
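A sketch of the second approach, letting each worker read its own slice with the parallel h5py driver used in demo.py above (the file name input.hdf5 and dataset name data are assumptions for illustration, and the slicing assumes the rows divide evenly across ranks):

from mpi4py import MPI
import h5py

comm = MPI.COMM_WORLD
rank = comm.rank
size = comm.size

with h5py.File('input.hdf5', 'r', driver='mpio', comm=comm) as f:
    dset = f['data']
    chunk_size = dset.shape[0] // size
    start = rank * chunk_size
    # Each rank reads only its own slice of the dataset.
    my_data = dset[start:start + chunk_size]

# ... process my_data ...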
Evaluation
MPI.Wtime() can be used to get the elapsed time between two points in a program
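For example, a minimal timing sketch (the work in the middle is just a placeholder):

from mpi4py import MPI

t_start = MPI.Wtime()

total = sum(i * i for i in range(1000000))  # placeholder for the work being timed

t_end = MPI.Wtime()
print("rank %d took %.3f seconds" % (MPI.COMM_WORLD.Get_rank(), t_end - t_start))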