Create Graph in GraphX

If you look into Graph.scala carefully, you will find the method apply, which constructs a graph from a collection of vertices and a collection of edges, each with attributes.

def apply[VD: ClassTag, ED: ClassTag](
    vertices: RDD[(VertexId, VD)],
    edges: RDD[Edge[ED]],
    defaultVertexAttr: VD = null.asInstanceOf[VD],
    edgeStorageLevel: StorageLevel = StorageLevel.MEMORY_ONLY,
    vertexStorageLevel: StorageLevel = StorageLevel.MEMORY_ONLY): Graph[VD, ED] = {
  GraphImpl(vertices, edges, defaultVertexAttr, edgeStorageLevel, vertexStorageLevel)
}

Duplicate vertices are resolved arbitrarily. Vertices that appear in the edge collection edges but not in the input vertex collection vertices are assigned defaultVertexAttr. The storage strategies can be specified via edgeStorageLevel and vertexStorageLevel. Finally, apply delegates to GraphImpl to complete the construction.
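This vertex-resolution behavior can be sketched with plain Scala collections (a simplified model for illustration, not the actual GraphX implementation; resolveVertices is a hypothetical helper):

```scala
// Simplified model of Graph.apply's vertex resolution: duplicate vertex IDs
// keep one arbitrary attribute, and IDs that appear only in edges receive
// the default attribute. This is NOT the real GraphX code path.
type VertexId = Long

def resolveVertices[VD](
    vertices: Seq[(VertexId, VD)],
    edgeEndpoints: Seq[VertexId],
    defaultVertexAttr: VD): Map[VertexId, VD] = {
  // toMap keeps the last entry per key -- an "arbitrary" pick among duplicates
  val supplied = vertices.toMap
  // endpoints missing from the vertex collection fall back to the default
  val missing = edgeEndpoints.filterNot(supplied.contains)
                             .map(id => id -> defaultVertexAttr)
  supplied ++ missing
}

val resolved = resolveVertices(
  Seq(3L -> "student", 3L -> "rxin", 7L -> "postdoc"),  // 3L is duplicated
  Seq(3L, 7L, 5L),                                      // 5L appears only in an edge
  "Missing")

println(resolved(5L))  // the edge-only vertex got the default attribute
```

Here the default attribute plays the same role as defaultVertexAttr in the real apply.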

However, we can use higher-level APIs to construct graphs in GraphX [1]. Open the Spark shell locally or on a cluster by entering spark-shell in a terminal. You may need to add the-path-to-Spark/bin (e.g. /Users/yuhc/spark-1.2.0-bin-hadoop2.4/bin) to your $PATH. Import three packages by typing:

scala> import org.apache.spark._
scala> import org.apache.spark.graphx._
scala> import org.apache.spark.rdd.RDD

Follow the example in [1]. The structure of the graph is as follows:

The Property Graph

This graph has such type signature:

val userGraph: Graph[(String, String), String]

(String, String) is the type of the vertex attributes, and the third String is the type of the edge attributes. Type in the following code:

// Assume the SparkContext has already been constructed. Unnecessary in spark-shell.
// val sc: SparkContext
// Create an RDD for the vertices
val users: RDD[(VertexId, (String, String))] =
  sc.parallelize(Array((3L, ("rxin", "student")), (7L, ("jgonzal", "postdoc")),
                       (5L, ("franklin", "prof")), (2L, ("istoica", "prof"))))
// Create an RDD for edges
val relationships: RDD[Edge[String]] =
  sc.parallelize(Array(Edge(3L, 7L, "collab"),    Edge(5L, 3L, "advisor"),
                       Edge(2L, 5L, "colleague"), Edge(5L, 7L, "pi")))
// Define a default user in case there are relationships with missing users
val defaultUser = ("John Doe", "Missing")
// Build the initial Graph
val graph = Graph(users, relationships, defaultUser)

users is a vertex RDD, and relationships is an edge RDD. The defaultVertexAttr here is defaultUser. Finally, the shell returns:

graph: org.apache.spark.graphx.Graph[(String, String),String] = org.apache.spark.graphx.impl.GraphImpl@28796d0e

It can be inferred that the constructor of class Graph calls org.apache.spark.graphx.impl.GraphImpl through method apply.

We can have a look at [[/impl/GraphImpl.scala]]. The object GraphImpl defines several implementations of method apply. The previous code creates a graph from a vertex RDD and an edge RDD, so it executes the following:

def apply[VD: ClassTag, ED: ClassTag](
    vertices: RDD[(VertexId, VD)],
    edges: RDD[Edge[ED]],
    defaultVertexAttr: VD,
    edgeStorageLevel: StorageLevel,
    vertexStorageLevel: StorageLevel): GraphImpl[VD, ED] = {
  val edgeRDD = EdgeRDD.fromEdges(edges)(classTag[ED], classTag[VD])
  val vertexRDD = VertexRDD(vertices, edgeRDD, defaultVertexAttr)
  GraphImpl(vertexRDD, edgeRDD)
}

It then calls GraphImpl(vertexRDD, edgeRDD):

def apply[VD: ClassTag, ED: ClassTag](
    vertices: VertexRDD[VD],
    edges: EdgeRDD[ED]): GraphImpl[VD, ED] = {
  // Convert the vertex partitions in edges to the correct type
  val newEdges = edges.asInstanceOf[EdgeRDDImpl[ED, _]]
    .mapEdgePartitions((pid, part) => part.withoutVertexAttributes[VD])
  GraphImpl.fromExistingRDDs(vertices, newEdges)
}

It continues to call GraphImpl.fromExistingRDDs(vertices, newEdges):

def fromExistingRDDs[VD: ClassTag, ED: ClassTag](
    vertices: VertexRDD[VD],
    edges: EdgeRDD[ED]): GraphImpl[VD, ED] = {
  new GraphImpl(vertices, new ReplicatedVertexView(edges.asInstanceOf[EdgeRDDImpl[ED, VD]]))
}

Now it constructs a new object GraphImpl. Have a look at this graph again:

The Property Graph

Let’s count how many postdocs there are.

scala> graph.vertices.filter { case (id, (name, pos)) => pos == "postdoc" }.count

It returns res0: Long = 1, which corresponds to Vertex 7. We can also count the edges whose source vertex ID is greater than the destination vertex ID:

scala> graph.edges.filter(e => e.srcId > e.dstId).count

It returns res1: Long = 1, which corresponds to the edge 5 --> 3.

Recall the abstract class Graph we referenced in a previous post. It has three important properties:

abstract class Graph[VD: ClassTag, ED: ClassTag] protected () extends Serializable {

  @transient val vertices: VertexRDD[VD]
  @transient val edges: EdgeRDD[ED]
  @transient val triplets: RDD[EdgeTriplet[VD, ED]]
  // ...
}
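Conceptually, the triplets view joins each edge with the attributes of its two endpoints. The following plain-Scala sketch illustrates that idea (illustrative only; the case classes and the triplets function here are hypothetical stand-ins, not GraphX's actual types or join strategy):

```scala
// Plain-Scala sketch of what the triplets view computes: each edge is joined
// with the attributes of its source and destination vertices.
type VertexId = Long
case class Edge[ED](srcId: VertexId, dstId: VertexId, attr: ED)
case class Triplet[VD, ED](srcAttr: VD, dstAttr: VD, attr: ED)

def triplets[VD, ED](vertices: Map[VertexId, VD],
                     edges: Seq[Edge[ED]]): Seq[Triplet[VD, ED]] =
  edges.map(e => Triplet(vertices(e.srcId), vertices(e.dstId), e.attr))

val verts = Map(3L -> ("rxin", "student"), 7L -> ("jgonzal", "postdoc"))
val trips = triplets(verts, Seq(Edge(3L, 7L, "collab")))

println(trips.head)  // Triplet((rxin,student),(jgonzal,postdoc),collab)
```

The real GraphX implementation performs this join in a distributed fashion, but the shape of the result is the same: every triplet carries the source attribute, the destination attribute, and the edge attribute.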

If we view the type of triplets by

scala> graph.triplets

It returns

res3: org.apache.spark.rdd.RDD[org.apache.spark.graphx.EdgeTriplet[(String, String),String]] = MapPartitionsRDD[22] at mapPartitions at GraphImpl.scala:51

The type is EdgeTriplet[(String, String),String], matching the graph’s type signature. Let’s see all the triplets:

scala> graph.triplets.collect

It returns all the edges with their vertices and attributes:

res4: Array[org.apache.spark.graphx.EdgeTriplet[(String, String),String]] = Array(((3,(rxin,student)),(7,(jgonzal,postdoc)),collab), ((5,(franklin,prof)),(3,(rxin,student)),advisor), ((2,(istoica,prof)),(5,(franklin,prof)),colleague), ((5,(franklin,prof)),(7,(jgonzal,postdoc)),pi))

Likewise, the following commands show the collections of vertices and edges:

scala> graph.vertices.collect
scala> graph.edges.collect

We can also map over the triplets:

val facts: RDD[String] = graph.triplets.map(triplet =>
    triplet.srcAttr._1 + " is the " + triplet.attr + " of " + triplet.dstAttr._1)
facts.collect.foreach(println)

It prints:

rxin is the collab of jgonzal
franklin is the advisor of rxin
istoica is the colleague of franklin
franklin is the pi of jgonzal


[1] Official Guide: GraphX Programming Guide.
