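To process a graph in a distributed fashion, the graph must first be partitioned across machines. There are normally two kinds of graph partitioning: the **vertex-cut** approach and the **edge-cut** approach. An edge-cut assigns each vertex to exactly one partition and cuts the edges whose endpoints land in different partitions. A vertex-cut assigns each edge to exactly one partition and cuts (replicates) the vertices whose edges land in different partitions.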
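To make the difference concrete, here is a minimal Scala sketch on a hypothetical six-vertex graph: two triangles joined by a single edge. The graph, the partition assignments, and the names `CutComparison`, `cutEdges`, and `vertexSpans` are made up for illustration and are not part of GraphX. An edge-cut is measured by the edges it cuts; a vertex-cut is measured by how many partitions each vertex is replicated into.

```scala
// Hypothetical example graph: triangles {1, 2, 3} and {4, 5, 6} joined by the edge 3-4.
object CutComparison {
  type VertexId = Long

  val edges: Seq[(VertexId, VertexId)] =
    Seq((1L, 2L), (1L, 3L), (2L, 3L), (3L, 4L), (4L, 5L), (4L, 6L), (5L, 6L))

  // Edge-cut: each vertex is owned by exactly one partition, and an edge whose
  // endpoints are owned by different partitions is cut.
  def cutEdges(vertexPart: Map[VertexId, Int]): Seq[(VertexId, VertexId)] =
    edges.filter { case (s, d) => vertexPart(s) != vertexPart(d) }

  // Vertex-cut: each edge is owned by exactly one partition, and a vertex is
  // replicated into every partition that owns at least one of its edges.
  def vertexSpans(edgePart: Map[(VertexId, VertexId), Int]): Map[VertexId, Set[Int]] =
    edges
      .flatMap { e => Seq(e._1 -> edgePart(e), e._2 -> edgePart(e)) }
      .groupBy(_._1)
      .map { case (v, pairs) => v -> pairs.map(_._2).toSet }
}

// Edge-cut assignment: {1, 2, 3} in partition 0, {4, 5, 6} in partition 1.
val byVertex = Map(1L -> 0, 2L -> 0, 3L -> 0, 4L -> 1, 5L -> 1, 6L -> 1)
// Vertex-cut assignment: edges whose source is 1, 2 or 3 go to partition 0, the rest to partition 1.
val byEdge = CutComparison.edges.map(e => e -> (if (e._1 <= 3L) 0 else 1)).toMap

println(CutComparison.cutEdges(byVertex))      // the single cut edge (3, 4)
println(CutComparison.vertexSpans(byEdge)(4L)) // vertex 4 is replicated in both partitions
```

With these assignments the edge-cut cuts one edge, (3, 4), while the vertex-cut cuts no edges but replicates vertex 4 into both partitions.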

Spark GraphX adopts a vertex-cut approach to distributed graph partitioning. The partitioning strategies are implemented in `PartitionStrategy.scala`. Let’s look into this file.

```scala
case object RandomVertexCut extends PartitionStrategy {
  override def getPartition(src: VertexId, dst: VertexId, numParts: PartitionID): PartitionID = {
    // Hash the ordered (src, dst) pair and map it onto one of numParts partitions.
    math.abs((src, dst).hashCode()) % numParts
  }
}
```

`RandomVertexCut` hashes the pair of source and destination vertex IDs and takes the hash modulo `numParts` as the edge’s partition ID. As a result, all edges that connect the same two vertices in the same direction are placed in the same partition. `CanonicalRandomVertexCut` orders the pair (smaller ID first) before hashing, so it places all edges between two vertices in the same partition regardless of direction.

```scala
case object CanonicalRandomVertexCut extends PartitionStrategy {
  override def getPartition(src: VertexId, dst: VertexId, numParts: PartitionID): PartitionID = {
    // Always hash (smaller ID, larger ID), so the direction of the edge does not matter.
    if (src < dst) {
      math.abs((src, dst).hashCode()) % numParts
    } else {
      math.abs((dst, src).hashCode()) % numParts
    }
  }
}
```
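The effect of the canonical ordering can be checked with a small standalone sketch. It reimplements the two hash formulas as plain functions (the names `VertexCutHashing`, `randomVertexCut`, `canonicalRandomVertexCut`, and `splitCount` are hypothetical, and there is no Spark dependency) and counts vertex pairs whose two directed edges land in different partitions. One small deviation from the excerpts: it uses a non-negative modulo instead of `math.abs(...) % numParts`, because `math.abs(Int.MinValue)` is still negative.

```scala
object VertexCutHashing {
  type VertexId = Long
  type PartitionID = Int

  // Non-negative modulo: unlike math.abs(h) % n, this is also safe for h == Int.MinValue.
  private def nonNegMod(h: Int, n: Int): Int = {
    val m = h % n
    if (m < 0) m + n else m
  }

  // RandomVertexCut idea: hash the ordered (src, dst) pair.
  def randomVertexCut(src: VertexId, dst: VertexId, numParts: PartitionID): PartitionID =
    nonNegMod((src, dst).hashCode(), numParts)

  // CanonicalRandomVertexCut idea: put the smaller ID first before hashing,
  // so (a, b) and (b, a) always produce the same hash.
  def canonicalRandomVertexCut(src: VertexId, dst: VertexId, numParts: PartitionID): PartitionID =
    if (src < dst) randomVertexCut(src, dst, numParts) else randomVertexCut(dst, src, numParts)
}

val numParts = 8
val pairs = for (a <- 1L to 30L; b <- (a + 1) to 30L) yield (a, b)

// Number of vertex pairs whose edges a->b and b->a land in different partitions.
def splitCount(strategy: (Long, Long, Int) => Int): Int =
  pairs.count { case (a, b) => strategy(a, b, numParts) != strategy(b, a, numParts) }

val splitRandom = splitCount(VertexCutHashing.randomVertexCut)
val splitCanonical = splitCount(VertexCutHashing.canonicalRandomVertexCut)
println(s"RandomVertexCut splits $splitRandom pairs; CanonicalRandomVertexCut splits $splitCanonical")
```

`splitCanonical` is 0 for any `numParts`, while `splitRandom` is typically nonzero, because the tuple hash depends on element order.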

Two other partitioning schemes are `EdgePartition1D` and `EdgePartition2D`. In `EdgePartition1D`, edges are assigned to partitions based only on their source vertices.

```scala
case object EdgePartition1D extends PartitionStrategy {
  override def getPartition(src: VertexId, dst: VertexId, numParts: PartitionID): PartitionID = {
    // Only the source vertex is used; multiplying by a large prime scatters consecutive IDs.
    val mixingPrime: VertexId = 1125899906842597L
    (math.abs(src * mixingPrime) % numParts).toInt
  }
}
```

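The very large prime `mixingPrime` scatters source vertex IDs across partitions to improve balance. However, it cannot eliminate the problem entirely: all edges with the same source vertex still land in the same partition, so a single high-degree source vertex produces an oversized partition.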
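A minimal sketch of why the prime alone cannot fix skew: the function below copies the `EdgePartition1D` formula into a standalone object (the name `EdgePartition1DSketch` and the skewed edge list are hypothetical) and counts edges per partition. Since only `src` enters the formula, every out-edge of the hub vertex lands in the same partition however the IDs are mixed.

```scala
object EdgePartition1DSketch {
  type VertexId = Long
  type PartitionID = Int
  val mixingPrime: VertexId = 1125899906842597L

  // Same formula as the EdgePartition1D excerpt: only src matters, dst is ignored.
  def getPartition(src: VertexId, dst: VertexId, numParts: PartitionID): PartitionID =
    (math.abs(src * mixingPrime) % numParts).toInt
}

val numParts = 4
// Hypothetical skewed graph: vertex 0 is a hub with 100 out-edges,
// while vertices 1 to 20 each have a single out-edge.
val hubEdges = (1L to 100L).map(d => (0L, d))
val otherEdges = (1L to 20L).map(s => (s, s + 1))

// Number of edges assigned to each partition.
val partitionSizes: Map[Int, Int] = (hubEdges ++ otherEdges)
  .groupBy { case (s, d) => EdgePartition1DSketch.getPartition(s, d, numParts) }
  .map { case (p, es) => p -> es.size }

println(partitionSizes) // one partition holds all 100 hub edges plus its share of the rest
```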

`EdgePartition2D` is a bit more complex. It uses both the source vertex and the destination vertex to compute the partition, based on a two-dimensional partitioning of the sparse edge adjacency matrix. Here’s an example taken from the comments in the source code. Suppose we have a graph with 12 vertices that we want to partition over 9 machines. We can use the following sparse matrix representation:

```text
_______________________________________
v0   | P0 *     | P1       | P2    *  |
v1   |  ****    |  *       |          |
v2   |  ******* |      **  |  ****    |
v3   |  *****   |  *  *    |       *  |
---------------------------------------
v4   | P3 *     | P4 ***   | P5 **  * |
v5   |  *  *    |  *       |          |
v6   |       *  |      **  |  ****    |
v7   |  * * *   |  *  *    |       *  |
---------------------------------------
v8   | P6   *   | P7    *  | P8  *   *|
v9   |     *    |  *    *  |          |
v10  |       *  |     **   |  *  *    |
v11  | * <-E    |  ***     |       ** |
---------------------------------------
```
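The block layout in the picture can be reproduced with a few lines of Scala. This sketch assumes, for illustration, that both axes group the 12 vertices into three blocks of four consecutive IDs and that blocks are numbered row by row from P0 to P8. It ignores `mixingPrime` entirely, and `Matrix2DBlocks` is a hypothetical name.

```scala
// Assumed layout: vertices v0..v11 are grouped into 3 blocks of 4 consecutive IDs
// along both axes, and the blocks P0..P8 are numbered row by row.
object Matrix2DBlocks {
  val numVertices = 12
  val blocksPerSide = 3
  val blockSize: Int = numVertices / blocksPerSide // 4 vertices per block

  // Block row from the source vertex, block column from the destination vertex.
  def partition(src: Int, dst: Int): Int =
    (src / blockSize) * blocksPerSide + (dst / blockSize)

  // All partitions that could hold an edge touching v, over every possible edge.
  def replicas(v: Int): Set[Int] =
    (0 until numVertices).flatMap(u => Seq(partition(v, u), partition(u, v))).toSet
}

println(Matrix2DBlocks.partition(11, 1))           // prints 6, i.e. E<v11, v1> lands in P6
println(Matrix2DBlocks.replicas(11).toList.sorted) // P6, P7, P8 from v11's row plus P2, P5 from its column
```

Under this layout every vertex ends up in at most 5 blocks, because its block row and block column share one block.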

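As you can see, the edge E<v11, v1> is assigned to P6. Because the block row depends only on one endpoint of an edge and the block column only on the other, every edge touching v11 falls either in v11’s block row (P6, P7, P8) or in a single block column, so v11 is replicated to at most 2 × 3 = 6 of the 9 partitions (in general, 2 × sqrt(numParts)). However, it is also clear that P0 holds far more edges than the other partitions, which leaves the partitioning unbalanced. So `EdgePartition2D` also multiplies vertex IDs by `mixingPrime` to scatter them across blocks.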

```scala
case object EdgePartition2D extends PartitionStrategy {
  override def getPartition(src: VertexId, dst: VertexId, numParts: PartitionID): PartitionID = {
    // Side length of the (roughly) square grid of blocks.
    val ceilSqrtNumParts: PartitionID = math.ceil(math.sqrt(numParts)).toInt
    val mixingPrime: VertexId = 1125899906842597L
    // The source picks one grid coordinate, the destination picks the other.
    val col: PartitionID = (math.abs(src * mixingPrime) % ceilSqrtNumParts).toInt
    val row: PartitionID = (math.abs(dst * mixingPrime) % ceilSqrtNumParts).toInt
    (col * ceilSqrtNumParts + row) % numParts
  }
}
```
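To check the replication bound empirically, the sketch below copies the `EdgePartition2D` formula into a standalone object (hypothetical name `EdgePartition2DSketch`, no Spark dependency), assigns every edge of a hypothetical complete directed graph on 200 vertices, and records the distinct partitions each vertex appears in. With `numParts = 9` the grid is 3 × 3. A vertex’s out-edges all share one `col` value and its in-edges all share one `row` value, and since both coordinates use the same hash, those two sets of blocks overlap in one block, so no vertex should span more than 2 × 3 − 1 = 5 partitions.

```scala
object EdgePartition2DSketch {
  type VertexId = Long
  type PartitionID = Int
  val mixingPrime: VertexId = 1125899906842597L

  // Same formula as the EdgePartition2D excerpt.
  def getPartition(src: VertexId, dst: VertexId, numParts: PartitionID): PartitionID = {
    val ceilSqrtNumParts: PartitionID = math.ceil(math.sqrt(numParts)).toInt
    val col: PartitionID = (math.abs(src * mixingPrime) % ceilSqrtNumParts).toInt
    val row: PartitionID = (math.abs(dst * mixingPrime) % ceilSqrtNumParts).toInt
    (col * ceilSqrtNumParts + row) % numParts
  }
}

val numParts = 9
val vertices = 0L until 200L
// Hypothetical complete directed graph on 200 vertices (no self-loops).
val edges = for (s <- vertices; d <- vertices if s != d) yield (s, d)

// For every vertex, collect the distinct partitions its edges fall into.
val replicas: Map[Long, Set[Int]] = edges
  .flatMap { case (s, d) =>
    val p = EdgePartition2DSketch.getPartition(s, d, numParts)
    Seq(s -> p, d -> p)
  }
  .groupBy(_._1)
  .map { case (v, vps) => v -> vps.map(_._2).toSet }

println(replicas.values.map(_.size).max)
```

When `numParts` is not a perfect square, the final `% numParts` folds the `ceilSqrtNumParts × ceilSqrtNumParts` grid back onto the available partitions, so the result stays in `[0, numParts)` at the cost of some partitions receiving two grid cells and others one.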

Let’s look at a simple, concrete example graph.

Vertices A, B, and C are in one partition; D, E, and F are in the other. The edges are split into two partitions, as shown in the Edge Table. The Routing Table is especially useful: it records how each vertex is cut, that is, which edge partitions reference that vertex.
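A routing table can be sketched as a map from each vertex to the set of edge partitions that reference it. The Scala below is a simplified model, not GraphX’s actual implementation: the name `RoutingTableSketch` is hypothetical, and the six edges are made up for illustration.

```scala
object RoutingTableSketch {
  // Hypothetical edge partitions for a six-vertex graph A..F.
  val edgePartitions: Map[Int, Seq[(String, String)]] = Map(
    0 -> Seq("A" -> "B", "B" -> "C", "C" -> "D"),
    1 -> Seq("D" -> "E", "E" -> "F", "F" -> "A")
  )

  // For each vertex, the set of edge partitions that reference it, i.e. the
  // partitions its attributes must be shipped to.
  def routingTable(parts: Map[Int, Seq[(String, String)]]): Map[String, Set[Int]] =
    parts.toSeq
      .flatMap { case (pid, es) => es.flatMap { case (s, d) => Seq(s -> pid, d -> pid) } }
      .groupBy(_._1)
      .map { case (v, vps) => v -> vps.map(_._2).toSet }
}

RoutingTableSketch.routingTable(RoutingTableSketch.edgePartitions).toSeq.sortBy(_._1).foreach {
  case (v, pids) => println(s"$v -> ${pids.toList.sorted.mkString(", ")}")
}
```

In this made-up example, A and D are the cut vertices: each appears in both edge partitions, so their attributes must be sent to both.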