# Graph Operators in GraphX – Part 1

GraphX defines several operators for processing graph data. As introduced before, some of them are defined in Graph.scala, such as `mapVertices`, `mapEdges`, and `mapTriplets`. These three important property operators are introduced in this post.

`mapVertices` transforms each vertex attribute in the graph. The `map` argument is a function from a vertex (its ID and current value) to a new vertex value. The new graph has the same structure as the old one, so the underlying index structures can be reused. `VD2` is the new vertex data type.

```
/**
 * @example We might use this operation to change the vertex values
 * from one type to another to initialize an algorithm.
 * {{{
 * val rawGraph: Graph[(), ()] = Graph.textFile("hdfs://file")
 * val root = 42
 * var bfsGraph = rawGraph.mapVertices[Int]((vid, data) => if (vid == root) 0 else Math.MaxValue)
 * }}}
 */
def mapVertices[VD2: ClassTag](map: (VertexId, VD) => VD2)
  (implicit eq: VD =:= VD2 = null): Graph[VD2, ED]
```

Let’s make a simple test. Start `spark-shell` and load the graph from the web-Google data set. Type in:

```
import org.apache.spark._
import org.apache.spark.graphx._
import org.apache.spark.rdd.RDD

// Load the edge list (adjust the path to where your copy of web-Google.txt lives)
val graph = GraphLoader.edgeListFile(sc, "web-Google.txt")
graph.vertices.take(10)
```

It takes ten vertices from the graph and returns:

```
res1: Array[(org.apache.spark.graphx.VertexId, Int)] = Array((354796,1), (672890,1), (129434,1), (194402,1), (199516,1), (332918,1), (170792,1), (386896,1), (691634,1), (291526,1))
```

Each vertex’s value is set to `1`, the default assigned by `GraphLoader`. We can set the values to `2` by:

`val tmp = graph.mapVertices((id, attr) => attr.toInt * 2)`

If we use `tmp.vertices.take(10)` to see the values, it returns

`res2: Array[(org.apache.spark.graphx.VertexId, Int)] = Array((354796,2), (672890,2), (129434,2), (194402,2), (199516,2), (332918,2), (170792,2), (386896,2), (691634,2), (291526,2))`

The same thing can be written more idiomatically, with an explicit result type and a placeholder for the unused vertex ID:

`val tmp: Graph[Int, Int] = graph.mapVertices((_, attr) => attr * 3)`

Then `tmp.vertices.take(10)` returns

`res3: Array[(org.apache.spark.graphx.VertexId, Int)] = Array((354796,3), (672890,3), (129434,3), (194402,3), (199516,3), (332918,3), (170792,3), (386896,3), (691634,3), (291526,3))`

`mapEdges` has two overloads:

```
def mapEdges[ED2: ClassTag](map: Edge[ED] => ED2): Graph[VD, ED2] = {
  mapEdges((pid, iter) => iter.map(map))
}

def mapEdges[ED2: ClassTag](map: (PartitionID, Iterator[Edge[ED]]) => Iterator[ED2])
  : Graph[VD, ED2]
```

The first overload calls the second to do the mapping. It transforms each edge attribute in the graph using the `map` function, but it does not pass the attributes of the vertices adjacent to the edge (`mapTriplets` does). The new graph has the same structure as the original one.
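The difference can be illustrated without Spark. In this sketch, the `Edge` case class is a minimal stand-in for GraphX's, and the attribute values are made up: an edge-only transform sees just the edge attribute, while a triplet-style transform can also read the endpoint attributes.

```scala
// Minimal stand-in for GraphX's Edge (illustration only)
case class Edge[ED](srcId: Long, dstId: Long, attr: ED)

val vertexAttrs = Map(1L -> 10, 2L -> 20, 3L -> 30)
val edges = List(Edge(1L, 2L, 1), Edge(2L, 3L, 1))

// mapEdges-style: the function sees only the edge attribute
val doubled = edges.map(e => e.copy(attr = e.attr * 2))

// mapTriplets-style: the function also sees the attributes of both endpoints
val fromTriplets = edges.map { e =>
  e.copy(attr = vertexAttrs(e.srcId) * 2 + vertexAttrs(e.dstId) * 3)
}

println(doubled)      // attrs become 2 and 2
println(fromTriplets) // attrs become 10*2 + 20*3 = 80 and 20*2 + 30*3 = 130
```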

Have a look at the edges in the graph:

`graph.edges.take(10)`

It returns

`res4: Array[org.apache.spark.graphx.Edge[Int]] = Array(Edge(0,11342,1), Edge(0,824020,1), Edge(0,867923,1), Edge(0,891835,1), Edge(1,53051,1), Edge(1,203402,1), Edge(1,223236,1), Edge(1,276233,1), Edge(1,552600,1), Edge(1,569212,1))`

Multiply the edges’ attributes by `2`:

`val tmp = graph.mapEdges(e => e.attr.toInt * 2)`

Then `tmp.edges.take(10)` returns:

`res5: Array[org.apache.spark.graphx.Edge[Int]] = Array(Edge(0,11342,2), Edge(0,824020,2), Edge(0,867923,2), Edge(0,891835,2), Edge(1,53051,2), Edge(1,203402,2), Edge(1,223236,2), Edge(1,276233,2), Edge(1,552600,2), Edge(1,569212,2))`

`mapTriplets` is similar, except that its `map` function also receives the attributes of the source and destination vertices:

```
def mapTriplets[ED2: ClassTag](map: EdgeTriplet[VD, ED] => ED2): Graph[VD, ED2] = {
  mapTriplets((pid, iter) => iter.map(map), TripletFields.All)
}
```

Type in the following code, which sets the edge value to the sum of twice the source vertex value and three times the destination vertex value.

`val tmp = graph.mapTriplets(et => et.srcAttr.toInt * 2 + et.dstAttr.toInt * 3)`

Viewing the triplets with `tmp.triplets.take(10)` returns

`res6: Array[org.apache.spark.graphx.EdgeTriplet[Int,Int]] = Array(((0,1),(11342,1),5), ((0,1),(824020,1),5), ((0,1),(867923,1),5), ((0,1),(891835,1),5), ((1,1),(53051,1),5), ((1,1),(203402,1),5), ((1,1),(223236,1),5), ((1,1),(276233,1),5), ((1,1),(552600,1),5), ((1,1),(569212,1),5))`

Graph.scala also contains several structural operators, such as `reverse`, `subgraph`, `mask`, and `groupEdges`. `reverse` returns a new graph with all edge directions reversed. `subgraph` returns the subgraph containing only the vertices and edges that satisfy the given predicates.

```
def subgraph(
    epred: EdgeTriplet[VD, ED] => Boolean = (x => true),
    vpred: (VertexId, VD) => Boolean = ((v, d) => true))
  : Graph[VD, ED]
```

The vertices and edges in the subgraph satisfy

```
V' = {v : for all v in V where vpred(v)}
E' = {(u,v): for all (u,v) in E where epred((u,v)) && vpred(u) && vpred(v)}
```
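These definitions can be mimicked without Spark. A plain-Scala sketch, with a hypothetical `Edge` case class in place of GraphX's and made-up vertex IDs: an edge survives only if it passes `epred` and both of its endpoints pass `vpred`.

```scala
case class Edge(srcId: Long, dstId: Long, attr: Int)

val vertices = Map(1L -> 1, 2L -> 1, 3L -> 1, 4L -> 1)
val edges = List(Edge(3L, 1L, 1), Edge(1L, 2L, 1), Edge(4L, 2L, 1))

val vpred = (id: Long, attr: Int) => id > 1 // keep vertices with id > 1
val epred = (e: Edge) => e.srcId > e.dstId  // keep edges with srcId > dstId

// V' = vertices satisfying vpred
val vSub = vertices.filter { case (id, attr) => vpred(id, attr) }
// E' = edges satisfying epred whose endpoints are both in V'
val eSub = edges.filter(e => epred(e) && vSub.contains(e.srcId) && vSub.contains(e.dstId))

println(vSub.keySet) // Set(2, 3, 4)
// Edge(3,1) loses vertex 1 to vpred, Edge(1,2) fails epred, only Edge(4,2) survives
println(eSub)
```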

From the last article, we know there are 875,713 vertices and 5,105,039 edges in the graph. Create a subgraph by

`val subgraph = graph.subgraph(epred = e => e.srcId > e.dstId)`

Count the vertices and edges in the subgraph:

```
scala> subgraph.vertices.count
res7: Long = 875713
scala> subgraph.edges.count
res8: Long = 2420548
scala> subgraph.edges.take(10)
res11: Array[org.apache.spark.graphx.Edge[Int]] = Array(Edge(1122,429,1), Edge(1300,606,1), Edge(1436,409,1), Edge(1509,1401,1), Edge(1513,1406,1), Edge(1624,827,1), Edge(1705,693,1), Edge(1825,1717,1), Edge(1985,827,1), Edge(2135,600,1))
```

Only edges whose source vertex ID is larger than their destination vertex ID are kept. The default `vpred` accepts every vertex, so the vertex count is unchanged.

Add a `vpred` as well:

`val subgraph = graph.subgraph(epred = e => e.srcId > e.dstId, vpred = (id, _) => id > 500000)`

Count the vertices and edges in the new subgraph:

```
scala> subgraph.vertices.count
res9: Long = 400340
scala> subgraph.edges.count
res10: Long = 526711
scala> subgraph.edges.take(10)
res11: Array[org.apache.spark.graphx.Edge[Int]] = Array(Edge(500397,500290,1), Edge(500627,500542,1), Edge(501011,500055,1), Edge(501663,500010,1), Edge(501941,501177,1), Edge(501984,501223,1), Edge(502067,500279,1), Edge(502187,500532,1), Edge(502335,500279,1), Edge(502809,500538,1))
```

`mask` returns a graph containing only the vertices and edges that exist in both the current graph and `other`; the vertex and edge data are taken from the current graph.

```
def mask[VD2: ClassTag, ED2: ClassTag](other: Graph[VD2, ED2]): Graph[VD, ED]
```
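The intersection semantics can be sketched with plain Scala maps standing in for the two graphs' vertex sets (no Spark required; the IDs and values below are made up for illustration):

```scala
// Vertex attributes of the current graph and of `other`
val current = Map(1L -> "a", 2L -> "b", 3L -> "c")
val other   = Map(2L -> 99, 3L -> 100, 4L -> 101)

// mask-style intersection: keep ids present in both, but take values from `current`
val masked = current.filter { case (id, _) => other.contains(id) }

println(masked) // Map(2 -> b, 3 -> c)
```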

`groupEdges` merges the parallel edges between each pair of vertices into a single edge, returning a graph with at most one edge for each `(source, destination)` vertex pair. Note that it only merges edges that lie in the same partition, so the graph should be repartitioned with `partitionBy` first.

```
def groupEdges(merge: (ED, ED) => ED): Graph[VD, ED]
```

For example, to sum the attributes of parallel edges:

`val subgraph = graph.groupEdges((a, b) => a+b)`
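Outside of Spark, the merging behaviour can be sketched with plain collections (the `Edge` case class and edge list below are stand-ins for illustration):

```scala
case class Edge(srcId: Long, dstId: Long, attr: Int)

// Two parallel edges from 1 to 2, plus one edge from 2 to 3
val edges = List(Edge(1L, 2L, 1), Edge(1L, 2L, 1), Edge(2L, 3L, 1))
val merge = (a: Int, b: Int) => a + b

// groupEdges-style: one edge per (src, dst) pair, attributes combined with `merge`
val grouped = edges
  .groupBy(e => (e.srcId, e.dstId))
  .map { case ((s, d), es) => Edge(s, d, es.map(_.attr).reduce(merge)) }
  .toList

println(grouped) // Edge(1,2,2) and Edge(2,3,1), in some order
```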

Another important (but deprecated) operation defined in Graph.scala is `mapReduceTriplets`. It aggregates values from the neighboring edges and vertices of each vertex. The user-supplied `mapFunc` is invoked on each edge of the graph, generating zero or more “messages” to be “sent” to either vertex of the edge. The `reduceFunc` then combines the output of the map phase destined for each vertex.

Since Spark 1.2.0, `aggregateMessages` should be used instead [1], but `mapReduceTriplets` still helps us understand `aggregateMessages`. `A` is the type of “message” to be sent to each vertex. `activeSetOpt` restricts the aggregation to a subset of edges, given as a set of “active” vertices plus an edge direction. This function can be used to count neighbors satisfying a predicate or to implement PageRank, since it enables neighborhood-level computation.

```
/**
 * @example We can use this function to compute the in-degree of each
 * vertex
 * {{{
 * val rawGraph: Graph[(),()] = Graph.textFile("twittergraph")
 * val inDeg: RDD[(VertexId, Int)] =
 *   mapReduceTriplets[Int](et => Iterator((et.dst.id, 1)), _ + _)
 * }}}
 */
@deprecated("use aggregateMessages", "1.2.0")
def mapReduceTriplets[A: ClassTag](
    mapFunc: EdgeTriplet[VD, ED] => Iterator[(VertexId, A)],
    reduceFunc: (A, A) => A,
    activeSetOpt: Option[(VertexRDD[_], EdgeDirection)] = None)
  : VertexRDD[A]
```
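The in-degree example in the scaladoc can be simulated with plain collections to see the map/reduce contract: each edge emits `(vertexId, message)` pairs, which are then combined per destination vertex. A Spark-free sketch with a made-up edge list:

```scala
case class Edge(srcId: Long, dstId: Long)

val edges = List(Edge(1L, 2L), Edge(1L, 3L), Edge(2L, 3L))

// Map phase: each edge sends the message 1 to its destination vertex
val messages = edges.flatMap(e => Iterator((e.dstId, 1)))

// Reduce phase: messages destined to the same vertex are combined with _ + _
val inDegree = messages.groupBy(_._1).map { case (id, ms) => id -> ms.map(_._2).reduce(_ + _) }

println(inDegree) // Map(2 -> 1, 3 -> 2)
```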

The following is another example, taken from an old version of the official guide. First, generate a random graph using the GraphX utility library:

```
scala> import org.apache.spark._
scala> import org.apache.spark.graphx._
scala> import org.apache.spark.rdd.RDD
scala> import org.apache.spark.graphx.util.GraphGenerators
scala> val graph: Graph[Double, Int] =
| GraphGenerators.logNormalGraph(sc, numVertices = 100).mapVertices( (id, _) => id.toDouble )
scala> graph.vertices.take(10).mkString("\n")
res12: String =
(84,84.0)
(96,96.0)
(52,52.0)
(56,56.0)
(4,4.0)
(76,76.0)
(16,16.0)
(28,28.0)
(80,80.0)
(48,48.0)
scala> graph.edges.take(10).mkString("\n")
res13: String =
Edge(0,3,1)
Edge(0,13,1)
Edge(0,19,1)
Edge(0,27,1)
Edge(0,37,1)
Edge(0,53,1)
Edge(0,77,1)
Edge(0,94,1)
Edge(1,2,1)
Edge(1,10,1)
```

Suppose the vertex IDs are the ages of the people. For each person, we want to find how many people are older, and the average age of those older people.

```
scala> val olderPeople: VertexRDD[(Int, Double)] = graph.mapReduceTriplets[(Int, Double)] (
| triplet => { //Map Function
|   if (triplet.srcAttr > triplet.dstAttr) {
|     Iterator((triplet.dstId, (1, triplet.srcAttr)))
|   }
|   else {
|     Iterator.empty
|   }
| },
| //Reduce Function
| (a, b) => (a._1 + b._1, a._2 + b._2)
| )
```

Ignore the deprecation warning. Then divide the total age by the count of older people:

```
scala> val avgAgeOfOlderPeople: VertexRDD[Double] =
| olderPeople.mapValues( (id, value) => value match { case (count, totalAge) => totalAge/count } )
```

Print the answer using `foreach`:

```
scala> avgAgeOfOlderPeople.collect.foreach(println(_))
(84,93.5)
(52,70.63636363636364)
(56,77.0)
(4,48.02272727272727)
(76,85.4)
(16,59.92857142857143)
(28,63.27272727272727)
(80,88.6)
(48,72.0952380952381)
(32,69.72727272727273)
(36,66.96153846153847)
(0,48.30232558139535)
...
```

We can do the same thing using `aggregateMessages`. `sendMsg` runs on each edge and sends messages to the neighboring vertices through the `EdgeContext`. `mergeMsg` combines all messages from `sendMsg` destined to the same vertex; this combiner should be commutative and associative. `tripletFields` declares which fields of the `EdgeContext` `sendMsg` actually uses, allowing GraphX to avoid shipping vertex attributes that are not needed.

```
/**
 * @example We can use this function to compute the in-degree of each
 * vertex
 * {{{
 * val rawGraph: Graph[_, _] = Graph.textFile("twittergraph")
 * val inDeg: RDD[(VertexId, Int)] =
 *   aggregateMessages[Int](ctx => ctx.sendToDst(1), _ + _)
 * }}}
 */
def aggregateMessages[A: ClassTag](
    sendMsg: EdgeContext[VD, ED, A] => Unit,
    mergeMsg: (A, A) => A,
    tripletFields: TripletFields = TripletFields.All)
  : VertexRDD[A] = {
  aggregateMessagesWithActiveSet(sendMsg, mergeMsg, tripletFields, None)
}
```

The code is as follows:

```
scala> val olderPeople: VertexRDD[(Int, Double)] = graph.aggregateMessages[(Int, Double)] (
| triplet => { //Map Function
|   if (triplet.srcAttr > triplet.dstAttr) {
|     triplet.sendToDst(1, triplet.srcAttr)
|   }
| },
| //Reduce Function
| (a, b) => (a._1 + b._1, a._2 + b._2)
| )
scala> val avgAgeOfOlderPeople: VertexRDD[Double] =
| olderPeople.mapValues( (id, value) => value match { case (count, totalAge) => totalAge/count } )
scala> avgAgeOfOlderPeople.collect.foreach(println(_))
(84,93.5)
(52,70.63636363636364)
(56,77.0)
(4,48.02272727272727)
(76,85.4)
(16,59.92857142857143)
(28,63.27272727272727)
(80,88.6)
(48,72.0952380952381)
(32,69.72727272727273)
(36,66.96153846153847)
(0,48.30232558139535)
...
```

It produces the same result as `mapReduceTriplets`.

### Reference

[1] SPARK-3936, https://issues.apache.org/jira/browse/SPARK-3936
