Data Loading in GraphX

Normally, we only build a graph by hand when working through a tutorial or debugging. In a script or a project, we prefer to read the graph from files. GraphX provides a convenient file-reading and graph-importing API, edgeListFile, which is defined in GraphLoader.scala.

In object GraphLoader, only one method is defined:

def edgeListFile(
    sc: SparkContext,
    path: String,
    canonicalOrientation: Boolean = false,
    numEdgePartitions: Int = -1,
    edgeStorageLevel: StorageLevel = StorageLevel.MEMORY_ONLY,
    vertexStorageLevel: StorageLevel = StorageLevel.MEMORY_ONLY)
  : Graph[Int, Int] =
  {
    ...
  }

edgeListFile loads a graph from an edge-list-formatted file. Each line of the file should contain only two integers, a source id and a target id; lines beginning with `#` are treated as comments. The file looks like

# Comment Line
# Source Id <\t> Target Id
1   -5
1    2
2    7
1    8
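
For comparison, here is a minimal sketch of loading the same format by hand and building a Graph[Int, Int] with Graph.fromEdgeTuples, once a SparkContext sc is available (e.g., in the spark-shell used later in this article). The path is a placeholder, and this is only a simplified illustration of what edgeListFile does for you, not its actual implementation:

import org.apache.spark.graphx.Graph

// Placeholder path; sc is the SparkContext (provided automatically by spark-shell).
val lines = sc.textFile("hdfs://.../edges.txt")

val edgeTuples = lines
  .filter(line => line.nonEmpty && !line.startsWith("#"))  // drop blank and comment lines
  .map { line =>
    val fields = line.split("\\s+")
    (fields(0).toLong, fields(1).toLong)                    // (source Id, target Id)
  }

// Every edge gets attribute 1 and every vertex gets default value 1,
// matching the Graph[Int, Int] returned by edgeListFile.
val graphByHand: Graph[Int, Int] = Graph.fromEdgeTuples(edgeTuples, 1)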

Let’s look at the parameters of edgeListFile. sc is the SparkContext; path is the path to the file (e.g., /home/data/file or hdfs://file); numEdgePartitions is the number of partitions for the edge RDD (set it to -1 to use the default parallelism). If canonicalOrientation is set to true, every edge is automatically oriented in the positive direction (source Id < target Id). edgeStorageLevel and vertexStorageLevel are the desired storage levels for the edge and vertex partitions. Here is the code snippet that handles canonicalOrientation:

if (canonicalOrientation && srcId > dstId) {
  builder.add(dstId, srcId, 1)
} else {
  builder.add(srcId, dstId, 1)
}
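
For example, assuming a SparkContext sc (such as the one spark-shell provides), an edge list could be loaded with this option enabled as in the sketch below; the path is only a placeholder:

import org.apache.spark.graphx.GraphLoader

// Every edge in the resulting graph will satisfy srcId < dstId.
val canonicalGraph = GraphLoader.edgeListFile(
  sc,
  "hdfs://.../edges.txt",            // placeholder path
  canonicalOrientation = true)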

In this article, we use the web-Google open data set as an example. The data was released in 2002 by Google as part of the Google Programming Contest. Nodes represent web pages and directed edges represent hyperlinks between them [1].

The graph contains 875,713 nodes and 5,105,039 edges. You can find detailed information about it in [1], and download it either from snap.stanford.edu (web-Google.txt.gz) or github.com/yuhc (three different sizes of txt files). The full size of the data is about 75.4 MB.

First, let’s import the data into HDFS. The common File System Shell commands are listed in [2]. The command used to upload a file is

Usage: hdfs dfs -put <localsrc> ... <dst>

Copy single src, or multiple srcs from the local file system to the destination
file system. Also reads input from stdin and writes to the destination file system.

  hdfs dfs -put localfile /user/hadoop/hadoopfile
  hdfs dfs -put localfile1 localfile2 /user/hadoop/hadoopdir
  hdfs dfs -put localfile hdfs://nn.example.com/hadoop/hadoopfile
  hdfs dfs -put - hdfs://nn.example.com/hadoop/hadoopfile (reads the input from stdin)

Exit Code: Returns 0 on success and -1 on error.

The default web UI of HDFS is at http://192.168.17.240:50070 (replace 192.168.17.240 with your own IP or localhost). Click “Utilities -> Browse the file system” to see the directory structure of HDFS. In this setup, the HDFS URL is hdfs://192.168.17.240:9222/.

In my case, I use

bin$ hdfs dfs -mkdir -p hdfs://192.168.17.240:9222/input/yuhc/web-Google/
bin$ hdfs dfs -put web-Google.txt hdfs://192.168.17.240:9222/input/yuhc/web-Google/web-Google.txt
bin$ hdfs dfs -ls hdfs://192.168.17.240:9222/input/yuhc/web-Google

to create the directory and add the file to HDFS.

Then view the status of the Spark cluster at http://192.168.17.240:8080. The Running Applications list will be empty if no Spark application is running in the cluster. Type in

$ MASTER=spark://192.168.17.240:7077 spark-shell

to assign the MASTER node and start the spark-shell. A running application named “Spark shell” will appear at http://192.168.17.240:8080.

ID	  Name	       Cores  Memory per Node  Submitted Time  User  State    Duration
app-0277  Spark shell  0      512.0 MB 	       2015/03/19      yuhc  WAITING  1.4 min
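
Inside the shell, you can confirm which master you are connected to (a quick check; the value will match whatever you passed in MASTER):

sc.master   // e.g. spark://192.168.17.240:7077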

Now we can construct the graph through the Spark Scala shell. Import the libraries we referred to before:

import org.apache.spark._
import org.apache.spark.graphx._
import org.apache.spark.rdd.RDD

Load web-Google.txt from HDFS:

val graph = GraphLoader.edgeListFile(sc, "hdfs://192.168.17.240:9222/input/yuhc/web-Google/web-Google.txt")

It returns

INFO GraphLoader: It took 10048 ms to load the edges
graph: org.apache.spark.graphx.Graph[Int,Int] = org.apache.spark.graphx.impl.GraphImpl@225ac5e7
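
The storage-level parameters described earlier can also be passed here. As a sketch (using the same path as above), if the graph may not fit entirely in memory, the edge and vertex partitions could be allowed to spill to disk:

import org.apache.spark.storage.StorageLevel

// Keep edge and vertex partitions in memory, spilling to disk when needed.
val diskBackedGraph = GraphLoader.edgeListFile(
  sc,
  "hdfs://192.168.17.240:9222/input/yuhc/web-Google/web-Google.txt",
  edgeStorageLevel = StorageLevel.MEMORY_AND_DISK,
  vertexStorageLevel = StorageLevel.MEMORY_AND_DISK)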

You can control the number of partitions used to load the edge file by setting numEdgePartitions (older GraphX versions named this parameter minEdgePartitions):

val graph = GraphLoader.edgeListFile(sc, "hdfs://192.168.17.240:9222/input/yuhc/web-Google/web-Google.txt", numEdgePartitions = 2)
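
To verify how many partitions the edge RDD actually ended up with (the exact number may depend on the input splits), inspect it directly:

graph.edges.partitions.length   // number of edge partitions after loading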

Now we can count the vertices and edges in the graph:

scala> graph.vertices.count
res1: Long = 875713
scala> graph.edges.count
res2: Long = 5105039
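
As a final sanity check (the exact rows you see will vary), you can peek at a few loaded edges and vertices; each edge carries the attribute 1 assigned by edgeListFile:

graph.edges.take(3).foreach(println)      // prints a few Edge(srcId, dstId, 1) triples
graph.vertices.take(3).foreach(println)   // prints a few (vertexId, 1) pairs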

References

[1] SNAP Dataset Information, https://snap.stanford.edu/data/web-Google.html
[2] Apache Hadoop 2.6.0 File System Shell Guide, http://hadoop.apache.org/docs/stable/hadoop-project-dist/hadoop-common/FileSystemShell.html
