Graph Operators in GraphX – Part 2

Besides the operations mentioned in Part One, GraphX also defines degree computations for vertices in GraphOps.scala, with the results stored as a VertexRDD[Int]. There are three kinds of degree: inDegrees, outDegrees, and degrees. inDegrees gives the in-degree of each vertex; note that vertices whose in-degree is 0 are not included in the result RDD. outDegrees gives the out-degree of each vertex, and degrees gives the sum of the two. All three are computed by the private method degreesRDD in GraphOps:

@transient lazy val inDegrees: VertexRDD[Int] =
  degreesRDD(EdgeDirection.In).setName("GraphOps.inDegrees")

@transient lazy val outDegrees: VertexRDD[Int] =
  degreesRDD(EdgeDirection.Out).setName("GraphOps.outDegrees")

@transient lazy val degrees: VertexRDD[Int] =
  degreesRDD(EdgeDirection.Either).setName("GraphOps.degrees")

private def degreesRDD(edgeDirection: EdgeDirection): VertexRDD[Int] = {
  if (edgeDirection == EdgeDirection.In) {
    graph.aggregateMessages(_.sendToDst(1), _ + _, TripletFields.None)
  } else if (edgeDirection == EdgeDirection.Out) {
    graph.aggregateMessages(_.sendToSrc(1), _ + _, TripletFields.None)
  } else { // EdgeDirection.Either
    graph.aggregateMessages(ctx => { ctx.sendToSrc(1); ctx.sendToDst(1) }, _ + _,
      TripletFields.None)
  }
}
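To make the degreesRDD semantics concrete, here is a minimal plain-Scala sketch (no Spark) that computes the three kinds of degree over a toy edge list; the edge list and vertex IDs are made up for illustration:

```scala
// Toy edge list (srcId, dstId); GraphX vertex IDs are Longs.
val edges = Seq((1L, 2L), (1L, 3L), (2L, 3L), (3L, 1L))

// In-degree: each edge contributes 1 to its destination (sendToDst).
val inDeg = edges.groupBy(_._2).map { case (v, es) => v -> es.size }

// Out-degree: each edge contributes 1 to its source (sendToSrc).
val outDeg = edges.groupBy(_._1).map { case (v, es) => v -> es.size }

// Either: each edge contributes 1 to both endpoints.
val deg = (edges.map(_._1) ++ edges.map(_._2))
  .groupBy(identity).map { case (v, vs) => v -> vs.size }

// For every vertex v present in both maps, deg(v) == inDeg(v) + outDeg(v).
```

As in GraphX, a vertex that never appears as a destination simply has no entry in inDeg, mirroring the fact that vertices with in-degree 0 are absent from the inDegrees RDD.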

EdgeDirection is defined in EdgeDirection.scala. It defines four values:

object EdgeDirection {
  /** Edges arriving at a vertex. */
  final val In = new EdgeDirection("In")

  /** Edges originating from a vertex. */
  final val Out = new EdgeDirection("Out")

  /** Edges originating from *or* arriving at a vertex of interest. */
  final val Either = new EdgeDirection("Either")

  /** Edges originating from *and* arriving at a vertex of interest. */
  final val Both = new EdgeDirection("Both")
}

Create the graph from web-Google.txt as before and view, as an example, the in-degrees of the graph:

val tmp = graph.inDegrees
tmp.take(10)

It returns

res0: Array[(org.apache.spark.graphx.VertexId, Int)] = Array((354796,2), (672890,1), (129434,2), (194402,2), (199516,28), (332918,3), (170792,1), (386896,3), (691634,71), (291526,9))

How can we find the vertex which has the largest in-degree? Let’s define a function first:

def max(a: (VertexId, Int), b: (VertexId, Int)): (VertexId, Int) = if (a._2 > b._2) a else b

It returns the pair whose attribute (here, the in-degree) is larger. Then reduce inDegrees with the max function:
scala> graph.inDegrees.reduce(max)
res1: (org.apache.spark.graphx.VertexId, Int) = (537039,6326)

It means that the vertex with ID 537,039 has the most incoming edges: 6,326. If we check graph.degrees.reduce(max), we will find that vertex 537,039 also has the most connected edges (6,353). This means that this vertex (web page) is well referenced.
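The reduce pattern itself does not depend on Spark; the same max function works on any collection of (id, degree) pairs. A small plain-Scala sketch, using a few pairs taken from the sample output above:

```scala
// Returns the pair with the larger second component (the degree).
def max(a: (Long, Int), b: (Long, Int)): (Long, Int) =
  if (a._2 > b._2) a else b

// A few (vertexId, inDegree) pairs from the sample output above.
val sample = Seq((354796L, 2), (691634L, 71), (537039L, 6326), (291526L, 9))

val top = sample.reduce(max)   // (537039, 6326)
```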

GraphOps.scala defines two more functions to collect the neighbors of each vertex: collectNeighborIds and collectNeighbors.

def collectNeighborIds(edgeDirection: EdgeDirection): VertexRDD[Array[VertexId]] = {
    val nbrs =
      if (edgeDirection == EdgeDirection.Either) {
        graph.aggregateMessages[Array[VertexId]](
          ctx => { ctx.sendToSrc(Array(ctx.dstId)); ctx.sendToDst(Array(ctx.srcId)) },
          _ ++ _, TripletFields.None)
      } else if (edgeDirection == EdgeDirection.Out) {
        graph.aggregateMessages[Array[VertexId]](
          ctx => ctx.sendToSrc(Array(ctx.dstId)),
          _ ++ _, TripletFields.None)
      } else if (edgeDirection == EdgeDirection.In) {
        graph.aggregateMessages[Array[VertexId]](
          ctx => ctx.sendToDst(Array(ctx.srcId)),
          _ ++ _, TripletFields.None)
      } else {
        throw new SparkException("It doesn't make sense to collect neighbor ids without a " +
          "direction. (EdgeDirection.Both is not supported; use EdgeDirection.Either instead.)")
      }
    graph.vertices.leftZipJoin(nbrs) { (vid, vdata, nbrsOpt) =>
      nbrsOpt.getOrElse(Array.empty[VertexId])
    }
  }

Note that collectNeighborIds (and likewise collectNeighbors) does not support EdgeDirection.Both; be careful with this. It returns the set of neighboring vertex IDs for each vertex. For example, to collect the out-direction neighbor IDs of each vertex:

scala> val tmp = graph.collectNeighborIds(EdgeDirection.Out)
scala> tmp.take(10)
res2: Array[(org.apache.spark.graphx.VertexId, Array[org.apache.spark.graphx.VertexId])] = Array((354796,Array(798944)), (672890,Array()), (129434,Array(110771, 119943)), (194402,Array(359291)), (199516,Array(26483, 190323, 193759, 280401, 329066, 342523, 367518, 398314, 417194, 427451, 458892, 459074, 485460, 502995, 505260, 514621, 660407, 798276, 810885, 835966)), (332918,Array(12304, 89384, 267989)), (170792,Array(227187, 255153, 400178, 453412, 512326, 592923, 663311, 666734, 864151)), (386896,Array(109021, 155460, 200406, 204397, 282107, 378570, 427843, 602779, 616132, 629079, 669605, 717650, 727162, 761159, 796410, 832809, 890838, 891178)), (691634,Array(13996, 32163, 33185, 39682, 193103, 197677, 520483, 598034, 727805, 747975, 836657)), (291526,Array(206053, 271366, 383159, 418...

collectNeighbors returns the attributes of the neighboring vertices for each vertex instead of their IDs. It can be highly inefficient on power-law graphs, where high-degree vertices force a large amount of information to be collected at a single location.
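The semantics of collectNeighborIds can be sketched without Spark: for each vertex, gather the IDs on the other end of the matching edges, falling back to an empty array when a vertex has no such edges (the leftZipJoin step in the implementation above). The toy edge list here is made up for illustration:

```scala
val edges = Seq((1L, 2L), (1L, 3L), (2L, 3L))
val vertices = Seq(1L, 2L, 3L)

// EdgeDirection.Out: collect each source's destination IDs.
val outNbrs: Map[Long, Array[Long]] =
  vertices.map { v =>
    v -> edges.collect { case (src, dst) if src == v => dst }.toArray
  }.toMap

// Vertex 3 has no outgoing edges, so it maps to an empty array --
// the Array.empty[VertexId] fallback in the real implementation.
```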

What’s more, GraphOps.scala provides a join operator named joinVertices.

/**
 * @example This function is used to update the vertices with new
 * values based on external data. For example we could add the out
 * degree to each vertex record
 *
 * {{{
 * val rawGraph: Graph[Int, Int] = GraphLoader.edgeListFile(sc, "webgraph")
 *   .mapVertices((_, _) => 0)
 * val outDeg = rawGraph.outDegrees
 * val graph = rawGraph.joinVertices[Int](outDeg)
 *   ((_, _, outDeg) => outDeg)
 * }}}
 *
 */
def joinVertices[U: ClassTag](table: RDD[(VertexId, U)])(mapFunc: (VertexId, VD, U) => VD)
  : Graph[VD, ED] = {
  val uf = (id: VertexId, data: VD, o: Option[U]) => {
    o match {
      case Some(u) => mapFunc(id, data, u)
      case None => data
    }
  }
  graph.outerJoinVertices(table)(uf)
}

It joins the vertices with the RDD table and then applies the function mapFunc to each vertex and its matching entry to compute a new vertex value. The input table should contain at most one entry for each vertex. If no entry is provided for a vertex, the map function is skipped and the old value is kept. U is the type of the entries in table.

Do the following test:

scala> val rawGraph = graph.mapVertices((id, attr) => 0)
scala> val outDeg = rawGraph.outDegrees
scala> val tmp = rawGraph.joinVertices[Int](outDeg)((_, _, optDeg) => optDeg)
scala> outDeg.take(5)
res3: Array[(org.apache.spark.graphx.VertexId, Int)] = Array((354796,1), (129434,2), (194402,1), (199516,20), (332918,3))
scala> tmp.vertices.take(5)
res4: Array[(org.apache.spark.graphx.VertexId, Int)] = Array((354796,1), (672890,0), (129434,2), (194402,1), (199516,20))

We joined outDeg to rawGraph. The vertices which are in rawGraph but not in outDeg remain unchanged.

joinVertices calls outerJoinVertices, which is defined in Graph.scala. outerJoinVertices performs a similar join with another RDD, but if other provides no entry for a particular vertex, the map function mapFunc receives None instead of that vertex being skipped.

/**
 * @example This function is used to update the vertices with new values based on external data.
 *          For example we could add the out-degree to each vertex record:
 *
 * {{{
 * val rawGraph: Graph[_, _] = Graph.textFile("webgraph")
 * val outDeg: RDD[(VertexId, Int)] = rawGraph.outDegrees
 * val graph = rawGraph.outerJoinVertices(outDeg) {
 *   (vid, data, optDeg) => optDeg.getOrElse(0)
 * }
 * }}}
 */
def outerJoinVertices[U: ClassTag, VD2: ClassTag](other: RDD[(VertexId, U)])
    (mapFunc: (VertexId, VD, Option[U]) => VD2)(implicit eq: VD =:= VD2 = null)
  : Graph[VD2, ED]

Make a test:

scala> val tmp = rawGraph.outerJoinVertices[Int, Int](outDeg)((_, _, optDeg) => optDeg.getOrElse(0))
scala> outDeg.take(5)
res5: Array[(org.apache.spark.graphx.VertexId, Int)] = Array((354796,1), (129434,2), (194402,1), (199516,20), (332918,3))
scala> tmp.vertices.take(5)
res6: Array[(org.apache.spark.graphx.VertexId, Int)] = Array((354796,1), (672890,0), (129434,2), (194402,1), (199516,20))
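The difference between the two joins can be summarized with a plain-Scala sketch over attribute maps (the vertex IDs and values here are made up): joinVertices keeps the old value when an entry is missing, while outerJoinVertices hands the map function an Option and lets it choose the default itself.

```scala
val rawVerts = Map(1L -> -1, 2L -> -1, 3L -> -1) // old vertex attributes
val outDeg   = Map(1L -> 2, 2L -> 1)             // vertex 3 has no entry

// joinVertices semantics: a vertex missing from outDeg keeps its old value.
val joined = rawVerts.map { case (v, old) => v -> outDeg.getOrElse(v, old) }

// outerJoinVertices semantics: the map function sees an Option[U]
// and picks its own default (here 0), possibly changing the value type.
val outer = rawVerts.map { case (v, _) => v -> outDeg.get(v).getOrElse(0) }
```

With these inputs, joined leaves vertex 3 at its old value -1, while outer maps it to the chosen default 0.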
