# Graph Operators in GraphX – Part 2

Besides the operations covered in Part One, GraphX also defines vertex-degree data in GraphOps.scala, stored as a `VertexRDD[Int]`. There are three kinds of degree: `inDegrees`, `outDegrees`, and `degrees`. `inDegrees` gives the in-degree of each vertex; vertices whose in-degree is `0` are not included in the result RDD. `outDegrees` gives the out-degree of each vertex, and `degrees` is the sum of the two. All three are computed by the private method `degreesRDD`.

```scala
@transient lazy val inDegrees: VertexRDD[Int] =
  degreesRDD(EdgeDirection.In).setName("GraphOps.inDegrees")

@transient lazy val outDegrees: VertexRDD[Int] =
  degreesRDD(EdgeDirection.Out).setName("GraphOps.outDegrees")

@transient lazy val degrees: VertexRDD[Int] =
  degreesRDD(EdgeDirection.Either).setName("GraphOps.degrees")

private def degreesRDD(edgeDirection: EdgeDirection): VertexRDD[Int] = {
  if (edgeDirection == EdgeDirection.In) {
    graph.aggregateMessages(_.sendToDst(1), _ + _, TripletFields.None)
  } else if (edgeDirection == EdgeDirection.Out) {
    graph.aggregateMessages(_.sendToSrc(1), _ + _, TripletFields.None)
  } else { // EdgeDirection.Either
    graph.aggregateMessages(ctx => { ctx.sendToSrc(1); ctx.sendToDst(1) }, _ + _,
      TripletFields.None)
  }
}
```
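To see what `degreesRDD` does conceptually, here is a minimal plain-Scala sketch (toy data, no Spark) of the same idea: each edge "sends" a `1` to its destination, its source, or both, and the counts are summed per vertex. The object name and edge list are made up for illustration.

```scala
object DegreeSketch {
  type VertexId = Long

  // A toy directed edge list: (srcId, dstId)
  val edges: Seq[(VertexId, VertexId)] = Seq((1L, 2L), (1L, 3L), (2L, 3L))

  // sendToDst(1) then sum: vertices with in-degree 0 (here, 1L) are absent
  val inDegrees: Map[VertexId, Int] =
    edges.groupBy(_._2).map { case (v, es) => v -> es.size }

  // sendToSrc(1) then sum
  val outDegrees: Map[VertexId, Int] =
    edges.groupBy(_._1).map { case (v, es) => v -> es.size }

  // EdgeDirection.Either: each edge counts once at each endpoint
  val degrees: Map[VertexId, Int] =
    edges.flatMap { case (s, d) => Seq(s, d) }
      .groupBy(identity).map { case (v, vs) => v -> vs.size }
}
```

Note how vertex `1L` does not appear in `inDegrees` at all, which mirrors the behavior described above for vertices of in-degree `0`.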

`EdgeDirection` is defined in EdgeDirection.scala. It defines four values:

```scala
object EdgeDirection {
  /** Edges arriving at a vertex. */
  final val In = new EdgeDirection("In")

  /** Edges originating from a vertex. */
  final val Out = new EdgeDirection("Out")

  /** Edges originating from *or* arriving at a vertex of interest. */
  final val Either = new EdgeDirection("Either")

  /** Edges originating from *and* arriving at a vertex of interest. */
  final val Both = new EdgeDirection("Both")
}
```

Create the graph from `web-Google.txt` as before, then view the in-degrees of the graph as an example:

```scala
val tmp = graph.inDegrees
tmp.take(10)
```

It returns

`res0: Array[(org.apache.spark.graphx.VertexId, Int)] = Array((354796,2), (672890,1), (129434,2), (194402,2), (199516,28), (332918,3), (170792,1), (386896,3), (691634,71), (291526,9))`

How can we find the vertex which has the largest in-degree? Let’s define a function first:

```scala
def max(a: (VertexId, Int), b: (VertexId, Int)): (VertexId, Int) =
  if (a._2 > b._2) a else b
```

It returns the pair whose degree attribute is larger. Then reduce `inDegrees` with the `max` function:

```scala
scala> graph.inDegrees.reduce(max)
res1: (org.apache.spark.graphx.VertexId, Int) = (537039,6326)
```

This means that the vertex with ID `537039` has the most incoming edges: `6,326`. If we check `graph.degrees.reduce(max)`, we find that vertex `537039` also has the most connected edges (`6,353`). In other words, this vertex (web page) is heavily referenced.
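The reduction works because `max` is associative: Spark can combine partial results from any partitioning in any order. A plain-Scala sketch with made-up degree pairs shows the same semantics without a cluster:

```scala
object MaxDegreeSketch {
  type VertexId = Long

  // Same shape as the max function defined above
  def max(a: (VertexId, Int), b: (VertexId, Int)): (VertexId, Int) =
    if (a._2 > b._2) a else b

  // Toy (vertexId, inDegree) pairs, illustrative only
  val inDegrees: Seq[(VertexId, Int)] = Seq((354796L, 2), (691634L, 71), (199516L, 28))

  // reduce keeps whichever pair has the larger degree
  val top: (VertexId, Int) = inDegrees.reduce(max)
}
```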

GraphOps.scala defines two more methods to collect the neighbors of each vertex: `collectNeighborIds` and `collectNeighbors`.

```scala
def collectNeighborIds(edgeDirection: EdgeDirection): VertexRDD[Array[VertexId]] = {
  val nbrs =
    if (edgeDirection == EdgeDirection.Either) {
      graph.aggregateMessages[Array[VertexId]](
        ctx => { ctx.sendToSrc(Array(ctx.dstId)); ctx.sendToDst(Array(ctx.srcId)) },
        _ ++ _, TripletFields.None)
    } else if (edgeDirection == EdgeDirection.Out) {
      graph.aggregateMessages[Array[VertexId]](
        ctx => ctx.sendToSrc(Array(ctx.dstId)),
        _ ++ _, TripletFields.None)
    } else if (edgeDirection == EdgeDirection.In) {
      graph.aggregateMessages[Array[VertexId]](
        ctx => ctx.sendToDst(Array(ctx.srcId)),
        _ ++ _, TripletFields.None)
    } else {
      throw new SparkException("It doesn't make sense to collect neighbor ids without a " +
        "direction. (EdgeDirection.Both is not supported; use EdgeDirection.Either instead.)")
    }
  graph.vertices.leftZipJoin(nbrs) { (vid, vdata, nbrsOpt) =>
    nbrsOpt.getOrElse(Array.empty[VertexId])
  }
}
```

Note that neither `collectNeighborIds` nor `collectNeighbors` supports `EdgeDirection.Both`; be careful with this. `collectNeighborIds` returns the set of neighboring vertex IDs for each vertex. For example, to get the out-direction neighbor IDs of each vertex:

```scala
scala> val tmp = graph.collectNeighborIds(EdgeDirection.Out)
scala> tmp.take(10)
res2: Array[(org.apache.spark.graphx.VertexId, Array[org.apache.spark.graphx.VertexId])] = Array((354796,Array(798944)), (672890,Array()), (129434,Array(110771, 119943)), (194402,Array(359291)), (199516,Array(26483, 190323, 193759, 280401, 329066, 342523, 367518, 398314, 417194, 427451, 458892, 459074, 485460, 502995, 505260, 514621, 660407, 798276, 810885, 835966)), (332918,Array(12304, 89384, 267989)), (170792,Array(227187, 255153, 400178, 453412, 512326, 592923, 663311, 666734, 864151)), (386896,Array(109021, 155460, 200406, 204397, 282107, 378570, 427843, 602779, 616132, 629079, 669605, 717650, 727162, 761159, 796410, 832809, 890838, 891178)), (691634,Array(13996, 32163, 33185, 39682, 193103, 197677, 520483, 598034, 727805, 747975, 836657)), (291526,Array(206053, 271366, 383159, 418...
```

`collectNeighbors` returns the vertex set of neighboring vertex attributes for each vertex instead of the vertex IDs. It could be highly inefficient on power-law graphs where high degree vertices may force a large amount of information to be collected to a single location.
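The shape of the result (including the `leftZipJoin` step that gives isolated vertices an empty array) can be sketched in plain Scala on toy data; the object name and graph are hypothetical:

```scala
object NeighborSketch {
  type VertexId = Long

  val vertices: Seq[VertexId] = Seq(1L, 2L, 3L)
  val edges: Seq[(VertexId, VertexId)] = Seq((1L, 2L), (1L, 3L), (2L, 3L))

  // Out direction: each edge contributes its dstId to its srcId's neighbor list
  val outNbrs: Map[VertexId, Seq[VertexId]] = {
    val collected = edges.groupBy(_._1).map { case (v, es) => v -> es.map(_._2) }
    // Like leftZipJoin: vertices with no outgoing edges get an empty list
    vertices.map(v => v -> collected.getOrElse(v, Seq.empty)).toMap
  }
}
```

Vertex `3L` has no outgoing edges, so it appears with an empty neighbor list rather than being dropped, matching the `getOrElse(Array.empty)` in the real implementation.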

What’s more, GraphOps.scala provides a join operator named `joinVertices`.

```scala
/**
 * @example This function is used to update the vertices with new
 * values based on external data. For example we could add the out
 * degree to each vertex record
 *
 * {{{
 * val rawGraph: Graph[Int, Int] = GraphLoader.edgeListFile(sc, "webgraph")
 *   .mapVertices((_, _) => 0)
 * val outDeg = rawGraph.outDegrees
 * val graph = rawGraph.joinVertices[Int](outDeg)
 *   ((_, _, outDeg) => outDeg)
 * }}}
 */
def joinVertices[U: ClassTag](table: RDD[(VertexId, U)])(mapFunc: (VertexId, VD, U) => VD)
  : Graph[VD, ED] = {
  val uf = (id: VertexId, data: VD, o: Option[U]) => {
    o match {
      case Some(u) => mapFunc(id, data, u)
      case None => data
    }
  }
  graph.outerJoinVertices(table)(uf)
}
```

It joins the vertices with the RDD `table` and then applies the function `mapFunc` to each vertex and its matching RDD entry to produce a new vertex value. The input `table` should contain at most one entry per vertex. If no entry is provided, the map function is skipped and the old value is kept. `U` is the type of the entries in `table`.

Do the following test:

```scala
scala> val rawGraph = graph.mapVertices((id, attr) => 0)
scala> val outDeg = rawGraph.outDegrees
scala> val tmp = rawGraph.joinVertices[Int](outDeg)((_, _, optDeg) => optDeg)
scala> outDeg.take(5)
res3: Array[(org.apache.spark.graphx.VertexId, Int)] = Array((354796,1), (129434,2), (194402,1), (199516,20), (332918,3))
scala> tmp.vertices.take(5)
res4: Array[(org.apache.spark.graphx.VertexId, Int)] = Array((354796,1), (672890,0), (129434,2), (194402,1), (199516,20))
```

We joined `outDeg` to `rawGraph`. The vertices which are in `rawGraph` but not in `outDeg` remain unchanged.

`joinVertices` calls `outerJoinVertices`, defined in Graph.scala. `outerJoinVertices` performs a similar join with the `other` RDD, but when no entry is provided for a vertex, the map function `mapFunc` receives `None` instead of skipping that vertex.

```scala
/**
 * @example This function is used to update the vertices with new values based on external data.
 *          For example we could add the out-degree to each vertex record:
 *
 * {{{
 * val rawGraph: Graph[_, _] = Graph.textFile("webgraph")
 * val outDeg: RDD[(VertexId, Int)] = rawGraph.outDegrees
 * val graph = rawGraph.outerJoinVertices(outDeg) {
 *   (vid, data, optDeg) => optDeg.getOrElse(0)
 * }
 * }}}
 */
def outerJoinVertices[U: ClassTag, VD2: ClassTag](other: RDD[(VertexId, U)])
    (mapFunc: (VertexId, VD, Option[U]) => VD2)(implicit eq: VD =:= VD2 = null)
  : Graph[VD2, ED]
```

Make a test:

```scala
scala> val tmp = rawGraph.outerJoinVertices[Int, Int](outDeg)((_, _, optDeg) => optDeg.getOrElse(0))
scala> outDeg.take(5)
res5: Array[(org.apache.spark.graphx.VertexId, Int)] = Array((354796,1), (129434,2), (194402,1), (199516,20), (332918,3))
scala> tmp.vertices.take(5)
res6: Array[(org.apache.spark.graphx.VertexId, Int)] = Array((354796,1), (672890,0), (129434,2), (194402,1), (199516,20))
```
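The contrast between the two joins can be summarized with a plain-Scala sketch on toy maps (no Spark; all names and data here are made up). Vertices missing from the degree table get their *old* value under `joinVertices`-style semantics, but whatever default `mapFunc` chooses for `None` under `outerJoinVertices`-style semantics:

```scala
object JoinSketch {
  type VertexId = Long

  // Toy vertex attributes: -1 marks "not yet updated"
  val vertexAttrs: Map[VertexId, Int] = Map(1L -> -1, 2L -> -1, 3L -> -1)
  // Vertex 3L has out-degree 0, so it is absent from the table
  val outDeg: Map[VertexId, Int] = Map(1L -> 2, 2L -> 1)

  // joinVertices-style: missing entry skips the map function, old value kept
  val joined: Map[VertexId, Int] =
    vertexAttrs.map { case (v, old) => v -> outDeg.getOrElse(v, old) }

  // outerJoinVertices-style: map function sees Option[U] and picks its own default
  val outerJoined: Map[VertexId, Int] =
    vertexAttrs.map { case (v, _) => v -> outDeg.get(v).getOrElse(0) }
}
```

Here vertex `3L` stays at `-1` in `joined` but becomes `0` in `outerJoined`, which is exactly why the REPL test above shows `(672890,0)` for a vertex with no outgoing edges.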
