Advanced Graph Algorithms in Spark Using GraphX Aggregated Messages And Collective Communication Techniques

Ganga Reddy
6 min readOct 19, 2018

If you are not familiar with Spark, please spare a few minutes in reading about in my previous medium post named an introduction to spark.

GraphX

GraphX is a distributed graph processing framework on top of Apache Spark.

GraphX provides two separate APIs for implementation of massively parallel algorithms Pregel abstraction (PageRank), and a more general MapReduce style API. It provides full support for property graphs (graphs where properties can be attached to edges and vertices).

Pregel is a programming model specifically targeted to large-scale graph problems which another wise would involve complicated code and in large time complexities due to the iterative nature of Algorithms. Pregel provides two main advantages namely

  • Ease of programming. By considering vertices and edges as first-class citizens, programmers can design their algorithms using properties attached to edges/nodes.
  • Memory Efficient: Most of the graph algorithms can be completed using a space complexity of O(V+E), distributed over the cluster, thus reducing memory burden on computational nodes and scales with the size of the graph.
  • Efficiency on graph problems. It supports iterative computations more efficiently than MapReduce because it keeps the dataset in memory rather than writing to disk after every iteration. Since most of the graph algorithms are iterative. It also handles the fact that graph algorithms generally have poor memory access locality, by locating different vertices on different machines and passing messages between machines as necessary.

Pregel Model

Pregel’s programming model utilizes the concept of message passing interface (MPI) used in distributed computing where each node/process can send or receive a message from another node/process. Pregel applies the concept of message passing to send messages between the nodes/ vertices in a graph. Nodes partitioned across multiple nodes can communicate easily and overcome the problem of poor memory access locality. An interface is provided to programmers to send custom defined messages from a node to its neighbors in a directed fashion i.e messages can be sent from either source to destination or vice versa or both. As nodes can receive multiple messages from their neighbors, a custom-defined aggregator function reduces the messages according to the behavior defined by the programmer. This can be assumed as an analogy of Map Reduce for graph processing. Sending messages constitutes the map function while aggregator function forms the Reduce part.

GraphX provides AggregatedMessages , one of the core aggregation operations based on the pregel model,(org.apache.spark.graphx.Graph.aggregateMessages). This operation exposes an edge context to the programmer through which a map and reduction function can be defined. For example, let us write some simple Graph algorithms using AggregatedMessages.

InDegree

