# PageRank using Pregel API or GraphX

GraphX provides `PageRank` API for users to calculate the PageRank of a graph conveniently. The API is defined in [[lib/PageRank.scala]]. However, before learning the code and the usage of it, let’s review the `Pregel` API (defined in Pregel.scala) and learn how to calculate PageRank using `Pregel` API.
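For reference, the `Pregel` operator takes the graph, an initial message, and an iteration bound in its first parameter list, and the three user-defined handlers in the second. A rough, abridged sketch of its signature (see Pregel.scala for the full version):

```scala
// Abridged sketch of the Pregel operator's signature (not the exact source):
def apply[VD: ClassTag, ED: ClassTag, A: ClassTag](
    graph: Graph[VD, ED],
    initialMsg: A,
    maxIterations: Int = Int.MaxValue,
    activeDirection: EdgeDirection = EdgeDirection.Either)(
    vprog: (VertexId, VD, A) => VD,                           // vertex program
    sendMsg: EdgeTriplet[VD, ED] => Iterator[(VertexId, A)],  // message sender
    mergeMsg: (A, A) => A)                                    // message combiner
  : Graph[VD, ED]
```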

First, as introduced before, import the required packages:

```scala
import org.apache.spark._
import org.apache.spark.graphx._
import org.apache.spark.rdd.RDD
```

Then associate the out-degree with each vertex using `outerJoinVertices`:

```scala
scala> val tmp = graph.outerJoinVertices(graph.outDegrees) {
     |   (vid, vdata, deg) => deg.getOrElse(0)
     | }
scala> tmp.vertices.take(10)
res0: Array[(org.apache.spark.graphx.VertexId, Int)] = Array((354796,1), (672890,0), (129434,2), (194402,1), (199516,20), (332918,3), (170792,9), (386896,18), (691634,11), (291526,7))
```

Third, set the weight of each edge to the reciprocal of its source vertex's out-degree using `mapTriplets`:

```scala
scala> val edgetmp = tmp.mapTriplets( e => 1.0/e.srcAttr )
scala> edgetmp.triplets.take(5)
res1: Array[org.apache.spark.graphx.EdgeTriplet[Int,Double]] = Array(((0,4),(11342,14),0.25), ((0,4),(824020,11),0.25), ((0,4),(867923,12),0.25), ((0,4),(891835,10),0.25), ((1,10),(53051,0),0.1))
```

Next, set each vertex attribute to the initial PageRank value `1.0`:

```scala
scala> val initialGraph = edgetmp.mapVertices( (id, attr) => 1.0 )
scala> initialGraph.vertices.take(10)
res2: Array[(org.apache.spark.graphx.VertexId, Double)] = Array((354796,1.0), (672890,1.0), (129434,1.0), (194402,1.0), (199516,1.0), (332918,1.0), (170792,1.0), (386896,1.0), (691634,1.0), (291526,1.0))
```

Now the vertices in `initialGraph` hold the initial PageRank value `1.0`, and each edge in `initialGraph` stores the weight `1.0 / outDegree` of its source vertex. These operations can be combined into a single expression:

```scala
val initialGraph: Graph[Double, Double] = graph
  .outerJoinVertices(graph.outDegrees) {
    (vid, vdata, deg) => deg.getOrElse(0)
  }
  .mapTriplets(e => 1.0 / e.srcAttr)
  .mapVertices((id, attr) => 1.0)
```

Assume `val initialMessage = 0.0`, the number of iterations `val numIter = 100` (a smaller value also works), and the reset probability `val resetProb = 0.15`, i.e. one minus the damping factor 0.85 [1]. The message handlers are then programmed according to the PageRank algorithm:

```scala
def vertexProgram(id: VertexId, attr: Double, msgSum: Double): Double =
  resetProb + (1.0 - resetProb) * msgSum

def sendMessage(edge: EdgeTriplet[Double, Double]): Iterator[(VertexId, Double)] =
  Iterator((edge.dstId, edge.srcAttr * edge.attr))

def messageCombiner(a: Double, b: Double): Double = a + b
```
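To see what these three handlers compute together, here is a small plain-Scala sketch (no Spark required; the toy graph and its vertex IDs are made up for illustration) that applies the same update rule for a fixed number of iterations:

```scala
val resetProb = 0.15

// Fixed-iteration PageRank on a toy graph, mirroring the three Pregel handlers above.
def pagerank(edges: Seq[(Long, Long)], numIter: Int): Map[Long, Double] = {
  val outDeg = edges.groupBy(_._1).map { case (v, es) => v -> es.size }  // out-degree per source
  val vertices = edges.flatMap { case (s, d) => Seq(s, d) }.distinct
  var rank = vertices.map(_ -> 1.0).toMap                                // initial PageRank 1.0
  for (_ <- 1 to numIter) {
    // sendMessage: each edge carries srcRank * (1.0 / outDeg(src));
    // messageCombiner: messages arriving at the same vertex are summed.
    val msgs = edges.groupBy(_._2).map { case (d, es) =>
      d -> es.map { case (s, _) => rank(s) / outDeg(s) }.sum
    }
    // vertexProgram: resetProb + (1 - resetProb) * msgSum
    rank = rank.map { case (v, _) => v -> (resetProb + (1 - resetProb) * msgs.getOrElse(v, 0.0)) }
  }
  rank
}

val r = pagerank(Seq((1L, 2L), (1L, 3L), (2L, 3L), (3L, 1L)), 100)
println(r.toSeq.sortBy(_._1).map { case (v, x) => f"$v:$x%.3f" }.mkString(" "))
```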

Finally, call the `Pregel` API:

```scala
val pagerankGraph = Pregel(initialGraph, initialMessage, numIter)(
  vertexProgram, sendMessage, messageCombiner)
```

Note that because `initialMessage = 0.0`, the first superstep overwrites the initial value `1.0` of every vertex with `resetProb + (1.0 - resetProb) * 0.0 = 0.15`; with enough iterations this does not change the fixed point, but intermediate results will differ from an implementation that starts iterating from `1.0`.

The result is

```scala
scala> pagerankGraph.vertices.take(10)
res3: Array[(org.apache.spark.graphx.VertexId, Double)] = Array((354796,0.19213500326152516), (672890,0.2908035889020495), (129434,0.3887609101479692), (194402,0.6070857155891107), (199516,1.4700031805071416), (332918,0.3013570799851124), (170792,0.2239911438748651), (386896,0.2885166353065909), (691634,3.186713734863332), (291526,0.6765487457543149))
```

As mentioned at the beginning, users can calculate PageRank simply by using the `PageRank` API defined in [[lib/PageRank.scala]]. The API is wrapped in GraphOps.scala:

```scala
def pageRank(tol: Double, resetProb: Double = 0.15): Graph[Double, Double] = {
  PageRank.runUntilConvergence(graph, tol, resetProb)
}
```

`tol` is the error tolerance: the smaller `tol` is, the more accurate the result will be, at the cost of more iterations.

```scala
scala> val rank = graph.pageRank(0.01).vertices.take(10)
rank: Array[(org.apache.spark.graphx.VertexId, Double)] = Array((354796,0.18810908106478785), (672890,0.27422939743907243), (129434,0.3664605076158495), (194402,0.5874897431581598), (199516,1.3022385827251726), (332918,0.2847157589683162), (170792,0.21475148570654695), (386896,0.2767245351551102), (691634,2.803096788390362), (291526,0.6515237426896987))
```

You can also use `staticPageRank` to run PageRank for a fixed number of iterations.

```scala
def staticPageRank(numIter: Int, resetProb: Double = 0.15): Graph[Double, Double] = {
  PageRank.run(graph, numIter, resetProb)
}
```
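The difference between the two stopping rules can also be sketched in plain Scala (toy graph and values are illustrative only; no Spark required): instead of running a fixed `numIter`, keep iterating until no vertex's rank changes by more than `tol`, in the spirit of `runUntilConvergence`:

```scala
val resetProb = 0.15
val edges = Seq((1L, 2L), (1L, 3L), (2L, 3L), (3L, 1L))  // toy directed graph
val outDeg = edges.groupBy(_._1).map { case (v, es) => v -> es.size }

// Iterate until the largest per-vertex rank change drops below tol,
// mimicking the convergence-based stopping rule.
def pagerankUntil(tol: Double): (Map[Long, Double], Int) = {
  var rank: Map[Long, Double] = Map(1L -> 1.0, 2L -> 1.0, 3L -> 1.0)
  var delta = Double.MaxValue
  var iters = 0
  while (delta > tol) {
    val msgs = edges.groupBy(_._2).map { case (d, es) =>
      d -> es.map { case (s, _) => rank(s) / outDeg(s) }.sum
    }
    val next = rank.map { case (v, _) =>
      v -> (resetProb + (1 - resetProb) * msgs.getOrElse(v, 0.0))
    }
    delta = rank.map { case (v, x) => math.abs(next(v) - x) }.max
    rank = next
    iters += 1
  }
  (rank, iters)
}

val (ranks, iters) = pagerankUntil(0.01)
println(f"converged after $iters iterations; rank(1) = ${ranks(1L)}%.3f")
```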

[[lib/PageRank.scala]] provides two implementations of PageRank. The first uses the standalone `Graph` interface and runs PageRank for a fixed number of iterations; the second uses the `Pregel` interface and runs PageRank until convergence (as in our example above).

### Reference

[1] Damping factor in PageRank, http://en.wikipedia.org/wiki/PageRank#Damping_factor
