Graph Operators in GraphX – Part 1

GraphX defines a number of operators for processing graph data. As introduced before, some operators are defined in Graph.scala, such as mapVertices, mapEdges, and mapTriplets. These three important property operators are introduced in this post.

mapVertices transforms each vertex attribute in the graph. map is the function from a vertex object to a new vertex value. The new graph has the same structure as the old one, so the underlying index structures can be reused. VD2 is the new vertex data type. The implicit eq parameter in the signature below lets GraphX detect the case where the new vertex type is the same as the old one, so internal structures can be reused rather than rebuilt.

/**
 * @example We might use this operation to change the vertex values
 * from one type to another to initialize an algorithm.
 * {{{
 * val rawGraph: Graph[(), ()] = Graph.textFile("hdfs://file")
 * val root = 42
 * var bfsGraph = rawGraph.mapVertices[Int]((vid, data) => if (vid == root) 0 else Int.MaxValue)
 * }}}
 *
 */
def mapVertices[VD2: ClassTag](map: (VertexId, VD) => VD2)
  (implicit eq: VD =:= VD2 = null): Graph[VD2, ED]

Let’s run a simple test. Start the spark-shell and load the graph from the web-Google data set. Type in:

import org.apache.spark._
import org.apache.spark.graphx._
import org.apache.spark.rdd.RDD
val graph = GraphLoader.edgeListFile(sc, "hdfs://192.168.17.240:9222/input/yuhc/web-Google/web-Google.txt")
graph.vertices.take(10)

This takes ten vertices from the graph and returns:

res1: Array[(org.apache.spark.graphx.VertexId, Int)] = Array((354796,1), (672890,1), (129434,1), (194402,1), (199516,1), (332918,1), (170792,1), (386896,1), (691634,1), (291526,1))

Each vertex’s value is set to 1, which is the default for GraphLoader.edgeListFile. We can double the values with:

val tmp = graph.mapVertices((id, attr) => attr.toInt * 2)

If we use tmp.vertices.take(10) to see the values, it returns

res2: Array[(org.apache.spark.graphx.VertexId, Int)] = Array((354796,2), (672890,2), (129434,2), (194402,2), (199516,2), (332918,2), (170792,2), (386896,2), (691634,2), (291526,2))

The same transformation can also be written with an explicit result type, using _ for the unused vertex ID (this time multiplying by 3):

val tmp: Graph[Int, Int] = graph.mapVertices((_, attr) => attr * 3)

Again, tmp.vertices.take(10) returns:

res3: Array[(org.apache.spark.graphx.VertexId, Int)] = Array((354796,3), (672890,3), (129434,3), (194402,3), (199516,3), (332918,3), (170792,3), (386896,3), (691634,3), (291526,3))

mapEdges has two overloaded variants:

def mapEdges[ED2: ClassTag](map: Edge[ED] => ED2): Graph[VD, ED2] = {
  mapEdges((pid, iter) => iter.map(map))
}

def mapEdges[ED2: ClassTag](map: (PartitionID, Iterator[Edge[ED]]) => Iterator[ED2])
  : Graph[VD, ED2]

The first overload calls the second to do the actual mapping. mapEdges transforms each edge attribute in the graph using the map function, but unlike mapTriplets it does not expose the values of the vertices adjacent to the edge. The new graph has the same structure as the original one.
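The partition-level variant is useful when the per-edge work has expensive setup that can be shared across a whole partition. A small sketch (our own illustration, not from the original post) that gives every edge a random Double weight while creating only one random generator per partition:

import scala.util.Random

// One Random instance per edge partition, seeded with the partition ID,
// instead of one instance per edge.
val randomized: Graph[Int, Double] = graph.mapEdges { (pid, iter) =>
  val rnd = new Random(pid)
  iter.map(_ => rnd.nextDouble())
}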

Have a look at the edges in the graph:

graph.edges.take(10)

It returns

res4: Array[org.apache.spark.graphx.Edge[Int]] = Array(Edge(0,11342,1), Edge(0,824020,1), Edge(0,867923,1), Edge(0,891835,1), Edge(1,53051,1), Edge(1,203402,1), Edge(1,223236,1), Edge(1,276233,1), Edge(1,552600,1), Edge(1,569212,1))

Multiply the edges’ attributes by 2:

val tmp = graph.mapEdges(e => e.attr.toInt * 2)

Then tmp.edges.take(10) returns:

res5: Array[org.apache.spark.graphx.Edge[Int]] = Array(Edge(0,11342,2), Edge(0,824020,2), Edge(0,867923,2), Edge(0,891835,2), Edge(1,53051,2), Edge(1,203402,2), Edge(1,223236,2), Edge(1,276233,2), Edge(1,552600,2), Edge(1,569212,2))

mapTriplets is similar, except that its map function receives an EdgeTriplet, which also carries the attributes of the source and destination vertices:

def mapTriplets[ED2: ClassTag](map: EdgeTriplet[VD, ED] => ED2): Graph[VD, ED2] = {
  mapTriplets((pid, iter) => iter.map(map), TripletFields.All)
}

Type in the following code, which sets each edge value to twice the source vertex value plus three times the destination vertex value.

val tmp = graph.mapTriplets(et => et.srcAttr.toInt * 2 + et.dstAttr.toInt * 3)

Viewing the triplets with tmp.triplets.take(10) returns the following (every vertex value is 1, so each edge value becomes 2 × 1 + 3 × 1 = 5):

res6: Array[org.apache.spark.graphx.EdgeTriplet[Int,Int]] = Array(((0,1),(11342,1),5), ((0,1),(824020,1),5), ((0,1),(867923,1),5), ((0,1),(891835,1),5), ((1,1),(53051,1),5), ((1,1),(203402,1),5), ((1,1),(223236,1),5), ((1,1),(276233,1),5), ((1,1),(552600,1),5), ((1,1),(569212,1),5))

Graph.scala also contains several structural operators, such as reverse, subgraph, mask, and groupEdges. reverse returns a new graph with all the edge directions reversed; the vertex and edge data are unchanged.
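For example, a quick sketch using the web-Google graph loaded above:

// reverse swaps the source and destination of every edge; vertex and
// edge attributes are untouched, so each Edge(src, dst, attr) now
// appears as Edge(dst, src, attr).
val reversed = graph.reverse
reversed.edges.take(10)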

subgraph returns the subgraph that contains only the vertices and edges satisfying the given predicates:

def subgraph(
    epred: EdgeTriplet[VD,ED] => Boolean = (x => true),
    vpred: (VertexId, VD) => Boolean = ((v, d) => true))
  : Graph[VD, ED]

The vertices and edges in the subgraph satisfy:

V' = {v : for all v in V where vpred(v)}
E' = {(u,v): for all (u,v) in E where epred((u,v)) && vpred(u) && vpred(v)}

From the last article, we know the graph has 875,713 vertices and 5,105,039 edges. Create a subgraph with:

val subgraph = graph.subgraph(epred = e => e.srcId > e.dstId)

Count the vertices and edges in the subgraph:

scala> subgraph.vertices.count
res7: Long = 875713
scala> subgraph.edges.count
res8: Long = 2420548
scala> subgraph.edges.take(10)
res11: Array[org.apache.spark.graphx.Edge[Int]] = Array(Edge(1122,429,1), Edge(1300,606,1), Edge(1436,409,1), Edge(1509,1401,1), Edge(1513,1406,1), Edge(1624,827,1), Edge(1705,693,1), Edge(1825,1717,1), Edge(1985,827,1), Edge(2135,600,1))

Only the edges whose source vertex ID is larger than the destination vertex ID remain. Note that all 875,713 vertices are kept, because vpred defaults to true.

Now add a vpred argument as well:

val subgraph = graph.subgraph(epred = e => e.srcId > e.dstId, vpred = (id, _) => id > 500000)

Count the vertices and edges in the new subgraph:

scala> subgraph.vertices.count
res9: Long = 400340
scala> subgraph.edges.count
res10: Long = 526711
scala> subgraph.edges.take(10)
res11: Array[org.apache.spark.graphx.Edge[Int]] = Array(Edge(500397,500290,1), Edge(500627,500542,1), Edge(501011,500055,1), Edge(501663,500010,1), Edge(501941,501177,1), Edge(501984,501223,1), Edge(502067,500279,1), Edge(502187,500532,1), Edge(502335,500279,1), Edge(502809,500538,1))

mask returns a graph containing only the vertices and edges that also exist in other; the vertex and edge data are taken from the current graph.

def mask[VD2: ClassTag, ED2: ClassTag](other: Graph[VD2, ED2]): Graph[VD, ED]
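A common pattern, sketched here in the spirit of the official guide (the variable names are ours), is to compute an answer on the full graph and then use mask to restrict it to a subgraph, for instance the subgraph built above:

// Run connected components on the full graph (connectedComponents comes
// from GraphOps), then keep only the vertices and edges that also exist
// in the subgraph, carrying over the component IDs.
val ccGraph = graph.connectedComponents()
val subgraphCC = ccGraph.mask(subgraph)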

groupEdges merges multiple edges between pairs of vertices into a single edge, returning a graph with at most one edge for each (source, destination) vertex pair. Note that it only merges edges that are colocated in the same partition, so the graph should first be repartitioned with partitionBy.

def groupEdges(merge: (ED, ED) => ED): Graph[VD, ED]

It can be used as follows, summing the attributes of duplicate edges:

val mergedGraph = graph.partitionBy(PartitionStrategy.RandomVertexCut).groupEdges((a, b) => a + b)

Another important (but deprecated) operation defined in Graph.scala is mapReduceTriplets. It aggregates values from the neighboring edges and vertices of each vertex. The user-supplied mapFunc is invoked on each edge of the graph, generating zero or more “messages” to be “sent” to either vertex of the edge. The reduceFunc then combines the map output destined for each vertex.

Since Spark 1.2.0, aggregateMessages is used instead of this function [1], but mapReduceTriplets is still helpful for understanding aggregateMessages. A is the type of the “message” sent to each vertex. activeSetOpt optionally restricts the aggregation to a subset of edges, given as a set of “active” vertices plus an edge direction. Because it enables neighborhood-level computation, this function can be used to count neighbors satisfying a predicate or to implement PageRank.

/**
 * @example We can use this function to compute the in-degree of each
 * vertex
 * {{{
 * val rawGraph: Graph[(),()] = Graph.textFile("twittergraph")
 * val inDeg: RDD[(VertexId, Int)] =
 *   mapReduceTriplets[Int](et => Iterator((et.dstId, 1)), _ + _)
 * }}}
 */
@deprecated("use aggregateMessages", "1.2.0")
def mapReduceTriplets[A: ClassTag](
    mapFunc: EdgeTriplet[VD, ED] => Iterator[(VertexId, A)],
    reduceFunc: (A, A) => A,
    activeSetOpt: Option[(VertexRDD[_], EdgeDirection)] = None)
  : VertexRDD[A]

The following is another example, which appeared in an older version of the official programming guide. First generate a random graph using the GraphX utility library:

scala> import org.apache.spark._
scala> import org.apache.spark.graphx._
scala> import org.apache.spark.rdd.RDD
scala> import org.apache.spark.graphx.util.GraphGenerators
scala> val graph: Graph[Double, Int] =
     | GraphGenerators.logNormalGraph(sc, numVertices = 100).mapVertices( (id, _) => id.toDouble )
scala> graph.vertices.take(10).mkString("\n")
res12: String =
(84,84.0)
(96,96.0)
(52,52.0)
(56,56.0)
(4,4.0)
(76,76.0)
(16,16.0)
(28,28.0)
(80,80.0)
(48,48.0)
scala> graph.edges.take(10).mkString("\n")
res13: String =
Edge(0,3,1)
Edge(0,13,1)
Edge(0,19,1)
Edge(0,27,1)
Edge(0,37,1)
Edge(0,53,1)
Edge(0,77,1)
Edge(0,94,1)
Edge(1,2,1)
Edge(1,10,1)

Suppose each vertex ID is a person’s age. For every person, we want to count the in-neighbors who are older, and compute the average age of those older neighbors.

scala> val olderPeople: VertexRDD[(Int, Double)] = graph.mapReduceTriplets[(Int, Double)] (
     | triplet => { //Map Function
     |   if (triplet.srcAttr > triplet.dstAttr) {
     |     Iterator((triplet.dstId, (1, triplet.srcAttr)))
     |   }
     |   else {
     |     Iterator.empty
     |   }
     | },
     | //Reduce Function
     | (a, b) => (a._1 + b._1, a._2 + b._2)
     | )

Ignore the deprecation warning. Divide the total age by the count of older neighbors:

scala> val avgAgeOfOlderPeople: VertexRDD[Double] =
     | olderPeople.mapValues( (id, value) => value match { case (count, totalAge) => totalAge/count } )

Print the answer using foreach:

scala> avgAgeOfOlderPeople.collect.foreach(println(_))
(84,93.5)
(52,70.63636363636364)
(56,77.0)
(4,48.02272727272727)
(76,85.4)
(16,59.92857142857143)
(28,63.27272727272727)
(80,88.6)
(48,72.0952380952381)
(32,69.72727272727273)
(36,66.96153846153847)
(0,48.30232558139535)
...

We can do the same thing using aggregateMessages. The mergeMsg function combines all messages from sendMsg destined for the same vertex; this combiner should be commutative and associative. sendMsg runs on each edge and sends messages to the neighboring vertices through the EdgeContext. tripletFields declares which fields of the EdgeContext (source attribute, destination attribute, edge attribute) sendMsg actually reads, so that GraphX can avoid shipping unused vertex data to the edge partitions; it is an optimization hint rather than an active-set restriction like activeSetOpt.

/**
 * @example We can use this function to compute the in-degree of each
 * vertex
 * {{{
 * val rawGraph: Graph[_, _] = Graph.textFile("twittergraph")
 * val inDeg: RDD[(VertexId, Int)] =
 *   aggregateMessages[Int](ctx => ctx.sendToDst(1), _ + _)
 * }}}
 */
def aggregateMessages[A: ClassTag](
    sendMsg: EdgeContext[VD, ED, A] => Unit,
    mergeMsg: (A, A) => A,
    tripletFields: TripletFields = TripletFields.All)
  : VertexRDD[A] = {
  aggregateMessagesWithActiveSet(sendMsg, mergeMsg, tripletFields, None)
}
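A runnable version of that in-degree example (our own sketch): since sendMsg here reads no vertex or edge attributes at all, we can pass TripletFields.None so GraphX ships no vertex data to the edge partitions:

// Each edge sends the message 1 to its destination; messages are summed
// per vertex. TripletFields.None declares that sendMsg touches neither
// vertex nor edge attributes.
val inDeg: VertexRDD[Int] =
  graph.aggregateMessages[Int](ctx => ctx.sendToDst(1), _ + _, TripletFields.None)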

Rewriting the older-neighbors example with aggregateMessages:

scala> val olderPeople: VertexRDD[(Int, Double)] = graph.aggregateMessages[(Int, Double)] (
     | triplet => { //Map Function
     |   if (triplet.srcAttr > triplet.dstAttr) {
     |     triplet.sendToDst((1, triplet.srcAttr))
     |   }
     | },
     | //Reduce Function
     | (a, b) => (a._1 + b._1, a._2 + b._2)
     | )
scala> val avgAgeOfOlderPeople: VertexRDD[Double] =
     | olderPeople.mapValues( (id, value) => value match { case (count, totalAge) => totalAge/count } )
scala> avgAgeOfOlderPeople.collect.foreach(println(_))
(84,93.5)
(52,70.63636363636364)
(56,77.0)
(4,48.02272727272727)
(76,85.4)
(16,59.92857142857143)
(28,63.27272727272727)
(80,88.6)
(48,72.0952380952381)
(32,69.72727272727273)
(36,66.96153846153847)
(0,48.30232558139535)
...

It obtains the same result as using mapReduceTriplets.
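Since the sendMsg above reads both vertex attributes but never the edge attribute, we could also declare that explicitly. This is our own addition, a sketch using the public TripletFields(useSrc, useDst, useEdge) constructor:

// Same aggregation, declaring that sendMsg reads the source and
// destination vertex attributes but not the edge attribute.
val olderPeople2: VertexRDD[(Int, Double)] = graph.aggregateMessages[(Int, Double)](
  triplet => {
    if (triplet.srcAttr > triplet.dstAttr) {
      triplet.sendToDst((1, triplet.srcAttr))
    }
  },
  (a, b) => (a._1 + b._1, a._2 + b._2),
  new TripletFields(true, true, false)
)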

Reference

[1] SPARK-3936 issue, https://issues.apache.org/jira/browse/SPARK-3936
