# PageRank using Pregel API or GraphX

GraphX provides a `PageRank` API so that users can conveniently calculate the PageRank of a graph. The API is defined in [[lib/PageRank.scala]]. Before studying its code and usage, however, let's review the `Pregel` API (defined in Pregel.scala) and learn how to calculate PageRank with `Pregel`.

First, import the GraphX packages; the graph itself (bound to `graph` below) is assumed to have been loaded into memory as introduced before (e.g. with `GraphLoader.edgeListFile`):

```scala
import org.apache.spark._
import org.apache.spark.graphx._
import org.apache.spark.rdd.RDD
```

Then, associate each vertex with its out-degree using `outerJoinVertices`:

```scala
scala> val tmp = graph.outerJoinVertices(graph.outDegrees) {
|   (vid, vdata, deg) => deg.getOrElse(0)
| }
scala> tmp.vertices.take(10)
res0: Array[(org.apache.spark.graphx.VertexId, Int)] = Array((354796,1), (672890,0), (129434,2), (194402,1), (199516,20), (332918,3), (170792,9), (386896,18), (691634,11), (291526,7))
```

Third, set the weight of each edge to the reciprocal of its source vertex's out-degree using `mapTriplets`:

```scala
scala> val edgetmp = tmp.mapTriplets( e => 1.0/e.srcAttr )
scala> edgetmp.triplets.take(5)
res1: Array[org.apache.spark.graphx.EdgeTriplet[Int,Double]] = Array(((0,4),(11342,14),0.25), ((0,4),(824020,11),0.25), ((0,4),(867923,12),0.25), ((0,4),(891835,10),0.25), ((1,10),(53051,0),0.1))
```

Next, set each vertex attribute to the initial PageRank value:

```scala
scala> val initialGraph = edgetmp.mapVertices( (id, attr) => 1.0 )
scala> initialGraph.vertices.take(10)
res2: Array[(org.apache.spark.graphx.VertexId, Double)] = Array((354796,1.0), (672890,1.0), (129434,1.0), (194402,1.0), (199516,1.0), (332918,1.0), (170792,1.0), (386896,1.0), (691634,1.0), (291526,1.0))
```

Now every vertex in `initialGraph` holds the initial PageRank `1.0`, and every edge holds the weight `1.0 / outDegree(src)`. These operations can be combined into a single expression:

```scala
val initialGraph: Graph[Double, Double] = graph
  .outerJoinVertices(graph.outDegrees) {
    (vid, vdata, deg) => deg.getOrElse(0)
  }
  .mapTriplets(e => 1.0 / e.srcAttr)
  .mapVertices((id, attr) => 1.0)
```

Assume the initial message `val initialMessage = 0.0`, the number of iterations `val numIter = 100` (a smaller value also works), and the reset probability `val resetProb = 0.15`, i.e. a damping factor of 0.85 [1]. Note that `Pregel` first applies the vertex program to every vertex with `initialMessage`, so every rank becomes `resetProb + (1.0 - resetProb) * 0.0 = 0.15` (not `1.0`) before the first messages are sent. The three message handlers are programmed according to the PageRank algorithm:

```scala
def vertexProgram(id: VertexId, attr: Double, msgSum: Double): Double =
  resetProb + (1.0 - resetProb) * msgSum

def sendMessage(edge: EdgeTriplet[Double, Double]): Iterator[(VertexId, Double)] =
  Iterator((edge.dstId, edge.srcAttr * edge.attr))

def messageCombiner(a: Double, b: Double): Double = a + b
```
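To see how these three handlers work together, here is a small Spark-free sketch of a single Pregel superstep on plain Scala collections, using a hand-built three-vertex graph (1 → 2, 1 → 3, 2 → 3). The object and method names here are illustrative only, not part of the GraphX API:

```scala
// Illustrative sketch only: simulates one Pregel superstep with the
// three handlers above, on plain Scala collections instead of RDDs.
object PregelStepSketch {
  val resetProb = 0.15

  def vertexProgram(attr: Double, msgSum: Double): Double =
    resetProb + (1.0 - resetProb) * msgSum

  // One superstep over edges of (srcId, dstId, weight = 1.0 / outDeg(src)).
  def step(edges: Seq[(Long, Long, Double)],
           ranks: Map[Long, Double]): Map[Long, Double] = {
    // sendMessage: every edge sends srcRank * weight to its destination;
    // messageCombiner: messages arriving at the same vertex are summed.
    val messages = edges
      .map { case (src, dst, w) => dst -> ranks(src) * w }
      .groupBy(_._1)
      .map { case (dst, ms) => dst -> ms.map(_._2).sum }
    // vertexProgram: vertices that received a message update their rank;
    // in this sketch, the others simply keep their old value.
    ranks.map { case (id, r) =>
      id -> messages.get(id).map(m => vertexProgram(r, m)).getOrElse(r)
    }
  }
}
```

With all ranks starting at 1.0, vertex 3 receives 1.0 × 0.5 from vertex 1 and 1.0 × 1.0 from vertex 2, so its new rank is 0.15 + 0.85 × 1.5 = 1.425. (The real `Pregel` additionally applies `vertexProgram` with `initialMessage` to every vertex before the first superstep, as noted above.)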

Finally call the `Pregel` API:

```scala
val pagerankGraph = Pregel(initialGraph, initialMessage, numIter)(
  vertexProgram, sendMessage, messageCombiner)
```

The result is:

```scala
scala> pagerankGraph.vertices.take(10)
res3: Array[(org.apache.spark.graphx.VertexId, Double)] = Array((354796,0.19213500326152516), (672890,0.2908035889020495), (129434,0.3887609101479692), (194402,0.6070857155891107), (199516,1.4700031805071416), (332918,0.3013570799851124), (170792,0.2239911438748651), (386896,0.2885166353065909), (691634,3.186713734863332), (291526,0.6765487457543149))
```

As mentioned at the beginning, users can calculate PageRank simply by using the `PageRank` API defined in [[lib/PageRank.scala]]. It is exposed as a graph operation in GraphOps.scala:

```scala
def pageRank(tol: Double, resetProb: Double = 0.15): Graph[Double, Double] = {
  PageRank.runUntilConvergence(graph, tol, resetProb)
}
```

`tol` is the error tolerance used to decide convergence: the smaller `tol` is, the more accurate the result will be, at the cost of more iterations.

```scala
scala> val rank = graph.pageRank(0.01).vertices.take(10)
rank: Array[(org.apache.spark.graphx.VertexId, Double)] = Array((354796,0.18810908106478785), (672890,0.27422939743907243), (129434,0.3664605076158495), (194402,0.5874897431581598), (199516,1.3022385827251726), (332918,0.2847157589683162), (170792,0.21475148570654695), (386896,0.2767245351551102), (691634,2.803096788390362), (291526,0.6515237426896987))
```
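Conceptually, `runUntilConvergence` keeps applying the update `PR[i] = resetProb + (1 - resetProb) * Σ PR[j] / outDeg[j]` over the in-neighbors `j` of `i` until the largest per-vertex change drops below `tol`. The following Spark-free loop sketches that idea on plain Scala collections; the function name and tiny graph are illustrative, and it simplifies away GraphX's actual delta-based bookkeeping:

```scala
// Illustrative sketch only: iterate the PageRank update until the
// largest per-vertex change is at most `tol`.
def pageRankUntilConvergence(
    edges: Seq[(Long, Long, Double)], // (src, dst, 1.0 / outDeg(src))
    vertices: Seq[Long],
    tol: Double,
    resetProb: Double = 0.15): Map[Long, Double] = {
  var ranks = vertices.map(_ -> 1.0).toMap
  var delta = Double.MaxValue
  while (delta > tol) {
    // Sum of weighted ranks flowing into each destination vertex.
    val sums = edges.groupBy(_._2).map { case (dst, es) =>
      dst -> es.map { case (src, _, w) => ranks(src) * w }.sum
    }
    val next = vertices.map { v =>
      v -> (resetProb + (1.0 - resetProb) * sums.getOrElse(v, 0.0))
    }.toMap
    delta = vertices.map(v => math.abs(next(v) - ranks(v))).max
    ranks = next
  }
  ranks
}
```

For example, on the two-vertex graph with the single edge 1 → 2, the ranks settle at 0.15 for vertex 1 (which receives nothing) and 0.15 + 0.85 × 0.15 = 0.2775 for vertex 2.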

You can also use `staticPageRank` to run a fixed number of iterations:

```scala
def staticPageRank(numIter: Int, resetProb: Double = 0.15): Graph[Double, Double] = {
  PageRank.run(graph, numIter, resetProb)
}
```

[[lib/PageRank.scala]] contains two implementations of PageRank. The first uses the standalone `Graph` interface and runs PageRank for a fixed number of iterations; the second uses the `Pregel` interface and runs PageRank until convergence (just like our example).

### Reference

[1] Damping factor in PageRank, http://en.wikipedia.org/wiki/PageRank#Damping_factor
