# Computing the Pearson correlation matrix on huge datasets in Python

One of the latest tasks at GoodIP was to calculate the similarities between around 480k items having around 800 observations in the range of 0–50k each. Reducing the dimensionality would compromise the quality of the long-tail results, which is undesirable. The following article evaluates the performance of different implementations, describes how to split the dataset into multiple chunks and process them in parallel. I also want to mention DeepGraph, which provides an interesting, different approach.

As a short refresher, the Pearson correlation assigns a value in the range of −1, signifying a negative correlation (if the value in A increases, the corresponding value in B decreases) and 1, a positive correlation (if A increases B also increases). A value of 0 means no correlation. It is defined as

where x̄, and ȳ** **are the means of values in x and y.

In our case, the CSV file containing the dataset is around 1GB in size. We are going to load it and create a NumPy ndarray from it.

`items = pd.read_csv("./items.csv")`

numpy_items = items.to_numpy()

Simply trying to run *np.corrcoef(numpy_items)* raises the exception *MemoryError: Unable to allocate 1.6 TiB for an array with shape (480000, 480000) and data type float64 — *which illustrates the dimension of the problem. The actual number of pairings that actually need to be processed is the number of different, unordered combinations — choosing r objects from a set of n objects. *nCr(n,r)=n!/r!(n−r)! *In this case *nCr(480000,2), *but still 115 199 760 000 combinations.

# Implementations performance comparison

Let’s first compare the performance of different implementations from Pandas, NumPy, and CuPy and find out which works with parallelization. Therefore we only use a 1% subset of the data.

import numpy as np

import cupy as cp

import pandas as pd

from scipy.stats import pearsonr

cp.cuda.set_allocator(None) # Disable cacheitems = items[0:5000]

numpy_items = items.to_numpy()

cupy_items = cp.array(numpy_items)# Pandas implementation

%%timeit -n 1 -r 5

r1 = items.T.corr()

# 1 loop, best of 5:37 sper loop# NumPy implementation

%%timeit -n 1 -r 5

r2 = np.corrcoef(numpy_items)

# 1 loop, best of 5:1.21 sper loop# CuPy implementation (on Tesla K80)

%%timeit -n 1 -r 5

r3 = cp.corrcoef(cupy_items)

# 1 loop, best of 5:66 msper loop# NumPy custom

%%timeit -n 1 -r 5

ms = numpy_profiles.mean(axis=1)[(slice(None,None,None),None)]

datam = numpy_profiles - ms

datass = np.sqrt(np.sum(datam*datam, axis=1))

r4 = []

for i in range(numpy_profiles.shape[0]):

temp = np.dot(datam[i:], datam[i].T)

rs = temp / (datass[i:] * datass[i])

r4.append(rs)

# 1 loop, best of 5:44.7 sper loop# CuPy custom version (on Tesla K80)

%%timeit -n 1 -r 5

ms = cupy_profiles.mean(axis=1)[(slice(None,None,None),None)]

datam = cupy_profiles - ms

datass = cp.sqrt(cp.sum(datam*datam, axis=1))

r5 = []

for i in range(cupy_profiles.shape[0]):

temp = cp.dot(datam[i:], datam[i].T)

rs = temp / (datass[i:] * datass[i])

r5.append(rs)

# 1 loop, best of 5:2.35 sper loop

GPUs perform great with matrix operations, which could be the cause why CuPy outperforms the other implementations by far. This approach could also outperform our parallelized CPU approach and will be evaluated in a follow-up article. In the following, however, we use the custom implementation that utilizes NumPy, precomputed means, and can easily be distributed to different processes.

# Chunking the dataset

As we do not want to precompute all pairings, mainly due to memory constraints, the approach is to take one row, compute the correlation with all other rows below that index, increment the index, etc. This way, we exclude reflexive comparisons and use the commutativity of the function by ignoring the order. The computational effort for the first indices is then way higher than for the last, and we only want to split at full index positions, both taken into consideration with the following naive split. We need the nCr function for the number of total combinations.

import operator as opdef ncr(n: int, r: int) -> int:

"""

Calculates the number of different, unordered combination

ofrobjects from a set ofnobjects.

nCr(n,r) = nPr(n,r) / r!

Args:

n (int): Number of objects in set

r (int): Number of objects to combine

Returns:

int: Number of combinations

"""

r = min(r, n-r)

numer = reduce(op.mul, range(n, n-r, -1), 1)

denom = reduce(op.mul, range(1, r+1), 1)

return numer // denom

Since Python 3.8 you can also use math.comb. The following code finds the indices that split the index into *n_chunks *close to equal sized parts.

n_chunks = 50 # Number of chunks

n = numpy_items.shape[0] # Number items

nr_pairings = ncr(numpy_items.shape[0], 2) # Number of all pairings

# Minimum nr. of pairings per chunk

per_chunk = int(math.ceil(nr_pairings/(n_chunks - 1)))split_indices = [] # Array containing the indices to split at

t = 0

for i in range(n + 1):

# There are n - i pairings at index i

t += n - i

# If the current chunk has enough pairings

if t >= per_chunk:

split_indices.append(i)

t = 0

We can check the number of pairings in each chunk running:

`s_indices = [0] + split_indices + [n]`

pairings_chunks = []

for i in range(len(s_indices)-1):

start = s_indices[i]

end = s_indices[i + 1]

pairings_chunks.append(sum(range(n - end, n - start)))

# Processing the dataset

At first, we precompute the means of the dataset. As the data is read-only, we can just define it in the global scope. All the child processes will then be able to access it, and it will not be copied — provided you don’t write to it.

`ms = numpy_profiles.mean(axis=1)[(slice(None,None,None),None)]`

datam = numpy_profiles - ms

datass = np.sqrt(np.sum(datam*datam, axis=1))

The initial idea was to send the results from the workers to a dedicated storage worker through a queue. Unfortunately, this decreased the performance significantly due to blocking and serialization. The fastest option to implement was to write the results to disk directly from the worker.

output_dir = "/var/tmp/"def processor_fun(*, i_start: int, i_end: int) -> None:

"""

Calculate correlation of rows in the range of i_start and i_end

and the rows below these indices.Args:

i_start (int): Index of the start row

i_end (int): Index of the end row

Returns:

None

"""

for i in range(i_start, i_end):

temp = np.dot(datam[i:], datam[i].T)

rs = temp / (datass[i:] * datass[i])

rs = np.delete(rs, [0])

# Create nparray that contains information about the indices

res = np.zeros((rs.shape[0], 3))

res[:, 0] = i

res[:, 1] = list(range(i+1, i + rs.shape[0] + 1))

res[:, 2] = rs

# Save CSV file

np.savetxt(f'{output_dir}/{i}.csv', res,

delimiter=',',

newline='\n',

fmt=['%d','%d','%0.13f'])# Create pool of worker processes which will carry out the

# computation

n_cpus = mp.cpu_count()

pool = mp.Pool(n_cpus)

# Start workers

s_indices = [0] + split_indices

workers = []

for i in range(0, len(s_indices)):

start = s_indices[i]

end = s_indices[i+1] if i < len(s_indices)-1 else n-1

workers.append(pool.apply_async(

processor_fun,

kwds={'i_start': start, 'i_end': end}))for r in workers:

r.wait()# Close the pool and wait till all workers are done

pool.close()

pool.terminate()

pool.join()

With our dataset, this creates 480k CSV files with a total size of approx. 4TB. We batched and gzip compressed the results to ~ 4GB parts and loaded them into Google BigQuery for production use.