If you look closely at Graph.scala, you will find the `cache` function. It caches the graph's vertices and edges at their target storage levels (`MEMORY_ONLY` by default). However, GraphX does not cache graphs automatically, so developers have to do it by hand. In an iterative computation, that means caching the new results and unpersisting the stale ones in every iteration to keep the computation fast. Doing this manually is inconvenient and error-prone, especially since a GraphX graph is stored as separate vertex and edge collections. To solve this problem and make iterative algorithms easier to write, GraphX provides a `Pregel`-like API [1].

The `Pregel` API is provided in Pregel.scala. It is not identical to the original Pregel API, only Pregel-like. It implements a bulk-synchronous message-passing API in which the message-sending computation can read the attributes of both endpoint vertices of an edge, and messages can only be sent along the edges of the graph.

In the `apply` method of `object Pregel`, `VD` is the vertex data type, `ED` is the edge data type, and `A` is the Pregel message type. Every vertex in `graph` receives the initial message `initialMsg` in the first iteration. The iterations run at most `maxIterations` times and stop earlier once no messages remain; a vertex that receives no message in an iteration does not run `vprog`. `activeDirection` is the direction of the edges, incident to a vertex that received a message in the previous round, on which `sendMsg` runs. For example, with `EdgeDirection.Either` (the default), an edge runs `sendMsg` if either of its endpoints received a message in the previous round. The commutative and associative function `mergeMsg` combines two incoming messages for the same vertex into a single message. The vertex program `vprog`, which plays a role similar to `onMessage` in WebSocket, runs in parallel on every vertex that received inbound messages and computes the vertex's new value.
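To make these semantics concrete, here is a minimal single-machine sketch of the superstep loop in plain Scala. It is not GraphX code: `MiniPregel`, its `Edge` and `Triplet` classes, and the use of an in-memory `Map` for vertex state are hypothetical stand-ins. It ignores partitioning and caching, only models the `EdgeDirection.Either` behaviour, and assumes every edge endpoint appears in `vertices`. It can be pasted into a Scala REPL.

```scala
object MiniPregel {
  // Hypothetical stand-ins for GraphX's Edge and EdgeTriplet (not the real classes)
  final case class Edge[E](src: Long, dst: Long, attr: E)
  final case class Triplet[V, E](srcId: Long, dstId: Long, srcAttr: V, dstAttr: V, attr: E)

  def run[V, E, A](vertices: Map[Long, V], edges: Seq[Edge[E]], initialMsg: A,
                   maxIterations: Int = Int.MaxValue)
                  (vprog: (Long, V, A) => V,
                   sendMsg: Triplet[V, E] => Iterator[(Long, A)],
                   mergeMsg: (A, A) => A): Map[Long, V] = {
    // Run sendMsg on the given edges and merge messages addressed to the same vertex
    def messagesFrom(g: Map[Long, V], es: Seq[Edge[E]]): Map[Long, A] =
      es.flatMap(e => sendMsg(Triplet(e.src, e.dst, g(e.src), g(e.dst), e.attr)))
        .groupBy(_._1)
        .map { case (vid, msgs) => vid -> msgs.map(_._2).reduce(mergeMsg) }

    // Every vertex first runs vprog on the initial message
    var g = vertices.map { case (vid, v) => vid -> vprog(vid, v, initialMsg) }
    var messages = messagesFrom(g, edges)
    var i = 0
    while (messages.nonEmpty && i < maxIterations) {
      // Only vertices that received a message run vprog
      val newVerts = messages.map { case (vid, msg) => vid -> vprog(vid, g(vid), msg) }
      g = g ++ newVerts
      // Like EdgeDirection.Either: edges with an endpoint that just received a message
      val active = edges.filter(e => newVerts.contains(e.src) || newVerts.contains(e.dst))
      messages = messagesFrom(g, active)
      i += 1
    }
    g
  }
}

val demoVertices = Map(0L -> 0.0, 1L -> Double.PositiveInfinity,
                       2L -> Double.PositiveInfinity, 3L -> Double.PositiveInfinity)
val demoEdges = Seq(MiniPregel.Edge(0L, 1L, 1), MiniPregel.Edge(1L, 2L, 1),
                    MiniPregel.Edge(0L, 2L, 5))

// vprog, sendMsg and mergeMsg shaped like the shortest-path example in this post
val demoDist = MiniPregel.run(demoVertices, demoEdges, Double.PositiveInfinity)(
  (id, dist, newDist) => math.min(dist, newDist),
  t => if (t.srcAttr + t.attr < t.dstAttr) Iterator((t.dstId, t.srcAttr + t.attr))
       else Iterator.empty,
  (a, b) => math.min(a, b)
)
// demoDist == Map(0 -> 0.0, 1 -> 1.0, 2 -> 2.0, 3 -> Infinity)
```

Vertex 2 first receives `5.0` over the direct edge and then `2.0` via vertex 1 in the next superstep. Vertex 3 has no edges, so it never receives a message and keeps its initial value.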

```scala
object Pregel extends Logging {
  def apply[VD: ClassTag, ED: ClassTag, A: ClassTag]
     (graph: Graph[VD, ED],
      initialMsg: A,
      maxIterations: Int = Int.MaxValue,
      activeDirection: EdgeDirection = EdgeDirection.Either)
     (vprog: (VertexId, VD, A) => VD,
      sendMsg: EdgeTriplet[VD, ED] => Iterator[(VertexId, A)],
      mergeMsg: (A, A) => A)
    : Graph[VD, ED] =
  {
    ...
    // Loop
    var i = 0
    while (activeMessages > 0 && i < maxIterations) {
      // Receive the messages. Vertices that didn't get any messages do not appear in newVerts.
      // Update the graph with the new vertices.
      // Send new messages.
      // Materializes `messages`, `newVerts`, and the vertices of `g`. This hides oldMessages
      // (depended on by newVerts), newVerts (depended on by messages), and the vertices of
      // prevG (depended on by newVerts, oldMessages, and the vertices of g).
      // Unpersist the RDDs hidden by newly-materialized RDDs
      // count the iteration
      i += 1
    }
    // Return new resulted graph
    g
  } // end of apply
}
```

The `Pregel` API makes algorithms such as `PageRank` and `Shortest Path` easy to implement. This article introduces `Shortest Path` on the web-Google dataset; `PageRank` will be covered in a later article.

Computer scientists have proposed several algorithms for the shortest path problem [2], such as Dijkstra's algorithm [3] and the Bellman–Ford algorithm [4]. Their implementations differ, but they share the same core, the `relaxation operation` [5]: if the distance from `a` to `c`, `dis[a][c]`, is longer than the distance from `a` to `b`, `dis[a][b]`, plus the distance from `b` to `c`, `dis[b][c]`, then update `dis[a][c]` to `dis[a][b] + dis[b][c]`.
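The relaxation rule can be written as a tiny pure function. This is a minimal sketch: the nested-`Map` representation of `dis`, the vertex names, and the helper names `distance` and `relax` are assumptions made for illustration, with a missing entry treated as an infinite distance.

```scala
// dis(x)(y): best-known distance from x to y; a missing entry means infinity
type Dist = Map[String, Map[String, Double]]

def distance(dis: Dist, x: String, y: String): Double =
  dis.getOrElse(x, Map.empty[String, Double]).getOrElse(y, Double.PositiveInfinity)

// Relaxation: if dis[a][c] > dis[a][b] + dis[b][c], replace dis[a][c] by the shorter value
def relax(dis: Dist, a: String, b: String, c: String): Dist = {
  val viaB = distance(dis, a, b) + distance(dis, b, c)
  if (distance(dis, a, c) > viaB)
    dis.updated(a, dis.getOrElse(a, Map.empty[String, Double]).updated(c, viaB))
  else dis
}

val dis: Dist = Map("a" -> Map("b" -> 2.0, "c" -> 10.0), "b" -> Map("c" -> 3.0))
val relaxed = relax(dis, "a", "b", "c")
// relaxed("a")("c") == 5.0, because 2.0 + 3.0 < 10.0
```

Relaxing again changes nothing, because `dis[a][c]` is no longer longer than the path through `b`. This fixed point is what the iterative algorithms below converge to.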

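Now for the example. Load the graph, define the source vertex, and initialize the distances that the iterations will update. `GraphLoader.edgeListFile` gives every edge the attribute `1`, so the distances computed in this first example are hop counts. Strictly speaking, the Pregel program is not Dijkstra's algorithm: in every superstep it relaxes all active edges in parallel, which makes it closer to a parallel Bellman–Ford algorithm.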

```scala
scala> import org.apache.spark._
scala> import org.apache.spark.graphx._
scala> import org.apache.spark.rdd.RDD
scala> val graph = GraphLoader.edgeListFile(sc, "hdfs://192.168.17.240:9222/input/yuhc/web-Google/web-Google.txt")
scala> val sourceId: VertexId = 0
scala> val g = graph.mapVertices( (id, _) =>
     |   if (id == sourceId) 0.0
     |   else Double.PositiveInfinity
     | )
```

Then use `Pregel`

API simply:

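```scala
scala> val sssp = g.pregel(Double.PositiveInfinity)(
     |   // vprog: keep the shorter of the current and the incoming distance
     |   (id, dist, newDist) => math.min(dist, newDist),
     |   // sendMsg: relax the edge src -> dst
     |   triplet => {
     |     if (triplet.srcAttr + triplet.attr < triplet.dstAttr) {
     |       Iterator((triplet.dstId, triplet.srcAttr + triplet.attr))
     |     }
     |     else {
     |       Iterator.empty
     |     }
     |   },
     |   // mergeMsg: keep the shorter candidate distance
     |   (a, b) => math.min(a, b)
     | )
```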

View the result:

```
scala> sssp.vertices.take(10).mkString("\n")
res0: String =
(354796,11.0)
(672890,11.0)
(129434,11.0)
(194402,15.0)
(199516,8.0)
(332918,13.0)
(170792,11.0)
(386896,11.0)
(691634,8.0)
(291526,Infinity)
```

GraphX also ships a shortest-path algorithm in `lib/ShortestPaths.scala`. Rather than computing single-source shortest paths (SSSP), it computes, for every vertex, the shortest-path distance (in hops) to each vertex in a given set of landmarks.

```scala
object ShortestPaths {
  ...
  def run[VD, ED: ClassTag](graph: Graph[VD, ED], landmarks: Seq[VertexId]): Graph[SPMap, ED] = {
    val spGraph = graph.mapVertices { (vid, attr) =>
      if (landmarks.contains(vid)) makeMap(vid -> 0) else makeMap()
    }

    val initialMessage = makeMap()

    def vertexProgram(id: VertexId, attr: SPMap, msg: SPMap): SPMap = {
      addMaps(attr, msg)
    }

    def sendMessage(edge: EdgeTriplet[SPMap, _]): Iterator[(VertexId, SPMap)] = {
      val newAttr = incrementMap(edge.dstAttr)
      if (edge.srcAttr != addMaps(newAttr, edge.srcAttr)) Iterator((edge.srcId, newAttr))
      else Iterator.empty
    }

    Pregel(spGraph, initialMessage)(vertexProgram, sendMessage, addMaps)
  }
}
```

`landmarks` is the list of landmark vertex IDs to which the shortest paths are computed. `makeMap` is a helper that builds an `SPMap` from `(landmark, distance)` pairs:

```scala
private def makeMap(x: (VertexId, Int)*) = Map(x: _*)
```

`SPMap` is a map type [6], `type SPMap = Map[VertexId, Int]`, that maps the vertex ID of each landmark to the distance to that landmark.

`addMaps` merges two `SPMap`s, keeping the smaller distance for each landmark `VertexId`. `vertexProgram` simply calls `addMaps`, and `addMaps` itself is also passed to `Pregel` as `mergeMsg`.

```scala
private def addMaps(spmap1: SPMap, spmap2: SPMap): SPMap =
  (spmap1.keySet ++ spmap2.keySet).map {
    k => k -> math.min(spmap1.getOrElse(k, Int.MaxValue), spmap2.getOrElse(k, Int.MaxValue))
  }.toMap
```

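`incrementMap` adds one hop to every distance, since every edge is treated as having weight `1`. Note the direction of message passing: `sendMessage` reads the destination's map and sends the result to the source, so information flows from the destination vertex to the source vertex of each edge. This is intentional. It makes each vertex end up with its distance to each landmark along the edge direction; passing messages the other way would compute distances from the landmarks instead, which differ on a directed graph.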

```scala
private def incrementMap(spmap: SPMap): SPMap = spmap.map { case (v, d) => v -> (d + 1) }
```
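The helpers quoted above can be exercised on their own, outside Spark. This sketch copies `makeMap` (with an explicit return type added), `addMaps`, and `incrementMap`, aliases `VertexId` to `Long` (as GraphX does) so it runs in a plain Scala REPL, and replays what `sendMessage` and `vertexProgram` do for a single hypothetical edge `1 -> 2` where vertex `2` is a landmark.

```scala
type VertexId = Long
type SPMap = Map[VertexId, Int]

def makeMap(x: (VertexId, Int)*): SPMap = Map(x: _*)

// Keep the smaller distance for every landmark present in either map
def addMaps(spmap1: SPMap, spmap2: SPMap): SPMap =
  (spmap1.keySet ++ spmap2.keySet).map { k =>
    k -> math.min(spmap1.getOrElse(k, Int.MaxValue), spmap2.getOrElse(k, Int.MaxValue))
  }.toMap

// One more hop to every landmark
def incrementMap(spmap: SPMap): SPMap = spmap.map { case (v, d) => v -> (d + 1) }

// Edge 1 -> 2: vertex 2 is landmark 2, vertex 1 knows no landmark yet
val srcAttr: SPMap = makeMap()
val dstAttr: SPMap = makeMap(2L -> 0)

val newAttr = incrementMap(dstAttr)                  // Map(2 -> 1)
val shouldSend = srcAttr != addMaps(newAttr, srcAttr) // true: the message is new information
val srcAfter = addMaps(srcAttr, newAttr)             // vertexProgram on vertex 1: Map(2 -> 1)
```

Once vertex 1 holds `Map(2 -> 1)`, `addMaps(newAttr, srcAfter)` equals `srcAfter`, so `sendMessage` stops sending along this edge. That is how the computation converges.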

### Example: Dijkstra Algorithm in GraphX

Thanks to Stephen for his question. With a slight modification of the code above, we can compute single-source shortest paths (SSSP) on a weighted graph and record the paths as well. First, import the graph as before:

```scala
import org.apache.spark._
import org.apache.spark.graphx._
import org.apache.spark.rdd.RDD

val graph = GraphLoader.edgeListFile(sc, "hdfs://192.168.17.240:9222/input/yuhc/web-Google/web-Google.txt")
```

Since I don't have graph data with edge weights at hand, I assign each edge a random integer weight in the range `[0, 100)`. Each vertex attribute is a two-element array holding the distance from `sourceId` and the `VertexId` of the previous node on the shortest path. Here is the initialization of the vertex attributes and edge weights:

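```scala
val sourceId: VertexId = 0
// Vertex attribute: Array(distance from sourceId, previous node on the path)
val g = graph.mapVertices( (id, _) =>
  if (id == sourceId) Array(0.0, id)
  else Array(Double.PositiveInfinity, id)
).mapEdges( e =>
  // Random weight in [0, 100). Spark regenerates these weights whenever it
  // recomputes the edge RDD, so separate actions on g may see different weights.
  (new scala.util.Random).nextInt(100)
)
```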

Have a look at this graph:

```
scala> g.vertices.take(10)
res0: Array[(org.apache.spark.graphx.VertexId, Array[Double])] = Array((354796,Array(Infinity, 354796.0)), (672890,Array(Infinity, 672890.0)), (129434,Array(Infinity, 129434.0)), (194402,Array(Infinity, 194402.0)), (199516,Array(Infinity, 199516.0)), (332918,Array(Infinity, 332918.0)), (170792,Array(Infinity, 170792.0)), (386896,Array(Infinity, 386896.0)), (691634,Array(Infinity, 691634.0)), (291526,Array(Infinity, 291526.0)))

scala> g.edges.take(10)
res1: Array[org.apache.spark.graphx.Edge[Int]] = Array(Edge(0,11342,47), Edge(0,824020,53), Edge(0,867923,90), Edge(0,891835,58), Edge(1,53051,7), Edge(1,203402,50), Edge(1,223236,80), Edge(1,276233,16), Edge(1,552600,76), Edge(1,569212,47))
```

Then modify `sssp` slightly so that it also records the previous node on the path:

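```scala
val sssp = g.pregel(Array(Double.PositiveInfinity, -1))(
  // vprog: keep whichever of (current, incoming) has the shorter distance
  (id, dist, newDist) => {
    if (dist(0) < newDist(0)) dist else newDist
  },
  // sendMsg: relax the edge and record its source as the previous node
  triplet => {
    if (triplet.srcAttr(0) + triplet.attr < triplet.dstAttr(0)) {
      Iterator((triplet.dstId, Array(triplet.srcAttr(0) + triplet.attr, triplet.srcId)))
    } else {
      Iterator.empty
    }
  },
  // mergeMsg: keep the message with the shorter distance
  (a, b) => {
    if (a(0) < b(0)) a else b
  }
)
```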

Format and print the answer:

```scala
val ans: RDD[String] = sssp.vertices.map(vertex =>
  "Vertex " + vertex._1 + ": distance is " + vertex._2(0) +
  ", previous node is Vertex " + vertex._2(1).toLong)
```

```
scala> ans.take(10).mkString("\n")
res2: String =
Vertex 354796: distance is 191.0, previous node is Vertex 283953
Vertex 672890: distance is 277.0, previous node is Vertex 781510
Vertex 129434: distance is 292.0, previous node is Vertex 119943
Vertex 194402: distance is 461.0, previous node is Vertex 446259
Vertex 199516: distance is 199.0, previous node is Vertex 458892
Vertex 332918: distance is 337.0, previous node is Vertex 89384
Vertex 170792: distance is 321.0, previous node is Vertex 138757
Vertex 386896: distance is 210.0, previous node is Vertex 580484
Vertex 691634: distance is 169.0, previous node is Vertex 400059
Vertex 291526: distance is Infinity, previous node is Vertex -1
```

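You can also use `ans.collect.foreach(println(_))` to print the full answer. Note that `collect` brings every vertex to the driver, which is only practical when the result fits in driver memory. With each vertex's previous node, it is easy to print the full path from `sourceId` to a specific vertex by following the previous nodes backward, in `O(N)` time for a path of `N` vertices.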
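Following the previous nodes backward can be sketched as a small driver-side function. It assumes the `(vertex, previous node)` pairs have already been collected into a plain `Map[Long, Long]` (for example via `collectAsMap` on the `sssp` vertices). It also assumes the conventions visible in the output above: the source points to itself and unreachable vertices point to `-1`. `pathTo` is a hypothetical helper, not a GraphX API.

```scala
// Hypothetical helper: rebuild the path source -> ... -> target from previous-node pointers.
// prev(v) is v's previous node; the source maps to itself, unreachable vertices map to -1.
def pathTo(prev: Map[Long, Long], source: Long, target: Long): Option[List[Long]] = {
  @annotation.tailrec
  def walk(v: Long, acc: List[Long], steps: Int): Option[List[Long]] =
    if (v == source) Some(source :: acc)
    else if (steps > prev.size) None // guard against a malformed (cyclic) pointer chain
    else prev.get(v) match {
      case Some(p) if p != -1L => walk(p, v :: acc, steps + 1)
      case _                   => None // unreachable or unknown vertex
    }
  walk(target, Nil, 0)
}

val prev = Map(0L -> 0L, 1L -> 0L, 2L -> 1L, 3L -> -1L)
pathTo(prev, 0L, 2L) // Some(List(0, 1, 2))
pathTo(prev, 0L, 3L) // None
```

Each step moves one vertex closer to the source, so the walk takes time proportional to the path length.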

Another way is to store the full path, instead of only the previous node, in each vertex. However, this can consume much more memory, since every vertex keeps a whole path and every message carries one.
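For the full-path variant, the vertex attribute and messages would carry the path itself. The plain-Scala sketch below shows only the per-edge and per-vertex logic. The `DistPath` type and the helpers `extend` and `pickShorter` are hypothetical names, not part of the post's code. It makes the memory cost visible: every message copies the whole path.

```scala
// Hypothetical vertex attribute for the "full path" variant:
// (distance from the source, vertices on the best-known path, source first)
type DistPath = (Double, List[Long])

// The logic both vprog and mergeMsg would need: keep the candidate with the shorter distance
def pickShorter(a: DistPath, b: DistPath): DistPath = if (a._1 <= b._1) a else b

// Body of sendMsg for an edge src -> dst with weight w: the message copies the whole path
def extend(src: DistPath, dstId: Long, w: Double): DistPath = (src._1 + w, src._2 :+ dstId)

val atSource: DistPath = (0.0, List(0L))
val viaOne = extend(atSource, 1L, 4.0) // (4.0, List(0, 1))
val viaTwo = extend(viaOne, 2L, 3.0)   // (7.0, List(0, 1, 2))
val direct = extend(atSource, 2L, 9.0) // (9.0, List(0, 2))
val best = pickShorter(viaTwo, direct) // (7.0, List(0, 1, 2))
```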

A third way (suggested by Zhouyihai-Ding in the iWCT Spark group) is to run additional `map` steps. Starting from links such as `(a, b), (a, c), (b, c)`, each `MapReduce` iteration connects pairs of paths that can be joined end to start. For example, `(a, b)` and `(b, c)` become `(a, b, c)`, while `(a, c)` stays as it is.
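One possible reading of this idea is path doubling (pointer jumping). Each vertex starts with the one-hop link `(previous node, vertex)`. In every round, a partial path that does not yet start at the source is extended by prepending the partial path that ends at its first vertex. Path lengths roughly double per round, so about `log2` of the longest path's length rounds suffice. In Spark, each round would be a join of the path collection with itself, keyed on the first vertex. The single-machine sketch below only simulates the rounds. `joinPaths` is a hypothetical helper, and it assumes the previous-node pointers contain no cycles.

```scala
// Hypothetical helper: turn previous-node pointers into full paths by repeated joining.
// Assumes prev(source) == source, -1 marks unreachable vertices, and pointers are acyclic.
def joinPaths(prev: Map[Long, Long], source: Long): Map[Long, List[Long]] = {
  // Start from one-hop links (previous node, vertex); unreachable vertices are dropped
  var paths: Map[Long, List[Long]] =
    prev.collect { case (v, p) if p != -1L && v != source => v -> List(p, v) } +
      (source -> List(source))
  var changed = true
  while (changed) {
    // One join round: prepend the path that ends where this path starts
    val next: Map[Long, List[Long]] = paths.map { case (v, path) =>
      paths.get(path.head) match {
        case Some(headPath) if path.head != source => v -> (headPath ++ path.tail)
        case _                                     => v -> path
      }
    }
    changed = next != paths
    paths = next
  }
  // Keep only paths that now start at the source
  paths.filter { case (_, path) => path.head == source }
}

val chain = Map(0L -> 0L, 1L -> 0L, 2L -> 1L, 3L -> 2L, 4L -> 3L, 5L -> -1L)
val joined = joinPaths(chain, 0L)
// joined(4L) == List(0, 1, 2, 3, 4), found after two joining rounds
```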

If you have any other way to obtain the full path in a distributed style, please leave me a comment.

### Reference

[1] Pregel: A System for Large-Scale Graph Processing, http://kowshik.github.io/JPregel/pregel_paper.pdf

[2] Shortest path problem, http://en.wikipedia.org/wiki/Shortest_path_problem

[3] Dijkstra’s algorithm, http://en.wikipedia.org/wiki/Dijkstra’s_algorithm

[4] Bellman–Ford algorithm, http://en.wikipedia.org/wiki/Bellman-Ford_algorithm

[5] Shortest Paths Graph Algorithms, http://www.informit.com/articles/article.aspx?p=169575

[6] Maps in Scala, http://docs.scala-lang.org/overviews/collections/maps.html