// With Syntactic sugar
private def inDegreesRDD:VertexRDD[Int] = {

def mapper(implicit ctx:EdgeContext) = ctx.sendToDst(1)
def reducer(a:Int,b:Int):Int = (a+b)

graph.aggregateMessages(ctx =>
( mapper(ctx),
reducer,
TripletFields.None)
}

As one can see, mapper function sends a message with value as one, from source to destination for all the edges present in the graph. The reducer function simply adds the messages, calculating the indegree by summing all the messages received from its incoming neighbors. The same function can be written without syntactic sugar as

// Without syntactic sugar
private def inDegreesRDD:VertexRDD[Int] = {
graph.aggregateMessages(_.sendToDst(1), _+_, TripletFields.None)
}

Note: Both the above programs are equivalent, the latter one does not have syntactic sugar.

OutDegree

// With Syntactic sugar
private def outDegreesRDD:VertexRDD[Int] = {

def mapper(implicit ctx:EdgeContext) = ctx.sendToSrc(1)
def reducer(a:Int,b:Int):Int = (a+b)

graph.aggregateMessages(ctx =>
( mapper(ctx),
reducer,
TripletFields.None)
}
// Without syntactic sugar
private def outDegreesRDD:VertexRDD[Int] = {
graph.aggregateMessages(_.sendToSrc(1), _+_, TripletFields.None)
}

Degree

// With Syntactic sugar
private def degreesRDD:VertexRDD[Int] = {

def mapper(implicit ctx:EdgeContext) = {
ctx.sendToSrc(1), ctx.sendToDst(1)}
def reducer(a:Int,b:Int):Int = (a+b)

graph.aggregateMessages(ctx =>
( mapper(ctx),
reducer,
TripletFields.None)
}
// Without syntactic sugar
private def degreesRDD:VertexRDD[Int] = {
graph.aggregateMessages(
{ _.sendToSrc(1),_.sendToDst(1)},
_+_,
TripletFields.None)
}

The benefit of this model is that once you start to “think like a vertex”, large-scale graph problems become much easier and can be programmed easily in terms of mapper and reducer defined in edge context. For example, the HITS algorithm could be implemented in less than ten lines of code using Scala in spark. Let's write a simple implementation for the HITS algorithm using Aggregated Messages.

import org.apache.spark.sql.{DataFrame, Row, SQLContext}
import org.apache.spark.sql.functions._
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.graphx.{Graph, VertexRDD, Edge => GXEdge}
import org.apache.spark.sql.types.{IntegerType, LongType}
import org.graphframes.GraphFrame

/**
* Created by greddy on 09/21/16.
*/

object Hits {

case class VertexAttr(srcId: Long, authScore: Double, hubScore:Double)

case class EdgeAttr(srcId: Long, dstId: Long)

case class HitsMsg(authScore:Double, hubScore:Double)

def reducer(a:HitsMsg,b:HitsMsg):HitsMsg = HitsMsg(a.authScore + b.authScore, a.hubScore + b.hubScore)

def runHits(g: GraphFrame, maxIter:Int = 10): GraphFrame = {

val gx0 = g.toGraphX

val vColsMap = g.vertexColumnMap
val eColsMap = g.edgeColumnMap

// Convert vertex attributes to nice case classes.
// Initialize each node with hubScore = 1 and authScore = 1
val gx1: Graph[VertexAttr, Row] = gx0.mapVertices { case (_, attr) =>
VertexAttr(attr.getLong(vColsMap("id")), authScore = 1.0, hubScore = 1.0)
}

// Convert edge attributes to nice case classes.
val extractEdgeAttr: (GXEdge[Row] => EdgeAttr) = { e =>
val src = e.attr.getLong(eColsMap("src"))
val dst = e.attr.getLong(eColsMap("dst"))
EdgeAttr(src, dst)
}

var gx: Graph[VertexAttr, EdgeAttr] = gx1.mapEdges(extractEdgeAttr)
for (iter <- Range(1,maxIter)) {
val totalHubScores = gx.vertices
val msgs: VertexRDD[HitsMsg] = gx.aggregateMessages(
ctx =>
// Can send to source or destination since edges are treated as undirected.
{
ctx.sendToDst(HitsMsg(0.0,ctx.srcAttr.hubScore));
ctx.sendToSrc(HitsMsg(ctx.dstAttr.authScore,0.0))
}, reducer)

// Update authority and hub scores of each node
gx = gx.outerJoinVertices(msgs) {
case (vID, vAttr, optMsg) => {
val msg = optMsg.getOrElse(HitsMsg(1.0, 1.0))
VertexAttr(vAttr.srcId, if (msg.authScore == 0.0) 1.0 else msg.authScore , if (msg.hubScore == 0.0) 1.0 else msg.hubScore)
}
}
//println("Iter ", iter)
}

// Convert back to GraphFrame with a new column "belief" for vertices DataFrame.
// Inorder to deal with disconnected components
val gxFinal: Graph[(Double,Double), Unit] = gx.mapVertices((_, attr) => (attr.authScore, attr.hubScore) )
.mapEdges( _ => ())

GraphFrame.fromGraphX(g, gxFinal, vertexNames = Seq("auth", "hub"))

}

Note: The authority scores and hub Scores are unnormalized, should be normalized by the corresponding sum of squares of all the scores.

PageRank Implementation

object PageRank {

case class VertexAttr(srcId: Long, outDegree: Int, pageScore:Double)

case class PageMsg(pageScore:Double)

def reducer(a:PageMsg,b:PageMsg):PageMsg= PageMsg(a.pageScore + b.pageScore)

def runPageRank(g: GraphFrame, resetProb:Double = 0.2, maxIter:Int = 10)
: GraphFrame = {

val gx0 = g.toGraphX

val vColsMap = g.vertexColumnMap
val eColsMap = g.edgeColumnMap


// Convert vertex attributes to nice case classes.
// Initialize each node with hubScore = 1 and authScore = 1
val gx1: Graph[VertexAttr, Row] = gx0.mapVertices { case (_, attr) =>
VertexAttr(attr.getLong(vColsMap("id")), attr.getInt(vColsMap("outDegree")), resetProb)
}

val extractEdgeAttr: (GXEdge[Row] => EdgeAttr) = { e =>
val src = e.attr.getLong(eColsMap("src"))
val dst = e.attr.getLong(eColsMap("dst"))
EdgeAttr(src, dst)
}

var gx: Graph[VertexAttr, EdgeAttr] = gx1.mapEdges(extractEdgeAttr)

for (iter <- Range(1,maxIter)) {

val msgs: VertexRDD[PageMsg] = gx.aggregateMessages (
ctx =>
ctx.sendToDst(PageMsg(ctx.srcAttr.pageScore / ( math.max(ctx.srcAttr.outDegree, 1)))),
reducer )

// Update page rank scores of each node
gx = gx.outerJoinVertices(msgs) {
case (vID, vAttr, optMsg) => {
val msg = optMsg.getOrElse(PageMsg(0.0))
VertexAttr(vAttr.srcId, vAttr.outDegree , resetProb + (1.0 - resetProb)*msg.pageScore)
}
}
println("Iter ", iter)
}

// Convert back to GraphFrame with a new column "belief" for vertices DataFrame.
// Inorder to deal with disconnected components
val gxFinal: Graph[Double, Unit] = gx.mapVertices((_, attr) => attr.pageScore )
.mapEdges( _ => ())

//gxFinal.edges.foreach(println)
gxFinal.vertices.foreach(println)
GraphFrame.fromGraphX(g, gxFinal, vertexNames = Seq("pageRank"))
}

By maintaining information of PageRank score and outdegree associated with each vertex, PageRank of each node/vertex in the graph is updated by aggregating incoming messages from its neighbors. This follows the principle of “Think like a vertex”.

Iterative messages with a user-defined Protocol

So far we have established the comfort of using Aggregated Messages by passing messages in an iterative fashion. If we can go a step further and introduce the concept of using Aggregated Messages with a custom-defined communication protocol that dictates the flow of messages. If we can iteratively send messages amongst nodes, a message from any node can reach its n- hop neighbors in at least n iterations. Large Scala graph problems like community Discovery, graph (edge/node) contraction, etc… can be addressed using aggregated messages with a custom-defined communication protocol. The challenge lies in incorporating the protocol in terms of mapper and reduce functions. I will shortly update shortly some of the famous graph algorithms implemented using this approach.

If you like my work, buy me a coffee.

--

--