User:Joal/Wikidata Graph


This page describes my experiments and findings from playing with the Wikidata JSON dumps in Spark GraphFrames.

Prepare Data (2017-04-03 JSON dump example)

  • Build refinery-source with the WikidataParquet conversion job (Gerrit change 346726)
cd ~/code/refinery-source && git checkout -f master && git pull
git fetch https://gerrit.wikimedia.org/r/analytics/refinery/source refs/changes/26/346726/4 && git checkout FETCH_HEAD
mvn -pl refinery-job -am clean package
  • Copy the dump to HDFS (from stat1002, which has the proper /mnt/data folder) - Be careful to use bz2 and not gz, as gz cannot be decompressed in parallel.
hdfs dfs -put /mnt/data/xmldatadumps/public/wikidatawiki/entities/20170403/wikidata-20170403-all.json.bz2 wikidata/json
  • Convert from JSON to Parquet
spark-submit --master yarn --conf spark.dynamicAllocation.maxExecutors=32 --executor-memory 16G --driver-memory 4G --executor-cores 2 --class org.wikimedia.analytics.refinery.job.wikidata.graph.WikidataParquet ~/code/refinery-source/refinery-job/target/refinery-job-0.0.45-SNAPSHOT.jar -i wikidata/json -o wikidata/parquet


Interesting info:

  • Original JSON dump size: 7.1 GB (bz2)
  • Conversion job time: ~25 min
  • Converted Parquet size: 8.2 GB (gzip compression)

Playing with GraphFrames - v1

Launch spark-shell with the needed jar and appropriate executor settings:

spark-shell --master yarn --conf spark.dynamicAllocation.maxExecutors=32 --executor-memory 16G --driver-memory 4G --executor-cores 2 --jars /srv/deployment/analytics/refinery/artifacts/refinery-job.jar
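
Before building the graph, it can help to sanity-check the converted data, since makeGraph below reads fields by position (getString(0) for the entity id, getSeq[Row](5) for the claims). A minimal sketch, assuming the parquet output path from the conversion step above:

// Quick sanity check of the converted data (sketch; path from the conversion step above)
val wd = sqlContext.read.parquet("wikidata/parquet")
wd.printSchema()     // the graph code below accesses columns by position, so the order matters
println(wd.count())  // number of wikidata entities in the dump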

Then create a graph out of claims only (items and properties are vertices, and claims are edges).

import org.apache.spark.rdd.RDD
import org.apache.spark.sql.types.{StringType, LongType, StructField, StructType}
import org.apache.spark.sql.Row
import org.graphframes.GraphFrame

def makeGraph(wdRdd: RDD[Row]): GraphFrame = {

    val verticesRdd = wdRdd.
        map(_.getString(0)). // entity id (named 'key' in the graph)
        sortBy(k => k). // ensure a stable order in case of re-zipping
        zipWithIndex()  // RDD[(key, id)]
    
    val verticesDF = sqlContext.createDataFrame(
        verticesRdd.map { case (key, id) => Row.fromTuple((id, key))},
        StructType(Seq(StructField("id", LongType, nullable = false), StructField("key", StringType, nullable = false))))
    
    val edgesRdd = wdRdd.
        flatMap(e => e.getSeq[Row](5).map(c => (e.getString(0), c.getStruct(1).getString(1), Row(c.getStruct(1).getString(2), c.getStruct(1).getStruct(3))))). // (fromKey, toKey, payload)
        distinct.
        keyBy(_._1). // RDD[(fromKey, (fromKey, toKey, payload))]
        join(verticesRdd.keyBy(_._1)). // RDD[(fromKey, ((fromKey, toKey, payload), (key-fromKey, id-fromKey)))]
        map(e => (e._2._1._2, (e._2._2._2, e._2._1._3))). // RDD[(toKey, (id-fromKey, payload))]
        join(verticesRdd.keyBy(_._1)). //RDD[(toKey, ((id-fromKey, payload), (key-toKey, id-toKey)))]
        map(e => (e._2._1._1, e._2._2._2, e._2._1._2)) // RDD[(id-fromKey, id-toKey, payload)]
    
    val edgesDF = sqlContext.createDataFrame(
        edgesRdd.map(Row.fromTuple(_)),
        StructType(Seq(
            StructField("src", LongType, nullable = false),
            StructField("dst", LongType, nullable = false),
            StructField("data", StructType(Seq(
                StructField("dataType", StringType, nullable = false),
                StructField("dataValue", StructType(Seq(
                    StructField("typ", StringType, nullable = false),
                    StructField("value", StringType, nullable = false)
                )), nullable = false)
            )), nullable = false)
        ))
    )
    
    GraphFrame(verticesDF, edgesDF)
}

val wd = sqlContext.read.parquet("/user/joal/wmf/data/wmf/mediawiki/wikidata_parquet/20180108")
val g = makeGraph(wd.rdd)
g.cache()

And finally get some results:

// Items + properties
g.vertices.count()
//res2: Long = 25479149

// Items only
g.vertices.filter("key like 'Q%'").count()
// res3: Long = 25475798

// Properties only
g.vertices.filter("key like 'P%'").count()
// res4: Long = 3351

// Claims
g.edges.count()
// res1: Long = 128184373
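
GraphFrames also gives degree views on the cached graph; as a quick sketch (not part of the original experiment), the vertices with the most outgoing claims can be listed by joining outDegrees back to the vertex keys:

// Sketch: vertices with the most outgoing claims, via GraphFrames' outDegrees view
import org.apache.spark.sql.functions.desc
g.outDegrees                    // DataFrame with columns (id, outDegree)
  .join(g.vertices, Seq("id"))  // recover the wikidata key of each vertex
  .orderBy(desc("outDegree"))
  .select("key", "outDegree")
  .show(10, false)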

Playing with GraphFrames - v2

At the Wikimedia Hackathon 2017 in Vienna, we tried to replicate a "big" SPARQL query in Spark.

We didn't succeed at the time, but we were not far off. This section contains the final code that makes it work.

First, launch a spark-shell:

spark-shell --master yarn  --executor-memory 8G --driver-memory 4G --executor-cores 1 --jars /srv/deployment/analytics/refinery/artifacts/refinery-job.jar

Then, use the :paste trick (end with CTRL+D) with:

import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, Row}
import org.apache.spark.sql.types.{StringType, LongType, StructField, StructType}
import org.graphframes.GraphFrame
import sqlContext.implicits._

/**
 * Graph creation functions
 * The graph uses mainSnak properties only, and converts the ones
 * pointing to other wikidata items into straight links.
 */

val idPattern = ".*\"id\":\"([QP]\\d+)\".*".r

def extractClaimData(claim: Row, fromId: Long): Option[(String, (Long, String, String, String, String))] = {
  val mainSnak = claim.getStruct(1)
  val claimRank = claim.getString(3)

  val mainSnakType = mainSnak.getString(0)
  val mainSnakProp = mainSnak.getString(1)
  val mainSnakDataType = mainSnak.getString(2)
  val mainSnakDataV = mainSnak.getStruct(3) // getting the value, not the typ
  val mainSnakDataValue =  if (mainSnakDataV != null) mainSnakDataV.getString(1) else null
  if (mainSnakDataType != null) {
    if (mainSnakDataType.startsWith("wikibase-") && (mainSnakDataValue != null)) {
      mainSnakDataValue match {
        case idPattern(toId) => Some(toId, (fromId, mainSnakProp, mainSnakType, mainSnakDataType, mainSnakDataValue))
        case _ => None
      }
    } else {
      Some(mainSnakProp, (fromId, mainSnakProp, mainSnakType, mainSnakDataType, mainSnakDataValue))
    }
  } else
    None
}

def makeGraph(wikidataDF: DataFrame): GraphFrame = {

  val verticesRdd: RDD[((String, String, String, Seq[Row]), Long)] = wikidataDF
    .rdd
    .map(row =>
      (
        row.getString(0), // wdid
        row.getString(1), // typ
        row.getMap[String, String](2).getOrElse("en", "__EMPTY__"), // label
        row.getSeq[Row](5) // claims
        ))
    .zipWithUniqueId // RDD[((wdid, typ, label, claims), id)]

  val verticesDF = sqlContext.createDataFrame(
    verticesRdd.map { case ((wdid, typ, label, _), id) => Row.fromTuple((id, wdid, typ, label))},
    StructType(Seq(
      StructField("id", LongType, nullable = false),
      StructField("wdid", StringType, nullable = false),
      StructField("typ", StringType, nullable = false),
      StructField("label", StringType, nullable = false)
    ))
  )

  val edgesRdd: RDD[(Long, Long, String, String, String, String)] = verticesRdd
    .flatMap(e => {
      val ((_, _, _, claims), fromId) = e
      claims.flatMap(claim => extractClaimData(claim, fromId)) // (toWdId, (fromId, Prop, type, dataType, dataValue))
    })
    .join(verticesRdd.map(e => (e._1._1, e._2))) // (toWdId, ((fromId, Prop, type, dataType, dataValue), (toId)))
    .map(e => (e._2._1._1, e._2._2, e._2._1._2, e._2._1._3, e._2._1._4, e._2._1._5)) // RDD[(fromId, toId, prop, type, dataType, dataValue))]

  val edgesDF = sqlContext.createDataFrame(edgesRdd.map(Row.fromTuple(_)),
    StructType(Seq(
      StructField("src", LongType, nullable = false),
      StructField("dst", LongType, nullable = false),
      StructField("prop", StringType, nullable = false),
      StructField("typ", StringType, nullable = false),
      StructField("dataType", StringType, nullable = false),
      StructField("dataValue", StringType, nullable = true)
    ))

  )
  GraphFrame(verticesDF, edgesDF)
}

/**
 * Read the already converted data and build the graph.
 * This takes a long time because of graph creation.
 * Once the graph is cached, computations are fast(-ish).
 */
val ts1 = System.currentTimeMillis
val df = sqlContext.read.parquet("/user/joal/wmf/data/wmf/mediawiki/wikidata_parquet/20180108")
val g = makeGraph(df)
g.cache()
val nbEdges = g.edges.count()
val nbVertices = g.vertices.count()
val ts2 = System.currentTimeMillis
println(s"Edges: ${nbEdges} / Vertices: ${nbVertices}")
println(s"Duration (ms): ${ts2-ts1}")

// Edges: 142588525 / Vertices: 25479149 
//
// Duration (ms): 251281 -- About 4 minutes ... a bit long
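
/**
 * Optional sketch (not in the original experiment): if the ~4 minute graph build
 * becomes annoying, the computed vertices and edges could be written to Parquet
 * once and the GraphFrame rebuilt directly from them in later sessions.
 * The output paths below are hypothetical.
 */
g.vertices.write.parquet("wikidata/graph_vertices")
g.edges.write.parquet("wikidata/graph_edges")
val gReloaded = GraphFrame(
  sqlContext.read.parquet("wikidata/graph_vertices"),
  sqlContext.read.parquet("wikidata/graph_edges"))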


/**
 * Compute the SPARQL equivalent for:
 * Find all humans having at least one occupation and a manner-of-death.
 * From those, compute the total count by occupation, and the count
 * by occupation restricted to manner-of-death = homicide.
 * Then join both and compute the rate of homicide over the total number
 * of different manners-of-death per occupation.
 */
val ts3 = System.currentTimeMillis
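// Motif: vertex a (the person) with three outgoing edges:
//   (a)-[e]->(b)  : e.prop  = P31  (instance of), b = Q5 (human)
//   (a)-[e2]->(c) : e2.prop = P106 (occupation)
//   (a)-[e3]->(d) : e3.prop = P1196 (manner of death)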
val motifs = g.find("(a)-[e]->(b); (a)-[e2]->(c); (a)-[e3]->(d)")
  .filter("e3.prop = 'P1196'") // manner of death
  .filter("b.wdid = 'Q5'") // human
  .filter("e.prop = 'P31'") // instanceOf
  .filter("e2.prop = 'P106'") // occupation
  .distinct()
motifs.cache()

val occups = motifs.groupBy("c.label").count().withColumnRenamed("count", "total")
val homicideOccups = motifs.filter("d.wdid = 'Q149086'").groupBy("c.label").count().withColumnRenamed("count", "homicides")
val occupsWithRate = occups.join(homicideOccups, Seq("label"), "leftouter")
  .filter("total > 5") // remove occupations with too-small number of manners-of-death
  .selectExpr("label", "homicides", "total", "(COALESCE(homicides, 0) / CAST(total AS DOUBLE)) AS rate") // compute rate
  .orderBy($"rate".desc, $"total".desc) // Order
occupsWithRate.take(20).foreach(println)
val ts4 = System.currentTimeMillis
println(s"Duration (ms): ${ts4-ts3}")

// ["drug lord",4,6,0.6666666666666666]                                            
// ["police officer",43,83,0.5180722891566265]
// ["civil rights advocate",4,8,0.5]
// ["humanitarian",3,6,0.5]
// ["gangster",12,25,0.48]
// ["drug trafficker",22,47,0.46808510638297873]
// ["outlaw",5,11,0.45454545454545453]
// ["missionary",51,114,0.4473684210526316]
// ["sovereign",15,35,0.42857142857142855]
// ["rapper",24,59,0.4067796610169492]
// ["orator",3,8,0.375]
// ["prosaist",4,11,0.36363636363636365]
// ["prostitute",12,34,0.35294117647058826]
// ["human rights activist",24,69,0.34782608695652173]
// ["monarch",10,29,0.3448275862068966]
// ["prince",3,9,0.3333333333333333]
// ["faculty",3,9,0.3333333333333333]
// ["environmentalist",3,9,0.3333333333333333]
// ["public figure",2,6,0.3333333333333333]
// ["student",11,35,0.3142857142857143]
//
// Duration (ms): 78268 -- 1min20 - better

/**
 * Reuse previous motif with a different manner-of-death: Suicide
 */
val ts5 = System.currentTimeMillis
val suicideOccups = motifs.filter("d.wdid = 'Q10737'").groupBy("c.label").count().withColumnRenamed("count", "suicides")
val occupsWithSuicideRate = occups.join(suicideOccups, Seq("label"), "leftouter")
  .filter("total > 5") // remove occupations with too-small number of manners-of-death
  .selectExpr("label", "suicides", "total", "(COALESCE(suicides, 0) / CAST(total AS DOUBLE)) AS rate") // compute rate
  .orderBy($"rate".desc, $"total".desc) // Order
occupsWithSuicideRate.take(20).foreach(println)
val ts6 = System.currentTimeMillis
println(s"Duration (ms): ${ts6-ts5}")

// ["member of parliament",11,13,0.8461538461538461]                               
// ["spree killer",23,29,0.7931034482758621]
// ["AV idol",6,8,0.75]
// ["internist",5,8,0.625]
// ["samurai",59,101,0.5841584158415841]
// ["bushi",22,42,0.5238095238095238]
// ["pundit",3,6,0.5]
// ["orientalist",4,10,0.4]
// ["psychanalyst",5,13,0.38461538461538464]
// ["warrior",3,8,0.375]
// ["orator",3,8,0.375]
// ["glamor model",4,11,0.36363636363636365]
// ["executioner",4,11,0.36363636363636365]
// ["neurologist",14,40,0.35]
// ["topologist",4,12,0.3333333333333333]
// ["sport shooter",3,9,0.3333333333333333]
// ["vedette",2,6,0.3333333333333333]
// ["cryptographer",2,6,0.3333333333333333]
// ["curler",2,6,0.3333333333333333]
// ["freestyle skier",2,6,0.3333333333333333]
//
// Duration (ms): 4929 -- 5 Seconds - Caching is awesome :)