User:DCausse (WMF)/PySpark And BackendSearchLogs

From Wikitech

Backend search logs are stored inside /wmf/data/raw/mediawiki/mediawiki_CirrusSearchRequestSet/hourly/Y/M/D/H/. The partitions are stored in the Avro format. Spark needs an extra dependency to decode this format, so when launching your pySpark REPL you need to add this jar to both the driver and worker classpaths:

  --jars [OTHER_JARS],${HOME}/spark-avro_2.11-3.2.0.jar \
  --driver-class-path [OTHER_JARS]:${HOME}/spark-avro_2.11-3.2.0.jar \
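Assuming the jar has already been downloaded to your home directory, a full REPL launch might look like the following (the `--master yarn` setting is illustrative, adjust to your cluster; note that `--jars` takes a comma-separated list while `--driver-class-path` uses colons):

```shell
pyspark \
  --master yarn \
  --jars ${HOME}/spark-avro_2.11-3.2.0.jar \
  --driver-class-path ${HOME}/spark-avro_2.11-3.2.0.jar
```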

Then load the data frame:

df = spark.read.format("com.databricks.spark.avro").load('/wmf/data/raw/mediawiki/mediawiki_CirrusSearchRequestSet/hourly/2018/01/04/20/')

Flatten the requests and get the first full_text request:

from pyspark.sql import functions as F
from pyspark.sql import Window as W

first_requests = (df
    .select('id', 'ts', 'wikiId', 'source', 'identity', 'userAgent', 'backendUserTests', 'tookMs', 'payload',
        # posexplode to keep the ordinal in the requests array
        F.posexplode('requests').alias('raw_reqnum', 'rq'))
    # fetch full_text requests only
    .filter('rq.queryType == "full_text"')
    # drop wmf mobile apps queries, they contain too many completion queries (the app sends an additional fulltext query to fetch dym)
    .filter(~F.col('userAgent').startswith('WikipediaApp/'))
    # recompute the ordinal of the requests so that req_num == 1 is likely the original request
    .select(F.col('*'), F.row_number().over(W.partitionBy('id').orderBy('raw_reqnum')).alias('req_num'))
    .filter(F.col('req_num') == 1))
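To illustrate what the posexplode / row_number combination computes, here is a plain-Python sketch over a hypothetical in-memory sample (the data is made up, not real CirrusSearch logs): flatten each requests array with its ordinal, keep only full_text entries, then take the earliest one per id.

```python
# Hypothetical sample mimicking the shape of CirrusSearchRequestSet rows.
rows = [
    {'id': 'a', 'requests': [
        {'queryType': 'comp_suggest'},
        {'queryType': 'full_text'},
        {'queryType': 'full_text'},
    ]},
    {'id': 'b', 'requests': [
        {'queryType': 'full_text'},
    ]},
]

# posexplode: one (id, ordinal, request) tuple per element of the array.
exploded = [
    (row['id'], pos, rq)
    for row in rows
    for pos, rq in enumerate(row['requests'])
]

# Keep full_text requests only.
full_text = [r for r in exploded if r[2]['queryType'] == 'full_text']

# row_number() over (partition by id, order by ordinal), keeping req_num == 1:
# the first full_text request seen for each id.
first_per_id = {}
for rec_id, pos, rq in sorted(full_text, key=lambda r: (r[0], r[1])):
    first_per_id.setdefault(rec_id, (pos, rq))

print(sorted(first_per_id))      # ['a', 'b']
print(first_per_id['a'][0])      # 1: the completion query at ordinal 0 was skipped
```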