From 86516f1a313f6858852aa15885e9cb120b7fb95f Mon Sep 17 00:00:00 2001 From: Hoa Date: Sun, 16 Jul 2017 16:34:33 +0800 Subject: [PATCH 1/7] Add support to Graphx PartitionStrategy and `partitionBy` --- .../scala/org/graphframes/GraphFrame.scala | 83 ++++++++++++++++++- 1 file changed, 81 insertions(+), 2 deletions(-) diff --git a/src/main/scala/org/graphframes/GraphFrame.scala b/src/main/scala/org/graphframes/GraphFrame.scala index d6c6c73d7..9887ffa64 100644 --- a/src/main/scala/org/graphframes/GraphFrame.scala +++ b/src/main/scala/org/graphframes/GraphFrame.scala @@ -21,9 +21,10 @@ import java.util.Random import scala.reflect.runtime.universe.TypeTag -import org.apache.spark.graphx.{Edge, Graph} +import org.apache.spark.Partitioner +import org.apache.spark.graphx.{Edge, Graph, PartitionID, PartitionStrategy} import org.apache.spark.sql._ -import org.apache.spark.sql.functions.{array, broadcast, col, count, explode, struct, udf, monotonically_increasing_id} +import org.apache.spark.sql.functions.{array, broadcast, col, count, explode, struct, udf, monotonically_increasing_id, lit} import org.apache.spark.sql.types._ import org.apache.spark.storage.StorageLevel @@ -265,6 +266,84 @@ class GraphFrame private( edges.select(explode(array(SRC, DST)).as(ID)).groupBy(ID).agg(count("*").cast("int").as("degree")) } + // ========================= Partition By ==================================== + val PARTITION_ID: String = "partition_id" + + /** + * A [[org.apache.spark.Partitioner]] that use the key of PairRDD as partition + * id number. + */ + class ExactAsKeyPartitioner(partitions: Int) extends Partitioner { + require(partitions >= 0, s"Number of partitions ($partitions) cannot be negative.") + + def numPartitions: Int = partitions + + def getPartition(key: Any): Int = { + val partitionIdAsKey = key.asInstanceOf[Int] + return partitionIdAsKey + } + } + + /** + * Implements a watered down version of Graphx partitionBy. + * + * @param numPartitions Number of partitions to be split by + * @param strategy any case object of Graphx's PartitionStrategy trait + * + * @return A new `GraphFrame` constructed from new edges and existing vertices + */ + def partitionBy(numPartitions: Int, strategy: PartitionStrategy): GraphFrame = { + val getPartitionIdUdf = udf[PartitionID, Long, Long, Int] { + (src, dst, numParts) => strategy.getPartition(src, dst, numParts) + } + + // Remove 'src' and 'dst' and get original 'attr' cols + val (unnestedAttrCols, _) = edgeColumnMap.filter { p => + val key = p._1 + key != SRC && key != DST + } + .toSeq.sortBy(_._2) + .unzip + + // Construct the flatten columns of edges + new partition id col + val edgesWithPartitionIdColumns = Seq( + Seq(col(SRC), col(DST)), + unnestedAttrCols.map(c => col(ATTR + "." + c)), + Seq(col(PARTITION_ID))).flatten + + val edgesWithPartitionId = indexedEdges + .withColumn(PARTITION_ID, + getPartitionIdUdf(col(LONG_SRC), col(LONG_DST), lit(numPartitions))) + .drop(LONG_SRC, LONG_DST) + .select(edgesWithPartitionIdColumns:_*) + + // Use low level rdd partitioner to manipulate the data splitting + val partitioned = edgesWithPartitionId.rdd + .map(r => (r.getAs[Int](PARTITION_ID), r)) + .partitionBy(new ExactAsKeyPartitioner(numPartitions)) + .values + + val partitionIdStructField: StructField = StructField( + PARTITION_ID, IntegerType, false) + val intermediateSchema = edges.schema.add(partitionIdStructField) + + // Construct new edges from our partitioned & intermediate schema + val newEdges = edges.sqlContext + .createDataFrame(partitioned, intermediateSchema) + .drop(PARTITION_ID) + + new GraphFrame(vertices, newEdges) + } + + + /** + * Another version of partitionBy without specifying the numPartitions params. + * Default to the length of edges partitions + */ + def partitionBy(strategy: PartitionStrategy): GraphFrame = { + partitionBy(edges.rdd.partitions.length, strategy) + } + // ============================ Motif finding ======================================== /** From 0a2e95a77b5ec33fa4fa4f5944f76b5ef5bbf134 Mon Sep 17 00:00:00 2001 From: Hoa Date: Sun, 16 Jul 2017 17:05:47 +0800 Subject: [PATCH 2/7] Add unit test for partitionBy --- .../org/graphframes/GraphFrameSuite.scala | 38 +++++++++++++++++++ 1 file changed, 38 insertions(+) diff --git a/src/test/scala/org/graphframes/GraphFrameSuite.scala b/src/test/scala/org/graphframes/GraphFrameSuite.scala index e5c1a8587..27126387b 100644 --- a/src/test/scala/org/graphframes/GraphFrameSuite.scala +++ b/src/test/scala/org/graphframes/GraphFrameSuite.scala @@ -291,4 +291,42 @@ class GraphFrameSuite extends SparkFunSuite with GraphFrameTestSparkContext { GraphFrame.setBroadcastThreshold(defaultThreshold) } + + test("partitionBy strategy") { + import org.apache.spark.graphx.PartitionStrategy._ + + val sqlContext = this.sqlContext + import sqlContext.implicits._ + + def mkGraph(edges: List[(Long, Long)]): GraphFrame = { + GraphFrame.fromEdges(sc.parallelize(edges, 2) + .toDF("src", "dst") + .withColumn("rel", lit(0))) + } + + def nonemptyParts(graph: GraphFrame): DataFrame = { + val partitionSizeDf = graph.edges.mapPartitions { iter => + Iterator(iter.size) + }.toDF("size") + partitionSizeDf.where(col("size") > 0) + } + + val identicalEdges = List((0L, 1L), (0L, 1L)) + val canonicalEdges = List((0L, 1L), (1L, 0L)) + val sameSrcEdges = List((0L, 1L), (0L, 2L)) + + // partitionBy(RandomVertexCut) puts identical edges in the same partition + assert(nonemptyParts(mkGraph(identicalEdges).partitionBy(RandomVertexCut)).count === 1) + + // partitionBy(EdgePartition1D) puts same-source edges in the same partition + assert(nonemptyParts(mkGraph(sameSrcEdges).partitionBy(EdgePartition1D)).count === 1) + + // partitionBy(CanonicalRandomVertexCut) puts edges that are identical modulo direction into + // the same partition + assert( + nonemptyParts(mkGraph(canonicalEdges).partitionBy(CanonicalRandomVertexCut)).count === 1) + + // partitionBy(EdgePartition2D) puts identical edges in the same partition + assert(nonemptyParts(mkGraph(identicalEdges).partitionBy(EdgePartition2D)).count === 1) + } } From f53da13ae350beb2d0dc7b7b815ff697a1964613 Mon Sep 17 00:00:00 2001 From: Duc Hoa Nguyen Date: Mon, 18 Sep 2017 11:01:27 +0800 Subject: [PATCH 3/7] Fix formatting to avoid long line --- src/main/scala/org/graphframes/GraphFrame.scala | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/src/main/scala/org/graphframes/GraphFrame.scala b/src/main/scala/org/graphframes/GraphFrame.scala index 9887ffa64..1e08f223d 100644 --- a/src/main/scala/org/graphframes/GraphFrame.scala +++ b/src/main/scala/org/graphframes/GraphFrame.scala @@ -312,8 +312,9 @@ class GraphFrame private( Seq(col(PARTITION_ID))).flatten val edgesWithPartitionId = indexedEdges - .withColumn(PARTITION_ID, - getPartitionIdUdf(col(LONG_SRC), col(LONG_DST), lit(numPartitions))) + .withColumn( + PARTITION_ID, + getPartitionIdUdf(col(LONG_SRC), col(LONG_DST), lit(numPartitions))) .drop(LONG_SRC, LONG_DST) .select(edgesWithPartitionIdColumns:_*) From 014ab76e7df275128e2ebdccf4f331ac1628ab7f Mon Sep 17 00:00:00 2001 From: Duc Hoa Nguyen Date: Thu, 28 Sep 2017 16:28:39 +0800 Subject: [PATCH 4/7] Set partition_id val and partitioner class to private --- src/main/scala/org/graphframes/GraphFrame.scala | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/main/scala/org/graphframes/GraphFrame.scala b/src/main/scala/org/graphframes/GraphFrame.scala index 1e08f223d..5e9d7eaaa 100644 --- a/src/main/scala/org/graphframes/GraphFrame.scala +++ b/src/main/scala/org/graphframes/GraphFrame.scala @@ -267,13 +267,13 @@ class GraphFrame private( } // ========================= Partition By ==================================== - val PARTITION_ID: String = "partition_id" + private val PARTITION_ID: String = "partition_id" /** * A [[org.apache.spark.Partitioner]] that use the key of PairRDD as partition * id number. */ - class ExactAsKeyPartitioner(partitions: Int) extends Partitioner { + private class ExactAsKeyPartitioner(partitions: Int) extends Partitioner { require(partitions >= 0, s"Number of partitions ($partitions) cannot be negative.") def numPartitions: Int = partitions From 02f028e964de76b50712b70117a090e381d438ae Mon Sep 17 00:00:00 2001 From: Duc Hoa Nguyen Date: Thu, 28 Sep 2017 17:31:36 +0800 Subject: [PATCH 5/7] Rework partitionBy to follow GraphX style --- .../scala/org/graphframes/GraphFrame.scala | 23 +++++++++++-------- 1 file changed, 13 insertions(+), 10 deletions(-) diff --git a/src/main/scala/org/graphframes/GraphFrame.scala b/src/main/scala/org/graphframes/GraphFrame.scala index 5e9d7eaaa..4919eb31b 100644 --- a/src/main/scala/org/graphframes/GraphFrame.scala +++ b/src/main/scala/org/graphframes/GraphFrame.scala @@ -285,20 +285,20 @@ class GraphFrame private( } /** - * Implements a watered down version of Graphx partitionBy. + * Repartitions the edges in the graph according to partitionStrategy. * - * @param numPartitions Number of partitions to be split by - * @param strategy any case object of Graphx's PartitionStrategy trait + * @param partitionStrategy the partitioning strategy to use when partitioning the edges in the graph. + * @param numPartitions the number of edge partitions in the new graph. * * @return A new `GraphFrame` constructed from new edges and existing vertices */ - def partitionBy(numPartitions: Int, strategy: PartitionStrategy): GraphFrame = { + def partitionBy(partitionStrategy: PartitionStrategy, numPartitions: Int): GraphFrame = { val getPartitionIdUdf = udf[PartitionID, Long, Long, Int] { - (src, dst, numParts) => strategy.getPartition(src, dst, numParts) + (src, dst, numParts) => partitionStrategy.getPartition(src, dst, numParts) } // Remove 'src' and 'dst' and get original 'attr' cols - val (unnestedAttrCols, _) = edgeColumnMap.filter { p => + val (unnestedAttrCols, _) = edgeColumnMap.filter { p => val key = p._1 key != SRC && key != DST } @@ -338,11 +338,14 @@ class GraphFrame private( /** - * Another version of partitionBy without specifying the numPartitions params. - * Default to the length of edges partitions + * Repartitions the edges in the graph according to partitionStrategy. + * + * @param partitionStrategy the partitioning strategy to use when partitioning the edges in the graph. + * + * @return A new `GraphFrame` constructed from new edges and existing vertices */ - def partitionBy(strategy: PartitionStrategy): GraphFrame = { - partitionBy(edges.rdd.partitions.length, strategy) + def partitionBy(partitionStrategy: PartitionStrategy): GraphFrame = { + partitionBy(partitionStrategy, edges.rdd.getNumPartitions) } // ============================ Motif finding ======================================== From 93e845edcf9af932db453513133a4d3da8533808 Mon Sep 17 00:00:00 2001 From: Duc Hoa Nguyen Date: Thu, 28 Sep 2017 17:46:44 +0800 Subject: [PATCH 6/7] Use ListBuffer instead of flatten Seq when combine existing columns --- src/main/scala/org/graphframes/GraphFrame.scala | 10 ++++++---- 1 file changed, 6 insertions(+), 4 deletions(-) diff --git a/src/main/scala/org/graphframes/GraphFrame.scala b/src/main/scala/org/graphframes/GraphFrame.scala index 4919eb31b..430257cba 100644 --- a/src/main/scala/org/graphframes/GraphFrame.scala +++ b/src/main/scala/org/graphframes/GraphFrame.scala @@ -306,10 +306,12 @@ class GraphFrame private( .unzip // Construct the flatten columns of edges + new partition id col - val edgesWithPartitionIdColumns = Seq( - Seq(col(SRC), col(DST)), - unnestedAttrCols.map(c => col(ATTR + "." + c)), - Seq(col(PARTITION_ID))).flatten + val edgesWithPartitionIdColumns = new scala.collection.mutable.ListBuffer[Column]() + .+=(col(SRC)) + .+=(col(DST)) + .++=(unnestedAttrCols.map(c => col(ATTR + "." + c))) + .+=(col(PARTITION_ID)) + .toSeq val edgesWithPartitionId = indexedEdges .withColumn( From 361ce96f50670ee4027cdda643d7ef45f9363ae0 Mon Sep 17 00:00:00 2001 From: Duc Hoa Nguyen Date: Thu, 28 Sep 2017 17:58:30 +0800 Subject: [PATCH 7/7] Add test that where partition count != 1 --- src/test/scala/org/graphframes/GraphFrameSuite.scala | 2 ++ 1 file changed, 2 insertions(+) diff --git a/src/test/scala/org/graphframes/GraphFrameSuite.scala b/src/test/scala/org/graphframes/GraphFrameSuite.scala index 27126387b..e1c7284f5 100644 --- a/src/test/scala/org/graphframes/GraphFrameSuite.scala +++ b/src/test/scala/org/graphframes/GraphFrameSuite.scala @@ -314,12 +314,14 @@ class GraphFrameSuite extends SparkFunSuite with GraphFrameTestSparkContext { val identicalEdges = List((0L, 1L), (0L, 1L)) val canonicalEdges = List((0L, 1L), (1L, 0L)) val sameSrcEdges = List((0L, 1L), (0L, 2L)) + val sameSrcTwoPartitionsEdges = List((0L, 1L), (0L, 2L), (1L, 1L)) // partitionBy(RandomVertexCut) puts identical edges in the same partition assert(nonemptyParts(mkGraph(identicalEdges).partitionBy(RandomVertexCut)).count === 1) // partitionBy(EdgePartition1D) puts same-source edges in the same partition assert(nonemptyParts(mkGraph(sameSrcEdges).partitionBy(EdgePartition1D)).count === 1) + assert(nonemptyParts(mkGraph(sameSrcTwoPartitionsEdges).partitionBy(EdgePartition1D)).count > 1) // partitionBy(CanonicalRandomVertexCut) puts edges that are identical modulo direction into // the same partition