User:DCausse (WMF)/PySpark And BackendSearchLogs
Backend search logs are stored under /wmf/data/raw/mediawiki/mediawiki_CirrusSearchRequestSet/hourly/Y/M/D/H/.
The hourly partitions are stored in the Avro format.
Spark needs an extra dependency to decode this format; to run a pySpark REPL, add the jar to both the driver and worker classpaths:
--jars [OTHER_JARS],${HOME}/spark-avro_2.11-3.2.0.jar \
--driver-class-path [OTHER_JARS]:${HOME}/spark-avro_2.11-3.2.0.jar \
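Putting the flags together, a full REPL invocation might look like the following sketch. The master and the jar location are illustrative assumptions; adjust them to your cluster setup.

```shell
# Illustrative pyspark launch; swap in any extra jars your session needs.
# Note: --jars is comma-separated, --driver-class-path is colon-separated.
pyspark \
  --master yarn \
  --jars ${HOME}/spark-avro_2.11-3.2.0.jar \
  --driver-class-path ${HOME}/spark-avro_2.11-3.2.0.jar
```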
Then load the data frame:
df = spark.read.format("com.databricks.spark.avro").load('/wmf/data/raw/mediawiki/mediawiki_CirrusSearchRequestSet/hourly/2018/01/04/20/')
Flatten the requests and get the first fulltext request:
from pyspark.sql import functions as F
from pyspark.sql import Window as W

first_fulltext = (
    df
    .select('id', 'ts', 'wikiId', 'source', 'identity', 'userAgent', 'backendUserTests', 'tookMs', 'payload',
            # posexplode to keep the ordinal in the requests array
            F.posexplode('requests').alias('raw_reqnum', 'rq'))
    # fetch full_text requests only
    .filter('rq.queryType == "full_text"')
    # drop WMF mobile app queries: they contain too many completion queries
    # (the app sends an additional fulltext query to fetch the "did you mean" suggestion)
    .filter(~F.col('userAgent').startswith('WikipediaApp/'))
    # recompute the ordinal of the requests so that req_num == 1 is likely the original request
    .select(F.col('*'), F.row_number().over(W.partitionBy('id').orderBy('raw_reqnum')).alias('req_num'))
    .filter(F.col('req_num') == 1))
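The explode-then-renumber step above can be illustrated without Spark. The sketch below mimics the same logic in plain Python on hypothetical sample rows (the field names and values are made up for illustration): keep only full_text requests, renumber them per search id ordered by their original position, and keep the first one.

```python
from itertools import groupby
from operator import itemgetter

# Hypothetical flattened rows: (id, raw_reqnum, queryType), as produced
# by posexplode over the requests array.
rows = [
    ("q1", 0, "comp_suggest"),
    ("q1", 1, "full_text"),
    ("q1", 2, "full_text"),
    ("q2", 0, "full_text"),
]

# Mirror of .filter('rq.queryType == "full_text"').
full_text = [r for r in rows if r[2] == "full_text"]

# Mirror of row_number().over(partitionBy('id').orderBy('raw_reqnum'))
# followed by req_num == 1: take the first surviving row per id.
first_requests = []
for _, grp in groupby(sorted(full_text, key=itemgetter(0, 1)), key=itemgetter(0)):
    first_requests.append(next(grp))

print(first_requests)
# → [('q1', 1, 'full_text'), ('q2', 0, 'full_text')]
```

Note why the ordinal is recomputed: after the queryType filter, the original `raw_reqnum` may no longer start at 0 (q1's first full_text request has raw_reqnum 1), so a fresh per-id row number is needed to pick the likely original request.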