Triangle counting is very useful in social network analysis. A triangle is a small graph of three nodes in which every pair of nodes is connected. Suppose two of your schoolmates follow you on Facebook, and those two schoolmates also follow each other: the three of you make up a triangle. In general, a social network with more triangles tends to have tighter connections.

`TriangleCount` is defined in [[lib/TriangleCount.scala]]. It counts the triangles passing through each vertex using a straightforward algorithm:

- Compute the set of neighbors for each vertex;
- For each edge compute the intersection of the sets and send the count to both vertices;
- Compute the sum at each vertex and divide by two since each triangle is counted twice.
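The three steps above can be sketched in plain Scala on a small in-memory edge list (no Spark here; the object and function names are mine, for illustration only):

```scala
object TriangleSketch {
  // Per-vertex triangle counts for an undirected graph given as
  // canonical edges (src < dst); a plain-Scala sketch, not the GraphX code
  def triangleCounts(edges: Seq[(Long, Long)]): Map[Long, Int] = {
    // Step 1: compute the set of neighbors for each vertex
    val nbrs: Map[Long, Set[Long]] = edges
      .flatMap { case (a, b) => Seq(a -> b, b -> a) }
      .groupBy(_._1)
      .map { case (v, pairs) => v -> pairs.map(_._2).toSet }

    // Step 2: for each edge, the size of the neighbor-set intersection
    // is the number of triangles through that edge; credit both endpoints
    val doubled: Map[Long, Int] = edges
      .flatMap { case (a, b) =>
        val common = (nbrs(a) intersect nbrs(b)).size
        Seq(a -> common, b -> common)
      }
      .groupBy(_._1)
      .map { case (v, pairs) => v -> pairs.map(_._2).sum }

    // Step 3: each triangle is counted twice per vertex, so divide by two
    doubled.map { case (v, c) => v -> c / 2 }
  }

  def main(args: Array[String]): Unit = {
    // Two triangles here: {1,2,3} and {2,3,4}
    println(triangleCounts(Seq((1L, 2L), (1L, 3L), (2L, 3L), (2L, 4L), (3L, 4L))))
  }
}
```

Vertices 2 and 3 sit in both triangles, so they each get a count of 2, while vertices 1 and 4 get 1.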

Suppose `A` and `B` are neighbors. The set of neighbors of `A` is `[B, C, D, E]`; the set of neighbors of `B` is `[A, C, E, F, G]`. The intersection is `[C, E]`. The vertices in the intersection are their common neighbors, so `[A, B, C]` and `[A, B, E]` are two triangles.
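This example can be checked with ordinary Scala sets (letters stand in for vertex IDs; the object name is mine):

```scala
object CommonNeighbors {
  // Neighbor sets from the example above
  val nbrsA = Set("B", "C", "D", "E")
  val nbrsB = Set("A", "C", "E", "F", "G")

  // The intersection holds the common neighbors of A and B;
  // each common neighbor closes one triangle with the edge A-B
  def common: Set[String] = nbrsA intersect nbrsB

  def main(args: Array[String]): Unit = {
    println(common) // the common neighbors C and E
  }
}
```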

Note that the input graph should have its edges in canonical direction (i.e. the `sourceId` less than the `destId`). Also, the graph must have been partitioned using [[org.apache.spark.graphx.`Graph#partitionBy`]]. Therefore, we need to set `canonicalOrientation` to `true` when importing the graph, and partition the graph with `partitionBy()`. Use the API as follows:

```scala
scala> import org.apache.spark._
scala> import org.apache.spark.graphx._
scala> import org.apache.spark.rdd.RDD
scala> val graph = GraphLoader.edgeListFile(sc,
         "hdfs://192.168.17.240:9222/input/yuhc/web-Google/web-Google.txt",
         true).partitionBy(PartitionStrategy.RandomVertexCut)
scala> val countTriangles = graph.triangleCount
scala> countTriangles.vertices.take(10)
res0: Array[(org.apache.spark.graphx.VertexId, Int)] = Array((354796,1), (672890,0), (129434,1), (194402,2), (199516,163), (332918,6), (170792,24), (386896,129), (691634,566), (513652,1))
```

If `partitionBy()` is omitted when importing the graph, errors will occur in [[lib/TriangleCount.scala]]. The error information is:

```
org.apache.spark.SparkException: Job aborted due to stage failure: Task 1 in stage 16.0 failed 1 times, most recent failure: Lost task 1.0 in stage 16.0 (TID 21, localhost): java.lang.AssertionError: assertion failed
	at scala.Predef$.assert(Predef.scala:165)
	at org.apache.spark.graphx.lib.TriangleCount$$anonfun$7.apply(TriangleCount.scala:90)
	at org.apache.spark.graphx.lib.TriangleCount$$anonfun$7.apply(TriangleCount.scala:87)
	at org.apache.spark.graphx.impl.VertexPartitionBaseOps.leftJoin(VertexPartitionBaseOps.scala:125)
```

The code of `TriangleCount` is shown below, with some comments added to aid understanding.

Some prerequisite knowledge:

```scala
type VertexSet = OpenHashSet[VertexId]
```

The commented code:

```scala
object TriangleCount {

  def run[VD: ClassTag, ED: ClassTag](graph: Graph[VD, ED]): Graph[Int, ED] = {
    // Remove redundant edges
    // web-Google has no redundant edges
    val g = graph.groupEdges((a, b) => a).cache()

    // Construct set representations of the neighborhoods
    val nbrSets: VertexRDD[VertexSet] =
      g.collectNeighborIds(EdgeDirection.Either).mapValues { (vid, nbrs) =>
        // Why does the code specify the capacity of `set`?
        val set = new VertexSet(4)
        var i = 0
        // Store the neighbors in the VertexSet
        while (i < nbrs.size) {
          // prevent self cycle
          if (nbrs(i) != vid) {
            set.add(nbrs(i))
          }
          i += 1
        }
        set
      }

    // join the sets with the graph
    val setGraph: Graph[VertexSet, ED] = g.outerJoinVertices(nbrSets) {
      (vid, _, optSet) => optSet.getOrElse(null)
    }

    // Edge function computes intersection of smaller vertex with larger vertex
    def edgeFunc(ctx: EdgeContext[VertexSet, ED, Int]) {
      assert(ctx.srcAttr != null)
      assert(ctx.dstAttr != null)
      // Check whether the items in the `smallSet` are in the `largeSet`
      val (smallSet, largeSet) = if (ctx.srcAttr.size < ctx.dstAttr.size) {
        (ctx.srcAttr, ctx.dstAttr)
      } else {
        (ctx.dstAttr, ctx.srcAttr)
      }
      val iter = smallSet.iterator
      var counter: Int = 0
      // Enumerate the items
      while (iter.hasNext) {
        val vid = iter.next()
        if (vid != ctx.srcId && vid != ctx.dstId && largeSet.contains(vid)) {
          counter += 1
        }
      }
      ctx.sendToSrc(counter)
      ctx.sendToDst(counter)
    }

    // compute the intersection along edges
    val counters: VertexRDD[Int] = setGraph.aggregateMessages(edgeFunc, _ + _)

    // Merge counters with the graph and divide by two since each triangle is counted twice
    g.outerJoinVertices(counters) { (vid, _, optCounter: Option[Int]) =>
      val dblCount = optCounter.getOrElse(0)
      // double count should be even (divisible by two)
      assert((dblCount & 1) == 0)
      dblCount / 2
    }
  } // end of TriangleCount
}
```
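The trick in `edgeFunc` of scanning only the smaller of the two neighbor sets can be illustrated without Spark (plain Scala `Set`s stand in for `OpenHashSet`, and the object and function names are mine):

```scala
object SmallSetIntersect {
  // Count common elements by iterating only the smaller set, as edgeFunc does;
  // this keeps the per-edge cost at O(min(|src|, |dst|)) hash lookups.
  // The real edgeFunc additionally skips the two edge endpoints themselves.
  def intersectCount(src: Set[Long], dst: Set[Long]): Int = {
    val (small, large) = if (src.size < dst.size) (src, dst) else (dst, src)
    var counter = 0
    val iter = small.iterator
    while (iter.hasNext) {
      if (large.contains(iter.next())) counter += 1
    }
    counter
  }

  def main(args: Array[String]): Unit = {
    // 3 and 5 are common, 9 is not
    println(intersectCount(Set(2L, 3L, 4L, 5L), Set(3L, 5L, 9L)))
  }
}
```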

In the following months, I'll work with some undergraduates to implement algorithms with the GraphX API. I hope to fix the bugs in the GraphX code. They may contribute to the Spark project in the future and introduce some graph algorithms into GraphX or MLlib. After these months, I'll work on operating system design and prepare for my new life in the U.S. Hence, the following updates about GraphX will appear on the webpage of my lab (in SJTU).

By the way, I may keep collecting open data and uploading it to GitHub. Change `https://github.com/yuhc/web-dataset/graphx/followers.txt` to `hdfs://192.168.17.240:9222/input/yuhc/graphx/followers.txt` if the students in iWCT of SJTU want to use them.
