SQL PageRank on Snowflake

Using SQL on Snowflake to gain insights into large networks

Introduction / Use case

With the launch of the Media Data Cloud on Snowflake, it’s a great time to dive into the analytical tools used in large-scale advertising decision making. Graph and network analytics are core tools for gaining insights from large networks such as social media relationships, web pages and advertising networks. Which influencer to use, which web page to place ads on and which users to target with direct ads are all questions that can be answered better by infusing network analytics metrics into the ROI calculation.

While Snowflake is not an optimized graph database, we will test its powerful analytics engine by calculating network-level metrics.

If you need a refresher on network analytics metrics, here is a recap on Wikipedia: https://en.wikipedia.org/wiki/Centrality

Today we are going to focus on implementing PageRank with the power method on Snowflake, using good old SQL.

A stored procedure will script the looping, since the power method relies on iterating over the network multiple times.

Why PageRank may fit SQL well:

The PageRank power iteration method can be explained in layman’s terms as:

1) Set an initial PageRank for every node, for example 1 / (number of nodes)

2) Calculate the updated PageRank by iterating through each edge in the network, using the current PageRank as the weight

3) Repeat step 2 until n iterations have run or a steady state is reached
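
The three steps can be sketched in plain Python as follows. This is a minimal illustration rather than the code I benchmarked later: the function name, the toy graph and the choice to spread the rank of sink nodes evenly across all nodes are assumptions made for the example (the last one matches what the SQL version further down does).

```python
def pagerank_power_iteration(edges, damping=0.85, n_iter=10):
    """Plain-Python power iteration; sink rank is spread evenly over all nodes."""
    nodes = sorted({node for edge in edges for node in edge})
    n_nodes = len(nodes)
    out_degree = {node: 0 for node in nodes}
    for source, _ in edges:
        out_degree[source] += 1

    # Step 1: every node starts with rank 1 / number of nodes.
    rank = {node: 1.0 / n_nodes for node in nodes}

    # Step 3: repeat the update a fixed number of times.
    for _ in range(n_iter):
        # Rank held by sink nodes (no out-edges) is shared by all nodes.
        sink_rank = sum(rank[node] for node in nodes if out_degree[node] == 0)
        incoming = {node: 0.0 for node in nodes}
        # Step 2: each edge passes on an equal share of its source's rank.
        for source, target in edges:
            incoming[target] += rank[source] / out_degree[source]
        rank = {
            node: (1.0 - damping) / n_nodes
            + damping * (incoming[node] + sink_rank / n_nodes)
            for node in nodes
        }
    return rank


# Hypothetical toy graph; node 4 has no out-edges, so it is a sink.
toy_edges = [(1, 2), (1, 3), (2, 3), (3, 1), (3, 4)]
toy_ranks = pagerank_power_iteration(toy_edges)
```

The ranks always sum to 1 with this update, which is a handy sanity check when porting the logic to another engine.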

When I have previously implemented this in Python, it has typically been one loop through all the edges to calculate new weights, wrapped in a second loop that updates the PageRank values n times. This type of double loop is not very efficient as the network grows because it’s not parallelized.

So why attempt this on Snowflake with SQL?

Step 2, calculating the new weights, can be parallelized easily because the calculation for each edge only needs the previous weight of the source node, the number of edges leaving the source node and the damping factor. Since the algorithm must process every edge, in SQL this becomes a simple join between a few tables.

Additionally, the number of iterations N is typically small, whereas the number of edges is large (millions or even billions). Parallelizing step 2 therefore gives a large performance boost for each iteration, and that gain is repeated N times.
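
To make the "join plus group by" idea concrete, here is a small Python sketch of one pass over the edges. The function and variable names are hypothetical and the data is a made-up toy graph; the point is only that each edge's contribution depends on nothing but its own source node, so the contributions could be computed in any order or in parallel before being summed per target.

```python
from collections import defaultdict


def transfer_weights(rank, out_degree, edges):
    """One 'join + group by' pass: sum rank / out_degree over incoming edges."""
    # "Join": each edge row only needs its source node's rank and out-degree,
    # so every contribution is independent of all the others.
    contributions = [
        (target, rank[source] / out_degree[source]) for source, target in edges
    ]
    # "Group by target": add up the contributions arriving at each node.
    totals = defaultdict(float)
    for target, weight in contributions:
        totals[target] += weight
    return dict(totals)


# Hypothetical toy data: three nodes, four edges.
toy_edges = [(1, 2), (1, 3), (2, 3), (3, 1)]
toy_rank = {1: 0.25, 2: 0.25, 3: 0.5}
toy_out_degree = {1: 2, 2: 1, 3: 1}
toy_transfer = transfer_weights(toy_rank, toy_out_degree, toy_edges)
```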

Snowflake automatically parallelizes the work within a virtual warehouse. To make things easy, we can start with an XS warehouse and scale up instantly to process large billion-edge networks quickly if needed.

Solution:

To make the PageRank algorithm easy to use in Snowflake, we will use the recently announced Snowflake Scripting (SQL stored procedures) to bundle setup and iteration into one call. We can also make it more general by parameterizing the source data location, the damping factor and the number of iterations.

The required input data is simply a table with edges that includes source and target nodes. This can be stored as a standalone table or with other attributes.

The PageRank Stored Procedure has the following components:

First we create a temporary view over the source data (it is dropped at the end of the procedure) and populate the Nodes table from the edges:

-- Create or replace  temp view from whatever table the edges are stored in
sqltext := concat('create or replace view tmp_edge_vw as
select ' , :s_name , ' as sourcenodeid, ', :t_name , ' as targetnodeid from ' , :edge_table );
execute immediate :sqltext;

-- Create nodes table to store out edge count and pagerank
create or replace table nodes
(nodeid int,pagerank float,nodecount int, sink int);

-- Populate unique node list from edges
insert into nodes (nodeid) (
select distinct (sourcenodeid)
from tmp_edge_vw union
select distinct (targetnodeid) from tmp_edge_vw);

Next we need to calculate the number of unique nodes, set the initial PageRank value of each node to 1 / (number of nodes) and identify sink nodes (nodes with no out-edges).

-- Calculate node count
totalnodecount := (select count(*) from nodes);
-- Populate edge count and initial pagerank 1/total node count
update nodes
set nodecount = ifnull(t.targetnodecount, :totalnodecount ),
pagerank = 1/ :totalnodecount::float,
sink = case when t.targetnodecount > 0 then 0 else 1 end
from
( select * from nodes n
left outer join
(select sourcenodeid, count(*) as targetnodecount
from tmp_edge_vw
group by sourcenodeid
) as x
on x.sourcenodeid = n.nodeid ) as t
where nodes.nodeid = t.nodeid;
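
For readers who think more easily in Python than in update-from-join SQL, the same initialization can be sketched like this. The function name is hypothetical and the dictionary keys simply mirror the column names of the nodes table. Note that nodecount is the number of out-edges, and that sink nodes get the total node count as a placeholder, exactly as the ifnull does above.

```python
from collections import Counter


def init_nodes(edges):
    """Build a dict version of the nodes table from a list of (source, target)."""
    out_edges = Counter(source for source, _ in edges)
    node_ids = {node for edge in edges for node in edge}
    total = len(node_ids)
    return {
        node: {
            # Mirrors ifnull(t.targetnodecount, :totalnodecount).
            "nodecount": out_edges.get(node, total),
            # Every node starts at 1 / total node count.
            "pagerank": 1.0 / total,
            # A node with no out-edges is a sink.
            "sink": 0 if out_edges.get(node, 0) > 0 else 1,
        }
        for node in node_ids
    }


# Hypothetical toy graph: node 3 only receives edges, so it is a sink.
toy_nodes = init_nodes([(1, 2), (1, 3), (2, 3)])
```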

Then we create another temp view to calculate weights from edges and nodes. This is the most complex part and wrapping it into a view makes the loop much cleaner.

-- Create or replace view to do the updated pagerank calculation
create or replace view new_weights as
select * from nodes n1 left outer join
(
-- Here's the weight calculation in place
select e.targetnodeid,
sum(n2.pagerank / n2.nodecount) as transferweight
from nodes n2
inner join tmp_edge_vw e
on n2.nodeid = e.sourcenodeid
group by e.targetnodeid
) as x
on x.targetnodeid = n1.nodeid;

Finally, we can use a while loop to apply the N updates to the PageRank values. Because of the view created above, the code becomes very simple.

-- Initialize the counter and define the while loop
counter := 1;
while (counter <= :n_iter) do

-- Calculate current pagerank of sink nodes
-- (despite its name, totalnonsinkrank holds the rank of the sinks)
-- -> distribute along all nodes below
-- ifnull guards against a network with no sink nodes, where sum() is null
totalnonsinkrank := (select ifnull(sum(pagerank), 0.0)
from nodes where sink = 1 );

create or replace table nodes as
select n.nodeid as nodeid,
(1.0 - :d_factor) /:totalnodecount +
(ifnull(v.transferweight, 0.0) +
:totalnonsinkrank/:totalnodecount) *
:d_factor as pagerank,
n.nodecount as nodecount,
n.sink as sink
from
nodes n inner join new_weights v
on n.nodeid = v.nodeid;
counter := counter + 1;

end while;
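
Written out as a formula, the update inside the loop looks roughly like this. The symbols are my own shorthand rather than anything defined in the procedure: d is d_factor, N is totalnodecount, E is the set of edges, out(u) is the number of out-edges of node u and S is the set of sink nodes.

```latex
PR_{k+1}(v) = \frac{1 - d}{N}
  + d \left( \sum_{(u, v) \in E} \frac{PR_k(u)}{\mathrm{out}(u)}
  + \frac{1}{N} \sum_{s \in S} PR_k(s) \right)
```

The first sum is the transferweight column from the new_weights view, and the second sum is the value the loop stores in totalnonsinkrank before each update.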

To see the full stored procedure, please see the code section at the end of the post. There are a few additional lines of code at the beginning to define the SP and a few at the end to drop the views and return a result set of the top 10 nodes by PageRank. This is also a great example of a SQL stored procedure on Snowflake, highlighting dynamic SQL creation, looping and parameterization.

To run the pagerank stored procedure we can simply call it with parameters: call pagerank(<inputs>) and get the top 10 nodes + the full nodes table created as shown below

Pagerank procedure call and output

For validation, I compared it to my previous Python implementations and scikit-network, because I fully admit that my Python spaghetti code is not the most efficient. The calculated PageRank values match those from scikit-network, which validates that the code produces correct answers.

For larger-scale performance testing we can use the LiveJournal online social network dataset published by Stanford University. This dataset includes ~68M edges and ~4.8M nodes.

The runtimes for 10 iterations with each method were:

Large Network Testing Results
  • Custom Python: 499 seconds
  • Scikit-network (build adjacency matrix + train PageRank): 95 seconds
  • SQL on Snowflake (XS warehouse): 86 seconds

The Python testing was done on a 2019 MacBook Pro. Different environments will result in different runtimes.

Before declaring victory, let’s break down where the time is spent for scikit-network and SQL.

For scikit-network, loading the data from a text file while building the adjacency matrix takes ~88 seconds and the 10 iterations take ~7 seconds. The bottleneck is getting the data into the right format in memory. Iterating once in memory is blazing fast.

With SQL, the setup tasks take ~10 seconds and each iteration takes ~7–8 seconds. This means that the iterations will become a bottleneck if n needs to be large.
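
A rough back-of-the-envelope model makes that trade-off visible. The sketch below assumes runtime is simply setup time plus a constant cost per iteration, which is a simplification, and it uses 7.5 seconds as an assumed midpoint for the SQL iteration time. The constant and function names are made up for the example.

```python
def total_seconds(setup_seconds, seconds_per_iteration, n_iter):
    """Linear runtime model: fixed setup cost plus a constant cost per iteration."""
    return setup_seconds + seconds_per_iteration * n_iter


# Approximate figures from the measurements above.
SKNETWORK_SETUP, SKNETWORK_PER_ITER = 88.0, 0.7  # ~7 s for 10 iterations
SQL_SETUP, SQL_PER_ITER = 10.0, 7.5  # assumed midpoint of ~7-8 s

sknetwork_10 = total_seconds(SKNETWORK_SETUP, SKNETWORK_PER_ITER, 10)
sql_10 = total_seconds(SQL_SETUP, SQL_PER_ITER, 10)

# Number of iterations at which both approaches would take the same time.
break_even_iterations = (SKNETWORK_SETUP - SQL_SETUP) / (
    SQL_PER_ITER - SKNETWORK_PER_ITER
)
```

Under these assumptions the two approaches break even somewhere between 11 and 12 iterations, so on this dataset and hardware the SQL version is only ahead while the iteration count stays small.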

Summary

It was an interesting undertaking to code PageRank in SQL. I learned quite a bit about the nuances of dealing with sink nodes and developing SQL stored procedures on Snowflake. I would generalize the key findings as:

1) For very large networks where the data is already in Snowflake, using SQL is a feasible approach.

2) The iteration performance of an optimized in-memory algorithm in Python is blazing fast.

I cannot wait to tackle this problem using Python UDFs on Snowflake in a future post, when I can combine Snowflake’s data management capabilities with in-memory iteration.

https://www.snowflake.com/news/snowflake-shapes-the-future-of-data-science-with-python-support/

Full SQL Stored Procedure PageRank Code:

create or replace procedure pagerank(d_factor float, n_iter int, edge_table string, s_name string, t_name string)
returns table(nodeid int,pagerank float,nodecount int, sink int)
language sql
as
$$
declare
sqltext string;
totalnodecount int;
totalnonsinkrank float;
counter int;
res resultset;
begin
-- create or replace temp view from whatever table the edges are stored in
sqltext := concat('create or replace view tmp_edge_vw as
select ' , :s_name , ' as sourcenodeid, ', :t_name , ' as targetnodeid from ' , :edge_table );
execute immediate :sqltext;

-- create nodes table to store out edge count and pagerank
create or replace table nodes
(nodeid int,pagerank float,nodecount int, sink int);

-- populate unique node list from edges
insert into nodes (nodeid) (
select distinct (sourcenodeid)
from tmp_edge_vw union
select distinct (targetnodeid) from tmp_edge_vw);

-- calculate node count
totalnodecount := (select count(*) from nodes);
-- populate edge count and initial pagerank 1/total node count
update nodes
set nodecount = ifnull(t.targetnodecount, :totalnodecount ),
pagerank = 1/ :totalnodecount::float,
sink = case when t.targetnodecount > 0 then 0 else 1 end
from
( select * from nodes n
left outer join
(select sourcenodeid, count(*) as targetnodecount
from tmp_edge_vw
group by sourcenodeid
) as x
on x.sourcenodeid = n.nodeid ) as t
where nodes.nodeid = t.nodeid;
-- create or replace view to do the updated pagerank calculation
create or replace view new_weights as
select * from nodes n1 left outer join
(
-- here's the weight calculation in place
select e.targetnodeid,
sum(n2.pagerank / n2.nodecount) as transferweight
from nodes n2
inner join tmp_edge_vw e
on n2.nodeid = e.sourcenodeid
group by e.targetnodeid
) as x
on x.targetnodeid = n1.nodeid;
-- initialize the counter and define the while loop
counter := 1;
while (counter <= :n_iter) do

-- calculate current pagerank of sink nodes
-- (despite its name, totalnonsinkrank holds the rank of the sinks)
-- -> distribute along all nodes below
-- ifnull guards against a network with no sink nodes, where sum() is null
totalnonsinkrank := (select ifnull(sum(pagerank), 0.0)
from nodes where sink = 1 );

create or replace table nodes as
select n.nodeid as nodeid,
(1.0 - :d_factor) /:totalnodecount +
(ifnull(v.transferweight, 0.0) + :totalnonsinkrank/:totalnodecount) *
:d_factor as pagerank,
n.nodecount as nodecount,
n.sink as sink
from
nodes n inner join new_weights v
on n.nodeid = v.nodeid;
counter := counter + 1;

end while;

drop view new_weights;
drop view tmp_edge_vw;
res := (select * from nodes order by pagerank desc limit 10);

return table(res);
end;
$$;
-- Call the pagerank stored procedure with large network
call pagerank(0.85,10,'Edges_large','SourceNodeId', 'TargetNodeId');

Scikit-network python code:

import pandas as pd
import numpy as np
from sknetwork.ranking import PageRank
from sknetwork.data import convert_edge_list, load_edge_list, load_graphml
# load data
graph = load_edge_list('socLiveJournal.txt', directed=True)

# define and train PageRank object
pagerank = PageRank(damping_factor=0.85, solver='diteration', n_iter=10)
scores = pagerank.fit_transform(graph.adjacency, seeds=None)

Teemu Kettunen
Snowflake Builders Blog: Data Engineers, App Developers, AI/ML, & Data Science

Sales Engineer at Snowflake ❄️ . Helping customers amplify their business with the Data Cloud.