Triangle Count in GraphX

Triangle counting is widely used in social network analysis. A triangle is a three-node subgraph in which every pair of nodes is connected. Suppose two schoolmates follow you on Facebook and also follow each other: the three of you form a triangle. In general, a social network with more triangles tends to have tighter connections.

TriangleCount is defined in [[lib/TriangleCount.scala]]. It counts the triangles passing through each vertex using a straightforward algorithm:

  1. Compute the set of neighbors for each vertex;
  2. For each edge compute the intersection of the sets and send the count to both vertices;
  3. Compute the sum at each vertex and divide by two since each triangle is counted twice.

Suppose A and B are neighbors. The set of neighbors of A is [B, C, D, E]; the set of neighbors of B is [A, C, E, F, G]. The intersection is [C, E]. The vertices in the intersection are their common neighbors, so [A, B, C] and [A, B, E] are two triangles.
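
The three steps can be tried out on a small in-memory edge list without Spark. The following is only a minimal sketch in plain Scala collections, not the GraphX implementation; the object name TriangleSketch and its helper names are made up for illustration, and the vertices A to G from the example are encoded as the ids 1 to 7.

```scala
object TriangleSketch {
  type Edge = (Long, Long)

  // Step 1: neighbor set of every vertex, ignoring direction and self-loops.
  def neighborSets(edges: Seq[Edge]): Map[Long, Set[Long]] =
    edges
      .filter { case (a, b) => a != b }
      .flatMap { case (a, b) => Seq(a -> b, b -> a) }
      .groupBy(_._1)
      .map { case (v, pairs) => v -> pairs.map(_._2).toSet }

  // Steps 2 and 3: send the intersection size along each edge to both
  // endpoints, sum per vertex, and halve the sum.
  def triangleCounts(edges: Seq[Edge]): Map[Long, Int] = {
    val nbrs = neighborSets(edges)
    // Every undirected edge must be visited exactly once.
    val unique = edges
      .filter { case (a, b) => a != b }
      .map { case (a, b) => (math.min(a, b), math.max(a, b)) }
      .distinct
    val messages = unique.flatMap { case (a, b) =>
      val common = (nbrs(a) intersect nbrs(b)).size
      Seq(a -> common, b -> common)
    }
    messages.groupBy(_._1).map { case (v, ms) => v -> ms.map(_._2).sum / 2 }
  }
}

// A=1, B=2, C=3, D=4, E=5, F=6, G=7, as in the example above.
val exampleEdges = Seq(
  (1L, 2L), (1L, 3L), (1L, 4L), (1L, 5L),
  (2L, 3L), (2L, 5L), (2L, 6L), (2L, 7L))
val exampleCounts = TriangleSketch.triangleCounts(exampleEdges)
```

Here A and B each lie on two triangles ([A, B, C] and [A, B, E]), C and E on one each, and D, F and G on none.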

Note that the input graph should have its edges in canonical direction (i.e. the sourceId less than destId). The graph must also have been partitioned using [[org.apache.spark.graphx.Graph#partitionBy]]. Therefore, we set canonicalOrientation to true when importing the graph and partition it with partitionBy(). The API is used as follows:

scala> import org.apache.spark._
scala> import org.apache.spark.graphx._
scala> import org.apache.spark.rdd.RDD
scala> val graph = GraphLoader.edgeListFile(sc, "hdfs://192.168.17.240:9222/input/yuhc/web-Google/web-Google.txt", true).partitionBy(PartitionStrategy.RandomVertexCut)
scala> val countTriangles = graph.triangleCount
scala> countTriangles.vertices.take(10)
res0: Array[(org.apache.spark.graphx.VertexId, Int)] = Array((354796,1), (672890,0), (129434,1), (194402,2), (199516,163), (332918,6), (170792,24), (386896,129), (691634,566), (513652,1))
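
To see what canonical orientation does to an edge list, here is a small Spark-free sketch. It is an illustration under assumptions, not GraphLoader's code: the object CanonicalEdges and its two helpers are hypothetical names. Orienting every edge so that the smaller id is the source turns a pair of opposite edges (a->b and b->a) into two identical edges, which is why a later de-duplication step is needed.

```scala
object CanonicalEdges {
  // Orient each edge so that the smaller vertex id comes first.
  def orient(edges: Seq[(Long, Long)]): Seq[(Long, Long)] =
    edges.map { case (src, dst) => if (src <= dst) (src, dst) else (dst, src) }

  // After orientation, opposite edges have become exact duplicates
  // and can be merged into one.
  def orientAndDedupe(edges: Seq[(Long, Long)]): Seq[(Long, Long)] =
    orient(edges).distinct
}

val raw = Seq((3L, 1L), (1L, 3L), (2L, 5L))
val oriented = CanonicalEdges.orient(raw)          // (1,3) now appears twice
val merged = CanonicalEdges.orientAndDedupe(raw)   // one copy of each edge
```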

If partitionBy() is omitted when importing the graph, an assertion fails in [[lib/TriangleCount.scala]]. The error message is:

org.apache.spark.SparkException: Job aborted due to stage failure: Task 1 in stage 16.0 failed 1 times, most recent failure: Lost task 1.0 in stage 16.0 (TID 21, localhost): java.lang.AssertionError: assertion failed
	at scala.Predef$.assert(Predef.scala:165)
	at org.apache.spark.graphx.lib.TriangleCount$$anonfun$7.apply(TriangleCount.scala:90)
	at org.apache.spark.graphx.lib.TriangleCount$$anonfun$7.apply(TriangleCount.scala:87)
	at org.apache.spark.graphx.impl.VertexPartitionBaseOps.leftJoin(VertexPartitionBaseOps.scala:125)
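
The trace passes through leftJoin, which suggests that the failing assertion is the check that the doubled count is even. One plausible cause, stated here as an assumption rather than something verified against this run, is that duplicate edges survive: groupEdges is documented to give correct results only on a graph partitioned with partitionBy, so without it two copies of the same edge may not be merged. The sketch below (plain Scala, with a hypothetical ParityCheck object) shows how a single leftover duplicate makes the doubled count odd.

```scala
object ParityCheck {
  // Per-vertex sum of common-neighbor counts over every edge as listed.
  // Duplicates are deliberately NOT removed, and the result is not halved.
  def doubledCounts(edges: Seq[(Long, Long)]): Map[Long, Int] = {
    val nbrs: Map[Long, Set[Long]] = edges
      .flatMap { case (a, b) => Seq(a -> b, b -> a) }
      .groupBy(_._1)
      .map { case (v, ps) => v -> (ps.map(_._2).toSet - v) }
    edges
      .flatMap { case (a, b) =>
        val common = (nbrs(a) intersect nbrs(b)).size
        Seq(a -> common, b -> common)
      }
      .groupBy(_._1)
      .map { case (v, ms) => v -> ms.map(_._2).sum }
  }
}

// A single triangle: every doubled count is 2.
val clean = ParityCheck.doubledCounts(Seq((1L, 2L), (1L, 3L), (2L, 3L)))
// The same triangle with edge (1,2) listed twice: vertices 1 and 2 get 3.
val dirty = ParityCheck.doubledCounts(Seq((1L, 2L), (1L, 2L), (1L, 3L), (2L, 3L)))
```

With the duplicate present, a check such as (dblCount & 1) == 0 would fail for vertices 1 and 2.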

The code of TriangleCount is shown below, with some comments added to aid understanding.

Some prerequisite knowledge:

type VertexSet = OpenHashSet[VertexId]

The commented code:

object TriangleCount {

  def run[VD: ClassTag, ED: ClassTag](graph: Graph[VD,ED]): Graph[Int, ED] = {
    // Remove redundant edges
    // web-Google has no redundant edges
    val g = graph.groupEdges((a, b) => a).cache()

    // Construct set representations of the neighborhoods
    val nbrSets: VertexRDD[VertexSet] =
      g.collectNeighborIds(EdgeDirection.Either).mapValues { (vid, nbrs) =>
        // Why does the code specify the capacity of `set`?
        val set = new VertexSet(4)
        var i = 0
        // Store the neighbors in the VertexSet
        while (i < nbrs.size) {
          // prevent self cycle
          if(nbrs(i) != vid) {
            set.add(nbrs(i))
          }
          i += 1
        }
        set
      }
    // join the sets with the graph
    val setGraph: Graph[VertexSet, ED] = g.outerJoinVertices(nbrSets) {
      (vid, _, optSet) => optSet.getOrElse(null)
    }
    // Edge function computes intersection of smaller vertex with larger vertex
    def edgeFunc(ctx: EdgeContext[VertexSet, ED, Int]) {
      assert(ctx.srcAttr != null)
      assert(ctx.dstAttr != null)
      // Check whether the items in the `smallSet` are in the `largeSet`
      val (smallSet, largeSet) = if (ctx.srcAttr.size < ctx.dstAttr.size) {
        (ctx.srcAttr, ctx.dstAttr)
      } else {
        (ctx.dstAttr, ctx.srcAttr)
      }
      val iter = smallSet.iterator
      var counter: Int = 0
      // Enumerate the items
      while (iter.hasNext) {
        val vid = iter.next()
        if (vid != ctx.srcId && vid != ctx.dstId && largeSet.contains(vid)) {
          counter += 1
        }
      }
      ctx.sendToSrc(counter)
      ctx.sendToDst(counter)
    }
    // compute the intersection along edges
    val counters: VertexRDD[Int] = setGraph.aggregateMessages(edgeFunc, _ + _)
    // Merge counters with the graph and divide by two since each triangle is counted twice
    g.outerJoinVertices(counters) {
      (vid, _, optCounter: Option[Int]) =>
        val dblCount = optCounter.getOrElse(0)
        // double count should be even (divisible by two)
        assert((dblCount & 1) == 0)
        dblCount / 2
    }
  } // end of run
} // end of TriangleCount
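
The core of edgeFunc is the intersection step: it iterates over the smaller neighbor set and probes the larger one, so the work per edge is proportional to the size of the smaller set. A minimal stand-alone sketch of that idea, using ordinary Scala sets instead of OpenHashSet and a hypothetical helper name commonNeighbors:

```scala
object SmallSetIntersect {
  // Count the vertices present in both neighbor sets by walking the
  // smaller set and looking each element up in the larger one.
  // The edge's own endpoints are skipped, as in edgeFunc.
  def commonNeighbors(srcId: Long, dstId: Long,
                      srcNbrs: Set[Long], dstNbrs: Set[Long]): Int = {
    val (small, large) =
      if (srcNbrs.size < dstNbrs.size) (srcNbrs, dstNbrs) else (dstNbrs, srcNbrs)
    small.count(v => v != srcId && v != dstId && large.contains(v))
  }
}

// Edge A-B from the earlier example, with A=1 and B=2.
val shared = SmallSetIntersect.commonNeighbors(
  1L, 2L, Set(2L, 3L, 4L, 5L), Set(1L, 3L, 5L, 6L, 7L))
```

For the edge between A and B this finds the two common neighbors C and E.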

Over the next few months, I’ll work with some undergraduates to implement several algorithms with the GraphX API, and I hope to fix some bugs in the GraphX code along the way. They may contribute to the Spark project in the future and introduce some graph algorithms into GraphX or MLlib. After that, I’ll work on operating system design and prepare for my new life in the U.S., so future updates about GraphX will appear on the webpage of the lab (at SJTU).

By the way, I may keep collecting open data and uploading it to GitHub. Students in iWCT at SJTU who want to use the data can change https://github.com/yuhc/web-dataset/graphx/followers.txt to hdfs://192.168.17.240:9222/input/yuhc/graphx/followers.txt.

4 responses on “Triangle Count in GraphX”

  1. Came across this website when searching for tips of HW questions. Surprised to find it is by hcyu! Bravo!
    1. yuhc
      :laughing:
  2. Argumento Pruvo
    Hi,

    I am a Spark GraphX newbie and I am trying to do triangle counts on a dataset based on your blogpost!

    I get this error java.lang.IllegalArgumentException: requirement failed: Invalid initial capacity

    Could you please let me know why?

    Please refer to my question here : http://stackoverflow.com/questions/40337366/spark-graphx-requirement-failed-invalid-initial-capacity

    Thanks

    1. yuhc
      Sorry for the *very late* reply.. All tests were done with Spark 1.6.
