PageRank using Pregel API or GraphX

GraphX provides a PageRank API that lets users calculate the PageRank of a graph conveniently. The API is defined in lib/PageRank.scala. However, before looking at its code and usage, let’s review the Pregel API (defined in Pregel.scala) and see how to calculate PageRank with it.

First, as introduced before, load the graph into memory:

import org.apache.spark._
import org.apache.spark.graphx._
import org.apache.spark.rdd.RDD
// sc is the SparkContext provided by spark-shell; each line of the file is a "srcId dstId" pair
val graph = GraphLoader.edgeListFile(sc, "hdfs://192.168.17.240:9222/input/yuhc/web-Google/web-Google.txt")
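GraphLoader.edgeListFile reads a plain-text edge list. As we understand it (worth checking against the GraphLoader source for your Spark version), it skips lines beginning with '#', splits each remaining line on whitespace, and uses the first two fields as source and destination ids. The plain-Scala sketch below (no Spark) imitates that parsing on a few in-memory lines so the format is easy to see; the object EdgeListSketch and the helper parseEdgeList are hypothetical, and this is not how Spark actually reads the file from HDFS.

```scala
// Hypothetical sketch (plain Scala, no Spark) of reading an edge-list file:
// one "srcId dstId" pair per line, blank lines and '#' comment lines skipped.
object EdgeListSketch {
  type VertexId = Long

  def parseEdgeList(lines: Seq[String]): Seq[(VertexId, VertexId)] =
    lines
      .map(_.trim)
      .filter(line => line.nonEmpty && !line.startsWith("#")) // drop blanks and comments
      .map { line =>
        val fields = line.split("\\s+") // tabs or spaces separate the two ids
        (fields(0).toLong, fields(1).toLong)
      }

  def main(args: Array[String]): Unit = {
    // A few edges taken from the res1 triplets shown in the next steps.
    val sample = Seq("# a comment line", "0\t11342", "0\t824020", "1\t53051")
    println(parseEdgeList(sample)) // List((0,11342), (0,824020), (1,53051))
  }
}
```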

Second, attach each vertex’s out-degree to it as its attribute using outerJoinVertices; vertices with no outgoing edges get 0:

scala> val tmp = graph.outerJoinVertices(graph.outDegrees) {
     |   (vid, vdata, deg) => deg.getOrElse(0)
     | }
scala> tmp.vertices.take(10)
res0: Array[(org.apache.spark.graphx.VertexId, Int)] = Array((354796,1), (672890,0), (129434,2), (194402,1), (199516,20), (332918,3), (170792,9), (386896,18), (691634,11), (291526,7))
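The outerJoinVertices step is easiest to understand on a tiny graph. Below is a minimal plain-Scala sketch (no Spark) of what it computes, assuming a hypothetical four-vertex graph in which vertex 4 has no outgoing edges: graph.outDegrees only covers vertices that are the source of some edge, and the outer join with deg.getOrElse(0) fills in 0 for the rest. The helper outDegreesWithDefault is hypothetical, not a GraphX function.

```scala
// Plain-Scala sketch (no Spark) of what the outerJoinVertices step computes:
// each vertex's out-degree, defaulting to 0 for vertices with no outgoing edges.
object OutDegreeSketch {
  type VertexId = Long

  def outDegreesWithDefault(
      vertices: Seq[VertexId],
      edges: Seq[(VertexId, VertexId)]): Map[VertexId, Int] = {
    // Like graph.outDegrees: only vertices that are the source of some edge appear here.
    val outDegrees: Map[VertexId, Int] =
      edges.groupBy(_._1).map { case (src, out) => src -> out.size }
    // Like the outer join with deg.getOrElse(0): every vertex gets a value.
    vertices.map(v => v -> outDegrees.getOrElse(v, 0)).toMap
  }

  def main(args: Array[String]): Unit = {
    val edges = Seq((1L, 2L), (1L, 3L), (2L, 3L), (3L, 1L))
    val vertices = Seq(1L, 2L, 3L, 4L) // vertex 4 has no outgoing edges
    val degrees = outDegreesWithDefault(vertices, edges)
    println(degrees.toSeq.sortBy(_._1).map { case (v, d) => s"$v:$d" }.mkString(", "))
    // 1:2, 2:1, 3:1, 4:0
  }
}
```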

Third, use mapTriplets to set each edge’s weight to 1 / (out-degree of its source vertex):

scala> val edgetmp = tmp.mapTriplets( e => 1.0/e.srcAttr )
scala> edgetmp.triplets.take(5)
res1: Array[org.apache.spark.graphx.EdgeTriplet[Int,Double]] = Array(((0,4),(11342,14),0.25), ((0,4),(824020,11),0.25), ((0,4),(867923,12),0.25), ((0,4),(891835,10),0.25), ((1,10),(53051,0),0.1))
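In res1, vertex 0 has out-degree 4, so each of its edges gets weight 0.25. The plain-Scala sketch below (no Spark, hypothetical edges and helper name weightEdges) reproduces that weighting and checks the property that makes it useful for PageRank: every source vertex hands out a total weight of exactly 1.0 across its outgoing edges.

```scala
object EdgeWeightSketch {
  type VertexId = Long

  // Mirrors mapTriplets(e => 1.0 / e.srcAttr) where srcAttr is the source's out-degree.
  // The division is safe: a vertex that is the source of an edge has out-degree >= 1.
  def weightEdges(edges: Seq[(VertexId, VertexId)]): Seq[(VertexId, VertexId, Double)] = {
    val outDeg: Map[VertexId, Int] = edges.groupBy(_._1).map { case (src, out) => src -> out.size }
    edges.map { case (src, dst) => (src, dst, 1.0 / outDeg(src)) }
  }

  def main(args: Array[String]): Unit = {
    // Hypothetical edges: vertex 0 has four out-edges, vertex 1 has one.
    val weighted = weightEdges(Seq((0L, 1L), (0L, 2L), (0L, 3L), (0L, 4L), (1L, 2L)))
    // prints (0,1,0.25), (0,2,0.25), (0,3,0.25), (0,4,0.25), (1,2,1.0) on separate lines
    weighted.foreach(println)

    // Each source hands out a total weight of exactly 1.0 across its out-edges.
    val totals = weighted.groupBy(_._1).map { case (src, out) => src -> out.map(_._3).sum }
    println(totals.toSeq.sortBy(_._1).mkString(", ")) // (0,1.0), (1,1.0)
  }
}
```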

Then set every vertex attribute to the initial PageRank value 1.0:

scala> val initialGraph = edgetmp.mapVertices( (id, attr) => 1.0 )
scala> initialGraph.vertices.take(10)
res2: Array[(org.apache.spark.graphx.VertexId, Double)] = Array((354796,1.0), (672890,1.0), (129434,1.0), (194402,1.0), (199516,1.0), (332918,1.0), (170792,1.0), (386896,1.0), (691634,1.0), (291526,1.0))

Now every vertex in initialGraph has the initial PageRank 1.0, and every edge stores the weight 1 / (out-degree of its source), i.e., the share of the source’s rank passed along that edge. The three steps can also be combined into a single chained expression:

val initialGraph: Graph[Double, Double] = graph
  .outerJoinVertices(graph.outDegrees) {
    (vid, vdata, deg) => deg.getOrElse(0)
  }
  .mapTriplets(e => 1.0 / e.srcAttr)
  .mapVertices((id, attr) => 1.0)

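Set the initial message val initialMessage = 0.0, the number of iterations val numIter = 100 (a smaller value also works), and the reset probability val resetProb = 0.15, which corresponds to a damping factor of 0.85 [1]. Note that Pregel starts by calling vertexProgram on every vertex with initialMessage, and vertexProgram below ignores the vertex’s current value, so every rank becomes 0.15 before the first messages are sent: the initial value 1.0 does not influence the result. The message handlers are programmed according to the PageRank algorithm: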

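val initialMessage = 0.0 // received by every vertex in the first superstep
val numIter = 100        // maximum number of supersteps
val resetProb = 0.15     // reset probability = 1 - damping factor
// Update a vertex's rank from the sum of its incoming messages
def vertexProgram(id: VertexId, attr: Double, msgSum: Double): Double =
  resetProb + (1.0 - resetProb) * msgSum
// Send the source's rank times the edge weight 1/outDeg(src) to the destination
def sendMessage(edge: EdgeTriplet[Double, Double]): Iterator[(VertexId, Double)] =
  Iterator((edge.dstId, edge.srcAttr * edge.attr))
def messageCombiner(a: Double, b: Double): Double = a + b // sum messages per vertex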
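Read together, the three handlers implement the update below. This is a restatement in math notation, with r = resetProb, in(i) the set of vertices that have an edge into i, and outDeg(j) the out-degree of j. The first line is Pregel’s initial call of vertexProgram with the initial message m_0; because vertexProgram ignores the current attribute, every vertex starts at r. In each later superstep, sendMessage contributes PR_t(j) w_ji along every edge, messageCombiner sums those contributions, and vertexProgram applies the second line. A vertex with no incoming edges receives no messages and keeps r, which agrees with the empty sum.

```latex
\begin{aligned}
PR_0(i) &= r + (1 - r)\, m_0 = r, && m_0 = \texttt{initialMessage} = 0,\\
PR_{t+1}(i) &= r + (1 - r) \sum_{j \in \mathrm{in}(i)} PR_t(j)\, w_{ji}, && w_{ji} = \frac{1}{\mathrm{outDeg}(j)}.
\end{aligned}
```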

Finally, call the Pregel API:

val pagerankGraph = Pregel(initialGraph, initialMessage, numIter)(
  vertexProgram, sendMessage, messageCombiner)
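To see how Pregel drives the three handlers, and why the initial 1.0 does not survive, here is a hedged single-machine sketch in plain Scala (no Spark). It follows our reading of Pregel.scala: one initial vertexProgram call with initialMessage on every vertex, then, for up to maxIter supersteps, messages along the edges, combined per destination, with vertexProgram run only where a message arrived. As a simplification, every edge sends in every superstep; GraphX can also skip edges whose endpoints received no message in the previous superstep. The object name, the helper sendAndCombine, and the three-vertex graph are hypothetical.

```scala
// Single-machine imitation of how Pregel drives the three handlers (no Spark).
object PregelPageRankSketch {
  type VertexId = Long
  val resetProb = 0.15

  // The same handlers as in the text, without the GraphX types.
  def vertexProgram(id: VertexId, attr: Double, msgSum: Double): Double =
    resetProb + (1.0 - resetProb) * msgSum
  def messageCombiner(a: Double, b: Double): Double = a + b

  // sendMessage along every edge (src, dst, weight), then combine messages per destination.
  def sendAndCombine(
      ranks: Map[VertexId, Double],
      edges: Seq[(VertexId, VertexId, Double)]): Map[VertexId, Double] =
    edges
      .map { case (src, dst, w) => dst -> ranks(src) * w }
      .groupBy(_._1)
      .map { case (dst, msgs) => dst -> msgs.map(_._2).reduce(messageCombiner) }

  def run(
      initial: Map[VertexId, Double],
      edges: Seq[(VertexId, VertexId, Double)],
      initialMessage: Double,
      maxIter: Int): Map[VertexId, Double] = {
    // Step 0: every vertex runs vertexProgram once with initialMessage.
    var ranks = initial.map { case (v, attr) => v -> vertexProgram(v, attr, initialMessage) }
    var messages = sendAndCombine(ranks, edges)
    var i = 0
    // Each superstep: only vertices that received a message run vertexProgram.
    while (messages.nonEmpty && i < maxIter) {
      ranks = ranks.map { case (v, r) => v -> messages.get(v).fold(r)(m => vertexProgram(v, r, m)) }
      messages = sendAndCombine(ranks, edges)
      i += 1
    }
    ranks
  }

  def main(args: Array[String]): Unit = {
    // Hypothetical graph 1->2, 1->3, 2->3, 3->1 with weights 1/outDeg(src).
    val edges = Seq((1L, 2L, 0.5), (1L, 3L, 0.5), (2L, 3L, 1.0), (3L, 1L, 1.0))
    val fromOnes = run(Map(1L -> 1.0, 2L -> 1.0, 3L -> 1.0), edges, 0.0, 100)
    val fromOther = run(Map(1L -> 42.0, 2L -> 7.0, 3L -> 0.0), edges, 0.0, 100)
    fromOnes.toSeq.sortBy(_._1).foreach { case (v, r) => println(f"$v: $r%.4f") }
    // vertexProgram ignores attr, so the starting values do not matter.
    println(fromOnes == fromOther) // true
  }
}
```

Starting from 1.0 or from arbitrary values gives identical ranks, which is the behavior the first reader comment below runs into when comparing against a formula that starts from 1.0.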

The first ten ranks are:

scala> pagerankGraph.vertices.take(10)
res3: Array[(org.apache.spark.graphx.VertexId, Double)] = Array((354796,0.19213500326152516), (672890,0.2908035889020495), (129434,0.3887609101479692), (194402,0.6070857155891107), (199516,1.4700031805071416), (332918,0.3013570799851124), (170792,0.2239911438748651), (386896,0.2885166353065909), (691634,3.186713734863332), (291526,0.6765487457543149))

As mentioned at the beginning, users can compute PageRank more simply with the PageRank API defined in lib/PageRank.scala. GraphOps.scala wraps it as a method that can be called directly on a graph:

def pageRank(tol: Double, resetProb: Double = 0.15): Graph[Double, Double] = {
  PageRank.runUntilConvergence(graph, tol, resetProb)
}

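tol is the convergence tolerance: a vertex keeps sending updates to its neighbors while its rank still changes by more than tol between iterations, and the computation stops when no vertex does. The smaller tol is, the more accurate the result, at the cost of more iterations.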

scala> val rank = graph.pageRank(0.01).vertices.take(10)
rank: Array[(org.apache.spark.graphx.VertexId, Double)] = Array((354796,0.18810908106478785), (672890,0.27422939743907243), (129434,0.3664605076158495), (194402,0.5874897431581598), (199516,1.3022385827251726), (332918,0.2847157589683162), (170792,0.21475148570654695), (386896,0.2767245351551102), (691634,2.803096788390362), (291526,0.6515237426896987))
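To get a feel for what tol does, here is a simplified plain-Scala sketch (no Spark). It is not GraphX’s runUntilConvergence, which, as we understand it, propagates per-vertex rank changes through Pregel; it simply iterates the same update rule and stops once no vertex’s rank changes by more than tol. The object ToleranceSketch and the three-vertex graph are hypothetical.

```scala
// Simplified convergence-based PageRank (plain Scala, no Spark). This is NOT
// GraphX's runUntilConvergence; it only shows how tol controls when iteration stops.
object ToleranceSketch {
  type VertexId = Long

  // Returns the final ranks and how many iterations were needed.
  def runUntilConvergence(
      vertices: Seq[VertexId],
      edges: Seq[(VertexId, VertexId)],
      tol: Double,
      resetProb: Double = 0.15): (Map[VertexId, Double], Int) = {
    val outDeg = edges.groupBy(_._1).map { case (src, out) => src -> out.size }
    var ranks: Map[VertexId, Double] = vertices.map(v => v -> resetProb).toMap
    var iterations = 0
    var maxChange = Double.PositiveInfinity
    // Stop once no vertex's rank moved by more than tol in the last iteration.
    while (maxChange > tol) {
      val msgSums: Map[VertexId, Double] = edges.groupBy(_._2).map { case (dst, incoming) =>
        dst -> incoming.map { case (src, _) => ranks(src) / outDeg(src) }.sum
      }
      val next = ranks.map { case (v, _) =>
        v -> (resetProb + (1.0 - resetProb) * msgSums.getOrElse(v, 0.0))
      }
      maxChange = vertices.map(v => math.abs(next(v) - ranks(v))).max
      ranks = next
      iterations += 1
    }
    (ranks, iterations)
  }

  def main(args: Array[String]): Unit = {
    val vertices = Seq(1L, 2L, 3L)
    val edges = Seq((1L, 2L), (1L, 3L), (2L, 3L), (3L, 1L)) // hypothetical graph
    val (_, loose) = runUntilConvergence(vertices, edges, tol = 0.01)
    val (_, tight) = runUntilConvergence(vertices, edges, tol = 0.0001)
    println(s"tol = 0.01 -> $loose iterations; tol = 0.0001 -> $tight iterations")
  }
}
```

With the smaller tol the loop runs noticeably more iterations before stopping, which is the accuracy/cost trade-off described above.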

You can also use staticPageRank to run PageRank for a fixed number of iterations:

def staticPageRank(numIter: Int, resetProb: Double = 0.15): Graph[Double, Double] = {
  PageRank.run(graph, numIter, resetProb)
}

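lib/PageRank.scala contains two implementations of PageRank. The first uses the standalone Graph interface and runs PageRank for a fixed number of iterations (staticPageRank calls it through PageRank.run); the second uses the Pregel interface and runs PageRank until convergence (pageRank calls it through PageRank.runUntilConvergence). Our example also uses Pregel, but because sendMessage sends a message along every edge in every superstep, it runs for numIter iterations instead of stopping at convergence.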

Reference

[1] Damping factor in PageRank, http://en.wikipedia.org/wiki/PageRank#Damping_factor

3 responses on “PageRank using Pregel API or GraphX”

  1. Ethan
    Your blog is awesome! Would you share your wordpress theme? Thanks.
    1. yuhc
      It’s called “Materialist”. You can find it in “Appearance” :)
  2. Lilianna
    Hello,
    Thanks a lot for sharing, I was looking for the explanation of PageRank in GraphX and your post helped me a lot.
    There is one thing that frustrated me a lot though. Namely, initialMessage = 0.0.

    I was comparing my Hadoop implementation’s results with Spark’s and, guess what, the results were different the whole time! We use the following formula to calculate the PR of vertex i:
    PR[i] = 0.15 + (1 - 0.15) * inNbrs[i].map(j => oldPR[j] / outDeg[j]).sum

    where oldPR[j] = 1, because you wrote “Now the vertices in initialGraph are assigned initial PageRank 1.0”. This is not true when initialMessage = 0.0, because the first step then uses a message sum of 0, so the whole equation gives 0.15. This means that in the next iteration, oldPR[j] / outDeg[j] is 0.15 / outDeg[j] and not 1 / outDeg[j]. Eureka!

    Sorry for the long comment and all my unreadable code, but it took me so long to understand the differences in my solution that I felt I needed to write it down for future readers.

    Thanks again for sharing! :)
