# Computing the Pearson correlation matrix on huge datasets in Python

Published in
6 min readSep 6, 2021

One of the latest tasks at GoodIP was to calculate the similarities between around 480k items having around 800 observations in the range of 0–50k each. Reducing the dimensionality would compromise the quality of the long-tail results, which is undesirable. The following article evaluates the performance of different implementations, describes how to split the dataset into multiple chunks and process them in parallel. I also want to mention DeepGraph, which provides an interesting, different approach.

As a short refresher, the Pearson correlation assigns a value in the range of −1, signifying a negative correlation (if the value in A increases, the corresponding value in B decreases) and 1, a positive correlation (if A increases B also increases). A value of 0 means no correlation. It is defined as

where x̄, and ȳ are the means of values in x and y.

In our case, the CSV file containing the dataset is around 1GB in size. We are going to load it and create a NumPy ndarray from it.

`items = pd.read_csv("./items.csv")numpy_items = items.to_numpy() `

Simply trying to run np.corrcoef(numpy_items) raises the exception MemoryError: Unable to allocate 1.6 TiB for an array with shape (480000, 480000) and data type float64 — which illustrates the dimension of the problem. The actual number of pairings that actually need to be processed is the number of different, unordered combinations — choosing r objects from a set of n objects. nCr(n,r)=n!/r!(n−r)! In this case nCr(480000,2), but still 115 199 760 000 combinations.

# Implementations performance comparison

Let’s first compare the performance of different implementations from Pandas, NumPy, and CuPy and find out which works with parallelization. Therefore we only use a 1% subset of the data.

`import numpy as npimport cupy as cpimport pandas as pdfrom scipy.stats import pearsonrcp.cuda.set_allocator(None) # Disable cacheitems = items[0:5000]numpy_items = items.to_numpy()cupy_items = cp.array(numpy_items)# Pandas implementation%%timeit -n 1 -r 5r1 = items.T.corr()# 1 loop, best of 5: 37 s per loop# NumPy implementation%%timeit -n 1 -r 5r2 = np.corrcoef(numpy_items)# 1 loop, best of 5: 1.21 s per loop# CuPy implementation (on Tesla K80)%%timeit -n 1 -r 5r3 = cp.corrcoef(cupy_items)# 1 loop, best of 5: 66 ms per loop# NumPy custom%%timeit -n 1 -r 5ms = numpy_profiles.mean(axis=1)[(slice(None,None,None),None)]datam = numpy_profiles - msdatass = np.sqrt(np.sum(datam*datam, axis=1))r4 = []for i in range(numpy_profiles.shape[0]):    temp = np.dot(datam[i:], datam[i].T)    rs = temp / (datass[i:] * datass[i])    r4.append(rs)# 1 loop, best of 5: 44.7 s per loop# CuPy custom version (on Tesla K80)%%timeit -n 1 -r 5ms = cupy_profiles.mean(axis=1)[(slice(None,None,None),None)]datam = cupy_profiles - msdatass = cp.sqrt(cp.sum(datam*datam, axis=1))r5 = []for i in range(cupy_profiles.shape[0]):    temp = cp.dot(datam[i:], datam[i].T)    rs = temp / (datass[i:] * datass[i])    r5.append(rs)# 1 loop, best of 5: 2.35 s per loop`

GPUs perform great with matrix operations, which could be the cause why CuPy outperforms the other implementations by far. This approach could also outperform our parallelized CPU approach and will be evaluated in a follow-up article. In the following, however, we use the custom implementation that utilizes NumPy, precomputed means, and can easily be distributed to different processes.

# Chunking the dataset

As we do not want to precompute all pairings, mainly due to memory constraints, the approach is to take one row, compute the correlation with all other rows below that index, increment the index, etc. This way, we exclude reflexive comparisons and use the commutativity of the function by ignoring the order. The computational effort for the first indices is then way higher than for the last, and we only want to split at full index positions, both taken into consideration with the following naive split. We need the nCr function for the number of total combinations.

`import operator as opdef ncr(n: int, r: int) -> int:    """    Calculates the number of different, unordered combination     of r objects from a set of n objects.    nCr(n,r) = nPr(n,r) / r!        Args:        n (int): Number of objects in set        r (int): Number of objects to combine    Returns:        int: Number of combinations    """    r = min(r, n-r)    numer = reduce(op.mul, range(n, n-r, -1), 1)    denom = reduce(op.mul, range(1, r+1), 1)    return numer // denom`

Since Python 3.8 you can also use math.comb. The following code finds the indices that split the index into n_chunks close to equal sized parts.

`n_chunks = 50 # Number of chunksn = numpy_items.shape[0] # Number itemsnr_pairings = ncr(numpy_items.shape[0], 2) # Number of all pairings# Minimum nr. of pairings per chunkper_chunk = int(math.ceil(nr_pairings/(n_chunks - 1)))split_indices = [] # Array containing the indices to split att = 0for i in range(n + 1):    # There are n - i pairings at index i    t += n - i    # If the current chunk has enough pairings     if t >= per_chunk:        split_indices.append(i)        t = 0`

We can check the number of pairings in each chunk running:

`s_indices = [0] + split_indices + [n]pairings_chunks = []for i in range(len(s_indices)-1):    start = s_indices[i]    end = s_indices[i + 1]    pairings_chunks.append(sum(range(n - end, n - start)))`

# Processing the dataset

At first, we precompute the means of the dataset. As the data is read-only, we can just define it in the global scope. All the child processes will then be able to access it, and it will not be copied — provided you don’t write to it.

`ms = numpy_profiles.mean(axis=1)[(slice(None,None,None),None)]datam = numpy_profiles - msdatass = np.sqrt(np.sum(datam*datam, axis=1))`

The initial idea was to send the results from the workers to a dedicated storage worker through a queue. Unfortunately, this decreased the performance significantly due to blocking and serialization. The fastest option to implement was to write the results to disk directly from the worker.

`output_dir = "/var/tmp/"def processor_fun(*, i_start: int, i_end: int) -> None:    """    Calculate correlation of rows in the range of i_start and i_end    and the rows below these indices.Args:        i_start (int): Index of the start row        i_end (int): Index of the end row    Returns:        None    """        for i in range(i_start, i_end):        temp = np.dot(datam[i:], datam[i].T)        rs = temp / (datass[i:] * datass[i])        rs = np.delete(rs, [0])        # Create nparray that contains information about the indices        res = np.zeros((rs.shape[0], 3))        res[:, 0] = i        res[:, 1] = list(range(i+1, i + rs.shape[0] + 1))        res[:, 2] = rs        # Save CSV file        np.savetxt(f'{output_dir}/{i}.csv', res,                   delimiter=',',                   newline='\n',                    fmt=['%d','%d','%0.13f'])# Create pool of worker processes which will carry out the# computationn_cpus = mp.cpu_count()pool = mp.Pool(n_cpus)# Start workerss_indices = [0] + split_indicesworkers = []for i in range(0, len(s_indices)):    start = s_indices[i]    end = s_indices[i+1] if i < len(s_indices)-1 else n-1    workers.append(pool.apply_async(        processor_fun,         kwds={'i_start': start, 'i_end': end}))for r in workers:    r.wait()# Close the pool and wait till all workers are donepool.close()pool.terminate()pool.join()`

With our dataset, this creates 480k CSV files with a total size of approx. 4TB. We batched and gzip compressed the results to ~ 4GB parts and loaded them into Google BigQuery for production use.

--

--

Editor for

👋 CTO at GoodIP, Managing Director MunichResearch. Writing about tech I am working on. Let’s connect on LinkedIn https://linkedin.com/in/linuskohl