User:Joal/Clickstream

This page describes my experiments with the Clickstream dataset.

First, launch a Spark 2 Scala shell from any of the stat100[456] machines (with limits set so that cluster resources are shared nicely):

spark2-shell --master yarn --conf spark.dynamicAllocation.maxExecutors=16 --driver-memory 2G --executor-memory 8G --executor-cores 2 --jars /srv/deployment/analytics/refinery/artifacts/refinery-job.jar

Then set up helpers, load the data, and build the graph:

import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.types.{LongType, StructField, StructType}
import org.apache.spark.sql.Row
import org.graphframes.GraphFrame

// Make it easier to pretty-print SQL results
def req(sql: String): Unit = {
  spark.sql(sql).collect.foreach(println)
}

// Add an index (Long) column to a dataframe.
// Useful to generate ids for GraphFrame vertices.
def dfZipWithIndex(
  df: DataFrame,
  offset: Int = 1,
  colName: String = "id",
  inFront: Boolean = true
) : DataFrame = {
  df.sqlContext.createDataFrame(
    df.rdd.zipWithIndex.map(ln =>
      Row.fromSeq(
        (if (inFront) Seq(ln._2 + offset) else Seq())
          ++ ln._1.toSeq ++
        (if (inFront) Seq() else Seq(ln._2 + offset))
      )
    ),
    StructType(
      (if (inFront) Array(StructField(colName,LongType,false)) else Array[StructField]())
        ++ df.schema.fields ++
      (if (inFront) Array[StructField]() else Array(StructField(colName,LongType,false)))
    )
  )
}
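// Note: GraphFrame ids only need to be unique, not contiguous, so a simpler
// alternative (sketch, not used here) would be the built-in function
// monotonically_increasing_id():
//   df.withColumn("id", org.apache.spark.sql.functions.monotonically_increasing_id())
// dfZipWithIndex is kept to get dense, contiguous Long ids.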

// Read the data into a dataframe
val ck = spark.read.
  format("org.apache.spark.sql.execution.datasources.csv.CSVFileFormat").
  option("delimiter", "\t").
  load("/wmf/data/archive/clickstream/2017-11/clickstream-enwiki-2017-11.tsv.gz").
  toDF("from", "to", "type", "count").
  repartition(16).
  cache()
  
// Make a view from the dataframe
// to simplify SQL querying
ck.createOrReplaceTempView("ck")
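
// Example use of the req helper defined above:
//   req("SELECT * FROM ck LIMIT 3")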

// Prepare the graph vertices
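// (GraphFrame expects the vertex dataframe to have an "id" column and the
//  edge dataframe to have "src" and "dst" columns, hence dfZipWithIndex here
//  and the id join in the edges query below)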
val verts = dfZipWithIndex(spark.sql("""
  SELECT DISTINCT article
  FROM (
    SELECT from AS article FROM ck
    UNION
    SELECT to AS article FROM ck
  )
  SORT BY article
"""))
verts.createOrReplaceTempView("verts")

// Prepare the graph edges
val edges = spark.sql("""
  SELECT
    v1.id as src,
    v2.id as dst,
    from,
    to,
    type,
    count
  FROM ck
    JOIN verts v1
      ON (ck.from = v1.article)
    JOIN verts v2
      ON (ck.to = v2.article)
""")

// Build and cache the graph
val g = GraphFrame(verts, edges)
g.cache()

Now have some fun:

// Link type distribution (number and sum(count))
req("SELECT type, COUNT(1), SUM(count) FROM ck group by type")
/*
link,15649105,1217696432
other,772850,46782048
external,9560563,5517911007
*/

// Number of articles
g.vertices.count
// 4416240

// Show most-connected articles
g.degrees.join(g.vertices, "id").orderBy(desc("degree")).take(10).foreach(println)
/*
1589631,4247990,other-empty
1147983,3340409,other-search
684061,1049383,other-internal
2891389,919676,other-external
1779343,312875,Main_Page
163181,100079,Hyphen-minus
2162140,7304,United_States
1311826,5107,India
3287311,4581,United_Kingdom
4001807,4284,Deaths_in_2017

--> Fake articles over-connect the graph.
Let's concentrate on links of type `link` only
*/
val g2 = GraphFrame(verts, g.edges.filter("type = 'link'"))
g2.cache()

// Again, most connected articles
g2.degrees.join(g2.vertices, "id").orderBy(desc("degree")).take(10).foreach(println)
/*
2162140,6960,United_States
1311826,4833,India
3287311,4337,United_Kingdom
4386346,3859,New_York_City
61173,3760,Russia
1306753,3602,China
352367,3538,World_War_II
492962,3284,Germany
4043962,3248,California
669117,3042,France

--> Looks way better :)
*/

// Look at connected-components
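// (GraphFrames' connectedComponents implementation requires a Spark
//  checkpoint directory to be set, hence the line below)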
spark.sparkContext.setCheckpointDir("hdfs:///tmp/joal/spark-checkpoint")
val cc2 = g2.connectedComponents.run()
cc2.cache()

// How many forests?
cc2.select("component").distinct().count()
// 1889085
// Many forests!

// Are the forest sizes evenly distributed?
cc2.groupBy("component").count().
  selectExpr("CEIL(LOG10(count)) as c10").groupBy("c10").count().
  orderBy(desc("c10")).
  take(10).foreach(println)
/*
7,1                                                                           
2,33
1,16211
0,1872840

--> One huge forest, many very small ones
Let's concentrate on the huge one
*/
cc2.groupBy("component").count().orderBy(desc("count")).take(3).foreach(println)
/*
1,2506186
40276,51
214725,48

--> We want component == 1
*/
val g3 = GraphFrame(cc2.filter("component = 1"), g2.edges)
g3.cache()

// Now let's try to cluster that forest into communities
// This one takes a long time to run (more than 1h)...
val lp = g3.labelPropagation.maxIter(10).run()
lp.cache()

// How many communities?
lp.select("label").distinct().count()
// 185532
// Many communities!

// Are the community sizes evenly distributed?
lp.groupBy("label").count().
  selectExpr("CEIL(LOG10(count)) as c10").groupBy("c10").count().
  orderBy(desc("c10")).
  take(10).foreach(println)
/*
6,2
5,18
4,172
3,1526
2,14654
1,85172
0,84062

--> Decreasing power law with
 - ~45% single-article communities (is that actually a community?)
 - ~46% communities of 2 to 10 articles
 - ~ 8% communities of 11 to 100 articles
 - ~ 1% communities of more than 100 articles
*/

// Taking a look at the biggest communities:
// Get the 10 most connected articles of each of the 10 biggest communities
val g3lp = GraphFrame(lp, g3.edges)
lp.groupBy("label").count().orderBy(desc("count")).take(10).foreach(r => {
  val label = r.getLong(0)
  val count = r.getLong(1)
  println(s"************  $label - $count  ************")
  val g = GraphFrame(g3lp.vertices.filter(s"label = '$label'"), g3lp.edges)
  g.degrees.join(g.vertices, "id").orderBy(desc("degree")).take(10).foreach(println)
})

/*
************  4141945 - 292936  ************
[1116363,3039,London,1,4141945]
[1191953,2445,YouTube,1,4141945]
[65526,2318,William_Shakespeare,1,4141945]
[409476,1956,Los_Angeles,1,4141945]
[3878040,1751,Netflix,1,4141945]
[629550,1723,List_of_2017_albums,1,4141945]
[3466444,1697,2017_in_film,1,4141945]
[1892394,1687,Order_of_the_British_Empire,1,4141945]
[547376,1553,Sony,1,4141945]
[1620938,1544,IMDb,1,4141945]
************  1008852 - 145747  ************
[2162140,6960,United_States,1,1008852]
[4386346,3859,New_York_City,1,1008852]
[4043962,3248,California,1,1008852]
[1770788,2494,Chicago,1,1008852]
[1191343,2374,Washington,_D.C.,1,1008852]
[3390380,2354,Massachusetts,1,1008852]
[3427316,2041,Donald_Trump,1,1008852]
[1168756,2034,United_States_Navy,1,1008852]
[3760281,1874,Florida,1,1008852]
[2713410,1799,Virginia,1,1008852]
************  2763120 - 73417  ************
[2166019,1741,Association_football,1,2763120]
[1271201,853,Manchester_United_F.C.,1,2763120]
[1545929,849,2014_FIFA_World_Cup_squads,1,2763120]
[2763120,834,Chelsea_F.C.,1,2763120]
[3264976,829,UEFA_Champions_League,1,2763120]
[1678636,819,2010_FIFA_World_Cup_squads,1,2763120]
[3841892,819,2006_FIFA_World_Cup_squads,1,2763120]
[1864582,808,Italy_national_football_team,1,2763120]
[3582718,792,England_national_football_team,1,2763120]
[4173414,784,1998_FIFA_World_Cup_squads,1,2763120]
************  3052495 - 53521  ************
[1306753,3602,China,1,3052495]
[352367,3538,World_War_II,1,3052495]
[3652664,2586,Japan,1,3052495]
[87783,2483,World_War_I,1,3052495]
[3209546,2286,Hong_Kong,1,3052495]
[1961948,2184,South_Korea,1,3052495]
[857641,1685,Taiwan,1,3052495]
[1256310,1512,Tokyo,1,3052495]
[3713704,1501,Cold_War,1,3052495]
[36671,1372,North_Korea,1,3052495]
************  349041 - 37879  ************
[3921316,1234,Metabolism,1,349041]
[1532650,985,Iron,1,349041]
[3278882,961,List_of_minerals,1,349041]
[1977514,959,List_of_Nobel_laureates,1,349041]
[1309077,924,Ethanol,1,349041]
[177709,903,Aluminium,1,349041]
[3215805,839,Oxygen,1,349041]
[3230573,789,Gold,1,349041]
[3630012,772,Industrial_Revolution,1,349041]
[26038,771,Carbon_dioxide,1,349041]
************  2726875 - 30689  ************
[3287311,4337,United_Kingdom,1,2726875]
[492962,3284,Germany,1,2726875]
[669117,3042,France,1,2726875]
[1709779,2891,Italy,1,2726875]
[491875,2675,Europe,1,2726875]
[302136,2626,Philippines,1,2726875]
[4363873,2539,Netherlands,1,2726875]
[3888047,2393,Australia,1,2726875]
[3469043,2168,Brazil,1,2726875]
[2012076,2086,Bangladesh,1,2726875]
************  763252 - 28900  ************
[2932935,987,Taxonomy_(biology),1,763252]
[4158428,948,Fungus,1,763252]
[1204314,849,List_of_desserts,1,763252]
[3945878,792,Plant,1,763252]
[2302920,757,Flowering_plant,1,763252]
[2627467,707,Agriculture,1,763252]
[3334608,703,American_Chinese_cuisine,1,763252]
[194577,676,Species,1,763252]
[3252145,605,Genus,1,763252]
[3433686,587,List_of_plants_used_in_herbalism,1,763252]
************  1384946 - 28661  ************
[2767496,1691,Hindi,1,1384946]
[1140029,1392,Mumbai,1,1384946]
[3203893,1095,Bollywood,1,1384946]
[217619,1071,Tamil_language,1,1384946]
[2638071,940,List_of_Indian_film_actresses,1,1384946]
[298468,792,List_of_Hindi_film_clans,1,1384946]
[629577,707,List_of_Bollywood_films_of_2017,1,1384946]
[3146326,675,List_of_Indian_film_actors,1,1384946]
[78222,669,List_of_highest-grossing_Indian_films,1,1384946]
[584138,637,Kannada,1,1384946]
************  4179138 - 21700  ************
[61173,3760,Russia,1,4179138]
[1144417,2595,Soviet_Union,1,4179138]
[1184532,1674,Nazi_Germany,1,4179138]
[1177198,1328,Finland,1,4179138]
[4179138,1264,Eastern_Front_(World_War_II),1,4179138]
[1401174,1081,Joseph_Stalin,1,4179138]
[1982889,1069,Saint_Petersburg,1,4179138]
[742215,978,Moscow,1,4179138]
[877776,958,Russian_language,1,4179138]
[2092621,939,Russian_Empire,1,4179138]
************  3260934 - 20909  ************
[1311826,4833,India,1,3260934]
[4210557,2637,Pakistan,1,3260934]
[2801525,1116,Uttar_Pradesh,1,3260934]
[3987030,1103,Maharashtra,1,3260934]
[3988667,1061,Nepal,1,3260934]
[2733946,1039,Tamil_Nadu,1,3260934]
[3469116,1021,British_Raj,1,3260934]
[1191771,1017,World_Heritage_Site,1,3260934]
[885572,884,Bengali_language,1,3260934]
[121136,871,Karnataka,1,3260934]
*/

Todo: Use a more advanced version of the label-propagation algorithm that takes advantage of link weights.
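
A possible starting point (untested sketch, not part of the experiments above): GraphFrames' aggregateMessages can express one iteration of a weight-aware label propagation, where every vertex gathers its neighbours' labels together with the edge counts and adopts the label with the largest summed count. The function name and details below are mine (hypothetical), and it assumes the g3lp graph built above (vertices with a label column, edges with a count column).

import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._

// Hypothetical sketch: one iteration of weighted label propagation.
def weightedLabelPropagationStep(g: GraphFrame): GraphFrame = {
  val AM = org.graphframes.lib.AggregateMessages
  // Messages carry the neighbour's current label and the edge weight (count)
  val toDst = struct(AM.src("label").as("label"), AM.edge("count").cast("long").as("w"))
  val toSrc = struct(AM.dst("label").as("label"), AM.edge("count").cast("long").as("w"))
  val msgs = g.aggregateMessages.
    sendToDst(toDst).
    sendToSrc(toSrc).
    agg(collect_list(AM.msg).as("msgs"))            // one row per vertex id
  // For every vertex, keep the label with the largest summed weight
  val newLabels = msgs.
    select(col("id"), explode(col("msgs")).as("m")).
    select(col("id"), col("m.label").as("label"), col("m.w").as("w")).
    groupBy("id", "label").
    agg(sum("w").as("w")).
    withColumn("rn", row_number().over(Window.partitionBy("id").orderBy(desc("w")))).
    filter("rn = 1").
    select("id", "label")
  // Note: vertices without any incident edge are dropped by this inner join;
  // a complete implementation would keep their previous label.
  GraphFrame(g.vertices.drop("label").join(newLabels, "id"), g.edges)
}

// Example (would need to be iterated until labels stabilize):
// val g3lpWeighted = weightedLabelPropagationStep(g3lp)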