Partition Strategy in GraphX

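To process a graph in a distributed fashion, the graph first has to be partitioned across machines. There are two common approaches to graph partitioning: edge cut and vertex cut. An edge cut assigns each vertex to exactly one partition, so edges whose endpoints land in different partitions are cut. A vertex cut assigns each edge to exactly one partition, so vertices whose edges land in different partitions are replicated.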

Edge Cut vs. Vertex Cut
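To make the difference concrete, here is a toy comparison on a hypothetical four-vertex graph. The edge list and both partition assignments are made up for illustration and are not GraphX code. For the edge cut we count the edges that cross partitions; for the vertex cut we record which partitions each vertex must be copied to.

```scala
// Hypothetical toy comparison of the two partitioning styles; not GraphX code.
object CutComparison {
  type Edge = (Long, Long)

  // Edge cut: every vertex lives in exactly one partition,
  // so count the edges whose two endpoints live in different partitions.
  def cutEdges(edges: Seq[Edge], vertexPart: Map[Long, Int]): Int =
    edges.count { case (s, d) => vertexPart(s) != vertexPart(d) }

  // Vertex cut: every edge lives in exactly one partition, so a vertex
  // must be present in each partition that holds one of its edges.
  def replicas(edges: Seq[Edge], edgePart: Edge => Int): Map[Long, Set[Int]] =
    edges
      .flatMap(e => Seq(e._1 -> edgePart(e), e._2 -> edgePart(e)))
      .groupBy(_._1)
      .map { case (v, pairs) => v -> pairs.map(_._2).toSet }

  def main(args: Array[String]): Unit = {
    val edges = Seq((1L, 2L), (2L, 3L), (3L, 4L), (4L, 1L), (1L, 3L))

    // Hypothetical vertex assignment for the edge cut.
    val vertexPart = Map(1L -> 0, 2L -> 0, 3L -> 1, 4L -> 1)
    println(s"edge cut: ${cutEdges(edges, vertexPart)} edges cross partitions")

    // Hypothetical edge assignment for the vertex cut: two edges go to partition 1.
    val inPart1 = Set((3L, 4L), (4L, 1L))
    val edgePart: Edge => Int = e => if (inPart1.contains(e)) 1 else 0
    val r = replicas(edges, edgePart)
    println(s"vertex cut: ${r.values.map(_.size).sum} vertex copies for ${r.size} vertices")
  }
}
```

In this toy case the edge cut cuts 3 of the 5 edges, while the vertex cut keeps every edge whole but has to store vertices 1 and 3 in both partitions.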

Spark GraphX adopts a vertex-cut approach to distributed graph partitioning. The partitioning strategies are implemented in PartitionStrategy.scala, so let's look into that file.

case object RandomVertexCut extends PartitionStrategy {
  override def getPartition(src: VertexId, dst: VertexId, numParts: PartitionID): PartitionID = {
    math.abs((src, dst).hashCode()) % numParts
  }
}

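RandomVertexCut hashes the (source, destination) pair of vertex IDs and takes the result modulo numParts as the edge's partition ID. As a result, edges between the same two vertices are guaranteed to land in the same partition only if they have the same direction: A→B and B→A may be placed in different partitions. CanonicalRandomVertexCut removes this dependence on direction by ordering the pair (smaller ID first) before hashing, so all edges between two vertices end up in the same partition.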

case object CanonicalRandomVertexCut extends PartitionStrategy {
  override def getPartition(src: VertexId, dst: VertexId, numParts: PartitionID): PartitionID = {
    if (src < dst) {
      math.abs((src, dst).hashCode()) % numParts
    } else {
      math.abs((dst, src).hashCode()) % numParts
    }
  }
}
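A quick way to see the difference between the two hash-based strategies is to count, over all pairs of vertices, how often an edge and its reverse land in different partitions. The two functions below are standalone copies of the code above, with GraphX's VertexId (Long) and PartitionID (Int) aliases redefined locally so no Spark dependency is needed; splitPairs is a hypothetical helper.

```scala
// Standalone copies of the two hash-based strategies, no Spark dependency.
object DirectionCheck {
  type VertexId = Long
  type PartitionID = Int

  def randomVertexCut(src: VertexId, dst: VertexId, numParts: PartitionID): PartitionID =
    math.abs((src, dst).hashCode()) % numParts

  def canonicalRandomVertexCut(src: VertexId, dst: VertexId, numParts: PartitionID): PartitionID =
    if (src < dst) math.abs((src, dst).hashCode()) % numParts
    else math.abs((dst, src).hashCode()) % numParts

  // Hypothetical helper: over all pairs a < b drawn from 0 until n, count the
  // pairs for which a->b and b->a are assigned to different partitions.
  def splitPairs(f: (VertexId, VertexId, PartitionID) => PartitionID,
                 n: Long, numParts: PartitionID): Int = {
    val pairs = for (a <- 0L until n; b <- (a + 1) until n) yield (a, b)
    pairs.count { case (a, b) => f(a, b, numParts) != f(b, a, numParts) }
  }

  def main(args: Array[String]): Unit = {
    println(s"RandomVertexCut splits ${splitPairs(randomVertexCut, 50, 8)} of 1225 pairs")
    println(s"CanonicalRandomVertexCut splits ${splitPairs(canonicalRandomVertexCut, 50, 8)}")
  }
}
```

CanonicalRandomVertexCut reports 0 by construction, while RandomVertexCut splits many of the pairs, because the tuple's hash depends on the order of its elements.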

Two other partitioning schemes are EdgePartition1D and EdgePartition2D. In EdgePartition1D, an edge is assigned to a partition based only on its source vertex, so all edges with the same source end up in the same partition.

case object EdgePartition1D extends PartitionStrategy {
  override def getPartition(src: VertexId, dst: VertexId, numParts: PartitionID): PartitionID = {
    val mixingPrime: VertexId = 1125899906842597L
    (math.abs(src * mixingPrime) % numParts).toInt
  }
}

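The source ID is multiplied by a very large prime (mixingPrime) to shuffle vertex IDs before the modulo, with the aim of balancing the partitions. This cannot remove the imbalance entirely, though: because only the source vertex matters, every out-edge of a vertex lands in the same partition, so a source vertex with a very high out-degree makes its partition much larger than the others.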
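The skew is easy to reproduce. This sketch reuses the EdgePartition1D formula on a hypothetical graph in which one hub vertex (ID 7) has 1,000 out-edges and 100 other vertices have one out-edge each; loads is an illustrative helper, not part of GraphX.

```scala
object OneDSkew {
  type VertexId = Long
  type PartitionID = Int

  // Same formula as EdgePartition1D above: only the source vertex matters.
  def edgePartition1D(src: VertexId, dst: VertexId, numParts: PartitionID): PartitionID = {
    val mixingPrime: VertexId = 1125899906842597L
    (math.abs(src * mixingPrime) % numParts).toInt
  }

  // Hypothetical helper: number of edges assigned to each partition.
  def loads(edges: Seq[(VertexId, VertexId)], numParts: PartitionID): Seq[Int] = {
    val counts = Array.fill(numParts)(0)
    edges.foreach { case (s, d) => counts(edgePartition1D(s, d, numParts)) += 1 }
    counts.toSeq
  }

  def main(args: Array[String]): Unit = {
    // Hypothetical graph: hub vertex 7 plus 100 ordinary sources.
    val hub = (1L to 1000L).map(d => (7L, d))
    val others = (100L until 200L).map(s => (s, s + 1))
    println(loads(hub ++ others, 4).mkString("edges per partition: ", ", ", ""))
  }
}
```

Whichever partition vertex 7 maps to receives all 1,000 of its edges, no matter how evenly the other sources are spread.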

EdgePartition2D is a bit more complex: it uses both the source vertex and the destination vertex to compute the partition. It is based on splitting the sparse edge adjacency matrix into a grid of blocks, one block per partition. Here is an example taken from the comments in the source code. Suppose we have a graph with 12 vertices that we want to partition over 9 machines. We can use the following sparse matrix representation:

/**
  *     __________________________________
  * v0  | P0 *     | P1       | P2    *  |
  * v1  |  ****    |  *       |          |
  * v2  |  ******* |      **  |  ****    |
  * v3  |  *****   |  *  *    |       *  |
  *     ----------------------------------
  * v4  | P3 *     | P4 ***   | P5 **  * |
  * v5  |  *  *    |  *       |          |
  * v6  |       *  |      **  |  ****    |
  * v7  |  * * *   |  *  *    |       *  |
  *     ----------------------------------
  * v8  | P6   *   | P7    *  | P8  *   *|
  * v9  |     *    |  *    *  |          |
  * v10 |       *  |      **  |  *  *    |
  * v11 | * <-E    |  ***     |       ** |
  *     ----------------------------------
  */ 

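As the figure shows, the edge E<v11, v1> is assigned to P6. Since all edges of a vertex fall into one row of blocks and one column of blocks, each vertex is replicated to at most 2 * ceil(sqrt(numParts)) partitions. The figure also makes a problem clear: P0 contains far more edges than any other partition, so the load is unbalanced. To improve balance, EdgePartition2D also multiplies each vertex ID by mixingPrime to shuffle vertex locations before mapping them to blocks.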
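The figure's layout, before any mixingPrime shuffling, can be sketched as a plain block lookup: the row block comes from the row vertex, the column block from the column vertex, and blocks are numbered row by row as P0 to P8. This is a hypothetical illustration that assumes 12 vertices and 9 partitions as in the example; it is not the GraphX implementation.

```scala
// Hypothetical sketch of the figure's naive layout (no mixingPrime): the 12x12
// adjacency matrix is cut into a 3x3 grid of blocks, one block per partition.
object BlockLayout {
  val numVertices = 12
  val side = 3                       // sqrt(9 partitions)
  val blockSize = numVertices / side // 4 vertices per block row/column

  // Row block from the row vertex, column block from the column vertex;
  // blocks are numbered row by row, P0..P8, as in the figure.
  def block(rowVertex: Int, colVertex: Int): Int =
    (rowVertex / blockSize) * side + (colVertex / blockSize)

  // Partitions holding at least one edge touching v, if v is linked
  // in both directions to every other vertex.
  def partitionsTouching(v: Int): Set[Int] =
    (for {
      other <- 0 until numVertices if other != v
      p <- Seq(block(v, other), block(other, v))
    } yield p).toSet

  def main(args: Array[String]): Unit = {
    println(s"E = (v11, v1) lands in P${block(11, 1)}")
    println(s"v11 appears in partitions ${partitionsTouching(11).toSeq.sorted.mkString(", ")}")
  }
}
```

Here block(11, 1) returns 6, matching P6 in the figure, and v11 appears only in P2, P5, P6, P7 and P8: its own row of blocks plus its own column of blocks, five partitions out of nine.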

case object EdgePartition2D extends PartitionStrategy {
  override def getPartition(src: VertexId, dst: VertexId, numParts: PartitionID): PartitionID = {
    val ceilSqrtNumParts: PartitionID = math.ceil(math.sqrt(numParts)).toInt
    val mixingPrime: VertexId = 1125899906842597L
    val col: PartitionID = (math.abs(src * mixingPrime) % ceilSqrtNumParts).toInt
    val row: PartitionID = (math.abs(dst * mixingPrime) % ceilSqrtNumParts).toInt
    (col * ceilSqrtNumParts + row) % numParts
  }
}
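Two properties of this formula can be checked directly. The function below is a copy of the EdgePartition2D code above with local type aliases. hubReplicas and cellsPerPartition are hypothetical helpers: the first collects the partitions used by a hub vertex linked in both directions to every other vertex, the second counts how many (col, row) grid cells fold onto each partition ID through the final % numParts.

```scala
object TwoDCheck {
  type VertexId = Long
  type PartitionID = Int

  // Same formula as the EdgePartition2D shown above.
  def edgePartition2D(src: VertexId, dst: VertexId, numParts: PartitionID): PartitionID = {
    val ceilSqrtNumParts: PartitionID = math.ceil(math.sqrt(numParts)).toInt
    val mixingPrime: VertexId = 1125899906842597L
    val col: PartitionID = (math.abs(src * mixingPrime) % ceilSqrtNumParts).toInt
    val row: PartitionID = (math.abs(dst * mixingPrime) % ceilSqrtNumParts).toInt
    (col * ceilSqrtNumParts + row) % numParts
  }

  // Hypothetical helper: partitions holding an edge touching `hub`, when hub is
  // linked in both directions to every vertex in 0 until n.
  def hubReplicas(hub: VertexId, n: Long, numParts: PartitionID): Set[PartitionID] =
    (0L until n).filter(_ != hub).flatMap { v =>
      Seq(edgePartition2D(hub, v, numParts), edgePartition2D(v, hub, numParts))
    }.toSet

  // Hypothetical helper: how many (col, row) cells map to each partition ID.
  def cellsPerPartition(numParts: PartitionID): Seq[Int] = {
    val k = math.ceil(math.sqrt(numParts)).toInt
    val counts = Array.fill(numParts)(0)
    for (cell <- 0 until k * k) counts(cell % numParts) += 1
    counts.toSeq
  }

  def main(args: Array[String]): Unit = {
    println(s"hub replicas with 9 parts: ${hubReplicas(0L, 1000L, 9).size}")
    println(s"cells per partition, 9 parts:  ${cellsPerPartition(9).mkString(" ")}")
    println(s"cells per partition, 10 parts: ${cellsPerPartition(10).mkString(" ")}")
  }
}
```

With 9 partitions the hub touches at most 5 partitions and each partition owns exactly one grid cell. With 10 partitions, ceilSqrtNumParts is 4, the 16 cells fold onto 10 IDs, and partitions 0 to 5 receive two cells each while 6 to 9 receive one, so this version of the formula balances best when numParts is a perfect square. More recent Spark releases add a separate code path for non-square numParts, so check the version you are running.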

Let’s look at a simple, realistic graph.

RDD Graph Representation

Vertices A, B and C are in one partition; D, E and F are in the other. The edges are split across two partitions, shown as the Edge Table. The Routing Table records, for each vertex, which edge partitions contain its edges, that is, where each vertex has been cut and therefore has to be shipped.
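The idea behind the Routing Table can be sketched as a grouping step: for every vertex, collect the edge partitions that reference it, then group those entries by the vertex's own partition. The edge list below is hypothetical, since the figure's actual edges are not reproduced, and this is a simplified model rather than GraphX's RoutingTablePartition class.

```scala
// Hypothetical sketch of the routing-table idea, not GraphX's implementation:
// for each vertex partition, record which edge partitions reference each vertex.
object RoutingSketch {
  type Edge = (Char, Char)

  def routingTable(
      vertexPart: Map[Char, Int],
      edgesByPart: Map[Int, Seq[Edge]]): Map[Int, Map[Char, Set[Int]]] = {
    // Every (vertex, edge partition) reference, from both endpoints of each edge.
    val refs: Seq[(Char, Int)] = for {
      (edgePid, edges) <- edgesByPart.toSeq
      (src, dst) <- edges
      v <- Seq(src, dst)
    } yield v -> edgePid
    // Group by the vertex's own partition, then collect edge partitions per vertex.
    refs
      .groupBy { case (v, _) => vertexPart(v) }
      .map { case (vPid, pairs) =>
        vPid -> pairs.groupBy(_._1).map { case (v, ps) => v -> ps.map(_._2).toSet }
      }
  }

  def main(args: Array[String]): Unit = {
    val vertexPart = Map('A' -> 0, 'B' -> 0, 'C' -> 0, 'D' -> 1, 'E' -> 1, 'F' -> 1)
    // Hypothetical edge partitions.
    val edgesByPart = Map(
      0 -> Seq(('A', 'B'), ('A', 'C'), ('B', 'C')),
      1 -> Seq(('C', 'D'), ('D', 'E'), ('E', 'F'), ('F', 'A')))
    for ((vPid, table) <- routingTable(vertexPart, edgesByPart).toSeq.sortBy(_._1))
      println(s"vertex partition $vPid: $table")
  }
}
```

With these edges, A and C are referenced by both edge partitions and would be shipped to both, while B, D, E and F each go to a single edge partition.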
