Distributed Matrix Multiplication using MPI on Azure
*Note — This post assumes that you are already familiar with OpenMPI and MPI syntax in C/C++.
There could be several reasons as to why one would perform matrix multiplication in a distributed manner.
- Size of input matrices are larger than available memory on a single node machine.
- Although size of input matrices are small enough to fit in the available memory of a single node machine but the time complexity is O(n³) and for large n, this is prohibitively large.
Matrix multiplication finds a wide range of applications in the ML field and is quite heavily used in different ML libraries and algorithms.
Thus, we see that we have a lot of motivation to understand how to build a distributed algorithm for doing matrix multiplication.
Before any implementation let us try to see how to write a distributed algorithm for matrix multiplication.
There is no single best algorithm as different algorithms tries to solve different constraints and thus works differently.
- Optimize for low memory on single node machines.
- Optimize for the network bandwidth and topology of a cluster of nodes.
- Optimize for faster run-time performance.
You will notice that most of the work on the 1st and 2nd pointers i.e. optimize for low memory and network topology were done quite a few years back when memory sizes were comparatively very small to today’s machines and network bandwidths were also low.
Some well established algorithms in these space are:
- Cannon’s algorithm.
- Fox’s algorithm.
Both these algorithms assume that the nodes on which computations are run have low memory and the cluster of nodes are arranged in a grid (or rather a Torus) like topology.
Let’s take a look at how Fox’s algorithm works internally:
In a non-distributed setting, given matrices A and B of dimensions NxM and MxP respectively, we can calculate C=A.B in the following way:
The algorithm is run in multiple stages. For a given stage S:
For each i from 0 to N-1, multiply scalar A[i,(i+S) %M] with the row vector B[(i+S)%M,:]. This will give a partial values for the row :
C[i, :] += A[i, (i+S) %M] . B[(i+S)%M, :]
Repeat for all stages S from 0 to M-1. Pseudocode is below:
Input:
A - NxM
B - MxP
Output:
C - NxP
For each stage j from [0, M)
For each i from [0, N)
Calculate C[i,:] += A[i,(i+stage)%N].B[(i+stage)%N,:]
return C
In a distributed setting, lets assume that we have KxK nodes arranged in a grid topology. The concept of a grid topology implies that it is faster to connect to T[i+1,j] or T[i-1,j] or T[i,j+1] or T[i,j-1] from T[i,j] where T[i,j] refers to the node at index (i,j) in the grid.
Thus, each node at (i, j) is responsible for computing the values of the sub-matrix :
Initially the B matrix is distributed to the nodes s.t. node at grid index (i,j) i.e. T[i,j] gets the sub-matrix :
In stage S, the scalar values A[j, (j+S) % M] for j from Ui to Ui+N/K are broadcasted to all nodes in the i-th row of the grid :
i.e. T[i,0], T[i,1], … T[i,K-1]
After each stage, the B matrix is ‘shifted up’ across all the nodes in the grid i.e. node T[i,j] sends the values B[Ui, Vj:Vj+P/K] to the node T[(i-1)%K, j] and the process is repeated for all M stages.
Once all the nodes have finished all their stages, we can merge all the sub-matrices C from each node.
Let’ take an example to demonstrate this. Assume we have the following 4x4 matrices A and B.
Also, assume that we have 4 nodes for distributed computing arranged in a 2x2 grid :
The matrix B is initially assigned to the nodes as follows:
For stage 0, the values of A are assigned as follows to the nodes :
In each node, we multiply the corresponding A and B values as shown in the above diagram and add them to output. For e.g. in T[0,0], we maintain the following:
C[0,0] += A[0,0]*B[0,0]
C[0,1] += A[0,0]*B[0,1]
C[1,0] += A[1,1]*B[1,0]
C[1,1] += A[1,1]*B[1,1]
In stage 1, values of A are shifted right by 1 column to the right and values of B are shifted by 1 row up. Both these are with wrap-around.
Note that the new values of B need not come from the master node as for a given node, the new row of B that it needs, is already present in the node located below it in the grid.
For node T[0,0] we have:
C[0,0] += A[0,1]*B[1,0]
C[0,1] += A[0,1]*B[1,1]
C[1,0] += A[1,2]*B[2,0]
C[1,1] += A[1,2]*B[2,1]
Similarly for the other nodes too.
In stage 2, again we shift A values by 1 column to right and B by 1 row up with wrap-around.
Lastly for stage 3, we do the same :
After all the stages, the products A[i,j]*B[j,k] in each cell are added up and the partial values of C[i,k] are calculated.
If you observe carefully, the A matrix is rotating in the horizontal direction whereas the B matrix in a vertical direction as if they are lying on a Torus.
The advantage of this algorithm is that each node is dealing with only O(N) memory when each matrix is of size NxN.
For a cluster with K nodes.
In each stage we are sending values of A to each node over the network and each node is sending a row of B to the node located just above itself. Thus, the number of network I/O is:
Total Network I/O = O(M.K), where M is the number of columns in A and is also equal to the number of stages.
Although we are using low memory footprint per node, but we are trading it off with higher number of network communications with smaller payloads.
Thus, this approach works well when we have low memory on each node and network bandwidths are low but its performance will be slow due to lots of network I/O.
Given today’s modern servers with high memory and high network bandwidths, a simpler distributed algorithm is as follows:
If we take the j-th column of A and j-th row of B, then their dot product will be a NxN matrix. If we sum these matrices for all j from 0 to M-1, then we will get the final matrix C.
We can distribute the columns j to j+M/K and rows j to j+M/K to a node. We are assuming that there are K nodes (without any topology). Then the node with rank=k, will compute the partial matrix C_k as follows :
This node will then send the partial matrices C_k to the master node with rank=0. The master node will then sum up all the individual elements of each partial matrix obtained from all nodes to get the resultant matrix C.
Input:
A - NxM
B - MxP
K = Num nodes
Output:
C - NxP
Let h = M/K
If rank == 0:
Compute C = A[:, :M/K] . B[:M/K, :]
For j in [M/K, M):
Send A_sub = A[:, j:j+M/K] to node with rank=j/h
Send B_sub = B[j:j+M/K, :] to node with rank=j/h
If rank != 0:
Obtain A_sub and B_sub from node with rank=0
Compute C = A_sub . B_sub
Send C to node with rank=0
If rank = 0:
For k in [0, K):
Obtain C_k from node with rank=k
C += C_k
return C
If there are K nodes, then the number of network I/O is O(K), because we are sending columns from A and rows from B to a node only once.
Finally, each node sends back its matrix to the master node i.e. O(K) number of communications.
But in each node we require O(N²) memory as compared to O(N) in the Fox’s algorithm. Thus we are optimizing for less number of network I/O in this algorithm but higher memory requirements.
One drawback with this approach is that all the nodes will be sending their matrix of size NxP to the master node with rank=0. Thus, the master node will be doing O(K) additions of O(NxP) matrices.
Time complexity for final step is O(K.N.P).
Alternative approach to consolidate the matrices is to merge in stages. Assuming K is a power of 2, for e.g. if K=8, the nodes will be :
0 1 2 3 4 5 6 7
In the 1st round, each node at even positions, will send its matrix to the node left of it i.e. 1 to 0, 3 to 2 and so on. The nodes at even position will obtain these matrices and sum with its own matrix.
In the next stage, we only consider the nodes:
0 2 4 6
Because these have the most up-to-date data. Follow the same process as above i.e. 6 sends its matrix to 4, 2 sends its data to 0. Nodes 0 and 4 will add the obtained matrices with its own matrices.
In the last stage, we will have the following:
0 4
Now 4 sends its data to node 0. Note that now node 0 contains all the data from nodes 0 to 7.
In this approach any node will have at-most log(K) matrices in its buffer from any other node as compared to K matrices in the earlier version. Thus node 0 will add up log(K) matrices instead of K matrices. Time complexity for final addition is O(log(K).N.P).
The C++ code using MPI to run this algorithm is as follows:
#include <mpi.h>
// include other header files as required
using namespace std;
// Function to generate a random normal matrix
void generate(double *inp, int n, int m){
std::random_device rd;
std::mt19937 engine(rd());
std::normal_distribution<double> dist(0.0, 1.0);
for (int i = 0; i < n; i++) {
for (int j = 0; j < m; j++) {
inp[i*m+j] = dist(engine);
}
}
}
// Function to transpose a matrix
double *transpose(const double *a, const int n, const int m) {
double *b = new double[n*m];
for (int i = 0; i < n; i++) {
for (int j = 0; j < m; j++) {
b[j*n+i] = a[i*m+j];
}
}
return b;
}
// Function called by all nodes except master node
void dot_mpi(const int n, const int m, const int p, const int rank, const int size) {
int h = m/size;
double *a_r = new double[h*n];
double *b_r = new double[h*p];
double *merged = new double[h*(n+p)];
// Receive columns of A and rows of B from master node
MPI_Recv(merged, h*(n+p), MPI_DOUBLE, 0, rank, MPI_COMM_WORLD, MPI_STATUS_IGNORE);
std::copy(merged, merged+h*n, a_r);
std::copy(merged+h*n, merged+h*n+h*p, b_r);
// Calculate own product
double *out = new double[n*p];
for (int i = 0; i < n*p; i++) out[i] = 0.0;
for (int i = 0; i < h; i++) {
for (int j = 0; j < n; j++) {
for (int k = 0; k < p; k++) {
out[j*p+k] += a_r[i*n+j]*b_r[i*p+k];
}
}
}
// Send the matrices in stages to the master node
for (int stage = 1; stage < size; stage *= 2) {
if ((rank % stage == 0) && (rank-stage >= 0) && ((rank - stage) % (stage*2) == 0)) {
MPI_Send(out, n*p, MPI_DOUBLE, rank-stage, rank, MPI_COMM_WORLD);
}
else if ((rank % stage == 0) && (rank+stage < size) && (rank % (stage*2) == 0)) {
double *out_r = new double[n*p];
MPI_Recv(out_r, n*p, MPI_DOUBLE, rank+stage, rank+stage, MPI_COMM_WORLD, MPI_STATUS_IGNORE);
for (int j = 0; j < n*p; j++) {
out[j] += out_r[j];
}
}
}
}
// Function called for master node
double *dot_mpi_root(const double *a, const double *b, const int n, const int m, const int p, const int size) {
double *out = new double[n*p];
for (int i = 0; i < n*p; i++) out[i] = 0.0;
// Instead of selecting columns of A, transpose A and select rows of A
// This allows us to efficiently select elements of A to send.
double *aT = transpose(a, n, m);
int h = m/size;
MPI_Request request = MPI_REQUEST_NULL;
for (int i = h; i < m; i += h) {
double *merged = new double[h*(n+p)];
std::copy(aT+i*n, aT+(i+h)*n, merged);
std::copy(b+i*p, b+(i+h)*p, merged+h*n);
// Merge columns of A and rows of B before sending to a node
// Reduces the number of network I/O
MPI_Isend(merged, h*(n+p), MPI_DOUBLE, i/h, i/h, MPI_COMM_WORLD, &request);
}
// Do own contribution to the product
for (int i1 = 0; i1 < h; i1++) {
for (int j = 0; j < n; j++) {
for (int k = 0; k < p; k++) {
out[j*p+k] += aT[i1*n+j]*b[i1*p+k];
}
}
}
// Receive the computed matrices from other nodes and sum them
for (int stage = 1; stage < size; stage *= 2) {
double *out_r = new double[n*p];
MPI_Recv(out_r, n*p, MPI_DOUBLE, stage, stage, MPI_COMM_WORLD, MPI_STATUS_IGNORE);
for (int j = 0; j < n*p; j++) {
out[j] += out_r[j];
}
}
return out;
}
double *dot(const double *a, const double *b, const int n, const int m, const int p) {
double *out = new double[n*p];
for (int i = 0; i < n; i++) {
for (int j = 0; j < m; j++) {
for (int k = 0; k < p; k++) {
out[i*p+k] += a[i*m+j]*b[j*p+k];
}
}
}
return out;
}
int main(int argc, char *argv[]) {
int n = atoi(argv[1]);
int m = atoi(argv[2]);
int p = atoi(argv[3]);
MPI_Init(&argc, &argv);
int size;
MPI_Comm_size(MPI_COMM_WORLD, &size);
int rank;
MPI_Comm_rank(MPI_COMM_WORLD, &rank);
double *a, *b;
if (rank == 0) {
// Form the input matrices only in the master node, other nodes
// will have smaller memory footprints
a = new double[n*m];
b = new double[m*p];
generate(a, n, m);
generate(b, m, p);
}
double *out;
auto start = std::chrono::high_resolution_clock::now();
if (rank == 0) out = dot_mpi_root(a, b, n, m, p, size);
else dot_mpi(n, m, p, rank, size);
auto stop = std::chrono::high_resolution_clock::now();
auto duration = std::chrono::duration_cast<std::chrono::nanoseconds>(stop - start);
if (rank == 0) {
std::cout << duration.count() << std::endl;
// Compare the duration for distributed computation with single node
// computation.
start = std::chrono::high_resolution_clock::now();
double *out = dot(a, b, n, m, p);
stop = std::chrono::high_resolution_clock::now();
duration = std::chrono::duration_cast<std::chrono::nanoseconds>(stop - start);
std::cout << duration.count() << std::endl;
}
MPI_Finalize();
return 0;
}
So far we have described how to write a distributed matrix multiplication algorithm using C++ and MPI. If you wish you can also run the above code on your local machine instead of cloud based server.
To compile the above program (using -O3 flag for optimized binary file):
mpicxx -O3 -o matrix_multiplication matrix_multiplication.cpp
Now in order to run and test your code on your local machine, run the following command. Here I am assuming that there are 4 cores on my machine.
mpirun -n 4 ./matrix_multiplication 500 500 500
The program utilizes all the 4 cores while running the code. It will not run if you specify more number of processes than the number of cores.
But why run on local machine when you can run on a cluster of nodes where it will be actually useful to run since in a local machine you are limited by the amount of memory available.
To setup a simple HPC cluster on Azure with 16 standard_d4s_v5 nodes, follow these steps (assume you have az cli installed and you are using Ubuntu 22.04):
The steps are slightly modified from the ones posted here: Quick MPI Cluster Setup on Azure (glennklockwood.com).
Note that this is not the most favorable approach for running a HPC system on cloud but it will help you develop the intuition behind everything that is happening in behind.
Step 1:
Setup the cloud-init script. Create a file named cloud-init.txt in your home folder with the following content:
#cloud-config
package_upgrade: true
packages:
- clustershell
- openmpi-bin
- libopenmpi-dev
- mpich
- clustershell tool is used to run the same linux command on multiple nodes. This is a very handy tool as we will see.
- mpich is required for using MPI with C++. For C, mpich is not required.
Step 2:
Create a resource group in your subscription:
# if having multiple subscriptions
az account set --subscription <subscription_name>
az group create --name mpi_test_rg --location eastus
Step 3:
Create a proximity placement group (ppg). Every VM we put in here will be within the same low-latency network bubble.
az ppg create --name mpi_ppg \
--resource-group mpi_test_rg \
--intent-vm-sizes standard_d4s_v5
Step 4:
Create 16 Ubuntu 22.04 nodes in this proximity placement group:
az vm create --name mpicluster \
--resource-group mpi_test_rg \
--image Ubuntu2204 \
--ppg mpi_ppg \
--generate-ssh-keys \
--size standard_d4s_v5 \
--accelerated-networking true \
--custom-data cloud-init.txt \
--count 16
--generate-ssh-keys
puts your local ssh key~/.ssh/id_rsa.pub
into theauthorized_keys
file in all the nodes.--custom-data cloud-init.txt
passes in our cloud-init.txt file to the provisioning process. This installs the libraries mentioned in the cloud-init.txt file on all nodes.
Step 5:
Get the public and private IP addresses of your nodes:
az vm list-ip-addresses --resource-group mpi_test_rg -o table
Note down the public and private IP address of the node with the hostname mpicluster0. This is the master node.
Step 6:
Copy the private key from your local machine to the master node. Replace the IP with the public IP address of master node.
This is a highly insecure step but is required for you to connect to all nodes from your master node.
scp ~/.ssh/id_rsa 20.25.28.201:.ssh/
ssh 20.25.28.201
The next steps assume that we are running the commands on the master node i.e. mpicluster0
Step 7:
To get passwordless ssh up and working (so we can noninteractively run commands on other nodes through SSH), first add all the compute nodes’ host keys to your SSH known_hosts
file.
The ssh-keyscan
command connects to other hosts and retrieves its host keys, which we then store in ~/.ssh/known_hosts
on our main mpicluster0 node.
for i in {0..15};do ssh-keyscan mpicluster${i};done > ~/.ssh/known_hosts
Step 8:
clush command runs the same command on all nodes mentioned using -w. Here it copies the private key and known_hosts file on master node to all other nodes.
This allows any node to connect to any other node without IP addresses.
clush -w mpicluster[0-15] -c ~/.ssh/id_rsa ~/.ssh/known_hosts
Step 9:
Install NFS server and client on all the nodes. Although NFS server is required only on master node. This allows us to mount and share filesystem on master node with all other nodes.
clush -w mpicluster[0-15] sudo apt -y install nfs-common nfs-kernel-server
Step 10:
Create a folder on all nodes. This folder will mount another folder on the master node via NFS.
clush -w mpicluster[0-15] sudo mkdir /scratch
Step 11:
On the master node, create a folder /shared which will be mounted and shared by all other nodes. Also add the current user as owner of this folder.
sudo mkdir /shared
sudo chown user:user /shared
Step 12:
Update the /etc/exports file in the master node with the possible IP address range of all the other nodes sharing the folder.
Here we are using 10.0.0.0/16. But the actual CIDR notation will depend on the private IP addresses from Step 5.
sudo bash -c 'echo "/shared 10.0.0.0/16(rw,no_root_squash,no_subtree_check)" >> /etc/exports'
sudo exportfs -a
Step 13:
Run mount command on all nodes to mount the /shared directory on master node.
The IP address 10.0.0.7 is the private IP address of the master node i.e. mpicluster0. Update this according to your output from Step 5.
clush -w mpicluster[0-15] sudo mount -t nfs -o vers=3,rsize=1048576,wsize=1048576,nolock,proto=tcp,nconnect=8 10.0.0.7:/shared /scratch
Step 14:
On the master node create a file named matrix_multiplication.cpp and copy the code above for distributed matrix multiplication.
Then compile the code as below. The compiled binary will be located inside the /shared folder so that all nodes will have access to the compiled binary for running the code.
mpicxx -O3 -o /shared/matrix_multiplication matrix_multiplication.cpp
Step 15:
Finally we run the compiled code on all our 16 nodes as below. Since we are using 16 nodes, we are choosing the matrix sizes multiples of 16. The matrices here are of sizes 1600x1600.
mpirun --host mpicluster0,mpicluster1,mpicluster2,mpicluster3,mpicluster4,mpicluster5,mpicluster6,mpicluster7,mpicluster8,mpicluster9,mpicluster10,mpicluster11,mpicluster12,mpicluster13,mpicluster14,mpicluster15 --mca btl tcp,vader,self --wdir /scratch ./matrix_multiplication 1600 1600 1600
In our code, we are also comparing the runtime of a non-distributed matrix multiplication on the master node with the distributed one.
For smaller matrix sizes, obviously the non-distributed algorithm works much faster as compared to the distributed one because in distributed setting we have lots of network I/O which degrades performance.
But for larger matrices such as 1600x1600, the non-distributed algorithm is 4 times slower than the distributed one. It takes around 1.11 seconds with 1600x1600 matrices in non-distributed while it takes only 0.3 seconds on the distributed cluster.
As we can see that the effect of network I/O is much less as compared to the time complexity of matrix multiplication as the sizes of matrices become larger.
If you are interested in the Fox’s algorithm implementation using MPI, the code is uploaded in github: hpc_mpi/matrix_multiplication.cpp at main · funktor/hpc_mpi (github.com)
References
- COMP 605: Introduction to Parallel Computing Topic: MPI: Matrix-Matrix Multiplication (sdsu.edu)
- A Comprehensive MPI Tutorial Resource · MPI Tutorial
- Introduction — Fox MPI in python (miguehm.github.io)
- Matrix Multiplication (cuny.edu)
- Quick MPI Cluster Setup on Azure (glennklockwood.com)
- How To Set Up an NFS Mount on Ubuntu 22.04 | DigitalOcean
- PowerPoint Presentation (ethz.ch)
- MPI_tutorial_Fall_Break_2022.pdf (princeton.edu)
- MPI programming (cornell.edu)
- Using MPI with C++: a basic guide (2024) (paulnorvig.com)
- PowerPoint Presentation (princetonuniversity.github.io)