GraphX provides a `PageRank` API for users to calculate the PageRank of a graph conveniently. The API is defined in [[lib/PageRank.scala]]. However, before studying its code and usage, let's review the `Pregel` API (defined in Pregel.scala) and learn how to calculate PageRank with `Pregel` directly.
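For orientation, the `Pregel` operator defined in Pregel.scala has roughly the following shape (a simplified sketch, not the verbatim source; the exact signature, including default arguments such as the active edge direction, may differ between Spark versions):

```scala
// Simplified signature sketch of the Pregel operator (see Pregel.scala)
def apply[VD, ED, A](
    graph: Graph[VD, ED],       // the input graph
    initialMsg: A,              // delivered to every vertex in superstep 0
    maxIterations: Int)(        // upper bound on the number of supersteps
    vprog: (VertexId, VD, A) => VD,                          // vertex program
    sendMsg: EdgeTriplet[VD, ED] => Iterator[(VertexId, A)], // message sender
    mergeMsg: (A, A) => A)                                   // message combiner
  : Graph[VD, ED]
```

We only need to supply the three functions in the second parameter list, which is exactly what we do below.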

First, as introduced before, let's load the graph into memory:

```scala
import org.apache.spark._
import org.apache.spark.graphx._
import org.apache.spark.rdd.RDD

val graph = GraphLoader.edgeListFile(sc,
  "hdfs://192.168.17.240:9222/input/yuhc/web-Google/web-Google.txt")
```

Then associate the out-degree with each vertex using `outerJoinVertices`:

```scala
scala> val tmp = graph.outerJoinVertices(graph.outDegrees) {
     |   (vid, vdata, deg) => deg.getOrElse(0)
     | }

scala> tmp.vertices.take(10)
res0: Array[(org.apache.spark.graphx.VertexId, Int)] = Array((354796,1), (672890,0), (129434,2), (194402,1), (199516,20), (332918,3), (170792,9), (386896,18), (691634,11), (291526,7))
```

Third, set the weight on each edge to the reciprocal of its source vertex's out-degree using `mapTriplets`:

```scala
scala> val edgetmp = tmp.mapTriplets(e => 1.0 / e.srcAttr)

scala> edgetmp.triplets.take(5)
res1: Array[org.apache.spark.graphx.EdgeTriplet[Int,Double]] = Array(((0,4),(11342,14),0.25), ((0,4),(824020,11),0.25), ((0,4),(867923,12),0.25), ((0,4),(891835,10),0.25), ((1,10),(53051,0),0.1))
```

And set the vertex attributes to the initial PageRank values:

```scala
scala> val initialGraph = edgetmp.mapVertices((id, attr) => 1.0)

scala> initialGraph.vertices.take(10)
res2: Array[(org.apache.spark.graphx.VertexId, Double)] = Array((354796,1.0), (672890,1.0), (129434,1.0), (194402,1.0), (199516,1.0), (332918,1.0), (170792,1.0), (386896,1.0), (691634,1.0), (291526,1.0))
```

Now the vertices in `initialGraph` are assigned the initial PageRank `1.0`, and the edges in `initialGraph` store the out-degree information as weights. These operations can be completed in a single chained expression:

```scala
val initialGraph: Graph[Double, Double] = graph
  .outerJoinVertices(graph.outDegrees) { (vid, vdata, deg) => deg.getOrElse(0) }
  .mapTriplets(e => 1.0 / e.srcAttr)
  .mapVertices((id, attr) => 1.0)
```

Assume `val initialMessage = 0.0`, the number of iterations `val numIter = 100` (you can use a smaller value), and the reset probability `val resetProb = 0.15` (one minus the damping factor [1]). The vertex program and the message handlers are programmed according to the PageRank algorithm:

```scala
def vertexProgram(id: VertexId, attr: Double, msgSum: Double): Double =
  resetProb + (1.0 - resetProb) * msgSum

def sendMessage(edge: EdgeTriplet[Double, Double]): Iterator[(VertexId, Double)] =
  Iterator((edge.dstId, edge.srcAttr * edge.attr))

def messageCombiner(a: Double, b: Double): Double = a + b
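To see what these three handlers compute, here is a minimal, Spark-free sketch of one superstep over a tiny in-memory edge list. The object name `PageRankStep` and the toy graph are made up for illustration; only the arithmetic mirrors the handlers above:

```scala
// A plain-Scala sketch of one PageRank superstep, mirroring the three
// Pregel handlers; not GraphX code, just the same arithmetic on a Seq.
object PageRankStep {
  val resetProb = 0.15

  def vertexProgram(attr: Double, msgSum: Double): Double =
    resetProb + (1.0 - resetProb) * msgSum

  def messageCombiner(a: Double, b: Double): Double = a + b

  /** One superstep: send rank/outDeg along every edge, combine, apply. */
  def step(edges: Seq[(Long, Long)], pr: Map[Long, Double]): Map[Long, Double] = {
    val outDeg = edges.groupBy(_._1).map { case (v, es) => v -> es.size }
    val msgs = edges
      .map { case (s, d) => (d, pr(s) * (1.0 / outDeg(s))) }   // sendMessage
      .groupBy(_._1)
      .map { case (v, ms) => v -> ms.map(_._2).reduce(messageCombiner) }
    pr.map { case (v, attr) => v -> vertexProgram(attr, msgs.getOrElse(v, 0.0)) }
  }

  def main(args: Array[String]): Unit = {
    // toy graph: 1 -> 2, 1 -> 3, 2 -> 3, 3 -> 1
    val edges = Seq((1L, 2L), (1L, 3L), (2L, 3L), (3L, 1L))
    val pr = step(edges, Map(1L -> 1.0, 2L -> 1.0, 3L -> 1.0))
    println(pr) // e.g. vertex 3 gets 0.15 + 0.85 * (0.5 + 1.0) = 1.425
  }
}
```

Vertex 3 receives `1.0 / 2` from vertex 1 and `1.0 / 1` from vertex 2, so its new rank is `0.15 + 0.85 * 1.5 = 1.425`.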

Finally, call the `Pregel` API:

```scala
val pagerankGraph = Pregel(initialGraph, initialMessage, numIter)(
  vertexProgram, sendMessage, messageCombiner)
```

The result is

```scala
scala> pagerankGraph.vertices.take(10)
res3: Array[(org.apache.spark.graphx.VertexId, Double)] = Array((354796,0.19213500326152516), (672890,0.2908035889020495), (129434,0.3887609101479692), (194402,0.6070857155891107), (199516,1.4700031805071416), (332918,0.3013570799851124), (170792,0.2239911438748651), (386896,0.2885166353065909), (691634,3.186713734863332), (291526,0.6765487457543149))
```

As mentioned at the beginning, users can calculate PageRank simply by using the `PageRank` API defined in [[lib/PageRank.scala]]. This API is wrapped in GraphOps.scala:

```scala
def pageRank(tol: Double, resetProb: Double = 0.15): Graph[Double, Double] = {
  PageRank.runUntilConvergence(graph, tol, resetProb)
}
```

`tol` is the convergence tolerance. The smaller `tol` is, the more accurate the result will be, at the cost of more iterations.

```scala
scala> val rank = graph.pageRank(0.01).vertices.take(10)
rank: Array[(org.apache.spark.graphx.VertexId, Double)] = Array((354796,0.18810908106478785), (672890,0.27422939743907243), (129434,0.3664605076158495), (194402,0.5874897431581598), (199516,1.3022385827251726), (332918,0.2847157589683162), (170792,0.21475148570654695), (386896,0.2767245351551102), (691634,2.803096788390362), (291526,0.6515237426896987))
```
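To give an intuition for how `tol` stops the iteration, here is a simplified, Spark-free sketch that repeats plain PageRank steps until the largest per-vertex change drops below `tol`. This is only an approximation of `runUntilConvergence` (which tracks per-vertex deltas inside Pregel rather than a global maximum); the object name `ToleranceDemo` and the helper functions are made up for illustration:

```scala
// Simplified convergence loop: iterate until max per-vertex change < tol.
// Not GraphX's implementation, just the idea behind the `tol` parameter.
object ToleranceDemo {
  val resetProb = 0.15

  /** One PageRank step over an in-memory edge list. */
  def step(edges: Seq[(Long, Long)], pr: Map[Long, Double]): Map[Long, Double] = {
    val outDeg = edges.groupBy(_._1).map { case (v, es) => v -> es.size }
    val msgs = edges
      .map { case (s, d) => (d, pr(s) / outDeg(s)) }   // rank / out-degree
      .groupBy(_._1)
      .map { case (v, ms) => v -> ms.map(_._2).sum }
    pr.map { case (v, _) =>
      v -> (resetProb + (1.0 - resetProb) * msgs.getOrElse(v, 0.0))
    }
  }

  /** Run until the largest per-vertex change falls below `tol`. */
  def runUntilConvergence(edges: Seq[(Long, Long)],
                          tol: Double): (Map[Long, Double], Int) = {
    val vertices = edges.flatMap { case (s, d) => Seq(s, d) }.distinct
    var pr = vertices.map(_ -> 1.0).toMap
    var iters = 0
    var delta = Double.MaxValue
    while (delta >= tol) {
      val next = step(edges, pr)
      delta = pr.map { case (v, x) => math.abs(next(v) - x) }.max
      pr = next
      iters += 1
    }
    (pr, iters)
  }
}
```

A smaller `tol` forces more iterations before the loop exits, which is why it trades running time for accuracy.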

You can also use `staticPageRank` to run a fixed number of iterations:

```scala
def staticPageRank(numIter: Int, resetProb: Double = 0.15): Graph[Double, Double] = {
  PageRank.run(graph, numIter, resetProb)
}
```

[[lib/PageRank.scala]] contains two implementations of PageRank. The first uses the standalone `Graph` interface and runs PageRank for a fixed number of iterations; the second uses the `Pregel` interface and runs PageRank until convergence (just like our example).

### Reference

[1] Damping factor in PageRank, http://en.wikipedia.org/wiki/PageRank#Damping_factor

Thanks a lot for sharing! I was looking for an explanation of PageRank in GraphX, and your post helped me a lot.

There is one thing that frustrated me a lot, though: namely, `initialMessage = 0.0`.

I was comparing my Hadoop implementation's results with Spark's, and guess what: the results were different the whole time! We use the following formula to calculate the PR of vertex i:

```
PR[i] = 0.15 + (1 - 0.15) * inNbrs[i].map(j => oldPR[j] / outDeg[j]).sum
```

where `oldPR[j] = 1`, because you wrote "Now the vertices in initialGraph are assigned initial PageRank 1.0". This is not true when `initialMessage = 0`: the first run of the vertex program receives a message sum of 0, so the whole equation evaluates to 0.15. This means that in the next iteration, `oldPR[j] / outDeg[j]` is `0.15 / outDeg[j]`, not `1 / outDeg[j]`. Eureka!
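In plain Scala (no Spark), the effect looks like this; the object name `InitialMessageDemo` is made up, but the vertex program is the one from the post:

```scala
// Pregel runs the vertex program once with initialMessage before any
// messages flow, so the ranks entering the first real round are 0.15.
object InitialMessageDemo {
  val resetProb = 0.15

  def vertexProgram(attr: Double, msgSum: Double): Double =
    resetProb + (1.0 - resetProb) * msgSum

  def main(args: Array[String]): Unit = {
    val assigned = 1.0                            // mapVertices sets 1.0
    val afterInit = vertexProgram(assigned, 0.0)  // initial superstep
    println(afterInit)                            // prints 0.15, not 1.0
  }
}
```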

Sorry for the long comment and all of my unreadable code, but it took me such a long time to understand the differences in my solution that I felt I needed to write it down for future readers.

Thanks again for sharing!