Besides the operations mentioned in Part One, GraphX also exposes vertex degrees in GraphOps.scala, each stored as a `VertexRDD[Int]`. There are three kinds of degree: `inDegrees`, `outDegrees`, and `degrees`. `inDegrees` is the in-degree of each vertex. Vertices whose in-degree is `0` won't be included in the result RDD. `outDegrees` is the out-degree of each vertex. `degrees` is the sum of `inDegrees` and `outDegrees`. `GraphOps` computes all three by calling the private method `degreesRDD`.

```scala
@transient lazy val inDegrees: VertexRDD[Int] =
  degreesRDD(EdgeDirection.In).setName("GraphOps.inDegrees")

@transient lazy val outDegrees: VertexRDD[Int] =
  degreesRDD(EdgeDirection.Out).setName("GraphOps.outDegrees")

@transient lazy val degrees: VertexRDD[Int] =
  degreesRDD(EdgeDirection.Either).setName("GraphOps.degrees")

private def degreesRDD(edgeDirection: EdgeDirection): VertexRDD[Int] = {
  if (edgeDirection == EdgeDirection.In) {
    graph.aggregateMessages(_.sendToDst(1), _ + _, TripletFields.None)
  } else if (edgeDirection == EdgeDirection.Out) {
    graph.aggregateMessages(_.sendToSrc(1), _ + _, TripletFields.None)
  } else { // EdgeDirection.Either
    graph.aggregateMessages(ctx => { ctx.sendToSrc(1); ctx.sendToDst(1) }, _ + _,
      TripletFields.None)
  }
}
```
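To see why zero-degree vertices drop out of the result, it may help to model the "send 1, then sum" pattern with ordinary collections. The following is only a minimal plain-Scala sketch, assuming a hypothetical four-edge toy graph; `DegreeSketch`, `sumMessages`, and the edge list are illustrative names and are not part of GraphX.

```scala
// Plain-Scala model of the "send 1 along each edge, then sum" pattern.
// No Spark is involved; all names here are hypothetical.
object DegreeSketch {
  type VertexId = Long

  // Toy edge list of (source, destination) pairs.
  val edges: Seq[(VertexId, VertexId)] =
    Seq((1L, 2L), (1L, 3L), (2L, 3L), (4L, 1L))

  // Each element of `msgs` is one message worth 1, addressed to a vertex.
  // Summing per vertex corresponds to the `_ + _` merge function.
  private def sumMessages(msgs: Seq[VertexId]): Map[VertexId, Int] =
    msgs.groupBy(identity).map { case (id, ones) => id -> ones.size }

  // Every edge sends 1 to its destination (mirrors sendToDst(1)).
  val inDegrees: Map[VertexId, Int] = sumMessages(edges.map(_._2))

  // Every edge sends 1 to its source (mirrors sendToSrc(1)).
  val outDegrees: Map[VertexId, Int] = sumMessages(edges.map(_._1))

  // Every edge sends 1 to both endpoints (mirrors the Either branch).
  val degrees: Map[VertexId, Int] =
    sumMessages(edges.flatMap { case (src, dst) => Seq(src, dst) })
}
```

In this toy graph vertex `4` never receives a message, so it has no entry in `DegreeSketch.inDegrees` at all, which matches the behavior described for `inDegrees`.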

EdgeDirection is defined in EdgeDirection.scala. It has four possible values:

```scala
object EdgeDirection {
  /** Edges arriving at a vertex. */
  final val In = new EdgeDirection("In")

  /** Edges originating from a vertex. */
  final val Out = new EdgeDirection("Out")

  /** Edges originating from *or* arriving at a vertex of interest. */
  final val Either = new EdgeDirection("Either")

  /** Edges originating from *and* arriving at a vertex of interest. */
  final val Both = new EdgeDirection("Both")
}
```

Create the graph from `web-Google.txt` as before, then view the in-degrees of the graph as an example:

```scala
val tmp = graph.inDegrees
tmp.take(10)
```

It returns

```scala
res0: Array[(org.apache.spark.graphx.VertexId, Int)] = Array((354796,2), (672890,1), (129434,2), (194402,2), (199516,28), (332918,3), (170792,1), (386896,3), (691634,71), (291526,9))
```

How can we find the vertex which has the largest in-degree? Let’s define a function first:

```scala
def max(a: (VertexId, Int), b: (VertexId, Int)): (VertexId, Int) =
  if (a._2 > b._2) a else b
```

It returns whichever vertex has the larger attribute. Then reduce `inDegrees` with the `max` function:

```scala
scala> graph.inDegrees.reduce(max)
res1: (org.apache.spark.graphx.VertexId, Int) = (537039,6326)
```

This means that the vertex with ID `537039` has the most incoming edges, `6326`. If we check `graph.degrees.reduce(max)`, we find that vertex `537039` also has the most connected edges overall (`6353`). In other words, this vertex (web page) is heavily referenced.
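The same pairwise reduction can be tried without a cluster. The following minimal sketch applies the `max` function to the ten `(vertex, in-degree)` pairs printed earlier, using a plain `Seq` in place of the `VertexRDD`; the object name `MaxDegreeSketch` and the `sample` value are assumptions made for illustration only.

```scala
// Pairwise reduction over (vertex ID, degree) pairs, using a plain Seq
// instead of a VertexRDD. `MaxDegreeSketch` and `sample` are hypothetical names.
object MaxDegreeSketch {
  type VertexId = Long

  // Same comparison as in the text: keep the pair with the larger degree.
  def max(a: (VertexId, Int), b: (VertexId, Int)): (VertexId, Int) =
    if (a._2 > b._2) a else b

  // The ten pairs returned by tmp.take(10) above.
  val sample: Seq[(VertexId, Int)] = Seq(
    (354796L, 2), (672890L, 1), (129434L, 2), (194402L, 2), (199516L, 28),
    (332918L, 3), (170792L, 1), (386896L, 3), (691634L, 71), (291526L, 9))

  // Among these ten pairs the winner is (691634, 71).
  val largest: (VertexId, Int) = sample.reduce(max)
}
```

Note that when two vertices share the same maximum degree, which one survives depends on the order in which pairs are combined, and that order is not fixed when reducing a distributed RDD.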

GraphOps.scala also defines two functions for collecting the neighbors of each vertex: `collectNeighborIds` and `collectNeighbors`.

```scala
def collectNeighborIds(edgeDirection: EdgeDirection): VertexRDD[Array[VertexId]] = {
  val nbrs =
    if (edgeDirection == EdgeDirection.Either) {
      graph.aggregateMessages[Array[VertexId]](
        ctx => { ctx.sendToSrc(Array(ctx.dstId)); ctx.sendToDst(Array(ctx.srcId)) },
        _ ++ _, TripletFields.None)
    } else if (edgeDirection == EdgeDirection.Out) {
      graph.aggregateMessages[Array[VertexId]](
        ctx => ctx.sendToSrc(Array(ctx.dstId)),
        _ ++ _, TripletFields.None)
    } else if (edgeDirection == EdgeDirection.In) {
      graph.aggregateMessages[Array[VertexId]](
        ctx => ctx.sendToDst(Array(ctx.srcId)),
        _ ++ _, TripletFields.None)
    } else {
      throw new SparkException("It doesn't make sense to collect neighbor ids without a " +
        "direction. (EdgeDirection.Both is not supported; use EdgeDirection.Either instead.)")
    }
  graph.vertices.leftZipJoin(nbrs) { (vid, vdata, nbrsOpt) =>
    nbrsOpt.getOrElse(Array.empty[VertexId])
  }
}
```

As the code shows, `collectNeighborIds` does not support `EdgeDirection.Both` (nor does `collectNeighbors`); it throws a `SparkException` and suggests `EdgeDirection.Either` instead, so be careful with this. The function returns the array of neighboring vertex IDs for each vertex. For example, the following returns the out-direction neighbor IDs of each vertex:

```scala
scala> val tmp = graph.collectNeighborIds(EdgeDirection.Out)
scala> tmp.take(10)
res2: Array[(org.apache.spark.graphx.VertexId, Array[org.apache.spark.graphx.VertexId])] = Array((354796,Array(798944)), (672890,Array()), (129434,Array(110771, 119943)), (194402,Array(359291)), (199516,Array(26483, 190323, 193759, 280401, 329066, 342523, 367518, 398314, 417194, 427451, 458892, 459074, 485460, 502995, 505260, 514621, 660407, 798276, 810885, 835966)), (332918,Array(12304, 89384, 267989)), (170792,Array(227187, 255153, 400178, 453412, 512326, 592923, 663311, 666734, 864151)), (386896,Array(109021, 155460, 200406, 204397, 282107, 378570, 427843, 602779, 616132, 629079, 669605, 717650, 727162, 761159, 796410, 832809, 890838, 891178)), (691634,Array(13996, 32163, 33185, 39682, 193103, 197677, 520483, 598034, 727805, 747975, 836657)), (291526,Array(206053, 271366, 383159, 418...
```
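Unlike the degree RDDs, this result contains every vertex, including ones such as `672890` that have no out-neighbors. That comes from the final `leftZipJoin` with `getOrElse(Array.empty[VertexId])`. The following minimal plain-Scala sketch models the out-direction case on a hypothetical toy graph; `NeighborSketch` and its members are illustrative names and do not use Spark.

```scala
// Plain-Scala model of collectNeighborIds(EdgeDirection.Out).
// No Spark is involved; all names here are hypothetical.
object NeighborSketch {
  type VertexId = Long

  val vertices: Seq[VertexId] = Seq(1L, 2L, 3L, 4L)
  val edges: Seq[(VertexId, VertexId)] =
    Seq((1L, 2L), (1L, 3L), (2L, 3L), (4L, 1L))

  // Every edge sends its destination ID to its source; the messages that
  // reach the same vertex are concatenated (mirrors `_ ++ _`).
  val outMessages: Map[VertexId, Seq[VertexId]] =
    edges.groupBy(_._1).map { case (src, es) => src -> es.map(_._2) }

  // Left join against all vertices: a vertex that received no message
  // gets an empty collection instead of being dropped.
  val outNeighborIds: Map[VertexId, Seq[VertexId]] =
    vertices.map(v => v -> outMessages.getOrElse(v, Seq.empty[VertexId])).toMap
}
```

Here vertex `3` has no outgoing edges, so `NeighborSketch.outNeighborIds(3L)` is empty rather than missing.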

`collectNeighbors` returns, for each vertex, its neighboring vertices together with their attributes rather than only their IDs. It can be highly inefficient on power-law graphs, where high-degree vertices may force a large amount of information to be collected at a single location.

GraphOps.scala also provides a join operator named `joinVertices`.

```scala
/**
 * @example This function is used to update the vertices with new
 * values based on external data. For example we could add the out
 * degree to each vertex record
 *
 * {{{
 * val rawGraph: Graph[Int, Int] = GraphLoader.edgeListFile(sc, "webgraph")
 *   .mapVertices((_, _) => 0)
 * val outDeg = rawGraph.outDegrees
 * val graph = rawGraph.joinVertices[Int](outDeg)
 *   ((_, _, outDeg) => outDeg)
 * }}}
 *
 */
def joinVertices[U: ClassTag](table: RDD[(VertexId, U)])(mapFunc: (VertexId, VD, U) => VD)
  : Graph[VD, ED] = {
  val uf = (id: VertexId, data: VD, o: Option[U]) => {
    o match {
      case Some(u) => mapFunc(id, data, u)
      case None => data
    }
  }
  graph.outerJoinVertices(table)(uf)
}
```

It joins the vertices with an RDD `table` and then applies a function `mapFunc` that maps the vertex and its RDD entry to a new vertex value. The input `table` should contain at most one entry for each vertex. If no entry is provided for a vertex, the map function is skipped and the old value is kept. `U` is the type of the entries in `table`.

Do the following test:

```scala
scala> val rawGraph = graph.mapVertices((id, attr) => 0)
scala> val outDeg = rawGraph.outDegrees
scala> val tmp = rawGraph.joinVertices[Int](outDeg)((_, _, optDeg) => optDeg)
scala> outDeg.take(5)
res3: Array[(org.apache.spark.graphx.VertexId, Int)] = Array((354796,1), (129434,2), (194402,1), (199516,20), (332918,3))
scala> tmp.vertices.take(5)
res4: Array[(org.apache.spark.graphx.VertexId, Int)] = Array((354796,1), (672890,0), (129434,2), (194402,1), (199516,20))
```

We joined `outDeg` to `rawGraph`. Vertices that are in `rawGraph` but not in `outDeg`, such as `672890`, remain unchanged and keep the value `0`.
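The "update if present, otherwise keep" rule can be modelled with ordinary maps. The following is a minimal plain-Scala sketch, assuming three of the vertices from the output above; `JoinSketch` and its `joinVertices` are a hypothetical stand-in written for illustration, not the GraphX method itself.

```scala
// Plain-Scala model of the joinVertices semantics described in the text.
// No Spark is involved; all names here are hypothetical.
object JoinSketch {
  type VertexId = Long

  // The vertex type VD cannot change, and a vertex without an entry
  // in `table` keeps its old value because mapFunc is skipped for it.
  def joinVertices[VD, U](vertices: Map[VertexId, VD], table: Map[VertexId, U])(
      mapFunc: (VertexId, VD, U) => VD): Map[VertexId, VD] =
    vertices.map { case (id, data) =>
      table.get(id) match {
        case Some(u) => id -> mapFunc(id, data, u)
        case None    => id -> data
      }
    }

  // All vertex attributes start at 0, as after mapVertices((id, attr) => 0).
  val rawVertices: Map[VertexId, Int] =
    Map(354796L -> 0, 672890L -> 0, 129434L -> 0)

  // Vertex 672890 has no out-edges, so it has no out-degree entry.
  val outDeg: Map[VertexId, Int] = Map(354796L -> 1, 129434L -> 2)

  val joined: Map[VertexId, Int] =
    joinVertices(rawVertices, outDeg)((_, _, deg) => deg)
}
```

`JoinSketch.joined` maps `354796` to `1`, `129434` to `2`, and leaves `672890` at `0`, the same values as in the REPL output.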

`joinVertices` calls `outerJoinVertices` in Graph.scala. `outerJoinVertices` performs a similar join with the `other` RDD, but if `other` has no entry for a particular vertex in the graph, the map function `mapFunc` receives `None` instead of skipping that vertex.

```scala
/**
 * @example This function is used to update the vertices with new values based on external data.
 * For example we could add the out-degree to each vertex record:
 *
 * {{{
 * val rawGraph: Graph[_, _] = Graph.textFile("webgraph")
 * val outDeg: RDD[(VertexId, Int)] = rawGraph.outDegrees
 * val graph = rawGraph.outerJoinVertices(outDeg) {
 *   (vid, data, optDeg) => optDeg.getOrElse(0)
 * }
 * }}}
 */
def outerJoinVertices[U: ClassTag, VD2: ClassTag](other: RDD[(VertexId, U)])
    (mapFunc: (VertexId, VD, Option[U]) => VD2)(implicit eq: VD =:= VD2 = null)
  : Graph[VD2, ED]
```

Run a test:

```scala
scala> val tmp = rawGraph.outerJoinVertices[Int, Int](outDeg)((_, _, optDeg) => optDeg.getOrElse(0))
scala> outDeg.take(5)
res5: Array[(org.apache.spark.graphx.VertexId, Int)] = Array((354796,1), (129434,2), (194402,1), (199516,20), (332918,3))
scala> tmp.vertices.take(5)
res6: Array[(org.apache.spark.graphx.VertexId, Int)] = Array((354796,1), (672890,0), (129434,2), (194402,1), (199516,20))
```
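The signature of `outerJoinVertices` returns `Graph[VD2, ED]`, so the map function may also produce a different vertex type, which `joinVertices` cannot do. The following minimal plain-Scala sketch models both uses with ordinary maps; `OuterJoinSketch`, its `outerJoinVertices`, and the string labels are hypothetical and chosen for illustration only.

```scala
// Plain-Scala model of the outerJoinVertices semantics.
// No Spark is involved; all names here are hypothetical.
object OuterJoinSketch {
  type VertexId = Long

  // mapFunc runs for every vertex and sees None when `other` has no entry.
  def outerJoinVertices[VD, U, VD2](vertices: Map[VertexId, VD], other: Map[VertexId, U])(
      mapFunc: (VertexId, VD, Option[U]) => VD2): Map[VertexId, VD2] =
    vertices.map { case (id, data) => id -> mapFunc(id, data, other.get(id)) }

  val rawVertices: Map[VertexId, Int] =
    Map(354796L -> 0, 672890L -> 0, 129434L -> 0)
  val outDeg: Map[VertexId, Int] = Map(354796L -> 1, 129434L -> 2)

  // Same idea as the REPL test: missing entries default to 0.
  val withDefault: Map[VertexId, Int] =
    outerJoinVertices(rawVertices, outDeg)((_, _, optDeg) => optDeg.getOrElse(0))

  // The result type can differ from the input vertex type (Int to String here).
  val labelled: Map[VertexId, String] =
    outerJoinVertices(rawVertices, outDeg) { (_, _, optDeg) =>
      optDeg.map(d => s"out=$d").getOrElse("no out-edges")
    }
}
```

With these inputs, `withDefault` equals the `joinVertices` result because every vertex already started at `0`; the two operators differ once the default or the result type is something other than the old value.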

```scala
val subgraph = GraphLoader.edgeListFile(sc, "facebook_combined.txt", true)
  .partitionBy(PartitionStrategy.EdgePartition2D, 10)
```

I can see the partitions, edges and vertices information using:

```scala
subgraph.edges.partitions.mkString("\n")

subgraph.edges.take(10).mkString("\n")

subgraph.vertices.take(10).mkString("\n")
```

But I don't know how to save or print the different sub-graphs and the edges and vertices related to them.

I would really appreciate it if you could advise how to do the graph partitioning and see the results in GraphX.

Thanks a lot in advance.